[mod_kazoo] maintenance

* [mod_kazoo] add configured sleep time between each io fault

* [mod_kazoo] add configured delay  before fetching initial configuration

* [mod_kazoo] add explicit handling of http_cache for kz_moh
This commit is contained in:
lazedo 2020-03-13 16:19:21 +00:00 committed by GitHub
parent 1f268e0292
commit 3a0ee19f5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 68 additions and 6 deletions

View File

@ -450,6 +450,7 @@ SWITCH_STANDARD_APP(kz_moh_function)
switch_file_handle_t fh = { 0 }; switch_file_handle_t fh = { 0 };
const char *var_samples = switch_channel_get_variable_dup(channel, "moh_playback_samples", SWITCH_FALSE, -1); const char *var_samples = switch_channel_get_variable_dup(channel, "moh_playback_samples", SWITCH_FALSE, -1);
unsigned int samples = 0; unsigned int samples = 0;
char * my_data = NULL;
if (var_samples) { if (var_samples) {
fh.samples = samples = atoi(var_samples); fh.samples = samples = atoi(var_samples);
@ -458,7 +459,20 @@ SWITCH_STANDARD_APP(kz_moh_function)
switch_channel_set_variable(channel, SWITCH_PLAYBACK_TERMINATOR_USED, ""); switch_channel_set_variable(channel, SWITCH_PLAYBACK_TERMINATOR_USED, "");
status = switch_ivr_play_file(session, &fh, data, NULL); /*
* hack for proper position
*/
if (!strncmp(data, "http_cache://", 13) && session) {
switch_channel_t *channel = switch_core_session_get_channel(session);
char * resolve = switch_mprintf("${http_get({prefetch=true}%s)}", data+13);
my_data = switch_channel_expand_variables_check(channel, resolve, NULL, NULL, 0);
} else {
my_data = strdup(data);
}
status = switch_ivr_play_file(session, &fh, my_data, NULL);
// status = switch_ivr_play_file(session, &fh, data, NULL);
switch_assert(!(fh.flags & SWITCH_FILE_OPEN)); switch_assert(!(fh.flags & SWITCH_FILE_OPEN));
switch (status) { switch (status) {
@ -495,6 +509,7 @@ SWITCH_STANDARD_APP(kz_moh_function)
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH sample_count %ld\n", fh.sample_count); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH sample_count %ld\n", fh.sample_count);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH samples %d\n", fh.samples); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH samples %d\n", fh.samples);
switch_safe_free(my_data);
} }
SWITCH_STANDARD_APP(noop_function) SWITCH_STANDARD_APP(noop_function)

View File

@ -186,6 +186,7 @@ struct kz_globals_s {
switch_port_t port; switch_port_t port;
int config_fetched; int config_fetched;
int io_fault_tolerance; int io_fault_tolerance;
switch_interval_time_t io_fault_tolerance_sleep;
kazoo_event_profile_ptr events; kazoo_event_profile_ptr events;
kazoo_config_ptr definitions; kazoo_config_ptr definitions;
kazoo_config_ptr event_handlers; kazoo_config_ptr event_handlers;
@ -199,6 +200,8 @@ struct kz_globals_s {
uint8_t tweaks[KZ_TWEAK_MAX]; uint8_t tweaks[KZ_TWEAK_MAX];
switch_bool_t expand_headers_on_fetch; switch_bool_t expand_headers_on_fetch;
switch_interval_time_t delay_before_initial_fetch;
}; };
typedef struct kz_globals_s kz_globals_t; typedef struct kz_globals_s kz_globals_t;

View File

@ -127,8 +127,10 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
kazoo_globals.node_receiver_queue_timeout = 100000; kazoo_globals.node_receiver_queue_timeout = 100000;
kazoo_globals.node_sender_queue_timeout = 0; kazoo_globals.node_sender_queue_timeout = 0;
kazoo_globals.port = 0; kazoo_globals.port = 0;
kazoo_globals.io_fault_tolerance = 10; kazoo_globals.io_fault_tolerance = 3;
kazoo_globals.io_fault_tolerance_sleep = 100000; // 100 ms
kazoo_globals.json_encoding = ERLANG_TUPLE; kazoo_globals.json_encoding = ERLANG_TUPLE;
kazoo_globals.delay_before_initial_fetch = 10000000;
kazoo_globals.legacy_events = SWITCH_FALSE; kazoo_globals.legacy_events = SWITCH_FALSE;
kazoo_globals.expand_headers_on_fetch = SWITCH_TRUE; kazoo_globals.expand_headers_on_fetch = SWITCH_TRUE;
@ -206,9 +208,21 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
} else if (!strcmp(var, "event-stream-framing")) { } else if (!strcmp(var, "event-stream-framing")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-framing: %s\n", val); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-framing: %s\n", val);
kazoo_globals.event_stream_framing = atoi(val); kazoo_globals.event_stream_framing = atoi(val);
} else if (!strcmp(var, "io-fault-tolerance")) { } else if (!strcmp(var, "io-fault-tolerance")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set io-fault-tolerance: %s\n", val); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set io-fault-tolerance: %s\n", val);
kazoo_globals.io_fault_tolerance = atoi(val); kazoo_globals.io_fault_tolerance = atoi(val);
} else if (!strcmp(var, "io-fault-tolerance-sleep-micro")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
kazoo_globals.io_fault_tolerance_sleep = atoi(val);
} else if (!strcmp(var, "io-fault-tolerance-sleep-ms")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
kazoo_globals.io_fault_tolerance_sleep = atoi(val) * 1000;
} else if (!strcmp(var, "io-fault-tolerance-sleep-sec")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
kazoo_globals.io_fault_tolerance_sleep = atoi(val) * 1000000;
} else if (!strcmp(var, "node-worker-threads")) { } else if (!strcmp(var, "node-worker-threads")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set node-worker-threads: %s\n", val); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set node-worker-threads: %s\n", val);
kazoo_globals.node_worker_threads = atoi(val); kazoo_globals.node_worker_threads = atoi(val);
@ -232,6 +246,15 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
} else if (!strcmp(var, "event-stream-queue-timeout")) { } else if (!strcmp(var, "event-stream-queue-timeout")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
kazoo_globals.event_stream_queue_timeout = atoi(val); kazoo_globals.event_stream_queue_timeout = atoi(val);
} else if (!strcmp(var, "delay-before-initial-fetch-micro")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
kazoo_globals.delay_before_initial_fetch = atoi(val);
} else if (!strcmp(var, "delay-before-initial-fetch-ms")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
kazoo_globals.delay_before_initial_fetch = atoi(val) * 1000;
} else if (!strcmp(var, "delay-before-initial-fetch-sec")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set %s : %s\n", var, val);
kazoo_globals.delay_before_initial_fetch = atoi(val) * 1000000;
} else { } else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "unknown config option %s : %s\n", var, val); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "unknown config option %s : %s\n", var, val);
} }

View File

@ -922,12 +922,29 @@ static void fetch_config_handlers(switch_memory_pool_t *pool)
static void *SWITCH_THREAD_FUNC fetch_config_exec(switch_thread_t *thread, void *obj) static void *SWITCH_THREAD_FUNC fetch_config_exec(switch_thread_t *thread, void *obj)
{ {
switch_memory_pool_t *pool = (switch_memory_pool_t *) obj; switch_memory_pool_t *pool = (switch_memory_pool_t *) obj;
ei_node_t *node;
int fetch_filters = 0, fetch_handlers = 0;
// give some time for node initialization // give some time for node initialization
switch_sleep(10000); switch_sleep(kazoo_globals.delay_before_initial_fetch);
for (node = kazoo_globals.ei_nodes; node != NULL; node = node->next) {
if (node->legacy ) {
fetch_filters++;
} else {
fetch_handlers++;
}
}
if (fetch_filters) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "fetching filters for kazoo\n");
fetch_config_filters(pool); fetch_config_filters(pool);
}
if (fetch_handlers) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "fetching kazoo handlers\n");
fetch_config_handlers(pool); fetch_config_handlers(pool);
}
kazoo_globals.config_fetched = 1; kazoo_globals.config_fetched = 1;
@ -941,7 +958,7 @@ void fetch_config()
switch_threadattr_t *thd_attr = NULL; switch_threadattr_t *thd_attr = NULL;
switch_uuid_t uuid; switch_uuid_t uuid;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "fetching kazoo config\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "scheduling fetch for kazoo config\n");
switch_core_new_memory_pool(&pool); switch_core_new_memory_pool(&pool);

View File

@ -1521,6 +1521,7 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj)
switch (status) { switch (status) {
case ERL_TICK: case ERL_TICK:
/* erlang nodes send ticks to eachother to validate they are still reachable, we dont have to do anything here */ /* erlang nodes send ticks to eachother to validate they are still reachable, we dont have to do anything here */
fault_count = 0;
break; break;
case ERL_MSG: case ERL_MSG:
fault_count = 0; fault_count = 0;
@ -1543,6 +1544,7 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj)
case EAGAIN: case EAGAIN:
/* if ei_xreceive_msg_tmo just timed out, ignore it and let the while loop check if we are still running */ /* if ei_xreceive_msg_tmo just timed out, ignore it and let the while loop check if we are still running */
/* the erlang lib just wants us to try to receive again, so we will! */ /* the erlang lib just wants us to try to receive again, so we will! */
fault_count = 0;
break; break;
case EMSGSIZE: case EMSGSIZE:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Erlang communication fault with node %p %s (%s:%d): my spoon is too big\n", (void *)ei_node, ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Erlang communication fault with node %p %s (%s:%d): my spoon is too big\n", (void *)ei_node, ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port);
@ -1553,6 +1555,8 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj)
if (fault_count >= kazoo_globals.io_fault_tolerance) { if (fault_count >= kazoo_globals.io_fault_tolerance) {
switch_clear_flag(ei_node, LFLAG_RUNNING); switch_clear_flag(ei_node, LFLAG_RUNNING);
} else {
switch_sleep(kazoo_globals.io_fault_tolerance_sleep);
} }
break; break;