From b5301688d7605dc4f9417433af4c6eaf1e04efe0 Mon Sep 17 00:00:00 2001 From: William King Date: Sat, 10 Oct 2015 16:39:53 -0700 Subject: [PATCH] FS-8306 Now command queues can specify the queue to subscribe to. This enables very interesting use cases that would involve single job queue, and multiple consumers. --- src/mod/event_handlers/mod_amqp/mod_amqp.h | 1 + src/mod/event_handlers/mod_amqp/mod_amqp_command.c | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp.h b/src/mod/event_handlers/mod_amqp/mod_amqp.h index 56e7e6372d..f82a8c5a9c 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp.h +++ b/src/mod/event_handlers/mod_amqp/mod_amqp.h @@ -128,6 +128,7 @@ typedef struct { char *name; char *exchange; + char *queue; char *binding_key; /* Note: The AMQP channel is not reentrant this MUTEX serializes sending events. */ diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_command.c b/src/mod/event_handlers/mod_amqp/mod_amqp_command.c index bb7ce5451e..ba12ffcba6 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_command.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_command.c @@ -86,7 +86,7 @@ switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg) switch_xml_t params, param, connections, connection; switch_threadattr_t *thd_attr = NULL; switch_memory_pool_t *pool; - char *exchange = NULL, *binding_key = NULL; + char *exchange = NULL, *binding_key = NULL, *queue = NULL; if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { goto err; @@ -121,6 +121,8 @@ switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg) } } else if (!strncmp(var, "exchange-name", 13)) { exchange = switch_core_strdup(profile->pool, val); + } else if (!strncmp(var, "queue-name", 10)) { + queue = switch_core_strdup(profile->pool, val); } else if (!strncmp(var, "binding_key", 11)) { binding_key = switch_core_strdup(profile->pool, val); } @@ -129,6 +131,7 @@ switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg) /* Handle defaults of string types */ profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Commands"); + profile->queue = queue ? queue : NULL; profile->binding_key = binding_key ? binding_key : switch_core_strdup(profile->pool, "commandBindingKey"); if ((connections = switch_xml_child(cfg, "connections")) != NULL) { @@ -214,7 +217,7 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating command queue"); recv_queue = amqp_queue_declare(profile->conn_active->state, // state 1, // channel - amqp_empty_bytes, // queue name + profile->queue ? amqp_cstring_bytes(profile->queue) : amqp_empty_bytes, // queue name 0, 0, // passive, durable 0, 1, // exclusive, auto-delete amqp_empty_table); // args