FS-7083 #resolve #comment this should do it. The problem is linked to side-effects from the read thread being delayed by writing to the file handle. It was so much worse on mp3 because the shout encoder blocks while its churning the data and delays it more. This patch adds a dedicated thread for writing to the file and the channel_variable RECORD_USE_THREAD=false will disable it and sync may still be maintained at the cost of dropping more data from the audio signal.

This commit is contained in:
Anthony Minessale 2014-12-20 00:25:59 -06:00 committed by Ken Rice
parent 41b97031d7
commit b94eb0f7e2
3 changed files with 150 additions and 56 deletions

View File

@ -185,9 +185,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b
uint32_t blen;
switch_codec_implementation_t read_impl = { 0 };
int16_t *tp;
switch_size_t do_read = 0, do_write = 0;
int fill_read = 0, fill_write = 0;
switch_size_t do_read = 0, do_write = 0, has_read = 0, has_write = 0, fill_read = 0, fill_write = 0;
switch_core_session_get_read_impl(bug->session, &read_impl);
@ -213,50 +211,47 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b
frame->datalen = 0;
if (switch_test_flag(bug, SMBF_READ_STREAM)) {
has_read = 1;
switch_mutex_lock(bug->read_mutex);
do_read = switch_buffer_inuse(bug->raw_read_buffer);
switch_mutex_unlock(bug->read_mutex);
}
if (switch_test_flag(bug, SMBF_WRITE_STREAM)) {
has_write = 1;
switch_mutex_lock(bug->write_mutex);
do_write = switch_buffer_inuse(bug->raw_write_buffer);
switch_mutex_unlock(bug->write_mutex);
}
if (bug->record_frame_size && bug->record_pre_buffer_max && (do_read || do_write) && bug->record_pre_buffer_count < bug->record_pre_buffer_max) {
bug->record_pre_buffer_count++;
return SWITCH_STATUS_FALSE;
} else {
uint32_t frame_size;
switch_codec_implementation_t read_impl = { 0 };
//switch_codec_implementation_t other_read_impl = { 0 };
//switch_core_session_t *other_session;
switch_core_session_get_read_impl(bug->session, &read_impl);
frame_size = read_impl.decoded_bytes_per_packet;
bug->record_frame_size = frame_size;
#if 0
if (do_read && do_write) {
if (switch_core_session_get_partner(bug->session, &other_session) == SWITCH_STATUS_SUCCESS) {
switch_core_session_get_read_impl(other_session, &other_read_impl);
switch_core_session_rwunlock(other_session);
if (read_impl.actual_samples_per_second == other_read_impl.actual_samples_per_second) {
if (read_impl.decoded_bytes_per_packet < other_read_impl.decoded_bytes_per_packet) {
frame_size = read_impl.decoded_bytes_per_packet;
}
} else {
if (read_impl.decoded_bytes_per_packet > other_read_impl.decoded_bytes_per_packet) {
frame_size = read_impl.decoded_bytes_per_packet;
}
}
}
bug->record_frame_size = bytes = frame_size;
if (bug->record_frame_size && do_write > do_read && do_write > (bug->record_frame_size * 2)) {
switch_mutex_lock(bug->write_mutex);
switch_buffer_toss(bug->raw_write_buffer, bug->record_frame_size);
do_write = switch_buffer_inuse(bug->raw_write_buffer);
switch_mutex_unlock(bug->write_mutex);
}
#endif
if ((has_read && !do_read)) {
fill_read = 1;
}
if ((has_write && !do_write)) {
fill_write = 1;
}
@ -274,10 +269,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b
}
}
fill_read = !do_read;
fill_write = !do_write;
if ((fill_read && fill_write) || (!fill && fill_read)) {
if ((fill_read && fill_write) || (fill && (fill_read || fill_write))) {
return SWITCH_STATUS_FALSE;
}
@ -384,17 +376,26 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b
frame->rate = read_impl.actual_samples_per_second;
frame->codec = NULL;
if (fill_read && fill_write) {
return SWITCH_STATUS_BREAK;
}
if (fill_read || fill_write) {
return SWITCH_STATUS_BREAK;
if (switch_test_flag(bug, SMBF_STEREO)) {
frame->datalen *= 2;
frame->channels = 2;
}
memcpy(bug->session->recur_buffer, frame->data, frame->datalen);
bug->session->recur_buffer_len = frame->datalen;
if (has_read) {
switch_mutex_lock(bug->read_mutex);
do_read = switch_buffer_inuse(bug->raw_read_buffer);
switch_mutex_unlock(bug->read_mutex);
}
if (has_write) {
switch_mutex_lock(bug->write_mutex);
do_write = switch_buffer_inuse(bug->raw_write_buffer);
switch_mutex_unlock(bug->write_mutex);
}
return SWITCH_STATUS_SUCCESS;
}

View File

@ -1044,6 +1044,10 @@ struct record_helper {
switch_bool_t hangup_on_error;
switch_codec_implementation_t read_impl;
switch_bool_t speech_detected;
switch_buffer_t *thread_buffer;
switch_thread_t *thread;
switch_mutex_t *buffer_mutex;
int thread_ready;
const char *completion_cause;
};
@ -1110,6 +1114,55 @@ static void send_record_stop_event(switch_channel_t *channel, switch_codec_imple
}
}
static void *SWITCH_THREAD_FUNC recording_thread(switch_thread_t *thread, void *obj)
{
switch_media_bug_t *bug = (switch_media_bug_t *) obj;
switch_core_session_t *session = switch_core_media_bug_get_session(bug);
switch_channel_t *channel = switch_core_session_get_channel(session);
struct record_helper *rh;
switch_size_t bsize = SWITCH_RECOMMENDED_BUFFER_SIZE, samples = 0, inuse = 0;
unsigned char *data = switch_core_session_alloc(session, bsize);
int channels = switch_core_media_bug_test_flag(bug, SMBF_STEREO) ? 2 : 1;
if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) {
return NULL;
}
rh = switch_core_media_bug_get_user_data(bug);
switch_buffer_create_dynamic(&rh->thread_buffer, 1024 * 512, 1024 * 64, 0);
rh->thread_ready = 1;
while(switch_test_flag(rh->fh, SWITCH_FILE_OPEN)) {
switch_mutex_lock(rh->buffer_mutex);
inuse = switch_buffer_inuse(rh->thread_buffer);
if (rh->thread_ready && switch_channel_up_nosig(channel) && inuse < bsize) {
switch_mutex_unlock(rh->buffer_mutex);
switch_yield(20000);
continue;
} else if ((!rh->thread_ready || switch_channel_down_nosig(channel)) && !inuse) {
break;
}
samples = switch_buffer_read(rh->thread_buffer, data, bsize) / 2 / channels;
switch_mutex_unlock(rh->buffer_mutex);
if (switch_core_file_write(rh->fh, data, &samples) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error writing %s\n", rh->file);
/* File write failed */
set_completion_cause(rh, "uri-failure");
if (rh->hangup_on_error) {
switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
switch_core_session_reset(session, SWITCH_TRUE, SWITCH_TRUE);
}
}
}
switch_core_session_rwunlock(session);
return NULL;
}
static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, switch_abc_type_t type)
{
switch_core_session_t *session = switch_core_media_bug_get_session(bug);
@ -1123,18 +1176,40 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s
switch (type) {
case SWITCH_ABC_TYPE_INIT:
{
const char *var = switch_channel_get_variable(channel, "RECORD_USE_THREAD");
if (zstr(var) || switch_true(var)) {
switch_threadattr_t *thd_attr = NULL;
switch_memory_pool_t *pool = switch_core_session_get_pool(session);
int sanity = 200;
switch_core_session_get_read_impl(session, &rh->read_impl);
switch_mutex_init(&rh->buffer_mutex, SWITCH_MUTEX_NESTED, pool);
switch_threadattr_create(&thd_attr, pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&rh->thread, thd_attr, recording_thread, bug, pool);
while(--sanity > 0 && !rh->thread_ready) {
switch_yield(10000);
}
}
if (switch_event_create(&event, SWITCH_EVENT_RECORD_START) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Record-File-Path", rh->file);
switch_channel_event_set_data(channel, event);
switch_event_fire(&event);
}
rh->silence_time = switch_micro_time_now();
rh->silence_timeout_ms = rh->initial_timeout_ms;
rh->speech_detected = SWITCH_FALSE;
rh->completion_cause = NULL;
switch_core_session_get_read_impl(session, &rh->read_impl);
}
break;
case SWITCH_ABC_TYPE_TAP_NATIVE_READ:
{
@ -1232,6 +1307,18 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s
uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE];
switch_frame_t frame = { 0 };
if (rh->thread_ready) {
switch_status_t st;
rh->thread_ready = 0;
switch_thread_join(&st, rh->thread);
}
if (rh->thread_buffer) {
switch_buffer_destroy(&rh->thread_buffer);
}
frame.data = data;
frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE;
@ -1313,18 +1400,24 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s
uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE];
switch_frame_t frame = { 0 };
switch_status_t status;
int i = 0;
frame.data = data;
frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE;
for (;;) {
status = switch_core_media_bug_read(bug, &frame, SWITCH_FALSE);
status = switch_core_media_bug_read(bug, &frame, i++ == 0 ? SWITCH_FALSE : SWITCH_TRUE);
if (status == SWITCH_STATUS_SUCCESS || status == SWITCH_STATUS_BREAK) {
if (status != SWITCH_STATUS_SUCCESS || !frame.datalen) {
break;
} else {
len = (switch_size_t) frame.datalen / 2 / frame.channels;
len = (switch_size_t) frame.datalen / 2;
if (len && switch_core_file_write(rh->fh, mask ? null_data : data, &len) != SWITCH_STATUS_SUCCESS) {
if (rh->thread_buffer) {
switch_mutex_lock(rh->buffer_mutex);
switch_buffer_write(rh->thread_buffer, mask ? null_data : data, frame.datalen);
switch_mutex_unlock(rh->buffer_mutex);
} else if (switch_core_file_write(rh->fh, mask ? null_data : data, &len) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error writing %s\n", rh->file);
/* File write failed */
set_completion_cause(rh, "uri-failure");
@ -1377,8 +1470,6 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s
rh->speech_detected = SWITCH_TRUE;
}
}
} else {
break;
}
}
}

View File

@ -1046,6 +1046,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(softtimer_runtime)
tfd = -1;
}
}
if (tfd > -1) MATRIX = 0;
}
#else
tfd = -1;