From beec142c5cf27d218e7ea016872e69a27fce897b Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Wed, 21 Jul 2010 02:46:35 -0500 Subject: [PATCH] fifo up --- src/mod/applications/mod_fifo/mod_fifo.c | 340 +++++++++++++++++++---- src/switch_loadable_module.c | 19 +- 2 files changed, 293 insertions(+), 66 deletions(-) diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index 0deeff2a31..e5771b5a11 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -48,6 +48,7 @@ typedef enum { static outbound_strategy_t default_strategy = NODE_STRATEGY_RINGALL; +static int marker = 1; typedef struct { int nelm; @@ -57,6 +58,24 @@ typedef struct { switch_mutex_t *mutex; } fifo_queue_t; + + + +static int check_caller_outbound_call(const char *key); +static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause); +static void del_caller_outbound_call(const char *key); +static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause); +static int check_consumer_outbound_call(const char *key); +static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause); +static void del_consumer_outbound_call(const char *key); +static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause); + + +static int check_bridge_call(const char *key); +static void add_bridge_call(const char *key); +static void del_bridge_call(const char *key); + + switch_status_t fifo_queue_create(fifo_queue_t **queue, int size, switch_memory_pool_t *pool) { fifo_queue_t *q; @@ -127,24 +146,34 @@ static int fifo_queue_size(fifo_queue_t *queue) static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, switch_bool_t remove) { - int i; + int i, j; switch_mutex_lock(queue->mutex); if (queue->idx == 0) { switch_mutex_unlock(queue->mutex); - *pop = NULL; return SWITCH_STATUS_FALSE; } - if (remove) { - *pop = queue->data[0]; - } else { - switch_event_dup(pop, queue->data[0]); + for (j = 0; j < queue->idx; j++) { + const char *uuid = switch_event_get_header(queue->data[j], "unique-id"); + if (uuid && !check_caller_outbound_call(uuid)) { + if (remove) { + *pop = queue->data[j]; + } else { + switch_event_dup(pop, queue->data[j]); + } + break; + } } + if (j == queue->idx) { + switch_mutex_unlock(queue->mutex); + return SWITCH_STATUS_FALSE; + } + if (remove) { - for (i = 1; i < queue->idx; i++) { + for (i = j+1; i < queue->idx; i++) { queue->data[i-1] = queue->data[i]; queue->data[i] = NULL; change_pos(queue->data[i-1], i); @@ -162,10 +191,15 @@ static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, switch_bool_t remove) { - int i, j; + int i, j, force = 0; switch_mutex_lock(queue->mutex); + if (name && *name == '+') { + name++; + force = 1; + } + if (queue->idx == 0 || zstr(name) || zstr(val)) { switch_mutex_unlock(queue->mutex); return SWITCH_STATUS_FALSE; @@ -173,7 +207,8 @@ static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *n for (j = 0; j < queue->idx; j++) { const char *j_val = switch_event_get_header(queue->data[j], name); - if (j_val && val && !strcmp(j_val, val)) { + const char *uuid = switch_event_get_header(queue->data[j], "unique-id"); + if (j_val && val && !strcmp(j_val, val) && (force || !check_caller_outbound_call(uuid))) { if (remove) { *pop = queue->data[j]; @@ -502,8 +537,12 @@ static switch_status_t consumer_read_frame_callback(switch_core_session_t *sessi } static struct { - switch_hash_t *orig_hash; - switch_mutex_t *orig_mutex; + switch_hash_t *caller_orig_hash; + switch_hash_t *consumer_orig_hash; + switch_hash_t *bridge_hash; + switch_mutex_t *caller_orig_mutex; + switch_mutex_t *consumer_orig_mutex; + switch_mutex_t *bridge_mutex; switch_hash_t *fifo_hash; switch_mutex_t *mutex; switch_mutex_t *sql_mutex; @@ -520,6 +559,115 @@ static struct { } globals; + +static int check_caller_outbound_call(const char *key) +{ + int x = 0; + + switch_mutex_lock(globals.caller_orig_mutex); + x = !!switch_core_hash_find(globals.caller_orig_hash, key); + switch_mutex_unlock(globals.caller_orig_mutex); + return x; + +} + + +static void add_caller_outbound_call(const char *key, switch_call_cause_t *cancel_cause) +{ + switch_mutex_lock(globals.caller_orig_mutex); + switch_core_hash_insert(globals.caller_orig_hash, key, cancel_cause); + switch_mutex_unlock(globals.caller_orig_mutex); +} + +static void del_caller_outbound_call(const char *key) +{ + switch_mutex_lock(globals.caller_orig_mutex); + switch_core_hash_delete(globals.caller_orig_hash, key); + switch_mutex_unlock(globals.caller_orig_mutex); +} + +static void cancel_caller_outbound_call(const char *key, switch_call_cause_t cause) +{ + switch_call_cause_t *cancel_cause = NULL; + + switch_mutex_lock(globals.caller_orig_mutex); + if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.caller_orig_hash, key))) { + *cancel_cause = cause; + } + switch_mutex_unlock(globals.caller_orig_mutex); + + fifo_caller_del(key); + +} + + + +static int check_bridge_call(const char *key) +{ + int x = 0; + + switch_mutex_lock(globals.bridge_mutex); + x = !!switch_core_hash_find(globals.bridge_hash, key); + switch_mutex_unlock(globals.bridge_mutex); + return x; + +} + + +static void add_bridge_call(const char *key) +{ + switch_mutex_lock(globals.bridge_mutex); + switch_core_hash_insert(globals.bridge_hash, key, (void *)&marker); + switch_mutex_unlock(globals.bridge_mutex); +} + +static void del_bridge_call(const char *key) +{ + switch_mutex_lock(globals.bridge_mutex); + switch_core_hash_delete(globals.bridge_hash, key); + switch_mutex_unlock(globals.bridge_mutex); +} + + +static int check_consumer_outbound_call(const char *key) +{ + int x = 0; + + switch_mutex_lock(globals.consumer_orig_mutex); + x = !!switch_core_hash_find(globals.consumer_orig_hash, key); + switch_mutex_unlock(globals.consumer_orig_mutex); + return x; + +} + +static void add_consumer_outbound_call(const char *key, switch_call_cause_t *cancel_cause) +{ + switch_mutex_lock(globals.consumer_orig_mutex); + switch_core_hash_insert(globals.consumer_orig_hash, key, cancel_cause); + switch_mutex_unlock(globals.consumer_orig_mutex); +} + +static void del_consumer_outbound_call(const char *key) +{ + switch_mutex_lock(globals.consumer_orig_mutex); + switch_core_hash_delete(globals.consumer_orig_hash, key); + switch_mutex_unlock(globals.consumer_orig_mutex); +} + +static void cancel_consumer_outbound_call(const char *key, switch_call_cause_t cause) +{ + switch_call_cause_t *cancel_cause = NULL; + + switch_mutex_lock(globals.consumer_orig_mutex); + if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.consumer_orig_hash, key))) { + *cancel_cause = cause; + } + switch_mutex_unlock(globals.consumer_orig_mutex); + +} + + + switch_cache_db_handle_t *fifo_get_db_handle(void) { switch_cache_db_connection_options_t options = { {0} }; @@ -633,7 +781,7 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool); cbt.buf = outbound_count; cbt.len = sizeof(outbound_count); - sql = switch_mprintf("select count(*) from fifo_outbound where taking_calls = 1 and fifo_name = '%q'", name); + sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name); fifo_execute_sql_callback(mutex, sql, sql2str_callback, &cbt); if (atoi(outbound_count) > 0) { node->has_outbound = 1; @@ -686,6 +834,7 @@ struct callback_helper { switch_memory_pool_t *pool; struct call_helper *rows[MAX_ROWS]; int rowcount; + int ready; }; @@ -699,6 +848,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_ consumer_session = session; consumer_channel = switch_core_session_get_channel(consumer_session); + outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid"); switch (msg->message_id) { case SWITCH_MESSAGE_INDICATE_BRIDGE: @@ -706,6 +856,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_ if ((caller_session = switch_core_session_locate(msg->string_arg))) { caller_channel = switch_core_session_get_channel(caller_session); if (msg->message_id == SWITCH_MESSAGE_INDICATE_BRIDGE) { + cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL); switch_core_session_soft_lock(caller_session, 5); } else { switch_core_session_soft_unlock(caller_session); @@ -723,8 +874,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_ default: goto end; } - - outbound_id = switch_channel_get_variable(consumer_channel, "fifo_outbound_uuid"); + switch (msg->message_id) { case SWITCH_MESSAGE_INDICATE_BRIDGE: @@ -927,33 +1077,6 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_ return SWITCH_STATUS_SUCCESS; } -static void add_outbound_call(const char *key, switch_call_cause_t *cancel_cause) -{ - switch_mutex_lock(globals.orig_mutex); - switch_core_hash_insert(globals.orig_hash, key, cancel_cause); - switch_mutex_unlock(globals.orig_mutex); -} - -static void del_outbound_call(const char *key) -{ - switch_mutex_lock(globals.orig_mutex); - switch_core_hash_delete(globals.orig_hash, key); - switch_mutex_unlock(globals.orig_mutex); -} - -static void cancel_outbound_call(const char *key, switch_call_cause_t cause) -{ - switch_call_cause_t *cancel_cause = NULL; - - switch_mutex_lock(globals.orig_mutex); - if ((cancel_cause = (switch_call_cause_t *) switch_core_hash_find(globals.orig_hash, key))) { - *cancel_cause = cause; - } - switch_mutex_unlock(globals.orig_mutex); - - fifo_caller_del(key); - -} static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void *obj) { @@ -981,8 +1104,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; switch_call_cause_t cancel_cause = 0; char *uuid_list = NULL; - int connected = 0; + int connected = 0, total = 0; const char *codec; + struct call_helper *rows[MAX_ROWS] = { 0 }; + int rowcount = 0; switch_uuid_get(&uuid); switch_uuid_format(uuid_str, &uuid); @@ -997,9 +1122,35 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void node = switch_core_hash_find(globals.fifo_hash, node_name); switch_mutex_unlock(globals.mutex); + for (i = 0; i < cbh->rowcount; i++) { + struct call_helper *h = cbh->rows[i]; + + if (check_consumer_outbound_call(h->uuid) || check_bridge_call(h->uuid)) { + continue; + } + + rows[rowcount++] = h; + add_consumer_outbound_call(h->uuid, &cancel_cause); + total++; + } + + for (i = 0; i < rowcount; i++) { + struct call_helper *h = rows[i]; + cbh->rows[i] = h; + } + + cbh->rowcount = rowcount; + + cbh->ready = 1; + + if (!total) { + goto end; + } + + if (node) { switch_mutex_lock(node->mutex); - node->busy = switch_epoch_time_now(NULL) + 600; + //node->busy = switch_epoch_time_now(NULL) + 600; node->ring_consumer_count = 1; switch_mutex_unlock(node->mutex); } else { @@ -1113,11 +1264,14 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void for (i = 0; i < cbh->rowcount; i++) { struct call_helper *h = cbh->rows[i]; char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid); + fifo_execute_sql(sql, globals.sql_mutex); switch_safe_free(sql); } + if (!total) goto end; + if ((codec = switch_event_get_header(pop, "variable_sip_use_codec_name"))) { const char *rate = switch_event_get_header(pop, "variable_sip_use_codec_rate"); const char *ptime = switch_event_get_header(pop, "variable_sip_use_codec_ptime"); @@ -1132,11 +1286,11 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "absolute_codec_string", nstr); } - add_outbound_call(id, &cancel_cause); + add_caller_outbound_call(id, &cancel_cause); status = switch_ivr_originate(NULL, &session, &cause, originate_string, timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, &cancel_cause); - del_outbound_call(id); + del_caller_outbound_call(id); if (status != SWITCH_STATUS_SUCCESS || cause != SWITCH_CAUSE_SUCCESS) { @@ -1217,7 +1371,12 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void switch_channel_set_caller_extension(channel, extension); switch_channel_set_state(channel, CS_EXECUTE); switch_channel_wait_for_state(channel, NULL, CS_EXECUTE); + switch_channel_wait_for_flag(channel, CF_BRIDGED, SWITCH_TRUE, 5000, NULL); + switch_core_session_rwunlock(session); + + + for (i = 0; i < cbh->rowcount; i++) { struct call_helper *h = cbh->rows[i]; @@ -1228,10 +1387,19 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void end: + cbh->ready = 1; + + for (i = 0; i < cbh->rowcount; i++) { + struct call_helper *h = cbh->rows[i]; + del_consumer_outbound_call(h->uuid); + } + switch_safe_free(originate_string); switch_safe_free(uuid_list); - switch_event_destroy(&ovars); + if (ovars) { + switch_event_destroy(&ovars); + } if (pop_dup) { switch_event_destroy(&pop_dup); @@ -1240,7 +1408,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void if (node) { switch_mutex_lock(node->mutex); node->ring_consumer_count = 0; - node->busy = switch_epoch_time_now(NULL) + connected; + //node->busy = switch_epoch_time_now(NULL) + connected; switch_mutex_unlock(node->mutex); } @@ -1273,7 +1441,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) if (node) { switch_mutex_lock(node->mutex); node->ring_consumer_count++; - node->busy = switch_epoch_time_now(NULL) + 600; + //node->busy = switch_epoch_time_now(NULL) + 600; switch_mutex_unlock(node->mutex); } @@ -1375,7 +1543,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) if (node->ring_consumer_count-- < 0) { node->ring_consumer_count = 0; } - node->busy = switch_epoch_time_now(NULL) + connected; + //node->busy = switch_epoch_time_now(NULL) + connected; switch_mutex_unlock(node->mutex); } switch_core_destroy_memory_pool(&h->pool); @@ -1484,10 +1652,15 @@ static void find_consumers(fifo_node_t *node) fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh); if (cbh->rowcount) { + int sanity = 40; + switch_threadattr_create(&thd_attr, cbh->pool); switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool); + while(--sanity > 0 && !cbh->ready) { + switch_yield(100000); + } } } @@ -1517,16 +1690,18 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o switch_hash_this(hi, &var, NULL, &val); if ((node = (fifo_node_t *) val)) { switch_mutex_lock(node->mutex); - if (node->has_outbound && node->ready && switch_epoch_time_now(NULL) > node->busy) { + if (node->has_outbound && node->ready) {// && switch_epoch_time_now(NULL) > node->busy) { ppl_waiting = node_consumer_wait_count(node); consumer_total = node->consumer_count; idle_consumers = node_idle_consumers(node); - - /* switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, - "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d\n", node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count); */ + + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG5, + //"%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d\n", + //node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count); if ((ppl_waiting - node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) { find_consumers(node); + switch_yield(1000000); } } switch_mutex_unlock(node->mutex); @@ -1579,7 +1754,7 @@ static void check_ocancel(switch_core_session_t *session) //channel = switch_core_session_get_channel(session); - cancel_outbound_call(switch_core_session_get_uuid(session), SWITCH_CAUSE_ORIGINATOR_CANCEL); + cancel_caller_outbound_call(switch_core_session_get_uuid(session), SWITCH_CAUSE_ORIGINATOR_CANCEL); } @@ -1693,6 +1868,13 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32 } +SWITCH_STANDARD_API(fifo_check_bridge_function) +{ + stream->write_function(stream, "%s", (cmd && check_bridge_call(cmd)) ? "true" : "false"); + + return SWITCH_STATUS_SUCCESS; +} + SWITCH_STANDARD_API(fifo_add_outbound_function) { char *data = NULL, *argv[4] = { 0 }; @@ -1739,6 +1921,7 @@ static void dec_use_count(switch_channel_t *channel) long now = (long) switch_epoch_time_now(NULL); if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) { + del_bridge_call(outbound_id); sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag where use_count > 0 and uuid='%q'", now, now, outbound_id); fifo_execute_sql(sql, globals.sql_mutex); @@ -1780,6 +1963,8 @@ SWITCH_STANDARD_APP(fifo_track_call_function) return; } + add_bridge_call(data); + switch_channel_set_variable(channel, "fifo_outbound_uuid", data); if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_OUTBOUND) { @@ -2344,15 +2529,29 @@ SWITCH_STANDARD_APP(fifo_function) } if (node) { - const char *varval; + const char *varval, *check = NULL; + check = switch_channel_get_variable(channel, "fifo_bridge_uuid_required"); + if ((varval = switch_channel_get_variable(channel, "fifo_bridge_uuid"))) { + if (check_bridge_call(varval) && switch_true(check)) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n", + switch_channel_get_name(channel)); + goto done; + } + for (x = 0; x < MAX_PRI; x++) { - if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) { - cancel_outbound_call(varval, SWITCH_CAUSE_PICKED_OFF); + if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "+unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) { + cancel_caller_outbound_call(varval, SWITCH_CAUSE_PICKED_OFF); break; } } + if (!pop && switch_true(check)) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "%s Call has already been answered\n", + switch_channel_get_name(channel)); + + goto done; + } } if (!pop && (varval = switch_channel_get_variable(channel, "fifo_target_skill"))) { @@ -2582,12 +2781,18 @@ SWITCH_STANDARD_APP(fifo_function) } if (outbound_id) { + cancel_consumer_outbound_call(outbound_id, SWITCH_CAUSE_ORIGINATOR_CANCEL); + add_bridge_call(outbound_id); + sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,use_count=use_count+1,outbound_fail_count=0 where uuid='%s'", switch_epoch_time_now(NULL), outbound_id); fifo_execute_sql(sql, globals.sql_mutex); switch_safe_free(sql); } + add_bridge_call(switch_core_session_get_uuid(other_session)); + add_bridge_call(switch_core_session_get_uuid(session)); + sql = switch_mprintf("insert into fifo_bridge " "(fifo_name,caller_uuid,caller_caller_id_name,caller_caller_id_number,consumer_uuid,consumer_outgoing_uuid,bridge_start) " "values ('%q','%q','%q','%q','%q','%q',%ld)", @@ -2604,6 +2809,7 @@ SWITCH_STANDARD_APP(fifo_function) fifo_execute_sql(sql, globals.sql_mutex); switch_safe_free(sql); + switch_ivr_multi_threaded_bridge(session, other_session, on_dtmf, other_session, session); if (outbound_id) { @@ -2616,8 +2822,13 @@ SWITCH_STANDARD_APP(fifo_function) fifo_execute_sql(sql, globals.sql_mutex); switch_safe_free(sql); + + del_bridge_call(outbound_id); + } + del_bridge_call(switch_core_session_get_uuid(session)); + del_bridge_call(switch_core_session_get_uuid(other_session)); if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { @@ -3779,7 +3990,7 @@ static void fifo_member_del(char *fifo_name, char *originate_string) cbt.buf = outbound_count; cbt.len = sizeof(outbound_count); - sql = switch_mprintf("select count(*) from fifo_outbound where taking_calls = 1 and fifo_name = '%q'", node->name); + sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", node->name); fifo_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback, &cbt); if (atoi(outbound_count) > 0) { node->has_outbound = 1; @@ -3895,8 +4106,12 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load) switch_core_new_memory_pool(&globals.pool); switch_core_hash_init(&globals.fifo_hash, globals.pool); - switch_core_hash_init(&globals.orig_hash, globals.pool); - switch_mutex_init(&globals.orig_mutex, SWITCH_MUTEX_NESTED, globals.pool); + switch_core_hash_init(&globals.caller_orig_hash, globals.pool); + switch_core_hash_init(&globals.consumer_orig_hash, globals.pool); + switch_core_hash_init(&globals.bridge_hash, globals.pool); + switch_mutex_init(&globals.caller_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool); + switch_mutex_init(&globals.consumer_orig_mutex, SWITCH_MUTEX_NESTED, globals.pool); + switch_mutex_init(&globals.bridge_mutex, SWITCH_MUTEX_NESTED, globals.pool); switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool); switch_mutex_init(&globals.sql_mutex, SWITCH_MUTEX_NESTED, globals.pool); @@ -3919,11 +4134,13 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load) SWITCH_ADD_API(commands_api_interface, "fifo", "Return data about a fifo", fifo_api_function, FIFO_API_SYNTAX); SWITCH_ADD_API(commands_api_interface, "fifo_member", "Add members to a fifo", fifo_member_api_function, FIFO_MEMBER_API_SYNTAX); SWITCH_ADD_API(commands_api_interface, "fifo_add_outbound", "Add outbound members to a fifo", fifo_add_outbound_function, " []"); + SWITCH_ADD_API(commands_api_interface, "fifo_check_bridge", "check if uuid is in a bridge", fifo_check_bridge_function, "|"); switch_console_set_complete("add fifo list"); switch_console_set_complete("add fifo list_verbose"); switch_console_set_complete("add fifo count"); switch_console_set_complete("add fifo has_outbound"); switch_console_set_complete("add fifo importance"); + switch_console_set_complete("add fifo_check_bridge ::console::list_uuid"); start_node_thread(globals.pool); @@ -3960,12 +4177,13 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) node = (fifo_node_t *) val; switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->mutex); for (x = 0; x < MAX_PRI; x++) { while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) { switch_event_destroy(&pop); } } - + switch_mutex_unlock(node->mutex); switch_core_hash_delete(globals.fifo_hash, node->name); switch_core_hash_destroy(&node->consumer_hash); switch_thread_rwlock_unlock(node->rwlock); diff --git a/src/switch_loadable_module.c b/src/switch_loadable_module.c index 97cb5e8d70..811383817c 100644 --- a/src/switch_loadable_module.c +++ b/src/switch_loadable_module.c @@ -1627,7 +1627,8 @@ SWITCH_DECLARE(switch_status_t) switch_api_execute(const char *cmd, const char * { switch_api_interface_t *api; switch_status_t status; - + char *myarg = NULL, *argp = NULL; + switch_assert(stream != NULL); switch_assert(stream->data != NULL); switch_assert(stream->write_function != NULL); @@ -1636,18 +1637,25 @@ SWITCH_DECLARE(switch_status_t) switch_api_execute(const char *cmd, const char * switch_event_create(&stream->param_event, SWITCH_EVENT_API); } + if (arg) { + myarg = strdup(arg); + argp = myarg; + while(*argp == ' ') argp++; + while(end_of(argp) == ' ') end_of(argp) = '\0'; + } + if (stream->param_event) { if (cmd) { switch_event_add_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Command", cmd); } - if (arg) { - switch_event_add_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Command-Argument", arg); + if (argp) { + switch_event_add_header_string(stream->param_event, SWITCH_STACK_BOTTOM, "API-Command-Argument", argp); } } if (cmd && (api = switch_loadable_module_get_api_interface(cmd)) != 0) { - if ((status = api->function(arg, session, stream)) != SWITCH_STATUS_SUCCESS) { + if ((status = api->function(argp, session, stream)) != SWITCH_STATUS_SUCCESS) { stream->write_function(stream, "COMMAND RETURNED ERROR!\n"); } UNPROTECT_INTERFACE(api); @@ -1660,7 +1668,8 @@ SWITCH_DECLARE(switch_status_t) switch_api_execute(const char *cmd, const char * switch_event_fire(&stream->param_event); } - + switch_safe_free(myarg); + return status; }