From b94eb0f7e26e31dd62d5cb49758489a4b47b03e9 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Sat, 20 Dec 2014 00:25:59 -0600 Subject: [PATCH] FS-7083 #resolve #comment this should do it. The problem is linked to side-effects from the read thread being delayed by writing to the file handle. It was so much worse on mp3 because the shout encoder blocks while its churning the data and delays it more. This patch adds a dedicated thread for writing to the file and the channel_variable RECORD_USE_THREAD=false will disable it and sync may still be maintained at the cost of dropping more data from the audio signal. --- src/switch_core_media_bug.c | 73 ++++++++++---------- src/switch_ivr_async.c | 131 ++++++++++++++++++++++++++++++------ src/switch_time.c | 2 + 3 files changed, 150 insertions(+), 56 deletions(-) diff --git a/src/switch_core_media_bug.c b/src/switch_core_media_bug.c index 08f2ee30d7..a8af7d8f97 100644 --- a/src/switch_core_media_bug.c +++ b/src/switch_core_media_bug.c @@ -185,9 +185,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b uint32_t blen; switch_codec_implementation_t read_impl = { 0 }; int16_t *tp; - switch_size_t do_read = 0, do_write = 0; - int fill_read = 0, fill_write = 0; - + switch_size_t do_read = 0, do_write = 0, has_read = 0, has_write = 0, fill_read = 0, fill_write = 0; switch_core_session_get_read_impl(bug->session, &read_impl); @@ -213,50 +211,47 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b frame->datalen = 0; if (switch_test_flag(bug, SMBF_READ_STREAM)) { + has_read = 1; switch_mutex_lock(bug->read_mutex); do_read = switch_buffer_inuse(bug->raw_read_buffer); switch_mutex_unlock(bug->read_mutex); } if (switch_test_flag(bug, SMBF_WRITE_STREAM)) { + has_write = 1; switch_mutex_lock(bug->write_mutex); do_write = switch_buffer_inuse(bug->raw_write_buffer); switch_mutex_unlock(bug->write_mutex); } + if (bug->record_frame_size && bug->record_pre_buffer_max && (do_read || do_write) && bug->record_pre_buffer_count < bug->record_pre_buffer_max) { bug->record_pre_buffer_count++; return SWITCH_STATUS_FALSE; } else { uint32_t frame_size; switch_codec_implementation_t read_impl = { 0 }; - //switch_codec_implementation_t other_read_impl = { 0 }; - //switch_core_session_t *other_session; - + switch_core_session_get_read_impl(bug->session, &read_impl); frame_size = read_impl.decoded_bytes_per_packet; bug->record_frame_size = frame_size; - -#if 0 - if (do_read && do_write) { - if (switch_core_session_get_partner(bug->session, &other_session) == SWITCH_STATUS_SUCCESS) { - switch_core_session_get_read_impl(other_session, &other_read_impl); - switch_core_session_rwunlock(other_session); + } - if (read_impl.actual_samples_per_second == other_read_impl.actual_samples_per_second) { - if (read_impl.decoded_bytes_per_packet < other_read_impl.decoded_bytes_per_packet) { - frame_size = read_impl.decoded_bytes_per_packet; - } - } else { - if (read_impl.decoded_bytes_per_packet > other_read_impl.decoded_bytes_per_packet) { - frame_size = read_impl.decoded_bytes_per_packet; - } - } - } + if (bug->record_frame_size && do_write > do_read && do_write > (bug->record_frame_size * 2)) { + switch_mutex_lock(bug->write_mutex); + switch_buffer_toss(bug->raw_write_buffer, bug->record_frame_size); + do_write = switch_buffer_inuse(bug->raw_write_buffer); + switch_mutex_unlock(bug->write_mutex); + } - bug->record_frame_size = bytes = frame_size; - } -#endif + + + if ((has_read && !do_read)) { + fill_read = 1; + } + + if ((has_write && !do_write)) { + fill_write = 1; } @@ -274,10 +269,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b } } - fill_read = !do_read; - fill_write = !do_write; - - if ((fill_read && fill_write) || (!fill && fill_read)) { + if ((fill_read && fill_write) || (fill && (fill_read || fill_write))) { return SWITCH_STATUS_FALSE; } @@ -384,17 +376,26 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b frame->rate = read_impl.actual_samples_per_second; frame->codec = NULL; - if (fill_read && fill_write) { - return SWITCH_STATUS_BREAK; - } - - if (fill_read || fill_write) { - return SWITCH_STATUS_BREAK; + if (switch_test_flag(bug, SMBF_STEREO)) { + frame->datalen *= 2; + frame->channels = 2; } memcpy(bug->session->recur_buffer, frame->data, frame->datalen); bug->session->recur_buffer_len = frame->datalen; - + + if (has_read) { + switch_mutex_lock(bug->read_mutex); + do_read = switch_buffer_inuse(bug->raw_read_buffer); + switch_mutex_unlock(bug->read_mutex); + } + + if (has_write) { + switch_mutex_lock(bug->write_mutex); + do_write = switch_buffer_inuse(bug->raw_write_buffer); + switch_mutex_unlock(bug->write_mutex); + } + return SWITCH_STATUS_SUCCESS; } diff --git a/src/switch_ivr_async.c b/src/switch_ivr_async.c index 25ec4d7285..21180d789f 100644 --- a/src/switch_ivr_async.c +++ b/src/switch_ivr_async.c @@ -1044,6 +1044,10 @@ struct record_helper { switch_bool_t hangup_on_error; switch_codec_implementation_t read_impl; switch_bool_t speech_detected; + switch_buffer_t *thread_buffer; + switch_thread_t *thread; + switch_mutex_t *buffer_mutex; + int thread_ready; const char *completion_cause; }; @@ -1110,6 +1114,55 @@ static void send_record_stop_event(switch_channel_t *channel, switch_codec_imple } } +static void *SWITCH_THREAD_FUNC recording_thread(switch_thread_t *thread, void *obj) +{ + switch_media_bug_t *bug = (switch_media_bug_t *) obj; + switch_core_session_t *session = switch_core_media_bug_get_session(bug); + switch_channel_t *channel = switch_core_session_get_channel(session); + struct record_helper *rh; + switch_size_t bsize = SWITCH_RECOMMENDED_BUFFER_SIZE, samples = 0, inuse = 0; + unsigned char *data = switch_core_session_alloc(session, bsize); + int channels = switch_core_media_bug_test_flag(bug, SMBF_STEREO) ? 2 : 1; + + if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) { + return NULL; + } + + rh = switch_core_media_bug_get_user_data(bug); + switch_buffer_create_dynamic(&rh->thread_buffer, 1024 * 512, 1024 * 64, 0); + rh->thread_ready = 1; + + while(switch_test_flag(rh->fh, SWITCH_FILE_OPEN)) { + switch_mutex_lock(rh->buffer_mutex); + inuse = switch_buffer_inuse(rh->thread_buffer); + + if (rh->thread_ready && switch_channel_up_nosig(channel) && inuse < bsize) { + switch_mutex_unlock(rh->buffer_mutex); + switch_yield(20000); + continue; + } else if ((!rh->thread_ready || switch_channel_down_nosig(channel)) && !inuse) { + break; + } + + samples = switch_buffer_read(rh->thread_buffer, data, bsize) / 2 / channels; + switch_mutex_unlock(rh->buffer_mutex); + + if (switch_core_file_write(rh->fh, data, &samples) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error writing %s\n", rh->file); + /* File write failed */ + set_completion_cause(rh, "uri-failure"); + if (rh->hangup_on_error) { + switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER); + switch_core_session_reset(session, SWITCH_TRUE, SWITCH_TRUE); + } + } + } + + switch_core_session_rwunlock(session); + + return NULL; +} + static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, switch_abc_type_t type) { switch_core_session_t *session = switch_core_media_bug_get_session(bug); @@ -1123,18 +1176,40 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s switch (type) { case SWITCH_ABC_TYPE_INIT: - if (switch_event_create(&event, SWITCH_EVENT_RECORD_START) == SWITCH_STATUS_SUCCESS) { - switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Record-File-Path", rh->file); - switch_channel_event_set_data(channel, event); - switch_event_fire(&event); + { + const char *var = switch_channel_get_variable(channel, "RECORD_USE_THREAD"); + + if (zstr(var) || switch_true(var)) { + switch_threadattr_t *thd_attr = NULL; + switch_memory_pool_t *pool = switch_core_session_get_pool(session); + int sanity = 200; + + + switch_core_session_get_read_impl(session, &rh->read_impl); + switch_mutex_init(&rh->buffer_mutex, SWITCH_MUTEX_NESTED, pool); + switch_threadattr_create(&thd_attr, pool); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&rh->thread, thd_attr, recording_thread, bug, pool); + + while(--sanity > 0 && !rh->thread_ready) { + switch_yield(10000); + } + } + + + if (switch_event_create(&event, SWITCH_EVENT_RECORD_START) == SWITCH_STATUS_SUCCESS) { + switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Record-File-Path", rh->file); + switch_channel_event_set_data(channel, event); + switch_event_fire(&event); + } + + rh->silence_time = switch_micro_time_now(); + rh->silence_timeout_ms = rh->initial_timeout_ms; + rh->speech_detected = SWITCH_FALSE; + rh->completion_cause = NULL; + + switch_core_session_get_read_impl(session, &rh->read_impl); } - rh->silence_time = switch_micro_time_now(); - rh->silence_timeout_ms = rh->initial_timeout_ms; - rh->speech_detected = SWITCH_FALSE; - rh->completion_cause = NULL; - - switch_core_session_get_read_impl(session, &rh->read_impl); - break; case SWITCH_ABC_TYPE_TAP_NATIVE_READ: { @@ -1232,6 +1307,18 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE]; switch_frame_t frame = { 0 }; + if (rh->thread_ready) { + switch_status_t st; + + rh->thread_ready = 0; + switch_thread_join(&st, rh->thread); + } + + if (rh->thread_buffer) { + switch_buffer_destroy(&rh->thread_buffer); + } + + frame.data = data; frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE; @@ -1313,18 +1400,24 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE]; switch_frame_t frame = { 0 }; switch_status_t status; + int i = 0; frame.data = data; frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE; for (;;) { - status = switch_core_media_bug_read(bug, &frame, SWITCH_FALSE); + status = switch_core_media_bug_read(bug, &frame, i++ == 0 ? SWITCH_FALSE : SWITCH_TRUE); - if (status == SWITCH_STATUS_SUCCESS || status == SWITCH_STATUS_BREAK) { - - len = (switch_size_t) frame.datalen / 2; - - if (len && switch_core_file_write(rh->fh, mask ? null_data : data, &len) != SWITCH_STATUS_SUCCESS) { + if (status != SWITCH_STATUS_SUCCESS || !frame.datalen) { + break; + } else { + len = (switch_size_t) frame.datalen / 2 / frame.channels; + + if (rh->thread_buffer) { + switch_mutex_lock(rh->buffer_mutex); + switch_buffer_write(rh->thread_buffer, mask ? null_data : data, frame.datalen); + switch_mutex_unlock(rh->buffer_mutex); + } else if (switch_core_file_write(rh->fh, mask ? null_data : data, &len) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error writing %s\n", rh->file); /* File write failed */ set_completion_cause(rh, "uri-failure"); @@ -1377,8 +1470,6 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s rh->speech_detected = SWITCH_TRUE; } } - } else { - break; } } } @@ -2269,7 +2360,7 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_record_session(switch_core_session_t } rh->hangup_on_error = hangup_on_error; - + if ((status = switch_core_media_bug_add(session, "session_record", file, record_callback, rh, to, flags, &bug)) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error adding media bug for file %s\n", file); diff --git a/src/switch_time.c b/src/switch_time.c index f710105031..c226b90592 100644 --- a/src/switch_time.c +++ b/src/switch_time.c @@ -1046,6 +1046,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(softtimer_runtime) tfd = -1; } } + + if (tfd > -1) MATRIX = 0; } #else tfd = -1;