mirror of
https://github.com/signalwire/freeswitch.git
synced 2025-03-10 03:41:16 +00:00
add listener event r/w locking FS-3432
This commit is contained in:
parent
eade657225
commit
4e6b56c53d
src/mod/event_handlers/mod_erlang_event
@ -28,6 +28,7 @@
|
|||||||
* Rob Charlton <rob.charlton@savageminds.com>
|
* Rob Charlton <rob.charlton@savageminds.com>
|
||||||
* Darren Schreiber <d@d-man.org>
|
* Darren Schreiber <d@d-man.org>
|
||||||
* Mike Jerris <mike@jerris.com>
|
* Mike Jerris <mike@jerris.com>
|
||||||
|
* Tamas Cseke <tamas.cseke@virtual-call-center.eu>
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
* handle_msg.c -- handle messages received from erlang nodes
|
* handle_msg.c -- handle messages received from erlang nodes
|
||||||
@ -286,7 +287,8 @@ static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_bu
|
|||||||
switch_set_flag_locked(listener, LFLAG_EVENTS);
|
switch_set_flag_locked(listener, LFLAG_EVENTS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TODO - listener write lock */
|
switch_thread_rwlock_wrlock(listener->event_rwlock);
|
||||||
|
|
||||||
for (i = 1; i < arity; i++) {
|
for (i = 1; i < arity; i++) {
|
||||||
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
|
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
|
||||||
|
|
||||||
@ -312,6 +314,8 @@ static switch_status_t handle_msg_event(listener_t *listener, int arity, ei_x_bu
|
|||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s\n", atom);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
switch_thread_rwlock_unlock(listener->event_rwlock);
|
||||||
|
|
||||||
ei_x_encode_atom(rbuf, "ok");
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
}
|
}
|
||||||
return SWITCH_STATUS_SUCCESS;
|
return SWITCH_STATUS_SUCCESS;
|
||||||
@ -382,7 +386,8 @@ static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x
|
|||||||
int i = 0;
|
int i = 0;
|
||||||
switch_event_types_t type;
|
switch_event_types_t type;
|
||||||
|
|
||||||
/* TODO listener write lock */
|
switch_thread_rwlock_wrlock(listener->event_rwlock);
|
||||||
|
|
||||||
for (i = 1; i < arity; i++) {
|
for (i = 1; i < arity; i++) {
|
||||||
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
|
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
|
||||||
|
|
||||||
@ -410,6 +415,8 @@ static switch_status_t handle_msg_nixevent(listener_t *listener, int arity, ei_x
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch_thread_rwlock_unlock(listener->event_rwlock);
|
||||||
ei_x_encode_atom(rbuf, "ok");
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
}
|
}
|
||||||
return SWITCH_STATUS_SUCCESS;
|
return SWITCH_STATUS_SUCCESS;
|
||||||
@ -522,11 +529,11 @@ static switch_status_t handle_msg_setevent(listener_t *listener, erlang_msg *msg
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* update the event subscriptions with the new ones */
|
/* update the event subscriptions with the new ones */
|
||||||
|
switch_thread_rwlock_wrlock(listener->event_rwlock);
|
||||||
memcpy(listener->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
|
memcpy(listener->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
|
||||||
/* wipe the old hash, and point the pointer at the new one */
|
|
||||||
/* TODO make thread safe */
|
|
||||||
switch_core_hash_destroy(&listener->event_hash);
|
switch_core_hash_destroy(&listener->event_hash);
|
||||||
listener->event_hash = event_hash;
|
listener->event_hash = event_hash;
|
||||||
|
switch_thread_rwlock_unlock(listener->event_rwlock);
|
||||||
|
|
||||||
/* TODO - we should flush any non-matching events from the queue */
|
/* TODO - we should flush any non-matching events from the queue */
|
||||||
ei_x_encode_atom(rbuf, "ok");
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
@ -1031,13 +1038,16 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e
|
|||||||
if (switch_test_flag(listener, LFLAG_EVENTS)) {
|
if (switch_test_flag(listener, LFLAG_EVENTS)) {
|
||||||
uint8_t x = 0;
|
uint8_t x = 0;
|
||||||
switch_clear_flag_locked(listener, LFLAG_EVENTS);
|
switch_clear_flag_locked(listener, LFLAG_EVENTS);
|
||||||
|
|
||||||
|
switch_thread_rwlock_wrlock(listener->event_rwlock);
|
||||||
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
|
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
|
||||||
listener->event_list[x] = 0;
|
listener->event_list[x] = 0;
|
||||||
}
|
}
|
||||||
/* wipe the hash */
|
|
||||||
/* TODO make thread safe*/
|
switch_core_hash_delete_multi(listener->event_hash, NULL, NULL);
|
||||||
switch_core_hash_destroy(&listener->event_hash);
|
|
||||||
switch_core_hash_init(&listener->event_hash, listener->pool);
|
switch_core_hash_init(&listener->event_hash, listener->pool);
|
||||||
|
|
||||||
|
switch_thread_rwlock_unlock(listener->event_rwlock);
|
||||||
ei_x_encode_atom(rbuf, "ok");
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
} else {
|
} else {
|
||||||
ei_x_encode_tuple_header(rbuf, 2);
|
ei_x_encode_tuple_header(rbuf, 2);
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
* Anthony Minessale II <anthm@freeswitch.org>
|
* Anthony Minessale II <anthm@freeswitch.org>
|
||||||
* Andrew Thompson <andrew@hijacked.us>
|
* Andrew Thompson <andrew@hijacked.us>
|
||||||
* Rob Charlton <rob.charlton@savageminds.com>
|
* Rob Charlton <rob.charlton@savageminds.com>
|
||||||
|
* Tamas Cseke <tamas.cseke@virtual-call-center.eu>
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
* mod_erlang_event.c -- Erlang Event Handler derived from mod_event_socket
|
* mod_erlang_event.c -- Erlang Event Handler derived from mod_event_socket
|
||||||
@ -200,6 +201,8 @@ static void event_handler(switch_event_t *event)
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch_thread_rwlock_rdlock(l->event_rwlock);
|
||||||
|
|
||||||
if (l->event_list[SWITCH_EVENT_ALL]) {
|
if (l->event_list[SWITCH_EVENT_ALL]) {
|
||||||
send = 1;
|
send = 1;
|
||||||
} else if ((l->event_list[event->event_id])) {
|
} else if ((l->event_list[event->event_id])) {
|
||||||
@ -208,6 +211,7 @@ static void event_handler(switch_event_t *event)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch_thread_rwlock_unlock(l->event_rwlock);
|
||||||
|
|
||||||
if (send) {
|
if (send) {
|
||||||
if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
|
if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
|
||||||
@ -815,14 +819,12 @@ static void handle_exit(listener_t *listener, erlang_pid * pid)
|
|||||||
uint8_t x = 0;
|
uint8_t x = 0;
|
||||||
switch_clear_flag_locked(listener, LFLAG_EVENTS);
|
switch_clear_flag_locked(listener, LFLAG_EVENTS);
|
||||||
|
|
||||||
|
switch_thread_rwlock_wrlock(listener->event_rwlock);
|
||||||
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
|
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
|
||||||
listener->event_list[x] = 0;
|
listener->event_list[x] = 0;
|
||||||
}
|
}
|
||||||
/* wipe the hash */
|
switch_core_hash_delete_multi(listener->event_hash, NULL, NULL);
|
||||||
/* XXX this needs to be locked */
|
switch_thread_rwlock_unlock(listener->event_rwlock);
|
||||||
/* TODO switch_core_hash_delete_multi_locked */
|
|
||||||
switch_core_hash_destroy(&listener->event_hash);
|
|
||||||
switch_core_hash_init(&listener->event_hash, listener->pool);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1177,7 +1179,6 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
|
|||||||
}
|
}
|
||||||
memset(listener, 0, sizeof(*listener));
|
memset(listener, 0, sizeof(*listener));
|
||||||
|
|
||||||
switch_thread_rwlock_create(&listener->rwlock, pool);
|
|
||||||
switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool);
|
switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, pool);
|
||||||
switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool);
|
switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, pool);
|
||||||
|
|
||||||
@ -1188,7 +1189,11 @@ static listener_t *new_listener(struct ei_cnode_s *ec, int clientfd)
|
|||||||
listener->level = SWITCH_LOG_DEBUG;
|
listener->level = SWITCH_LOG_DEBUG;
|
||||||
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
switch_mutex_init(&listener->flag_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
||||||
switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
switch_mutex_init(&listener->sock_mutex, SWITCH_MUTEX_NESTED, listener->pool);
|
||||||
|
|
||||||
|
switch_thread_rwlock_create(&listener->rwlock, pool);
|
||||||
|
switch_thread_rwlock_create(&listener->event_rwlock, pool);
|
||||||
switch_thread_rwlock_create(&listener->session_rwlock, listener->pool);
|
switch_thread_rwlock_create(&listener->session_rwlock, listener->pool);
|
||||||
|
|
||||||
switch_core_hash_init(&listener->event_hash, listener->pool);
|
switch_core_hash_init(&listener->event_hash, listener->pool);
|
||||||
switch_core_hash_init(&listener->sessions, listener->pool);
|
switch_core_hash_init(&listener->sessions, listener->pool);
|
||||||
|
|
||||||
|
@ -129,6 +129,7 @@ struct listener {
|
|||||||
uint8_t event_list[SWITCH_EVENT_ALL + 1];
|
uint8_t event_list[SWITCH_EVENT_ALL + 1];
|
||||||
switch_hash_t *event_hash;
|
switch_hash_t *event_hash;
|
||||||
switch_thread_rwlock_t *rwlock;
|
switch_thread_rwlock_t *rwlock;
|
||||||
|
switch_thread_rwlock_t *event_rwlock;
|
||||||
switch_thread_rwlock_t *session_rwlock;
|
switch_thread_rwlock_t *session_rwlock;
|
||||||
//session_elem_t *session_list;
|
//session_elem_t *session_list;
|
||||||
switch_hash_t *sessions;
|
switch_hash_t *sessions;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user