From 8c8a4dd0f8442f4cf98aa8b833f4cc0ec453644a Mon Sep 17 00:00:00 2001 From: Mike Jerris Date: Tue, 27 Jun 2017 16:42:45 -0500 Subject: [PATCH] FS-10430: [mod_amqp] add support for libamqp >= 0.5.2 --- .../mod_amqp/mod_amqp_command.c | 11 ++++++ .../mod_amqp/mod_amqp_logging.c | 36 +++++++++++++++---- .../mod_amqp/mod_amqp_producer.c | 24 +++++++++++-- 3 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_command.c b/src/mod/event_handlers/mod_amqp/mod_amqp_command.c index 3d54bb69c5..3efd97cc38 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_command.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_command.c @@ -280,12 +280,23 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void } /* Check if exchange already exists */ +#if AMQP_VERSION_MAJOR == 0 && (AMQP_VERSION_MINOR > 5 || (AMQP_VERSION_MINOR == 5 && AMQP_VERSION_PATCH >= 2 )) + amqp_exchange_declare(profile->conn_active->state, 1, + amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes("topic"), + 0, /* passive */ + 1, /* durable */ + 0, /* auto-delete */ + 0, + amqp_empty_table); +#else amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes("topic"), 0, /* passive */ 1, /* durable */ amqp_empty_table); +#endif if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Checking for command exchange")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create missing command exchange", profile->name); diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_logging.c b/src/mod/event_handlers/mod_amqp/mod_amqp_logging.c index f2449a8994..0085c99ddd 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_logging.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_logging.c @@ -244,12 +244,23 @@ switch_status_t mod_amqp_logging_create(char *name, switch_xml_t cfg) goto err; } +#if AMQP_VERSION_MAJOR == 0 && (AMQP_VERSION_MINOR > 5 || (AMQP_VERSION_MINOR == 5 && AMQP_VERSION_PATCH >= 2 )) + amqp_exchange_declare(profile->conn_active->state, 1, + amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(profile->exchange_type), + 0, /* passive */ + profile->exchange_durable, + profile->exchange_auto_delete, + 0, + amqp_empty_table); +#else amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange_type), 0, /* passive */ profile->exchange_durable, amqp_empty_table); +#endif if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] failed to create exchange\n", profile->name); @@ -341,13 +352,24 @@ void * SWITCH_THREAD_FUNC mod_amqp_logging_thread(switch_thread_t *thread, void status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr); if ( status == SWITCH_STATUS_SUCCESS ) { - // Ensure that the exchange exists, and is of the correct type - amqp_exchange_declare(profile->conn_active->state, 1, - amqp_cstring_bytes(profile->exchange), - amqp_cstring_bytes(profile->exchange_type), - passive, - durable, - amqp_empty_table); + // Ensure that the exchange exists, and is of the correct type +#if AMQP_VERSION_MAJOR == 0 && (AMQP_VERSION_MINOR > 5 || (AMQP_VERSION_MINOR == 5 && AMQP_VERSION_PATCH >= 2 )) + amqp_exchange_declare(profile->conn_active->state, 1, + amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(profile->exchange_type), + passive, + durable, + profile->exchange_auto_delete, + 0, + amqp_empty_table); +#else + amqp_exchange_declare(profile->conn_active->state, 1, + amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(profile->exchange_type), + passive, + durable, + amqp_empty_table); +#endif if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n"); diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c index 99c439f57a..d74776f564 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c @@ -325,13 +325,23 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name); goto err; } - +#if AMQP_VERSION_MAJOR == 0 && (AMQP_VERSION_MINOR > 5 || (AMQP_VERSION_MINOR == 5 && AMQP_VERSION_PATCH >= 2 )) + amqp_exchange_declare(profile->conn_active->state, 1, + amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(profile->exchange_type), + 0, /* passive */ + profile->exchange_durable, + profile->exchange_auto_delete, + 0, + amqp_empty_table); +#else amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange_type), 0, /* passive */ profile->exchange_durable, amqp_empty_table); +#endif if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] failed to create exchange\n", profile->name); @@ -458,13 +468,23 @@ void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr); if ( status == SWITCH_STATUS_SUCCESS ) { // Ensure that the exchange exists, and is of the correct type +#if AMQP_VERSION_MAJOR == 0 && (AMQP_VERSION_MINOR > 5 || (AMQP_VERSION_MINOR == 5 && AMQP_VERSION_PATCH >= 2 )) + amqp_exchange_declare(profile->conn_active->state, 1, + amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(profile->exchange_type), + passive, + durable, + profile->exchange_auto_delete, + 0, + amqp_empty_table); +#else amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange_type), passive, durable, amqp_empty_table); - +#endif if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n"); continue;