From 7119462aa1e553a5719258b1fedd62ac81bb369f Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Tue, 31 Mar 2009 19:10:43 +0000 Subject: [PATCH] add rtp-autoflush profile param and rtp_autoflush var to skip timer sleeps when the socket has data ready and refactor the jitter buffer code out into a different function and fix it so it works in blocking mode too git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@12854 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- src/include/switch_types.h | 3 +- src/mod/endpoints/mod_sofia/mod_sofia.h | 1 + src/mod/endpoints/mod_sofia/sofia.c | 12 +++ src/mod/endpoints/mod_sofia/sofia_glue.c | 6 ++ src/switch_rtp.c | 105 ++++++++++------------- 5 files changed, 68 insertions(+), 59 deletions(-) diff --git a/src/include/switch_types.h b/src/include/switch_types.h index 9d9ad16f58..7a7166c21c 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -470,7 +470,8 @@ typedef enum { SWITCH_RTP_FLAG_SECURE_RECV_RESET = (1 << 17), SWITCH_RTP_FLAG_PROXY_MEDIA = (1 << 18), SWITCH_RTP_FLAG_SHUTDOWN = (1 << 19), - SWITCH_RTP_FLAG_FLUSH = (1 << 20) + SWITCH_RTP_FLAG_FLUSH = (1 << 20), + SWITCH_RTP_FLAG_AUTOFLUSH = (1 << 21) } switch_rtp_flag_enum_t; typedef uint32_t switch_rtp_flag_t; diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.h b/src/mod/endpoints/mod_sofia/mod_sofia.h index b8f665cc44..ed3616c762 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.h +++ b/src/mod/endpoints/mod_sofia/mod_sofia.h @@ -181,6 +181,7 @@ typedef enum { PFLAG_MANAGE_SHARED_APPEARANCE, PFLAG_DISABLE_SRV, PFLAG_DISABLE_NAPTR, + PFLAG_AUTOFLUSH, /* No new flags below this line */ PFLAG_MAX diff --git a/src/mod/endpoints/mod_sofia/sofia.c b/src/mod/endpoints/mod_sofia/sofia.c index 2044188636..28f3f03554 100644 --- a/src/mod/endpoints/mod_sofia/sofia.c +++ b/src/mod/endpoints/mod_sofia/sofia.c @@ -1655,6 +1655,12 @@ switch_status_t reconfig_sofia(sofia_profile_t *profile) } else { sofia_clear_pflag(profile, PFLAG_PASS_RFC2833); } + } else if (!strcasecmp(var, "rtp-autoflush")) { + if (switch_true(val)) { + sofia_set_pflag(profile, PFLAG_AUTOFLUSH); + } else { + sofia_clear_pflag(profile, PFLAG_AUTOFLUSH); + } } else if (!strcasecmp(var, "inbound-codec-negotiation")) { if (!strcasecmp(val, "greedy")) { sofia_set_pflag(profile, PFLAG_GREEDY); @@ -2181,6 +2187,12 @@ switch_status_t config_sofia(int reload, char *profile_name) if (switch_true(val)) { sofia_set_pflag(profile, PFLAG_PASS_RFC2833); } + } else if (!strcasecmp(var, "rtp-autoflush")) { + if (switch_true(val)) { + sofia_set_pflag(profile, PFLAG_AUTOFLUSH); + } else { + sofia_clear_pflag(profile, PFLAG_AUTOFLUSH); + } } else if (!strcasecmp(var, "inbound-codec-negotiation")) { if (!strcasecmp(val, "greedy")) { sofia_set_pflag(profile, PFLAG_GREEDY); diff --git a/src/mod/endpoints/mod_sofia/sofia_glue.c b/src/mod/endpoints/mod_sofia/sofia_glue.c index ab6a00fa1b..67fac6ac0b 100644 --- a/src/mod/endpoints/mod_sofia/sofia_glue.c +++ b/src/mod/endpoints/mod_sofia/sofia_glue.c @@ -2054,6 +2054,12 @@ switch_status_t sofia_glue_activate_rtp(private_object_t *tech_pvt, switch_rtp_f flags |= SWITCH_RTP_FLAG_PASS_RFC2833; } + + if (sofia_test_pflag(tech_pvt->profile, PFLAG_AUTOFLUSH) + || ((val = switch_channel_get_variable(tech_pvt->channel, "rtp_autoflush")) && switch_true(val))) { + flags |= SWITCH_RTP_FLAG_AUTOFLUSH; + } + if (!(sofia_test_pflag(tech_pvt->profile, PFLAG_REWRITE_TIMESTAMPS) || ((val = switch_channel_get_variable(tech_pvt->channel, "rtp_rewrite_timestamps")) && !switch_true(val)))) { flags |= SWITCH_RTP_FLAG_RAW_WRITE; diff --git a/src/switch_rtp.c b/src/switch_rtp.c index d8905c1324..846fe20f2c 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -599,10 +599,10 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_USE_TIMER) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK)) { switch_socket_opt_set(rtp_session->sock_input, SWITCH_SO_NONBLOCK, TRUE); switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_NOBLOCK); - } else { - switch_socket_create_pollfd(&rtp_session->read_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool); } + switch_socket_create_pollfd(&rtp_session->read_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool); + status = SWITCH_STATUS_SUCCESS; *err = "Success"; switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_IO); @@ -1016,13 +1016,8 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_activate_stun_ping(switch_rtp_t *rtp_ SWITCH_DECLARE(switch_status_t) switch_rtp_activate_jitter_buffer(switch_rtp_t *rtp_session, uint32_t queue_frames) { - if (rtp_session->read_pollfd) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Can't use jitterbuffer without a timer.\n"); - return SWITCH_STATUS_FALSE; - } rtp_session->jb = stfu_n_init(queue_frames); - switch_socket_create_pollfd(&rtp_session->jb_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool); return SWITCH_STATUS_SUCCESS; } @@ -1371,18 +1366,53 @@ static void do_flush(switch_rtp_t *rtp_session) #define return_cng_frame() do_cng = 1; goto timer_check +static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t *bytes, switch_frame_flag_t *flags) +{ + switch_status_t status = SWITCH_STATUS_FALSE; + stfu_frame_t *jb_frame; + + switch_assert(bytes); + + *bytes = sizeof(rtp_msg_t); + status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, bytes); + + if (rtp_session->jb && rtp_session->recv_msg.header.version == 2 && *bytes) { + if (rtp_session->recv_msg.header.m && rtp_session->recv_msg.header.pt != rtp_session->te) { + stfu_n_reset(rtp_session->jb); + } + + stfu_n_eat(rtp_session->jb, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.body, *bytes - rtp_header_len); + *bytes = 0; + status = SWITCH_STATUS_FALSE; + } + + if (rtp_session->jb) { + if ((jb_frame = stfu_n_read_a_frame(rtp_session->jb))) { + memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen); + if (jb_frame->plc) { + *flags |= SFF_PLC; + } + *bytes = jb_frame->dlen + rtp_header_len; + rtp_session->recv_msg.header.ts = htonl(jb_frame->ts); + rtp_session->recv_msg.header.pt = rtp_session->payload; + status = SWITCH_STATUS_SUCCESS; + } + } + + return status; +} + static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_type, switch_frame_flag_t *flags, switch_io_flag_t io_flags) { switch_size_t bytes = 0; switch_status_t status = SWITCH_STATUS_SUCCESS, poll_status = SWITCH_STATUS_SUCCESS; int check = 0; - stfu_frame_t *jb_frame; int ret = -1; int sleep_mss = 1000; int poll_sec = 5; int poll_loop = 0; int fdr = 0; - int from_jb = 0; + int hot_socket = 0; if (!switch_rtp_ready(rtp_session)) { return -1; @@ -1398,42 +1428,24 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ int do_cng = 0; if (rtp_session->timer.interval) { - int do_sleep = 1; - if (rtp_session->jb) { - if (switch_poll(rtp_session->jb_pollfd, 1, &fdr, 1) == SWITCH_STATUS_SUCCESS) { - do_sleep = 0; + if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOFLUSH)) { + if (switch_poll(rtp_session->read_pollfd, 1, &fdr, 1) == SWITCH_STATUS_SUCCESS) { + hot_socket = 1; } } - if (do_sleep) { + if (!hot_socket) { switch_core_timer_next(&rtp_session->timer); } } recvfrom: - if (rtp_session->jb) { - if ((jb_frame = stfu_n_read_a_frame(rtp_session->jb))) { - memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen); - if (jb_frame->plc) { - *flags |= SFF_PLC; - } - bytes = jb_frame->dlen + rtp_header_len; - rtp_session->recv_msg.header.ts = htonl(jb_frame->ts); - rtp_session->recv_msg.header.pt = rtp_session->payload; - from_jb = 1; - goto post_read; - } - } - - - if (rtp_session->read_pollfd) { + if (!rtp_session->timer.interval) { poll_status = switch_poll(rtp_session->read_pollfd, 1, &fdr, poll_sec * 1000000); } - - + if (poll_status == SWITCH_STATUS_SUCCESS) { - bytes = sizeof(rtp_msg_t); - status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, &bytes); + status = read_rtp_packet(rtp_session, &bytes, flags); } else { if (!SWITCH_STATUS_IS_BREAK(poll_status) && poll_status != SWITCH_STATUS_TIMEOUT) { ret = -1; @@ -1445,8 +1457,6 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ bytes = 0; } - post_read: - if (bytes < 0) { ret = (int) bytes; goto end; @@ -1574,20 +1584,6 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ goto end; } - if (rtp_session->jb && !from_jb && ((bytes && rtp_session->recv_msg.header.pt == rtp_session->payload) || check)) { - if (bytes) { - - if (rtp_session->recv_msg.header.m) { - stfu_n_reset(rtp_session->jb); - } - - stfu_n_eat(rtp_session->jb, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.body, bytes - rtp_header_len); - bytes = 0; - goto recvfrom; - } - } - - if (bytes && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_SECURE_RECV)) { int sbytes = (int) bytes; err_status_t stat = 0; @@ -1749,14 +1745,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ } if (check || (bytes && !rtp_session->timer.interval)) { - if (rtp_session->jb && (jb_frame = stfu_n_read_a_frame(rtp_session->jb))) { - memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen); - if (jb_frame->plc) { - *flags |= SFF_PLC; - } - bytes = jb_frame->dlen + rtp_header_len; - rtp_session->recv_msg.header.ts = htonl(jb_frame->ts); - } else if (!bytes && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_USE_TIMER)) { /* We're late! We're Late! */ + if (!bytes && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_USE_TIMER)) { /* We're late! We're Late! */ if (!switch_test_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK) && status == SWITCH_STATUS_BREAK) { switch_cond_next(); continue;