From 6ae73314a7b017873abadaa1fff684a8f37a81b1 Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Thu, 5 Mar 2009 21:54:16 +0000 Subject: [PATCH] Rework how spawned outbound pids communicate their pid back to the outbound call process, try to improve some memory management git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@12475 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- .../mod_erlang_event/ei_helpers.c | 1 - .../mod_erlang_event/handle_msg.c | 45 ++++++++++++++----- .../mod_erlang_event/mod_erlang_event.c | 28 +++++++++--- .../mod_erlang_event/mod_erlang_event.h | 2 + 4 files changed, 58 insertions(+), 18 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/ei_helpers.c b/src/mod/event_handlers/mod_erlang_event/ei_helpers.c index 160a15c204..60a063fe43 100644 --- a/src/mod/event_handlers/mod_erlang_event/ei_helpers.c +++ b/src/mod/event_handlers/mod_erlang_event/ei_helpers.c @@ -131,7 +131,6 @@ int ei_pid_from_rpc(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *mo ei_x_buff buf; ei_x_new(&buf); ei_x_encode_list_header(&buf, 1); - ei_init_ref(ec, ref); ei_x_encode_ref(&buf, ref); ei_x_encode_empty_list(&buf); diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 9f3af150cc..c77470b07d 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -700,7 +700,8 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg *msg, e static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) { erlang_ref ref; - erlang_pid *pid2, *pid = switch_core_alloc(listener->pool, sizeof(erlang_pid)); + erlang_pid *pid;/* = switch_core_alloc(listener->pool, sizeof(erlang_pid));*/ + void *p; char hash[100]; int arity; @@ -711,6 +712,14 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, e return SWITCH_STATUS_FALSE; } + if (!(pid = malloc(sizeof(erlang_pid)))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n"); + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badmem"); + return SWITCH_STATUS_SUCCESS; + } + if (ei_decode_pid(buf->buff, &buf->index, pid)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid pid in a reference/pid tuple\n"); return SWITCH_STATUS_FALSE; @@ -720,19 +729,36 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, e switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hashed ref to %s\n", hash); - if ((pid2 = (erlang_pid *) switch_core_hash_find(listener->spawn_pid_hash, hash))) { - if (pid2 == NULL) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found unfilled slot for %s\n", hash); + if ((p = switch_core_hash_find(listener->spawn_pid_hash, hash))) { + if (p == &globals.TIMEOUT) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Handler for %s timed out\n", hash); + switch_core_hash_delete(listener->spawn_pid_hash, hash); + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "timeout"); + } else if (p == &globals.WAITING) { + /* update the key to point at a pid */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", hash); + switch_core_hash_delete(listener->spawn_pid_hash, hash); + switch_core_hash_insert(listener->spawn_pid_hash, hash, pid); + return SWITCH_STATUS_FALSE; /*no reply */ } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", hash); + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "duplicate_response"); } } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "No slot for %s\n", hash); - switch_core_hash_insert(listener->spawn_pid_hash, hash, pid); + /* nothin in the hash */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Empty slot for %s\n", hash); + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "invalid_ref"); } - /* no reply */ - return SWITCH_STATUS_FALSE; + free(pid); /* don't need it */ + + return SWITCH_STATUS_SUCCESS; } @@ -758,8 +784,7 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff break; case ERL_REFERENCE_EXT : case ERL_NEW_REFERENCE_EXT : - handle_ref_tuple(listener, msg, buf, rbuf); - return 0; + ret = handle_ref_tuple(listener, msg, buf, rbuf); default : switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "WEEEEEEEE %d\n", type); /* some other kind of erlang term */ diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 5df830820c..350144efd5 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -413,7 +413,12 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c if (type != ERL_STRING_EXT && type != ERL_BINARY_EXT) /* XXX no unicode or character codes > 255 */ return NULL; - char *xmlstr = switch_core_alloc(ptr->listener->pool, size + 1); + char *xmlstr; + + if (!(xmlstr = malloc(size + 1))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error\n"); + return NULL; + } ei_decode_string_or_binary(rep->buff, &rep->index, size, xmlstr); @@ -431,7 +436,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c /*switch_safe_free(rep->buff);*/ /*switch_safe_free(rep);*/ - /*switch_safe_free(xmlstr);*/ + free(xmlstr); return xml; } @@ -811,7 +816,6 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) switch_mutex_unlock(globals.listener_mutex); switch_core_hash_init(&listener->fetch_reply_hash, listener->pool); - switch_core_hash_init(&listener->spawn_pid_hash, listener->pool); switch_assert(listener != NULL); @@ -1071,6 +1075,7 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul else { char hash[100]; int i = 0; + void *p = NULL; session_element->session = session; erlang_pid *pid; erlang_ref ref; @@ -1081,12 +1086,16 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul /* attach the session to the listener */ add_session_elem_to_listener(listener,session_element); + ei_init_ref(listener->ec, &ref); + ei_hash_ref(&ref, hash); + /* insert the waiting marker */ + switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING); + if (!strcmp(function, "!")) { /* send a message to request a pid */ ei_x_buff rbuf; ei_x_new_with_version(&rbuf); - ei_init_ref(listener->ec, &ref); ei_x_encode_tuple_header(&rbuf, 3); ei_x_encode_atom(&rbuf, "new_pid"); ei_x_encode_ref(&rbuf, &ref); @@ -1107,23 +1116,27 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul */ } - ei_hash_ref(&ref, hash); - - while (!(pid = (erlang_pid *) switch_core_hash_find(listener->spawn_pid_hash, hash))) { + /* loop until either we timeout or we get a value that's not the waiting marker */ + while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) { if (i > 50) { /* half a second timeout */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n"); switch_core_session_rwunlock(session); remove_session_elem_from_listener(listener,session_element); + switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */ return NULL; } i++; switch_yield(10000); /* 10ms */ } + switch_core_hash_delete(listener->spawn_pid_hash, hash); + + pid = (erlang_pid *) p; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got pid!\n"); session_element->process.type = ERLANG_PID; memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); + free(pid); /* malloced in handle_ref_tuple */ switch_set_flag(session_element, LFLAG_SESSION_ALIVE); switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID); @@ -1222,6 +1235,7 @@ SWITCH_STANDARD_APP(erlang_outbound_function) } if (module && function) { + switch_core_hash_init(&listener->spawn_pid_hash, listener->pool); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n"); session_element=attach_call_to_spawned_process(listener, module, function, session); } else { diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index bc4e4246f3..70f9d90015 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -136,6 +136,8 @@ struct globals_struct { unsigned int reference0; unsigned int reference1; unsigned int reference2; + char TIMEOUT; /* marker for a timed out request */ + char WAITING; /* marker for a request waiting for a response */ switch_mutex_t *ref_mutex; }; typedef struct globals_struct globals_t;