improve thread pool logic
This commit is contained in:
parent
d732c855cc
commit
0a9ed88019
|
@ -288,10 +288,14 @@ struct switch_session_manager {
|
||||||
switch_queue_t *thread_queue;
|
switch_queue_t *thread_queue;
|
||||||
switch_thread_t *manager_thread;
|
switch_thread_t *manager_thread;
|
||||||
switch_mutex_t *mutex;
|
switch_mutex_t *mutex;
|
||||||
|
switch_thread_cond_t *cond;
|
||||||
|
switch_mutex_t *cond_mutex;
|
||||||
|
switch_mutex_t *cond2_mutex;
|
||||||
int ready;
|
int ready;
|
||||||
int running;
|
int running;
|
||||||
int busy;
|
int busy;
|
||||||
int popping;
|
int popping;
|
||||||
|
int starting;
|
||||||
};
|
};
|
||||||
|
|
||||||
extern struct switch_session_manager session_manager;
|
extern struct switch_session_manager session_manager;
|
||||||
|
|
|
@ -1779,6 +1779,12 @@ static void switch_load_core_config(const char *file)
|
||||||
switch_core_hash_insert(runtime.ptimes, "isac", &d_30);
|
switch_core_hash_insert(runtime.ptimes, "isac", &d_30);
|
||||||
switch_core_hash_insert(runtime.ptimes, "G723", &d_30);
|
switch_core_hash_insert(runtime.ptimes, "G723", &d_30);
|
||||||
|
|
||||||
|
if (runtime.cpu_count == 1) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING,
|
||||||
|
"Implicitly setting events-use-dispatch based on a single CPU\n");
|
||||||
|
runtime.events_use_dispatch = 1;
|
||||||
|
}
|
||||||
|
|
||||||
if ((xml = switch_xml_open_cfg(file, &cfg, NULL))) {
|
if ((xml = switch_xml_open_cfg(file, &cfg, NULL))) {
|
||||||
switch_xml_t settings, param;
|
switch_xml_t settings, param;
|
||||||
|
|
||||||
|
|
|
@ -1581,6 +1581,7 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
|
||||||
int check = 0;
|
int check = 0;
|
||||||
|
|
||||||
switch_mutex_lock(session_manager.mutex);
|
switch_mutex_lock(session_manager.mutex);
|
||||||
|
session_manager.starting--;
|
||||||
session_manager.running++;
|
session_manager.running++;
|
||||||
switch_mutex_unlock(session_manager.mutex);
|
switch_mutex_unlock(session_manager.mutex);
|
||||||
#ifdef DEBUG_THREAD_POOL
|
#ifdef DEBUG_THREAD_POOL
|
||||||
|
@ -1675,6 +1676,33 @@ static void thread_launch_failure(void)
|
||||||
switch_mutex_unlock(session_manager.mutex);
|
switch_mutex_unlock(session_manager.mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int wake_queue(void)
|
||||||
|
{
|
||||||
|
switch_status_t status;
|
||||||
|
int tries = 0;
|
||||||
|
|
||||||
|
top:
|
||||||
|
|
||||||
|
status = switch_mutex_trylock(session_manager.cond_mutex);
|
||||||
|
|
||||||
|
if (status == SWITCH_STATUS_SUCCESS) {
|
||||||
|
switch_thread_cond_signal(session_manager.cond);
|
||||||
|
switch_mutex_unlock(session_manager.cond_mutex);
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) {
|
||||||
|
switch_mutex_unlock(session_manager.cond2_mutex);
|
||||||
|
} else {
|
||||||
|
if (++tries < 10) {
|
||||||
|
switch_cond_next();
|
||||||
|
goto top;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static switch_status_t check_queue(void)
|
static switch_status_t check_queue(void)
|
||||||
{
|
{
|
||||||
switch_status_t status = SWITCH_STATUS_FALSE;
|
switch_status_t status = SWITCH_STATUS_FALSE;
|
||||||
|
@ -1683,7 +1711,7 @@ static switch_status_t check_queue(void)
|
||||||
|
|
||||||
switch_mutex_lock(session_manager.mutex);
|
switch_mutex_lock(session_manager.mutex);
|
||||||
ttl = switch_queue_size(session_manager.thread_queue);
|
ttl = switch_queue_size(session_manager.thread_queue);
|
||||||
x = (session_manager.running - session_manager.busy);
|
x = ((session_manager.running + session_manager.starting) - session_manager.busy);
|
||||||
switch_mutex_unlock(session_manager.mutex);
|
switch_mutex_unlock(session_manager.mutex);
|
||||||
|
|
||||||
|
|
||||||
|
@ -1710,6 +1738,10 @@ static switch_status_t check_queue(void)
|
||||||
} else {
|
} else {
|
||||||
status = SWITCH_STATUS_SUCCESS;
|
status = SWITCH_STATUS_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch_mutex_lock(session_manager.mutex);
|
||||||
|
session_manager.starting++;
|
||||||
|
switch_mutex_unlock(session_manager.mutex);
|
||||||
x++;
|
x++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1719,12 +1751,20 @@ static switch_status_t check_queue(void)
|
||||||
|
|
||||||
static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj)
|
static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj)
|
||||||
{
|
{
|
||||||
int x = 0;
|
|
||||||
|
uint32_t sleep = 10000000;
|
||||||
|
switch_time_t next = switch_micro_time_now() + sleep;
|
||||||
|
|
||||||
|
switch_mutex_lock(session_manager.cond_mutex);
|
||||||
|
|
||||||
while(session_manager.ready) {
|
while(session_manager.ready) {
|
||||||
switch_yield(100000);
|
int check = 1;
|
||||||
|
|
||||||
if (++x == 300) {
|
switch_mutex_lock(session_manager.cond2_mutex);
|
||||||
|
switch_thread_cond_timedwait(session_manager.cond, session_manager.cond_mutex, sleep);
|
||||||
|
switch_mutex_unlock(session_manager.cond2_mutex);
|
||||||
|
|
||||||
|
if (switch_micro_time_now() >= next) {
|
||||||
if (session_manager.popping) {
|
if (session_manager.popping) {
|
||||||
#ifdef DEBUG_THREAD_POOL
|
#ifdef DEBUG_THREAD_POOL
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
|
||||||
|
@ -1732,17 +1772,20 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_t
|
||||||
#endif
|
#endif
|
||||||
switch_queue_interrupt_all(session_manager.thread_queue);
|
switch_queue_interrupt_all(session_manager.thread_queue);
|
||||||
|
|
||||||
x--;
|
sleep = 100000;
|
||||||
|
check = 0;
|
||||||
continue;
|
|
||||||
} else {
|
} else {
|
||||||
x = 0;
|
sleep = 10000000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
check_queue();
|
if (check) check_queue();
|
||||||
|
|
||||||
|
next = switch_micro_time_now() + sleep;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch_mutex_unlock(session_manager.cond_mutex);
|
||||||
|
|
||||||
while(session_manager.running) {
|
while(session_manager.running) {
|
||||||
switch_queue_interrupt_all(session_manager.thread_queue);
|
switch_queue_interrupt_all(session_manager.thread_queue);
|
||||||
switch_yield(20000);
|
switch_yield(20000);
|
||||||
|
@ -1763,7 +1806,7 @@ SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_d
|
||||||
*tdp = NULL;
|
*tdp = NULL;
|
||||||
|
|
||||||
switch_queue_push(session_manager.thread_queue, td);
|
switch_queue_push(session_manager.thread_queue, td);
|
||||||
check_queue();
|
wake_queue();
|
||||||
|
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
@ -1786,7 +1829,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_co
|
||||||
td->obj = session;
|
td->obj = session;
|
||||||
td->func = switch_core_session_thread;
|
td->func = switch_core_session_thread;
|
||||||
switch_queue_push(session_manager.thread_queue, td);
|
switch_queue_push(session_manager.thread_queue, td);
|
||||||
check_queue();
|
wake_queue();
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(session->mutex);
|
switch_mutex_unlock(session->mutex);
|
||||||
|
|
||||||
|
@ -2431,6 +2474,9 @@ void switch_core_session_init(switch_memory_pool_t *pool)
|
||||||
switch_threadattr_t *thd_attr;
|
switch_threadattr_t *thd_attr;
|
||||||
|
|
||||||
switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
|
switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
|
||||||
|
switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool);
|
||||||
|
switch_mutex_init(&session_manager.cond_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
|
||||||
|
switch_mutex_init(&session_manager.cond2_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
|
||||||
switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
|
switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
|
||||||
switch_threadattr_create(&thd_attr, session_manager.memory_pool);
|
switch_threadattr_create(&thd_attr, session_manager.memory_pool);
|
||||||
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||||
|
|
Loading…
Reference in New Issue