FS-7354: Filter feature ported from mod_event_socket to mod_erlang_event

This commit is contained in:
Sergey Safarov 2015-03-09 10:55:28 +03:00
parent 5e43c6dd25
commit 57aba47f6e
3 changed files with 150 additions and 0 deletions

View File

@ -313,6 +313,87 @@ static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_bu
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
static switch_status_t handle_msg_filter(listener_t *listener, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
{
char atom[MAXATOMLEN];
char reply[MAXATOMLEN]= "";
char *header_name = NULL;
char *header_val = NULL;
if (arity == 1) {
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badarg");
} else {
int i = 0;
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "filter_command_processing_log");
ei_x_encode_list_header(rbuf, arity - 1);
switch_thread_rwlock_wrlock(listener->event_rwlock);
switch_mutex_lock(listener->filter_mutex);
if (!listener->filters) {
switch_event_create_plain(&listener->filters, SWITCH_EVENT_CLONE);
switch_clear_flag(listener->filters, EF_UNIQ_HEADERS);
}
for (i = 1; i < arity; i++) {
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
header_name=atom;
while (header_name && *header_name && *header_name == ' ')
header_name++;
if ((header_val = strchr(atom, ' '))) {
*header_val++ = '\0';
}
if (!strcasecmp(header_name, "delete") && header_val) {
header_name = header_val;
if ((header_val = strchr(header_name, ' '))) {
*header_val++ = '\0';
}
if (!strcasecmp(header_name, "all")) {
switch_event_destroy(&listener->filters);
switch_event_create_plain(&listener->filters, SWITCH_EVENT_CLONE);
} else {
switch_event_del_header_val(listener->filters, header_name, header_val);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "+OK filter deleted. [%s]=[%s]", header_name, switch_str_nil(header_val));
ei_x_encode_tuple_header(rbuf, 3);
_ei_x_encode_string(rbuf, "deleted");
_ei_x_encode_string(rbuf, header_name);
_ei_x_encode_string(rbuf, switch_str_nil(header_val));
} else if (header_val) {
if (!strcasecmp(header_name, "add")) {
header_name = header_val;
if ((header_val = strchr(header_name, ' '))) {
*header_val++ = '\0';
}
}
switch_event_add_header_string(listener->filters, SWITCH_STACK_BOTTOM, header_name, header_val);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "+OK filter added. [%s]=[%s]", header_name, header_val);
ei_x_encode_tuple_header(rbuf, 3);
_ei_x_encode_string(rbuf, "added");
_ei_x_encode_string(rbuf, header_name);
_ei_x_encode_string(rbuf, header_val);
} else {
switch_snprintf(reply, MAXATOMLEN, "-ERR invalid syntax");
ei_x_encode_atom(rbuf, "-ERR invalid syntax");
}
}
}
switch_mutex_unlock(listener->filter_mutex);
switch_thread_rwlock_unlock(listener->event_rwlock);
ei_x_encode_empty_list(rbuf);
}
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf) static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg *msg, int arity, ei_x_buff * buf, ei_x_buff * rbuf)
{ {
char atom[MAXATOMLEN]; char atom[MAXATOMLEN];
@ -975,6 +1056,8 @@ static switch_status_t handle_msg_tuple(listener_t *listener, erlang_msg * msg,
ret = handle_msg_set_log_level(listener, arity, buf, rbuf); ret = handle_msg_set_log_level(listener, arity, buf, rbuf);
} else if (!strncmp(tupletag, "event", MAXATOMLEN)) { } else if (!strncmp(tupletag, "event", MAXATOMLEN)) {
ret = handle_msg_event(listener, arity, buf, rbuf); ret = handle_msg_event(listener, arity, buf, rbuf);
} else if (!strncmp(tupletag, "filter", MAXATOMLEN)) {
ret = handle_msg_filter(listener, arity, buf, rbuf);
} else if (!strncmp(tupletag, "session_event", MAXATOMLEN)) { } else if (!strncmp(tupletag, "session_event", MAXATOMLEN)) {
ret = handle_msg_session_event(listener, msg, arity, buf, rbuf); ret = handle_msg_session_event(listener, msg, arity, buf, rbuf);
} else if (!strncmp(tupletag, "nixevent", MAXATOMLEN)) { } else if (!strncmp(tupletag, "nixevent", MAXATOMLEN)) {

View File

@ -215,6 +215,70 @@ static void event_handler(switch_event_t *event)
} }
} }
if (send) {
switch_mutex_lock(l->filter_mutex);
if (l->filters && l->filters->headers) {
switch_event_header_t *hp;
const char *hval;
for (hp = l->filters->headers; hp; hp = hp->next) {
if ((hval = switch_event_get_header(event, hp->name))) {
const char *comp_to = hp->value;
int pos = 1, cmp = 0;
while (comp_to && *comp_to) {
if (*comp_to == '+') {
pos = 1;
} else if (*comp_to == '-') {
pos = 0;
} else if (*comp_to != ' ') {
break;
}
comp_to++;
}
if (!(comp_to && *comp_to)) {
if (pos) {
send = 1;
continue;
} else {
send = 0;
break;
}
}
if (*hp->value == '/') {
switch_regex_t *re = NULL;
int ovector[30];
cmp = !!switch_regex_perform(hval, comp_to, &re, ovector, sizeof(ovector) / sizeof(ovector[0]));
switch_regex_safe_free(re);
} else {
cmp = !strcasecmp(hval, comp_to);
}
if (cmp) {
if (pos) {
send = 1;
} else {
send = 0;
break;
}
} else {
if (pos) {
send = 0;
break;
} else {
send = 1;
}
}
}
}
}
switch_mutex_unlock(l->filter_mutex);
}
switch_thread_rwlock_unlock(l->event_rwlock); switch_thread_rwlock_unlock(l->event_rwlock);
if (send) { if (send) {
@ -1273,6 +1337,7 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
listener->level = SWITCH_LOG_DEBUG; listener->level = SWITCH_LOG_DEBUG;
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool); switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool); switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_mutex_init(&listener->filter_mutex, SWITCH_MUTEX_NESTED, listener->pool);
switch_thread_rwlock_create(&listener->rwlock, pool); switch_thread_rwlock_create(&listener->rwlock, pool);
switch_thread_rwlock_create(&listener->event_rwlock, pool); switch_thread_rwlock_create(&listener->event_rwlock, pool);

View File

@ -126,6 +126,8 @@ struct listener {
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
switch_mutex_t *flag_mutex; switch_mutex_t *flag_mutex;
switch_mutex_t *sock_mutex; switch_mutex_t *sock_mutex;
switch_mutex_t *filter_mutex;
switch_event_t *filters;
char *ebuf; char *ebuf;
uint32_t flags; uint32_t flags;
switch_log_level_t level; switch_log_level_t level;