diff --git a/src/include/switch.h b/src/include/switch.h index bcdb11e757..f19a6b0881 100644 --- a/src/include/switch.h +++ b/src/include/switch.h @@ -61,6 +61,10 @@ extern "C" { #include #include #include +#include +#define APR_WANT_STDIO +#define APR_WANT_STRFUNC +#include #include #include diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 0dcf1b08a2..1763ecaf7d 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -42,6 +42,7 @@ extern "C" { struct switch_core_thread_session { int running; void *objs[MAX_CORE_THREAD_SESSION_OBJS]; + switch_memory_pool *pool; }; struct switch_core_session; @@ -73,7 +74,7 @@ SWITCH_DECLARE(switch_status) switch_core_hash_insert(switch_hash *hash, char *k SWITCH_DECLARE(switch_status) switch_core_hash_insert_dup(switch_hash *hash, char *key, void *data); SWITCH_DECLARE(switch_status) switch_core_hash_delete(switch_hash *hash, char *key); SWITCH_DECLARE(void *) switch_core_hash_find(switch_hash *hash, char *key); -SWITCH_DECLARE(void) switch_core_launch_module_thread(void *(*func)(switch_thread *, void*), void *obj); +SWITCH_DECLARE(void) switch_core_launch_thread(void *(*func)(switch_thread *, void*), void *obj); SWITCH_DECLARE(FILE *) switch_core_data_channel(switch_text_channel channel); SWITCH_DECLARE(void) switch_core_session_launch_thread(switch_core_session *session, void *(*func)(switch_thread *, void *), void *obj); SWITCH_DECLARE(switch_status) switch_core_timer_init(switch_timer *timer, char *timer_name, int interval, int samples); diff --git a/src/include/switch_types.h b/src/include/switch_types.h index 61ca2077fd..57eae8d8c9 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -193,6 +193,7 @@ typedef switch_status (*switch_api_function)(char *in, char *out, size_t outlen) The pieces of apr we allow ppl to pass around between modules we typedef into our namespace and wrap all the functions any other apr code should be as hidden as possible. */ +typedef apr_strmatch_pattern switch_strmatch_pattern; typedef apr_uuid_t switch_uuid_t; typedef apr_queue_t switch_queue_t; typedef apr_hash_t switch_hash; @@ -228,6 +229,9 @@ typedef apr_hash_index_t switch_hash_index_t; #define switch_thread_cond_broadcast apr_thread_cond_broadcast #define switch_thread_cond_destroy apr_thread_cond_destroy +#define switch_pool_clear apr_pool_clear +#define switch_strmatch_precompile apr_strmatch_precompile +#define switch_strmatch apr_strmatch #define switch_uuid_format apr_uuid_format #define switch_uuid_get apr_uuid_get #define switch_uuid_parse apr_uuid_parse diff --git a/src/mod/mod_event_test/mod_event_test.c b/src/mod/mod_event_test/mod_event_test.c index 648f31660b..1769777ef1 100644 --- a/src/mod/mod_event_test/mod_event_test.c +++ b/src/mod/mod_event_test/mod_event_test.c @@ -60,6 +60,50 @@ static switch_loadable_module_interface event_test_module_interface = { #define MY_EVENT_COOL "test::cool" + +//#define TORTURE_ME + +#ifdef TORTURE_ME +#define TTHREADS 500 +static int THREADS = 0; + +static void *torture_thread(switch_thread *thread, void *obj) +{ + int y = 0; + int z = 0; + switch_core_thread_session *ts = obj; + switch_event *event; + + z = THREADS++; + + while(THREADS > 0) { + int x; + for(x = 0; x < 1; x++) { + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, MY_EVENT_COOL) == SWITCH_STATUS_SUCCESS) { + switch_event_add_header(event, "event_info", "hello world %d %d", z, y++); + switch_event_fire(&event); + } + } + switch_yield(100000); + } + + if (ts->pool) { + switch_memory_pool *pool = ts->pool; + switch_core_destroy_memory_pool(&pool); + } + + switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Thread Ended\n"); +} + +SWITCH_MOD_DECLARE(switch_status) switch_module_shutdown(void) +{ + THREADS = -1; + switch_yield(100000); + return SWITCH_STATUS_SUCCESS; +} +#endif + + SWITCH_MOD_DECLARE(switch_status) switch_module_load(switch_loadable_module_interface **interface, char *filename) { /* connect my internal structure to the blank pointer passed to me */ *interface = &event_test_module_interface; @@ -69,32 +113,21 @@ SWITCH_MOD_DECLARE(switch_status) switch_module_load(switch_loadable_module_inte return SWITCH_STATUS_GENERR; } +#ifdef TORTURE_ME if (switch_event_bind((char *)modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL) != SWITCH_STATUS_SUCCESS) { switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Couldn't bind!\n"); return SWITCH_STATUS_GENERR; } + if (1) { + int x = 0; + for(x = 0 ; x < TTHREADS ; x++) { + switch_core_launch_thread(torture_thread, NULL); + } + } +#endif + /* indicate that the module should continue to be loaded */ return SWITCH_STATUS_SUCCESS; } -//#define TORTURE_ME - -#ifdef TORTURE_ME -SWITCH_MOD_DECLARE(switch_status) switch_module_runtime(void) -{ - int y = 0; - switch_event *event; - - for(;;) { - int x; - for(x = 0; x < 100; x++) { - if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, MY_EVENT_COOL) == SWITCH_STATUS_SUCCESS) { - switch_event_add_header(event, "event_info", "hello world %d", y++); - switch_event_fire(&event); - } - } - switch_yield(100000); - } -} -#endif diff --git a/src/switch_core.c b/src/switch_core.c index 1c312ec9b4..fc8e82c38a 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -1408,19 +1408,33 @@ SWITCH_DECLARE(void *) switch_core_hash_find(switch_hash *hash, char *key) */ -SWITCH_DECLARE(void) switch_core_launch_module_thread(switch_thread_start_t func, void *obj) +SWITCH_DECLARE(void) switch_core_launch_thread(switch_thread_start_t func, void *obj) { switch_thread *thread; switch_threadattr_t *thd_attr;; switch_threadattr_create(&thd_attr, runtime.memory_pool); switch_threadattr_detach_set(thd_attr, 1); + switch_core_thread_session *ts; + switch_memory_pool *pool = NULL; - switch_thread_create(&thread, - thd_attr, - func, - obj, - runtime.memory_pool - ); + if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { + switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Could not allocate memory pool\n"); + return; + } + + if (!(ts = switch_core_alloc(pool, sizeof(*ts)))) { + switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Could not allocate memory\n"); + } else { + ts->pool = pool; + ts->objs[0] = obj; + + switch_thread_create(&thread, + thd_attr, + func, + ts, + ts->pool + ); + } } diff --git a/src/switch_event.c b/src/switch_event.c index 1fb5c068a5..8ffe30dfcd 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -31,16 +31,58 @@ */ #include -static switch_event *EVENT_QUEUE_HEAD; -static switch_event *EVENT_QUEUE_WORK; -static switch_thread_cond_t *COND; static switch_event_node *EVENT_NODES[SWITCH_EVENT_ALL+1] = {NULL}; static switch_mutex_t *BLOCK = NULL; -static switch_mutex_t *QLOCK = NULL; -static switch_memory_pool *EPOOL = NULL; +static switch_mutex_t *POOL_LOCK = NULL; +static switch_memory_pool *RUNTIME_POOL = NULL; +static switch_memory_pool *APOOL = NULL; +static switch_memory_pool *BPOOL = NULL; +static switch_memory_pool *THRUNTIME_POOL = NULL; +static switch_queue_t *EVENT_QUEUE = NULL; +//#define MALLOC_EVENTS + +#ifdef MALLOC_EVENTS +static int POOL_COUNT = 0; +#endif + +static int POOL_COUNT_MAX = 100; + static switch_hash *CUSTOM_HASH = NULL; static int THREAD_RUNNING = 0; +#ifdef MALLOC_EVENTS +#define ALLOC(size) malloc(size) +#define DUP(str) strdup(str) +#else +static void *locked_alloc(size_t len) +{ + void *mem; + + switch_mutex_lock(POOL_LOCK); + /* -----------------------------------------------*/ + mem = switch_core_alloc(THRUNTIME_POOL, len); + switch_mutex_unlock(POOL_LOCK); + /* ----------------------------------------------*/ + + return mem; +} + +static void *locked_dup(char *str) +{ + char *dup; + + switch_mutex_lock(POOL_LOCK); + /* -----------------------------------------------*/ + dup = switch_core_strdup(THRUNTIME_POOL, str); + switch_mutex_unlock(POOL_LOCK); + /* ----------------------------------------------*/ + + return dup; +} +#define ALLOC(size) locked_alloc(size) +#define DUP(str) locked_dup(str) +#endif + /* make sure this is synced with the switch_event_t enum in switch_types.h also never put any new ones before EVENT_ALL */ @@ -101,31 +143,34 @@ static int switch_events_match(switch_event *event, switch_event_node *node) static void * SWITCH_THREAD_FUNC switch_event_thread(switch_thread *thread, void *obj) { switch_event_node *node; - switch_event *event = NULL, *out_event = NULL; + switch_event *out_event = NULL; switch_event_t e; - switch_mutex_t *mutex = NULL; - - switch_mutex_init(&mutex, SWITCH_MUTEX_NESTED, EPOOL); - switch_thread_cond_create(&COND, EPOOL); - switch_mutex_lock(mutex); - - assert(QLOCK != NULL); - assert(EPOOL != NULL); + void *pop; + assert(POOL_LOCK != NULL); + assert(RUNTIME_POOL != NULL); THREAD_RUNNING = 1; - while(THREAD_RUNNING == 1) { - switch_thread_cond_wait(COND, mutex); - switch_mutex_lock(QLOCK); - /* -----------------------------------------------*/ - EVENT_QUEUE_WORK = EVENT_QUEUE_HEAD; - EVENT_QUEUE_HEAD = NULL; - switch_mutex_unlock(QLOCK); - /* -----------------------------------------------*/ + while(THREAD_RUNNING == 1 || switch_queue_size(EVENT_QUEUE)) { + +#ifdef MALLOC_EVENTS + switch_mutex_lock(POOL_LOCK); + /* -----------------------------------------------*/ + if (POOL_COUNT >= POOL_COUNT_MAX) { + if (THRUNTIME_POOL == APOOL) { + THRUNTIME_POOL = BPOOL; + } else { + THRUNTIME_POOL = APOOL; + } + switch_pool_clear(THRUNTIME_POOL); + POOL_COUNT = 0; + } + switch_mutex_unlock(POOL_LOCK); + /* -----------------------------------------------*/ +#endif + + while (switch_queue_trypop(EVENT_QUEUE, &pop) == SWITCH_STATUS_SUCCESS) { + out_event = pop; - for(event = EVENT_QUEUE_WORK; event;) { - out_event = event; - event = event->next; - out_event->next = NULL; for(e = out_event->event_id;; e = SWITCH_EVENT_ALL) { for(node = EVENT_NODES[e]; node; node = node->next) { if (switch_events_match(out_event, node)) { @@ -133,7 +178,7 @@ static void * SWITCH_THREAD_FUNC switch_event_thread(switch_thread *thread, void node->callback(out_event); } } - + if (e == SWITCH_EVENT_ALL) { break; } @@ -142,7 +187,7 @@ static void * SWITCH_THREAD_FUNC switch_event_thread(switch_thread *thread, void switch_event_destroy(&out_event); } - + switch_yield(1000); } THREAD_RUNNING = 0; return NULL; @@ -156,7 +201,7 @@ SWITCH_DECLARE(switch_status) switch_event_running(void) SWITCH_DECLARE(char *) switch_event_name(switch_event_t event) { assert(BLOCK != NULL); - assert(EPOOL != NULL); + assert(RUNTIME_POOL != NULL); return EVENT_NAMES[event]; } @@ -166,19 +211,19 @@ SWITCH_DECLARE(switch_status) switch_event_reserve_subclass_detailed(char *owner switch_event_subclass *subclass; - assert(EPOOL != NULL); + assert(RUNTIME_POOL != NULL); assert(CUSTOM_HASH != NULL); if (switch_core_hash_find(CUSTOM_HASH, subclass_name)) { return SWITCH_STATUS_INUSE; } - if (!(subclass = switch_core_alloc(EPOOL, sizeof(*subclass)))) { + if (!(subclass = switch_core_alloc(RUNTIME_POOL, sizeof(*subclass)))) { return SWITCH_STATUS_MEMERR; } - subclass->owner = switch_core_strdup(EPOOL, owner); - subclass->name = switch_core_strdup(EPOOL, subclass_name); + subclass->owner = switch_core_strdup(RUNTIME_POOL, owner); + subclass->name = switch_core_strdup(RUNTIME_POOL, subclass_name); switch_core_hash_insert(CUSTOM_HASH, subclass->name, subclass); @@ -189,12 +234,14 @@ SWITCH_DECLARE(switch_status) switch_event_reserve_subclass_detailed(char *owner SWITCH_DECLARE(switch_status) switch_event_shutdown(void) { switch_event *event; - THREAD_RUNNING = -1; if (switch_event_create(&event, SWITCH_EVENT_EVENT_SHUTDOWN) == SWITCH_STATUS_SUCCESS) { switch_event_add_header(event, "event_info", "Event System Shutting Down"); switch_event_fire(&event); } + + THREAD_RUNNING = -1; + while(THREAD_RUNNING) { switch_yield(1000); } @@ -209,16 +256,30 @@ SWITCH_DECLARE(switch_status) switch_event_init(switch_memory_pool *pool) switch_threadattr_detach_set(thd_attr, 1); assert(pool != NULL); - EPOOL = pool; + RUNTIME_POOL = pool; + + + if (switch_core_new_memory_pool(&APOOL) != SWITCH_STATUS_SUCCESS) { + switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Could not allocate memory pool\n"); + return SWITCH_STATUS_MEMERR; + } + if (switch_core_new_memory_pool(&BPOOL) != SWITCH_STATUS_SUCCESS) { + switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Could not allocate memory pool\n"); + return SWITCH_STATUS_MEMERR; + } + + THRUNTIME_POOL = APOOL; + switch_queue_create(&EVENT_QUEUE, POOL_COUNT_MAX + 10, THRUNTIME_POOL); + switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Activate Eventing Engine.\n"); - switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, EPOOL); - switch_mutex_init(&QLOCK, SWITCH_MUTEX_NESTED, EPOOL); - switch_core_hash_init(&CUSTOM_HASH, EPOOL); + switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); + switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); + switch_core_hash_init(&CUSTOM_HASH, RUNTIME_POOL); switch_thread_create(&thread, thd_attr, switch_event_thread, NULL, - EPOOL + RUNTIME_POOL ); while(!THREAD_RUNNING) { @@ -235,11 +296,13 @@ SWITCH_DECLARE(switch_status) switch_event_create_subclass(switch_event **event, return SWITCH_STATUS_GENERR; } - if(!(*event = malloc(sizeof(switch_event)))) { + if(!(*event = ALLOC(sizeof(switch_event)))) { return SWITCH_STATUS_MEMERR; } +#ifdef MALLOC_EVENTS memset(*event, 0, sizeof(switch_event)); +#endif (*event)->event_id = event_id; @@ -265,31 +328,28 @@ SWITCH_DECLARE(char *) switch_event_get_header(switch_event *event, char *header SWITCH_DECLARE(switch_status) switch_event_add_header(switch_event *event, char *header_name, char *fmt, ...) { - char *data; int ret = 0; - va_list ap; + char data[2048]; + va_list ap; va_start(ap, fmt); - -#ifdef HAVE_VASPRINTF - ret = vasprintf(&data, fmt, ap); -#else - data = (char *) malloc(2048); - vsnprintf(data, 2048, fmt, ap); -#endif + vsnprintf(data, sizeof(data), fmt, ap); va_end(ap); + if (ret == -1) { return SWITCH_STATUS_MEMERR; } else { switch_event_header *header, *hp; - - if (!(header = malloc(sizeof(*header)))) { + if (!(header = ALLOC(sizeof(*header)))) { return SWITCH_STATUS_MEMERR; } +#ifdef MALLOC_EVENTS memset(header, 0, sizeof(*header)); - header->name = strdup(header_name); - header->value = data; +#endif + + header->name = DUP(header_name); + header->value = DUP(data); for (hp = event->headers; hp && hp->next; hp = hp->next); @@ -309,6 +369,7 @@ SWITCH_DECLARE(switch_status) switch_event_add_header(switch_event *event, char SWITCH_DECLARE(void) switch_event_destroy(switch_event **event) { +#ifdef MALLOC_EVENTS switch_event_header *hp, *tofree; for (hp = (*event)->headers; hp && hp->next;) { @@ -320,6 +381,7 @@ SWITCH_DECLARE(void) switch_event_destroy(switch_event **event) } free((*event)); +#endif *event = NULL; } @@ -336,12 +398,14 @@ SWITCH_DECLARE(switch_status) switch_event_dup(switch_event **event, switch_even (*event)->bind_user_data = todup->bind_user_data; for (hp = todup->headers; hp && hp->next;) { - if (!(header = malloc(sizeof(*header)))) { + if (!(header = ALLOC(sizeof(*header)))) { return SWITCH_STATUS_MEMERR; } +#ifdef MALLOC_EVENTS memset(header, 0, sizeof(*header)); - header->name = strdup(hp->name); - header->value = strdup(hp->value); +#endif + header->name = DUP(hp->name); + header->value = DUP(hp->value); for (hp2 = todup->headers; hp2 && hp2->next; hp2 = hp2->next); @@ -401,13 +465,18 @@ SWITCH_DECLARE(switch_status) switch_event_serialize(switch_event *event, char * SWITCH_DECLARE(switch_status) switch_event_fire_detailed(char *file, char *func, int line, switch_event **event, void *user_data) { - switch_event *ep; switch_time_exp_t tm; char date[80] = ""; size_t retsize; assert(BLOCK != NULL); - assert(EPOOL != NULL); + assert(RUNTIME_POOL != NULL); + + if (THREAD_RUNNING <= 0) { + /* sorry we're closed */ + switch_event_destroy(event); + return SWITCH_STATUS_FALSE; + } switch_time_exp_lt(&tm, switch_time_now()); switch_strftime(date, &retsize, sizeof(date), "%Y-%m-%d", &tm); @@ -424,21 +493,13 @@ SWITCH_DECLARE(switch_status) switch_event_fire_detailed(char *file, char *func, (*event)->event_user_data = user_data; } - switch_mutex_lock(QLOCK); - /* -----------------------------------------------*/ - for(ep = EVENT_QUEUE_HEAD; ep && ep->next; ep = ep->next); - - if (ep) { - ep->next = *event; - } else { - EVENT_QUEUE_HEAD = *event; - } - switch_mutex_unlock(QLOCK); - /* -----------------------------------------------*/ - + switch_queue_push(EVENT_QUEUE, *event); *event = NULL; - switch_thread_cond_signal(COND); +#ifdef MALLOC_EVENTS + POOL_COUNT++; +#endif + return SWITCH_STATUS_SUCCESS; } @@ -448,23 +509,23 @@ SWITCH_DECLARE(switch_status) switch_event_bind(char *id, switch_event_t event, switch_event_subclass *subclass = NULL; assert(BLOCK != NULL); - assert(EPOOL != NULL); + assert(RUNTIME_POOL != NULL); if (subclass_name) { if (!(subclass = switch_core_hash_find(CUSTOM_HASH, subclass_name))) { - if (!(subclass = switch_core_alloc(EPOOL, sizeof(*subclass)))) { + if (!(subclass = switch_core_alloc(RUNTIME_POOL, sizeof(*subclass)))) { return SWITCH_STATUS_MEMERR; } else { - subclass->owner = switch_core_strdup(EPOOL, id); - subclass->name = switch_core_strdup(EPOOL, subclass_name); + subclass->owner = switch_core_strdup(RUNTIME_POOL, id); + subclass->name = switch_core_strdup(RUNTIME_POOL, subclass_name); } } } - if (event <= SWITCH_EVENT_ALL && (event_node = switch_core_alloc(EPOOL, sizeof(switch_event_node)))) { + if (event <= SWITCH_EVENT_ALL && (event_node = switch_core_alloc(RUNTIME_POOL, sizeof(switch_event_node)))) { switch_mutex_lock(BLOCK); /* -----------------------------------------------*/ - event_node->id = switch_core_strdup(EPOOL, id); + event_node->id = switch_core_strdup(RUNTIME_POOL, id); event_node->event_id = event; event_node->subclass = subclass; event_node->callback = callback; diff --git a/src/switch_loadable_module.c b/src/switch_loadable_module.c index 4569cb73a9..b9cda82f7f 100644 --- a/src/switch_loadable_module.c +++ b/src/switch_loadable_module.c @@ -70,7 +70,8 @@ static struct switch_loadable_module_container loadable_modules; static void *switch_loadable_module_exec(switch_thread *thread, void *obj) { switch_status status = SWITCH_STATUS_SUCCESS; - switch_loadable_module *module = obj; + switch_core_thread_session *ts = obj; + switch_loadable_module *module = ts->objs[0]; int restarts; assert(module != NULL); @@ -79,6 +80,12 @@ static void *switch_loadable_module_exec(switch_thread *thread, void *obj) status = module->switch_module_runtime(); } switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Thread ended for %s\n", module->interface->module_name); + + if (ts->pool) { + switch_memory_pool *pool = ts->pool; + switch_core_destroy_memory_pool(&pool); + } + return NULL; } @@ -165,7 +172,7 @@ static switch_status switch_loadable_module_load_file(char *filename, switch_mem module->lib = dso; if (module->switch_module_runtime) { - switch_core_launch_module_thread(switch_loadable_module_exec, module); + switch_core_launch_thread(switch_loadable_module_exec, module); } *new_module = module;