fix sql queue manager issues

This commit is contained in:
Anthony Minessale 2012-11-21 21:15:31 -06:00
parent 2b2a4fb256
commit 5dccbe4818
2 changed files with 21 additions and 18 deletions

View File

@ -873,13 +873,13 @@ static void do_dialog_probe(switch_event_t *event)
if (mod_sofia_globals.debug_presence > 1) { if (mod_sofia_globals.debug_presence > 1) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s DUMP DIALOG_PROBE set version sql:\n%s\n", profile->name, sql); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s DUMP DIALOG_PROBE set version sql:\n%s\n", profile->name, sql);
} }
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); sofia_glue_execute_sql_soon(profile, &sql, SWITCH_TRUE);
switch_safe_free(sql); switch_safe_free(sql);
// The dialog_probe_callback has built up the dialogs to be included in the NOTIFY. // The dialog_probe_callback has built up the dialogs to be included in the NOTIFY.
// Now send the "full" dialog event to the triggering subscription. // Now send the "full" dialog event to the triggering subscription.
sql = switch_mprintf("select call_id,expires,sub_to_user,sub_to_host,event,version, " sql = switch_mprintf("select call_id,expires,sub_to_user,sub_to_host,event,version+1, "
"'full',full_to,full_from,contact,network_ip,network_port " "'full',full_to,full_from,contact,network_ip,network_port "
"from sip_subscriptions " "from sip_subscriptions "
"where hostname='%q' and profile_name='%q' and sub_to_user='%q' and sub_to_host='%q' and call_id='%q'", "where hostname='%q' and profile_name='%q' and sub_to_user='%q' and sub_to_host='%q' and call_id='%q'",
@ -4543,7 +4543,7 @@ void sofia_presence_check_subscriptions(sofia_profile_t *profile, time_t now)
"((expires > 0 and expires <= %ld)) and profile_name='%q' and hostname='%q'", "((expires > 0 and expires <= %ld)) and profile_name='%q' and hostname='%q'",
(long) now, profile->name, mod_sofia_globals.hostname); (long) now, profile->name, mod_sofia_globals.hostname);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); sofia_glue_execute_sql_soon(profile, &sql, SWITCH_TRUE);
switch_safe_free(sql); switch_safe_free(sql);
sql = switch_mprintf("select full_to, full_from, contact, -1, call_id, event, network_ip, network_port, " sql = switch_mprintf("select full_to, full_from, contact, -1, call_id, event, network_ip, network_port, "

View File

@ -1314,10 +1314,12 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_
} }
static void do_flush(switch_queue_t *q, switch_cache_db_handle_t *dbh) static void do_flush(switch_sql_queue_manager_t *qm, int i, switch_cache_db_handle_t *dbh)
{ {
void *pop = NULL; void *pop = NULL;
switch_queue_t *q = qm->sql_queue[i];
switch_mutex_lock(qm->mutex);
while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) { while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) {
if (pop) { if (pop) {
if (dbh) { if (dbh) {
@ -1326,6 +1328,7 @@ static void do_flush(switch_queue_t *q, switch_cache_db_handle_t *dbh)
free(pop); free(pop);
} }
} }
switch_mutex_unlock(qm->mutex);
} }
@ -1347,7 +1350,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queu
for(i = 0; i < qm->numq; i++) { for(i = 0; i < qm->numq; i++) {
do_flush(qm->sql_queue[i], NULL); do_flush(qm, i, NULL);
} }
pool = qm->pool; pool = qm->pool;
@ -1408,7 +1411,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql
switch_queue_push(qm->sql_queue[pos], dup ? strdup(sql) : (char *)sql); switch_queue_push(qm->sql_queue[pos], dup ? strdup(sql) : (char *)sql);
written = qm->written[pos]; written = qm->written[pos];
size = switch_sql_queue_manager_size(qm, pos); size = switch_sql_queue_manager_size(qm, pos);
want = written + size; want = written + qm->pre_written[pos] + size;
switch_mutex_unlock(qm->mutex); switch_mutex_unlock(qm->mutex);
qm_wake(qm); qm_wake(qm);
@ -1563,7 +1566,9 @@ static uint32_t do_trans(switch_sql_queue_manager_t *qm)
if (pop) { if (pop) {
if ((status = switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL)) == SWITCH_STATUS_SUCCESS) { if ((status = switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL)) == SWITCH_STATUS_SUCCESS) {
switch_mutex_lock(qm->mutex);
qm->pre_written[i]++; qm->pre_written[i]++;
switch_mutex_unlock(qm->mutex);
ttl++; ttl++;
} }
free(pop); free(pop);
@ -1633,7 +1638,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
uint32_t sanity = 120; uint32_t sanity = 120;
switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj; switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
uint32_t i, countdown = 0; uint32_t i;
while (!qm->event_db) { while (!qm->event_db) {
if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db) if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
@ -1674,7 +1679,7 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
if (sql_manager.paused) { if (sql_manager.paused) {
for (i = 0; i < qm->numq; i++) { for (i = 0; i < qm->numq; i++) {
do_flush(qm->sql_queue[i], NULL); do_flush(qm, i, NULL);
} }
goto check; goto check;
} }
@ -1707,21 +1712,19 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
check: check:
countdown = 40; if ((lc = qm_ttl(qm)) < qm->max_trans / 4) {
switch_yield(500000);
while (--countdown && (lc = qm_ttl(qm)) < qm->max_trans / 4) { } else if (lc == 0) {
if (lc == 0) { switch_thread_cond_wait(qm->cond, qm->cond_mutex);
switch_thread_cond_wait(qm->cond, qm->cond_mutex); } else {
break; switch_cond_next();
}
switch_yield(5000);
} }
} }
switch_mutex_unlock(qm->cond_mutex); switch_mutex_unlock(qm->cond_mutex);
for(i = 0; i < qm->numq; i++) { for(i = 0; i < qm->numq; i++) {
do_flush(qm->sql_queue[i], qm->event_db); do_flush(qm, i, qm->event_db);
} }
qm->thread_running = 0; qm->thread_running = 0;