From c18745176d1289fb97944bb60e636248da5e8f0f Mon Sep 17 00:00:00 2001 From: Michael Jerris Date: Tue, 9 Dec 2008 21:33:57 +0000 Subject: [PATCH] initial merge of mod_limit hash based rework from scripts/contrib/mrene git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@10688 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- src/mod/applications/mod_limit/mod_limit.c | 355 ++++++++++++++++++++- 1 file changed, 339 insertions(+), 16 deletions(-) diff --git a/src/mod/applications/mod_limit/mod_limit.c b/src/mod/applications/mod_limit/mod_limit.c index b7ae2365c0..6d5d0fd7b8 100644 --- a/src/mod/applications/mod_limit/mod_limit.c +++ b/src/mod/applications/mod_limit/mod_limit.c @@ -25,7 +25,8 @@ * * Anthony Minessale II * Ken Rice + * * mod_limit.c -- Resource Limit Module * */ @@ -44,6 +45,10 @@ static struct { char *dbname; char *odbc_dsn; switch_mutex_t *mutex; + switch_mutex_t *limit_hash_mutex; + switch_hash_t *limit_hash; + switch_mutex_t *db_hash_mutex; + switch_hash_t *db_hash; #ifdef SWITCH_HAVE_ODBC switch_odbc_handle_t *master_odbc; #else @@ -51,6 +56,14 @@ static struct { #endif } globals; +struct limit_hash_item { + uint32_t total_usage; + uint32_t rate_usage; + time_t last_check; +}; +typedef struct limit_hash_item limit_hash_item_t; + + static char limit_sql[] = "CREATE TABLE limit_data (\n" " hostname VARCHAR(255),\n" " realm VARCHAR(255),\n" " id VARCHAR(255),\n" " uuid VARCHAR(255)\n" ");\n"; @@ -247,26 +260,56 @@ static switch_status_t do_config() return status; } -static switch_status_t hanguphook(switch_core_session_t *session) +static switch_status_t db_state_handler(switch_core_session_t *session) { switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_state_t state = switch_channel_get_state(channel); - const char *realm = NULL; - const char *id = NULL; char *sql = NULL; if (state == CS_HANGUP || state == CS_ROUTING) { - id = switch_channel_get_variable(channel, "limit_id"); - realm = switch_channel_get_variable(channel, "limit_realm"); - sql = switch_mprintf("delete from limit_data where uuid='%q'", + sql = switch_mprintf("delete from limit_data where uuid='%q';", switch_core_session_get_uuid(session)); limit_execute_sql(sql, globals.mutex); switch_safe_free(sql); - switch_core_event_hook_remove_state_change(session, hanguphook); + switch_core_event_hook_remove_state_change(session, db_state_handler); } return SWITCH_STATUS_SUCCESS; } +static switch_status_t hash_state_handler(switch_core_session_t *session) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + switch_channel_state_t state = switch_channel_get_state(channel); + switch_hash_t *channel_hash = switch_channel_get_private(channel, "limit_hash"); + + /* The call is either hung up, or is going back into the dialplan, decrement appropriate couters */ + if (state == CS_HANGUP || state == CS_ROUTING) { + switch_hash_index_t *hi; + switch_mutex_lock(globals.mutex); + + /* Loop through the channel's hashtable which contains mapping to all the limit_hash_item_t referenced by that channel */ + for(hi = switch_hash_first(NULL, channel_hash); hi; hi = switch_hash_next(hi)) + { + void *val = NULL; + const void *key; + switch_ssize_t keylen; + limit_hash_item_t *item = NULL; + + switch_hash_this(hi, &key, &keylen, &val); + + item = (limit_hash_item_t*)val; + + /* We keep the structure even though the count is 0 so we do not allocate too often */ + item->total_usage--; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Usage for %s is now %d\n", (const char*)key, item->total_usage); + } + switch_core_event_hook_remove_state_change(session, hash_state_handler); + switch_mutex_unlock(globals.mutex); + } + + return SWITCH_STATUS_SUCCESS; +} + struct callback { char *buf; size_t len; @@ -303,6 +346,7 @@ SWITCH_STANDARD_API(db_api_function) if (!switch_strlen_zero(cmd)) { mydata = strdup(cmd); + switch_assert(mydata); argc = switch_separate_string(mydata, '/', argv, (sizeof(argv) / sizeof(argv[0]))); } @@ -376,10 +420,15 @@ SWITCH_STANDARD_APP(db_function) } if (argc < 3 || !argv[0]) { - goto error; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "USAGE: db %s\n", DB_USAGE); + return; } if (!strcasecmp(argv[0], "insert")) { + if (argc < 4) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "USAGE: db %s\n", DB_USAGE); + return; + } sql = switch_mprintf("delete from db_data where realm='%q' and data_key='%q'", argv[1], argv[2]); switch_assert(sql); limit_execute_sql(sql, globals.mutex); @@ -395,10 +444,109 @@ SWITCH_STANDARD_APP(db_function) switch_assert(sql); limit_execute_sql(sql, globals.mutex); switch_safe_free(sql); - return; +} - error: - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "USAGE: db %s\n", DB_USAGE); +#define HASH_USAGE "[insert|delete]///" +#define HASH_DESC "save data" + +SWITCH_STANDARD_APP(hash_function) +{ + int argc = 0; + char *argv[4] = { 0 }; + char *mydata = NULL; + char *hash_key = NULL; + char *value = NULL; + + if (!switch_strlen_zero(data)) { + mydata = strdup(data); + switch_assert(mydata); + argc = switch_separate_string(mydata, '/', argv, (sizeof(argv) / sizeof(argv[0]))); + } + + if (argc < 3 || !argv[0]) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "USAGE: hash %s\n", HASH_USAGE); + goto end; + } + + hash_key = switch_mprintf("%s_%s", argv[1], argv[2]); + + switch_mutex_lock(globals.db_hash_mutex); + if (!strcasecmp(argv[0], "insert")) { + if ((value = switch_core_hash_find(globals.db_hash, hash_key))) { + free(value); + switch_core_hash_delete(globals.db_hash, hash_key); + } + value = strdup(argv[3]); + switch_assert(value); + switch_core_hash_insert(globals.db_hash, hash_key, value); + } else if (!strcasecmp(argv[0], "delete")) { + if ((value = switch_core_hash_find(globals.db_hash, hash_key))) { + switch_safe_free(value); + switch_core_hash_delete(globals.db_hash, hash_key); + } + } + switch_mutex_unlock(globals.db_hash_mutex); + +end: + switch_safe_free(mydata); + switch_safe_free(hash_key); +} + +#define HASH_API_USAGE "insert|select|delete/realm/key[/value]" +SWITCH_STANDARD_API(hash_api_function) +{ + int argc = 0; + char *argv[4] = { 0 }; + char *mydata = NULL; + char *value = NULL; + char *hash_key = NULL; + + switch_mutex_lock(globals.db_hash_mutex); + + if (!switch_strlen_zero(cmd)) { + mydata = strdup(cmd); + switch_assert(mydata); + argc = switch_separate_string(mydata, '/', argv, (sizeof(argv) / sizeof(argv[0]))); + } + + if (argc < 3 || !argv[0]) { + stream->write_function(stream, "-ERR Usage: hash %s\n", HASH_API_USAGE); + goto end; + } + + hash_key = switch_mprintf("%s_%s", argv[1], argv[2]); + + if (!strcasecmp(argv[0], "insert")) { + if (argc < 4) { + stream->write_function(stream, "-ERR Usage: hash %s\n", HASH_API_USAGE); + goto end; + } + if ((value = switch_core_hash_find(globals.db_hash, hash_key))) { + switch_safe_free(value); + switch_core_hash_delete(globals.db_hash, hash_key); + } + value = strdup(argv[3]); + switch_assert(value); + switch_core_hash_insert(globals.db_hash, hash_key, value); + stream->write_function(stream, "+OK\n"); + } else if (!strcasecmp(argv[0], "delete")) { + if ((value = switch_core_hash_find(globals.db_hash, hash_key))) { + switch_safe_free(value); + switch_core_hash_delete(globals.db_hash, hash_key); + } + stream->write_function(stream, "+OK\n"); + } else if (!strcasecmp(argv[0], "select")) { + if ((value = switch_core_hash_find(globals.db_hash, hash_key))) { + stream->write_function(stream, "%s", value); + } + } + +end: + switch_mutex_unlock(globals.db_hash_mutex); + switch_safe_free(mydata); + switch_safe_free(hash_key); + + return SWITCH_STATUS_SUCCESS; } SWITCH_STANDARD_API(group_api_function) @@ -513,8 +661,8 @@ SWITCH_STANDARD_APP(group_function) } } -#define LIMIT_USAGE " [transfer_destination_number]" -#define LIMIT_DESC "limit access to an extension" +#define LIMIT_USAGE " [number [dialplan [context]]]" +#define LIMIT_DESC "limit access to a resource and transfer to an extension if the limit is exceeded" static char *limit_def_xfer_exten = "limit_exceeded"; SWITCH_STANDARD_APP(limit_function) @@ -530,6 +678,7 @@ SWITCH_STANDARD_APP(limit_function) char buf[80] = ""; callback_t cbt = { 0 }; switch_channel_t *channel = switch_core_session_get_channel(session); + switch_bool_t new_channel = SWITCH_FALSE; if (!switch_strlen_zero(data)) { mydata = switch_core_session_strdup(session, data); @@ -556,7 +705,8 @@ SWITCH_STANDARD_APP(limit_function) if (max < 0) { max = 0; } - + + new_channel = !switch_channel_get_variable(channel, "limit_realm"); switch_channel_set_variable(channel, "limit_realm", realm); switch_channel_set_variable(channel, "limit_id", id); switch_channel_set_variable(channel, "limit_max", argv[2]); @@ -572,7 +722,9 @@ SWITCH_STANDARD_APP(limit_function) goto done; } - switch_core_event_hook_add_state_change(session, hanguphook); + if (new_channel) { + switch_core_event_hook_add_state_change(session, db_state_handler); + } sql = switch_mprintf("insert into limit_data (hostname, realm, id, uuid) values('%q','%q','%q','%q');", globals.hostname, realm, id, switch_core_session_get_uuid(session)); @@ -583,6 +735,166 @@ SWITCH_STANDARD_APP(limit_function) switch_mutex_unlock(globals.mutex); } +#define LIMITHASH_USAGE " [/interval] [number [dialplan [context]]]" +#define LIMITHASH_DESC "limit access to a resource and transfer to an extension if the limit is exceeded" +SWITCH_STANDARD_APP(limit_hash_function) +{ + int argc = 0; + char *argv[6] = { 0 }; + char *mydata = NULL; + char *realm = NULL; + char *id = NULL; + char *hashkey = NULL; + char *xfer_exten = NULL; + int max = 0; + int interval = 0; + char *szinterval = NULL; + limit_hash_item_t *item = NULL; + switch_channel_t *channel = switch_core_session_get_channel(session); + time_t now = switch_timestamp(NULL); + switch_hash_t *channel_hash = NULL; + uint8_t increment = 1; + uint8_t new_channel = 0; + + /* Parse application data */ + if (!switch_strlen_zero(data)) { + mydata = switch_core_session_strdup(session, data); + argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0]))); + } + + if (argc < 3) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "USAGE: limit_hash %s\n", LIMITHASH_USAGE); + return; + } + + realm = argv[0]; + id = argv[1]; + if ((szinterval = strchr(argv[2], '/'))) + { + *szinterval++ = '\0'; + interval = atoi(szinterval); + } + + max = atoi(argv[2]); + + if (argc >= 4) { + xfer_exten = argv[3]; + } else { + xfer_exten = limit_def_xfer_exten; + } + + if (max < 0) { + max = 0; + } + + hashkey = switch_core_session_sprintf(session, "%s_%s", realm, id); + + switch_mutex_lock(globals.limit_hash_mutex); + /* Check if that realm+id has ever been checked */ + if (!(item = (limit_hash_item_t*)switch_core_hash_find(globals.limit_hash, hashkey))) { + /* No, create an empty structure and add it, then continue like as if it existed */ + item = (limit_hash_item_t*)switch_core_alloc(globals.pool, sizeof(limit_hash_item_t)); + memset(item, 0, sizeof(limit_hash_item_t)); + switch_core_hash_insert(globals.limit_hash, hashkey, item); + } + + /* Did we already run on this channel before? */ + if ((channel_hash = switch_channel_get_private(channel, "limit_hash"))) + { + /* Yes, but check if we did that realm+id */ + if (!switch_core_hash_find(channel_hash, hashkey)) { + /* No, add it to our table so the state handler can take care of it */ + switch_core_hash_insert(channel_hash, hashkey, item); + } else { + /* Yes, dont touch total counter */ + increment = 0; + } + } else { + /* This is the first limit check on this channel, create a hashtable, set our prviate data and add a state handler */ + new_channel = 1; + } + + if (interval > 0) { + if (item->last_check <= (now - interval)) { + item->rate_usage = 1; + item->last_check = now; + } else { + /* Always increment rate when its checked as it doesnt depend on the channel */ + item->rate_usage++; + + if (item->rate_usage > (uint32_t)max) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Usage for %s exceeds maximum rate of %d/%ds, now at %d\n", hashkey, max, interval, item->rate_usage); + switch_ivr_session_transfer(session, xfer_exten, argv[4], argv[5]); + goto end; + } + } + } else if (item->total_usage + increment > (uint32_t)max) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Usage for %s is already at max value (%d)\n", hashkey, item->total_usage); + switch_ivr_session_transfer(session, xfer_exten, argv[4], argv[5]); + goto end; + } + + if (increment) { + item->total_usage++; + + if (interval == 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Usage for %s is now %d/%d\n", hashkey, item->total_usage, max); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Usage for %s is now %d/%d for the last %d seconds\n", hashkey, item->rate_usage, max, interval); + } + } + + + if (new_channel) { + switch_core_hash_init(&channel_hash, switch_core_session_get_pool(session)); + switch_core_hash_insert(channel_hash, hashkey, item); + switch_channel_set_private(channel, "limit_hash", channel_hash); + switch_core_event_hook_add_state_change(session, hash_state_handler); + } + +end: + switch_mutex_unlock(globals.limit_hash_mutex); +} + +#define LIMIT_HASH_USAGE_USAGE " " +SWITCH_STANDARD_API(limit_hash_usage_function) +{ + int argc = 0; + char *argv[3] = { 0 }; + char *mydata = NULL; + char *hash_key = NULL; + limit_hash_item_t *item = NULL; + uint32_t count = 0; + + switch_mutex_lock(globals.limit_hash_mutex); + + if (!switch_strlen_zero(cmd)) { + mydata = strdup(cmd); + switch_assert(mydata); + argc = switch_separate_string(mydata, ' ', argv, (sizeof(argv) / sizeof(argv[0]))); + } + + if (argc < 2) { + stream->write_function(stream, "USAGE: limit_hash_usage %s\n", LIMIT_HASH_USAGE_USAGE); + goto end; + } + + hash_key = switch_mprintf("%s_%s", argv[0], argv[1]); + + if ((item = switch_core_hash_find(globals.limit_hash, hash_key))) { + count = item->total_usage; + } + + stream->write_function(stream, "%d", count); + +end: + switch_safe_free(mydata); + switch_safe_free(hash_key); + switch_mutex_unlock(globals.limit_hash_mutex); + + return SWITCH_STATUS_SUCCESS; +} + SWITCH_MODULE_LOAD_FUNCTION(mod_limit_load) { switch_status_t status; @@ -598,18 +910,29 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_limit_load) } switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool); + switch_mutex_init(&globals.limit_hash_mutex, SWITCH_MUTEX_NESTED, globals.pool); + switch_mutex_init(&globals.db_hash_mutex, SWITCH_MUTEX_NESTED, globals.pool); + switch_core_hash_init(&globals.limit_hash, pool); + switch_core_hash_init(&globals.db_hash, pool); /* connect my internal structure to the blank pointer passed to me */ *module_interface = switch_loadable_module_create_module_interface(pool, modname); SWITCH_ADD_APP(app_interface, "limit", "Limit", LIMIT_DESC, limit_function, LIMIT_USAGE, SAF_SUPPORT_NOMEDIA); + SWITCH_ADD_APP(app_interface, "limit_hash", "Limit (hash)", LIMITHASH_DESC, limit_hash_function, LIMITHASH_USAGE, SAF_SUPPORT_NOMEDIA); SWITCH_ADD_APP(app_interface, "db", "Insert to the db", DB_DESC, db_function, DB_USAGE, SAF_SUPPORT_NOMEDIA); + SWITCH_ADD_APP(app_interface, "hash", "Insert into the hashtable", HASH_DESC, hash_function, HASH_USAGE, SAF_SUPPORT_NOMEDIA) SWITCH_ADD_APP(app_interface, "group", "Manage a group", GROUP_DESC, group_function, GROUP_USAGE, SAF_SUPPORT_NOMEDIA); + SWITCH_ADD_API(commands_api_interface, "limit_hash_usage", "Gets the usage count of a limited resource", limit_hash_usage_function, LIMIT_HASH_USAGE_USAGE); SWITCH_ADD_API(commands_api_interface, "db", "db get/set", db_api_function, "[insert|delete|select]///"); switch_console_set_complete("add db insert"); switch_console_set_complete("add db delete"); switch_console_set_complete("add db select"); + SWITCH_ADD_API(commands_api_interface, "hash", "hash get/set", hash_api_function, "[insert|delete|select]///"); + switch_console_set_complete("add hash insert"); + switch_console_set_complete("add hash delete"); + switch_console_set_complete("add hash select"); SWITCH_ADD_API(commands_api_interface, "group", "group [insert|delete|call]", group_api_function, "[insert|delete|call]::"); switch_console_set_complete("add group insert"); switch_console_set_complete("add group delete");