add rtp-autoflush profile param and rtp_autoflush var to skip timer sleeps when the socket has data ready and refactor the jitter buffer code out into a different function and fix it so it works in blocking mode too
git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@12854 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
parent
7b72eceba2
commit
7119462aa1
|
@ -470,7 +470,8 @@ typedef enum {
|
|||
SWITCH_RTP_FLAG_SECURE_RECV_RESET = (1 << 17),
|
||||
SWITCH_RTP_FLAG_PROXY_MEDIA = (1 << 18),
|
||||
SWITCH_RTP_FLAG_SHUTDOWN = (1 << 19),
|
||||
SWITCH_RTP_FLAG_FLUSH = (1 << 20)
|
||||
SWITCH_RTP_FLAG_FLUSH = (1 << 20),
|
||||
SWITCH_RTP_FLAG_AUTOFLUSH = (1 << 21)
|
||||
} switch_rtp_flag_enum_t;
|
||||
typedef uint32_t switch_rtp_flag_t;
|
||||
|
||||
|
|
|
@ -181,6 +181,7 @@ typedef enum {
|
|||
PFLAG_MANAGE_SHARED_APPEARANCE,
|
||||
PFLAG_DISABLE_SRV,
|
||||
PFLAG_DISABLE_NAPTR,
|
||||
PFLAG_AUTOFLUSH,
|
||||
|
||||
/* No new flags below this line */
|
||||
PFLAG_MAX
|
||||
|
|
|
@ -1655,6 +1655,12 @@ switch_status_t reconfig_sofia(sofia_profile_t *profile)
|
|||
} else {
|
||||
sofia_clear_pflag(profile, PFLAG_PASS_RFC2833);
|
||||
}
|
||||
} else if (!strcasecmp(var, "rtp-autoflush")) {
|
||||
if (switch_true(val)) {
|
||||
sofia_set_pflag(profile, PFLAG_AUTOFLUSH);
|
||||
} else {
|
||||
sofia_clear_pflag(profile, PFLAG_AUTOFLUSH);
|
||||
}
|
||||
} else if (!strcasecmp(var, "inbound-codec-negotiation")) {
|
||||
if (!strcasecmp(val, "greedy")) {
|
||||
sofia_set_pflag(profile, PFLAG_GREEDY);
|
||||
|
@ -2181,6 +2187,12 @@ switch_status_t config_sofia(int reload, char *profile_name)
|
|||
if (switch_true(val)) {
|
||||
sofia_set_pflag(profile, PFLAG_PASS_RFC2833);
|
||||
}
|
||||
} else if (!strcasecmp(var, "rtp-autoflush")) {
|
||||
if (switch_true(val)) {
|
||||
sofia_set_pflag(profile, PFLAG_AUTOFLUSH);
|
||||
} else {
|
||||
sofia_clear_pflag(profile, PFLAG_AUTOFLUSH);
|
||||
}
|
||||
} else if (!strcasecmp(var, "inbound-codec-negotiation")) {
|
||||
if (!strcasecmp(val, "greedy")) {
|
||||
sofia_set_pflag(profile, PFLAG_GREEDY);
|
||||
|
|
|
@ -2054,6 +2054,12 @@ switch_status_t sofia_glue_activate_rtp(private_object_t *tech_pvt, switch_rtp_f
|
|||
flags |= SWITCH_RTP_FLAG_PASS_RFC2833;
|
||||
}
|
||||
|
||||
|
||||
if (sofia_test_pflag(tech_pvt->profile, PFLAG_AUTOFLUSH)
|
||||
|| ((val = switch_channel_get_variable(tech_pvt->channel, "rtp_autoflush")) && switch_true(val))) {
|
||||
flags |= SWITCH_RTP_FLAG_AUTOFLUSH;
|
||||
}
|
||||
|
||||
if (!(sofia_test_pflag(tech_pvt->profile, PFLAG_REWRITE_TIMESTAMPS) ||
|
||||
((val = switch_channel_get_variable(tech_pvt->channel, "rtp_rewrite_timestamps")) && !switch_true(val)))) {
|
||||
flags |= SWITCH_RTP_FLAG_RAW_WRITE;
|
||||
|
|
103
src/switch_rtp.c
103
src/switch_rtp.c
|
@ -599,10 +599,10 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s
|
|||
if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_USE_TIMER) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK)) {
|
||||
switch_socket_opt_set(rtp_session->sock_input, SWITCH_SO_NONBLOCK, TRUE);
|
||||
switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_NOBLOCK);
|
||||
} else {
|
||||
switch_socket_create_pollfd(&rtp_session->read_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool);
|
||||
}
|
||||
|
||||
switch_socket_create_pollfd(&rtp_session->read_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool);
|
||||
|
||||
status = SWITCH_STATUS_SUCCESS;
|
||||
*err = "Success";
|
||||
switch_set_flag_locked(rtp_session, SWITCH_RTP_FLAG_IO);
|
||||
|
@ -1016,13 +1016,8 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_activate_stun_ping(switch_rtp_t *rtp_
|
|||
|
||||
SWITCH_DECLARE(switch_status_t) switch_rtp_activate_jitter_buffer(switch_rtp_t *rtp_session, uint32_t queue_frames)
|
||||
{
|
||||
if (rtp_session->read_pollfd) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Can't use jitterbuffer without a timer.\n");
|
||||
return SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
rtp_session->jb = stfu_n_init(queue_frames);
|
||||
switch_socket_create_pollfd(&rtp_session->jb_pollfd, rtp_session->sock_input, SWITCH_POLLIN | SWITCH_POLLERR, rtp_session->pool);
|
||||
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
@ -1371,18 +1366,53 @@ static void do_flush(switch_rtp_t *rtp_session)
|
|||
|
||||
#define return_cng_frame() do_cng = 1; goto timer_check
|
||||
|
||||
static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t *bytes, switch_frame_flag_t *flags)
|
||||
{
|
||||
switch_status_t status = SWITCH_STATUS_FALSE;
|
||||
stfu_frame_t *jb_frame;
|
||||
|
||||
switch_assert(bytes);
|
||||
|
||||
*bytes = sizeof(rtp_msg_t);
|
||||
status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, bytes);
|
||||
|
||||
if (rtp_session->jb && rtp_session->recv_msg.header.version == 2 && *bytes) {
|
||||
if (rtp_session->recv_msg.header.m && rtp_session->recv_msg.header.pt != rtp_session->te) {
|
||||
stfu_n_reset(rtp_session->jb);
|
||||
}
|
||||
|
||||
stfu_n_eat(rtp_session->jb, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.body, *bytes - rtp_header_len);
|
||||
*bytes = 0;
|
||||
status = SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
if (rtp_session->jb) {
|
||||
if ((jb_frame = stfu_n_read_a_frame(rtp_session->jb))) {
|
||||
memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen);
|
||||
if (jb_frame->plc) {
|
||||
*flags |= SFF_PLC;
|
||||
}
|
||||
*bytes = jb_frame->dlen + rtp_header_len;
|
||||
rtp_session->recv_msg.header.ts = htonl(jb_frame->ts);
|
||||
rtp_session->recv_msg.header.pt = rtp_session->payload;
|
||||
status = SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_type, switch_frame_flag_t *flags, switch_io_flag_t io_flags)
|
||||
{
|
||||
switch_size_t bytes = 0;
|
||||
switch_status_t status = SWITCH_STATUS_SUCCESS, poll_status = SWITCH_STATUS_SUCCESS;
|
||||
int check = 0;
|
||||
stfu_frame_t *jb_frame;
|
||||
int ret = -1;
|
||||
int sleep_mss = 1000;
|
||||
int poll_sec = 5;
|
||||
int poll_loop = 0;
|
||||
int fdr = 0;
|
||||
int from_jb = 0;
|
||||
int hot_socket = 0;
|
||||
|
||||
if (!switch_rtp_ready(rtp_session)) {
|
||||
return -1;
|
||||
|
@ -1398,42 +1428,24 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
|
|||
int do_cng = 0;
|
||||
|
||||
if (rtp_session->timer.interval) {
|
||||
int do_sleep = 1;
|
||||
if (rtp_session->jb) {
|
||||
if (switch_poll(rtp_session->jb_pollfd, 1, &fdr, 1) == SWITCH_STATUS_SUCCESS) {
|
||||
do_sleep = 0;
|
||||
if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOFLUSH)) {
|
||||
if (switch_poll(rtp_session->read_pollfd, 1, &fdr, 1) == SWITCH_STATUS_SUCCESS) {
|
||||
hot_socket = 1;
|
||||
}
|
||||
}
|
||||
if (do_sleep) {
|
||||
if (!hot_socket) {
|
||||
switch_core_timer_next(&rtp_session->timer);
|
||||
}
|
||||
}
|
||||
|
||||
recvfrom:
|
||||
|
||||
if (rtp_session->jb) {
|
||||
if ((jb_frame = stfu_n_read_a_frame(rtp_session->jb))) {
|
||||
memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen);
|
||||
if (jb_frame->plc) {
|
||||
*flags |= SFF_PLC;
|
||||
}
|
||||
bytes = jb_frame->dlen + rtp_header_len;
|
||||
rtp_session->recv_msg.header.ts = htonl(jb_frame->ts);
|
||||
rtp_session->recv_msg.header.pt = rtp_session->payload;
|
||||
from_jb = 1;
|
||||
goto post_read;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (rtp_session->read_pollfd) {
|
||||
if (!rtp_session->timer.interval) {
|
||||
poll_status = switch_poll(rtp_session->read_pollfd, 1, &fdr, poll_sec * 1000000);
|
||||
}
|
||||
|
||||
|
||||
if (poll_status == SWITCH_STATUS_SUCCESS) {
|
||||
bytes = sizeof(rtp_msg_t);
|
||||
status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, &bytes);
|
||||
status = read_rtp_packet(rtp_session, &bytes, flags);
|
||||
} else {
|
||||
if (!SWITCH_STATUS_IS_BREAK(poll_status) && poll_status != SWITCH_STATUS_TIMEOUT) {
|
||||
ret = -1;
|
||||
|
@ -1445,8 +1457,6 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
|
|||
bytes = 0;
|
||||
}
|
||||
|
||||
post_read:
|
||||
|
||||
if (bytes < 0) {
|
||||
ret = (int) bytes;
|
||||
goto end;
|
||||
|
@ -1574,20 +1584,6 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
|
|||
goto end;
|
||||
}
|
||||
|
||||
if (rtp_session->jb && !from_jb && ((bytes && rtp_session->recv_msg.header.pt == rtp_session->payload) || check)) {
|
||||
if (bytes) {
|
||||
|
||||
if (rtp_session->recv_msg.header.m) {
|
||||
stfu_n_reset(rtp_session->jb);
|
||||
}
|
||||
|
||||
stfu_n_eat(rtp_session->jb, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.body, bytes - rtp_header_len);
|
||||
bytes = 0;
|
||||
goto recvfrom;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (bytes && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_SECURE_RECV)) {
|
||||
int sbytes = (int) bytes;
|
||||
err_status_t stat = 0;
|
||||
|
@ -1749,14 +1745,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
|
|||
}
|
||||
|
||||
if (check || (bytes && !rtp_session->timer.interval)) {
|
||||
if (rtp_session->jb && (jb_frame = stfu_n_read_a_frame(rtp_session->jb))) {
|
||||
memcpy(rtp_session->recv_msg.body, jb_frame->data, jb_frame->dlen);
|
||||
if (jb_frame->plc) {
|
||||
*flags |= SFF_PLC;
|
||||
}
|
||||
bytes = jb_frame->dlen + rtp_header_len;
|
||||
rtp_session->recv_msg.header.ts = htonl(jb_frame->ts);
|
||||
} else if (!bytes && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_USE_TIMER)) { /* We're late! We're Late! */
|
||||
if (!bytes && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_USE_TIMER)) { /* We're late! We're Late! */
|
||||
if (!switch_test_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK) && status == SWITCH_STATUS_BREAK) {
|
||||
switch_cond_next();
|
||||
continue;
|
||||
|
|
Loading…
Reference in New Issue