diff --git a/src/mod/event_handlers/mod_erlang_event/handle_msg.c b/src/mod/event_handlers/mod_erlang_event/handle_msg.c index 8e6ba46a62..944263b2f5 100644 --- a/src/mod/event_handlers/mod_erlang_event/handle_msg.c +++ b/src/mod/event_handlers/mod_erlang_event/handle_msg.c @@ -337,10 +337,11 @@ static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg switch_event_types_t type; int i = 0; + switch_thread_rwlock_wrlock(session->event_rwlock); + for (i = 1; i < arity; i++) { if (!ei_decode_atom(buf->buff, &buf->index, atom)) { - /* TODO session write locking */ if (custom) { switch_core_hash_insert(session->event_hash, atom, MARKER); } else if (switch_name_event(atom, &type) == SWITCH_STATUS_SUCCESS) { @@ -363,6 +364,9 @@ static switch_status_t handle_msg_session_event(listener_t *listener, erlang_msg switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str); } } + + switch_thread_rwlock_unlock(session->event_rwlock); + ei_x_encode_atom(rbuf, "ok"); } else { ei_x_encode_tuple_header(rbuf, 2); @@ -437,7 +441,8 @@ static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_ int i = 0; switch_event_types_t type; - /* TODO session write lock */ + switch_thread_rwlock_wrlock(session->event_rwlock); + for (i = 1; i < arity; i++) { if (!ei_decode_atom(buf->buff, &buf->index, atom)) { @@ -464,6 +469,8 @@ static switch_status_t handle_msg_session_nixevent(listener_t *listener, erlang_ } } } + switch_thread_rwlock_unlock(session->event_rwlock); + ei_x_encode_atom(rbuf, "ok"); } else { /* no session for this pid */ ei_x_encode_tuple_header(rbuf, 2); @@ -590,12 +597,15 @@ static switch_status_t handle_msg_session_setevent(listener_t *listener, erlang_ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "enable event %s for session %s\n", atom, session->uuid_str); } } + /* update the event subscriptions with the new ones */ + switch_thread_rwlock_wrlock(session->event_rwlock); memcpy(session->event_list, event_list, sizeof(uint8_t) * (SWITCH_EVENT_ALL + 1)); /* wipe the old hash, and point the pointer at the new one */ - /* TODO make thread safe*/ switch_core_hash_destroy(&session->event_hash); session->event_hash = event_hash; + switch_thread_rwlock_unlock(session->event_rwlock); + /* TODO - we should flush any non-matching events from the queue */ ei_x_encode_atom(rbuf, "ok"); } else { /* no session for this pid */ @@ -1045,7 +1055,6 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e } switch_core_hash_delete_multi(listener->event_hash, NULL, NULL); - switch_core_hash_init(&listener->event_hash, listener->pool); switch_thread_rwlock_unlock(listener->event_rwlock); ei_x_encode_atom(rbuf, "ok"); @@ -1062,13 +1071,15 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg * msg, e /*purge the event queue */ while (switch_queue_trypop(session->event_queue, &pop) == SWITCH_STATUS_SUCCESS); + + switch_thread_rwlock_wrlock(session->event_rwlock); for (x = 0; x <= SWITCH_EVENT_ALL; x++) { session->event_list[x] = 0; } /* wipe the hash */ - /* TODO make thread safe*/ - switch_core_hash_destroy(&session->event_hash); - switch_core_hash_init(&session->event_hash, session->pool); + switch_core_hash_delete_multi(session->event_hash, NULL, NULL); + switch_thread_rwlock_unlock(session->event_rwlock); + ei_x_encode_atom(rbuf, "ok"); } else { ei_x_encode_tuple_header(rbuf, 2); diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index daa31d773a..fa1ea86982 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -144,6 +144,9 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t if (s) { int send = 0; + + switch_thread_rwlock_rdlock(s->event_rwlock); + if (s->event_list[SWITCH_EVENT_ALL]) { send = 1; } else if ((s->event_list[event->event_id])) { @@ -152,6 +155,8 @@ static void send_event_to_attached_sessions(listener_t *listener, switch_event_t } } + switch_thread_rwlock_unlock(s->event_rwlock); + if (send) { switch_log_printf(SWITCH_CHANNEL_UUID_LOG(s->uuid_str), SWITCH_LOG_DEBUG, "Sending event %s to attached session %s\n", switch_event_name(event->event_id), s->uuid_str); @@ -1316,6 +1321,7 @@ session_elem_t *session_elem_create(listener_t *listener, switch_core_session_t } switch_thread_rwlock_create(&session_element->rwlock, session_element->pool); + switch_thread_rwlock_create(&session_element->event_rwlock, session_element->pool); session_element->event_list[SWITCH_EVENT_ALL] = 1; /* defaults to everything */ diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h index 2e9fed865f..d5f890d0a1 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.h @@ -84,6 +84,7 @@ struct session_elem { struct erlang_process process; switch_queue_t *event_queue; switch_thread_rwlock_t *rwlock; + switch_thread_rwlock_t *event_rwlock; switch_channel_state_t channel_state; switch_memory_pool_t *pool; uint8_t event_list[SWITCH_EVENT_ALL + 1];