diff --git a/src/switch_event.c b/src/switch_event.c index 1301d92477..cfd6a7a139 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -79,6 +79,7 @@ static switch_memory_pool_t *THRUNTIME_POOL = NULL; static switch_thread_t *EVENT_QUEUE_THREADS[NUMBER_OF_QUEUES] = { 0 }; static switch_queue_t *EVENT_QUEUE[NUMBER_OF_QUEUES] = { 0 }; static switch_thread_t *EVENT_DISPATCH_QUEUE_THREADS[MAX_DISPATCH_VAL] = { 0 }; +static uint8_t EVENT_DISPATCH_QUEUE_RUNNING[MAX_DISPATCH_VAL] = { 0 }; static switch_queue_t *EVENT_DISPATCH_QUEUE[MAX_DISPATCH_VAL] = { 0 }; static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN; static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL; @@ -238,9 +239,9 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th { switch_queue_t *queue = (switch_queue_t *) obj; int my_id = 0; + switch_mutex_lock(EVENT_QUEUE_MUTEX); THREAD_COUNT++; - switch_mutex_unlock(EVENT_QUEUE_MUTEX); for (my_id = 0; my_id < NUMBER_OF_QUEUES; my_id++) { if (EVENT_DISPATCH_QUEUE[my_id] == queue) { @@ -248,6 +249,9 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th } } + EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1; + switch_mutex_unlock(EVENT_QUEUE_MUTEX); + for (;;) { void *pop = NULL; switch_event_t *event = NULL; @@ -270,6 +274,7 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th switch_mutex_lock(EVENT_QUEUE_MUTEX); + EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1; THREAD_COUNT--; switch_mutex_unlock(EVENT_QUEUE_MUTEX); @@ -298,6 +303,7 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi for (;;) { void *pop = NULL; switch_event_t *event = NULL; + int loops = 0; if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) { break; @@ -314,13 +320,13 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi event = (switch_event_t *) pop; while (event) { - int max; - switch_mutex_lock(EVENT_QUEUE_MUTEX); - max = SOFT_MAX_DISPATCH; - switch_mutex_unlock(EVENT_QUEUE_MUTEX); + if (++loops > 2) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event system overloading\n"); + switch_yield(1000000); + } - for (index = 0; (int)index < max; index++) { + for (index = 0; index < SOFT_MAX_DISPATCH; index++) { if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], event) == SWITCH_STATUS_SUCCESS) { event = NULL; break; @@ -328,19 +334,15 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi } if (event) { - switch_mutex_lock(EVENT_QUEUE_MUTEX); if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Adding a new event thread #%d\n", SOFT_MAX_DISPATCH + 1); + switch_mutex_lock(EVENT_QUEUE_MUTEX); launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, DISPATCH_QUEUE_LEN, RUNTIME_POOL); + switch_mutex_unlock(EVENT_QUEUE_MUTEX); } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event threads maxed out at %d.\n", SOFT_MAX_DISPATCH); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Out of threads!\n"); switch_yield(1000000); } - switch_mutex_unlock(EVENT_QUEUE_MUTEX); } - - - switch_cond_next(); } } @@ -566,6 +568,8 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t { switch_threadattr_t *thd_attr; uint32_t index = 0; + int launched = 0; + uint32_t sanity = 200; if (max > MAX_DISPATCH) { return; @@ -584,8 +588,10 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_priority_increase(thd_attr); switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE[index], pool); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Create event dispatch thread %d\n", index); - switch_yield(100000); + while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index); + launched++; + break; } SOFT_MAX_DISPATCH = index;