diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index d4e572030a..9af039e864 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -127,7 +127,6 @@ static switch_status_t fifo_queue_push(fifo_queue_t *queue, switch_event_t *ptr) queue->data[queue->idx++] = ptr; - switch_mutex_unlock(queue->mutex); return SWITCH_STATUS_SUCCESS; @@ -144,7 +143,7 @@ static int fifo_queue_size(fifo_queue_t *queue) return s; } -static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, switch_bool_t remove) +static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, int remove) { int i, j; @@ -157,7 +156,7 @@ static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, for (j = 0; j < queue->idx; j++) { const char *uuid = switch_event_get_header(queue->data[j], "unique-id"); - if (uuid && !check_caller_outbound_call(uuid)) { + if (uuid && (remove == 2 || !check_caller_outbound_call(uuid))) { if (remove) { *pop = queue->data[j]; } else { @@ -189,7 +188,7 @@ static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, } -static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, switch_bool_t remove) +static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, int remove) { int i, j, force = 0; @@ -200,6 +199,10 @@ static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *n force = 1; } + if (remove == 2) { + force = 1; + } + if (queue->idx == 0 || zstr(name) || zstr(val)) { switch_mutex_unlock(queue->mutex); return SWITCH_STATUS_FALSE; @@ -252,7 +255,10 @@ static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid) for (j = 0; j < queue->idx; j++) { const char *j_uuid = switch_event_get_header(queue->data[j], "unique-id"); - if (j_uuid && !strcmp(j_uuid, uuid)) break; + if (j_uuid && !strcmp(j_uuid, uuid)) { + switch_event_destroy(&queue->data[j]); + break; + } } if (j == queue->idx) { @@ -556,6 +562,7 @@ static struct { char *odbc_pass; int node_thread_running; switch_odbc_handle_t *master_odbc; + int threads; } globals; @@ -1109,8 +1116,15 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void struct call_helper *rows[MAX_ROWS] = { 0 }; int rowcount = 0; + + if (!globals.running) return NULL; + switch_uuid_get(&uuid); switch_uuid_format(uuid_str, &uuid); + + switch_mutex_lock(globals.mutex); + globals.threads++; + switch_mutex_unlock(globals.mutex); if (!cbh->rowcount) { goto end; @@ -1415,6 +1429,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void switch_core_destroy_memory_pool(&cbh->pool); + switch_mutex_lock(globals.mutex); + globals.threads--; + switch_mutex_unlock(globals.mutex); + return NULL; } @@ -1434,7 +1452,14 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) switch_event_t *event = NULL; char *sql = NULL; int connected = 0; - + + if (!globals.running) return NULL; + + switch_mutex_lock(globals.mutex); + globals.threads++; + switch_mutex_unlock(globals.mutex); + + switch_mutex_lock(globals.mutex); node = switch_core_hash_find(globals.fifo_hash, h->node_name); switch_mutex_unlock(globals.mutex); @@ -1549,6 +1574,10 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) } switch_core_destroy_memory_pool(&h->pool); + switch_mutex_lock(globals.mutex); + globals.threads--; + switch_mutex_unlock(globals.mutex); + return NULL; } @@ -1639,7 +1668,7 @@ static void find_consumers(fifo_node_t *node) switch_thread_t *thread; switch_threadattr_t *thd_attr = NULL; struct callback_helper *cbh; - switch_memory_pool_t *pool; + switch_memory_pool_t *pool = NULL; switch_core_new_memory_pool(&pool); cbh = switch_core_alloc(pool, sizeof(*cbh)); @@ -2407,7 +2436,6 @@ SWITCH_STANDARD_APP(fifo_function) fifo_strategy_t strat = STRAT_WAITING_LONGER; const char *url = NULL; const char *caller_uuid = NULL; - switch_event_t *call_event; const char *outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"); //const char *track_use_count = switch_channel_get_variable(channel, "fifo_track_use_count"); //int do_track = switch_true(track_use_count); @@ -2613,12 +2641,10 @@ SWITCH_STANDARD_APP(fifo_function) continue; } - call_event = (switch_event_t *) pop; - pop = NULL; + url = switch_event_get_header(pop, "dial-url"); + caller_uuid = switch_core_session_strdup(session, switch_event_get_header(pop, "unique-id")); + switch_event_destroy(&pop); - url = switch_event_get_header(call_event, "dial-url"); - caller_uuid = switch_event_get_header(call_event, "unique-id"); - if (url) { switch_call_cause_t cause = SWITCH_CAUSE_NONE; const char *o_announce = NULL; @@ -2679,6 +2705,7 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_event_set_data(channel, event); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_pop"); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-UUID", switch_core_session_get_uuid(other_session)); switch_event_fire(&event); } @@ -2883,9 +2910,7 @@ SWITCH_STANDARD_APP(fifo_function) send_presence(node); check_cancel(node); switch_core_session_rwunlock(other_session); - if (call_event) { - switch_event_destroy(&call_event); - } + if (!do_wait || !switch_channel_ready(channel)) { break; @@ -3895,7 +3920,7 @@ static switch_status_t load_config(int reload, int del_all) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name); switch_thread_rwlock_wrlock(node->rwlock); for (x = 0; x < MAX_PRI; x++) { - while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) { + while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) { switch_event_destroy(&pop); } } @@ -4171,6 +4196,11 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) if (globals.node_thread_running) { stop_node_thread(); } + + while(globals.threads) { + switch_cond_next(); + } + while ((hi = switch_hash_first(NULL, globals.fifo_hash))) { int x = 0; @@ -4180,7 +4210,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) switch_thread_rwlock_wrlock(node->rwlock); switch_mutex_lock(node->mutex); for (x = 0; x < MAX_PRI; x++) { - while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) { + while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) { switch_event_destroy(&pop); } }