From 4114c1d59afbad4ce803ed04f09897662dc639ae Mon Sep 17 00:00:00 2001 From: Luis Azedo Date: Mon, 3 Aug 2015 13:25:28 +0100 Subject: [PATCH] FS-7806 FS-7803 #resolve added new properties to amqp configuration fixed - enable_fallback_format_fields usage, only worked on first event added amqp_util_encode to fix routing key --- src/mod/event_handlers/mod_amqp/mod_amqp.h | 18 ++- .../mod_amqp/mod_amqp_producer.c | 142 +++++++++++------- .../event_handlers/mod_amqp/mod_amqp_utils.c | 36 +++++ 3 files changed, 138 insertions(+), 58 deletions(-) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp.h b/src/mod/event_handlers/mod_amqp/mod_amqp.h index f651a1a89b..56e7e6372d 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp.h +++ b/src/mod/event_handlers/mod_amqp/mod_amqp.h @@ -50,10 +50,12 @@ /* If you change MAX_ROUTING_KEY_FORMAT_FIELDS then you must change the implementation of makeRoutingKey where it formats the routing key using sprintf */ #define MAX_ROUTING_KEY_FORMAT_FIELDS 10 +#define MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS 5 #define MAX_AMQP_ROUTING_KEY_LENGTH 255 #define TIME_STATS_TO_AGGREGATE 1024 #define MOD_AMQP_DEBUG_TIMING 0 +#define MOD_AMQP_DEFAULT_CONTENT_TYPE "text/json" typedef struct { @@ -74,12 +76,23 @@ typedef struct mod_amqp_connection_s { struct mod_amqp_connection_s *next; } mod_amqp_connection_t; +typedef struct mod_amqp_keypart_s { + char *name[MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS]; + int size; +} mod_amqp_keypart_t; + typedef struct { char *name; char *exchange; char *exchange_type; - char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1]; + int exchange_durable; + int exchange_auto_delete; + int delivery_mode; + int delivery_timestamp; + char *content_type; + mod_amqp_keypart_t format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1]; + /* Array to store the possible event subscriptions */ int event_subscriptions; @@ -158,11 +171,12 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void /* producer */ void mod_amqp_producer_event_handler(switch_event_t* evt); switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], - switch_event_t* evt, char* routingKeyEventHeaderNames[]); + switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[]); switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **profile); switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg); void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void *data); +char *amqp_util_encode(char *key, char *dest); #endif /* MOD_AMQP_H */ diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c index 0afc281e19..088be1e1ff 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c @@ -46,50 +46,30 @@ void mod_amqp_producer_msg_destroy(mod_amqp_message_t **msg) } switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], - switch_event_t* evt, char* routingKeyEventHeaderNames[]) + switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[]) { - int i = 0, idx = 0; + int i = 0, idx = 0, x = 0; + char keybuffer[MAX_AMQP_ROUTING_KEY_LENGTH]; for (i = 0; i < MAX_ROUTING_KEY_FORMAT_FIELDS && idx < MAX_AMQP_ROUTING_KEY_LENGTH; i++) { - if (routingKeyEventHeaderNames[i]) { + if (routingKeyEventHeaderNames[i].size) { if (idx) { routingKey[idx++] = '.'; } - if ( profile->enable_fallback_format_fields) { - int count = 0, x = 0; - char *argv[10]; - - count = switch_separate_string(routingKeyEventHeaderNames[i], '|', argv, (sizeof(argv) / sizeof(argv[0]))); - for( x = 0; x < count; x++) { - if (argv[x][0] == '#') { - strncpy(routingKey + idx, argv[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx); + for( x = 0; x < routingKeyEventHeaderNames[i].size; x++) { + if (routingKeyEventHeaderNames[i].name[x][0] == '#') { + strncpy(routingKey + idx, routingKeyEventHeaderNames[i].name[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx); + break; + } else { + char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i].name[x]); + if (value) { + amqp_util_encode(value, keybuffer); + strncpy(routingKey + idx, keybuffer, MAX_AMQP_ROUTING_KEY_LENGTH - idx); break; - } else { - char *value = switch_event_get_header(evt, argv[x]); - - if (!value) { - continue; - } - - strncpy(routingKey + idx, value, MAX_AMQP_ROUTING_KEY_LENGTH - idx); - - /* Replace dots with underscores so that the routing key does not get corrupted */ - switch_replace_char(routingKey + idx, '.', '_', 0); } } - idx += strlen(routingKey + idx); - } else { - if (routingKeyEventHeaderNames[i][0] == '#') { - strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx); - } else { - char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]); - strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx); - - /* Replace dots with underscores so that the routing key does not get corrupted */ - switch_replace_char(routingKey + idx, '.', '_', 0); - } - idx += strlen(routingKey + idx); } + idx += strlen(routingKey + idx); } } return SWITCH_STATUS_SUCCESS; @@ -194,8 +174,16 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg) char *argv[SWITCH_EVENT_ALL]; switch_xml_t params, param, connections, connection; switch_threadattr_t *thd_attr = NULL; - char *exchange = NULL, *exchange_type = NULL; + char *exchange = NULL, *exchange_type = NULL, *content_type = NULL; + int exchange_durable = 1; /* durable */ + int exchange_auto_delete = 0; + int delivery_mode = -1; + int delivery_timestamp = 1; switch_memory_pool_t *pool; + char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1]; + int format_fields_size = 0; + + memset(format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1); if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { goto err; @@ -205,12 +193,11 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg) profile->pool = pool; profile->name = switch_core_strdup(profile->pool, name); profile->running = 1; - memset(profile->format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1); + memset(profile->format_fields, 0, (MAX_ROUTING_KEY_FORMAT_FIELDS + 1) * sizeof(mod_amqp_keypart_t)); profile->event_ids[0] = SWITCH_EVENT_ALL; profile->event_subscriptions = 1; profile->conn_root = NULL; profile->conn_active = NULL; - /* Set reasonable defaults which may change if more reasonable defaults are found */ /* Handle defaults of non string types */ profile->circuit_breaker_ms = 10000; @@ -251,24 +238,38 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg) int interval = atoi(val); if ( interval && interval > 0 ) { profile->enable_fallback_format_fields = 1; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp fallback format fields enabled\n"); } - } else if (!strncmp(var, "exchange_type", 13)) { + } else if (!strncmp(var, "exchange-type", 13)) { exchange_type = switch_core_strdup(profile->pool, val); - } else if (!strncmp(var, "exchange", 8)) { + } else if (!strncmp(var, "exchange-name", 13)) { exchange = switch_core_strdup(profile->pool, val); + } else if (!strncmp(var, "exchange-durable", 16)) { + exchange_durable = switch_true(val); + } else if (!strncmp(var, "exchange-auto-delete", 20)) { + exchange_auto_delete = switch_true(val); + } else if (!strncmp(var, "delivery-mode", 13)) { + delivery_mode = atoi(val); + } else if (!strncmp(var, "delivery-timestamp", 18)) { + delivery_timestamp = switch_true(val); + } else if (!strncmp(var, "exchange_type", 13)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange_type parameter. please change to exchange-type\n"); + } else if (!strncmp(var, "exchange", 8)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange parameter. please change to exchange-name\n"); + } else if (!strncmp(var, "content-type", 12)) { + content_type = switch_core_strdup(profile->pool, val); } else if (!strncmp(var, "format_fields", 13)) { - int size = 0; - if ((size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp format fields : %s\n", val); + if ((format_fields_size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "You can have only %d routing fields in the routing key.\n", MAX_ROUTING_KEY_FORMAT_FIELDS); goto err; } /* increment size because the count returned the number of separators, not number of fields */ - size++; - - switch_separate_string(val, ',', profile->format_fields, size); - profile->format_fields[size] = NULL; + format_fields_size++; + switch_separate_string(val, ',', format_fields, MAX_ROUTING_KEY_FORMAT_FIELDS); + format_fields[format_fields_size] = NULL; } else if (!strncmp(var, "event_filter", 12)) { /* Parse new events */ profile->event_subscriptions = switch_separate_string(val, ',', argv, (sizeof(argv) / sizeof(argv[0]))); @@ -287,6 +288,28 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg) /* Handle defaults of string types */ profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Events"); profile->exchange_type = exchange_type ? exchange_type : switch_core_strdup(profile->pool, "topic"); + profile->exchange_durable = exchange_durable; + profile->exchange_auto_delete = exchange_auto_delete; + profile->delivery_mode = delivery_mode; + profile->delivery_timestamp = delivery_timestamp; + profile->content_type = content_type ? content_type : switch_core_strdup(profile->pool, MOD_AMQP_DEFAULT_CONTENT_TYPE); + + + for(i = 0; i < format_fields_size; i++) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : %s\n", i, format_fields[i]); + if(profile->enable_fallback_format_fields) { + profile->format_fields[i].size = switch_separate_string(format_fields[i], '|', profile->format_fields[i].name, MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS); + if(profile->format_fields[i].size > 1) { + for(arg = 0; arg < profile->format_fields[i].size; arg++) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : sub key %d : %s\n", i, arg, profile->format_fields[i].name[arg]); + } + } + } else { + profile->format_fields[i].name[0] = format_fields[i]; + profile->format_fields[i].size = 1; + } + } + if ((connections = switch_xml_child(cfg, "connections")) != NULL) { for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) { @@ -317,8 +340,8 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg) amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange_type), - 0, - 1, + profile->exchange_durable, + profile->exchange_auto_delete, amqp_empty_table); if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { @@ -381,18 +404,25 @@ switch_status_t mod_amqp_producer_send(mod_amqp_producer_profile_t *profile, mod switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name); return SWITCH_STATUS_NOT_INITALIZED; } + memset(&props, 0, sizeof(amqp_basic_properties_t)); - props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG; - props.content_type = amqp_cstring_bytes("text/json"); - props.delivery_mode = 1; /* non persistent delivery mode */ - props.timestamp = (uint64_t)time(NULL); + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG; + props.content_type = amqp_cstring_bytes(profile->content_type); - props.headers.num_entries = 1; - props.headers.entries = messageTableEntries; + if(profile->delivery_mode > 0) { + props._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG; + props.delivery_mode = profile->delivery_mode; + } - messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp"); - messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP; - messageTableEntries[0].value.value.u64 = (uint64_t)switch_micro_time_now(); + if(profile->delivery_timestamp) { + props._flags |= AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG; + props.timestamp = (uint64_t)time(NULL); + props.headers.num_entries = 1; + props.headers.entries = messageTableEntries; + messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp"); + messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP; + messageTableEntries[0].value.value.u64 = (uint64_t)switch_micro_time_now(); + } status = amqp_basic_publish( profile->conn_active->state, diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c b/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c index caa22bf21c..5c59cba1f7 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c @@ -148,6 +148,42 @@ switch_status_t mod_amqp_do_config(switch_bool_t reload) } +#define KEY_SAFE(C) ((C >= 'a' && C <= 'z') || \ + (C >= 'A' && C <= 'Z') || \ + (C >= '0' && C <= '9') || \ + (C == '-' || C == '~' || C == '_')) + +#define HI4(C) (C>>4) +#define LO4(C) (C & 0x0F) + +#define hexint(C) (C < 10?('0' + C):('A'+ C - 10)) + +char *amqp_util_encode(char *key, char *dest) { + char *p, *end; + if ((strlen(key) == 1) && (key[0] == '#' || key[0] == '*')) { + *dest++ = key[0]; + *dest = '\0'; + return dest; + } + for (p = key, end = key + strlen(key); p < end; p++) { + if (KEY_SAFE(*p)) { + *dest++ = *p; + } else if (*p == '.') { + memcpy(dest, "%2E", 3); + dest += 3; + } else if (*p == ' ') { + *dest++ = '+'; + } else { + *dest++ = '%'; + sprintf(dest, "%c%c", hexint(HI4(*p)), hexint(LO4(*p))); + dest += 2; + } + } + *dest = '\0'; + return dest; +} + + /* For Emacs: * Local Variables: * mode:c