FS-10430: [mod_amqp] add support for libamqp >= 0.5.2

This commit is contained in:
Mike Jerris 2017-06-27 16:42:45 -05:00 committed by root
parent f63a3541a1
commit 8c8a4dd0f8
3 changed files with 62 additions and 9 deletions

View File

@ -280,12 +280,23 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void
} }
/* Check if exchange already exists */ /* Check if exchange already exists */
#if AMQP_VERSION_MAJOR == 0 && (AMQP_VERSION_MINOR > 5 || (AMQP_VERSION_MINOR == 5 && AMQP_VERSION_PATCH >= 2 ))
amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes("topic"),
0, /* passive */
1, /* durable */
0, /* auto-delete */
0,
amqp_empty_table);
#else
amqp_exchange_declare(profile->conn_active->state, 1, amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes("topic"), amqp_cstring_bytes("topic"),
0, /* passive */ 0, /* passive */
1, /* durable */ 1, /* durable */
amqp_empty_table); amqp_empty_table);
#endif
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Checking for command exchange")) { if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Checking for command exchange")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create missing command exchange", profile->name); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create missing command exchange", profile->name);

View File

@ -244,12 +244,23 @@ switch_status_t mod_amqp_logging_create(char *name, switch_xml_t cfg)
goto err; goto err;
} }
#if AMQP_VERSION_MAJOR == 0 && (AMQP_VERSION_MINOR > 5 || (AMQP_VERSION_MINOR == 5 && AMQP_VERSION_PATCH >= 2 ))
amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type),
0, /* passive */
profile->exchange_durable,
profile->exchange_auto_delete,
0,
amqp_empty_table);
#else
amqp_exchange_declare(profile->conn_active->state, 1, amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type), amqp_cstring_bytes(profile->exchange_type),
0, /* passive */ 0, /* passive */
profile->exchange_durable, profile->exchange_durable,
amqp_empty_table); amqp_empty_table);
#endif
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] failed to create exchange\n", profile->name); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] failed to create exchange\n", profile->name);
@ -341,13 +352,24 @@ void * SWITCH_THREAD_FUNC mod_amqp_logging_thread(switch_thread_t *thread, void
status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr); status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr);
if ( status == SWITCH_STATUS_SUCCESS ) { if ( status == SWITCH_STATUS_SUCCESS ) {
// Ensure that the exchange exists, and is of the correct type // Ensure that the exchange exists, and is of the correct type
amqp_exchange_declare(profile->conn_active->state, 1, #if AMQP_VERSION_MAJOR == 0 && (AMQP_VERSION_MINOR > 5 || (AMQP_VERSION_MINOR == 5 && AMQP_VERSION_PATCH >= 2 ))
amqp_cstring_bytes(profile->exchange), amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange_type), amqp_cstring_bytes(profile->exchange),
passive, amqp_cstring_bytes(profile->exchange_type),
durable, passive,
amqp_empty_table); durable,
profile->exchange_auto_delete,
0,
amqp_empty_table);
#else
amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type),
passive,
durable,
amqp_empty_table);
#endif
if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n");

View File

@ -325,13 +325,23 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name);
goto err; goto err;
} }
#if AMQP_VERSION_MAJOR == 0 && (AMQP_VERSION_MINOR > 5 || (AMQP_VERSION_MINOR == 5 && AMQP_VERSION_PATCH >= 2 ))
amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type),
0, /* passive */
profile->exchange_durable,
profile->exchange_auto_delete,
0,
amqp_empty_table);
#else
amqp_exchange_declare(profile->conn_active->state, 1, amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type), amqp_cstring_bytes(profile->exchange_type),
0, /* passive */ 0, /* passive */
profile->exchange_durable, profile->exchange_durable,
amqp_empty_table); amqp_empty_table);
#endif
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] failed to create exchange\n", profile->name); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] failed to create exchange\n", profile->name);
@ -458,13 +468,23 @@ void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void
status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr); status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr);
if ( status == SWITCH_STATUS_SUCCESS ) { if ( status == SWITCH_STATUS_SUCCESS ) {
// Ensure that the exchange exists, and is of the correct type // Ensure that the exchange exists, and is of the correct type
#if AMQP_VERSION_MAJOR == 0 && (AMQP_VERSION_MINOR > 5 || (AMQP_VERSION_MINOR == 5 && AMQP_VERSION_PATCH >= 2 ))
amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type),
passive,
durable,
profile->exchange_auto_delete,
0,
amqp_empty_table);
#else
amqp_exchange_declare(profile->conn_active->state, 1, amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type), amqp_cstring_bytes(profile->exchange_type),
passive, passive,
durable, durable,
amqp_empty_table); amqp_empty_table);
#endif
if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n");
continue; continue;