diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 3cbfb0b88a..48f53ad00a 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -541,8 +541,6 @@ static switch_status_t check_attached_sessions(listener_t *listener) /* event is a channel destroy, so this session can be removed */ if (pevent->event_id == SWITCH_EVENT_CHANNEL_DESTROY) { - switch_core_session_t *session; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Destroy event for attached session for %s\n", sp->uuid_str); /* remove session from list */ @@ -551,10 +549,6 @@ static switch_status_t check_attached_sessions(listener_t *listener) else listener->session_list = sp->next; - if ((session = switch_core_session_locate(sp->uuid_str))) { - switch_channel_clear_flag(switch_core_session_get_channel(session), CF_CONTROLLED); - switch_core_session_rwunlock(session); - } /* this allows the application threads to exit */ switch_clear_flag_locked(sp, LFLAG_SESSION_ALIVE); removed = 1; @@ -1041,23 +1035,17 @@ session_elem_t* attach_call_to_registered_process(listener_t* listener, char* re { /* create a session list element */ session_elem_t* session_element = NULL; - if (!(session_element = switch_core_alloc(switch_core_session_get_pool(session), sizeof(*session_element)))) { + if (!(session_element = switch_core_session_alloc(session, sizeof(*session_element)))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n"); - } - else { - if (SWITCH_STATUS_SUCCESS != switch_core_session_read_lock(session)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n"); - } - else { - session_element->process.type = ERLANG_REG_PROCESS; - session_element->process.reg_name = switch_core_strdup(switch_core_session_get_pool(session),reg_name); - switch_set_flag(session_element, LFLAG_SESSION_ALIVE); - switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); - switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); - switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); - /* attach the session to the listener */ - add_session_elem_to_listener(listener,session_element); - } + } else { + session_element->process.type = ERLANG_REG_PROCESS; + session_element->process.reg_name = switch_core_strdup(switch_core_session_get_pool(session),reg_name); + switch_set_flag(session_element, LFLAG_SESSION_ALIVE); + switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); + switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); + switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); + /* attach the session to the listener */ + add_session_elem_to_listener(listener,session_element); } return session_element; } @@ -1066,26 +1054,20 @@ session_elem_t* attach_call_to_pid(listener_t* listener, erlang_pid* pid, switch { /* create a session list element */ session_elem_t* session_element = NULL; - if (!(session_element = switch_core_alloc(switch_core_session_get_pool(session), sizeof(*session_element)))) { + if (!(session_element = switch_core_session_alloc(session, sizeof(*session_element)))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n"); - } - else { - if (SWITCH_STATUS_SUCCESS != switch_core_session_read_lock(session)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n"); - } - else { - memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH); - session_element->process.type = ERLANG_PID; - memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); - switch_set_flag(session_element, LFLAG_SESSION_ALIVE); - switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); - switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); - switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); - /* attach the session to the listener */ - add_session_elem_to_listener(listener,session_element); + } else { + memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH); + session_element->process.type = ERLANG_PID; + memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); + switch_set_flag(session_element, LFLAG_SESSION_ALIVE); + switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); + switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); + switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); + /* attach the session to the listener */ + add_session_elem_to_listener(listener,session_element); - ei_link(listener, ei_self(listener->ec), pid); - } + ei_link(listener, ei_self(listener->ec), pid); } return session_element; } @@ -1094,83 +1076,78 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul { /* create a session list element */ session_elem_t* session_element=NULL; - if (!(session_element = switch_core_alloc(switch_core_session_get_pool(session), sizeof(*session_element)))) { + if (!(session_element = switch_core_session_alloc(session, sizeof(*session_element)))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n"); } else { - if (SWITCH_STATUS_SUCCESS != switch_core_session_read_lock(session)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n"); - } - else { - char hash[100]; - int i = 0; - void *p = NULL; + char hash[100]; + int i = 0; + void *p = NULL; - memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH); - erlang_pid *pid; - erlang_ref ref; + memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH); + erlang_pid *pid; + erlang_ref ref; - switch_set_flag(session_element, LFLAG_WAITING_FOR_PID); - switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); - switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); - /* attach the session to the listener */ - add_session_elem_to_listener(listener,session_element); + switch_set_flag(session_element, LFLAG_WAITING_FOR_PID); + switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); + switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); + /* attach the session to the listener */ + add_session_elem_to_listener(listener,session_element); - ei_init_ref(listener->ec, &ref); - ei_hash_ref(&ref, hash); - /* insert the waiting marker */ - switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING); + ei_init_ref(listener->ec, &ref); + ei_hash_ref(&ref, hash); + /* insert the waiting marker */ + switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING); - if (!strcmp(function, "!")) { - /* send a message to request a pid */ - ei_x_buff rbuf; - ei_x_new_with_version(&rbuf); + if (!strcmp(function, "!")) { + /* send a message to request a pid */ + ei_x_buff rbuf; + ei_x_new_with_version(&rbuf); - ei_x_encode_tuple_header(&rbuf, 3); - ei_x_encode_atom(&rbuf, "new_pid"); - ei_x_encode_ref(&rbuf, &ref); - ei_x_encode_pid(&rbuf, ei_self(listener->ec)); - /* should lock with mutex? */ - ei_reg_send(listener->ec, listener->sockfd, module, rbuf.buff, rbuf.index); + ei_x_encode_tuple_header(&rbuf, 3); + ei_x_encode_atom(&rbuf, "new_pid"); + ei_x_encode_ref(&rbuf, &ref); + ei_x_encode_pid(&rbuf, ei_self(listener->ec)); + /* should lock with mutex? */ + ei_reg_send(listener->ec, listener->sockfd, module, rbuf.buff, rbuf.index); #ifdef EI_DEBUG - ei_x_print_reg_msg(&rbuf, module, 1); + ei_x_print_reg_msg(&rbuf, module, 1); #endif - ei_x_free(&rbuf); - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function); - /* should lock with mutex? */ - ei_pid_from_rpc(listener->ec, listener->sockfd, &ref, module, function); - /* - char *argv[1]; - ei_spawn(listener->ec, listener->sockfd, &ref, module, function, 0, argv); - */ - } - - /* loop until either we timeout or we get a value that's not the waiting marker */ - while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) { - if (i > 50) { /* half a second timeout */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n"); - remove_session_elem_from_listener(listener,session_element); - switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */ - return NULL; - } - i++; - switch_yield(10000); /* 10ms */ - } - - switch_core_hash_delete(listener->spawn_pid_hash, hash); - - pid = (erlang_pid *) p; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got pid!\n"); - - session_element->process.type = ERLANG_PID; - memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); - free(pid); /* malloced in handle_ref_tuple */ - switch_set_flag(session_element, LFLAG_SESSION_ALIVE); - switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); - switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID); - - ei_link(listener, ei_self(listener->ec), pid); + ei_x_free(&rbuf); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function); + /* should lock with mutex? */ + ei_pid_from_rpc(listener->ec, listener->sockfd, &ref, module, function); + /* + char *argv[1]; + ei_spawn(listener->ec, listener->sockfd, &ref, module, function, 0, argv); + */ } + + /* loop until either we timeout or we get a value that's not the waiting marker */ + while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) { + if (i > 50) { /* half a second timeout */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n"); + remove_session_elem_from_listener(listener,session_element); + switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */ + return NULL; + } + i++; + switch_yield(10000); /* 10ms */ + } + + switch_core_hash_delete(listener->spawn_pid_hash, hash); + + pid = (erlang_pid *) p; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got pid!\n"); + + session_element->process.type = ERLANG_PID; + memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); + free(pid); /* malloced in handle_ref_tuple */ + switch_set_flag(session_element, LFLAG_SESSION_ALIVE); + switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); + switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID); + + ei_link(listener, ei_self(listener->ec), pid); } return session_element; } @@ -1276,9 +1253,8 @@ SWITCH_STANDARD_APP(erlang_outbound_function) /* keep app thread running for lifetime of session */ if (switch_channel_down(switch_core_session_get_channel(session))) { - while (switch_test_flag(session_element, LFLAG_SESSION_ALIVE)) { - switch_yield(100000); - } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "outbound session all done\n"); + switch_clear_flag_locked(session_element, LFLAG_SESSION_ALIVE); } } }