diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 8f9220d0b9..9803c6708b 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -2329,6 +2329,8 @@ SWITCH_DECLARE(void) switch_say_file(switch_say_file_handle_t *sh, const char *f SWITCH_DECLARE(int) switch_max_file_desc(void); SWITCH_DECLARE(void) switch_close_extra_files(int *keep, int keep_ttl); SWITCH_DECLARE(switch_status_t) switch_core_thread_set_cpu_affinity(int cpu); +SWITCH_DECLARE(void) switch_os_yield(void); + SWITCH_END_EXTERN_C #endif /* For Emacs: diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.c b/src/mod/endpoints/mod_sofia/mod_sofia.c index 164d0e76ff..6f64078509 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.c +++ b/src/mod/endpoints/mod_sofia/mod_sofia.c @@ -5445,7 +5445,10 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_sofia_load) switch_yield(1500000); mod_sofia_globals.cpu_count = switch_core_cpu_count(); - mod_sofia_globals.max_msg_queues = mod_sofia_globals.cpu_count + 1; + mod_sofia_globals.max_msg_queues = (mod_sofia_globals.cpu_count / 2) + 1; + if (mod_sofia_globals.max_msg_queues < 2) { + mod_sofia_globals.max_msg_queues = 2; + } if (mod_sofia_globals.max_msg_queues > SOFIA_MAX_MSG_QUEUE) { mod_sofia_globals.max_msg_queues = SOFIA_MAX_MSG_QUEUE; @@ -5627,11 +5630,12 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_sofia_shutdown) } - for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) { - switch_queue_push(mod_sofia_globals.msg_queue[i], NULL); + for (i = 0; mod_sofia_globals.msg_queue_thread[i]; i++) { + switch_queue_push(mod_sofia_globals.msg_queue, NULL); } - for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) { + + for (i = 0; mod_sofia_globals.msg_queue_thread[i]; i++) { switch_status_t st; switch_thread_join(&st, mod_sofia_globals.msg_queue_thread[i]); } diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.h b/src/mod/endpoints/mod_sofia/mod_sofia.h index b96ff52011..c0fc7dfba4 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.h +++ b/src/mod/endpoints/mod_sofia/mod_sofia.h @@ -342,7 +342,7 @@ typedef enum { } TFLAGS; #define SOFIA_MAX_MSG_QUEUE 64 -#define SOFIA_MSG_QUEUE_SIZE 250 +#define SOFIA_MSG_QUEUE_SIZE 100 struct mod_sofia_globals { switch_memory_pool_t *pool; @@ -359,7 +359,7 @@ struct mod_sofia_globals { char hostname[512]; switch_queue_t *presence_queue; switch_queue_t *mwi_queue; - switch_queue_t *msg_queue[SOFIA_MAX_MSG_QUEUE]; + switch_queue_t *msg_queue; switch_thread_t *msg_queue_thread[SOFIA_MAX_MSG_QUEUE]; int msg_queue_len; struct sofia_private destroy_private; diff --git a/src/mod/endpoints/mod_sofia/sofia.c b/src/mod/endpoints/mod_sofia/sofia.c index 0e579ca1f8..8eb122b310 100644 --- a/src/mod/endpoints/mod_sofia/sofia.c +++ b/src/mod/endpoints/mod_sofia/sofia.c @@ -1347,34 +1347,56 @@ void sofia_process_dispatch_event(sofia_dispatch_event_t **dep) nua_handle_unref(nh); nua_stack_unref(nua); + switch_os_yield(); } + +static int msg_queue_threads = 0; +//static int count = 0; + void *SWITCH_THREAD_FUNC sofia_msg_thread_run(switch_thread_t *thread, void *obj) { void *pop; switch_queue_t *q = (switch_queue_t *) obj; int my_id; + for (my_id = 0; my_id < mod_sofia_globals.msg_queue_len; my_id++) { - if (mod_sofia_globals.msg_queue[my_id] == q) { + if (mod_sofia_globals.msg_queue_thread[my_id] == thread) { + break; + } + } + + switch_mutex_lock(mod_sofia_globals.mutex); + msg_queue_threads++; + switch_mutex_unlock(mod_sofia_globals.mutex); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread %d Started\n", my_id); + + + for(;;) { + + if (switch_queue_pop(q, &pop) != SWITCH_STATUS_SUCCESS) { + switch_cond_next(); + continue; + } + + if (pop) { + sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop; + sofia_process_dispatch_event(&de); + switch_os_yield(); + } else { break; } } - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread %d Started\n", my_id); - - switch_core_thread_set_cpu_affinity(my_id); - - while(switch_queue_pop(q, &pop) == SWITCH_STATUS_SUCCESS && pop) { - sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop; - sofia_process_dispatch_event(&de); - switch_cond_next(); - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread Ended\n"); + switch_mutex_lock(mod_sofia_globals.mutex); + msg_queue_threads--; + switch_mutex_unlock(mod_sofia_globals.mutex); + return NULL; } @@ -1392,11 +1414,14 @@ void sofia_msg_thread_start(int idx) int i; mod_sofia_globals.msg_queue_len = idx + 1; - for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) { - if (!mod_sofia_globals.msg_queue[i]) { - switch_threadattr_t *thd_attr = NULL; + if (!mod_sofia_globals.msg_queue) { + switch_queue_create(&mod_sofia_globals.msg_queue, SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.cpu_count, mod_sofia_globals.pool); + } - switch_queue_create(&mod_sofia_globals.msg_queue[i], SOFIA_MSG_QUEUE_SIZE, mod_sofia_globals.pool); + + for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) { + if (!mod_sofia_globals.msg_queue_thread[i]) { + switch_threadattr_t *thd_attr = NULL; switch_threadattr_create(&thd_attr, mod_sofia_globals.pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); @@ -1404,7 +1429,7 @@ void sofia_msg_thread_start(int idx) switch_thread_create(&mod_sofia_globals.msg_queue_thread[i], thd_attr, sofia_msg_thread_run, - mod_sofia_globals.msg_queue[i], + mod_sofia_globals.msg_queue, mod_sofia_globals.pool); } } @@ -1413,12 +1438,12 @@ void sofia_msg_thread_start(int idx) switch_mutex_unlock(mod_sofia_globals.mutex); } - +//static int foo = 0; static void sofia_queue_message(sofia_dispatch_event_t *de) { - int idx = 0, queued = 0; + int launch = 0; - if (mod_sofia_globals.running == 0 || !mod_sofia_globals.msg_queue[0]) { + if (mod_sofia_globals.running == 0 || !mod_sofia_globals.msg_queue) { sofia_process_dispatch_event(&de); return; } @@ -1430,25 +1455,18 @@ static void sofia_queue_message(sofia_dispatch_event_t *de) } - again: - - for (idx = 0; idx < mod_sofia_globals.msg_queue_len; idx++) { - if (switch_queue_trypush(mod_sofia_globals.msg_queue[idx], de) == SWITCH_STATUS_SUCCESS) { - queued++; - break; - } + if ((switch_queue_size(mod_sofia_globals.msg_queue) > (SOFIA_MSG_QUEUE_SIZE * msg_queue_threads))) { + launch++; } - if (!queued) { + if (launch) { if (mod_sofia_globals.msg_queue_len < mod_sofia_globals.max_msg_queues) { sofia_msg_thread_start(mod_sofia_globals.msg_queue_len + 1); - goto again; } - - switch_queue_push(mod_sofia_globals.msg_queue[0], de); } - + + switch_queue_push(mod_sofia_globals.msg_queue, de); } @@ -1468,6 +1486,7 @@ void sofia_event_callback(nua_event_t event, return; } + switch_mutex_lock(profile->flag_mutex); profile->queued_events++; @@ -1483,6 +1502,13 @@ void sofia_event_callback(nua_event_t event, de->nua = nua_stack_ref(nua); if (event == nua_i_invite && !sofia_private) { + int critical = (((SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.max_msg_queues) * 900) / 1000); + + if (switch_queue_size(mod_sofia_globals.msg_queue) > critical) { + nua_respond(nh, 503, "Maximum Calls In Progress", SIPTAG_RETRY_AFTER_STR("300"), TAG_END()); + return; + } + if (!(sofia_private = su_alloc(nh->nh_home, sizeof(*sofia_private)))) { abort(); } @@ -1516,8 +1542,8 @@ void sofia_event_callback(nua_event_t event, } } - sofia_queue_message(de); + switch_os_yield(); } diff --git a/src/switch_channel.c b/src/switch_channel.c index f7d7e3aae4..7afe342103 100644 --- a/src/switch_channel.c +++ b/src/switch_channel.c @@ -1415,7 +1415,7 @@ SWITCH_DECLARE(void) switch_channel_wait_for_state(switch_channel_t *channel, sw (other_channel && switch_channel_down_nosig(other_channel)) || switch_channel_down(channel)) { break; } - switch_yield(20000); + switch_cond_next(); } } diff --git a/src/switch_event.c b/src/switch_event.c index 23670fac93..38ec16bf26 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -35,7 +35,7 @@ #include #include //#define SWITCH_EVENT_RECYCLE -#define DISPATCH_QUEUE_LEN 1000 +#define DISPATCH_QUEUE_LEN 100 //#define DEBUG_DISPATCH_QUEUES /*! \brief A node to store binded events */ @@ -74,23 +74,20 @@ static switch_mutex_t *BLOCK = NULL; static switch_mutex_t *POOL_LOCK = NULL; static switch_memory_pool_t *RUNTIME_POOL = NULL; static switch_memory_pool_t *THRUNTIME_POOL = NULL; -#define NUMBER_OF_QUEUES 3 -static switch_thread_t *EVENT_QUEUE_THREADS[NUMBER_OF_QUEUES] = { 0 }; -static switch_queue_t *EVENT_QUEUE[NUMBER_OF_QUEUES] = { 0 }; static switch_thread_t *EVENT_DISPATCH_QUEUE_THREADS[MAX_DISPATCH_VAL] = { 0 }; static uint8_t EVENT_DISPATCH_QUEUE_RUNNING[MAX_DISPATCH_VAL] = { 0 }; -static switch_queue_t *EVENT_DISPATCH_QUEUE[MAX_DISPATCH_VAL] = { 0 }; -static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN; +static switch_queue_t *EVENT_DISPATCH_QUEUE = NULL; static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL; static switch_hash_t *CUSTOM_HASH = NULL; static int THREAD_COUNT = 0; +static int DISPATCH_THREAD_COUNT = 0; static int SYSTEM_RUNNING = 0; static uint64_t EVENT_SEQUENCE_NR = 0; #ifdef SWITCH_EVENT_RECYCLE static switch_queue_t *EVENT_RECYCLE_QUEUE = NULL; static switch_queue_t *EVENT_HEADER_RECYCLE_QUEUE = NULL; #endif -static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t *pool); +static void launch_dispatch_threads(uint32_t max, switch_memory_pool_t *pool); static char *my_dup(const char *s) { @@ -244,9 +241,10 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th switch_mutex_lock(EVENT_QUEUE_MUTEX); THREAD_COUNT++; + DISPATCH_THREAD_COUNT++; - for (my_id = 0; my_id < NUMBER_OF_QUEUES; my_id++) { - if (EVENT_DISPATCH_QUEUE[my_id] == queue) { + for (my_id = 0; my_id < MAX_DISPATCH_VAL; my_id++) { + if (EVENT_DISPATCH_QUEUE_THREADS[my_id] == thread) { break; } } @@ -254,7 +252,6 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1; switch_mutex_unlock(EVENT_QUEUE_MUTEX); - switch_core_thread_set_cpu_affinity(my_id); for (;;) { void *pop = NULL; @@ -265,7 +262,7 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th } if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) { - break; + continue; } if (!pop) { @@ -280,6 +277,7 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th switch_mutex_lock(EVENT_QUEUE_MUTEX); EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 0; THREAD_COUNT--; + DISPATCH_THREAD_COUNT--; switch_mutex_unlock(EVENT_QUEUE_MUTEX); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Dispatch Thread %d Ended.\n", my_id); @@ -287,74 +285,41 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th } - -static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, void *obj) +static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp) { - switch_queue_t *queue = (switch_queue_t *) obj; - uint32_t index = 0; - int my_id = 0; - switch_mutex_lock(EVENT_QUEUE_MUTEX); - THREAD_COUNT++; - switch_mutex_unlock(EVENT_QUEUE_MUTEX); + switch_event_t *event = *eventp; - for (my_id = 0; my_id < NUMBER_OF_QUEUES; my_id++) { - if (EVENT_QUEUE[my_id] == queue) { - break; - } + if (!SYSTEM_RUNNING) { + return SWITCH_STATUS_FALSE; } + + while (event) { + int launch = 0; - for (;;) { - void *pop = NULL; - switch_event_t *event = NULL; + switch_mutex_lock(EVENT_QUEUE_MUTEX); - if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) { - break; + if (switch_queue_size(EVENT_DISPATCH_QUEUE) > (DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT)) { + launch++; } - - if (!pop) { - break; - } - - if (!SYSTEM_RUNNING) { - break; - } - - event = (switch_event_t *) pop; - - while (event) { - - for (index = 0; index < SOFT_MAX_DISPATCH; index++) { - if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], event) == SWITCH_STATUS_SUCCESS) { - event = NULL; - break; - } - } - - if (event) { - if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) { - switch_mutex_lock(EVENT_QUEUE_MUTEX); - launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, DISPATCH_QUEUE_LEN, RUNTIME_POOL); - switch_mutex_unlock(EVENT_QUEUE_MUTEX); - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Event Thread %d is blocking\n", my_id); - switch_queue_push(EVENT_DISPATCH_QUEUE[0], event); - event = NULL; - } + + if (launch) { + if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) { + launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, RUNTIME_POOL); } } + + switch_mutex_unlock(EVENT_QUEUE_MUTEX); + + *eventp = NULL; + switch_queue_push(EVENT_DISPATCH_QUEUE, event); + event = NULL; + } - - switch_mutex_lock(EVENT_QUEUE_MUTEX); - THREAD_COUNT--; - switch_mutex_unlock(EVENT_QUEUE_MUTEX); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread %d Ended.\n", my_id); - return NULL; - + + return SWITCH_STATUS_SUCCESS; } - SWITCH_DECLARE(void) switch_event_deliver(switch_event_t **event) { switch_event_types_t e; @@ -499,20 +464,23 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) SYSTEM_RUNNING = 0; switch_mutex_unlock(EVENT_QUEUE_MUTEX); - for (x = 0; x < 3; x++) { - if (EVENT_QUEUE[x]) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping event queue %d\n", x); - switch_queue_trypush(EVENT_QUEUE[x], NULL); - switch_queue_interrupt_all(EVENT_QUEUE[x]); - } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n"); + + + for(x = 0; x < DISPATCH_THREAD_COUNT; x++) { + switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL); } - for (x = 0; x < SOFT_MAX_DISPATCH; x++) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queue %d\n", x); - switch_queue_trypush(EVENT_DISPATCH_QUEUE[x], NULL); - switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE[x]); + switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n"); + + for(x = 0; x < DISPATCH_THREAD_COUNT; x++) { + switch_status_t st; + switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]); } + x = 0; while (x < 10000 && THREAD_COUNT) { switch_cond_next(); if (THREAD_COUNT == last) { @@ -521,37 +489,16 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) last = THREAD_COUNT; } - for (x = 0; x < SOFT_MAX_DISPATCH; x++) { + { void *pop = NULL; switch_event_t *event = NULL; - switch_status_t st; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch thread %d\n", x); - switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]); - - while (switch_queue_trypop(EVENT_DISPATCH_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) { + while (switch_queue_trypop(EVENT_DISPATCH_QUEUE, &pop) == SWITCH_STATUS_SUCCESS && pop) { event = (switch_event_t *) pop; switch_event_destroy(&event); } } - for (x = 0; x < NUMBER_OF_QUEUES; x++) { - void *pop = NULL; - switch_event_t *event = NULL; - switch_status_t st; - - if (EVENT_QUEUE_THREADS[x]) { - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping queue thread %d\n", x); - switch_thread_join(&st, EVENT_QUEUE_THREADS[x]); - - while (switch_queue_trypop(EVENT_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) { - event = (switch_event_t *) pop; - switch_event_destroy(&event); - } - } - } - for (hi = switch_hash_first(NULL, CUSTOM_HASH); hi; hi = switch_hash_next(hi)) { switch_event_subclass_t *subclass; switch_hash_this(hi, &var, NULL, &val); @@ -568,7 +515,7 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) return SWITCH_STATUS_SUCCESS; } -static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t *pool) +static void launch_dispatch_threads(uint32_t max, switch_memory_pool_t *pool) { switch_threadattr_t *thd_attr; uint32_t index = 0; @@ -584,14 +531,14 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t } for (index = SOFT_MAX_DISPATCH; index < max && index < MAX_DISPATCH; index++) { - if (EVENT_DISPATCH_QUEUE[index]) { + if (EVENT_DISPATCH_QUEUE_THREADS[index]) { continue; } - switch_queue_create(&EVENT_DISPATCH_QUEUE[index], len, pool); + switch_threadattr_create(&thd_attr, pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_priority_increase(thd_attr); - switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE[index], pool); + switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE, pool); while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000); if (index == 1) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index); @@ -616,8 +563,10 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool) */ /* don't need any more dispatch threads than we have CPU's*/ - MAX_DISPATCH = switch_core_cpu_count() + 1; - + MAX_DISPATCH = (switch_core_cpu_count() / 2) + 1; + if (MAX_DISPATCH < 2) { + MAX_DISPATCH = 2; + } switch_assert(pool != NULL); THRUNTIME_POOL = RUNTIME_POOL = pool; @@ -648,7 +597,9 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool) //switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); //switch_threadattr_priority_increase(thd_attr); - launch_dispatch_threads(1, DISPATCH_QUEUE_LEN, RUNTIME_POOL); + switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, pool); + launch_dispatch_threads(1, RUNTIME_POOL); + //switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL); //switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL); //switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL); @@ -1782,30 +1733,11 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, con (*event)->event_user_data = user_data; } - if (!EVENT_QUEUE_THREADS[(*event)->priority] && (*event)->priority < 3) { - switch_threadattr_t *thd_attr; - - switch_queue_create(&EVENT_QUEUE[(*event)->priority], POOL_COUNT_MAX + 10, THRUNTIME_POOL); - switch_threadattr_create(&thd_attr, THRUNTIME_POOL); - switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_threadattr_priority_increase(thd_attr); - switch_thread_create(&EVENT_QUEUE_THREADS[(*event)->priority], thd_attr, switch_event_thread, EVENT_QUEUE[(*event)->priority], RUNTIME_POOL); + if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) { + switch_event_destroy(event); + return SWITCH_STATUS_FALSE; } - for (;;) { - if (switch_queue_trypush(EVENT_QUEUE[(*event)->priority], *event) == SWITCH_STATUS_SUCCESS) { - goto end; - } - - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event queue is full!\n"); - switch_yield(100000); - } - - end: - - *event = NULL; - return SWITCH_STATUS_SUCCESS; } diff --git a/src/switch_time.c b/src/switch_time.c index e7256538f3..381d2dc5ad 100644 --- a/src/switch_time.c +++ b/src/switch_time.c @@ -140,7 +140,7 @@ typedef struct timer_matrix timer_matrix_t; static timer_matrix_t TIMER_MATRIX[MAX_ELEMENTS + 1]; -static void os_yield(void) +SWITCH_DECLARE(void) switch_os_yield(void) { #if defined(WIN32) SwitchToThread(); @@ -467,7 +467,7 @@ SWITCH_DECLARE(void) switch_sleep(switch_interval_time_t t) SWITCH_DECLARE(void) switch_cond_next(void) { if (runtime.tipping_point && globals.timer_count >= runtime.tipping_point) { - os_yield(); + switch_os_yield(); return; } #ifdef DISABLE_1MS_COND @@ -633,7 +633,7 @@ static switch_status_t timer_next(switch_timer_t *timer) check_roll(); if (runtime.tipping_point && globals.timer_count >= runtime.tipping_point) { - os_yield(); + switch_os_yield(); globals.use_cond_yield = 0; } else { if (globals.use_cond_yield == 1) { @@ -884,7 +884,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(softtimer_runtime) } if (runtime.tipping_point && globals.timer_count >= runtime.tipping_point) { - os_yield(); + switch_os_yield(); } else { if (tfd > -1 && globals.RUNNING == 1) { uint64_t exp;