Replace busy wait for XML fetch with thread cond stuff (Original patch contributed by Michael Fig / MarkeTel Systems)

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@16697 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Andrew Thompson 2010-02-18 21:55:46 +00:00
parent 1f8c1112d6
commit 6107493160
3 changed files with 135 additions and 67 deletions

View File

@ -166,7 +166,7 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj)
static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff * buf, ei_x_buff * rbuf)
{
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
void *p;
fetch_reply_t *p;
if (ei_decode_string_or_binary(buf->buff, &buf->index, SWITCH_UUID_FORMATTED_LENGTH, uuid_str)) {
ei_x_encode_tuple_header(rbuf, 2);
@ -179,20 +179,40 @@ static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff *
nbuf->index = buf->index;
nbuf->buffsz = buf->buffsz;
if ((p = switch_core_hash_find(listener->fetch_reply_hash, uuid_str))) {
if (p == &globals.TIMEOUT) {
switch_mutex_lock(globals.fetch_reply_mutex);
if ((p = switch_core_hash_find(globals.fetch_reply_hash, uuid_str))) {
/* Get the status and release the lock ASAP. */
enum { is_timeout, is_waiting, is_filled } status;
if (p->state == reply_not_ready) {
switch_thread_cond_wait(p->ready_or_found, globals.fetch_reply_mutex);
}
if (p->state == reply_waiting) {
/* update the key with a reply */
status = is_waiting;
p->reply = nbuf;
p->state = reply_found;
strncpy(p->winner, listener->peer_nodename, MAXNODELEN);
switch_thread_cond_broadcast(p->ready_or_found);
} else if (p->state == reply_timeout) {
status = is_timeout;
} else {
status = is_filled;
}
put_reply_unlock(p, uuid_str);
/* Relay the status back to the fetch responder. */
if (status == is_waiting) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", uuid_str);
ei_x_encode_atom(rbuf, "ok");
/* Return here to avoid freeing the reply. */
return SWITCH_STATUS_SUCCESS;
} else if (status == is_timeout) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Handler for %s timed out\n", uuid_str);
switch_core_hash_delete(listener->fetch_reply_hash, uuid_str);
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "timeout");
} else if (p == &globals.WAITING) {
/* update the key to point at a pid */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", uuid_str);
switch_core_hash_delete(listener->fetch_reply_hash, uuid_str);
switch_core_hash_insert(listener->fetch_reply_hash, uuid_str, nbuf);
ei_x_encode_atom(rbuf, "ok");
return SWITCH_STATUS_SUCCESS;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", uuid_str);
ei_x_encode_tuple_header(rbuf, 2);
@ -200,14 +220,14 @@ static switch_status_t handle_msg_fetch_reply(listener_t *listener, ei_x_buff *
ei_x_encode_atom(rbuf, "duplicate_response");
}
} else {
/* nothin in the hash */
/* nothing in the hash */
switch_mutex_unlock(globals.fetch_reply_mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Empty slot for %s\n", uuid_str);
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "invalid_uuid");
}
/*switch_core_hash_insert(listener->fetch_reply_hash, uuid_str, nbuf); */
switch_safe_free(nbuf->buff);
switch_safe_free(nbuf);
}

View File

@ -41,6 +41,8 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown);
SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime);
SWITCH_MODULE_DEFINITION(mod_erlang_event, mod_erlang_event_load, mod_erlang_event_shutdown, mod_erlang_event_runtime);
static switch_memory_pool_t *module_pool = NULL;
static void remove_listener(listener_t *listener);
static switch_status_t state_handler(switch_core_session_t *session);
@ -377,7 +379,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1];
int type, size;
int i = 0;
void *p = NULL;
fetch_reply_t *p = NULL;
char *xmlstr;
struct erlang_binding *ptr;
switch_uuid_t uuid;
@ -391,25 +393,19 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
section = switch_xml_parse_section_string((char *) sectionstr);
for (ptr = bindings.head; ptr && ptr->section != section; ptr = ptr->next); /* just get the first match */
if (!ptr) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr);
return NULL;
}
if (!ptr->listener) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "NULL pointer binding!\n");
return NULL; /* our pointer is trash */
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name,
sectionstr, key_name, key_value, ptr->process.pid.node);
switch_uuid_get(&uuid);
switch_uuid_format(uuid_str, &uuid);
/*switch_event_add_header_string(params, SWITCH_STACK_BOTTOM, "Request-ID", uuid_str); */
for (ptr = bindings.head; ptr; ptr = ptr->next) {
if (ptr->section != section)
continue;
if (!ptr->listener) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "NULL pointer binding!\n");
goto cleanup; /* our pointer is trash */
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->process.pid.node);
ei_x_encode_tuple_header(&buf, 7);
ei_x_encode_atom(&buf, "fetch");
@ -424,41 +420,62 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
ei_x_encode_empty_list(&buf);
}
switch_core_hash_insert(ptr->listener->fetch_reply_hash, uuid_str, &globals.WAITING);
if (!p) {
/* Create a new fetch object. */
p = malloc(sizeof(*p));
switch_thread_cond_create(&p->ready_or_found, module_pool);
p->usecount = 1;
p->state = reply_not_ready;
p->reply = NULL;
switch_core_hash_insert_locked(globals.fetch_reply_hash, uuid_str, p, globals.fetch_reply_mutex);
}
/* We don't need to lock here because everybody is waiting
on our condition before the action starts. */
p->usecount ++;
switch_mutex_lock(ptr->listener->sock_mutex);
ei_sendto(ptr->listener->ec, ptr->listener->sockfd, &ptr->process, &buf);
switch_mutex_unlock(ptr->listener->sock_mutex);
}
while (!(p = switch_core_hash_find(ptr->listener->fetch_reply_hash, uuid_str)) || p == &globals.WAITING) {
if (i > 50) { /* half a second timeout */
if (!p) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "no binding for %s\n", sectionstr);
goto cleanup;
}
/* Tell the threads to be ready, and wait five seconds for a reply. */
switch_mutex_lock(globals.fetch_reply_mutex);
p->state = reply_waiting;
switch_thread_cond_broadcast(p->ready_or_found);
switch_thread_cond_timedwait(p->ready_or_found,
globals.fetch_reply_mutex, 5000000);
if (!p->reply) {
p->state = reply_timeout;
switch_mutex_unlock(globals.fetch_reply_mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for XML fetch response\n");
switch_core_hash_insert(ptr->listener->fetch_reply_hash, uuid_str, &globals.TIMEOUT); /* TODO lock this? */
return NULL;
}
i++;
switch_yield(10000); /* 10ms */
goto cleanup;
}
rep = (ei_x_buff *) p;
rep = p->reply;
switch_mutex_unlock(globals.fetch_reply_mutex);
ei_get_type(rep->buff, &rep->index, &type, &size);
if (type != ERL_STRING_EXT && type != ERL_BINARY_EXT) { /* XXX no unicode or character codes > 255 */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "XML fetch response contained non ASCII characters? (was type %d of size %d)\n", type,
size);
return NULL;
goto cleanup;
}
if (!(xmlstr = malloc(size + 1))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error\n");
return NULL;
goto cleanup;
}
ei_decode_string_or_binary(rep->buff, &rep->index, size, xmlstr);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got data %s after %d milliseconds!\n", xmlstr, i * 10);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got data %s after %d milliseconds from %s!\n", xmlstr, i * 10, p->winner);
if (zstr(xmlstr)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "No Result\n");
@ -469,14 +486,31 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
}
/* cleanup */
switch_core_hash_delete(ptr->listener->fetch_reply_hash, uuid_str);
switch_safe_free(rep->buff);
switch_safe_free(rep);
cleanup:
if (p) {
switch_mutex_lock(globals.fetch_reply_mutex);
put_reply_unlock(p, uuid_str);
}
return xml;
}
void put_reply_unlock(fetch_reply_t *p, char *uuid_str)
{
if (-- p->usecount == 0) {
switch_core_hash_delete(globals.fetch_reply_hash, uuid_str);
switch_thread_cond_destroy(p->ready_or_found);
if (p->reply) {
switch_safe_free(p->reply->buff);
switch_safe_free(p->reply);
}
switch_safe_free(p);
}
switch_mutex_unlock(globals.fetch_reply_mutex);
}
static switch_status_t notify_new_session(listener_t *listener, session_elem_t *session_element)
{
int result;
@ -921,8 +955,6 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
prefs.threads++;
switch_mutex_unlock(globals.listener_mutex);
switch_core_hash_init(&listener->fetch_reply_hash, listener->pool);
switch_assert(listener != NULL);
if (check_inbound_acl(listener)) {
@ -1548,9 +1580,13 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
switch_application_interface_t *app_interface;
switch_api_interface_t *api_interface;
module_pool = pool;
memset(&prefs, 0, sizeof(prefs));
switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
switch_mutex_init(&globals.fetch_reply_mutex, SWITCH_MUTEX_DEFAULT, pool);
switch_core_hash_init(&globals.fetch_reply_hash, pool);
/* intialize the unique reference stuff */
switch_mutex_init(&globals.ref_mutex, SWITCH_MUTEX_NESTED, pool);

View File

@ -103,7 +103,6 @@ struct listener {
switch_log_level_t level;
uint8_t event_list[SWITCH_EVENT_ALL + 1];
switch_hash_t *event_hash;
switch_hash_t *fetch_reply_hash;
switch_hash_t *spawn_pid_hash;
switch_thread_rwlock_t *rwlock;
switch_mutex_t *session_mutex;
@ -140,15 +139,27 @@ struct api_command_struct {
struct globals_struct {
switch_mutex_t *listener_mutex;
switch_event_node_t *node;
switch_mutex_t *ref_mutex;
switch_mutex_t *fetch_reply_mutex;
switch_hash_t *fetch_reply_hash;
unsigned int reference0;
unsigned int reference1;
unsigned int reference2;
char TIMEOUT; /* marker for a timed out request */
char WAITING; /* marker for a request waiting for a response */
switch_mutex_t *ref_mutex;
};
typedef struct globals_struct globals_t;
struct fetch_reply_struct
{
switch_thread_cond_t *ready_or_found;
int usecount;
enum { reply_not_ready, reply_waiting, reply_found, reply_timeout } state;
ei_x_buff *reply;
char winner[MAXNODELEN + 1];
};
typedef struct fetch_reply_struct fetch_reply_t;
struct listen_list_struct {
#ifdef WIN32
SOCKET sockfd;
@ -236,6 +247,7 @@ switch_status_t initialise_ei(struct ei_cnode_s *ec);
session_elem_t *attach_call_to_registered_process(listener_t *listener, char *reg_name, switch_core_session_t *session);
session_elem_t *attach_call_to_pid(listener_t *listener, erlang_pid * pid, switch_core_session_t *session);
session_elem_t *attach_call_to_spawned_process(listener_t *listener, char *module, char *function, switch_core_session_t *session);
void put_reply_unlock(fetch_reply_t *p, char *uuid_str);
/* For Emacs:
* Local Variables: