Add cdr queueing
Can be enabled with 'queue-capacity' param Cdrs can be pushed to the queue and posted in a separate thread to avoid bllocking the session with a slow web server
This commit is contained in:
parent
45e19b75dd
commit
a75a0ab919
|
@ -73,8 +73,18 @@ static struct {
|
|||
switch_memory_pool_t *pool;
|
||||
switch_event_node_t *node;
|
||||
int encode_values;
|
||||
switch_queue_t *queue;
|
||||
switch_thread_t *thread;
|
||||
} globals;
|
||||
|
||||
typedef struct {
|
||||
char *json_text;
|
||||
char *json_text_escaped;
|
||||
char *logdir;
|
||||
char *uuid;
|
||||
char *filename;
|
||||
} cdr_data_t;
|
||||
|
||||
SWITCH_MODULE_LOAD_FUNCTION(mod_json_cdr_load);
|
||||
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_json_cdr_shutdown);
|
||||
SWITCH_MODULE_DEFINITION(mod_json_cdr, mod_json_cdr_load, mod_json_cdr_shutdown, NULL);
|
||||
|
@ -183,84 +193,98 @@ static switch_status_t set_json_cdr_log_dirs()
|
|||
return status;
|
||||
}
|
||||
|
||||
|
||||
static switch_status_t my_on_reporting(switch_core_session_t *session)
|
||||
static void backup_cdr(cdr_data_t *data)
|
||||
{
|
||||
if (globals.log_errors_to_disk) {
|
||||
int fd = -1, err_dir_index;
|
||||
char *path = NULL;
|
||||
const char *json_text = data->json_text_escaped ? data->json_text_escaped : data->json_text;
|
||||
|
||||
for (err_dir_index = 0; err_dir_index < globals.err_dir_count; err_dir_index++) {
|
||||
switch_thread_rwlock_rdlock(globals.log_path_lock);
|
||||
path = switch_mprintf("%s%s%s", globals.err_log_dir[err_dir_index], SWITCH_PATH_SEPARATOR, data->filename);
|
||||
switch_thread_rwlock_unlock(globals.log_path_lock);
|
||||
|
||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_INFO, "Backup file %s\n", path);
|
||||
if (path) {
|
||||
#ifdef _MSC_VER
|
||||
mode_t mode = S_IRUSR | S_IWUSR;
|
||||
#else
|
||||
mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
|
||||
#endif
|
||||
if ((fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, mode)) > -1) {
|
||||
int wrote = write(fd, json_text, (unsigned) strlen(json_text));
|
||||
close(fd);
|
||||
fd = -1;
|
||||
if (wrote < 0) {
|
||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Error writing [%s]\n",path);
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
char ebuf[512] = { 0 };
|
||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Can't open %s! [%s]\n",
|
||||
path, switch_strerror_r(errno, ebuf, sizeof(ebuf)));
|
||||
|
||||
}
|
||||
switch_safe_free(path);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_NOTICE, "Not writing to file\n");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void destroy_cdr_data(cdr_data_t *data)
|
||||
{
|
||||
switch_safe_free(data->json_text);
|
||||
switch_safe_free(data->json_text_escaped);
|
||||
switch_safe_free(data->uuid);
|
||||
switch_safe_free(data->filename);
|
||||
switch_safe_free(data->logdir);
|
||||
switch_safe_free(data);
|
||||
}
|
||||
|
||||
static void process_cdr(cdr_data_t *data)
|
||||
{
|
||||
cJSON *json_cdr = NULL;
|
||||
char *json_text = NULL;
|
||||
char *path = NULL;
|
||||
char *curl_json_text = NULL;
|
||||
const char *logdir = NULL;
|
||||
char *json_text_escaped = NULL;
|
||||
int fd = -1, err_dir_index;
|
||||
uint32_t cur_try;
|
||||
long httpRes;
|
||||
CURL *curl_handle = NULL;
|
||||
switch_curl_slist_t *headers = NULL;
|
||||
switch_curl_slist_t *slist = NULL;
|
||||
switch_channel_t *channel = switch_core_session_get_channel(session);
|
||||
switch_status_t status = SWITCH_STATUS_FALSE;
|
||||
int is_b;
|
||||
const char *a_prefix = "";
|
||||
int fd = -1;
|
||||
uint32_t cur_try;
|
||||
|
||||
switch_assert(data != NULL);
|
||||
|
||||
if (globals.shutdown) {
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
goto end;
|
||||
}
|
||||
|
||||
is_b = channel && switch_channel_get_originator_caller_profile(channel);
|
||||
if (!globals.log_b && is_b) {
|
||||
const char *force_cdr = switch_channel_get_variable(channel, SWITCH_FORCE_PROCESS_CDR_VARIABLE);
|
||||
if (!switch_true(force_cdr)) {
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
}
|
||||
if (!is_b && globals.prefix_a)
|
||||
a_prefix = "a_";
|
||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_INFO, "Process [%s]\n", data->filename);
|
||||
|
||||
|
||||
if (switch_ivr_generate_json_cdr(session, &json_cdr, globals.encode_values == ENCODING_DEFAULT) != SWITCH_STATUS_SUCCESS) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Generating Data!\n");
|
||||
return SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
json_text = cJSON_PrintUnformatted(json_cdr);
|
||||
|
||||
if (!json_text) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error!\n");
|
||||
goto error;
|
||||
}
|
||||
|
||||
switch_thread_rwlock_rdlock(globals.log_path_lock);
|
||||
|
||||
if (!(logdir = switch_channel_get_variable(channel, "json_cdr_base"))) {
|
||||
logdir = globals.log_dir;
|
||||
}
|
||||
|
||||
if (!zstr(logdir) && (globals.log_http_and_disk || !globals.url_count)) {
|
||||
path = switch_mprintf("%s%s%s%s.cdr.json", logdir, SWITCH_PATH_SEPARATOR, a_prefix, switch_core_session_get_uuid(session));
|
||||
switch_thread_rwlock_unlock(globals.log_path_lock);
|
||||
if (!zstr(data->logdir) && (globals.log_http_and_disk || !globals.url_count)) {
|
||||
char *path = switch_mprintf("%s%s%s", data->logdir, SWITCH_PATH_SEPARATOR, data->filename);
|
||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_INFO, "Log to disk [%s]\n", path);
|
||||
if (path) {
|
||||
#ifdef _MSC_VER
|
||||
if ((fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR)) > -1) {
|
||||
#else
|
||||
if ((fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)) > -1) {
|
||||
#endif
|
||||
int wrote;
|
||||
wrote = write(fd, json_text, (unsigned) strlen(json_text));
|
||||
int wrote = write(fd, data->json_text, (unsigned) strlen(data->json_text));
|
||||
close(fd);
|
||||
fd = -1;
|
||||
if(wrote < 0) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error writing [%s]\n",path);
|
||||
if (wrote < 0) {
|
||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Error writing [%s]\n",path);
|
||||
}
|
||||
} else {
|
||||
char ebuf[512] = { 0 };
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error writing [%s][%s]\n",
|
||||
path, switch_strerror_r(errno, ebuf, sizeof(ebuf)));
|
||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Error writing [%s][%s]\n",
|
||||
path, switch_strerror_r(errno, ebuf, sizeof(ebuf)));
|
||||
}
|
||||
switch_safe_free(path);
|
||||
}
|
||||
} else {
|
||||
switch_thread_rwlock_unlock(globals.log_path_lock);
|
||||
}
|
||||
|
||||
/* try to post it to the web server */
|
||||
|
@ -269,33 +293,20 @@ static switch_status_t my_on_reporting(switch_core_session_t *session)
|
|||
curl_handle = switch_curl_easy_init();
|
||||
|
||||
if (globals.encode) {
|
||||
switch_size_t need_bytes = strlen(json_text) * 3;
|
||||
|
||||
json_text_escaped = malloc(need_bytes);
|
||||
switch_assert(json_text_escaped);
|
||||
memset(json_text_escaped, 0, need_bytes);
|
||||
if (globals.encode == ENCODING_DEFAULT) {
|
||||
headers = switch_curl_slist_append(headers, "Content-Type: application/x-www-form-urlencoded");
|
||||
switch_url_encode(json_text, json_text_escaped, need_bytes);
|
||||
} else {
|
||||
headers = switch_curl_slist_append(headers, "Content-Type: application/x-www-form-base64-encoded");
|
||||
switch_b64_encode((unsigned char *) json_text, need_bytes / 3, (unsigned char *) json_text_escaped, need_bytes);
|
||||
}
|
||||
|
||||
switch_safe_free(json_text);
|
||||
json_text = json_text_escaped;
|
||||
|
||||
if (!(curl_json_text = switch_mprintf("cdr=%s", json_text))) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error!\n");
|
||||
goto error;
|
||||
}
|
||||
|
||||
curl_json_text = switch_mprintf("cdr=%s", data->json_text_escaped);
|
||||
switch_assert(curl_json_text != NULL);
|
||||
|
||||
} else {
|
||||
headers = switch_curl_slist_append(headers, "Content-Type: application/json");
|
||||
curl_json_text = (char *)json_text;
|
||||
curl_json_text = (char *)data->json_text;
|
||||
}
|
||||
|
||||
|
||||
if (!zstr(globals.cred)) {
|
||||
switch_curl_easy_setopt(curl_handle, CURLOPT_HTTPAUTH, globals.auth_scheme);
|
||||
switch_curl_easy_setopt(curl_handle, CURLOPT_USERPWD, globals.cred);
|
||||
|
@ -347,7 +358,7 @@ static switch_status_t my_on_reporting(switch_core_session_t *session)
|
|||
switch_yield(globals.delay * 1000000);
|
||||
}
|
||||
|
||||
destUrl = switch_mprintf("%s?uuid=%s", globals.urls[globals.url_index], switch_core_session_get_uuid(session));
|
||||
destUrl = switch_mprintf("%s?uuid=%s", globals.urls[globals.url_index], data->uuid);
|
||||
switch_curl_easy_setopt(curl_handle, CURLOPT_URL, destUrl);
|
||||
|
||||
if (!strncasecmp(destUrl, "https", 5)) {
|
||||
|
@ -367,16 +378,16 @@ static switch_status_t my_on_reporting(switch_core_session_t *session)
|
|||
switch_curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &httpRes);
|
||||
switch_safe_free(destUrl);
|
||||
if (httpRes >= 200 && httpRes < 300) {
|
||||
goto success;
|
||||
goto end;
|
||||
} else {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Got error [%ld] posting to web server [%s]\n",
|
||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Got error [%ld] posting to web server [%s]\n",
|
||||
httpRes, globals.urls[globals.url_index]);
|
||||
globals.url_index++;
|
||||
switch_assert(globals.url_count <= MAX_URLS);
|
||||
if (globals.url_index >= globals.url_count) {
|
||||
globals.url_index = 0;
|
||||
}
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Retry will be with url [%s]\n", globals.urls[globals.url_index]);
|
||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Retry will be with url [%s]\n", globals.urls[globals.url_index]);
|
||||
}
|
||||
}
|
||||
switch_curl_easy_cleanup(curl_handle);
|
||||
|
@ -387,45 +398,11 @@ static switch_status_t my_on_reporting(switch_core_session_t *session)
|
|||
curl_handle = NULL;
|
||||
|
||||
/* if we are here the web post failed for some reason */
|
||||
if (globals.log_errors_to_disk) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to post to web server, writing to file\n");
|
||||
|
||||
for (err_dir_index = 0; err_dir_index < globals.err_dir_count; err_dir_index++) {
|
||||
switch_thread_rwlock_rdlock(globals.log_path_lock);
|
||||
path = switch_mprintf("%s%s%s%s.cdr.json", globals.err_log_dir[err_dir_index], SWITCH_PATH_SEPARATOR, a_prefix, switch_core_session_get_uuid(session));
|
||||
switch_thread_rwlock_unlock(globals.log_path_lock);
|
||||
if (path) {
|
||||
#ifdef _MSC_VER
|
||||
if ((fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR)) > -1) {
|
||||
#else
|
||||
if ((fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)) > -1) {
|
||||
#endif
|
||||
int wrote;
|
||||
wrote = write(fd, json_text, (unsigned) strlen(json_text));
|
||||
close(fd);
|
||||
fd = -1;
|
||||
if(wrote < 0) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error writing [%s]\n",path);
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
char ebuf[512] = { 0 };
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't open %s! [%s]\n",
|
||||
path, switch_strerror_r(errno, ebuf, sizeof(ebuf)));
|
||||
|
||||
}
|
||||
|
||||
switch_safe_free(path);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to post to web server, not writing to file\n");
|
||||
}
|
||||
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Unable to post to web server\n");
|
||||
backup_cdr(data);
|
||||
}
|
||||
success:
|
||||
status = SWITCH_STATUS_SUCCESS;
|
||||
|
||||
error:
|
||||
end:
|
||||
if (curl_handle) {
|
||||
switch_curl_easy_cleanup(curl_handle);
|
||||
}
|
||||
|
@ -435,14 +412,119 @@ static switch_status_t my_on_reporting(switch_core_session_t *session)
|
|||
if (slist) {
|
||||
switch_curl_slist_free_all(slist);
|
||||
}
|
||||
if (curl_json_text != json_text) {
|
||||
if (curl_json_text != data->json_text) {
|
||||
switch_safe_free(curl_json_text);
|
||||
}
|
||||
|
||||
destroy_cdr_data(data);
|
||||
}
|
||||
|
||||
static switch_status_t my_on_reporting(switch_core_session_t *session)
|
||||
{
|
||||
cJSON *json_cdr = NULL;
|
||||
char *json_text = NULL;
|
||||
char *json_text_escaped = NULL;
|
||||
switch_channel_t *channel = switch_core_session_get_channel(session);
|
||||
int is_b;
|
||||
const char *a_prefix = "";
|
||||
cdr_data_t *cdr_data = NULL;
|
||||
const char *logdir = NULL;
|
||||
|
||||
if (globals.shutdown) {
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
is_b = channel && switch_channel_get_originator_caller_profile(channel);
|
||||
if (!globals.log_b && is_b) {
|
||||
const char *force_cdr = switch_channel_get_variable(channel, SWITCH_FORCE_PROCESS_CDR_VARIABLE);
|
||||
if (!switch_true(force_cdr)) {
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
}
|
||||
if (!is_b && globals.prefix_a) {
|
||||
a_prefix = "a_";
|
||||
}
|
||||
|
||||
if (switch_ivr_generate_json_cdr(session, &json_cdr, globals.encode_values == ENCODING_DEFAULT) != SWITCH_STATUS_SUCCESS) {
|
||||
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error Generating Data!\n");
|
||||
return SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
cdr_data = malloc(sizeof(cdr_data_t));
|
||||
switch_assert(cdr_data);
|
||||
|
||||
json_text = cJSON_PrintUnformatted(json_cdr);
|
||||
|
||||
if (globals.url_count && globals.encode) {
|
||||
switch_size_t need_bytes = strlen(json_text) * 3;
|
||||
|
||||
json_text_escaped = malloc(need_bytes);
|
||||
switch_assert(json_text_escaped);
|
||||
memset(json_text_escaped, 0, need_bytes);
|
||||
if (globals.encode == ENCODING_DEFAULT) {
|
||||
switch_url_encode(json_text, json_text_escaped, need_bytes);
|
||||
} else {
|
||||
switch_b64_encode((unsigned char *) json_text, need_bytes / 3, (unsigned char *) json_text_escaped, need_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
cdr_data->uuid = strdup(switch_core_session_get_uuid(session));
|
||||
cdr_data->filename = switch_mprintf("%s%s.cdr.json", a_prefix, cdr_data->uuid);
|
||||
cdr_data->json_text = json_text;
|
||||
cdr_data->json_text_escaped = json_text_escaped;
|
||||
|
||||
switch_thread_rwlock_rdlock(globals.log_path_lock);
|
||||
|
||||
if (!(logdir = switch_channel_get_variable(channel, "json_cdr_base"))) {
|
||||
logdir = globals.log_dir;
|
||||
}
|
||||
cdr_data->logdir = switch_safe_strdup(logdir);
|
||||
|
||||
switch_thread_rwlock_unlock(globals.log_path_lock);
|
||||
|
||||
if (globals.queue) {
|
||||
if (switch_queue_trypush(globals.queue, cdr_data) != SWITCH_STATUS_SUCCESS) {
|
||||
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Unable to push cdr to queue\n");
|
||||
backup_cdr(cdr_data);
|
||||
destroy_cdr_data(cdr_data);
|
||||
}
|
||||
} else {
|
||||
process_cdr(cdr_data);
|
||||
}
|
||||
|
||||
cJSON_Delete(json_cdr);
|
||||
switch_safe_free(json_text);
|
||||
|
||||
return status;
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
static void *SWITCH_THREAD_FUNC cdr_thread(switch_thread_t *t, void *obj)
|
||||
{
|
||||
void *pop = NULL;
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Cdr thread started.\n");
|
||||
|
||||
while (!globals.shutdown) {
|
||||
cdr_data_t *data = NULL;
|
||||
|
||||
if (switch_queue_pop(globals.queue, &pop) != SWITCH_STATUS_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!pop) {
|
||||
break;
|
||||
}
|
||||
|
||||
data = (cdr_data_t *) pop;
|
||||
process_cdr(data);
|
||||
}
|
||||
|
||||
while (switch_queue_trypop(globals.queue, &pop) == SWITCH_STATUS_SUCCESS) {
|
||||
destroy_cdr_data((cdr_data_t *) pop);
|
||||
}
|
||||
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Cdr thread ended.\n");
|
||||
switch_thread_exit(t, SWITCH_STATUS_SUCCESS);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void event_handler(switch_event_t *event)
|
||||
|
@ -598,8 +680,18 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_json_cdr_load)
|
|||
}
|
||||
} else if (!strcasecmp(var, "encode-values") && !zstr(val)) {
|
||||
globals.encode_values = switch_true(val) ? ENCODING_DEFAULT : ENCODING_NONE;
|
||||
}
|
||||
} else if (!strcasecmp(var, "queue-capacity") && !zstr(val)) {
|
||||
int capacity = atoi(val);
|
||||
if (capacity > 0) {
|
||||
switch_threadattr_t *thd_attr;
|
||||
|
||||
switch_queue_create(&globals.queue, capacity, globals.pool);
|
||||
|
||||
switch_threadattr_create(&thd_attr, globals.pool);
|
||||
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||
switch_thread_create(&globals.thread, thd_attr, cdr_thread, NULL, globals.pool);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!globals.err_dir_count) {
|
||||
|
@ -624,12 +716,19 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_json_cdr_load)
|
|||
return status;
|
||||
}
|
||||
|
||||
|
||||
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_json_cdr_shutdown)
|
||||
{
|
||||
int err_dir_index = 0;
|
||||
switch_status_t status;
|
||||
|
||||
globals.shutdown = 1;
|
||||
|
||||
if (globals.queue) {
|
||||
switch_queue_push(globals.queue, NULL);
|
||||
switch_thread_join(&status, globals.thread);
|
||||
}
|
||||
|
||||
switch_safe_free(globals.log_dir);
|
||||
|
||||
for (;err_dir_index < globals.err_dir_count; err_dir_index++) {
|
||||
|
|
Loading…
Reference in New Issue