diff --git a/src/include/private/switch_core_pvt.h b/src/include/private/switch_core_pvt.h index 50dde95dc7..e77d5f74ef 100644 --- a/src/include/private/switch_core_pvt.h +++ b/src/include/private/switch_core_pvt.h @@ -233,6 +233,8 @@ struct switch_runtime { switch_profile_timer_t *profile_timer; double profile_time; double min_idle_time; + int sql_buffer_len; + int max_sql_buffer_len; }; extern struct switch_runtime runtime; diff --git a/src/mod/endpoints/mod_sofia/sofia.c b/src/mod/endpoints/mod_sofia/sofia.c index 4d9ce9eea5..2ae5048b06 100644 --- a/src/mod/endpoints/mod_sofia/sofia.c +++ b/src/mod/endpoints/mod_sofia/sofia.c @@ -1225,7 +1225,7 @@ static void sofia_perform_profile_start_failure(sofia_profile_t *profile, char * #define sofia_profile_start_failure(p, xp) sofia_perform_profile_start_failure(p, xp, __FILE__, __LINE__) -#define SQLLEN 1024 * 32 +#define SQLLEN 1024 * 1024 void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread, void *obj) { sofia_profile_t *profile = (sofia_profile_t *) obj; @@ -1235,10 +1235,10 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread uint32_t qsize; void *pop; int loop_count = 0; - switch_size_t sql_len = SQLLEN; - char *sqlbuf = NULL; + switch_size_t sql_len = 1024 * 32; + char *tmp, *sqlbuf = NULL; char *sql = NULL; - + if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) { sqlbuf = (char *) malloc(sql_len); } @@ -1254,33 +1254,43 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || qsize) { if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) { - if ((qsize > 0 && (qsize >= 1024 || ++loop_count >= profile->trans_timeout)) || sql) { + if (qsize > 0 && (qsize >= 1024 || ++loop_count >= profile->trans_timeout)) { switch_size_t newlen; uint32_t itterations = 0; switch_size_t len = 0; switch_mutex_lock(profile->ireg_mutex); - //sofia_glue_actually_execute_sql(profile, "begin;\n", NULL); - while (sql || (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop)) { - - if (!sql) { - sql = (char *) pop; - } - + if (!sql) sql = (char *) pop; + newlen = strlen(sql) + 2; itterations++; - - if (len + newlen + 10 < sql_len) { - sprintf(sqlbuf + len, "%s;\n", sql); - len += newlen; - switch_safe_free(sql); - } else { - break; + + if (len + newlen + 10 > sql_len) { + int new_mlen = len + newlen + 10 + 10240; + + if (new_mlen < SQLLEN) { + sql_len = new_mlen; + + if (!(tmp = realloc(sqlbuf, sql_len))) { + abort(); + break; + } + sqlbuf = tmp; + } else { + goto skip; + } } + + sprintf(sqlbuf + len, "%s;\n", sql); + len += newlen; + free(sql); + sql = NULL; } - + + skip: + //printf("TRANS:\n%s\n", sqlbuf); sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL); //sofia_glue_actually_execute_sql(profile, "commit;\n", NULL); diff --git a/src/switch_core.c b/src/switch_core.c index 0a45723870..2c33b341de 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -1222,7 +1222,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switc } runtime.runlevel++; - + runtime.sql_buffer_len = 1024 * 32; + runtime.max_sql_buffer_len = 1024 * 1024; runtime.dummy_cng_frame.data = runtime.dummy_data; runtime.dummy_cng_frame.datalen = sizeof(runtime.dummy_data); runtime.dummy_cng_frame.buflen = sizeof(runtime.dummy_data); @@ -1440,6 +1441,37 @@ static void switch_load_core_config(const char *file) if (tmp > -1 && tmp < 11) { switch_core_session_ctl(SCSC_DEBUG_LEVEL, &tmp); } + } else if (!strcasecmp(var, "sql-buffer-len")) { + int tmp = atoi(val); + + if (end_of(val) == 'k') { + tmp *= 1024; + } else if (end_of(val) == 'm') { + tmp *= (1024 * 1024); + } + + if (tmp >= 32000 && tmp < 10500000) { + runtime.sql_buffer_len = tmp; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "sql-buffer-len: Value is not within rage 32k to 10m\n"); + } + } else if (!strcasecmp(var, "max-sql-buffer-len")) { + int tmp = atoi(val); + + if (end_of(val) == 'k') { + tmp *= 1024; + } else if (end_of(val) == 'm') { + tmp *= (1024 * 1024); + } + + if (tmp < runtime.sql_buffer_len) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Value is not larger than sql-buffer-len\n"); + } else if (tmp >= 32000 && tmp < 10500000) { + runtime.sql_buffer_len = tmp; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "max-sql-buffer-len: Value is not within rage 32k to 10m\n"); + } + } else if (!strcasecmp(var, "auto-create-schemas")) { if (switch_true(val)) { switch_set_flag((&runtime), SCF_AUTO_SCHEMAS); diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index b1815b0f22..dfb4134b9b 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -35,8 +35,6 @@ #include #include "private/switch_core_pvt.h" -#define SQLLEN 32768 - static struct { switch_cache_db_handle_t *event_db; switch_queue_t *sql_queue[2]; @@ -48,6 +46,8 @@ static struct { switch_mutex_t *io_mutex; switch_mutex_t *dbh_mutex; switch_hash_t *dbh_hash; + switch_thread_cond_t *cond; + switch_mutex_t *cond_mutex; } sql_manager; @@ -539,7 +539,7 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand switch (dbh->type) { default: { - status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, SQLLEN, err); + status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, 32768, err); } break; } @@ -850,19 +850,18 @@ SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_hand static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj) { - void *pop = NULL; + void *pop; uint32_t itterations = 0; - uint8_t trans = 0, nothing_in_queue = 0; - uint32_t target = 100000; - switch_size_t len = 0, sql_len = SQLLEN; - char *sqlbuf = (char *) malloc(sql_len); + uint8_t trans = 0; + uint32_t target = 20000; + switch_size_t len = 0, sql_len = runtime.sql_buffer_len; + char *tmp, *sqlbuf = (char *) malloc(sql_len); char *sql = NULL; switch_size_t newlen; int lc = 0; uint32_t loops = 0, sec = 0; uint32_t l1 = 1000; uint32_t sanity = 120; - int item_remained = 0; switch_assert(sqlbuf); @@ -887,6 +886,9 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, sql_manager.thread_running = 1; while (sql_manager.thread_running == 1) { + + switch_mutex_lock(sql_manager.cond_mutex); + if (++loops == l1) { if (++sec == SQL_CACHE_TIMEOUT) { sql_close(switch_epoch_time_now(NULL)); @@ -900,17 +902,11 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, continue; } - //printf("SIZE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1])); - - if (item_remained || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || + if (sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { - if (item_remained) { - item_remained = 0; - } else { - sql = (char *) pop; - } - + if (!sql) sql = (char *) pop; + if (sql) { newlen = strlen(sql) + 2; @@ -918,45 +914,59 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, trans = 1; } - /* ignore abnormally large strings sql strings as potential buffer overflow */ - if (newlen < SQLLEN) { - itterations++; + if (len + newlen > sql_len) { + int new_mlen = len + newlen + 10240; - if (len + newlen < sql_len) { - sprintf(sqlbuf + len, "%s;\n", sql); - len += newlen; + if (new_mlen < runtime.max_sql_buffer_len) { + sql_len = new_mlen; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, + "REALLOC %ld %d %d\n", sql_len, switch_queue_size(sql_manager.sql_queue[0]), + switch_queue_size(sql_manager.sql_queue[1])); + if (!(tmp = realloc(sqlbuf, sql_len))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n"); + abort(); + break; + } + sqlbuf = tmp; } else { - item_remained = 1; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, + "SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1])); + goto skip; } } - - if (!item_remained) { - free(sql); - } + + itterations++; + sprintf(sqlbuf + len, "%s;\n", sql); + len += newlen; + free(sql); + sql = NULL; } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n"); break; } - } else { - nothing_in_queue = 1; } - - if ((item_remained || (trans && ((itterations == target) || (nothing_in_queue && ++lc >= 500)))) && - (sql_manager.event_db->native_handle.core_db_dbh)) { + skip: + + lc = sql ? 1 : 0 + switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]); + + if (trans && itterations && (itterations > target || !lc)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, + "RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), itterations); if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n"); } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "DONE\n"); itterations = 0; trans = 0; - nothing_in_queue = 0; len = 0; *sqlbuf = '\0'; lc = 0; + switch_yield(400000); } - if (nothing_in_queue) { - switch_cond_next(); + if (!lc) { + switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex); } } @@ -1389,6 +1399,7 @@ static void core_event_handler(switch_event_t *event) switch_queue_push(sql_manager.sql_queue[0], sql[i]); } sql[i] = NULL; + switch_thread_cond_broadcast(sql_manager.cond); } } } @@ -1511,6 +1522,9 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); + switch_mutex_init(&sql_manager.cond_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool); + + switch_thread_cond_create(&sql_manager.cond, sql_manager.memory_pool); switch_core_hash_init(&sql_manager.dbh_hash, sql_manager.memory_pool);