move recovery engine up into the core

This commit is contained in:
Anthony Minessale 2012-08-22 16:24:09 -05:00 committed by Ken Rice
parent 575f7b6c5b
commit c79643c8c0
9 changed files with 454 additions and 3 deletions

View File

@ -250,6 +250,11 @@ struct switch_runtime {
char *odbc_user;
char *odbc_pass;
char *dbname;
char *recovery_odbc_dsn;
char *recovery_odbc_user;
char *recovery_odbc_pass;
char *recovery_dbname;
switch_dbtype_t recovery_odbc_dbtype;
uint32_t debug_level;
uint32_t runlevel;
uint32_t tipping_point;

View File

@ -2288,6 +2288,8 @@ SWITCH_DECLARE(int) switch_cache_db_affected_rows(switch_cache_db_handle_t *dbh)
SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream);
SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line);
#define switch_core_db_handle(_a) _switch_core_db_handle(_a, __FILE__, __SWITCH_FUNC__, __LINE__)
SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line);
#define switch_core_recovery_db_handle(_a) _switch_core_recovery_db_handle(_a, __FILE__, __SWITCH_FUNC__, __LINE__)
SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_handle_t *db,
const char *test_sql, const char *drop_sql, const char *reactive_sql);
@ -2344,6 +2346,13 @@ SWITCH_DECLARE(void) switch_close_extra_files(int *keep, int keep_ttl);
SWITCH_DECLARE(switch_status_t) switch_core_thread_set_cpu_affinity(int cpu);
SWITCH_DECLARE(void) switch_os_yield(void);
SWITCH_DECLARE(switch_status_t) switch_core_get_stacksizes(switch_size_t *cur, switch_size_t *max);
SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const char *profile_name);
SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force);
SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session);
SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const char *profile_name);
SWITCH_END_EXTERN_C
#endif
/* For Emacs:

View File

@ -184,8 +184,12 @@ struct switch_endpoint_interface {
/* parent */
switch_loadable_module_interface_t *parent;
/* to facilitate linking */
struct switch_endpoint_interface *next;
switch_core_recover_callback_t recover_callback;
};
/*! \brief Abstract handler to a timer module */

View File

@ -1237,6 +1237,10 @@ typedef enum {
CF_CONFIRM_BLIND_TRANSFER,
CF_NO_PRESENCE,
CF_CONFERENCE,
CF_RECOVERING,
CF_RECOVERING_BRIDGE,
CF_TRACKED,
CF_TRACKABLE,
/* WARNING: DO NOT ADD ANY FLAGS BELOW THIS LINE */
/* IF YOU ADD NEW ONES CHECK IF THEY SHOULD PERSIST OR ZERO THEM IN switch_core_session.c switch_core_session_request_xml() */
CF_FLAG_MAX
@ -1778,7 +1782,8 @@ typedef enum {
SCSC_SYNC_CLOCK_WHEN_IDLE,
SCSC_DEBUG_SQL,
SCSC_SQL,
SCSC_API_EXPANSION
SCSC_API_EXPANSION,
SCSC_RECOVER
} switch_session_ctl_t;
typedef enum {
@ -1891,6 +1896,7 @@ typedef switch_status_t (*switch_chat_application_function_t) (switch_event_t *,
typedef void (*switch_application_function_t) (switch_core_session_t *, const char *);
#define SWITCH_STANDARD_APP(name) static void name (switch_core_session_t *session, const char *data)
typedef int (*switch_core_recover_callback_t)(switch_core_session_t *session);
typedef void (*switch_event_callback_t) (switch_event_t *);
typedef switch_caller_extension_t *(*switch_dialplan_hunt_function_t) (switch_core_session_t *, void *, switch_caller_profile_t *);
#define SWITCH_STANDARD_DIALPLAN(name) static switch_caller_extension_t *name (switch_core_session_t *session, void *arg, switch_caller_profile_t *caller_profile)

View File

@ -1789,7 +1789,7 @@ SWITCH_STANDARD_API(status_function)
return SWITCH_STATUS_SUCCESS;
}
#define CTL_SYNTAX "[send_sighup|hupall|pause [inbound|outbound]|resume [inbound|outbound]|shutdown [cancel|elegant|asap|now|restart]|sps|sync_clock|sync_clock_when_idle|reclaim_mem|max_sessions|min_dtmf_duration [num]|max_dtmf_duration [num]|default_dtmf_duration [num]|min_idle_cpu|loglevel [level]|debug_level [level]]"
#define CTL_SYNTAX "[recover|send_sighup|hupall|pause [inbound|outbound]|resume [inbound|outbound]|shutdown [cancel|elegant|asap|now|restart]|sps|sync_clock|sync_clock_when_idle|reclaim_mem|max_sessions|min_dtmf_duration [num]|max_dtmf_duration [num]|default_dtmf_duration [num]|min_idle_cpu|loglevel [level]|debug_level [level]]"
SWITCH_STANDARD_API(ctl_function)
{
int argc;
@ -1808,6 +1808,13 @@ SWITCH_STANDARD_API(ctl_function)
arg = 1;
switch_core_session_ctl(SCSC_HUPALL, &arg);
stream->write_function(stream, "+OK\n");
} else if (!strcasecmp(argv[0], "recover")) {
int r = switch_core_session_ctl(SCSC_RECOVER, argv[1]);
if (r < 0){
stream->write_function(stream, "+OK flushed\n");
} else {
stream->write_function(stream, "+OK %d session(s) recovered in total\n", r);
}
} else if (!strcasecmp(argv[0], "flush_db_handles")) {
switch_core_session_ctl(SCSC_FLUSH_DB_HANDLES, NULL);
stream->write_function(stream, "+OK\n");
@ -5646,6 +5653,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_commands_load)
switch_console_set_complete("add fsctl pause_check inbound");
switch_console_set_complete("add fsctl pause_check outbound");
switch_console_set_complete("add fsctl ready_check");
switch_console_set_complete("add fsctl recover");
switch_console_set_complete("add fsctl shutdown_check");
switch_console_set_complete("add fsctl shutdown");
switch_console_set_complete("add fsctl shutdown asap");

View File

@ -1882,6 +1882,18 @@ static void switch_load_core_config(const char *file)
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n");
}
} else if (!strcasecmp(var, "core-recovery-db-dsn") && !zstr(val)) {
if (switch_odbc_available()) {
runtime.recovery_odbc_dsn = switch_core_strdup(runtime.memory_pool, val);
if ((runtime.recovery_odbc_user = strchr(runtime.recovery_odbc_dsn, ':'))) {
*runtime.recovery_odbc_user++ = '\0';
if ((runtime.recovery_odbc_pass = strchr(runtime.recovery_odbc_user, ':'))) {
*runtime.recovery_odbc_pass++ = '\0';
}
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n");
}
} else if (!strcasecmp(var, "core-odbc-required") && !zstr(val)) {
switch_set_flag((&runtime), SCF_CORE_ODBC_REQ);
} else if (!strcasecmp(var, "core-dbtype") && !zstr(val)) {
@ -2108,6 +2120,44 @@ SWITCH_DECLARE(int32_t) switch_core_session_ctl(switch_session_ctl_t cmd, void *
}
switch (cmd) {
case SCSC_RECOVER:
{
char *arg = (char *) val;
char *tech = NULL, *prof = NULL;
int r, flush = 0;
if (!zstr(arg)) {
tech = strdup(arg);
if ((prof = strchr(tech, ':'))) {
*prof++ = '\0';
}
if (!strcasecmp(tech, "flush")) {
flush++;
if (prof) {
tech = prof;
if ((prof = strchr(tech, ':'))) {
*prof++ = '\0';
}
}
}
}
if (flush) {
switch_core_recovery_flush(tech, prof);
r = -1;
} else {
r = switch_core_recovery_recover(tech, prof);
}
switch_safe_free(tech);
return r;
}
break;
case SCSC_DEBUG_SQL:
{
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {

View File

@ -1788,6 +1788,9 @@ SWITCH_DECLARE(switch_core_session_t *) switch_core_session_request_xml(switch_e
parse_array(flag_str, flags, CF_FLAG_MAX);
parse_array(cap_str, caps, CC_FLAG_MAX);
flags[CF_RECOVERING] = 0;
flags[CF_RECOVERING_BRIDGE] = 0;
flags[CF_TRACKED] = 0;
flags[CF_TRANSFER] = 0;
flags[CF_ACCEPT_CNG] = 0;
flags[CF_REDIRECT] = 0;

View File

@ -218,6 +218,45 @@ SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t
return r;
}
#define SWITCH_CORE_RECOVERY_DB "core_recovery"
SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
{
switch_cache_db_connection_options_t options = { {0} };
switch_status_t r;
if (!sql_manager.manage) {
return SWITCH_STATUS_FALSE;
}
if (zstr(runtime.recovery_odbc_dsn)) {
if (switch_test_flag((&runtime), SCF_CORE_ODBC_REQ)) {
return SWITCH_STATUS_FALSE;
}
if (runtime.recovery_dbname) {
options.core_db_options.db_path = runtime.recovery_dbname;
} else {
options.core_db_options.db_path = SWITCH_CORE_RECOVERY_DB;
}
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
} else {
options.odbc_options.dsn = runtime.recovery_odbc_dsn;
options.odbc_options.user = runtime.recovery_odbc_user;
options.odbc_options.pass = runtime.recovery_odbc_pass;
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
}
/* I *think* we can do without this now, if not let me know
if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) {
(*dbh)->io_mutex = sql_manager.io_mutex;
}
*/
return r;
}
#define SQL_CACHE_TIMEOUT 30
#define SQL_REG_TIMEOUT 15
@ -1805,6 +1844,16 @@ static char detailed_calls_sql[] =
"where a.uuid = c.caller_uuid or a.uuid not in (select callee_uuid from calls)";
static char recovery_sql[] =
"CREATE TABLE recovery (\n"
" runtime_uuid VARCHAR(255),\n"
" technology VARCHAR(255),\n"
" profile_name VARCHAR(255),\n"
" hostname VARCHAR(255),\n"
" uuid VARCHAR(255),\n"
" metadata text\n"
");\n";
static char basic_calls_sql[] =
"create view basic_calls as select "
"a.uuid as uuid,"
@ -1857,6 +1906,289 @@ static char basic_calls_sql[] =
"where a.uuid = c.caller_uuid or a.uuid not in (select callee_uuid from calls)";
SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const char *profile_name)
{
char *sql = NULL;
switch_cache_db_handle_t *dbh;
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return;
}
if (zstr(technology)) {
if (zstr(profile_name)) {
sql = switch_mprintf("delete from recovery");
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "INVALID\n");
}
} else {
if (zstr(profile_name)) {
sql = switch_mprintf("delete from recovery where technology='%q' ", technology);
} else {
sql = switch_mprintf("delete from recovery where technology='%q' and profile_name='%q'", technology, profile_name);
}
}
if (sql) {
switch_cache_db_execute_sql(dbh, sql, NULL);
switch_safe_free(sql);
}
switch_cache_db_release_db_handle(&dbh);
}
static int recover_callback(void *pArg, int argc, char **argv, char **columnNames)
{
int *rp = (int *) pArg;
switch_xml_t xml;
switch_endpoint_interface_t *ep;
switch_core_session_t *session;
if (argc < 4) {
return 0;
}
if (!(xml = switch_xml_parse_str_dynamic(argv[4], SWITCH_TRUE))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "XML ERROR\n");
return 0;
}
if (!(ep = switch_loadable_module_get_endpoint_interface(argv[0]))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "EP ERROR\n");
return 0;
}
if (!(session = switch_core_session_request_xml(ep, NULL, xml))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid cdr data, call not recovered\n");
goto end;
}
if (ep->recover_callback) {
switch_caller_extension_t *extension = NULL;
if (ep->recover_callback(session) > 0) {
switch_channel_t *channel = switch_core_session_get_channel(session);
if (switch_channel_get_partner_uuid(channel)) {
switch_channel_set_flag(channel, CF_RECOVERING_BRIDGE);
} else {
switch_xml_t callflow, param, x_extension;
if ((extension = switch_caller_extension_new(session, "recovery", "recovery")) == 0) {
abort();
}
if ((callflow = switch_xml_child(xml, "callflow")) && (x_extension = switch_xml_child(callflow, "extension"))) {
for (param = switch_xml_child(x_extension, "application"); param; param = param->next) {
const char *var = switch_xml_attr_soft(param, "app_name");
const char *val = switch_xml_attr_soft(param, "app_data");
/* skip announcement type apps */
if (strcasecmp(var, "speak") && strcasecmp(var, "playback") && strcasecmp(var, "gentones") && strcasecmp(var, "say")) {
switch_caller_extension_add_application(session, extension, var, val);
}
}
}
switch_channel_set_caller_extension(channel, extension);
}
switch_channel_set_state(channel, CS_INIT);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE,
"Resurrecting fallen channel %s\n", switch_channel_get_name(channel));
switch_core_session_thread_launch(session);
*rp = (*rp) + 1;
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Endpoint %s has no recovery function\n", argv[0]);
}
end:
UNPROTECT_INTERFACE(ep);
switch_xml_free(xml);
return 0;
}
SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const char *profile_name)
{
char *sql = NULL;
char *errmsg = NULL;
switch_cache_db_handle_t *dbh;
int r = 0;
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return 0;
}
if (zstr(technology)) {
if (zstr(profile_name)) {
sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
"from recovery where runtime_uuid!='%q'",
switch_core_get_uuid());
} else {
sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
"from recovery where runtime_uuid!='%q' and profile_name='%q'",
switch_core_get_uuid(), profile_name);
}
} else {
if (zstr(profile_name)) {
sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
"from recovery where technology='%q' and runtime_uuid!='%q'",
technology, switch_core_get_uuid());
} else {
sql = switch_mprintf("select technology, profile_name, hostname, uuid, metadata "
"from recovery where technology='%q' and runtime_uuid!='%q' and profile_name='%q'",
technology, switch_core_get_uuid(), profile_name);
}
}
switch_cache_db_execute_sql_callback(dbh, sql, recover_callback, &r, &errmsg);
if (errmsg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
free(errmsg);
}
switch_safe_free(sql);
if (zstr(technology)) {
if (zstr(profile_name)) {
sql = switch_mprintf("delete from recovery where runtime_uuid!='%q'",
switch_core_get_uuid());
} else {
sql = switch_mprintf("delete from recovery where runtime_uuid!='%q' and profile_name='%q'",
switch_core_get_uuid(), profile_name);
}
} else {
if (zstr(profile_name)) {
sql = switch_mprintf("delete from recovery where runtime_uuid!='%q' and technology='%q' ",
switch_core_get_uuid(), technology);
} else {
sql = switch_mprintf("delete from recovery where runtime_uuid!='%q' and technology='%q' and profile_name='%q'",
switch_core_get_uuid(), technology, profile_name);
}
}
switch_cache_db_execute_sql(dbh, sql, NULL);
switch_safe_free(sql);
switch_cache_db_release_db_handle(&dbh);
return r;
}
SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force)
{
char *sql = NULL;
switch_cache_db_handle_t *dbh;
switch_channel_t *channel = switch_core_session_get_channel(session);
if (!switch_channel_test_flag(channel, CF_TRACKABLE)) {
return;
}
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return;
}
if ((switch_channel_test_flag(channel, CF_RECOVERING))) {
return;
}
if (switch_channel_test_flag(channel, CF_TRACKED || force)) {
if (force) {
sql = switch_mprintf("delete from recovery where uuid='%q'", switch_core_session_get_uuid(session));
} else {
sql = switch_mprintf("delete from recovery where runtime_uuid='%q' and uuid='%q'",
switch_core_get_uuid(), switch_core_session_get_uuid(session));
}
switch_cache_db_execute_sql(dbh, sql, NULL);
switch_channel_clear_flag(channel, CF_TRACKED);
switch_safe_free(sql);
}
switch_cache_db_release_db_handle(&dbh);
}
SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
{
switch_xml_t cdr = NULL;
char *xml_cdr_text = NULL;
char *sql = NULL;
switch_cache_db_handle_t *dbh;
switch_channel_t *channel = switch_core_session_get_channel(session);
const char *profile_name;
const char *technology;
if (switch_channel_test_flag(channel, CF_RECOVERING) || !switch_channel_test_flag(channel, CF_TRACKABLE)) {
return;
}
profile_name = switch_channel_get_variable_dup(channel, "recovery_profile_name", SWITCH_FALSE, -1);
technology = session->endpoint_interface->interface_name;
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return;
}
if (switch_ivr_generate_xml_cdr(session, &cdr) == SWITCH_STATUS_SUCCESS) {
xml_cdr_text = switch_xml_toxml_nolock(cdr, SWITCH_FALSE);
switch_xml_free(cdr);
}
if (xml_cdr_text) {
if (switch_channel_test_flag(channel, CF_TRACKED)) {
sql = switch_mprintf("update recovery set metadata='%q' where uuid='%q'", xml_cdr_text, switch_core_session_get_uuid(session));
} else {
sql = switch_mprintf("insert into recovery (runtime_uuid, technology, profile_name, hostname, uuid, metadata) "
"values ('%q','%q','%q','%q','%q','%q')",
switch_core_get_uuid(), switch_str_nil(technology),
switch_str_nil(profile_name), switch_core_get_hostname(), switch_core_session_get_uuid(session), xml_cdr_text);
}
switch_cache_db_execute_sql(dbh, sql, NULL);
switch_safe_free(sql);
free(xml_cdr_text);
switch_channel_set_flag(channel, CF_TRACKED);
}
switch_cache_db_release_db_handle(&dbh);
}
SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, const char *realm, const char *token, const char *url, uint32_t expires,
const char *network_ip, const char *network_port, const char *network_proto,
const char *metadata)
@ -2171,7 +2503,6 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
}
SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void)
{
switch_mutex_lock(sql_manager.ctl_mutex);
@ -2199,8 +2530,30 @@ SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void)
SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
{
switch_cache_db_handle_t *dbh;
switch_mutex_lock(sql_manager.ctl_mutex);
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
if (switch_test_flag((&runtime), SCF_CORE_ODBC_REQ)) {
int arg = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC IS REQUIRED!\n");
switch_core_session_ctl(SCSC_SHUTDOWN_NOW, &arg);
}
} else {
switch_cache_db_test_reactive(dbh, "select hostname from recovery", "DROP TABLE recovery", recovery_sql);
switch_cache_db_execute_sql(dbh, "create index recovery1 on recovery(technology)", NULL);
switch_cache_db_execute_sql(dbh, "create index recovery2 on recovery(profile_name)", NULL);
switch_cache_db_execute_sql(dbh, "create index recovery3 on recovery(uuid)", NULL);
switch_cache_db_execute_sql(dbh, "create index recovery3 on recovery(runtime_uuid)", NULL);
switch_cache_db_release_db_handle(&dbh);
}
if (sql_manager.manage) {
top:

View File

@ -43,9 +43,19 @@ static void switch_core_standard_on_init(switch_core_session_t *session)
static void switch_core_standard_on_hangup(switch_core_session_t *session)
{
switch_caller_extension_t *extension;
int rec;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s Standard HANGUP, cause: %s\n",
switch_channel_get_name(session->channel), switch_channel_cause2str(switch_channel_get_cause(session->channel)));
rec = switch_channel_test_flag(session->channel, CF_RECOVERING);
switch_channel_clear_flag(session->channel, CF_RECOVERING);
if (!rec) {
switch_core_recovery_untrack(session, SWITCH_TRUE);
}
if (!switch_channel_test_flag(session->channel, CF_ZOMBIE_EXEC)) {
return;
@ -71,6 +81,9 @@ static void switch_core_standard_on_hangup(switch_core_session_t *session)
}
}
static void switch_core_standard_on_reporting(switch_core_session_t *session)