improve thread pool logic

This commit is contained in:
Anthony Minessale 2013-09-27 23:37:05 +05:00
parent 25001e857f
commit 012e321f61
3 changed files with 67 additions and 11 deletions

View File

@ -290,10 +290,14 @@ struct switch_session_manager {
switch_queue_t *thread_queue;
switch_thread_t *manager_thread;
switch_mutex_t *mutex;
switch_thread_cond_t *cond;
switch_mutex_t *cond_mutex;
switch_mutex_t *cond2_mutex;
int ready;
int running;
int busy;
int popping;
int starting;
};
extern struct switch_session_manager session_manager;

View File

@ -1792,6 +1792,12 @@ static void switch_load_core_config(const char *file)
switch_core_hash_insert(runtime.ptimes, "isac", &d_30);
switch_core_hash_insert(runtime.ptimes, "G723", &d_30);
if (runtime.cpu_count == 1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING,
"Implicitly setting events-use-dispatch based on a single CPU\n");
runtime.events_use_dispatch = 1;
}
if ((xml = switch_xml_open_cfg(file, &cfg, NULL))) {
switch_xml_t settings, param;

View File

@ -1588,6 +1588,7 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
int check = 0;
switch_mutex_lock(session_manager.mutex);
session_manager.starting--;
session_manager.running++;
switch_mutex_unlock(session_manager.mutex);
#ifdef DEBUG_THREAD_POOL
@ -1682,6 +1683,33 @@ static void thread_launch_failure(void)
switch_mutex_unlock(session_manager.mutex);
}
static int wake_queue(void)
{
switch_status_t status;
int tries = 0;
top:
status = switch_mutex_trylock(session_manager.cond_mutex);
if (status == SWITCH_STATUS_SUCCESS) {
switch_thread_cond_signal(session_manager.cond);
switch_mutex_unlock(session_manager.cond_mutex);
return 1;
} else {
if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) {
switch_mutex_unlock(session_manager.cond2_mutex);
} else {
if (++tries < 10) {
switch_cond_next();
goto top;
}
}
}
return 0;
}
static switch_status_t check_queue(void)
{
switch_status_t status = SWITCH_STATUS_FALSE;
@ -1690,7 +1718,7 @@ static switch_status_t check_queue(void)
switch_mutex_lock(session_manager.mutex);
ttl = switch_queue_size(session_manager.thread_queue);
x = (session_manager.running - session_manager.busy);
x = ((session_manager.running + session_manager.starting) - session_manager.busy);
switch_mutex_unlock(session_manager.mutex);
@ -1717,6 +1745,10 @@ static switch_status_t check_queue(void)
} else {
status = SWITCH_STATUS_SUCCESS;
}
switch_mutex_lock(session_manager.mutex);
session_manager.starting++;
switch_mutex_unlock(session_manager.mutex);
x++;
}
@ -1726,12 +1758,20 @@ static switch_status_t check_queue(void)
static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj)
{
int x = 0;
uint32_t sleep = 10000000;
switch_time_t next = switch_micro_time_now() + sleep;
switch_mutex_lock(session_manager.cond_mutex);
while(session_manager.ready) {
switch_yield(100000);
int check = 1;
if (++x == 300) {
switch_mutex_lock(session_manager.cond2_mutex);
switch_thread_cond_timedwait(session_manager.cond, session_manager.cond_mutex, sleep);
switch_mutex_unlock(session_manager.cond2_mutex);
if (switch_micro_time_now() >= next) {
if (session_manager.popping) {
#ifdef DEBUG_THREAD_POOL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
@ -1739,17 +1779,20 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_t
#endif
switch_queue_interrupt_all(session_manager.thread_queue);
x--;
continue;
sleep = 100000;
check = 0;
} else {
x = 0;
sleep = 10000000;
}
}
check_queue();
if (check) check_queue();
next = switch_micro_time_now() + sleep;
}
switch_mutex_unlock(session_manager.cond_mutex);
while(session_manager.running) {
switch_queue_interrupt_all(session_manager.thread_queue);
switch_yield(20000);
@ -1770,7 +1813,7 @@ SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_d
*tdp = NULL;
switch_queue_push(session_manager.thread_queue, td);
check_queue();
wake_queue();
return status;
}
@ -1793,7 +1836,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_co
td->obj = session;
td->func = switch_core_session_thread;
switch_queue_push(session_manager.thread_queue, td);
check_queue();
wake_queue();
}
switch_mutex_unlock(session->mutex);
@ -2438,6 +2481,9 @@ void switch_core_session_init(switch_memory_pool_t *pool)
switch_threadattr_t *thd_attr;
switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool);
switch_mutex_init(&session_manager.cond_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
switch_mutex_init(&session_manager.cond2_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
switch_threadattr_create(&thd_attr, session_manager.memory_pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);