From b2bf73de6e539e9871c2c8f69a90ccf0ae7ab183 Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Fri, 14 Nov 2008 17:55:20 +0000 Subject: [PATCH] Ton of stuff, mainly laying groundwork for bind_search functionality git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@10403 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- .../mod_erlang_event/mod_erlang_event.c | 332 ++++++++++++++---- 1 file changed, 271 insertions(+), 61 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 51c2eaa13e..a4ad11c9b9 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -53,14 +53,6 @@ typedef enum { LFLAG_STATEFUL = (1 << 8) } event_flag_t; - -/* TODO - support multiple event handlers per erlang connection each with their own event filters? */ -struct event_handler { - erlang_pid pid; - switch_hash_t *event_hash; - struct event_handler *next; -}; - struct listener { int sockfd; struct ei_cnode_s *ec; @@ -70,6 +62,7 @@ struct listener { switch_queue_t *log_queue; switch_memory_pool_t *pool; switch_mutex_t *flag_mutex; + switch_mutex_t *sock_mutex; char *ebuf; uint32_t flags; switch_log_level_t level; @@ -98,6 +91,19 @@ static struct { #define MAX_ACL 100 +struct erlang_binding { + switch_xml_section_t section; + erlang_pid pid; + char *registered_process; /* TODO */ + listener_t *listener; + struct erlang_binding *next; +}; + +static struct { + struct erlang_binding *head; + switch_xml_binding_t *search_binding; +} bindings; + static struct { switch_mutex_t *mutex; char *ip; @@ -163,6 +169,49 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l } +/* Stolen from code added to ei in R12B-5. + * Since not everyone has this verison yet; + * provide our own version. + * */ + +#define put8(s,n) do { \ + (s)[0] = (char)((n) & 0xff); \ + (s) += 1; \ +} while (0) + +#define put32be(s,n) do { \ + (s)[0] = ((n) >> 24) & 0xff; \ + (s)[1] = ((n) >> 16) & 0xff; \ + (s)[2] = ((n) >> 8) & 0xff; \ + (s)[3] = (n) & 0xff; \ + (s) += 4; \ +} while (0) + +static void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to) { + char msgbuf[2048]; + char *s; + int index = 0; + /*int n;*/ + + index = 5; /* max sizes: */ + ei_encode_version(msgbuf,&index); /* 1 */ + ei_encode_tuple_header(msgbuf,&index,3); + ei_encode_long(msgbuf,&index,ERL_LINK); + ei_encode_pid(msgbuf,&index,from); /* 268 */ + ei_encode_pid(msgbuf,&index,to); /* 268 */ + + /* 5 byte header missing */ + s = msgbuf; + put32be(s, index - 4); /* 4 */ + put8(s, ERL_PASS_THROUGH); /* 1 */ + /* sum: 542 */ + + switch_mutex_lock(listener->sock_mutex); + write(listener->sockfd, msgbuf, index); + switch_mutex_unlock(listener->sock_mutex); +} + + static void expire_listener(listener_t **listener) { void *pop; @@ -180,6 +229,38 @@ static void expire_listener(listener_t **listener) } +static void remove_binding(listener_t *listener) { + struct erlang_binding *ptr, *lst = NULL; + + switch_mutex_lock(globals.listener_mutex); + + switch_xml_set_binding_sections(bindings.search_binding, (1 << sizeof(switch_xml_section_enum_t))); + + for (ptr = bindings.head; ptr; lst = ptr, ptr = ptr->next) { + if (ptr->listener == listener) { + if (bindings.head == ptr) { + if (ptr->next) { + bindings.head = ptr->next; + } else { + bindings.head = NULL; + break; + } + } else { + if (ptr->next) { + lst->next = ptr->next; + } else { + lst->next = NULL; + } + } + } else { + switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | ptr->section); + } + } + + switch_mutex_unlock(globals.listener_mutex); +} + + static void ei_encode_switch_event(ei_x_buff *ebuf, switch_event_t *event) { int i; @@ -295,40 +376,6 @@ static void close_socket(int *sock) } -SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown) -{ - listener_t *l; - int sanity = 0; - - prefs.done = 1; - - switch_log_unbind_logger(socket_logger); - - /*close_socket(&listen_list.sockfd);*/ - - while (prefs.threads || prefs.done == 1) { - switch_yield(10000); - if (++sanity == 1000) { - break; - } - } - - switch_event_unbind(&globals.node); - - switch_mutex_lock(globals.listener_mutex); - - for (l = listen_list.listeners; l; l = l->next) { - close_socket(&l->sockfd); - } - - switch_mutex_unlock(globals.listener_mutex); - - switch_sleep(1500000); /* sleep for 1.5 seconds */ - - return SWITCH_STATUS_SUCCESS; -} - - static void add_listener(listener_t *listener) { /* add me to the listeners so I get events */ @@ -357,25 +404,6 @@ static void remove_listener(listener_t *listener) switch_mutex_unlock(globals.listener_mutex); } -SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) -{ - switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool); - - if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n"); - close_socket(&listen_list.sockfd); - return SWITCH_STATUS_GENERR; - } - - switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE); - - /* connect my internal structure to the blank pointer passed to me */ - *module_interface = switch_loadable_module_create_module_interface(pool, modname); - - /* indicate that the module should continue to be loaded */ - return SWITCH_STATUS_SUCCESS; -} - struct api_command_struct { char *api_cmd; @@ -456,7 +484,9 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) ei_x_encode_string(&ebuf, acs->uuid_str); ei_x_encode_string(&ebuf, reply); + switch_mutex_lock(acs->listener->sock_mutex); ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index); + switch_mutex_unlock(acs->listener->sock_mutex); ei_x_free(&ebuf); } @@ -478,7 +508,11 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) ei_x_encode_string(&rbuf, reply); + + switch_mutex_lock(acs->listener->sock_mutex); ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index); + switch_mutex_unlock(acs->listener->sock_mutex); + ei_x_free(&rbuf); } @@ -500,6 +534,38 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) } +static switch_xml_t erlang_fetch (const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value, + switch_event_t *params, void *user_data) +{ + switch_xml_t xml = NULL; + struct erlang_binding *ptr; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "looking for bindings\n"); + + switch_xml_section_t section = switch_xml_parse_section_string((char *) sectionstr); + + for (ptr = bindings.head; ptr && ptr->section != section; ptr = ptr->next); /* just get the first match */ + + if (!ptr) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr); + return NULL; + } + + if (!ptr->listener) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "NULL pointer binding!\n"); + return NULL; /* our pointer is trash */ + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->pid.node); + + + switch_mutex_lock(ptr->listener->sock_mutex); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "It's a lock!\n"); + switch_mutex_unlock(ptr->listener->sock_mutex); + + return xml; +} + static int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) { @@ -829,6 +895,57 @@ sendmsg_fail: break; } + } else if (!strncmp(tupletag, "bind", MAXATOMLEN)) { + + /* format is (result|config|directory|dialplan|phrases) */ + char sectionstr[MAXATOMLEN]; + + if (ei_decode_atom(buf->buff, &buf->index, sectionstr)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + break; + } + + switch_xml_section_t section; + + if (!(section = switch_xml_parse_section_string(sectionstr))) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + break; + } + + struct erlang_binding *binding, *ptr; + + if (!(binding = switch_core_alloc(listener->pool, sizeof(*binding)))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n"); + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badmem"); + break; + } + + binding->section = section; + binding->pid = msg->from; + binding->listener = listener; + + switch_mutex_lock(globals.listener_mutex); + + for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next); + + if (ptr) { + ptr->next = binding; + } else { + bindings.head = binding; + } + + switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section); + switch_mutex_unlock(globals.listener_mutex); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding)); + + ei_link(listener, ei_self(listener->ec), &msg->from); } else { ei_x_encode_tuple_header(rbuf, 2); @@ -852,10 +969,12 @@ sendmsg_fail: switch_clear_flag_locked(listener, LFLAG_LOG); } } else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) { + ei_link(listener, ei_self(listener->ec), &msg->from); listener->log_pid = msg->from; listener->level = SWITCH_LOG_DEBUG; switch_set_flag(listener, LFLAG_LOG); } else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) { + ei_link(listener, ei_self(listener->ec), &msg->from); listener->event_pid = msg->from; if (!switch_test_flag(listener, LFLAG_EVENTS)) { switch_set_flag_locked(listener, LFLAG_EVENTS); @@ -888,6 +1007,10 @@ sendmsg_fail: ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(rbuf, "ok"); ei_x_encode_pid(rbuf, ei_self(listener->ec)); + } else if (!strncmp(atom, "link", MAXATOMLEN)) { + /* debugging */ + ei_link(listener, ei_self(listener->ec), &msg->from); + goto noreply; } else { ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(rbuf, "error"); @@ -905,12 +1028,17 @@ sendmsg_fail: break; } + + switch_mutex_lock(listener->sock_mutex); ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index); + switch_mutex_unlock(listener->sock_mutex); noreply: return 0; event_done: + switch_mutex_lock(listener->sock_mutex); ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index); + switch_mutex_unlock(listener->sock_mutex); return 1; } @@ -985,7 +1113,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) ei_x_buff rbuf; ei_x_new_with_version(&rbuf); + switch_mutex_lock(listener->sock_mutex); status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 100); + switch_mutex_unlock(listener->sock_mutex); switch(status) { case ERL_TICK : @@ -1043,7 +1173,11 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) ei_x_encode_tuple_header(&lbuf, 2); ei_x_encode_atom(&lbuf, "log"); ei_x_encode_string(&lbuf, data); + + switch_mutex_lock(listener->sock_mutex); ei_send(listener->sockfd, &listener->log_pid, lbuf.buff, lbuf.index); + switch_mutex_unlock(listener->sock_mutex); + ei_x_free(&lbuf); } } @@ -1060,7 +1194,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) ei_encode_switch_event(&ebuf, pevent); + switch_mutex_lock(listener->sock_mutex); ei_send(listener->sockfd, &listener->event_pid, ebuf.buff, ebuf.index); + switch_mutex_unlock(listener->sock_mutex); ei_x_free(&ebuf); switch_event_destroy(&pevent); @@ -1084,6 +1220,9 @@ done: switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Closed\n"); switch_core_hash_destroy(&listener->event_hash); + /* remove any bindings for this connection */ + remove_binding(listener); + if (listener->session) { switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED); switch_clear_flag_locked(listener, LFLAG_SESSION); @@ -1093,6 +1232,7 @@ done: switch_core_destroy_memory_pool(&pool); } + switch_mutex_lock(globals.listener_mutex); prefs.threads--; switch_mutex_unlock(globals.listener_mutex); @@ -1111,6 +1251,7 @@ static void launch_listener_thread(listener_t *listener) switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_thread_create(&thread, thd_attr, listener_run, listener, listener->pool); + } @@ -1173,6 +1314,39 @@ static int config(void) } +/* Module Hooks */ + + +SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) +{ + switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool); + + if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n"); + close_socket(&listen_list.sockfd); + return SWITCH_STATUS_GENERR; + } + + switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE); + + memset(&bindings, 0, sizeof(bindings)); + + if (switch_xml_bind_search_function_ret(erlang_fetch, (1 << sizeof(switch_xml_section_enum_t)), NULL, &bindings.search_binding) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n"); + close_socket(&listen_list.sockfd); + return SWITCH_STATUS_GENERR; + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding)); + + /* connect my internal structure to the blank pointer passed to me */ + *module_interface = switch_loadable_module_create_module_interface(pool, modname); + + /* indicate that the module should continue to be loaded */ + return SWITCH_STATUS_SUCCESS; +} + + SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) { switch_memory_pool_t *pool = NULL, *listener_pool = NULL; @@ -1328,6 +1502,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) listener_pool = NULL; listener->level = SWITCH_LOG_DEBUG; switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool); + switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool); switch_core_hash_init(&listener->event_hash, listener->pool); launch_listener_thread(listener); @@ -1358,6 +1533,41 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) return SWITCH_STATUS_TERM; } +SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown) +{ + listener_t *l; + int sanity = 0; + + prefs.done = 1; + + switch_log_unbind_logger(socket_logger); + + /*close_socket(&listen_list.sockfd);*/ + + while (prefs.threads || prefs.done == 1) { + switch_yield(10000); + if (++sanity == 1000) { + break; + } + } + + switch_event_unbind(&globals.node); + switch_xml_unbind_search_function_ptr(erlang_fetch); + + switch_mutex_lock(globals.listener_mutex); + + for (l = listen_list.listeners; l; l = l->next) { + close_socket(&l->sockfd); + } + + switch_mutex_unlock(globals.listener_mutex); + + switch_sleep(1500000); /* sleep for 1.5 seconds */ + + return SWITCH_STATUS_SUCCESS; +} + + /* For Emacs: * Local Variables: * mode:c