diff --git a/src/include/private/switch_core_pvt.h b/src/include/private/switch_core_pvt.h index 0acd41c3fb..1b0ab28742 100644 --- a/src/include/private/switch_core_pvt.h +++ b/src/include/private/switch_core_pvt.h @@ -273,6 +273,7 @@ struct switch_runtime { char *core_db_post_trans_execute; char *core_db_inner_pre_trans_execute; char *core_db_inner_post_trans_execute; + int events_use_dispatch; }; extern struct switch_runtime runtime; diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 60abb21194..58b2691531 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -65,6 +65,7 @@ typedef struct switch_thread_data_s { switch_thread_start_t func; void *obj; int alloc; + switch_memory_pool_t *pool; } switch_thread_data_t; typedef struct switch_hold_record_s { diff --git a/src/switch_core.c b/src/switch_core.c index 513ffb16f2..87f83f6ec2 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -1950,9 +1950,18 @@ static void switch_load_core_config(const char *file) switch_core_min_idle_cpu(atof(val)); } else if (!strcasecmp(var, "tipping-point") && !zstr(val)) { runtime.tipping_point = atoi(val); + } else if (!strcasecmp(var, "events-use-dispatch") && !zstr(val)) { + runtime.events_use_dispatch = 1; } else if (!strcasecmp(var, "initial-event-threads") && !zstr(val)) { - int tmp = atoi(val); + int tmp; + if (!runtime.events_use_dispatch) { + runtime.events_use_dispatch = 1; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, + "Implicitly setting events-use-dispatch based on usage of this initial-event-threads parameter.\n"); + } + + tmp = atoi(val); if (tmp > runtime.cpu_count / 2) { tmp = runtime.cpu_count / 2; diff --git a/src/switch_core_session.c b/src/switch_core_session.c index 177737b768..389345fca6 100644 --- a/src/switch_core_session.c +++ b/src/switch_core_session.c @@ -1615,10 +1615,14 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th td->func(thread, td->obj); - if (td->alloc) { + if (td->pool) { + switch_memory_pool_t *pool = td->pool; + td = NULL; + switch_core_destroy_memory_pool(&pool); + } else if (td->alloc) { free(td); } - + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing\n", (long) thread); switch_mutex_lock(session_manager.mutex); diff --git a/src/switch_event.c b/src/switch_event.c index de875b6337..ef30be5be8 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -35,6 +35,7 @@ #include #include #include "tpl.h" +#include "private/switch_core_pvt.h" //#define SWITCH_EVENT_RECYCLE #define DISPATCH_QUEUE_LEN 10000 @@ -241,6 +242,34 @@ static int switch_events_match(switch_event_t *event, switch_event_node_t *node) return match; } + +static void *SWITCH_THREAD_FUNC switch_event_deliver_thread(switch_thread_t *thread, void *obj) +{ + switch_event_t *event = (switch_event_t *) obj; + + switch_event_deliver(&event); + + return NULL; +} + +static void switch_event_deliver_thread_pool(switch_event_t **event) +{ + switch_thread_data_t *td; + + td = malloc(sizeof(*td)); + switch_assert(td); + + td->alloc = 1; + td->func = switch_event_deliver_thread; + td->obj = *event; + td->pool = NULL; + + *event = NULL; + + switch_thread_pool_launch_thread(&td); + +} + static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *thread, void *obj) { switch_queue_t *queue = (switch_queue_t *) obj; @@ -486,19 +515,22 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) SYSTEM_RUNNING = 0; switch_mutex_unlock(EVENT_QUEUE_MUTEX); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n"); + if (runtime.events_use_dispatch) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n"); - for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) { - switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL); - } + for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) { + switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL); + } + - switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n"); - - for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) { - switch_status_t st; - switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]); + switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n"); + + for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) { + switch_status_t st; + switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]); + } } x = 0; @@ -510,7 +542,7 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) last = THREAD_COUNT; } - { + if (runtime.events_use_dispatch) { void *pop = NULL; switch_event_t *event = NULL; @@ -619,19 +651,21 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool) //switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - - switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, pool); - switch_event_launch_dispatch_threads(1); + if (runtime.events_use_dispatch) { + switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, pool); + switch_event_launch_dispatch_threads(1); + } //switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL); //switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL); //switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL); - while (!THREAD_COUNT) { - switch_cond_next(); + if (runtime.events_use_dispatch) { + while (!THREAD_COUNT) { + switch_cond_next(); + } } - switch_mutex_lock(EVENT_QUEUE_MUTEX); SYSTEM_RUNNING = 1; switch_mutex_unlock(EVENT_QUEUE_MUTEX); @@ -1883,9 +1917,15 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, con (*event)->event_user_data = user_data; } - if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) { - switch_event_destroy(event); - return SWITCH_STATUS_FALSE; + + + if (runtime.events_use_dispatch) { + if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) { + switch_event_destroy(event); + return SWITCH_STATUS_FALSE; + } + } else { + switch_event_deliver_thread_pool(event); } return SWITCH_STATUS_SUCCESS;