diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index cfa1d63115..c2f204ef0a 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -44,6 +44,7 @@ SWITCH_MODULE_DEFINITION(mod_erlang_event, mod_erlang_event_load, mod_erlang_eve static switch_memory_pool_t *module_pool = NULL; static void remove_listener(listener_t *listener); +static void destroy_listener(listener_t *listener); static switch_status_t state_handler(switch_core_session_t *session); SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip); @@ -59,7 +60,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l switch_thread_rwlock_rdlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { - /* TODO listener read lock */ + if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) { switch_log_node_t *dnode = switch_log_node_dup(node); @@ -80,6 +81,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l l->lost_logs++; } } + } switch_thread_rwlock_unlock(globals.listener_rwlock); @@ -192,8 +194,6 @@ static void event_handler(switch_event_t *event) one of them should receive the event as well */ - /* TODO need read locking */ - send_event_to_attached_sessions(l, event); if (!switch_test_flag(l, LFLAG_EVENTS)) { @@ -255,21 +255,21 @@ static void close_socket(int *sock) } -/*static void add_listener(listener_t *listener)*/ -/*{*/ - /* add me to the listeners so I get events */ - /*switch_thread_rwlock_wrlock(globals.listener_rwlock);*/ - /*listener->next = listen_list.listeners;*/ - /*listen_list.listeners = listener;*/ - /*switch_thread_rwlock_unlock(globals.listener_rwlock);*/ -/*}*/ +static void add_listener(listener_t *listener) +{ + /* add me to the listeners so I get events */ + switch_thread_rwlock_wrlock(globals.listener_rwlock); + listener->next = listen_list.listeners; + listen_list.listeners = listener; + switch_thread_rwlock_unlock(globals.listener_rwlock); +} -/* TODO lock */ static void remove_listener(listener_t *listener) { listener_t *l, *last = NULL; + switch_thread_rwlock_wrlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { if (l == listener) { if (last) { @@ -280,17 +280,18 @@ static void remove_listener(listener_t *listener) } last = l; } + switch_thread_rwlock_unlock(globals.listener_rwlock); } -/* Search for a listener already talking to the specified node */ -static listener_t *find_listener(char *nodename) +/* Search for a listener already talking to the specified node and lock for reading*/ +static listener_t *find_listener_locked(char *nodename) { listener_t *l = NULL; switch_thread_rwlock_rdlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) { - /* TODO listener rwlock */ + switch_thread_rwlock_rdlock(l->rwlock); break; } } @@ -976,10 +977,6 @@ static switch_bool_t check_inbound_acl(listener_t *listener) static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) { listener_t *listener = (listener_t *) obj; - session_elem_t *s = NULL; - const void *key; - void *value; - switch_hash_index_t *iter; switch_mutex_lock(globals.listener_count_mutex); prefs.threads++; @@ -994,50 +991,14 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip); /*, listener->remote_port); */ } - /*add_listener(listener);*/ + add_listener(listener); listener_main_loop(listener); } - /* clean up */ - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n"); - - listener->dead = 1; /* mark it as dead */ - - /* TODO - release write lock */ - switch_thread_rwlock_wrlock(globals.listener_rwlock); - remove_listener(listener); - switch_thread_rwlock_unlock(globals.listener_rwlock); - - switch_thread_rwlock_wrlock(listener->rwlock); - - if (listener->sockfd) { - close_socket(&listener->sockfd); - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Closed\n"); - /* TODO make listener destroy function and move there */ - switch_core_hash_destroy(&listener->event_hash); - /* remove any bindings for this connection */ - remove_binding(listener, NULL); - - /* clean up all the attached sessions */ - switch_thread_rwlock_wrlock(listener->session_rwlock); - for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) { - switch_hash_this(iter, &key, NULL, &value); - s = (session_elem_t*)value; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Orphaning call %s\n", s->uuid_str); - remove_session_elem_from_listener(listener, s); - destroy_session_elem(s); - } - switch_thread_rwlock_unlock(listener->session_rwlock); - switch_thread_rwlock_unlock(listener->rwlock); - - if (listener->pool) { - switch_memory_pool_t *pool = listener->pool; - switch_core_destroy_memory_pool(&pool); - } + remove_listener(listener); + destroy_listener(listener); switch_mutex_lock(globals.listener_count_mutex); prefs.threads--; @@ -1220,8 +1181,6 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd) switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool); switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool); - /* TODO remove */ - listener->dead = 0; /* born alive */ listener->sockfd = clientfd; listener->pool = pool; listener->ec = switch_core_alloc(listener->pool, sizeof(ei_cnode)); @@ -1233,49 +1192,16 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd) switch_core_hash_init(&listener->event_hash, listener->pool); switch_core_hash_init(&listener->sessions, listener->pool); - /* TODO listener rdlock */ - listener->next = listen_list.listeners; - listen_list.listeners = listener; - return listener; } -/*TODO we don't need bottleneck*/ -static listener_t *new_listener_locked(struct ei_cnode_s *ec, int clientfd) -{ - listener_t *res; - switch_thread_rwlock_wrlock(globals.listener_rwlock); - res = new_listener(ec, clientfd); - switch_thread_rwlock_unlock(globals.listener_rwlock); - return res; -} -/* TODO new session??? */ -static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session) +static listener_t *new_outbound_listener_locked(char *node) { listener_t *listener = NULL; struct ei_cnode_s ec; int clientfd; - /* TODO find listener func */ - switch_thread_rwlock_wrlock(globals.listener_rwlock); - for (listener = listen_list.listeners; listener; listener = listener->next) { - if (!strncmp(node, listener->peer_nodename, MAXNODELEN)) { - break; - } - } - - if (listener && listener->dead) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "found dead listener for %s\n", node); - remove_listener(listener); /* remove the dead listener and continue adding one */ - } else if (listener) { - switch_thread_rwlock_unlock(globals.listener_rwlock); - - *new_session = SWITCH_FALSE; - return listener; - } - - if (SWITCH_STATUS_SUCCESS == initialise_ei(&ec)) { #ifdef WIN32 WSASetLastError(0); @@ -1284,7 +1210,7 @@ static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session) #endif if ((clientfd = ei_connect(&ec, node)) < 0) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error connecting to node %s (erl_errno=%d, errno=%d)!\n", node, erl_errno, errno); - switch_thread_rwlock_unlock(globals.listener_rwlock); + return NULL; } switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "new listener for %s\n", node); @@ -1292,12 +1218,49 @@ static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session) listener->peer_nodename = switch_core_strdup(listener->pool, node); } - switch_thread_rwlock_unlock(globals.listener_rwlock); - *new_session = SWITCH_TRUE; + switch_thread_rwlock_rdlock(listener->rwlock); return listener; } +void destroy_listener(listener_t * listener) +{ + session_elem_t *s = NULL; + const void *key; + void *value; + switch_hash_index_t *iter; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n"); + switch_thread_rwlock_wrlock(listener->rwlock); + + if (listener->sockfd) { + close_socket(&listener->sockfd); + } + + switch_core_hash_destroy(&listener->event_hash); + + /* remove any bindings for this connection */ + remove_binding(listener, NULL); + + /* clean up all the attached sessions */ + switch_thread_rwlock_wrlock(listener->session_rwlock); + for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) { + switch_hash_this(iter, &key, NULL, &value); + s = (session_elem_t*)value; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Orphaning call %s\n", s->uuid_str); + remove_session_elem_from_listener(listener, s); + destroy_session_elem(s); + } + switch_thread_rwlock_unlock(listener->session_rwlock); + switch_thread_rwlock_unlock(listener->rwlock); + + if (listener->pool) { + switch_memory_pool_t *pool = listener->pool; + switch_core_destroy_memory_pool(&pool); + } + +} + static switch_status_t state_handler(switch_core_session_t *session) { switch_channel_t *channel = switch_core_session_get_channel(session); @@ -1499,7 +1462,6 @@ SWITCH_STANDARD_APP(erlang_outbound_function) char *argv[80] = { 0 }, *argv2[80] = { 0 }; char *mydata, *myarg; char uuid[SWITCH_UUID_FORMATTED_LENGTH + 1]; - switch_bool_t new_session = SWITCH_FALSE; session_elem_t *session_element = NULL; /* process app arguments */ @@ -1541,31 +1503,19 @@ SWITCH_STANDARD_APP(erlang_outbound_function) switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n", argv[0], node); /* first work out if there is a listener already talking to the node we want to talk to */ - listener = find_listener(node); + listener = find_listener_locked(node); /* if there is no listener, then create one */ if (!listener) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n"); - listener = new_outbound_listener(node, &new_session); - /* XXX new_session isn't accurate now */ - } else { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for session\n"); - /* TODO don't we need to connect ? */ - } - - /* TODO it's too late */ - switch_thread_rwlock_rdlock(globals.listener_rwlock); - - if (listener && !listener->dead) { - /* prevent the listener_run thread from destroying the listener out from under us */ - /* get the listener lock */ - switch_thread_rwlock_rdlock(listener->rwlock); - /* release the global listener lock, since the listener can't be freed without the listener lock */ - switch_thread_rwlock_unlock(globals.listener_rwlock); - - if (new_session == SWITCH_TRUE) { + if ((listener = new_outbound_listener_locked(node))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching new listener\n"); launch_listener_thread(listener); } + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for session\n"); + } + + if (listener) { if (module && function) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n"); @@ -1575,10 +1525,8 @@ SWITCH_STANDARD_APP(erlang_outbound_function) session_element = attach_call_to_registered_process(listener, reg_name, session); } - /* should be safe now */ switch_thread_rwlock_unlock(listener->rwlock); - if (session_element) { switch_ivr_park(session, NULL); } @@ -1600,7 +1548,6 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function) char *mydata; ei_x_buff buf; listener_t *listener; - switch_bool_t new_session; ei_x_new_with_version(&buf); @@ -1623,16 +1570,18 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function) ei_x_encode_atom(&buf, "freeswitch_sendmsg"); _ei_x_encode_string(&buf, argv[2]); - listener = find_listener(node); + listener = find_listener_locked(node); if (!listener) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node); - listener = new_outbound_listener(node, &new_session); + listener = new_outbound_listener_locked(node); } else { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for sendmsg to %s\n", node); } - if (listener && !listener->dead) { + if (listener) { ei_reg_send(listener->ec, listener->sockfd, reg_name, buf.buff, buf.index); + + switch_thread_rwlock_unlock(listener->rwlock); } } @@ -1927,7 +1876,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) continue; } - listener = new_listener_locked(&ec, clientfd); + listener = new_listener(&ec, clientfd); if (listener) { /* store the IP and node name we are talking with */ switch_inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip)); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 775cdf74f3..016734dd80 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -114,7 +114,6 @@ struct listener { #else int sockfd; #endif - uint8_t dead; struct ei_cnode_s *ec; struct erlang_process log_process; struct erlang_process event_process;