[Core] Fix crash in switch_sql_queue_manager on shutdown, improve thread-safety, implement switch_thread_ctl functionality.

This commit is contained in:
Andrey Volk 2019-12-06 14:23:37 +04:00
parent b93eea73ef
commit 234e04e752
7 changed files with 187 additions and 56 deletions

View File

@ -854,8 +854,8 @@ struct switch_json_api_interface {
struct switch_json_api_interface *next; struct switch_json_api_interface *next;
}; };
#define PROTECT_INTERFACE(_it) if (_it) {switch_mutex_lock(_it->reflock); switch_thread_rwlock_rdlock(_it->parent->rwlock); switch_thread_rwlock_rdlock(_it->rwlock); _it->refs++; _it->parent->refs++; switch_mutex_unlock(_it->reflock);} //if (!strcmp(_it->interface_name, "user")) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "+++++++++++LOCK %s %d/%d\n", _it->interface_name, _it->refs, _it->parent->refs); #define PROTECT_INTERFACE(_it) if (_it) {switch_mutex_lock(_it->reflock); _it->refs++; _it->parent->refs++; switch_mutex_unlock(_it->reflock); switch_thread_rwlock_rdlock(_it->parent->rwlock); switch_thread_rwlock_rdlock(_it->rwlock);} //if (!strcmp(_it->interface_name, "user")) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "+++++++++++LOCK %s %d/%d\n", _it->interface_name, _it->refs, _it->parent->refs);
#define UNPROTECT_INTERFACE(_it) if (_it) {switch_mutex_lock(_it->reflock); switch_thread_rwlock_unlock(_it->rwlock); switch_thread_rwlock_unlock(_it->parent->rwlock); _it->refs--; _it->parent->refs--; switch_mutex_unlock(_it->reflock);} //if (!strcmp(_it->interface_name, "user")) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "---------UNLOCK %s %d/%d\n", _it->interface_name, _it->refs, _it->parent->refs); #define UNPROTECT_INTERFACE(_it) if (_it) {switch_thread_rwlock_unlock(_it->rwlock); switch_thread_rwlock_unlock(_it->parent->rwlock); switch_mutex_lock(_it->reflock); _it->refs--; _it->parent->refs--; switch_mutex_unlock(_it->reflock);} //if (!strcmp(_it->interface_name, "user")) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "---------UNLOCK %s %d/%d\n", _it->interface_name, _it->refs, _it->parent->refs);
#include "switch_frame.h" #include "switch_frame.h"

View File

@ -2304,6 +2304,7 @@ typedef struct switch_loadable_module switch_loadable_module_t;
typedef struct switch_frame switch_frame_t; typedef struct switch_frame switch_frame_t;
typedef struct switch_rtcp_frame switch_rtcp_frame_t; typedef struct switch_rtcp_frame switch_rtcp_frame_t;
typedef struct switch_channel switch_channel_t; typedef struct switch_channel switch_channel_t;
typedef struct switch_thread_ctl switch_thread_ctl_t;
typedef struct switch_sql_queue_manager switch_sql_queue_manager_t; typedef struct switch_sql_queue_manager switch_sql_queue_manager_t;
typedef struct switch_file_handle switch_file_handle_t; typedef struct switch_file_handle switch_file_handle_t;
typedef struct switch_core_session switch_core_session_t; typedef struct switch_core_session switch_core_session_t;

View File

@ -1452,6 +1452,116 @@ SWITCH_DECLARE(void) switch_getcputime(switch_cputime *t);
SWITCH_DECLARE(char *)switch_html_strip(const char *str); SWITCH_DECLARE(char *)switch_html_strip(const char *str);
/**
* switch_thread_ctl is the right thing if you need a thread to dispatch, buffer or queue
* Use it (and its helper functions) to safety control the thread states.
* Initiate switch_thread_ctl before starting your thread
* Use switch_thread_ctl_thread_set_running() and switch_thread_ctl_thread_set_stopped() inside your thread.
* Check your thread states from the outside with switch_thread_ctl_thread_is_initiated() and switch_thread_ctl_thread_is_running()
* See more in the header file.
**/
struct switch_thread_ctl {
volatile int8_t thread_initiated;
volatile int8_t thread_running;
switch_thread_rwlock_t *rwlock;
};
/**
* Allocates and initializes switch_thread_ctl structure
* \thread_ctl [in] thread_ctl
* \pool [in] memory pool
* \return void
**/
static inline void switch_thread_ctl_init(switch_thread_ctl_t **thread_ctl, switch_memory_pool_t *pool) {
*thread_ctl = (switch_thread_ctl_t *)switch_core_alloc(pool, sizeof(switch_thread_ctl_t));
switch_thread_rwlock_create(&(*thread_ctl)->rwlock, pool);
}
/**
* Checks if the newly created switch_thread_ctl thread was finally initiated to run.
* Usually called from the controlled thread itself to demonstrate that the thread turned into the operational state.
* Does not mean the thread is still running, because it may stop right away after by its own reasons.
* \thread_ctl [in] thread_ctl
* \return 1 - initiated, 0 - was not initiated yet.
**/
static inline int8_t switch_thread_ctl_thread_is_initiated(switch_thread_ctl_t *thread_ctl)
{
int8_t result;
switch_thread_rwlock_rdlock(thread_ctl->rwlock);
result = (thread_ctl->thread_initiated == 1);
switch_thread_rwlock_unlock(thread_ctl->rwlock);
return result;
}
/**
* Checks if switch_thread_ctl thread is running
* \thread_ctl [in] thread_ctl
* \return 1 - running, 0 - stopping or stopped.
**/
static inline int8_t switch_thread_ctl_thread_is_running(switch_thread_ctl_t *thread_ctl)
{
int8_t result;
switch_thread_rwlock_rdlock(thread_ctl->rwlock);
result = (thread_ctl->thread_running == 1);
switch_thread_rwlock_unlock(thread_ctl->rwlock);
return result;
}
/**
* Asks switch_thread_ctl thread to begin stopping and give other threads ability to wait for the actual stop.
* \thread_ctl [in] thread_ctl
* \return void
**/
static inline void switch_thread_ctl_thread_request_stop(switch_thread_ctl_t *thread_ctl)
{
switch_thread_rwlock_wrlock(thread_ctl->rwlock);
thread_ctl->thread_running = -1;
switch_thread_rwlock_unlock(thread_ctl->rwlock);
}
/**
* Checks if switch_thread_ctl thread is still stopping and not finally stopped.
* \thread_ctl [in] thread_ctl
* \return 1 - stopping, 0 - not stopping (0 does not mean stopped if wrongly used).
**/
static inline int8_t switch_thread_ctl_thread_is_stopping(switch_thread_ctl_t *thread_ctl)
{
int8_t result;
switch_thread_rwlock_rdlock(thread_ctl->rwlock);
result = (thread_ctl->thread_running == -1);
switch_thread_rwlock_unlock(thread_ctl->rwlock);
return result;
}
/**
* Puts switch_thread_ctl thread into the running and initializated state (initializated state will be never changed later)
* \thread_ctl [in] thread_ctl
* \return void
**/
static inline void switch_thread_ctl_thread_set_running(switch_thread_ctl_t *thread_ctl)
{
switch_thread_rwlock_wrlock(thread_ctl->rwlock);
thread_ctl->thread_initiated = 1;
thread_ctl->thread_running = 1;
switch_thread_rwlock_unlock(thread_ctl->rwlock);
}
/**
* Puts switch_thread_ctl into the stopped state.
* Usually called by the controled thread itself right before the exiting.
* NOTE: Does not flip the thread_initiated flag!
* \thread_ctl [in] thread_ctl
* \return void
**/
static inline void switch_thread_ctl_thread_set_stopped(switch_thread_ctl_t *thread_ctl)
{
switch_thread_rwlock_wrlock(thread_ctl->rwlock);
thread_ctl->thread_running = 0;
switch_thread_rwlock_unlock(thread_ctl->rwlock);
}
SWITCH_END_EXTERN_C SWITCH_END_EXTERN_C
#endif #endif
/* For Emacs: /* For Emacs:

View File

@ -47,6 +47,7 @@ struct switch_cache_db_handle {
time_t last_used; time_t last_used;
switch_mutex_t *mutex; switch_mutex_t *mutex;
switch_mutex_t *io_mutex; switch_mutex_t *io_mutex;
switch_mutex_t *usage_mutex;
switch_memory_pool_t *pool; switch_memory_pool_t *pool;
int32_t flags; int32_t flags;
unsigned long hash; unsigned long hash;
@ -61,17 +62,16 @@ struct switch_cache_db_handle {
static struct { static struct {
switch_memory_pool_t *memory_pool; switch_memory_pool_t *memory_pool;
switch_thread_t *db_thread; switch_thread_t *db_thread;
int db_thread_running;
switch_bool_t manage; switch_bool_t manage;
switch_mutex_t *io_mutex; switch_mutex_t *io_mutex;
switch_mutex_t *dbh_mutex; switch_mutex_t *dbh_mutex;
switch_mutex_t *ctl_mutex;
switch_cache_db_handle_t *handle_pool; switch_cache_db_handle_t *handle_pool;
uint32_t total_handles; uint32_t total_handles;
uint32_t total_used_handles; uint32_t total_used_handles;
switch_cache_db_handle_t *dbh; switch_cache_db_handle_t *dbh;
switch_sql_queue_manager_t *qm; switch_sql_queue_manager_t *qm;
int paused; int paused;
switch_thread_ctl_t *thread_ctl;
} sql_manager; } sql_manager;
@ -91,6 +91,7 @@ static switch_cache_db_handle_t *create_handle(switch_cache_db_handle_type_t typ
new_dbh->pool = pool; new_dbh->pool = pool;
new_dbh->type = type; new_dbh->type = type;
switch_mutex_init(&new_dbh->mutex, SWITCH_MUTEX_NESTED, new_dbh->pool); switch_mutex_init(&new_dbh->mutex, SWITCH_MUTEX_NESTED, new_dbh->pool);
switch_mutex_init(&new_dbh->usage_mutex, SWITCH_MUTEX_NESTED, new_dbh->pool);
return new_dbh; return new_dbh;
} }
@ -100,6 +101,7 @@ static void add_handle(switch_cache_db_handle_t *dbh, const char *db_str, const
switch_ssize_t hlen = -1; switch_ssize_t hlen = -1;
switch_mutex_lock(sql_manager.dbh_mutex); switch_mutex_lock(sql_manager.dbh_mutex);
switch_mutex_lock(dbh->mutex);
switch_set_string(dbh->creator, db_callsite_str); switch_set_string(dbh->creator, db_callsite_str);
@ -114,7 +116,7 @@ static void add_handle(switch_cache_db_handle_t *dbh, const char *db_str, const
sql_manager.handle_pool = dbh; sql_manager.handle_pool = dbh;
sql_manager.total_handles++; sql_manager.total_handles++;
switch_mutex_lock(dbh->mutex);
switch_mutex_unlock(sql_manager.dbh_mutex); switch_mutex_unlock(sql_manager.dbh_mutex);
} }
@ -122,7 +124,6 @@ static void del_handle(switch_cache_db_handle_t *dbh)
{ {
switch_cache_db_handle_t *dbh_ptr, *last = NULL; switch_cache_db_handle_t *dbh_ptr, *last = NULL;
switch_mutex_lock(sql_manager.dbh_mutex);
for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) { for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
if (dbh_ptr == dbh) { if (dbh_ptr == dbh) {
if (last) { if (last) {
@ -136,7 +137,6 @@ static void del_handle(switch_cache_db_handle_t *dbh)
last = dbh_ptr; last = dbh_ptr;
} }
switch_mutex_unlock(sql_manager.dbh_mutex);
} }
SWITCH_DECLARE(void) switch_cache_db_database_interface_flush_handles(switch_database_interface_t *database_interface) SWITCH_DECLARE(void) switch_cache_db_database_interface_flush_handles(switch_database_interface_t *database_interface)
@ -343,8 +343,10 @@ SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t
break; break;
} }
switch_mutex_lock(sql_manager.dbh_mutex);
(*dbh)->last_used = switch_epoch_time_now(NULL); (*dbh)->last_used = switch_epoch_time_now(NULL);
switch_mutex_unlock((*dbh)->mutex);
switch_mutex_lock(sql_manager.dbh_mutex);
(*dbh)->io_mutex = NULL; (*dbh)->io_mutex = NULL;
@ -353,10 +355,11 @@ SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t
(*dbh)->thread_hash = 1; (*dbh)->thread_hash = 1;
} }
} }
switch_mutex_unlock((*dbh)->mutex);
sql_manager.total_used_handles--; sql_manager.total_used_handles--;
*dbh = NULL;
switch_mutex_unlock(sql_manager.dbh_mutex); switch_mutex_unlock(sql_manager.dbh_mutex);
*dbh = NULL;
} }
} }
@ -1557,9 +1560,9 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *threa
{ {
int sec = 0, reg_sec = 0;; int sec = 0, reg_sec = 0;;
sql_manager.db_thread_running = 1; switch_thread_ctl_thread_set_running(sql_manager.thread_ctl);
while (sql_manager.db_thread_running == 1) { while (switch_thread_ctl_thread_is_running(sql_manager.thread_ctl)) {
if (++sec == SQL_CACHE_TIMEOUT) { if (++sec == SQL_CACHE_TIMEOUT) {
sql_close(switch_epoch_time_now(NULL)); sql_close(switch_epoch_time_now(NULL));
sec = 0; sec = 0;
@ -1588,7 +1591,6 @@ struct switch_sql_queue_manager {
uint32_t numq; uint32_t numq;
char *dsn; char *dsn;
switch_thread_t *thread; switch_thread_t *thread;
int thread_running;
switch_thread_cond_t *cond; switch_thread_cond_t *cond;
switch_mutex_t *cond_mutex; switch_mutex_t *cond_mutex;
switch_mutex_t *cond2_mutex; switch_mutex_t *cond2_mutex;
@ -1601,6 +1603,7 @@ struct switch_sql_queue_manager {
uint32_t max_trans; uint32_t max_trans;
uint32_t confirm; uint32_t confirm;
uint8_t paused; uint8_t paused;
switch_thread_ctl_t *thread_ctl;
}; };
static int qm_wake(switch_sql_queue_manager_t *qm) static int qm_wake(switch_sql_queue_manager_t *qm)
@ -1635,9 +1638,11 @@ static uint32_t qm_ttl(switch_sql_queue_manager_t *qm)
uint32_t ttl = 0; uint32_t ttl = 0;
uint32_t i; uint32_t i;
switch_mutex_lock(qm->mutex);
for (i = 0; i < qm->numq; i++) { for (i = 0; i < qm->numq; i++) {
ttl += switch_queue_size(qm->sql_queue[i]); ttl += switch_queue_size(qm->sql_queue[i]);
} }
switch_mutex_unlock(qm->mutex);
return ttl; return ttl;
} }
@ -1834,17 +1839,20 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_m
switch_status_t status = SWITCH_STATUS_FALSE; switch_status_t status = SWITCH_STATUS_FALSE;
uint32_t i, sanity = 100; uint32_t i, sanity = 100;
if (qm->thread_running == 1) { if (switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
qm->thread_running = -1; switch_thread_ctl_thread_request_stop(qm->thread_ctl);
while(--sanity && qm->thread_running == -1) { while(--sanity && switch_thread_ctl_thread_is_stopping(qm->thread_ctl)) {
switch_mutex_lock(qm->mutex);
for(i = 0; i < qm->numq; i++) { for(i = 0; i < qm->numq; i++) {
switch_queue_push(qm->sql_queue[i], NULL); switch_queue_push(qm->sql_queue[i], NULL);
switch_queue_interrupt_all(qm->sql_queue[i]); switch_queue_interrupt_all(qm->sql_queue[i]);
} }
switch_mutex_unlock(qm->mutex);
qm_wake(qm); qm_wake(qm);
if (qm->thread_running == -1) { if (switch_thread_ctl_thread_is_stopping(qm->thread_ctl)) {
switch_yield(100000); switch_yield(100000);
} }
} }
@ -1866,7 +1874,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_
{ {
switch_threadattr_t *thd_attr; switch_threadattr_t *thd_attr;
if (!qm->thread_running) { if (!switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Starting SQL thread.\n", qm->name); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Starting SQL thread.\n", qm->name);
switch_threadattr_create(&thd_attr, qm->pool); switch_threadattr_create(&thd_attr, qm->pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
@ -1916,14 +1924,14 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_m
switch_status_t status; switch_status_t status;
int x = 0; int x = 0;
if (sql_manager.paused || qm->thread_running != 1) { if (sql_manager.paused || !switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql);
if (!dup) free((char *)sql); if (!dup) free((char *)sql);
qm_wake(qm); qm_wake(qm);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
if (qm->thread_running != 1) { if (!switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
if (!dup) free((char *)sql); if (!dup) free((char *)sql);
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
@ -1958,7 +1966,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql
#ifdef EXEC_NOW #ifdef EXEC_NOW
switch_cache_db_handle_t *dbh; switch_cache_db_handle_t *dbh;
if (sql_manager.paused || qm->thread_running != 1) { if (sql_manager.paused || !switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql);
if (!dup) free((char *)sql); if (!dup) free((char *)sql);
qm_wake(qm); qm_wake(qm);
@ -2049,6 +2057,8 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n
qm->name = switch_core_strdup(qm->pool, name); qm->name = switch_core_strdup(qm->pool, name);
qm->max_trans = max_trans; qm->max_trans = max_trans;
switch_thread_ctl_init(&qm->thread_ctl, qm->pool);
switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool); switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool);
switch_mutex_init(&qm->cond2_mutex, SWITCH_MUTEX_NESTED, qm->pool); switch_mutex_init(&qm->cond2_mutex, SWITCH_MUTEX_NESTED, qm->pool);
switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool); switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool);
@ -2249,7 +2259,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
return NULL; return NULL;
} }
qm->thread_running = 1; switch_thread_ctl_thread_set_running(qm->thread_ctl);
switch_mutex_lock(qm->cond_mutex); switch_mutex_lock(qm->cond_mutex);
@ -2268,8 +2278,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
break; break;
} }
while (switch_thread_ctl_thread_is_running(qm->thread_ctl)) {
while (qm->thread_running == 1) {
uint32_t i, lc; uint32_t i, lc;
uint32_t written = 0, iterations = 0; uint32_t written = 0, iterations = 0;
@ -2313,8 +2322,15 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
check: check:
if ((lc = qm_ttl(qm)) == 0) { if ((lc = qm_ttl(qm)) == 0) {
/* Avoid deadlock by unlocking cond_mutex */
switch_mutex_unlock(qm->cond_mutex);
switch_mutex_lock(qm->cond2_mutex); switch_mutex_lock(qm->cond2_mutex);
/* switch_thread_cond_wait() requires mutex (cond_mutex in that case) to be locked before calling */
switch_mutex_lock(qm->cond_mutex);
switch_thread_cond_wait(qm->cond, qm->cond_mutex); switch_thread_cond_wait(qm->cond, qm->cond_mutex);
switch_mutex_unlock(qm->cond2_mutex); switch_mutex_unlock(qm->cond2_mutex);
} }
@ -2323,8 +2339,6 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
while (--i > 0 && (lc = qm_ttl(qm)) < 500) { while (--i > 0 && (lc = qm_ttl(qm)) < 500) {
switch_yield(5000); switch_yield(5000);
} }
} }
switch_mutex_unlock(qm->cond_mutex); switch_mutex_unlock(qm->cond_mutex);
@ -2335,7 +2349,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
switch_cache_db_release_db_handle(&qm->event_db); switch_cache_db_release_db_handle(&qm->event_db);
qm->thread_running = 0; switch_thread_ctl_thread_set_stopped(qm->thread_ctl);
return NULL; return NULL;
} }
@ -2834,15 +2848,22 @@ static void core_event_handler(switch_event_t *event)
if (sql_idx) { if (sql_idx) {
int i = 0; int i = 0;
switch_thread_rwlock_rdlock(sql_manager.thread_ctl->rwlock);
for (i = 0; i < sql_idx; i++) { for (i = 0; i < sql_idx; i++) {
if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) { if (!sql_manager.qm) {
switch_sql_queue_manager_push(sql_manager.qm, sql[i], 1, SWITCH_FALSE); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "DROP [%s]\n", sql[i]);
} else { }
switch_sql_queue_manager_push(sql_manager.qm, sql[i], 0, SWITCH_FALSE); else {
if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) {
switch_sql_queue_manager_push(sql_manager.qm, sql[i], 1, SWITCH_FALSE);
}
else {
switch_sql_queue_manager_push(sql_manager.qm, sql[i], 0, SWITCH_FALSE);
}
} }
sql[i] = NULL; sql[i] = NULL;
} }
switch_thread_rwlock_unlock(sql_manager.thread_ctl->rwlock);
} }
} }
@ -3369,11 +3390,11 @@ SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_core_dbtype(void)
{ {
switch_cache_db_handle_type_t type = SCDB_TYPE_CORE_DB; switch_cache_db_handle_type_t type = SCDB_TYPE_CORE_DB;
switch_mutex_lock(sql_manager.ctl_mutex); switch_thread_rwlock_rdlock(sql_manager.thread_ctl->rwlock);
if (sql_manager.qm && sql_manager.qm->event_db) { if (sql_manager.qm && sql_manager.qm->event_db) {
type = sql_manager.qm->event_db->type; type = sql_manager.qm->event_db->type;
} }
switch_mutex_unlock(sql_manager.ctl_mutex); switch_thread_rwlock_unlock(sql_manager.thread_ctl->rwlock);
return type; return type;
} }
@ -3589,7 +3610,7 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); switch_thread_ctl_init(&sql_manager.thread_ctl, sql_manager.memory_pool);
if (!sql_manager.manage) goto skip; if (!sql_manager.manage) goto skip;
@ -3885,7 +3906,7 @@ SWITCH_DECLARE(void) switch_core_sqldb_resume(void)
static void switch_core_sqldb_stop_thread(void) static void switch_core_sqldb_stop_thread(void)
{ {
switch_mutex_lock(sql_manager.ctl_mutex); switch_thread_rwlock_wrlock(sql_manager.thread_ctl->rwlock);
if (sql_manager.manage) { if (sql_manager.manage) {
if (sql_manager.qm) { if (sql_manager.qm) {
switch_sql_queue_manager_destroy(&sql_manager.qm); switch_sql_queue_manager_destroy(&sql_manager.qm);
@ -3893,14 +3914,12 @@ static void switch_core_sqldb_stop_thread(void)
} else { } else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
} }
switch_thread_rwlock_unlock(sql_manager.thread_ctl->rwlock);
switch_mutex_unlock(sql_manager.ctl_mutex);
} }
static void switch_core_sqldb_start_thread(void) static void switch_core_sqldb_start_thread(void)
{ {
switch_thread_rwlock_wrlock(sql_manager.thread_ctl->rwlock);
switch_mutex_lock(sql_manager.ctl_mutex);
if (sql_manager.manage) { if (sql_manager.manage) {
if (!sql_manager.qm) { if (!sql_manager.qm) {
char *dbname = runtime.odbc_dsn; char *dbname = runtime.odbc_dsn;
@ -3927,7 +3946,7 @@ static void switch_core_sqldb_start_thread(void)
} else { } else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
} }
switch_mutex_unlock(sql_manager.ctl_mutex); switch_thread_rwlock_unlock(sql_manager.thread_ctl->rwlock);
} }
void switch_core_sqldb_stop(void) void switch_core_sqldb_stop(void)
@ -3936,8 +3955,8 @@ void switch_core_sqldb_stop(void)
switch_event_unbind_callback(core_event_handler); switch_event_unbind_callback(core_event_handler);
if (sql_manager.db_thread && sql_manager.db_thread_running) { if (sql_manager.db_thread && switch_thread_ctl_thread_is_running(sql_manager.thread_ctl)) {
sql_manager.db_thread_running = -1; switch_thread_ctl_thread_request_stop(sql_manager.thread_ctl);
switch_thread_join(&st, sql_manager.db_thread); switch_thread_join(&st, sql_manager.db_thread);
} }

View File

@ -571,7 +571,7 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch threads\n");
for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) { for(x = 0; x < (uint32_t)MAX_DISPATCH; x++) {
switch_status_t st; switch_status_t st;
switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]); switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]);
} }

View File

@ -2526,9 +2526,9 @@ SWITCH_DECLARE(switch_endpoint_interface_t *) switch_loadable_module_get_endpoin
switch_mutex_lock(loadable_modules.mutex); switch_mutex_lock(loadable_modules.mutex);
ptr = switch_core_hash_find(loadable_modules.endpoint_hash, name); ptr = switch_core_hash_find(loadable_modules.endpoint_hash, name);
PROTECT_INTERFACE(ptr);
switch_mutex_unlock(loadable_modules.mutex); switch_mutex_unlock(loadable_modules.mutex);
PROTECT_INTERFACE(ptr);
return ptr; return ptr;
} }
@ -2555,7 +2555,7 @@ SWITCH_DECLARE(switch_file_interface_t *) switch_loadable_module_get_file_interf
switch_mutex_unlock(loadable_modules.mutex); switch_mutex_unlock(loadable_modules.mutex);
if (i) PROTECT_INTERFACE(i); PROTECT_INTERFACE(i);
return i; return i;
} }
@ -2583,7 +2583,7 @@ SWITCH_DECLARE(switch_database_interface_t *) switch_loadable_module_get_databas
switch_mutex_unlock(loadable_modules.mutex); switch_mutex_unlock(loadable_modules.mutex);
if (i) PROTECT_INTERFACE(i); PROTECT_INTERFACE(i);
return i; return i;
} }

View File

@ -61,7 +61,7 @@ static switch_queue_t *LOG_QUEUE = NULL;
#ifdef SWITCH_LOG_RECYCLE #ifdef SWITCH_LOG_RECYCLE
static switch_queue_t *LOG_RECYCLE_QUEUE = NULL; static switch_queue_t *LOG_RECYCLE_QUEUE = NULL;
#endif #endif
static int8_t THREAD_RUNNING = 0; static switch_thread_ctl_t *thread_ctl;
static uint8_t MAX_LEVEL = 0; static uint8_t MAX_LEVEL = 0;
static int mods_loaded = 0; static int mods_loaded = 0;
static int console_mods_loaded = 0; static int console_mods_loaded = 0;
@ -434,9 +434,9 @@ static void *SWITCH_THREAD_FUNC log_thread(switch_thread_t *t, void *obj)
if (!obj) { if (!obj) {
obj = NULL; obj = NULL;
} }
THREAD_RUNNING = 1; switch_thread_ctl_thread_set_running(thread_ctl);
while (THREAD_RUNNING == 1) { while (1) {
void *pop = NULL; void *pop = NULL;
switch_log_node_t *node = NULL; switch_log_node_t *node = NULL;
switch_log_binding_t *binding; switch_log_binding_t *binding;
@ -446,7 +446,6 @@ static void *SWITCH_THREAD_FUNC log_thread(switch_thread_t *t, void *obj)
} }
if (!pop) { if (!pop) {
THREAD_RUNNING = -1;
break; break;
} }
@ -463,7 +462,7 @@ static void *SWITCH_THREAD_FUNC log_thread(switch_thread_t *t, void *obj)
} }
THREAD_RUNNING = 0; switch_thread_ctl_thread_set_stopped(thread_ctl);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Logger Ended.\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Logger Ended.\n");
return NULL; return NULL;
} }
@ -478,7 +477,7 @@ SWITCH_DECLARE(void) switch_log_printf(switch_text_channel_t channel, const char
va_end(ap); va_end(ap);
} }
#define do_mods (LOG_QUEUE && THREAD_RUNNING) #define do_mods (LOG_QUEUE && switch_thread_ctl_thread_is_running(thread_ctl))
SWITCH_DECLARE(void) switch_log_vprintf(switch_text_channel_t channel, const char *file, const char *func, int line, SWITCH_DECLARE(void) switch_log_vprintf(switch_text_channel_t channel, const char *file, const char *func, int line,
const char *userdata, switch_log_level_t level, const char *fmt, va_list ap) const char *userdata, switch_log_level_t level, const char *fmt, va_list ap)
{ {
@ -665,6 +664,8 @@ SWITCH_DECLARE(switch_status_t) switch_log_init(switch_memory_pool_t *pool, swit
LOG_POOL = pool; LOG_POOL = pool;
switch_thread_ctl_init(&thread_ctl, LOG_POOL);
switch_threadattr_create(&thd_attr, LOG_POOL); switch_threadattr_create(&thd_attr, LOG_POOL);
switch_queue_create(&LOG_QUEUE, SWITCH_CORE_QUEUE_LEN, LOG_POOL); switch_queue_create(&LOG_QUEUE, SWITCH_CORE_QUEUE_LEN, LOG_POOL);
@ -675,7 +676,7 @@ SWITCH_DECLARE(switch_status_t) switch_log_init(switch_memory_pool_t *pool, swit
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, log_thread, NULL, LOG_POOL); switch_thread_create(&thread, thd_attr, log_thread, NULL, LOG_POOL);
while (!THREAD_RUNNING) { while (!switch_thread_ctl_thread_is_initiated(thread_ctl)) {
switch_cond_next(); switch_cond_next();
} }
@ -717,7 +718,7 @@ SWITCH_DECLARE(switch_status_t) switch_log_shutdown(void)
switch_queue_push(LOG_QUEUE, NULL); switch_queue_push(LOG_QUEUE, NULL);
while (THREAD_RUNNING) { while (switch_thread_ctl_thread_is_running(thread_ctl)) {
switch_cond_next(); switch_cond_next();
} }