From 61f23283eca8452c6982150eb5e36fa11936bbdc Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Fri, 4 Apr 2008 21:18:16 +0000 Subject: [PATCH] add some goodies to mod_fifo git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@8026 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- conf/dialplan/default.xml | 3 +- src/include/switch_types.h | 3 +- src/mod/applications/mod_fifo/mod_fifo.c | 443 ++++++++++++++---- .../mod_local_stream/mod_local_stream.c | 6 +- src/switch_ivr.c | 6 + src/switch_ivr_play_say.c | 6 + 6 files changed, 364 insertions(+), 103 deletions(-) diff --git a/conf/dialplan/default.xml b/conf/dialplan/default.xml index bf9fc4df7c..b215f77be0 100644 --- a/conf/dialplan/default.xml +++ b/conf/dialplan/default.xml @@ -216,7 +216,8 @@ --> - + + diff --git a/src/include/switch_types.h b/src/include/switch_types.h index 4c91b79bd6..09868da572 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -937,7 +937,8 @@ typedef enum { SWITCH_FILE_PAUSE = (1 << 8), SWITCH_FILE_NATIVE = (1 << 9), SWITCH_FILE_SEEK = (1 << 10), - SWITCH_FILE_OPEN = (1 << 11) + SWITCH_FILE_OPEN = (1 << 11), + SWITCH_FILE_CALLBACK = (1 << 12) } switch_file_flag_t; typedef enum { diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index c12bed5ae1..499c1dc2cd 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -45,8 +45,9 @@ struct fifo_node { switch_hash_t *caller_hash; switch_hash_t *consumer_hash; int caller_count; - int waiting_count; int consumer_count; + switch_time_t start_waiting; + uint32_t importance; }; typedef struct fifo_node fifo_node_t; @@ -103,13 +104,109 @@ static switch_status_t moh_on_dtmf(switch_core_session_t *session, void *input, #define check_string(s) if (!switch_strlen_zero(s) && !strcasecmp(s, "undef")) { s = NULL; } -static switch_status_t read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data) +static int node_consumer_wait_count(fifo_node_t *node) { - fifo_node_t *node = (fifo_node_t *) user_data; - int x = 0, total = 0; + int i, len = 0; - for (x = 0; x < MAX_PRI; x++) { - total += switch_queue_size(node->fifo_list[x]); + for (i = 0; i < MAX_PRI; i++) { + len += switch_queue_size(node->fifo_list[i]); + } + + return len; +} + +static void node_remove_uuid(fifo_node_t *node, const char *uuid) +{ + int i, len = 0; + void *pop = NULL; + + for (i = 0; i < MAX_PRI; i++) { + if (!(len = switch_queue_size(node->fifo_list[i]))) { + continue; + } + while(len) { + if (switch_queue_trypop(node->fifo_list[i], &pop) == SWITCH_STATUS_SUCCESS && pop) { + if (!strcmp((char *)pop, uuid)) { + free(pop); + goto end; + } + switch_queue_push(node->fifo_list[i], pop); + } + len--; + } + } + + end: + + if (!node_consumer_wait_count(node)) { + node->start_waiting = 0; + } + + return; +} + +#define MAX_CHIME 25 +struct fifo_chime_data { + char *list[MAX_CHIME]; + int total; + int index; + time_t next; + int freq; + int abort; + int orbit_timeout; + int do_orbit; + char *orbit_exten; +}; +typedef struct fifo_chime_data fifo_chime_data_t; + + + +static switch_status_t caller_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data) +{ + fifo_chime_data_t *cd = (fifo_chime_data_t *) user_data; + + if (cd && cd->total && switch_timestamp(NULL) >= cd->next) { + if (cd->index == MAX_CHIME || cd->index == cd->total || !cd->list[cd->index]) { + cd->index = 0; + } + + if (cd->list[cd->index]) { + switch_input_args_t args = { 0 }; + char buf[25] = ""; + switch_channel_t *channel = switch_core_session_get_channel(session); + const char *caller_exit_key = switch_channel_get_variable(channel, "fifo_caller_exit_key"); + args.input_callback = moh_on_dtmf; + args.buf = buf; + args.buflen = sizeof(buf); + switch_ivr_play_file(session, NULL, cd->list[cd->index], &args); + if (caller_exit_key && *buf == *caller_exit_key) { + cd->abort = 1; + return SWITCH_STATUS_FALSE; + } + cd->next = switch_timestamp(NULL) + cd->freq; + cd->index++; + } + } else if (cd->orbit_timeout && switch_timestamp(NULL) >= cd->orbit_timeout) { + cd->do_orbit = 1; + return SWITCH_STATUS_FALSE; + } + + return SWITCH_STATUS_SUCCESS; +} + + +static switch_status_t consumer_read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data) +{ + fifo_node_t *node, **node_list = (fifo_node_t **) user_data; + int x = 0, total = 0, i = 0; + + for(i = 0; ; i++) { + if (!(node = node_list[i])) { + break; + } + for (x = 0; x < MAX_PRI; x++) { + total += switch_queue_size(node->fifo_list[x]); + } } if (total) { @@ -127,7 +224,7 @@ static struct { } globals; -static fifo_node_t *create_node(const char *name) +static fifo_node_t *create_node(const char *name, uint32_t importance) { fifo_node_t *node; int x = 0; @@ -148,12 +245,15 @@ static fifo_node_t *create_node(const char *name) switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, globals.pool); switch_core_hash_insert(globals.fifo_hash, name, node); + node->importance = importance; + return node; } static void send_presence(fifo_node_t *node) { switch_event_t *event; + int wait_count = 0; if (!globals.running) { return; @@ -163,8 +263,8 @@ static void send_presence(fifo_node_t *node) switch_event_add_header(event, SWITCH_STACK_BOTTOM, "proto", "%s", "park"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s", node->name); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s", node->name); - if (node->waiting_count > 0) { - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d waiting)", node->waiting_count); + if ((wait_count = node_consumer_wait_count(node)) > 0) { + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d waiting)", wait_count); } else { switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Idle"); } @@ -173,9 +273,9 @@ static void send_presence(fifo_node_t *node) switch_event_add_header(event, SWITCH_STACK_BOTTOM, "alt_event_type", "dialog"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_count", "%d", 0); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "channel-state", "%s", node->waiting_count > 0 ? "CS_RING" : "CS_HANGUP"); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "channel-state", "%s", wait_count > 0 ? "CS_RING" : "CS_HANGUP"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "unique-id", "%s", node->name); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "answer-state", "%s", node->waiting_count > 0 ? "early" : "terminated"); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "answer-state", "%s", wait_count > 0 ? "early" : "terminated"); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "call-direction", "%s", "inbound"); switch_event_fire(&event); } @@ -203,7 +303,7 @@ static void pres_event_handler(switch_event_t *event) switch_mutex_lock(globals.mutex); if (!(node = switch_core_hash_find(globals.fifo_hash, node_name))) { - node = create_node(node_name); + node = create_node(node_name, 0); } switch_mutex_lock(node->mutex); @@ -215,15 +315,16 @@ static void pres_event_handler(switch_event_t *event) switch_safe_free(dup_to); } +#define MAX_NODES_PER_CONSUMER 25 #define FIFO_DESC "Fifo for stacking parked calls." #define FIFO_USAGE " [in [|undef] [|undef] | out [wait|nowait] [|undef] [|undef]]" SWITCH_STANDARD_APP(fifo_function) { int argc; char *mydata = NULL, *argv[5] = { 0 }; - fifo_node_t *node; + fifo_node_t *node = NULL, *node_list[MAX_NODES_PER_CONSUMER+1] = { 0 }; switch_channel_t *channel = switch_core_session_get_channel(session); - int nowait = 0; + int do_wait = 1, node_count = 0, i = 0; const char *moh = NULL; const char *announce = NULL; switch_event_t *event = NULL; @@ -231,6 +332,12 @@ SWITCH_STANDARD_APP(fifo_function) switch_time_exp_t tm; switch_time_t ts = switch_timestamp_now(); switch_size_t retsize; + char *list_string; + int nlist_count; + char *nlist[MAX_NODES_PER_CONSUMER]; + int consumer = 0; + const char *arg_fifo_name = NULL; + const char *arg_inout = NULL; const char *serviced_uuid = NULL; if (!globals.running) { @@ -244,44 +351,105 @@ SWITCH_STANDARD_APP(fifo_function) mydata = switch_core_session_strdup(session, data); switch_assert(mydata); - if ((argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0])))) < 2 || !argv[0]) { + + argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0]))); + arg_fifo_name = argv[0]; + arg_inout = argv[1]; + + if (!arg_fifo_name && arg_inout) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE); return; } + if (!strcasecmp(arg_inout, "out")) { + consumer = 1; + } else if (strcasecmp(arg_inout, "in")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE); + return; + } + + list_string = switch_core_session_strdup(session, arg_fifo_name); + + if (!(nlist_count = switch_separate_string(list_string, ',', nlist, (sizeof(nlist) / sizeof(nlist[0]))))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE); + return; + } + + if (!consumer && nlist_count > 1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE); + return; + } + switch_mutex_lock(globals.mutex); - if (!(node = switch_core_hash_find(globals.fifo_hash, argv[0]))) { - node = create_node(argv[0]); - } + for(i = 0; i < nlist_count; i++) { + int importance = 0; + char *p; + + if ((p = strrchr(nlist[i], '!'))) { + *p++ = '\0'; + importance = atoi(p); + if (importance < 0) { + importance = 0; + } + } + + if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) { + node = create_node(nlist[i], importance); + } + node_list[node_count++] = node; + } switch_mutex_unlock(globals.mutex); moh = switch_channel_get_variable(channel, "fifo_music"); announce = switch_channel_get_variable(channel, "fifo_announce"); + check_string(announce); + check_string(moh); - if (!strcasecmp(argv[1], "in")) { + if (!consumer && node) { switch_core_session_t *other_session; switch_channel_t *other_channel; const char *uuid = strdup(switch_core_session_get_uuid(session)); const char *pri; + char tmp[25] = ""; int p = 0; int aborted = 0; + fifo_chime_data_t cd = { {0} }; + const char *chime_list = switch_channel_get_variable(channel, "fifo_chime_list"); + const char *chime_freq = switch_channel_get_variable(channel, "fifo_chime_freq"); + const char *orbit_var = switch_channel_get_variable(channel, "fifo_orbit_exten"); + const char *orbit_ann = switch_channel_get_variable(channel, "fifo_orbit_announce"); + const char *caller_exit_key = switch_channel_get_variable(channel, "fifo_caller_exit_key"); + int freq = 30; + int ftmp = 0; + int to = 60; + + if (orbit_var) { + char *ot; + if ((cd.orbit_exten = switch_core_session_strdup(session, orbit_var))) { + if ((ot = strchr(cd.orbit_exten, ':'))) { + *ot++ = '\0'; + if ((to = atoi(ot)) < 0) { + to = 60; + } + } + cd.orbit_timeout = switch_timestamp(NULL) + to; + } + } + + + if (chime_freq) { + ftmp = atoi(chime_freq); + if (ftmp > 0) { + freq = ftmp; + } + } + + switch_channel_answer(channel); - if (argc > 2) { - announce = argv[2]; - } - - if (argc > 3) { - moh = argv[3]; - } - - check_string(announce); - check_string(moh); - switch_mutex_lock(node->mutex); node->caller_count++; - node->waiting_count++; send_presence(node); switch_core_hash_insert(node->caller_hash, uuid, session); @@ -292,11 +460,20 @@ SWITCH_STANDARD_APP(fifo_function) if (p >= MAX_PRI) { p = MAX_PRI - 1; } - + + if (!node_consumer_wait_count(node)) { + node->start_waiting = switch_timestamp_now(); + } + switch_queue_push(node->fifo_list[p], (void *)uuid); - switch_mutex_unlock(node->mutex); + if (!pri) { + switch_snprintf(tmp, sizeof(tmp), "%d", p); + switch_channel_set_variable(channel, "fifo_priority", tmp); + } + switch_mutex_unlock(node->mutex); + ts = switch_timestamp_now(); switch_time_exp_lt(&tm, ts); switch_strftime(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm); @@ -307,19 +484,36 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_event_set_data(channel, event); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Name", "%s", argv[0]); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "push"); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Slot", "%d", p); switch_event_fire(&event); } switch_channel_set_flag(channel, CF_TAGGED); + if (chime_list) { + char *list_dup = switch_core_session_strdup(session, chime_list); + cd.total = switch_separate_string(list_dup, ',', cd.list, (sizeof(cd.list) / sizeof(cd.list[0]))); + cd.freq = freq; + cd.next = switch_timestamp(NULL) + cd.freq; + } + while(switch_channel_ready(channel)) { switch_input_args_t args = { 0 }; char buf[25] = ""; - const char *caller_exit_key = switch_channel_get_variable(channel, "fifo_caller_exit_key"); + args.input_callback = moh_on_dtmf; args.buf = buf; args.buflen = sizeof(buf); - + + if (cd.total || cd.orbit_timeout) { + args.read_frame_callback = caller_read_frame_callback; + args.user_data = &cd; + } + + if (cd.abort || cd.do_orbit) { + aborted = 1; + goto abort; + } if ((serviced_uuid = switch_channel_get_variable(channel, "fifo_serviced_uuid"))) { break; @@ -367,25 +561,32 @@ SWITCH_STANDARD_APP(fifo_function) ts = switch_timestamp_now(); switch_time_exp_lt(&tm, ts); switch_strftime(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm); - switch_channel_set_variable(channel, "fifo_status", "ABORTED"); + switch_channel_set_variable(channel, "fifo_status", cd.do_orbit ? "TIMEOUT" : "ABORTED"); switch_channel_set_variable(channel, "fifo_timestamp", date); if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { switch_channel_event_set_data(channel, event); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Name", "%s", argv[0]); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "abort"); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Action", cd.do_orbit ? "timeout" : "abort"); switch_event_fire(&event); } switch_mutex_lock(node->mutex); + node_remove_uuid(node, uuid); node->caller_count--; - node->waiting_count--; send_presence(node); switch_core_hash_delete(node->caller_hash, uuid); switch_mutex_unlock(node->mutex); } + if (cd.do_orbit && cd.orbit_exten) { + if (orbit_ann) { + switch_ivr_play_file(session, NULL, orbit_ann, NULL); + } + switch_ivr_session_transfer(session, cd.orbit_exten, NULL, NULL); + } + return; - } else if (!strcasecmp(argv[1], "out")) { + } else { /* consumer */ void *pop = NULL; switch_frame_t *read_frame; switch_status_t status; @@ -403,35 +604,31 @@ SWITCH_STANDARD_APP(fifo_function) char buf[5] = ""; - if (!(my_id = switch_channel_get_variable(channel, "fifo_consumer_id"))) { - my_id = switch_core_session_get_uuid(session); - } - if (argc > 2) { if (!strcasecmp(argv[2], "nowait")) { - nowait++; + do_wait = 0; } else if (strcasecmp(argv[2], "wait")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE); return; } } - - if (argc > 3) { - announce = argv[3]; - } - if (argc > 4) { - moh = argv[4]; - } + + if (!(my_id = switch_channel_get_variable(channel, "fifo_consumer_id"))) { + my_id = switch_core_session_get_uuid(session); + } + - check_string(announce); - check_string(moh); - - if (!nowait) { - switch_mutex_lock(node->mutex); - node->consumer_count++; - switch_core_hash_insert(node->consumer_hash, switch_core_session_get_uuid(session), session); - switch_mutex_unlock(node->mutex); + if (do_wait) { + for (i = 0; i < node_count; i++) { + if (!(node = node_list[i])) { + continue; + } + switch_mutex_lock(node->mutex); + node->consumer_count++; + switch_core_hash_insert(node->consumer_hash, switch_core_session_get_uuid(session), session); + switch_mutex_unlock(node->mutex); + } switch_channel_answer(channel); } @@ -465,32 +662,69 @@ SWITCH_STANDARD_APP(fifo_function) } while(switch_channel_ready(channel)) { - int x = 0 ; + int x = 0, winner = -1; + switch_time_t longest = 0xFFFFFFFFFFFFFFFF / 2; + uint32_t importance = 0; + pop = NULL; - if (moh) { + if (moh && do_wait) { memset(&args, 0, sizeof(args)); - args.read_frame_callback = read_frame_callback; - args.user_data = node; + args.read_frame_callback = consumer_read_frame_callback; + args.user_data = node_list; switch_ivr_play_file(session, NULL, moh, &args); } - if (custom_pop) { - for(x = 0; x < MAX_PRI; x++) { - if (switch_queue_trypop(node->fifo_list[pop_array[x]], &pop) == SWITCH_STATUS_SUCCESS && pop) { - break; + for(i = 0; i < node_count; i++) { + if (!(node = node_list[i])) { + continue; + } + + if (node_consumer_wait_count(node)) { + if (!importance && node->start_waiting < longest) { + longest = node->start_waiting; + winner = i; + } + + if (node->importance > importance) { + importance = node->importance; + winner = i; } } + + } + + if (winner > -1) { + node = node_list[winner]; } else { - for(x = 0; x < MAX_PRI; x++) { - if (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS && pop) { - break; - } - } + node = NULL; } + if (node) { + if (custom_pop) { + for(x = 0; x < MAX_PRI; x++) { + if (switch_queue_trypop(node->fifo_list[pop_array[x]], &pop) == SWITCH_STATUS_SUCCESS && pop) { + break; + } + } + } else { + for(x = 0; x < MAX_PRI; x++) { + if (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS && pop) { + break; + } + } + } + + if (pop && !node_consumer_wait_count(node)) { + switch_mutex_lock(node->mutex); + node->start_waiting = 0; + switch_mutex_unlock(node->mutex); + } + } + + if (!pop) { - if (nowait) { + if (!do_wait) { break; } @@ -506,7 +740,7 @@ SWITCH_STANDARD_APP(fifo_function) uuid = (char *) pop; pop = NULL; - if ((other_session = switch_core_session_locate(uuid))) { + if (node && (other_session = switch_core_session_locate(uuid))) { switch_channel_t *other_channel = switch_core_session_get_channel(other_session); switch_caller_profile_t *cloned_profile; const char *o_announce = NULL; @@ -575,7 +809,6 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_set_variable(other_channel, "fifo_timestamp", date); switch_channel_set_variable(other_channel, "fifo_target", switch_core_session_get_uuid(session)); switch_mutex_lock(node->mutex); - node->waiting_count--; send_presence(node); switch_mutex_unlock(node->mutex); switch_ivr_multi_threaded_bridge(session, other_session, on_dtmf, other_session, session); @@ -595,7 +828,7 @@ SWITCH_STANDARD_APP(fifo_function) switch_core_hash_delete(node->caller_hash, uuid); switch_mutex_unlock(node->mutex); switch_core_session_rwunlock(other_session); - if (nowait) { + if (!do_wait) { done = 1; } @@ -620,14 +853,14 @@ SWITCH_STANDARD_APP(fifo_function) } } } - + switch_safe_free(uuid); if (done) { break; } } - + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { switch_channel_event_set_data(channel, event); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "FIFO-Name", "%s", argv[0]); @@ -635,15 +868,18 @@ SWITCH_STANDARD_APP(fifo_function) switch_event_fire(&event); } - if (!nowait) { - switch_mutex_lock(node->mutex); - switch_core_hash_delete(node->consumer_hash, switch_core_session_get_uuid(session)); - node->consumer_count--; - switch_mutex_unlock(node->mutex); + if (do_wait) { + for (i = 0; i < node_count; i++) { + if (!(node = node_list[i])) { + continue; + } + switch_mutex_lock(node->mutex); + switch_core_hash_delete(node->consumer_hash, switch_core_session_get_uuid(session)); + node->consumer_count--; + switch_mutex_unlock(node->mutex); + } } - - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "USAGE %s\n", FIFO_USAGE); + } } @@ -718,15 +954,17 @@ static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int ve switch_xml_set_attr_d(x_fifo, "consumer_count", tmp); switch_snprintf(tmp, sizeof(buffer), "%d", node->caller_count); switch_xml_set_attr_d(x_fifo, "caller_count", tmp); - switch_snprintf(tmp, sizeof(buffer), "%d", node->waiting_count); + switch_snprintf(tmp, sizeof(buffer), "%d", node_consumer_wait_count(node)); switch_xml_set_attr_d(x_fifo, "waiting_count", tmp); + switch_snprintf(tmp, sizeof(buffer), "%u", node->importance); + switch_xml_set_attr_d(x_fifo, "importance", tmp); cc_off = xml_hash(x_fifo, node->caller_hash, "callers", "caller", cc_off, verbose); cc_off = xml_hash(x_fifo, node->consumer_hash, "consumers", "consumer", cc_off, verbose); } -#define FIFO_API_SYNTAX "list|count []" +#define FIFO_API_SYNTAX "list|list_verbose|count|importance []" SWITCH_STANDARD_API(fifo_api_function) { int len = 0; @@ -761,7 +999,7 @@ SWITCH_STANDARD_API(fifo_api_function) char *xml_text = NULL; switch_xml_t x_report = switch_xml_new("fifo_report"); switch_assert(x_report); - + if (argc < 2) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); @@ -783,16 +1021,26 @@ SWITCH_STANDARD_API(fifo_api_function) switch_xml_free(x_report); switch_safe_free(xml_text); + } else if (!strcasecmp(argv[0], "importance")) { + if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) { + int importance = 0; + if (argc > 2) { + importance = atoi(argv[2]); + if (importance < 0) { + importance = 0; + } + node->importance = importance; + } + stream->write_function(stream, "importance: %u\n", node->importance); + } else { + stream->write_function(stream, "no fifo by that name\n"); + } } else if (!strcasecmp(argv[0], "count")) { if (argc < 2) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { - int i = 0; switch_hash_this(hi, &var, NULL, &val); node = (fifo_node_t *) val; - len = 0; - for (i = 0; i < MAX_PRI; i++) { - len += switch_queue_size(node->fifo_list[i]); - } + len = node_consumer_wait_count(node); switch_mutex_lock(node->mutex); stream->write_function(stream, "%s:%d:%d:%d\n", (char *)var, node->consumer_count, node->caller_count, len); switch_mutex_unlock(node->mutex); @@ -804,12 +1052,7 @@ SWITCH_STANDARD_API(fifo_api_function) } } else { if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) { - int i = 0; - len = 0; - for (i = 0 ;i < MAX_PRI; i++) { - len += switch_queue_size(node->fifo_list[i]); - } - + len = node_consumer_wait_count(node); } switch_mutex_lock(node->mutex); stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node->caller_count, len); diff --git a/src/mod/formats/mod_local_stream/mod_local_stream.c b/src/mod/formats/mod_local_stream/mod_local_stream.c index f65188653a..5a6c4e6c04 100644 --- a/src/mod/formats/mod_local_stream/mod_local_stream.c +++ b/src/mod/formats/mod_local_stream/mod_local_stream.c @@ -54,6 +54,7 @@ struct local_stream_context { const char *file; const char *func; int line; + switch_file_handle_t *handle; struct local_stream_context *next; }; @@ -203,6 +204,9 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void switch_mutex_lock(source->mutex); for (cp = source->context_list; cp; cp = cp->next) { + if (switch_test_flag(cp->handle, SWITCH_FILE_CALLBACK)) { + continue; + } switch_mutex_lock(cp->audio_mutex); if (switch_buffer_inuse(cp->audio_buffer) > source->samples * 768) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Leaking stream handle! [%s() %s:%d]\n", cp->func, cp->file, cp->line); @@ -291,7 +295,7 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons context->file = handle->file; context->func = handle->func; context->line = handle->line; - + context->handle = handle; switch_mutex_lock(source->mutex); context->next = source->context_list; source->context_list = context; diff --git a/src/switch_ivr.c b/src/switch_ivr.c index 26e09d9d38..c4ef5f3ffb 100644 --- a/src/switch_ivr.c +++ b/src/switch_ivr.c @@ -616,6 +616,12 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_collect_digits_callback(switch_core_s if (!SWITCH_READ_ACCEPTABLE(status)) { break; } + + if (args && (args->read_frame_callback)) { + if (args->read_frame_callback(session, read_frame, args->user_data) != SWITCH_STATUS_SUCCESS) { + break; + } + } } return status; diff --git a/src/switch_ivr_play_say.c b/src/switch_ivr_play_say.c index 63db34cb09..c804b555b0 100644 --- a/src/switch_ivr_play_say.c +++ b/src/switch_ivr_play_say.c @@ -1111,7 +1111,13 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess } if (args && (args->read_frame_callback)) { + int ok = 1; + switch_set_flag(fh, SWITCH_FILE_CALLBACK); if (args->read_frame_callback(session, read_frame, args->user_data) != SWITCH_STATUS_SUCCESS) { + ok = 0; + } + switch_clear_flag(fh, SWITCH_FILE_CALLBACK); + if (!ok) { break; } }