diff --git a/src/mod/event_handlers/mod_json_cdr/mod_json_cdr.c b/src/mod/event_handlers/mod_json_cdr/mod_json_cdr.c index a5dce577d1..186e8ccf40 100644 --- a/src/mod/event_handlers/mod_json_cdr/mod_json_cdr.c +++ b/src/mod/event_handlers/mod_json_cdr/mod_json_cdr.c @@ -73,8 +73,18 @@ static struct { switch_memory_pool_t *pool; switch_event_node_t *node; int encode_values; + switch_queue_t *queue; + switch_thread_t *thread; } globals; +typedef struct { + char *json_text; + char *json_text_escaped; + char *logdir; + char *uuid; + char *filename; +} cdr_data_t; + SWITCH_MODULE_LOAD_FUNCTION(mod_json_cdr_load); SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_json_cdr_shutdown); SWITCH_MODULE_DEFINITION(mod_json_cdr, mod_json_cdr_load, mod_json_cdr_shutdown, NULL); @@ -183,84 +193,98 @@ static switch_status_t set_json_cdr_log_dirs() return status; } - -static switch_status_t my_on_reporting(switch_core_session_t *session) +static void backup_cdr(cdr_data_t *data) +{ + if (globals.log_errors_to_disk) { + int fd = -1, err_dir_index; + char *path = NULL; + const char *json_text = data->json_text_escaped ? data->json_text_escaped : data->json_text; + + for (err_dir_index = 0; err_dir_index < globals.err_dir_count; err_dir_index++) { + switch_thread_rwlock_rdlock(globals.log_path_lock); + path = switch_mprintf("%s%s%s", globals.err_log_dir[err_dir_index], SWITCH_PATH_SEPARATOR, data->filename); + switch_thread_rwlock_unlock(globals.log_path_lock); + + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_INFO, "Backup file %s\n", path); + if (path) { +#ifdef _MSC_VER + mode_t mode = S_IRUSR | S_IWUSR; +#else + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; +#endif + if ((fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, mode)) > -1) { + int wrote = write(fd, json_text, (unsigned) strlen(json_text)); + close(fd); + fd = -1; + if (wrote < 0) { + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Error writing [%s]\n",path); + } + break; + } else { + char ebuf[512] = { 0 }; + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Can't open %s! [%s]\n", + path, switch_strerror_r(errno, ebuf, sizeof(ebuf))); + + } + switch_safe_free(path); + } + } + } else { + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_NOTICE, "Not writing to file\n"); + } +} + + +void destroy_cdr_data(cdr_data_t *data) +{ + switch_safe_free(data->json_text); + switch_safe_free(data->json_text_escaped); + switch_safe_free(data->uuid); + switch_safe_free(data->filename); + switch_safe_free(data->logdir); + switch_safe_free(data); +} + +static void process_cdr(cdr_data_t *data) { - cJSON *json_cdr = NULL; - char *json_text = NULL; - char *path = NULL; char *curl_json_text = NULL; - const char *logdir = NULL; - char *json_text_escaped = NULL; - int fd = -1, err_dir_index; - uint32_t cur_try; long httpRes; CURL *curl_handle = NULL; switch_curl_slist_t *headers = NULL; switch_curl_slist_t *slist = NULL; - switch_channel_t *channel = switch_core_session_get_channel(session); - switch_status_t status = SWITCH_STATUS_FALSE; - int is_b; - const char *a_prefix = ""; + int fd = -1; + uint32_t cur_try; + + switch_assert(data != NULL); if (globals.shutdown) { - return SWITCH_STATUS_SUCCESS; + goto end; } - is_b = channel && switch_channel_get_originator_caller_profile(channel); - if (!globals.log_b && is_b) { - const char *force_cdr = switch_channel_get_variable(channel, SWITCH_FORCE_PROCESS_CDR_VARIABLE); - if (!switch_true(force_cdr)) { - return SWITCH_STATUS_SUCCESS; - } - } - if (!is_b && globals.prefix_a) - a_prefix = "a_"; + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_INFO, "Process [%s]\n", data->filename); - - if (switch_ivr_generate_json_cdr(session, &json_cdr, globals.encode_values == ENCODING_DEFAULT) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Generating Data!\n"); - return SWITCH_STATUS_FALSE; - } - - json_text = cJSON_PrintUnformatted(json_cdr); - - if (!json_text) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error!\n"); - goto error; - } - - switch_thread_rwlock_rdlock(globals.log_path_lock); - - if (!(logdir = switch_channel_get_variable(channel, "json_cdr_base"))) { - logdir = globals.log_dir; - } - - if (!zstr(logdir) && (globals.log_http_and_disk || !globals.url_count)) { - path = switch_mprintf("%s%s%s%s.cdr.json", logdir, SWITCH_PATH_SEPARATOR, a_prefix, switch_core_session_get_uuid(session)); - switch_thread_rwlock_unlock(globals.log_path_lock); + if (!zstr(data->logdir) && (globals.log_http_and_disk || !globals.url_count)) { + char *path = switch_mprintf("%s%s%s", data->logdir, SWITCH_PATH_SEPARATOR, data->filename); + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_INFO, "Log to disk [%s]\n", path); if (path) { #ifdef _MSC_VER if ((fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR)) > -1) { #else if ((fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)) > -1) { #endif - int wrote; - wrote = write(fd, json_text, (unsigned) strlen(json_text)); + int wrote = write(fd, data->json_text, (unsigned) strlen(data->json_text)); close(fd); fd = -1; - if(wrote < 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error writing [%s]\n",path); + if (wrote < 0) { + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Error writing [%s]\n",path); } } else { char ebuf[512] = { 0 }; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error writing [%s][%s]\n", - path, switch_strerror_r(errno, ebuf, sizeof(ebuf))); + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Error writing [%s][%s]\n", + path, switch_strerror_r(errno, ebuf, sizeof(ebuf))); } switch_safe_free(path); } - } else { - switch_thread_rwlock_unlock(globals.log_path_lock); } /* try to post it to the web server */ @@ -269,33 +293,20 @@ static switch_status_t my_on_reporting(switch_core_session_t *session) curl_handle = switch_curl_easy_init(); if (globals.encode) { - switch_size_t need_bytes = strlen(json_text) * 3; - - json_text_escaped = malloc(need_bytes); - switch_assert(json_text_escaped); - memset(json_text_escaped, 0, need_bytes); if (globals.encode == ENCODING_DEFAULT) { headers = switch_curl_slist_append(headers, "Content-Type: application/x-www-form-urlencoded"); - switch_url_encode(json_text, json_text_escaped, need_bytes); } else { headers = switch_curl_slist_append(headers, "Content-Type: application/x-www-form-base64-encoded"); - switch_b64_encode((unsigned char *) json_text, need_bytes / 3, (unsigned char *) json_text_escaped, need_bytes); - } - - switch_safe_free(json_text); - json_text = json_text_escaped; - - if (!(curl_json_text = switch_mprintf("cdr=%s", json_text))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error!\n"); - goto error; } + curl_json_text = switch_mprintf("cdr=%s", data->json_text_escaped); + switch_assert(curl_json_text != NULL); + } else { headers = switch_curl_slist_append(headers, "Content-Type: application/json"); - curl_json_text = (char *)json_text; + curl_json_text = (char *)data->json_text; } - if (!zstr(globals.cred)) { switch_curl_easy_setopt(curl_handle, CURLOPT_HTTPAUTH, globals.auth_scheme); switch_curl_easy_setopt(curl_handle, CURLOPT_USERPWD, globals.cred); @@ -347,7 +358,7 @@ static switch_status_t my_on_reporting(switch_core_session_t *session) switch_yield(globals.delay * 1000000); } - destUrl = switch_mprintf("%s?uuid=%s", globals.urls[globals.url_index], switch_core_session_get_uuid(session)); + destUrl = switch_mprintf("%s?uuid=%s", globals.urls[globals.url_index], data->uuid); switch_curl_easy_setopt(curl_handle, CURLOPT_URL, destUrl); if (!strncasecmp(destUrl, "https", 5)) { @@ -367,16 +378,16 @@ static switch_status_t my_on_reporting(switch_core_session_t *session) switch_curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &httpRes); switch_safe_free(destUrl); if (httpRes >= 200 && httpRes < 300) { - goto success; + goto end; } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Got error [%ld] posting to web server [%s]\n", + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Got error [%ld] posting to web server [%s]\n", httpRes, globals.urls[globals.url_index]); globals.url_index++; switch_assert(globals.url_count <= MAX_URLS); if (globals.url_index >= globals.url_count) { globals.url_index = 0; } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Retry will be with url [%s]\n", globals.urls[globals.url_index]); + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Retry will be with url [%s]\n", globals.urls[globals.url_index]); } } switch_curl_easy_cleanup(curl_handle); @@ -387,45 +398,11 @@ static switch_status_t my_on_reporting(switch_core_session_t *session) curl_handle = NULL; /* if we are here the web post failed for some reason */ - if (globals.log_errors_to_disk) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to post to web server, writing to file\n"); - - for (err_dir_index = 0; err_dir_index < globals.err_dir_count; err_dir_index++) { - switch_thread_rwlock_rdlock(globals.log_path_lock); - path = switch_mprintf("%s%s%s%s.cdr.json", globals.err_log_dir[err_dir_index], SWITCH_PATH_SEPARATOR, a_prefix, switch_core_session_get_uuid(session)); - switch_thread_rwlock_unlock(globals.log_path_lock); - if (path) { -#ifdef _MSC_VER - if ((fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR)) > -1) { -#else - if ((fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)) > -1) { -#endif - int wrote; - wrote = write(fd, json_text, (unsigned) strlen(json_text)); - close(fd); - fd = -1; - if(wrote < 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error writing [%s]\n",path); - } - break; - } else { - char ebuf[512] = { 0 }; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't open %s! [%s]\n", - path, switch_strerror_r(errno, ebuf, sizeof(ebuf))); - - } - - switch_safe_free(path); - } - } - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unable to post to web server, not writing to file\n"); - } + switch_log_printf(SWITCH_CHANNEL_UUID_LOG(data->uuid), SWITCH_LOG_ERROR, "Unable to post to web server\n"); + backup_cdr(data); } - success: - status = SWITCH_STATUS_SUCCESS; - error: + end: if (curl_handle) { switch_curl_easy_cleanup(curl_handle); } @@ -435,14 +412,119 @@ static switch_status_t my_on_reporting(switch_core_session_t *session) if (slist) { switch_curl_slist_free_all(slist); } - if (curl_json_text != json_text) { + if (curl_json_text != data->json_text) { switch_safe_free(curl_json_text); } + + destroy_cdr_data(data); +} + +static switch_status_t my_on_reporting(switch_core_session_t *session) +{ + cJSON *json_cdr = NULL; + char *json_text = NULL; + char *json_text_escaped = NULL; + switch_channel_t *channel = switch_core_session_get_channel(session); + int is_b; + const char *a_prefix = ""; + cdr_data_t *cdr_data = NULL; + const char *logdir = NULL; + + if (globals.shutdown) { + return SWITCH_STATUS_SUCCESS; + } + + is_b = channel && switch_channel_get_originator_caller_profile(channel); + if (!globals.log_b && is_b) { + const char *force_cdr = switch_channel_get_variable(channel, SWITCH_FORCE_PROCESS_CDR_VARIABLE); + if (!switch_true(force_cdr)) { + return SWITCH_STATUS_SUCCESS; + } + } + if (!is_b && globals.prefix_a) { + a_prefix = "a_"; + } + + if (switch_ivr_generate_json_cdr(session, &json_cdr, globals.encode_values == ENCODING_DEFAULT) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error Generating Data!\n"); + return SWITCH_STATUS_FALSE; + } + + cdr_data = malloc(sizeof(cdr_data_t)); + switch_assert(cdr_data); + + json_text = cJSON_PrintUnformatted(json_cdr); + + if (globals.url_count && globals.encode) { + switch_size_t need_bytes = strlen(json_text) * 3; + + json_text_escaped = malloc(need_bytes); + switch_assert(json_text_escaped); + memset(json_text_escaped, 0, need_bytes); + if (globals.encode == ENCODING_DEFAULT) { + switch_url_encode(json_text, json_text_escaped, need_bytes); + } else { + switch_b64_encode((unsigned char *) json_text, need_bytes / 3, (unsigned char *) json_text_escaped, need_bytes); + } + } + + cdr_data->uuid = strdup(switch_core_session_get_uuid(session)); + cdr_data->filename = switch_mprintf("%s%s.cdr.json", a_prefix, cdr_data->uuid); + cdr_data->json_text = json_text; + cdr_data->json_text_escaped = json_text_escaped; + + switch_thread_rwlock_rdlock(globals.log_path_lock); + + if (!(logdir = switch_channel_get_variable(channel, "json_cdr_base"))) { + logdir = globals.log_dir; + } + cdr_data->logdir = switch_safe_strdup(logdir); + + switch_thread_rwlock_unlock(globals.log_path_lock); + + if (globals.queue) { + if (switch_queue_trypush(globals.queue, cdr_data) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Unable to push cdr to queue\n"); + backup_cdr(cdr_data); + destroy_cdr_data(cdr_data); + } + } else { + process_cdr(cdr_data); + } cJSON_Delete(json_cdr); - switch_safe_free(json_text); - return status; + return SWITCH_STATUS_SUCCESS; +} + +static void *SWITCH_THREAD_FUNC cdr_thread(switch_thread_t *t, void *obj) +{ + void *pop = NULL; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Cdr thread started.\n"); + + while (!globals.shutdown) { + cdr_data_t *data = NULL; + + if (switch_queue_pop(globals.queue, &pop) != SWITCH_STATUS_SUCCESS) { + break; + } + + if (!pop) { + break; + } + + data = (cdr_data_t *) pop; + process_cdr(data); + } + + while (switch_queue_trypop(globals.queue, &pop) == SWITCH_STATUS_SUCCESS) { + destroy_cdr_data((cdr_data_t *) pop); + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Cdr thread ended.\n"); + switch_thread_exit(t, SWITCH_STATUS_SUCCESS); + + return NULL; } static void event_handler(switch_event_t *event) @@ -598,8 +680,18 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_json_cdr_load) } } else if (!strcasecmp(var, "encode-values") && !zstr(val)) { globals.encode_values = switch_true(val) ? ENCODING_DEFAULT : ENCODING_NONE; - } + } else if (!strcasecmp(var, "queue-capacity") && !zstr(val)) { + int capacity = atoi(val); + if (capacity > 0) { + switch_threadattr_t *thd_attr; + switch_queue_create(&globals.queue, capacity, globals.pool); + + switch_threadattr_create(&thd_attr, globals.pool); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&globals.thread, thd_attr, cdr_thread, NULL, globals.pool); + } + } } if (!globals.err_dir_count) { @@ -624,12 +716,19 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_json_cdr_load) return status; } + SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_json_cdr_shutdown) { int err_dir_index = 0; + switch_status_t status; globals.shutdown = 1; + if (globals.queue) { + switch_queue_push(globals.queue, NULL); + switch_thread_join(&status, globals.thread); + } + switch_safe_free(globals.log_dir); for (;err_dir_index < globals.err_dir_count; err_dir_index++) {