This commit is contained in:
Anthony Minessale 2010-07-22 01:23:06 -05:00
parent 7518c86a93
commit 84d897cb05
1 changed files with 48 additions and 18 deletions

View File

@ -127,7 +127,6 @@ static switch_status_t fifo_queue_push(fifo_queue_t *queue, switch_event_t *ptr)
queue->data[queue->idx++] = ptr;
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_SUCCESS;
@ -144,7 +143,7 @@ static int fifo_queue_size(fifo_queue_t *queue)
return s;
}
static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, switch_bool_t remove)
static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, int remove)
{
int i, j;
@ -157,7 +156,7 @@ static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop,
for (j = 0; j < queue->idx; j++) {
const char *uuid = switch_event_get_header(queue->data[j], "unique-id");
if (uuid && !check_caller_outbound_call(uuid)) {
if (uuid && (remove == 2 || !check_caller_outbound_call(uuid))) {
if (remove) {
*pop = queue->data[j];
} else {
@ -189,7 +188,7 @@ static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop,
}
static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, switch_bool_t remove)
static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, int remove)
{
int i, j, force = 0;
@ -200,6 +199,10 @@ static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *n
force = 1;
}
if (remove == 2) {
force = 1;
}
if (queue->idx == 0 || zstr(name) || zstr(val)) {
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_FALSE;
@ -252,7 +255,10 @@ static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid)
for (j = 0; j < queue->idx; j++) {
const char *j_uuid = switch_event_get_header(queue->data[j], "unique-id");
if (j_uuid && !strcmp(j_uuid, uuid)) break;
if (j_uuid && !strcmp(j_uuid, uuid)) {
switch_event_destroy(&queue->data[j]);
break;
}
}
if (j == queue->idx) {
@ -556,6 +562,7 @@ static struct {
char *odbc_pass;
int node_thread_running;
switch_odbc_handle_t *master_odbc;
int threads;
} globals;
@ -1109,8 +1116,15 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
struct call_helper *rows[MAX_ROWS] = { 0 };
int rowcount = 0;
if (!globals.running) return NULL;
switch_uuid_get(&uuid);
switch_uuid_format(uuid_str, &uuid);
switch_mutex_lock(globals.mutex);
globals.threads++;
switch_mutex_unlock(globals.mutex);
if (!cbh->rowcount) {
goto end;
@ -1415,6 +1429,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
switch_core_destroy_memory_pool(&cbh->pool);
switch_mutex_lock(globals.mutex);
globals.threads--;
switch_mutex_unlock(globals.mutex);
return NULL;
}
@ -1434,7 +1452,14 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
switch_event_t *event = NULL;
char *sql = NULL;
int connected = 0;
if (!globals.running) return NULL;
switch_mutex_lock(globals.mutex);
globals.threads++;
switch_mutex_unlock(globals.mutex);
switch_mutex_lock(globals.mutex);
node = switch_core_hash_find(globals.fifo_hash, h->node_name);
switch_mutex_unlock(globals.mutex);
@ -1549,6 +1574,10 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
}
switch_core_destroy_memory_pool(&h->pool);
switch_mutex_lock(globals.mutex);
globals.threads--;
switch_mutex_unlock(globals.mutex);
return NULL;
}
@ -1639,7 +1668,7 @@ static void find_consumers(fifo_node_t *node)
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
struct callback_helper *cbh;
switch_memory_pool_t *pool;
switch_memory_pool_t *pool = NULL;
switch_core_new_memory_pool(&pool);
cbh = switch_core_alloc(pool, sizeof(*cbh));
@ -2407,7 +2436,6 @@ SWITCH_STANDARD_APP(fifo_function)
fifo_strategy_t strat = STRAT_WAITING_LONGER;
const char *url = NULL;
const char *caller_uuid = NULL;
switch_event_t *call_event;
const char *outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid");
//const char *track_use_count = switch_channel_get_variable(channel, "fifo_track_use_count");
//int do_track = switch_true(track_use_count);
@ -2613,12 +2641,10 @@ SWITCH_STANDARD_APP(fifo_function)
continue;
}
call_event = (switch_event_t *) pop;
pop = NULL;
url = switch_event_get_header(pop, "dial-url");
caller_uuid = switch_core_session_strdup(session, switch_event_get_header(pop, "unique-id"));
switch_event_destroy(&pop);
url = switch_event_get_header(call_event, "dial-url");
caller_uuid = switch_event_get_header(call_event, "unique-id");
if (url) {
switch_call_cause_t cause = SWITCH_CAUSE_NONE;
const char *o_announce = NULL;
@ -2679,6 +2705,7 @@ SWITCH_STANDARD_APP(fifo_function)
switch_channel_event_set_data(channel, event);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_pop");
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Caller-UUID", switch_core_session_get_uuid(other_session));
switch_event_fire(&event);
}
@ -2883,9 +2910,7 @@ SWITCH_STANDARD_APP(fifo_function)
send_presence(node);
check_cancel(node);
switch_core_session_rwunlock(other_session);
if (call_event) {
switch_event_destroy(&call_event);
}
if (!do_wait || !switch_channel_ready(channel)) {
break;
@ -3895,7 +3920,7 @@ static switch_status_t load_config(int reload, int del_all)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
switch_thread_rwlock_wrlock(node->rwlock);
for (x = 0; x < MAX_PRI; x++) {
while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
}
}
@ -4171,6 +4196,11 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
if (globals.node_thread_running) {
stop_node_thread();
}
while(globals.threads) {
switch_cond_next();
}
while ((hi = switch_hash_first(NULL, globals.fifo_hash))) {
int x = 0;
@ -4180,7 +4210,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
switch_thread_rwlock_wrlock(node->rwlock);
switch_mutex_lock(node->mutex);
for (x = 0; x < MAX_PRI; x++) {
while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
}
}