From 14fbb4b9e5b7f4d09d5e6322216234a0730c2036 Mon Sep 17 00:00:00 2001
From: Anthony Minessale <anthony.minessale@gmail.com>
Date: Sat, 5 Dec 2009 18:54:56 +0000
Subject: [PATCH] add io mutex to cache db for optional mutex protection

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@15800 d0543943-73ff-0310-b7d9-9358b9ac24b2
---
 src/include/switch_core.h |   1 +
 src/switch_core_db.c      |   2 +-
 src/switch_core_sqldb.c   | 161 +++++++++++++++++++++++++++-----------
 3 files changed, 116 insertions(+), 48 deletions(-)

diff --git a/src/include/switch_core.h b/src/include/switch_core.h
index 153e582037..3f9eddb584 100644
--- a/src/include/switch_core.h
+++ b/src/include/switch_core.h
@@ -1987,6 +1987,7 @@ typedef struct {
 	switch_cache_db_native_handle_t native_handle;
 	time_t last_used;
 	switch_mutex_t *mutex;
+	switch_mutex_t *io_mutex;
 	switch_memory_pool_t *pool;
 	int32_t flags;
 	unsigned long hash;
diff --git a/src/switch_core_db.c b/src/switch_core_db.c
index d6e3f08bf3..893626b8e1 100644
--- a/src/switch_core_db.c
+++ b/src/switch_core_db.c
@@ -86,7 +86,7 @@ SWITCH_DECLARE(const char *) switch_core_db_errmsg(switch_core_db_t *db)
 SWITCH_DECLARE(int) switch_core_db_exec(switch_core_db_t *db, const char *sql, switch_core_db_callback_func_t callback, void *data, char **errmsg)
 {
 	int ret = 0;
-	int sane = 100;
+	int sane = 300;
 	char *err = NULL;
 
 	while (--sane > 0) {
diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c
index 6c746006fb..3593916979 100644
--- a/src/switch_core_sqldb.c
+++ b/src/switch_core_sqldb.c
@@ -35,28 +35,6 @@
 #include <switch.h>
 #include "private/switch_core_pvt.h"
 
-
-#define SWITCH_CORE_DB "core"
-/*!
-  \brief Open the default system database
-*/
-SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
-{
-	switch_cache_db_connection_options_t options = { {0} };
-
-	if (runtime.odbc_dsn && runtime.odbc_user && runtime.odbc_pass) {
-		options.odbc_options.dsn = runtime.odbc_dsn;
-		options.odbc_options.user = runtime.odbc_user;
-		options.odbc_options.pass = runtime.odbc_pass;
-
-		return _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
-	} else {
-		options.core_db_options.db_path = SWITCH_CORE_DB;
-		return _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
-	}
-}
-
-
 static struct {
 	switch_cache_db_handle_t *event_db;
 	switch_queue_t *sql_queue[2];
@@ -65,10 +43,39 @@ static struct {
 	switch_thread_t *thread;
 	int thread_running;
 	switch_bool_t manage;
+	switch_mutex_t *io_mutex;
+	switch_mutex_t *dbh_mutex;
+	switch_hash_t *dbh_hash;
 } sql_manager;
 
-static switch_mutex_t *dbh_mutex = NULL;
-static switch_hash_t *dbh_hash = NULL;
+
+#define SWITCH_CORE_DB "core"
+/*!
+  \brief Open the default system database
+*/
+SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
+{
+	switch_cache_db_connection_options_t options = { {0} };
+	switch_status_t r;
+	
+	if (runtime.odbc_dsn && runtime.odbc_user && runtime.odbc_pass) {
+		options.odbc_options.dsn = runtime.odbc_dsn;
+		options.odbc_options.user = runtime.odbc_user;
+		options.odbc_options.pass = runtime.odbc_pass;
+
+		r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
+	} else {
+		options.core_db_options.db_path = SWITCH_CORE_DB;
+		r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
+	}
+
+	if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) {
+		(*dbh)->io_mutex = sql_manager.io_mutex;
+	}
+
+	return r;
+}
+
 
 #define SQL_CACHE_TIMEOUT 300
 
@@ -81,11 +88,11 @@ static void sql_close(time_t prune)
 	int locked = 0;
 	char *key;
 	
-	switch_mutex_lock(dbh_mutex);
+	switch_mutex_lock(sql_manager.dbh_mutex);
  top:
 	locked = 0;
 	
-	for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) {
+	for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) {
 		switch_hash_this(hi, &var, NULL, &val);
 		key = (char *) var;
 
@@ -117,7 +124,7 @@ static void sql_close(time_t prune)
 					break;
 				}
 
-				switch_core_hash_delete(dbh_hash, key);
+				switch_core_hash_delete(sql_manager.dbh_hash, key);
 				switch_mutex_unlock(dbh->mutex);
 				switch_core_destroy_memory_pool(&dbh->pool);
 				goto top;
@@ -133,7 +140,7 @@ static void sql_close(time_t prune)
 		goto top;
 	}
 
-	switch_mutex_unlock(dbh_mutex);
+	switch_mutex_unlock(sql_manager.dbh_mutex);
 }
 
 
@@ -149,7 +156,7 @@ SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t
 SWITCH_DECLARE(void) switch_cache_db_destroy_db_handle(switch_cache_db_handle_t **dbh)
 {
 	if (dbh && *dbh) {
-		switch_mutex_lock(dbh_mutex);
+		switch_mutex_lock(sql_manager.dbh_mutex);
 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Deleting DB connection %s\n", (*dbh)->name);
 
 		switch ((*dbh)->type) {
@@ -167,11 +174,11 @@ SWITCH_DECLARE(void) switch_cache_db_destroy_db_handle(switch_cache_db_handle_t
 		}
 
 		
-		switch_core_hash_delete(dbh_hash, (*dbh)->name);		
+		switch_core_hash_delete(sql_manager.dbh_hash, (*dbh)->name);		
 		switch_mutex_unlock((*dbh)->mutex);
 		switch_core_destroy_memory_pool(&(*dbh)->pool);
 		*dbh = NULL;
-		switch_mutex_unlock(dbh_mutex);
+		switch_mutex_unlock(sql_manager.dbh_mutex);
 	}
 }
 
@@ -185,9 +192,9 @@ SWITCH_DECLARE(void) switch_cache_db_detach(void)
 	switch_cache_db_handle_t *dbh = NULL;
 
 	snprintf(thread_str, sizeof(thread_str) - 1, "%lu", (unsigned long)(intptr_t)switch_thread_self());
-	switch_mutex_lock(dbh_mutex);
+	switch_mutex_lock(sql_manager.dbh_mutex);
 
-	for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) {
+	for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) {
 		switch_hash_this(hi, &var, NULL, &val);
 		key = (char *) var;
 		if ((dbh = (switch_cache_db_handle_t *) val)) {
@@ -202,7 +209,7 @@ SWITCH_DECLARE(void) switch_cache_db_detach(void)
 		}
 	}
 
-	switch_mutex_unlock(dbh_mutex);
+	switch_mutex_unlock(sql_manager.dbh_mutex);
 }
 
 SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_handle_t **dbh,
@@ -246,8 +253,8 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha
 	snprintf(thread_str, sizeof(thread_str) - 1, "%s;thread=\"%lu\"", db_str, (unsigned long)(intptr_t)self);
 	snprintf(db_callsite_str, sizeof(db_callsite_str) - 1, "%s:%d", file, line);
 	
-	switch_mutex_lock(dbh_mutex);
-	if ((new_dbh = switch_core_hash_find(dbh_hash, thread_str))) {
+	switch_mutex_lock(sql_manager.dbh_mutex);
+	if ((new_dbh = switch_core_hash_find(sql_manager.dbh_hash, thread_str))) {
 		switch_set_string(new_dbh->last_user, db_callsite_str);
 		switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10, 
 						  "Reuse Cached DB handle %s [%s]\n", thread_str, switch_cache_db_type_name(new_dbh->type));
@@ -260,7 +267,7 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha
 		
 		hash = switch_ci_hashfunc_default(db_str, &hlen);
 
-		for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) {
+		for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) {
 			switch_hash_this(hi, &var, NULL, &val);
 			key = (char *) var;
 			
@@ -337,14 +344,14 @@ SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_ha
 		switch_set_string(new_dbh->creator, db_callsite_str);
 		switch_mutex_lock(new_dbh->mutex);
 
-		switch_core_hash_insert(dbh_hash, new_dbh->name, new_dbh);
+		switch_core_hash_insert(sql_manager.dbh_hash, new_dbh->name, new_dbh);
 	}
 
  end:
 
 	if (new_dbh) new_dbh->last_used = switch_epoch_time_now(NULL);
 
-	switch_mutex_unlock(dbh_mutex);
+	switch_mutex_unlock(sql_manager.dbh_mutex);
 
 	*dbh = new_dbh;
 
@@ -357,6 +364,10 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t
 	switch_status_t status = SWITCH_STATUS_FALSE;
 	char *errmsg = NULL;
 
+	if (dbh->io_mutex) {
+		switch_mutex_lock(dbh->io_mutex);
+	}
+
 	if (err) *err = NULL;
 	
 	switch (dbh->type) {
@@ -386,6 +397,11 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t
 			free(errmsg);
 		}
 	}
+
+
+	if (dbh->io_mutex) {
+		switch_mutex_unlock(dbh->io_mutex);
+	}
 	
 	return status;
 }
@@ -456,6 +472,9 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand
 {
 	switch_status_t status = SWITCH_STATUS_FALSE;
 
+	if (dbh->io_mutex) {
+		switch_mutex_lock(dbh->io_mutex);
+	}
 
 	switch (dbh->type) {
 	default:
@@ -465,6 +484,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand
 		break;
 	}
 
+	if (dbh->io_mutex) {
+		switch_mutex_unlock(dbh->io_mutex);
+	}
+
 	return status;
 
 }
@@ -475,6 +498,10 @@ SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t
 	switch_status_t status = SWITCH_STATUS_FALSE;
 
 
+	if (dbh->io_mutex) {
+		switch_mutex_lock(dbh->io_mutex);
+	}
+
 	switch (dbh->type) {
 	case SCDB_TYPE_CORE_DB:
 		{
@@ -522,6 +549,10 @@ SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t
 
  end:
 
+	if (dbh->io_mutex) {
+		switch_mutex_unlock(dbh->io_mutex);
+	}
+
 	return status == SWITCH_STATUS_SUCCESS ? str : NULL;
 
 }
@@ -537,6 +568,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_
 		retries = 1000;
 	}
 
+	if (dbh->io_mutex) {
+		switch_mutex_lock(dbh->io_mutex);
+	}
+
 	while (retries > 0) {
 		switch_cache_db_execute_sql_real(dbh, sql, &errmsg);
 		if (errmsg) {
@@ -554,6 +589,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_
 		}
 	}
 
+	if (dbh->io_mutex) {
+		switch_mutex_unlock(dbh->io_mutex);
+	}
+
 	return status;
 }
 
@@ -571,6 +610,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_
 		retries = 1000;
 	}
 
+	if (dbh->io_mutex) {
+		switch_mutex_lock(dbh->io_mutex);
+	}
+
 again:
 
 	while (begin_retries > 0) {
@@ -629,6 +672,11 @@ done:
 
 	switch_cache_db_execute_sql_real(dbh,  "COMMIT", NULL);
 
+	if (dbh->io_mutex) {
+		switch_mutex_unlock(dbh->io_mutex);
+	}
+
+
 	return status;
 }
 
@@ -641,6 +689,9 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach
 
 	if (err) *err = NULL;
 
+	if (dbh->io_mutex) {
+		switch_mutex_lock(dbh->io_mutex);
+	}
 
 
 	switch (dbh->type) {
@@ -661,6 +712,10 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach
 		break;
 	}
 	
+	if (dbh->io_mutex) {
+		switch_mutex_unlock(dbh->io_mutex);
+	}
+
 	return status;
 }
 
@@ -668,6 +723,11 @@ SWITCH_DECLARE(void) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh
 {
 	char *errmsg;
 
+
+	if (dbh->io_mutex) {
+		switch_mutex_lock(dbh->io_mutex);
+	}
+
 	switch (dbh->type) {
 	case SCDB_TYPE_ODBC:
 		{
@@ -707,17 +767,22 @@ SWITCH_DECLARE(void) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh
 		}
 		break;
 	}
+
+
+	if (dbh->io_mutex) {
+		switch_mutex_unlock(dbh->io_mutex);
+	}
 }
 
 
 
-#define SQLLEN 1024 * 64
+#define SQLLEN 1024 * 1024
 static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread, void *obj)
 {
 	void *pop;
 	uint32_t itterations = 0;
 	uint8_t trans = 0, nothing_in_queue = 0;
-	uint32_t target = 50000;
+	uint32_t target = 100000;
 	switch_size_t len = 0, sql_len = SQLLEN;
 	char *tmp, *sqlbuf = (char *) malloc(sql_len);
 	char *sql;
@@ -1175,8 +1240,10 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
 	sql_manager.memory_pool = pool;
 	sql_manager.manage = manage;
 	
-	switch_mutex_init(&dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
-	switch_core_hash_init(&dbh_hash, sql_manager.memory_pool);
+	switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
+	switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
+	
+	switch_core_hash_init(&sql_manager.dbh_hash, sql_manager.memory_pool);
 
  top:
 	
@@ -1329,7 +1396,7 @@ void switch_core_sqldb_stop(void)
 
 	sql_close(0);
 
-	switch_core_hash_destroy(&dbh_hash);
+	switch_core_hash_destroy(&sql_manager.dbh_hash);
 
 }
 
@@ -1347,9 +1414,9 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream)
 	char *pos1 = NULL;
 	char *pos2 = NULL;
 	
-	switch_mutex_lock(dbh_mutex);
+	switch_mutex_lock(sql_manager.dbh_mutex);
 	
-	for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) {
+	for (hi = switch_hash_first(NULL, sql_manager.dbh_hash); hi; hi = switch_hash_next(hi)) {
 		switch_hash_this(hi, &var, NULL, &val);
 		key = (char *) var;
 
@@ -1385,7 +1452,7 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream)
 				dbh->last_user);
 		}
 	}
-	switch_mutex_unlock(dbh_mutex);
+	switch_mutex_unlock(sql_manager.dbh_mutex);
 }
 
 /* For Emacs: