add session event r/w locking FS-3432
This commit is contained in:
parent
87f65f1784
commit
85656c31af
|
@ -337,10 +337,11 @@ static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg
|
||||||
switch_event_types_t type;
|
switch_event_types_t type;
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
|
||||||
|
switch_thread_rwlock_wrlock(session->event_rwlock);
|
||||||
|
|
||||||
for (i = 1; i < arity; i++) {
|
for (i = 1; i < arity; i++) {
|
||||||
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
|
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
|
||||||
|
|
||||||
/* TODO session write locking */
|
|
||||||
if (custom) {
|
if (custom) {
|
||||||
switch_core_hash_insert(session->event_hash, atom, MARKER);
|
switch_core_hash_insert(session->event_hash, atom, MARKER);
|
||||||
} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
|
} else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) {
|
||||||
|
@ -363,6 +364,9 @@ static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch_thread_rwlock_unlock(session->event_rwlock);
|
||||||
|
|
||||||
ei_x_encode_atom(rbuf, "ok");
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
} else {
|
} else {
|
||||||
ei_x_encode_tuple_header(rbuf, 2);
|
ei_x_encode_tuple_header(rbuf, 2);
|
||||||
|
@ -437,7 +441,8 @@ static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_
|
||||||
int i = 0;
|
int i = 0;
|
||||||
switch_event_types_t type;
|
switch_event_types_t type;
|
||||||
|
|
||||||
/* TODO session write lock */
|
switch_thread_rwlock_wrlock(session->event_rwlock);
|
||||||
|
|
||||||
for (i = 1; i < arity; i++) {
|
for (i = 1; i < arity; i++) {
|
||||||
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
|
if (!ei_decode_atom(buf->buff, &buf->index, atom)) {
|
||||||
|
|
||||||
|
@ -464,6 +469,8 @@ static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
switch_thread_rwlock_unlock(session->event_rwlock);
|
||||||
|
|
||||||
ei_x_encode_atom(rbuf, "ok");
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
} else { /* no session for this pid */
|
} else { /* no session for this pid */
|
||||||
ei_x_encode_tuple_header(rbuf, 2);
|
ei_x_encode_tuple_header(rbuf, 2);
|
||||||
|
@ -590,12 +597,15 @@ static switch_status_t handle_msg_session_setevent(listener_t *listener, erlang_
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* update the event subscriptions with the new ones */
|
/* update the event subscriptions with the new ones */
|
||||||
|
switch_thread_rwlock_wrlock(session->event_rwlock);
|
||||||
memcpy(session->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
|
memcpy(session->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1));
|
||||||
/* wipe the old hash, and point the pointer at the new one */
|
/* wipe the old hash, and point the pointer at the new one */
|
||||||
/* TODO make thread safe*/
|
|
||||||
switch_core_hash_destroy(&session->event_hash);
|
switch_core_hash_destroy(&session->event_hash);
|
||||||
session->event_hash = event_hash;
|
session->event_hash = event_hash;
|
||||||
|
switch_thread_rwlock_unlock(session->event_rwlock);
|
||||||
|
|
||||||
/* TODO - we should flush any non-matching events from the queue */
|
/* TODO - we should flush any non-matching events from the queue */
|
||||||
ei_x_encode_atom(rbuf, "ok");
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
} else { /* no session for this pid */
|
} else { /* no session for this pid */
|
||||||
|
@ -1045,7 +1055,6 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_core_hash_delete_multi(listener->event_hash, NULL, NULL);
|
switch_core_hash_delete_multi(listener->event_hash, NULL, NULL);
|
||||||
switch_core_hash_init(&listener->event_hash, listener->pool);
|
|
||||||
|
|
||||||
switch_thread_rwlock_unlock(listener->event_rwlock);
|
switch_thread_rwlock_unlock(listener->event_rwlock);
|
||||||
ei_x_encode_atom(rbuf, "ok");
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
|
@ -1062,13 +1071,15 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e
|
||||||
|
|
||||||
/*purge the event queue */
|
/*purge the event queue */
|
||||||
while (switch_queue_trypop(session->event_queue, &pop) == SWITCH_STATUS_SUCCESS);
|
while (switch_queue_trypop(session->event_queue, &pop) == SWITCH_STATUS_SUCCESS);
|
||||||
|
|
||||||
|
switch_thread_rwlock_wrlock(session->event_rwlock);
|
||||||
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
|
for (x = 0; x <= SWITCH_EVENT_ALL; x++) {
|
||||||
session->event_list[x] = 0;
|
session->event_list[x] = 0;
|
||||||
}
|
}
|
||||||
/* wipe the hash */
|
/* wipe the hash */
|
||||||
/* TODO make thread safe*/
|
switch_core_hash_delete_multi(session->event_hash, NULL, NULL);
|
||||||
switch_core_hash_destroy(&session->event_hash);
|
switch_thread_rwlock_unlock(session->event_rwlock);
|
||||||
switch_core_hash_init(&session->event_hash, session->pool);
|
|
||||||
ei_x_encode_atom(rbuf, "ok");
|
ei_x_encode_atom(rbuf, "ok");
|
||||||
} else {
|
} else {
|
||||||
ei_x_encode_tuple_header(rbuf, 2);
|
ei_x_encode_tuple_header(rbuf, 2);
|
||||||
|
|
|
@ -144,6 +144,9 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t
|
||||||
|
|
||||||
if (s) {
|
if (s) {
|
||||||
int send = 0;
|
int send = 0;
|
||||||
|
|
||||||
|
switch_thread_rwlock_rdlock(s->event_rwlock);
|
||||||
|
|
||||||
if (s->event_list[SWITCH_EVENT_ALL]) {
|
if (s->event_list[SWITCH_EVENT_ALL]) {
|
||||||
send = 1;
|
send = 1;
|
||||||
} else if ((s->event_list[event->event_id])) {
|
} else if ((s->event_list[event->event_id])) {
|
||||||
|
@ -152,6 +155,8 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch_thread_rwlock_unlock(s->event_rwlock);
|
||||||
|
|
||||||
if (send) {
|
if (send) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(s->uuid_str), SWITCH_LOG_DEBUG, "Sending event %s to attached session %s\n",
|
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(s->uuid_str), SWITCH_LOG_DEBUG, "Sending event %s to attached session %s\n",
|
||||||
switch_event_name(event->event_id), s->uuid_str);
|
switch_event_name(event->event_id), s->uuid_str);
|
||||||
|
@ -1316,6 +1321,7 @@ session_elem_t *session_elem_create(listener_t *listener, switch_core_session_t
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_thread_rwlock_create(&session_element->rwlock, session_element->pool);
|
switch_thread_rwlock_create(&session_element->rwlock, session_element->pool);
|
||||||
|
switch_thread_rwlock_create(&session_element->event_rwlock, session_element->pool);
|
||||||
|
|
||||||
session_element->event_list[SWITCH_EVENT_ALL] = 1; /* defaults to everything */
|
session_element->event_list[SWITCH_EVENT_ALL] = 1; /* defaults to everything */
|
||||||
|
|
||||||
|
|
|
@ -84,6 +84,7 @@ struct session_elem {
|
||||||
struct erlang_process process;
|
struct erlang_process process;
|
||||||
switch_queue_t *event_queue;
|
switch_queue_t *event_queue;
|
||||||
switch_thread_rwlock_t *rwlock;
|
switch_thread_rwlock_t *rwlock;
|
||||||
|
switch_thread_rwlock_t *event_rwlock;
|
||||||
switch_channel_state_t channel_state;
|
switch_channel_state_t channel_state;
|
||||||
switch_memory_pool_t *pool;
|
switch_memory_pool_t *pool;
|
||||||
uint8_t event_list[SWITCH_EVENT_ALL + 1];
|
uint8_t event_list[SWITCH_EVENT_ALL + 1];
|
||||||
|
|
Loading…
Reference in New Issue