From efef505e26a720f00f545c69395ca6798b7ad46a Mon Sep 17 00:00:00 2001
From: Anthony Minessale <anthm@freeswitch.org>
Date: Wed, 12 Mar 2014 11:42:37 -0500
Subject: [PATCH] add switch_sql_queue_manager_pause and
 switch_sql_queue_manager_resume

---
 src/include/switch_core.h |  4 ++-
 src/switch_core_sqldb.c   | 70 ++++++++++++++++++++++++++++-----------
 2 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/src/include/switch_core.h b/src/include/switch_core.h
index 564694dbac..01f4a87111 100644
--- a/src/include/switch_core.h
+++ b/src/include/switch_core.h
@@ -1355,7 +1355,6 @@ SWITCH_DECLARE(switch_status_t) switch_core_hash_init_case(_Out_ switch_hash_t *
 #define switch_core_hash_init_nocase(_hash) switch_core_hash_init_case(_hash, SWITCH_FALSE)
 
 
-
 /*! 
   \brief Destroy an existing hash table
   \param hash the hash to destroy
@@ -2541,6 +2540,9 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session
 SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session);
 SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const char *profile_name);
 
+SWITCH_DECLARE(void) switch_sql_queue_manager_pause(switch_sql_queue_manager_t *qm, switch_bool_t flush);
+SWITCH_DECLARE(void) switch_sql_queue_manager_resume(switch_sql_queue_manager_t *qm);
+
 SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index);
 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);
 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);
diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c
index 054cfad2f5..0195e4ac6e 100644
--- a/src/switch_core_sqldb.c
+++ b/src/switch_core_sqldb.c
@@ -1266,6 +1266,7 @@ struct switch_sql_queue_manager {
 	switch_memory_pool_t *pool;
 	uint32_t max_trans;
 	uint32_t confirm;
+	uint8_t paused;
 };
 
 static int qm_wake(switch_sql_queue_manager_t *qm)
@@ -1400,6 +1401,52 @@ SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_event_callback(switch_s
 	}
 }
 
+
+static void do_flush(switch_sql_queue_manager_t *qm, int i, switch_cache_db_handle_t *dbh)
+{
+	void *pop = NULL;
+	switch_queue_t *q = qm->sql_queue[i];
+
+	switch_mutex_lock(qm->mutex);
+	while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) {
+		if (pop) {
+			if (dbh) {
+				switch_cache_db_execute_sql(dbh, (char *) pop, NULL);
+			}
+			free(pop);
+		}
+	}
+	switch_mutex_unlock(qm->mutex);
+
+}
+
+
+SWITCH_DECLARE(void) switch_sql_queue_manager_resume(switch_sql_queue_manager_t *qm)
+{
+	switch_mutex_lock(qm->mutex);
+	qm->paused = 0;
+	switch_mutex_unlock(qm->mutex);
+
+	qm_wake(qm);
+
+}
+
+SWITCH_DECLARE(void) switch_sql_queue_manager_pause(switch_sql_queue_manager_t *qm, switch_bool_t flush)
+{
+	uint32_t i;
+
+	switch_mutex_lock(qm->mutex);
+	qm->paused = 1;
+	switch_mutex_unlock(qm->mutex);
+
+	if (flush) {
+		for(i = 0; i < qm->numq; i++) {
+			do_flush(qm, i, NULL);
+		}
+	}
+
+}
+
 SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index)
 {
 	int size = 0;
@@ -1462,25 +1509,6 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_
 	return SWITCH_STATUS_FALSE;
 }
 
-
-static void do_flush(switch_sql_queue_manager_t *qm, int i, switch_cache_db_handle_t *dbh)
-{
-	void *pop = NULL;
-	switch_queue_t *q = qm->sql_queue[i];
-
-	switch_mutex_lock(qm->mutex);
-	while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) {
-		if (pop) {
-			if (dbh) {
-				switch_cache_db_execute_sql(dbh, (char *) pop, NULL);
-			}
-			free(pop);
-		}
-	}
-	switch_mutex_unlock(qm->mutex);
-
-}
-
 SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp)
 {
 	switch_sql_queue_manager_t *qm;
@@ -1854,6 +1882,10 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
 		uint32_t i, lc;
 		uint32_t written = 0, iterations = 0;
 
+		if (qm->paused) {
+			goto check;
+		}
+
 		if (sql_manager.paused) {
 			for (i = 0; i < qm->numq; i++) {
 				do_flush(qm, i, NULL);