diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index ad8d936976..f8fa2a3b78 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -36,8 +36,10 @@ SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL); #define FIFO_EVENT "fifo::info" +static switch_status_t load_config(int reload, int del_all); #define MAX_PRI 10 + struct fifo_node { char *name; switch_mutex_t *mutex; @@ -48,6 +50,9 @@ struct fifo_node { int consumer_count; switch_time_t start_waiting; uint32_t importance; + switch_thread_rwlock_t *rwlock; + switch_memory_pool_t *pool; + int has_outbound; }; typedef struct fifo_node fifo_node_t; @@ -240,11 +245,116 @@ static switch_status_t consumer_read_frame_callback(switch_core_session_t *sessi static struct { switch_hash_t *fifo_hash; switch_mutex_t *mutex; + switch_mutex_t *sql_mutex; switch_memory_pool_t *pool; int running; switch_event_node_t *node; + char hostname[256]; + char *dbname; + char *odbc_dsn; + int node_thread_running; + +#ifdef SWITCH_HAVE_ODBC + switch_odbc_handle_t *master_odbc; +#else + void *filler1; +#endif } globals; + +static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex) +{ + switch_core_db_t *db; + switch_status_t status = SWITCH_STATUS_SUCCESS; + char *err_str; + + if (mutex) { + switch_mutex_lock(mutex); + } +#ifdef SWITCH_HAVE_ODBC + if (globals.odbc_dsn) { + SQLHSTMT stmt; + if (switch_odbc_handle_exec(globals.master_odbc, sql, &stmt) != SWITCH_ODBC_SUCCESS) { + + err_str = switch_odbc_handle_get_error(globals.master_odbc, stmt); + if (!switch_strlen_zero(err_str)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, switch_str_nil(err_str)); + } + switch_safe_free(err_str); + status = SWITCH_STATUS_FALSE; + } + SQLFreeHandle(SQL_HANDLE_STMT, stmt); + } else { +#endif + if (!(db = switch_core_db_open_file(globals.dbname))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB %s\n", globals.dbname); + status = SWITCH_STATUS_FALSE; + goto end; + } + + err_str = NULL; + switch_core_db_exec(db, sql, NULL, NULL, &err_str); + if (err_str) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error [%s]\n[%s]\n", sql, err_str); + free(err_str); + } + + switch_core_db_close(db); + +#ifdef SWITCH_HAVE_ODBC + } +#endif + + end: + if (mutex) { + switch_mutex_unlock(mutex); + } + + return status; +} + +static switch_bool_t fifo_execute_sql_callback(switch_mutex_t *mutex, char *sql, switch_core_db_callback_func_t callback, void *pdata) +{ + switch_bool_t ret = SWITCH_FALSE; + switch_core_db_t *db; + char *errmsg = NULL; + + if (mutex) { + switch_mutex_lock(mutex); + } + +#ifdef SWITCH_HAVE_ODBC + if (globals.odbc_dsn) { + switch_odbc_handle_callback_exec(globals.master_odbc, sql, callback, pdata); + } else { +#endif + if (!(db = switch_core_db_open_file(globals.dbname))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB %s\n", globals.dbname); + goto end; + } + + switch_core_db_exec(db, sql, callback, pdata, &errmsg); + + if (errmsg) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg); + free(errmsg); + } + + if (db) { + switch_core_db_close(db); + } +#ifdef SWITCH_HAVE_ODBC + } +#endif + + end: + if (mutex) { + switch_mutex_unlock(mutex); + } + + return ret; +} + static fifo_node_t *create_node(const char *name, uint32_t importance) { fifo_node_t *node; @@ -255,22 +365,223 @@ static fifo_node_t *create_node(const char *name, uint32_t importance) } node = switch_core_alloc(globals.pool, sizeof(*node)); - node->name = switch_core_strdup(globals.pool, name); + node->pool = globals.pool; + + node->name = switch_core_strdup(node->pool, name); for (x = 0; x < MAX_PRI; x++) { - switch_queue_create(&node->fifo_list[x], SWITCH_CORE_QUEUE_LEN, globals.pool); + switch_queue_create(&node->fifo_list[x], SWITCH_CORE_QUEUE_LEN, node->pool); switch_assert(node->fifo_list[x]); } - switch_core_hash_init(&node->caller_hash, globals.pool); - switch_core_hash_init(&node->consumer_hash, globals.pool); - - switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, globals.pool); - switch_core_hash_insert(globals.fifo_hash, name, node); - + switch_core_hash_init(&node->caller_hash, node->pool); + switch_core_hash_init(&node->consumer_hash, node->pool); + switch_thread_rwlock_create(&node->rwlock, node->pool); + switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool); node->importance = importance; - + switch_core_hash_insert(globals.fifo_hash, name, node); return node; } +static int node_idle_consumers(fifo_node_t *node) +{ + switch_hash_index_t *hi; + void *val; + const void *var; + switch_core_session_t *session; + switch_channel_t *channel; + int total = 0; + + switch_mutex_lock(node->mutex); + for (hi = switch_hash_first(NULL, node->consumer_hash); hi; hi = switch_hash_next(hi)) { + switch_hash_this(hi, &var, NULL, &val); + session = (switch_core_session_t *) val; + channel = switch_core_session_get_channel(session); + if (!switch_channel_test_flag(channel, CF_BRIDGED)) { + total++; + } + } + switch_mutex_unlock(node->mutex); + + return total; + +} + +static switch_status_t hanguphook(switch_core_session_t *session) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + switch_channel_state_t state = switch_channel_get_state(channel); + const char *uuid = NULL; + char sql[256] = ""; + + if (state == CS_HANGUP || state == CS_ROUTING) { + if ((uuid = switch_channel_get_variable(channel, "fifo_outbound_uuid"))) { + switch_snprintf(sql, sizeof(sql), "update fifo_outbound set use_count=use_count-1, outbound_call_count=outbound_call_count+1, next_avail=%ld + lag where uuid='%s'", + (long)switch_timestamp(NULL), uuid); + + fifo_execute_sql(sql, globals.sql_mutex); + } + switch_core_event_hook_remove_state_change(session, hanguphook); + } + return SWITCH_STATUS_SUCCESS; +} + +struct call_helper { + char *uuid; + char *node_name; + char *originate_string; + int timeout; + switch_memory_pool_t *pool; +}; + +static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) +{ + struct call_helper *h = (struct call_helper *) obj; + + switch_core_session_t *session = NULL; + switch_channel_t *channel; + switch_call_cause_t cause = SWITCH_CAUSE_NONE; + switch_caller_extension_t *extension = NULL; + char *app_name, *arg = NULL; + char sql[256] = ""; + + switch_snprintf(sql, sizeof(sql), "update fifo_outbound set use_count=use_count+1 where uuid='%s'", h->uuid); + fifo_execute_sql(sql, globals.sql_mutex); + + if (switch_ivr_originate(NULL, &session, &cause, h->originate_string, h->timeout, NULL, NULL, NULL, NULL, SOF_NONE) != SWITCH_STATUS_SUCCESS) { + switch_snprintf(sql, sizeof(sql), "update fifo_outbound set use_count=use_count-1, outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag where uuid='%s'", + (long)switch_timestamp(NULL), h->uuid); + fifo_execute_sql(sql, globals.sql_mutex); + goto end; + } + + + channel = switch_core_session_get_channel(session); + switch_channel_set_variable(channel, "fifo_outbound_uuid", h->uuid); + switch_core_event_hook_add_state_change(session, hanguphook); + app_name = "fifo"; + arg = switch_core_session_sprintf(session, "%s out wait", h->node_name); + extension = switch_caller_extension_new(session, app_name, arg); + switch_caller_extension_add_application(session, extension, app_name, arg); + switch_channel_set_caller_extension(channel, extension); + switch_channel_set_state(channel, CS_EXECUTE); + switch_core_session_rwunlock(session); + + end: + + switch_core_destroy_memory_pool(&h->pool); + + return NULL; +} + +static int place_call_callback(void *pArg, int argc, char **argv, char **columnNames) +{ + + int *need = (int *) pArg; + + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + switch_memory_pool_t *pool; + struct call_helper *h; + + switch_core_new_memory_pool(&pool); + h = switch_core_alloc(pool, sizeof(*h)); + h->pool = pool; + h->uuid = switch_core_strdup(h->pool, argv[0]); + h->node_name = switch_core_strdup(h->pool, argv[1]); + h->originate_string = switch_core_strdup(h->pool, argv[2]); + h->timeout = atoi(argv[5]); + + + switch_threadattr_create(&thd_attr, h->pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, o_thread_run, h, h->pool); + + (*need)--; + + return *need ? 0 : -1; +} + +static void find_consumers(fifo_node_t *node) +{ + int need = node_consumer_wait_count(node); + char *sql; + + sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname " + "from fifo_outbound where (use_count < simo_count) and (next_avail = 0 or next_avail <= %ld) order by outbound_call_count", + (long) switch_timestamp(NULL)); + switch_assert(sql); + fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_callback, &need); + free(sql); +} + +static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj) +{ + fifo_node_t *node; + + globals.node_thread_running = 1; + + while(globals.node_thread_running == 1) { + switch_hash_index_t *hi; + void *val; + const void *var; + int ppl_waiting, consumer_total, idle_consumers; + + switch_mutex_lock(globals.mutex); + for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { + switch_hash_this(hi, &var, NULL, &val); + if ((node = (fifo_node_t *) val) && node->has_outbound) { + switch_mutex_lock(node->mutex); + ppl_waiting = node_consumer_wait_count(node); + consumer_total = node->consumer_count; + idle_consumers = node_idle_consumers(node); + + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, + //"%s waiting %d consumer_total %d idle_consumers %d\n", node->name, ppl_waiting, consumer_total, idle_consumers); + + if (ppl_waiting && (!consumer_total || !idle_consumers)) { + find_consumers(node); + } + switch_mutex_unlock(node->mutex); + } + } + switch_mutex_unlock(globals.mutex); + + switch_yield(1000000); + } + + globals.node_thread_running = 0; + + return NULL; +} + +static void start_node_thread(switch_memory_pool_t *pool) +{ + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + + switch_threadattr_create(&thd_attr, pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, node_thread_run, pool, pool); +} + +static int stop_node_thread(void) +{ + int sanity = 20; + + if (globals.node_thread_running) { + globals.node_thread_running = -1; + while(globals.node_thread_running) { + switch_yield(500000); + if (!--sanity) { + return -1; + } + } + } + + return 0; +} + static void send_presence(fifo_node_t *node) { switch_event_t *event; @@ -509,6 +820,14 @@ SWITCH_STANDARD_APP(fifo_function) node->start_waiting = switch_timestamp_now(); } + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { + switch_channel_event_set_data(channel, event); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "push"); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Slot", "%d", p); + switch_event_fire(&event); + } + switch_queue_push(node->fifo_list[p], (void *) strdup(uuid)); if (!pri) { @@ -524,14 +843,6 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_set_variable(channel, "fifo_status", "WAITING"); switch_channel_set_variable(channel, "fifo_timestamp", date); - if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { - switch_channel_event_set_data(channel, event); - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", argv[0]); - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "push"); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Slot", "%d", p); - switch_event_fire(&event); - } - switch_channel_set_flag(channel, CF_TAGGED); if (chime_list) { @@ -610,6 +921,7 @@ SWITCH_STANDARD_APP(fifo_function) if (!aborted && switch_channel_ready(channel)) { switch_channel_set_state(channel, CS_HIBERNATE); + return; } else { ts = switch_timestamp_now(); switch_time_exp_lt(&tm, ts); @@ -866,7 +1178,26 @@ SWITCH_STANDARD_APP(fifo_function) } if (!(switch_channel_ready(channel))) { - switch_channel_hangup(other_channel, SWITCH_CAUSE_NORMAL_CLEARING); + const char *app = switch_channel_get_variable(other_channel, "current_application"); + const char *arg = switch_channel_get_variable(other_channel, "current_application_data"); + switch_caller_extension_t *extension = NULL; + + switch_mutex_lock(node->mutex); + node->caller_count--; + switch_core_hash_delete(node->caller_hash, uuid); + switch_mutex_unlock(node->mutex); + send_presence(node); + + + if (app) { + extension = switch_caller_extension_new(other_session, app, arg); + switch_caller_extension_add_application(other_session, extension, app, arg); + switch_channel_set_caller_extension(other_channel, extension); + switch_channel_set_state(other_channel, CS_EXECUTE); + } else { + switch_channel_hangup(other_channel, SWITCH_CAUSE_NORMAL_CLEARING); + } + switch_core_session_rwunlock(other_session); break; } @@ -933,7 +1264,10 @@ SWITCH_STANDARD_APP(fifo_function) sfifo_consumer_wrapup_time = switch_channel_get_variable(channel, "fifo_consumer_wrapup_time"); if (!switch_strlen_zero(sfifo_consumer_wrapup_time)){ fifo_consumer_wrapup_time = atoi(sfifo_consumer_wrapup_time); + } else { + fifo_consumer_wrapup_time = 5000; } + memset(buf, 0, sizeof(buf)); if (fifo_consumer_wrapup_time || !switch_strlen_zero(fifo_consumer_wrapup_key)) { @@ -976,7 +1310,7 @@ SWITCH_STANDARD_APP(fifo_function) } } - } else if (fifo_consumer_wrapup_time && !strcmp(buf, fifo_consumer_wrapup_key)) { + } else if (fifo_consumer_wrapup_time && (switch_strlen_zero(fifo_consumer_wrapup_key) || !strcmp(buf, fifo_consumer_wrapup_key))) { while(switch_channel_ready(channel)) { wrapup_time_elapsed = (switch_timestamp_now() - wrapup_time_started) / 1000; if (wrapup_time_elapsed > fifo_consumer_wrapup_time) { @@ -1025,6 +1359,67 @@ SWITCH_STANDARD_APP(fifo_function) } } +struct xml_helper { + switch_xml_t xml; + fifo_node_t *node; + char *container; + char *tag; + int cc_off; + int verbose; +}; + +static int xml_callback(void *pArg, int argc, char **argv, char **columnNames) +{ + struct xml_helper *h = (struct xml_helper *) pArg; + switch_xml_t x_tmp, x_out; + int c_off = 0; + char exp_buf[128] = ""; + switch_time_exp_t tm; + switch_time_t etime = 0; + + if (argv[7]) { + if ((etime = atol(argv[7]))) { + switch_size_t retsize; + + switch_time_exp_lt(&tm, switch_time_from_sec(etime)); + switch_strftime_nocheck(exp_buf, &retsize, sizeof(exp_buf), "%Y-%m-%d %T", &tm); + } else { + switch_set_string(exp_buf, "now"); + } + } + + x_tmp = switch_xml_add_child_d(h->xml, h->container, h->cc_off++); + + x_out = switch_xml_add_child_d(x_tmp, h->tag, c_off++); + switch_xml_set_attr_d(x_out, "timeout", argv[5]); + switch_xml_set_attr_d(x_out, "simo", argv[3]); + switch_xml_set_attr_d(x_out, "lag", argv[6]); + switch_xml_set_attr_d(x_out, "outbound-call-count", argv[10]); + switch_xml_set_attr_d(x_out, "outbound-fail-count", argv[11]); + switch_xml_set_attr_d(x_out, "next-available", exp_buf); + + switch_xml_set_txt_d(x_out, argv[2]); + + return 0; +} + +static int xml_outbound(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose) +{ + struct xml_helper h; + char *sql = "select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname from fifo_outbound"; + + h.xml = xml; + h.node = node; + h.container = container; + h.tag = tag; + h.cc_off = cc_off; + h.verbose = verbose; + + fifo_execute_sql_callback(globals.sql_mutex, sql, xml_callback, &h); + + return h.cc_off; +} + static int xml_hash(switch_xml_t xml, switch_hash_t *hash, char *container, char *tag, int cc_off, int verbose) { switch_xml_t x_tmp, x_caller, x_cp, variables; @@ -1098,7 +1493,8 @@ static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int ve switch_xml_set_attr_d(x_fifo, "waiting_count", tmp); switch_snprintf(tmp, sizeof(buffer), "%u", node->importance); switch_xml_set_attr_d(x_fifo, "importance", tmp); - + + cc_off = xml_outbound(x_fifo, node, "outbound", "member", cc_off, verbose); cc_off = xml_hash(x_fifo, node->caller_hash, "callers", "caller", cc_off, verbose); cc_off = xml_hash(x_fifo, node->consumer_hash, "consumers", "consumer", cc_off, verbose); } @@ -1133,6 +1529,10 @@ SWITCH_STANDARD_API(fifo_api_function) switch_mutex_lock(globals.mutex); verbose = !strcasecmp(argv[0], "list_verbose"); + if (!strcasecmp(argv[0], "reparse")) { + load_config(1, argv[1] && !strcasecmp(argv[1], "del_all")); + } + if (!strcasecmp(argv[0], "list") || verbose) { char *xml_text = NULL; switch_xml_t x_report = switch_xml_new("fifo_report"); @@ -1202,6 +1602,310 @@ SWITCH_STANDARD_API(fifo_api_function) return SWITCH_STATUS_SUCCESS; } + +const char outbound_sql[] = + "create table fifo_outbound (\n" + " uuid varchar(255),\n" + " fifo_name varchar(255),\n" + " originate_string varchar(255),\n" + " simo_count integer,\n" + " use_count integer,\n" + " timeout integer,\n" + " lag integer,\n" + " next_avail integer,\n" + " expires integer,\n" + " static integer,\n" + " outbound_call_count integer," + " outbound_fail_count integer," + " hostname varchar(255)\n" + ");\n"; + + +static switch_status_t load_config(int reload, int del_all) +{ + char *cf = "fifo.conf"; + switch_xml_t cfg, xml, fifo, fifos, member, settings, param; + char *odbc_user = NULL; + char *odbc_pass = NULL; + switch_core_db_t *db; + switch_status_t status = SWITCH_STATUS_SUCCESS; + + gethostname(globals.hostname, sizeof(globals.hostname)); + + if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf); + return SWITCH_STATUS_TERM; + } + + if ((settings = switch_xml_child(cfg, "settings"))) { + for (param = switch_xml_child(settings, "param"); param; param = param->next) { + char *var = NULL; + char *val = NULL; + + var = (char *) switch_xml_attr_soft(param, "name"); + val = (char *) switch_xml_attr_soft(param, "value"); + + if (!strcasecmp(var, "odbc-dsn") && !switch_strlen_zero(val)) { +#ifdef SWITCH_HAVE_ODBC + globals.odbc_dsn = switch_core_strdup(globals.pool, val); + if ((odbc_user = strchr(globals.odbc_dsn, ':'))) { + *odbc_user++ = '\0'; + if ((odbc_pass = strchr(odbc_user, ':'))) { + *odbc_pass++ = '\0'; + } + } +#else + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n"); +#endif + } + } + } + + + if (switch_strlen_zero(globals.odbc_dsn) || switch_strlen_zero(odbc_user) || switch_strlen_zero(odbc_pass)) { + globals.dbname = "fifo"; + } + +#ifdef SWITCH_HAVE_ODBC + if (globals.odbc_dsn) { + if (!(globals.master_odbc = switch_odbc_handle_new(globals.odbc_dsn, odbc_user, odbc_pass))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot Open ODBC Database!\n"); + status = SWITCH_STATUS_FALSE; + goto done; + } + if (switch_odbc_handle_connect(globals.master_odbc) != SWITCH_ODBC_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot Open ODBC Database!\n"); + status = SWITCH_STATUS_FALSE; + goto done; + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Connected ODBC DSN: %s\n", globals.odbc_dsn); + if (switch_odbc_handle_exec(globals.master_odbc, "delete from fifo_outbound", NULL) != SWITCH_STATUS_SUCCESS) { + if (switch_odbc_handle_exec(globals.master_odbc, (char *)outbound_sql, NULL) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot Create SQL Database!\n"); + } + } + } else { +#endif + if ((db = switch_core_db_open_file(globals.dbname))) { + switch_core_db_test_reactive(db, "delete from fifo_outbound", NULL, (char *)outbound_sql); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot Open SQL Database!\n"); + status = SWITCH_STATUS_FALSE; + goto done; + } + switch_core_db_close(db); +#ifdef SWITCH_HAVE_ODBC + } +#endif + + if ((fifos = switch_xml_child(cfg, "fifos"))) { + for (fifo = switch_xml_child(fifos, "fifo"); fifo; fifo = fifo->next) { + const char *name; + const char *importance; + int imp = 0; + int simo_i = 1; + int timeout_i = 60; + int lag_i = 10; + fifo_node_t *node; + char *sql; + + name = switch_xml_attr(fifo, "name"); + + if ((importance = switch_xml_attr(fifo, "importance"))) { + if ((imp = atoi(importance)) < 0) { + imp = 0; + } + } + + if (!name) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "fifo has no name!\n"); + continue; + } + + switch_mutex_lock(globals.mutex); + if (!(node = switch_core_hash_find(globals.fifo_hash, name))) { + node = create_node(name, imp); + } + switch_mutex_unlock(globals.mutex); + + if (del_all) { + sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and hostname='%q'", node->name, globals.hostname); + } else { + sql = switch_mprintf("delete from fifo_outbound where static=1 and fifo_name='%q' and hostname='%q'", node->name, globals.hostname); + } + + fifo_execute_sql(sql, globals.sql_mutex); + switch_safe_free(sql); + + switch_mutex_lock(node->mutex); + + for (member = switch_xml_child(fifo, "member"); member; member = member->next) { + const char *simo = switch_xml_attr_soft(member, "simo"); + const char *lag = switch_xml_attr_soft(member, "lag"); + const char *timeout = switch_xml_attr_soft(member, "timeout"); + + char *sql; + char digest[SWITCH_MD5_DIGEST_STRING_SIZE] = { 0 }; + switch_md5_string(digest, (void *) member->txt, strlen(member->txt)); + + if (simo) { + simo_i = atoi(simo); + } + + if (timeout) { + if ((timeout_i = atoi(timeout)) < 10) { + timeout_i = 60; + } + + } + + if (lag) { + if ((lag_i = atoi(lag)) < 0) { + lag_i = 10; + } + } + + sql = switch_mprintf("insert into fifo_outbound " + "(uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname) " + "values ('%q','%q','%q',%d,%d,%d,%d,0,0,1,0,0,'%q')", + digest, node->name, member->txt, simo_i, 0, timeout_i, lag_i, 0, 0, globals.hostname + ); + switch_assert(sql); + fifo_execute_sql(sql, globals.sql_mutex); + free(sql); + + node->has_outbound = 1; + + } + switch_mutex_unlock(node->mutex); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s started\n", node->name); + + } + } + switch_xml_free(xml); + + done: + + return status; +} + + +static void fifo_member_add(char *fifo_name, char *originate_string, int simo_count, int timeout, int lag, time_t expires) +{ + char digest[SWITCH_MD5_DIGEST_STRING_SIZE] = { 0 }; + char *sql; + fifo_node_t *node = NULL; + + switch_md5_string(digest, (void *) originate_string, strlen(originate_string)); + + sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q'", fifo_name, digest); + switch_assert(sql); + fifo_execute_sql(sql, globals.sql_mutex); + free(sql); + + + switch_mutex_lock(globals.mutex); + if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) { + node = create_node(fifo_name, 0); + } + switch_mutex_unlock(globals.mutex); + + node->has_outbound = 1; + + sql = switch_mprintf("insert into fifo_outbound " + "(uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname) " + "values ('%q','%q','%q',%d,%d,%d,%d,%d,%ld,0,0,0,'%q')", + digest, fifo_name, originate_string, simo_count, 0, timeout, lag, 0, (long)expires, globals.hostname + ); + switch_assert(sql); + fifo_execute_sql(sql, globals.sql_mutex); + free(sql); + +} + +static void fifo_member_del(char *fifo_name, char *originate_string) +{ + char digest[SWITCH_MD5_DIGEST_STRING_SIZE] = { 0 }; + char *sql; + + switch_md5_string(digest, (void *) originate_string, strlen(originate_string)); + + sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q' and hostname='%q'", fifo_name, digest, globals.hostname); + switch_assert(sql); + fifo_execute_sql(sql, globals.sql_mutex); + free(sql); +} + +#define FIFO_MEMBER_API_SYNTAX "[add [] [] [] | del ]" +SWITCH_STANDARD_API(fifo_member_api_function) +{ + char *fifo_name; + char *originate_string; + int simo_count = 1; + int timeout = 60; + int lag = 5; + char *action; + char *mydata = NULL, *argv[8] = { 0 }; + int argc; + time_t expires = 0; + + mydata = strdup(cmd); + switch_assert(mydata); + + argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0]))); + + if (argc < 3) { + stream->write_function(stream, "%s", "-ERR Invalid!\n"); + goto done; + } + + action = argv[0]; + fifo_name = argv[1]; + originate_string = argv[2]; + + if (!strcasecmp(action, "add")) { + if (argc > 3) { + simo_count = atoi(argv[3]); + } + if (argc > 4) { + timeout = atoi(argv[4]); + } + if (argc > 5) { + lag = atoi(argv[5]); + } + if (argc > 6) { + expires = switch_timestamp(NULL) + atoi(argv[6]); + } + if (simo_count < 0) { + simo_count = 1; + } + if (timeout < 0) { + timeout = 60; + } + if (lag < 0) { + lag = 5; + } + + fifo_member_add(fifo_name, originate_string, simo_count, timeout, lag, expires); + stream->write_function(stream, "%s", "+OK\n"); + } else if (!strcasecmp(action, "del")) { + fifo_member_del(fifo_name, originate_string); + stream->write_function(stream, "%s", "+OK\n"); + } else { + stream->write_function(stream, "%s", "-ERR Invalid!\n"); + goto done; + } + + done: + + free(mydata); + + return SWITCH_STATUS_SUCCESS; + +} + SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load) { switch_application_interface_t *app_interface; @@ -1223,11 +1927,13 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load) switch_core_new_memory_pool(&globals.pool); switch_core_hash_init(&globals.fifo_hash, globals.pool); switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool); + switch_mutex_init(&globals.sql_mutex, SWITCH_MUTEX_NESTED, globals.pool); /* connect my internal structure to the blank pointer passed to me */ *module_interface = switch_loadable_module_create_module_interface(pool, modname); SWITCH_ADD_APP(app_interface, "fifo", "Park with FIFO", FIFO_DESC, fifo_function, FIFO_USAGE, SAF_NONE); SWITCH_ADD_API(commands_api_interface, "fifo", "Return data about a fifo", fifo_api_function, FIFO_API_SYNTAX); + SWITCH_ADD_API(commands_api_interface, "fifo_member", "Add members to a fifo", fifo_member_api_function, FIFO_MEMBER_API_SYNTAX); switch_console_set_complete("add fifo list"); switch_console_set_complete("add fifo list_verbose"); switch_console_set_complete("add fifo count"); @@ -1235,6 +1941,9 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load) globals.running = 1; + load_config(0,1); + start_node_thread(globals.pool); + return SWITCH_STATUS_SUCCESS; } @@ -1256,10 +1965,18 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) globals.running = 0; /* Cleanup */ + + if (globals.node_thread_running) { + stop_node_thread(); + } + + for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { int x = 0; switch_hash_this(hi, NULL, NULL, &val); node = (fifo_node_t *) val; + + switch_thread_rwlock_wrlock(node->rwlock); for (x = 0; x < MAX_PRI; x++) { while (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS) { free(pop); @@ -1268,6 +1985,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) switch_core_hash_destroy(&node->caller_hash); switch_core_hash_destroy(&node->consumer_hash); + switch_thread_rwlock_unlock(node->rwlock); } switch_core_hash_destroy(&globals.fifo_hash); memset(&globals, 0, sizeof(globals));