From 3f6b5c3bc98900edccc21a7889bf457675f4b8fb Mon Sep 17 00:00:00 2001 From: alexey-khabulyak Date: Wed, 15 Jun 2022 18:19:26 +0300 Subject: [PATCH 1/8] [mod_amqp] Added xml handler via amqp --- conf/vanilla/autoload_configs/amqp.conf.xml | 25 + src/mod/event_handlers/mod_amqp/Makefile.am | 2 +- src/mod/event_handlers/mod_amqp/mod_amqp.c | 9 +- src/mod/event_handlers/mod_amqp/mod_amqp.h | 57 ++ .../mod_amqp/mod_amqp_connection.c | 314 ++++++--- .../event_handlers/mod_amqp/mod_amqp_utils.c | 30 + .../mod_amqp/mod_amqp_xml_handler.c | 606 ++++++++++++++++++ 7 files changed, 961 insertions(+), 82 deletions(-) create mode 100644 src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c diff --git a/conf/vanilla/autoload_configs/amqp.conf.xml b/conf/vanilla/autoload_configs/amqp.conf.xml index 3db3c32232..3ee2345816 100644 --- a/conf/vanilla/autoload_configs/amqp.conf.xml +++ b/conf/vanilla/autoload_configs/amqp.conf.xml @@ -84,4 +84,29 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/mod/event_handlers/mod_amqp/Makefile.am b/src/mod/event_handlers/mod_amqp/Makefile.am index 7bb93927b8..c2bfb6a01d 100644 --- a/src/mod/event_handlers/mod_amqp/Makefile.am +++ b/src/mod/event_handlers/mod_amqp/Makefile.am @@ -4,7 +4,7 @@ MODNAME=mod_amqp if HAVE_AMQP mod_LTLIBRARIES = mod_amqp.la -mod_amqp_la_SOURCES = mod_amqp_utils.c mod_amqp_connection.c mod_amqp_producer.c mod_amqp_command.c mod_amqp_logging.c mod_amqp.c +mod_amqp_la_SOURCES = mod_amqp_utils.c mod_amqp_connection.c mod_amqp_producer.c mod_amqp_command.c mod_amqp_logging.c mod_amqp_xml_handler.c mod_amqp.c mod_amqp_la_CFLAGS = $(AM_CFLAGS) $(AMQP_CFLAGS) mod_amqp_la_LIBADD = $(switch_builddir)/libfreeswitch.la mod_amqp_la_LDFLAGS = -avoid-version -module -no-undefined -shared $(AMQP_LIBS) $(SWITCH_AM_LDFLAGS) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp.c b/src/mod/event_handlers/mod_amqp/mod_amqp.c index cd8dcf849b..a43e4c0d3e 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp.c @@ -65,6 +65,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_amqp_load) switch_core_hash_init(&(mod_amqp_globals.producer_hash)); switch_core_hash_init(&(mod_amqp_globals.command_hash)); switch_core_hash_init(&(mod_amqp_globals.logging_hash)); + switch_core_hash_init(&(mod_amqp_globals.xml_handler_hash)); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_apqp loading: Version %s\n", switch_version_full()); @@ -90,6 +91,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_amqp_shutdown) mod_amqp_producer_profile_t *producer; mod_amqp_command_profile_t *command; mod_amqp_logging_profile_t *logging; + mod_amqp_xml_handler_profile_t *xml_handler; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod starting shutting down\n"); switch_event_unbind_callback(mod_amqp_producer_event_handler); @@ -110,10 +112,15 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_amqp_shutdown) mod_amqp_logging_destroy(&logging); } + while ((hi = switch_core_hash_first_iter(mod_amqp_globals.xml_handler_hash, hi))) { + switch_core_hash_this(hi, NULL, NULL, (void **)&xml_handler); + mod_amqp_xml_handler_destroy(&xml_handler); + } + switch_core_hash_destroy(&(mod_amqp_globals.producer_hash)); switch_core_hash_destroy(&(mod_amqp_globals.command_hash)); switch_core_hash_destroy(&(mod_amqp_globals.logging_hash)); - + switch_core_hash_destroy(&(mod_amqp_globals.xml_handler_hash)); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod finished shutting down\n"); return SWITCH_STATUS_SUCCESS; } diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp.h b/src/mod/event_handlers/mod_amqp/mod_amqp.h index 0717876040..4e3432ab45 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp.h +++ b/src/mod/event_handlers/mod_amqp/mod_amqp.h @@ -57,6 +57,8 @@ #define MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS 5 #define MAX_AMQP_ROUTING_KEY_LENGTH 255 +#define MAX_TEMP_CONNECTIONS 30 + #define TIME_STATS_TO_AGGREGATE 1024 #define MOD_AMQP_DEBUG_TIMING 0 #define MOD_AMQP_DEFAULT_CONTENT_TYPE "text/json" @@ -67,6 +69,17 @@ typedef struct { char *pjson; } mod_amqp_message_t; +typedef struct mod_amqp_aux_connection_s { + char uuid[SWITCH_UUID_FORMATTED_LENGTH + 1]; + amqp_bytes_t queueName; + unsigned int locked; + amqp_boolean_t ssl_on; + amqp_boolean_t ssl_verify_peer; + amqp_connection_state_t state; + + struct mod_amqp_aux_connection_s *next; +} mod_amqp_aux_connection_t; + typedef struct mod_amqp_connection_s { char *name; char *hostname; @@ -185,12 +198,48 @@ typedef struct { switch_memory_pool_t *pool; } mod_amqp_logging_profile_t; +typedef struct { + char *name; + + char *exchange; + char *exchange_type; + char *reply_exchange; + char *reply_exchange_type; + char *bindings; + int exchange_durable; + int exchange_auto_delete; + int active_channels; + int max_temp_conn; + mod_amqp_keypart_t format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1]; + switch_bool_t enable_fallback_format_fields; + /* Note: The AMQP channel is not reentrant this MUTEX serializes sending events. */ + mod_amqp_connection_t *conn_root; + mod_amqp_connection_t *conn_active; + mod_amqp_aux_connection_t *conn_aux; + + int reconnect_interval_ms; + + int circuit_breaker_ms; + switch_time_t circuit_breaker_reset_time; + + /* xml_handler thread */ + switch_thread_t *xml_handler_thread; + switch_queue_t *send_queue; + unsigned int send_queue_size; + + switch_mutex_t *mutex; + switch_bool_t running; + char *custom_attr; + switch_memory_pool_t *pool; +} mod_amqp_xml_handler_profile_t; + typedef struct mod_amqp_globals_s { switch_memory_pool_t *pool; switch_hash_t *producer_hash; switch_hash_t *command_hash; switch_hash_t *logging_hash; + switch_hash_t *xml_handler_hash; } mod_amqp_globals_t; extern mod_amqp_globals_t mod_amqp_globals; @@ -204,8 +253,10 @@ void mod_amqp_util_msg_destroy(mod_amqp_message_t **msg); /* connection */ switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_xml_t cfg, switch_memory_pool_t *pool); void mod_amqp_connection_destroy(mod_amqp_connection_t **conn); +void mod_amqp_aux_connection_destroy(mod_amqp_aux_connection_t **conn); void mod_amqp_connection_close(mod_amqp_connection_t *connection); switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod_amqp_connection_t **active, char *profile_name, char *custom_attr); +switch_status_t mod_amqp_aux_connection_open(mod_amqp_connection_t *connections, mod_amqp_aux_connection_t **active, char *profile_name, char *custom_attr, char *reply_exchange); /* command */ switch_status_t mod_amqp_command_destroy(mod_amqp_command_profile_t **profile); @@ -228,5 +279,11 @@ switch_status_t mod_amqp_logging_create(char *name, switch_xml_t cfg); switch_status_t mod_amqp_logging_destroy(mod_amqp_logging_profile_t **prof); void * SWITCH_THREAD_FUNC mod_amqp_logging_thread(switch_thread_t *thread, void *data); +/* xml_handler */ +switch_status_t mod_amqp_xml_handler_create(char *name, switch_xml_t cfg); +switch_status_t mod_amqp_xml_handler_destroy(mod_amqp_xml_handler_profile_t **prof); +void * SWITCH_THREAD_FUNC mod_amqp_xml_handler_thread(switch_thread_t *thread, void *data); +switch_status_t mod_amqp_xml_handler_routing_key(mod_amqp_xml_handler_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], + switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[]); #endif /* MOD_AMQP_H */ diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c b/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c index 91feec93d5..31ecbc2eee 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c @@ -1,40 +1,40 @@ /* -* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application -* Copyright (C) 2005-2012, Anthony Minessale II -* -* Version: MPL 1.1 -* -* The contents of this file are subject to the Mozilla Public License Version -* 1.1 (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* http://www.mozilla.org/MPL/ -* -* Software distributed under the License is distributed on an "AS IS" basis, -* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License -* for the specific language governing rights and limitations under the -* License. -* -* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application -* -* The Initial Developer of the Original Code is -* Anthony Minessale II -* Portions created by the Initial Developer are Copyright (C) -* the Initial Developer. All Rights Reserved. -* -* Based on mod_skel by -* Anthony Minessale II -* -* Contributor(s): -* -* Daniel Bryars -* Tim Brown -* Anthony Minessale II -* William King -* Mike Jerris -* -* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker -* -*/ + * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * Copyright (C) 2005-2012, Anthony Minessale II + * + * Version: MPL 1.1 + * + * The contents of this file are subject to the Mozilla Public License Version + * 1.1 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + * for the specific language governing rights and limitations under the + * License. + * + * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * + * The Initial Developer of the Original Code is + * Anthony Minessale II + * Portions created by the Initial Developer are Copyright (C) + * the Initial Developer. All Rights Reserved. + * + * Based on mod_skel by + * Anthony Minessale II + * + * Contributor(s): + * + * Daniel Bryars + * Tim Brown + * Anthony Minessale II + * William King + * Mike Jerris + * + * mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker + * + */ #include "mod_amqp.h" @@ -56,7 +56,31 @@ void mod_amqp_connection_close(mod_amqp_connection_t *connection) } } -switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod_amqp_connection_t **active, char *profile_name, char *custom_attr) +void mod_amqp_aux_connection_close(mod_amqp_aux_connection_t *connection) +{ + amqp_connection_state_t old_state = connection->state; + int status = 0; + + connection->state = NULL; + + if (connection->queueName.bytes) { + amqp_bytes_free(connection->queueName); + connection->queueName.bytes = NULL; + } + + if (old_state != NULL) { + mod_amqp_log_if_amqp_error(amqp_channel_close(old_state, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + mod_amqp_log_if_amqp_error(amqp_connection_close(old_state, AMQP_REPLY_SUCCESS), "Closing connection"); + + if ((status = amqp_destroy_connection(old_state))) { + const char *errstr = amqp_error_string2(-status); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error destroying amqp connection: %s\n", errstr); + } + } +} + +switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod_amqp_connection_t **active, + char *profile_name, char *custom_attr) { int channel_max = 0; int frame_max = 131072; @@ -72,14 +96,12 @@ switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod amqp_connection_state_t newConnection = amqp_new_connection(); amqp_connection_state_t oldConnection = NULL; - if (active && *active) { - oldConnection = (*active)->state; - } + if (active && *active) { oldConnection = (*active)->state; } /* Set up meta data for connection */ bHasHostname = gethostname(hostname, sizeof(hostname)) == 0; - loginProperties.num_entries = sizeof(loginTableEntries)/sizeof(*loginTableEntries); + loginProperties.num_entries = sizeof(loginTableEntries) / sizeof(*loginTableEntries); loginProperties.entries = loginTableEntries; snprintf(key_string, 256, "x_%s_HostMachineName", custom_attr); @@ -115,7 +137,7 @@ switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod connection_attempt = connections; amqp_status = -1; - while (connection_attempt && amqp_status){ + while (connection_attempt && amqp_status) { if (connection_attempt->ssl_on == 1) { amqp_set_initialize_ssl_library(connection_attempt->ssl_on); if (!(socket = amqp_ssl_socket_new(newConnection))) { @@ -127,35 +149,31 @@ switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Profile[%s] trying to connect to AMQP broker %s:%d\n", profile_name, connection_attempt->hostname, connection_attempt->port); - if ((amqp_status = amqp_socket_open(socket, connection_attempt->hostname, connection_attempt->port))){ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Could not open socket connection to AMQP broker %s:%d status(%d) %s\n", - connection_attempt->hostname, connection_attempt->port, amqp_status, amqp_error_string2(amqp_status)); + if ((amqp_status = amqp_socket_open(socket, connection_attempt->hostname, connection_attempt->port))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, + "Could not open socket connection to AMQP broker %s:%d status(%d) %s\n", + connection_attempt->hostname, connection_attempt->port, amqp_status, + amqp_error_string2(amqp_status)); connection_attempt = connection_attempt->next; } } - if (active) { - *active = connection_attempt; - } + if (active) { *active = connection_attempt; } if (!connection_attempt) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] could not connect to any AMQP brokers\n", profile_name); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] could not connect to any AMQP brokers\n", + profile_name); goto err; } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Profile[%s] opened socket connection to AMQP broker %s:%d\n", - profile_name, connection_attempt->hostname, connection_attempt->port); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, + "Profile[%s] opened socket connection to AMQP broker %s:%d\n", profile_name, + connection_attempt->hostname, connection_attempt->port); /* We have a connection, now log in */ - status = amqp_login_with_properties(newConnection, - connection_attempt->virtualhost, - channel_max, - frame_max, - connection_attempt->heartbeat, - &loginProperties, - AMQP_SASL_METHOD_PLAIN, - connection_attempt->username, - connection_attempt->password); + status = amqp_login_with_properties(newConnection, connection_attempt->virtualhost, channel_max, frame_max, + connection_attempt->heartbeat, &loginProperties, AMQP_SASL_METHOD_PLAIN, + connection_attempt->username, connection_attempt->password); if (mod_amqp_log_if_amqp_error(status, "Logging in")) { if (active) { @@ -175,35 +193,165 @@ switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod goto err; } - if (active) { - (*active)->state = newConnection; + if (active) { (*active)->state = newConnection; } + + if (oldConnection) { amqp_destroy_connection(oldConnection); } + + return SWITCH_STATUS_SUCCESS; + +err: + if (newConnection) { amqp_destroy_connection(newConnection); } + return SWITCH_STATUS_GENERR; +} + +switch_status_t mod_amqp_aux_connection_open(mod_amqp_connection_t *base_conn, mod_amqp_aux_connection_t **aux_conn, + char *profile_name, char *custom_attr, char *reply_exchange) +{ + int channel_max = 0; + int frame_max = 131072; + amqp_table_t loginProperties; + amqp_table_entry_t loginTableEntries[5]; + char hostname[64]; + int bHasHostname; + char key_string[256] = {0}; + amqp_rpc_reply_t status; + amqp_socket_t *socket = NULL; + int amqp_status = -1; + amqp_queue_declare_ok_t *recv_queue; + amqp_bytes_t queueName = {0, NULL}; + switch_uuid_t uuid; + + amqp_connection_state_t newConnection = amqp_new_connection(); + + /* Set up meta data for connection */ + bHasHostname = gethostname(hostname, sizeof(hostname)) == 0; + + loginProperties.num_entries = sizeof(loginTableEntries) / sizeof(*loginTableEntries); + loginProperties.entries = loginTableEntries; + + snprintf(key_string, 256, "x_%s_HostMachineName", custom_attr); + loginTableEntries[0].key = amqp_cstring_bytes(key_string); + loginTableEntries[0].value.kind = AMQP_FIELD_KIND_BYTES; + loginTableEntries[0].value.value.bytes = amqp_cstring_bytes(bHasHostname ? hostname : "(unknown)"); + + snprintf(key_string, 256, "x_%s_ProcessDescription", custom_attr); + loginTableEntries[1].key = amqp_cstring_bytes(key_string); + loginTableEntries[1].value.kind = AMQP_FIELD_KIND_BYTES; + loginTableEntries[1].value.value.bytes = amqp_cstring_bytes("FreeSwitch"); + + snprintf(key_string, 256, "x_%s_ProcessType", custom_attr); + loginTableEntries[2].key = amqp_cstring_bytes(key_string); + loginTableEntries[2].value.kind = AMQP_FIELD_KIND_BYTES; + loginTableEntries[2].value.value.bytes = amqp_cstring_bytes("TAP"); + + snprintf(key_string, 256, "x_%s_ProcessBuildVersion", custom_attr); + loginTableEntries[3].key = amqp_cstring_bytes(key_string); + loginTableEntries[3].value.kind = AMQP_FIELD_KIND_BYTES; + loginTableEntries[3].value.value.bytes = amqp_cstring_bytes(switch_version_full()); + + snprintf(key_string, 256, "x_%s_Liquid_ProcessBuildBornOn", custom_attr); + loginTableEntries[4].key = amqp_cstring_bytes(key_string); + loginTableEntries[4].value.kind = AMQP_FIELD_KIND_BYTES; + loginTableEntries[4].value.value.bytes = amqp_cstring_bytes(__DATE__ " " __TIME__); + + + + if (base_conn->ssl_on == 1) { + amqp_set_initialize_ssl_library(base_conn->ssl_on); + if (!(socket = amqp_ssl_socket_new(newConnection))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not create SSL socket\n"); + goto err; + } + amqp_ssl_socket_set_verify_peer(socket, base_conn->ssl_verify_peer); + } else { + if (!(socket = amqp_tcp_socket_new(newConnection))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not create TCP socket\n"); + goto err; + } } - if (oldConnection) { - amqp_destroy_connection(oldConnection); + amqp_status = -1; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying to add additional connect to AMQP %s:%d\n", + base_conn->hostname, base_conn->port); + + if ((amqp_status = amqp_socket_open(socket, base_conn->hostname, base_conn->port))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, + "Could not open socket connection to AMQP broker %s:%d status(%d) %s\n", base_conn->hostname, + base_conn->port, amqp_status, amqp_error_string2(amqp_status)); + goto err; + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, + "Profile[%s] opened socket connection to AMQP broker %s:%d\n", profile_name, base_conn->hostname, + base_conn->port); + + /* We have a connection, now log in */ + status = + amqp_login_with_properties(newConnection, base_conn->virtualhost, channel_max, frame_max, base_conn->heartbeat, + &loginProperties, AMQP_SASL_METHOD_PLAIN, base_conn->username, base_conn->password); + + if (mod_amqp_log_if_amqp_error(status, "Logging in")) { + if (aux_conn) { + mod_amqp_aux_connection_close(*aux_conn); + *aux_conn = NULL; + } + goto err; + } + + // Open a channel (1). This is fairly standard + amqp_channel_open(newConnection, 1); + if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(newConnection), "Opening channel")) { + if (aux_conn) { + mod_amqp_aux_connection_close(*aux_conn); + *aux_conn = NULL; + } + goto err; + } + + if (aux_conn) { + (*aux_conn)->state = newConnection; + (*aux_conn)->locked = 0; + switch_uuid_get(&uuid); + switch_uuid_format((*aux_conn)->uuid, &uuid); + + recv_queue = amqp_queue_declare(newConnection, // state + 1, // channel + amqp_cstring_bytes((*aux_conn)->uuid), // queue name + 0, /* passive */ + 0, /* durable */ + 1, /* exclusive */ + 1, /* auto-delete */ + amqp_empty_table); // args + queueName = amqp_bytes_malloc_dup(recv_queue->queue); + (*aux_conn)->queueName = queueName; + amqp_queue_bind(newConnection, // state + 1, // channel + queueName, // queue + amqp_cstring_bytes(reply_exchange), // exchange + amqp_cstring_bytes((*aux_conn)->uuid), // routing key + amqp_empty_table); // args } return SWITCH_STATUS_SUCCESS; err: - if (newConnection) { - amqp_destroy_connection(newConnection); - } - return SWITCH_STATUS_GENERR; + if (newConnection) { amqp_destroy_connection(newConnection); } + return SWITCH_STATUS_GENERR; } switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_xml_t cfg, switch_memory_pool_t *pool) { mod_amqp_connection_t *new_con = switch_core_alloc(pool, sizeof(mod_amqp_connection_t)); switch_xml_t param; - char *name = (char *) switch_xml_attr_soft(cfg, "name"); + char *name = (char *)switch_xml_attr_soft(cfg, "name"); char *hostname = NULL, *virtualhost = NULL, *username = NULL, *password = NULL; unsigned int port = 0, heartbeat = 0; amqp_boolean_t ssl_on = 0; amqp_boolean_t ssl_verify_peer = 1; if (zstr(name)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection missing name attribute\n%s\n", switch_xml_toxml(cfg, 1)); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection missing name attribute\n%s\n", + switch_xml_toxml(cfg, 1)); return SWITCH_STATUS_GENERR; } @@ -212,16 +360,18 @@ switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_ new_con->next = NULL; for (param = switch_xml_child(cfg, "param"); param; param = param->next) { - char *var = (char *) switch_xml_attr_soft(param, "name"); - char *val = (char *) switch_xml_attr_soft(param, "value"); + char *var = (char *)switch_xml_attr_soft(param, "name"); + char *val = (char *)switch_xml_attr_soft(param, "value"); if (!var) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP connection[%s] param missing 'name' attribute\n", name); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, + "AMQP connection[%s] param missing 'name' attribute\n", name); continue; } if (!val) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP connection[%s] param[%s] missing 'value' attribute\n", name, var); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, + "AMQP connection[%s] param[%s] missing 'value' attribute\n", name, var); continue; } @@ -235,14 +385,10 @@ switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_ password = switch_core_strdup(pool, val); } else if (!strncmp(var, "port", 4)) { int interval = atoi(val); - if (interval && interval > 0) { - port = interval; - } + if (interval && interval > 0) { port = interval; } } else if (!strncmp(var, "heartbeat", 9)) { int interval = atoi(val); - if (interval && interval > 0) { - heartbeat = interval; - } + if (interval && interval > 0) { heartbeat = interval; } } else if (!strncmp(var, "ssl_on", 3) && switch_true(val) == SWITCH_TRUE) { ssl_on = 1; } else if (!strncmp(var, "ssl_verify_peer", 15) && switch_true(val) == SWITCH_FALSE) { @@ -271,6 +417,14 @@ void mod_amqp_connection_destroy(mod_amqp_connection_t **conn) } } +void mod_amqp_aux_connection_destroy(mod_amqp_aux_connection_t **conn) +{ + if (conn && *conn) { + mod_amqp_aux_connection_close(*conn); + *conn = NULL; + } +} + /* For Emacs: * Local Variables: * mode:c diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c b/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c index 03a0f262c0..7d2d5873fd 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c @@ -102,6 +102,7 @@ switch_status_t mod_amqp_do_config(switch_bool_t reload) mod_amqp_producer_profile_t *producer; mod_amqp_command_profile_t *command; mod_amqp_logging_profile_t *logging; + mod_amqp_xml_handler_profile_t *xml_handler; switch_event_unbind_callback(mod_amqp_producer_event_handler); @@ -120,6 +121,11 @@ switch_status_t mod_amqp_do_config(switch_bool_t reload) switch_core_hash_this(hi, NULL, NULL, (void **)&logging); mod_amqp_logging_destroy(&logging); } + + while ((hi = switch_core_hash_first_iter(mod_amqp_globals.xml_handler_hash, hi))) { + switch_core_hash_this(hi, NULL, NULL, (void **)&xml_handler); + mod_amqp_xml_handler_destroy(&xml_handler); + } } if ((profiles = switch_xml_child(cfg, "producers"))) { @@ -193,6 +199,30 @@ switch_status_t mod_amqp_do_config(switch_bool_t reload) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unable to locate logging section for mod_amqp\n" ); } + if ((profiles = switch_xml_child(cfg, "xml_handler"))) { + if ((profile = switch_xml_child(profiles, "profile"))) { + for (; profile; profile = profile->next) { + char *name = (char *) switch_xml_attr_soft(profile, "name"); + + if (zstr(name)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile. Check configs missing name attr\n"); + continue; + } + name = switch_core_strdup(mod_amqp_globals.pool, name); + + if ( mod_amqp_xml_handler_create(name, profile) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile [%s]. Check configs\n", name); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Loaded mod_amqp profile [%s] successfully\n", name); + } + } + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unable to locate a profile for mod_amqp\n" ); + } + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unable to locate xml handler section for mod_amqp\n" ); + } + switch_xml_free(xml); return SWITCH_STATUS_SUCCESS; } diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c new file mode 100644 index 0000000000..b115c4736f --- /dev/null +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c @@ -0,0 +1,606 @@ +/* + * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * Copyright (C) 2005-2012, Anthony Minessale II + * + * Version: MPL 1.1 + * + * The contents of this file are subject to the Mozilla Public License Version + * 1.1 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + * for the specific language governing rights and limitations under the + * License. + * + * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * + * The Initial Developer of the Original Code is + * Anthony Minessale II + * Portions created by the Initial Developer are Copyright (C) + * the Initial Developer. All Rights Reserved. + * + * Based on mod_skel by + * Anthony Minessale II + * + * Contributor(s): + * + * Daniel Bryars + * Tim Brown + * Anthony Minessale II + * William King + * Mike Jerris + * + * mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker + * + */ + +#include "mod_amqp.h" + +switch_status_t mod_amqp_xml_handler_routing_key(mod_amqp_xml_handler_profile_t *profile, + char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], switch_event_t *evt, + mod_amqp_keypart_t routingKeyEventHeaderNames[]) +{ + int i = 0, idx = 0, x = 0; + char keybuffer[MAX_AMQP_ROUTING_KEY_LENGTH]; + + for (i = 0; i < MAX_ROUTING_KEY_FORMAT_FIELDS && idx < MAX_AMQP_ROUTING_KEY_LENGTH; i++) { + if (routingKeyEventHeaderNames[i].size) { + if (idx) { routingKey[idx++] = '.'; } + for (x = 0; x < routingKeyEventHeaderNames[i].size; x++) { + if (routingKeyEventHeaderNames[i].name[x][0] == '#') { + strncpy(routingKey + idx, routingKeyEventHeaderNames[i].name[x] + 1, + MAX_AMQP_ROUTING_KEY_LENGTH - idx); + break; + } else { + char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i].name[x]); + if (value) { + amqp_util_encode(value, keybuffer); + strncpy(routingKey + idx, keybuffer, MAX_AMQP_ROUTING_KEY_LENGTH - idx); + break; + } + } + } + idx += strlen(routingKey + idx); + } + } + return SWITCH_STATUS_SUCCESS; +} + +static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, const char *key_name, + const char *key_value, switch_event_t *params, void *user_data) +{ + switch_xml_t xml = NULL; + mod_amqp_message_t *amqp_message; + + mod_amqp_aux_connection_t *conn = NULL, *conn_next = NULL, *conn_tmp = NULL; + + switch_uuid_t uuid; + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + struct timeval timeout = {0}; + char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; + mod_amqp_xml_handler_profile_t *profile = (mod_amqp_xml_handler_profile_t *)user_data; + switch_time_t now = switch_time_now(); + switch_time_t reset_time; + switch_status_t status = SWITCH_STATUS_SUCCESS; + int i = 0; + + if (!profile) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Event without a profile %p %p\n", (void *)params, + (void *)params->event_user_data); + return xml; + } + + reset_time = profile->circuit_breaker_reset_time; + if (now < reset_time) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] circuit breaker hit[%d] (%d)\n", + profile->name, (int)now, (int)reset_time); + return xml; + } + + if (!params) { + switch_event_create(¶ms, SWITCH_EVENT_REQUEST_PARAMS); + switch_assert(params); + } + + switch_event_add_header_string(params, SWITCH_STACK_TOP, "section", switch_str_nil(section)); + switch_event_add_header_string(params, SWITCH_STACK_TOP, "tag_name", switch_str_nil(tag_name)); + switch_event_add_header_string(params, SWITCH_STACK_TOP, "key_name", switch_str_nil(key_name)); + switch_event_add_header_string(params, SWITCH_STACK_TOP, "key_value", switch_str_nil(key_value)); + + switch_uuid_get(&uuid); + switch_uuid_format(uuid_str, &uuid); + switch_event_add_header_string(params, SWITCH_STACK_TOP, "reply_key", uuid_str); + switch_event_add_header_string(params, SWITCH_STACK_TOP, "reply_exchange", profile->reply_exchange); + + switch_malloc(amqp_message, sizeof(mod_amqp_message_t)); + mod_amqp_xml_handler_routing_key(profile, amqp_message->routing_key, params, profile->format_fields); + if (profile->running && profile->conn_active) { + switch_mutex_lock(profile->mutex); + for (conn = profile->conn_aux; conn; conn = conn_next) { + if (conn->locked == 1) { + if (conn->next == NULL && i < profile->max_temp_conn) { + conn->next = switch_core_alloc(profile->pool, sizeof(mod_amqp_aux_connection_t)); + status = mod_amqp_aux_connection_open(profile->conn_active, &(conn->next), profile->name, + profile->custom_attr, profile->reply_exchange); + if (status == SWITCH_STATUS_SUCCESS) { conn->next->locked = 0; } + } + i++; + conn_next = conn->next; + continue; + } + conn_tmp = conn; + conn_tmp->locked = 1; + break; + } + + switch_mutex_unlock(profile->mutex); + + if (conn_tmp) { + amqp_maybe_release_buffers(conn_tmp->state); + switch_event_add_header_string(params, SWITCH_STACK_TOP, "reply_queue", conn_tmp->uuid); + switch_event_serialize_json(params, &amqp_message->pjson); + if (switch_queue_trypush(profile->send_queue, amqp_message) != SWITCH_STATUS_SUCCESS) { + unsigned int queue_size = switch_queue_size(profile->send_queue); + + /* Trip the circuit breaker for a short period to stop recurring error messages (time is measured in uS) + */ + profile->circuit_breaker_reset_time = now + profile->circuit_breaker_ms * 1000; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, + "AMQP message queue full. Messages will be dropped for %.1fs! (Queue capacity %d)", + profile->circuit_breaker_ms / 1000.0, queue_size); + + mod_amqp_util_msg_destroy(&amqp_message); + } + + // Start a command + amqp_basic_consume(conn_tmp->state, // state + 1, // channel + conn_tmp->queueName, // queue + amqp_empty_bytes, // command tag + 0, 1, 0, // no_local, no_ack, exclusive + amqp_empty_table); // args + + timeout.tv_usec = 5000 * 1000; + for (;;) { + char *fs_resp_id = NULL; + amqp_maybe_release_buffers_on_channel(conn_tmp->state, 1); + res = amqp_consume_message(conn_tmp->state, &envelope, &timeout, 0); + if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) { + if (res.library_error == AMQP_STATUS_UNEXPECTED_STATE) { + /* Unexpected frame. Discard it then continue */ + amqp_frame_t decoded_frame; + amqp_simple_wait_frame(conn_tmp->state, &decoded_frame); + } + + if (res.library_error == AMQP_STATUS_SOCKET_ERROR) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, + "A socket error occurred. Tearing down and reconnecting\n"); + break; + } + + if (res.library_error == AMQP_STATUS_CONNECTION_CLOSED) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, + "AMQP connection was closed. Tearing down and reconnecting\n"); + break; + } + + if (res.library_error == AMQP_STATUS_TCP_ERROR) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, + "A TCP error occurred. Tearing down and reconnecting\n"); + break; + } + + if (res.library_error == AMQP_STATUS_TIMEOUT) { break; } + + /* Try consuming again */ + continue; + } + if (res.reply_type == AMQP_RESPONSE_NORMAL) { + if (envelope.message.properties.headers.num_entries) { + int x = 0; + for (x = 0; x < envelope.message.properties.headers.num_entries; x++) { + char *header_key = (char *)envelope.message.properties.headers.entries[x].key.bytes; + char *header_value = + (char *)envelope.message.properties.headers.entries[x].value.value.bytes.bytes; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, + "AMQP message custom header key[%s] value[%s]\n", header_key, + header_value); + + if (!strncmp(header_key, "x-fs-resp-id", 12)) { + fs_resp_id = header_value; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, + "Ignoring unrecognized event header [%s]\n", header_key); + } + } + } + if (strcmp(fs_resp_id, uuid_str) == 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got my message. Trying to parse...\n"); + break; + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, + "Got wrong message, Trying the next one... %s vs %s\n", fs_resp_id, uuid_str); + continue; + } + } + + if (res.reply_type != AMQP_RESPONSE_NORMAL || + !(xml = switch_xml_parse_str_dynamic((char *)envelope.message.body.bytes, SWITCH_TRUE))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Parsing XML Result!\n"); + } else { + xml = NULL; + } + + amqp_destroy_envelope(&envelope); + } + } + if (conn_tmp) { + switch_mutex_lock(profile->mutex); + conn_tmp->locked = 0; + switch_mutex_unlock(profile->mutex); + conn_tmp = NULL; + } + + return xml; +} + +switch_status_t mod_amqp_xml_handler_destroy(mod_amqp_xml_handler_profile_t **prof) +{ + mod_amqp_message_t *msg = NULL; + switch_status_t status = SWITCH_STATUS_SUCCESS; + mod_amqp_connection_t *conn = NULL, *conn_next = NULL; + mod_amqp_aux_connection_t *conn_aux = NULL, *conn_next_aux = NULL; + switch_memory_pool_t *pool; + mod_amqp_xml_handler_profile_t *profile; + + if (!prof || !*prof) { return SWITCH_STATUS_SUCCESS; } + switch_xml_unbind_search_function_ptr(xml_amqp_fetch); + profile = *prof; + pool = profile->pool; + + if (profile->name) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Profile[%s] shutting down...\n", profile->name); + switch_core_hash_delete(mod_amqp_globals.xml_handler_hash, profile->name); + } + + profile->running = 0; + + if (profile->xml_handler_thread) { switch_thread_join(&status, profile->xml_handler_thread); } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Profile[%s] closing AMQP socket...\n", profile->name); + + for (conn = profile->conn_root; conn; conn = conn_next) { + conn_next = conn->next; + mod_amqp_connection_destroy(&conn); + } + + for (conn_aux = profile->conn_aux; conn_aux; conn_aux = conn_next_aux) { + conn_next_aux = conn_aux->next; + mod_amqp_aux_connection_destroy(&conn_aux); + } + + profile->conn_aux = NULL; + profile->conn_active = NULL; + profile->conn_root = NULL; + + while (profile->send_queue && switch_queue_trypop(profile->send_queue, (void **)&msg) == SWITCH_STATUS_SUCCESS) { + mod_amqp_util_msg_destroy(&msg); + } + + if (pool) { switch_core_destroy_memory_pool(&pool); } + + *prof = NULL; + + return SWITCH_STATUS_SUCCESS; +} + +switch_status_t mod_amqp_xml_handler_create(char *name, switch_xml_t cfg) +{ + int arg = 0, i = 0; + mod_amqp_xml_handler_profile_t *profile = NULL; + switch_xml_t params, param, connections, connection; + switch_threadattr_t *thd_attr = NULL; + char *exchange = NULL, *exchange_type = NULL; + char *reply_exchange = NULL, *reply_exchange_type = NULL; + char *bindings = NULL; + int exchange_durable = 1; /* durable */ + switch_memory_pool_t *pool; + char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS + 1]; + int format_fields_size = 0; + int max_temp_conn = 0; + + if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { goto err; } + profile = switch_core_alloc(pool, sizeof(mod_amqp_xml_handler_profile_t)); + profile->pool = pool; + profile->name = switch_core_strdup(profile->pool, name); + profile->running = 1; + memset(profile->format_fields, 0, (MAX_ROUTING_KEY_FORMAT_FIELDS + 1) * sizeof(mod_amqp_keypart_t)); + // memset(profile->temp_conn, 0, (MAX_TEMP_CONNECTIONS) * sizeof(mod_amqp_temp_conn_t)); + profile->conn_root = NULL; + profile->conn_active = NULL; + profile->send_queue_size = 5000; + profile->circuit_breaker_ms = 10000; + switch_mutex_init(&profile->mutex, SWITCH_MUTEX_NESTED, profile->pool); + if ((params = switch_xml_child(cfg, "params")) != NULL) { + for (param = switch_xml_child(params, "param"); param; param = param->next) { + char *var = (char *)switch_xml_attr_soft(param, "name"); + char *val = (char *)switch_xml_attr_soft(param, "value"); + + if (!var) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param missing 'name' attribute\n", + profile->name); + continue; + } + + if (!val) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, + "Profile[%s] param[%s] missing 'value' attribute\n", profile->name, var); + continue; + } + + if (!strncmp(var, "reconnect_interval_ms", 21)) { + int interval = atoi(val); + if (interval && interval > 0) { profile->reconnect_interval_ms = interval; } + } else if (!strncmp(var, "send_queue_size", 15)) { + int interval = atoi(val); + if (interval && interval > 0) { profile->send_queue_size = interval; } + } else if (!strncmp(var, "exchange-type", 13)) { + exchange_type = switch_core_strdup(profile->pool, val); + } else if (!strncmp(var, "exchange-name", 13)) { + exchange = switch_core_strdup(profile->pool, val); + } else if (!strncmp(var, "reply-exchange-type", 19)) { + reply_exchange_type = switch_core_strdup(profile->pool, val); + } else if (!strncmp(var, "reply-exchange-name", 19)) { + reply_exchange = switch_core_strdup(profile->pool, val); + } else if (!strncmp(var, "exchange-durable", 16)) { + exchange_durable = switch_true(val); + } else if (!strncmp(var, "xml-handler-bindings", 20)) { + bindings = switch_core_strdup(profile->pool, val); + } else if (!strncmp(var, "max-temp-conn", 13)) { + max_temp_conn = atoi(val); + } else if (!strncmp(var, "format_fields", 13)) { + char *tmp = switch_core_strdup(profile->pool, val); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp format fields : %s\n", tmp); + if ((format_fields_size = mod_amqp_count_chars(tmp, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, + "You can have only %d routing fields in the routing key.\n", + MAX_ROUTING_KEY_FORMAT_FIELDS); + goto err; + } + + /* increment size because the count returned the number of separators, not number of fields */ + format_fields_size++; + switch_separate_string(tmp, ',', format_fields, MAX_ROUTING_KEY_FORMAT_FIELDS); + format_fields[format_fields_size] = NULL; + } + } /* params for loop */ + } + /* Handle defaults of string types */ + profile->bindings = bindings ? bindings : switch_core_strdup(profile->pool, "all"); + profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.XML_handler"); + profile->exchange_type = exchange_type ? exchange_type : switch_core_strdup(profile->pool, "topic"); + profile->reply_exchange = + reply_exchange ? reply_exchange : switch_core_strdup(profile->pool, "TAP.XML_handler_reply"); + profile->reply_exchange_type = + reply_exchange_type ? reply_exchange_type : switch_core_strdup(profile->pool, "direct"); + profile->exchange_durable = exchange_durable; + profile->active_channels = 1; + if (max_temp_conn && max_temp_conn > 0 && max_temp_conn < 1000) { + profile->max_temp_conn = max_temp_conn; + } else { + profile->max_temp_conn = MAX_TEMP_CONNECTIONS; + } + + for (i = 0; i < format_fields_size; i++) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : %s\n", i, format_fields[i]); + if (profile->enable_fallback_format_fields) { + profile->format_fields[i].size = switch_separate_string( + format_fields[i], '|', profile->format_fields[i].name, MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS); + if (profile->format_fields[i].size > 1) { + for (arg = 0; arg < profile->format_fields[i].size; arg++) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : sub key %d : %s\n", i, + arg, profile->format_fields[i].name[arg]); + } + } + } else { + profile->format_fields[i].name[0] = format_fields[i]; + profile->format_fields[i].size = 1; + } + } + + if ((connections = switch_xml_child(cfg, "connections")) != NULL) { + for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) { + if (!profile->conn_root) { /* Handle first root node */ + if (mod_amqp_connection_create(&(profile->conn_root), connection, profile->pool) != + SWITCH_STATUS_SUCCESS) { + /* Handle connection create failure */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, + "Profile[%s] failed to create connection\n", profile->name); + continue; + } + profile->conn_active = profile->conn_root; + } else { + if (mod_amqp_connection_create(&(profile->conn_active->next), connection, profile->pool) != + SWITCH_STATUS_SUCCESS) { + /* Handle connection create failure */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, + "Profile[%s] failed to create connection\n", profile->name); + continue; + } + profile->conn_active = profile->conn_active->next; + } + } + } + profile->conn_active = NULL; + /* We are not going to open the xml_handler queue connection on create, but instead wait for the running thread to + * open it */ + /* Create a bounded FIFO queue for sending messages */ + if (switch_queue_create(&(profile->send_queue), profile->send_queue_size, profile->pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create send queue of size %d!\n", + profile->send_queue_size); + goto err; + } + /* Start the event send thread. This will set up the initial connection */ + switch_threadattr_create(&thd_attr, profile->pool); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + if (switch_thread_create(&profile->xml_handler_thread, thd_attr, mod_amqp_xml_handler_thread, profile, + profile->pool)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create 'amqp event sender' thread!\n"); + goto err; + } + if (switch_core_hash_insert(mod_amqp_globals.xml_handler_hash, name, (void *)profile) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, + "Failed to insert new profile [%s] into mod_amqp profile hash\n", name); + goto err; + } + + switch_xml_bind_search_function(xml_amqp_fetch, switch_xml_parse_section_string(profile->bindings), profile); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Profile[%s] Successfully started\n", profile->name); + return SWITCH_STATUS_SUCCESS; + +err: + /* Cleanup */ + mod_amqp_xml_handler_destroy(&profile); + return SWITCH_STATUS_GENERR; +} + +/* This should only be called in a single threaded context from the xml_handler profile send thread */ +switch_status_t mod_amqp_xml_handler_send(mod_amqp_xml_handler_profile_t *profile, mod_amqp_message_t *msg) +{ + amqp_basic_properties_t props; + int status; + + if (!profile->conn_active) { + /* No connection, so we can not send the message. */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name); + return SWITCH_STATUS_NOT_INITALIZED; + } + memset(&props, 0, sizeof(amqp_basic_properties_t)); + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG; + props.content_type = amqp_cstring_bytes("application/json"); + + status = amqp_basic_publish(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(msg->routing_key), 0, 0, &props, amqp_cstring_bytes(msg->pjson)); + if (status < 0) { + const char *errstr = amqp_error_string2(-status); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, + "Profile[%s] failed to send event on connection[%s]: %s\n", profile->name, + profile->conn_active->name, errstr); + + /* This is bad, we couldn't send the message. Clear up any connection */ + mod_amqp_connection_close(profile->conn_active); + profile->conn_active = NULL; + return SWITCH_STATUS_SOCKERR; + } + + return SWITCH_STATUS_SUCCESS; +} + +void *SWITCH_THREAD_FUNC mod_amqp_xml_handler_thread(switch_thread_t *thread, void *data) +{ + mod_amqp_message_t *msg = NULL; + switch_status_t status = SWITCH_STATUS_SUCCESS; + mod_amqp_xml_handler_profile_t *profile = (mod_amqp_xml_handler_profile_t *)data; + mod_amqp_aux_connection_t *conn_aux = NULL, *conn_next = NULL; + amqp_boolean_t passive = 0; + amqp_boolean_t durable = 1; + while (profile->running) { + if (!profile->conn_active) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Amqp no connection- reconnecting...\n"); + + status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, + profile->custom_attr); + if (status == SWITCH_STATUS_SUCCESS) { + // init first temp connection (for outgoing msgs) + if (!profile->conn_aux) { + profile->conn_aux = switch_core_alloc(profile->pool, sizeof(mod_amqp_aux_connection_t)); + } + switch_mutex_lock(profile->mutex); + for (conn_aux = profile->conn_aux; conn_aux; conn_aux = conn_next) { + mod_amqp_aux_connection_open(profile->conn_active, &(conn_aux), profile->name, profile->custom_attr, + profile->reply_exchange); + conn_next = conn_aux->next; + } + switch_mutex_unlock(profile->mutex); + // Ensure that the exchange exists, and is of the correct type +#if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR >= 6 + amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(profile->exchange_type), passive, durable, + profile->exchange_auto_delete, 0, amqp_empty_table); + amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->reply_exchange), + amqp_cstring_bytes(profile->reply_exchange_type), passive, durable, + profile->exchange_auto_delete, 0, amqp_empty_table); +#else + amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(profile->exchange_type), passive, durable, amqp_empty_table); + amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(profile->exchange_type), passive, durable, amqp_empty_table); +#endif + + if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), + "Declaring exchange")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n"); + continue; + } + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, + "Profile[%s] failed to connect with code(%d), sleeping for %dms\n", profile->name, status, + profile->reconnect_interval_ms); + switch_sleep(profile->reconnect_interval_ms * 1000); + continue; + } + + if (!msg && switch_queue_pop_timeout(profile->send_queue, (void **)&msg, 1000000) != SWITCH_STATUS_SUCCESS) { + continue; + } + + if (msg) { + switch (mod_amqp_xml_handler_send(profile, msg)) { + case SWITCH_STATUS_SUCCESS: + /* Success: prepare for next message */ + mod_amqp_util_msg_destroy(&msg); + break; + + case SWITCH_STATUS_NOT_INITALIZED: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with 'not initialised'\n"); + break; + + case SWITCH_STATUS_SOCKERR: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with 'socket error'\n"); + break; + + default: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with a generic error\n"); + + /* Send failed and closed the connection; reconnect will happen at the beginning of the loop + * NB: do we need a delay here to prevent a fast reconnect-send-fail loop? */ + break; + } + } + } + + /* Abort the current message */ + mod_amqp_util_msg_destroy(&msg); + + // Terminate the thread + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "XML handler sender thread has been stopped\n"); + switch_thread_exit(thread, SWITCH_STATUS_SUCCESS); + return NULL; +} + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 + */ From 68008d86b59ec34bc9306f47f005726356706d41 Mon Sep 17 00:00:00 2001 From: Aleksei Khabuliak Date: Fri, 17 Jun 2022 22:09:26 +0300 Subject: [PATCH 2/8] fixed bugs --- .../event_handlers/mod_amqp/mod_amqp_connection.c | 14 +++++--------- .../event_handlers/mod_amqp/mod_amqp_xml_handler.c | 13 +++++++------ 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c b/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c index 31ecbc2eee..a8d9ba8787 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c @@ -216,9 +216,7 @@ switch_status_t mod_amqp_aux_connection_open(mod_amqp_connection_t *base_conn, m char key_string[256] = {0}; amqp_rpc_reply_t status; amqp_socket_t *socket = NULL; - int amqp_status = -1; amqp_queue_declare_ok_t *recv_queue; - amqp_bytes_t queueName = {0, NULL}; switch_uuid_t uuid; amqp_connection_state_t newConnection = amqp_new_connection(); @@ -270,14 +268,13 @@ switch_status_t mod_amqp_aux_connection_open(mod_amqp_connection_t *base_conn, m } } - amqp_status = -1; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying to add additional connect to AMQP %s:%d\n", base_conn->hostname, base_conn->port); - if ((amqp_status = amqp_socket_open(socket, base_conn->hostname, base_conn->port))) { + if (amqp_socket_open(socket, base_conn->hostname, base_conn->port)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, - "Could not open socket connection to AMQP broker %s:%d status(%d) %s\n", base_conn->hostname, - base_conn->port, amqp_status, amqp_error_string2(amqp_status)); + "Could not open socket connection to AMQP broker %s:%d\n", base_conn->hostname, + base_conn->port); goto err; } @@ -322,11 +319,10 @@ switch_status_t mod_amqp_aux_connection_open(mod_amqp_connection_t *base_conn, m 1, /* exclusive */ 1, /* auto-delete */ amqp_empty_table); // args - queueName = amqp_bytes_malloc_dup(recv_queue->queue); - (*aux_conn)->queueName = queueName; + (*aux_conn)->queueName = amqp_bytes_malloc_dup(recv_queue->queue); amqp_queue_bind(newConnection, // state 1, // channel - queueName, // queue + (*aux_conn)->queueName, // queue amqp_cstring_bytes(reply_exchange), // exchange amqp_cstring_bytes((*aux_conn)->uuid), // routing key amqp_empty_table); // args diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c index b115c4736f..71ec88909a 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c @@ -115,8 +115,6 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co switch_event_add_header_string(params, SWITCH_STACK_TOP, "reply_key", uuid_str); switch_event_add_header_string(params, SWITCH_STACK_TOP, "reply_exchange", profile->reply_exchange); - switch_malloc(amqp_message, sizeof(mod_amqp_message_t)); - mod_amqp_xml_handler_routing_key(profile, amqp_message->routing_key, params, profile->format_fields); if (profile->running && profile->conn_active) { switch_mutex_lock(profile->mutex); for (conn = profile->conn_aux; conn; conn = conn_next) { @@ -139,6 +137,8 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co switch_mutex_unlock(profile->mutex); if (conn_tmp) { + switch_malloc(amqp_message, sizeof(mod_amqp_message_t)); + mod_amqp_xml_handler_routing_key(profile, amqp_message->routing_key, params, profile->format_fields); amqp_maybe_release_buffers(conn_tmp->state); switch_event_add_header_string(params, SWITCH_STACK_TOP, "reply_queue", conn_tmp->uuid); switch_event_serialize_json(params, &amqp_message->pjson); @@ -154,6 +154,7 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co profile->circuit_breaker_ms / 1000.0, queue_size); mod_amqp_util_msg_destroy(&amqp_message); + goto done; } // Start a command @@ -218,7 +219,7 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co } } } - if (strcmp(fs_resp_id, uuid_str) == 0) { + if (fs_resp_id && strcmp(fs_resp_id, uuid_str) == 0) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got my message. Trying to parse...\n"); break; } @@ -228,16 +229,16 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co } } - if (res.reply_type != AMQP_RESPONSE_NORMAL || + if (res.reply_type != AMQP_RESPONSE_NORMAL || !(xml = switch_xml_parse_str_dynamic((char *)envelope.message.body.bytes, SWITCH_TRUE))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Parsing XML Result!\n"); - } else { - xml = NULL; } + amqp_destroy_envelope(&envelope); } } +done: if (conn_tmp) { switch_mutex_lock(profile->mutex); conn_tmp->locked = 0; From 10d3a3ab7e884662151a4cabc00c3a7551866d62 Mon Sep 17 00:00:00 2001 From: Aleksei Khabuliak Date: Fri, 6 Jan 2023 13:50:11 -0800 Subject: [PATCH 3/8] refactoring --- conf/vanilla/autoload_configs/amqp.conf.xml | 2 - src/mod/event_handlers/mod_amqp/mod_amqp.h | 3 +- .../mod_amqp/mod_amqp_connection.c | 14 +-- .../mod_amqp/mod_amqp_xml_handler.c | 93 +++++++------------ 4 files changed, 41 insertions(+), 71 deletions(-) diff --git a/conf/vanilla/autoload_configs/amqp.conf.xml b/conf/vanilla/autoload_configs/amqp.conf.xml index 3ee2345816..cf670d2bc1 100644 --- a/conf/vanilla/autoload_configs/amqp.conf.xml +++ b/conf/vanilla/autoload_configs/amqp.conf.xml @@ -99,8 +99,6 @@ - - diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp.h b/src/mod/event_handlers/mod_amqp/mod_amqp.h index 4e3432ab45..3528ff7290 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp.h +++ b/src/mod/event_handlers/mod_amqp/mod_amqp.h @@ -67,6 +67,7 @@ typedef struct { char routing_key[MAX_AMQP_ROUTING_KEY_LENGTH]; char *pjson; + amqp_basic_properties_t props; } mod_amqp_message_t; typedef struct mod_amqp_aux_connection_s { @@ -256,7 +257,7 @@ void mod_amqp_connection_destroy(mod_amqp_connection_t **conn); void mod_amqp_aux_connection_destroy(mod_amqp_aux_connection_t **conn); void mod_amqp_connection_close(mod_amqp_connection_t *connection); switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod_amqp_connection_t **active, char *profile_name, char *custom_attr); -switch_status_t mod_amqp_aux_connection_open(mod_amqp_connection_t *connections, mod_amqp_aux_connection_t **active, char *profile_name, char *custom_attr, char *reply_exchange); +switch_status_t mod_amqp_aux_connection_open(mod_amqp_connection_t *connections, mod_amqp_aux_connection_t **active, char *profile_name, char *custom_attr); /* command */ switch_status_t mod_amqp_command_destroy(mod_amqp_command_profile_t **profile); diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c b/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c index a8d9ba8787..b911fda999 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c @@ -205,7 +205,7 @@ err: } switch_status_t mod_amqp_aux_connection_open(mod_amqp_connection_t *base_conn, mod_amqp_aux_connection_t **aux_conn, - char *profile_name, char *custom_attr, char *reply_exchange) + char *profile_name, char *custom_attr) { int channel_max = 0; int frame_max = 131072; @@ -217,7 +217,6 @@ switch_status_t mod_amqp_aux_connection_open(mod_amqp_connection_t *base_conn, m amqp_rpc_reply_t status; amqp_socket_t *socket = NULL; amqp_queue_declare_ok_t *recv_queue; - switch_uuid_t uuid; amqp_connection_state_t newConnection = amqp_new_connection(); @@ -308,24 +307,17 @@ switch_status_t mod_amqp_aux_connection_open(mod_amqp_connection_t *base_conn, m if (aux_conn) { (*aux_conn)->state = newConnection; (*aux_conn)->locked = 0; - switch_uuid_get(&uuid); - switch_uuid_format((*aux_conn)->uuid, &uuid); recv_queue = amqp_queue_declare(newConnection, // state 1, // channel - amqp_cstring_bytes((*aux_conn)->uuid), // queue name + amqp_empty_bytes, // queue name 0, /* passive */ 0, /* durable */ 1, /* exclusive */ 1, /* auto-delete */ amqp_empty_table); // args (*aux_conn)->queueName = amqp_bytes_malloc_dup(recv_queue->queue); - amqp_queue_bind(newConnection, // state - 1, // channel - (*aux_conn)->queueName, // queue - amqp_cstring_bytes(reply_exchange), // exchange - amqp_cstring_bytes((*aux_conn)->uuid), // routing key - amqp_empty_table); // args + amqp_basic_qos(newConnection, 1, 0, 1, 0); } return SWITCH_STATUS_SUCCESS; diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c index 71ec88909a..472bc484a6 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c @@ -73,7 +73,7 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co { switch_xml_t xml = NULL; mod_amqp_message_t *amqp_message; - + amqp_basic_properties_t props; mod_amqp_aux_connection_t *conn = NULL, *conn_next = NULL, *conn_tmp = NULL; switch_uuid_t uuid; @@ -112,8 +112,6 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co switch_uuid_get(&uuid); switch_uuid_format(uuid_str, &uuid); - switch_event_add_header_string(params, SWITCH_STACK_TOP, "reply_key", uuid_str); - switch_event_add_header_string(params, SWITCH_STACK_TOP, "reply_exchange", profile->reply_exchange); if (profile->running && profile->conn_active) { switch_mutex_lock(profile->mutex); @@ -122,7 +120,7 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co if (conn->next == NULL && i < profile->max_temp_conn) { conn->next = switch_core_alloc(profile->pool, sizeof(mod_amqp_aux_connection_t)); status = mod_amqp_aux_connection_open(profile->conn_active, &(conn->next), profile->name, - profile->custom_attr, profile->reply_exchange); + profile->custom_attr); if (status == SWITCH_STATUS_SUCCESS) { conn->next->locked = 0; } } i++; @@ -137,11 +135,23 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co switch_mutex_unlock(profile->mutex); if (conn_tmp) { - switch_malloc(amqp_message, sizeof(mod_amqp_message_t)); - mod_amqp_xml_handler_routing_key(profile, amqp_message->routing_key, params, profile->format_fields); + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG | + AMQP_BASIC_CORRELATION_ID_FLAG; + props.content_type = amqp_cstring_bytes("application/json"); + props.reply_to = amqp_bytes_malloc_dup(conn_tmp->queueName); + props.delivery_mode = 1; + if (props.reply_to.bytes == NULL) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Out of memory while copying queue name"); + goto done; + } + props.correlation_id = amqp_cstring_bytes(uuid_str); + + switch_malloc(amqp_message, sizeof(mod_amqp_message_t)); + mod_amqp_xml_handler_routing_key(profile, amqp_message->routing_key, params, profile->format_fields); amqp_maybe_release_buffers(conn_tmp->state); - switch_event_add_header_string(params, SWITCH_STACK_TOP, "reply_queue", conn_tmp->uuid); + // switch_event_add_header_string(params, SWITCH_STACK_TOP, "reply_queue", conn_tmp->uuid); switch_event_serialize_json(params, &amqp_message->pjson); + amqp_message->props = props; if (switch_queue_trypush(profile->send_queue, amqp_message) != SWITCH_STATUS_SUCCESS) { unsigned int queue_size = switch_queue_size(profile->send_queue); @@ -167,8 +177,10 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co timeout.tv_usec = 5000 * 1000; for (;;) { - char *fs_resp_id = NULL; - amqp_maybe_release_buffers_on_channel(conn_tmp->state, 1); + char *correlation_id = NULL; + amqp_basic_properties_t *p; + //amqp_maybe_release_buffers_on_channel(conn_tmp->state, 1); + amqp_maybe_release_buffers(conn_tmp->state); res = amqp_consume_message(conn_tmp->state, &envelope, &timeout, 0); if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) { if (res.library_error == AMQP_STATUS_UNEXPECTED_STATE) { @@ -201,40 +213,23 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co continue; } if (res.reply_type == AMQP_RESPONSE_NORMAL) { - if (envelope.message.properties.headers.num_entries) { - int x = 0; - for (x = 0; x < envelope.message.properties.headers.num_entries; x++) { - char *header_key = (char *)envelope.message.properties.headers.entries[x].key.bytes; - char *header_value = - (char *)envelope.message.properties.headers.entries[x].value.value.bytes.bytes; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, - "AMQP message custom header key[%s] value[%s]\n", header_key, - header_value); - - if (!strncmp(header_key, "x-fs-resp-id", 12)) { - fs_resp_id = header_value; - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, - "Ignoring unrecognized event header [%s]\n", header_key); - } - } - } - if (fs_resp_id && strcmp(fs_resp_id, uuid_str) == 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got my message. Trying to parse...\n"); + p = &envelope.message.properties; + correlation_id = p->correlation_id.bytes; + if (correlation_id && strcmp(correlation_id, uuid_str) == 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got my message. Trying to parse...\n%s\n", (char *)envelope.message.body.bytes); break; } switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, - "Got wrong message, Trying the next one... %s vs %s\n", fs_resp_id, uuid_str); + "Got wrong message, Trying the next one... %s vs %s\n", correlation_id, uuid_str); continue; } } - if (res.reply_type != AMQP_RESPONSE_NORMAL || + if (res.reply_type != AMQP_RESPONSE_NORMAL || !(xml = switch_xml_parse_str_dynamic((char *)envelope.message.body.bytes, SWITCH_TRUE))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Parsing XML Result!\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Parsing XML Result! \n"); } - - + amqp_bytes_free(props.reply_to); amqp_destroy_envelope(&envelope); } } @@ -306,7 +301,6 @@ switch_status_t mod_amqp_xml_handler_create(char *name, switch_xml_t cfg) switch_xml_t params, param, connections, connection; switch_threadattr_t *thd_attr = NULL; char *exchange = NULL, *exchange_type = NULL; - char *reply_exchange = NULL, *reply_exchange_type = NULL; char *bindings = NULL; int exchange_durable = 1; /* durable */ switch_memory_pool_t *pool; @@ -353,10 +347,6 @@ switch_status_t mod_amqp_xml_handler_create(char *name, switch_xml_t cfg) exchange_type = switch_core_strdup(profile->pool, val); } else if (!strncmp(var, "exchange-name", 13)) { exchange = switch_core_strdup(profile->pool, val); - } else if (!strncmp(var, "reply-exchange-type", 19)) { - reply_exchange_type = switch_core_strdup(profile->pool, val); - } else if (!strncmp(var, "reply-exchange-name", 19)) { - reply_exchange = switch_core_strdup(profile->pool, val); } else if (!strncmp(var, "exchange-durable", 16)) { exchange_durable = switch_true(val); } else if (!strncmp(var, "xml-handler-bindings", 20)) { @@ -381,13 +371,9 @@ switch_status_t mod_amqp_xml_handler_create(char *name, switch_xml_t cfg) } /* params for loop */ } /* Handle defaults of string types */ - profile->bindings = bindings ? bindings : switch_core_strdup(profile->pool, "all"); + profile->bindings = bindings ? bindings : switch_core_strdup(profile->pool, ""); profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.XML_handler"); profile->exchange_type = exchange_type ? exchange_type : switch_core_strdup(profile->pool, "topic"); - profile->reply_exchange = - reply_exchange ? reply_exchange : switch_core_strdup(profile->pool, "TAP.XML_handler_reply"); - profile->reply_exchange_type = - reply_exchange_type ? reply_exchange_type : switch_core_strdup(profile->pool, "direct"); profile->exchange_durable = exchange_durable; profile->active_channels = 1; if (max_temp_conn && max_temp_conn > 0 && max_temp_conn < 1000) { @@ -473,20 +459,19 @@ err: /* This should only be called in a single threaded context from the xml_handler profile send thread */ switch_status_t mod_amqp_xml_handler_send(mod_amqp_xml_handler_profile_t *profile, mod_amqp_message_t *msg) { - amqp_basic_properties_t props; +// amqp_basic_properties_t props; int status; - if (!profile->conn_active) { /* No connection, so we can not send the message. */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name); return SWITCH_STATUS_NOT_INITALIZED; } - memset(&props, 0, sizeof(amqp_basic_properties_t)); - props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG; - props.content_type = amqp_cstring_bytes("application/json"); +// memset(&props, 0, sizeof(amqp_basic_properties_t)); +// props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG; +// props.content_type = amqp_cstring_bytes("application/json"); status = amqp_basic_publish(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), - amqp_cstring_bytes(msg->routing_key), 0, 0, &props, amqp_cstring_bytes(msg->pjson)); + amqp_cstring_bytes(msg->routing_key), 0, 0, &msg->props, amqp_cstring_bytes(msg->pjson)); if (status < 0) { const char *errstr = amqp_error_string2(-status); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, @@ -523,8 +508,7 @@ void *SWITCH_THREAD_FUNC mod_amqp_xml_handler_thread(switch_thread_t *thread, vo } switch_mutex_lock(profile->mutex); for (conn_aux = profile->conn_aux; conn_aux; conn_aux = conn_next) { - mod_amqp_aux_connection_open(profile->conn_active, &(conn_aux), profile->name, profile->custom_attr, - profile->reply_exchange); + mod_amqp_aux_connection_open(profile->conn_active, &(conn_aux), profile->name, profile->custom_attr); conn_next = conn_aux->next; } switch_mutex_unlock(profile->mutex); @@ -533,14 +517,9 @@ void *SWITCH_THREAD_FUNC mod_amqp_xml_handler_thread(switch_thread_t *thread, vo amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange_type), passive, durable, profile->exchange_auto_delete, 0, amqp_empty_table); - amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->reply_exchange), - amqp_cstring_bytes(profile->reply_exchange_type), passive, durable, - profile->exchange_auto_delete, 0, amqp_empty_table); #else amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange_type), passive, durable, amqp_empty_table); - amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), - amqp_cstring_bytes(profile->exchange_type), passive, durable, amqp_empty_table); #endif if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), From 63bd218f9845ef69ba6060c98bc2906caf64a826 Mon Sep 17 00:00:00 2001 From: Aleksei Khabuliak Date: Sat, 7 Jan 2023 17:32:53 -0800 Subject: [PATCH 4/8] fixed a bug with amqp body. created a variable to store message body --- .../event_handlers/mod_amqp/mod_amqp_xml_handler.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c index 472bc484a6..6798a76b6a 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c @@ -85,6 +85,8 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co switch_time_t now = switch_time_now(); switch_time_t reset_time; switch_status_t status = SWITCH_STATUS_SUCCESS; + char *amqp_body = NULL; + int amqp_body_len; int i = 0; if (!profile) { @@ -215,8 +217,9 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co if (res.reply_type == AMQP_RESPONSE_NORMAL) { p = &envelope.message.properties; correlation_id = p->correlation_id.bytes; + if (correlation_id && strcmp(correlation_id, uuid_str) == 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got my message. Trying to parse...\n%s\n", (char *)envelope.message.body.bytes); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got my message. Trying to parse\n"); break; } switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, @@ -225,10 +228,15 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co } } + amqp_body_len = (int) envelope.message.body.len + 1; + amqp_body = malloc(amqp_body_len); + snprintf(amqp_body, amqp_body_len, "%s", (char *) envelope.message.body.bytes); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "XML from AMQP msg:\n%s\n", amqp_body); if (res.reply_type != AMQP_RESPONSE_NORMAL || - !(xml = switch_xml_parse_str_dynamic((char *)envelope.message.body.bytes, SWITCH_TRUE))) { + !(xml = switch_xml_parse_str_dynamic(amqp_body, SWITCH_TRUE))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Parsing XML Result! \n"); } + switch_safe_free(amqp_body); amqp_bytes_free(props.reply_to); amqp_destroy_envelope(&envelope); } @@ -236,6 +244,7 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co done: if (conn_tmp) { switch_mutex_lock(profile->mutex); + amqp_maybe_release_buffers(conn_tmp->state); conn_tmp->locked = 0; switch_mutex_unlock(profile->mutex); conn_tmp = NULL; From 3aadc42e591b4c075068c60ff55fa3a9f11b7a37 Mon Sep 17 00:00:00 2001 From: Aleksei Khabuliak Date: Sat, 7 Jan 2023 19:49:44 -0800 Subject: [PATCH 5/8] fix reconnection for aux channels --- src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c index 6798a76b6a..663206b0f8 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c @@ -504,6 +504,10 @@ void *SWITCH_THREAD_FUNC mod_amqp_xml_handler_thread(switch_thread_t *thread, vo mod_amqp_aux_connection_t *conn_aux = NULL, *conn_next = NULL; amqp_boolean_t passive = 0; amqp_boolean_t durable = 1; + // init first temp connection (for outgoing msgs) + if (!profile->conn_aux) { + profile->conn_aux = switch_core_alloc(profile->pool, sizeof(mod_amqp_aux_connection_t)); + } while (profile->running) { if (!profile->conn_active) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Amqp no connection- reconnecting...\n"); @@ -511,10 +515,6 @@ void *SWITCH_THREAD_FUNC mod_amqp_xml_handler_thread(switch_thread_t *thread, vo status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr); if (status == SWITCH_STATUS_SUCCESS) { - // init first temp connection (for outgoing msgs) - if (!profile->conn_aux) { - profile->conn_aux = switch_core_alloc(profile->pool, sizeof(mod_amqp_aux_connection_t)); - } switch_mutex_lock(profile->mutex); for (conn_aux = profile->conn_aux; conn_aux; conn_aux = conn_next) { mod_amqp_aux_connection_open(profile->conn_active, &(conn_aux), profile->name, profile->custom_attr); From fc2992fe7b05f4a51d7b91bc3bfb8243f9b81f24 Mon Sep 17 00:00:00 2001 From: Aleksei Khabuliak Date: Sun, 8 Jan 2023 20:53:14 -0800 Subject: [PATCH 6/8] added logs if we don't have enough aux connections --- src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c index 663206b0f8..5cd1248fc9 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_xml_handler.c @@ -239,6 +239,8 @@ static switch_xml_t xml_amqp_fetch(const char *section, const char *tag_name, co switch_safe_free(amqp_body); amqp_bytes_free(props.reply_to); amqp_destroy_envelope(&envelope); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "No more aux amqp connections(%d). Increase max-temp-conn\n", i); } } done: From 164af37ebc3acc26c4f167410e1cd78a38d4b158 Mon Sep 17 00:00:00 2001 From: Aleksei Khabuliak Date: Mon, 15 May 2023 14:42:38 -0700 Subject: [PATCH 7/8] Added tmp string for logging --- src/mod/event_handlers/mod_amqp/mod_amqp_connection.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c b/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c index b911fda999..4d7867a122 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c @@ -338,8 +338,9 @@ switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_ amqp_boolean_t ssl_verify_peer = 1; if (zstr(name)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection missing name attribute\n%s\n", - switch_xml_toxml(cfg, 1)); + char *str_tmp = switch_xml_toxml(cfg, 1); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection missing name attribute\n%s\n", str_tmp); + switch_safe_free(str_tmp); return SWITCH_STATUS_GENERR; } From b0d9d42638550a99c932ff44c2c4c3cb3c9779f0 Mon Sep 17 00:00:00 2001 From: Aleksei Khabuliak Date: Mon, 15 May 2023 14:59:49 -0700 Subject: [PATCH 8/8] added blank lines --- src/mod/event_handlers/mod_amqp/mod_amqp_connection.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c b/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c index 4d7867a122..59e12d6ce2 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_connection.c @@ -339,8 +339,10 @@ switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_ if (zstr(name)) { char *str_tmp = switch_xml_toxml(cfg, 1); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection missing name attribute\n%s\n", str_tmp); switch_safe_free(str_tmp); + return SWITCH_STATUS_GENERR; }