From 8f4c5bc49240e69c938644a7da4b1632e74d9e95 Mon Sep 17 00:00:00 2001 From: Tamas Cseke Date: Thu, 24 May 2012 09:37:41 +0200 Subject: [PATCH] add Andrew's patch from FS-3432 as a starting point with todo markers --- .../mod_erlang_event/handle_msg.c | 59 +++--- .../mod_erlang_event/mod_erlang_event.c | 195 +++++++++++++----- .../mod_erlang_event/mod_erlang_event.h | 4 +- 3 files changed, 187 insertions(+), 71 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 46e9e67e85..0b083cd0c0 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -286,6 +286,7 @@ static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_bu switch_set_flag_locked(listener, LFLAG_EVENTS); } + /* TODO - listener write lock */ for (i = 1; i < arity; i++) { if (!ei_decode_atom(buf->buff, &buf->index, atom)) { @@ -335,6 +336,7 @@ static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg for (i = 1; i < arity; i++) { if (!ei_decode_atom(buf->buff, &buf->index, atom)) { + /* TODO session write locking */ if (custom) { switch_core_hash_insert(session->event_hash, atom, MARKER); } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) { @@ -380,10 +382,12 @@ static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x int i = 0; switch_event_types_t type; + /* TODO listener write lock */ for (i = 1; i < arity; i++) { if (!ei_decode_atom(buf->buff, &buf->index, atom)) { if (custom) { + switch_core_hash_delete(listener->event_hash, atom); } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) { uint32_t x = 0; @@ -426,6 +430,7 @@ static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_ int i = 0; switch_event_types_t type; + /* TODO session write lock */ for (i = 1; i < arity; i++) { if (!ei_decode_atom(buf->buff, &buf->index, atom)) { @@ -480,6 +485,8 @@ static switch_status_t handle_msg_setevent(listener_t *listener, erlang_msg *msg switch_event_types_t type; int i = 0; + /* TODO listener write lock */ + /* clear any previous event registrations */ for( x = 0; x <= SWITCH_EVENT_ALL; x++){ event_list[x] = 0; @@ -517,6 +524,7 @@ static switch_status_t handle_msg_setevent(listener_t *listener, erlang_msg *msg /* update the event subscriptions with the new ones */ memcpy(listener->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1)); /* wipe the old hash, and point the pointer at the new one */ + /* TODO make thread safe */ switch_core_hash_destroy(&listener->event_hash); listener->event_hash = event_hash; @@ -544,12 +552,14 @@ static switch_status_t handle_msg_session_setevent(listener_t *listener, erlang_ switch_event_types_t type; uint32_t x = 0; + /* TODO session write lock */ /* clear any previous event registrations */ for (x = 0; x <= SWITCH_EVENT_ALL; x++){ event_list[x] = 0; } /* create new hash */ + /* TODO make thread safe*/ switch_core_hash_init(&event_hash, session->pool); for (i = 1; i < arity; i++){ @@ -576,6 +586,7 @@ static switch_status_t handle_msg_session_setevent(listener_t *listener, erlang_ /* update the event subscriptions with the new ones */ memcpy(session->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1)); /* wipe the old hash, and point the pointer at the new one */ + /* TODO make thread safe*/ switch_core_hash_destroy(&session->event_hash); session->event_hash = event_hash; /* TODO - we should flush any non-matching events from the queue */ @@ -601,13 +612,13 @@ static switch_status_t handle_msg_api(listener_t *listener, erlang_msg * msg, in fail = SWITCH_TRUE; } - ei_get_type(buf->buff, &buf->index, &type, &size); + ei_get_type(buf->buff, &buf->index, &type, &size); if ((size > (sizeof(api_cmd) - 1)) || ei_decode_atom(buf->buff, &buf->index, api_cmd)) { fail = SWITCH_TRUE; } - ei_get_type(buf->buff, &buf->index, &type, &size); + ei_get_type(buf->buff, &buf->index, &type, &size); arg = malloc(size + 1); if (ei_decode_string(buf->buff, &buf->index, arg)) { @@ -706,29 +717,29 @@ static switch_status_t handle_msg_sendevent(listener_t *listener, int arity, ei_ while (!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) { i++; - ei_get_type(buf->buff, &buf->index, &type, &size); + ei_get_type(buf->buff, &buf->index, &type, &size); if ((size > (sizeof(key) - 1)) || ei_decode_string(buf->buff, &buf->index, key)) { fail = SWITCH_TRUE; break; } - ei_get_type(buf->buff, &buf->index, &type, &size); - value = malloc(size + 1); + ei_get_type(buf->buff, &buf->index, &type, &size); + value = malloc(size + 1); if (ei_decode_string(buf->buff, &buf->index, value)) { fail = SWITCH_TRUE; break; } - if (!fail && !strcmp(key, "body")) { - switch_safe_free(event->body); - event->body = value; - } else if (!fail) { - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value); - } - - /* Do not free malloc here! The above commands utilize the raw allocated memory and skip any copying/duplication. Faster. */ + if (!fail && !strcmp(key, "body")) { + switch_safe_free(event->body); + event->body = value; + } else if (!fail) { + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value); + } + + /* Do not free malloc here! The above commands utilize the raw allocated memory and skip any copying/duplication. Faster. */ } if (headerlength != i || fail) { @@ -763,22 +774,22 @@ static switch_status_t handle_msg_sendmsg(listener_t *listener, int arity, ei_x_ char key[1024]; char *value; - int type; - int size; + int type; + int size; int i = 0; switch_bool_t fail = SWITCH_FALSE; while (!ei_decode_tuple_header(buf->buff, &buf->index, &arity) && arity == 2) { i++; - ei_get_type(buf->buff, &buf->index, &type, &size); + ei_get_type(buf->buff, &buf->index, &type, &size); if ((size > (sizeof(key) - 1)) || ei_decode_string(buf->buff, &buf->index, key)) { fail = SWITCH_TRUE; break; } - - ei_get_type(buf->buff, &buf->index, &type, &size); - value = malloc(size + 1); + + ei_get_type(buf->buff, &buf->index, &type, &size); + value = malloc(size + 1); if (ei_decode_string(buf->buff, &buf->index, value)) { fail = SWITCH_TRUE; @@ -786,7 +797,7 @@ static switch_status_t handle_msg_sendmsg(listener_t *listener, int arity, ei_x_ } if (!fail) { - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value); + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM | SWITCH_STACK_NODUP, key, value); } } @@ -1024,6 +1035,7 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e listener->event_list[x] = 0; } /* wipe the hash */ + /* TODO make thread safe*/ switch_core_hash_destroy(&listener->event_hash); switch_core_hash_init(&listener->event_hash, listener->pool); ei_x_encode_atom(rbuf, "ok"); @@ -1044,6 +1056,7 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e session->event_list[x] = 0; } /* wipe the hash */ + /* TODO make thread safe*/ switch_core_hash_destroy(&session->event_hash); switch_core_hash_init(&session->event_hash, session->pool); ei_x_encode_atom(rbuf, "ok"); @@ -1240,6 +1253,7 @@ int handle_msg(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buf buf->index = 0; ei_decode_version(buf->buff, &buf->index, &version); ei_get_type(buf->buff, &buf->index, &type, &size); + switch (type) { case ERL_SMALL_TUPLE_EXT: case ERL_LARGE_TUPLE_EXT: @@ -1288,11 +1302,8 @@ int handle_msg(listener_t *listener, erlang_msg * msg, ei_x_buff * buf, ei_x_buf #ifdef EI_DEBUG ei_x_print_msg(rbuf, &msg->from, 1); #endif + return SWITCH_STATUS_SUCCESS != ret; - if (SWITCH_STATUS_SUCCESS == ret) - return 0; - else /* SWITCH_STATUS_TERM */ - return 1; } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Empty reply, supressing\n"); return 0; diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 12cf01a650..cfa1d63115 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -59,6 +59,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l switch_thread_rwlock_rdlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { + /* TODO listener read lock */ if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) { switch_log_node_t *dnode = switch_log_node_dup(node); @@ -131,9 +132,10 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t return; } + switch_thread_rwlock_rdlock(listener->session_rwlock); s = (session_elem_t*)switch_core_hash_find(listener->sessions, uuid); - switch_thread_rwlock_unlock(listener->session_rwlock); + /* TODO - we don't need to hold the lock, we need to lock the session */ if (s) { int send = 0; @@ -162,6 +164,7 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t switch_event_name(event->event_id), s->uuid_str); } } + switch_thread_rwlock_unlock(listener->session_rwlock); } static void event_handler(switch_event_t *event) @@ -175,9 +178,10 @@ static void event_handler(switch_event_t *event) return; } + switch_thread_rwlock_rdlock(globals.listener_rwlock); + lp = listen_list.listeners; - switch_thread_rwlock_rdlock(globals.listener_rwlock); while (lp) { uint8_t send = 0; @@ -188,6 +192,8 @@ static void event_handler(switch_event_t *event) one of them should receive the event as well */ + /* TODO need read locking */ + send_event_to_attached_sessions(l, event); if (!switch_test_flag(l, LFLAG_EVENTS)) { @@ -249,21 +255,21 @@ static void close_socket(int *sock) } -static void add_listener(listener_t *listener) -{ +/*static void add_listener(listener_t *listener)*/ +/*{*/ /* add me to the listeners so I get events */ - switch_thread_rwlock_wrlock(globals.listener_rwlock); - listener->next = listen_list.listeners; - listen_list.listeners = listener; - switch_thread_rwlock_unlock(globals.listener_rwlock); -} + /*switch_thread_rwlock_wrlock(globals.listener_rwlock);*/ + /*listener->next = listen_list.listeners;*/ + /*listen_list.listeners = listener;*/ + /*switch_thread_rwlock_unlock(globals.listener_rwlock);*/ +/*}*/ +/* TODO lock */ static void remove_listener(listener_t *listener) { listener_t *l, *last = NULL; - switch_thread_rwlock_wrlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { if (l == listener) { if (last) { @@ -274,7 +280,6 @@ static void remove_listener(listener_t *listener) } last = l; } - switch_thread_rwlock_unlock(globals.listener_rwlock); } /* Search for a listener already talking to the specified node */ @@ -285,6 +290,7 @@ static listener_t *find_listener(char *nodename) switch_thread_rwlock_rdlock(globals.listener_rwlock); for (l = listen_list.listeners; l; l = l->next) { if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) { + /* TODO listener rwlock */ break; } } @@ -301,11 +307,19 @@ static void add_session_elem_to_listener(listener_t *listener, session_elem_t *s } +/* TODO lock */ static void remove_session_elem_from_listener(listener_t *listener, session_elem_t *session_element) { switch_core_hash_delete(listener->sessions, session_element->uuid_str); } +static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element) +{ + switch_thread_rwlock_wrlock(listener->session_rwlock); + remove_session_elem_from_listener(listener, session_element); + switch_thread_rwlock_unlock(listener->session_rwlock); +} + static void destroy_session_elem(session_elem_t *session_element) { switch_core_session_t *session; @@ -315,16 +329,10 @@ static void destroy_session_elem(session_elem_t *session_element) switch_core_session_rwunlock(session); } switch_core_destroy_memory_pool(&session_element->pool); + session_element = NULL; /*switch_safe_free(s); */ } -static void remove_session_elem_from_listener_locked(listener_t *listener, session_elem_t *session_element) -{ - switch_thread_rwlock_wrlock(listener->session_rwlock); - remove_session_elem_from_listener(listener, session_element); - switch_thread_rwlock_unlock(listener->session_rwlock); -} - session_elem_t *find_session_elem_by_pid(listener_t *listener, erlang_pid *pid) { @@ -362,6 +370,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c switch_xml_t xml = NULL; ei_x_buff *rep; ei_x_buff buf; + ei_x_new_with_version(&buf); switch_uuid_get(&uuid); @@ -403,6 +412,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c /* Create a new fetch object. */ p = malloc(sizeof(*p)); switch_thread_cond_create(&p->ready_or_found, module_pool); + /* TODO module pool */ switch_mutex_init(&p->mutex, SWITCH_MUTEX_UNNESTED, module_pool); p->state = reply_not_ready; p->reply = NULL; @@ -430,8 +440,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c /* Tell the threads to be ready, and wait five seconds for a reply. */ switch_mutex_lock(p->mutex); //p->state = reply_waiting; - switch_thread_cond_timedwait(p->ready_or_found, - p->mutex, 5000000); + switch_thread_cond_timedwait(p->ready_or_found, p->mutex, 5000000); if (!p->reply) { p->state = reply_timeout; switch_mutex_unlock(p->mutex); @@ -516,6 +525,7 @@ static switch_status_t notify_new_session(listener_t *listener, session_elem_t * switch_caller_profile_event_set_data(switch_channel_get_caller_profile(channel), "Channel", call_event); switch_channel_event_set_data(channel, call_event); switch_core_session_rwunlock(session); + /* TODO reply? sure? */ switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "Content-Type", "command/reply"); switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "Reply-Text", "+OK\n"); @@ -551,6 +561,7 @@ static switch_status_t check_attached_sessions(listener_t *listener) /* event used to track sessions to remove */ switch_event_t *event = NULL; switch_event_header_t *header = NULL; + switch_event_create_subclass(&event, SWITCH_EVENT_CLONE, NULL); switch_assert(event); /* check up on all the attached sessions - @@ -558,6 +569,8 @@ static switch_status_t check_attached_sessions(listener_t *listener) if they have pending events in their queues then send them if the session has finished then clean it up */ + + /* TODO try to minimize critical section */ switch_thread_rwlock_rdlock(listener->session_rwlock); for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) { switch_hash_this(iter, &key, NULL, &value); @@ -643,6 +656,8 @@ static switch_status_t check_attached_sessions(listener_t *listener) /* release the read lock and get a write lock */ switch_thread_rwlock_wrlock(listener->session_rwlock); /* do the deferred remove */ + + /* TODO refactor find_session_elem_by_uuid*/ for (header = event->headers; header; header = header->next) { if ((sp = (session_elem_t*)switch_core_hash_find(listener->sessions, header->value))) { remove_session_elem_from_listener(listener, sp); @@ -762,6 +777,7 @@ static void handle_exit(listener_t *listener, erlang_pid * pid) switch_core_session_rwunlock(session); } /* TODO - if a spawned process that was handling an outbound call fails.. what do we do with the call? */ + /* TODO hangup and let the state handler set the complete flag and destroy as usual*/ } remove_session_elem_from_listener_locked(listener, s); destroy_session_elem(s); @@ -772,6 +788,7 @@ static void handle_exit(listener_t *listener, erlang_pid * pid) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Log handler process for node %s exited\n", pid->node); /*purge the log queue */ + /* TODO don't we want to clear flag first? */ while (switch_queue_trypop(listener->log_queue, &pop) == SWITCH_STATUS_SUCCESS) { switch_log_node_t *dnode = (switch_log_node_t *) pop; switch_log_node_free(&dnode); @@ -787,6 +804,7 @@ static void handle_exit(listener_t *listener, erlang_pid * pid) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Event handler process for node %s exited\n", pid->node); /*purge the event queue */ + /* TODO don't we want to clear flag first? */ while (switch_queue_trypop(listener->event_queue, &pop) == SWITCH_STATUS_SUCCESS) { switch_event_t *pevent = (switch_event_t *) pop; switch_event_destroy(&pevent); @@ -795,10 +813,13 @@ static void handle_exit(listener_t *listener, erlang_pid * pid) if (switch_test_flag(listener, LFLAG_EVENTS)) { uint8_t x = 0; switch_clear_flag_locked(listener, LFLAG_EVENTS); + for (x = 0; x <= SWITCH_EVENT_ALL; x++) { listener->event_list[x] = 0; } /* wipe the hash */ + /* XXX this needs to be locked */ + /* TODO switch_core_hash_delete_multi_locked */ switch_core_hash_destroy(&listener->event_hash); switch_core_hash_init(&listener->event_hash, listener->pool); } @@ -865,7 +886,16 @@ static void listener_main_loop(listener_t *listener) case ERL_EXIT: switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_exit from %s <%d.%d.%d>\n", msg.from.node, msg.from.creation, msg.from.num, msg.from.serial); - handle_exit(listener, &msg.from); + + switch_thread_rwlock_rdlock(globals.listener_rwlock); + if (listener) { + /* get the listener lock */ + switch_thread_rwlock_wrlock(listener->rwlock); + /* wipe event hash */ + handle_exit(listener, &msg.from); + switch_thread_rwlock_unlock(listener->rwlock); + } + switch_thread_rwlock_unlock(globals.listener_rwlock); break; default: switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "unexpected msg type %d\n", (int) (msg.msgtype)); @@ -874,7 +904,7 @@ static void listener_main_loop(listener_t *listener) break; case ERL_ERROR: if (erl_errno != ETIMEDOUT && erl_errno != EAGAIN) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_error\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "erl_error: status=%d, erl_errno=%d errno=%d\n", status, erl_errno, errno); } break; default: @@ -892,6 +922,11 @@ static void listener_main_loop(listener_t *listener) return; } } + if (prefs.done) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "shutting down listener\n"); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "listener exit: status=%d, erl_errno=%d errno=%d\n", status, erl_errno, errno); + } } static switch_bool_t check_inbound_acl(listener_t *listener) @@ -941,7 +976,7 @@ static switch_bool_t check_inbound_acl(listener_t *listener) static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) { listener_t *listener = (listener_t *) obj; - session_elem_t *s; + session_elem_t *s = NULL; const void *key; void *value; switch_hash_index_t *iter; @@ -959,24 +994,29 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip); /*, listener->remote_port); */ } - add_listener(listener); + /*add_listener(listener);*/ listener_main_loop(listener); } /* clean up */ - remove_listener(listener); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n"); + listener->dead = 1; /* mark it as dead */ + + /* TODO - release write lock */ + switch_thread_rwlock_wrlock(globals.listener_rwlock); + remove_listener(listener); + switch_thread_rwlock_unlock(globals.listener_rwlock); + switch_thread_rwlock_wrlock(listener->rwlock); if (listener->sockfd) { close_socket(&listener->sockfd); } - switch_thread_rwlock_unlock(listener->rwlock); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Closed\n"); + /* TODO make listener destroy function and move there */ switch_core_hash_destroy(&listener->event_hash); /* remove any bindings for this connection */ @@ -987,9 +1027,12 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) { switch_hash_this(iter, &key, NULL, &value); s = (session_elem_t*)value; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Orphaning call %s\n", s->uuid_str); + remove_session_elem_from_listener(listener, s); destroy_session_elem(s); } switch_thread_rwlock_unlock(listener->session_rwlock); + switch_thread_rwlock_unlock(listener->rwlock); if (listener->pool) { switch_memory_pool_t *pool = listener->pool; @@ -1156,30 +1199,31 @@ static int config(void) return 0; } - static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd) { - switch_memory_pool_t *listener_pool = NULL; + switch_memory_pool_t *pool = NULL; listener_t *listener = NULL; - if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) { + if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n"); return NULL; } - if (!(listener = switch_core_alloc(listener_pool, sizeof(*listener)))) { + if (!(listener = switch_core_alloc(pool, sizeof(*listener)))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n"); + switch_core_destroy_memory_pool(&pool); return NULL; } memset(listener, 0, sizeof(*listener)); - switch_thread_rwlock_create(&listener->rwlock, listener_pool); - switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener_pool); - switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, listener_pool); + switch_thread_rwlock_create(&listener->rwlock, pool); + switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool); + switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool); + /* TODO remove */ + listener->dead = 0; /* born alive */ listener->sockfd = clientfd; - listener->pool = listener_pool; - listener_pool = NULL; + listener->pool = pool; listener->ec = switch_core_alloc(listener->pool, sizeof(ei_cnode)); memcpy(listener->ec, ec, sizeof(ei_cnode)); listener->level = SWITCH_LOG_DEBUG; @@ -1189,15 +1233,49 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd) switch_core_hash_init(&listener->event_hash, listener->pool); switch_core_hash_init(&listener->sessions, listener->pool); + /* TODO listener rdlock */ + listener->next = listen_list.listeners; + listen_list.listeners = listener; + return listener; } -static listener_t *new_outbound_listener(char *node) +/*TODO we don't need bottleneck*/ +static listener_t *new_listener_locked(struct ei_cnode_s *ec, int clientfd) +{ + listener_t *res; + switch_thread_rwlock_wrlock(globals.listener_rwlock); + res = new_listener(ec, clientfd); + switch_thread_rwlock_unlock(globals.listener_rwlock); + return res; +} + +/* TODO new session??? */ +static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session) { listener_t *listener = NULL; struct ei_cnode_s ec; int clientfd; + /* TODO find listener func */ + switch_thread_rwlock_wrlock(globals.listener_rwlock); + for (listener = listen_list.listeners; listener; listener = listener->next) { + if (!strncmp(node, listener->peer_nodename, MAXNODELEN)) { + break; + } + } + + if (listener && listener->dead) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "found dead listener for %s\n", node); + remove_listener(listener); /* remove the dead listener and continue adding one */ + } else if (listener) { + switch_thread_rwlock_unlock(globals.listener_rwlock); + + *new_session = SWITCH_FALSE; + return listener; + } + + if (SWITCH_STATUS_SUCCESS == initialise_ei(&ec)) { #ifdef WIN32 WSASetLastError(0); @@ -1206,11 +1284,17 @@ static listener_t *new_outbound_listener(char *node) #endif if ((clientfd = ei_connect(&ec, node)) < 0) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error connecting to node %s (erl_errno=%d, errno=%d)!\n", node, erl_errno, errno); + switch_thread_rwlock_unlock(globals.listener_rwlock); return NULL; } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "new listener for %s\n", node); listener = new_listener(&ec, clientfd); listener->peer_nodename = switch_core_strdup(listener->pool, node); } + + switch_thread_rwlock_unlock(globals.listener_rwlock); + *new_session = SWITCH_TRUE; + return listener; } @@ -1226,6 +1310,7 @@ static switch_status_t state_handler(switch_core_session_t *session) if (state == CS_DESTROY) { /* indicate that once all the events in the event queue are done * we can throw this away */ + /* TODO locked? */ switch_set_flag(session_element, LFLAG_SESSION_COMPLETE); } } else { @@ -1298,6 +1383,7 @@ session_elem_t *attach_call_to_pid(listener_t *listener, erlang_pid * pid, switc memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); /* attach the session to the listener */ add_session_elem_to_listener(listener, session_element); + /* TODO link before added to listener? */ ei_link(listener, ei_self(listener->ec), pid); return session_element; @@ -1361,8 +1447,7 @@ session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *modul */ } - switch_thread_cond_timedwait(p->ready_or_found, - p->mutex, 5000000); + switch_thread_cond_timedwait(p->ready_or_found, p->mutex, 5000000); if (!p->pid) { p->state = reply_timeout; switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid %s %s\n", hash, session_element->uuid_str); @@ -1460,13 +1545,23 @@ SWITCH_STANDARD_APP(erlang_outbound_function) /* if there is no listener, then create one */ if (!listener) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n"); - new_session = SWITCH_TRUE; - listener = new_outbound_listener(node); + listener = new_outbound_listener(node, &new_session); + /* XXX new_session isn't accurate now */ } else { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for session\n"); + /* TODO don't we need to connect ? */ } - if (listener) { + /* TODO it's too late */ + switch_thread_rwlock_rdlock(globals.listener_rwlock); + + if (listener && !listener->dead) { + /* prevent the listener_run thread from destroying the listener out from under us */ + /* get the listener lock */ + switch_thread_rwlock_rdlock(listener->rwlock); + /* release the global listener lock, since the listener can't be freed without the listener lock */ + switch_thread_rwlock_unlock(globals.listener_rwlock); + if (new_session == SWITCH_TRUE) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching new listener\n"); launch_listener_thread(listener); @@ -1480,11 +1575,18 @@ SWITCH_STANDARD_APP(erlang_outbound_function) session_element = attach_call_to_registered_process(listener, reg_name, session); } + /* should be safe now */ + switch_thread_rwlock_unlock(listener->rwlock); + + if (session_element) { switch_ivr_park(session, NULL); } + } else { + switch_thread_rwlock_unlock(globals.listener_rwlock); } + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(uuid), SWITCH_LOG_DEBUG, "exit erlang_outbound_function\n"); } @@ -1498,6 +1600,7 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function) char *mydata; ei_x_buff buf; listener_t *listener; + switch_bool_t new_session; ei_x_new_with_version(&buf); @@ -1523,12 +1626,12 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function) listener = find_listener(node); if (!listener) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node); - listener = new_outbound_listener(node); + listener = new_outbound_listener(node, &new_session); } else { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for sendmsg to %s\n", node); } - if (listener) { + if (listener && !listener->dead) { ei_reg_send(listener->ec, listener->sockfd, reg_name, buf.buff, buf.index); } } @@ -1598,11 +1701,11 @@ SWITCH_STANDARD_API(erlang_cmd) stream->write_function(stream, "Outbound session for %s in state %s\n", sp->uuid_str, switch_channel_state_name(sp->channel_state)); } + switch_thread_rwlock_unlock(l->session_rwlock); if (empty) { stream->write_function(stream, "No active sessions for %s\n", argv[1]); } - switch_thread_rwlock_unlock(l->session_rwlock); break; } } @@ -1824,7 +1927,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) continue; } - listener = new_listener(&ec, clientfd); + listener = new_listener_locked(&ec, clientfd); if (listener) { /* store the IP and node name we are talking with */ switch_inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip)); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 1721d496fb..775cdf74f3 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -39,7 +39,8 @@ typedef enum { } session_flag_t; typedef enum { - ERLANG_PID = 0, + NONE = 0, + ERLANG_PID, ERLANG_REG_PROCESS } process_type; @@ -113,6 +114,7 @@ struct listener { #else int sockfd; #endif + uint8_t dead; struct ei_cnode_s *ec; struct erlang_process log_process; struct erlang_process event_process;