Merge pull request #384 in FS/freeswitch from ~LAZEDO/freeswitch:feature/amqp to master

* commit '4114c1d59afbad4ce803ed04f09897662dc639ae':
  FS-7806 FS-7803 #resolve
This commit is contained in:
William King 2015-08-03 09:28:48 -05:00
commit 36b4a74844
3 changed files with 138 additions and 58 deletions

View File

@ -50,10 +50,12 @@
/* If you change MAX_ROUTING_KEY_FORMAT_FIELDS then you must change the implementation of makeRoutingKey where it formats the routing key using sprintf */
#define MAX_ROUTING_KEY_FORMAT_FIELDS 10
#define MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS 5
#define MAX_AMQP_ROUTING_KEY_LENGTH 255
#define TIME_STATS_TO_AGGREGATE 1024
#define MOD_AMQP_DEBUG_TIMING 0
#define MOD_AMQP_DEFAULT_CONTENT_TYPE "text/json"
typedef struct {
@ -74,12 +76,23 @@ typedef struct mod_amqp_connection_s {
struct mod_amqp_connection_s *next;
} mod_amqp_connection_t;
typedef struct mod_amqp_keypart_s {
char *name[MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS];
int size;
} mod_amqp_keypart_t;
typedef struct {
char *name;
char *exchange;
char *exchange_type;
char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
int exchange_durable;
int exchange_auto_delete;
int delivery_mode;
int delivery_timestamp;
char *content_type;
mod_amqp_keypart_t format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
/* Array to store the possible event subscriptions */
int event_subscriptions;
@ -158,11 +171,12 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void
/* producer */
void mod_amqp_producer_event_handler(switch_event_t* evt);
switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
switch_event_t* evt, char* routingKeyEventHeaderNames[]);
switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[]);
switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **profile);
switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg);
void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void *data);
char *amqp_util_encode(char *key, char *dest);
#endif /* MOD_AMQP_H */

View File

@ -46,50 +46,30 @@ void mod_amqp_producer_msg_destroy(mod_amqp_message_t **msg)
}
switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
switch_event_t* evt, char* routingKeyEventHeaderNames[])
switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[])
{
int i = 0, idx = 0;
int i = 0, idx = 0, x = 0;
char keybuffer[MAX_AMQP_ROUTING_KEY_LENGTH];
for (i = 0; i < MAX_ROUTING_KEY_FORMAT_FIELDS && idx < MAX_AMQP_ROUTING_KEY_LENGTH; i++) {
if (routingKeyEventHeaderNames[i]) {
if (routingKeyEventHeaderNames[i].size) {
if (idx) {
routingKey[idx++] = '.';
}
if ( profile->enable_fallback_format_fields) {
int count = 0, x = 0;
char *argv[10];
count = switch_separate_string(routingKeyEventHeaderNames[i], '|', argv, (sizeof(argv) / sizeof(argv[0])));
for( x = 0; x < count; x++) {
if (argv[x][0] == '#') {
strncpy(routingKey + idx, argv[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
for( x = 0; x < routingKeyEventHeaderNames[i].size; x++) {
if (routingKeyEventHeaderNames[i].name[x][0] == '#') {
strncpy(routingKey + idx, routingKeyEventHeaderNames[i].name[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
break;
} else {
char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i].name[x]);
if (value) {
amqp_util_encode(value, keybuffer);
strncpy(routingKey + idx, keybuffer, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
break;
} else {
char *value = switch_event_get_header(evt, argv[x]);
if (!value) {
continue;
}
strncpy(routingKey + idx, value, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
/* Replace dots with underscores so that the routing key does not get corrupted */
switch_replace_char(routingKey + idx, '.', '_', 0);
}
}
idx += strlen(routingKey + idx);
} else {
if (routingKeyEventHeaderNames[i][0] == '#') {
strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
} else {
char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]);
strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx);
/* Replace dots with underscores so that the routing key does not get corrupted */
switch_replace_char(routingKey + idx, '.', '_', 0);
}
idx += strlen(routingKey + idx);
}
idx += strlen(routingKey + idx);
}
}
return SWITCH_STATUS_SUCCESS;
@ -194,8 +174,16 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
char *argv[SWITCH_EVENT_ALL];
switch_xml_t params, param, connections, connection;
switch_threadattr_t *thd_attr = NULL;
char *exchange = NULL, *exchange_type = NULL;
char *exchange = NULL, *exchange_type = NULL, *content_type = NULL;
int exchange_durable = 1; /* durable */
int exchange_auto_delete = 0;
int delivery_mode = -1;
int delivery_timestamp = 1;
switch_memory_pool_t *pool;
char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
int format_fields_size = 0;
memset(format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1);
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
goto err;
@ -205,12 +193,11 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
profile->pool = pool;
profile->name = switch_core_strdup(profile->pool, name);
profile->running = 1;
memset(profile->format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1);
memset(profile->format_fields, 0, (MAX_ROUTING_KEY_FORMAT_FIELDS + 1) * sizeof(mod_amqp_keypart_t));
profile->event_ids[0] = SWITCH_EVENT_ALL;
profile->event_subscriptions = 1;
profile->conn_root = NULL;
profile->conn_active = NULL;
/* Set reasonable defaults which may change if more reasonable defaults are found */
/* Handle defaults of non string types */
profile->circuit_breaker_ms = 10000;
@ -251,24 +238,38 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
int interval = atoi(val);
if ( interval && interval > 0 ) {
profile->enable_fallback_format_fields = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp fallback format fields enabled\n");
}
} else if (!strncmp(var, "exchange_type", 13)) {
} else if (!strncmp(var, "exchange-type", 13)) {
exchange_type = switch_core_strdup(profile->pool, val);
} else if (!strncmp(var, "exchange", 8)) {
} else if (!strncmp(var, "exchange-name", 13)) {
exchange = switch_core_strdup(profile->pool, val);
} else if (!strncmp(var, "exchange-durable", 16)) {
exchange_durable = switch_true(val);
} else if (!strncmp(var, "exchange-auto-delete", 20)) {
exchange_auto_delete = switch_true(val);
} else if (!strncmp(var, "delivery-mode", 13)) {
delivery_mode = atoi(val);
} else if (!strncmp(var, "delivery-timestamp", 18)) {
delivery_timestamp = switch_true(val);
} else if (!strncmp(var, "exchange_type", 13)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange_type parameter. please change to exchange-type\n");
} else if (!strncmp(var, "exchange", 8)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange parameter. please change to exchange-name\n");
} else if (!strncmp(var, "content-type", 12)) {
content_type = switch_core_strdup(profile->pool, val);
} else if (!strncmp(var, "format_fields", 13)) {
int size = 0;
if ((size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp format fields : %s\n", val);
if ((format_fields_size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "You can have only %d routing fields in the routing key.\n",
MAX_ROUTING_KEY_FORMAT_FIELDS);
goto err;
}
/* increment size because the count returned the number of separators, not number of fields */
size++;
switch_separate_string(val, ',', profile->format_fields, size);
profile->format_fields[size] = NULL;
format_fields_size++;
switch_separate_string(val, ',', format_fields, MAX_ROUTING_KEY_FORMAT_FIELDS);
format_fields[format_fields_size] = NULL;
} else if (!strncmp(var, "event_filter", 12)) {
/* Parse new events */
profile->event_subscriptions = switch_separate_string(val, ',', argv, (sizeof(argv) / sizeof(argv[0])));
@ -287,6 +288,28 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
/* Handle defaults of string types */
profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Events");
profile->exchange_type = exchange_type ? exchange_type : switch_core_strdup(profile->pool, "topic");
profile->exchange_durable = exchange_durable;
profile->exchange_auto_delete = exchange_auto_delete;
profile->delivery_mode = delivery_mode;
profile->delivery_timestamp = delivery_timestamp;
profile->content_type = content_type ? content_type : switch_core_strdup(profile->pool, MOD_AMQP_DEFAULT_CONTENT_TYPE);
for(i = 0; i < format_fields_size; i++) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : %s\n", i, format_fields[i]);
if(profile->enable_fallback_format_fields) {
profile->format_fields[i].size = switch_separate_string(format_fields[i], '|', profile->format_fields[i].name, MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS);
if(profile->format_fields[i].size > 1) {
for(arg = 0; arg < profile->format_fields[i].size; arg++) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : sub key %d : %s\n", i, arg, profile->format_fields[i].name[arg]);
}
}
} else {
profile->format_fields[i].name[0] = format_fields[i];
profile->format_fields[i].size = 1;
}
}
if ((connections = switch_xml_child(cfg, "connections")) != NULL) {
for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) {
@ -317,8 +340,8 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type),
0,
1,
profile->exchange_durable,
profile->exchange_auto_delete,
amqp_empty_table);
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
@ -381,18 +404,25 @@ switch_status_t mod_amqp_producer_send(mod_amqp_producer_profile_t *profile, mod
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name);
return SWITCH_STATUS_NOT_INITALIZED;
}
memset(&props, 0, sizeof(amqp_basic_properties_t));
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG;
props.content_type = amqp_cstring_bytes("text/json");
props.delivery_mode = 1; /* non persistent delivery mode */
props.timestamp = (uint64_t)time(NULL);
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
props.content_type = amqp_cstring_bytes(profile->content_type);
props.headers.num_entries = 1;
props.headers.entries = messageTableEntries;
if(profile->delivery_mode > 0) {
props._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
props.delivery_mode = profile->delivery_mode;
}
messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp");
messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP;
messageTableEntries[0].value.value.u64 = (uint64_t)switch_micro_time_now();
if(profile->delivery_timestamp) {
props._flags |= AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG;
props.timestamp = (uint64_t)time(NULL);
props.headers.num_entries = 1;
props.headers.entries = messageTableEntries;
messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp");
messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP;
messageTableEntries[0].value.value.u64 = (uint64_t)switch_micro_time_now();
}
status = amqp_basic_publish(
profile->conn_active->state,

View File

@ -148,6 +148,42 @@ switch_status_t mod_amqp_do_config(switch_bool_t reload)
}
#define KEY_SAFE(C) ((C >= 'a' && C <= 'z') || \
(C >= 'A' && C <= 'Z') || \
(C >= '0' && C <= '9') || \
(C == '-' || C == '~' || C == '_'))
#define HI4(C) (C>>4)
#define LO4(C) (C & 0x0F)
#define hexint(C) (C < 10?('0' + C):('A'+ C - 10))
char *amqp_util_encode(char *key, char *dest) {
char *p, *end;
if ((strlen(key) == 1) && (key[0] == '#' || key[0] == '*')) {
*dest++ = key[0];
*dest = '\0';
return dest;
}
for (p = key, end = key + strlen(key); p < end; p++) {
if (KEY_SAFE(*p)) {
*dest++ = *p;
} else if (*p == '.') {
memcpy(dest, "%2E", 3);
dest += 3;
} else if (*p == ' ') {
*dest++ = '+';
} else {
*dest++ = '%';
sprintf(dest, "%c%c", hexint(HI4(*p)), hexint(LO4(*p)));
dest += 2;
}
}
*dest = '\0';
return dest;
}
/* For Emacs:
* Local Variables:
* mode:c