Merge pull request #475 in FS/freeswitch from ~ARTURZ/freeswitch:FS-8142-switch_core_session-thread-cache-races to master
* commit 'cd4c3188e4f715ff129dc4eea1a4ba50140c2a42': FS-8142 Fix a thread cache thread-safety and caching
This commit is contained in:
commit
c73d3123b2
|
@ -309,16 +309,10 @@ struct switch_session_manager {
|
|||
uint32_t session_limit;
|
||||
switch_size_t session_id;
|
||||
switch_queue_t *thread_queue;
|
||||
switch_thread_t *manager_thread;
|
||||
switch_mutex_t *mutex;
|
||||
switch_thread_cond_t *cond;
|
||||
switch_mutex_t *cond_mutex;
|
||||
switch_mutex_t *cond2_mutex;
|
||||
int ready;
|
||||
int running;
|
||||
int busy;
|
||||
int popping;
|
||||
int starting;
|
||||
};
|
||||
|
||||
extern struct switch_session_manager session_manager;
|
||||
|
|
|
@ -1675,45 +1675,18 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
|
|||
{
|
||||
switch_thread_pool_node_t *node = (switch_thread_pool_node_t *) obj;
|
||||
switch_memory_pool_t *pool = node->pool;
|
||||
void *pop;
|
||||
int check = 0;
|
||||
|
||||
switch_mutex_lock(session_manager.mutex);
|
||||
session_manager.starting--;
|
||||
session_manager.running++;
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
#ifdef DEBUG_THREAD_POOL
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Started\n", (long) (intptr_t) thread);
|
||||
#endif
|
||||
while(session_manager.ready) {
|
||||
switch_status_t check_status;
|
||||
|
||||
pop = NULL;
|
||||
|
||||
if (check) {
|
||||
check_status = switch_queue_trypop(session_manager.thread_queue, &pop);
|
||||
} else {
|
||||
switch_mutex_lock(session_manager.mutex);
|
||||
session_manager.popping++;
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
|
||||
check_status = switch_queue_pop(session_manager.thread_queue, &pop);
|
||||
|
||||
switch_mutex_lock(session_manager.mutex);
|
||||
session_manager.popping--;
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
}
|
||||
|
||||
if (check_status == SWITCH_STATUS_SUCCESS && pop) {
|
||||
while (1) {
|
||||
void *pop;
|
||||
switch_status_t check_status = switch_queue_pop_timeout(session_manager.thread_queue, &pop, apr_time_from_sec(5));
|
||||
if (check_status == SWITCH_STATUS_SUCCESS) {
|
||||
switch_thread_data_t *td = (switch_thread_data_t *) pop;
|
||||
|
||||
switch_mutex_lock(session_manager.mutex);
|
||||
session_manager.busy++;
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
#ifdef DEBUG_THREAD_POOL
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing\n", (long) (intptr_t) thread);
|
||||
#endif
|
||||
|
||||
td->func(thread, td->obj);
|
||||
|
||||
if (td->pool) {
|
||||
|
@ -1729,23 +1702,22 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
|
|||
switch_mutex_lock(session_manager.mutex);
|
||||
session_manager.busy--;
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
|
||||
} else {
|
||||
if (check) {
|
||||
switch_mutex_lock(session_manager.mutex);
|
||||
if (!switch_status_is_timeup(check_status) || session_manager.running > session_manager.busy) {
|
||||
if (!--session_manager.running) {
|
||||
switch_thread_cond_signal(session_manager.cond);
|
||||
}
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
break;
|
||||
}
|
||||
check++;
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
}
|
||||
}
|
||||
#ifdef DEBUG_THREAD_POOL
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Ended\n", (long)(intptr_t) thread);
|
||||
#endif
|
||||
switch_mutex_lock(session_manager.mutex);
|
||||
session_manager.running--;
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
|
||||
switch_core_destroy_memory_pool(&pool);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1772,46 +1744,18 @@ static void thread_launch_failure(void)
|
|||
switch_mutex_unlock(session_manager.mutex);
|
||||
}
|
||||
|
||||
static int wake_queue(void)
|
||||
{
|
||||
switch_status_t status;
|
||||
int tries = 0;
|
||||
|
||||
top:
|
||||
|
||||
status = switch_mutex_trylock(session_manager.cond_mutex);
|
||||
|
||||
if (status == SWITCH_STATUS_SUCCESS) {
|
||||
switch_thread_cond_signal(session_manager.cond);
|
||||
switch_mutex_unlock(session_manager.cond_mutex);
|
||||
return 1;
|
||||
} else {
|
||||
if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) {
|
||||
switch_mutex_unlock(session_manager.cond2_mutex);
|
||||
} else {
|
||||
if (++tries < 10) {
|
||||
switch_cond_next();
|
||||
goto top;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static switch_status_t check_queue(void)
|
||||
{
|
||||
switch_status_t status = SWITCH_STATUS_FALSE;
|
||||
int ttl = 0;
|
||||
int x = 0;
|
||||
|
||||
switch_mutex_lock(session_manager.mutex);
|
||||
ttl = switch_queue_size(session_manager.thread_queue);
|
||||
x = ((session_manager.running + session_manager.starting) - session_manager.busy);
|
||||
if (session_manager.running >= ++session_manager.busy) {
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
++session_manager.running;
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
|
||||
|
||||
while (x < ttl) {
|
||||
{
|
||||
switch_thread_t *thread;
|
||||
switch_threadattr_t *thd_attr;
|
||||
switch_memory_pool_t *pool;
|
||||
|
@ -1827,6 +1771,11 @@ static switch_status_t check_queue(void)
|
|||
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_LOW);
|
||||
|
||||
if (switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_worker, node, node->pool) != SWITCH_STATUS_SUCCESS) {
|
||||
switch_mutex_lock(session_manager.mutex);
|
||||
if (!--session_manager.running) {
|
||||
switch_thread_cond_signal(session_manager.cond);
|
||||
}
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Thread Failure!\n");
|
||||
switch_core_destroy_memory_pool(&pool);
|
||||
status = SWITCH_STATUS_GENERR;
|
||||
|
@ -1834,76 +1783,11 @@ static switch_status_t check_queue(void)
|
|||
} else {
|
||||
status = SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
switch_mutex_lock(session_manager.mutex);
|
||||
session_manager.starting++;
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
x++;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj)
|
||||
{
|
||||
|
||||
uint32_t sleep = 10000000;
|
||||
switch_time_t next = switch_micro_time_now() + sleep;
|
||||
|
||||
switch_mutex_lock(session_manager.cond_mutex);
|
||||
|
||||
while(session_manager.ready) {
|
||||
int check = 1;
|
||||
int ttl = 0;
|
||||
uint32_t xsleep = sleep;
|
||||
|
||||
switch_mutex_lock(session_manager.mutex);
|
||||
ttl = switch_queue_size(session_manager.thread_queue);
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
|
||||
|
||||
if (!ttl) {
|
||||
xsleep = 10000;
|
||||
}
|
||||
|
||||
if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) {
|
||||
switch_thread_cond_timedwait(session_manager.cond, session_manager.cond_mutex, xsleep);
|
||||
switch_mutex_unlock(session_manager.cond2_mutex);
|
||||
}
|
||||
|
||||
|
||||
if (switch_micro_time_now() >= next) {
|
||||
if (session_manager.popping) {
|
||||
#ifdef DEBUG_THREAD_POOL
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
|
||||
"Thread pool: running:%d busy:%d popping:%d\n", session_manager.running, session_manager.busy, session_manager.popping);
|
||||
#endif
|
||||
switch_queue_interrupt_all(session_manager.thread_queue);
|
||||
|
||||
sleep = 100000;
|
||||
check = 0;
|
||||
} else {
|
||||
sleep = 10000000;
|
||||
}
|
||||
}
|
||||
|
||||
if (check) check_queue();
|
||||
|
||||
next = switch_micro_time_now() + sleep;
|
||||
}
|
||||
|
||||
switch_mutex_unlock(session_manager.cond_mutex);
|
||||
|
||||
while(session_manager.running) {
|
||||
switch_queue_interrupt_all(session_manager.thread_queue);
|
||||
switch_yield(20000);
|
||||
}
|
||||
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp)
|
||||
{
|
||||
switch_status_t status = SWITCH_STATUS_SUCCESS;
|
||||
|
@ -1914,8 +1798,8 @@ SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_d
|
|||
td = *tdp;
|
||||
*tdp = NULL;
|
||||
|
||||
switch_queue_push(session_manager.thread_queue, td);
|
||||
wake_queue();
|
||||
status = switch_queue_push(session_manager.thread_queue, td);
|
||||
check_queue();
|
||||
|
||||
return status;
|
||||
}
|
||||
|
@ -1931,14 +1815,13 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_co
|
|||
} else if (switch_test_flag(session, SSF_THREAD_STARTED)) {
|
||||
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Cannot launch thread again after it has already been run!\n");
|
||||
} else {
|
||||
status = SWITCH_STATUS_SUCCESS;
|
||||
switch_set_flag(session, SSF_THREAD_RUNNING);
|
||||
switch_set_flag(session, SSF_THREAD_STARTED);
|
||||
td = switch_core_session_alloc(session, sizeof(*td));
|
||||
td->obj = session;
|
||||
td->func = switch_core_session_thread;
|
||||
switch_queue_push(session_manager.thread_queue, td);
|
||||
wake_queue();
|
||||
status = switch_queue_push(session_manager.thread_queue, td);
|
||||
check_queue();
|
||||
}
|
||||
switch_mutex_unlock(session->mutex);
|
||||
|
||||
|
@ -2581,39 +2464,19 @@ void switch_core_session_init(switch_memory_pool_t *pool)
|
|||
session_manager.session_id = 1;
|
||||
session_manager.memory_pool = pool;
|
||||
switch_core_hash_init(&session_manager.session_table);
|
||||
|
||||
if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) {
|
||||
switch_threadattr_t *thd_attr;
|
||||
|
||||
switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
|
||||
switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool);
|
||||
switch_mutex_init(&session_manager.cond_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
|
||||
switch_mutex_init(&session_manager.cond2_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
|
||||
switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
|
||||
switch_threadattr_create(&thd_attr, session_manager.memory_pool);
|
||||
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||
session_manager.ready = 1;
|
||||
switch_thread_create(&session_manager.manager_thread, thd_attr, switch_core_session_thread_pool_manager, NULL, session_manager.memory_pool);
|
||||
}
|
||||
|
||||
switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_DEFAULT, session_manager.memory_pool);
|
||||
switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool);
|
||||
switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
|
||||
}
|
||||
|
||||
void switch_core_session_uninit(void)
|
||||
{
|
||||
int sanity = 100;
|
||||
switch_status_t st = SWITCH_STATUS_FALSE;
|
||||
|
||||
session_manager.ready = 0;
|
||||
wake_queue();
|
||||
|
||||
while(session_manager.running && --sanity > 0) {
|
||||
switch_queue_interrupt_all(session_manager.thread_queue);
|
||||
switch_yield(100000);
|
||||
}
|
||||
|
||||
switch_thread_join(&st, session_manager.manager_thread);
|
||||
switch_queue_term(session_manager.thread_queue);
|
||||
switch_mutex_lock(session_manager.mutex);
|
||||
if (session_manager.running)
|
||||
switch_thread_cond_timedwait(session_manager.cond, session_manager.mutex, apr_time_from_sec(10));
|
||||
switch_mutex_unlock(session_manager.mutex);
|
||||
switch_core_hash_destroy(&session_manager.session_table);
|
||||
|
||||
}
|
||||
|
||||
SWITCH_DECLARE(switch_app_log_t *) switch_core_session_get_app_log(switch_core_session_t *session)
|
||||
|
@ -3062,7 +2925,7 @@ SWITCH_DECLARE(switch_log_level_t) switch_core_session_get_loglevel(switch_core_
|
|||
SWITCH_DECLARE(void) switch_core_session_debug_pool(switch_stream_handle_t *stream)
|
||||
{
|
||||
stream->write_function(stream, "Thread pool: running:%d busy:%d popping:%d\n",
|
||||
session_manager.running, session_manager.busy, session_manager.popping);
|
||||
session_manager.running, session_manager.busy, session_manager.running - session_manager.busy);
|
||||
}
|
||||
|
||||
SWITCH_DECLARE(void) switch_core_session_raw_read(switch_core_session_t *session)
|
||||
|
|
Loading…
Reference in New Issue