From 4348c7bca2944594187d311cd94cc7101f7150b5 Mon Sep 17 00:00:00 2001 From: Mathieu Rene Date: Wed, 10 Jun 2009 06:39:08 +0000 Subject: [PATCH] FSCORE-379 git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@13746 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- .../mod_erlang_event/mod_erlang_event.c | 214 ++++++++++-------- 1 file changed, 117 insertions(+), 97 deletions(-) diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index 9f7df23607..3ea86e9bdf 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -998,6 +998,7 @@ static listener_t* new_listener(struct ei_cnode_s *ec, int clientfd) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n"); return NULL; } + memset(listener, 0, sizeof(*listener)); switch_thread_rwlock_create(&listener->rwlock, listener_pool); switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener_pool); @@ -1039,126 +1040,145 @@ static listener_t* new_outbound_listener(char* node) return listener; } +static switch_status_t state_handler(switch_core_session_t *session) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + switch_channel_state_t state = switch_channel_get_state(channel); + + if (state >= CS_HANGUP) { + session_elem_t *session_element = switch_channel_get_private(channel, "_erlang_session_"); + listener_t* listener = switch_channel_get_private(channel, "_erlang_listener_"); + + if (session_element && listener) { + remove_session_elem_from_listener(listener, session_element); + } + + switch_core_event_hook_remove_state_change(session, state_handler); + } + + return SWITCH_STATUS_SUCCESS; +} + +session_elem_t *session_elem_create(listener_t* listener, switch_core_session_t *session) +{ + /* create a session list element */ + session_elem_t* session_element = switch_core_session_alloc(session, sizeof(*session_element)); + switch_channel_t *channel = switch_core_session_get_channel(session); + + memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH); + + switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); + switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); + + switch_channel_set_private(channel, "_erlang_session_", session_element); + switch_channel_set_private(channel, "_erlang_listener_", listener); + + switch_core_event_hook_add_state_change(session, state_handler); + + return session_element; +} session_elem_t* attach_call_to_registered_process(listener_t* listener, char* reg_name, switch_core_session_t *session) { /* create a session list element */ - session_elem_t* session_element = NULL; - if (!(session_element = switch_core_session_alloc(session, sizeof(*session_element)))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n"); - } else { - memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH); - session_element->process.type = ERLANG_REG_PROCESS; - session_element->process.reg_name = switch_core_strdup(switch_core_session_get_pool(session),reg_name); - switch_set_flag(session_element, LFLAG_SESSION_ALIVE); - switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); - switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); - switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); - /* attach the session to the listener */ - add_session_elem_to_listener(listener,session_element); - } + session_elem_t* session_element = session_elem_create(listener, session); + + session_element->process.type = ERLANG_REG_PROCESS; + session_element->process.reg_name = switch_core_session_strdup(session, reg_name); + switch_set_flag(session_element, LFLAG_SESSION_ALIVE); + /* attach the session to the listener */ + add_session_elem_to_listener(listener,session_element); + return session_element; } session_elem_t* attach_call_to_pid(listener_t* listener, erlang_pid* pid, switch_core_session_t *session) { /* create a session list element */ - session_elem_t* session_element = NULL; - if (!(session_element = switch_core_session_alloc(session, sizeof(*session_element)))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n"); - } else { - memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH); - session_element->process.type = ERLANG_PID; - memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); - switch_set_flag(session_element, LFLAG_SESSION_ALIVE); - switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); - switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); - switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); - /* attach the session to the listener */ - add_session_elem_to_listener(listener,session_element); - - ei_link(listener, ei_self(listener->ec), pid); - } + session_elem_t* session_element = session_elem_create(listener, session); + + session_element->process.type = ERLANG_PID; + memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); + switch_set_flag(session_element, LFLAG_SESSION_ALIVE); + /* attach the session to the listener */ + add_session_elem_to_listener(listener,session_element); + ei_link(listener, ei_self(listener->ec), pid); + return session_element; } session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *module, char *function, switch_core_session_t *session) { /* create a session list element */ - session_elem_t* session_element=NULL; - if (!(session_element = switch_core_session_alloc(session, sizeof(*session_element)))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to allocate session element\n"); - } else { - char hash[100]; - int i = 0; - void *p = NULL; + session_elem_t* session_element = session_elem_create(listener, session); + char hash[100]; + int i = 0; + void *p = NULL; + erlang_pid *pid; + erlang_ref ref; - memcpy(session_element->uuid_str, switch_core_session_get_uuid(session), SWITCH_UUID_FORMATTED_LENGTH); - erlang_pid *pid; - erlang_ref ref; + switch_set_flag(session_element, LFLAG_WAITING_FOR_PID); + + /* attach the session to the listener */ + add_session_elem_to_listener(listener,session_element); - switch_set_flag(session_element, LFLAG_WAITING_FOR_PID); - switch_queue_create(&session_element->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session)); - switch_mutex_init(&session_element->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); - /* attach the session to the listener */ - add_session_elem_to_listener(listener,session_element); + ei_init_ref(listener->ec, &ref); + ei_hash_ref(&ref, hash); + /* insert the waiting marker */ + switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING); - ei_init_ref(listener->ec, &ref); - ei_hash_ref(&ref, hash); - /* insert the waiting marker */ - switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING); + if (!strcmp(function, "!")) { + /* send a message to request a pid */ + ei_x_buff rbuf; + ei_x_new_with_version(&rbuf); - if (!strcmp(function, "!")) { - /* send a message to request a pid */ - ei_x_buff rbuf; - ei_x_new_with_version(&rbuf); - - ei_x_encode_tuple_header(&rbuf, 3); - ei_x_encode_atom(&rbuf, "new_pid"); - ei_x_encode_ref(&rbuf, &ref); - ei_x_encode_pid(&rbuf, ei_self(listener->ec)); - /* should lock with mutex? */ - ei_reg_send(listener->ec, listener->sockfd, module, rbuf.buff, rbuf.index); + ei_x_encode_tuple_header(&rbuf, 3); + ei_x_encode_atom(&rbuf, "new_pid"); + ei_x_encode_ref(&rbuf, &ref); + ei_x_encode_pid(&rbuf, ei_self(listener->ec)); + /* should lock with mutex? */ + ei_reg_send(listener->ec, listener->sockfd, module, rbuf.buff, rbuf.index); #ifdef EI_DEBUG - ei_x_print_reg_msg(&rbuf, module, 1); + ei_x_print_reg_msg(&rbuf, module, 1); #endif - ei_x_free(&rbuf); - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function); - /* should lock with mutex? */ - ei_pid_from_rpc(listener->ec, listener->sockfd, &ref, module, function); - /* - char *argv[1]; - ei_spawn(listener->ec, listener->sockfd, &ref, module, function, 0, argv); - */ - } - - /* loop until either we timeout or we get a value that's not the waiting marker */ - while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) { - if (i > 50) { /* half a second timeout */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n"); - remove_session_elem_from_listener(listener,session_element); - switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */ - return NULL; - } - i++; - switch_yield(10000); /* 10ms */ - } - - switch_core_hash_delete(listener->spawn_pid_hash, hash); - - pid = (erlang_pid *) p; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got pid!\n"); - - session_element->process.type = ERLANG_PID; - memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); - switch_set_flag(session_element, LFLAG_SESSION_ALIVE); - switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); - switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID); - - ei_link(listener, ei_self(listener->ec), pid); - switch_safe_free(pid); /* malloced in handle_ref_tuple */ + ei_x_free(&rbuf); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "rpc call: %s:%s(Ref)\n", module, function); + /* should lock with mutex? */ + ei_pid_from_rpc(listener->ec, listener->sockfd, &ref, module, function); + /* + char *argv[1]; + ei_spawn(listener->ec, listener->sockfd, &ref, module, function, 0, argv); + */ } + + /* loop until either we timeout or we get a value that's not the waiting marker */ + while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) { + if (i > 50) { /* half a second timeout */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n"); + remove_session_elem_from_listener(listener,session_element); + switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */ + return NULL; + } + i++; + switch_yield(10000); /* 10ms */ + } + + switch_core_hash_delete(listener->spawn_pid_hash, hash); + + pid = (erlang_pid *) p; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got pid!\n"); + + session_element->process.type = ERLANG_PID; + memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); + + switch_set_flag(session_element, LFLAG_SESSION_ALIVE); + switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); + switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID); + + ei_link(listener, ei_self(listener->ec), pid); + switch_safe_free(pid); /* malloced in handle_ref_tuple */ + return session_element; }