diff --git a/src/include/private/switch_core_pvt.h b/src/include/private/switch_core_pvt.h index b7d5f75868..40f51ba61a 100644 --- a/src/include/private/switch_core_pvt.h +++ b/src/include/private/switch_core_pvt.h @@ -256,8 +256,6 @@ struct switch_runtime { switch_profile_timer_t *profile_timer; double profile_time; double min_idle_time; - int sql_buffer_len; - int max_sql_buffer_len; switch_dbtype_t odbc_dbtype; char hostname[256]; char *switchname; diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 69a9b07576..73a8fc8f17 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -2205,8 +2205,9 @@ SWITCH_DECLARE(switch_status_t) switch_core_chat_send(const char *dest_proto, sw SWITCH_DECLARE(switch_status_t) switch_core_chat_deliver(const char *dest_proto, switch_event_t **message_event); SWITCH_DECLARE(switch_status_t) switch_ivr_preprocess_session(switch_core_session_t *session, const char *cmds); -SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void); -SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void); +SWITCH_DECLARE(void) switch_core_sqldb_pause(void); +SWITCH_DECLARE(void) switch_core_sqldb_resume(void); + ///\} diff --git a/src/switch_core.c b/src/switch_core.c index 431ed8d3a1..73d96b2869 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -1470,11 +1470,10 @@ SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switc runtime.db_handle_timeout = 5000000; runtime.runlevel++; - runtime.sql_buffer_len = 1024 * 32; - runtime.max_sql_buffer_len = 1024 * 1024; runtime.dummy_cng_frame.data = runtime.dummy_data; runtime.dummy_cng_frame.datalen = sizeof(runtime.dummy_data); runtime.dummy_cng_frame.buflen = sizeof(runtime.dummy_data); + runtime.dbname = "core"; switch_set_flag((&runtime.dummy_cng_frame), SFF_CNG); switch_set_flag((&runtime), SCF_AUTO_SCHEMAS); switch_set_flag((&runtime), SCF_CLEAR_SQL); @@ -1754,37 +1753,6 @@ static void switch_load_core_config(const char *file) } else if (!strcasecmp(var, "multiple-registrations")) { runtime.multiple_registrations = switch_true(val); - } else if (!strcasecmp(var, "sql-buffer-len")) { - int tmp = atoi(val); - - if (end_of(val) == 'k') { - tmp *= 1024; - } else if (end_of(val) == 'm') { - tmp *= (1024 * 1024); - } - - if (tmp >= 32000 && tmp < 10500000) { - runtime.sql_buffer_len = tmp; - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "sql-buffer-len: Value is not within rage 32k to 10m\n"); - } - } else if (!strcasecmp(var, "max-sql-buffer-len")) { - int tmp = atoi(val); - - if (end_of(val) == 'k') { - tmp *= 1024; - } else if (end_of(val) == 'm') { - tmp *= (1024 * 1024); - } - - if (tmp < runtime.sql_buffer_len) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Value is not larger than sql-buffer-len\n"); - } else if (tmp >= 32000 && tmp < 10500000) { - runtime.max_sql_buffer_len = tmp; - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "max-sql-buffer-len: Value is not within rage 32k to 10m\n"); - } - } else if (!strcasecmp(var, "auto-create-schemas")) { if (switch_true(val)) { switch_set_flag((&runtime), SCF_AUTO_SCHEMAS); @@ -2256,9 +2224,9 @@ SWITCH_DECLARE(int32_t) switch_core_session_ctl(switch_session_ctl_t cmd, void * break; case SCSC_SQL: if (oldintval) { - switch_core_sqldb_start_thread(); + switch_core_sqldb_resume(); } else { - switch_core_sqldb_stop_thread(); + switch_core_sqldb_pause(); } break; case SCSC_PAUSE_ALL: diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index 45ea99ba29..6e449b6f7b 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -56,26 +56,25 @@ struct switch_cache_db_handle { }; static struct { - switch_cache_db_handle_t *event_db; - switch_queue_t *sql_queue[4]; switch_memory_pool_t *memory_pool; - switch_thread_t *thread; switch_thread_t *db_thread; - int thread_running; int db_thread_running; switch_bool_t manage; switch_mutex_t *io_mutex; switch_mutex_t *dbh_mutex; switch_mutex_t *ctl_mutex; switch_cache_db_handle_t *handle_pool; - switch_thread_cond_t *cond; - switch_mutex_t *cond_mutex; uint32_t total_handles; uint32_t total_used_handles; switch_cache_db_handle_t *dbh; + switch_sql_queue_manager_t *qm; + int paused; } sql_manager; +static void switch_core_sqldb_start_thread(void); +static void switch_core_sqldb_stop_thread(void); + static switch_cache_db_handle_t *create_handle(switch_cache_db_handle_type_t type) { switch_cache_db_handle_t *new_dbh = NULL; @@ -575,19 +574,6 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t return status; } -static void wake_thread(int force) -{ - if (force) { - switch_thread_cond_signal(sql_manager.cond); - return; - } - - if (switch_mutex_trylock(sql_manager.cond_mutex) == SWITCH_STATUS_SUCCESS) { - switch_thread_cond_signal(sql_manager.cond); - switch_mutex_unlock(sql_manager.cond_mutex); - } -} - /** OMFG you cruel bastards. Who chooses 64k as a max buffer len for a sql statement, have you ever heard of transactions? **/ @@ -1195,7 +1181,6 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *threa while (sql_manager.db_thread_running == 1) { if (++sec == SQL_CACHE_TIMEOUT) { sql_close(switch_epoch_time_now(NULL)); - wake_thread(0); sec = 0; } @@ -1217,7 +1202,6 @@ struct switch_sql_queue_manager { const char *name; switch_cache_db_handle_t *event_db; switch_queue_t **sql_queue; - uint32_t *pre_written; uint32_t *written; uint32_t numq; char *dsn; @@ -1281,6 +1265,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_m } if (qm->thread) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Stopping SQL thread.\n", qm->name); switch_thread_join(&status, qm->thread); qm->thread = NULL; status = SWITCH_STATUS_SUCCESS; @@ -1294,7 +1279,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_ switch_threadattr_t *thd_attr; if (!qm->thread_running) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting SQL thread.\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Starting SQL thread.\n", qm->name); switch_threadattr_create(&thd_attr, qm->pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_priority_set(thd_attr, SWITCH_PRI_NORMAL); @@ -1306,24 +1291,40 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_ } +static void do_flush(switch_queue_t *q, switch_cache_db_handle_t *dbh) +{ + void *pop = NULL; + + while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) { + if (pop) { + if (dbh) { + switch_cache_db_execute_sql(dbh, (char *) pop, NULL); + } + free(pop); + } + } + +} + SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp) { switch_sql_queue_manager_t *qm; switch_status_t status = SWITCH_STATUS_SUCCESS; switch_memory_pool_t *pool; - void *pop; uint32_t i; switch_assert(qmp); qm = *qmp; *qmp = NULL; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Destroying SQL queue.\n", qm->name); + switch_sql_queue_manager_stop(qm); + + for(i = 0; i < qm->numq; i++) { - while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) { - switch_safe_free(pop); - } + do_flush(qm->sql_queue[i], NULL); } pool = qm->pool; @@ -1335,14 +1336,17 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queu SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup) { + if (sql_manager.paused) { + if (!dup) free((char *)sql); + qm_wake(qm); + return SWITCH_STATUS_SUCCESS; + } + if (!qm->thread_running) { + if (!dup) free((char *)sql); return SWITCH_STATUS_FALSE; } - if (sql_manager.thread_running != 1) { - return SWITCH_STATUS_FALSE; - } - if (pos > qm->numq - 1) { pos = 0; } @@ -1362,14 +1366,17 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql int size, x = 0, sanity = 0; uint32_t written, want; + if (sql_manager.paused) { + if (!dup) free((char *)sql); + qm_wake(qm); + return SWITCH_STATUS_SUCCESS; + } + if (!qm->thread_running) { + if (!dup) free((char *)sql); return SWITCH_STATUS_FALSE; } - if (sql_manager.thread_running != 1) { - return SWITCH_STATUS_FALSE; - } - if (pos > qm->numq - 1) { pos = 0; } @@ -1430,7 +1437,6 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n qm->sql_queue = switch_core_alloc(qm->pool, sizeof(switch_queue_t *) * numq); qm->written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq); - qm->pre_written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq); for (i = 0; i < qm->numq; i++) { switch_queue_create(&qm->sql_queue[i], SWITCH_SQL_QUEUE_LEN, qm->pool); @@ -1448,24 +1454,111 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n return SWITCH_STATUS_SUCCESS; } + +static uint32_t do_trans(switch_cache_db_handle_t *dbh, + switch_queue_t *q, + switch_mutex_t *mutex, + const char *pre_trans_execute, + const char *post_trans_execute, + const char *inner_pre_trans_execute, + const char *inner_post_trans_execute) +{ + char *errmsg = NULL; + void *pop; + switch_status_t status; + uint32_t ttl = 0; + + if (!switch_queue_size(q)) { + return 0; + } + + switch(dbh->type) { + case SCDB_TYPE_CORE_DB: + { + switch_cache_db_execute_sql_real(dbh, "BEGIN", &errmsg); + } + break; + case SCDB_TYPE_ODBC: + { + switch_odbc_status_t result; + + if ((result = switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) { + char tmp[100]; + switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result); + errmsg = strdup(tmp); + } + } + break; + case SCDB_TYPE_PGSQL: + { + switch_pgsql_status_t result; + + if ((result = switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 0)) != SWITCH_PGSQL_SUCCESS) { + char tmp[100]; + switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result); + errmsg = strdup(tmp); + } + } + break; + } + + if (errmsg) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "ERROR [%s]\n", errmsg); + free(errmsg); + goto end; + } + + + for(;;) { + if (mutex) switch_mutex_lock(mutex); + status = switch_queue_trypop(q, &pop); + if (mutex) switch_mutex_unlock(mutex); + + if (status != SWITCH_STATUS_SUCCESS || !pop) break; + + if ((status = switch_cache_db_execute_sql(dbh, (char *) pop, NULL)) == SWITCH_STATUS_SUCCESS) { + ttl++; + } + free(pop); + + if (status != SWITCH_STATUS_SUCCESS) break; + } + + end: + + switch(dbh->type) { + case SCDB_TYPE_CORE_DB: + { + switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL); + } + break; + case SCDB_TYPE_ODBC: + { + switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1); + switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1); + } + break; + case SCDB_TYPE_PGSQL: + { + switch_pgsql_SQLEndTran(dbh->native_handle.pgsql_dbh, 1); + switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 1); + switch_pgsql_finish_results(dbh->native_handle.pgsql_dbh); + } + break; + } + + + + return ttl; +} + static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj) { - void *pop = NULL; - uint32_t iterations = 0; - uint8_t trans = 0; - uint32_t target = 20000; - switch_size_t len = 0, sql_len = runtime.sql_buffer_len; - char *tmp, *sqlbuf = (char *) malloc(sql_len); - char *sql = NULL, *save_sql = NULL; - switch_size_t newlen; - int lc = 0, wrote = 0, do_sleep = 1; + uint32_t sanity = 120; - int auto_pause = 0; switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj; uint32_t i; - switch_assert(sqlbuf); - while (!qm->event_db) { if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db) break; @@ -1498,428 +1591,75 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, break; } + while (qm->thread_running == 1) { - int proceed = !!save_sql; - int pindex = -1; + int lc; + int i; + uint32_t iterations = 0; - if (!proceed) { + if (sql_manager.paused) { for (i = 0; i < qm->numq; i++) { - switch_status_t status; - - switch_mutex_lock(qm->mutex); - status = switch_queue_trypop(qm->sql_queue[i], &pop); - switch_mutex_unlock(qm->mutex); - - if (status == SWITCH_STATUS_SUCCESS) { - if (sql_manager.thread_running != 1) { - if (pop) { - switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL); - free(pop); - pop = NULL; - } - } else { - pindex = i; - proceed = 1; - break; - } - } + do_flush(qm->sql_queue[i], NULL); } + goto check; } + + for (i = 0; i < qm->numq; i++) { + uint32_t written = do_trans(qm->event_db, qm->sql_queue[i], NULL, + qm->pre_trans_execute, + qm->post_trans_execute, + qm->inner_pre_trans_execute, + qm->inner_post_trans_execute); - if (proceed) { - - if (save_sql) { - sql = save_sql; - save_sql = NULL; - } else if ((sql = (char *) pop)) { - pop = NULL; + iterations += written; + + switch_mutex_lock(qm->mutex); + qm->written[i] += written; + switch_mutex_unlock(qm->mutex); + } + + if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { + char line[128] = ""; + int l; + + switch_snprintf(line, sizeof(line), "%s RUN QUEUE [", qm->name); + + for (i = 0; i < qm->numq; i++) { + l = strlen(line); + switch_snprintf(line + l, sizeof(line) - l, "%d%s", switch_queue_size(qm->sql_queue[i]), i == qm->numq - 1 ? "" : "|"); } - if (sql) { - newlen = strlen(sql) + 2; - - if (iterations == 0) { - trans = 1; - } - - if (len + newlen + 1 > sql_len) { - int new_mlen = len + newlen + 10240; - - if (new_mlen < runtime.max_sql_buffer_len) { - sql_len = new_mlen; - if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { - for (i = 0; i < qm->numq; i++) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "%s REALLOC QUEUE %ld %d %d\n", - qm->name, - (long int)sql_len, - i, - switch_queue_size(qm->sql_queue[i])); - - } - } - if (!(tmp = realloc(sqlbuf, sql_len))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread ending on mem err\n", qm->name); - abort(); - break; - } - sqlbuf = tmp; - } else { - if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { - for (i = 0; i < qm->numq; i++) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "%s SAVE QUEUE %d %d\n", - qm->name, - i, - switch_queue_size(qm->sql_queue[i])); - - } - } - save_sql = sql; - sql = NULL; - lc = 0; - goto skip; - } - } - - switch_mutex_lock(qm->mutex); - qm->pre_written[pindex]++; - switch_mutex_unlock(qm->mutex); - - iterations++; - sprintf(sqlbuf + len, "%s;\n", sql); - len += newlen; - free(sql); - sql = NULL; - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s, SQL thread ending\n", qm->name); - break; - } + l = strlen(line); + switch_snprintf(line + l, sizeof(line) - l, "]--[%d]\n", iterations); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line); + } - lc = qm_ttl(qm); - + check: - if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) { - if (!auto_pause) { - auto_pause = 1; - switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause); - auto_pause = 1; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue overflowing [%d], Pausing calls.\n", qm->name, lc); - } - } else { - if (auto_pause && lc < 1000) { - auto_pause = 0; - switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause); - auto_pause = 0; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue back to normal size, resuming..\n", qm->name); - } - } - - skip: - - wrote = 0; + lc = qm_ttl(qm); - if (trans && iterations && (iterations > target || !lc)) { - - if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { - char line[128] = ""; - int l; - - switch_snprintf(line, sizeof(line), "%s RUN QUEUE ", qm->name); - - for (i = 0; i < qm->numq; i++) { - l = strlen(line); - switch_snprintf(line + l, sizeof(line) - l, "%d:%d ", i, switch_queue_size(qm->sql_queue[i])); - } - - l = strlen(line); - switch_snprintf(line + l, sizeof(line) - l, "%d\n", iterations); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line); - - } - if (switch_cache_db_persistant_execute_trans_full(qm->event_db, sqlbuf, 1, - qm->pre_trans_execute, - qm->post_trans_execute, - qm->inner_pre_trans_execute, - qm->inner_post_trans_execute - ) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread unable to commit transaction, records lost!\n", qm->name); - } - if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s DONE\n", qm->name); - } - - iterations = 0; - trans = 0; - len = 0; - *sqlbuf = '\0'; - lc = 0; - if (do_sleep) { - switch_yield(200000); - } else { - switch_yield(1000); - } - wrote = 1; - } - - lc = qm_ttl(qm); - - switch_mutex_lock(qm->mutex); - for (i = 0; i < qm->numq; i++) { - qm->written[i] += qm->pre_written[i]; - qm->pre_written[i] = 0; - } - switch_mutex_unlock(qm->mutex); - if (!lc) { switch_thread_cond_wait(qm->cond, qm->cond_mutex); - } else if (wrote) { - if (lc > 2000) { - do_sleep = 0; - } else { - do_sleep = 1; - } + } else if (lc < 2000) { + switch_yield(200000); } } switch_mutex_unlock(qm->cond_mutex); for(i = 0; i < qm->numq; i++) { - while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) { - if (pop) { - switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL); - free(pop); - } - } + do_flush(qm->sql_queue[i], qm->event_db); } - free(sqlbuf); - qm->thread_running = 0; switch_cache_db_release_db_handle(&qm->event_db); - + return NULL; } -static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj) -{ - void *pop = NULL; - uint32_t iterations = 0; - uint8_t trans = 0; - uint32_t target = 20000; - switch_size_t len = 0, sql_len = runtime.sql_buffer_len; - char *tmp, *sqlbuf = (char *) malloc(sql_len); - char *sql = NULL, *save_sql = NULL; - switch_size_t newlen; - int lc = 0, wrote = 0, do_sleep = 1; - uint32_t sanity = 120; - int auto_pause = 0; - - switch_assert(sqlbuf); - - while (!sql_manager.event_db) { - if (switch_core_db_handle(&sql_manager.event_db) == SWITCH_STATUS_SUCCESS && sql_manager.event_db) - break; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error getting core db, Retrying\n"); - switch_yield(500000); - sanity--; - } - - if (!sql_manager.event_db) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error getting core db Disabling core sql functionality\n"); - return NULL; - } - - sql_manager.thread_running = 1; - - switch_mutex_lock(sql_manager.cond_mutex); - - switch (sql_manager.event_db->type) { - case SCDB_TYPE_PGSQL: - break; - case SCDB_TYPE_ODBC: - break; - case SCDB_TYPE_CORE_DB: - { - switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA synchronous=OFF;", NULL); - switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA count_changes=OFF;", NULL); - switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA temp_store=MEMORY;", NULL); - switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA journal_mode=OFF;", NULL); - } - break; - } - - while (sql_manager.thread_running == 1) { - if (save_sql || - switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || - switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS || - switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS || - switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS - ) { - - if (save_sql) { - sql = save_sql; - save_sql = NULL; - } else if ((sql = (char *) pop)) { - pop = NULL; - } - - if (sql) { - newlen = strlen(sql) + 2; - - if (iterations == 0) { - trans = 1; - } - - if (len + newlen + 1 > sql_len) { - int new_mlen = len + newlen + 10240; - - if (new_mlen < runtime.max_sql_buffer_len) { - sql_len = new_mlen; - if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager.sql_queue[0]), - switch_queue_size(sql_manager.sql_queue[1])); - } - if (!(tmp = realloc(sqlbuf, sql_len))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n"); - abort(); - break; - } - sqlbuf = tmp; - } else { - if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "SAVE %d %d %d %d\n", - switch_queue_size(sql_manager.sql_queue[0]), - switch_queue_size(sql_manager.sql_queue[1]), - switch_queue_size(sql_manager.sql_queue[2]), - switch_queue_size(sql_manager.sql_queue[3]) - ); - } - save_sql = sql; - sql = NULL; - lc = 0; - goto skip; - } - } - - iterations++; - sprintf(sqlbuf + len, "%s;\n", sql); - len += newlen; - free(sql); - sql = NULL; - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n"); - break; - } - } - - lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) + - switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]); - - - if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) { - if (!auto_pause) { - auto_pause = 1; - switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause); - auto_pause = 1; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue overflowing [%d], Pausing calls.\n", lc); - } - } else { - if (auto_pause && lc < 1000) { - auto_pause = 0; - switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause); - auto_pause = 0; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue back to normal size, resuming..\n"); - } - } - - skip: - - wrote = 0; - - if (trans && iterations && (iterations > target || !lc)) { - if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "RUN %d %d %d %d %d\n", - switch_queue_size(sql_manager.sql_queue[0]), - switch_queue_size(sql_manager.sql_queue[1]), - switch_queue_size(sql_manager.sql_queue[2]), - switch_queue_size(sql_manager.sql_queue[3]), - iterations); - } - if (switch_cache_db_persistant_execute_trans_full(sql_manager.event_db, sqlbuf, 1, - runtime.core_db_pre_trans_execute, - runtime.core_db_post_trans_execute, - runtime.core_db_inner_pre_trans_execute, - runtime.core_db_inner_post_trans_execute - ) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n"); - } - if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n"); - } - - - iterations = 0; - trans = 0; - len = 0; - *sqlbuf = '\0'; - lc = 0; - if (do_sleep) { - switch_yield(200000); - } else { - switch_yield(1000); - } - wrote = 1; - } - - lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) + - switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]); - - if (!lc) { - switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex); - } else if (wrote) { - if (lc > 2000) { - do_sleep = 0; - } else { - do_sleep = 1; - } - } - - - } - - switch_mutex_unlock(sql_manager.cond_mutex); - - while (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS) { - free(pop); - } - - while (switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { - free(pop); - } - - while (switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS) { - free(pop); - } - - while (switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS) { - free(pop); - } - - free(sqlbuf); - - sql_manager.thread_running = 0; - - switch_cache_db_release_db_handle(&sql_manager.event_db); - - return NULL; -} static char *parse_presence_data_cols(switch_event_t *event) { @@ -2388,12 +2128,11 @@ static void core_event_handler(switch_event_t *event) for (i = 0; i < sql_idx; i++) { if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) { - switch_queue_push(sql_manager.sql_queue[1], sql[i]); + switch_sql_queue_manager_push(sql_manager.qm, sql[i], 1, SWITCH_FALSE); } else { - switch_queue_push(sql_manager.sql_queue[0], sql[i]); + switch_sql_queue_manager_push(sql_manager.qm, sql[i], 0, SWITCH_FALSE); } sql[i] = NULL; - wake_thread(0); } } } @@ -2770,6 +2509,11 @@ SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const c switch_cache_db_handle_t *dbh; int r = 0; + if (!sql_manager.manage) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "DATABASE NOT AVAIALBLE, REVCOVERY NOT POSSIBLE\n"); + return 0; + } + if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); return 0; @@ -2839,16 +2583,21 @@ SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const c SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_core_dbtype(void) { - return sql_manager.event_db->type; + return sql_manager.qm ? sql_manager.qm->event_db->type : SCDB_TYPE_CORE_DB; } SWITCH_DECLARE(void) switch_core_sql_exec(const char *sql) { + if (!sql_manager.manage) { + return; + } + if (!switch_test_flag((&runtime), SCF_USE_SQL)) { return; } - switch_queue_push(sql_manager.sql_queue[3], strdup(sql)); + + switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_TRUE); } SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force) @@ -2856,6 +2605,10 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session char *sql = NULL; switch_channel_t *channel = switch_core_session_get_channel(session); + if (!sql_manager.manage) { + return; + } + if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) { return; } @@ -2878,7 +2631,7 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session switch_core_get_uuid(), switch_core_session_get_uuid(session)); } - switch_queue_push(sql_manager.sql_queue[3], sql); + switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_FALSE); switch_channel_clear_flag(channel, CF_TRACKED); } @@ -2894,6 +2647,9 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session) const char *profile_name; const char *technology; + if (!sql_manager.manage) { + return; + } if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) { return; @@ -2921,7 +2677,7 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session) switch_str_nil(profile_name), switch_core_get_hostname(), switch_core_session_get_uuid(session), xml_cdr_text); } - switch_queue_push(sql_manager.sql_queue[2], sql); + switch_sql_queue_manager_push(sql_manager.qm, sql, 2, SWITCH_FALSE); free(xml_cdr_text); switch_channel_set_flag(channel, CF_TRACKED); @@ -2950,7 +2706,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, c user, realm, switch_core_get_switchname()); } - switch_queue_push(sql_manager.sql_queue[0], sql); + switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE); if ( !zstr(metadata) ) { sql = switch_mprintf("insert into registrations (reg_user,realm,token,url,expires,network_ip,network_port,network_proto,hostname,metadata) " @@ -2982,7 +2738,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, c } - switch_queue_push(sql_manager.sql_queue[0], sql); + switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE); return SWITCH_STATUS_SUCCESS; } @@ -3002,7 +2758,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_del_registration(const char *user, c sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q'", user, realm, switch_core_get_switchname()); } - switch_queue_push(sql_manager.sql_queue[0], sql); + switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE); + return SWITCH_STATUS_SUCCESS; } @@ -3025,7 +2782,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_expire_registration(int force) sql = switch_mprintf("delete from registrations where expires > 0 and expires <= %ld and hostname='%q'", now, switch_core_get_switchname()); } - switch_queue_push(sql_manager.sql_queue[0], sql); + switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE); return SWITCH_STATUS_SUCCESS; @@ -3034,20 +2791,14 @@ SWITCH_DECLARE(switch_status_t) switch_core_expire_registration(int force) switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_t manage) { switch_threadattr_t *thd_attr; - uint32_t sanity = 400; sql_manager.memory_pool = pool; sql_manager.manage = manage; switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); - switch_mutex_init(&sql_manager.cond_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); switch_mutex_init(&sql_manager.ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); - switch_thread_cond_create(&sql_manager.cond, sql_manager.memory_pool); - - - if (!sql_manager.manage) goto skip; top: @@ -3117,6 +2868,15 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ switch_cache_db_test_reactive(sql_manager.dbh, "select metadata from registrations", NULL, "ALTER TABLE registrations ADD COLUMN metadata VARCHAR(256)"); + switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from recovery", "DROP TABLE recovery", recovery_sql); + switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery1 on recovery(technology)", NULL); + switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery2 on recovery(profile_name)", NULL); + switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery3 on recovery(uuid)", NULL); + switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery3 on recovery(runtime_uuid)", NULL); + + + + switch (sql_manager.dbh->type) { case SCDB_TYPE_PGSQL: case SCDB_TYPE_ODBC: @@ -3227,126 +2987,76 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ switch_event_bind("core_db", SWITCH_EVENT_NAT, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL); #endif - switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); - switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); - switch_queue_create(&sql_manager.sql_queue[2], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); - switch_queue_create(&sql_manager.sql_queue[3], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); - switch_threadattr_create(&thd_attr, sql_manager.memory_pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME); switch_core_sqldb_start_thread(); switch_thread_create(&sql_manager.db_thread, thd_attr, switch_core_sql_db_thread, NULL, sql_manager.memory_pool); - while (sql_manager.manage && !sql_manager.thread_running && --sanity) { - switch_yield(10000); - } } return SWITCH_STATUS_SUCCESS; } - -SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void) +SWITCH_DECLARE(void) switch_core_sqldb_pause(void) { - switch_mutex_lock(sql_manager.ctl_mutex); - if (sql_manager.thread && sql_manager.thread_running) { - switch_status_t st; - - if (sql_manager.manage) { - switch_queue_push(sql_manager.sql_queue[0], NULL); - switch_queue_push(sql_manager.sql_queue[1], NULL); - switch_queue_push(sql_manager.sql_queue[2], NULL); - switch_queue_push(sql_manager.sql_queue[3], NULL); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n"); - wake_thread(0); - sql_manager.thread_running = -1; - switch_thread_join(&st, sql_manager.thread); - sql_manager.thread = NULL; - switch_cache_db_release_db_handle(&sql_manager.dbh); - sql_manager.dbh = NULL; - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n"); - } - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is not running\n"); + if (sql_manager.paused) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already paused.\n"); } - switch_mutex_unlock(sql_manager.ctl_mutex); + sql_manager.paused = 1; } -SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void) +SWITCH_DECLARE(void) switch_core_sqldb_resume(void) { - switch_cache_db_handle_t *dbh; - - switch_mutex_lock(sql_manager.ctl_mutex); - - if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); - - if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) { - int arg = 1; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC OR PGSQL IS REQUIRED!\n"); - switch_core_session_ctl(SCSC_SHUTDOWN_NOW, &arg); - } - - - } else { - switch_cache_db_test_reactive(dbh, "select hostname from recovery", "DROP TABLE recovery", recovery_sql); - switch_cache_db_execute_sql(dbh, "create index recovery1 on recovery(technology)", NULL); - switch_cache_db_execute_sql(dbh, "create index recovery2 on recovery(profile_name)", NULL); - switch_cache_db_execute_sql(dbh, "create index recovery3 on recovery(uuid)", NULL); - switch_cache_db_execute_sql(dbh, "create index recovery3 on recovery(runtime_uuid)", NULL); - switch_cache_db_release_db_handle(&dbh); + if (!sql_manager.paused) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already running.\n"); } + sql_manager.paused = 0; +} +static void switch_core_sqldb_stop_thread(void) +{ + switch_mutex_lock(sql_manager.ctl_mutex); if (sql_manager.manage) { - - top: - - if (!sql_manager.dbh) { - /* Activate SQL database */ - if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); - - if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC OR PGSQL IS REQUIRED!\n"); - goto end; - } - - if (runtime.odbc_dsn) { - runtime.odbc_dsn = NULL; - runtime.odbc_dbtype = DBTYPE_DEFAULT; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Falling back to core_db.\n"); - sql_manager.dbh = NULL; - goto top; - } - - - switch_clear_flag((&runtime), SCF_USE_SQL); - goto end; - } - - switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels", NULL); - switch_cache_db_execute_sql(sql_manager.dbh, "delete from calls", NULL); - } - - - if (!sql_manager.thread) { - switch_threadattr_t *thd_attr; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting SQL thread.\n"); - switch_threadattr_create(&thd_attr, sql_manager.memory_pool); - switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME); - switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool); - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is already running\n"); + if (sql_manager.qm) { + switch_sql_queue_manager_destroy(&sql_manager.qm); } } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n"); } + + switch_mutex_unlock(sql_manager.ctl_mutex); +} - end: +static void switch_core_sqldb_start_thread(void) +{ + switch_mutex_lock(sql_manager.ctl_mutex); + if (sql_manager.manage) { + if (!sql_manager.qm) { + char *dbname = runtime.odbc_dsn; + + if (zstr(dbname)) { + dbname = runtime.dbname; + if (zstr(dbname)) { + dbname = "core"; + } + } + + switch_sql_queue_manager_init_name("CORE", + &sql_manager.qm, + 4, + dbname, + runtime.core_db_pre_trans_execute, + runtime.core_db_post_trans_execute, + runtime.core_db_inner_pre_trans_execute, + runtime.core_db_inner_post_trans_execute); + + } + switch_sql_queue_manager_start(sql_manager.qm); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n"); + } switch_mutex_unlock(sql_manager.ctl_mutex); }