From 90b75b3b71072487a8029ca81a48eaec26234060 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Tue, 28 Aug 2012 21:11:37 -0500 Subject: [PATCH] check-in of mutex app --- .../applications/mod_dptools/mod_dptools.c | 279 ++++++++++++++++++ 1 file changed, 279 insertions(+) diff --git a/src/mod/applications/mod_dptools/mod_dptools.c b/src/mod/applications/mod_dptools/mod_dptools.c index 92eba0f568..053be7de91 100755 --- a/src/mod/applications/mod_dptools/mod_dptools.c +++ b/src/mod/applications/mod_dptools/mod_dptools.c @@ -3197,6 +3197,8 @@ static struct { switch_memory_pool_t *pool; switch_hash_t *pickup_hash; switch_mutex_t *pickup_mutex; + switch_hash_t *mutex_hash; + switch_mutex_t *mutex_mutex; } globals; /* pickup channel */ @@ -4560,7 +4562,281 @@ SWITCH_STANDARD_APP(blind_transfer_ack_function) switch_ivr_blind_transfer_ack(session, val); } +/* /// mutex /// */ +typedef struct mutex_node_s { + switch_core_session_t *session; + struct mutex_node_s *next; +} mutex_node_t; + +typedef enum { + MUTEX_FLAG_WAIT = (1 << 0), + MUTEX_FLAG_SET = (1 << 1) +} mutex_flag_t; + +struct read_frame_data { + const char *dp; + const char *exten; + const char *context; + const char *key; + long to; +}; + +typedef struct master_mutex_s { + mutex_node_t *list; + char *key; +} master_mutex_t; + +static switch_status_t mutex_hanguphook(switch_core_session_t *session); + +static switch_status_t read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + struct read_frame_data *rf = (struct read_frame_data *) user_data; + + if (rf->to && --rf->to <= 0) { + rf->to = -1; + return SWITCH_STATUS_FALSE; + } + + return switch_channel_test_app_flag_key(rf->key, channel, MUTEX_FLAG_WAIT) ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE; + +} + +static void cancel(switch_core_session_t *session, master_mutex_t *master) +{ + mutex_node_t *np, *lp = NULL; + + switch_mutex_lock(globals.mutex_mutex); + for (np = master->list; np; np = np->next) { + if (np->session == session) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s mutex %s canceled\n", + switch_core_session_get_name(session), master->key); + if (lp) { + lp->next = np->next; + } else { + master->list = np->next; + } + break; + } + + lp = np; + } + + switch_mutex_unlock(globals.mutex_mutex); + +} + +static void advance(master_mutex_t *master) +{ + switch_mutex_lock(globals.mutex_mutex); + + if (!master || !master->list) { + goto end; + } + + master->list = master->list->next; + + if (master->list) { + switch_channel_t *channel = switch_core_session_get_channel(master->list->session); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(master->list->session), SWITCH_LOG_DEBUG, + "%s mutex %s advanced\n", switch_channel_get_name(channel), master->key); + switch_channel_set_app_flag_key(master->key, channel, MUTEX_FLAG_SET); + switch_channel_clear_app_flag_key(master->key, channel, MUTEX_FLAG_WAIT); + } + + + end: + + switch_mutex_unlock(globals.mutex_mutex); + + +} + +static void confirm(switch_core_session_t *session, master_mutex_t *master) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + + if (!master) { + if (!(master = switch_channel_get_private(channel, "_mutex_master"))) { + return; + } + } + + if (switch_channel_test_app_flag_key(master->key, channel, MUTEX_FLAG_WAIT)) { + cancel(session, master); + } + + if (master->list->session == session) { + switch_channel_clear_app_flag_key(master->key, channel, MUTEX_FLAG_SET); + switch_core_event_hook_remove_state_change(session, mutex_hanguphook); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s mutex %s cleared\n", switch_channel_get_name(channel), master->key); + advance(master); + } +} + + + +static switch_status_t mutex_hanguphook(switch_core_session_t *session) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + switch_channel_state_t state = switch_channel_get_state(channel); + + if (state != CS_HANGUP) { + return SWITCH_STATUS_SUCCESS; + } + + confirm(session, NULL); + switch_core_event_hook_remove_state_change(session, mutex_hanguphook); + + return SWITCH_STATUS_SUCCESS; +} + +static switch_bool_t do_mutex(switch_core_session_t *session, const char *key, switch_bool_t on) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + const char *feedback, *var; + switch_input_args_t args = { 0 }; + master_mutex_t *master = NULL; + mutex_node_t *node, *np; + int used; + struct read_frame_data rf = { 0 }; + long to_val = 0; + + if (switch_channel_pre_answer(channel) != SWITCH_STATUS_SUCCESS) { + return SWITCH_FALSE; + } + + switch_mutex_lock(globals.mutex_mutex); + used = switch_channel_test_app_flag_key(key, channel, MUTEX_FLAG_WAIT) || switch_channel_test_app_flag_key(key, channel, MUTEX_FLAG_SET); + + if ((on && used) || (!on && !used)) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "INVALID STATE\n"); + switch_mutex_unlock(globals.mutex_mutex); + return SWITCH_FALSE; + } + + if (!(master = switch_core_hash_find(globals.mutex_hash, key))) { + master = switch_core_alloc(globals.pool, sizeof(*master)); + master->key = switch_core_strdup(globals.pool, key); + switch_core_hash_insert(globals.mutex_hash, key, master); + } + + if (on) { + node = switch_core_session_alloc(session, sizeof(*node)); + node->session = session; + + for (np = master->list; np && np->next; np = np->next); + + if (np) { + np->next = node; + switch_channel_set_app_flag_key(key, channel, MUTEX_FLAG_WAIT); + } else { + master->list = node; + switch_channel_set_app_flag_key(key, channel, MUTEX_FLAG_SET); + switch_channel_set_private(channel, "_mutex_master", master); + switch_core_event_hook_add_state_change(session, mutex_hanguphook); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s mutex %s acquired\n", switch_channel_get_name(channel), key); + switch_mutex_unlock(globals.mutex_mutex); + return SWITCH_TRUE; + } + } else { + confirm(session, master); + + switch_mutex_unlock(globals.mutex_mutex); + return SWITCH_TRUE; + } + + switch_mutex_unlock(globals.mutex_mutex); + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s mutex %s is busy, waiting...\n", switch_channel_get_name(channel), key); + + if (!(feedback = switch_channel_get_variable(channel, "mutex_feedback"))) { + if ((var = switch_channel_get_variable(channel, "ringback"))) { + feedback = switch_core_session_sprintf(session, "tone_stream://%s;loops=-1", var); + } else { + feedback = switch_channel_get_hold_music(channel); + } + } + + if (zstr(feedback) || !strcasecmp(feedback, "silence")) { + feedback = "silence_stream://-1"; + } + + + if ((rf.exten = switch_channel_get_variable(channel, "mutex_orbit_exten"))) { + to_val = 60; + } + + if ((var = switch_channel_get_variable(channel, "mutex_timeout"))) { + long tmp = atol(var); + + if (tmp > 0) { + to_val = tmp; + } + } + + if (to_val) { + switch_codec_implementation_t read_impl; + switch_core_session_get_read_impl(session, &read_impl); + + rf.to = (1000 / (read_impl.microseconds_per_packet / 1000)) * to_val; + rf.dp = switch_channel_get_variable(channel, "mutex_orbit_dialplan"); + rf.context = switch_channel_get_variable(channel, "mutex_orbit_context"); + } + + rf.key = key; + + args.read_frame_callback = read_frame_callback; + args.user_data = &rf; + + while(switch_channel_ready(channel) && switch_channel_test_app_flag_key(key, channel, MUTEX_FLAG_WAIT)) { + switch_status_t st = switch_ivr_play_file(session, NULL, feedback, &args); + + if (st != SWITCH_STATUS_SUCCESS) { + break; + } + } + + switch_mutex_lock(globals.mutex_mutex); + if (switch_channel_test_app_flag_key(key, channel, MUTEX_FLAG_WAIT)) { + cancel(session, master); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s mutex %s acquired\n", switch_channel_get_name(channel), key); + } + switch_mutex_unlock(globals.mutex_mutex); + + return SWITCH_TRUE; +} + +#define MUTEX_SYNTAX "[ on|off]" +SWITCH_STANDARD_APP(mutex_function) +{ + char *key; + char *arg; + switch_bool_t on = SWITCH_TRUE; + + if (zstr(data)) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Missing keyname\n"); + return; + } + + key = switch_core_session_sprintf(session, "_mutex_key_%s", (char *)data); + + if ((arg = strchr(key, ' '))) { + *arg++ = '\0'; + + if (!strcasecmp(arg, "off")) { + on = SWITCH_FALSE; + } + } + + do_mutex(session, key, on); + + +} + +/* /// mutex /// */ #define SPEAK_DESC "Speak text to a channel via the tts interface" @@ -4597,6 +4873,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_dptools_load) globals.pool = pool; switch_core_hash_init(&globals.pickup_hash, NULL); switch_mutex_init(&globals.pickup_mutex, SWITCH_MUTEX_NESTED, globals.pool); + switch_core_hash_init(&globals.mutex_hash, NULL); + switch_mutex_init(&globals.mutex_mutex, SWITCH_MUTEX_NESTED, globals.pool); /* connect my internal structure to the blank pointer passed to me */ *module_interface = switch_loadable_module_create_module_interface(pool, modname); @@ -4664,6 +4942,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_dptools_load) SWITCH_ADD_APP(app_interface, "flush_dtmf", "flush any queued dtmf", "flush any queued dtmf", flush_dtmf_function, "", SAF_SUPPORT_NOMEDIA); SWITCH_ADD_APP(app_interface, "hold", "Send a hold message", "Send a hold message", hold_function, HOLD_SYNTAX, SAF_SUPPORT_NOMEDIA); SWITCH_ADD_APP(app_interface, "unhold", "Send a un-hold message", "Send a un-hold message", unhold_function, UNHOLD_SYNTAX, SAF_SUPPORT_NOMEDIA); + SWITCH_ADD_APP(app_interface, "mutex", "block on a call flow only allowing one at a time", "", mutex_function, MUTEX_SYNTAX, SAF_NONE); SWITCH_ADD_APP(app_interface, "transfer", "Transfer a channel", TRANSFER_LONG_DESC, transfer_function, " [ ]", SAF_SUPPORT_NOMEDIA); SWITCH_ADD_APP(app_interface, "check_acl", "Check an ip against an ACL list", "Check an ip against an ACL list", check_acl_function,