From cd4c3188e4f715ff129dc4eea1a4ba50140c2a42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Artur=20Zaprza=C5=82a?= Date: Wed, 2 Sep 2015 13:24:18 +0200 Subject: [PATCH] FS-8142 Fix a thread cache thread-safety and caching --- src/include/private/switch_core_pvt.h | 6 - src/switch_core_session.c | 207 +++++--------------------- 2 files changed, 35 insertions(+), 178 deletions(-) diff --git a/src/include/private/switch_core_pvt.h b/src/include/private/switch_core_pvt.h index 3522cf0269..6b46994075 100644 --- a/src/include/private/switch_core_pvt.h +++ b/src/include/private/switch_core_pvt.h @@ -309,16 +309,10 @@ struct switch_session_manager { uint32_t session_limit; switch_size_t session_id; switch_queue_t *thread_queue; - switch_thread_t *manager_thread; switch_mutex_t *mutex; switch_thread_cond_t *cond; - switch_mutex_t *cond_mutex; - switch_mutex_t *cond2_mutex; - int ready; int running; int busy; - int popping; - int starting; }; extern struct switch_session_manager session_manager; diff --git a/src/switch_core_session.c b/src/switch_core_session.c index c059f1cadc..0b2caef3cd 100644 --- a/src/switch_core_session.c +++ b/src/switch_core_session.c @@ -1675,45 +1675,18 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th { switch_thread_pool_node_t *node = (switch_thread_pool_node_t *) obj; switch_memory_pool_t *pool = node->pool; - void *pop; - int check = 0; - - switch_mutex_lock(session_manager.mutex); - session_manager.starting--; - session_manager.running++; - switch_mutex_unlock(session_manager.mutex); #ifdef DEBUG_THREAD_POOL switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Started\n", (long) (intptr_t) thread); #endif - while(session_manager.ready) { - switch_status_t check_status; - - pop = NULL; - - if (check) { - check_status = switch_queue_trypop(session_manager.thread_queue, &pop); - } else { - switch_mutex_lock(session_manager.mutex); - session_manager.popping++; - switch_mutex_unlock(session_manager.mutex); - - check_status = switch_queue_pop(session_manager.thread_queue, &pop); - - switch_mutex_lock(session_manager.mutex); - session_manager.popping--; - switch_mutex_unlock(session_manager.mutex); - } - - if (check_status == SWITCH_STATUS_SUCCESS && pop) { + while (1) { + void *pop; + switch_status_t check_status = switch_queue_pop_timeout(session_manager.thread_queue, &pop, apr_time_from_sec(5)); + if (check_status == SWITCH_STATUS_SUCCESS) { switch_thread_data_t *td = (switch_thread_data_t *) pop; - switch_mutex_lock(session_manager.mutex); - session_manager.busy++; - switch_mutex_unlock(session_manager.mutex); #ifdef DEBUG_THREAD_POOL switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing\n", (long) (intptr_t) thread); #endif - td->func(thread, td->obj); if (td->pool) { @@ -1729,23 +1702,22 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th switch_mutex_lock(session_manager.mutex); session_manager.busy--; switch_mutex_unlock(session_manager.mutex); - } else { - if (check) { + switch_mutex_lock(session_manager.mutex); + if (!switch_status_is_timeup(check_status) || session_manager.running > session_manager.busy) { + if (!--session_manager.running) { + switch_thread_cond_signal(session_manager.cond); + } + switch_mutex_unlock(session_manager.mutex); break; } - check++; + switch_mutex_unlock(session_manager.mutex); } } #ifdef DEBUG_THREAD_POOL switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Ended\n", (long)(intptr_t) thread); #endif - switch_mutex_lock(session_manager.mutex); - session_manager.running--; - switch_mutex_unlock(session_manager.mutex); - switch_core_destroy_memory_pool(&pool); - return NULL; } @@ -1772,46 +1744,18 @@ static void thread_launch_failure(void) switch_mutex_unlock(session_manager.mutex); } -static int wake_queue(void) -{ - switch_status_t status; - int tries = 0; - - top: - - status = switch_mutex_trylock(session_manager.cond_mutex); - - if (status == SWITCH_STATUS_SUCCESS) { - switch_thread_cond_signal(session_manager.cond); - switch_mutex_unlock(session_manager.cond_mutex); - return 1; - } else { - if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) { - switch_mutex_unlock(session_manager.cond2_mutex); - } else { - if (++tries < 10) { - switch_cond_next(); - goto top; - } - } - } - - return 0; -} - static switch_status_t check_queue(void) { switch_status_t status = SWITCH_STATUS_FALSE; - int ttl = 0; - int x = 0; - switch_mutex_lock(session_manager.mutex); - ttl = switch_queue_size(session_manager.thread_queue); - x = ((session_manager.running + session_manager.starting) - session_manager.busy); + if (session_manager.running >= ++session_manager.busy) { + switch_mutex_unlock(session_manager.mutex); + return SWITCH_STATUS_SUCCESS; + } + ++session_manager.running; switch_mutex_unlock(session_manager.mutex); - - while (x < ttl) { + { switch_thread_t *thread; switch_threadattr_t *thd_attr; switch_memory_pool_t *pool; @@ -1827,6 +1771,11 @@ static switch_status_t check_queue(void) switch_threadattr_priority_set(thd_attr, SWITCH_PRI_LOW); if (switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_worker, node, node->pool) != SWITCH_STATUS_SUCCESS) { + switch_mutex_lock(session_manager.mutex); + if (!--session_manager.running) { + switch_thread_cond_signal(session_manager.cond); + } + switch_mutex_unlock(session_manager.mutex); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Thread Failure!\n"); switch_core_destroy_memory_pool(&pool); status = SWITCH_STATUS_GENERR; @@ -1834,76 +1783,11 @@ static switch_status_t check_queue(void) } else { status = SWITCH_STATUS_SUCCESS; } - - switch_mutex_lock(session_manager.mutex); - session_manager.starting++; - switch_mutex_unlock(session_manager.mutex); - x++; } - return status; } -static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj) -{ - - uint32_t sleep = 10000000; - switch_time_t next = switch_micro_time_now() + sleep; - - switch_mutex_lock(session_manager.cond_mutex); - - while(session_manager.ready) { - int check = 1; - int ttl = 0; - uint32_t xsleep = sleep; - - switch_mutex_lock(session_manager.mutex); - ttl = switch_queue_size(session_manager.thread_queue); - switch_mutex_unlock(session_manager.mutex); - - - if (!ttl) { - xsleep = 10000; - } - - if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) { - switch_thread_cond_timedwait(session_manager.cond, session_manager.cond_mutex, xsleep); - switch_mutex_unlock(session_manager.cond2_mutex); - } - - - if (switch_micro_time_now() >= next) { - if (session_manager.popping) { -#ifdef DEBUG_THREAD_POOL - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, - "Thread pool: running:%d busy:%d popping:%d\n", session_manager.running, session_manager.busy, session_manager.popping); -#endif - switch_queue_interrupt_all(session_manager.thread_queue); - - sleep = 100000; - check = 0; - } else { - sleep = 10000000; - } - } - - if (check) check_queue(); - - next = switch_micro_time_now() + sleep; - } - - switch_mutex_unlock(session_manager.cond_mutex); - - while(session_manager.running) { - switch_queue_interrupt_all(session_manager.thread_queue); - switch_yield(20000); - } - - - return NULL; -} - SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp) { switch_status_t status = SWITCH_STATUS_SUCCESS; @@ -1914,8 +1798,8 @@ SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_d td = *tdp; *tdp = NULL; - switch_queue_push(session_manager.thread_queue, td); - wake_queue(); + status = switch_queue_push(session_manager.thread_queue, td); + check_queue(); return status; } @@ -1931,14 +1815,13 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_co } else if (switch_test_flag(session, SSF_THREAD_STARTED)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Cannot launch thread again after it has already been run!\n"); } else { - status = SWITCH_STATUS_SUCCESS; switch_set_flag(session, SSF_THREAD_RUNNING); switch_set_flag(session, SSF_THREAD_STARTED); td = switch_core_session_alloc(session, sizeof(*td)); td->obj = session; td->func = switch_core_session_thread; - switch_queue_push(session_manager.thread_queue, td); - wake_queue(); + status = switch_queue_push(session_manager.thread_queue, td); + check_queue(); } switch_mutex_unlock(session->mutex); @@ -2581,39 +2464,19 @@ void switch_core_session_init(switch_memory_pool_t *pool) session_manager.session_id = 1; session_manager.memory_pool = pool; switch_core_hash_init(&session_manager.session_table); - - if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) { - switch_threadattr_t *thd_attr; - - switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool); - switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool); - switch_mutex_init(&session_manager.cond_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool); - switch_mutex_init(&session_manager.cond2_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool); - switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool); - switch_threadattr_create(&thd_attr, session_manager.memory_pool); - switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - session_manager.ready = 1; - switch_thread_create(&session_manager.manager_thread, thd_attr, switch_core_session_thread_pool_manager, NULL, session_manager.memory_pool); - } - + switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_DEFAULT, session_manager.memory_pool); + switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool); + switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool); } void switch_core_session_uninit(void) { - int sanity = 100; - switch_status_t st = SWITCH_STATUS_FALSE; - - session_manager.ready = 0; - wake_queue(); - - while(session_manager.running && --sanity > 0) { - switch_queue_interrupt_all(session_manager.thread_queue); - switch_yield(100000); - } - - switch_thread_join(&st, session_manager.manager_thread); + switch_queue_term(session_manager.thread_queue); + switch_mutex_lock(session_manager.mutex); + if (session_manager.running) + switch_thread_cond_timedwait(session_manager.cond, session_manager.mutex, apr_time_from_sec(10)); + switch_mutex_unlock(session_manager.mutex); switch_core_hash_destroy(&session_manager.session_table); - } SWITCH_DECLARE(switch_app_log_t *) switch_core_session_get_app_log(switch_core_session_t *session) @@ -3062,7 +2925,7 @@ SWITCH_DECLARE(switch_log_level_t) switch_core_session_get_loglevel(switch_core_ SWITCH_DECLARE(void) switch_core_session_debug_pool(switch_stream_handle_t *stream) { stream->write_function(stream, "Thread pool: running:%d busy:%d popping:%d\n", - session_manager.running, session_manager.busy, session_manager.popping); + session_manager.running, session_manager.busy, session_manager.running - session_manager.busy); } SWITCH_DECLARE(void) switch_core_session_raw_read(switch_core_session_t *session)