listener r/w locking FS-3432

This commit is contained in:
Tamas Cseke 2012-05-24 10:30:48 +02:00
parent 8f4c5bc492
commit eade657225
2 changed files with 73 additions and 125 deletions

View File

@ -44,6 +44,7 @@ SWITCH_MODULE_DEFINITION(mod_erlang_event, mod_erlang_event_load, mod_erlang_eve
static switch_memory_pool_t *module_pool = NULL; static switch_memory_pool_t *module_pool = NULL;
static void remove_listener(listener_t *listener); static void remove_listener(listener_t *listener);
static void destroy_listener(listener_t *listener);
static switch_status_t state_handler(switch_core_session_t *session); static switch_status_t state_handler(switch_core_session_t *session);
SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip); SWITCH_DECLARE_GLOBAL_STRING_FUNC(set_pref_ip, prefs.ip);
@ -59,7 +60,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
switch_thread_rwlock_rdlock(globals.listener_rwlock); switch_thread_rwlock_rdlock(globals.listener_rwlock);
for (l = listen_list.listeners; l; l = l->next) { for (l = listen_list.listeners; l; l = l->next) {
/* TODO listener read lock */
if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) { if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) {
switch_log_node_t *dnode = switch_log_node_dup(node); switch_log_node_t *dnode = switch_log_node_dup(node);
@ -80,6 +81,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
l->lost_logs++; l->lost_logs++;
} }
} }
} }
switch_thread_rwlock_unlock(globals.listener_rwlock); switch_thread_rwlock_unlock(globals.listener_rwlock);
@ -192,8 +194,6 @@ static void event_handler(switch_event_t *event)
one of them should receive the event as well one of them should receive the event as well
*/ */
/* TODO need read locking */
send_event_to_attached_sessions(l, event); send_event_to_attached_sessions(l, event);
if (!switch_test_flag(l, LFLAG_EVENTS)) { if (!switch_test_flag(l, LFLAG_EVENTS)) {
@ -255,21 +255,21 @@ static void close_socket(int *sock)
} }
/*static void add_listener(listener_t *listener)*/ static void add_listener(listener_t *listener)
/*{*/ {
/* add me to the listeners so I get events */ /* add me to the listeners so I get events */
/*switch_thread_rwlock_wrlock(globals.listener_rwlock);*/ switch_thread_rwlock_wrlock(globals.listener_rwlock);
/*listener->next = listen_list.listeners;*/ listener->next = listen_list.listeners;
/*listen_list.listeners = listener;*/ listen_list.listeners = listener;
/*switch_thread_rwlock_unlock(globals.listener_rwlock);*/ switch_thread_rwlock_unlock(globals.listener_rwlock);
/*}*/ }
/* TODO lock */
static void remove_listener(listener_t *listener) static void remove_listener(listener_t *listener)
{ {
listener_t *l, *last = NULL; listener_t *l, *last = NULL;
switch_thread_rwlock_wrlock(globals.listener_rwlock);
for (l = listen_list.listeners; l; l = l->next) { for (l = listen_list.listeners; l; l = l->next) {
if (l == listener) { if (l == listener) {
if (last) { if (last) {
@ -280,17 +280,18 @@ static void remove_listener(listener_t *listener)
} }
last = l; last = l;
} }
switch_thread_rwlock_unlock(globals.listener_rwlock);
} }
/* Search for a listener already talking to the specified node */ /* Search for a listener already talking to the specified node and lock for reading*/
static listener_t *find_listener(char *nodename) static listener_t *find_listener_locked(char *nodename)
{ {
listener_t *l = NULL; listener_t *l = NULL;
switch_thread_rwlock_rdlock(globals.listener_rwlock); switch_thread_rwlock_rdlock(globals.listener_rwlock);
for (l = listen_list.listeners; l; l = l->next) { for (l = listen_list.listeners; l; l = l->next) {
if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) { if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) {
/* TODO listener rwlock */ switch_thread_rwlock_rdlock(l->rwlock);
break; break;
} }
} }
@ -976,10 +977,6 @@ static switch_bool_t check_inbound_acl(listener_t *listener)
static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
{ {
listener_t *listener = (listener_t *) obj; listener_t *listener = (listener_t *) obj;
session_elem_t *s = NULL;
const void *key;
void *value;
switch_hash_index_t *iter;
switch_mutex_lock(globals.listener_count_mutex); switch_mutex_lock(globals.listener_count_mutex);
prefs.threads++; prefs.threads++;
@ -994,50 +991,14 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip); /*, listener->remote_port); */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Open from %s\n", listener->remote_ip); /*, listener->remote_port); */
} }
/*add_listener(listener);*/ add_listener(listener);
listener_main_loop(listener); listener_main_loop(listener);
} }
/* clean up */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
listener->dead = 1; /* mark it as dead */
/* TODO - release write lock */
switch_thread_rwlock_wrlock(globals.listener_rwlock);
remove_listener(listener);
switch_thread_rwlock_unlock(globals.listener_rwlock);
switch_thread_rwlock_wrlock(listener->rwlock);
if (listener->sockfd) {
close_socket(&listener->sockfd);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Closed\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connection Closed\n");
/* TODO make listener destroy function and move there */
switch_core_hash_destroy(&listener->event_hash);
/* remove any bindings for this connection */ remove_listener(listener);
remove_binding(listener, NULL); destroy_listener(listener);
/* clean up all the attached sessions */
switch_thread_rwlock_wrlock(listener->session_rwlock);
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
switch_hash_this(iter, &key, NULL, &value);
s = (session_elem_t*)value;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Orphaning call %s\n", s->uuid_str);
remove_session_elem_from_listener(listener, s);
destroy_session_elem(s);
}
switch_thread_rwlock_unlock(listener->session_rwlock);
switch_thread_rwlock_unlock(listener->rwlock);
if (listener->pool) {
switch_memory_pool_t *pool = listener->pool;
switch_core_destroy_memory_pool(&pool);
}
switch_mutex_lock(globals.listener_count_mutex); switch_mutex_lock(globals.listener_count_mutex);
prefs.threads--; prefs.threads--;
@ -1220,8 +1181,6 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool); switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool);
switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool); switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool);
/* TODO remove */
listener->dead = 0; /* born alive */
listener->sockfd = clientfd; listener->sockfd = clientfd;
listener->pool = pool; listener->pool = pool;
listener->ec = switch_core_alloc(listener->pool, sizeof(ei_cnode)); listener->ec = switch_core_alloc(listener->pool, sizeof(ei_cnode));
@ -1233,49 +1192,16 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
switch_core_hash_init(&listener->event_hash, listener->pool); switch_core_hash_init(&listener->event_hash, listener->pool);
switch_core_hash_init(&listener->sessions, listener->pool); switch_core_hash_init(&listener->sessions, listener->pool);
/* TODO listener rdlock */
listener->next = listen_list.listeners;
listen_list.listeners = listener;
return listener; return listener;
} }
/*TODO we don't need bottleneck*/
static listener_t *new_listener_locked(struct ei_cnode_s *ec, int clientfd)
{
listener_t *res;
switch_thread_rwlock_wrlock(globals.listener_rwlock);
res = new_listener(ec, clientfd);
switch_thread_rwlock_unlock(globals.listener_rwlock);
return res;
}
/* TODO new session??? */ static listener_t *new_outbound_listener_locked(char *node)
static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session)
{ {
listener_t *listener = NULL; listener_t *listener = NULL;
struct ei_cnode_s ec; struct ei_cnode_s ec;
int clientfd; int clientfd;
/* TODO find listener func */
switch_thread_rwlock_wrlock(globals.listener_rwlock);
for (listener = listen_list.listeners; listener; listener = listener->next) {
if (!strncmp(node, listener->peer_nodename, MAXNODELEN)) {
break;
}
}
if (listener && listener->dead) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "found dead listener for %s\n", node);
remove_listener(listener); /* remove the dead listener and continue adding one */
} else if (listener) {
switch_thread_rwlock_unlock(globals.listener_rwlock);
*new_session = SWITCH_FALSE;
return listener;
}
if (SWITCH_STATUS_SUCCESS == initialise_ei(&ec)) { if (SWITCH_STATUS_SUCCESS == initialise_ei(&ec)) {
#ifdef WIN32 #ifdef WIN32
WSASetLastError(0); WSASetLastError(0);
@ -1284,7 +1210,7 @@ static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session)
#endif #endif
if ((clientfd = ei_connect(&ec, node)) < 0) { if ((clientfd = ei_connect(&ec, node)) < 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error connecting to node %s (erl_errno=%d, errno=%d)!\n", node, erl_errno, errno); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error connecting to node %s (erl_errno=%d, errno=%d)!\n", node, erl_errno, errno);
switch_thread_rwlock_unlock(globals.listener_rwlock);
return NULL; return NULL;
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "new listener for %s\n", node); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "new listener for %s\n", node);
@ -1292,12 +1218,49 @@ static listener_t *new_outbound_listener(char *node, switch_bool_t *new_session)
listener->peer_nodename = switch_core_strdup(listener->pool, node); listener->peer_nodename = switch_core_strdup(listener->pool, node);
} }
switch_thread_rwlock_unlock(globals.listener_rwlock); switch_thread_rwlock_rdlock(listener->rwlock);
*new_session = SWITCH_TRUE;
return listener; return listener;
} }
void destroy_listener(listener_t * listener)
{
session_elem_t *s = NULL;
const void *key;
void *value;
switch_hash_index_t *iter;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Session complete, waiting for children\n");
switch_thread_rwlock_wrlock(listener->rwlock);
if (listener->sockfd) {
close_socket(&listener->sockfd);
}
switch_core_hash_destroy(&listener->event_hash);
/* remove any bindings for this connection */
remove_binding(listener, NULL);
/* clean up all the attached sessions */
switch_thread_rwlock_wrlock(listener->session_rwlock);
for (iter = switch_hash_first(NULL, listener->sessions); iter; iter = switch_hash_next(iter)) {
switch_hash_this(iter, &key, NULL, &value);
s = (session_elem_t*)value;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Orphaning call %s\n", s->uuid_str);
remove_session_elem_from_listener(listener, s);
destroy_session_elem(s);
}
switch_thread_rwlock_unlock(listener->session_rwlock);
switch_thread_rwlock_unlock(listener->rwlock);
if (listener->pool) {
switch_memory_pool_t *pool = listener->pool;
switch_core_destroy_memory_pool(&pool);
}
}
static switch_status_t state_handler(switch_core_session_t *session) static switch_status_t state_handler(switch_core_session_t *session)
{ {
switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_t *channel = switch_core_session_get_channel(session);
@ -1499,7 +1462,6 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
char *argv[80] = { 0 }, *argv2[80] = { 0 }; char *argv[80] = { 0 }, *argv2[80] = { 0 };
char *mydata, *myarg; char *mydata, *myarg;
char uuid[SWITCH_UUID_FORMATTED_LENGTH + 1]; char uuid[SWITCH_UUID_FORMATTED_LENGTH + 1];
switch_bool_t new_session = SWITCH_FALSE;
session_elem_t *session_element = NULL; session_elem_t *session_element = NULL;
/* process app arguments */ /* process app arguments */
@ -1541,31 +1503,19 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n", argv[0], node); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "enter erlang_outbound_function %s %s\n", argv[0], node);
/* first work out if there is a listener already talking to the node we want to talk to */ /* first work out if there is a listener already talking to the node we want to talk to */
listener = find_listener(node); listener = find_listener_locked(node);
/* if there is no listener, then create one */ /* if there is no listener, then create one */
if (!listener) { if (!listener) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n"); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for session\n");
listener = new_outbound_listener(node, &new_session); if ((listener = new_outbound_listener_locked(node))) {
/* XXX new_session isn't accurate now */
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for session\n");
/* TODO don't we need to connect ? */
}
/* TODO it's too late */
switch_thread_rwlock_rdlock(globals.listener_rwlock);
if (listener && !listener->dead) {
/* prevent the listener_run thread from destroying the listener out from under us */
/* get the listener lock */
switch_thread_rwlock_rdlock(listener->rwlock);
/* release the global listener lock, since the listener can't be freed without the listener lock */
switch_thread_rwlock_unlock(globals.listener_rwlock);
if (new_session == SWITCH_TRUE) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching new listener\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Launching new listener\n");
launch_listener_thread(listener); launch_listener_thread(listener);
} }
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for session\n");
}
if (listener) {
if (module && function) { if (module && function) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n"); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n");
@ -1575,10 +1525,8 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
session_element = attach_call_to_registered_process(listener, reg_name, session); session_element = attach_call_to_registered_process(listener, reg_name, session);
} }
/* should be safe now */
switch_thread_rwlock_unlock(listener->rwlock); switch_thread_rwlock_unlock(listener->rwlock);
if (session_element) { if (session_element) {
switch_ivr_park(session, NULL); switch_ivr_park(session, NULL);
} }
@ -1600,7 +1548,6 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function)
char *mydata; char *mydata;
ei_x_buff buf; ei_x_buff buf;
listener_t *listener; listener_t *listener;
switch_bool_t new_session;
ei_x_new_with_version(&buf); ei_x_new_with_version(&buf);
@ -1623,16 +1570,18 @@ SWITCH_STANDARD_APP(erlang_sendmsg_function)
ei_x_encode_atom(&buf, "freeswitch_sendmsg"); ei_x_encode_atom(&buf, "freeswitch_sendmsg");
_ei_x_encode_string(&buf, argv[2]); _ei_x_encode_string(&buf, argv[2]);
listener = find_listener(node); listener = find_listener_locked(node);
if (!listener) { if (!listener) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Creating new listener for sendmsg %s\n", node);
listener = new_outbound_listener(node, &new_session); listener = new_outbound_listener_locked(node);
} else { } else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for sendmsg to %s\n", node); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Using existing listener for sendmsg to %s\n", node);
} }
if (listener && !listener->dead) { if (listener) {
ei_reg_send(listener->ec, listener->sockfd, reg_name, buf.buff, buf.index); ei_reg_send(listener->ec, listener->sockfd, reg_name, buf.buff, buf.index);
switch_thread_rwlock_unlock(listener->rwlock);
} }
} }
@ -1927,7 +1876,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
continue; continue;
} }
listener = new_listener_locked(&ec, clientfd); listener = new_listener(&ec, clientfd);
if (listener) { if (listener) {
/* store the IP and node name we are talking with */ /* store the IP and node name we are talking with */
switch_inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip)); switch_inet_ntop(AF_INET, conn.ipadr, listener->remote_ip, sizeof(listener->remote_ip));

View File

@ -114,7 +114,6 @@ struct listener {
#else #else
int sockfd; int sockfd;
#endif #endif
uint8_t dead;
struct ei_cnode_s *ec; struct ei_cnode_s *ec;
struct erlang_process log_process; struct erlang_process log_process;
struct erlang_process event_process; struct erlang_process event_process;