From 7f9beb96cc173ab05377d70156aebf874e185d3a Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Wed, 30 Jun 2010 21:46:38 -0500 Subject: [PATCH] revamp fifo with a ringall strategy so you can see the stupid callers' caller id .... --- src/mod/applications/mod_fifo/mod_fifo.c | 706 ++++++++++++++++++++--- 1 file changed, 633 insertions(+), 73 deletions(-) diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index 52a922644e..bc6159172d 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -39,11 +39,203 @@ SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL); static switch_status_t load_config(int reload, int del_all); #define MAX_PRI 10 +typedef enum { + NODE_STRATEGY_INVALID = -1, + NODE_STRATEGY_RINGALL = 0, + NODE_STRATEGY_ENTERPRISE +} outbound_strategy_t; + +static outbound_strategy_t default_strategy = NODE_STRATEGY_ENTERPRISE; + + +typedef struct { + int nelm; + int idx; + switch_event_t **data; + switch_memory_pool_t *pool; + switch_mutex_t *mutex; +} fifo_queue_t; + +switch_status_t fifo_queue_create(fifo_queue_t **queue, int size, switch_memory_pool_t *pool) +{ + fifo_queue_t *q; + + q = switch_core_alloc(pool, sizeof(*q)); + q->pool = pool; + q->nelm = size - 1; + q->data = switch_core_alloc(pool, size * sizeof(switch_event_t *)); + switch_mutex_init(&q->mutex, SWITCH_MUTEX_NESTED, pool); + + *queue = q; + + return SWITCH_STATUS_SUCCESS; +} + + +static void change_pos(switch_event_t *event, int pos) +{ + const char *uuid = switch_event_get_header(event, "unique-id"); + switch_core_session_t *session; + switch_channel_t *channel; + char tmp[30] = ""; + + if (zstr(uuid)) return; + + if (!(session = switch_core_session_locate(uuid))) { + return; + } + + channel = switch_core_session_get_channel(session); + + switch_snprintf(tmp, sizeof(tmp), "%d", pos); + switch_channel_set_variable(channel, "fifo_position", tmp); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "fifo_position", tmp); + + switch_core_session_rwunlock(session); + + +} + +static switch_status_t fifo_queue_push(fifo_queue_t *queue, switch_event_t *ptr) +{ + switch_mutex_lock(queue->mutex); + + if (queue->idx == queue->nelm) { + switch_mutex_unlock(queue->mutex); + return SWITCH_STATUS_FALSE; + } + + queue->data[queue->idx++] = ptr; + + + switch_mutex_unlock(queue->mutex); + + return SWITCH_STATUS_SUCCESS; + +} + +static int fifo_queue_size(fifo_queue_t *queue) +{ + int s; + switch_mutex_lock(queue->mutex); + s = queue->idx; + switch_mutex_unlock(queue->mutex); + + return s; +} + +static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, switch_bool_t remove) +{ + int i; + + switch_mutex_lock(queue->mutex); + + if (queue->idx == 0) { + switch_mutex_unlock(queue->mutex); + *pop = NULL; + return SWITCH_STATUS_FALSE; + } + + *pop = queue->data[0]; + + if (remove) { + for (i = 1; i < queue->idx; i++) { + queue->data[i-1] = queue->data[i]; + queue->data[i] = NULL; + change_pos(queue->data[i-1], i); + } + + queue->idx--; + } + + switch_mutex_unlock(queue->mutex); + + return SWITCH_STATUS_SUCCESS; + +} + + +static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, switch_bool_t remove) +{ + int i, j; + + switch_mutex_lock(queue->mutex); + + if (queue->idx == 0 || zstr(name) || zstr(val)) { + switch_mutex_unlock(queue->mutex); + return SWITCH_STATUS_FALSE; + } + + for (j = 0; j < queue->idx; j++) { + const char *j_val = switch_event_get_header(queue->data[j], name); + if (j_val && val && !strcmp(j_val, val)) { + *pop = queue->data[j]; + break; + } + } + + if (j == queue->idx) { + switch_mutex_unlock(queue->mutex); + return SWITCH_STATUS_FALSE; + } + + if (remove) { + for (i = j+1; i < queue->idx; i++) { + queue->data[i-1] = queue->data[i]; + queue->data[i] = NULL; + change_pos(queue->data[i-1], i); + } + + queue->idx--; + } + + switch_mutex_unlock(queue->mutex); + + return SWITCH_STATUS_SUCCESS; +} + + +static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid) +{ + int i, j; + + switch_mutex_lock(queue->mutex); + + if (queue->idx == 0 || zstr(uuid)) { + switch_mutex_unlock(queue->mutex); + return SWITCH_STATUS_FALSE; + } + + for (j = 0; j < queue->idx; j++) { + const char *j_uuid = switch_event_get_header(queue->data[j], "unique-id"); + if (j_uuid && !strcmp(j_uuid, uuid)) break; + } + + if (j == queue->idx) { + switch_mutex_unlock(queue->mutex); + return SWITCH_STATUS_FALSE; + } + + for (i = j+1; i < queue->idx; i++) { + queue->data[i-1] = queue->data[i]; + queue->data[i] = NULL; + change_pos(queue->data[i-1], i); + } + + queue->idx--; + + switch_mutex_unlock(queue->mutex); + + return SWITCH_STATUS_SUCCESS; + +} + + + struct fifo_node { char *name; switch_mutex_t *mutex; - switch_queue_t *fifo_list[MAX_PRI]; - switch_hash_t *caller_hash; + fifo_queue_t *fifo_list[MAX_PRI]; switch_hash_t *consumer_hash; int caller_count; int consumer_count; @@ -54,7 +246,9 @@ struct fifo_node { switch_memory_pool_t *pool; int has_outbound; int ready; + int busy; int is_static; + outbound_strategy_t outbound_strategy; }; typedef struct fifo_node fifo_node_t; @@ -66,6 +260,33 @@ struct callback { }; typedef struct callback callback_t; +static const char *strat_parse(outbound_strategy_t s) +{ + switch (s) { + case NODE_STRATEGY_RINGALL: + return "ringall"; + case NODE_STRATEGY_ENTERPRISE: + return "enterprise"; + default: + break; + } + + return "invalid"; +} + +static outbound_strategy_t parse_strat(const char *name) +{ + if (!strcasecmp(name, "ringall")) { + return NODE_STRATEGY_RINGALL; + } + + if (!strcasecmp(name, "enterprise")) { + return NODE_STRATEGY_ENTERPRISE; + } + + return NODE_STRATEGY_INVALID; +} + static int sql2str_callback(void *pArg, int argc, char **argv, char **columnNames) { callback_t *cbt = (callback_t *) pArg; @@ -161,7 +382,7 @@ static int node_consumer_wait_count(fifo_node_t *node) int i, len = 0; for (i = 0; i < MAX_PRI; i++) { - len += switch_queue_size(node->fifo_list[i]); + len += fifo_queue_size(node->fifo_list[i]); } return len; @@ -169,24 +390,10 @@ static int node_consumer_wait_count(fifo_node_t *node) static void node_remove_uuid(fifo_node_t *node, const char *uuid) { - int i, len = 0, done = 0; - void *pop = NULL; + int i = 0; for (i = 0; i < MAX_PRI; i++) { - if (!(len = switch_queue_size(node->fifo_list[i]))) { - continue; - } - while (len) { - if (switch_queue_trypop(node->fifo_list[i], &pop) == SWITCH_STATUS_SUCCESS && pop) { - if (!done && !strcmp((char *) pop, uuid)) { - free(pop); - done++; - } else { - switch_queue_push(node->fifo_list[i], pop); - } - } - len--; - } + fifo_queue_popfly(node->fifo_list[i], uuid); } if (!node_consumer_wait_count(node)) { @@ -265,7 +472,7 @@ static switch_status_t consumer_read_frame_callback(switch_core_session_t *sessi break; } for (x = 0; x < MAX_PRI; x++) { - total += switch_queue_size(node->fifo_list[x]); + total += fifo_queue_size(node->fifo_list[x]); } } @@ -303,13 +510,15 @@ switch_cache_db_handle_t *fifo_get_db_handle(void) options.odbc_options.user = globals.odbc_user; options.odbc_options.pass = globals.odbc_pass; - if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_ODBC, &options) != SWITCH_STATUS_SUCCESS) + if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_ODBC, &options) != SWITCH_STATUS_SUCCESS) { dbh = NULL; + } return dbh; } else { options.core_db_options.db_path = globals.dbname; - if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_CORE_DB, &options) != SWITCH_STATUS_SUCCESS) + if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_CORE_DB, &options) != SWITCH_STATUS_SUCCESS) { dbh = NULL; + } return dbh; } } @@ -392,13 +601,13 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu node = switch_core_alloc(pool, sizeof(*node)); node->pool = pool; - + node->outbound_strategy = default_strategy; node->name = switch_core_strdup(node->pool, name); for (x = 0; x < MAX_PRI; x++) { - switch_queue_create(&node->fifo_list[x], SWITCH_CORE_QUEUE_LEN, node->pool); + fifo_queue_create(&node->fifo_list[x], SWITCH_CORE_QUEUE_LEN, node->pool); switch_assert(node->fifo_list[x]); } - switch_core_hash_init(&node->caller_hash, node->pool); + switch_core_hash_init(&node->consumer_hash, node->pool); switch_thread_rwlock_create(&node->rwlock, node->pool); switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool); @@ -471,6 +680,179 @@ struct call_helper { switch_memory_pool_t *pool; }; +#define MAX_ROWS 2048 +struct callback_helper { + int need; + switch_memory_pool_t *pool; + struct call_helper *rows[MAX_ROWS]; + int rowcount; +}; + + + +static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void *obj) +{ + struct callback_helper *cbh = (struct callback_helper *) obj; + char *node_name; + int i = 0; + int timeout = 0; + char sql[256] = ""; + switch_stream_handle_t stream = { 0 }; + fifo_node_t *node = NULL; + char *originate_string; + switch_event_t *ovars = NULL; + switch_status_t status; + switch_core_session_t *session = NULL; + switch_call_cause_t cause = SWITCH_CAUSE_NONE; + char *app_name = NULL, *arg = NULL; + switch_caller_extension_t *extension = NULL; + switch_channel_t *channel; + char *cid_name = NULL, *cid_num = NULL, *id = NULL; + switch_event_t *pop = NULL; + fifo_queue_t *q = NULL; + int x = 0; + + if (!cbh->rowcount) { + goto end; + } + + node_name = cbh->rows[0]->node_name; + + switch_mutex_lock(globals.mutex); + node = switch_core_hash_find(globals.fifo_hash, node_name); + switch_mutex_unlock(globals.mutex); + + if (node) { + switch_mutex_lock(node->mutex); + node->busy = 1; + node->ring_consumer_count++; + switch_mutex_unlock(node->mutex); + } else { + goto end; + } + + SWITCH_STANDARD_STREAM(stream); + + switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS); + switch_assert(ovars); + + + for (i = 0; i < cbh->rowcount; i++) { + struct call_helper *h = cbh->rows[i]; + char *parsed = NULL; + + switch_event_create_brackets(h->originate_string, '{', '}', ',', &ovars, &parsed); + + if (!h->timeout) h->timeout = 60; + if (timeout < h->timeout) timeout = h->timeout; + + stream.write_function(&stream, "[leg_timeout=%d,fifo_outbound_uuid=%s]%s,", h->timeout, h->uuid, parsed ? parsed : h->originate_string); + switch_safe_free(parsed); + + switch_snprintf(sql, sizeof(sql), "update fifo_outbound set use_count=use_count+1 where uuid='%s'", h->uuid); + fifo_execute_sql(sql, globals.sql_mutex); + } + + + originate_string = (char *) stream.data; + + if (originate_string) { + end_of(originate_string) = '\0'; + } + + if (!timeout) timeout = 60; + + for (x = 0; x < MAX_PRI; x++) { + q = node->fifo_list[x]; + switch_mutex_lock(q->mutex); + + if (fifo_queue_pop_nameval(q, "variable_fifo_vip", "true", &pop, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS && pop) { + goto found; + } + switch_mutex_unlock(q->mutex); + q = NULL; + } + + if (!pop) { + for (x = 0; x < MAX_PRI; x++) { + q = node->fifo_list[x]; + switch_mutex_lock(q->mutex); + + if (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS && pop) { + goto found; + } + } + switch_mutex_unlock(q->mutex); + q = NULL; + } + + found: + + + if (!q) goto end; + + if (!pop) { + if (q) switch_mutex_unlock(q->mutex); + goto end; + } + + if ((cid_name = switch_event_get_header(pop, "caller-caller-id-name"))) { + switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_name", cid_name); + } + + if ((cid_num = switch_event_get_header(pop, "caller-caller-id-number"))) { + switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_number", cid_num); + } + + if ((id = switch_event_get_header(pop, "unique-id"))) { + switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "fifo_bridge_uuid", id); + } + + switch_mutex_unlock(q->mutex); + + status = switch_ivr_originate(NULL, &session, &cause, originate_string, timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, NULL); + free(originate_string); + + if (status != SWITCH_STATUS_SUCCESS) { + for (i = 0; i < cbh->rowcount; i++) { + struct call_helper *h = cbh->rows[i]; + switch_snprintf(sql, sizeof(sql), + "update fifo_outbound set use_count=use_count-1, outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag where uuid='%s'", + (long) switch_epoch_time_now(NULL), h->uuid); + fifo_execute_sql(sql, globals.sql_mutex); + } + goto end; + } + + channel = switch_core_session_get_channel(session); + switch_channel_set_variable(channel, "fifo_pop_order", NULL); + + switch_core_event_hook_add_state_change(session, hanguphook); + app_name = "fifo"; + arg = switch_core_session_sprintf(session, "%s out wait", node_name); + extension = switch_caller_extension_new(session, app_name, arg); + switch_caller_extension_add_application(session, extension, app_name, arg); + switch_channel_set_caller_extension(channel, extension); + switch_channel_set_state(channel, CS_EXECUTE); + switch_core_session_rwunlock(session); + + end: + + switch_event_destroy(&ovars); + if (node) { + switch_mutex_lock(node->mutex); + if (node->ring_consumer_count-- < 0) { + node->ring_consumer_count = 0; + } + node->busy = 0; + switch_mutex_unlock(node->mutex); + } + + switch_core_destroy_memory_pool(&cbh->pool); + + return NULL; +} + static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) { struct call_helper *h = (struct call_helper *) obj; @@ -565,7 +947,29 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) return NULL; } -static int place_call_callback(void *pArg, int argc, char **argv, char **columnNames) +static int place_call_ringall_callback(void *pArg, int argc, char **argv, char **columnNames) +{ + struct callback_helper *cbh = (struct callback_helper *) pArg; + struct call_helper *h; + + h = switch_core_alloc(cbh->pool, sizeof(*h)); + h->pool = cbh->pool; + h->uuid = switch_core_strdup(h->pool, argv[0]); + h->node_name = switch_core_strdup(h->pool, argv[1]); + h->originate_string = switch_core_strdup(h->pool, argv[2]); + h->timeout = atoi(argv[5]); + + cbh->rows[cbh->rowcount++] = h; + + cbh->need--; + + if (cbh->rowcount == MAX_ROWS) return -1; + + return cbh->need ? 0 : -1; + +} + +static int place_call_enterprise_callback(void *pArg, int argc, char **argv, char **columnNames) { int *need = (int *) pArg; @@ -596,18 +1000,50 @@ static int place_call_callback(void *pArg, int argc, char **argv, char **columnN static void find_consumers(fifo_node_t *node) { - int need = node_consumer_wait_count(node); char *sql; + + sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, " "next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname " "from fifo_outbound where taking_calls = 1 and " "(fifo_name = '%q') and (use_count < simo_count) and (next_avail = 0 or next_avail <= %ld) " "order by next_avail", node->name, (long) switch_epoch_time_now(NULL)); - switch_assert(sql); - fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_callback, &need); - free(sql); + switch(node->outbound_strategy) { + case NODE_STRATEGY_ENTERPRISE: + { + int need = node_consumer_wait_count(node); + fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_enterprise_callback, &need); + + } + break; + case NODE_STRATEGY_RINGALL: + { + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + struct callback_helper *cbh; + switch_memory_pool_t *pool; + + switch_core_new_memory_pool(&pool); + cbh = switch_core_alloc(pool, sizeof(*cbh)); + cbh->pool = pool; + cbh->need = node_consumer_wait_count(node); + fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh); + + switch_threadattr_create(&thd_attr, cbh->pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool); + + } + break; + default: + break; + } + + + switch_safe_free(sql); } static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj) @@ -626,7 +1062,7 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); if ((node = (fifo_node_t *) val)) { - if (node->has_outbound && node->ready) { + if (node->has_outbound && node->ready && !node->busy) { switch_mutex_lock(node->mutex); ppl_waiting = node_consumer_wait_count(node); consumer_total = node->consumer_count; @@ -759,7 +1195,7 @@ static void pres_event_handler(switch_event_t *event) static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32_t priority) { fifo_node_t *node; - char *str; + switch_event_t *call_event; if (priority >= MAX_PRI) { priority = MAX_PRI - 1; @@ -773,10 +1209,13 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32 switch_mutex_unlock(globals.mutex); - str = switch_mprintf("dial:%s", url); - switch_queue_push(node->fifo_list[priority], (void *) str); + switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA); + switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "dial-url", url); + + fifo_queue_push(node->fifo_list[priority], call_event); + call_event = NULL; - return switch_queue_size(node->fifo_list[priority]); + return fifo_queue_size(node->fifo_list[priority]); } @@ -970,6 +1409,7 @@ SWITCH_STANDARD_APP(fifo_function) int freq = 30; int ftmp = 0; int to = 60; + switch_event_t *call_event; if (orbit_exten) { char *ot; @@ -998,8 +1438,6 @@ SWITCH_STANDARD_APP(fifo_function) switch_mutex_lock(node->mutex); node->caller_count++; - switch_core_hash_insert(node->caller_hash, uuid, session); - if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) { p = atoi(pri); } @@ -1020,8 +1458,13 @@ SWITCH_STANDARD_APP(fifo_function) switch_event_fire(&event); } - switch_queue_push(node->fifo_list[p], (void *) strdup(uuid)); - switch_snprintf(tmp, sizeof(tmp), "%d", switch_queue_size(node->fifo_list[p])); + switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA); + switch_channel_event_set_data(channel, call_event); + + + fifo_queue_push(node->fifo_list[p], call_event); + call_event = NULL; + switch_snprintf(tmp, sizeof(tmp), "%d", fifo_queue_size(node->fifo_list[p])); switch_channel_set_variable(channel, "fifo_position", tmp); if (!pri) { @@ -1136,7 +1579,6 @@ SWITCH_STANDARD_APP(fifo_function) switch_mutex_lock(node->mutex); node_remove_uuid(node, uuid); node->caller_count--; - switch_core_hash_delete(node->caller_hash, uuid); switch_mutex_unlock(node->mutex); send_presence(node); check_cancel(node); @@ -1154,10 +1596,9 @@ SWITCH_STANDARD_APP(fifo_function) goto done; } else { /* consumer */ - void *pop = NULL; + switch_event_t *pop = NULL; switch_frame_t *read_frame; switch_status_t status; - char *uuid; switch_core_session_t *other_session; switch_input_args_t args = { 0 }; const char *pop_order = NULL; @@ -1173,8 +1614,10 @@ SWITCH_STANDARD_APP(fifo_function) char buf[5] = ""; const char *strat_str = switch_channel_get_variable(channel, "fifo_strategy"); fifo_strategy_t strat = STRAT_WAITING_LONGER; - char *url = NULL; - + const char *url = NULL; + const char *caller_uuid = NULL; + switch_event_t *call_event; + if (!zstr(strat_str)) { if (!strcasecmp(strat_str, "more_ppl")) { strat = STRAT_MORE_PPL; @@ -1247,7 +1690,7 @@ SWITCH_STANDARD_APP(fifo_function) int x = 0, winner = -1; switch_time_t longest = (0xFFFFFFFFFFFFFFFFULL / 2); uint32_t importance = 0, waiting = 0, most_waiting = 0; - + pop = NULL; if (moh && do_wait) { @@ -1296,20 +1739,50 @@ SWITCH_STANDARD_APP(fifo_function) } if (node) { - if (custom_pop) { + const char *varval; + + if ((varval = switch_channel_get_variable(channel, "fifo_bridge_uuid"))) { for (x = 0; x < MAX_PRI; x++) { - if (switch_queue_trypop(node->fifo_list[pop_array[x]], &pop) == SWITCH_STATUS_SUCCESS && pop) { + if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) { break; } } - } else { + } + + if (!pop && (varval = switch_channel_get_variable(channel, "fifo_target_skill"))) { for (x = 0; x < MAX_PRI; x++) { - if (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS && pop) { + if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "variable_fifo_skill", + varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) { break; } } } + if (!pop) { + for (x = 0; x < MAX_PRI; x++) { + if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "variable_fifo_vip", "true", + &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) { + break; + } + } + } + + if (!pop) { + if (custom_pop) { + for (x = 0; x < MAX_PRI; x++) { + if (fifo_queue_pop(node->fifo_list[pop_array[x]], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) { + break; + } + } + } else { + for (x = 0; x < MAX_PRI; x++) { + if (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) { + break; + } + } + } + } + if (pop && !node_consumer_wait_count(node)) { switch_mutex_lock(node->mutex); node->start_waiting = 0; @@ -1331,14 +1804,16 @@ SWITCH_STANDARD_APP(fifo_function) continue; } - uuid = (char *) pop; + call_event = (switch_event_t *) pop; pop = NULL; + + url = switch_event_get_header(call_event, "dial-url"); + caller_uuid = switch_event_get_header(call_event, "unique-id"); - if (!strncasecmp(uuid, "dial:", 5)) { + if (url) { switch_call_cause_t cause = SWITCH_CAUSE_NONE; const char *o_announce = NULL; - url = uuid + 5; - + if ((o_announce = switch_channel_get_variable(channel, "fifo_outbound_announce"))) { switch_ivr_play_file(session, NULL, o_announce, NULL); } @@ -1367,12 +1842,11 @@ SWITCH_STANDARD_APP(fifo_function) switch_event_fire(&event); } url = NULL; - free(uuid); - uuid = strdup(switch_core_session_get_uuid(other_session)); + caller_uuid = switch_core_session_strdup(session, switch_core_session_get_uuid(other_session)); } } else { - if ((other_session = switch_core_session_locate(uuid))) { + if ((other_session = switch_core_session_locate(caller_uuid))) { switch_channel_t *other_channel = switch_core_session_get_channel(other_session); if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { switch_channel_event_set_data(other_channel, event); @@ -1427,7 +1901,6 @@ SWITCH_STANDARD_APP(fifo_function) switch_mutex_lock(node->mutex); node->caller_count--; - switch_core_hash_delete(node->caller_hash, uuid); switch_mutex_unlock(node->mutex); send_presence(node); check_cancel(node); @@ -1460,7 +1933,7 @@ SWITCH_STANDARD_APP(fifo_function) switch_time_exp_lt(&tm, ts); switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm); switch_channel_set_variable(channel, "fifo_status", "TALKING"); - switch_channel_set_variable(channel, "fifo_target", uuid); + switch_channel_set_variable(channel, "fifo_target", caller_uuid); switch_channel_set_variable(channel, "fifo_timestamp", date); switch_channel_set_variable(other_channel, "fifo_status", "TALKING"); @@ -1512,12 +1985,13 @@ SWITCH_STANDARD_APP(fifo_function) switch_mutex_lock(node->mutex); node->caller_count--; - switch_core_hash_delete(node->caller_hash, uuid); switch_mutex_unlock(node->mutex); send_presence(node); check_cancel(node); switch_core_session_rwunlock(other_session); - switch_safe_free(uuid); + if (call_event) { + switch_event_destroy(&call_event); + } if (!do_wait || !switch_channel_ready(channel)) { break; @@ -1626,7 +2100,6 @@ SWITCH_STANDARD_APP(fifo_function) switch_thread_rwlock_wrlock(node->rwlock); node->ready = 0; switch_mutex_lock(node->mutex); - switch_core_hash_destroy(&node->caller_hash); switch_core_hash_destroy(&node->consumer_hash); switch_mutex_unlock(node->mutex); switch_thread_rwlock_unlock(node->rwlock); @@ -1680,8 +2153,8 @@ static int xml_callback(void *pArg, int argc, char **argv, char **columnNames) switch_xml_set_attr_d(x_out, "lag", argv[6]); switch_xml_set_attr_d(x_out, "outbound-call-count", argv[10]); switch_xml_set_attr_d(x_out, "outbound-fail-count", argv[11]); - switch_xml_set_attr_d(x_out, "taking-calls", argv[12]); - switch_xml_set_attr_d(x_out, "status", argv[13]); + switch_xml_set_attr_d(x_out, "taking-calls", argv[13]); + switch_xml_set_attr_d(x_out, "status", argv[14]); switch_xml_set_attr_d(x_out, "next-available", expires); switch_xml_set_txt_d(x_out, argv[2]); @@ -1764,6 +2237,82 @@ static int xml_hash(switch_xml_t xml, switch_hash_t *hash, char *container, char return cc_off; } + +static int xml_caller(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose) +{ + switch_xml_t x_tmp, x_caller, x_cp, variables; + int i, x; + switch_core_session_t *session; + switch_channel_t *channel; + + x_tmp = switch_xml_add_child_d(xml, container, cc_off++); + switch_assert(x_tmp); + + for (x = 0; x < MAX_PRI; x++) { + fifo_queue_t *q = node->fifo_list[x]; + + switch_mutex_lock(q->mutex); + + for (i = 0; i < q->idx; i++) { + + int c_off = 0, d_off = 0; + const char *status; + const char *ts; + const char *uuid = switch_event_get_header(q->data[i], "unique-id"); + + if (!uuid) { + continue; + } + + if (!(session = switch_core_session_locate(uuid))) { + continue; + } + + channel = switch_core_session_get_channel(session); + x_caller = switch_xml_add_child_d(x_tmp, tag, c_off++); + switch_assert(x_caller); + + switch_xml_set_attr_d(x_caller, "uuid", switch_core_session_get_uuid(session)); + + if ((status = switch_channel_get_variable(channel, "fifo_status"))) { + switch_xml_set_attr_d(x_caller, "status", status); + } + + if ((ts = switch_channel_get_variable(channel, "fifo_timestamp"))) { + switch_xml_set_attr_d(x_caller, "timestamp", ts); + } + + if ((ts = switch_channel_get_variable(channel, "fifo_target"))) { + switch_xml_set_attr_d(x_caller, "target", ts); + } + + if ((ts = switch_channel_get_variable(channel, "fifo_position"))) { + switch_xml_set_attr_d(x_caller, "position", ts); + } + + if (!(x_cp = switch_xml_add_child_d(x_caller, "caller_profile", d_off++))) { + abort(); + } + if (verbose) { + d_off += switch_ivr_set_xml_profile_data(x_cp, switch_channel_get_caller_profile(channel), d_off); + + if (!(variables = switch_xml_add_child_d(x_caller, "variables", c_off++))) { + abort(); + } + + switch_ivr_set_xml_chan_vars(variables, channel, c_off); + } + + switch_core_session_rwunlock(session); + session = NULL; + } + + switch_mutex_unlock(q->mutex); + } + + return cc_off; +} + static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int verbose) { switch_xml_t x_fifo; @@ -1784,8 +2333,10 @@ static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int ve switch_snprintf(tmp, sizeof(buffer), "%u", node->importance); switch_xml_set_attr_d(x_fifo, "importance", tmp); + switch_xml_set_attr_d(x_fifo, "outbound_strategy", strat_parse(node->outbound_strategy)); + cc_off = xml_outbound(x_fifo, node, "outbound", "member", cc_off, verbose); - cc_off = xml_hash(x_fifo, node->caller_hash, "callers", "caller", cc_off, verbose); + cc_off = xml_caller(x_fifo, node, "callers", "caller", cc_off, verbose); cc_off = xml_hash(x_fifo, node->consumer_hash, "consumers", "consumer", cc_off, verbose); } @@ -1965,6 +2516,10 @@ static switch_status_t load_config(int reload, int del_all) var = (char *) switch_xml_attr_soft(param, "name"); val = (char *) switch_xml_attr_soft(param, "value"); + if (!strcasecmp(var, "outbound-strategy") && !zstr(val)) { + default_strategy = parse_strat(val); + } + if (!strcasecmp(var, "odbc-dsn") && !zstr(val)) { if (switch_odbc_available()) { globals.odbc_dsn = switch_core_strdup(globals.pool, val); @@ -2024,7 +2579,7 @@ static switch_status_t load_config(int reload, int del_all) if ((fifos = switch_xml_child(cfg, "fifos"))) { for (fifo = switch_xml_child(fifos, "fifo"); fifo; fifo = fifo->next) { - const char *name; + const char *name, *outbound_strategy; const char *importance; int imp = 0; int simo_i = 1; @@ -2033,6 +2588,7 @@ static switch_status_t load_config(int reload, int del_all) fifo_node_t *node; name = switch_xml_attr(fifo, "name"); + outbound_strategy = switch_xml_attr(fifo, "outbound_strategy"); if ((importance = switch_xml_attr(fifo, "importance"))) { if ((imp = atoi(importance)) < 0) { @@ -2055,6 +2611,10 @@ static switch_status_t load_config(int reload, int del_all) switch_mutex_lock(node->mutex); + if (outbound_strategy) { + node->outbound_strategy = parse_strat(outbound_strategy); + } + for (member = switch_xml_child(fifo, "member"); member; member = member->next) { const char *simo = switch_xml_attr_soft(member, "simo"); const char *lag = switch_xml_attr_soft(member, "lag"); @@ -2113,7 +2673,8 @@ static switch_status_t load_config(int reload, int del_all) if (reload) { switch_hash_index_t *hi; - void *val, *pop; + void *val; + switch_event_t *pop; fifo_node_t *node; switch_mutex_lock(globals.mutex); top: @@ -2131,13 +2692,12 @@ static switch_status_t load_config(int reload, int del_all) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name); switch_thread_rwlock_wrlock(node->rwlock); for (x = 0; x < MAX_PRI; x++) { - while (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS) { - free(pop); + while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) { + switch_event_destroy(&pop); } } switch_core_hash_delete(globals.fifo_hash, node->name); - switch_core_hash_destroy(&node->caller_hash); switch_core_hash_destroy(&node->consumer_hash); switch_thread_rwlock_unlock(node->rwlock); switch_core_destroy_memory_pool(&node->pool); @@ -2361,7 +2921,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load) SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) { switch_hash_index_t *hi; - void *val, *pop; + void *val; + switch_event_t *pop = NULL; fifo_node_t *node; switch_memory_pool_t *pool = globals.pool; switch_mutex_t *mutex = globals.mutex; @@ -2385,13 +2946,12 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) switch_thread_rwlock_wrlock(node->rwlock); for (x = 0; x < MAX_PRI; x++) { - while (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS) { - free(pop); + while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) { + switch_event_destroy(&pop); } } switch_core_hash_delete(globals.fifo_hash, node->name); - switch_core_hash_destroy(&node->caller_hash); switch_core_hash_destroy(&node->consumer_hash); switch_thread_rwlock_unlock(node->rwlock); switch_core_destroy_memory_pool(&node->pool);