refactor mod_loopback timeout handling

This commit is contained in:
Brian West 2011-05-02 17:21:39 -05:00
parent 2246f3ca75
commit 43442e4f41
1 changed files with 29 additions and 37 deletions

View File

@ -48,7 +48,7 @@ typedef enum {
TFLAG_LINKED = (1 << 0), TFLAG_LINKED = (1 << 0),
TFLAG_OUTBOUND = (1 << 1), TFLAG_OUTBOUND = (1 << 1),
TFLAG_WRITE = (1 << 2), TFLAG_WRITE = (1 << 2),
TFLAG_CNG = (1 << 3), TFLAG_USEME = (1 << 3),
TFLAG_BRIDGE = (1 << 4), TFLAG_BRIDGE = (1 << 4),
TFLAG_BOWOUT = (1 << 5), TFLAG_BOWOUT = (1 << 5),
TFLAG_BLEG = (1 << 6), TFLAG_BLEG = (1 << 6),
@ -158,7 +158,7 @@ static switch_status_t tech_init(private_t *tech_pvt, switch_core_session_t *ses
tech_pvt->cng_frame.data = tech_pvt->cng_databuf; tech_pvt->cng_frame.data = tech_pvt->cng_databuf;
tech_pvt->cng_frame.buflen = sizeof(tech_pvt->cng_databuf); tech_pvt->cng_frame.buflen = sizeof(tech_pvt->cng_databuf);
//switch_set_flag((&tech_pvt->cng_frame), SFF_CNG);
tech_pvt->cng_frame.datalen = 2; tech_pvt->cng_frame.datalen = 2;
tech_pvt->bowout_frame_count = (tech_pvt->read_codec.implementation->actual_samples_per_second / tech_pvt->bowout_frame_count = (tech_pvt->read_codec.implementation->actual_samples_per_second /
@ -297,12 +297,10 @@ static switch_status_t channel_on_init(switch_core_session_t *session)
static void do_reset(private_t *tech_pvt) static void do_reset(private_t *tech_pvt)
{ {
switch_clear_flag_locked(tech_pvt, TFLAG_WRITE); switch_clear_flag_locked(tech_pvt, TFLAG_WRITE);
switch_set_flag_locked(tech_pvt, TFLAG_CNG);
switch_mutex_lock(tech_pvt->mutex); switch_mutex_lock(tech_pvt->mutex);
if (tech_pvt->other_tech_pvt) { if (tech_pvt->other_tech_pvt) {
switch_clear_flag_locked(tech_pvt->other_tech_pvt, TFLAG_WRITE); switch_clear_flag_locked(tech_pvt->other_tech_pvt, TFLAG_WRITE);
switch_set_flag_locked(tech_pvt->other_tech_pvt, TFLAG_CNG);
} }
switch_mutex_unlock(tech_pvt->mutex); switch_mutex_unlock(tech_pvt->mutex);
} }
@ -446,12 +444,6 @@ static switch_status_t channel_kill_channel(switch_core_session_t *session, int
switch (sig) { switch (sig) {
case SWITCH_SIG_BREAK: case SWITCH_SIG_BREAK:
switch_set_flag_locked(tech_pvt, TFLAG_CNG);
switch_mutex_lock(tech_pvt->mutex);
if (tech_pvt->other_tech_pvt) {
switch_set_flag_locked(tech_pvt->other_tech_pvt, TFLAG_CNG);
}
switch_mutex_unlock(tech_pvt->mutex);
break; break;
case SWITCH_SIG_KILL: case SWITCH_SIG_KILL:
switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING); switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING);
@ -583,18 +575,12 @@ static switch_status_t channel_read_frame(switch_core_session_t *session, switch
tech_pvt->write_frame->codec = &tech_pvt->read_codec; tech_pvt->write_frame->codec = &tech_pvt->read_codec;
*frame = tech_pvt->write_frame; *frame = tech_pvt->write_frame;
tech_pvt->packet_count++; tech_pvt->packet_count++;
switch_clear_flag_locked(tech_pvt, TFLAG_CNG);
switch_clear_flag(tech_pvt->write_frame, SFF_CNG); switch_clear_flag(tech_pvt->write_frame, SFF_CNG);
} else { } else {
switch_set_flag(tech_pvt, TFLAG_CNG);
}
if (switch_test_flag(tech_pvt, TFLAG_CNG)) {
*frame = &tech_pvt->cng_frame; *frame = &tech_pvt->cng_frame;
tech_pvt->cng_frame.codec = &tech_pvt->read_codec; tech_pvt->cng_frame.codec = &tech_pvt->read_codec;
tech_pvt->cng_frame.datalen = tech_pvt->read_codec.implementation->decoded_bytes_per_packet; tech_pvt->cng_frame.datalen = tech_pvt->read_codec.implementation->decoded_bytes_per_packet;
switch_set_flag((&tech_pvt->cng_frame), SFF_CNG); switch_set_flag((&tech_pvt->cng_frame), SFF_CNG);
switch_clear_flag_locked(tech_pvt, TFLAG_CNG);
} }
@ -613,6 +599,17 @@ static switch_status_t channel_read_frame(switch_core_session_t *session, switch
return status; return status;
} }
static void clear_queue(private_t *tech_pvt)
{
void *pop;
while (switch_queue_trypop(tech_pvt->frame_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
switch_frame_t *frame = (switch_frame_t *) pop;
switch_frame_free(&frame);
}
}
static switch_status_t channel_write_frame(switch_core_session_t *session, switch_frame_t *frame, switch_io_flag_t flags, int stream_id) static switch_status_t channel_write_frame(switch_core_session_t *session, switch_frame_t *frame, switch_io_flag_t flags, int stream_id)
{ {
switch_channel_t *channel = NULL; switch_channel_t *channel = NULL;
@ -626,7 +623,7 @@ static switch_status_t channel_write_frame(switch_core_session_t *session, switc
switch_assert(tech_pvt != NULL); switch_assert(tech_pvt != NULL);
if (switch_test_flag(frame, SFF_CNG) || if (switch_test_flag(frame, SFF_CNG) ||
switch_test_flag(tech_pvt, TFLAG_CNG) || (switch_test_flag(tech_pvt, TFLAG_BOWOUT) && switch_test_flag(tech_pvt, TFLAG_BOWOUT_USED))) { (switch_test_flag(tech_pvt, TFLAG_BOWOUT) && switch_test_flag(tech_pvt, TFLAG_BOWOUT_USED))) {
switch_core_timer_sync(&tech_pvt->timer); switch_core_timer_sync(&tech_pvt->timer);
switch_core_timer_sync(&tech_pvt->other_tech_pvt->timer); switch_core_timer_sync(&tech_pvt->other_tech_pvt->timer);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
@ -672,23 +669,27 @@ static switch_status_t channel_write_frame(switch_core_session_t *session, switc
if (switch_test_flag(tech_pvt, TFLAG_LINKED) && tech_pvt->other_tech_pvt) { if (switch_test_flag(tech_pvt, TFLAG_LINKED) && tech_pvt->other_tech_pvt) {
switch_frame_t *clone; switch_frame_t *clone;
if (frame->codec->implementation != tech_pvt->write_codec.implementation) { if (frame->codec->implementation != tech_pvt->write_codec.implementation) {
/* change codecs to match */ /* change codecs to match */
tech_init(tech_pvt, session, frame->codec); tech_init(tech_pvt, session, frame->codec);
tech_init(tech_pvt->other_tech_pvt, tech_pvt->other_session, frame->codec); tech_init(tech_pvt->other_tech_pvt, tech_pvt->other_session, frame->codec);
} }
if (switch_queue_size(tech_pvt->other_tech_pvt->frame_queue) < FRAME_QUEUE_LEN) {
if (switch_frame_dup(frame, &clone) != SWITCH_STATUS_SUCCESS) {
abort();
}
if (switch_queue_trypush(tech_pvt->other_tech_pvt->frame_queue, clone) != SWITCH_STATUS_SUCCESS) { if (switch_frame_dup(frame, &clone) != SWITCH_STATUS_SUCCESS) {
switch_frame_free(&clone); abort();
} }
if ((status = switch_queue_trypush(tech_pvt->other_tech_pvt->frame_queue, clone)) != SWITCH_STATUS_SUCCESS) {
clear_queue(tech_pvt->other_tech_pvt);
status = switch_queue_trypush(tech_pvt->other_tech_pvt->frame_queue, clone);
}
if (status == SWITCH_STATUS_SUCCESS) {
switch_set_flag_locked(tech_pvt->other_tech_pvt, TFLAG_WRITE); switch_set_flag_locked(tech_pvt->other_tech_pvt, TFLAG_WRITE);
} else {
switch_frame_free(&clone);
} }
status = SWITCH_STATUS_SUCCESS; status = SWITCH_STATUS_SUCCESS;
@ -747,22 +748,13 @@ static switch_status_t channel_receive_message(switch_core_session_t *session, s
case SWITCH_MESSAGE_INDICATE_UNBRIDGE: case SWITCH_MESSAGE_INDICATE_UNBRIDGE:
case SWITCH_MESSAGE_INDICATE_AUDIO_SYNC: case SWITCH_MESSAGE_INDICATE_AUDIO_SYNC:
{ {
void *pop;
done = 1; done = 1;
while (switch_queue_trypop(tech_pvt->frame_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { clear_queue(tech_pvt);
switch_frame_t *frame = (switch_frame_t *) pop; clear_queue(tech_pvt->other_tech_pvt);
switch_frame_free(&frame);
}
while (switch_queue_trypop(tech_pvt->other_tech_pvt->frame_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
switch_frame_t *frame = (switch_frame_t *) pop;
switch_frame_free(&frame);
}
switch_core_timer_sync(&tech_pvt->timer); switch_core_timer_sync(&tech_pvt->timer);
switch_core_timer_sync(&tech_pvt->other_tech_pvt->timer);
} }
break; break;
default: default: