From 4114c1d59afbad4ce803ed04f09897662dc639ae Mon Sep 17 00:00:00 2001
From: Luis Azedo <luis@2600hz.com>
Date: Mon, 3 Aug 2015 13:25:28 +0100
Subject: [PATCH] FS-7806 FS-7803 #resolve

added new properties to amqp configuration
fixed - enable_fallback_format_fields usage, only worked on first event
added amqp_util_encode to fix routing key
---
 src/mod/event_handlers/mod_amqp/mod_amqp.h    |  18 ++-
 .../mod_amqp/mod_amqp_producer.c              | 142 +++++++++++-------
 .../event_handlers/mod_amqp/mod_amqp_utils.c  |  36 +++++
 3 files changed, 138 insertions(+), 58 deletions(-)

diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp.h b/src/mod/event_handlers/mod_amqp/mod_amqp.h
index f651a1a89b..56e7e6372d 100644
--- a/src/mod/event_handlers/mod_amqp/mod_amqp.h
+++ b/src/mod/event_handlers/mod_amqp/mod_amqp.h
@@ -50,10 +50,12 @@
 
 /* If you change MAX_ROUTING_KEY_FORMAT_FIELDS then you must change the implementation of makeRoutingKey where it formats the routing key using sprintf */
 #define MAX_ROUTING_KEY_FORMAT_FIELDS 10
+#define MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS 5
 #define MAX_AMQP_ROUTING_KEY_LENGTH 255
 
 #define TIME_STATS_TO_AGGREGATE 1024
 #define MOD_AMQP_DEBUG_TIMING 0
+#define MOD_AMQP_DEFAULT_CONTENT_TYPE "text/json"
 
 
 typedef struct {
@@ -74,12 +76,23 @@ typedef struct mod_amqp_connection_s {
   struct mod_amqp_connection_s *next;
 } mod_amqp_connection_t;
 
+typedef struct mod_amqp_keypart_s {
+  char *name[MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS];
+  int size;
+} mod_amqp_keypart_t;
+
 typedef struct {
   char *name;
 
   char *exchange;
   char *exchange_type;
-  char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
+  int exchange_durable;
+  int exchange_auto_delete;
+  int delivery_mode;
+  int delivery_timestamp;
+  char *content_type;
+  mod_amqp_keypart_t format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
+
   
   /* Array to store the possible event subscriptions */
   int event_subscriptions;
@@ -158,11 +171,12 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void
 /* producer */
 void mod_amqp_producer_event_handler(switch_event_t* evt);
 switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
-					      switch_event_t* evt, char* routingKeyEventHeaderNames[]);
+					      switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[]);
 switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **profile);
 switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg);
 void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void *data);
 
+char *amqp_util_encode(char *key, char *dest);
 
 #endif /* MOD_AMQP_H */
 
diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c
index 0afc281e19..088be1e1ff 100644
--- a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c
+++ b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c
@@ -46,50 +46,30 @@ void mod_amqp_producer_msg_destroy(mod_amqp_message_t **msg)
 }
 
 switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
-											  switch_event_t* evt, char* routingKeyEventHeaderNames[])
+											  switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[])
 {
-	int i = 0, idx = 0;
+	int i = 0, idx = 0, x = 0;
+	char keybuffer[MAX_AMQP_ROUTING_KEY_LENGTH];
 
 	for (i = 0; i < MAX_ROUTING_KEY_FORMAT_FIELDS && idx < MAX_AMQP_ROUTING_KEY_LENGTH; i++) {
-		if (routingKeyEventHeaderNames[i]) {
+		if (routingKeyEventHeaderNames[i].size) {
 			if (idx) {
 				routingKey[idx++] = '.';
 			}
-			if ( profile->enable_fallback_format_fields) {
-				int count = 0, x = 0;
-				char *argv[10];
-
-				count = switch_separate_string(routingKeyEventHeaderNames[i], '|', argv, (sizeof(argv) / sizeof(argv[0])));
-				for( x = 0; x < count; x++) {
-					if (argv[x][0] == '#') {
-						strncpy(routingKey + idx, argv[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
+			for( x = 0; x < routingKeyEventHeaderNames[i].size; x++) {
+				if (routingKeyEventHeaderNames[i].name[x][0] == '#') {
+					strncpy(routingKey + idx, routingKeyEventHeaderNames[i].name[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
+					break;
+				} else {
+					char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i].name[x]);
+					if (value) {
+						amqp_util_encode(value, keybuffer);
+						strncpy(routingKey + idx, keybuffer, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
 						break;
-					} else {
-						char *value = switch_event_get_header(evt, argv[x]);
-
-						if (!value) {
-							continue;
-						}
-
-						strncpy(routingKey + idx, value, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
-
-						/* Replace dots with underscores so that the routing key does not get corrupted */
-						switch_replace_char(routingKey + idx, '.', '_', 0);
 					}
 				}
-				idx += strlen(routingKey + idx);
-			} else {
-				if (routingKeyEventHeaderNames[i][0] == '#') {
-					strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
-				} else {
-					char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]);
-					strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx);
-
-					/* Replace dots with underscores so that the routing key does not get corrupted */
-					switch_replace_char(routingKey + idx, '.', '_', 0);
-				}
-				idx += strlen(routingKey + idx);
 			}
+			idx += strlen(routingKey + idx);
 		}
 	}
 	return SWITCH_STATUS_SUCCESS;
@@ -194,8 +174,16 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
 	char  *argv[SWITCH_EVENT_ALL];
 	switch_xml_t params, param, connections, connection;
 	switch_threadattr_t *thd_attr = NULL;
-	char *exchange = NULL, *exchange_type = NULL;
+	char *exchange = NULL, *exchange_type = NULL, *content_type = NULL;
+	int exchange_durable = 1; /* durable */
+	int exchange_auto_delete = 0;
+	int delivery_mode = -1;
+	int delivery_timestamp = 1;
 	switch_memory_pool_t *pool;
+	char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
+	int format_fields_size = 0;
+
+	memset(format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1);
 
 	if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
 		goto err;
@@ -205,12 +193,11 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
 	profile->pool = pool;
 	profile->name = switch_core_strdup(profile->pool, name);
 	profile->running = 1;
-	memset(profile->format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1);
+	memset(profile->format_fields, 0, (MAX_ROUTING_KEY_FORMAT_FIELDS + 1) * sizeof(mod_amqp_keypart_t));
 	profile->event_ids[0] = SWITCH_EVENT_ALL;
 	profile->event_subscriptions = 1;
 	profile->conn_root   = NULL;
 	profile->conn_active = NULL;
-
 	/* Set reasonable defaults which may change if more reasonable defaults are found */
 	/* Handle defaults of non string types */
 	profile->circuit_breaker_ms = 10000;
@@ -251,24 +238,38 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
 				int interval = atoi(val);
 				if ( interval && interval > 0 ) {
 					profile->enable_fallback_format_fields = 1;
+					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp fallback format fields enabled\n");
 				}
-			} else if (!strncmp(var, "exchange_type", 13)) {
+			} else if (!strncmp(var, "exchange-type", 13)) {
 				exchange_type = switch_core_strdup(profile->pool, val);
-			} else if (!strncmp(var, "exchange", 8)) {
+			} else if (!strncmp(var, "exchange-name", 13)) {
 				exchange = switch_core_strdup(profile->pool, val);
+			} else if (!strncmp(var, "exchange-durable", 16)) {
+				exchange_durable = switch_true(val);
+			} else if (!strncmp(var, "exchange-auto-delete", 20)) {
+				exchange_auto_delete = switch_true(val);
+			} else if (!strncmp(var, "delivery-mode", 13)) {
+				delivery_mode = atoi(val);
+			} else if (!strncmp(var, "delivery-timestamp", 18)) {
+				delivery_timestamp = switch_true(val);
+			} else if (!strncmp(var, "exchange_type", 13)) {
+				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange_type parameter. please change to exchange-type\n");
+			} else if (!strncmp(var, "exchange", 8)) {
+				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange parameter. please change to exchange-name\n");
+			} else if (!strncmp(var, "content-type", 12)) {
+				content_type = switch_core_strdup(profile->pool, val);
 			} else if (!strncmp(var, "format_fields", 13)) {
-				int size = 0;
-				if ((size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {
+				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp format fields : %s\n", val);
+				if ((format_fields_size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {
 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "You can have only %d routing fields in the routing key.\n",
 									  MAX_ROUTING_KEY_FORMAT_FIELDS);
 					goto err;
 				}
 
 				/* increment size because the count returned the number of separators, not number of fields */
-				size++;
-
-				switch_separate_string(val, ',', profile->format_fields, size);
-				profile->format_fields[size] = NULL;
+				format_fields_size++;
+				switch_separate_string(val, ',', format_fields, MAX_ROUTING_KEY_FORMAT_FIELDS);
+				format_fields[format_fields_size] = NULL;
 			} else if (!strncmp(var, "event_filter", 12)) {
 				/* Parse new events */
 				profile->event_subscriptions = switch_separate_string(val, ',', argv, (sizeof(argv) / sizeof(argv[0])));
@@ -287,6 +288,28 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
 	/* Handle defaults of string types */
 	profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Events");
 	profile->exchange_type = exchange_type ? exchange_type : switch_core_strdup(profile->pool, "topic");
+	profile->exchange_durable = exchange_durable;
+	profile->exchange_auto_delete = exchange_auto_delete;
+	profile->delivery_mode = delivery_mode;
+	profile->delivery_timestamp = delivery_timestamp;
+	profile->content_type = content_type ? content_type : switch_core_strdup(profile->pool, MOD_AMQP_DEFAULT_CONTENT_TYPE);
+
+
+	for(i = 0; i < format_fields_size; i++) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : %s\n", i, format_fields[i]);
+		if(profile->enable_fallback_format_fields) {
+			profile->format_fields[i].size = switch_separate_string(format_fields[i], '|', profile->format_fields[i].name, MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS);
+			if(profile->format_fields[i].size > 1) {
+				for(arg = 0; arg < profile->format_fields[i].size; arg++) {
+					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : sub key %d : %s\n", i, arg, profile->format_fields[i].name[arg]);
+				}
+			}
+		} else {
+			profile->format_fields[i].name[0] = format_fields[i];
+			profile->format_fields[i].size = 1;
+		}
+	}
+
 
 	if ((connections = switch_xml_child(cfg, "connections")) != NULL) {
 		for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) {
@@ -317,8 +340,8 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
 	amqp_exchange_declare(profile->conn_active->state, 1,
 						  amqp_cstring_bytes(profile->exchange),
 						  amqp_cstring_bytes(profile->exchange_type),
-						  0,
-						  1,
+						  profile->exchange_durable,
+						  profile->exchange_auto_delete,
 						  amqp_empty_table);
 	
 	if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
@@ -381,18 +404,25 @@ switch_status_t mod_amqp_producer_send(mod_amqp_producer_profile_t *profile, mod
 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name);
 		return SWITCH_STATUS_NOT_INITALIZED;
 	}
+	memset(&props, 0, sizeof(amqp_basic_properties_t));
 
-	props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG;
-	props.content_type = amqp_cstring_bytes("text/json");
-	props.delivery_mode = 1; /* non persistent delivery mode */
-	props.timestamp = (uint64_t)time(NULL);
+	props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
+	props.content_type = amqp_cstring_bytes(profile->content_type);
 
-	props.headers.num_entries = 1;
-	props.headers.entries = messageTableEntries;
+	if(profile->delivery_mode > 0) {
+		props._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
+		props.delivery_mode = profile->delivery_mode;
+	}
 
-	messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp");
-	messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP;
-	messageTableEntries[0].value.value.u64 = (uint64_t)switch_micro_time_now();
+	if(profile->delivery_timestamp) {
+		props._flags |= AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG;
+		props.timestamp = (uint64_t)time(NULL);
+		props.headers.num_entries = 1;
+		props.headers.entries = messageTableEntries;
+		messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp");
+		messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP;
+		messageTableEntries[0].value.value.u64 = (uint64_t)switch_micro_time_now();
+	}
 
 	status = amqp_basic_publish(
 								profile->conn_active->state,
diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c b/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c
index caa22bf21c..5c59cba1f7 100644
--- a/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c
+++ b/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c
@@ -148,6 +148,42 @@ switch_status_t mod_amqp_do_config(switch_bool_t reload)
 }
 
 
+#define KEY_SAFE(C) ((C >= 'a' && C <= 'z') || \
+					(C >= 'A' && C <= 'Z') || \
+					(C >= '0' && C <= '9') || \
+					(C == '-' || C == '~' || C == '_'))
+
+#define HI4(C) (C>>4)
+#define LO4(C) (C & 0x0F)
+
+#define hexint(C) (C < 10?('0' + C):('A'+ C - 10))
+
+char *amqp_util_encode(char *key, char *dest) {
+	char *p, *end;
+ 	if ((strlen(key) == 1) && (key[0] == '#' || key[0] == '*')) {
+ 		*dest++ = key[0];
+		*dest = '\0';
+		return dest;
+    }
+	for (p = key, end = key + strlen(key); p < end; p++) {
+		if (KEY_SAFE(*p)) {
+			*dest++ = *p;
+		} else if (*p == '.') {
+			memcpy(dest, "%2E", 3);
+			dest += 3;
+		} else if (*p == ' ') {
+			*dest++ = '+';
+		} else {
+			*dest++ = '%';
+			sprintf(dest, "%c%c", hexint(HI4(*p)), hexint(LO4(*p)));
+			dest += 2;
+		}
+	}
+	*dest = '\0';
+	return dest;
+}
+
+
 /* For Emacs:
  * Local Variables:
  * mode:c