deliver events with the core thread pool set events-use-dispatch=true in switch.conf.xml to use the old way

This commit is contained in:
Anthony Minessale 2013-09-05 03:42:35 +05:00
parent 670b496c48
commit 4158393f4e
5 changed files with 79 additions and 24 deletions

View File

@ -273,6 +273,7 @@ struct switch_runtime {
char *core_db_post_trans_execute; char *core_db_post_trans_execute;
char *core_db_inner_pre_trans_execute; char *core_db_inner_pre_trans_execute;
char *core_db_inner_post_trans_execute; char *core_db_inner_post_trans_execute;
int events_use_dispatch;
}; };
extern struct switch_runtime runtime; extern struct switch_runtime runtime;

View File

@ -65,6 +65,7 @@ typedef struct switch_thread_data_s {
switch_thread_start_t func; switch_thread_start_t func;
void *obj; void *obj;
int alloc; int alloc;
switch_memory_pool_t *pool;
} switch_thread_data_t; } switch_thread_data_t;
typedef struct switch_hold_record_s { typedef struct switch_hold_record_s {

View File

@ -1950,9 +1950,18 @@ static void switch_load_core_config(const char *file)
switch_core_min_idle_cpu(atof(val)); switch_core_min_idle_cpu(atof(val));
} else if (!strcasecmp(var, "tipping-point") && !zstr(val)) { } else if (!strcasecmp(var, "tipping-point") && !zstr(val)) {
runtime.tipping_point = atoi(val); runtime.tipping_point = atoi(val);
} else if (!strcasecmp(var, "events-use-dispatch") && !zstr(val)) {
runtime.events_use_dispatch = 1;
} else if (!strcasecmp(var, "initial-event-threads") && !zstr(val)) { } else if (!strcasecmp(var, "initial-event-threads") && !zstr(val)) {
int tmp = atoi(val); int tmp;
if (!runtime.events_use_dispatch) {
runtime.events_use_dispatch = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING,
"Implicitly setting events-use-dispatch based on usage of this initial-event-threads parameter.\n");
}
tmp = atoi(val);
if (tmp > runtime.cpu_count / 2) { if (tmp > runtime.cpu_count / 2) {
tmp = runtime.cpu_count / 2; tmp = runtime.cpu_count / 2;

View File

@ -1615,7 +1615,11 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
td->func(thread, td->obj); td->func(thread, td->obj);
if (td->alloc) { if (td->pool) {
switch_memory_pool_t *pool = td->pool;
td = NULL;
switch_core_destroy_memory_pool(&pool);
} else if (td->alloc) {
free(td); free(td);
} }

View File

@ -35,6 +35,7 @@
#include <switch.h> #include <switch.h>
#include <switch_event.h> #include <switch_event.h>
#include "tpl.h" #include "tpl.h"
#include "private/switch_core_pvt.h"
//#define SWITCH_EVENT_RECYCLE //#define SWITCH_EVENT_RECYCLE
#define DISPATCH_QUEUE_LEN 10000 #define DISPATCH_QUEUE_LEN 10000
@ -241,6 +242,34 @@ static int switch_events_match(switch_event_t *event, switch_event_node_t *node)
return match; return match;
} }
static void *SWITCH_THREAD_FUNC switch_event_deliver_thread(switch_thread_t *thread, void *obj)
{
switch_event_t *event = (switch_event_t *) obj;
switch_event_deliver(&event);
return NULL;
}
static void switch_event_deliver_thread_pool(switch_event_t **event)
{
switch_thread_data_t *td;
td = malloc(sizeof(*td));
switch_assert(td);
td->alloc = 1;
td->func = switch_event_deliver_thread;
td->obj = *event;
td->pool = NULL;
*event = NULL;
switch_thread_pool_launch_thread(&td);
}
static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *thread, void *obj) static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *thread, void *obj)
{ {
switch_queue_t *queue = (switch_queue_t *) obj; switch_queue_t *queue = (switch_queue_t *) obj;
@ -486,12 +515,14 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
SYSTEM_RUNNING = 0; SYSTEM_RUNNING = 0;
switch_mutex_unlock(EVENT_QUEUE_MUTEX); switch_mutex_unlock(EVENT_QUEUE_MUTEX);
if (runtime.events_use_dispatch) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n");
for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) { for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL); switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL);
} }
switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE); switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n");
@ -500,6 +531,7 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
switch_status_t st; switch_status_t st;
switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]); switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]);
} }
}
x = 0; x = 0;
while (x < 100 && THREAD_COUNT) { while (x < 100 && THREAD_COUNT) {
@ -510,7 +542,7 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
last = THREAD_COUNT; last = THREAD_COUNT;
} }
{ if (runtime.events_use_dispatch) {
void *pop = NULL; void *pop = NULL;
switch_event_t *event = NULL; switch_event_t *event = NULL;
@ -619,18 +651,20 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
//switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); //switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
if (runtime.events_use_dispatch) {
switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, pool); switch_queue_create(&EVENT_DISPATCH_QUEUE, DISPATCH_QUEUE_LEN * MAX_DISPATCH, pool);
switch_event_launch_dispatch_threads(1); switch_event_launch_dispatch_threads(1);
}
//switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL); //switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL);
//switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL); //switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL);
//switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL); //switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL);
if (runtime.events_use_dispatch) {
while (!THREAD_COUNT) { while (!THREAD_COUNT) {
switch_cond_next(); switch_cond_next();
} }
}
switch_mutex_lock(EVENT_QUEUE_MUTEX); switch_mutex_lock(EVENT_QUEUE_MUTEX);
SYSTEM_RUNNING = 1; SYSTEM_RUNNING = 1;
@ -1883,10 +1917,16 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, con
(*event)->event_user_data = user_data; (*event)->event_user_data = user_data;
} }
if (runtime.events_use_dispatch) {
if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) { if (switch_event_queue_dispatch_event(event) != SWITCH_STATUS_SUCCESS) {
switch_event_destroy(event); switch_event_destroy(event);
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
} else {
switch_event_deliver_thread_pool(event);
}
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }