Merge pull request #666 in FS/freeswitch from feature/FS-8728-mod_amqp-add-logging-ability to master

* commit 'e7d2083263319499fbdf897e22fdb24ab3d81b55':
  FS-8728 default configs
  FS-8728 Adding logging profile and functionality
This commit is contained in:
William King 2016-01-14 16:46:34 -06:00
commit 754fd7d783
7 changed files with 519 additions and 12 deletions

View File

@ -60,4 +60,24 @@
</params> </params>
</profile> </profile>
</commands> </commands>
<logging>
<profile name="default">
<connections>
<connection name="primary">
<param name="hostname" value="localhost"/>
<param name="virtualhost" value="/"/>
<param name="username" value="guest"/>
<param name="password" value="guest"/>
<param name="port" value="5672"/>
<param name="heartbeat" value="0"/>
</connection>
</connections>
<params>
<param name="exchange-name" value="TAP.Logging"/>
<param name="send_queue_size" value="5000"/>
<param name="reconnect_interval_ms" value="1000"/>
<param name="log-levels" value="debug,info,notice,warning,err,crit,alert"/>
</params>
</profile>
</logging>
</configuration> </configuration>

View File

@ -4,7 +4,7 @@ MODNAME=mod_amqp
if HAVE_AMQP if HAVE_AMQP
mod_LTLIBRARIES = mod_amqp.la mod_LTLIBRARIES = mod_amqp.la
mod_amqp_la_SOURCES = mod_amqp_utils.c mod_amqp_connection.c mod_amqp_producer.c mod_amqp_command.c mod_amqp.c mod_amqp_la_SOURCES = mod_amqp_utils.c mod_amqp_connection.c mod_amqp_producer.c mod_amqp_command.c mod_amqp_logging.c mod_amqp.c
mod_amqp_la_CFLAGS = $(AM_CFLAGS) $(AMQP_CFLAGS) mod_amqp_la_CFLAGS = $(AM_CFLAGS) $(AMQP_CFLAGS)
mod_amqp_la_LIBADD = $(switch_builddir)/libfreeswitch.la mod_amqp_la_LIBADD = $(switch_builddir)/libfreeswitch.la
mod_amqp_la_LDFLAGS = -avoid-version -module -no-undefined -shared $(AMQP_LIBS) $(SWITCH_AM_LDFLAGS) mod_amqp_la_LDFLAGS = -avoid-version -module -no-undefined -shared $(AMQP_LIBS) $(SWITCH_AM_LDFLAGS)

View File

@ -62,6 +62,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_amqp_load)
globals.pool = pool; globals.pool = pool;
switch_core_hash_init(&(globals.producer_hash)); switch_core_hash_init(&(globals.producer_hash));
switch_core_hash_init(&(globals.command_hash)); switch_core_hash_init(&(globals.command_hash));
switch_core_hash_init(&(globals.logging_hash));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_apqp loading: Version %s\n", switch_version_full()); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_apqp loading: Version %s\n", switch_version_full());
@ -72,6 +73,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_amqp_load)
SWITCH_ADD_API(api_interface, "amqp", "amqp API", amqp_reload, "syntax"); SWITCH_ADD_API(api_interface, "amqp", "amqp API", amqp_reload, "syntax");
switch_log_bind_logger(mod_amqp_logging_recv, SWITCH_LOG_DEBUG, SWITCH_FALSE);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -84,6 +87,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_amqp_shutdown)
switch_hash_index_t *hi; switch_hash_index_t *hi;
mod_amqp_producer_profile_t *producer; mod_amqp_producer_profile_t *producer;
mod_amqp_command_profile_t *command; mod_amqp_command_profile_t *command;
mod_amqp_logging_profile_t *logging;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod starting shutting down\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod starting shutting down\n");
switch_event_unbind_callback(mod_amqp_producer_event_handler); switch_event_unbind_callback(mod_amqp_producer_event_handler);
@ -98,6 +102,13 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_amqp_shutdown)
mod_amqp_command_destroy(&command); mod_amqp_command_destroy(&command);
} }
switch_log_unbind_logger(mod_amqp_logging_recv);
while ((hi = switch_core_hash_first(globals.logging_hash))) {
switch_core_hash_this(hi, NULL, NULL, (void **)&logging);
mod_amqp_logging_destroy(&logging);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod finished shutting down\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod finished shutting down\n");
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }

View File

@ -146,17 +146,46 @@ typedef struct {
char *custom_attr; char *custom_attr;
} mod_amqp_command_profile_t; } mod_amqp_command_profile_t;
typedef struct {
char *name;
char *exchange;
char *exchange_type;
int exchange_durable;
int exchange_auto_delete;
uint32_t log_level_mask;
/* Note: The AMQP channel is not reentrant this MUTEX serializes sending events. */
mod_amqp_connection_t *conn_root;
mod_amqp_connection_t *conn_active;
int reconnect_interval_ms;
/* Logging thread */
switch_thread_t *logging_thread;
switch_queue_t *send_queue;
unsigned int send_queue_size;
switch_mutex_t *mutex;
switch_bool_t running;
char *custom_attr;
switch_memory_pool_t *pool;
} mod_amqp_logging_profile_t;
struct { struct {
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
switch_hash_t *producer_hash; switch_hash_t *producer_hash;
switch_hash_t *command_hash; switch_hash_t *command_hash;
switch_hash_t *logging_hash;
} globals; } globals;
/* utils */ /* utils */
switch_status_t mod_amqp_do_config(switch_bool_t reload); switch_status_t mod_amqp_do_config(switch_bool_t reload);
int mod_amqp_log_if_amqp_error(amqp_rpc_reply_t x, char const *context); int mod_amqp_log_if_amqp_error(amqp_rpc_reply_t x, char const *context);
int mod_amqp_count_chars(const char* string, char ch); int mod_amqp_count_chars(const char* string, char ch);
void mod_amqp_util_msg_destroy(mod_amqp_message_t **msg);
/* connection */ /* connection */
switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_xml_t cfg, switch_memory_pool_t *pool); switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_xml_t cfg, switch_memory_pool_t *pool);
@ -179,5 +208,11 @@ void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void
char *amqp_util_encode(char *key, char *dest); char *amqp_util_encode(char *key, char *dest);
/* logging */
switch_status_t mod_amqp_logging_recv(const switch_log_node_t *node, switch_log_level_t level);
switch_status_t mod_amqp_logging_create(char *name, switch_xml_t cfg);
switch_status_t mod_amqp_logging_destroy(mod_amqp_logging_profile_t **prof);
void * SWITCH_THREAD_FUNC mod_amqp_logging_thread(switch_thread_t *thread, void *data);
#endif /* MOD_AMQP_H */ #endif /* MOD_AMQP_H */

View File

@ -0,0 +1,416 @@
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Based on mod_skel by
* Anthony Minessale II <anthm@freeswitch.org>
*
* Contributor(s):
*
* Daniel Bryars <danb@aeriandi.com>
* Tim Brown <tim.brown@aeriandi.com>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
* Mike Jerris <mike@jerris.com>
*
* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
*
*/
#include "mod_amqp.h"
switch_status_t mod_amqp_logging_recv(const switch_log_node_t *node, switch_log_level_t level)
{
switch_hash_index_t *hi = NULL;
mod_amqp_message_t *msg = NULL;
mod_amqp_logging_profile_t *logging = NULL;
char *json = NULL;
if (!strcmp(node->file, "mod_amqp_logging.c")) {
return SWITCH_STATUS_SUCCESS;
}
/*
1. Loop through logging hash of profiles. Check for a profile that accepts this logging level, and file regex.
2. If event not already parsed/created, then create it now
3. Queue copy of event into logging profile send queue
4. Destroy local event copy
*/
for (hi = switch_core_hash_first(globals.logging_hash); hi; hi = switch_core_hash_next(&hi)) {
switch_core_hash_this(hi, NULL, NULL, (void **)&logging);
if ( logging && switch_log_check_mask(logging->log_level_mask, level) ) {
char file[128] = {0};
if ( !json ) {
cJSON *body = NULL;
char date[80] = "";
switch_time_exp_t tm;
switch_time_exp_lt(&tm, node->timestamp);
switch_snprintf(date, sizeof(date), "%0.4d-%0.2d-%0.2d %0.2d:%0.2d:%0.2d.%0.6d",
tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, tm.tm_usec);
/* Create cJSON body */
body = cJSON_CreateObject();
cJSON_AddItemToObject(body, "file", cJSON_CreateString((const char *) node->file));
cJSON_AddItemToObject(body, "function", cJSON_CreateString((const char *) node->func));
cJSON_AddItemToObject(body, "line", cJSON_CreateNumber((double) node->line));
cJSON_AddItemToObject(body, "level", cJSON_CreateString(switch_log_level2str(node->level)));
cJSON_AddItemToObject(body, "timestamp", cJSON_CreateString((const char *)date));
cJSON_AddItemToObject(body, "timestamp_epoch", cJSON_CreateNumber((double) node->timestamp / 1000000));
cJSON_AddItemToObject(body, "content", cJSON_CreateString(node->content ));
json = cJSON_Print(body);
cJSON_Delete(body);
}
/* Create message */
switch_malloc(msg, sizeof(mod_amqp_message_t));
msg->pjson = strdup(json);
strcpy(file, node->file);
switch_replace_char(file, '.', '_', 0);
snprintf(msg->routing_key, sizeof(msg->routing_key), "%s.%s.%s.%s", switch_core_get_hostname(), node->userdata, switch_log_level2str(node->level), file);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP Created message with routing key[%s]\n", msg->routing_key);
if (switch_queue_trypush(logging->send_queue, msg) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "AMQP logging message queue full. Messages will be dropped!\n");
return SWITCH_STATUS_SUCCESS;
}
}
}
switch_safe_free(json);
return SWITCH_STATUS_SUCCESS;
}
switch_status_t mod_amqp_logging_destroy(mod_amqp_logging_profile_t **prof)
{
mod_amqp_message_t *msg = NULL;
switch_status_t status = SWITCH_STATUS_SUCCESS;
mod_amqp_connection_t *conn = NULL, *conn_next = NULL;
switch_memory_pool_t *pool;
mod_amqp_logging_profile_t *profile;
if (!prof || !*prof) {
return SWITCH_STATUS_SUCCESS;
}
profile = *prof;
pool = profile->pool;
if (profile->name) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Profile[%s] shutting down...\n", profile->name);
switch_core_hash_delete(globals.logging_hash, profile->name);
}
profile->running = 0;
if (profile->logging_thread) {
switch_thread_join(&status, profile->logging_thread);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Profile[%s] closing AMQP socket...\n", profile->name);
for (conn = profile->conn_root; conn; conn = conn_next) {
conn_next = conn->next;
mod_amqp_connection_destroy(&conn);
}
profile->conn_active = NULL;
profile->conn_root = NULL;
while (profile->send_queue && switch_queue_trypop(profile->send_queue, (void**)&msg) == SWITCH_STATUS_SUCCESS) {
mod_amqp_util_msg_destroy(&msg);
}
if (pool) {
switch_core_destroy_memory_pool(&pool);
}
*prof = NULL;
return SWITCH_STATUS_SUCCESS;
}
switch_status_t mod_amqp_logging_create(char *name, switch_xml_t cfg)
{
mod_amqp_logging_profile_t *profile = NULL;
switch_xml_t params, param, connections, connection;
switch_threadattr_t *thd_attr = NULL;
char *exchange = NULL, *exchange_type = NULL;
int exchange_durable = 1; /* durable */
switch_memory_pool_t *pool;
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
goto err;
}
profile = switch_core_alloc(pool, sizeof(mod_amqp_logging_profile_t));
profile->pool = pool;
profile->name = switch_core_strdup(profile->pool, name);
profile->running = 1;
profile->conn_root = NULL;
profile->conn_active = NULL;
profile->log_level_mask = 0;
profile->send_queue_size = 5000;
if ((params = switch_xml_child(cfg, "params")) != NULL) {
for (param = switch_xml_child(params, "param"); param; param = param->next) {
char *var = (char *) switch_xml_attr_soft(param, "name");
char *val = (char *) switch_xml_attr_soft(param, "value");
if (!var) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param missing 'name' attribute\n", profile->name);
continue;
}
if (!val) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param[%s] missing 'value' attribute\n", profile->name, var);
continue;
}
if (!strncmp(var, "reconnect_interval_ms", 21)) {
int interval = atoi(val);
if ( interval && interval > 0 ) {
profile->reconnect_interval_ms = interval;
}
} else if (!strncmp(var, "send_queue_size", 15)) {
int interval = atoi(val);
if ( interval && interval > 0 ) {
profile->send_queue_size = interval;
}
} else if (!strncmp(var, "exchange-type", 13)) {
exchange_type = switch_core_strdup(profile->pool, val);
} else if (!strncmp(var, "exchange-name", 13)) {
exchange = switch_core_strdup(profile->pool, val);
} else if (!strncmp(var, "exchange-durable", 16)) {
exchange_durable = switch_true(val);
} else if (!strncmp(var, "log-levels", 10)) {
profile->log_level_mask = switch_log_str2mask(val);
}
} /* params for loop */
}
/* Handle defaults of string types */
profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Events");
profile->exchange_type = exchange_type ? exchange_type : switch_core_strdup(profile->pool, "topic");
profile->exchange_durable = exchange_durable;
if ((connections = switch_xml_child(cfg, "connections")) != NULL) {
for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) {
if ( ! profile->conn_root ) { /* Handle first root node */
if (mod_amqp_connection_create(&(profile->conn_root), connection, profile->pool) != SWITCH_STATUS_SUCCESS) {
/* Handle connection create failure */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name);
continue;
}
profile->conn_active = profile->conn_root;
} else {
if (mod_amqp_connection_create(&(profile->conn_active->next), connection, profile->pool) != SWITCH_STATUS_SUCCESS) {
/* Handle connection create failure */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name);
continue;
}
profile->conn_active = profile->conn_active->next;
}
}
}
profile->conn_active = NULL;
if ( mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name);
goto err;
}
amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type),
0, /* passive */
profile->exchange_durable,
amqp_empty_table);
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] failed to create exchange\n", profile->name);
goto err;
}
/* Create a bounded FIFO queue for sending messages */
if (switch_queue_create(&(profile->send_queue), profile->send_queue_size, profile->pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create send queue of size %d!\n", profile->send_queue_size);
goto err;
}
/* Start the event send thread. This will set up the initial connection */
switch_threadattr_create(&thd_attr, profile->pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
if (switch_thread_create(&profile->logging_thread, thd_attr, mod_amqp_logging_thread, profile, profile->pool)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create 'amqp event sender' thread!\n");
goto err;
}
if ( switch_core_hash_insert(globals.logging_hash, name, (void *) profile) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to insert new profile [%s] into mod_amqp profile hash\n", name);
goto err;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] Successfully started\n", profile->name);
return SWITCH_STATUS_SUCCESS;
err:
/* Cleanup */
mod_amqp_logging_destroy(&profile);
return SWITCH_STATUS_GENERR;
}
/* This should only be called in a single threaded context from the logging profile send thread */
switch_status_t mod_amqp_logging_send(mod_amqp_logging_profile_t *profile, mod_amqp_message_t *msg)
{
amqp_basic_properties_t props;
int status;
if (! profile->conn_active) {
/* No connection, so we can not send the message. */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name);
return SWITCH_STATUS_NOT_INITALIZED;
}
memset(&props, 0, sizeof(amqp_basic_properties_t));
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
status = amqp_basic_publish(
profile->conn_active->state,
1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(msg->routing_key),
0,
0,
&props,
amqp_cstring_bytes(msg->pjson));
if (status < 0) {
const char *errstr = amqp_error_string2(-status);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] failed to send event on connection[%s]: %s\n",
profile->name, profile->conn_active->name, errstr);
/* This is bad, we couldn't send the message. Clear up any connection */
mod_amqp_connection_close(profile->conn_active);
profile->conn_active = NULL;
return SWITCH_STATUS_SOCKERR;
}
return SWITCH_STATUS_SUCCESS;
}
void * SWITCH_THREAD_FUNC mod_amqp_logging_thread(switch_thread_t *thread, void *data)
{
mod_amqp_message_t *msg = NULL;
switch_status_t status = SWITCH_STATUS_SUCCESS;
mod_amqp_logging_profile_t *profile = (mod_amqp_logging_profile_t *)data;
amqp_boolean_t passive = 0;
amqp_boolean_t durable = 1;
while (profile->running) {
if (!profile->conn_active) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Amqp no connection- reconnecting...\n");
status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr);
if ( status == SWITCH_STATUS_SUCCESS ) {
// Ensure that the exchange exists, and is of the correct type
amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type),
passive,
durable,
amqp_empty_table);
if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n");
continue;
}
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to connect with code(%d), sleeping for %dms\n",
profile->name, status, profile->reconnect_interval_ms);
switch_sleep(profile->reconnect_interval_ms * 1000);
continue;
}
if (!msg && switch_queue_pop_timeout(profile->send_queue, (void**)&msg, 1000000) != SWITCH_STATUS_SUCCESS) {
continue;
}
if (msg) {
switch (mod_amqp_logging_send(profile, msg)) {
case SWITCH_STATUS_SUCCESS:
/* Success: prepare for next message */
mod_amqp_util_msg_destroy(&msg);
break;
case SWITCH_STATUS_NOT_INITALIZED:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with 'not initialised'\n");
break;
case SWITCH_STATUS_SOCKERR:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with 'socket error'\n");
break;
default:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with a generic error\n");
/* Send failed and closed the connection; reconnect will happen at the beginning of the loop
* NB: do we need a delay here to prevent a fast reconnect-send-fail loop? */
break;
}
}
}
/* Abort the current message */
mod_amqp_util_msg_destroy(&msg);
// Terminate the thread
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Event sender thread stopped\n");
switch_thread_exit(thread, SWITCH_STATUS_SUCCESS);
return NULL;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4
*/

View File

@ -38,13 +38,6 @@
#include "mod_amqp.h" #include "mod_amqp.h"
void mod_amqp_producer_msg_destroy(mod_amqp_message_t **msg)
{
if (!msg || !*msg) return;
switch_safe_free((*msg)->pjson);
switch_safe_free(*msg);
}
switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[]) switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[])
{ {
@ -115,7 +108,7 @@ void mod_amqp_producer_event_handler(switch_event_t* evt)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "AMQP message queue full. Messages will be dropped for %.1fs! (Queue capacity %d)", switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "AMQP message queue full. Messages will be dropped for %.1fs! (Queue capacity %d)",
profile->circuit_breaker_ms / 1000.0, queue_size); profile->circuit_breaker_ms / 1000.0, queue_size);
mod_amqp_producer_msg_destroy(&amqp_message); mod_amqp_util_msg_destroy(&amqp_message);
} }
} }
@ -155,7 +148,7 @@ switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **prof) {
profile->conn_root = NULL; profile->conn_root = NULL;
while (profile->send_queue && switch_queue_trypop(profile->send_queue, (void**)&msg) == SWITCH_STATUS_SUCCESS) { while (profile->send_queue && switch_queue_trypop(profile->send_queue, (void**)&msg) == SWITCH_STATUS_SUCCESS) {
mod_amqp_producer_msg_destroy(&msg); mod_amqp_util_msg_destroy(&msg);
} }
if (pool) { if (pool) {
@ -497,7 +490,7 @@ void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void
switch (mod_amqp_producer_send(profile, msg)) { switch (mod_amqp_producer_send(profile, msg)) {
case SWITCH_STATUS_SUCCESS: case SWITCH_STATUS_SUCCESS:
/* Success: prepare for next message */ /* Success: prepare for next message */
mod_amqp_producer_msg_destroy(&msg); mod_amqp_util_msg_destroy(&msg);
break; break;
case SWITCH_STATUS_NOT_INITALIZED: case SWITCH_STATUS_NOT_INITALIZED:
@ -541,7 +534,7 @@ void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void
} }
/* Abort the current message */ /* Abort the current message */
mod_amqp_producer_msg_destroy(&msg); mod_amqp_util_msg_destroy(&msg);
// Terminate the thread // Terminate the thread
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Event sender thread stopped\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Event sender thread stopped\n");

View File

@ -144,6 +144,30 @@ switch_status_t mod_amqp_do_config(switch_bool_t reload)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unable to locate commands section for mod_amqp\n" ); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unable to locate commands section for mod_amqp\n" );
} }
if ((profiles = switch_xml_child(cfg, "logging"))) {
if ((profile = switch_xml_child(profiles, "profile"))) {
for (; profile; profile = profile->next) {
char *name = (char *) switch_xml_attr_soft(profile, "name");
if (zstr(name)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile. Check configs missing name attr\n");
continue;
}
name = switch_core_strdup(globals.pool, name);
if ( mod_amqp_logging_create(name, profile) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile [%s]. Check configs\n", name);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Loaded mod_amqp profile [%s] successfully\n", name);
}
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unable to locate a profile for mod_amqp\n" );
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unable to locate logging section for mod_amqp\n" );
}
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -183,6 +207,14 @@ char *amqp_util_encode(char *key, char *dest) {
return dest; return dest;
} }
void mod_amqp_util_msg_destroy(mod_amqp_message_t **msg)
{
if (!msg || !*msg) return;
switch_safe_free((*msg)->pjson);
switch_safe_free(*msg);
}
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables: