diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index 47d3cb9aa5..d26ddf0a8e 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -291,6 +291,7 @@ struct fifo_node { switch_mutex_t *mutex; fifo_queue_t *fifo_list[MAX_PRI]; switch_hash_t *consumer_hash; + int outbound_priority; int caller_count; int consumer_count; int ring_consumer_count; @@ -567,6 +568,8 @@ static struct { int node_thread_running; switch_odbc_handle_t *master_odbc; int threads; + switch_thread_t *node_thread; + int debug; } globals; @@ -848,6 +851,85 @@ struct callback_helper { int ready; }; +static void do_unbridge(switch_core_session_t *consumer_session, switch_core_session_t *caller_session) +{ + switch_channel_t *consumer_channel = switch_core_session_get_channel(consumer_session); + switch_channel_t *caller_channel = NULL; + + if (caller_session) { + caller_channel = switch_core_session_get_channel(caller_session); + } + + if (switch_channel_test_app_flag(consumer_channel, FIFO_APP_BRIDGE_TAG)) { + char date[80] = ""; + switch_time_exp_t tm; + switch_time_t ts = switch_micro_time_now(); + switch_size_t retsize; + long epoch_start = 0, epoch_end = 0; + const char *epoch_start_a = NULL; + char *sql; + switch_event_t *event; + + switch_channel_clear_app_flag(consumer_channel, FIFO_APP_BRIDGE_TAG); + switch_channel_set_variable(consumer_channel, "fifo_bridged", NULL); + + ts = switch_micro_time_now(); + switch_time_exp_lt(&tm, ts); + switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm); + + sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(consumer_session)); + fifo_execute_sql(sql, globals.sql_mutex); + switch_safe_free(sql); + + + + switch_channel_set_variable(consumer_channel, "fifo_status", "WAITING"); + switch_channel_set_variable(consumer_channel, "fifo_timestamp", date); + + if (caller_channel) { + switch_channel_set_variable(caller_channel, "fifo_status", "DONE"); + switch_channel_set_variable(caller_channel, "fifo_timestamp", date); + } + + if ((epoch_start_a = switch_channel_get_variable(consumer_channel, "fifo_epoch_start_bridge"))) { + epoch_start = atol(epoch_start_a); + } + + epoch_end = (long)switch_epoch_time_now(NULL); + + switch_channel_set_variable_printf(consumer_channel, "fifo_epoch_stop_bridge", "%ld", epoch_end); + switch_channel_set_variable_printf(consumer_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start); + + if (caller_channel) { + switch_channel_set_variable_printf(caller_channel, "fifo_epoch_stop_bridge", "%ld", epoch_end); + switch_channel_set_variable_printf(caller_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start); + } + + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { + switch_channel_event_set_data(consumer_channel, event); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-stop"); + switch_event_fire(&event); + } + + if (caller_channel) { + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { + switch_channel_event_set_data(caller_channel, event); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-caller-stop"); + switch_event_fire(&event); + } + } + + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { + switch_channel_event_set_data(consumer_channel, event); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_stop"); + switch_event_fire(&event); + } + } +} + static switch_status_t messagehook (switch_core_session_t *session, switch_core_session_message_t *msg) { @@ -1006,73 +1088,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_ break; case SWITCH_MESSAGE_INDICATE_UNBRIDGE: { - if (switch_channel_test_app_flag(consumer_channel, FIFO_APP_BRIDGE_TAG)) { - char date[80] = ""; - switch_time_exp_t tm; - switch_time_t ts = switch_micro_time_now(); - switch_size_t retsize; - long epoch_start = 0, epoch_end = 0; - const char *epoch_start_a = NULL; - - switch_channel_clear_app_flag(consumer_channel, FIFO_APP_BRIDGE_TAG); - switch_channel_set_variable(consumer_channel, "fifo_bridged", NULL); - - ts = switch_micro_time_now(); - switch_time_exp_lt(&tm, ts); - switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm); - - sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(consumer_session)); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); - - - - switch_channel_set_variable(consumer_channel, "fifo_status", "WAITING"); - switch_channel_set_variable(consumer_channel, "fifo_timestamp", date); - - if (caller_channel) { - switch_channel_set_variable(caller_channel, "fifo_status", "DONE"); - switch_channel_set_variable(caller_channel, "fifo_timestamp", date); - } - - if ((epoch_start_a = switch_channel_get_variable(consumer_channel, "fifo_epoch_start_bridge"))) { - epoch_start = atol(epoch_start_a); - } - - epoch_end = (long)switch_epoch_time_now(NULL); - - switch_channel_set_variable_printf(consumer_channel, "fifo_epoch_stop_bridge", "%ld", epoch_end); - switch_channel_set_variable_printf(consumer_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start); - - if (caller_channel) { - switch_channel_set_variable_printf(caller_channel, "fifo_epoch_stop_bridge", "%ld", epoch_end); - switch_channel_set_variable_printf(caller_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start); - } - - if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { - switch_channel_event_set_data(consumer_channel, event); - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME); - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-stop"); - switch_event_fire(&event); - } - - if (caller_channel) { - if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { - switch_channel_event_set_data(caller_channel, event); - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME); - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-caller-stop"); - switch_event_fire(&event); - } - } - - if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { - switch_channel_event_set_data(consumer_channel, event); - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME); - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "consumer_stop"); - switch_event_fire(&event); - } - } - + do_unbridge(consumer_session, caller_session); } break; default: @@ -1705,6 +1721,7 @@ static void find_consumers(fifo_node_t *node) static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj) { fifo_node_t *node; + int cur_priority = 1; globals.node_thread_running = 1; @@ -1712,31 +1729,47 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o switch_hash_index_t *hi; void *val; const void *var; - int ppl_waiting, consumer_total, idle_consumers; - + int ppl_waiting, consumer_total, idle_consumers, found = 0; + switch_mutex_lock(globals.mutex); + + if (globals.debug) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority); + } + for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); if ((node = (fifo_node_t *) val)) { - if (node->has_outbound && node->ready && !node->busy) { + if (node->has_outbound && node->ready && !node->busy && node->outbound_priority == cur_priority) { ppl_waiting = node_consumer_wait_count(node); consumer_total = node->consumer_count; idle_consumers = node_idle_consumers(node); - - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG5, - //"%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d\n", - //node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count); + if (globals.debug) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, + "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d pri %d\n", + node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count, node->outbound_priority); + } + + if ((ppl_waiting - node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) { + found++; find_consumers(node); switch_yield(1000000); } } } } - switch_mutex_unlock(globals.mutex); - switch_yield(1000000); + if (++cur_priority > 10) { + cur_priority = 1; + } + + switch_mutex_unlock(globals.mutex); + + if (cur_priority == 1) { + switch_yield(1000000); + } } globals.node_thread_running = 0; @@ -1746,28 +1779,20 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o static void start_node_thread(switch_memory_pool_t *pool) { - switch_thread_t *thread; switch_threadattr_t *thd_attr = NULL; switch_threadattr_create(&thd_attr, pool); - switch_threadattr_detach_set(thd_attr, 1); + //switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_thread_create(&thread, thd_attr, node_thread_run, pool, pool); + switch_thread_create(&globals.node_thread, thd_attr, node_thread_run, pool, pool); } static int stop_node_thread(void) { - int sanity = 20; + switch_status_t st = SWITCH_STATUS_SUCCESS; - if (globals.node_thread_running) { - globals.node_thread_running = -1; - while (globals.node_thread_running) { - switch_yield(500000); - if (!--sanity) { - return -1; - } - } - } + globals.node_thread_running = -1; + switch_thread_join(&st, globals.node_thread); return 0; } @@ -1940,14 +1965,19 @@ SWITCH_STANDARD_API(fifo_add_outbound_function) } -static void dec_use_count(switch_channel_t *channel) +static void dec_use_count(switch_core_session_t *session) { char *sql; const char *outbound_id; switch_event_t *event; long now = (long) switch_epoch_time_now(NULL); + switch_channel_t *channel = switch_core_session_get_channel(session); + + do_unbridge(session, NULL); if ((outbound_id = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s untracking call on uuid %s!\n", switch_channel_get_name(channel), outbound_id); + del_bridge_call(outbound_id); sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag + 1 where use_count > 0 and uuid='%q'", now, now, outbound_id); @@ -1962,8 +1992,6 @@ static void dec_use_count(switch_channel_t *channel) switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-stop"); switch_event_fire(&event); } - - } static switch_status_t hanguphook(switch_core_session_t *session) @@ -1972,7 +2000,7 @@ static switch_status_t hanguphook(switch_core_session_t *session) switch_channel_state_t state = switch_channel_get_state(channel); if (state == CS_HANGUP) { - dec_use_count(channel); + dec_use_count(session); switch_core_event_hook_remove_state_change(session, hanguphook); } @@ -1996,6 +2024,9 @@ SWITCH_STANDARD_APP(fifo_track_call_function) return; } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s tracking call on uuid %s!\n", switch_channel_get_name(channel), data); + add_bridge_call(data); switch_channel_set_app_flag(channel, FIFO_APP_TRACKING); @@ -2105,7 +2136,7 @@ SWITCH_STANDARD_APP(fifo_function) const char *serviced_uuid = NULL; if (switch_core_event_hook_remove_receive_message(session, messagehook) == SWITCH_STATUS_SUCCESS) { - dec_use_count(channel); + dec_use_count(session); switch_core_event_hook_remove_state_change(session, hanguphook); } @@ -3464,6 +3495,9 @@ static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int ve switch_snprintf(tmp, sizeof(buffer), "%u", node->outbound_per_cycle); switch_xml_set_attr_d(x_fifo, "outbound_per_cycle", tmp); + switch_snprintf(tmp, sizeof(buffer), "%u", node->outbound_priority); + switch_xml_set_attr_d(x_fifo, "outbound_priority", tmp); + switch_xml_set_attr_d(x_fifo, "outbound_strategy", strat_parse(node->outbound_strategy)); cc_off = xml_outbound(x_fifo, node, "outbound", "member", cc_off, verbose); @@ -3496,12 +3530,20 @@ SWITCH_STANDARD_API(fifo_api_function) switch_assert(data); } - if (zstr(cmd) || (argc = switch_separate_string(data, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) < 1 || !argv[0]) { - stream->write_function(stream, "%s\n", FIFO_API_SYNTAX); - return SWITCH_STATUS_SUCCESS; - } switch_mutex_lock(globals.mutex); + + if (zstr(cmd) || (argc = switch_separate_string(data, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) < 1 || !argv[0]) { + stream->write_function(stream, "%s\n", FIFO_API_SYNTAX); + goto done; + } + + if (!strcasecmp(argv[0], "debug")) { + globals.debug = !globals.debug; + stream->write_function(stream, "debug %s\n", globals.debug ? "on" : "off"); + goto done; + } + verbose = !strcasecmp(argv[0], "list_verbose"); if (!strcasecmp(argv[0], "reparse")) { @@ -3601,6 +3643,8 @@ SWITCH_STANDARD_API(fifo_api_function) done: + switch_safe_free(data); + switch_mutex_unlock(globals.mutex); return SWITCH_STATUS_SUCCESS; } @@ -3780,7 +3824,7 @@ static switch_status_t load_config(int reload, int del_all) for (fifo = switch_xml_child(fifos, "fifo"); fifo; fifo = fifo->next) { const char *name, *outbound_strategy; const char *val; - int imp = 0, outbound_per_cycle = 1; + int imp = 0, outbound_per_cycle = 1, outbound_priority = 5; int simo_i = 1; int taking_calls_i = 1; int timeout_i = 60; @@ -3812,6 +3856,14 @@ static switch_status_t load_config(int reload, int del_all) outbound_per_cycle = 0; } } + + if ((val = switch_xml_attr(fifo, "outbound_priority"))) { + outbound_priority = atoi(val); + + if (outbound_priority < 1 || outbound_priority > 10) { + outbound_priority = 5; + } + } switch_mutex_lock(globals.mutex); if (!(node = switch_core_hash_find(globals.fifo_hash, name))) { @@ -3829,6 +3881,7 @@ static switch_status_t load_config(int reload, int del_all) switch_mutex_lock(node->mutex); node->outbound_per_cycle = outbound_per_cycle; + node->outbound_priority = outbound_priority; if (outbound_strategy) { diff --git a/src/mod/endpoints/mod_sofia/sofia_glue.c b/src/mod/endpoints/mod_sofia/sofia_glue.c index d31ad9af8f..e0e67ba40b 100644 --- a/src/mod/endpoints/mod_sofia/sofia_glue.c +++ b/src/mod/endpoints/mod_sofia/sofia_glue.c @@ -4920,6 +4920,7 @@ int sofia_glue_init_sql(sofia_profile_t *profile) switch_odbc_handle_exec(odbc_dbh, dialog_sql, NULL, NULL); } + free(test_sql); test_sql = switch_mprintf("delete from sip_presence where hostname='%q' ", mod_sofia_globals.hostname); if (switch_odbc_handle_exec(odbc_dbh, test_sql, NULL, NULL) != SWITCH_ODBC_SUCCESS) { diff --git a/src/switch_channel.c b/src/switch_channel.c index 8a6e24bdd3..254c82f4d4 100644 --- a/src/switch_channel.c +++ b/src/switch_channel.c @@ -536,6 +536,9 @@ SWITCH_DECLARE(void) switch_channel_uninit(switch_channel_t *channel) switch_safe_free(pop); } switch_core_hash_destroy(&channel->private_hash); + if (channel->app_flag_hash) { + switch_core_hash_destroy(&channel->app_flag_hash); + } switch_mutex_lock(channel->profile_mutex); switch_event_destroy(&channel->variables); switch_mutex_unlock(channel->profile_mutex); diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index 810de2ddda..0ea4150b15 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -425,11 +425,7 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t switch (dbh->type) { case SCDB_TYPE_ODBC: { - switch_odbc_statement_handle_t stmt = NULL; - if ((status = switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, sql, &stmt, NULL)) != SWITCH_STATUS_SUCCESS) { - errmsg = switch_odbc_handle_get_error(dbh->native_handle.odbc_dbh, stmt); - } - switch_odbc_statement_handle_free(&stmt); + status = switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, sql, NULL, &errmsg); } break; case SCDB_TYPE_CORE_DB: diff --git a/src/switch_odbc.c b/src/switch_odbc.c index 92a551ff1e..c853e14c4c 100644 --- a/src/switch_odbc.c +++ b/src/switch_odbc.c @@ -160,6 +160,11 @@ static int db_is_up(switch_odbc_handle_t *handle) strcpy((char *) sql, "select 1"); } + if (stmt) { + SQLFreeHandle(SQL_HANDLE_STMT, stmt); + stmt = NULL; + } + if (SQLAllocHandle(SQL_HANDLE_STMT, handle->con, &stmt) != SQL_SUCCESS) { code = __LINE__; goto error; @@ -370,12 +375,12 @@ SWITCH_DECLARE(switch_odbc_status_t) switch_odbc_handle_exec_string(switch_odbc_ SQLGetData(stmt, 1, SQL_C_CHAR, (SQLCHAR *) resbuf, (SQLLEN) len, NULL); sstatus = SWITCH_ODBC_SUCCESS; - } else { - return sstatus; } - done: + done: + switch_odbc_statement_handle_free(&stmt); + return sstatus; #else return SWITCH_ODBC_FAIL;