optimize sofia sql by using new core transaction processor we will no longer support databases that do not support transactions

This commit is contained in:
Anthony Minessale 2012-10-25 11:31:42 -05:00
parent c670ed6df5
commit 68e0b7e859
9 changed files with 316 additions and 291 deletions

View File

@ -70,6 +70,12 @@ typedef struct switch_hold_record_s {
} switch_hold_record_t;
typedef struct switch_thread_data_s {
switch_thread_start_t func;
void *obj;
int alloc;
} switch_thread_data_t;
#define MESSAGE_STAMP_FFL(_m) _m->_file = __FILE__; _m->_func = __SWITCH_FUNC__; _m->_line = __LINE__
@ -703,6 +709,7 @@ SWITCH_DECLARE(switch_core_session_t *) switch_core_session_request_by_name(_In_
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(_In_ switch_core_session_t *session);
SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp);
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session);
/*!
@ -2418,14 +2425,18 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session
SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session);
SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const char *profile_name);
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp);
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_queue_manager_t **qmp,
uint32_t numq, const char *dsn,
const char *pre_trans_execute,
const char *post_trans_execute,
const char *inner_pre_trans_execute,
const char *inner_post_trans_execute);
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init_name(const char *name,
switch_sql_queue_manager_t **qmp,
uint32_t numq, const char *dsn,
const char *pre_trans_execute,
const char *post_trans_execute,
const char *inner_pre_trans_execute,
const char *inner_post_trans_execute);
#define switch_switch_sql_queue_manager_init(_q, _d, _p1, _p2, _ip1, _ip2) switch_switch_sql_queue_manager_init_name(__FILE__, _q, _d, _p1, _p2, _ip1, _ip2)
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm);
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm);

View File

@ -4900,8 +4900,7 @@ static switch_call_cause_t sofia_outgoing_channel(switch_core_session_t *session
sql = switch_mprintf("insert into sip_dialogs (uuid,presence_id,presence_data,profile_name,hostname,rcd,call_info_state) "
"values ('%q', '%q', '%q', '%q', '%q', %ld, '')", switch_core_session_get_uuid(nsession),
switch_str_nil(presence_id), switch_str_nil(presence_data), profile->name, mod_sofia_globals.hostname, (long) now);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
switch_safe_free(sql);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
}
#endif

View File

@ -241,7 +241,7 @@ typedef enum {
PFLAG_DISABLE_HOLD,
PFLAG_AUTO_NAT,
PFLAG_SIPCOMPACT,
PFLAG_SQL_IN_TRANS,
PFLAG_USE_ME,
PFLAG_PRESENCE_PRIVACY,
PFLAG_PASS_CALLEE_ID,
PFLAG_LOG_AUTH_FAIL,
@ -273,6 +273,7 @@ typedef enum {
PFLAG_MWI_USE_REG_CALLID,
PFLAG_FIRE_MESSAGE_EVENTS,
PFLAG_SEND_DISPLAY_UPDATE,
PFLAG_RUNNING_TRANS,
/* No new flags below this line */
PFLAG_MAX
} PFLAGS;
@ -632,7 +633,7 @@ struct sofia_profile {
char *post_trans_execute;
char *inner_pre_trans_execute;
char *inner_post_trans_execute;
switch_queue_t *sql_queue;
switch_sql_queue_manager_t *qm;
char *acl[SOFIA_MAX_ACL];
char *acl_pass_context[SOFIA_MAX_ACL];
char *acl_fail_context[SOFIA_MAX_ACL];

View File

@ -1520,6 +1520,28 @@ void *SWITCH_THREAD_FUNC sofia_msg_thread_run_once(switch_thread_t *thread, void
return NULL;
}
void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
{
sofia_dispatch_event_t *de = *dep;
switch_memory_pool_t *pool;
sofia_profile_t *profile = (*dep)->profile;
switch_thread_data_t *td;
switch_core_new_memory_pool(&pool);
*dep = NULL;
de->pool = pool;
td = switch_core_alloc(pool, sizeof(*td));
td->func = sofia_msg_thread_run_once;
td->obj = de;
switch_mutex_lock(profile->ireg_mutex);
switch_thread_pool_launch_thread(&td);
switch_mutex_unlock(profile->ireg_mutex);
}
#if 0
void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
{
sofia_dispatch_event_t *de = *dep;
@ -1551,6 +1573,7 @@ void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
sofia_process_dispatch_event(&de);
}
}
#endif
void sofia_process_dispatch_event(sofia_dispatch_event_t **dep)
{
@ -2158,153 +2181,63 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
sofia_profile_t *profile = (sofia_profile_t *) obj;
uint32_t ireg_loops = profile->ireg_seconds; /* Number of loop iterations done when we haven't checked for registrations */
uint32_t gateway_loops = GATEWAY_SECONDS; /* Number of loop iterations done when we haven't checked for gateways */
void *pop = NULL; /* queue_pop placeholder */
switch_size_t sql_len = 1024 * 32; /* length of sqlbuf */
char *tmp, *sqlbuf = NULL; /* Buffer for SQL statements */
char *sql = NULL; /* Current SQL statement */
switch_time_t last_commit; /* Last time we committed stuff to the DB */
switch_time_t last_check; /* Last time we did the second-resolution loop that checks various stuff */
switch_size_t len = 0; /* Current length of sqlbuf */
uint32_t statements = 0; /* Number of statements in the current sql buffer */
last_commit = last_check = switch_micro_time_now();
if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
sqlbuf = (char *) malloc(sql_len);
}
sofia_set_pflag_locked(profile, PFLAG_WORKER_RUNNING);
switch_queue_create(&profile->sql_queue, SOFIA_QUEUE_SIZE, profile->pool);
/* While we're running, or there is a pending sql statment that we haven't appended to sqlbuf yet, because of a lack of buffer space */
while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || sql) {
if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
/* Do we have enough statements or is the timeout expired */
while (sql || (sofia_test_pflag(profile, PFLAG_RUNNING) && mod_sofia_globals.running == 1 &&
switch_micro_time_now() - last_check < 1000000 &&
(statements == 0 || (statements <= 1024 && (switch_micro_time_now() - last_commit)/1000 < profile->trans_timeout)))) {
while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING))) {
if (profile->watchdog_enabled) {
uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0;
if (profile->step_timeout) {
step_diff = (uint32_t) ((switch_time_now() - profile->last_root_step) / 1000);
switch_interval_time_t sleepy_time = !statements ? 1000000 : switch_micro_time_now() - last_commit - profile->trans_timeout*1000;
if (sleepy_time < 1000 || sleepy_time > 1000000) {
sleepy_time = 1000;
}
if (sql || (switch_queue_pop_timeout(profile->sql_queue, &pop, sleepy_time) == SWITCH_STATUS_SUCCESS && pop)) {
switch_size_t newlen;
if (!sql) sql = (char *) pop;
newlen = strlen(sql) + 2 /* strlen(";\n") */ ;
if (len + newlen + 10 > sql_len) {
switch_size_t new_mlen = len + newlen + 10 + 10240;
if (new_mlen < SQLLEN) {
sql_len = new_mlen;
if (!(tmp = realloc(sqlbuf, sql_len))) {
abort();
break;
}
sqlbuf = tmp;
} else {
break;
}
}
sprintf(sqlbuf + len, "%s;\n", sql);
len += newlen;
free(sql);
sql = NULL;
statements++;
if (step_diff > profile->step_timeout) {
step_fail = 1;
}
}
/* Execute here */
last_commit = switch_micro_time_now();
if (len) {
//printf("TRANS:\n%s\n", sqlbuf);
switch_mutex_lock(profile->ireg_mutex);
sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL);
//sofia_glue_actually_execute_sql(profile, "commit;\n", NULL);
switch_mutex_unlock(profile->ireg_mutex);
statements = 0;
len = 0;
if (profile->event_timeout) {
event_diff = (uint32_t) ((switch_time_now() - profile->last_sip_event) / 1000);
if (event_diff > profile->event_timeout) {
event_fail = 1;
}
}
} else {
if (switch_queue_pop_timeout(profile->sql_queue, &pop, 1000000) == SWITCH_STATUS_SUCCESS && pop) {
sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex);
free(pop);
if (step_fail && profile->event_timeout && !event_fail) {
step_fail = 0;
}
if (event_fail || step_fail) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED BY WATCHDOG!\n"
"GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name);
switch_yield(2000000);
watchdog_triggered_abort();
}
}
if (switch_micro_time_now() - last_check >= 1000000) {
if (profile->watchdog_enabled) {
uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0;
if (profile->step_timeout) {
step_diff = (uint32_t) ((switch_time_now() - profile->last_root_step) / 1000);
if (step_diff > profile->step_timeout) {
step_fail = 1;
}
}
if (profile->event_timeout) {
event_diff = (uint32_t) ((switch_time_now() - profile->last_sip_event) / 1000);
if (event_diff > profile->event_timeout) {
event_fail = 1;
}
}
if (step_fail && profile->event_timeout && !event_fail) {
step_fail = 0;
}
if (event_fail || step_fail) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED BY WATCHDOG!\n"
"GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name);
switch_yield(2000000);
watchdog_triggered_abort();
}
}
if (!sofia_test_pflag(profile, PFLAG_STANDBY)) {
if (++ireg_loops >= IREG_SECONDS) {
time_t now = switch_epoch_time_now(NULL);
sofia_reg_check_expire(profile, now, 0);
ireg_loops = 0;
}
if (++gateway_loops >= GATEWAY_SECONDS) {
sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL));
gateway_loops = 0;
}
sofia_sub_check_gateway(profile, time(NULL));
if (!sofia_test_pflag(profile, PFLAG_STANDBY)) {
if (++ireg_loops >= IREG_SECONDS) {
time_t now = switch_epoch_time_now(NULL);
sofia_reg_check_expire(profile, now, 0);
ireg_loops = 0;
}
last_check = switch_micro_time_now();
if (++gateway_loops >= GATEWAY_SECONDS) {
sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL));
gateway_loops = 0;
}
sofia_sub_check_gateway(profile, time(NULL));
}
}
switch_mutex_lock(profile->ireg_mutex);
while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
sofia_glue_actually_execute_sql(profile, (char *) pop, NULL);
free(pop);
switch_yield(1000000);
}
switch_mutex_unlock(profile->ireg_mutex);
sofia_clear_pflag_locked(profile, PFLAG_WORKER_RUNNING);
switch_safe_free(sqlbuf);
return NULL;
}
@ -2409,6 +2342,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
int sanity;
switch_thread_t *worker_thread;
switch_status_t st;
char qname [128] = "";
switch_mutex_lock(mod_sofia_globals.mutex);
mod_sofia_globals.threads++;
@ -2596,6 +2530,17 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
switch_mutex_init(&profile->ireg_mutex, SWITCH_MUTEX_NESTED, profile->pool);
switch_mutex_init(&profile->gateway_mutex, SWITCH_MUTEX_NESTED, profile->pool);
switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name);
switch_switch_sql_queue_manager_init_name(qname,
&profile->qm,
1,
profile->odbc_dsn ? profile->odbc_dsn : profile->dbname,
profile->pre_trans_execute,
profile->post_trans_execute,
profile->inner_pre_trans_execute,
profile->inner_post_trans_execute);
switch_switch_sql_queue_manager_start(profile->qm);
if (switch_event_create(&s_event, SWITCH_EVENT_PUBLISH) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "service", "_sip._udp,_sip._tcp,_sip._sctp%s",
(sofia_test_pflag(profile, PFLAG_TLS)) ? ",_sips._tcp" : "");
@ -2682,6 +2627,8 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void
switch_mutex_lock(profile->flag_mutex);
switch_mutex_unlock(profile->flag_mutex);
switch_switch_sql_queue_manager_stop(profile->qm);
if (switch_event_create(&s_event, SWITCH_EVENT_UNPUBLISH) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "service", "_sip._udp,_sip._tcp,_sip._sctp%s",
(sofia_test_pflag(profile, PFLAG_TLS)) ? ",_sips._tcp" : "");
@ -4405,7 +4352,6 @@ switch_status_t config_sofia(int reload, char *profile_name)
sofia_set_pflag(profile, PFLAG_SEND_DISPLAY_UPDATE);
sofia_set_pflag(profile, PFLAG_MESSAGE_QUERY_ON_FIRST_REGISTER);
//sofia_set_pflag(profile, PFLAG_PRESENCE_ON_FIRST_REGISTER);
sofia_set_pflag(profile, PFLAG_SQL_IN_TRANS);
profile->shutdown_type = "false";
profile->local_network = "localnet.auto";
@ -5107,20 +5053,6 @@ switch_status_t config_sofia(int reload, char *profile_name)
} else {
sofia_clear_pflag(profile, PFLAG_PASS_CALLEE_ID);
}
} else if (!strcasecmp(var, "sql-in-transactions")) {
int tmp = atoi(val);
if (switch_true(val)) {
tmp = 500;
}
if (tmp > 0) {
profile->trans_timeout = tmp;
sofia_set_pflag(profile, PFLAG_SQL_IN_TRANS);
} else {
sofia_clear_pflag(profile, PFLAG_SQL_IN_TRANS);
}
} else if (!strcasecmp(var, "enable-soa")) {
if (switch_true(val)) {
sofia_set_flag(profile, TFLAG_ENABLE_SOA);
@ -6102,8 +6034,7 @@ static void sofia_handle_sip_r_invite(switch_core_session_t *session, int status
switch_str_nil(presence_id), switch_str_nil(presence_data), switch_str_nil(p), (long) now);
switch_assert(sql);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
switch_safe_free(sql);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
}
} else if (status == 200 && (profile->pres_type)) {
@ -9406,9 +9337,7 @@ void sofia_handle_sip_i_invite(switch_core_session_t *session, nua_t *nua, sofia
switch_assert(sql);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
switch_safe_free(sql);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
}
if (is_nat) {

View File

@ -6269,7 +6269,8 @@ int sofia_glue_init_sql(sofia_profile_t *profile)
};
switch_cache_db_handle_t *dbh = sofia_glue_get_db_handle(profile);
char *test2;
if (!dbh) {
return 0;
}
@ -6283,20 +6284,22 @@ int sofia_glue_init_sql(sofia_profile_t *profile)
switch_cache_db_test_reactive(dbh, test_sql, "drop table sip_registrations", reg_sql);
if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
char *test2 = switch_mprintf("%s;%s", test_sql, test_sql);
test2 = switch_mprintf("%s;%s", test_sql, test_sql);
if (switch_cache_db_execute_sql(dbh, test2, NULL) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "GREAT SCOTT!!! Cannot execute batched statements!\n"
"If you are using mysql, make sure you are using MYODBC 3.51.18 or higher and enable FLAG_MULTI_STATEMENTS\n");
sofia_clear_pflag(profile, PFLAG_SQL_IN_TRANS);
}
if (switch_cache_db_execute_sql(dbh, test2, NULL) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "GREAT SCOTT!!! Cannot execute batched statements!\n"
"If you are using mysql, make sure you are using MYODBC 3.51.18 or higher and enable FLAG_MULTI_STATEMENTS\n");
switch_cache_db_release_db_handle(&dbh);
free(test2);
free(test_sql);
return 0;
}
free(test2);
free(test_sql);
test_sql = switch_mprintf("delete from sip_subscriptions where hostname='%q' and full_to='XXX'", mod_sofia_globals.hostname);
@ -6346,45 +6349,31 @@ int sofia_glue_init_sql(sofia_profile_t *profile)
void sofia_glue_execute_sql(sofia_profile_t *profile, char **sqlp, switch_bool_t sql_already_dynamic)
{
switch_status_t status = SWITCH_STATUS_FALSE;
char *d_sql = NULL, *sql;
char *sql;
switch_assert(sqlp && *sqlp);
sql = *sqlp;
sql = *sqlp;
if (profile->sql_queue) {
if (sql_already_dynamic) {
d_sql = sql;
} else {
d_sql = strdup(sql);
}
switch_assert(d_sql);
if ((status = switch_queue_trypush(profile->sql_queue, d_sql)) == SWITCH_STATUS_SUCCESS) {
d_sql = NULL;
}
} else if (sql_already_dynamic) {
d_sql = sql;
}
if (status != SWITCH_STATUS_SUCCESS) {
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
}
switch_safe_free(d_sql);
switch_switch_sql_queue_manager_push(profile->qm, sql, 0, !sql_already_dynamic);
if (sql_already_dynamic) {
*sqlp = NULL;
}
}
void sofia_glue_execute_sql_now(sofia_profile_t *profile, char **sqlp, switch_bool_t sql_already_dynamic)
{
sofia_glue_actually_execute_sql(profile, *sqlp, profile->ireg_mutex);
char *sql;
switch_assert(sqlp && *sqlp);
sql = *sqlp;
switch_switch_sql_queue_manager_push_confirm(profile->qm, sql, 0, !sql_already_dynamic);
if (sql_already_dynamic) {
switch_safe_free(*sqlp);
*sqlp = NULL;
}
*sqlp = NULL;
}

View File

@ -3619,9 +3619,7 @@ void sofia_presence_handle_sip_i_subscribe(int status,
}
switch_assert(sql != NULL);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
switch_safe_free(sql);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
sstr = switch_mprintf("terminated;reason=noresource");
} else {
@ -4522,8 +4520,7 @@ void sofia_presence_check_subscriptions(sofia_profile_t *profile, time_t now)
"sub del sql: %s\n", sql);
}
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
switch_safe_free(sql);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
}
}

View File

@ -695,7 +695,7 @@ void sofia_reg_expire_call_id(sofia_profile_t *profile, const char *call_id, int
switch_safe_free(sql);
sql = switch_mprintf("delete from sip_registrations where call_id='%q' %s", call_id, sqlextra);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_FALSE);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
switch_safe_free(sqlextra);
switch_safe_free(sql);
@ -705,84 +705,80 @@ void sofia_reg_expire_call_id(sofia_profile_t *profile, const char *call_id, int
void sofia_reg_check_expire(sofia_profile_t *profile, time_t now, int reboot)
{
char sql[1024];
char *sql;
if (now) {
switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires"
sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires"
",user_agent,server_user,server_host,profile_name,network_ip"
",%d from sip_registrations where expires > 0 and expires <= %ld", reboot, (long) now);
} else {
switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires"
sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires"
",user_agent,server_user,server_host,profile_name,network_ip" ",%d from sip_registrations where expires > 0", reboot);
}
sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_del_callback, profile);
if (now) {
switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and expires <= %ld and hostname='%q'",
sql = switch_mprintf("delete from sip_registrations where expires > 0 and expires <= %ld and hostname='%q'",
(long) now, mod_sofia_globals.hostname);
} else {
switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
}
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
if (now) {
switch_snprintfv(sql, sizeof(sql), "select call_id from sip_shared_appearance_dialogs where hostname='%q' "
sql = switch_mprintf("select call_id from sip_shared_appearance_dialogs where hostname='%q' "
"and profile_name='%s' and expires <= %ld", mod_sofia_globals.hostname, profile->name, (long) now);
sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_sla_dialog_del_callback, profile);
switch_snprintfv(sql, sizeof(sql), "delete from sip_shared_appearance_dialogs where expires > 0 and hostname='%q' and expires <= %ld",
sql = switch_mprintf("delete from sip_shared_appearance_dialogs where expires > 0 and hostname='%q' and expires <= %ld",
mod_sofia_globals.hostname, (long) now);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
}
if (now) {
switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and expires <= %ld and hostname='%q'",
sql = switch_mprintf("delete from sip_presence where expires > 0 and expires <= %ld and hostname='%q'",
(long) now, mod_sofia_globals.hostname);
} else {
switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
}
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
if (now) {
switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and expires <= %ld and hostname='%q'",
sql = switch_mprintf("delete from sip_authentication where expires > 0 and expires <= %ld and hostname='%q'",
(long) now, mod_sofia_globals.hostname);
} else {
switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sql = switch_mprintf("delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
}
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
sofia_presence_check_subscriptions(profile, now);
sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
if (now) {
switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where (expires = -1 or (expires > 0 and expires <= %ld)) and hostname='%q'",
sql = switch_mprintf("delete from sip_dialogs where (expires = -1 or (expires > 0 and expires <= %ld)) and hostname='%q'",
(long) now, mod_sofia_globals.hostname);
} else {
switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
sql = switch_mprintf("delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
}
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
if (now) {
if (sofia_test_pflag(profile, PFLAG_ALL_REG_OPTIONS_PING)) {
switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,"
sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,"
"expires,user_agent,server_user,server_host,profile_name"
" from sip_registrations where hostname='%s' and "
"profile_name='%s'", mod_sofia_globals.hostname, profile->name);
sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_nat_callback, profile);
} else if (sofia_test_pflag(profile, PFLAG_NAT_OPTIONS_PING)) {
switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,"
sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,"
"expires,user_agent,server_user,server_host,profile_name"
" from sip_registrations where (status like '%%NAT%%' "
"or contact like '%%fs_nat=yes%%') and hostname='%s' "
@ -846,37 +842,36 @@ void sofia_reg_check_call_id(sofia_profile_t *profile, const char *call_id)
void sofia_reg_check_sync(sofia_profile_t *profile)
{
char sql[1024];
char *sql;
switch_snprintf(sql, sizeof(sql), "select call_id,sip_user,sip_host,contact,status,rpid,expires"
sql = switch_mprintf("select call_id,sip_user,sip_host,contact,status,rpid,expires"
",user_agent,server_user,server_host,profile_name,network_ip"
" from sip_registrations where expires > 0");
sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_del_callback, profile);
switch_snprintfv(sql, sizeof(sql), "delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
switch_snprintfv(sql, sizeof(sql), "delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
switch_snprintfv(sql, sizeof(sql), "delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
sql = switch_mprintf("delete from sip_authentication where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
switch_snprintfv(sql, sizeof(sql), "delete from sip_subscriptions where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
sql = switch_mprintf("delete from sip_subscriptions where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
switch_snprintfv(sql, sizeof(sql), "delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
sql = switch_mprintf("delete from sip_dialogs where expires >= -1 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
}
char *sofia_reg_find_reg_url(sofia_profile_t *profile, const char *user, const char *host, char *val, switch_size_t len)
{
struct callback_t cbt = { 0 };
char sql[512] = "";
char *sql;
if (!user) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n");
@ -887,10 +882,10 @@ char *sofia_reg_find_reg_url(sofia_profile_t *profile, const char *user, const c
cbt.len = len;
if (host) {
switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
sql = switch_mprintf("select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
user, host, host);
} else {
switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q'", user);
sql = switch_mprintf("select contact from sip_registrations where sip_user='%q'", user);
}
@ -908,7 +903,7 @@ char *sofia_reg_find_reg_url(sofia_profile_t *profile, const char *user, const c
switch_console_callback_match_t *sofia_reg_find_reg_url_multi(sofia_profile_t *profile, const char *user, const char *host)
{
struct callback_t cbt = { 0 };
char sql[512] = "";
char *sql;
if (!user) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n");
@ -916,10 +911,10 @@ switch_console_callback_match_t *sofia_reg_find_reg_url_multi(sofia_profile_t *p
}
if (host) {
switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
sql = switch_mprintf("select contact from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
user, host, host);
} else {
switch_snprintfv(sql, sizeof(sql), "select contact from sip_registrations where sip_user='%q'", user);
sql = switch_mprintf("select contact from sip_registrations where sip_user='%q'", user);
}
@ -932,7 +927,7 @@ switch_console_callback_match_t *sofia_reg_find_reg_url_multi(sofia_profile_t *p
switch_console_callback_match_t *sofia_reg_find_reg_url_with_positive_expires_multi(sofia_profile_t *profile, const char *user, const char *host)
{
struct callback_t cbt = { 0 };
char sql[512] = "";
char *sql;
if (!user) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Called with null user!\n");
@ -940,10 +935,10 @@ switch_console_callback_match_t *sofia_reg_find_reg_url_with_positive_expires_mu
}
if (host) {
switch_snprintfv(sql, sizeof(sql), "select contact,expires from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
sql = switch_mprintf("select contact,expires from sip_registrations where sip_user='%q' and (sip_host='%q' or presence_hosts like '%%%q%%')",
user, host, host);
} else {
switch_snprintfv(sql, sizeof(sql), "select contact,expires from sip_registrations where sip_user='%q'", user);
sql = switch_mprintf("select contact,expires from sip_registrations where sip_user='%q'", user);
}
sofia_glue_execute_sql_callback(profile, profile->ireg_mutex, sql, sofia_reg_find_reg_with_positive_expires_callback, &cbt);
@ -973,8 +968,7 @@ void sofia_reg_auth_challenge(sofia_profile_t *profile, nua_handle_t *nh, sofia_
(long) switch_epoch_time_now(NULL) + (profile->nonce_ttl ? profile->nonce_ttl : DEFAULT_NONCE_TTL),
profile->name, mod_sofia_globals.hostname);
switch_assert(sql != NULL);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
switch_safe_free(sql);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
auth_str = switch_mprintf("Digest realm=\"%q\", nonce=\"%q\",%s algorithm=MD5, qop=\"auth\"", realm, uuid_str, stale ? " stale=true," : "");
@ -2802,8 +2796,7 @@ auth_res_t sofia_reg_parse_auth(sofia_profile_t *profile,
switch_epoch_time_now(NULL) + (profile->nonce_ttl ? profile->nonce_ttl : exptime + 10), ncl, nonce);
switch_assert(sql != NULL);
sofia_glue_actually_execute_sql(profile, sql, profile->ireg_mutex);
switch_safe_free(sql);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
if (ret == AUTH_OK)
ret = AUTH_RENEWED;

View File

@ -1530,25 +1530,25 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
}
if (check_status == SWITCH_STATUS_SUCCESS) {
switch_core_session_t *session = (switch_core_session_t *) pop;
switch_size_t id;
switch_thread_data_t *td = (switch_thread_data_t *) pop;
if (!session) break;
if (!td) break;
id = session->id;
switch_mutex_lock(session_manager.mutex);
session_manager.busy++;
switch_mutex_unlock(session_manager.mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing session %"SWITCH_SIZE_T_FMT" %s\n",
(long) thread, id, switch_core_session_get_name(session));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing\n", (long) thread);
switch_core_session_thread(thread, (void *) session);
td->func(thread, td->obj);
if (td->alloc) {
free(td);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing\n", (long) thread);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing session %"SWITCH_SIZE_T_FMT"\n",
(long) thread, id);
switch_mutex_lock(session_manager.mutex);
session_manager.busy--;
switch_mutex_unlock(session_manager.mutex);
@ -1656,11 +1656,27 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_t
return NULL;
}
SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp)
{
switch_status_t status = SWITCH_STATUS_SUCCESS;
switch_thread_data_t *td;
switch_assert(tdp);
td = *tdp;
*tdp = NULL;
switch_queue_push(session_manager.thread_queue, td);
check_queue();
return status;
}
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session)
{
switch_status_t status = SWITCH_STATUS_INUSE;
switch_thread_data_t *td;
switch_mutex_lock(session->mutex);
if (switch_test_flag(session, SSF_THREAD_RUNNING)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Cannot double-launch thread!\n");
@ -1670,7 +1686,10 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_co
status = SWITCH_STATUS_SUCCESS;
switch_set_flag(session, SSF_THREAD_RUNNING);
switch_set_flag(session, SSF_THREAD_STARTED);
switch_queue_push(session_manager.thread_queue, session);
td = switch_core_session_alloc(session, sizeof(*td));
td->obj = session;
td->func = switch_core_session_thread;
switch_queue_push(session_manager.thread_queue, td);
check_queue();
}
switch_mutex_unlock(session->mutex);

View File

@ -1214,14 +1214,19 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *threa
static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj);
struct switch_sql_queue_manager {
const char *name;
switch_cache_db_handle_t *event_db;
switch_queue_t **sql_queue;
uint32_t *pre_written;
uint32_t *written;
int *sizes;
uint32_t numq;
char *dsn;
switch_thread_t *thread;
int thread_running;
switch_thread_cond_t *cond;
switch_mutex_t *cond_mutex;
switch_mutex_t *mutex;
char *pre_trans_execute;
char *post_trans_execute;
char *inner_pre_trans_execute;
@ -1229,12 +1234,15 @@ struct switch_sql_queue_manager {
switch_memory_pool_t *pool;
};
static void qm_wake(switch_sql_queue_manager_t *qm)
static int qm_wake(switch_sql_queue_manager_t *qm)
{
if (switch_mutex_trylock(qm->cond_mutex) == SWITCH_STATUS_SUCCESS) {
switch_thread_cond_signal(qm->cond);
switch_mutex_unlock(qm->cond_mutex);
return 1;
}
return 0;
}
static uint32_t qm_ttl(switch_sql_queue_manager_t *qm)
@ -1335,15 +1343,59 @@ SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push(switch_sql_
}
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
{
int want, size, x = 0, sanity = 0;
uint32_t written;
if (!qm->thread_running) {
return SWITCH_STATUS_FALSE;
}
if (sql_manager.thread_running != 1) {
return SWITCH_STATUS_FALSE;
}
if (pos > qm->numq - 1) {
pos = 0;
}
switch_queue_push(qm->sql_queue[pos], dup ? strdup(sql) : (char *)sql);
switch_mutex_lock(qm->mutex);
written = qm->written[pos];
size = qm->sizes[pos];
want = written + size;
switch_mutex_unlock(qm->mutex);
qm_wake(qm);
while((qm->written[pos] < want) || (qm->written[pos] >= written && want < written && qm->written[pos] > want)) {
switch_yield(5000);
if (++x == 200) {
qm_wake(qm);
x = 0;
if (++sanity == 20) {
break;
}
}
}
return SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_queue_manager_t **qmp,
uint32_t numq, const char *dsn,
const char *pre_trans_execute,
const char *post_trans_execute,
const char *inner_pre_trans_execute,
const char *inner_post_trans_execute)
SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init_name(const char *name,
switch_sql_queue_manager_t **qmp,
uint32_t numq, const char *dsn,
const char *pre_trans_execute,
const char *post_trans_execute,
const char *inner_pre_trans_execute,
const char *inner_post_trans_execute)
{
switch_memory_pool_t *pool;
switch_sql_queue_manager_t *qm;
@ -1357,11 +1409,16 @@ SWITCH_DECLARE(switch_status_t) switch_switch_sql_queue_manager_init(switch_sql_
qm->pool = pool;
qm->numq = numq;
qm->dsn = switch_core_strdup(qm->pool, dsn);
qm->name = switch_core_strdup(qm->pool, name);
switch_mutex_init(&qm->cond_mutex, SWITCH_MUTEX_NESTED, qm->pool);
switch_mutex_init(&qm->mutex, SWITCH_MUTEX_NESTED, qm->pool);
switch_thread_cond_create(&qm->cond, qm->pool);
qm->sql_queue = switch_core_alloc(qm->pool, sizeof(switch_queue_t *) * numq);
qm->sizes = switch_core_alloc(qm->pool, sizeof(int) * numq);
qm->written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
qm->pre_written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
for (i = 0; i < qm->numq; i++) {
switch_queue_create(&qm->sql_queue[i], SWITCH_SQL_QUEUE_LEN, qm->pool);
@ -1400,13 +1457,13 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
while (!qm->event_db) {
if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
break;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error getting core db, Retrying\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name);
switch_yield(500000);
sanity--;
}
if (!qm->event_db) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error getting core db\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
return NULL;
}
@ -1431,14 +1488,19 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
while (qm->thread_running == 1) {
int proceed = !!save_sql;
int pindex = -1;
if (!proceed) {
for (i = 0; i < qm->numq; i++) {
if (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
if (sql_manager.thread_running != 1) {
free(pop);
pop = NULL;
if (pop) {
switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
free(pop);
pop = NULL;
}
} else {
pindex = i;
proceed = 1;
break;
}
@ -1470,7 +1532,8 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
for (i = 0; i < qm->numq; i++) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"REALLOC QUEUE %ld %d %d\n",
"%s REALLOC QUEUE %ld %d %d\n",
qm->name,
(long int)sql_len,
i,
switch_queue_size(qm->sql_queue[i]));
@ -1478,7 +1541,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
}
}
if (!(tmp = realloc(sqlbuf, sql_len))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread ending on mem err\n", qm->name);
abort();
break;
}
@ -1487,7 +1550,8 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
for (i = 0; i < qm->numq; i++) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"SAVE QUEUE %d %d\n",
"%s SAVE QUEUE %d %d\n",
qm->name,
i,
switch_queue_size(qm->sql_queue[i]));
@ -1499,6 +1563,10 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
goto skip;
}
}
switch_mutex_lock(qm->mutex);
qm->pre_written[pindex]++;
switch_mutex_unlock(qm->mutex);
iterations++;
sprintf(sqlbuf + len, "%s;\n", sql);
@ -1506,7 +1574,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
free(sql);
sql = NULL;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s, SQL thread ending\n", qm->name);
break;
}
}
@ -1519,14 +1587,14 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
auto_pause = 1;
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
auto_pause = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue overflowing [%d], Pausing calls.\n", lc);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue overflowing [%d], Pausing calls.\n", qm->name, lc);
}
} else {
if (auto_pause && lc < 1000) {
auto_pause = 0;
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
auto_pause = 0;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue back to normal size, resuming..\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue back to normal size, resuming..\n", qm->name);
}
}
@ -1535,14 +1603,23 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
wrote = 0;
if (trans && iterations && (iterations > target || !lc)) {
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
char line[128] = "";
int l;
switch_snprintf(line, sizeof(line), "%s RUN QUEUE ", qm->name);
for (i = 0; i < qm->numq; i++) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"RUN QUEUE %d %d %d\n",
i,
switch_queue_size(qm->sql_queue[i]),
iterations);
l = strlen(line);
switch_snprintf(line + l, sizeof(line) - l, "%d:%d ", i, switch_queue_size(qm->sql_queue[i]));
}
l = strlen(line);
switch_snprintf(line + l, sizeof(line) - l, "%d\n", iterations);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line);
}
if (switch_cache_db_persistant_execute_trans_full(qm->event_db, sqlbuf, 1,
qm->pre_trans_execute,
@ -1550,13 +1627,12 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
qm->inner_pre_trans_execute,
qm->inner_post_trans_execute
) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread unable to commit transaction, records lost!\n", qm->name);
}
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s DONE\n", qm->name);
}
iterations = 0;
trans = 0;
len = 0;
@ -1572,6 +1648,14 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
lc = qm_ttl(qm);
switch_mutex_lock(qm->mutex);
for (i = 0; i < qm->numq; i++) {
qm->sizes[i] = switch_queue_size(qm->sql_queue[i]);
qm->written[i] += qm->pre_written[i];
qm->pre_written[i] = 0;
}
switch_mutex_unlock(qm->mutex);
if (!lc) {
switch_thread_cond_wait(qm->cond, qm->cond_mutex);
} else if (wrote) {
@ -1587,7 +1671,10 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
for(i = 0; i < qm->numq; i++) {
while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
switch_safe_free(pop);
if (pop) {
switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
free(pop);
}
}
}