Switch from mutex to a rwlock to increase throughput
This commit is contained in:
parent
5ef61c961f
commit
121e57a1db
|
@ -646,7 +646,7 @@ static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg * msg, e
|
||||||
binding->process.pid = msg->from;
|
binding->process.pid = msg->from;
|
||||||
binding->listener = listener;
|
binding->listener = listener;
|
||||||
|
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_thread_rwlock_wrlock(globals.listener_rwlock);
|
||||||
|
|
||||||
for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);
|
for (ptr = bindings.head; ptr && ptr->next; ptr = ptr->next);
|
||||||
|
|
||||||
|
@ -657,7 +657,7 @@ static switch_status_t handle_msg_bind(listener_t *listener, erlang_msg * msg, e
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section);
|
switch_xml_set_binding_sections(bindings.search_binding, switch_xml_get_binding_sections(bindings.search_binding) | section);
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "sections %d\n", switch_xml_get_binding_sections(bindings.search_binding));
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
|
||||||
{
|
{
|
||||||
listener_t *l;
|
listener_t *l;
|
||||||
|
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_thread_rwlock_rdlock(globals.listener_rwlock);
|
||||||
for (l = listen_list.listeners; l; l = l->next) {
|
for (l = listen_list.listeners; l; l = l->next) {
|
||||||
if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) {
|
if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) {
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
|
|
||||||
return SWITCH_STATUS_SUCCESS;
|
return SWITCH_STATUS_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -107,7 +107,7 @@ static void remove_binding(listener_t *listener, erlang_pid * pid)
|
||||||
{
|
{
|
||||||
struct erlang_binding *ptr, *lst = NULL;
|
struct erlang_binding *ptr, *lst = NULL;
|
||||||
|
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_thread_rwlock_wrlock(globals.listener_rwlock);
|
||||||
|
|
||||||
switch_xml_set_binding_sections(bindings.search_binding, SWITCH_XML_SECTION_MAX);
|
switch_xml_set_binding_sections(bindings.search_binding, SWITCH_XML_SECTION_MAX);
|
||||||
|
|
||||||
|
@ -134,7 +134,7 @@ static void remove_binding(listener_t *listener, erlang_pid * pid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -194,7 +194,7 @@ static void event_handler(switch_event_t *event)
|
||||||
|
|
||||||
lp = listen_list.listeners;
|
lp = listen_list.listeners;
|
||||||
|
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_thread_rwlock_rdlock(globals.listener_rwlock);
|
||||||
while (lp) {
|
while (lp) {
|
||||||
uint8_t send = 0;
|
uint8_t send = 0;
|
||||||
|
|
||||||
|
@ -250,7 +250,7 @@ static void event_handler(switch_event_t *event)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -276,10 +276,10 @@ static void close_socket(int *sock)
|
||||||
static void add_listener(listener_t *listener)
|
static void add_listener(listener_t *listener)
|
||||||
{
|
{
|
||||||
/* add me to the listeners so I get events */
|
/* add me to the listeners so I get events */
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_thread_rwlock_wrlock(globals.listener_rwlock);
|
||||||
listener->next = listen_list.listeners;
|
listener->next = listen_list.listeners;
|
||||||
listen_list.listeners = listener;
|
listen_list.listeners = listener;
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -287,7 +287,7 @@ static void remove_listener(listener_t *listener)
|
||||||
{
|
{
|
||||||
listener_t *l, *last = NULL;
|
listener_t *l, *last = NULL;
|
||||||
|
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_thread_rwlock_wrlock(globals.listener_rwlock);
|
||||||
for (l = listen_list.listeners; l; l = l->next) {
|
for (l = listen_list.listeners; l; l = l->next) {
|
||||||
if (l == listener) {
|
if (l == listener) {
|
||||||
if (last) {
|
if (last) {
|
||||||
|
@ -298,7 +298,7 @@ static void remove_listener(listener_t *listener)
|
||||||
}
|
}
|
||||||
last = l;
|
last = l;
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Search for a listener already talking to the specified node */
|
/* Search for a listener already talking to the specified node */
|
||||||
|
@ -306,13 +306,13 @@ static listener_t *find_listener(char *nodename)
|
||||||
{
|
{
|
||||||
listener_t *l = NULL;
|
listener_t *l = NULL;
|
||||||
|
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_thread_rwlock_rdlock(globals.listener_rwlock);
|
||||||
for (l = listen_list.listeners; l; l = l->next) {
|
for (l = listen_list.listeners; l; l = l->next) {
|
||||||
if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) {
|
if (!strncmp(nodename, l->peer_nodename, MAXNODELEN)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
return l;
|
return l;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -968,9 +968,10 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
|
||||||
void *value;
|
void *value;
|
||||||
switch_hash_index_t *iter;
|
switch_hash_index_t *iter;
|
||||||
|
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
/* TODO - should we have a different mutex for this? */
|
||||||
|
switch_thread_rwlock_wrlock(globals.listener_rwlock);
|
||||||
prefs.threads++;
|
prefs.threads++;
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
|
|
||||||
switch_assert(listener != NULL);
|
switch_assert(listener != NULL);
|
||||||
|
|
||||||
|
@ -1018,9 +1019,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
|
||||||
switch_core_destroy_memory_pool(&pool);
|
switch_core_destroy_memory_pool(&pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_thread_rwlock_wrlock(globals.listener_rwlock);
|
||||||
prefs.threads--;
|
prefs.threads--;
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1537,7 +1538,7 @@ SWITCH_STANDARD_API(erlang_cmd)
|
||||||
if (!strcasecmp(argv[0], "listeners")) {
|
if (!strcasecmp(argv[0], "listeners")) {
|
||||||
|
|
||||||
listener_t *l;
|
listener_t *l;
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_thread_rwlock_rdlock(globals.listener_rwlock);
|
||||||
|
|
||||||
if (listen_list.listeners) {
|
if (listen_list.listeners) {
|
||||||
for (l = listen_list.listeners; l; l = l->next) {
|
for (l = listen_list.listeners; l; l = l->next) {
|
||||||
|
@ -1547,12 +1548,12 @@ SWITCH_STANDARD_API(erlang_cmd)
|
||||||
stream->write_function(stream, "No active listeners\n");
|
stream->write_function(stream, "No active listeners\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
} else if (!strcasecmp(argv[0], "sessions") && argc == 2) {
|
} else if (!strcasecmp(argv[0], "sessions") && argc == 2) {
|
||||||
listener_t *l;
|
listener_t *l;
|
||||||
int found = 0;
|
int found = 0;
|
||||||
|
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_thread_rwlock_rdlock(globals.listener_rwlock);
|
||||||
for (l = listen_list.listeners; l; l = l->next) {
|
for (l = listen_list.listeners; l; l = l->next) {
|
||||||
if (!strcasecmp(l->peer_nodename, argv[1])) {
|
if (!strcasecmp(l->peer_nodename, argv[1])) {
|
||||||
session_elem_t *sp;
|
session_elem_t *sp;
|
||||||
|
@ -1578,7 +1579,7 @@ SWITCH_STANDARD_API(erlang_cmd)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
|
|
||||||
if (!found)
|
if (!found)
|
||||||
stream->write_function(stream, "Could not find a listener for %s\n", argv[1]);
|
stream->write_function(stream, "Could not find a listener for %s\n", argv[1]);
|
||||||
|
@ -1604,7 +1605,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
|
||||||
|
|
||||||
memset(&prefs, 0, sizeof(prefs));
|
memset(&prefs, 0, sizeof(prefs));
|
||||||
|
|
||||||
switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
|
switch_thread_rwlock_create(&globals.listener_rwlock, pool);
|
||||||
switch_mutex_init(&globals.fetch_reply_mutex, SWITCH_MUTEX_DEFAULT, pool);
|
switch_mutex_init(&globals.fetch_reply_mutex, SWITCH_MUTEX_DEFAULT, pool);
|
||||||
switch_core_hash_init(&globals.fetch_reply_hash, pool);
|
switch_core_hash_init(&globals.fetch_reply_hash, pool);
|
||||||
|
|
||||||
|
@ -1853,7 +1854,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
|
||||||
switch_event_unbind(&globals.node);
|
switch_event_unbind(&globals.node);
|
||||||
switch_xml_unbind_search_function_ptr(erlang_fetch);
|
switch_xml_unbind_search_function_ptr(erlang_fetch);
|
||||||
|
|
||||||
switch_mutex_lock(globals.listener_mutex);
|
switch_thread_rwlock_wrlock(globals.listener_rwlock);
|
||||||
|
|
||||||
for (l = listen_list.listeners; l; l = l->next) {
|
for (l = listen_list.listeners; l; l = l->next) {
|
||||||
close_socket(&l->sockfd);
|
close_socket(&l->sockfd);
|
||||||
|
@ -1863,7 +1864,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown)
|
||||||
WSACleanup();
|
WSACleanup();
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
switch_mutex_unlock(globals.listener_mutex);
|
switch_thread_rwlock_unlock(globals.listener_rwlock);
|
||||||
|
|
||||||
switch_sleep(1500000); /* sleep for 1.5 seconds */
|
switch_sleep(1500000); /* sleep for 1.5 seconds */
|
||||||
|
|
||||||
|
|
|
@ -162,7 +162,7 @@ struct api_command_struct {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct globals_struct {
|
struct globals_struct {
|
||||||
switch_mutex_t *listener_mutex;
|
switch_thread_rwlock_t *listener_rwlock;
|
||||||
switch_event_node_t *node;
|
switch_event_node_t *node;
|
||||||
switch_mutex_t *ref_mutex;
|
switch_mutex_t *ref_mutex;
|
||||||
switch_mutex_t *fetch_reply_mutex;
|
switch_mutex_t *fetch_reply_mutex;
|
||||||
|
|
Loading…
Reference in New Issue