add nitrus boost to sql thread

This commit is contained in:
Anthony Minessale 2010-09-14 16:19:03 -05:00
parent 8ea90fb17a
commit ef79535c45
4 changed files with 116 additions and 58 deletions

View File

@ -233,6 +233,8 @@ struct switch_runtime {
switch_profile_timer_t *profile_timer; switch_profile_timer_t *profile_timer;
double profile_time; double profile_time;
double min_idle_time; double min_idle_time;
int sql_buffer_len;
int max_sql_buffer_len;
}; };
extern struct switch_runtime runtime; extern struct switch_runtime runtime;

View File

@ -1225,7 +1225,7 @@ static void sofia_perform_profile_start_failure(sofia_profile_t *profile, char *
#define sofia_profile_start_failure(p, xp) sofia_perform_profile_start_failure(p, xp, __FILE__, __LINE__) #define sofia_profile_start_failure(p, xp) sofia_perform_profile_start_failure(p, xp, __FILE__, __LINE__)
#define SQLLEN 1024 * 32 #define SQLLEN 1024 * 1024
void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread, void *obj) void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread, void *obj)
{ {
sofia_profile_t *profile = (sofia_profile_t *) obj; sofia_profile_t *profile = (sofia_profile_t *) obj;
@ -1235,8 +1235,8 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
uint32_t qsize; uint32_t qsize;
void *pop; void *pop;
int loop_count = 0; int loop_count = 0;
switch_size_t sql_len = SQLLEN; switch_size_t sql_len = 1024 * 32;
char *sqlbuf = NULL; char *tmp, *sqlbuf = NULL;
char *sql = NULL; char *sql = NULL;
if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) { if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
@ -1254,33 +1254,43 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || qsize) { while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || qsize) {
if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) { if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
if ((qsize > 0 && (qsize >= 1024 || ++loop_count >= profile->trans_timeout)) || sql) { if (qsize > 0 && (qsize >= 1024 || ++loop_count >= profile->trans_timeout)) {
switch_size_t newlen; switch_size_t newlen;
uint32_t itterations = 0; uint32_t itterations = 0;
switch_size_t len = 0; switch_size_t len = 0;
switch_mutex_lock(profile->ireg_mutex); switch_mutex_lock(profile->ireg_mutex);
//sofia_glue_actually_execute_sql(profile, "begin;\n", NULL);
while (sql || (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop)) { while (sql || (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop)) {
if (!sql) sql = (char *) pop;
if (!sql) {
sql = (char *) pop;
}
newlen = strlen(sql) + 2; newlen = strlen(sql) + 2;
itterations++; itterations++;
if (len + newlen + 10 < sql_len) { if (len + newlen + 10 > sql_len) {
sprintf(sqlbuf + len, "%s;\n", sql); int new_mlen = len + newlen + 10 + 10240;
len += newlen;
switch_safe_free(sql); if (new_mlen < SQLLEN) {
} else { sql_len = new_mlen;
break;
if (!(tmp = realloc(sqlbuf, sql_len))) {
abort();
break;
}
sqlbuf = tmp;
} else {
goto skip;
}
} }
sprintf(sqlbuf + len, "%s;\n", sql);
len += newlen;
free(sql);
sql = NULL;
} }
skip:
//printf("TRANS:\n%s\n", sqlbuf); //printf("TRANS:\n%s\n", sqlbuf);
sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL); sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL);
//sofia_glue_actually_execute_sql(profile, "commit;\n", NULL); //sofia_glue_actually_execute_sql(profile, "commit;\n", NULL);

View File

@ -1222,7 +1222,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switc
} }
runtime.runlevel++; runtime.runlevel++;
runtime.sql_buffer_len = 1024 * 32;
runtime.max_sql_buffer_len = 1024 * 1024;
runtime.dummy_cng_frame.data = runtime.dummy_data; runtime.dummy_cng_frame.data = runtime.dummy_data;
runtime.dummy_cng_frame.datalen = sizeof(runtime.dummy_data); runtime.dummy_cng_frame.datalen = sizeof(runtime.dummy_data);
runtime.dummy_cng_frame.buflen = sizeof(runtime.dummy_data); runtime.dummy_cng_frame.buflen = sizeof(runtime.dummy_data);
@ -1440,6 +1441,37 @@ static void switch_load_core_config(const char *file)
if (tmp > -1 && tmp < 11) { if (tmp > -1 && tmp < 11) {
switch_core_session_ctl(SCSC_DEBUG_LEVEL, &tmp); switch_core_session_ctl(SCSC_DEBUG_LEVEL, &tmp);
} }
} else if (!strcasecmp(var, "sql-buffer-len")) {
int tmp = atoi(val);
if (end_of(val) == 'k') {
tmp *= 1024;
} else if (end_of(val) == 'm') {
tmp *= (1024 * 1024);
}
if (tmp >= 32000 && tmp < 10500000) {
runtime.sql_buffer_len = tmp;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "sql-buffer-len: Value is not within rage 32k to 10m\n");
}
} else if (!strcasecmp(var, "max-sql-buffer-len")) {
int tmp = atoi(val);
if (end_of(val) == 'k') {
tmp *= 1024;
} else if (end_of(val) == 'm') {
tmp *= (1024 * 1024);
}
if (tmp < runtime.sql_buffer_len) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Value is not larger than sql-buffer-len\n");
} else if (tmp >= 32000 && tmp < 10500000) {
runtime.sql_buffer_len = tmp;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "max-sql-buffer-len: Value is not within rage 32k to 10m\n");
}
} else if (!strcasecmp(var, "auto-create-schemas")) { } else if (!strcasecmp(var, "auto-create-schemas")) {
if (switch_true(val)) { if (switch_true(val)) {
switch_set_flag((&runtime), SCF_AUTO_SCHEMAS); switch_set_flag((&runtime), SCF_AUTO_SCHEMAS);

View File

@ -35,8 +35,6 @@
#include <switch.h> #include <switch.h>
#include "private/switch_core_pvt.h" #include "private/switch_core_pvt.h"
#define SQLLEN 32768
static struct { static struct {
switch_cache_db_handle_t *event_db; switch_cache_db_handle_t *event_db;
switch_queue_t *sql_queue[2]; switch_queue_t *sql_queue[2];
@ -48,6 +46,8 @@ static struct {
switch_mutex_t *io_mutex; switch_mutex_t *io_mutex;
switch_mutex_t *dbh_mutex; switch_mutex_t *dbh_mutex;
switch_hash_t *dbh_hash; switch_hash_t *dbh_hash;
switch_thread_cond_t *cond;
switch_mutex_t *cond_mutex;
} sql_manager; } sql_manager;
@ -539,7 +539,7 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand
switch (dbh->type) { switch (dbh->type) {
default: default:
{ {
status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, SQLLEN, err); status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, 32768, err);
} }
break; break;
} }
@ -850,19 +850,18 @@ SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_hand
static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj) static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj)
{ {
void *pop = NULL; void *pop;
uint32_t itterations = 0; uint32_t itterations = 0;
uint8_t trans = 0, nothing_in_queue = 0; uint8_t trans = 0;
uint32_t target = 100000; uint32_t target = 20000;
switch_size_t len = 0, sql_len = SQLLEN; switch_size_t len = 0, sql_len = runtime.sql_buffer_len;
char *sqlbuf = (char *) malloc(sql_len); char *tmp, *sqlbuf = (char *) malloc(sql_len);
char *sql = NULL; char *sql = NULL;
switch_size_t newlen; switch_size_t newlen;
int lc = 0; int lc = 0;
uint32_t loops = 0, sec = 0; uint32_t loops = 0, sec = 0;
uint32_t l1 = 1000; uint32_t l1 = 1000;
uint32_t sanity = 120; uint32_t sanity = 120;
int item_remained = 0;
switch_assert(sqlbuf); switch_assert(sqlbuf);
@ -887,6 +886,9 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
sql_manager.thread_running = 1; sql_manager.thread_running = 1;
while (sql_manager.thread_running == 1) { while (sql_manager.thread_running == 1) {
switch_mutex_lock(sql_manager.cond_mutex);
if (++loops == l1) { if (++loops == l1) {
if (++sec == SQL_CACHE_TIMEOUT) { if (++sec == SQL_CACHE_TIMEOUT) {
sql_close(switch_epoch_time_now(NULL)); sql_close(switch_epoch_time_now(NULL));
@ -900,16 +902,10 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
continue; continue;
} }
//printf("SIZE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1])); if (sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
if (item_remained || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
if (item_remained) { if (!sql) sql = (char *) pop;
item_remained = 0;
} else {
sql = (char *) pop;
}
if (sql) { if (sql) {
newlen = strlen(sql) + 2; newlen = strlen(sql) + 2;
@ -918,45 +914,59 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
trans = 1; trans = 1;
} }
/* ignore abnormally large strings sql strings as potential buffer overflow */ if (len + newlen > sql_len) {
if (newlen < SQLLEN) { int new_mlen = len + newlen + 10240;
itterations++;
if (len + newlen < sql_len) { if (new_mlen < runtime.max_sql_buffer_len) {
sprintf(sqlbuf + len, "%s;\n", sql); sql_len = new_mlen;
len += newlen; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
"REALLOC %ld %d %d\n", sql_len, switch_queue_size(sql_manager.sql_queue[0]),
switch_queue_size(sql_manager.sql_queue[1]));
if (!(tmp = realloc(sqlbuf, sql_len))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
abort();
break;
}
sqlbuf = tmp;
} else { } else {
item_remained = 1; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
"SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]));
goto skip;
} }
} }
if (!item_remained) { itterations++;
free(sql); sprintf(sqlbuf + len, "%s;\n", sql);
} len += newlen;
free(sql);
sql = NULL;
} else { } else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
break; break;
} }
} else {
nothing_in_queue = 1;
} }
skip:
if ((item_remained || (trans && ((itterations == target) || (nothing_in_queue && ++lc >= 500)))) && lc = sql ? 1 : 0 + switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
(sql_manager.event_db->native_handle.core_db_dbh)) {
if (trans && itterations && (itterations > target || !lc)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
"RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), itterations);
if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) { if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "DONE\n");
itterations = 0; itterations = 0;
trans = 0; trans = 0;
nothing_in_queue = 0;
len = 0; len = 0;
*sqlbuf = '\0'; *sqlbuf = '\0';
lc = 0; lc = 0;
switch_yield(400000);
} }
if (nothing_in_queue) { if (!lc) {
switch_cond_next(); switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex);
} }
} }
@ -1389,6 +1399,7 @@ static void core_event_handler(switch_event_t *event)
switch_queue_push(sql_manager.sql_queue[0], sql[i]); switch_queue_push(sql_manager.sql_queue[0], sql[i]);
} }
sql[i] = NULL; sql[i] = NULL;
switch_thread_cond_broadcast(sql_manager.cond);
} }
} }
} }
@ -1511,6 +1522,9 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.cond_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_thread_cond_create(&sql_manager.cond, sql_manager.memory_pool);
switch_core_hash_init(&sql_manager.dbh_hash, sql_manager.memory_pool); switch_core_hash_init(&sql_manager.dbh_hash, sql_manager.memory_pool);