Adding mod_amqp as an event_handler module

AMQP(Advanced Message Queueing Protocol) is an event bus protocol
with more info here http://www.amqp.org/about/what

mod_amqp implements the client side of the amqp protocol. Currently
the module supports two separate, but not mutually exclusive usages:

1. Sending a filtered whitelist of events to an AMQP server. The
before the events are sent an amqp routing header is generated so that
downstream subscriptions can subscribe using filters. If the
connection to the remote server is interrupted, the module will
attempt to reconnect.

2. Subscribing to a 'control' channel on the remote AMQP server. When
events are received on this channel if they match a configured filter,
the commands are run locally as api commands. If the filter for the
events being sent to the remote AMQP server include SWITCH_EVENT_API
then the results of the api commands will be sent back to the AMQP
server.

FS-7060 #resolve
Thanks-to: Daniel Bryars <danb@aeriandi.com> and Tim Brown <tim.brown@aeriandi.com>
This commit is contained in:
William King 2014-11-20 14:02:40 -08:00
parent c6ef0558ff
commit 1a96f23f27
11 changed files with 1701 additions and 0 deletions

View File

@ -94,6 +94,7 @@ endpoints/mod_skinny
#endpoints/mod_skypopen #endpoints/mod_skypopen
endpoints/mod_sofia endpoints/mod_sofia
#endpoints/mod_unicall #endpoints/mod_unicall
#event_handlers/mod_amqp
event_handlers/mod_cdr_csv event_handlers/mod_cdr_csv
#event_handlers/mod_cdr_mongodb #event_handlers/mod_cdr_mongodb
#event_handlers/mod_cdr_pg_csv #event_handlers/mod_cdr_pg_csv

View File

@ -0,0 +1,56 @@
<configuration name="amqp.conf" description="mod_amqp">
<producers>
<profile name="default">
<connections>
<connection name="primary">
<param name="hostname" value="localhost"/>
<param name="virtualhost" value="/"/>
<param name="username" value="guest"/>
<param name="password" value="guest"/>
<param name="port" value="5673"/>
<param name="heartbeat" value="0"/>
</connection>
<connection name="secondary">
<param name="hostname" value="localhost"/>
<param name="virtualhost" value="/"/>
<param name="username" value="guest"/>
<param name="password" value="guest"/>
<param name="port" value="5672"/>
<param name="heartbeat" value="0"/>
</connection>
</connections>
<params>
<param name="exchange" value="TAP.Events"/>
<param name="exchange_type" value="topic"/>
<param name="circuit_breaker_ms" value="10000"/>
<param name="reconnect_interval_ms" value="1000"/>
<param name="send_queue_size" value="5000"/>
<!-- The routing key is made from the format string, using the header values in the event specified in the format_fields.-->
<!-- Fields that are prefixed with a # are treated as literals rather than doing a header lookup -->
<param name="format_fields" value="#FreeSWITCH,FreeSWITCH-Hostname,Event-Name,Event-Subclass,Unique-ID"/>
<!-- <param name="eventFilter" value="SWITCH_EVENT_ALL"/> -->
<param name="event_filter" value="SWITCH_EVENT_CHANNEL_CREATE,SWITCH_EVENT_CHANNEL_DESTROY,SWITCH_EVENT_HEARTBEAT,SWITCH_EVENT_DTMF"/>
</params>
</profile>
</producers>
<commands>
<profile name="default">
<connections>
<connection name="primary">
<param name="hostname" value="localhost"/>
<param name="virtualhost" value="/"/>
<param name="username" value="guest"/>
<param name="password" value="guest"/>
<param name="port" value="5672"/>
<param name="heartbeat" value="0"/>
</connection>
</connections>
<params>
<param name="eventExchange" value="TAP.Events"/>
<param name="eventExchangetype" value="topic"/>
</params>
</profile>
</commands>
</configuration>

View File

@ -1227,6 +1227,10 @@ PKG_CHECK_MODULES([MEMCACHED], [libmemcached >= 0.31],[
AM_CONDITIONAL([HAVE_MEMCACHED],[false]) AM_CONDITIONAL([HAVE_MEMCACHED],[false])
]) ])
PKG_CHECK_MODULES([AMQP], [librabbitmq >= 0.5.2],[
AM_CONDITIONAL([HAVE_AMQP],[true])],[
AC_MSG_RESULT([no]); AM_CONDITIONAL([HAVE_AMQP],[false])])
AC_ARG_ENABLE(core-libedit-support, AC_ARG_ENABLE(core-libedit-support,
[AS_HELP_STRING([--disable-core-libedit-support], [Compile without libedit Support])]) [AS_HELP_STRING([--disable-core-libedit-support], [Compile without libedit Support])])
@ -1572,6 +1576,7 @@ AC_CONFIG_FILES([Makefile
src/mod/endpoints/mod_unicall/Makefile src/mod/endpoints/mod_unicall/Makefile
src/mod/endpoints/mod_rtc/Makefile src/mod/endpoints/mod_rtc/Makefile
src/mod/endpoints/mod_verto/Makefile src/mod/endpoints/mod_verto/Makefile
src/mod/event_handlers/mod_amqp/Makefile
src/mod/event_handlers/mod_cdr_csv/Makefile src/mod/event_handlers/mod_cdr_csv/Makefile
src/mod/event_handlers/mod_cdr_mongodb/Makefile src/mod/event_handlers/mod_cdr_mongodb/Makefile
src/mod/event_handlers/mod_cdr_pg_csv/Makefile src/mod/event_handlers/mod_cdr_pg_csv/Makefile

View File

@ -0,0 +1,17 @@
include $(top_srcdir)/build/modmake.rulesam
MODNAME=mod_amqp
if HAVE_AMQP
mod_LTLIBRARIES = mod_amqp.la
mod_amqp_la_SOURCES = mod_amqp_utils.c mod_amqp_connection.c mod_amqp_producer.c mod_amqp_command.c mod_amqp.c
mod_amqp_la_CFLAGS = $(AM_CFLAGS) $(AMQP_CFLAGS)
mod_amqp_la_LIBADD = $(switch_builddir)/libfreeswitch.la
mod_amqp_la_LDFLAGS = -avoid-version -module -no-undefined -shared $(AMQP_LIBS) $(SWITCH_AM_LDFLAGS)
else
install: error
all: error
error:
$(error You must install librabbitmq1 and librabbitmq-dev to build this module)
endif

View File

@ -0,0 +1,121 @@
_
_ __ ___ ___ __| | __ _ _ __ ___ __ _ _ __
| '_ ` _ \ / _ \ / _` | / _` | '_ ` _ \ / _` | '_ \
| | | | | | (_) | (_| | | (_| | | | | | | (_| | |_) |
|_| |_| |_|\___/ \__,_|___\__,_|_| |_| |_|\__, | .__/
|_____| |_|_| by Aeriandi
Contents
--------
1. Features
2. How to build and install
3. Configuration
4. Usage
5. Trouleshooting
6. Notes
1. Features
-----------
* Authenticates with an AMQP broker such as RabbitMQ.
* If the broker disconnects, the connection is retried.
* Routing keys can include values from Freeswitch message headers.
* The rate of messages pulished can be limited by filtering event types.
* Messages are sent asynchronously so as not to block the Freeswitch core.
* Pulishing can be temporarily suspended on the event of "back pressure" from the AMQP broker.
2. How to build and install
---------------------------
Requires librabbitmq1 to build. Debian Jessie comes with the correct version.
3. Configuration
----------------
All configuration is done within the amqp.conf.xml file located in the freeswitch/autoload_configs folder, which is usually in /etc/ for linux based machines.
The file is of the format:
<configuration name="amqp.conf" description="mod_amqp">
<settings>
<param name="parameter1" value="value1"/>
<param name="parameter2" value="value2"/>
...etc.
</settings>
</configuration>
Available parameters are as follows:
+------------------------------+-----------------------------------+
| name | default value (units) |
|------------------------------+-----------------------------------|
| amqpHostnames | localhost |
| amqpVirtualHost | / |
| amqpPort | 5672 |
| amqpUsername | guest |
| amqpPassword | guest |
| amqpHeartbeatSeconds | 0 (s) |
| eventExchange | TAP.Events |
| eventExchangetype | topic |
| eventRoutingKeyFormat | %s.%s.%s.%s |
| eventRoutingKeyFormatFields | FreeSWITCH-Hostname,Event-Name, |
| | Event-Subclass,Unique-ID |
| eventFilter | SWITCH_EVENT_CHANNEL_CREATE, |
| | SWITCH_EVENT_CHANNEL_DESTROY, |
| | SWITCH_EVENT_HEARTBEAT, |
| | SWITCH_EVENT_DTMF |
| commandExchange | TAP.Commands |
| commandExchangeType | topic |
| commandBindingKey | TapCommands |
| amqpSendQueueSize | 500 (events) |
| amqpCircuitBreakerTimeout | 10000 (ms) |
| amqpReconnectInterval | 1000 (ms) |
+------------------------------+-----------------------------------+
Set the amqpHostname and amqpPort to point to the AMQP broker, and set valid login credentials using amqpUsername and amqpPassword.
The routing key is made from the eventRoutingKeyFormat format string using the freeswitch event header values specified in the eventRoutingKeyFormatFields. See the manpage printf(1) for more information about format strings. The numer of percent marks in the format string must match the number of comma-separated header names in the format fields string.
mod_amqp has an internal buffer for events so that it can send them asynchronously and also cope with the connection going down for a short amount of time. The size of this buffer is set by amqpSendQueueSize. If this buffer ever becomes full, then mod_amqp will drop event messages for the period of time specified by amqpCircuitBreakerTimeout (in milliseconds).
If the connection to the AMQP broker is severed, mod_amqp will attempt to reconnect regularly according to the amqpReconnectInterval (in milliseconds). It will cycle through the hostnames provided in amqpHostnames.
The eventFilter parameter specifies which events will be sent to the AMQP broker, a full list of available options can be found in src/include/switch_types.h. The special event name SWITCH_EVENT_ALL causes all events to be sent, effectively disabling the filter.
4. Usage
--------
Usually, mod_amqp will be loaded automatically when Freeswitch starts. To establish whether the module has been loaded, you can execute "module_exists mod_amqp" in fs_cli.
If the module is not set to load with Freeswitch, it can be loaded on he fly by executing "load mod_amqp" from fs_cli. You'll see a few lines of status messages as the module loads and tries to connect to the AMQP roker. Conversely, the module can be unloaded using "unload mod_amqp".
To effect new settings having edited the config file, the module should be unloaded then loaded again.
5. Trouleshooting
-----------------
Any errors or warnings will be reported using Freeswitch logging, so check for errors using fs_cli with the loglevel is set to be sufficiently verbose, or with your selected logging module; for example, the syslog logger.
Typically, messages not being received by the AMQP broker is due to network connectivity or failed authentication with the broker.
If mod_amqp experiences back-pressure from the AMQP broker, its internal buffer of events to send fills up. When this buffer is half full, warning messages are logged, and when the queue is completely full the circuit breaker will be triggered, logging an error and dropping events for the predefined amount of time.
6. Notes
--------
The SHA for the revision of librabbitmq-c1 that is included is 1c213703c9fdd747bc71ea4f64943c3b4269f8cf.

View File

@ -0,0 +1,115 @@
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Based on mod_skel by
* Anthony Minessale II <anthm@freeswitch.org>
*
* Contributor(s):
*
* Daniel Bryars <danb@aeriandi.com>
* Tim Brown <tim.brown@aeriandi.com>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
* Mike Jerris <mike@jerris.com>
*
* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
*
*/
#include "mod_amqp.h"
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_amqp_shutdown);
SWITCH_MODULE_LOAD_FUNCTION(mod_amqp_load);
SWITCH_MODULE_DEFINITION(mod_amqp, mod_amqp_load, mod_amqp_shutdown, NULL);
SWITCH_STANDARD_API(amqp_reload)
{
return mod_amqp_do_config(SWITCH_TRUE);
}
/* ------------------------------
Startup
------------------------------
*/
SWITCH_MODULE_LOAD_FUNCTION(mod_amqp_load)
{
switch_api_interface_t *api_interface;
memset(&globals, 0, sizeof(globals));
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
globals.pool = pool;
switch_core_hash_init(&(globals.producer_hash));
switch_core_hash_init(&(globals.command_hash));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_apqp loading: Version %s\n", switch_version_full());
/* Create producer profiles */
if ( mod_amqp_do_config(SWITCH_FALSE) != SWITCH_STATUS_SUCCESS ){
return SWITCH_STATUS_GENERR;
}
SWITCH_ADD_API(api_interface, "amqp", "amqp API", amqp_reload, "syntax");
return SWITCH_STATUS_SUCCESS;
}
/* ------------------------------
Shutdown
------------------------------
*/
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_amqp_shutdown)
{
switch_hash_index_t *hi;
mod_amqp_producer_profile_t *producer;
mod_amqp_command_profile_t *command;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod starting shutting down\n");
switch_event_unbind_callback(mod_amqp_producer_event_handler);
for (hi = switch_core_hash_first(globals.producer_hash); hi; hi = switch_core_hash_next(&hi)) {
switch_core_hash_this(hi, NULL, NULL, (void **)&producer);
mod_amqp_producer_destroy(&producer);
}
for (hi = switch_core_hash_first(globals.command_hash); hi; hi = switch_core_hash_next(&hi)) {
switch_core_hash_this(hi, NULL, NULL, (void **)&command);
mod_amqp_command_destroy(&command);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod finished shutting down\n");
return SWITCH_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4
*/

View File

@ -0,0 +1,173 @@
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Based on mod_skel by
* Anthony Minessale II <anthm@freeswitch.org>
*
* Contributor(s):
*
* Daniel Bryars <danb@aeriandi.com>
* Tim Brown <tim.brown@aeriandi.com>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
* Mike Jerris <mike@jerris.com>
*
* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
*
*/
#ifndef MOD_AMQP_H
#define MOD_AMQP_H
#include <switch.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <amqp_tcp_socket.h>
#include <strings.h>
#define MAX_LOG_MESSAGE_SIZE 1024
#define AMQP_MAX_HOSTS 4
/* If you change MAX_ROUTING_KEY_FORMAT_FIELDS then you must change the implementation of makeRoutingKey where it formats the routing key using sprintf */
#define MAX_ROUTING_KEY_FORMAT_FIELDS 10
#define MAX_AMQP_ROUTING_KEY_LENGTH 255
#define TIME_STATS_TO_AGGREGATE 1024
#define MOD_AMQP_DEBUG_TIMING 0
typedef struct {
char routing_key[MAX_AMQP_ROUTING_KEY_LENGTH];
char *pjson;
} mod_amqp_message_t;
typedef struct mod_amqp_connection_s {
char *name;
char *hostname;
char *virtualhost;
char *username;
char *password;
unsigned int port;
unsigned int heartbeat; /* in seconds */
amqp_connection_state_t state;
struct mod_amqp_connection_s *next;
} mod_amqp_connection_t;
typedef struct {
char *name;
char *exchange;
char *exchange_type;
char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
/* Array to store the possible event subscriptions */
int event_subscriptions;
switch_event_node_t *event_nodes[SWITCH_EVENT_ALL];
switch_event_types_t event_ids[SWITCH_EVENT_ALL];
switch_event_node_t *eventNode;
/* Because only the 'running' thread will be reading or writing to the two connection pointers
* this does not 'yet' need a read/write lock. Before these structures can be destroyed,
* the running thread must be joined first.
*/
mod_amqp_connection_t *conn_root;
mod_amqp_connection_t *conn_active;
/* Rabbit connections are not thread safe so one connection per thread.
Communicate with sender thread using a queue */
switch_thread_t *producer_thread;
switch_queue_t *send_queue;
unsigned int send_queue_size;
int reconnect_interval_ms;
int circuit_breaker_ms;
switch_time_t circuit_breaker_reset_time;
switch_bool_t running;
switch_memory_pool_t *pool;
char *custom_attr;
} mod_amqp_producer_profile_t;
typedef struct {
char *name;
char *exchange;
char *exchange_type;
char *binding_key;
/* Array to store the possible event subscriptions */
char *event_filter;
unsigned int number_of_event_filters;
switch_event_node_t *event_nodes[SWITCH_EVENT_ALL];
switch_event_types_t event_ids[SWITCH_EVENT_ALL];
/* Note: The AMQP channel is not reentrant this MUTEX serializes sending events. */
mod_amqp_connection_t *conn_root;
mod_amqp_connection_t *conn_active;
int reconnect_interval_ms;
/* Listener thread */
switch_thread_t *command_thread;
switch_mutex_t *mutex;
switch_bool_t running;
switch_memory_pool_t *pool;
char *custom_attr;
} mod_amqp_command_profile_t;
struct {
switch_memory_pool_t *pool;
switch_hash_t *producer_hash;
switch_hash_t *command_hash;
} globals;
/* utils */
switch_status_t mod_amqp_do_config(switch_bool_t reload);
int mod_amqp_log_if_amqp_error(amqp_rpc_reply_t x, char const *context);
int mod_amqp_count_chars(const char* string, char ch);
/* connection */
switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_xml_t cfg, switch_memory_pool_t *pool);
void mod_amqp_connection_destroy(mod_amqp_connection_t **conn);
void mod_amqp_connection_close(mod_amqp_connection_t *connection);
switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod_amqp_connection_t **active, char *profile_name, char *custom_attr);
/* command */
switch_status_t mod_amqp_command_destroy(mod_amqp_command_profile_t **profile);
switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg);
void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void *data);
/* producer */
void mod_amqp_producer_event_handler(switch_event_t* evt);
switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],switch_event_t* evt, char* routingKeyEventHeaderNames[]);
switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **profile);
switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg);
void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void *data);
#endif /* MOD_AMQP_H */

View File

@ -0,0 +1,323 @@
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Based on mod_skel by
* Anthony Minessale II <anthm@freeswitch.org>
*
* Contributor(s):
*
* Daniel Bryars <danb@aeriandi.com>
* Tim Brown <tim.brown@aeriandi.com>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
* Mike Jerris <mike@jerris.com>
*
* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
*
*/
#include "mod_amqp.h"
switch_status_t mod_amqp_command_destroy(mod_amqp_command_profile_t **prof)
{
switch_status_t ret;
mod_amqp_connection_t *conn = NULL, *conn_next = NULL;
switch_memory_pool_t *pool;
mod_amqp_command_profile_t *profile;
if (!prof || !*prof) {
return SWITCH_STATUS_SUCCESS;
}
profile = *prof;
pool = profile->pool;
if (profile->name) {
switch_core_hash_delete(globals.command_hash, profile->name);
}
profile->running = 0;
if (profile->command_thread) {
switch_thread_join(&ret, profile->command_thread);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Profile[%s] closing AMQP socket...\n", profile->name);
for (conn = profile->conn_root; conn; conn = conn_next) {
mod_amqp_connection_destroy(&conn);
}
profile->conn_active = NULL;
profile->conn_root = NULL;
if (pool) {
switch_core_destroy_memory_pool(&pool);
}
*prof = NULL;
return SWITCH_STATUS_SUCCESS;
}
switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg)
{
mod_amqp_command_profile_t *profile = NULL;
switch_threadattr_t *thd_attr = NULL;
switch_memory_pool_t *pool;
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
goto err;
}
profile = switch_core_alloc(pool, sizeof(mod_amqp_command_profile_t));
profile->pool = pool;
profile->name = switch_core_strdup(profile->pool, name);
profile->running = 1;
/* Start the worker threads */
switch_threadattr_create(&thd_attr, profile->pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
if (switch_thread_create(&profile->command_thread, thd_attr, mod_amqp_command_thread, profile, profile->pool)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create 'amqp event sender' thread!\n");
goto err;
}
if ( switch_core_hash_insert(globals.command_hash, name, (void *) profile) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to insert new profile [%s] into mod_amqp profile hash\n", name);
goto err;
}
return SWITCH_STATUS_SUCCESS;
err:
/* Cleanup */
mod_amqp_command_destroy(&profile);
return SWITCH_STATUS_GENERR;
}
void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void *data)
{
mod_amqp_command_profile_t *profile = (mod_amqp_command_profile_t *) data;
while (profile->running) {
amqp_queue_declare_ok_t *recv_queue;
amqp_bytes_t queueName = { 0, NULL };
/* Ensure we have an AMQP connection */
if (!profile->conn_active) {
switch_status_t status;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Amqp no connection- reconnecting...\n");
status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr);
if ( status != SWITCH_STATUS_SUCCESS ) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to connect with code(%d), sleeping for %dms\n",
profile->name, status, profile->reconnect_interval_ms);
switch_sleep(profile->reconnect_interval_ms * 1000);
continue;
}
/* Ensure we have a queue */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating command queue");
recv_queue = amqp_queue_declare(profile->conn_active->state, // state
1, // channel
amqp_empty_bytes, // queue name
0, 0, // passive, durable
0, 1, // exclusive, auto-delete
amqp_empty_table); // args
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring queue")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to connect with code(%d), sleeping for %dms\n",
profile->name, status, profile->reconnect_interval_ms);
switch_sleep(profile->reconnect_interval_ms * 1000);
continue;
}
if (queueName.bytes) {
amqp_bytes_free(queueName);
}
queueName = amqp_bytes_malloc_dup(recv_queue->queue);
if (!queueName.bytes) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Out of memory while copying queue name");
break;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Created command queue %.*s", (int)queueName.len, (char *)queueName.bytes);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Binding command queue to exchange %s", profile->exchange);
/* Bind the queue to the exchange */
amqp_queue_bind(profile->conn_active->state, // state
1, // channel
queueName, // queue
amqp_cstring_bytes(profile->exchange), // exchange
amqp_cstring_bytes(profile->binding_key), // routing key
amqp_empty_table); // args
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Binding queue")) {
mod_amqp_connection_close(profile->conn_active);
profile->conn_active = NULL;
switch_sleep(profile->reconnect_interval_ms * 1000);
continue;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n");
continue;
}
// Start a command
amqp_basic_consume(profile->conn_active->state, // state
1, // channel
queueName, // queue
amqp_empty_bytes, // command tag
0, 1, 0, // no_local, no_ack, exclusive
amqp_empty_table); // args
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Creating a command")) {
mod_amqp_connection_close(profile->conn_active);
profile->conn_active = NULL;
switch_sleep(profile->reconnect_interval_ms * 1000);
continue;
}
while (profile->running && profile->conn_active) {
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
struct timeval timeout = {0};
char command[1024];
enum ECommandFormat {
COMMAND_FORMAT_UNKNOWN,
COMMAND_FORMAT_PLAINTEXT
} commandFormat = COMMAND_FORMAT_PLAINTEXT;
amqp_maybe_release_buffers(profile->conn_active->state);
timeout.tv_usec = 500 * 1000;
res = amqp_consume_message(profile->conn_active->state, &envelope, &timeout, 0);
if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
if (res.library_error == AMQP_STATUS_UNEXPECTED_STATE) {
/* Unexpected frame. Discard it then continue */
amqp_frame_t decoded_frame;
amqp_simple_wait_frame(profile->conn_active->state, &decoded_frame);
}
if (res.library_error == AMQP_STATUS_SOCKET_ERROR) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "A socket error occurred. Tearing down and reconnecting\n");
break;
}
if (res.library_error == AMQP_STATUS_CONNECTION_CLOSED) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "AMQP connection was closed. Tearing down and reconnecting\n");
break;
}
if (res.library_error == AMQP_STATUS_TCP_ERROR) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "A TCP error occurred. Tearing down and reconnecting\n");
break;
}
if (res.library_error == AMQP_STATUS_TIMEOUT) {
// nop
}
/* Try consuming again */
continue;
}
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
break;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Delivery:%u, exchange:%.*s routingkey:%.*s\n",
(unsigned) envelope.delivery_tag, (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
(int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Content-type: %.*s\n",
(int) envelope.message.properties.content_type.len, (char *) envelope.message.properties.content_type.bytes);
if (strncasecmp("text/plain", envelope.message.properties.content_type.bytes, strlen("text/plain")) == 0) {
commandFormat = COMMAND_FORMAT_PLAINTEXT;
} else {
commandFormat = COMMAND_FORMAT_UNKNOWN;
}
}
if (commandFormat == COMMAND_FORMAT_PLAINTEXT) {
switch_stream_handle_t stream = { 0 }; /* Collects the command output */
/* Convert amqp bytes to c-string */
snprintf(command, sizeof(command), "%.*s", (int) envelope.message.body.len, (char *) envelope.message.body.bytes);
/* Execute the command */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Executing: %s\n", command);
SWITCH_STANDARD_STREAM(stream);
if (switch_console_execute(command, 0, &stream) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Remote command failed:\n%s\n", (char *) stream.data);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Remote command succeeded:\n%s\n", (char *) stream.data);
}
switch_safe_free(stream.data);
}
/* Tidy up */
amqp_destroy_envelope(&envelope);
}
amqp_bytes_free(queueName);
queueName.bytes = NULL;
mod_amqp_connection_close(profile->conn_active);
profile->conn_active = NULL;
if (profile->running) {
/* We'll reconnect, but sleep to avoid hammering resources */
switch_sleep(500);
}
}
/* Terminate the thread */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Command listener thread stopped\n");
switch_thread_exit(thread, SWITCH_STATUS_SUCCESS);
return NULL;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4
*/

View File

@ -0,0 +1,246 @@
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Based on mod_skel by
* Anthony Minessale II <anthm@freeswitch.org>
*
* Contributor(s):
*
* Daniel Bryars <danb@aeriandi.com>
* Tim Brown <tim.brown@aeriandi.com>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
* Mike Jerris <mike@jerris.com>
*
* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
*
*/
#include "mod_amqp.h"
void mod_amqp_connection_close(mod_amqp_connection_t *connection)
{
amqp_connection_state_t old_state = connection->state;
int status = 0;
connection->state = NULL;
if (old_state != NULL) {
mod_amqp_log_if_amqp_error(amqp_channel_close(old_state, 1, AMQP_REPLY_SUCCESS), "Closing channel");
mod_amqp_log_if_amqp_error(amqp_connection_close(old_state, AMQP_REPLY_SUCCESS), "Closing connection");
if ((status = amqp_destroy_connection(old_state))) {
const char *errstr = amqp_error_string2(-status);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error destroying amqp connection: %s\n", errstr);
}
}
}
switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod_amqp_connection_t **active, char *profile_name, char *custom_attr)
{
int channel_max = 0;
int frame_max = 131072;
amqp_table_t loginProperties;
amqp_table_entry_t loginTableEntries[5];
char hostname[64];
int bHasHostname;
char key_string[256] = {0};
amqp_rpc_reply_t status;
amqp_socket_t *socket = NULL;
int amqp_status = -1;
mod_amqp_connection_t *connection_attempt = NULL;
amqp_connection_state_t newConnection = amqp_new_connection();
amqp_connection_state_t oldConnection = NULL;
if (active && *active) {
oldConnection = (*active)->state;
}
/* Set up meta data for connection */
bHasHostname = gethostname(hostname, sizeof(hostname)) == 0;
loginProperties.num_entries = sizeof(loginTableEntries)/sizeof(*loginTableEntries);
loginProperties.entries = loginTableEntries;
snprintf(key_string, 256, "x_%s_HostMachineName", custom_attr);
loginTableEntries[0].key = amqp_cstring_bytes(key_string);
loginTableEntries[0].value.kind = AMQP_FIELD_KIND_BYTES;
loginTableEntries[0].value.value.bytes = amqp_cstring_bytes(bHasHostname ? hostname : "(unknown)");
snprintf(key_string, 256, "x_%s_ProcessDescription", custom_attr);
loginTableEntries[1].key = amqp_cstring_bytes(key_string);
loginTableEntries[1].value.kind = AMQP_FIELD_KIND_BYTES;
loginTableEntries[1].value.value.bytes = amqp_cstring_bytes("FreeSwitch");
snprintf(key_string, 256, "x_%s_ProcessType", custom_attr);
loginTableEntries[2].key = amqp_cstring_bytes(key_string);
loginTableEntries[2].value.kind = AMQP_FIELD_KIND_BYTES;
loginTableEntries[2].value.value.bytes = amqp_cstring_bytes("TAP");
snprintf(key_string, 256, "x_%s_ProcessBuildVersion", custom_attr);
loginTableEntries[3].key = amqp_cstring_bytes(key_string);
loginTableEntries[3].value.kind = AMQP_FIELD_KIND_BYTES;
loginTableEntries[3].value.value.bytes = amqp_cstring_bytes(switch_version_full());
snprintf(key_string, 256, "x_%s_Liquid_ProcessBuildBornOn", custom_attr);
loginTableEntries[4].key = amqp_cstring_bytes(key_string);
loginTableEntries[4].value.kind = AMQP_FIELD_KIND_BYTES;
loginTableEntries[4].value.value.bytes = amqp_cstring_bytes(__DATE__ " " __TIME__);
if (!(socket = amqp_tcp_socket_new(newConnection))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not create TCP socket\n");
return SWITCH_STATUS_GENERR;
}
connection_attempt = connections;
amqp_status = -1;
while (connection_attempt && amqp_status){
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Profile[%s] trying to connect to AMQP broker %s:%d\n",
profile_name, connection_attempt->hostname, connection_attempt->port);
if ((amqp_status = amqp_socket_open(socket, connection_attempt->hostname, connection_attempt->port))){
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Could not open socket connection to AMQP broker %s:%d status(%d) %s\n",
connection_attempt->hostname, connection_attempt->port, amqp_status, amqp_error_string2(amqp_status));
connection_attempt = connection_attempt->next;
}
}
if (!connection_attempt) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] could not connect to any AMQP brokers\n", profile_name);
return SWITCH_STATUS_GENERR;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Profile[%s] opened socket connection to AMQP broker %s:%d\n",
profile_name, connection_attempt->hostname, connection_attempt->port);
*active = connection_attempt;
/* We have a connection, now log in */
status = amqp_login_with_properties(newConnection,
connection_attempt->virtualhost,
channel_max,
frame_max,
connection_attempt->heartbeat,
&loginProperties,
AMQP_SASL_METHOD_PLAIN,
connection_attempt->username,
connection_attempt->password);
if (mod_amqp_log_if_amqp_error(status, "Logging in")) {
return SWITCH_STATUS_GENERR;
}
// Open a channel (1). This is fairly standard
amqp_channel_open(newConnection, 1);
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(newConnection), "Opening channel")) {
return SWITCH_STATUS_GENERR;
}
(*active)->state = newConnection;
if (oldConnection) {
amqp_destroy_connection(oldConnection);
}
return SWITCH_STATUS_SUCCESS;
}
switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_xml_t cfg, switch_memory_pool_t *pool)
{
mod_amqp_connection_t *new_con = switch_core_alloc(pool, sizeof(mod_amqp_connection_t));
switch_xml_t param;
char *name = (char *) switch_xml_attr_soft(cfg, "name");
char *hostname = NULL, *virtualhost = NULL, *username = NULL, *password = NULL;
unsigned int port = 0, heartbeat = 0;
if (zstr(name)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection missing name attribute\n%s\n", switch_xml_toxml(cfg, 1));
return SWITCH_STATUS_GENERR;
}
new_con->name = switch_core_strdup(pool, name);
new_con->state = NULL;
for (param = switch_xml_child(cfg, "param"); param; param = param->next) {
char *var = (char *) switch_xml_attr_soft(param, "name");
char *val = (char *) switch_xml_attr_soft(param, "value");
if (!var) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP connection[%s] param missing 'name' attribute\n", name);
continue;
}
if (!val) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP connection[%s] param[%s] missing 'value' attribute\n", name, var);
continue;
}
if (!strncmp(var, "hostname", 8)) {
hostname = switch_core_strdup(pool, val);
} else if (!strncmp(var, "virtualhost", 11)) {
virtualhost = switch_core_strdup(pool, val);
} else if (!strncmp(var, "username", 8)) {
username = switch_core_strdup(pool, val);
} else if (!strncmp(var, "password", 8)) {
password = switch_core_strdup(pool, val);
} else if (!strncmp(var, "port", 4)) {
int interval = atoi(val);
if (interval && interval > 0) {
port = interval;
}
} else if (!strncmp(var, "heartbeat", 4)) {
int interval = atoi(val);
if (interval && interval > 0) {
heartbeat = interval;
}
}
}
new_con->hostname = hostname ? hostname : "localhost";
new_con->virtualhost = virtualhost ? virtualhost : "/";
new_con->username = username ? username : "guest";
new_con->password = password ? password : "guest";
new_con->port = port ? port : 5672;
new_con->heartbeat = heartbeat ? heartbeat : 0;
*conn = new_con;
return SWITCH_STATUS_SUCCESS;
}
void mod_amqp_connection_destroy(mod_amqp_connection_t **conn)
{
if (conn && *conn) {
*conn = NULL;
}
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4
*/

View File

@ -0,0 +1,484 @@
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Based on mod_skel by
* Anthony Minessale II <anthm@freeswitch.org>
*
* Contributor(s):
*
* Daniel Bryars <danb@aeriandi.com>
* Tim Brown <tim.brown@aeriandi.com>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
* Mike Jerris <mike@jerris.com>
*
* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
*
*/
#include "mod_amqp.h"
void mod_amqp_producer_msg_destroy(mod_amqp_message_t **msg)
{
if (!msg || !*msg) return;
switch_safe_free((*msg)->pjson);
switch_safe_free(*msg);
}
switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], switch_event_t* evt, char* routingKeyEventHeaderNames[])
{
int i = 0, idx = 0;
for (i = 0; i < MAX_ROUTING_KEY_FORMAT_FIELDS && idx < MAX_AMQP_ROUTING_KEY_LENGTH; i++) {
if (routingKeyEventHeaderNames[i]) {
if (idx) {
routingKey[idx++] = '.';
}
if (routingKeyEventHeaderNames[i][0] == '#') {
strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
} else {
char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]);
strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx);
/* Replace dots with underscores so that the routing key does not get corrupted */
switch_replace_char(routingKey + idx, '.', '_', 0);
}
idx += strlen(routingKey + idx);
}
}
return SWITCH_STATUS_SUCCESS;
}
void mod_amqp_producer_event_handler(switch_event_t* evt)
{
mod_amqp_message_t *amqp_message;
mod_amqp_producer_profile_t *profile = (mod_amqp_producer_profile_t *)evt->bind_user_data;
switch_time_t now = switch_time_now();
switch_time_t reset_time;
if (!profile) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Event without a profile %p %p\n", (void *)evt, (void *)evt->event_user_data);
return;
}
/* If the mod is disabled ignore the event */
if (!profile->running) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] not running\n", profile->name);
return;
}
/* If the circuit breaker is active, ignore the event */
reset_time = profile->circuit_breaker_reset_time;
if (now < reset_time) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] circuit breaker hit[%d] (%d)\n", profile->name, (int) now, (int) reset_time);
return;
}
switch_malloc(amqp_message, sizeof(mod_amqp_message_t));
switch_event_serialize_json(evt, &amqp_message->pjson);
mod_amqp_producer_routing_key(amqp_message->routing_key, evt, profile->format_fields);
/* Queue the message to be sent by the worker thread, errors are reported only once per circuit breaker interval */
if (switch_queue_trypush(profile->send_queue, amqp_message) != SWITCH_STATUS_SUCCESS) {
unsigned int queue_size = switch_queue_size(profile->send_queue);
/* Trip the circuit breaker for a short period to stop recurring error messages (time is measured in uS) */
profile->circuit_breaker_reset_time = now + profile->circuit_breaker_ms * 1000;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "AMQP message queue full. Messages will be dropped for %.1fs! (Queue capacity %d)",
profile->circuit_breaker_ms / 1000.0, queue_size);
mod_amqp_producer_msg_destroy(&amqp_message);
}
}
switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **prof) {
mod_amqp_message_t *msg = NULL;
switch_status_t status = SWITCH_STATUS_SUCCESS;
mod_amqp_connection_t *conn = NULL, *conn_next = NULL;
switch_memory_pool_t *pool;
mod_amqp_producer_profile_t *profile;
if (!prof || !*prof) {
return SWITCH_STATUS_SUCCESS;
}
profile = *prof;
pool = profile->pool;
if (profile->name) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Profile[%s] shutting down...\n", profile->name);
switch_core_hash_delete(globals.producer_hash, profile->name);
}
profile->running = 0;
if (profile->producer_thread) {
switch_thread_join(&status, profile->producer_thread);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Profile[%s] closing AMQP socket...\n", profile->name);
for (conn = profile->conn_root; conn; conn = conn_next) {
conn_next = conn->next;
mod_amqp_connection_destroy(&conn);
}
profile->conn_active = NULL;
profile->conn_root = NULL;
while (profile->send_queue && switch_queue_trypop(profile->send_queue, (void**)&msg) == SWITCH_STATUS_SUCCESS) {
mod_amqp_producer_msg_destroy(&msg);
}
if (pool) {
switch_core_destroy_memory_pool(&pool);
}
*prof = NULL;
return SWITCH_STATUS_SUCCESS;
}
switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
{
mod_amqp_producer_profile_t *profile = NULL;
int arg = 0, i = 0;
char *argv[SWITCH_EVENT_ALL];
switch_xml_t params, param, connections, connection;
switch_threadattr_t *thd_attr = NULL;
char *exchange = NULL, *exchange_type = NULL;
switch_memory_pool_t *pool;
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
goto err;
}
profile = switch_core_alloc(pool, sizeof(mod_amqp_producer_profile_t));
profile->pool = pool;
profile->name = switch_core_strdup(profile->pool, name);
profile->running = 1;
memset(profile->format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1);
profile->event_ids[0] = SWITCH_EVENT_ALL;
profile->event_subscriptions = 1;
profile->conn_root = NULL;
profile->conn_active = NULL;
/* Set reasonable defaults which may change if more reasonable defaults are found */
/* Handle defaults of non string types */
profile->circuit_breaker_ms = 10000;
profile->reconnect_interval_ms = 1000;
profile->send_queue_size = 5000;
if ((params = switch_xml_child(cfg, "params")) != NULL) {
for (param = switch_xml_child(params, "param"); param; param = param->next) {
char *var = (char *) switch_xml_attr_soft(param, "name");
char *val = (char *) switch_xml_attr_soft(param, "value");
if (!var) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param missing 'name' attribute\n", profile->name);
continue;
}
if (!val) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param[%s] missing 'value' attribute\n", profile->name, var);
continue;
}
if (!strncmp(var, "reconnect_interval_ms", 21)) {
int interval = atoi(val);
if ( interval && interval > 0 ) {
profile->reconnect_interval_ms = interval;
}
} else if (!strncmp(var, "circuit_breaker_ms", 18)) {
int interval = atoi(val);
if ( interval && interval > 0 ) {
profile->circuit_breaker_ms = interval;
}
} else if (!strncmp(var, "send_queue_size", 15)) {
int interval = atoi(val);
if ( interval && interval > 0 ) {
profile->send_queue_size = interval;
}
} else if (!strncmp(var, "format_fields", 13)) {
int size = 0;
if ((size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "You can have only %d routing fields in the routing key.\n",
MAX_ROUTING_KEY_FORMAT_FIELDS);
goto err;
}
/* increment size because the count returned the number of separators, not number of fields */
size++;
switch_separate_string(val, ',', profile->format_fields, size);
profile->format_fields[size] = NULL;
} else if (!strncmp(var, "event_filter", 12)) {
/* Parse new events */
profile->event_subscriptions = switch_separate_string(val, ',', argv, (sizeof(argv) / sizeof(argv[0])));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Found %d subscriptions\n", profile->event_subscriptions);
for (arg = 0; arg < profile->event_subscriptions; arg++) {
if (switch_name_event(argv[arg], &(profile->event_ids[arg])) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "The switch event %s was not recognised.\n", argv[arg]);
}
}
}
} /* params for loop */
}
/* Handle defaults of string types */
profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Events");
profile->exchange_type = exchange_type ? exchange_type : switch_core_strdup(profile->pool, "topic");
if ((connections = switch_xml_child(cfg, "connections")) != NULL) {
for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) {
if ( ! profile->conn_root ) { /* Handle first root node */
if (mod_amqp_connection_create(&(profile->conn_root), connection, profile->pool) != SWITCH_STATUS_SUCCESS) {
/* Handle connection create failure */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name);
continue;
}
profile->conn_active = profile->conn_root;
} else {
if (mod_amqp_connection_create(&(profile->conn_active->next), connection, profile->pool) != SWITCH_STATUS_SUCCESS) {
/* Handle connection create failure */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name);
continue;
}
profile->conn_active = profile->conn_active->next;
}
}
}
profile->conn_active = NULL;
if ( mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name);
}
/* Create a bounded FIFO queue for sending messages */
if (switch_queue_create(&(profile->send_queue), profile->send_queue_size, profile->pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create send queue of size %d!\n", profile->send_queue_size);
goto err;
}
/* Start the event send thread. This will set up the initial connection */
switch_threadattr_create(&thd_attr, profile->pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
if (switch_thread_create(&profile->producer_thread, thd_attr, mod_amqp_producer_thread, profile, profile->pool)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create 'amqp event sender' thread!\n");
goto err;
}
/* Subscribe events */
for (i = 0; i < profile->event_subscriptions; i++) {
if (switch_event_bind_removable("AMQP",
profile->event_ids[i],
SWITCH_EVENT_SUBCLASS_ANY,
mod_amqp_producer_event_handler,
profile,
&(profile->event_nodes[i])) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot bind to event handler %d!\n",(int)profile->event_ids[i]);
goto err;
}
}
if ( switch_core_hash_insert(globals.producer_hash, name, (void *) profile) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to insert new profile [%s] into mod_amqp profile hash\n", name);
goto err;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] Successfully started\n", profile->name);
return SWITCH_STATUS_SUCCESS;
err:
/* Cleanup */
mod_amqp_producer_destroy(&profile);
return SWITCH_STATUS_GENERR;
}
/* This should only be called in a single threaded context from the producer profile send thread */
switch_status_t mod_amqp_producer_send(mod_amqp_producer_profile_t *profile, mod_amqp_message_t *msg)
{
amqp_table_entry_t messageTableEntries[1];
amqp_basic_properties_t props;
int status;
if (! profile->conn_active) {
/* No connection, so we can not send the message. */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name);
return SWITCH_STATUS_NOT_INITALIZED;
}
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG;
props.content_type = amqp_cstring_bytes("text/json");
props.delivery_mode = 1; /* non persistent delivery mode */
props.timestamp = (uint64_t)time(NULL);
props.headers.num_entries = 1;
props.headers.entries = messageTableEntries;
messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp");
messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP;
messageTableEntries[0].value.value.u64 = (uint64_t)switch_micro_time_now();
status = amqp_basic_publish(
profile->conn_active->state,
1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(msg->routing_key),
0,
0,
&props,
amqp_cstring_bytes(msg->pjson));
if (status < 0) {
const char *errstr = amqp_error_string2(-status);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] failed to send event on connection[%s]: %s\n",
profile->name, profile->conn_active->name, errstr);
/* This is bad, we couldn't send the message. Clear up any connection */
mod_amqp_connection_close(profile->conn_active);
profile->conn_active = NULL;
return SWITCH_STATUS_SOCKERR;
}
return SWITCH_STATUS_SUCCESS;
}
void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void *data)
{
mod_amqp_message_t *msg = NULL;
switch_status_t status = SWITCH_STATUS_SUCCESS;
mod_amqp_producer_profile_t *profile = (mod_amqp_producer_profile_t *)data;
amqp_boolean_t passive = 0;
amqp_boolean_t durable = 1;
while (profile->running) {
if (!profile->conn_active) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Amqp no connection- reconnecting...\n");
status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr);
if ( status == SWITCH_STATUS_SUCCESS ) {
// Ensure that the exchange exists, and is of the correct type
amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type),
passive,
durable,
amqp_empty_table);
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
continue;
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to connect with code(%d), sleeping for %dms\n",
profile->name, status, profile->reconnect_interval_ms);
switch_sleep(profile->reconnect_interval_ms * 1000);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n");
continue;
}
if (!msg && switch_queue_pop_timeout(profile->send_queue, (void**)&msg, 1000000) != SWITCH_STATUS_SUCCESS) {
continue;
}
if (msg) {
#ifdef MOD_AMQP_DEBUG_TIMING
long times[TIME_STATS_TO_AGGREGATE];
static unsigned int thistime = 0;
switch_time_t start = switch_time_now();
#endif
switch (mod_amqp_producer_send(profile, msg)) {
case SWITCH_STATUS_SUCCESS:
/* Success: prepare for next message */
mod_amqp_producer_msg_destroy(&msg);
break;
case SWITCH_STATUS_NOT_INITALIZED:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with 'not initialised'\n");
break;
case SWITCH_STATUS_SOCKERR:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with 'socket error'\n");
break;
default:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with a generic error\n");
/* Send failed and closed the connection; reconnect will happen at the beginning of the loop
* NB: do we need a delay here to prevent a fast reconnect-send-fail loop? */
break;
}
#ifdef MOD_AMQP_DEBUG_TIMING
times[thistime++] = switch_time_now() - start;
if (thistime >= TIME_STATS_TO_AGGREGATE) {
int i;
long min_time, max_time, avg_time;
/* Calculate aggregate times */
min_time = max_time = avg_time = times[0];
for (i = 1; i < TIME_STATS_TO_AGGREGATE; ++i) {
avg_time += times[i];
if (times[i] < min_time) min_time = times[i];
if (times[i] > max_time) max_time = times[i];
}
avg_time /= TIME_STATS_TO_AGGREGATE;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Microseconds to send last %d messages: Min %ld Max %ld Avg %ld\n",
TIME_STATS_TO_AGGREGATE, min_time, max_time, avg_time);
thistime = 0;
}
#endif
}
}
/* Abort the current message */
mod_amqp_producer_msg_destroy(&msg);
// Terminate the thread
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Event sender thread stopped\n");
switch_thread_exit(thread, SWITCH_STATUS_SUCCESS);
return NULL;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4
*/

View File

@ -0,0 +1,160 @@
/*
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
*
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
*
* The Initial Developer of the Original Code is
* Anthony Minessale II <anthm@freeswitch.org>
* Portions created by the Initial Developer are Copyright (C)
* the Initial Developer. All Rights Reserved.
*
* Based on mod_skel by
* Anthony Minessale II <anthm@freeswitch.org>
*
* Contributor(s):
*
* Daniel Bryars <danb@aeriandi.com>
* Tim Brown <tim.brown@aeriandi.com>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
* Mike Jerris <mike@jerris.com>
*
* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
*
*/
#include "mod_amqp.h"
int mod_amqp_log_if_amqp_error(amqp_rpc_reply_t x, char const *context)
{
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return 0;
case AMQP_RESPONSE_NONE:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: missing RPC reply type!\n", context);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: %s\n", context, amqp_error_string2(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: server connection error %d, message: %.*s\n",
context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: server channel error %d, message: %.*s\n",
context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
}
default:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
break;
}
break;
default:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s: unknown reply_type: %d \n", context, x.reply_type);
break;
}
return -1;
}
int mod_amqp_count_chars(const char* string, char ch)
{
int c = 0;
while (*string) c += *(string++) == ch;
return c;
}
switch_status_t mod_amqp_do_config(switch_bool_t reload)
{
switch_xml_t cfg = NULL, xml = NULL, profiles = NULL, profile = NULL;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, reload ? "Reloading Config\n" : "Loading Config\n");
if (!(xml = switch_xml_open_cfg("amqp.conf", &cfg, NULL))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "open of amqp.conf.xml failed\n");
return SWITCH_STATUS_FALSE;
}
if ((profiles = switch_xml_child(cfg, "producers"))) {
if ((profile = switch_xml_child(profiles, "profile"))) {
for (; profile; profile = profile->next) {
char *name = (char *) switch_xml_attr_soft(profile, "name");
if (zstr(name)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile. Check configs missing name attr\n");
continue;
}
if ( mod_amqp_producer_create(name, profile) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile [%s]. Check configs\n", name);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Loaded mod_amqp profile [%s] successfully\n", name);
}
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unable to locate a profile for mod_amqp\n" );
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unable to locate producers section for mod_amqp\n" );
}
if ((profiles = switch_xml_child(cfg, "commands"))) {
if ((profile = switch_xml_child(profiles, "profile"))) {
for (; profile; profile = profile->next) {
char *name = (char *) switch_xml_attr_soft(profile, "name");
if (zstr(name)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile. Check configs missing name attr\n");
continue;
}
name = switch_core_strdup(globals.pool, name);
if ( mod_amqp_command_create(name, profile) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile [%s]. Check configs\n", name);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Loaded mod_amqp profile [%s] successfully\n", name);
}
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unable to locate a profile for mod_amqp\n" );
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unable to locate commands section for mod_amqp\n" );
}
return SWITCH_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4
*/