From 5fe69d307b9578d4fc4a39f39b3e8672690f5a21 Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Sat, 24 Jan 2009 01:04:59 +0000 Subject: [PATCH] Initial support to spawn a process (module/function) outbound on a specified node. Also fix some bugs. git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@11477 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- .../mod_erlang_event/ei_helpers.c | 88 ++++++++-- .../mod_erlang_event/handle_msg.c | 83 ++++++++- .../mod_erlang_event/mod_erlang_event.c | 163 +++++++++++++----- .../mod_erlang_event/mod_erlang_event.h | 11 +- 4 files changed, 276 insertions(+), 69 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/ei_helpers.c b/src/mod/event_handlers/mod_erlang_event/ei_helpers.c index fc73f7ab13..9700753603 100644 --- a/src/mod/event_handlers/mod_erlang_event/ei_helpers.c +++ b/src/mod/event_handlers/mod_erlang_event/ei_helpers.c @@ -123,29 +123,29 @@ void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *ta /* function to spawn a process on a remote node */ -int ei_spawn(struct ei_cnode_s *ec, int sockfd, char *module, char *function, int argc, char **argv) +int ei_spawn(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function, int argc, char **argv) { ei_x_buff buf; ei_x_new_with_version(&buf); - erlang_ref ref; int i; ei_x_encode_tuple_header(&buf, 3); ei_x_encode_atom(&buf, "$gen_call"); ei_x_encode_tuple_header(&buf, 2); ei_x_encode_pid(&buf, ei_self(ec)); - /* TODO - use this reference to determine the response */ - ei_init_ref(ec, &ref); - ei_x_encode_ref(&buf, &ref); + ei_init_ref(ec, ref); + ei_x_encode_ref(&buf, ref); ei_x_encode_tuple_header(&buf, 5); ei_x_encode_atom(&buf, "spawn"); ei_x_encode_atom(&buf, module); ei_x_encode_atom(&buf, function); /* argument list */ - ei_x_encode_list_header(&buf, argc); - for(i = 0; i < argc && argv[i]; i++) { - ei_x_encode_atom(&buf, argv[i]); + if (argc < 0) { + ei_x_encode_list_header(&buf, argc); + for(i = 0; i < argc && argv[i]; i++) { + ei_x_encode_atom(&buf, argv[i]); + } } ei_x_encode_empty_list(&buf); @@ -156,12 +156,11 @@ int ei_spawn(struct ei_cnode_s *ec, int sockfd, char *module, char *function, in ei_x_encode_pid(&buf, ei_self(ec)); /* should really be a valid group leader */ - char *pbuf = 0; - i = 1; - ei_s_print_term(&pbuf, buf.buff, &i); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "spawn returning %s\n", pbuf); - +#ifdef EI_DEBUG + ei_x_print_reg_msg(&buf, "net_kernel", 1); +#endif return ei_reg_send(ec, sockfd, "net_kernel", buf.buff, buf.index); + } @@ -196,6 +195,69 @@ void ei_init_ref(ei_cnode *ec, erlang_ref *ref) } +void ei_x_print_reg_msg(ei_x_buff *buf, char *dest, int send) +{ + char *mbuf = NULL; + int i = 1; + + ei_s_print_term(&mbuf, buf->buff, &i); + + if (send) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending %s to %s\n", mbuf, dest); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Received %s from %s\n", mbuf, dest); + } + free(mbuf); +} + + +void ei_x_print_msg(ei_x_buff *buf, erlang_pid *pid, int send) +{ + char *pbuf = NULL; + int i = 0; + ei_x_buff pidbuf; + + ei_x_new(&pidbuf); + ei_x_encode_pid(&pidbuf, pid); + + ei_s_print_term(&pbuf, pidbuf.buff, &i); + + ei_x_print_reg_msg(buf, pbuf, send); + free(pbuf); +} + + +int ei_sendto(ei_cnode *ec, int fd, struct erlang_process *process, ei_x_buff *buf) +{ + int ret; + if (process->type == ERLANG_PID) { + ret = ei_send(fd, &process->pid, buf->buff, buf->index); +#ifdef EI_DEBUG + ei_x_print_msg(buf, &process->pid, 1); +#endif + } else if (process->type == ERLANG_REG_PROCESS) { + ret = ei_reg_send(ec, fd, process->reg_name, buf->buff, buf->index); +#ifdef EI_DEBUG + ei_x_print_reg_msg(buf, process->reg_name, 1); +#endif + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid process type!\n"); + /* wuh-oh */ + ret = -1; + } + + return ret; +} + + +/* convert an erlang reference to some kind of hashed string so we can store it as a hash key */ +void ei_hash_ref(erlang_ref *ref, char *output) +{ + /* very lazy */ + sprintf(output, "%d.%d.%d@%s", ref->n[0], ref->n[1], ref->n[2], ref->node); +} + + switch_status_t initialise_ei(struct ei_cnode_s *ec) { switch_status_t rv; diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index ff4689362f..41034b1a59 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -107,6 +107,9 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) switch_mutex_lock(acs->listener->sock_mutex); ei_send(acs->listener->sockfd, &acs->pid, ebuf.buff, ebuf.index); switch_mutex_unlock(acs->listener->sock_mutex); +#ifdef EI_DEBUG + ei_x_print_msg(&ebuf, &acs->pid, 1); +#endif ei_x_free(&ebuf); } @@ -132,6 +135,9 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) switch_mutex_lock(acs->listener->sock_mutex); ei_send(acs->listener->sockfd, &acs->pid, rbuf.buff, rbuf.index); switch_mutex_unlock(acs->listener->sock_mutex); +#ifdef EI_DEBUG + ei_x_print_msg(&rbuf, &acs->pid, 1); +#endif ei_x_free(&rbuf); } @@ -492,8 +498,6 @@ static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg *msg, ei binding->process.pid = msg->from; binding->listener = listener; - switch_core_hash_init(&listener->fetch_reply_hash, listener->pool); - switch_mutex_lock(globals.listener_mutex); for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next); @@ -532,7 +536,7 @@ static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei switch_core_session_t *session; if (!switch_strlen_zero(uuid_str) && (session = switch_core_session_locate(uuid_str))) { /* create a new session list element and attach it to this listener */ - if (attach_call_to_listener(listener,reg_name,session)) { + if (attach_call_to_registered_process(listener, reg_name, session)) { ei_x_encode_atom(rbuf, "ok"); } else { ei_x_encode_tuple_header(rbuf, 2); @@ -660,9 +664,48 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg *msg, e return ret; } + +static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) +{ + erlang_ref ref; + erlang_pid *pid2, *pid = switch_core_alloc(listener->pool, sizeof(erlang_pid)); + char hash[100]; + int arity; + + ei_decode_tuple_header(buf->buff, &buf->index, &arity); + + if (ei_decode_ref(buf->buff, &buf->index, &ref)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid reference\n"); + return SWITCH_STATUS_FALSE; + } + + if (ei_decode_pid(buf->buff, &buf->index, pid)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid pid in a reference/pid tuple\n"); + return SWITCH_STATUS_FALSE; + } + + ei_hash_ref(&ref, hash); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hashed ref to %s\n", hash); + + if ((pid2 = (erlang_pid *) switch_core_hash_find(listener->spawn_pid_hash, hash))) { + if (pid2 == NULL) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found unfilled slot for %s\n", hash); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", hash); + } + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "No slot for %s\n", hash); + switch_core_hash_insert(listener->spawn_pid_hash, hash, pid); + } + + return SWITCH_STATUS_SUCCESS; +} + + int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) { - int type, size, version; + int type, type2, size, version, arity, tmpindex; switch_status_t ret = SWITCH_STATUS_SUCCESS; buf->index = 0; @@ -672,7 +715,27 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff switch(type) { case ERL_SMALL_TUPLE_EXT : case ERL_LARGE_TUPLE_EXT : - ret = handle_msg_tuple(listener,msg,buf,rbuf); + tmpindex = buf->index; + ei_decode_tuple_header(buf->buff, &tmpindex, &arity); + ei_get_type(buf->buff, &tmpindex, &type2, &size); + + switch(type2) { + case ERL_ATOM_EXT: + ret = handle_msg_tuple(listener,msg,buf,rbuf); + break; + case ERL_REFERENCE_EXT : + case ERL_NEW_REFERENCE_EXT : + handle_ref_tuple(listener, msg, buf, rbuf); + return 0; + default : + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "WEEEEEEEE %d\n", type); + /* some other kind of erlang term */ + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "undef"); + break; + } + break; case ERL_ATOM_EXT : @@ -687,17 +750,23 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff break; } - if (SWITCH_STATUS_FALSE==ret) + if (SWITCH_STATUS_FALSE==ret) { return 0; - else { + } else if (rbuf->index > 1) { switch_mutex_lock(listener->sock_mutex); ei_send(listener->sockfd, &msg->from, rbuf->buff, rbuf->index); switch_mutex_unlock(listener->sock_mutex); +#ifdef EI_DEBUG + ei_x_print_msg(rbuf, &msg->from, 1); +#endif if (SWITCH_STATUS_SUCCESS==ret) return 0; else /* SWITCH_STATUS_TERM */ return 1; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Empty reply, supressing\n"); + return 0; } } diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 5b193924e1..000f59966d 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -346,7 +346,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c /*switch_core_hash_insert(ptr->reply_hash, uuid_str, );*/ switch_mutex_lock(ptr->listener->sock_mutex); - ei_send(ptr->listener->sockfd, &ptr->process.pid, buf.buff, buf.index); + ei_sendto(ptr->listener->ec, ptr->listener->sockfd, &ptr->process, &buf); switch_mutex_unlock(ptr->listener->sock_mutex); int i = 0; @@ -418,16 +418,13 @@ static switch_status_t notify_new_session(listener_t *listener, switch_core_sess ei_encode_switch_event(&lbuf, call_event); switch_mutex_lock(listener->sock_mutex); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Sending initial call event\n"); - if (process.type == ERLANG_PID) { - result = ei_send(listener->sockfd, &process.pid, lbuf.buff, lbuf.index); - } else { - result = ei_reg_send(listener->ec, listener->sockfd, process.reg_name, lbuf.buff, lbuf.index); - } + result = ei_sendto(listener->ec, listener->sockfd, &process, &lbuf); + + switch_mutex_unlock(listener->sock_mutex); if (result) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to send call event\n"); } - switch_mutex_unlock(listener->sock_mutex); ei_x_free(&lbuf); return SWITCH_STATUS_SUCCESS; @@ -467,11 +464,7 @@ static switch_status_t check_attached_sessions(listener_t *listener) ei_encode_switch_event(&ebuf, pevent); switch_mutex_lock(listener->sock_mutex); - if (sp->process.type == ERLANG_PID) { - ei_send(listener->sockfd, &sp->process.pid, ebuf.buff, ebuf.index); - } else { - ei_reg_send(listener->ec, listener->sockfd, sp->process.reg_name, ebuf.buff, ebuf.index); - } + ei_sendto(listener->ec, listener->sockfd, &sp->process, &ebuf); switch_mutex_unlock(listener->sock_mutex); /* event is a hangup, so this session can be removed */ @@ -484,8 +477,11 @@ static switch_status_t check_attached_sessions(listener_t *listener) else listener->session_list = sp->next; + + switch_channel_clear_flag(switch_core_session_get_channel(sp->session), CF_CONTROLLED); /* this allows the application threads to exit */ - switch_clear_flag_locked(sp, LFLAG_SESSION_ALIVE); + switch_clear_flag_locked(sp, LFLAG_SESSION_ALIVE); + switch_core_session_rwunlock(sp->session); /* TODO if this listener was created outbound, and the last session has been detached @@ -546,11 +542,7 @@ static void check_log_queue(listener_t *listener) ei_x_encode_empty_list(&lbuf); switch_mutex_lock(listener->sock_mutex); - if (listener->log_process.type == ERLANG_PID) { - ei_send(listener->sockfd, &listener->log_process.pid, lbuf.buff, lbuf.index); - } else { - ei_reg_send(listener->ec, listener->sockfd, listener->log_process.reg_name, lbuf.buff, lbuf.index); - } + ei_sendto(listener->ec, listener->sockfd, &listener->log_process, &lbuf); switch_mutex_unlock(listener->sock_mutex); ei_x_free(&lbuf); @@ -576,11 +568,7 @@ static void check_event_queue(listener_t *listener) ei_encode_switch_event(&ebuf, pevent); switch_mutex_lock(listener->sock_mutex); - if (listener->log_process.type == ERLANG_PID) { - ei_send(listener->sockfd, &listener->log_process.pid, ebuf.buff, ebuf.index); - } else { - ei_reg_send(listener->ec, listener->sockfd, listener->log_process.reg_name, ebuf.buff, ebuf.index); - } + ei_sendto(listener->ec, listener->sockfd, &listener->log_process, &ebuf); switch_mutex_unlock(listener->sock_mutex); ei_x_free(&ebuf); @@ -592,8 +580,6 @@ static void check_event_queue(listener_t *listener) static void listener_main_loop(listener_t *listener) { int status = 1; - /*int i = 1;*/ - /*char *pbuf = 0;*/ while ((status >= 0 || erl_errno == ETIMEDOUT || erl_errno == EAGAIN) && !prefs.done) { erlang_msg msg; @@ -614,20 +600,27 @@ static void listener_main_loop(listener_t *listener) case ERL_MSG : switch(msg.msgtype) { case ERL_SEND : - /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_send\n");*/ - /*i = 1;*/ +#ifdef EI_DEBUG + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_send\n"); + + ei_x_print_msg(&buf, &msg.from, 0); /*ei_s_print_term(&pbuf, buf.buff, &i);*/ /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_send was message %s\n", pbuf);*/ +#endif if (handle_msg(listener, &msg, &buf, &rbuf)) { return; } break; case ERL_REG_SEND : - /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_reg_send to %s\n", msg.toname);*/ +#ifdef EI_DEBUG + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_reg_send to %s\n", msg.toname); + + ei_x_print_reg_msg(&buf, msg.toname, 0); /*i = 1;*/ /*ei_s_print_term(&pbuf, buf.buff, &i);*/ /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_reg_send was message %s\n", pbuf);*/ +#endif if (handle_msg(listener, &msg, &buf, &rbuf)) { return; @@ -695,6 +688,9 @@ static switch_bool_t check_inbound_acl(listener_t* listener) ei_x_encode_atom(&rbuf, "acldeny"); ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index); +#ifdef EI_DEBUG + ei_x_print_msg(&rbuf, &msg.from, 1); +#endif ei_x_free(&rbuf); } @@ -718,6 +714,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) prefs.threads++; switch_mutex_unlock(globals.listener_mutex); + switch_core_hash_init(&listener->fetch_reply_hash, listener->pool); + switch_core_hash_init(&listener->spawn_pid_hash, listener->pool); + switch_assert(listener != NULL); if (check_inbound_acl(listener)) { @@ -896,7 +895,7 @@ static listener_t* new_outbound_listener(char* node) return listener; } -session_elem_t* attach_call_to_listener(listener_t* listener, char* reg_name, switch_core_session_t *session) +session_elem_t* attach_call_to_registered_process(listener_t* listener, char* reg_name, switch_core_session_t *session) { /* create a session list element */ session_elem_t* session_element=NULL; @@ -922,32 +921,92 @@ session_elem_t* attach_call_to_listener(listener_t* listener, char* reg_name, sw return session_element; } +session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *module, char *function, switch_core_session_t *session) +{ + /* create a session list element */ + session_elem_t* session_element=NULL; + if (!(session_element = switch_core_alloc(switch_core_session_get_pool(session), sizeof(*session_element)))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n"); + } + else { + if (SWITCH_STATUS_SUCCESS != switch_core_session_read_lock(session)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n"); + } + else { + char *argv[1], hash[100]; + int i = 0; + session_element->session = session; + erlang_pid *pid; + erlang_ref ref; + + ei_spawn(listener->ec, listener->sockfd, &ref, module, function, 0, argv); + ei_hash_ref(&ref, hash); + + while (!(pid = (erlang_pid *) switch_core_hash_find(listener->spawn_pid_hash, hash))) { + if (i > 50) { /* half a second timeout */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "timed out!\n"); + return NULL; + } + i++; + switch_yield(10000); /* 10ms */ + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got pid!\n"); + + session_element->process.type = ERLANG_PID; + memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); + switch_set_flag(session_element, LFLAG_SESSION_ALIVE); + switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); + switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); + switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); + /* attach the session to the listener */ + add_session_elem_to_listener(listener,session_element); + } + } + return session_element; +} + + /* Module Hooks */ /* Entry point for outbound mode */ SWITCH_STANDARD_APP(erlang_outbound_function) { - char *reg_name, *node; + char *reg_name = NULL, *node, *module = NULL, *function = NULL; listener_t *listener; - int argc = 0; - char *argv[80] = { 0 }; - char *mydata; + int argc = 0, argc2=0; + char *argv[80] = { 0 }, *argv2[80] = { 0 }; + char *mydata, *myarg; switch_bool_t new_session = SWITCH_FALSE; session_elem_t* session_element=NULL; /* process app arguments */ if (data && (mydata = switch_core_session_strdup(session, data))) { argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0]))); - } + } /* XXX else? */ if (argc < 2) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Parse Error - need registered name and node!\n"); return; } - reg_name = argv[0]; - if (switch_strlen_zero(reg_name)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing registered name!\n"); + if (switch_strlen_zero(argv[0])) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing registered name or module:function!\n"); return; } + + if ((myarg = switch_core_session_strdup(session, argv[0]))) { + argc2 = switch_separate_string(myarg, ':', argv2, (sizeof(argv2) / sizeof(argv2[0]))); + } + + if (argc2 == 2) { + /* mod:fun style */ + module = argv2[0]; + function = argv2[1]; + } else { + /* registered name style */ + reg_name = argv[0]; + } + + node = argv[1]; if (switch_strlen_zero(node)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing node name!\n"); @@ -967,17 +1026,27 @@ SWITCH_STANDARD_APP(erlang_outbound_function) else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Using existing listener for session\n"); } - if (listener && - (session_element=attach_call_to_listener(listener,reg_name,session)) != NULL) { - - if (new_session) - launch_listener_thread(listener); - switch_ivr_park(session, NULL); - /* keep app thread running for lifetime of session */ - if (switch_channel_get_state(switch_core_session_get_channel(session)) >= CS_HANGUP) { - while (switch_test_flag(session_element, LFLAG_SESSION_ALIVE)) { - switch_yield(100000); + if (listener) { + if (module && function) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n"); + session_element=attach_call_to_spawned_process(listener, module, function, session); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating new registered session for listener\n"); + session_element=attach_call_to_registered_process(listener, reg_name, session); + } + + if (session_element) { + + if (new_session) + launch_listener_thread(listener); + switch_ivr_park(session, NULL); + + /* keep app thread running for lifetime of session */ + if (switch_channel_get_state(switch_core_session_get_channel(session)) >= CS_HANGUP) { + while (switch_test_flag(session_element, LFLAG_SESSION_ALIVE)) { + switch_yield(100000); + } } } } diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 421b7abd36..0549efb385 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -32,6 +32,7 @@ * */ +#define EI_DEBUG typedef enum { LFLAG_OUTBOUND_INIT = (1 << 0), /* Erlang peer has been notified of this session */ @@ -93,6 +94,7 @@ struct listener { uint8_t event_list[SWITCH_EVENT_ALL + 1]; switch_hash_t *event_hash; switch_hash_t *fetch_reply_hash; + switch_hash_t *spawn_pid_hash; switch_thread_rwlock_t *rwlock; switch_mutex_t *session_mutex; session_elem_t *session_list; @@ -186,13 +188,18 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to); void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event); void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag); -int ei_spawn(struct ei_cnode_s *ec, int sockfd, char *module, char *function, int argc, char **argv); +int ei_spawn(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function, int argc, char **argv); void ei_init_ref(struct ei_cnode_s *ec, erlang_ref *ref); +void ei_x_print_reg_msg(ei_x_buff *buf, char *dest, int send); +void ei_x_print_msg(ei_x_buff *buf, erlang_pid *pid, int send); +int ei_sendto(ei_cnode *ec, int fd, struct erlang_process *process, ei_x_buff *buf); +void ei_hash_ref(erlang_ref *ref, char *output); switch_status_t initialise_ei(struct ei_cnode_s *ec); #define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event") /* mod_erlang_event.c */ -session_elem_t* attach_call_to_listener(listener_t* listener, char* reg_name, switch_core_session_t *session); +session_elem_t* attach_call_to_registered_process(listener_t* listener, char* reg_name, switch_core_session_t *session); +session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *module, char *function, switch_core_session_t *session); /* For Emacs: * Local Variables: