From a6717df8d09c5a3735feae6fd4ba1a8f8407022e Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Wed, 28 Jan 2009 19:26:37 +0000 Subject: [PATCH] Patch from Rob Charlton to use rpc:call instead of spawn and to make the registered process argument to handlecall optional git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@11542 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- .../mod_erlang_event/ei_helpers.c | 18 +++++++ .../mod_erlang_event/handle_msg.c | 47 ++++++++++++++--- .../mod_erlang_event/mod_erlang_event.c | 50 ++++++++++++++++--- .../mod_erlang_event/mod_erlang_event.h | 2 + 4 files changed, 103 insertions(+), 14 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/ei_helpers.c b/src/mod/event_handlers/mod_erlang_event/ei_helpers.c index f8b5f6cc3e..d206988de4 100644 --- a/src/mod/event_handlers/mod_erlang_event/ei_helpers.c +++ b/src/mod/event_handlers/mod_erlang_event/ei_helpers.c @@ -122,6 +122,24 @@ void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *ta ei_encode_switch_event_headers(ebuf, event); } +/* function to make rpc call to remote node to retrieve a pid - + calls module:function(Ref). The response comes back as + {rex, {Ref, Pid}} + */ +int ei_pid_from_rpc(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function) +{ + ei_x_buff buf; + ei_x_new(&buf); + ei_x_encode_list_header(&buf, 1); + ei_init_ref(ec, ref); + ei_x_encode_ref(&buf, ref); + ei_x_encode_empty_list(&buf); + + ei_rpc_to(ec, sockfd, module, function, buf.buff, buf.index); + ei_x_free(&buf); + + return 0; +} /* function to spawn a process on a remote node */ int ei_spawn(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function, int argc, char **argv) diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 99bde11fbe..455e2be9e3 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -37,6 +37,8 @@ static char *MARKER = "1"; +static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf); + static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) { switch_bool_t r = SWITCH_TRUE; @@ -519,15 +521,18 @@ static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg *msg, ei return SWITCH_STATUS_SUCCESS; } -/* {handlecall,,} */ -static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei_x_buff *buf, ei_x_buff *rbuf) +/* {handlecall,,} + or + {handlecall,} to send messages back to the sender + */ +static switch_status_t handle_msg_handlecall(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf) { char reg_name[MAXATOMLEN]; char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; - if (arity != 3 || - ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str) || - ei_decode_atom(buf->buff, &buf->index, reg_name)) { + if (arity < 2 || arity > 3 || + (arity==3 && ei_decode_atom(buf->buff, &buf->index, reg_name)) || + ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str)) { ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(rbuf, "error"); ei_x_encode_atom(rbuf, "badarg"); @@ -535,7 +540,8 @@ static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei switch_core_session_t *session; if (!switch_strlen_zero(uuid_str) && (session = switch_core_session_locate(uuid_str))) { /* create a new session list element and attach it to this listener */ - if (attach_call_to_registered_process(listener, reg_name, session)) { + if ((arity==2 && attach_call_to_pid(listener, &msg->from, session)) || + (arity==3 && attach_call_to_registered_process(listener, reg_name, session))) { ei_x_encode_atom(rbuf, "ok"); } else { ei_x_encode_tuple_header(rbuf, 2); @@ -551,6 +557,28 @@ static switch_status_t handle_msg_handlecall(listener_t *listener, int arity, ei return SWITCH_STATUS_SUCCESS; } +/* catch the response to ei_rpc_to (which comes back as {rex, {Ref, Pid}} + The {Ref,Pid} bit can be handled by handle_ref_tuple + */ +static switch_status_t handle_msg_rpcresponse(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff *buf, ei_x_buff *rbuf) +{ + int type, size, arity2, tmpindex; + + ei_get_type(buf->buff, &buf->index, &type, &size); + switch(type) { + case ERL_SMALL_TUPLE_EXT : + case ERL_LARGE_TUPLE_EXT : + tmpindex = buf->index; + ei_decode_tuple_header(buf->buff, &tmpindex, &arity2); + return handle_ref_tuple(listener,msg,buf,rbuf); + default: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unknown rpc response\n"); + break; + } + /* no reply */ + return SWITCH_STATUS_FALSE; +} + static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) { char tupletag[MAXATOMLEN]; @@ -583,7 +611,9 @@ static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg *msg, e } else if (!strncmp(tupletag, "bind", MAXATOMLEN)) { ret = handle_msg_bind(listener,msg,buf,rbuf); } else if (!strncmp(tupletag, "handlecall", MAXATOMLEN)) { - ret = handle_msg_handlecall(listener,arity,buf,rbuf); + ret = handle_msg_handlecall(listener,msg,arity,buf,rbuf); + } else if (!strncmp(tupletag, "rex", MAXATOMLEN)) { + ret = handle_msg_rpcresponse(listener,msg,arity,buf,rbuf); } else { ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(rbuf, "error"); @@ -701,7 +731,8 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, e switch_core_hash_insert(listener->spawn_pid_hash, hash, pid); } - return SWITCH_STATUS_SUCCESS; + /* no reply */ + return SWITCH_STATUS_FALSE; } diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index a5223d4b88..8f78dfb201 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -187,8 +187,8 @@ static void event_handler(switch_event_t *event) l = lp; lp = lp->next; - /* test all of the sessions attached to this event in case - one of them should receive it as well + /* test all of the sessions attached to this listener in case + one of them should receive the event as well */ send_event_to_attached_sessions(l,event); @@ -684,9 +684,10 @@ static void listener_main_loop(listener_t *listener) ei_x_buff rbuf; ei_x_new_with_version(&rbuf); - switch_mutex_lock(listener->sock_mutex); + /* do we need the mutex when reading? */ + /*switch_mutex_lock(listener->sock_mutex);*/ status = ei_xreceive_msg_tmo(listener->sockfd, &msg, &buf, 100); - switch_mutex_unlock(listener->sock_mutex); + /*switch_mutex_unlock(listener->sock_mutex);*/ switch(status) { case ERL_TICK : @@ -780,7 +781,9 @@ static switch_bool_t check_inbound_acl(listener_t* listener) ei_x_encode_atom(&rbuf, "error"); ei_x_encode_atom(&rbuf, "acldeny"); + switch_mutex_lock(listener->sock_mutex); ei_send(listener->sockfd, &msg.from, rbuf.buff, rbuf.index); + switch_mutex_unlock(listener->sock_mutex); #ifdef EI_DEBUG ei_x_print_msg(&rbuf, &msg.from, 1); #endif @@ -1026,6 +1029,33 @@ session_elem_t* attach_call_to_registered_process(listener_t* listener, char* re return session_element; } +session_elem_t* attach_call_to_pid(listener_t* listener, erlang_pid* pid, switch_core_session_t *session) +{ + /* create a session list element */ + session_elem_t* session_element = NULL; + if (!(session_element = switch_core_alloc(switch_core_session_get_pool(session), sizeof(*session_element)))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n"); + } + else { + if (SWITCH_STATUS_SUCCESS != switch_core_session_read_lock(session)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n"); + } + else { + session_element->session = session; + session_element->process.type = ERLANG_PID; + memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); + switch_set_flag(session_element, LFLAG_SESSION_ALIVE); + switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); + switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); + switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); + /* attach the session to the listener */ + add_session_elem_to_listener(listener,session_element); + + ei_link(listener, ei_self(listener->ec), pid); + } + } + return session_element; +} session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *module, char *function, switch_core_session_t *session) { @@ -1039,7 +1069,7 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to get session read lock\n"); } else { - char *argv[1], hash[100]; + char hash[100]; int i = 0; session_element->session = session; erlang_pid *pid; @@ -1061,13 +1091,20 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul ei_x_encode_atom(&rbuf, "new_pid"); ei_x_encode_ref(&rbuf, &ref); ei_x_encode_pid(&rbuf, ei_self(listener->ec)); + /* should lock with mutex? */ ei_reg_send(listener->ec, listener->sockfd, module, rbuf.buff, rbuf.index); #ifdef EI_DEBUG ei_x_print_reg_msg(&rbuf, module, 1); #endif ei_x_free(&rbuf); } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function); + /* should lock with mutex? */ + ei_pid_from_rpc(listener->ec, listener->sockfd, &ref, module, function); + /* + char *argv[1]; ei_spawn(listener->ec, listener->sockfd, &ref, module, function, 0, argv); + */ } ei_hash_ref(&ref, hash); @@ -1091,7 +1128,8 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID); - ei_link(listener, ei_self(listener->ec), pid); + /* this hangs because it can never get hold of the socket mutex */ + ei_link(listener, ei_self(listener->ec), pid); } } return session_element; diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 15ff9da3d3..b90eff8eff 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -192,6 +192,7 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to); void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event); void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag); +int ei_pid_from_rpc(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function); int ei_spawn(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *module, char *function, int argc, char **argv); void ei_init_ref(struct ei_cnode_s *ec, erlang_ref *ref); void ei_x_print_reg_msg(ei_x_buff *buf, char *dest, int send); @@ -215,6 +216,7 @@ switch_status_t initialise_ei(struct ei_cnode_s *ec); /* mod_erlang_event.c */ session_elem_t* attach_call_to_registered_process(listener_t* listener, char* reg_name, switch_core_session_t *session); +session_elem_t* attach_call_to_pid(listener_t* listener, erlang_pid* pid, switch_core_session_t *session); session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *module, char *function, switch_core_session_t *session); /* For Emacs: