From d55c4a053b837c653292eb6c118b7b2dc3df1139 Mon Sep 17 00:00:00 2001 From: William King Date: Tue, 7 Apr 2015 16:11:34 -0700 Subject: [PATCH] FS-7060 expanded configuration for amqp command configuration --- conf/vanilla/autoload_configs/amqp.conf.xml | 4 +- src/mod/event_handlers/mod_amqp/mod_amqp.h | 7 --- .../mod_amqp/mod_amqp_command.c | 57 +++++++++++++++++++ .../mod_amqp/mod_amqp_producer.c | 2 + 4 files changed, 61 insertions(+), 9 deletions(-) diff --git a/conf/vanilla/autoload_configs/amqp.conf.xml b/conf/vanilla/autoload_configs/amqp.conf.xml index 5ae9dc944a..06bbb0d395 100644 --- a/conf/vanilla/autoload_configs/amqp.conf.xml +++ b/conf/vanilla/autoload_configs/amqp.conf.xml @@ -48,8 +48,8 @@ - - + + diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp.h b/src/mod/event_handlers/mod_amqp/mod_amqp.h index 808415e7ad..07528b234e 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp.h +++ b/src/mod/event_handlers/mod_amqp/mod_amqp.h @@ -114,15 +114,8 @@ typedef struct { char *name; char *exchange; - char *exchange_type; char *binding_key; - /* Array to store the possible event subscriptions */ - char *event_filter; - unsigned int number_of_event_filters; - switch_event_node_t *event_nodes[SWITCH_EVENT_ALL]; - switch_event_types_t event_ids[SWITCH_EVENT_ALL]; - /* Note: The AMQP channel is not reentrant this MUTEX serializes sending events. */ mod_amqp_connection_t *conn_root; mod_amqp_connection_t *conn_active; diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_command.c b/src/mod/event_handlers/mod_amqp/mod_amqp_command.c index 235983bae0..48eec1da1e 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_command.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_command.c @@ -83,8 +83,10 @@ switch_status_t mod_amqp_command_destroy(mod_amqp_command_profile_t **prof) switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg) { mod_amqp_command_profile_t *profile = NULL; + switch_xml_t params, param, connections, connection; switch_threadattr_t *thd_attr = NULL; switch_memory_pool_t *pool; + char *exchange = NULL; if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { goto err; @@ -95,6 +97,61 @@ switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg) profile->pool = pool; profile->name = switch_core_strdup(profile->pool, name); profile->running = 1; + profile->reconnect_interval_ms = 1000; + + if ((params = switch_xml_child(cfg, "params")) != NULL) { + for (param = switch_xml_child(params, "param"); param; param = param->next) { + char *var = (char *) switch_xml_attr_soft(param, "name"); + char *val = (char *) switch_xml_attr_soft(param, "value"); + + if (!var) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param missing 'name' attribute\n", profile->name); + continue; + } + + if (!val) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param[%s] missing 'value' attribute\n", profile->name, var); + continue; + } + + if (!strncmp(var, "reconnect_interval_ms", 21)) { + int interval = atoi(val); + if ( interval && interval > 0 ) { + profile->reconnect_interval_ms = interval; + } + } else if (!strncmp(var, "exchange", 8)) { + exchange = switch_core_strdup(profile->pool, "TAP.Commands"); + } + } + } + + /* Handle defaults of string types */ + profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Commands"); + + if ((connections = switch_xml_child(cfg, "connections")) != NULL) { + for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) { + if ( ! profile->conn_root ) { /* Handle first root node */ + if (mod_amqp_connection_create(&(profile->conn_root), connection, profile->pool) != SWITCH_STATUS_SUCCESS) { + /* Handle connection create failure */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name); + continue; + } + profile->conn_active = profile->conn_root; + } else { + if (mod_amqp_connection_create(&(profile->conn_active->next), connection, profile->pool) != SWITCH_STATUS_SUCCESS) { + /* Handle connection create failure */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name); + continue; + } + profile->conn_active = profile->conn_active->next; + } + } + } + profile->conn_active = NULL; + + if ( mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name); + } /* Start the worker threads */ switch_threadattr_create(&thd_attr, profile->pool); diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c index 136bbf4b48..067bb57d7b 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c @@ -221,6 +221,8 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg) if ( interval && interval > 0 ) { profile->send_queue_size = interval; } + } else if (!strncmp(var, "exchange", 8)) { + exchange = switch_core_strdup(profile->pool, "TAP.Events"); } else if (!strncmp(var, "format_fields", 13)) { int size = 0; if ((size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {