diff --git a/src/include/switch_types.h b/src/include/switch_types.h index b7d4b0dd88..3d924cbfc6 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -1861,7 +1861,8 @@ typedef enum { SWITCH_IO_FLAG_NONE = 0, SWITCH_IO_FLAG_NOBLOCK = (1 << 0), SWITCH_IO_FLAG_SINGLE_READ = (1 << 1), - SWITCH_IO_FLAG_FORCE = (1 << 2) + SWITCH_IO_FLAG_FORCE = (1 << 2), + SWITCH_IO_FLAG_QUEUED = (1 << 3) } switch_io_flag_enum_t; typedef uint32_t switch_io_flag_t; diff --git a/src/include/switch_utils.h b/src/include/switch_utils.h index 5b3285a43d..2692cdf4d3 100644 --- a/src/include/switch_utils.h +++ b/src/include/switch_utils.h @@ -1329,7 +1329,11 @@ SWITCH_DECLARE(void) switch_http_parse_qs(switch_http_request_t *request, char * SWITCH_DECLARE(switch_status_t) switch_frame_buffer_free(switch_frame_buffer_t *fb, switch_frame_t **frameP); SWITCH_DECLARE(switch_status_t) switch_frame_buffer_dup(switch_frame_buffer_t *fb, switch_frame_t *orig, switch_frame_t **clone); SWITCH_DECLARE(switch_status_t) switch_frame_buffer_destroy(switch_frame_buffer_t **fbP); -SWITCH_DECLARE(switch_status_t) switch_frame_buffer_create(switch_frame_buffer_t **fbP); +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_create(switch_frame_buffer_t **fbP, switch_size_t qlen); +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_push(switch_frame_buffer_t *fb, void *ptr); +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_trypush(switch_frame_buffer_t *fb, void *ptr); +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_pop(switch_frame_buffer_t *fb, void **ptr); +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_trypop(switch_frame_buffer_t *fb, void **ptr); typedef struct { int64_t userms; diff --git a/src/mod/applications/mod_conference/conference_video.c b/src/mod/applications/mod_conference/conference_video.c index c3545b9b7a..7ae512cb31 100644 --- a/src/mod/applications/mod_conference/conference_video.c +++ b/src/mod/applications/mod_conference/conference_video.c @@ -1326,7 +1326,7 @@ void conference_video_write_canvas_image_to_codec_group(conference_obj_t *confer switch_set_flag(frame, SFF_ENCODED); if (switch_frame_buffer_dup(imember->fb, frame, &dupframe) == SWITCH_STATUS_SUCCESS) { - if (switch_queue_trypush(imember->mux_out_queue, dupframe) != SWITCH_STATUS_SUCCESS) { + if (switch_frame_buffer_trypush(imember->fb, dupframe) != SWITCH_STATUS_SUCCESS) { switch_frame_buffer_free(imember->fb, &dupframe); } dupframe = NULL; @@ -1529,9 +1529,9 @@ void *SWITCH_THREAD_FUNC conference_video_muxing_write_thread_run(switch_thread_ while(conference_utils_member_test_flag(member, MFLAG_RUNNING)) { if (patched) { - pop_status = switch_queue_trypop(member->mux_out_queue, &pop); + pop_status = switch_frame_buffer_trypop(member->fb, &pop); } else { - pop_status = switch_queue_pop(member->mux_out_queue, &pop); + pop_status = switch_frame_buffer_pop(member->fb, &pop); } if (pop_status == SWITCH_STATUS_SUCCESS) { @@ -1599,7 +1599,7 @@ void *SWITCH_THREAD_FUNC conference_video_muxing_write_thread_run(switch_thread_ } } - while (switch_queue_trypop(member->mux_out_queue, &pop) == SWITCH_STATUS_SUCCESS) { + while (switch_frame_buffer_trypop(member->fb, &pop) == SWITCH_STATUS_SUCCESS) { if (pop) { if ((switch_size_t)pop != 1) { frame = (switch_frame_t *) pop; @@ -2198,7 +2198,7 @@ static void wait_for_canvas(mcu_canvas_t *canvas) if (layer->need_patch) { if (layer->member) { - switch_queue_trypush(layer->member->mux_out_queue, (void *) 1); + switch_frame_buffer_trypush(layer->member->fb, (void *) 1); x++; } else { layer->need_patch = 0; @@ -2949,7 +2949,9 @@ void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread_t *thr write_frame.packetlen = 0; if (switch_frame_buffer_dup(imember->fb, &write_frame, &dupframe) == SWITCH_STATUS_SUCCESS) { - switch_queue_push(imember->mux_out_queue, dupframe); + if (switch_frame_buffer_trypush(imember->fb, dupframe) != SWITCH_STATUS_SUCCESS) { + switch_frame_buffer_free(imember->fb, &dupframe); + } dupframe = NULL; } } @@ -3139,7 +3141,7 @@ void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread_t *thr //switch_core_session_write_video_frame(imember->session, &write_frame, SWITCH_IO_FLAG_NONE, 0); if (switch_frame_buffer_dup(imember->fb, &write_frame, &dupframe) == SWITCH_STATUS_SUCCESS) { - if (switch_queue_trypush(imember->mux_out_queue, dupframe) != SWITCH_STATUS_SUCCESS) { + if (switch_frame_buffer_trypush(imember->fb, dupframe) != SWITCH_STATUS_SUCCESS) { switch_frame_buffer_free(imember->fb, &dupframe); } dupframe = NULL; @@ -3487,7 +3489,7 @@ void *SWITCH_THREAD_FUNC conference_video_super_muxing_thread_run(switch_thread_ //switch_core_session_write_video_frame(imember->session, &write_frame, SWITCH_IO_FLAG_NONE, 0); if (switch_frame_buffer_dup(imember->fb, &write_frame, &dupframe) == SWITCH_STATUS_SUCCESS) { - if (switch_queue_trypush(imember->mux_out_queue, dupframe) != SWITCH_STATUS_SUCCESS) { + if (switch_frame_buffer_trypush(imember->fb, dupframe) != SWITCH_STATUS_SUCCESS) { switch_frame_buffer_free(imember->fb, &dupframe); } dupframe = NULL; diff --git a/src/mod/applications/mod_conference/mod_conference.c b/src/mod/applications/mod_conference/mod_conference.c index 1a3e17a767..f6afcd0fb7 100644 --- a/src/mod/applications/mod_conference/mod_conference.c +++ b/src/mod/applications/mod_conference/mod_conference.c @@ -2297,8 +2297,7 @@ SWITCH_STANDARD_APP(conference_function) if (conference->conference_video_mode == CONF_VIDEO_MODE_MUX) { switch_queue_create(&member.video_queue, 200, member.pool); - switch_queue_create(&member.mux_out_queue, 500, member.pool); - switch_frame_buffer_create(&member.fb); + switch_frame_buffer_create(&member.fb, 500); } /* Add the caller to the conference */ @@ -2349,7 +2348,7 @@ SWITCH_STANDARD_APP(conference_function) if (member.video_muxing_write_thread) { switch_status_t st = SWITCH_STATUS_SUCCESS; - switch_queue_push(member.mux_out_queue, NULL); + switch_frame_buffer_push(member.fb, NULL); switch_thread_join(&st, member.video_muxing_write_thread); member.video_muxing_write_thread = NULL; } diff --git a/src/mod/applications/mod_conference/mod_conference.h b/src/mod/applications/mod_conference/mod_conference.h index cc8d36b4fd..a042cc551d 100644 --- a/src/mod/applications/mod_conference/mod_conference.h +++ b/src/mod/applications/mod_conference/mod_conference.h @@ -750,7 +750,6 @@ struct conference_member { char *kicked_sound; switch_queue_t *dtmf_queue; switch_queue_t *video_queue; - switch_queue_t *mux_out_queue; switch_thread_t *video_muxing_write_thread; switch_thread_t *input_thread; cJSON *json; diff --git a/src/switch_core_io.c b/src/switch_core_io.c index 3532ea2141..f91f89f3fe 100644 --- a/src/switch_core_io.c +++ b/src/switch_core_io.c @@ -945,708 +945,6 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi return status; } -static switch_status_t perform_write(switch_core_session_t *session, switch_frame_t *frame, switch_io_flag_t flags, int stream_id) -{ - switch_io_event_hook_write_frame_t *ptr; - switch_status_t status = SWITCH_STATUS_FALSE; - - - if (session->bugs && !(frame->flags & SFF_NOT_AUDIO)) { - switch_media_bug_t *bp; - switch_bool_t ok = SWITCH_TRUE; - int prune = 0; - - switch_thread_rwlock_rdlock(session->bug_rwlock); - - for (bp = session->bugs; bp; bp = bp->next) { - ok = SWITCH_TRUE; - - if (switch_channel_test_flag(session->channel, CF_PAUSE_BUGS) && !switch_core_media_bug_test_flag(bp, SMBF_NO_PAUSE)) { - continue; - } - - if (!switch_channel_test_flag(session->channel, CF_ANSWERED) && switch_core_media_bug_test_flag(bp, SMBF_ANSWER_REQ)) { - continue; - } - if (switch_test_flag(bp, SMBF_PRUNE)) { - prune++; - continue; - } - - if (bp->ready) { - if (switch_test_flag(bp, SMBF_TAP_NATIVE_WRITE)) { - if (bp->callback) { - bp->native_write_frame = frame; - ok = bp->callback(bp, bp->user_data, SWITCH_ABC_TYPE_TAP_NATIVE_WRITE); - bp->native_write_frame = NULL; - } - } - } - - if ((bp->stop_time && bp->stop_time <= switch_epoch_time_now(NULL)) || ok == SWITCH_FALSE) { - switch_set_flag(bp, SMBF_PRUNE); - prune++; - } - } - switch_thread_rwlock_unlock(session->bug_rwlock); - - if (prune) { - switch_core_media_bug_prune(session); - } - } - - - if (session->endpoint_interface->io_routines->write_frame) { - if ((status = session->endpoint_interface->io_routines->write_frame(session, frame, flags, stream_id)) == SWITCH_STATUS_SUCCESS) { - for (ptr = session->event_hooks.write_frame; ptr; ptr = ptr->next) { - if ((status = ptr->write_frame(session, frame, flags, stream_id)) != SWITCH_STATUS_SUCCESS) { - break; - } - } - } - } - - return status; -} - -SWITCH_DECLARE(switch_status_t) switch_core_session_write_frame(switch_core_session_t *session, switch_frame_t *frame, switch_io_flag_t flags, - int stream_id) -{ - - switch_status_t status = SWITCH_STATUS_FALSE; - switch_frame_t *enc_frame = NULL, *write_frame = frame; - unsigned int flag = 0, need_codec = 0, perfect = 0, do_bugs = 0, do_write = 0, do_resample = 0, ptime_mismatch = 0, pass_cng = 0, resample = 0; - int did_write_resample = 0; - - switch_assert(session != NULL); - switch_assert(frame != NULL); - - if (!switch_channel_ready(session->channel)) { - return SWITCH_STATUS_FALSE; - } - - if (switch_mutex_trylock(session->codec_write_mutex) == SWITCH_STATUS_SUCCESS) { - switch_mutex_unlock(session->codec_write_mutex); - } else { - return SWITCH_STATUS_SUCCESS; - } - - if (switch_test_flag(frame, SFF_CNG)) { - if (switch_channel_test_flag(session->channel, CF_ACCEPT_CNG)) { - pass_cng = 1; - } else { - return SWITCH_STATUS_SUCCESS; - } - } - - if (switch_channel_test_flag(session->channel, CF_AUDIO_PAUSE_WRITE)) { - return SWITCH_STATUS_SUCCESS; - } - - if (!(session->write_codec && switch_core_codec_ready(session->write_codec)) && !pass_cng) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s has no write codec.\n", switch_channel_get_name(session->channel)); - switch_channel_hangup(session->channel, SWITCH_CAUSE_INCOMPATIBLE_DESTINATION); - return SWITCH_STATUS_FALSE; - } - - if (switch_channel_test_flag(session->channel, CF_HOLD)) { - return SWITCH_STATUS_SUCCESS; - } - - if (switch_test_flag(frame, SFF_PROXY_PACKET) || pass_cng) { - /* Fast PASS! */ - switch_mutex_lock(session->codec_write_mutex); - status = perform_write(session, frame, flag, stream_id); - switch_mutex_unlock(session->codec_write_mutex); - return status; - } - - switch_mutex_lock(session->codec_write_mutex); - - if (!(frame->codec && frame->codec->implementation)) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s has received a bad frame with no codec!\n", - switch_channel_get_name(session->channel)); - switch_channel_hangup(session->channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER); - switch_mutex_unlock(session->codec_write_mutex); - return SWITCH_STATUS_FALSE; - } - - switch_assert(frame->codec != NULL); - switch_assert(frame->codec->implementation != NULL); - - if (!(switch_core_codec_ready(session->write_codec) && frame->codec) || - !switch_channel_ready(session->channel) || !switch_channel_media_ready(session->channel)) { - switch_mutex_unlock(session->codec_write_mutex); - return SWITCH_STATUS_FALSE; - } - - switch_mutex_lock(session->write_codec->mutex); - switch_mutex_lock(frame->codec->mutex); - - if (!(switch_core_codec_ready(session->write_codec) && switch_core_codec_ready(frame->codec))) goto error; - - if ((session->write_codec && frame->codec && session->write_codec->implementation != frame->codec->implementation)) { - if (session->write_impl.codec_id == frame->codec->implementation->codec_id || - session->write_impl.microseconds_per_packet != frame->codec->implementation->microseconds_per_packet) { - ptime_mismatch = TRUE; - if ((switch_test_flag(frame->codec, SWITCH_CODEC_FLAG_PASSTHROUGH) || switch_test_flag(session->read_codec, SWITCH_CODEC_FLAG_PASSTHROUGH)) || - switch_channel_test_flag(session->channel, CF_PASSTHRU_PTIME_MISMATCH)) { - status = perform_write(session, frame, flags, stream_id); - goto error; - } - } - need_codec = TRUE; - } - - if (session->write_codec && !frame->codec) { - need_codec = TRUE; - } - - if (session->bugs && !need_codec && !switch_test_flag(session, SSF_MEDIA_BUG_TAP_ONLY)) { - do_bugs = TRUE; - need_codec = TRUE; - } - - if (frame->codec->implementation->actual_samples_per_second != session->write_impl.actual_samples_per_second) { - need_codec = TRUE; - do_resample = TRUE; - } - - - if ((frame->flags & SFF_NOT_AUDIO)) { - do_resample = 0; - do_bugs = 0; - need_codec = 0; - } - - if (switch_test_flag(session, SSF_WRITE_TRANSCODE) && !need_codec && switch_core_codec_ready(session->write_codec)) { - switch_core_session_t *other_session; - const char *uuid = switch_channel_get_partner_uuid(switch_core_session_get_channel(session)); - - if (uuid && (other_session = switch_core_session_locate(uuid))) { - switch_set_flag(other_session, SSF_READ_CODEC_RESET); - switch_set_flag(other_session, SSF_READ_CODEC_RESET); - switch_set_flag(other_session, SSF_WRITE_CODEC_RESET); - switch_core_session_rwunlock(other_session); - } - - switch_clear_flag(session, SSF_WRITE_TRANSCODE); - } - - - if (switch_test_flag(session, SSF_WRITE_CODEC_RESET)) { - switch_core_codec_reset(session->write_codec); - switch_clear_flag(session, SSF_WRITE_CODEC_RESET); - } - - if (!need_codec) { - do_write = TRUE; - write_frame = frame; - goto done; - } - - if (!switch_test_flag(session, SSF_WARN_TRANSCODE)) { - switch_core_session_message_t msg = { 0 }; - - msg.message_id = SWITCH_MESSAGE_INDICATE_TRANSCODING_NECESSARY; - switch_core_session_receive_message(session, &msg); - switch_set_flag(session, SSF_WARN_TRANSCODE); - } - - if (frame->codec) { - session->raw_write_frame.datalen = session->raw_write_frame.buflen; - frame->codec->cur_frame = frame; - session->write_codec->cur_frame = frame; - status = switch_core_codec_decode(frame->codec, - session->write_codec, - frame->data, - frame->datalen, - session->write_impl.actual_samples_per_second, - session->raw_write_frame.data, &session->raw_write_frame.datalen, &session->raw_write_frame.rate, &frame->flags); - frame->codec->cur_frame = NULL; - session->write_codec->cur_frame = NULL; - if (do_resample && status == SWITCH_STATUS_SUCCESS) { - status = SWITCH_STATUS_RESAMPLE; - } - - /* mux or demux to match */ - if (session->write_impl.number_of_channels != frame->codec->implementation->number_of_channels) { - uint32_t rlen = session->raw_write_frame.datalen / 2 / frame->codec->implementation->number_of_channels; - switch_mux_channels((int16_t *) session->raw_write_frame.data, rlen, - frame->codec->implementation->number_of_channels, session->write_impl.number_of_channels); - session->raw_write_frame.datalen = rlen * 2 * session->write_impl.number_of_channels; - } - - switch (status) { - case SWITCH_STATUS_RESAMPLE: - resample++; - write_frame = &session->raw_write_frame; - write_frame->rate = frame->codec->implementation->actual_samples_per_second; - if (!session->write_resampler) { - switch_mutex_lock(session->resample_mutex); - status = switch_resample_create(&session->write_resampler, - frame->codec->implementation->actual_samples_per_second, - session->write_impl.actual_samples_per_second, - session->write_impl.decoded_bytes_per_packet, SWITCH_RESAMPLE_QUALITY, session->write_impl.number_of_channels); - - - switch_mutex_unlock(session->resample_mutex); - if (status != SWITCH_STATUS_SUCCESS) { - goto done; - } else { - switch_core_session_message_t msg = { 0 }; - msg.numeric_arg = 1; - msg.message_id = SWITCH_MESSAGE_RESAMPLE_EVENT; - switch_core_session_receive_message(session, &msg); - - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "Activating write resampler\n"); - } - } - break; - case SWITCH_STATUS_SUCCESS: - session->raw_write_frame.samples = session->raw_write_frame.datalen / sizeof(int16_t) / session->write_impl.number_of_channels; - session->raw_write_frame.channels = session->write_impl.number_of_channels; - session->raw_write_frame.timestamp = frame->timestamp; - session->raw_write_frame.rate = frame->rate; - session->raw_write_frame.m = frame->m; - session->raw_write_frame.ssrc = frame->ssrc; - session->raw_write_frame.seq = frame->seq; - session->raw_write_frame.payload = frame->payload; - session->raw_write_frame.flags = 0; - if (switch_test_flag(frame, SFF_PLC)) { - session->raw_write_frame.flags |= SFF_PLC; - } - - write_frame = &session->raw_write_frame; - break; - case SWITCH_STATUS_BREAK: - status = SWITCH_STATUS_SUCCESS; - goto error; - case SWITCH_STATUS_NOOP: - if (session->write_resampler) { - switch_mutex_lock(session->resample_mutex); - switch_resample_destroy(&session->write_resampler); - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "Deactivating write resampler\n"); - switch_mutex_unlock(session->resample_mutex); - - { - switch_core_session_message_t msg = { 0 }; - msg.numeric_arg = 0; - msg.message_id = SWITCH_MESSAGE_RESAMPLE_EVENT; - switch_core_session_receive_message(session, &msg); - } - - } - write_frame = frame; - status = SWITCH_STATUS_SUCCESS; - break; - default: - - if (status == SWITCH_STATUS_NOT_INITALIZED) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec init error!\n"); - goto error; - } - if (ptime_mismatch && status != SWITCH_STATUS_GENERR) { - status = perform_write(session, frame, flags, stream_id); - status = SWITCH_STATUS_SUCCESS; - goto error; - } - - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec %s decoder error!\n", - frame->codec->codec_interface->interface_name); - goto error; - } - } - - - - if (session->write_resampler) { - short *data = write_frame->data; - - switch_mutex_lock(session->resample_mutex); - if (session->write_resampler) { - - if (switch_resample_calc_buffer_size(session->write_resampler->to_rate, session->write_resampler->from_rate, - write_frame->datalen / 2 / session->write_resampler->channels) > SWITCH_RECOMMENDED_BUFFER_SIZE) { - - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "%s not enough buffer space for required resample operation!\n", - switch_channel_get_name(session->channel)); - switch_channel_hangup(session->channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER); - switch_mutex_unlock(session->resample_mutex); - goto error; - } - - - switch_resample_process(session->write_resampler, data, write_frame->datalen / 2 / session->write_resampler->channels); - - memcpy(data, session->write_resampler->to, session->write_resampler->to_len * 2 * session->write_resampler->channels); - - write_frame->samples = session->write_resampler->to_len; - write_frame->channels = session->write_resampler->channels; - write_frame->datalen = write_frame->samples * 2 * session->write_resampler->channels; - - write_frame->rate = session->write_resampler->to_rate; - - did_write_resample = 1; - } - switch_mutex_unlock(session->resample_mutex); - } - - - - if (session->bugs) { - switch_media_bug_t *bp; - int prune = 0; - - switch_thread_rwlock_rdlock(session->bug_rwlock); - for (bp = session->bugs; bp; bp = bp->next) { - switch_bool_t ok = SWITCH_TRUE; - - if (!bp->ready) { - continue; - } - - if (switch_channel_test_flag(session->channel, CF_PAUSE_BUGS) && !switch_core_media_bug_test_flag(bp, SMBF_NO_PAUSE)) { - continue; - } - - if (!switch_channel_test_flag(session->channel, CF_ANSWERED) && switch_core_media_bug_test_flag(bp, SMBF_ANSWER_REQ)) { - continue; - } - - if (switch_test_flag(bp, SMBF_PRUNE)) { - prune++; - continue; - } - - if (switch_test_flag(bp, SMBF_WRITE_STREAM)) { - switch_mutex_lock(bp->write_mutex); - switch_buffer_write(bp->raw_write_buffer, write_frame->data, write_frame->datalen); - switch_mutex_unlock(bp->write_mutex); - - if (bp->callback) { - ok = bp->callback(bp, bp->user_data, SWITCH_ABC_TYPE_WRITE); - } - } - - if (switch_test_flag(bp, SMBF_WRITE_REPLACE)) { - do_bugs = 0; - if (bp->callback) { - bp->write_replace_frame_in = write_frame; - bp->write_replace_frame_out = write_frame; - if ((ok = bp->callback(bp, bp->user_data, SWITCH_ABC_TYPE_WRITE_REPLACE)) == SWITCH_TRUE) { - write_frame = bp->write_replace_frame_out; - } - } - } - - if (bp->stop_time && bp->stop_time <= switch_epoch_time_now(NULL)) { - ok = SWITCH_FALSE; - } - - - if (ok == SWITCH_FALSE) { - switch_set_flag(bp, SMBF_PRUNE); - prune++; - } - } - switch_thread_rwlock_unlock(session->bug_rwlock); - if (prune) { - switch_core_media_bug_prune(session); - } - } - - if (do_bugs) { - do_write = TRUE; - write_frame = frame; - goto done; - } - - if (session->write_codec) { - if (!ptime_mismatch && write_frame->codec && write_frame->codec->implementation && - write_frame->codec->implementation->decoded_bytes_per_packet == session->write_impl.decoded_bytes_per_packet) { - perfect = TRUE; - } - - - - if (perfect) { - - if (write_frame->datalen < session->write_impl.decoded_bytes_per_packet) { - memset(write_frame->data, 255, session->write_impl.decoded_bytes_per_packet - write_frame->datalen); - write_frame->datalen = session->write_impl.decoded_bytes_per_packet; - } - - enc_frame = write_frame; - session->enc_write_frame.datalen = session->enc_write_frame.buflen; - session->write_codec->cur_frame = frame; - frame->codec->cur_frame = frame; - switch_assert(enc_frame->datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); - switch_assert(session->enc_read_frame.datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); - status = switch_core_codec_encode(session->write_codec, - frame->codec, - enc_frame->data, - enc_frame->datalen, - session->write_impl.actual_samples_per_second, - session->enc_write_frame.data, &session->enc_write_frame.datalen, &session->enc_write_frame.rate, &flag); - - switch_assert(session->enc_read_frame.datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); - - session->write_codec->cur_frame = NULL; - frame->codec->cur_frame = NULL; - switch (status) { - case SWITCH_STATUS_RESAMPLE: - resample++; - /* switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Fixme 2\n"); */ - case SWITCH_STATUS_SUCCESS: - session->enc_write_frame.codec = session->write_codec; - session->enc_write_frame.samples = enc_frame->datalen / sizeof(int16_t) / session->write_impl.number_of_channels; - session->enc_write_frame.channels = session->write_impl.number_of_channels; - if (frame->codec->implementation->samples_per_packet != session->write_impl.samples_per_packet) { - session->enc_write_frame.timestamp = 0; - } else { - session->enc_write_frame.timestamp = frame->timestamp; - } - session->enc_write_frame.payload = session->write_impl.ianacode; - session->enc_write_frame.m = frame->m; - session->enc_write_frame.ssrc = frame->ssrc; - session->enc_write_frame.seq = frame->seq; - session->enc_write_frame.flags = 0; - write_frame = &session->enc_write_frame; - break; - case SWITCH_STATUS_NOOP: - enc_frame->codec = session->write_codec; - enc_frame->samples = enc_frame->datalen / sizeof(int16_t) / session->write_impl.number_of_channels; - enc_frame->channels = session->write_impl.number_of_channels; - enc_frame->timestamp = frame->timestamp; - enc_frame->m = frame->m; - enc_frame->seq = frame->seq; - enc_frame->ssrc = frame->ssrc; - enc_frame->payload = enc_frame->codec->implementation->ianacode; - write_frame = enc_frame; - status = SWITCH_STATUS_SUCCESS; - break; - case SWITCH_STATUS_NOT_INITALIZED: - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec init error!\n"); - write_frame = NULL; - goto error; - default: - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec %s encoder error!\n", - session->read_codec->codec_interface->interface_name); - write_frame = NULL; - goto error; - } - if (flag & SFF_CNG) { - switch_set_flag(write_frame, SFF_CNG); - } - - status = perform_write(session, write_frame, flags, stream_id); - goto error; - } else { - if (!session->raw_write_buffer) { - switch_size_t bytes_per_packet = session->write_impl.decoded_bytes_per_packet; - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, - "Engaging Write Buffer at %u bytes to accommodate %u->%u\n", - (uint32_t) bytes_per_packet, write_frame->datalen, session->write_impl.decoded_bytes_per_packet); - if ((status = switch_buffer_create_dynamic(&session->raw_write_buffer, - bytes_per_packet * SWITCH_BUFFER_BLOCK_FRAMES, - bytes_per_packet * SWITCH_BUFFER_START_FRAMES, 0)) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Write Buffer Failed!\n"); - goto error; - } - - /* Need to retrain the recording data */ - switch_core_media_bug_flush_all(session); - } - - if (!(switch_buffer_write(session->raw_write_buffer, write_frame->data, write_frame->datalen))) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Write Buffer %u bytes Failed!\n", write_frame->datalen); - status = SWITCH_STATUS_MEMERR; - goto error; - } - - status = SWITCH_STATUS_SUCCESS; - - while (switch_buffer_inuse(session->raw_write_buffer) >= session->write_impl.decoded_bytes_per_packet) { - int rate; - - if (switch_channel_down(session->channel) || !session->raw_write_buffer) { - goto error; - } - if ((session->raw_write_frame.datalen = (uint32_t) - switch_buffer_read(session->raw_write_buffer, session->raw_write_frame.data, session->write_impl.decoded_bytes_per_packet)) == 0) { - goto error; - } - - enc_frame = &session->raw_write_frame; - session->raw_write_frame.rate = session->write_impl.actual_samples_per_second; - session->enc_write_frame.datalen = session->enc_write_frame.buflen; - session->enc_write_frame.timestamp = 0; - - - if (frame->codec && frame->codec->implementation && switch_core_codec_ready(frame->codec)) { - rate = frame->codec->implementation->actual_samples_per_second; - } else { - rate = session->write_impl.actual_samples_per_second; - } - - session->write_codec->cur_frame = frame; - frame->codec->cur_frame = frame; - switch_assert(enc_frame->datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); - switch_assert(session->enc_read_frame.datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); - status = switch_core_codec_encode(session->write_codec, - frame->codec, - enc_frame->data, - enc_frame->datalen, - rate, - session->enc_write_frame.data, &session->enc_write_frame.datalen, &session->enc_write_frame.rate, &flag); - - switch_assert(session->enc_read_frame.datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); - - session->write_codec->cur_frame = NULL; - frame->codec->cur_frame = NULL; - switch (status) { - case SWITCH_STATUS_RESAMPLE: - resample++; - session->enc_write_frame.codec = session->write_codec; - session->enc_write_frame.samples = enc_frame->datalen / sizeof(int16_t) / session->write_impl.number_of_channels; - session->enc_write_frame.channels = session->write_impl.number_of_channels; - session->enc_write_frame.m = frame->m; - session->enc_write_frame.ssrc = frame->ssrc; - session->enc_write_frame.payload = session->write_impl.ianacode; - write_frame = &session->enc_write_frame; - if (!session->write_resampler) { - switch_mutex_lock(session->resample_mutex); - if (!session->write_resampler) { - status = switch_resample_create(&session->write_resampler, - frame->codec->implementation->actual_samples_per_second, - session->write_impl.actual_samples_per_second, - session->write_impl.decoded_bytes_per_packet, SWITCH_RESAMPLE_QUALITY, - session->write_impl.number_of_channels); - } - switch_mutex_unlock(session->resample_mutex); - - - - if (status != SWITCH_STATUS_SUCCESS) { - goto done; - } else { - switch_core_session_message_t msg = { 0 }; - msg.numeric_arg = 1; - msg.message_id = SWITCH_MESSAGE_RESAMPLE_EVENT; - switch_core_session_receive_message(session, &msg); - - - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "Activating write resampler\n"); - } - } - break; - case SWITCH_STATUS_SUCCESS: - session->enc_write_frame.codec = session->write_codec; - session->enc_write_frame.samples = enc_frame->datalen / sizeof(int16_t) / session->write_impl.number_of_channels; - session->enc_write_frame.channels = session->write_impl.number_of_channels; - session->enc_write_frame.m = frame->m; - session->enc_write_frame.ssrc = frame->ssrc; - session->enc_write_frame.payload = session->write_impl.ianacode; - session->enc_write_frame.flags = 0; - write_frame = &session->enc_write_frame; - break; - case SWITCH_STATUS_NOOP: - if (session->write_resampler) { - switch_core_session_message_t msg = { 0 }; - int ok = 0; - - switch_mutex_lock(session->resample_mutex); - if (session->write_resampler) { - switch_resample_destroy(&session->write_resampler); - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "Deactivating write resampler\n"); - ok = 1; - } - switch_mutex_unlock(session->resample_mutex); - - if (ok) { - msg.numeric_arg = 0; - msg.message_id = SWITCH_MESSAGE_RESAMPLE_EVENT; - switch_core_session_receive_message(session, &msg); - } - - } - enc_frame->codec = session->write_codec; - enc_frame->samples = enc_frame->datalen / sizeof(int16_t) / session->read_impl.number_of_channels; - enc_frame->channels = session->read_impl.number_of_channels; - enc_frame->m = frame->m; - enc_frame->ssrc = frame->ssrc; - enc_frame->payload = enc_frame->codec->implementation->ianacode; - write_frame = enc_frame; - status = SWITCH_STATUS_SUCCESS; - break; - case SWITCH_STATUS_NOT_INITALIZED: - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec init error!\n"); - write_frame = NULL; - goto error; - default: - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec %s encoder error %d!\n", - session->read_codec->codec_interface->interface_name, status); - write_frame = NULL; - goto error; - } - - if (!did_write_resample && session->read_resampler) { - short *data = write_frame->data; - switch_mutex_lock(session->resample_mutex); - if (session->read_resampler) { - switch_resample_process(session->read_resampler, data, write_frame->datalen / 2 / session->read_resampler->channels); - memcpy(data, session->read_resampler->to, session->read_resampler->to_len * 2 * session->read_resampler->channels); - write_frame->samples = session->read_resampler->to_len; - write_frame->channels = session->read_resampler->channels; - write_frame->datalen = session->read_resampler->to_len * 2 * session->read_resampler->channels; - write_frame->rate = session->read_resampler->to_rate; - } - switch_mutex_unlock(session->resample_mutex); - - } - - if (flag & SFF_CNG) { - switch_set_flag(write_frame, SFF_CNG); - } - - if (ptime_mismatch || resample) { - write_frame->timestamp = 0; - } - - if ((status = perform_write(session, write_frame, flags, stream_id)) != SWITCH_STATUS_SUCCESS) { - break; - } - - } - - goto error; - } - } - - - - - - done: - - if (ptime_mismatch || resample) { - write_frame->timestamp = 0; - } - - if (do_write) { - status = perform_write(session, write_frame, flags, stream_id); - } - - error: - - switch_mutex_unlock(session->write_codec->mutex); - switch_mutex_unlock(frame->codec->mutex); - switch_mutex_unlock(session->codec_write_mutex); - - return status; -} - static char *SIG_NAMES[] = { "NONE", "KILL", diff --git a/src/switch_core_media.c b/src/switch_core_media.c index 7bc2abe6bb..5cf2f2b624 100644 --- a/src/switch_core_media.c +++ b/src/switch_core_media.c @@ -208,7 +208,7 @@ typedef struct switch_rtp_engine_s { switch_engine_function_t engine_function; void *engine_user_data; int8_t engine_function_running; - + switch_frame_buffer_t *write_fb; } switch_rtp_engine_t; struct switch_media_handle_s { @@ -6154,6 +6154,198 @@ SWITCH_DECLARE(void) switch_core_autobind_cpu(void) } } +static switch_status_t perform_write(switch_core_session_t *session, switch_frame_t *frame, switch_io_flag_t flags, int stream_id) +{ + switch_io_event_hook_write_frame_t *ptr; + switch_status_t status = SWITCH_STATUS_FALSE; + switch_rtp_engine_t *a_engine; + switch_media_handle_t *smh; + + switch_assert(session != NULL); + + if (!(smh = session->media_handle)) { + return SWITCH_STATUS_FALSE; + } + + a_engine = &smh->engines[SWITCH_MEDIA_TYPE_AUDIO]; + + if (a_engine && a_engine->write_fb && !(flags & SWITCH_IO_FLAG_QUEUED)) { + switch_frame_t *dupframe = NULL; + + if (switch_frame_buffer_dup(a_engine->write_fb, frame, &dupframe) == SWITCH_STATUS_SUCCESS) { + switch_frame_buffer_push(a_engine->write_fb, dupframe); + dupframe = NULL; + return SWITCH_STATUS_SUCCESS; + } + } + + if (session->bugs && !(frame->flags & SFF_NOT_AUDIO)) { + switch_media_bug_t *bp; + switch_bool_t ok = SWITCH_TRUE; + int prune = 0; + + switch_thread_rwlock_rdlock(session->bug_rwlock); + + for (bp = session->bugs; bp; bp = bp->next) { + ok = SWITCH_TRUE; + + if (switch_channel_test_flag(session->channel, CF_PAUSE_BUGS) && !switch_core_media_bug_test_flag(bp, SMBF_NO_PAUSE)) { + continue; + } + + if (!switch_channel_test_flag(session->channel, CF_ANSWERED) && switch_core_media_bug_test_flag(bp, SMBF_ANSWER_REQ)) { + continue; + } + if (switch_test_flag(bp, SMBF_PRUNE)) { + prune++; + continue; + } + + if (bp->ready) { + if (switch_test_flag(bp, SMBF_TAP_NATIVE_WRITE)) { + if (bp->callback) { + bp->native_write_frame = frame; + ok = bp->callback(bp, bp->user_data, SWITCH_ABC_TYPE_TAP_NATIVE_WRITE); + bp->native_write_frame = NULL; + } + } + } + + if ((bp->stop_time && bp->stop_time <= switch_epoch_time_now(NULL)) || ok == SWITCH_FALSE) { + switch_set_flag(bp, SMBF_PRUNE); + prune++; + } + } + switch_thread_rwlock_unlock(session->bug_rwlock); + + if (prune) { + switch_core_media_bug_prune(session); + } + } + + + if (session->endpoint_interface->io_routines->write_frame) { + if ((status = session->endpoint_interface->io_routines->write_frame(session, frame, flags, stream_id)) == SWITCH_STATUS_SUCCESS) { + for (ptr = session->event_hooks.write_frame; ptr; ptr = ptr->next) { + if ((status = ptr->write_frame(session, frame, flags, stream_id)) != SWITCH_STATUS_SUCCESS) { + break; + } + } + } + } + + return status; +} + + +static void *SWITCH_THREAD_FUNC audio_write_thread(switch_thread_t *thread, void *obj) +{ + struct media_helper *mh = obj; + switch_core_session_t *session = mh->session; + switch_media_handle_t *smh; + switch_rtp_engine_t *a_engine = NULL; + switch_codec_implementation_t write_impl; + switch_timer_t timer = {0}; + + if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) { + return NULL; + } + + if (!(smh = session->media_handle)) { + switch_core_session_rwunlock(session); + return NULL; + } + + a_engine = &smh->engines[SWITCH_MEDIA_TYPE_AUDIO]; + a_engine->thread_id = switch_thread_self(); + + + write_impl = session->write_impl; + + switch_core_timer_init(&timer, "soft", write_impl.microseconds_per_packet / 1000, + write_impl.samples_per_packet, switch_core_session_get_pool(session)); + + mh->up = 1; + + switch_frame_buffer_create(&a_engine->write_fb, 500); + + + while(switch_channel_up_nosig(session->channel) && mh->up == 1) { + void *pop; + + if (session->write_impl.microseconds_per_packet != write_impl.microseconds_per_packet || + session->write_impl.samples_per_packet != write_impl.samples_per_packet) { + + + write_impl = session->write_impl; + switch_core_timer_destroy(&timer); + switch_core_timer_init(&timer, "soft", write_impl.microseconds_per_packet / 1000, + write_impl.samples_per_packet, switch_core_session_get_pool(session)); + + } + + if (switch_frame_buffer_pop(a_engine->write_fb, &pop) == SWITCH_STATUS_SUCCESS && pop) { + switch_frame_t *frame = (switch_frame_t *)pop; + + if ((switch_size_t)pop == 1) { + break; + } + + perform_write(session, frame, SWITCH_IO_FLAG_QUEUED, 0); + switch_frame_buffer_free(a_engine->write_fb, &frame); + } + + switch_core_timer_next(&timer); + + } + + switch_mutex_lock(smh->control_mutex); + mh->up = 0; + switch_mutex_unlock(smh->control_mutex); + + switch_frame_buffer_destroy(&a_engine->write_fb); + switch_core_timer_destroy(&timer); + + switch_core_session_rwunlock(session); + return NULL; +} + +SWITCH_DECLARE(switch_status_t) switch_core_session_start_audio_write_thread(switch_core_session_t *session) +{ + switch_threadattr_t *thd_attr = NULL; + switch_memory_pool_t *pool = switch_core_session_get_pool(session); + switch_rtp_engine_t *a_engine = NULL; + switch_media_handle_t *smh; + + if (!switch_channel_test_flag(session->channel, CF_AUDIO)) { + return SWITCH_STATUS_NOTIMPL; + } + + if (!(smh = session->media_handle)) { + return SWITCH_STATUS_FALSE; + } + + a_engine = &smh->engines[SWITCH_MEDIA_TYPE_AUDIO]; + + if (a_engine->media_thread) { + return SWITCH_STATUS_INUSE; + } + + switch_mutex_lock(smh->control_mutex); + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s Starting Audio write thread\n", switch_core_session_get_name(session)); + + a_engine->mh.session = session; + switch_threadattr_create(&thd_attr, pool); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + + switch_thread_cond_create(&a_engine->mh.cond, pool); + switch_mutex_init(&a_engine->mh.cond_mutex, SWITCH_MUTEX_NESTED, pool); + switch_thread_create(&a_engine->media_thread, thd_attr, audio_write_thread, &a_engine->mh, switch_core_session_get_pool(session)); + + switch_mutex_unlock(smh->control_mutex); + return SWITCH_STATUS_SUCCESS; +} @@ -7120,6 +7312,20 @@ SWITCH_DECLARE(void) switch_core_media_deactivate_rtp(switch_core_session_t *ses switch_rtp_text_factory_destroy(&t_engine->tf); } + if (a_engine->media_thread) { + switch_status_t st; + + switch_mutex_lock(smh->control_mutex); + if (a_engine->mh.up && a_engine->write_fb) { + switch_frame_buffer_push(a_engine->write_fb, (void *) 1); + } + a_engine->mh.up = 0; + switch_mutex_unlock(smh->control_mutex); + + switch_thread_join(&st, a_engine->media_thread); + a_engine->media_thread = NULL; + } + if (v_engine->media_thread) { switch_status_t st; switch_channel_clear_flag(session->channel, CF_VIDEO_PASSIVE); @@ -7541,6 +7747,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_activate_rtp(switch_core_sessi if (switch_channel_up(session->channel)) { switch_channel_set_variable(session->channel, "rtp_use_timer_name", timer_name); + + a_engine->rtp_session = switch_rtp_new(a_engine->local_sdp_ip, a_engine->local_sdp_port, a_engine->cur_payload_map->remote_sdp_ip, @@ -14077,6 +14285,651 @@ SWITCH_DECLARE(switch_msrp_session_t *) switch_core_media_get_msrp_session(switc } +SWITCH_DECLARE(switch_status_t) switch_core_session_write_frame(switch_core_session_t *session, switch_frame_t *frame, switch_io_flag_t flags, + int stream_id) +{ + + switch_status_t status = SWITCH_STATUS_FALSE; + switch_frame_t *enc_frame = NULL, *write_frame = frame; + unsigned int flag = 0, need_codec = 0, perfect = 0, do_bugs = 0, do_write = 0, do_resample = 0, ptime_mismatch = 0, pass_cng = 0, resample = 0; + int did_write_resample = 0; + + switch_assert(session != NULL); + switch_assert(frame != NULL); + + if (!switch_channel_ready(session->channel)) { + return SWITCH_STATUS_FALSE; + } + + if (switch_mutex_trylock(session->codec_write_mutex) == SWITCH_STATUS_SUCCESS) { + switch_mutex_unlock(session->codec_write_mutex); + } else { + return SWITCH_STATUS_SUCCESS; + } + + if (switch_test_flag(frame, SFF_CNG)) { + if (switch_channel_test_flag(session->channel, CF_ACCEPT_CNG)) { + pass_cng = 1; + } else { + return SWITCH_STATUS_SUCCESS; + } + } + + if (switch_channel_test_flag(session->channel, CF_AUDIO_PAUSE_WRITE)) { + return SWITCH_STATUS_SUCCESS; + } + + if (!(session->write_codec && switch_core_codec_ready(session->write_codec)) && !pass_cng) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s has no write codec.\n", switch_channel_get_name(session->channel)); + switch_channel_hangup(session->channel, SWITCH_CAUSE_INCOMPATIBLE_DESTINATION); + return SWITCH_STATUS_FALSE; + } + + if (switch_channel_test_flag(session->channel, CF_HOLD)) { + return SWITCH_STATUS_SUCCESS; + } + + if (switch_test_flag(frame, SFF_PROXY_PACKET) || pass_cng) { + /* Fast PASS! */ + switch_mutex_lock(session->codec_write_mutex); + status = perform_write(session, frame, flag, stream_id); + switch_mutex_unlock(session->codec_write_mutex); + return status; + } + + switch_mutex_lock(session->codec_write_mutex); + + if (!(frame->codec && frame->codec->implementation)) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s has received a bad frame with no codec!\n", + switch_channel_get_name(session->channel)); + switch_channel_hangup(session->channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER); + switch_mutex_unlock(session->codec_write_mutex); + return SWITCH_STATUS_FALSE; + } + + switch_assert(frame->codec != NULL); + switch_assert(frame->codec->implementation != NULL); + + if (!(switch_core_codec_ready(session->write_codec) && frame->codec) || + !switch_channel_ready(session->channel) || !switch_channel_media_ready(session->channel)) { + switch_mutex_unlock(session->codec_write_mutex); + return SWITCH_STATUS_FALSE; + } + + switch_mutex_lock(session->write_codec->mutex); + switch_mutex_lock(frame->codec->mutex); + + if (!(switch_core_codec_ready(session->write_codec) && switch_core_codec_ready(frame->codec))) goto error; + + if ((session->write_codec && frame->codec && session->write_codec->implementation != frame->codec->implementation)) { + if (session->write_impl.codec_id == frame->codec->implementation->codec_id || + session->write_impl.microseconds_per_packet != frame->codec->implementation->microseconds_per_packet) { + ptime_mismatch = TRUE; + + if ((switch_test_flag(frame->codec, SWITCH_CODEC_FLAG_PASSTHROUGH) || switch_test_flag(session->read_codec, SWITCH_CODEC_FLAG_PASSTHROUGH)) || + switch_channel_test_flag(session->channel, CF_PASSTHRU_PTIME_MISMATCH)) { + status = perform_write(session, frame, flags, stream_id); + goto error; + } + + if (session->write_impl.microseconds_per_packet < frame->codec->implementation->microseconds_per_packet) { + switch_core_session_start_audio_write_thread(session); + } + } + need_codec = TRUE; + } + + if (session->write_codec && !frame->codec) { + need_codec = TRUE; + } + + if (session->bugs && !need_codec && !switch_test_flag(session, SSF_MEDIA_BUG_TAP_ONLY)) { + do_bugs = TRUE; + need_codec = TRUE; + } + + if (frame->codec->implementation->actual_samples_per_second != session->write_impl.actual_samples_per_second) { + need_codec = TRUE; + do_resample = TRUE; + } + + + if ((frame->flags & SFF_NOT_AUDIO)) { + do_resample = 0; + do_bugs = 0; + need_codec = 0; + } + + if (switch_test_flag(session, SSF_WRITE_TRANSCODE) && !need_codec && switch_core_codec_ready(session->write_codec)) { + switch_core_session_t *other_session; + const char *uuid = switch_channel_get_partner_uuid(switch_core_session_get_channel(session)); + + if (uuid && (other_session = switch_core_session_locate(uuid))) { + switch_set_flag(other_session, SSF_READ_CODEC_RESET); + switch_set_flag(other_session, SSF_READ_CODEC_RESET); + switch_set_flag(other_session, SSF_WRITE_CODEC_RESET); + switch_core_session_rwunlock(other_session); + } + + switch_clear_flag(session, SSF_WRITE_TRANSCODE); + } + + + if (switch_test_flag(session, SSF_WRITE_CODEC_RESET)) { + switch_core_codec_reset(session->write_codec); + switch_clear_flag(session, SSF_WRITE_CODEC_RESET); + } + + if (!need_codec) { + do_write = TRUE; + write_frame = frame; + goto done; + } + + if (!switch_test_flag(session, SSF_WARN_TRANSCODE)) { + switch_core_session_message_t msg = { 0 }; + + msg.message_id = SWITCH_MESSAGE_INDICATE_TRANSCODING_NECESSARY; + switch_core_session_receive_message(session, &msg); + switch_set_flag(session, SSF_WARN_TRANSCODE); + } + + if (frame->codec) { + session->raw_write_frame.datalen = session->raw_write_frame.buflen; + frame->codec->cur_frame = frame; + session->write_codec->cur_frame = frame; + status = switch_core_codec_decode(frame->codec, + session->write_codec, + frame->data, + frame->datalen, + session->write_impl.actual_samples_per_second, + session->raw_write_frame.data, &session->raw_write_frame.datalen, &session->raw_write_frame.rate, &frame->flags); + frame->codec->cur_frame = NULL; + session->write_codec->cur_frame = NULL; + if (do_resample && status == SWITCH_STATUS_SUCCESS) { + status = SWITCH_STATUS_RESAMPLE; + } + + /* mux or demux to match */ + if (session->write_impl.number_of_channels != frame->codec->implementation->number_of_channels) { + uint32_t rlen = session->raw_write_frame.datalen / 2 / frame->codec->implementation->number_of_channels; + switch_mux_channels((int16_t *) session->raw_write_frame.data, rlen, + frame->codec->implementation->number_of_channels, session->write_impl.number_of_channels); + session->raw_write_frame.datalen = rlen * 2 * session->write_impl.number_of_channels; + } + + switch (status) { + case SWITCH_STATUS_RESAMPLE: + resample++; + write_frame = &session->raw_write_frame; + write_frame->rate = frame->codec->implementation->actual_samples_per_second; + if (!session->write_resampler) { + switch_mutex_lock(session->resample_mutex); + status = switch_resample_create(&session->write_resampler, + frame->codec->implementation->actual_samples_per_second, + session->write_impl.actual_samples_per_second, + session->write_impl.decoded_bytes_per_packet, SWITCH_RESAMPLE_QUALITY, session->write_impl.number_of_channels); + + + switch_mutex_unlock(session->resample_mutex); + if (status != SWITCH_STATUS_SUCCESS) { + goto done; + } else { + switch_core_session_message_t msg = { 0 }; + msg.numeric_arg = 1; + msg.message_id = SWITCH_MESSAGE_RESAMPLE_EVENT; + switch_core_session_receive_message(session, &msg); + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "Activating write resampler\n"); + } + } + break; + case SWITCH_STATUS_SUCCESS: + session->raw_write_frame.samples = session->raw_write_frame.datalen / sizeof(int16_t) / session->write_impl.number_of_channels; + session->raw_write_frame.channels = session->write_impl.number_of_channels; + session->raw_write_frame.timestamp = frame->timestamp; + session->raw_write_frame.rate = frame->rate; + session->raw_write_frame.m = frame->m; + session->raw_write_frame.ssrc = frame->ssrc; + session->raw_write_frame.seq = frame->seq; + session->raw_write_frame.payload = frame->payload; + session->raw_write_frame.flags = 0; + if (switch_test_flag(frame, SFF_PLC)) { + session->raw_write_frame.flags |= SFF_PLC; + } + + write_frame = &session->raw_write_frame; + break; + case SWITCH_STATUS_BREAK: + status = SWITCH_STATUS_SUCCESS; + goto error; + case SWITCH_STATUS_NOOP: + if (session->write_resampler) { + switch_mutex_lock(session->resample_mutex); + switch_resample_destroy(&session->write_resampler); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "Deactivating write resampler\n"); + switch_mutex_unlock(session->resample_mutex); + + { + switch_core_session_message_t msg = { 0 }; + msg.numeric_arg = 0; + msg.message_id = SWITCH_MESSAGE_RESAMPLE_EVENT; + switch_core_session_receive_message(session, &msg); + } + + } + write_frame = frame; + status = SWITCH_STATUS_SUCCESS; + break; + default: + + if (status == SWITCH_STATUS_NOT_INITALIZED) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec init error!\n"); + goto error; + } + if (ptime_mismatch && status != SWITCH_STATUS_GENERR) { + status = perform_write(session, frame, flags, stream_id); + status = SWITCH_STATUS_SUCCESS; + goto error; + } + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec %s decoder error!\n", + frame->codec->codec_interface->interface_name); + goto error; + } + } + + + + if (session->write_resampler) { + short *data = write_frame->data; + + switch_mutex_lock(session->resample_mutex); + if (session->write_resampler) { + + if (switch_resample_calc_buffer_size(session->write_resampler->to_rate, session->write_resampler->from_rate, + write_frame->datalen / 2 / session->write_resampler->channels) > SWITCH_RECOMMENDED_BUFFER_SIZE) { + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "%s not enough buffer space for required resample operation!\n", + switch_channel_get_name(session->channel)); + switch_channel_hangup(session->channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER); + switch_mutex_unlock(session->resample_mutex); + goto error; + } + + + switch_resample_process(session->write_resampler, data, write_frame->datalen / 2 / session->write_resampler->channels); + + memcpy(data, session->write_resampler->to, session->write_resampler->to_len * 2 * session->write_resampler->channels); + + write_frame->samples = session->write_resampler->to_len; + write_frame->channels = session->write_resampler->channels; + write_frame->datalen = write_frame->samples * 2 * session->write_resampler->channels; + + write_frame->rate = session->write_resampler->to_rate; + + did_write_resample = 1; + } + switch_mutex_unlock(session->resample_mutex); + } + + + + if (session->bugs) { + switch_media_bug_t *bp; + int prune = 0; + + switch_thread_rwlock_rdlock(session->bug_rwlock); + for (bp = session->bugs; bp; bp = bp->next) { + switch_bool_t ok = SWITCH_TRUE; + + if (!bp->ready) { + continue; + } + + if (switch_channel_test_flag(session->channel, CF_PAUSE_BUGS) && !switch_core_media_bug_test_flag(bp, SMBF_NO_PAUSE)) { + continue; + } + + if (!switch_channel_test_flag(session->channel, CF_ANSWERED) && switch_core_media_bug_test_flag(bp, SMBF_ANSWER_REQ)) { + continue; + } + + if (switch_test_flag(bp, SMBF_PRUNE)) { + prune++; + continue; + } + + if (switch_test_flag(bp, SMBF_WRITE_STREAM)) { + switch_mutex_lock(bp->write_mutex); + switch_buffer_write(bp->raw_write_buffer, write_frame->data, write_frame->datalen); + switch_mutex_unlock(bp->write_mutex); + + if (bp->callback) { + ok = bp->callback(bp, bp->user_data, SWITCH_ABC_TYPE_WRITE); + } + } + + if (switch_test_flag(bp, SMBF_WRITE_REPLACE)) { + do_bugs = 0; + if (bp->callback) { + bp->write_replace_frame_in = write_frame; + bp->write_replace_frame_out = write_frame; + if ((ok = bp->callback(bp, bp->user_data, SWITCH_ABC_TYPE_WRITE_REPLACE)) == SWITCH_TRUE) { + write_frame = bp->write_replace_frame_out; + } + } + } + + if (bp->stop_time && bp->stop_time <= switch_epoch_time_now(NULL)) { + ok = SWITCH_FALSE; + } + + + if (ok == SWITCH_FALSE) { + switch_set_flag(bp, SMBF_PRUNE); + prune++; + } + } + switch_thread_rwlock_unlock(session->bug_rwlock); + if (prune) { + switch_core_media_bug_prune(session); + } + } + + if (do_bugs) { + do_write = TRUE; + write_frame = frame; + goto done; + } + + if (session->write_codec) { + if (!ptime_mismatch && write_frame->codec && write_frame->codec->implementation && + write_frame->codec->implementation->decoded_bytes_per_packet == session->write_impl.decoded_bytes_per_packet) { + perfect = TRUE; + } + + + + if (perfect) { + + if (write_frame->datalen < session->write_impl.decoded_bytes_per_packet) { + memset(write_frame->data, 255, session->write_impl.decoded_bytes_per_packet - write_frame->datalen); + write_frame->datalen = session->write_impl.decoded_bytes_per_packet; + } + + enc_frame = write_frame; + session->enc_write_frame.datalen = session->enc_write_frame.buflen; + session->write_codec->cur_frame = frame; + frame->codec->cur_frame = frame; + switch_assert(enc_frame->datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); + switch_assert(session->enc_read_frame.datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); + status = switch_core_codec_encode(session->write_codec, + frame->codec, + enc_frame->data, + enc_frame->datalen, + session->write_impl.actual_samples_per_second, + session->enc_write_frame.data, &session->enc_write_frame.datalen, &session->enc_write_frame.rate, &flag); + + switch_assert(session->enc_read_frame.datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); + + session->write_codec->cur_frame = NULL; + frame->codec->cur_frame = NULL; + switch (status) { + case SWITCH_STATUS_RESAMPLE: + resample++; + /* switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Fixme 2\n"); */ + case SWITCH_STATUS_SUCCESS: + session->enc_write_frame.codec = session->write_codec; + session->enc_write_frame.samples = enc_frame->datalen / sizeof(int16_t) / session->write_impl.number_of_channels; + session->enc_write_frame.channels = session->write_impl.number_of_channels; + if (frame->codec->implementation->samples_per_packet != session->write_impl.samples_per_packet) { + session->enc_write_frame.timestamp = 0; + } else { + session->enc_write_frame.timestamp = frame->timestamp; + } + session->enc_write_frame.payload = session->write_impl.ianacode; + session->enc_write_frame.m = frame->m; + session->enc_write_frame.ssrc = frame->ssrc; + session->enc_write_frame.seq = frame->seq; + session->enc_write_frame.flags = 0; + write_frame = &session->enc_write_frame; + break; + case SWITCH_STATUS_NOOP: + enc_frame->codec = session->write_codec; + enc_frame->samples = enc_frame->datalen / sizeof(int16_t) / session->write_impl.number_of_channels; + enc_frame->channels = session->write_impl.number_of_channels; + enc_frame->timestamp = frame->timestamp; + enc_frame->m = frame->m; + enc_frame->seq = frame->seq; + enc_frame->ssrc = frame->ssrc; + enc_frame->payload = enc_frame->codec->implementation->ianacode; + write_frame = enc_frame; + status = SWITCH_STATUS_SUCCESS; + break; + case SWITCH_STATUS_NOT_INITALIZED: + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec init error!\n"); + write_frame = NULL; + goto error; + default: + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec %s encoder error!\n", + session->read_codec->codec_interface->interface_name); + write_frame = NULL; + goto error; + } + if (flag & SFF_CNG) { + switch_set_flag(write_frame, SFF_CNG); + } + + status = perform_write(session, write_frame, flags, stream_id); + goto error; + } else { + if (!session->raw_write_buffer) { + switch_size_t bytes_per_packet = session->write_impl.decoded_bytes_per_packet; + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, + "Engaging Write Buffer at %u bytes to accommodate %u->%u\n", + (uint32_t) bytes_per_packet, write_frame->datalen, session->write_impl.decoded_bytes_per_packet); + if ((status = switch_buffer_create_dynamic(&session->raw_write_buffer, + bytes_per_packet * SWITCH_BUFFER_BLOCK_FRAMES, + bytes_per_packet * SWITCH_BUFFER_START_FRAMES, 0)) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Write Buffer Failed!\n"); + goto error; + } + + /* Need to retrain the recording data */ + switch_core_media_bug_flush_all(session); + } + + if (!(switch_buffer_write(session->raw_write_buffer, write_frame->data, write_frame->datalen))) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Write Buffer %u bytes Failed!\n", write_frame->datalen); + status = SWITCH_STATUS_MEMERR; + goto error; + } + + status = SWITCH_STATUS_SUCCESS; + + while (switch_buffer_inuse(session->raw_write_buffer) >= session->write_impl.decoded_bytes_per_packet) { + int rate; + + if (switch_channel_down(session->channel) || !session->raw_write_buffer) { + goto error; + } + if ((session->raw_write_frame.datalen = (uint32_t) + switch_buffer_read(session->raw_write_buffer, session->raw_write_frame.data, session->write_impl.decoded_bytes_per_packet)) == 0) { + goto error; + } + + enc_frame = &session->raw_write_frame; + session->raw_write_frame.rate = session->write_impl.actual_samples_per_second; + session->enc_write_frame.datalen = session->enc_write_frame.buflen; + session->enc_write_frame.timestamp = 0; + + + if (frame->codec && frame->codec->implementation && switch_core_codec_ready(frame->codec)) { + rate = frame->codec->implementation->actual_samples_per_second; + } else { + rate = session->write_impl.actual_samples_per_second; + } + + session->write_codec->cur_frame = frame; + frame->codec->cur_frame = frame; + switch_assert(enc_frame->datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); + switch_assert(session->enc_read_frame.datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); + status = switch_core_codec_encode(session->write_codec, + frame->codec, + enc_frame->data, + enc_frame->datalen, + rate, + session->enc_write_frame.data, &session->enc_write_frame.datalen, &session->enc_write_frame.rate, &flag); + + switch_assert(session->enc_read_frame.datalen <= SWITCH_RECOMMENDED_BUFFER_SIZE); + + session->write_codec->cur_frame = NULL; + frame->codec->cur_frame = NULL; + switch (status) { + case SWITCH_STATUS_RESAMPLE: + resample++; + session->enc_write_frame.codec = session->write_codec; + session->enc_write_frame.samples = enc_frame->datalen / sizeof(int16_t) / session->write_impl.number_of_channels; + session->enc_write_frame.channels = session->write_impl.number_of_channels; + session->enc_write_frame.m = frame->m; + session->enc_write_frame.ssrc = frame->ssrc; + session->enc_write_frame.payload = session->write_impl.ianacode; + write_frame = &session->enc_write_frame; + if (!session->write_resampler) { + switch_mutex_lock(session->resample_mutex); + if (!session->write_resampler) { + status = switch_resample_create(&session->write_resampler, + frame->codec->implementation->actual_samples_per_second, + session->write_impl.actual_samples_per_second, + session->write_impl.decoded_bytes_per_packet, SWITCH_RESAMPLE_QUALITY, + session->write_impl.number_of_channels); + } + switch_mutex_unlock(session->resample_mutex); + + + + if (status != SWITCH_STATUS_SUCCESS) { + goto done; + } else { + switch_core_session_message_t msg = { 0 }; + msg.numeric_arg = 1; + msg.message_id = SWITCH_MESSAGE_RESAMPLE_EVENT; + switch_core_session_receive_message(session, &msg); + + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "Activating write resampler\n"); + } + } + break; + case SWITCH_STATUS_SUCCESS: + session->enc_write_frame.codec = session->write_codec; + session->enc_write_frame.samples = enc_frame->datalen / sizeof(int16_t) / session->write_impl.number_of_channels; + session->enc_write_frame.channels = session->write_impl.number_of_channels; + session->enc_write_frame.m = frame->m; + session->enc_write_frame.ssrc = frame->ssrc; + session->enc_write_frame.payload = session->write_impl.ianacode; + session->enc_write_frame.flags = 0; + write_frame = &session->enc_write_frame; + break; + case SWITCH_STATUS_NOOP: + if (session->write_resampler) { + switch_core_session_message_t msg = { 0 }; + int ok = 0; + + switch_mutex_lock(session->resample_mutex); + if (session->write_resampler) { + switch_resample_destroy(&session->write_resampler); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "Deactivating write resampler\n"); + ok = 1; + } + switch_mutex_unlock(session->resample_mutex); + + if (ok) { + msg.numeric_arg = 0; + msg.message_id = SWITCH_MESSAGE_RESAMPLE_EVENT; + switch_core_session_receive_message(session, &msg); + } + + } + enc_frame->codec = session->write_codec; + enc_frame->samples = enc_frame->datalen / sizeof(int16_t) / session->read_impl.number_of_channels; + enc_frame->channels = session->read_impl.number_of_channels; + enc_frame->m = frame->m; + enc_frame->ssrc = frame->ssrc; + enc_frame->payload = enc_frame->codec->implementation->ianacode; + write_frame = enc_frame; + status = SWITCH_STATUS_SUCCESS; + break; + case SWITCH_STATUS_NOT_INITALIZED: + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec init error!\n"); + write_frame = NULL; + goto error; + default: + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Codec %s encoder error %d!\n", + session->read_codec->codec_interface->interface_name, status); + write_frame = NULL; + goto error; + } + + if (!did_write_resample && session->read_resampler) { + short *data = write_frame->data; + switch_mutex_lock(session->resample_mutex); + if (session->read_resampler) { + switch_resample_process(session->read_resampler, data, write_frame->datalen / 2 / session->read_resampler->channels); + memcpy(data, session->read_resampler->to, session->read_resampler->to_len * 2 * session->read_resampler->channels); + write_frame->samples = session->read_resampler->to_len; + write_frame->channels = session->read_resampler->channels; + write_frame->datalen = session->read_resampler->to_len * 2 * session->read_resampler->channels; + write_frame->rate = session->read_resampler->to_rate; + } + switch_mutex_unlock(session->resample_mutex); + + } + + if (flag & SFF_CNG) { + switch_set_flag(write_frame, SFF_CNG); + } + + if (ptime_mismatch || resample) { + write_frame->timestamp = 0; + } + + if ((status = perform_write(session, write_frame, flags, stream_id)) != SWITCH_STATUS_SUCCESS) { + break; + } + + } + + goto error; + } + } + + + + + + done: + + if (ptime_mismatch || resample) { + write_frame->timestamp = 0; + } + + if (do_write) { + status = perform_write(session, write_frame, flags, stream_id); + } + + error: + + switch_mutex_unlock(session->write_codec->mutex); + switch_mutex_unlock(frame->codec->mutex); + switch_mutex_unlock(session->codec_write_mutex); + + return status; +} + + + /* For Emacs: * Local Variables: * mode:c diff --git a/src/switch_utils.c b/src/switch_utils.c index e3ba27996c..5c47974f1b 100644 --- a/src/switch_utils.c +++ b/src/switch_utils.c @@ -110,6 +110,7 @@ typedef struct switch_frame_node_s { struct switch_frame_buffer_s { switch_frame_node_t *head; switch_memory_pool_t *pool; + switch_queue_t *queue; switch_mutex_t *mutex; uint32_t total; }; @@ -164,8 +165,8 @@ static switch_frame_t *find_free_frame(switch_frame_buffer_t *fb, switch_frame_t np->frame->ssrc = orig->ssrc; np->frame->m = orig->m; np->frame->flags = orig->flags; - np->frame->codec = NULL; - np->frame->pmap = NULL; + np->frame->codec = orig->codec; + np->frame->pmap = orig->pmap; np->frame->img = NULL; np->frame->extra_data = np; np->inuse = 1; @@ -243,6 +244,26 @@ SWITCH_DECLARE(switch_status_t) switch_frame_buffer_dup(switch_frame_buffer_t *f return SWITCH_STATUS_SUCCESS; } +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_push(switch_frame_buffer_t *fb, void *ptr) +{ + return switch_queue_push(fb->queue, ptr); +} + +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_trypush(switch_frame_buffer_t *fb, void *ptr) +{ + return switch_queue_trypush(fb->queue, ptr); +} + +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_pop(switch_frame_buffer_t *fb, void **ptr) +{ + return switch_queue_pop(fb->queue, ptr); +} + +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_trypop(switch_frame_buffer_t *fb, void **ptr) +{ + return switch_queue_trypop(fb->queue, ptr); +} + SWITCH_DECLARE(switch_status_t) switch_frame_buffer_destroy(switch_frame_buffer_t **fbP) { switch_frame_buffer_t *fb = *fbP; @@ -254,14 +275,17 @@ SWITCH_DECLARE(switch_status_t) switch_frame_buffer_destroy(switch_frame_buffer_ return SWITCH_STATUS_SUCCESS; } -SWITCH_DECLARE(switch_status_t) switch_frame_buffer_create(switch_frame_buffer_t **fbP) +SWITCH_DECLARE(switch_status_t) switch_frame_buffer_create(switch_frame_buffer_t **fbP, switch_size_t qlen) { switch_frame_buffer_t *fb; switch_memory_pool_t *pool; + if (!qlen) qlen = 500; + switch_core_new_memory_pool(&pool); fb = switch_core_alloc(pool, sizeof(*fb)); fb->pool = pool; + switch_queue_create(&fb->queue, qlen, fb->pool); switch_mutex_init(&fb->mutex, SWITCH_MUTEX_NESTED, pool); *fbP = fb; @@ -297,9 +321,11 @@ SWITCH_DECLARE(switch_status_t) switch_frame_dup(switch_frame_t *orig, switch_fr } - new_frame->codec = NULL; - new_frame->pmap = NULL; + new_frame->codec = orig->codec; + new_frame->pmap = orig->pmap; new_frame->img = NULL; + + if (orig->img && !switch_test_flag(orig, SFF_ENCODED)) { switch_img_copy(orig->img, &new_frame->img); }