diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index a4ad11c9b9..bbfe64e149 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -68,6 +68,7 @@ struct listener { switch_log_level_t level; uint8_t event_list[SWITCH_EVENT_ALL + 1]; switch_hash_t *event_hash; + switch_hash_t *fetch_reply_hash; switch_thread_rwlock_t *rwlock; switch_core_session_t *session; int lost_events; @@ -261,18 +262,16 @@ static void remove_binding(listener_t *listener) { } -static void ei_encode_switch_event(ei_x_buff *ebuf, switch_event_t *event) +static void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event) { int i; switch_event_header_t *hp; - for (i = 0, hp = event->headers; hp; hp = hp->next, i++) { - } + + for (i = 0, hp = event->headers; hp; hp = hp->next, i++); if (event->body) i++; - ei_x_encode_tuple_header(ebuf, 2); - ei_x_encode_atom(ebuf, "event"); ei_x_encode_list_header(ebuf, i); for (hp = event->headers; hp; hp = hp->next) { @@ -291,6 +290,18 @@ static void ei_encode_switch_event(ei_x_buff *ebuf, switch_event_t *event) } +static void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag) +{ + + ei_x_encode_tuple_header(ebuf, 2); + ei_x_encode_atom(ebuf, tag); + ei_encode_switch_event_headers(ebuf, event); +} + + +#define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event") + + static void event_handler(switch_event_t *event) { switch_event_t *clone = NULL; @@ -534,11 +545,15 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) } -static switch_xml_t erlang_fetch (const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value, +static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, const char *key_name, const char *key_value, switch_event_t *params, void *user_data) { switch_xml_t xml = NULL; struct erlang_binding *ptr; + switch_uuid_t uuid; + char uuid_str[SWITCH_UUID_FORMATTED_LENGTH+1]; + ei_x_buff buf; + ei_x_new_with_version(&buf); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "looking for bindings\n"); @@ -558,11 +573,44 @@ static switch_xml_t erlang_fetch (const char *sectionstr, const char *tag_name, switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "binding for %s in section %s with key %s and value %s requested from node %s\n", tag_name, sectionstr, key_name, key_value, ptr->pid.node); + switch_uuid_get(&uuid); + switch_uuid_format(uuid_str, &uuid); + + /*switch_event_add_header_string(params, SWITCH_STACK_BOTTOM, "Request-ID", uuid_str);*/ + + ei_x_encode_tuple_header(&buf, 7); + ei_x_encode_atom(&buf, "fetch"); + ei_x_encode_atom(&buf, sectionstr); + ei_x_encode_string(&buf, tag_name ? tag_name : "undefined"); + ei_x_encode_string(&buf, key_name ? key_name : "undefined"); + ei_x_encode_string(&buf, key_value ? key_value : "undefined"); + ei_x_encode_string(&buf, uuid_str); + ei_encode_switch_event_headers(&buf, params); + + /*switch_core_hash_insert(ptr->reply_hash, uuid_str, );*/ switch_mutex_lock(ptr->listener->sock_mutex); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "It's a lock!\n"); + ei_send(ptr->listener->sockfd, &ptr->pid, buf.buff, buf.index); switch_mutex_unlock(ptr->listener->sock_mutex); + int i = 0; + ei_x_buff *rep; + /*int index = 3;*/ + while (!(rep = (ei_x_buff *) switch_core_hash_find(ptr->listener->fetch_reply_hash, uuid_str))) { + if (i > 50) { /* half a second timeout */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "timed out!\n"); + return NULL; + } + i++; + switch_yield(10000); /* 10ms */ + } + + char data[MAXATOMLEN]; + + ei_decode_atom(rep->buff, &rep->index, data); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got data %s after %d milliseconds!\n", data, i*10); + return xml; } @@ -588,7 +636,29 @@ static int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_ break; } - if (!strncmp(tupletag, "set_log_level", MAXATOMLEN)) { + if (!strncmp(tupletag, "fetch_reply", MAXATOMLEN)) { + char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; + + if (ei_decode_string(buf->buff, &buf->index, uuid_str)) { + ei_x_encode_tuple_header(rbuf, 2); + ei_x_encode_atom(rbuf, "error"); + ei_x_encode_atom(rbuf, "badarg"); + break; + } + + ei_x_buff *nbuf = switch_core_alloc(listener->pool, sizeof(nbuf)); + /*char *wtf = "hello world";*/ + nbuf->buff = switch_core_alloc(listener->pool, buf->buffsz); + memcpy(nbuf->buff, buf->buff, buf->buffsz); + /*memcpy(nbuf, wtf, 20);*/ + nbuf->index = buf->index; + nbuf->buffsz = buf->buffsz; + + /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "stored %d %d %s\n", buf->index, buf->buffsz, nbuf);*/ + + switch_core_hash_insert(listener->fetch_reply_hash, uuid_str, nbuf); + + } else if (!strncmp(tupletag, "set_log_level", MAXATOMLEN)) { if (arity == 2) { switch_log_level_t ltype = SWITCH_LOG_DEBUG; char loglevelstr[MAXATOMLEN]; @@ -930,6 +1000,8 @@ sendmsg_fail: binding->pid = msg->from; binding->listener = listener; + switch_core_hash_init(&listener->fetch_reply_hash, listener->pool); + switch_mutex_lock(globals.listener_mutex); for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);