use more of apr-utils in the event stuff

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@191 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2005-12-22 18:53:33 +00:00
parent e60fbc2267
commit 3e0234219b
7 changed files with 231 additions and 107 deletions

View File

@ -61,6 +61,10 @@ extern "C" {
#include <apr_poll.h>
#include <apr_queue.h>
#include <apr_uuid.h>
#include <apr_strmatch.h>
#define APR_WANT_STDIO
#define APR_WANT_STRFUNC
#include <apr_want.h>
#include <assert.h>
#include <sqlite3.h>

View File

@ -42,6 +42,7 @@ extern "C" {
struct switch_core_thread_session {
int running;
void *objs[MAX_CORE_THREAD_SESSION_OBJS];
switch_memory_pool *pool;
};
struct switch_core_session;
@ -73,7 +74,7 @@ SWITCH_DECLARE(switch_status) switch_core_hash_insert(switch_hash *hash, char *k
SWITCH_DECLARE(switch_status) switch_core_hash_insert_dup(switch_hash *hash, char *key, void *data);
SWITCH_DECLARE(switch_status) switch_core_hash_delete(switch_hash *hash, char *key);
SWITCH_DECLARE(void *) switch_core_hash_find(switch_hash *hash, char *key);
SWITCH_DECLARE(void) switch_core_launch_module_thread(void *(*func)(switch_thread *, void*), void *obj);
SWITCH_DECLARE(void) switch_core_launch_thread(void *(*func)(switch_thread *, void*), void *obj);
SWITCH_DECLARE(FILE *) switch_core_data_channel(switch_text_channel channel);
SWITCH_DECLARE(void) switch_core_session_launch_thread(switch_core_session *session, void *(*func)(switch_thread *, void *), void *obj);
SWITCH_DECLARE(switch_status) switch_core_timer_init(switch_timer *timer, char *timer_name, int interval, int samples);

View File

@ -193,6 +193,7 @@ typedef switch_status (*switch_api_function)(char *in, char *out, size_t outlen)
The pieces of apr we allow ppl to pass around between modules we typedef into our namespace and wrap all the functions
any other apr code should be as hidden as possible.
*/
typedef apr_strmatch_pattern switch_strmatch_pattern;
typedef apr_uuid_t switch_uuid_t;
typedef apr_queue_t switch_queue_t;
typedef apr_hash_t switch_hash;
@ -228,6 +229,9 @@ typedef apr_hash_index_t switch_hash_index_t;
#define switch_thread_cond_broadcast apr_thread_cond_broadcast
#define switch_thread_cond_destroy apr_thread_cond_destroy
#define switch_pool_clear apr_pool_clear
#define switch_strmatch_precompile apr_strmatch_precompile
#define switch_strmatch apr_strmatch
#define switch_uuid_format apr_uuid_format
#define switch_uuid_get apr_uuid_get
#define switch_uuid_parse apr_uuid_parse

View File

@ -60,6 +60,50 @@ static switch_loadable_module_interface event_test_module_interface = {
#define MY_EVENT_COOL "test::cool"
//#define TORTURE_ME
#ifdef TORTURE_ME
#define TTHREADS 500
static int THREADS = 0;
static void *torture_thread(switch_thread *thread, void *obj)
{
int y = 0;
int z = 0;
switch_core_thread_session *ts = obj;
switch_event *event;
z = THREADS++;
while(THREADS > 0) {
int x;
for(x = 0; x < 1; x++) {
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, MY_EVENT_COOL) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, "event_info", "hello world %d %d", z, y++);
switch_event_fire(&event);
}
}
switch_yield(100000);
}
if (ts->pool) {
switch_memory_pool *pool = ts->pool;
switch_core_destroy_memory_pool(&pool);
}
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Thread Ended\n");
}
SWITCH_MOD_DECLARE(switch_status) switch_module_shutdown(void)
{
THREADS = -1;
switch_yield(100000);
return SWITCH_STATUS_SUCCESS;
}
#endif
SWITCH_MOD_DECLARE(switch_status) switch_module_load(switch_loadable_module_interface **interface, char *filename) {
/* connect my internal structure to the blank pointer passed to me */
*interface = &event_test_module_interface;
@ -69,32 +113,21 @@ SWITCH_MOD_DECLARE(switch_status) switch_module_load(switch_loadable_module_inte
return SWITCH_STATUS_GENERR;
}
#ifdef TORTURE_ME
if (switch_event_bind((char *)modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL) != SWITCH_STATUS_SUCCESS) {
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Couldn't bind!\n");
return SWITCH_STATUS_GENERR;
}
if (1) {
int x = 0;
for(x = 0 ; x < TTHREADS ; x++) {
switch_core_launch_thread(torture_thread, NULL);
}
}
#endif
/* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS;
}
//#define TORTURE_ME
#ifdef TORTURE_ME
SWITCH_MOD_DECLARE(switch_status) switch_module_runtime(void)
{
int y = 0;
switch_event *event;
for(;;) {
int x;
for(x = 0; x < 100; x++) {
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, MY_EVENT_COOL) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, "event_info", "hello world %d", y++);
switch_event_fire(&event);
}
}
switch_yield(100000);
}
}
#endif

View File

@ -1408,19 +1408,33 @@ SWITCH_DECLARE(void *) switch_core_hash_find(switch_hash *hash, char *key)
*/
SWITCH_DECLARE(void) switch_core_launch_module_thread(switch_thread_start_t func, void *obj)
SWITCH_DECLARE(void) switch_core_launch_thread(switch_thread_start_t func, void *obj)
{
switch_thread *thread;
switch_threadattr_t *thd_attr;;
switch_threadattr_create(&thd_attr, runtime.memory_pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_core_thread_session *ts;
switch_memory_pool *pool = NULL;
switch_thread_create(&thread,
thd_attr,
func,
obj,
runtime.memory_pool
);
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Could not allocate memory pool\n");
return;
}
if (!(ts = switch_core_alloc(pool, sizeof(*ts)))) {
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Could not allocate memory\n");
} else {
ts->pool = pool;
ts->objs[0] = obj;
switch_thread_create(&thread,
thd_attr,
func,
ts,
ts->pool
);
}
}

View File

@ -31,16 +31,58 @@
*/
#include <switch_event.h>
static switch_event *EVENT_QUEUE_HEAD;
static switch_event *EVENT_QUEUE_WORK;
static switch_thread_cond_t *COND;
static switch_event_node *EVENT_NODES[SWITCH_EVENT_ALL+1] = {NULL};
static switch_mutex_t *BLOCK = NULL;
static switch_mutex_t *QLOCK = NULL;
static switch_memory_pool *EPOOL = NULL;
static switch_mutex_t *POOL_LOCK = NULL;
static switch_memory_pool *RUNTIME_POOL = NULL;
static switch_memory_pool *APOOL = NULL;
static switch_memory_pool *BPOOL = NULL;
static switch_memory_pool *THRUNTIME_POOL = NULL;
static switch_queue_t *EVENT_QUEUE = NULL;
//#define MALLOC_EVENTS
#ifdef MALLOC_EVENTS
static int POOL_COUNT = 0;
#endif
static int POOL_COUNT_MAX = 100;
static switch_hash *CUSTOM_HASH = NULL;
static int THREAD_RUNNING = 0;
#ifdef MALLOC_EVENTS
#define ALLOC(size) malloc(size)
#define DUP(str) strdup(str)
#else
static void *locked_alloc(size_t len)
{
void *mem;
switch_mutex_lock(POOL_LOCK);
/* <LOCKED> -----------------------------------------------*/
mem = switch_core_alloc(THRUNTIME_POOL, len);
switch_mutex_unlock(POOL_LOCK);
/* </LOCKED> ----------------------------------------------*/
return mem;
}
static void *locked_dup(char *str)
{
char *dup;
switch_mutex_lock(POOL_LOCK);
/* <LOCKED> -----------------------------------------------*/
dup = switch_core_strdup(THRUNTIME_POOL, str);
switch_mutex_unlock(POOL_LOCK);
/* </LOCKED> ----------------------------------------------*/
return dup;
}
#define ALLOC(size) locked_alloc(size)
#define DUP(str) locked_dup(str)
#endif
/* make sure this is synced with the switch_event_t enum in switch_types.h
also never put any new ones before EVENT_ALL
*/
@ -101,31 +143,34 @@ static int switch_events_match(switch_event *event, switch_event_node *node)
static void * SWITCH_THREAD_FUNC switch_event_thread(switch_thread *thread, void *obj)
{
switch_event_node *node;
switch_event *event = NULL, *out_event = NULL;
switch_event *out_event = NULL;
switch_event_t e;
switch_mutex_t *mutex = NULL;
switch_mutex_init(&mutex, SWITCH_MUTEX_NESTED, EPOOL);
switch_thread_cond_create(&COND, EPOOL);
switch_mutex_lock(mutex);
assert(QLOCK != NULL);
assert(EPOOL != NULL);
void *pop;
assert(POOL_LOCK != NULL);
assert(RUNTIME_POOL != NULL);
THREAD_RUNNING = 1;
while(THREAD_RUNNING == 1) {
switch_thread_cond_wait(COND, mutex);
switch_mutex_lock(QLOCK);
/* <LOCKED> -----------------------------------------------*/
EVENT_QUEUE_WORK = EVENT_QUEUE_HEAD;
EVENT_QUEUE_HEAD = NULL;
switch_mutex_unlock(QLOCK);
/* </LOCKED> -----------------------------------------------*/
while(THREAD_RUNNING == 1 || switch_queue_size(EVENT_QUEUE)) {
#ifdef MALLOC_EVENTS
switch_mutex_lock(POOL_LOCK);
/* <LOCKED> -----------------------------------------------*/
if (POOL_COUNT >= POOL_COUNT_MAX) {
if (THRUNTIME_POOL == APOOL) {
THRUNTIME_POOL = BPOOL;
} else {
THRUNTIME_POOL = APOOL;
}
switch_pool_clear(THRUNTIME_POOL);
POOL_COUNT = 0;
}
switch_mutex_unlock(POOL_LOCK);
/* </LOCKED> -----------------------------------------------*/
#endif
while (switch_queue_trypop(EVENT_QUEUE, &pop) == SWITCH_STATUS_SUCCESS) {
out_event = pop;
for(event = EVENT_QUEUE_WORK; event;) {
out_event = event;
event = event->next;
out_event->next = NULL;
for(e = out_event->event_id;; e = SWITCH_EVENT_ALL) {
for(node = EVENT_NODES[e]; node; node = node->next) {
if (switch_events_match(out_event, node)) {
@ -133,7 +178,7 @@ static void * SWITCH_THREAD_FUNC switch_event_thread(switch_thread *thread, void
node->callback(out_event);
}
}
if (e == SWITCH_EVENT_ALL) {
break;
}
@ -142,7 +187,7 @@ static void * SWITCH_THREAD_FUNC switch_event_thread(switch_thread *thread, void
switch_event_destroy(&out_event);
}
switch_yield(1000);
}
THREAD_RUNNING = 0;
return NULL;
@ -156,7 +201,7 @@ SWITCH_DECLARE(switch_status) switch_event_running(void)
SWITCH_DECLARE(char *) switch_event_name(switch_event_t event)
{
assert(BLOCK != NULL);
assert(EPOOL != NULL);
assert(RUNTIME_POOL != NULL);
return EVENT_NAMES[event];
}
@ -166,19 +211,19 @@ SWITCH_DECLARE(switch_status) switch_event_reserve_subclass_detailed(char *owner
switch_event_subclass *subclass;
assert(EPOOL != NULL);
assert(RUNTIME_POOL != NULL);
assert(CUSTOM_HASH != NULL);
if (switch_core_hash_find(CUSTOM_HASH, subclass_name)) {
return SWITCH_STATUS_INUSE;
}
if (!(subclass = switch_core_alloc(EPOOL, sizeof(*subclass)))) {
if (!(subclass = switch_core_alloc(RUNTIME_POOL, sizeof(*subclass)))) {
return SWITCH_STATUS_MEMERR;
}
subclass->owner = switch_core_strdup(EPOOL, owner);
subclass->name = switch_core_strdup(EPOOL, subclass_name);
subclass->owner = switch_core_strdup(RUNTIME_POOL, owner);
subclass->name = switch_core_strdup(RUNTIME_POOL, subclass_name);
switch_core_hash_insert(CUSTOM_HASH, subclass->name, subclass);
@ -189,12 +234,14 @@ SWITCH_DECLARE(switch_status) switch_event_reserve_subclass_detailed(char *owner
SWITCH_DECLARE(switch_status) switch_event_shutdown(void)
{
switch_event *event;
THREAD_RUNNING = -1;
if (switch_event_create(&event, SWITCH_EVENT_EVENT_SHUTDOWN) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, "event_info", "Event System Shutting Down");
switch_event_fire(&event);
}
THREAD_RUNNING = -1;
while(THREAD_RUNNING) {
switch_yield(1000);
}
@ -209,16 +256,30 @@ SWITCH_DECLARE(switch_status) switch_event_init(switch_memory_pool *pool)
switch_threadattr_detach_set(thd_attr, 1);
assert(pool != NULL);
EPOOL = pool;
RUNTIME_POOL = pool;
if (switch_core_new_memory_pool(&APOOL) != SWITCH_STATUS_SUCCESS) {
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Could not allocate memory pool\n");
return SWITCH_STATUS_MEMERR;
}
if (switch_core_new_memory_pool(&BPOOL) != SWITCH_STATUS_SUCCESS) {
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Could not allocate memory pool\n");
return SWITCH_STATUS_MEMERR;
}
THRUNTIME_POOL = APOOL;
switch_queue_create(&EVENT_QUEUE, POOL_COUNT_MAX + 10, THRUNTIME_POOL);
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Activate Eventing Engine.\n");
switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, EPOOL);
switch_mutex_init(&QLOCK, SWITCH_MUTEX_NESTED, EPOOL);
switch_core_hash_init(&CUSTOM_HASH, EPOOL);
switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
switch_core_hash_init(&CUSTOM_HASH, RUNTIME_POOL);
switch_thread_create(&thread,
thd_attr,
switch_event_thread,
NULL,
EPOOL
RUNTIME_POOL
);
while(!THREAD_RUNNING) {
@ -235,11 +296,13 @@ SWITCH_DECLARE(switch_status) switch_event_create_subclass(switch_event **event,
return SWITCH_STATUS_GENERR;
}
if(!(*event = malloc(sizeof(switch_event)))) {
if(!(*event = ALLOC(sizeof(switch_event)))) {
return SWITCH_STATUS_MEMERR;
}
#ifdef MALLOC_EVENTS
memset(*event, 0, sizeof(switch_event));
#endif
(*event)->event_id = event_id;
@ -265,31 +328,28 @@ SWITCH_DECLARE(char *) switch_event_get_header(switch_event *event, char *header
SWITCH_DECLARE(switch_status) switch_event_add_header(switch_event *event, char *header_name, char *fmt, ...)
{
char *data;
int ret = 0;
va_list ap;
char data[2048];
va_list ap;
va_start(ap, fmt);
#ifdef HAVE_VASPRINTF
ret = vasprintf(&data, fmt, ap);
#else
data = (char *) malloc(2048);
vsnprintf(data, 2048, fmt, ap);
#endif
vsnprintf(data, sizeof(data), fmt, ap);
va_end(ap);
if (ret == -1) {
return SWITCH_STATUS_MEMERR;
} else {
switch_event_header *header, *hp;
if (!(header = malloc(sizeof(*header)))) {
if (!(header = ALLOC(sizeof(*header)))) {
return SWITCH_STATUS_MEMERR;
}
#ifdef MALLOC_EVENTS
memset(header, 0, sizeof(*header));
header->name = strdup(header_name);
header->value = data;
#endif
header->name = DUP(header_name);
header->value = DUP(data);
for (hp = event->headers; hp && hp->next; hp = hp->next);
@ -309,6 +369,7 @@ SWITCH_DECLARE(switch_status) switch_event_add_header(switch_event *event, char
SWITCH_DECLARE(void) switch_event_destroy(switch_event **event)
{
#ifdef MALLOC_EVENTS
switch_event_header *hp, *tofree;
for (hp = (*event)->headers; hp && hp->next;) {
@ -320,6 +381,7 @@ SWITCH_DECLARE(void) switch_event_destroy(switch_event **event)
}
free((*event));
#endif
*event = NULL;
}
@ -336,12 +398,14 @@ SWITCH_DECLARE(switch_status) switch_event_dup(switch_event **event, switch_even
(*event)->bind_user_data = todup->bind_user_data;
for (hp = todup->headers; hp && hp->next;) {
if (!(header = malloc(sizeof(*header)))) {
if (!(header = ALLOC(sizeof(*header)))) {
return SWITCH_STATUS_MEMERR;
}
#ifdef MALLOC_EVENTS
memset(header, 0, sizeof(*header));
header->name = strdup(hp->name);
header->value = strdup(hp->value);
#endif
header->name = DUP(hp->name);
header->value = DUP(hp->value);
for (hp2 = todup->headers; hp2 && hp2->next; hp2 = hp2->next);
@ -401,13 +465,18 @@ SWITCH_DECLARE(switch_status) switch_event_serialize(switch_event *event, char *
SWITCH_DECLARE(switch_status) switch_event_fire_detailed(char *file, char *func, int line, switch_event **event, void *user_data)
{
switch_event *ep;
switch_time_exp_t tm;
char date[80] = "";
size_t retsize;
assert(BLOCK != NULL);
assert(EPOOL != NULL);
assert(RUNTIME_POOL != NULL);
if (THREAD_RUNNING <= 0) {
/* sorry we're closed */
switch_event_destroy(event);
return SWITCH_STATUS_FALSE;
}
switch_time_exp_lt(&tm, switch_time_now());
switch_strftime(date, &retsize, sizeof(date), "%Y-%m-%d", &tm);
@ -424,21 +493,13 @@ SWITCH_DECLARE(switch_status) switch_event_fire_detailed(char *file, char *func,
(*event)->event_user_data = user_data;
}
switch_mutex_lock(QLOCK);
/* <LOCKED> -----------------------------------------------*/
for(ep = EVENT_QUEUE_HEAD; ep && ep->next; ep = ep->next);
if (ep) {
ep->next = *event;
} else {
EVENT_QUEUE_HEAD = *event;
}
switch_mutex_unlock(QLOCK);
/* </LOCKED> -----------------------------------------------*/
switch_queue_push(EVENT_QUEUE, *event);
*event = NULL;
switch_thread_cond_signal(COND);
#ifdef MALLOC_EVENTS
POOL_COUNT++;
#endif
return SWITCH_STATUS_SUCCESS;
}
@ -448,23 +509,23 @@ SWITCH_DECLARE(switch_status) switch_event_bind(char *id, switch_event_t event,
switch_event_subclass *subclass = NULL;
assert(BLOCK != NULL);
assert(EPOOL != NULL);
assert(RUNTIME_POOL != NULL);
if (subclass_name) {
if (!(subclass = switch_core_hash_find(CUSTOM_HASH, subclass_name))) {
if (!(subclass = switch_core_alloc(EPOOL, sizeof(*subclass)))) {
if (!(subclass = switch_core_alloc(RUNTIME_POOL, sizeof(*subclass)))) {
return SWITCH_STATUS_MEMERR;
} else {
subclass->owner = switch_core_strdup(EPOOL, id);
subclass->name = switch_core_strdup(EPOOL, subclass_name);
subclass->owner = switch_core_strdup(RUNTIME_POOL, id);
subclass->name = switch_core_strdup(RUNTIME_POOL, subclass_name);
}
}
}
if (event <= SWITCH_EVENT_ALL && (event_node = switch_core_alloc(EPOOL, sizeof(switch_event_node)))) {
if (event <= SWITCH_EVENT_ALL && (event_node = switch_core_alloc(RUNTIME_POOL, sizeof(switch_event_node)))) {
switch_mutex_lock(BLOCK);
/* <LOCKED> -----------------------------------------------*/
event_node->id = switch_core_strdup(EPOOL, id);
event_node->id = switch_core_strdup(RUNTIME_POOL, id);
event_node->event_id = event;
event_node->subclass = subclass;
event_node->callback = callback;

View File

@ -70,7 +70,8 @@ static struct switch_loadable_module_container loadable_modules;
static void *switch_loadable_module_exec(switch_thread *thread, void *obj)
{
switch_status status = SWITCH_STATUS_SUCCESS;
switch_loadable_module *module = obj;
switch_core_thread_session *ts = obj;
switch_loadable_module *module = ts->objs[0];
int restarts;
assert(module != NULL);
@ -79,6 +80,12 @@ static void *switch_loadable_module_exec(switch_thread *thread, void *obj)
status = module->switch_module_runtime();
}
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Thread ended for %s\n", module->interface->module_name);
if (ts->pool) {
switch_memory_pool *pool = ts->pool;
switch_core_destroy_memory_pool(&pool);
}
return NULL;
}
@ -165,7 +172,7 @@ static switch_status switch_loadable_module_load_file(char *filename, switch_mem
module->lib = dso;
if (module->switch_module_runtime) {
switch_core_launch_module_thread(switch_loadable_module_exec, module);
switch_core_launch_thread(switch_loadable_module_exec, module);
}
*new_module = module;