From 9d70d99d892bb6b224c80337265fb875e14408f6 Mon Sep 17 00:00:00 2001 From: Chris Rienzo <chris.rienzo@grasshopper.com> Date: Mon, 9 Dec 2013 11:09:50 -0500 Subject: [PATCH] mod_mongo: use C++ driver pooling, added support for socket timeout --- conf/vanilla/autoload_configs/mongo.conf.xml | 3 +- src/mod/applications/mod_mongo/Makefile | 7 +- .../conf/autoload_configs/mongo.conf.xml | 3 +- src/mod/applications/mod_mongo/mod_mongo.cpp | 158 +++++++------ src/mod/applications/mod_mongo/mod_mongo.h | 75 ------- src/mod/applications/mod_mongo/mongo_conn.cpp | 211 ------------------ 6 files changed, 103 insertions(+), 354 deletions(-) delete mode 100644 src/mod/applications/mod_mongo/mod_mongo.h delete mode 100644 src/mod/applications/mod_mongo/mongo_conn.cpp diff --git a/conf/vanilla/autoload_configs/mongo.conf.xml b/conf/vanilla/autoload_configs/mongo.conf.xml index 8423645e95..72fffafbb7 100644 --- a/conf/vanilla/autoload_configs/mongo.conf.xml +++ b/conf/vanilla/autoload_configs/mongo.conf.xml @@ -7,8 +7,9 @@ foo/server:port,server:port SET --> <param name="connection-string" value="127.0.0.1:27017"/> - <param name="min-connections" value="10"/> <param name="max-connections" value="100"/> + <!-- Timeout in seconds. 0 means no timeout --> + <param name="socket-timeout" value="0"/> <!-- <param name="map" value="function() { emit(this.a, 1); }"/> diff --git a/src/mod/applications/mod_mongo/Makefile b/src/mod/applications/mod_mongo/Makefile index 7a6e1058ab..aeb6cee07a 100644 --- a/src/mod/applications/mod_mongo/Makefile +++ b/src/mod/applications/mod_mongo/Makefile @@ -6,12 +6,13 @@ MONGO_CXX_DRIVER_TARBALL=mongodb-linux-x86_64-$(MONGO_CXX_DRIVER_VERSION)-latest MONGO_CXX_DRIVER_SRC=$(BASE)/libs/mongo-cxx-driver-$(MONGO_CXX_DRIVER_VERSION) LIBMONGOCLIENT_A =$(MONGO_CXX_DRIVER_SRC)/libmongoclient.a -LOCAL_SOURCES=mongo_conn.cpp -LOCAL_OBJS=mongo_conn.o +LOCAL_SOURCES= +LOCAL_OBJS= LOCAL_CFLAGS=-I$(MONGO_CXX_DRIVER_SRC)/src LOCAL_LIBADD=$(LIBMONGOCLIENT_A) -LOCAL_LDFLAGS=-lboost_thread -lboost_filesystem-mt -lboost_system-mt +#LOCAL_LDFLAGS=-lboost_thread -lboost_filesystem-mt -lboost_system-mt +LOCAL_LDFLAGS=-lboost_thread-mt -lboost_filesystem-mt -lboost_system-mt MODDIR=$(shell pwd) diff --git a/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml b/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml index 8423645e95..72fffafbb7 100644 --- a/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml +++ b/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml @@ -7,8 +7,9 @@ foo/server:port,server:port SET --> <param name="connection-string" value="127.0.0.1:27017"/> - <param name="min-connections" value="10"/> <param name="max-connections" value="100"/> + <!-- Timeout in seconds. 0 means no timeout --> + <param name="socket-timeout" value="0"/> <!-- <param name="map" value="function() { emit(this.a, 1); }"/> diff --git a/src/mod/applications/mod_mongo/mod_mongo.cpp b/src/mod/applications/mod_mongo/mod_mongo.cpp index 8057277f15..b9d4c82e1b 100644 --- a/src/mod/applications/mod_mongo/mod_mongo.cpp +++ b/src/mod/applications/mod_mongo/mod_mongo.cpp @@ -31,17 +31,20 @@ */ #include <switch.h> -#include "mod_mongo.h" +#include <mongo/client/dbclient.h> + +using namespace mongo; #define DELIMITER ';' #define FIND_ONE_SYNTAX "mongo_find_one ns; query; fields; options" #define MAPREDUCE_SYNTAX "mongo_mapreduce ns; query" static struct { - mongo_connection_pool_t *conn_pool; - char *map; - char *reduce; - char *finalize; + const char *map; + const char *reduce; + const char *finalize; + const char *conn_str; + double socket_timeout; } globals; static int parse_query_options(char *query_options_str) @@ -86,6 +89,7 @@ SWITCH_STANDARD_API(mongo_mapreduce_function) if (!zstr(ns) && !zstr(json_query)) { try { + scoped_ptr<ScopedDbConnection> conn(ScopedDbConnection::getScopedDbConnection(string(globals.conn_str, globals.socket_timeout))); BSONObj query = fromjson(json_query); BSONObj out; BSONObjBuilder cmd; @@ -105,20 +109,19 @@ SWITCH_STANDARD_API(mongo_mapreduce_function) } cmd.append("out", BSON("inline" << 1)); - conn = mongo_connection_pool_get(globals.conn_pool); - if (conn) { - conn->runCommand(nsGetDB(ns), cmd.done(), out); - mongo_connection_pool_put(globals.conn_pool, conn, SWITCH_FALSE); - + try { + conn->get()->runCommand(nsGetDB(ns), cmd.done(), out); stream->write_function(stream, "-OK\n%s\n", out.jsonString().c_str()); - } else { - stream->write_function(stream, "-ERR\nNo connection\n"); + } catch (DBException &e) { + stream->write_function(stream, "-ERR\n%s\n", e.toString().c_str()); + } catch (...) { + stream->write_function(stream, "-ERR\nUnknown exception!\n"); } + conn->done(); } catch (DBException &e) { - if (conn) { - mongo_connection_pool_put(globals.conn_pool, conn, SWITCH_TRUE); - } stream->write_function(stream, "-ERR\n%s\n", e.toString().c_str()); + } catch (...) { + stream->write_function(stream, "-ERR\nUnknown exception!\n"); } } else { stream->write_function(stream, "-ERR\n%s\n", MAPREDUCE_SYNTAX); @@ -134,7 +137,7 @@ SWITCH_STANDARD_API(mongo_find_one_function) switch_status_t status = SWITCH_STATUS_SUCCESS; char *ns = NULL, *json_query = NULL, *json_fields = NULL, *query_options_str = NULL; int query_options = 0; - + ns = strdup(cmd); switch_assert(ns != NULL); @@ -152,29 +155,24 @@ SWITCH_STANDARD_API(mongo_find_one_function) } if (!zstr(ns) && !zstr(json_query) && !zstr(json_fields)) { - - DBClientBase *conn = NULL; - try { + scoped_ptr<ScopedDbConnection> conn(ScopedDbConnection::getScopedDbConnection(string(globals.conn_str), globals.socket_timeout)); BSONObj query = fromjson(json_query); BSONObj fields = fromjson(json_fields); - - conn = mongo_connection_pool_get(globals.conn_pool); - if (conn) { - BSONObj res = conn->findOne(ns, Query(query), &fields, query_options); - mongo_connection_pool_put(globals.conn_pool, conn, SWITCH_FALSE); - + try { + BSONObj res = conn->get()->findOne(ns, Query(query), &fields, query_options); stream->write_function(stream, "-OK\n%s\n", res.jsonString().c_str()); - } else { - stream->write_function(stream, "-ERR\nNo connection\n"); + } catch (DBException &e) { + stream->write_function(stream, "-ERR\n%s\n", e.toString().c_str()); + } catch (...) { + stream->write_function(stream, "-ERR\nUnknown exception!\n"); } + conn->done(); } catch (DBException &e) { - if (conn) { - mongo_connection_pool_put(globals.conn_pool, conn, SWITCH_TRUE); - } stream->write_function(stream, "-ERR\n%s\n", e.toString().c_str()); + } catch (...) { + stream->write_function(stream, "-ERR\nUnknown exception!\n"); } - } else { stream->write_function(stream, "-ERR\n%s\n", FIND_ONE_SYNTAX); } @@ -184,13 +182,18 @@ SWITCH_STANDARD_API(mongo_find_one_function) return status; } -static switch_status_t config(void) +static switch_status_t config(switch_memory_pool_t *pool) { const char *cf = "mongo.conf"; switch_xml_t cfg, xml, settings, param; switch_status_t status = SWITCH_STATUS_SUCCESS; - const char *conn_str = "127.0.0.1"; - switch_size_t min_connections = 1, max_connections = 1; + + /* set defaults */ + globals.map = ""; + globals.reduce = ""; + globals.finalize = ""; + globals.conn_str = ""; + globals.socket_timeout = 0.0; if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf); @@ -201,38 +204,71 @@ static switch_status_t config(void) for (param = switch_xml_child(settings, "param"); param; param = param->next) { char *var = (char *) switch_xml_attr_soft(param, "name"); char *val = (char *) switch_xml_attr_soft(param, "value"); - int tmp; - if (!strcmp(var, "host")) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "'host' is deprecated. use 'connection-string'\n"); - conn_str = val; - } else if (!strcmp(var, "connection-string")) { - conn_str = val; - } else if (!strcmp(var, "min-connections")) { - if ((tmp = atoi(val)) > 0) { - min_connections = tmp; - } - } else if (!strcmp(var, "max-connections")) { - if ((tmp = atoi(val)) > 0) { - max_connections = tmp; + if (!strcmp(var, "connection-string")) { + if (zstr(val)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing connection-string value\n"); + status = SWITCH_STATUS_GENERR; + } else { + try { + string errmsg; + ConnectionString cs = ConnectionString::parse(string(val), errmsg); + if (!cs.isValid()) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "connection-string \"%s\" is not valid: %s\n", val, errmsg.c_str()); + status = SWITCH_STATUS_GENERR; + } else { + globals.conn_str = switch_core_strdup(pool, val); + } + } catch (DBException &e) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "connection-string \"%s\" is not valid: %s\n", val, e.toString().c_str()); + status = SWITCH_STATUS_GENERR; + } catch (...) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "connection-string \"%s\" is not valid\n", val); + status = SWITCH_STATUS_GENERR; + } } } else if (!strcmp(var, "map")) { - globals.map = strdup(val); + if (!zstr(val)) { + globals.map = switch_core_strdup(pool, val); + } } else if (!strcmp(var, "reduce")) { - globals.reduce = strdup(val); + if (!zstr(val)) { + globals.reduce = switch_core_strdup(pool, val); + } } else if (!strcmp(var, "finalize")) { - globals.finalize = strdup(val); + if (!zstr(val)) { + globals.finalize = switch_core_strdup(pool, val); + } + } else if (!strcmp(var, "socket-timeout")) { + if (!zstr(val)) { + if (switch_is_number(val)) { + double timeout = atof(val); + if (timeout >= 0.0) { + globals.socket_timeout = timeout; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "socket-timeout \"%s\" is not valid\n", val); + } + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "socket-timeout \"%s\" is not valid\n", val); + } + } + } else if (!strcmp(var, "max-connections")) { + if (!zstr(val)) { + if (switch_is_number(val)) { + int max_connections = atoi(val); + if (max_connections > 0) { + PoolForHost::setMaxPerHost(max_connections); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "max-connections \"%s\" is not valid\n", val); + } + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "max-connections \"%s\" is not valid\n", val); + } + } } } } - if (mongo_connection_pool_create(&globals.conn_pool, min_connections, max_connections, conn_str) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Can't create connection pool\n"); - status = SWITCH_STATUS_GENERR; - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Mongo connection pool created [%s %d/%d]\n", conn_str, (int)min_connections, (int)max_connections); - } - switch_xml_free(xml); return status; @@ -255,7 +291,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load) memset(&globals, 0, sizeof(globals)); - if (config() != SWITCH_STATUS_SUCCESS) { + if (config(pool) != SWITCH_STATUS_SUCCESS) { return SWITCH_STATUS_TERM; } @@ -267,11 +303,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load) SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown) { - mongo_connection_pool_destroy(&globals.conn_pool); - switch_safe_free(globals.map); - switch_safe_free(globals.reduce); - switch_safe_free(globals.finalize); - + ScopedDbConnection::clearPool(); return SWITCH_STATUS_SUCCESS; } diff --git a/src/mod/applications/mod_mongo/mod_mongo.h b/src/mod/applications/mod_mongo/mod_mongo.h deleted file mode 100644 index 822dacaf63..0000000000 --- a/src/mod/applications/mod_mongo/mod_mongo.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application - * Copyright (C) 2005-2013, Anthony Minessale II <anthm@freeswitch.org> - * - * Version: MPL 1.1 - * - * The contents of this file are subject to the Mozilla Public License Version - * 1.1 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" basis, - * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License - * for the specific language governing rights and limitations under the - * License. - * - * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application - * - * The Initial Developer of the Original Code is - * Anthony Minessale II <anthm@freeswitch.org> - * Portions created by the Initial Developer are Copyright (C) - * the Initial Developer. All Rights Reserved. - * - * Contributor(s): - * - * Tamas Cseke <cstomi.levlist@gmail.com> - * - * mod_mongo.h -- API for MongoDB - * - */ - -#ifndef MOD_MONGO_H -#define MOD_MONGO_H - -#include <mongo/client/dbclient.h> - -using namespace mongo; - -typedef struct { - char *conn_str; - - switch_size_t min_connections; - switch_size_t max_connections; - switch_size_t size; - switch_queue_t *connections; - switch_mutex_t *mutex; - switch_memory_pool_t *pool; - -} mongo_connection_pool_t; - - -switch_status_t mongo_connection_create(DBClientBase **connection, const char *conn_str); -void mongo_connection_destroy(DBClientBase **conn); - -switch_status_t mongo_connection_pool_create(mongo_connection_pool_t **conn_pool, switch_size_t min_connections, switch_size_t max_connections, - const char *conn_str); -void mongo_connection_pool_destroy(mongo_connection_pool_t **conn_pool); - - -DBClientBase *mongo_connection_pool_get(mongo_connection_pool_t *conn_pool); -switch_status_t mongo_connection_pool_put(mongo_connection_pool_t *conn_pool, DBClientBase *conn, switch_bool_t destroy); - - -#endif - -/* For Emacs: - * Local Variables: - * mode:c - * indent-tabs-mode:t - * tab-width:4 - * c-basic-offset:4 - * End: - * For VIM: - * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet - */ diff --git a/src/mod/applications/mod_mongo/mongo_conn.cpp b/src/mod/applications/mod_mongo/mongo_conn.cpp deleted file mode 100644 index fd095fe54a..0000000000 --- a/src/mod/applications/mod_mongo/mongo_conn.cpp +++ /dev/null @@ -1,211 +0,0 @@ -/* - * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application - * Copyright (C) 2005-2013, Anthony Minessale II <anthm@freeswitch.org> - * - * Version: MPL 1.1 - * - * The contents of this file are subject to the Mozilla Public License Version - * 1.1 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.mozilla.org/MPL/ - * - * Software distributed under the License is distributed on an "AS IS" basis, - * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License - * for the specific language governing rights and limitations under the - * License. - * - * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application - * - * The Initial Developer of the Original Code is - * Anthony Minessale II <anthm@freeswitch.org> - * Portions created by the Initial Developer are Copyright (C) - * the Initial Developer. All Rights Reserved. - * - * Contributor(s): - * - * Tamas Cseke <cstomi.levlist@gmail.com> - * - * mongo_conn.cpp -- MongoDB connection pool - * - */ -#include <switch.h> -#include "mod_mongo.h" - - /* - we could use the driver's connection pool, - if we could set the max connections (PoolForHost::setMaxPerHost) - - ScopedDbConnection scoped_conn("host"); - DBClientConnection *conn = dynamic_cast< DBClientConnection* >(&scoped_conn.conn()); - scoped_conn.done(); - */ - -switch_status_t mongo_connection_create(DBClientBase **connection, const char *conn_str) -{ - DBClientBase *conn = NULL; - string conn_string(conn_str), err_msg; - ConnectionString cs = ConnectionString::parse(conn_string, err_msg); - switch_status_t status = SWITCH_STATUS_FALSE; - - if (!cs.isValid()) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't parse url: %s\n", err_msg.c_str()); - return status; - } - - try { - conn = cs.connect(err_msg); - } catch (DBException &e) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't connect to mongo [%s]: %s\n", conn_str, err_msg.c_str()); - return status; - } - - if (conn) { - *connection = conn; - status = SWITCH_STATUS_SUCCESS; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Connected to mongo [%s]\n", conn_str); - } - - return status; -} - -void mongo_connection_destroy(DBClientBase **conn) -{ - switch_assert(*conn != NULL); - delete *conn; - - *conn = NULL; -} - -switch_status_t mongo_connection_pool_create(mongo_connection_pool_t **conn_pool, switch_size_t min_connections, switch_size_t max_connections, - const char *conn_str) -{ - switch_memory_pool_t *pool = NULL; - switch_status_t status = SWITCH_STATUS_SUCCESS; - mongo_connection_pool_t *cpool = NULL; - DBClientBase *conn = NULL; - - if ((status = switch_core_new_memory_pool(&pool)) != SWITCH_STATUS_SUCCESS) { - return status; - } - - if (!(cpool = (mongo_connection_pool_t *)switch_core_alloc(pool, sizeof(mongo_connection_pool_t)))) { - switch_goto_status(SWITCH_STATUS_MEMERR, done); - } - - if ((status = switch_mutex_init(&cpool->mutex, SWITCH_MUTEX_NESTED, pool)) != SWITCH_STATUS_SUCCESS) { - goto done; - } - - if ((status = switch_queue_create(&cpool->connections, max_connections, pool)) != SWITCH_STATUS_SUCCESS) { - goto done; - } - - cpool->min_connections = min_connections; - cpool->max_connections = max_connections; - cpool->conn_str = switch_core_strdup(pool, conn_str); - cpool->pool = pool; - - for (cpool->size = 0; cpool->size < min_connections; cpool->size++) { - - if (mongo_connection_create(&conn, conn_str) == SWITCH_STATUS_SUCCESS) { - mongo_connection_pool_put(cpool, conn, SWITCH_FALSE); - } else { - break; - } - } - - done: - - if (status == SWITCH_STATUS_SUCCESS) { - *conn_pool = cpool; - } else { - switch_core_destroy_memory_pool(&pool); - } - - return status; -} - -void mongo_connection_pool_destroy(mongo_connection_pool_t **conn_pool) -{ - mongo_connection_pool_t *cpool = *conn_pool; - void *data = NULL; - - switch_assert(cpool != NULL); - - while (switch_queue_trypop(cpool->connections, &data) == SWITCH_STATUS_SUCCESS) { - mongo_connection_destroy((DBClientBase **)&data); - } - - switch_mutex_destroy(cpool->mutex); - switch_core_destroy_memory_pool(&cpool->pool); - - *conn_pool = NULL; -} - - -DBClientBase *mongo_connection_pool_get(mongo_connection_pool_t *conn_pool) -{ - DBClientBase *conn = NULL; - void *data = NULL; - - switch_assert(conn_pool != NULL); - - switch_mutex_lock(conn_pool->mutex); - - if (switch_queue_trypop(conn_pool->connections, &data) == SWITCH_STATUS_SUCCESS) { - conn = (DBClientBase *) data; - } else if (mongo_connection_create(&conn, conn_pool->conn_str) == SWITCH_STATUS_SUCCESS) { - if (++conn_pool->size > conn_pool->max_connections) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Connection pool is empty. You may want to increase 'max-connections'\n"); - } - } - - switch_mutex_unlock(conn_pool->mutex); - -#ifdef MONGO_POOL_DEBUG - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL get: size %d conn: %p\n", (int) switch_queue_size(conn_pool->connections), conn); -#endif - - return conn; -} - -switch_status_t mongo_connection_pool_put(mongo_connection_pool_t *conn_pool, DBClientBase *conn, switch_bool_t destroy) -{ - switch_status_t status = SWITCH_STATUS_SUCCESS; - - switch_assert(conn_pool != NULL); - switch_assert(conn != NULL); - - switch_mutex_lock(conn_pool->mutex); - if (destroy || conn_pool->size > conn_pool->max_connections) { -#ifdef MONGO_POOL_DEBUG - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL: Destroy connection %p\n", conn); -#endif - mongo_connection_destroy(&conn); - conn_pool->size--; - } else { -#ifdef MONGO_POOL_DEBUG - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL: push connection %p\n", conn); -#endif - status = switch_queue_push(conn_pool->connections, conn); - } - - switch_mutex_unlock(conn_pool->mutex); - -#ifdef MONGO_POOL_DEBUG - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL: put size %d conn: %p\n", (int) switch_queue_size(conn_pool->connections), conn); -#endif - - return status; -} - -/* For Emacs: - * Local Variables: - * mode:c - * indent-tabs-mode:t - * tab-width:4 - * c-basic-offset:4 - * End: - * For VIM: - * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet - */