Patch (with changes) from Micah Warren to add 'setevent' which is a mostly-atomic nixevent ALL + a event subscription
This commit is contained in:
parent
f6065262ff
commit
2e8ece4fba
|
@ -454,6 +454,132 @@ static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_
|
||||||
return SWITCH_STATUS_SUCCESS;
|
return SWITCH_STATUS_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Nix's all events, then sets up a listener for the given ones.
|
||||||
|
// meant to ensure that no events are missed during this common operation.
|
||||||
|
static switch_status_t handle_msg_setevent(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
|
||||||
|
{
|
||||||
|
char atom[MAXATOMLEN];
|
||||||
|
|
||||||
|
if(arity == 1) {
|
||||||
|
ei_x_encode_tuple_header(rbuf, 2);
|
||||||
|
ei_x_encode_atom(rbuf, "error");
|
||||||
|
ei_x_encode_atom(rbuf, "badarg");
|
||||||
|
} else {
|
||||||
|
uint8_t event_list[SWITCH_EVENT_ALL + 1];
|
||||||
|
switch_hash_t *event_hash;
|
||||||
|
uint32_t x = 0;
|
||||||
|
int custom = 0;
|
||||||
|
switch_event_types_t type;
|
||||||
|
int i = 0;
|
||||||
|
|
||||||
|
/* clear any previous event registrations */
|
||||||
|
for( x = 0; x <= SWITCH_EVENT_ALL; x++){
|
||||||
|
event_list[x] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* create new hash */
|
||||||
|
switch_core_hash_init(&event_hash, listener->pool);
|
||||||
|
|
||||||
|
if(!switch_test_flag(listener, LFLAG_EVENTS)) {
|
||||||
|
switch_set_flag_locked(listener, LFLAG_EVENTS);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(i = 1; i < arity; i++){
|
||||||
|
if(!ei_decode_atom(buf->buff, &buf->index, atom)){
|
||||||
|
|
||||||
|
if(custom){
|
||||||
|
switch_core_hash_insert(event_hash, atom, MARKER);
|
||||||
|
} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
|
||||||
|
if (type == SWITCH_EVENT_ALL) {
|
||||||
|
ei_x_encode_tuple_header(rbuf, 2);
|
||||||
|
ei_x_encode_atom(rbuf, "error");
|
||||||
|
ei_x_encode_atom(rbuf, "badarg");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (type <= SWITCH_EVENT_ALL) {
|
||||||
|
event_list[type] = 1;
|
||||||
|
}
|
||||||
|
if (type == SWITCH_EVENT_CUSTOM) {
|
||||||
|
custom++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* update the event subscriptions with the new ones */
|
||||||
|
memcpy(listener->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
|
||||||
|
/* wipe the old hash, and point the pointer at the new one */
|
||||||
|
switch_core_hash_destroy(&listener->event_hash);
|
||||||
|
listener->event_hash = event_hash;
|
||||||
|
|
||||||
|
/* TODO - we should flush any non-matching events from the queue */
|
||||||
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
|
}
|
||||||
|
return SWITCH_STATUS_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static switch_status_t handle_msg_session_setevent(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
|
||||||
|
{
|
||||||
|
char atom[MAXATOMLEN];
|
||||||
|
|
||||||
|
if (arity == 1){
|
||||||
|
ei_x_encode_tuple_header(rbuf, 2);
|
||||||
|
ei_x_encode_atom(rbuf, "error");
|
||||||
|
ei_x_encode_atom(rbuf, "badarg");
|
||||||
|
} else {
|
||||||
|
session_elem_t *session;
|
||||||
|
if ((session = find_session_elem_by_pid(listener, &msg->from))) {
|
||||||
|
uint8_t event_list[SWITCH_EVENT_ALL + 1];
|
||||||
|
switch_hash_t *event_hash;
|
||||||
|
int custom = 0;
|
||||||
|
int i = 0;
|
||||||
|
switch_event_types_t type;
|
||||||
|
uint32_t x = 0;
|
||||||
|
|
||||||
|
/* clear any previous event registrations */
|
||||||
|
for (x = 0; x <= SWITCH_EVENT_ALL; x++){
|
||||||
|
event_list[x] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* create new hash */
|
||||||
|
switch_core_hash_init(&event_hash, session->pool);
|
||||||
|
|
||||||
|
for (i = 1; i < arity; i++){
|
||||||
|
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
|
||||||
|
if (custom) {
|
||||||
|
switch_core_hash_insert(event_hash, atom, MARKER);
|
||||||
|
} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
|
||||||
|
if (type == SWITCH_EVENT_ALL) {
|
||||||
|
ei_x_encode_tuple_header(rbuf, 1);
|
||||||
|
ei_x_encode_atom(rbuf, "error");
|
||||||
|
ei_x_encode_atom(rbuf, "badarg");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (type <= SWITCH_EVENT_ALL) {
|
||||||
|
event_list[type] = 1;
|
||||||
|
}
|
||||||
|
if (type == SWITCH_EVENT_CUSTOM) {
|
||||||
|
custom++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* update the event subscriptions with the new ones */
|
||||||
|
memcpy(session->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
|
||||||
|
/* wipe the old hash, and point the pointer at the new one */
|
||||||
|
switch_core_hash_destroy(&session->event_hash);
|
||||||
|
session->event_hash = event_hash;
|
||||||
|
/* TODO - we should flush any non-matching events from the queue */
|
||||||
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
|
} else { /* no session for this pid */
|
||||||
|
ei_x_encode_tuple_header(rbuf, 2);
|
||||||
|
ei_x_encode_atom(rbuf, "error");
|
||||||
|
ei_x_encode_atom(rbuf, "notlistening");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return SWITCH_STATUS_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static switch_status_t handle_msg_api(listener_t *listener, erlang_msg * msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
|
static switch_status_t handle_msg_api(listener_t *listener, erlang_msg * msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
|
||||||
{
|
{
|
||||||
|
@ -772,6 +898,10 @@ static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg * msg,
|
||||||
ret = handle_msg_handlecall(listener, msg, arity, buf, rbuf);
|
ret = handle_msg_handlecall(listener, msg, arity, buf, rbuf);
|
||||||
} else if (!strncmp(tupletag, "rex", MAXATOMLEN)) {
|
} else if (!strncmp(tupletag, "rex", MAXATOMLEN)) {
|
||||||
ret = handle_msg_rpcresponse(listener, msg, arity, buf, rbuf);
|
ret = handle_msg_rpcresponse(listener, msg, arity, buf, rbuf);
|
||||||
|
} else if (!strncmp(tupletag, "setevent", MAXATOMLEN)) {
|
||||||
|
ret = handle_msg_setevent(listener, msg, arity, buf, rbuf);
|
||||||
|
} else if (!strncmp(tupletag, "session_setevent", MAXATOMLEN)) {
|
||||||
|
ret = handle_msg_session_setevent(listener, msg, arity, buf, rbuf);
|
||||||
} else {
|
} else {
|
||||||
ei_x_encode_tuple_header(rbuf, 2);
|
ei_x_encode_tuple_header(rbuf, 2);
|
||||||
ei_x_encode_atom(rbuf, "error");
|
ei_x_encode_atom(rbuf, "error");
|
||||||
|
|
Loading…
Reference in New Issue