From b6c7dd36a0b1228b6096f4bf87887e596d47fe0b Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Thu, 22 Jan 2009 19:34:53 +0000 Subject: [PATCH] Merge in Rob Charlton's patch for outbound session support in mod_erlang_event git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@11376 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- .../event_handlers/mod_erlang_event/Makefile | 5 +- .../mod_erlang_event/ei_helpers.c | 183 +++ .../mod_erlang_event/handle_msg.c | 713 +++++++++ .../mod_erlang_event/mod_erlang_event.c | 1406 +++++------------ .../mod_erlang_event/mod_erlang_event.h | 190 +++ 5 files changed, 1526 insertions(+), 971 deletions(-) create mode 100644 src/mod/event_handlers/mod_erlang_event/ei_helpers.c create mode 100644 src/mod/event_handlers/mod_erlang_event/handle_msg.c create mode 100644 src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h diff --git a/src/mod/event_handlers/mod_erlang_event/Makefile b/src/mod/event_handlers/mod_erlang_event/Makefile index bd7d438850..0ff1a89901 100644 --- a/src/mod/event_handlers/mod_erlang_event/Makefile +++ b/src/mod/event_handlers/mod_erlang_event/Makefile @@ -1,5 +1,8 @@ BASE=../../../.. +LOCAL_SOURCES=handle_msg.c ei_helpers.c +LOCAL_OBJS=handle_msg.o ei_helpers.o include $(BASE)/build/modmake.rules -LOCAL_CFLAGS=-I/usr/local/lib/erlang/lib/erl_interface-3.5.8/include -L/usr/local/lib/erlang/lib/erl_interface-3.5.8/lib/ -D_REENTRANT + +LOCAL_CFLAGS=-I/usr/local/lib/erlang/lib/erl_interface-3.5.9/include -L/usr/local/lib/erlang/lib/erl_interface-3.5.9/lib/ -D_REENTRANT LOCAL_LDFLAGS=-lei diff --git a/src/mod/event_handlers/mod_erlang_event/ei_helpers.c b/src/mod/event_handlers/mod_erlang_event/ei_helpers.c new file mode 100644 index 0000000000..5b3bd47c65 --- /dev/null +++ b/src/mod/event_handlers/mod_erlang_event/ei_helpers.c @@ -0,0 +1,183 @@ +/* + * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * Copyright (C) 2005/2006, Anthony Minessale II + * + * Version: MPL 1.1 + * + * The contents of this file are subject to the Mozilla Public License Version + * 1.1 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + * for the specific language governing rights and limitations under the + * License. + * + * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * + * The Initial Developer of the Original Code is + * Anthony Minessale II + * Portions created by the Initial Developer are Copyright (C) + * the Initial Developer. All Rights Reserved. + * + * Contributor(s): + * + * Anthony Minessale II + * Andrew Thompson + * Rob Charlton + * + * + * ei_helpers.c -- helper functions for ei + * + */ +#include +#include +#include "mod_erlang_event.h" + +/* Stolen from code added to ei in R12B-5. + * Since not everyone has this version yet; + * provide our own version. + * */ + +#define put8(s,n) do { \ + (s)[0] = (char)((n) & 0xff); \ + (s) += 1; \ +} while (0) + +#define put32be(s,n) do { \ + (s)[0] = ((n) >> 24) & 0xff; \ + (s)[1] = ((n) >> 16) & 0xff; \ + (s)[2] = ((n) >> 8) & 0xff; \ + (s)[3] = (n) & 0xff; \ + (s) += 4; \ +} while (0) + +void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to) { + char msgbuf[2048]; + char *s; + int index = 0; + /*int n;*/ + + index = 5; /* max sizes: */ + ei_encode_version(msgbuf,&index); /* 1 */ + ei_encode_tuple_header(msgbuf,&index,3); + ei_encode_long(msgbuf,&index,ERL_LINK); + ei_encode_pid(msgbuf,&index,from); /* 268 */ + ei_encode_pid(msgbuf,&index,to); /* 268 */ + + /* 5 byte header missing */ + s = msgbuf; + put32be(s, index - 4); /* 4 */ + put8(s, ERL_PASS_THROUGH); /* 1 */ + /* sum: 542 */ + + switch_mutex_lock(listener->sock_mutex); + write(listener->sockfd, msgbuf, index); + switch_mutex_unlock(listener->sock_mutex); +} + +void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event) +{ + int i; + char *uuid = switch_event_get_header(event, "unique-id"); + + switch_event_header_t *hp; + + for (i = 0, hp = event->headers; hp; hp = hp->next, i++); + + if (event->body) + i++; + + ei_x_encode_list_header(ebuf, i+1); + + if (uuid) { + ei_x_encode_string(ebuf, switch_event_get_header(event, "unique-id")); + } else { + ei_x_encode_atom(ebuf, "undefined"); + } + + for (hp = event->headers; hp; hp = hp->next) { + ei_x_encode_tuple_header(ebuf, 2); + ei_x_encode_string(ebuf, hp->name); + ei_x_encode_string(ebuf, hp->value); + } + + if (event->body) { + ei_x_encode_tuple_header(ebuf, 2); + ei_x_encode_string(ebuf, "body"); + ei_x_encode_string(ebuf, event->body); + } + + ei_x_encode_empty_list(ebuf); +} + + +void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag) +{ + + ei_x_encode_tuple_header(ebuf, 2); + ei_x_encode_atom(ebuf, tag); + ei_encode_switch_event_headers(ebuf, event); +} + + +switch_status_t initialise_ei(struct ei_cnode_s *ec) +{ + switch_status_t rv; + struct sockaddr_in server_addr; + + /* zero out the struct before we use it */ + memset(&server_addr, 0, sizeof(server_addr)); + + /* convert the configured IP to network byte order, handing errors */ + rv = inet_pton(AF_INET, prefs.ip, &server_addr.sin_addr.s_addr); + if (rv == 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not parse invalid ip address: %s\n", prefs.ip); + return SWITCH_STATUS_FALSE; + } else if (rv == -1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error when parsing ip address %s : %s\n", prefs.ip, strerror(errno)); + return SWITCH_STATUS_FALSE; + } + + /* set the address family and port */ + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(prefs.port); + + struct hostent *nodehost = gethostbyaddr(&server_addr.sin_addr.s_addr, sizeof(server_addr.sin_addr.s_addr), AF_INET); + + char *thishostname = nodehost->h_name; + char thisnodename[MAXNODELEN+1]; + + if (!strcmp(thishostname, "localhost")) + gethostname(thishostname, EI_MAXHOSTNAMELEN); + + if (prefs.shortname) { + char *off; + if ((off = strchr(thishostname, '.'))) { + *off = '\0'; + } + } + + snprintf(thisnodename, MAXNODELEN+1, "%s@%s", prefs.nodename, thishostname); + + /* init the ei stuff */ + if (ei_connect_xinit(ec, thishostname, prefs.nodename, thisnodename, (Erl_IpAddr)(&server_addr.sin_addr.s_addr), prefs.cookie, 0) < 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n"); + return SWITCH_STATUS_FALSE; + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ei initialized at %s\n", thisnodename); + return SWITCH_STATUS_SUCCESS; +} + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4: + */ diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c new file mode 100644 index 0000000000..0ca38dd357 --- /dev/null +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -0,0 +1,713 @@ +/* + * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * Copyright (C) 2005/2006, Anthony Minessale II + * + * Version: MPL 1.1 + * + * The contents of this file are subject to the Mozilla Public License Version + * 1.1 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + * for the specific language governing rights and limitations under the + * License. + * + * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * + * The Initial Developer of the Original Code is + * Anthony Minessale II + * Portions created by the Initial Developer are Copyright (C) + * the Initial Developer. All Rights Reserved. + * + * Contributor(s): + * + * Anthony Minessale II + * Andrew Thompson + * Rob Charlton + * + * + * handle_msg.c -- handle messages received from erlang nodes + * + */ +#include +#include +#include "mod_erlang_event.h" + +static char *MARKER = "1"; + +static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) +{ + switch_bool_t r = SWITCH_TRUE; + struct api_command_struct *acs = (struct api_command_struct *) obj; + switch_stream_handle_t stream = { 0 }; + char *reply, *freply = NULL; + switch_status_t status; + + if (!acs) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Internal error.\n"); + return NULL; + } + + if (!acs->listener || !acs->listener->rwlock || switch_thread_rwlock_tryrdlock(acs->listener->rwlock) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! cannot get read lock.\n"); + goto done; + } + + SWITCH_STANDARD_STREAM(stream); + + if ((status = switch_api_execute(acs->api_cmd, acs->arg, NULL, &stream)) == SWITCH_STATUS_SUCCESS) { + reply = stream.data; + } else { + freply = switch_mprintf("%s: Command not found!\n", acs->api_cmd); + reply = freply; + r = SWITCH_FALSE; + } + + if (!reply) { + reply = "Command returned no output!"; + r = SWITCH_FALSE; + } + + if (*reply == '-') + r = SWITCH_FALSE; + + if (acs->bg) { + switch_event_t *event; + + if (switch_event_create(&event, SWITCH_EVENT_BACKGROUND_JOB) == SWITCH_STATUS_SUCCESS) { + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-UUID", acs->uuid_str); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command", acs->api_cmd); + + ei_x_buff ebuf; + ei_x_new_with_version(&ebuf); + + if (acs->arg) { + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command-Arg", acs->arg); + } + + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Successful", r ? "true" : "false"); + switch_event_add_body(event, "%s", reply); + + switch_event_fire(&event); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending bgapi reply to %s\n", acs->pid.node); + + ei_x_encode_tuple_header(&ebuf, 3); + + if (r) + ei_x_encode_atom(&ebuf, "bgok"); + else + ei_x_encode_atom(&ebuf, "bgerror"); + + ei_x_encode_string(&ebuf, acs->uuid_str); + ei_x_encode_string(&ebuf, reply); + + switch_mutex_lock(acs->listener->sock_mutex); + ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index); + switch_mutex_unlock(acs->listener->sock_mutex); + + ei_x_free(&ebuf); + } + } else { + ei_x_buff rbuf; + ei_x_new_with_version(&rbuf); + ei_x_encode_tuple_header(&rbuf, 2); + + if (!strlen(reply)) { + reply = "Command returned no output!"; + r = SWITCH_FALSE; + } + + if (r) { + ei_x_encode_atom(&rbuf, "ok"); + } else { + ei_x_encode_atom(&rbuf, "error"); + } + + ei_x_encode_string(&rbuf, reply); + + + switch_mutex_lock(acs->listener->sock_mutex); + ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index); + switch_mutex_unlock(acs->listener->sock_mutex); + + ei_x_free(&rbuf); + } + + switch_safe_free(stream.data); + switch_safe_free(freply); + + if (acs->listener->rwlock) { + switch_thread_rwlock_unlock(acs->listener->rwlock); + } + + done: + if (acs->bg) { + switch_memory_pool_t *pool = acs->pool; + acs = NULL; + switch_core_destroy_memory_pool(&pool); + pool = NULL; + } + return NULL; + +} + +static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff *buf, ei_x_buff *rbuf) +{ + char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; + + if (ei_decode_string(buf->buff, &buf->index, uuid_str)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + ei_x_buff *nbuf = switch_core_alloc(listener->pool, sizeof(nbuf)); + nbuf->buff = switch_core_alloc(listener->pool, buf->buffsz); + memcpy(nbuf->buff, buf->buff, buf->buffsz); + nbuf->index = buf->index; + nbuf->buffsz = buf->buffsz; + + switch_core_hash_insert(listener->fetch_reply_hash, uuid_str, nbuf); + ei_x_encode_atom(rbuf, "ok"); + } + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t handle_msg_set_log_level(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf) +{ + switch_log_level_t ltype = SWITCH_LOG_DEBUG; + char loglevelstr[MAXATOMLEN]; + if (arity != 2 || + ei_decode_atom(buf->buff, &buf->index, loglevelstr)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + ltype = switch_log_str2level(loglevelstr); + + if (ltype && ltype != SWITCH_LOG_INVALID) { + listener->level = ltype; + ei_x_encode_atom(rbuf, "ok"); + } else { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + } + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf) +{ + char atom[MAXATOMLEN]; + + if (arity == 1) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + int custom = 0; + switch_event_types_t type; + + if (!switch_test_flag(listener, LFLAG_EVENTS)) { + switch_set_flag_locked(listener, LFLAG_EVENTS); + } + + for (int i = 1; i < arity; i++) { + if (!ei_decode_atom(buf->buff, &buf->index, atom)) { + + if (custom) { + switch_core_hash_insert(listener->event_hash, atom, MARKER); + } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) { + if (type == SWITCH_EVENT_ALL) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "ALL events enabled\n"); + uint32_t x = 0; + for (x = 0; x < SWITCH_EVENT_ALL; x++) { + listener->event_list[x] = 1; + } + } + if (type <= SWITCH_EVENT_ALL) { + listener->event_list[type] = 1; + } + if (type == SWITCH_EVENT_CUSTOM) { + custom++; + } + + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom); + } + } + ei_x_encode_atom(rbuf, "ok"); + } + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf) +{ + char atom[MAXATOMLEN]; + + if (arity == 1) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + int custom = 0; + switch_event_types_t type; + + for (int i = 1; i < arity; i++) { + if (!ei_decode_atom(buf->buff, &buf->index, atom)) { + + if (custom) { + switch_core_hash_delete(listener->event_hash, atom); + } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) { + uint32_t x = 0; + + if (type == SWITCH_EVENT_CUSTOM) { + custom++; + } else if (type == SWITCH_EVENT_ALL) { + for (x = 0; x <= SWITCH_EVENT_ALL; x++) { + listener->event_list[x] = 0; + } + } else { + if (listener->event_list[SWITCH_EVENT_ALL]) { + listener->event_list[SWITCH_EVENT_ALL] = 0; + for (x = 0; x < SWITCH_EVENT_ALL; x++) { + listener->event_list[x] = 1; + } + } + listener->event_list[type] = 0; + } + } + } + } + ei_x_encode_atom(rbuf, "ok"); + } + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t handle_msg_api(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf) +{ + char api_cmd[MAXATOMLEN]; + char arg[1024]; + if (arity < 3 || + ei_decode_atom(buf->buff, &buf->index, api_cmd) || + ei_decode_string(buf->buff, &buf->index, arg)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + return SWITCH_STATUS_SUCCESS; + } + else { + struct api_command_struct acs = { 0 }; + acs.listener = listener; + acs.api_cmd = api_cmd; + acs.arg = arg; + acs.bg = 0; + acs.pid = msg->from; + api_exec(NULL, (void *) &acs); + /* don't reply */ + return SWITCH_STATUS_FALSE; + } +} + +static switch_status_t handle_msg_bgapi(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf) +{ + char api_cmd[MAXATOMLEN]; + char arg[1024]; + if (arity < 3 || + ei_decode_atom(buf->buff, &buf->index, api_cmd) || + ei_decode_string(buf->buff, &buf->index, arg)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + struct api_command_struct *acs = NULL; + switch_memory_pool_t *pool; + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + switch_uuid_t uuid; + + switch_core_new_memory_pool(&pool); + acs = switch_core_alloc(pool, sizeof(*acs)); + switch_assert(acs); + acs->pool = pool; + acs->listener = listener; + acs->api_cmd = switch_core_strdup(acs->pool, api_cmd); + acs->arg = switch_core_strdup(acs->pool, arg); + acs->bg = 1; + acs->pid = msg->from; + + switch_threadattr_create(&thd_attr, acs->pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + + switch_uuid_get(&uuid); + switch_uuid_format(acs->uuid_str, &uuid); + switch_thread_create(&thread, thd_attr, api_exec, acs, acs->pool); + + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "ok"); + ei_x_encode_string(rbuf, acs->uuid_str); + } + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t handle_msg_sendevent(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf) +{ + char ename[MAXATOMLEN]; + int headerlength; + + if (ei_decode_atom(buf->buff, &buf->index, ename) || + ei_decode_list_header(buf->buff, &buf->index, &headerlength)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + switch_event_types_t etype; + if (switch_name_event(ename, &etype) == SWITCH_STATUS_SUCCESS) { + switch_event_t *event; + if (switch_event_create(&event, etype) == SWITCH_STATUS_SUCCESS) { + char key[1024]; + char value[1024]; + int i = 0; + switch_bool_t fail = SWITCH_FALSE; + + while(!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) { + i++; + if (ei_decode_string(buf->buff, &buf->index, key) || + ei_decode_string(buf->buff, &buf->index, value)) { + fail = SWITCH_TRUE; + break; + } + + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value); + } + + if (headerlength != i || fail) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + switch_event_fire(&event); + ei_x_encode_atom(rbuf, "ok"); + } + } + } + } + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t handle_msg_sendmsg(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf) +{ + char uuid[37]; + int headerlength; + + if (ei_decode_string(buf->buff, &buf->index, uuid) || + ei_decode_list_header(buf->buff, &buf->index, &headerlength)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + switch_core_session_t *session; + if (!switch_strlen_zero(uuid) && (session = switch_core_session_locate(uuid))) { + switch_event_t *event; + if (switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE) == SWITCH_STATUS_SUCCESS) { + + char key[1024]; + char value[1024]; + int i = 0; + switch_bool_t fail = SWITCH_FALSE; + while(!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) { + i++; + if (ei_decode_string(buf->buff, &buf->index, key) || + ei_decode_string(buf->buff, &buf->index, value)) { + fail = SWITCH_TRUE; + break; + } + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value); + } + + if (headerlength != i || fail) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + if (switch_core_session_queue_private_event(session, &event) == SWITCH_STATUS_SUCCESS) { + ei_x_encode_atom(rbuf, "ok"); + } else { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badmem"); + } + + } + } + /* release the lock returned by switch_core_locate_session */ + switch_core_session_rwunlock(session); + + } else { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "nosession"); + } + } + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) +{ + /* format is (result|config|directory|dialplan|phrases) */ + char sectionstr[MAXATOMLEN]; + switch_xml_section_t section; + + if (ei_decode_atom(buf->buff, &buf->index, sectionstr) || + !(section = switch_xml_parse_section_string(sectionstr))) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + struct erlang_binding *binding, *ptr; + + if (!(binding = switch_core_alloc(listener->pool, sizeof(*binding)))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n"); + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badmem"); + } + else { + binding->section = section; + binding->pid = msg->from; + binding->listener = listener; + + switch_core_hash_init(&listener->fetch_reply_hash, listener->pool); + + switch_mutex_lock(globals.listener_mutex); + + for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next); + + if (ptr) { + ptr->next = binding; + } else { + bindings.head = binding; + } + + switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section); + switch_mutex_unlock(globals.listener_mutex); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding)); + + ei_link(listener, ei_self(listener->ec), &msg->from); + ei_x_encode_atom(rbuf, "ok"); + } + } + return SWITCH_STATUS_SUCCESS; +} + +/* {handlecall,,} */ +static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf) +{ + char reg_name[MAXATOMLEN]; + char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; + + if (arity != 3 || + ei_decode_string(buf->buff, &buf->index, uuid_str) || + ei_decode_string(buf->buff, &buf->index, reg_name)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + switch_core_session_t *session; + if (!switch_strlen_zero(uuid_str) && (session = switch_core_session_locate(uuid_str))) { + /* create a new sesion list element and attach it to this listener */ + if (attach_call_to_listener(listener,reg_name,session)) { + ei_x_encode_atom(rbuf, "ok"); + } + else { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badsession"); + } + } + else { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + } + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) +{ + char tupletag[MAXATOMLEN]; + int arity; + switch_status_t ret = SWITCH_STATUS_SUCCESS; + + ei_decode_tuple_header(buf->buff, &buf->index, &arity); + if (ei_decode_atom(buf->buff, &buf->index, tupletag)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else { + if (!strncmp(tupletag, "fetch_reply", MAXATOMLEN)) { + ret = handle_msg_fetch_reply(listener,buf,rbuf); + } else if (!strncmp(tupletag, "set_log_level", MAXATOMLEN)) { + ret = handle_msg_set_log_level(listener,arity,buf,rbuf); + } else if (!strncmp(tupletag, "event", MAXATOMLEN)) { + ret = handle_msg_event(listener,arity,buf,rbuf); + } else if (!strncmp(tupletag, "nixevent", MAXATOMLEN)) { + ret = handle_msg_nixevent(listener,arity,buf,rbuf); + } else if (!strncmp(tupletag, "api", MAXATOMLEN)) { + ret = handle_msg_api(listener,msg,arity,buf,rbuf); + } else if (!strncmp(tupletag, "bgapi", MAXATOMLEN)) { + ret = handle_msg_bgapi(listener,msg,arity,buf,rbuf); + } else if (!strncmp(tupletag, "sendevent", MAXATOMLEN)) { + ret = handle_msg_sendevent(listener,arity,buf,rbuf); + } else if (!strncmp(tupletag, "sendmsg", MAXATOMLEN)) { + ret = handle_msg_sendmsg(listener,arity,buf,rbuf); + } else if (!strncmp(tupletag, "bind", MAXATOMLEN)) { + ret = handle_msg_bind(listener,msg,buf,rbuf); + } else if (!strncmp(tupletag, "handlecall", MAXATOMLEN)) { + ret = handle_msg_handlecall(listener,arity,buf,rbuf); + } else { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "undef"); + } + } + return ret; +} + +static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) +{ + char atom[MAXATOMLEN]; + switch_status_t ret = SWITCH_STATUS_SUCCESS; + + if (ei_decode_atom(buf->buff, &buf->index, atom)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + } + else if (!strncmp(atom, "nolog", MAXATOMLEN)) { + if (switch_test_flag(listener, LFLAG_LOG)) { + switch_clear_flag_locked(listener, LFLAG_LOG); + } + ei_x_encode_atom(rbuf, "ok"); + } else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) { + ei_link(listener, ei_self(listener->ec), &msg->from); + listener->log_pid = msg->from; + listener->level = SWITCH_LOG_DEBUG; + switch_set_flag(listener, LFLAG_LOG); + ei_x_encode_atom(rbuf, "ok"); + } else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) { + ei_link(listener, ei_self(listener->ec), &msg->from); + listener->event_pid = msg->from; + if (!switch_test_flag(listener, LFLAG_EVENTS)) { + switch_set_flag_locked(listener, LFLAG_EVENTS); + } + ei_x_encode_atom(rbuf, "ok"); + } else if (!strncmp(atom, "noevents", MAXATOMLEN)) { + void *pop; + /*purge the event queue */ + while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS); + + if (switch_test_flag(listener, LFLAG_EVENTS)) { + uint8_t x = 0; + switch_clear_flag_locked(listener, LFLAG_EVENTS); + for (x = 0; x <= SWITCH_EVENT_ALL; x++) { + listener->event_list[x] = 0; + } + /* wipe the hash */ + switch_core_hash_destroy(&listener->event_hash); + switch_core_hash_init(&listener->event_hash, listener->pool); + ei_x_encode_atom(rbuf, "ok"); + } else { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "notlistening"); + } + } else if (!strncmp(atom, "exit", MAXATOMLEN)) { + ei_x_encode_atom(rbuf, "ok"); + ret = SWITCH_STATUS_TERM; + } else if (!strncmp(atom, "getpid", MAXATOMLEN)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "ok"); + ei_x_encode_pid(rbuf, ei_self(listener->ec)); + } else if (!strncmp(atom, "link", MAXATOMLEN)) { + /* debugging */ + ei_link(listener, ei_self(listener->ec), &msg->from); + ret = SWITCH_STATUS_FALSE; + } else { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "undef"); + } + + return ret; +} + +int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) +{ + int type, size, version; + switch_status_t ret = SWITCH_STATUS_SUCCESS; + + buf->index = 0; + ei_decode_version(buf->buff, &buf->index, &version); + ei_get_type(buf->buff, &buf->index, &type, &size); + + switch(type) { + case ERL_SMALL_TUPLE_EXT : + case ERL_LARGE_TUPLE_EXT : + ret = handle_msg_tuple(listener,msg,buf,rbuf); + break; + + case ERL_ATOM_EXT : + ret = handle_msg_atom(listener,msg,buf,rbuf); + break; + + default : + /* some other kind of erlang term */ + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "undef"); + break; + } + + if (SWITCH_STATUS_FALSE==ret) + return 0; + else { + switch_mutex_lock(listener->sock_mutex); + ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index); + switch_mutex_unlock(listener->sock_mutex); + + if (SWITCH_STATUS_SUCCESS==ret) + return 0; + else /* SWITCH_STATUS_TERM */ + return 1; + } +} + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4: + */ diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index ef6e26c0ec..75d62fa0fa 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -25,101 +25,22 @@ * * Anthony Minessale II * Andrew Thompson + * Rob Charlton * * * mod_erlang_event.c -- Erlang Event Handler derived from mod_event_socket * */ #include - #include +#define DEFINE_GLOBALS +#include "mod_erlang_event.h" SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load); SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown); SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime); SWITCH_MODULE_DEFINITION(mod_erlang_event, mod_erlang_event_load, mod_erlang_event_shutdown, mod_erlang_event_runtime); -static char *MARKER = "1"; - -typedef enum { - LFLAG_AUTHED = (1 << 0), - LFLAG_RUNNING = (1 << 1), - LFLAG_EVENTS = (1 << 2), - LFLAG_LOG = (1 << 3), - LFLAG_FULL = (1 << 4), - LFLAG_MYEVENTS = (1 << 5), - LFLAG_SESSION = (1 << 6), - LFLAG_ASYNC = (1 << 7), - LFLAG_STATEFUL = (1 << 8) -} event_flag_t; - -struct listener { - int sockfd; - struct ei_cnode_s *ec; - erlang_pid log_pid; - erlang_pid event_pid; - switch_queue_t *event_queue; - switch_queue_t *log_queue; - switch_memory_pool_t *pool; - switch_mutex_t *flag_mutex; - switch_mutex_t *sock_mutex; - char *ebuf; - uint32_t flags; - switch_log_level_t level; - uint8_t event_list[SWITCH_EVENT_ALL + 1]; - switch_hash_t *event_hash; - switch_hash_t *fetch_reply_hash; - switch_thread_rwlock_t *rwlock; - switch_core_session_t *session; - int lost_events; - int lost_logs; - time_t last_flush; - uint32_t timeout; - uint32_t id; - char remote_ip[50]; - /*switch_port_t remote_port;*/ - struct listener *next; -}; - -typedef struct listener listener_t; - -static struct { - int sockfd; - switch_mutex_t *sock_mutex; - listener_t *listeners; - uint8_t ready; -} listen_list; - -#define MAX_ACL 100 - -struct erlang_binding { - switch_xml_section_t section; - erlang_pid pid; - char *registered_process; /* TODO */ - listener_t *listener; - struct erlang_binding *next; -}; - -static struct { - struct erlang_binding *head; - switch_xml_binding_t *search_binding; -} bindings; - -static struct { - switch_mutex_t *mutex; - char *ip; - char *nodename; - switch_bool_t shortname; - uint16_t port; - char *cookie; - int done; - int threads; - char *acl[MAX_ACL]; - uint32_t acl_count; - uint32_t id; -} prefs; - - static void remove_listener(listener_t *listener); SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip); @@ -129,12 +50,6 @@ SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_nodename, prefs.nodename); static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj); static void launch_listener_thread(listener_t *listener); -static struct { - switch_mutex_t *listener_mutex; - switch_event_node_t *node; -} globals; - - static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_level_t level) { listener_t *l; @@ -173,49 +88,6 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l } -/* Stolen from code added to ei in R12B-5. - * Since not everyone has this verison yet; - * provide our own version. - * */ - -#define put8(s,n) do { \ - (s)[0] = (char)((n) & 0xff); \ - (s) += 1; \ -} while (0) - -#define put32be(s,n) do { \ - (s)[0] = ((n) >> 24) & 0xff; \ - (s)[1] = ((n) >> 16) & 0xff; \ - (s)[2] = ((n) >> 8) & 0xff; \ - (s)[3] = (n) & 0xff; \ - (s) += 4; \ -} while (0) - -static void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to) { - char msgbuf[2048]; - char *s; - int index = 0; - /*int n;*/ - - index = 5; /* max sizes: */ - ei_encode_version(msgbuf,&index); /* 1 */ - ei_encode_tuple_header(msgbuf,&index,3); - ei_encode_long(msgbuf,&index,ERL_LINK); - ei_encode_pid(msgbuf,&index,from); /* 268 */ - ei_encode_pid(msgbuf,&index,to); /* 268 */ - - /* 5 byte header missing */ - s = msgbuf; - put32be(s, index - 4); /* 4 */ - put8(s, ERL_PASS_THROUGH); /* 1 */ - /* sum: 542 */ - - switch_mutex_lock(listener->sock_mutex); - write(listener->sockfd, msgbuf, index); - switch_mutex_unlock(listener->sock_mutex); -} - - static void expire_listener(listener_t **listener) { void *pop; @@ -268,54 +140,33 @@ static void remove_binding(listener_t *listener, erlang_pid *pid) { } -static void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event) +static void send_event_to_attached_sessions(listener_t* listener, switch_event_t *event) { - int i; char *uuid = switch_event_get_header(event, "unique-id"); + switch_event_t *clone = NULL; + session_elem_t* s; - switch_event_header_t *hp; - - for (i = 0, hp = event->headers; hp; hp = hp->next, i++); - - if (event->body) - i++; - - ei_x_encode_list_header(ebuf, i+1); - - if (uuid) { - ei_x_encode_string(ebuf, switch_event_get_header(event, "unique-id")); - } else { - ei_x_encode_atom(ebuf, "undefined"); + if (!uuid) + return; + switch_mutex_lock(listener->session_mutex); + for (s = listener->session_list; s; s = s->next) { + /* check the event uuid against the uuid of each session */ + if (!strcmp(uuid, switch_core_session_get_uuid(s->session))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending event to attached session\n"); + if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) { + /* add the event to the queue for this session */ + if (switch_queue_trypush(s->event_queue, clone) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Lost event!\n"); + switch_event_destroy(&clone); + } + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n"); + } + } } - - for (hp = event->headers; hp; hp = hp->next) { - ei_x_encode_tuple_header(ebuf, 2); - ei_x_encode_string(ebuf, hp->name); - ei_x_encode_string(ebuf, hp->value); - } - - if (event->body) { - ei_x_encode_tuple_header(ebuf, 2); - ei_x_encode_string(ebuf, "body"); - ei_x_encode_string(ebuf, event->body); - } - - ei_x_encode_empty_list(ebuf); + switch_mutex_unlock(listener->session_mutex); } - -static void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag) -{ - - ei_x_encode_tuple_header(ebuf, 2); - ei_x_encode_atom(ebuf, tag); - ei_encode_switch_event_headers(ebuf, event); -} - - -#define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event") - - static void event_handler(switch_event_t *event) { switch_event_t *clone = NULL; @@ -335,7 +186,12 @@ static void event_handler(switch_event_t *event) l = lp; lp = lp->next; - + + /* test all of the sessions attached to this event in case + one of them should receive it as well + */ + send_event_to_attached_sessions(l,event); + if (!switch_test_flag(l, LFLAG_EVENTS)) { continue; } @@ -355,13 +211,6 @@ static void event_handler(switch_event_t *event) } } - if (send && switch_test_flag(l, LFLAG_MYEVENTS)) { - char *uuid = switch_event_get_header(event, "unique-id"); - if (!uuid || strcmp(uuid, switch_core_session_get_uuid(l->session))) { - send = 0; - } - } - if (send) { if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) { if (switch_queue_trypush(l->event_queue, clone) == SWITCH_STATUS_SUCCESS) { @@ -429,134 +278,27 @@ static void remove_listener(listener_t *listener) switch_mutex_unlock(globals.listener_mutex); } - -struct api_command_struct { - char *api_cmd; - char *arg; - listener_t *listener; - char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; - uint8_t bg; - erlang_pid pid; - switch_memory_pool_t *pool; -}; - - -static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) +/* Search for a listener already talking to the specified node */ +static listener_t * find_listener(char* nodename) { - switch_bool_t r = SWITCH_TRUE; - struct api_command_struct *acs = (struct api_command_struct *) obj; - switch_stream_handle_t stream = { 0 }; - char *reply, *freply = NULL; - switch_status_t status; + listener_t *l = NULL; - if (!acs) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Internal error.\n"); - return NULL; - } - - if (!acs->listener || !acs->listener->rwlock || switch_thread_rwlock_tryrdlock(acs->listener->rwlock) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! cannot get read lock.\n"); - goto done; - } - - - SWITCH_STANDARD_STREAM(stream); - - if ((status = switch_api_execute(acs->api_cmd, acs->arg, NULL, &stream)) == SWITCH_STATUS_SUCCESS) { - reply = stream.data; - } else { - freply = switch_mprintf("%s: Command not found!\n", acs->api_cmd); - reply = freply; - r = SWITCH_FALSE; - } - - if (!reply) { - reply = "Command returned no output!"; - r = SWITCH_FALSE; - } - - if (*reply == '-') - r = SWITCH_FALSE; - - if (acs->bg) { - switch_event_t *event; - - if (switch_event_create(&event, SWITCH_EVENT_BACKGROUND_JOB) == SWITCH_STATUS_SUCCESS) { - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-UUID", acs->uuid_str); - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command", acs->api_cmd); - - ei_x_buff ebuf; - ei_x_new_with_version(&ebuf); - - if (acs->arg) { - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Command-Arg", acs->arg); - } - - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Job-Successful", r ? "true" : "false"); - switch_event_add_body(event, "%s", reply); - - switch_event_fire(&event); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending bgapi reply to %s\n", acs->pid.node); - - ei_x_encode_tuple_header(&ebuf, 3); - - if (r) - ei_x_encode_atom(&ebuf, "bgok"); - else - ei_x_encode_atom(&ebuf, "bgerror"); - - ei_x_encode_string(&ebuf, acs->uuid_str); - ei_x_encode_string(&ebuf, reply); - - switch_mutex_lock(acs->listener->sock_mutex); - ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index); - switch_mutex_unlock(acs->listener->sock_mutex); - - ei_x_free(&ebuf); + switch_mutex_lock(globals.listener_mutex); + for (l = listen_list.listeners; l; l = l->next) { + if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) { + break; } - } else { - ei_x_buff rbuf; - ei_x_new_with_version(&rbuf); - ei_x_encode_tuple_header(&rbuf, 2); - - if (!strlen(reply)) { - reply = "Command returned no output!"; - r = SWITCH_FALSE; - } - - if (r) { - ei_x_encode_atom(&rbuf, "ok"); - } else { - ei_x_encode_atom(&rbuf, "error"); - } - - ei_x_encode_string(&rbuf, reply); - - - switch_mutex_lock(acs->listener->sock_mutex); - ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index); - switch_mutex_unlock(acs->listener->sock_mutex); - - ei_x_free(&rbuf); } + switch_mutex_unlock(globals.listener_mutex); + return l; +} - switch_safe_free(stream.data); - switch_safe_free(freply); - - if (acs->listener->rwlock) { - switch_thread_rwlock_unlock(acs->listener->rwlock); - } - - done: - if (acs->bg) { - switch_memory_pool_t *pool = acs->pool; - acs = NULL; - switch_core_destroy_memory_pool(&pool); - pool = NULL; - } - return NULL; - +static void add_session_elem_to_listener(listener_t *listener, session_elem_t *session_element) +{ + switch_mutex_lock(listener->session_mutex); + session_element->next = listener->session_list; + listener->session_list = session_element; + switch_mutex_unlock(listener->session_mutex); } static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value, @@ -650,571 +392,187 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c } -static int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) +static switch_status_t notify_new_session(listener_t *listener, switch_core_session_t *session, char* reg_name) { - int type, size, version, arity; - char tupletag[MAXATOMLEN]; - char atom[MAXATOMLEN]; - - buf->index = 0; - ei_decode_version(buf->buff, &buf->index, &version); - ei_get_type(buf->buff, &buf->index, &type, &size); - - switch(type) { - case ERL_SMALL_TUPLE_EXT : - case ERL_LARGE_TUPLE_EXT : - ei_decode_tuple_header(buf->buff, &buf->index, &arity); - if (ei_decode_atom(buf->buff, &buf->index, tupletag)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - if (!strncmp(tupletag, "fetch_reply", MAXATOMLEN)) { - char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; - - if (ei_decode_string(buf->buff, &buf->index, uuid_str)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - ei_x_buff *nbuf = switch_core_alloc(listener->pool, sizeof(nbuf)); - /*char *wtf = "hello world";*/ - nbuf->buff = switch_core_alloc(listener->pool, buf->buffsz); - memcpy(nbuf->buff, buf->buff, buf->buffsz); - /*memcpy(nbuf, wtf, 20);*/ - nbuf->index = buf->index; - nbuf->buffsz = buf->buffsz; - - /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "stored %d %d %s\n", buf->index, buf->buffsz, nbuf);*/ - - switch_core_hash_insert(listener->fetch_reply_hash, uuid_str, nbuf); - - } else if (!strncmp(tupletag, "set_log_level", MAXATOMLEN)) { - if (arity == 2) { - switch_log_level_t ltype = SWITCH_LOG_DEBUG; - char loglevelstr[MAXATOMLEN]; - if (ei_decode_atom(buf->buff, &buf->index, loglevelstr)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - ltype = switch_log_str2level(loglevelstr); - - if (ltype && ltype != SWITCH_LOG_INVALID) { - listener->level = ltype; - } else { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - } else { - /* tuple too long */ - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - } else if (!strncmp(tupletag, "event", MAXATOMLEN)) { - if (arity == 1) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - int custom = 0; - switch_event_types_t type; - - if (!switch_test_flag(listener, LFLAG_EVENTS)) { - switch_set_flag_locked(listener, LFLAG_EVENTS); - } - - for (int i = 1; i < arity; i++) { - if (!ei_decode_atom(buf->buff, &buf->index, atom)) { - - if (custom) { - switch_core_hash_insert(listener->event_hash, atom, MARKER); - } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) { - if (type == SWITCH_EVENT_ALL) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "ALL events enabled\n"); - uint32_t x = 0; - for (x = 0; x < SWITCH_EVENT_ALL; x++) { - listener->event_list[x] = 1; - } - } - if (type <= SWITCH_EVENT_ALL) { - listener->event_list[type] = 1; - } - if (type == SWITCH_EVENT_CUSTOM) { - custom++; - } - - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom); - } - } - } else if (!strncmp(tupletag, "nixevent", MAXATOMLEN)) { - if (arity == 1) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - int custom = 0; - switch_event_types_t type; - - for (int i = 1; i < arity; i++) { - if (!ei_decode_atom(buf->buff, &buf->index, atom)) { - - if (custom) { - switch_core_hash_delete(listener->event_hash, atom); - } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) { - uint32_t x = 0; - - if (type == SWITCH_EVENT_CUSTOM) { - custom++; - } else if (type == SWITCH_EVENT_ALL) { - for (x = 0; x <= SWITCH_EVENT_ALL; x++) { - listener->event_list[x] = 0; - } - } else { - if (listener->event_list[SWITCH_EVENT_ALL]) { - listener->event_list[SWITCH_EVENT_ALL] = 0; - for (x = 0; x < SWITCH_EVENT_ALL; x++) { - listener->event_list[x] = 1; - } - } - listener->event_list[type] = 0; - } - } - } - } - } else if (!strncmp(tupletag, "api", MAXATOMLEN)) { - if (arity < 3) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - char api_cmd[MAXATOMLEN]; - char arg[1024]; - - if (ei_decode_atom(buf->buff, &buf->index, api_cmd)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - if (ei_decode_string(buf->buff, &buf->index, arg)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - struct api_command_struct acs = { 0 }; - acs.listener = listener; - acs.api_cmd = api_cmd; - acs.arg = arg; - acs.bg = 0; - acs.pid = msg->from; - api_exec(NULL, (void *) &acs); - goto noreply; - - } else if (!strncmp(tupletag, "bgapi", MAXATOMLEN)) { - if (arity < 3) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - char api_cmd[MAXATOMLEN]; - char arg[1024]; - - if (ei_decode_atom(buf->buff, &buf->index, api_cmd)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - if (ei_decode_string(buf->buff, &buf->index, arg)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - struct api_command_struct *acs = NULL; - switch_memory_pool_t *pool; - switch_thread_t *thread; - switch_threadattr_t *thd_attr = NULL; - switch_uuid_t uuid; - - switch_core_new_memory_pool(&pool); - acs = switch_core_alloc(pool, sizeof(*acs)); - switch_assert(acs); - acs->pool = pool; - acs->listener = listener; - acs->api_cmd = switch_core_strdup(acs->pool, api_cmd); - acs->arg = switch_core_strdup(acs->pool, arg); - acs->bg = 1; - acs->pid = msg->from; - - switch_threadattr_create(&thd_attr, acs->pool); - switch_threadattr_detach_set(thd_attr, 1); - switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - - switch_uuid_get(&uuid); - switch_uuid_format(acs->uuid_str, &uuid); - switch_thread_create(&thread, thd_attr, api_exec, acs, acs->pool); - - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "ok"); - ei_x_encode_string(rbuf, acs->uuid_str); - - break; - } else if (!strncmp(tupletag, "sendevent", MAXATOMLEN)) { - char ename[MAXATOMLEN]; - - if (ei_decode_atom(buf->buff, &buf->index, ename)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - int headerlength; - - if (ei_decode_list_header(buf->buff, &buf->index, &headerlength)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - switch_event_types_t etype; - if (switch_name_event(ename, &etype) == SWITCH_STATUS_SUCCESS) { - switch_event_t *event; - - if (switch_event_create(&event, etype) == SWITCH_STATUS_SUCCESS) { - - char key[1024]; - char value[1024]; - int i = 0; - while(!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) { - i++; - if (ei_decode_string(buf->buff, &buf->index, key)) - goto sendevent_fail; - if (ei_decode_string(buf->buff, &buf->index, value)) - goto sendevent_fail; - - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value); - } - - if (headerlength != i) - goto sendevent_fail; - - - switch_event_fire(&event); - ei_x_encode_atom(rbuf, "ok"); - break; - -sendevent_fail: - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - } - } else if (!strncmp(tupletag, "sendmsg", MAXATOMLEN)) { - char uuid[37]; - - if (ei_decode_string(buf->buff, &buf->index, uuid)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - switch_core_session_t *session; - if (!switch_strlen_zero(uuid) && (session = switch_core_session_locate(uuid))) { - } else { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "nosession"); - break; - } - - int headerlength; - - if (ei_decode_list_header(buf->buff, &buf->index, &headerlength)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - switch_event_t *event; - - if (switch_event_create(&event, SWITCH_EVENT_SEND_MESSAGE) == SWITCH_STATUS_SUCCESS) { - - char key[1024]; - char value[1024]; - int i = 0; - while(!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) { - i++; - if (ei_decode_string(buf->buff, &buf->index, key)) - goto sendmsg_fail; - if (ei_decode_string(buf->buff, &buf->index, value)) - goto sendmsg_fail; - - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, key, value); - } - - if (headerlength != i) - goto sendmsg_fail; - - if (switch_core_session_queue_private_event(session, &event) == SWITCH_STATUS_SUCCESS) { - ei_x_encode_atom(rbuf, "ok"); - } else { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badmem"); - } - - /* release the lock returned by switch_core_locate_session */ - switch_core_session_rwunlock(session); - break; - -sendmsg_fail: - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - } else if (!strncmp(tupletag, "bind", MAXATOMLEN)) { - - /* format is (result|config|directory|dialplan|phrases) */ - char sectionstr[MAXATOMLEN]; - - if (ei_decode_atom(buf->buff, &buf->index, sectionstr)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - switch_xml_section_t section; - - if (!(section = switch_xml_parse_section_string(sectionstr))) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - struct erlang_binding *binding, *ptr; - - if (!(binding = switch_core_alloc(listener->pool, sizeof(*binding)))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n"); - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badmem"); - break; - } - - binding->section = section; - binding->pid = msg->from; - binding->listener = listener; - - switch_core_hash_init(&listener->fetch_reply_hash, listener->pool); - - switch_mutex_lock(globals.listener_mutex); - - for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next); - - if (ptr) { - ptr->next = binding; - } else { - bindings.head = binding; - } - - switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section); - switch_mutex_unlock(globals.listener_mutex); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding)); - - ei_link(listener, ei_self(listener->ec), &msg->from); - - } else { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "undef"); - break; - } - - ei_x_encode_atom(rbuf, "ok"); - break; - case ERL_ATOM_EXT : - if (ei_decode_atom(buf->buff, &buf->index, atom)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "badarg"); - break; - } - - if (!strncmp(atom, "nolog", MAXATOMLEN)) { - if (switch_test_flag(listener, LFLAG_LOG)) { - switch_clear_flag_locked(listener, LFLAG_LOG); - } - } else if (!strncmp(atom, "register_log_handler", MAXATOMLEN)) { - ei_link(listener, ei_self(listener->ec), &msg->from); - listener->log_pid = msg->from; - listener->level = SWITCH_LOG_DEBUG; - switch_set_flag(listener, LFLAG_LOG); - } else if (!strncmp(atom, "register_event_handler", MAXATOMLEN)) { - ei_link(listener, ei_self(listener->ec), &msg->from); - listener->event_pid = msg->from; - if (!switch_test_flag(listener, LFLAG_EVENTS)) { - switch_set_flag_locked(listener, LFLAG_EVENTS); - } - } else if (!strncmp(atom, "noevents", MAXATOMLEN)) { - void *pop; - /*purge the event queue */ - while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS); - - if (switch_test_flag(listener, LFLAG_EVENTS)) { - uint8_t x = 0; - switch_clear_flag_locked(listener, LFLAG_EVENTS); - for (x = 0; x <= SWITCH_EVENT_ALL; x++) { - listener->event_list[x] = 0; - } - /* wipe the hash */ - switch_core_hash_destroy(&listener->event_hash); - switch_core_hash_init(&listener->event_hash, listener->pool); - } else { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "notlistening"); - break; - } - } else if (!strncmp(atom, "exit", MAXATOMLEN)) { - switch_clear_flag_locked(listener, LFLAG_RUNNING); - ei_x_encode_atom(rbuf, "ok"); - goto event_done; - } else if (!strncmp(atom, "getpid", MAXATOMLEN)) { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "ok"); - ei_x_encode_pid(rbuf, ei_self(listener->ec)); - } else if (!strncmp(atom, "link", MAXATOMLEN)) { - /* debugging */ - ei_link(listener, ei_self(listener->ec), &msg->from); - goto noreply; - } else { - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "undef"); - break; - } - - ei_x_encode_atom(rbuf, "ok"); - break; - default : - /* some other kind of erlang term */ - ei_x_encode_tuple_header(rbuf, 2); - ei_x_encode_atom(rbuf, "error"); - ei_x_encode_atom(rbuf, "undef"); - break; + switch_event_t *call_event=NULL; + switch_channel_t *channel=NULL; + + /* Send a message to the associated registered process to let it know there is a call. + Message is a tuple of the form {call, } + */ + channel = switch_core_session_get_channel(session); + if (switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error!\n"); + return SWITCH_STATUS_MEMERR; } - + switch_caller_profile_event_set_data(switch_channel_get_caller_profile(channel), "Channel", call_event); + switch_channel_event_set_data(channel, call_event); + switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "Content-Type", "command/reply"); + switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "Reply-Text", "+OK\n"); + ei_x_buff lbuf; + ei_x_new_with_version(&lbuf); + ei_x_encode_tuple_header(&lbuf, 2); + ei_x_encode_atom(&lbuf, "call"); + ei_encode_switch_event(&lbuf, call_event); switch_mutex_lock(listener->sock_mutex); - ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending initial call event\n"); + if (ei_reg_send(listener->ec,listener->sockfd, reg_name, lbuf.buff, lbuf.index)==ERL_ERROR) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to send call event\n"); + } switch_mutex_unlock(listener->sock_mutex); -noreply: - return 0; - -event_done: - switch_mutex_lock(listener->sock_mutex); - ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index); - switch_mutex_unlock(listener->sock_mutex); - return 1; + + ei_x_free(&lbuf); + return SWITCH_STATUS_SUCCESS; } - -static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) +static switch_status_t check_attached_sessions(listener_t *listener) +{ + session_elem_t *last,*sp; + switch_status_t status = SWITCH_STATUS_SUCCESS; + void *pop; + /* check up on all the attached sessions - + if they have not yet sent an initial call event to the associated erlang process then do so + if they have pending events in their queues then send them + if the session has finished then clean it up + */ + switch_mutex_lock(listener->session_mutex); + sp = listener->session_list; + last = NULL; + while(sp) { + if (!switch_test_flag(sp, LFLAG_OUTBOUND_INIT)) { + status = notify_new_session(listener, sp->session, sp->reg_name); + if (status != SWITCH_STATUS_SUCCESS) + break; + switch_set_flag(sp, LFLAG_OUTBOUND_INIT); + } + /* check event queue for this session */ + if (switch_queue_trypop(sp->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { + switch_event_t *pevent = (switch_event_t *) pop; + + /* events from attached sessions are wrapped in a {call_event,} tuple + to distinguish them from normal events (if they are sent to the same process) + */ + ei_x_buff ebuf; + ei_x_new_with_version(&ebuf); + ei_x_encode_tuple_header(&ebuf, 2); + ei_x_encode_atom(&ebuf, "call_event"); + ei_encode_switch_event(&ebuf, pevent); + + switch_mutex_lock(listener->sock_mutex); + ei_reg_send(listener->ec, listener->sockfd, sp->reg_name, ebuf.buff, ebuf.index); + switch_mutex_unlock(listener->sock_mutex); + + /* event is a hangup, so this session can be removed */ + if (pevent->event_id == SWITCH_EVENT_CHANNEL_HANGUP) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hangup event for attached session\n"); + + /* remove session from list */ + if (last) + last->next = sp->next; + else + listener->session_list = sp->next; + + /* this allows the application threads to exit */ + switch_clear_flag_locked(sp, LFLAG_SESSION_ALIVE); + + /* TODO + if this listener was created outbound, and the last session has been detached + should the listener also exit? Does it matter? + */ + } + + ei_x_free(&ebuf); + switch_event_destroy(&pevent); + } + last = sp; + sp = sp->next; + } + switch_mutex_unlock(listener->session_mutex); + return status; +} + +static void check_log_queue(listener_t *listener) { - listener_t *listener = (listener_t *) obj; - switch_core_session_t *session = NULL; - switch_channel_t *channel = NULL; - int status = 1; void *pop; - switch_mutex_lock(globals.listener_mutex); - prefs.threads++; - switch_mutex_unlock(globals.listener_mutex); - - switch_assert(listener != NULL); - - if (prefs.acl_count && !switch_strlen_zero(listener->remote_ip)) { - uint32_t x = 0; - for (x = 0; x < prefs.acl_count; x++) { - if (!switch_check_network_list_ip(listener->remote_ip, prefs.acl[x])) { - erlang_msg msg; - - ei_x_buff buf; - ei_x_new(&buf); - - status = ei_xreceive_msg(listener->sockfd, &msg, &buf); - /* get data off the socket, just so we can get the pid on the other end */ - if (status == ERL_MSG) { - /* if we got a message, return an ACL error. */ - ei_x_buff rbuf; - ei_x_new_with_version(&rbuf); - - ei_x_encode_tuple_header(&rbuf, 2); - ei_x_encode_atom(&rbuf, "error"); - ei_x_encode_atom(&rbuf, "acldeny"); - - ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index); - - ei_x_free(&rbuf); - } - - ei_x_free(&buf); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection from %s denied by acl %s\n", listener->remote_ip, prefs.acl[x]); - goto done; + /* send out any pending crap in the log queue */ + if (switch_test_flag(listener, LFLAG_LOG)) { + if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) { + switch_log_node_t *dnode = (switch_log_node_t *) pop; + + if (dnode->data) { + ei_x_buff lbuf; + ei_x_new_with_version(&lbuf); + ei_x_encode_tuple_header(&lbuf, 2); + ei_x_encode_atom(&lbuf, "log"); + ei_x_encode_list_header(&lbuf, 6); + + ei_x_encode_tuple_header(&lbuf, 2); + ei_x_encode_atom(&lbuf, "level"); + ei_x_encode_char(&lbuf, (unsigned char)dnode->level); + + ei_x_encode_tuple_header(&lbuf, 2); + ei_x_encode_atom(&lbuf, "text_channel"); + ei_x_encode_char(&lbuf, (unsigned char)dnode->level); + + ei_x_encode_tuple_header(&lbuf, 2); + ei_x_encode_atom(&lbuf, "file"); + ei_x_encode_string(&lbuf, dnode->file); + + ei_x_encode_tuple_header(&lbuf, 2); + ei_x_encode_atom(&lbuf, "func"); + ei_x_encode_string(&lbuf, dnode->func); + + ei_x_encode_tuple_header(&lbuf, 2); + ei_x_encode_atom(&lbuf, "line"); + ei_x_encode_ulong(&lbuf, (unsigned long)dnode->line); + + ei_x_encode_tuple_header(&lbuf, 2); + ei_x_encode_atom(&lbuf, "data"); + ei_x_encode_string(&lbuf, dnode->data); + + ei_x_encode_empty_list(&lbuf); + + switch_mutex_lock(listener->sock_mutex); + ei_send(listener->sockfd, &listener->log_pid, lbuf.buff, lbuf.index); + switch_mutex_unlock(listener->sock_mutex); + + ei_x_free(&lbuf); + free(dnode->data); + free(dnode); } } } +} - if ((session = listener->session)) { - channel = switch_core_session_get_channel(session); - if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) { - goto done; +static void check_event_queue(listener_t *listener) +{ + void* pop; + /* send out any pending crap in the event queue */ + if (switch_test_flag(listener, LFLAG_EVENTS)) { + if (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { + + switch_event_t *pevent = (switch_event_t *) pop; + + ei_x_buff ebuf; + ei_x_new_with_version(&ebuf); + + ei_encode_switch_event(&ebuf, pevent); + + switch_mutex_lock(listener->sock_mutex); + ei_send(listener->sockfd, &listener->event_pid, ebuf.buff, ebuf.index); + switch_mutex_unlock(listener->sock_mutex); + + ei_x_free(&ebuf); + switch_event_destroy(&pevent); } } +} - if (switch_strlen_zero(listener->remote_ip)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open\n"); - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip);/*, listener->remote_port);*/ - } - - switch_set_flag_locked(listener, LFLAG_RUNNING); - add_listener(listener); +static void listener_main_loop(listener_t *listener) +{ + int status = 1; while ((status >= 0 || erl_errno == ETIMEDOUT || erl_errno == EAGAIN) && !prefs.done) { erlang_msg msg; @@ -1237,13 +595,13 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) case ERL_SEND : /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_send\n");*/ if (handle_msg(listener, &msg, &buf, &rbuf)) { - goto done; + return; } break; case ERL_REG_SEND : /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_reg_send\n");*/ if (handle_msg(listener, &msg, &buf, &rbuf)) { - goto done; + return; } break; case ERL_LINK : @@ -1275,77 +633,76 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) ei_x_free(&buf); ei_x_free(&rbuf); - /* send out any pending crap in the log queue */ - if (switch_test_flag(listener, LFLAG_LOG)) { - if (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) { - switch_log_node_t *dnode = (switch_log_node_t *) pop; - - if (dnode->data) { - ei_x_buff lbuf; - ei_x_new_with_version(&lbuf); - ei_x_encode_tuple_header(&lbuf, 2); - ei_x_encode_atom(&lbuf, "log"); - ei_x_encode_list_header(&lbuf, 6); - - ei_x_encode_tuple_header(&lbuf, 2); - ei_x_encode_atom(&lbuf, "level"); - ei_x_encode_char(&lbuf, (unsigned char)dnode->level); - - ei_x_encode_tuple_header(&lbuf, 2); - ei_x_encode_atom(&lbuf, "text_channel"); - ei_x_encode_char(&lbuf, (unsigned char)dnode->level); - - ei_x_encode_tuple_header(&lbuf, 2); - ei_x_encode_atom(&lbuf, "file"); - ei_x_encode_string(&lbuf, dnode->file); - - ei_x_encode_tuple_header(&lbuf, 2); - ei_x_encode_atom(&lbuf, "func"); - ei_x_encode_string(&lbuf, dnode->func); - - ei_x_encode_tuple_header(&lbuf, 2); - ei_x_encode_atom(&lbuf, "line"); - ei_x_encode_ulong(&lbuf, (unsigned long)dnode->line); - - ei_x_encode_tuple_header(&lbuf, 2); - ei_x_encode_atom(&lbuf, "data"); - ei_x_encode_string(&lbuf, dnode->data); - - ei_x_encode_empty_list(&lbuf); - - switch_mutex_lock(listener->sock_mutex); - ei_send(listener->sockfd, &listener->log_pid, lbuf.buff, lbuf.index); - switch_mutex_unlock(listener->sock_mutex); - - ei_x_free(&lbuf); - free(dnode->data); - free(dnode); - } - } + check_log_queue(listener); + check_event_queue(listener); + if (SWITCH_STATUS_SUCCESS != check_attached_sessions(listener)) { + return; } + } +} - /* ditto with the event queue */ - if (switch_test_flag(listener, LFLAG_EVENTS)) { - if (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { +static switch_bool_t check_inbound_acl(listener_t* listener) +{ + /* check acl to see if inbound connection is allowed */ + if (prefs.acl_count && !switch_strlen_zero(listener->remote_ip)) { + uint32_t x = 0; + for (x = 0; x < prefs.acl_count; x++) { + if (!switch_check_network_list_ip(listener->remote_ip, prefs.acl[x])) { + int status = 1; + erlang_msg msg; - switch_event_t *pevent = (switch_event_t *) pop; + ei_x_buff buf; + ei_x_new(&buf); - ei_x_buff ebuf; - ei_x_new_with_version(&ebuf); + status = ei_xreceive_msg(listener->sockfd, &msg, &buf); + /* get data off the socket, just so we can get the pid on the other end */ + if (status == ERL_MSG) { + /* if we got a message, return an ACL error. */ + ei_x_buff rbuf; + ei_x_new_with_version(&rbuf); - ei_encode_switch_event(&ebuf, pevent); + ei_x_encode_tuple_header(&rbuf, 2); + ei_x_encode_atom(&rbuf, "error"); + ei_x_encode_atom(&rbuf, "acldeny"); - switch_mutex_lock(listener->sock_mutex); - ei_send(listener->sockfd, &listener->event_pid, ebuf.buff, ebuf.index); - switch_mutex_unlock(listener->sock_mutex); + ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index); - ei_x_free(&ebuf); - switch_event_destroy(&pevent); + ei_x_free(&rbuf); + } + + ei_x_free(&buf); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection from %s denied by acl %s\n", listener->remote_ip, prefs.acl[x]); + return SWITCH_FALSE; } } } + return SWITCH_TRUE; +} -done: +static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) +{ + listener_t *listener = (listener_t *) obj; + session_elem_t* s; + + switch_mutex_lock(globals.listener_mutex); + prefs.threads++; + switch_mutex_unlock(globals.listener_mutex); + + switch_assert(listener != NULL); + + if (check_inbound_acl(listener)) { + if (switch_strlen_zero(listener->remote_ip)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open\n"); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip);/*, listener->remote_port);*/ + } + + add_listener(listener); + listener_main_loop(listener); + } + + /* clean up */ remove_listener(listener); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n"); @@ -1364,16 +721,22 @@ done: /* remove any bindings for this connection */ remove_binding(listener, NULL); - if (listener->session) { - switch_channel_clear_flag(switch_core_session_get_channel(listener->session), CF_CONTROLLED); - switch_clear_flag_locked(listener, LFLAG_SESSION); - switch_core_session_rwunlock(listener->session); - } else if (listener->pool) { + /* clean up all the attached sessions */ + switch_mutex_lock(listener->session_mutex); + for (s = listener->session_list; s; s = s->next) { + switch_channel_clear_flag(switch_core_session_get_channel(s->session), CF_CONTROLLED); + /* this allows the application threads to exit */ + switch_clear_flag_locked(s, LFLAG_SESSION_ALIVE); + /* */ + switch_core_session_rwunlock(s->session); + } + switch_mutex_unlock(listener->session_mutex); + + if (listener->pool) { switch_memory_pool_t *pool = listener->pool; switch_core_destroy_memory_pool(&pool); } - switch_mutex_lock(globals.listener_mutex); prefs.threads--; switch_mutex_unlock(globals.listener_mutex); @@ -1454,12 +817,148 @@ static int config(void) return 0; } +static listener_t* new_listener(struct ei_cnode_s *ec, int clientfd) +{ + switch_memory_pool_t *listener_pool = NULL; + listener_t* listener = NULL; + + if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n"); + return NULL; + } + + if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n"); + return NULL; + } + + switch_thread_rwlock_create(&listener->rwlock, listener_pool); + switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener_pool); + switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, listener_pool); + + listener->ec = ec; + listener->sockfd = clientfd; + listener->pool = listener_pool; + listener_pool = NULL; + listener->level = SWITCH_LOG_DEBUG; + switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool); + switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool); + switch_mutex_init(&listener->session_mutex, SWITCH_MUTEX_NESTED, listener->pool); + switch_core_hash_init(&listener->event_hash, listener->pool); + + return listener; +} + +static listener_t* new_outbound_listener(char* node) +{ + listener_t* listener = NULL; + struct ei_cnode_s ec; + int clientfd; + + if (SWITCH_STATUS_SUCCESS==initialise_ei(&ec)) { + errno = 0; + if ((clientfd=ei_connect(&ec,node)) == ERL_ERROR) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error connecting to node %s (erl_errno=%d, errno=%d)!\n",node,erl_errno,errno); + return NULL; + } + listener = new_listener(&ec,clientfd); + listener->peer_nodename = switch_core_strdup(listener->pool,node); + } + return listener; +} + +session_elem_t* attach_call_to_listener(listener_t* listener, char* reg_name, switch_core_session_t *session) +{ + /* create a session list element */ + session_elem_t* session_element=NULL; + if (!(session_element = switch_core_alloc(switch_core_session_get_pool(session), sizeof(*session_element)))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n"); + } + else { + if (SWITCH_STATUS_SUCCESS != switch_core_session_read_lock(session)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n"); + } + else { + session_element->session = session; + session_element->reg_name = switch_core_strdup(switch_core_session_get_pool(session),reg_name); + switch_set_flag(session_element, LFLAG_SESSION_ALIVE); + switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); + switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); + switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); + /* attach the session to the listener */ + add_session_elem_to_listener(listener,session_element); + } + } + return session_element; +} /* Module Hooks */ +/* Entry point for outbound mode */ +SWITCH_STANDARD_APP(erlang_outbound_function) +{ + char *reg_name, *node; + listener_t *listener; + int argc = 0; + char *argv[80] = { 0 }; + char *mydata; + switch_bool_t new_session = SWITCH_FALSE; + session_elem_t* session_element=NULL; + + /* process app arguments */ + if (data && (mydata = switch_core_session_strdup(session, data))) { + argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0]))); + } + if (argc < 2) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Parse Error - need registered name and node!\n"); + return; + } + reg_name = argv[0]; + if (switch_strlen_zero(reg_name)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing registered name!\n"); + return; + } + node = argv[1]; + if (switch_strlen_zero(node)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing node name!\n"); + return; + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n",reg_name, node); + + /* first work out if there is a listener already talking to the node we want to talk to */ + listener = find_listener(node); + /* if there is no listener, then create one */ + if (!listener) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating new listener for session\n"); + new_session = SWITCH_TRUE; + listener = new_outbound_listener(node); + } + else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Using existing listener for session\n"); + } + if (listener && + (session_element=attach_call_to_listener(listener,reg_name,session)) != NULL) { + + if (new_session) + launch_listener_thread(listener); + switch_ivr_park(session, NULL); + + /* keep app thread running for lifetime of session */ + if (switch_channel_get_state(switch_core_session_get_channel(session)) >= CS_HANGUP) { + while (switch_test_flag(session_element, LFLAG_SESSION_ALIVE)) { + switch_yield(100000); + } + } + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "exit erlang_outbound_function\n"); +} + SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) { + switch_application_interface_t *app_interface; + switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool); if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) { @@ -1483,6 +982,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) /* connect my internal structure to the blank pointer passed to me */ *module_interface = switch_loadable_module_create_module_interface(pool, modname); + SWITCH_ADD_APP(app_interface, "erlang", "Connect to an erlang node", "Connect to erlang", erlang_outbound_function, " ", SAF_SUPPORT_NOMEDIA); + /* indicate that the module should continue to be loaded */ return SWITCH_STATUS_SUCCESS; } @@ -1557,25 +1058,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) switch_yield(100000); } - struct hostent *nodehost = gethostbyaddr(&server_addr.sin_addr.s_addr, sizeof(server_addr.sin_addr.s_addr), AF_INET); - - char *thishostname = nodehost->h_name; - char thisnodename[MAXNODELEN+1]; - - if (!strcmp(thishostname, "localhost")) - gethostname(thishostname, EI_MAXHOSTNAMELEN); - - if (prefs.shortname) { - char *off; - if ((off = strchr(thishostname, '.'))) { - *off = '\0'; - } - } - - snprintf(thisnodename, MAXNODELEN+1, "%s@%s", prefs.nodename, thishostname); - - /* init the ei stuff */ - if (ei_connect_xinit(&ec, thishostname, prefs.nodename, thisnodename, (Erl_IpAddr)(&server_addr.sin_addr.s_addr), prefs.cookie, 0) < 0) { + if (SWITCH_STATUS_SUCCESS!=initialise_ei(&ec)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n"); close_socket(&listen_list.sockfd); return SWITCH_STATUS_GENERR; @@ -1596,7 +1079,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) } } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connected and published erlang cnode at %s port %u\n", thisnodename, prefs.port); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connected and published erlang cnode\n"); listen_list.ready = 1; @@ -1621,59 +1104,42 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) break; } - if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n"); - goto fail; - } + listener = new_listener(&ec,clientfd); + if (listener) { + /* store the IP and node name we are talking with */ + inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip)); + listener->peer_nodename = switch_core_strdup(listener->pool,conn.nodename); - if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching listener, connection from node %s, ip %s\n", conn.nodename, listener->remote_ip); + launch_listener_thread(listener); + } + else + /* if we fail to create a listener (memory error), then the module will exit */ break; - } - - switch_thread_rwlock_create(&listener->rwlock, listener_pool); - switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener_pool); - switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, listener_pool); - - inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip)); - - listener->ec = &ec; - listener->sockfd = clientfd; - listener->pool = listener_pool; - listener_pool = NULL; - listener->level = SWITCH_LOG_DEBUG; - switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool); - switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool); - switch_core_hash_init(&listener->event_hash, listener->pool); - - launch_listener_thread(listener); - } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Exiting module mod_erlang_event\n"); + /* cleanup epmd registration */ ei_unpublish(&ec); close(epmdfd); close_socket(&listen_list.sockfd); - if (pool) { switch_core_destroy_memory_pool(&pool); } - if (listener_pool) { switch_core_destroy_memory_pool(&listener_pool); } - - for (x = 0; x < prefs.acl_count; x++) { switch_safe_free(prefs.acl[x]); } - fail: prefs.done = 2; return SWITCH_STATUS_TERM; } + SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown) { listener_t *l; diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h new file mode 100644 index 0000000000..cbf982bdfd --- /dev/null +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -0,0 +1,190 @@ +/* + * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * Copyright (C) 2005/2006, Anthony Minessale II + * + * Version: MPL 1.1 + * + * The contents of this file are subject to the Mozilla Public License Version + * 1.1 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + * for the specific language governing rights and limitations under the + * License. + * + * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * + * The Initial Developer of the Original Code is + * Anthony Minessale II + * Portions created by the Initial Developer are Copyright (C) + * the Initial Developer. All Rights Reserved. + * + * Contributor(s): + * + * Anthony Minessale II + * Andrew Thompson + * Rob Charlton + * + * + * mod_erlang_event.h -- Erlang Event Handler derived from mod_event_socket + * + */ + + +typedef enum { + LFLAG_OUTBOUND_INIT = (1 << 0), /* Erlang peer has been notified of this session */ + LFLAG_SESSION_ALIVE +} session_flag_t; + +struct session_elem { + switch_core_session_t *session; + switch_mutex_t *flag_mutex; + uint32_t flags; + /* registered process name that will receive call notifications from this session */ + char* reg_name; + switch_queue_t *event_queue; + struct session_elem *next; +}; + +typedef struct session_elem session_elem_t; + +typedef enum { + LFLAG_RUNNING = (1 << 0), + LFLAG_EVENTS = (1 << 1), + LFLAG_LOG = (1 << 2), + LFLAG_MYEVENTS = (1 << 3), + LFLAG_STATEFUL = (1 << 4) +} event_flag_t; + +/* There is one listener for each Erlang node we are attached to - either + inbound or outbound. For example, if the erlang node node1@server connects + to freeswitch then a listener is created and handles commands sent from + that node. If 5 calls are directed to the outbound erlang application + via the dialplan, and are also set to talk to node1@server, then those + 5 call sessions will be "attached" to the same listener. + */ +struct listener { + int sockfd; + struct ei_cnode_s *ec; + erlang_pid log_pid; + erlang_pid event_pid; + char *peer_nodename; + switch_queue_t *event_queue; + switch_queue_t *log_queue; + switch_memory_pool_t *pool; + switch_mutex_t *flag_mutex; + switch_mutex_t *sock_mutex; + char *ebuf; + uint32_t flags; + switch_log_level_t level; + uint8_t event_list[SWITCH_EVENT_ALL + 1]; + switch_hash_t *event_hash; + switch_hash_t *fetch_reply_hash; + switch_thread_rwlock_t *rwlock; + switch_mutex_t *session_mutex; + session_elem_t *session_list; + int lost_events; + int lost_logs; + time_t last_flush; + uint32_t timeout; + uint32_t id; + char remote_ip[50]; + /*switch_port_t remote_port;*/ + struct listener *next; +}; + +typedef struct listener listener_t; + +struct erlang_binding { + switch_xml_section_t section; + erlang_pid pid; + char *registered_process; /* TODO */ + listener_t *listener; + struct erlang_binding *next; +}; + +struct api_command_struct { + char *api_cmd; + char *arg; + listener_t *listener; + char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; + uint8_t bg; + erlang_pid pid; + switch_memory_pool_t *pool; +}; + +struct globals_struct { + switch_mutex_t *listener_mutex; + switch_event_node_t *node; +}; +typedef struct globals_struct globals_t; + +struct listen_list_struct { + int sockfd; + switch_mutex_t *sock_mutex; + listener_t *listeners; + uint8_t ready; +}; +typedef struct listen_list_struct listen_list_t; + +struct bindings_struct { + struct erlang_binding *head; + switch_xml_binding_t *search_binding; +}; +typedef struct bindings_struct bindings_t; + +#define MAX_ACL 100 +struct prefs_struct { + switch_mutex_t *mutex; + char *ip; + char *nodename; + switch_bool_t shortname; + uint16_t port; + char *cookie; + int done; + int threads; + char *acl[MAX_ACL]; + uint32_t acl_count; + uint32_t id; +}; +typedef struct prefs_struct prefs_t; + +/* shared globals */ +#ifdef DEFINE_GLOBALS +globals_t globals; +listen_list_t listen_list; +bindings_t bindings; +prefs_t prefs; +#else +extern globals_t globals; +extern listen_list_t listen_list; +extern bindings_t bindings; +extern prefs_t prefs; +#endif + +/* function prototypes */ +/* handle_msg.c */ +int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf); + +/* ei_helpers.c */ +void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to); +void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event); +void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag); +switch_status_t initialise_ei(struct ei_cnode_s *ec); +#define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event") + +/* mod_erlang_event.c */ +session_elem_t* attach_call_to_listener(listener_t* listener, char* reg_name, switch_core_session_t *session); + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4: + */