diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index d09920699e..c2026ed67a 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -166,7 +166,7 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff * buf, ei_x_buff * rbuf) { char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; - void *p; + fetch_reply_t *p; if (ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str)) { ei_x_encode_tuple_header(rbuf, 2); @@ -179,20 +179,40 @@ static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff * nbuf->index = buf->index; nbuf->buffsz = buf->buffsz; - if ((p = switch_core_hash_find(listener->fetch_reply_hash, uuid_str))) { - if (p == &globals.TIMEOUT) { + switch_mutex_lock(globals.fetch_reply_mutex); + if ((p = switch_core_hash_find(globals.fetch_reply_hash, uuid_str))) { + /* Get the status and release the lock ASAP. */ + enum { is_timeout, is_waiting, is_filled } status; + if (p->state == reply_not_ready) { + switch_thread_cond_wait(p->ready_or_found, globals.fetch_reply_mutex); + } + + if (p->state == reply_waiting) { + /* update the key with a reply */ + status = is_waiting; + p->reply = nbuf; + p->state = reply_found; + strncpy(p->winner, listener->peer_nodename, MAXNODELEN); + switch_thread_cond_broadcast(p->ready_or_found); + } else if (p->state == reply_timeout) { + status = is_timeout; + } else { + status = is_filled; + } + + put_reply_unlock(p, uuid_str); + + /* Relay the status back to the fetch responder. */ + if (status == is_waiting) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", uuid_str); + ei_x_encode_atom(rbuf, "ok"); + /* Return here to avoid freeing the reply. */ + return SWITCH_STATUS_SUCCESS; + } else if (status == is_timeout) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Handler for %s timed out\n", uuid_str); - switch_core_hash_delete(listener->fetch_reply_hash, uuid_str); ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(rbuf, "error"); ei_x_encode_atom(rbuf, "timeout"); - } else if (p == &globals.WAITING) { - /* update the key to point at a pid */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", uuid_str); - switch_core_hash_delete(listener->fetch_reply_hash, uuid_str); - switch_core_hash_insert(listener->fetch_reply_hash, uuid_str, nbuf); - ei_x_encode_atom(rbuf, "ok"); - return SWITCH_STATUS_SUCCESS; } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", uuid_str); ei_x_encode_tuple_header(rbuf, 2); @@ -200,14 +220,14 @@ static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff * ei_x_encode_atom(rbuf, "duplicate_response"); } } else { - /* nothin in the hash */ + /* nothing in the hash */ + switch_mutex_unlock(globals.fetch_reply_mutex); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Empty slot for %s\n", uuid_str); ei_x_encode_tuple_header(rbuf, 2); ei_x_encode_atom(rbuf, "error"); ei_x_encode_atom(rbuf, "invalid_uuid"); } - /*switch_core_hash_insert(listener->fetch_reply_hash, uuid_str, nbuf); */ switch_safe_free(nbuf->buff); switch_safe_free(nbuf); } diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index e0235db679..442ce1c274 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -41,6 +41,8 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown); SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime); SWITCH_MODULE_DEFINITION(mod_erlang_event, mod_erlang_event_load, mod_erlang_event_shutdown, mod_erlang_event_runtime); +static switch_memory_pool_t *module_pool = NULL; + static void remove_listener(listener_t *listener); static switch_status_t state_handler(switch_core_session_t *session); @@ -377,7 +379,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; int type, size; int i = 0; - void *p = NULL; + fetch_reply_t *p = NULL; char *xmlstr; struct erlang_binding *ptr; switch_uuid_t uuid; @@ -391,74 +393,89 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c section = switch_xml_parse_section_string((char *) sectionstr); - for (ptr = bindings.head; ptr && ptr->section != section; ptr = ptr->next); /* just get the first match */ - - if (!ptr) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr); - return NULL; - } - - if (!ptr->listener) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "NULL pointer binding!\n"); - return NULL; /* our pointer is trash */ - } - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, - sectionstr, key_name, key_value, ptr->process.pid.node); - switch_uuid_get(&uuid); switch_uuid_format(uuid_str, &uuid); - /*switch_event_add_header_string(params, SWITCH_STACK_BOTTOM, "Request-ID", uuid_str); */ + for (ptr = bindings.head; ptr; ptr = ptr->next) { + if (ptr->section != section) + continue; - ei_x_encode_tuple_header(&buf, 7); - ei_x_encode_atom(&buf, "fetch"); - ei_x_encode_atom(&buf, sectionstr); - _ei_x_encode_string(&buf, tag_name ? tag_name : "undefined"); - _ei_x_encode_string(&buf, key_name ? key_name : "undefined"); - _ei_x_encode_string(&buf, key_value ? key_value : "undefined"); - _ei_x_encode_string(&buf, uuid_str); - if (params) { - ei_encode_switch_event_headers(&buf, params); - } else { - ei_x_encode_empty_list(&buf); - } - - switch_core_hash_insert(ptr->listener->fetch_reply_hash, uuid_str, &globals.WAITING); - - switch_mutex_lock(ptr->listener->sock_mutex); - ei_sendto(ptr->listener->ec, ptr->listener->sockfd, &ptr->process, &buf); - switch_mutex_unlock(ptr->listener->sock_mutex); - - while (!(p = switch_core_hash_find(ptr->listener->fetch_reply_hash, uuid_str)) || p == &globals.WAITING) { - if (i > 50) { /* half a second timeout */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for XML fetch response\n"); - switch_core_hash_insert(ptr->listener->fetch_reply_hash, uuid_str, &globals.TIMEOUT); /* TODO lock this? */ - return NULL; + if (!ptr->listener) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "NULL pointer binding!\n"); + goto cleanup; /* our pointer is trash */ } - i++; - switch_yield(10000); /* 10ms */ + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->process.pid.node); + + ei_x_encode_tuple_header(&buf, 7); + ei_x_encode_atom(&buf, "fetch"); + ei_x_encode_atom(&buf, sectionstr); + _ei_x_encode_string(&buf, tag_name ? tag_name : "undefined"); + _ei_x_encode_string(&buf, key_name ? key_name : "undefined"); + _ei_x_encode_string(&buf, key_value ? key_value : "undefined"); + _ei_x_encode_string(&buf, uuid_str); + if (params) { + ei_encode_switch_event_headers(&buf, params); + } else { + ei_x_encode_empty_list(&buf); + } + + if (!p) { + /* Create a new fetch object. */ + p = malloc(sizeof(*p)); + switch_thread_cond_create(&p->ready_or_found, module_pool); + p->usecount = 1; + p->state = reply_not_ready; + p->reply = NULL; + switch_core_hash_insert_locked(globals.fetch_reply_hash, uuid_str, p, globals.fetch_reply_mutex); + } + /* We don't need to lock here because everybody is waiting + on our condition before the action starts. */ + p->usecount ++; + + switch_mutex_lock(ptr->listener->sock_mutex); + ei_sendto(ptr->listener->ec, ptr->listener->sockfd, &ptr->process, &buf); + switch_mutex_unlock(ptr->listener->sock_mutex); } - rep = (ei_x_buff *) p; + if (!p) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr); + goto cleanup; + } + + /* Tell the threads to be ready, and wait five seconds for a reply. */ + switch_mutex_lock(globals.fetch_reply_mutex); + p->state = reply_waiting; + switch_thread_cond_broadcast(p->ready_or_found); + switch_thread_cond_timedwait(p->ready_or_found, + globals.fetch_reply_mutex, 5000000); + if (!p->reply) { + p->state = reply_timeout; + switch_mutex_unlock(globals.fetch_reply_mutex); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for XML fetch response\n"); + goto cleanup; + } + + rep = p->reply; + switch_mutex_unlock(globals.fetch_reply_mutex); ei_get_type(rep->buff, &rep->index, &type, &size); if (type != ERL_STRING_EXT && type != ERL_BINARY_EXT) { /* XXX no unicode or character codes > 255 */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "XML fetch response contained non ASCII characters? (was type %d of size %d)\n", type, size); - return NULL; + goto cleanup; } if (!(xmlstr = malloc(size + 1))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error\n"); - return NULL; + goto cleanup; } ei_decode_string_or_binary(rep->buff, &rep->index, size, xmlstr); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got data %s after %d milliseconds!\n", xmlstr, i * 10); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got data %s after %d milliseconds from %s!\n", xmlstr, i * 10, p->winner); if (zstr(xmlstr)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "No Result\n"); @@ -469,14 +486,31 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c } /* cleanup */ - switch_core_hash_delete(ptr->listener->fetch_reply_hash, uuid_str); - switch_safe_free(rep->buff); - switch_safe_free(rep); + cleanup: + if (p) { + switch_mutex_lock(globals.fetch_reply_mutex); + put_reply_unlock(p, uuid_str); + } return xml; } +void put_reply_unlock(fetch_reply_t *p, char *uuid_str) +{ + if (-- p->usecount == 0) { + switch_core_hash_delete(globals.fetch_reply_hash, uuid_str); + switch_thread_cond_destroy(p->ready_or_found); + if (p->reply) { + switch_safe_free(p->reply->buff); + switch_safe_free(p->reply); + } + switch_safe_free(p); + } + switch_mutex_unlock(globals.fetch_reply_mutex); +} + + static switch_status_t notify_new_session(listener_t *listener, session_elem_t *session_element) { int result; @@ -921,8 +955,6 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) prefs.threads++; switch_mutex_unlock(globals.listener_mutex); - switch_core_hash_init(&listener->fetch_reply_hash, listener->pool); - switch_assert(listener != NULL); if (check_inbound_acl(listener)) { @@ -1548,9 +1580,13 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) switch_application_interface_t *app_interface; switch_api_interface_t *api_interface; + module_pool = pool; + memset(&prefs, 0, sizeof(prefs)); switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool); + switch_mutex_init(&globals.fetch_reply_mutex, SWITCH_MUTEX_DEFAULT, pool); + switch_core_hash_init(&globals.fetch_reply_hash, pool); /* intialize the unique reference stuff */ switch_mutex_init(&globals.ref_mutex, SWITCH_MUTEX_NESTED, pool); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index dbe80441e3..424b3af40e 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -103,7 +103,6 @@ struct listener { switch_log_level_t level; uint8_t event_list[SWITCH_EVENT_ALL + 1]; switch_hash_t *event_hash; - switch_hash_t *fetch_reply_hash; switch_hash_t *spawn_pid_hash; switch_thread_rwlock_t *rwlock; switch_mutex_t *session_mutex; @@ -140,15 +139,27 @@ struct api_command_struct { struct globals_struct { switch_mutex_t *listener_mutex; switch_event_node_t *node; + switch_mutex_t *ref_mutex; + switch_mutex_t *fetch_reply_mutex; + switch_hash_t *fetch_reply_hash; unsigned int reference0; unsigned int reference1; unsigned int reference2; char TIMEOUT; /* marker for a timed out request */ char WAITING; /* marker for a request waiting for a response */ - switch_mutex_t *ref_mutex; }; typedef struct globals_struct globals_t; +struct fetch_reply_struct +{ + switch_thread_cond_t *ready_or_found; + int usecount; + enum { reply_not_ready, reply_waiting, reply_found, reply_timeout } state; + ei_x_buff *reply; + char winner[MAXNODELEN + 1]; +}; +typedef struct fetch_reply_struct fetch_reply_t; + struct listen_list_struct { #ifdef WIN32 SOCKET sockfd; @@ -236,6 +247,7 @@ switch_status_t initialise_ei(struct ei_cnode_s *ec); session_elem_t *attach_call_to_registered_process(listener_t *listener, char *reg_name, switch_core_session_t *session); session_elem_t *attach_call_to_pid(listener_t *listener, erlang_pid * pid, switch_core_session_t *session); session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *module, char *function, switch_core_session_t *session); +void put_reply_unlock(fetch_reply_t *p, char *uuid_str); /* For Emacs: * Local Variables: