diff --git a/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml b/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml index 57f9f6566f..041432a232 100644 --- a/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml +++ b/src/mod/applications/mod_mongo/conf/autoload_configs/mongo.conf.xml @@ -1,11 +1,17 @@ <configuration name="mongo.conf"> <settings> - <!-- + <!-- connection-string handles different ways to connect to mongo mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]] --> - <param name="connection-string" value="mongodb://127.0.0.1:27017/?connectTimeoutMS=5000"/> - + <!-- connection for queries and limit backend --> + <param name="connection-string" value="mongodb://127.0.0.1:27017/?connecttimetoutms=5000"/> + <!-- connection for limit backend, if different than connection-string (default is value set in connection-string) --> + <!--param name="limit-connection-string" value="mongodb://127.0.0.1:27019/?connecttimeoutms=5000"/--> + <!-- database name for limit backend (default is limit) --> + <!--param name="limit-database" value="limit"/--> + <!-- collection name for limit backend (default is mod_mongo) --> + <!--param name="limit-collection" value="mod_mongo"/--> <!-- <param name="map" value="function() { emit(this.a, 1); }"/> <param name="reduce" value="function(key, values) { return Array.sum(values); }"/> diff --git a/src/mod/applications/mod_mongo/mod_mongo.c b/src/mod/applications/mod_mongo/mod_mongo.c index 0de63fd0b6..a14958dab6 100644 --- a/src/mod/applications/mod_mongo/mod_mongo.c +++ b/src/mod/applications/mod_mongo/mod_mongo.c @@ -26,7 +26,7 @@ * Tamas Cseke <cstomi.levlist@gmail.com> * Christopher Rienzo <crienzo@grasshopper.com> * - * mod_mongo.c -- API for MongoDB + * mod_mongo.c -- API for MongoDB * */ #include <switch.h> @@ -49,17 +49,34 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load); SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown); -SWITCH_MODULE_DEFINITION(mod_mongo, mod_mongo_load, mod_mongo_shutdown, NULL); +SWITCH_MODULE_RUNTIME_FUNCTION(mod_mongo_runtime); +SWITCH_MODULE_DEFINITION(mod_mongo, mod_mongo_load, mod_mongo_shutdown, mod_mongo_runtime); static struct { + int shutdown; const char *map; const char *reduce; const char *finalize; const char *conn_str; - mongoc_uri_t *uri; mongoc_client_pool_t *client_pool; + const char *limit_database; + const char *limit_collection; + const char *limit_conn_str; + int limit_cleanup_interval_sec; + mongoc_client_pool_t *limit_client_pool; + switch_mutex_t *mod_mongo_private_mutex; + switch_thread_rwlock_t *limit_database_rwlock; + switch_thread_rwlock_t *shutdown_rwlock; } globals; +/** + * resources acquired by this session + */ +struct mod_mongo_private { + switch_hash_t *resources; + switch_mutex_t *mutex; +}; + /** * @param query_options_str * @return query options @@ -94,11 +111,11 @@ static int parse_query_options(char *query_options_str) /** * @return a new connection to mongodb or NULL if error */ -static mongoc_client_t *get_connection(void) +static mongoc_client_t *get_connection(mongoc_client_pool_t *client_pool, const char *conn_str) { - mongoc_client_t *client = mongoc_client_pool_pop(globals.client_pool); + mongoc_client_t *client = mongoc_client_pool_pop(client_pool); if (!client) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to get connection to: %s\n", globals.conn_str); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to get connection to: %s\n", conn_str); return NULL; } /* TODO auth */ @@ -108,12 +125,12 @@ static mongoc_client_t *get_connection(void) /** * Mark connection as finished */ -static void connection_done(mongoc_client_t *conn) +static void connection_done(mongoc_client_pool_t *client_pool, mongoc_client_t *conn) { - mongoc_client_pool_push(globals.client_pool, conn); + mongoc_client_pool_push(client_pool, conn); } -SWITCH_STANDARD_API(mongo_mapreduce_function) +SWITCH_STANDARD_API(mod_mongo_mapreduce_function) { switch_status_t status = SWITCH_STATUS_SUCCESS; char *db = NULL, *collection = NULL, *json_query = NULL; @@ -129,7 +146,7 @@ SWITCH_STANDARD_API(mongo_mapreduce_function) } if (!zstr(db) && !zstr(collection) && !zstr(json_query)) { - mongoc_client_t *conn = get_connection(); + mongoc_client_t *conn = get_connection(globals.client_pool, globals.conn_str); if (conn) { bson_error_t error; bson_t *query = bson_new_from_json((uint8_t *)json_query, strlen(json_query), &error); @@ -172,7 +189,7 @@ SWITCH_STANDARD_API(mongo_mapreduce_function) } else { stream->write_function(stream, "-ERR\nfailed to parse query!\n"); } - connection_done(conn); + connection_done(globals.client_pool, conn); } else { stream->write_function(stream, "-ERR\nfailed to get connection!\n"); } @@ -185,7 +202,7 @@ SWITCH_STANDARD_API(mongo_mapreduce_function) return status; } -SWITCH_STANDARD_API(mongo_find_n_function) +SWITCH_STANDARD_API(mod_mongo_find_n_function) { switch_status_t status = SWITCH_STATUS_SUCCESS; char *db = NULL, *collection = NULL, *json_query = NULL, *json_fields = NULL, *query_options_str = NULL; @@ -223,7 +240,7 @@ SWITCH_STANDARD_API(mongo_find_n_function) if (!zstr(db) && !zstr(collection) && !zstr(json_query) && !zstr(json_fields)) { bson_error_t error; - mongoc_client_t *conn = get_connection(); + mongoc_client_t *conn = get_connection(globals.client_pool, globals.conn_str); if (conn) { mongoc_collection_t *col = mongoc_client_get_collection(conn, db, collection); if (col) { @@ -268,7 +285,7 @@ SWITCH_STANDARD_API(mongo_find_n_function) } else { stream->write_function(stream, "-ERR\nunknown collection: %s\n", collection); } - connection_done(conn); + connection_done(globals.client_pool, conn); } else { stream->write_function(stream, "-ERR\nfailed to get connection!\n"); } @@ -281,7 +298,7 @@ SWITCH_STANDARD_API(mongo_find_n_function) return status; } -SWITCH_STANDARD_API(mongo_find_one_function) +SWITCH_STANDARD_API(mod_mongo_find_one_function) { switch_status_t status = SWITCH_STATUS_SUCCESS; char *db = NULL, *collection = NULL, *json_query = NULL, *json_fields = NULL, *query_options_str = NULL; @@ -308,7 +325,7 @@ SWITCH_STANDARD_API(mongo_find_one_function) if (!zstr(db) && !zstr(collection) && !zstr(json_query) && !zstr(json_fields)) { bson_error_t error; - mongoc_client_t *conn = get_connection(); + mongoc_client_t *conn = get_connection(globals.client_pool, globals.conn_str); if (conn) { mongoc_collection_t *col = mongoc_client_get_collection(conn, db, collection); if (col) { @@ -348,7 +365,7 @@ SWITCH_STANDARD_API(mongo_find_one_function) } else { stream->write_function(stream, "-ERR\nunknown collection: %s\n", collection); } - connection_done(conn); + connection_done(globals.client_pool, conn); } else { stream->write_function(stream, "-ERR\nfailed to get connection!\n"); } @@ -361,7 +378,362 @@ SWITCH_STANDARD_API(mongo_find_one_function) return status; } -static switch_status_t config(switch_memory_pool_t *pool) +/** + * Calculate resource count from BSON document + */ +static switch_status_t mod_mongo_get_count(switch_core_session_t *session, const char *key, const bson_t *b, int *new_val_ret, char **resource_ret) +{ + switch_status_t status = SWITCH_STATUS_SUCCESS; + bson_iter_t iter; + if (new_val_ret) { + if (bson_iter_init_find(&iter, b, key) && BSON_ITER_HOLDS_INT32(&iter)) { + *new_val_ret = bson_iter_int32(&iter); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Failed to get resource count\n"); + status = SWITCH_STATUS_GENERR; + } + } + if (resource_ret) { + if (bson_iter_init_find(&iter, b, "_id") && BSON_ITER_HOLDS_UTF8(&iter)) { + uint32_t len; + const char *resource = bson_iter_utf8(&iter, &len); + if (!zstr(resource)) { + if (bson_utf8_validate(resource, len, 0)) { + *resource_ret = strdup(resource); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Resource name is not valid utf8\n"); + status = SWITCH_STATUS_GENERR; + } + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Resource name is empty string"); + status = SWITCH_STATUS_GENERR; + } + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Resource name not found\n"); + status = SWITCH_STATUS_GENERR; + } + } + return status; +} + +/** + * Increment a resource by val + * @param session + * @param resource name of resource being incremented + * @param val number to increment resource by + * @param max maximum value of resource allowed + * @param new_val_ret new value of resource after increment completed + */ +static switch_status_t mod_mongo_increment(switch_core_session_t *session, const char *resource, int val, int max, int *new_val_ret) +{ + switch_status_t status = SWITCH_STATUS_GENERR; + mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str); + if (conn) { + mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection); + if (col) { + int upsert; + bson_t *query, *update, reply; + bson_error_t error; + + /* construct update query - the counts are stored as: + { _id: "realm_resource", total: 29, "fs-1": 5, "fs-2": 10, "fs-3": 3, "fs-4": 11 } + */ + if (val >= 0) { + if (max > 0) { + /* increment if < max */ + query = BCON_NEW("_id", resource, + "total", "{", "$lt", BCON_INT32(max), "}"); + upsert = 1; /* will fail with duplicate index key error if total condition is not satisfied */ + } else { + /* increment, no restrictions */ + query = BCON_NEW("_id", resource); + upsert = 1; + } + } else { + /* don't allow decrement below 0, don't add fields that don't exist */ + query = BCON_NEW("_id", resource, + "total", "{", "$gte", BCON_INT32(-val), "}", + switch_core_get_switchname(), "{", "$gte", BCON_INT32(-val), "}"); + upsert = 0; + } + update = BCON_NEW("$inc", "{", "total", BCON_INT32(val), switch_core_get_switchname(), BCON_INT32(val), "}"); + + if (!mongoc_collection_find_and_modify(col, query, NULL, update, NULL, false, upsert, true, &reply, &error)) { + if (max > 0 && error.code == 11000) { + /* duplicate key index error - limit exceeded */ + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Usage for %s exceeds maximum rate of %d\n", resource, max); + status = SWITCH_STATUS_FALSE; + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Increment %s by %d failed: %s\n", resource, val, error.message); + status = SWITCH_STATUS_GENERR; + } + } else if (new_val_ret) { + status = mod_mongo_get_count(session, "total", &reply, new_val_ret, NULL); + } else { + status = SWITCH_STATUS_SUCCESS; + } + + bson_destroy(query); + bson_destroy(update); + bson_destroy(&reply); + mongoc_collection_destroy(col); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Increment %s by %d failed: unable to get collection %s from database %s\n", resource, val, globals.limit_collection, globals.limit_database); + } + connection_done(globals.limit_client_pool, conn); + } + return status; +} + +/** + * Get resource usage + */ +static switch_status_t mod_mongo_get_usage(const char *resource, int *usage) +{ + switch_status_t status = SWITCH_STATUS_GENERR; + mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str); + if (conn) { + mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection); + if (col) { + bson_t *query = BCON_NEW("_id", resource); + bson_t *fields = BCON_NEW("{", "total", BCON_INT32(1), "}"); + bson_error_t error; + mongoc_cursor_t *cursor = mongoc_collection_find(col, 0, 0, 1, 0, query, fields, NULL); + if (cursor) { + if (!mongoc_cursor_error(cursor, &error)) { + /* get result from cursor */ + const bson_t *result; + if (mongoc_cursor_more(cursor) && mongoc_cursor_next(cursor, &result)) { + status = mod_mongo_get_count(NULL, "total", result, usage, NULL); + } + } + mongoc_cursor_destroy(cursor); + } + bson_destroy(query); + bson_destroy(fields); + mongoc_collection_destroy(col); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Get usage failed: unable to get collection %s from database %s\n", globals.limit_collection, globals.limit_database); + } + connection_done(globals.limit_client_pool, conn); + } + return status; +} + +/** + * Clear all limits on this server + */ +static switch_status_t mod_mongo_reset(void) +{ + switch_status_t status = SWITCH_STATUS_GENERR; + mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str); + if (conn) { + mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection); + if (col) { + bson_t *query; + //bson_t *fields; + mongoc_cursor_t *cursor; + bson_error_t error; + query = BCON_NEW(switch_core_get_switchname(), "{", "$gt", BCON_INT32(0), "}"); + //fields = BCON_NEW(switch_core_get_switchname(), "1"); + + /* find all docs w/ this server and clear its counts */ + switch_thread_rwlock_wrlock(globals.limit_database_rwlock); /* prevent increments on this server */ + cursor = mongoc_collection_find(col, 0, 0, 0, 0, query, NULL, NULL); + if (cursor) { + if (!mongoc_cursor_error(cursor, &error)) { + /* get result from cursor */ + const bson_t *result; + char *resource = NULL; + while (mongoc_cursor_more(cursor) && mongoc_cursor_next(cursor, &result)) { + int count = 0; + if ((status = mod_mongo_get_count(NULL, switch_core_get_switchname(), result, &count, &resource)) == SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reset %s, -%d\n", resource, count); + if (count > 0 && !zstr(resource)) { + /* decrement server counts from mongo */ + if ((status = mod_mongo_increment(NULL, resource, -count, 0, NULL)) == SWITCH_STATUS_GENERR) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset done - increment error\n"); + break; + } + } + switch_safe_free(resource); + resource = NULL; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reset failed - get count\n"); + break; + } + } + switch_safe_free(resource); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset failed: %s\n", error.message); + } + mongoc_cursor_destroy(cursor); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset failed: NULL cursor returned\n"); + } + switch_thread_rwlock_unlock(globals.limit_database_rwlock); + + bson_destroy(query); + //bson_destroy(fields); + mongoc_collection_destroy(col); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset failed: unable to get collection %s from database %s\n", globals.limit_collection, globals.limit_database); + } + connection_done(globals.limit_client_pool, conn); + } + return status; +} + +/** + * Clean up all entries w/ resource count of 0 + */ +static switch_status_t mod_mongo_cleanup(void) +{ + switch_status_t status = SWITCH_STATUS_GENERR; + mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str); + if (conn) { + mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection); + if (col) { + bson_t *selector = BCON_NEW("total", BCON_INT32(0)); + bson_error_t error; + if (mongoc_collection_remove(col, 0, selector, NULL, &error)) { + status = SWITCH_STATUS_SUCCESS; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Cleanup failed: %s\n", error.message); + } + bson_destroy(selector); + mongoc_collection_destroy(col); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Cleanup failed: unable to get collection %s from database %s\n", globals.limit_collection, globals.limit_database); + } + connection_done(globals.limit_client_pool, conn); + } + return status; +} + +/** + * @brief Enforces limit_mongo restrictions + * @param session current session + * @param realm limit realm + * @param id limit id + * @param max maximum count + * @param interval interval for rate limiting + * @return SWITCH_TRUE if the access is allowed, SWITCH_FALSE if it isn't + */ +SWITCH_LIMIT_INCR(mod_mongo_limit_incr) +{ + switch_status_t status = SWITCH_STATUS_FALSE; + switch_channel_t *channel = switch_core_session_get_channel(session); + const char *limit_id = switch_core_session_sprintf(session, "%s_%s", realm, resource); + + /* get session's resource tracking information */ + struct mod_mongo_private *pvt = switch_channel_get_private(channel, "limit_mongo"); + if (!pvt) { + switch_mutex_lock(globals.mod_mongo_private_mutex); /* prevents concurrent alloc of mod_mongo_private */ + pvt = switch_channel_get_private(channel, "limit_mongo"); + if (!pvt) { + pvt = (struct mod_mongo_private *) switch_core_session_alloc(session, sizeof(*pvt)); + switch_core_hash_init(&pvt->resources); + switch_mutex_init(&pvt->mutex, SWITCH_MUTEX_UNNESTED, switch_core_session_get_pool(session)); + switch_channel_set_private(channel, "limit_mongo", pvt); + } + switch_mutex_unlock(globals.mod_mongo_private_mutex); + } + + switch_mutex_lock(pvt->mutex); /* prevents concurrent increment in session */ + switch_thread_rwlock_rdlock(globals.limit_database_rwlock); /* prevent reset operation on this server */ + + /* check if resource is already incremented on this session */ + if (!switch_core_hash_find(pvt->resources, limit_id)) { + /* increment resource usage */ + if ((status = mod_mongo_increment(session, limit_id, 1, max, NULL)) == SWITCH_STATUS_SUCCESS) { + /* remember this resource was incremented */ + switch_core_hash_insert(pvt->resources, limit_id, limit_id); + } + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "%s already acquired\n", limit_id); + } + + switch_thread_rwlock_unlock(globals.limit_database_rwlock); + switch_mutex_unlock(pvt->mutex); + + return status; +} + +/** + * @brief Releases usage of a limit_mongo-controlled resource + */ +SWITCH_LIMIT_RELEASE(mod_mongo_limit_release) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + struct mod_mongo_private *pvt = switch_channel_get_private(channel, "limit_mongo"); + int status = SWITCH_STATUS_SUCCESS; + + if (!pvt) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "No limit tracking data for channel\n"); + return SWITCH_STATUS_SUCCESS; + } + + switch_mutex_lock(pvt->mutex); /* prevents concurrent decrement in session */ + switch_thread_rwlock_rdlock(globals.limit_database_rwlock); /* prevent reset operation on this server */ + + /* no realm / resource = clear all resources */ + if (realm == NULL && resource == NULL) { + /* clear all resources */ + switch_hash_index_t *hi = NULL; + while ((hi = switch_core_hash_first_iter(pvt->resources, hi))) { + void *p_val = NULL; + const void *p_key; + switch_ssize_t keylen; + switch_core_hash_this(hi, &p_key, &keylen, &p_val); + if (mod_mongo_increment(session, (const char *)p_key, -1, 0, NULL) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Couldn't decrement %s\n", (const char *)p_key); + status = SWITCH_STATUS_FALSE; + break; + } else { + switch_core_hash_delete(pvt->resources, (const char *) p_key); + } + } + } else if (!zstr(realm) && !zstr(resource)) { + /* clear specific resource */ + const char *limit_id = switch_core_session_sprintf(session, "%s_%s", realm, resource); + if (switch_core_hash_find(pvt->resources, limit_id)) { + if (mod_mongo_increment(session, limit_id, -1, 0, NULL) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Couldn't decrement %s\n", limit_id); + } else { + switch_core_hash_delete(pvt->resources, limit_id); + } + } + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Missing either realm or resource to release\n"); + } + + switch_thread_rwlock_unlock(globals.limit_database_rwlock); + switch_mutex_unlock(pvt->mutex); + + return status; +} + +SWITCH_LIMIT_USAGE(mod_mongo_limit_usage) +{ + char *limit_id = switch_mprintf("%s_%s", realm, resource); + int usage = 0; + mod_mongo_get_usage(limit_id, &usage); + switch_safe_free(limit_id); + return usage; +} + +SWITCH_LIMIT_RESET(mod_mongo_limit_reset) +{ + return mod_mongo_reset(); +} + +SWITCH_LIMIT_STATUS(mod_mongo_limit_status) +{ + return strdup("-ERR not supported"); +} + +static switch_status_t do_config(switch_memory_pool_t *pool) { const char *cf = "mongo.conf"; switch_xml_t cfg, xml, settings, param; @@ -372,8 +744,12 @@ static switch_status_t config(switch_memory_pool_t *pool) globals.reduce = ""; globals.finalize = ""; globals.conn_str = ""; - globals.uri = NULL; globals.client_pool = NULL; + globals.limit_database = "limit"; + globals.limit_collection = "mod_mongo"; + globals.limit_conn_str = ""; + globals.limit_client_pool = NULL; + globals.limit_cleanup_interval_sec = 300; if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf); @@ -391,21 +767,75 @@ static switch_status_t config(switch_memory_pool_t *pool) status = SWITCH_STATUS_GENERR; goto done; } else { + mongoc_uri_t *uri; globals.conn_str = switch_core_strdup(pool, val); - globals.uri = mongoc_uri_new(globals.conn_str); - if (globals.uri) { - globals.client_pool = mongoc_client_pool_new(globals.uri); + uri = mongoc_uri_new(globals.conn_str); + if (uri) { + globals.client_pool = mongoc_client_pool_new(uri); + mongoc_uri_destroy(uri); if (!globals.client_pool) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to pool for connection-string: %s\n", globals.conn_str); status = SWITCH_STATUS_GENERR; goto done; } + if (!globals.limit_client_pool) { + /* use connection-string for limit backend unless overriden by limit-connection-string */ + globals.limit_client_pool = globals.client_pool; + globals.limit_conn_str = globals.conn_str; + } } else { + mongoc_uri_destroy(uri); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid connection-string: %s\n", globals.conn_str); status = SWITCH_STATUS_GENERR; goto done; } } + } else if (!strcmp(var, "limit-connection-string")) { + if (zstr(val)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing limit-connection-string - using connection-string instead\n"); + continue; + } else { + mongoc_uri_t *uri; + globals.limit_conn_str = switch_core_strdup(pool, val); + uri = mongoc_uri_new(globals.limit_conn_str); + if (uri) { + globals.limit_client_pool = mongoc_client_pool_new(uri); + mongoc_uri_destroy(uri); + if (!globals.limit_client_pool) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to pool for limit-connection-string: %s\n", globals.limit_conn_str); + status = SWITCH_STATUS_GENERR; + goto done; + } + } else { + mongoc_uri_destroy(uri); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid limit-connection-string: %s\n", globals.limit_conn_str); + status = SWITCH_STATUS_GENERR; + goto done; + } + } + } else if (!strcmp(var, "limit-database")) { + if (zstr(val)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing limit-database - using '%s'\n", globals.limit_database); + } else { + globals.limit_database = switch_core_strdup(pool, val); + } + } else if (!strcmp(var, "limit-collection")) { + if (zstr(val)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing limit-collection - using '%s'\n", globals.limit_collection); + } else { + globals.limit_collection = switch_core_strdup(pool, val); + } + } else if (!strcmp(var, "limit-cleanup-interval-sec")) { + if (zstr(val) || !switch_is_number(val)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "bad value of limit-cleanup-interval-sec\n"); + } else { + int new_interval = atoi(val); + if (new_interval >= 0) { + globals.limit_cleanup_interval_sec = new_interval; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "limit-cleanup-interval-sec must be >= 0\n"); + } + } } else if (!strcmp(var, "map")) { if (!zstr(val)) { globals.map = switch_core_strdup(pool, val); @@ -435,29 +865,69 @@ done: SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load) { - switch_api_interface_t *api_interface; + switch_api_interface_t *api_interface = NULL; + switch_limit_interface_t *limit_interface = NULL; *module_interface = switch_loadable_module_create_module_interface(pool, modname); memset(&globals, 0, sizeof(globals)); - if (config(pool) != SWITCH_STATUS_SUCCESS) { + if (do_config(pool) != SWITCH_STATUS_SUCCESS) { return SWITCH_STATUS_TERM; } - SWITCH_ADD_API(api_interface, "mongo_find_one", "findOne", mongo_find_one_function, FIND_ONE_SYNTAX); - SWITCH_ADD_API(api_interface, "mongo_find_n", "find", mongo_find_n_function, FIND_N_SYNTAX); - SWITCH_ADD_API(api_interface, "mongo_mapreduce", "Map/Reduce", mongo_mapreduce_function, MAPREDUCE_SYNTAX); + switch_mutex_init(&globals.mod_mongo_private_mutex, SWITCH_MUTEX_UNNESTED, pool); + switch_thread_rwlock_create(&globals.limit_database_rwlock, pool); + switch_thread_rwlock_create(&globals.shutdown_rwlock, pool); + + /* clear all entries */ + mod_mongo_reset(); + + SWITCH_ADD_API(api_interface, "mongo_find_one", "findOne", mod_mongo_find_one_function, FIND_ONE_SYNTAX); + SWITCH_ADD_API(api_interface, "mongo_find_n", "find", mod_mongo_find_n_function, FIND_N_SYNTAX); + SWITCH_ADD_API(api_interface, "mongo_mapreduce", "Map/Reduce", mod_mongo_mapreduce_function, MAPREDUCE_SYNTAX); + + SWITCH_ADD_LIMIT(limit_interface, "mongo", mod_mongo_limit_incr, mod_mongo_limit_release, mod_mongo_limit_usage, mod_mongo_limit_reset, mod_mongo_limit_status, NULL); return SWITCH_STATUS_SUCCESS; } +/** + * Periodically cleanup mongo limit counters + */ +SWITCH_MODULE_RUNTIME_FUNCTION(mod_mongo_runtime) +{ + switch_interval_time_t cleanup_time = switch_micro_time_now() + (globals.limit_cleanup_interval_sec * 1000 * 1000); + switch_thread_rwlock_rdlock(globals.shutdown_rwlock); + while(!globals.shutdown && globals.limit_cleanup_interval_sec) { + switch_micro_sleep(1 * 1000 * 1000); + if (!globals.shutdown && switch_micro_time_now() > cleanup_time) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Cleanup\n"); + mod_mongo_cleanup(); + cleanup_time = switch_micro_time_now() + (globals.limit_cleanup_interval_sec * 1000 * 1000); + } + } + switch_thread_rwlock_unlock(globals.shutdown_rwlock); + return SWITCH_STATUS_TERM; +} + SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown) { + globals.shutdown = 1; + switch_thread_rwlock_wrlock(globals.shutdown_rwlock); + switch_thread_rwlock_unlock(globals.shutdown_rwlock); + if (globals.limit_client_pool && globals.limit_client_pool != globals.client_pool) { + mongoc_client_pool_destroy(globals.limit_client_pool); + globals.limit_client_pool = NULL; + } if (globals.client_pool) { mongoc_client_pool_destroy(globals.client_pool); globals.client_pool = NULL; } + if (globals.mod_mongo_private_mutex) { + switch_mutex_destroy(globals.mod_mongo_private_mutex); + globals.mod_mongo_private_mutex = NULL; + } return SWITCH_STATUS_SUCCESS; }