From 43442e4f41c1f2a85fa07ec2d18e1421f1e81585 Mon Sep 17 00:00:00 2001 From: Brian West Date: Mon, 2 May 2011 17:21:39 -0500 Subject: [PATCH] refactor mod_loopback timeout handling --- src/mod/endpoints/mod_loopback/mod_loopback.c | 66 ++++++++----------- 1 file changed, 29 insertions(+), 37 deletions(-) diff --git a/src/mod/endpoints/mod_loopback/mod_loopback.c b/src/mod/endpoints/mod_loopback/mod_loopback.c index be9b4994d6..8fb8384e6b 100644 --- a/src/mod/endpoints/mod_loopback/mod_loopback.c +++ b/src/mod/endpoints/mod_loopback/mod_loopback.c @@ -48,7 +48,7 @@ typedef enum { TFLAG_LINKED = (1 << 0), TFLAG_OUTBOUND = (1 << 1), TFLAG_WRITE = (1 << 2), - TFLAG_CNG = (1 << 3), + TFLAG_USEME = (1 << 3), TFLAG_BRIDGE = (1 << 4), TFLAG_BOWOUT = (1 << 5), TFLAG_BLEG = (1 << 6), @@ -158,7 +158,7 @@ static switch_status_t tech_init(private_t *tech_pvt, switch_core_session_t *ses tech_pvt->cng_frame.data = tech_pvt->cng_databuf; tech_pvt->cng_frame.buflen = sizeof(tech_pvt->cng_databuf); - //switch_set_flag((&tech_pvt->cng_frame), SFF_CNG); + tech_pvt->cng_frame.datalen = 2; tech_pvt->bowout_frame_count = (tech_pvt->read_codec.implementation->actual_samples_per_second / @@ -297,12 +297,10 @@ static switch_status_t channel_on_init(switch_core_session_t *session) static void do_reset(private_t *tech_pvt) { switch_clear_flag_locked(tech_pvt, TFLAG_WRITE); - switch_set_flag_locked(tech_pvt, TFLAG_CNG); switch_mutex_lock(tech_pvt->mutex); if (tech_pvt->other_tech_pvt) { switch_clear_flag_locked(tech_pvt->other_tech_pvt, TFLAG_WRITE); - switch_set_flag_locked(tech_pvt->other_tech_pvt, TFLAG_CNG); } switch_mutex_unlock(tech_pvt->mutex); } @@ -446,12 +444,6 @@ static switch_status_t channel_kill_channel(switch_core_session_t *session, int switch (sig) { case SWITCH_SIG_BREAK: - switch_set_flag_locked(tech_pvt, TFLAG_CNG); - switch_mutex_lock(tech_pvt->mutex); - if (tech_pvt->other_tech_pvt) { - switch_set_flag_locked(tech_pvt->other_tech_pvt, TFLAG_CNG); - } - switch_mutex_unlock(tech_pvt->mutex); break; case SWITCH_SIG_KILL: switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING); @@ -583,18 +575,12 @@ static switch_status_t channel_read_frame(switch_core_session_t *session, switch tech_pvt->write_frame->codec = &tech_pvt->read_codec; *frame = tech_pvt->write_frame; tech_pvt->packet_count++; - switch_clear_flag_locked(tech_pvt, TFLAG_CNG); switch_clear_flag(tech_pvt->write_frame, SFF_CNG); } else { - switch_set_flag(tech_pvt, TFLAG_CNG); - } - - if (switch_test_flag(tech_pvt, TFLAG_CNG)) { *frame = &tech_pvt->cng_frame; tech_pvt->cng_frame.codec = &tech_pvt->read_codec; tech_pvt->cng_frame.datalen = tech_pvt->read_codec.implementation->decoded_bytes_per_packet; switch_set_flag((&tech_pvt->cng_frame), SFF_CNG); - switch_clear_flag_locked(tech_pvt, TFLAG_CNG); } @@ -613,6 +599,17 @@ static switch_status_t channel_read_frame(switch_core_session_t *session, switch return status; } +static void clear_queue(private_t *tech_pvt) +{ + void *pop; + + while (switch_queue_trypop(tech_pvt->frame_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { + switch_frame_t *frame = (switch_frame_t *) pop; + switch_frame_free(&frame); + } +} + + static switch_status_t channel_write_frame(switch_core_session_t *session, switch_frame_t *frame, switch_io_flag_t flags, int stream_id) { switch_channel_t *channel = NULL; @@ -626,7 +623,7 @@ static switch_status_t channel_write_frame(switch_core_session_t *session, switc switch_assert(tech_pvt != NULL); if (switch_test_flag(frame, SFF_CNG) || - switch_test_flag(tech_pvt, TFLAG_CNG) || (switch_test_flag(tech_pvt, TFLAG_BOWOUT) && switch_test_flag(tech_pvt, TFLAG_BOWOUT_USED))) { + (switch_test_flag(tech_pvt, TFLAG_BOWOUT) && switch_test_flag(tech_pvt, TFLAG_BOWOUT_USED))) { switch_core_timer_sync(&tech_pvt->timer); switch_core_timer_sync(&tech_pvt->other_tech_pvt->timer); return SWITCH_STATUS_SUCCESS; @@ -672,23 +669,27 @@ static switch_status_t channel_write_frame(switch_core_session_t *session, switc if (switch_test_flag(tech_pvt, TFLAG_LINKED) && tech_pvt->other_tech_pvt) { switch_frame_t *clone; - + if (frame->codec->implementation != tech_pvt->write_codec.implementation) { /* change codecs to match */ tech_init(tech_pvt, session, frame->codec); tech_init(tech_pvt->other_tech_pvt, tech_pvt->other_session, frame->codec); } - if (switch_queue_size(tech_pvt->other_tech_pvt->frame_queue) < FRAME_QUEUE_LEN) { - if (switch_frame_dup(frame, &clone) != SWITCH_STATUS_SUCCESS) { - abort(); - } - if (switch_queue_trypush(tech_pvt->other_tech_pvt->frame_queue, clone) != SWITCH_STATUS_SUCCESS) { - switch_frame_free(&clone); - } + if (switch_frame_dup(frame, &clone) != SWITCH_STATUS_SUCCESS) { + abort(); + } + + if ((status = switch_queue_trypush(tech_pvt->other_tech_pvt->frame_queue, clone)) != SWITCH_STATUS_SUCCESS) { + clear_queue(tech_pvt->other_tech_pvt); + status = switch_queue_trypush(tech_pvt->other_tech_pvt->frame_queue, clone); + } + if (status == SWITCH_STATUS_SUCCESS) { switch_set_flag_locked(tech_pvt->other_tech_pvt, TFLAG_WRITE); + } else { + switch_frame_free(&clone); } status = SWITCH_STATUS_SUCCESS; @@ -747,22 +748,13 @@ static switch_status_t channel_receive_message(switch_core_session_t *session, s case SWITCH_MESSAGE_INDICATE_UNBRIDGE: case SWITCH_MESSAGE_INDICATE_AUDIO_SYNC: { - void *pop; done = 1; - while (switch_queue_trypop(tech_pvt->frame_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { - switch_frame_t *frame = (switch_frame_t *) pop; - switch_frame_free(&frame); - } - - while (switch_queue_trypop(tech_pvt->other_tech_pvt->frame_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { - switch_frame_t *frame = (switch_frame_t *) pop; - switch_frame_free(&frame); - } - + clear_queue(tech_pvt); + clear_queue(tech_pvt->other_tech_pvt); switch_core_timer_sync(&tech_pvt->timer); - + switch_core_timer_sync(&tech_pvt->other_tech_pvt->timer); } break; default: