[mod_event_socket] Fix hangup race with listener_thread of socket application in async mode

This commit is contained in:
Mike Jerris 2020-05-14 22:30:47 +04:00
parent 55ddecd750
commit 838f62a743
1 changed files with 27 additions and 10 deletions

View File

@ -98,6 +98,8 @@ struct listener {
time_t linger_timeout; time_t linger_timeout;
struct listener *next; struct listener *next;
switch_pollfd_t *pollfd; switch_pollfd_t *pollfd;
uint8_t lock_acquired;
uint8_t finished;
}; };
typedef struct listener listener_t; typedef struct listener listener_t;
@ -163,7 +165,7 @@ SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip);
SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_pass, prefs.password); SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_pass, prefs.password);
static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj); static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj);
static void launch_listener_thread(listener_t *listener); static switch_status_t launch_listener_thread(listener_t *listener);
static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level) static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level)
{ {
@ -523,9 +525,17 @@ SWITCH_STANDARD_APP(socket_function)
if (switch_test_flag(listener, LFLAG_ASYNC)) { if (switch_test_flag(listener, LFLAG_ASYNC)) {
const char *var; const char *var;
launch_listener_thread(listener); if (launch_listener_thread(listener) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Failed to start listener\n");
return;
}
while (switch_channel_ready(channel) && !switch_test_flag(listener, LFLAG_CONNECTED)) { /* Wait until listener_thread acquires session read lock */
while (!listener->lock_acquired && !listener->finished) {
switch_cond_next();
}
while (switch_channel_ready(channel) && !listener->finished && !switch_test_flag(listener, LFLAG_CONNECTED)) {
switch_cond_next(); switch_cond_next();
} }
@ -2637,10 +2647,13 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
if ((session = listener->session)) { if ((session = listener->session)) {
if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) { if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Unable to lock session!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to lock session!\n");
locked = 0; locked = 0;
session = NULL;
goto done; goto done;
} }
listener->lock_acquired = 1;
} }
if (!listener->sock) { if (!listener->sock) {
@ -2791,7 +2804,7 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
} }
switch_mutex_unlock(listener->filter_mutex); switch_mutex_unlock(listener->filter_mutex);
if (listener->session) { if (listener->session && locked) {
channel = switch_core_session_get_channel(listener->session); channel = switch_core_session_get_channel(listener->session);
} }
@ -2823,7 +2836,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
} }
if (listener->session) { if (listener->session) {
switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED); if (locked) {
switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED);
}
switch_clear_flag_locked(listener, LFLAG_SESSION); switch_clear_flag_locked(listener, LFLAG_SESSION);
if (locked) { if (locked) {
switch_core_session_rwunlock(listener->session); switch_core_session_rwunlock(listener->session);
@ -2837,12 +2852,14 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
prefs.threads--; prefs.threads--;
switch_mutex_unlock(globals.listener_mutex); switch_mutex_unlock(globals.listener_mutex);
listener->finished = 1;
return NULL; return NULL;
} }
/* Create a thread for the socket and launch it */ /* Create a thread for the socket and launch it */
static void launch_listener_thread(listener_t *listener) static switch_status_t launch_listener_thread(listener_t *listener)
{ {
switch_thread_t *thread; switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL; switch_threadattr_t *thd_attr = NULL;
@ -2850,7 +2867,7 @@ static void launch_listener_thread(listener_t *listener)
switch_threadattr_create(&thd_attr, listener->pool); switch_threadattr_create(&thd_attr, listener->pool);
switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool); return switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool);
} }
static int config(void) static int config(void)
@ -3033,8 +3050,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime)
if (switch_socket_addr_get(&listener->sa, SWITCH_TRUE, listener->sock) == SWITCH_STATUS_SUCCESS && listener->sa) { if (switch_socket_addr_get(&listener->sa, SWITCH_TRUE, listener->sock) == SWITCH_STATUS_SUCCESS && listener->sa) {
switch_get_addr(listener->remote_ip, sizeof(listener->remote_ip), listener->sa); switch_get_addr(listener->remote_ip, sizeof(listener->remote_ip), listener->sa);
if ((listener->remote_port = switch_sockaddr_get_port(listener->sa))) { if ((listener->remote_port = switch_sockaddr_get_port(listener->sa))) {
launch_listener_thread(listener); if (launch_listener_thread(listener) == SWITCH_STATUS_SUCCESS)
continue; continue;
} }
} }