diff --git a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c index c8d8a23501..93514c8c1d 100644 --- a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c +++ b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c @@ -98,6 +98,8 @@ struct listener { time_t linger_timeout; struct listener *next; switch_pollfd_t *pollfd; + uint8_t lock_acquired; + uint8_t finished; }; typedef struct listener listener_t; @@ -163,7 +165,7 @@ SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip); SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_pass, prefs.password); static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj); -static void launch_listener_thread(listener_t *listener); +static switch_status_t launch_listener_thread(listener_t *listener); static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level) { @@ -523,9 +525,17 @@ SWITCH_STANDARD_APP(socket_function) if (switch_test_flag(listener, LFLAG_ASYNC)) { const char *var; - launch_listener_thread(listener); + if (launch_listener_thread(listener) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Failed to start listener\n"); + return; + } - while (switch_channel_ready(channel) && !switch_test_flag(listener, LFLAG_CONNECTED)) { + /* Wait until listener_thread acquires session read lock */ + while (!listener->lock_acquired && !listener->finished) { + switch_cond_next(); + } + + while (switch_channel_ready(channel) && !listener->finished && !switch_test_flag(listener, LFLAG_CONNECTED)) { switch_cond_next(); } @@ -2637,10 +2647,13 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) if ((session = listener->session)) { if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Unable to lock session!\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to lock session!\n"); locked = 0; + session = NULL; goto done; } + + listener->lock_acquired = 1; } if (!listener->sock) { @@ -2791,7 +2804,7 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) } switch_mutex_unlock(listener->filter_mutex); - if (listener->session) { + if (listener->session && locked) { channel = switch_core_session_get_channel(listener->session); } @@ -2823,7 +2836,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) } if (listener->session) { - switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED); + if (locked) { + switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED); + } switch_clear_flag_locked(listener, LFLAG_SESSION); if (locked) { switch_core_session_rwunlock(listener->session); @@ -2837,12 +2852,14 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) prefs.threads--; switch_mutex_unlock(globals.listener_mutex); + listener->finished = 1; + return NULL; } /* Create a thread for the socket and launch it */ -static void launch_listener_thread(listener_t *listener) +static switch_status_t launch_listener_thread(listener_t *listener) { switch_thread_t *thread; switch_threadattr_t *thd_attr = NULL; @@ -2850,7 +2867,7 @@ static void launch_listener_thread(listener_t *listener) switch_threadattr_create(&thd_attr, listener->pool); switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool); + return switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool); } static int config(void) @@ -3033,8 +3050,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime) if (switch_socket_addr_get(&listener->sa, SWITCH_TRUE, listener->sock) == SWITCH_STATUS_SUCCESS && listener->sa) { switch_get_addr(listener->remote_ip, sizeof(listener->remote_ip), listener->sa); if ((listener->remote_port = switch_sockaddr_get_port(listener->sa))) { - launch_listener_thread(listener); - continue; + if (launch_listener_thread(listener) == SWITCH_STATUS_SUCCESS) + continue; } }