FS-7499: add generic nack support to rtp stack

This commit is contained in:
Anthony Minessale 2015-01-09 13:36:57 -06:00 committed by Michael Jerris
parent 836e2b2888
commit 17aa836403
4 changed files with 509 additions and 458 deletions

View File

@ -474,6 +474,8 @@ SWITCH_DECLARE(int) switch_rtp_write_frame(switch_rtp_t *rtp_session, switch_fra
SWITCH_DECLARE(int) switch_rtp_write_manual(switch_rtp_t *rtp_session,
void *data, uint32_t datalen, uint8_t m, switch_payload_t payload, uint32_t ts, switch_frame_flag_t *flags);
SWITCH_DECLARE(switch_status_t) switch_rtp_write_raw(switch_rtp_t *rtp_session, void *data, switch_size_t *bytes, switch_bool_t process_encryption);
/*!
\brief Retrieve the SSRC from a given RTP session
\param rtp_session the RTP session to retrieve from

View File

@ -34,7 +34,7 @@
#define SWITCH_VIDDERBUFFER_H
SWITCH_BEGIN_EXTERN_C
SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len, switch_bool_t timer_compensation);
SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len);
SWITCH_DECLARE(switch_status_t) switch_vb_destroy(switch_vb_t **vbp);
SWITCH_DECLARE(void) switch_vb_reset(switch_vb_t *vb, switch_bool_t flush);
SWITCH_DECLARE(void) switch_vb_debug_level(switch_vb_t *vb, uint8_t level);
@ -42,6 +42,8 @@ SWITCH_DECLARE(int) switch_vb_frame_count(switch_vb_t *vb);
SWITCH_DECLARE(int) switch_vb_poll(switch_vb_t *vb);
SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len);
SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t *len);
SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb);
SWITCH_DECLARE(switch_status_t) switch_vb_get_packet_by_seq(switch_vb_t *vb, uint16_t seq, switch_rtp_packet_t *packet, switch_size_t *len);
SWITCH_END_EXTERN_C
#endif

View File

@ -300,7 +300,7 @@ struct switch_rtp {
uint8_t fir_seq;
uint16_t fir_count;
uint16_t pli_count;
uint32_t cur_nack;
ts_normalize_t ts_norm;
switch_sockaddr_t *remote_addr, *rtcp_remote_addr;
rtp_msg_t recv_msg;
@ -389,6 +389,7 @@ struct switch_rtp {
uint8_t cn;
stfu_instance_t *jb;
switch_vb_t *vb;
switch_vb_t *vbw;
uint32_t max_missed_packets;
uint32_t missed_count;
rtp_msg_t write_msg;
@ -1848,9 +1849,13 @@ static int check_rtcp_and_ice(switch_rtp_t *rtp_session)
rtcp_ok = 0;
}
if (rtp_session->flags[SWITCH_RTP_FLAG_NACK] && rtp_session->vb) {
rtp_session->cur_nack = switch_vb_pop_nack(rtp_session->vb);
}
if (rtp_session->rtcp_sock_output && rtp_session->flags[SWITCH_RTP_FLAG_ENABLE_RTCP] &&
!rtp_session->flags[SWITCH_RTP_FLAG_RTCP_PASSTHRU] &&
((now - rtp_session->rtcp_last_sent) > rtp_session->rtcp_send_rate * 1000000 || rtp_session->pli_count || rtp_session->fir_count)) {
((now - rtp_session->rtcp_last_sent) > rtp_session->rtcp_send_rate * 1000000 || rtp_session->pli_count || rtp_session->fir_count || rtp_session->cur_nack)) {
switch_rtcp_numbers_t * stats = &rtp_session->stats.rtcp;
struct switch_rtcp_receiver_report *rr;
struct switch_rtcp_sender_report *sr;
@ -1886,7 +1891,7 @@ static int check_rtcp_and_ice(switch_rtp_t *rtp_session)
rtcp_generate_report_block(rtp_session, rtcp_report_block);
rtp_session->rtcp_send_msg.header.length = htons((uint16_t)(rtcp_bytes / 4) - 1);
if (rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) {
if (rtp_session->remote_ssrc == 0) {
rtp_session->remote_ssrc = rtp_session->stats.rtcp.peer_ssrc;
@ -1916,6 +1921,31 @@ static int check_rtcp_and_ice(switch_rtp_t *rtp_session)
rtcp_bytes += sizeof(switch_rtcp_ext_hdr_t);
rtp_session->pli_count = 0;
}
if (rtp_session->flags[SWITCH_RTP_FLAG_NACK] && rtp_session->cur_nack) {
switch_rtcp_ext_hdr_t *ext_hdr;
uint32_t *nack;
p = (uint8_t *) (&rtp_session->rtcp_send_msg) + rtcp_bytes;
ext_hdr = (switch_rtcp_ext_hdr_t *) p;
ext_hdr->version = 2;
ext_hdr->p = 0;
ext_hdr->fmt = 1;
ext_hdr->pt = 205;
ext_hdr->send_ssrc = htonl(rtp_session->ssrc);
ext_hdr->recv_ssrc = htonl(rtp_session->remote_ssrc);
ext_hdr->length = htons(3);
p += sizeof(switch_rtcp_ext_hdr_t);
nack = (uint32_t *) p;
*nack = rtp_session->cur_nack;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Sending RTCP NACK %u\n",
ntohs(*nack & 0xFFFF));
rtcp_bytes += sizeof(switch_rtcp_ext_hdr_t) + sizeof(rtp_session->cur_nack);
rtp_session->cur_nack = 0;
}
if (rtp_session->fir_count) {
switch_rtcp_ext_hdr_t *ext_hdr;
@ -3489,8 +3519,8 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_create(switch_rtp_t **new_rtp_session
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Starting video timer.\n");
}
switch_vb_create(&rtp_session->vb, 5, 30, SWITCH_FALSE);
switch_vb_debug_level(rtp_session->vb, 10);
switch_vb_create(&rtp_session->vb, 5, 30);
//switch_vb_debug_level(rtp_session->vb, 10);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Starting video buffer.\n");
} else {
@ -4131,6 +4161,10 @@ SWITCH_DECLARE(void) switch_rtp_destroy(switch_rtp_t **rtp_session)
switch_vb_destroy(&(*rtp_session)->vb);
}
if ((*rtp_session)->vbw) {
switch_vb_destroy(&(*rtp_session)->vbw);
}
if ((*rtp_session)->dtls && (*rtp_session)->dtls == (*rtp_session)->rtcp_dtls) {
(*rtp_session)->rtcp_dtls = NULL;
}
@ -4824,23 +4858,36 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t
my_host = switch_get_addr(bufc, sizeof(bufc), rtp_session->local_addr);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE,
"R %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u m=%d\n",
"R %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n",
rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "No-Name",
(long) *bytes,
my_host, switch_sockaddr_get_port(rtp_session->local_addr),
old_host, rtp_session->remote_port,
tx_host, switch_sockaddr_get_port(rtp_session->from_addr),
rtp_session->recv_msg.header.pt, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.header.m);
rtp_session->recv_msg.header.pt, ntohl(rtp_session->recv_msg.header.ts), ntohs(rtp_session->recv_msg.header.seq),
rtp_session->recv_msg.header.m);
}
#ifdef RTP_READ_PLOSS
{
int r = (rand() % 10000) + 1;
if (r <= 200) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ALERT,
"Simulate dropped packet ......... ts: %u seq: %u\n", ntohl(rtp_session->recv_msg.header.ts), ntohs(rtp_session->recv_msg.header.seq));
*bytes = 0;
}
}
#endif
if (sync) {
if (!rtp_session->flags[SWITCH_RTP_FLAG_USE_TIMER] && rtp_session->timer.interval) {
switch_core_timer_sync(&rtp_session->timer);
reset_jitter_seq(rtp_session);
}
rtp_session->hot_hits = 0;
goto more;
}
@ -5049,7 +5096,7 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t
stfu_n_destroy(&rtp_session->jb);
}
}
if (rtp_session->recv_msg.header.version == 2 && *bytes) {
if (rtp_session->vb) {
@ -5146,12 +5193,76 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t
return status;
}
static void handle_nack(switch_rtp_t *rtp_session, uint32_t nack)
{
switch_size_t bytes = 0;
rtp_msg_t send_msg[1] = {{{0}}};
uint16_t seq = (uint16_t) (nack & 0xFFFF);
int i;
const char *tx_host = NULL;
const char *old_host = NULL;
const char *my_host = NULL;
char bufa[30], bufb[30], bufc[30];
if (!(rtp_session->flags[SWITCH_RTP_FLAG_NACK] && rtp_session->vbw)) {
return; /* not enabled */
}
if (rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_WRITE]) {
tx_host = switch_get_addr(bufa, sizeof(bufa), rtp_session->from_addr);
old_host = switch_get_addr(bufb, sizeof(bufb), rtp_session->remote_addr);
my_host = switch_get_addr(bufc, sizeof(bufc), rtp_session->local_addr);
}
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Got NACK [%u][0x%x] for seq %u\n", nack, nack, ntohs(seq));
if (switch_vb_get_packet_by_seq(rtp_session->vbw, seq, (switch_rtp_packet_t *) send_msg, &bytes) == SWITCH_STATUS_SUCCESS) {
if (rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_WRITE]) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE,
"X %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n",
rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "NoName",
(long) bytes,
my_host, switch_sockaddr_get_port(rtp_session->local_addr),
old_host, rtp_session->remote_port,
tx_host, switch_sockaddr_get_port(rtp_session->from_addr),
send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m);
}
switch_rtp_write_raw(rtp_session, (void *) send_msg, &bytes, SWITCH_FALSE);
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Cannot send NACK for seq %u\n", ntohs(seq));
}
for (i = 0; i < 16; i++) {
if ((nack & (1 << (16 + i)))) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Also Got NACK for seq %u\n", ntohs(seq) + i);
if (switch_vb_get_packet_by_seq(rtp_session->vbw, htons(ntohs(seq) + i), (switch_rtp_packet_t *) &send_msg, &bytes) == SWITCH_STATUS_SUCCESS) {
if (rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_WRITE]) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE,
"X %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n",
rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "NoName",
(long) bytes,
my_host, switch_sockaddr_get_port(rtp_session->local_addr),
old_host, rtp_session->remote_port,
tx_host, switch_sockaddr_get_port(rtp_session->from_addr),
send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m);
}
switch_rtp_write_raw(rtp_session, (void *) &send_msg, &bytes, SWITCH_FALSE);
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Cannot send NACK for seq %u\n", ntohs(seq) + i);
}
}
}
}
static switch_status_t process_rtcp_report(switch_rtp_t *rtp_session, rtcp_msg_t *msg, switch_size_t bytes)
{
switch_status_t status = SWITCH_STATUS_FALSE;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_CRIT,
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG1,
"RTCP packet bytes %" SWITCH_SIZE_T_FMT " type %d pad %d\n",
bytes, msg->header.type, msg->header.p);
@ -5159,13 +5270,26 @@ static switch_status_t process_rtcp_report(switch_rtp_t *rtp_session, rtcp_msg_t
(msg->header.type == 205 || //RTPFB
msg->header.type == 206)) {//PSFB
rtcp_ext_msg_t *extp = (rtcp_ext_msg_t *) msg;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_CRIT, "PICKED UP XRTCP type: %d fmt: %d\n",
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "PICKED UP XRTCP type: %d fmt: %d\n",
msg->header.type, extp->header.fmt);
if ((extp->header.fmt == 4) || (extp->header.fmt == 1)) { /* FIR || PLI */
if (msg->header.type == 206 && (extp->header.fmt == 4 || extp->header.fmt == 1)) { /* FIR || PLI */
switch_core_media_gen_key_frame(rtp_session->session);
switch_channel_set_flag(switch_core_session_get_channel(rtp_session->session), CF_VIDEO_REFRESH_REQ);
}
if (msg->header.type == 205 && extp->header.fmt == 1) { /*NACK*/
uint32_t *nack = (uint32_t *) extp->body;
int i;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Got NACK count %d\n", ntohs(extp->header.length) - 2);
for (i = 0; i < ntohs(extp->header.length) - 2; i++) {
handle_nack(rtp_session, *nack);
nack++;
}
}
} else
if (msg->header.type == 200 || msg->header.type == 201) {
@ -5218,7 +5342,7 @@ static switch_status_t process_rtcp_report(switch_rtp_t *rtp_session, rtcp_msg_t
if (report_block->lsr && !rtp_session->flags[SWITCH_RTP_FLAG_RTCP_PASSTHRU]) {
switch_time_exp_gmt(&now_hr,now);
/* Calculating RTT = A - DLSR - LSR */
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG,
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG1,
"Receiving an RTCP packet\n[%04d-%02d-%02d %02d:%02d:%02d.%d] SSRC[%u]\n"
"RTT[%f] A[%u] - DLSR[%u] - LSR[%u]\n",
1900 + now_hr.tm_year, now_hr.tm_mday, now_hr.tm_mon, now_hr.tm_hour, now_hr.tm_min, now_hr.tm_sec, now_hr.tm_usec,
@ -5587,10 +5711,10 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
pt = 200000;
}
if (rtp_session->vb && switch_vb_poll(rtp_session->vb)) {
pt = 1000;
force = 1;
}
//if (rtp_session->vb && switch_vb_poll(rtp_session->vb)) {
// pt = 1000;
// force = 1;
//}
poll_status = switch_poll(rtp_session->read_pollfd, 1, &fdr, pt);
@ -6794,22 +6918,47 @@ static int rtp_common_write(switch_rtp_t *rtp_session,
my_host = switch_get_addr(bufc, sizeof(bufc), rtp_session->local_addr);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE,
"W %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u m=%d\n",
"W %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n",
rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "NoName",
(long) bytes,
my_host, switch_sockaddr_get_port(rtp_session->local_addr),
old_host, rtp_session->remote_port,
tx_host, switch_sockaddr_get_port(rtp_session->from_addr),
send_msg->header.pt, ntohl(send_msg->header.ts), send_msg->header.m);
send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m);
}
if (rtp_session->flags[SWITCH_RTP_FLAG_NACK]) {
if (!rtp_session->vbw) {
switch_vb_create(&rtp_session->vbw, 5, 5);
//switch_vb_debug_level(rtp_session->vbw, 10);
}
switch_vb_put_packet(rtp_session->vbw, (switch_rtp_packet_t *)send_msg, bytes);
}
#ifdef RTP_WRITE_PLOSS
{
int r = (rand() % 10000) + 1;
if (r <= 200) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ALERT,
"Simulate dropping packet ......... ts: %u seq: %u\n", ntohl(send_msg->header.ts), ntohs(send_msg->header.seq));
} else {
if (switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, (void *) send_msg, &bytes) != SWITCH_STATUS_SUCCESS) {
rtp_session->seq--;
ret = -1;
goto end;
}
}
}
#else
if (switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, (void *) send_msg, &bytes) != SWITCH_STATUS_SUCCESS) {
rtp_session->seq--;
ret = -1;
goto end;
}
#endif
rtp_session->last_write_ts = this_ts;
rtp_session->flags[SWITCH_RTP_FLAG_RESET] = 0;
@ -7159,60 +7308,7 @@ SWITCH_DECLARE(int) switch_rtp_write_manual(switch_rtp_t *rtp_session,
bytes = rtp_header_len + datalen;
#ifdef ENABLE_SRTP
if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND]) {
int sbytes = (int) bytes;
err_status_t stat;
if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND_RESET]) {
switch_rtp_clear_flag(rtp_session, SWITCH_RTP_FLAG_SECURE_SEND_RESET);
srtp_dealloc(rtp_session->send_ctx[rtp_session->srtp_idx_rtp]);
rtp_session->send_ctx[rtp_session->srtp_idx_rtp] = NULL;
if ((stat = srtp_create(&rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->send_policy[rtp_session->srtp_idx_rtp]))) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP SEND\n");
ret = -1;
goto end;
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_INFO, "RE-Activating Secure RTP SEND\n");
}
}
stat = srtp_protect(rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->write_msg.header, &sbytes);
if (stat) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error: SRTP protection failed with code %d\n", stat);
}
bytes = sbytes;
}
#endif
#ifdef ENABLE_ZRTP
/* ZRTP Send */
if (zrtp_on && !rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA]) {
unsigned int sbytes = (int) bytes;
zrtp_status_t stat = zrtp_status_fail;
stat = zrtp_process_rtp(rtp_session->zrtp_stream, (void *) &rtp_session->write_msg, &sbytes);
switch (stat) {
case zrtp_status_ok:
break;
case zrtp_status_drop:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection drop with code %d\n", stat);
ret = (int) bytes;
goto end;
break;
case zrtp_status_fail:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection fail with code %d\n", stat);
break;
default:
break;
}
bytes = sbytes;
}
#endif
if (switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, (void *) &rtp_session->write_msg, &bytes) != SWITCH_STATUS_SUCCESS) {
if (switch_rtp_write_raw(rtp_session, (void *) &rtp_session->write_msg, &bytes, SWITCH_TRUE) != SWITCH_STATUS_SUCCESS) {
rtp_session->seq--;
ret = -1;
goto end;
@ -7232,6 +7328,89 @@ SWITCH_DECLARE(int) switch_rtp_write_manual(switch_rtp_t *rtp_session,
return ret;
}
SWITCH_DECLARE(switch_status_t) switch_rtp_write_raw(switch_rtp_t *rtp_session, void *data, switch_size_t *bytes, switch_bool_t process_encryption)
{
switch_status_t status = SWITCH_STATUS_FALSE;
switch_assert(bytes);
if (!switch_rtp_ready(rtp_session) || !rtp_session->remote_addr || *bytes > SWITCH_RTP_MAX_BUF_LEN) {
return status;
}
if (!rtp_write_ready(rtp_session, *bytes, __LINE__)) {
return SWITCH_STATUS_NOT_INITALIZED;
}
WRITE_INC(rtp_session);
if (process_encryption) {
process_encryption = SWITCH_FALSE;
#ifdef ENABLE_SRTP
if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND]) {
int sbytes = (int) *bytes;
err_status_t stat;
if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND_RESET]) {
switch_rtp_clear_flag(rtp_session, SWITCH_RTP_FLAG_SECURE_SEND_RESET);
srtp_dealloc(rtp_session->send_ctx[rtp_session->srtp_idx_rtp]);
rtp_session->send_ctx[rtp_session->srtp_idx_rtp] = NULL;
if ((stat = srtp_create(&rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->send_policy[rtp_session->srtp_idx_rtp]))) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP SEND\n");
status = SWITCH_STATUS_FALSE;
goto end;
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_INFO, "RE-Activating Secure RTP SEND\n");
}
}
stat = srtp_protect(rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->write_msg.header, &sbytes);
if (stat) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error: SRTP protection failed with code %d\n", stat);
}
*bytes = sbytes;
}
#endif
#ifdef ENABLE_ZRTP
/* ZRTP Send */
if (zrtp_on && !rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA]) {
unsigned int sbytes = (int) *bytes;
zrtp_status_t stat = zrtp_status_fail;
stat = zrtp_process_rtp(rtp_session->zrtp_stream, (void *) &rtp_session->write_msg, &sbytes);
switch (stat) {
case zrtp_status_ok:
break;
case zrtp_status_drop:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection drop with code %d\n", stat);
ret = SWITCH_STATUS_SUCCESS;
goto end;
break;
case zrtp_status_fail:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection fail with code %d\n", stat);
break;
default:
break;
}
*bytes = sbytes;
}
#endif
}
status = switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, data, bytes);
end:
WRITE_DEC(rtp_session);
return status;
}
SWITCH_DECLARE(uint32_t) switch_rtp_get_ssrc(switch_rtp_t *rtp_session)
{
return rtp_session->ssrc;

View File

@ -34,54 +34,44 @@
#define MAX_MISSING_SEQ 20
#define vb_debug(_vb, _level, _format, ...) if (_vb->debug_level >= _level) switch_log_printf(SWITCH_CHANNEL_LOG_CLEAN, SWITCH_LOG_ALERT, "VB:%p level:%d line:%d ->" _format, (void *) _vb, _level, __LINE__, __VA_ARGS__)
struct switch_vb_s;
typedef struct switch_vb_node_s {
struct switch_vb_frame_s *parent;
struct switch_vb_s *parent;
switch_rtp_packet_t packet;
uint32_t len;
uint8_t visible;
struct switch_vb_node_s *next;
} switch_vb_node_t;
typedef struct switch_vb_frame_s {
struct switch_vb_s *parent;
struct switch_vb_node_s *node_list;
uint32_t ts;
uint32_t visible_nodes;
uint8_t visible;
uint8_t complete;
uint8_t mark;
struct switch_vb_frame_s *next;
uint16_t min_seq;
uint16_t max_seq;
} switch_vb_frame_t;
struct switch_vb_s {
struct switch_vb_frame_s *frame_list;
struct switch_vb_frame_s *cur_read_frame;
struct switch_vb_frame_s *cur_write_frame;
uint32_t last_read_ts;
uint32_t last_read_seq;
struct switch_vb_node_s *node_list;
uint32_t last_target_seq;
uint32_t last_wrote_ts;
uint32_t last_wrote_seq;
uint32_t highest_read_ts;
uint32_t highest_read_seq;
uint32_t highest_wrote_ts;
uint32_t highest_wrote_seq;
uint16_t target_seq;
uint16_t seq_out;
uint32_t visible_frames;
uint32_t visible_nodes;
uint32_t total_frames;
uint32_t complete_frames;
uint32_t frame_len;
uint32_t min_frame_len;
uint32_t max_frame_len;
uint8_t write_init;
uint8_t read_init;
uint8_t debug_level;
switch_timer_t timer;
int cur_errs;
uint16_t next_seq;
switch_inthash_t *missing_seq_hash;
switch_inthash_t *node_hash;
};
static inline switch_vb_node_t *new_node(switch_vb_frame_t *frame)
static inline switch_vb_node_t *new_node(switch_vb_t *vb)
{
switch_vb_node_t *np, *last = NULL;
for (np = frame->node_list; np; np = np->next) {
for (np = vb->node_list; np; np = np->next) {
if (!np->visible) {
break;
}
@ -91,12 +81,11 @@ static inline switch_vb_node_t *new_node(switch_vb_frame_t *frame)
if (!np) {
switch_zmalloc(np, sizeof(*np));
np->parent = frame;
if (last) {
last->next = np;
} else {
frame->node_list = np;
vb->node_list = np;
}
}
@ -104,167 +93,113 @@ static inline switch_vb_node_t *new_node(switch_vb_frame_t *frame)
switch_assert(np);
np->visible = 1;
np->parent->visible_nodes++;
vb->visible_nodes++;
np->parent = vb;
return np;
}
static inline void add_node(switch_vb_frame_t *frame, switch_rtp_packet_t *packet, switch_size_t len)
static inline switch_vb_node_t *find_seq(switch_vb_t *vb, uint16_t seq)
{
switch_vb_node_t *node = new_node(frame);
uint16_t seq = ntohs(packet->header.seq);
switch_vb_node_t *np;
for (np = vb->node_list; np; np = np->next) {
if (!np->visible) continue;
if (ntohs(np->packet.header.seq) == ntohs(seq)) {
return np;
}
}
return NULL;
}
static inline void hide_node(switch_vb_node_t *node)
{
node->visible = 0;
node->parent->visible_nodes--;
switch_core_inthash_delete(node->parent->node_hash, node->packet.header.seq);
}
static inline void hide_nodes(switch_vb_t *vb)
{
switch_vb_node_t *np;
for (np = vb->node_list; np; np = np->next) {
hide_node(np);
}
}
static inline void drop_ts(switch_vb_t *vb, uint32_t ts)
{
switch_vb_node_t *np;
int x = 0;
for (np = vb->node_list; np; np = np->next) {
if (!np->visible) continue;
if (ts == np->packet.header.ts) {
hide_node(np);
x++;
}
}
if (x) vb->complete_frames--;
}
static inline uint32_t vb_find_lowest_ts(switch_vb_t *vb)
{
switch_vb_node_t *np, *lowest = NULL;
for (np = vb->node_list; np; np = np->next) {
if (!np->visible) continue;
if (!lowest || ntohl(lowest->packet.header.ts) > ntohl(np->packet.header.ts)) {
lowest = np;
}
}
return lowest ? lowest->packet.header.ts : 0;
}
static inline void drop_oldest_frame(switch_vb_t *vb)
{
uint32_t ts = vb_find_lowest_ts(vb);
drop_ts(vb, ts);
vb_debug(vb, 1, "Dropping oldest frame ts:%u\n", ntohl(ts));
}
static inline void add_node(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len)
{
switch_vb_node_t *node = new_node(vb);
node->packet = *packet;
node->len = len;
memcpy(node->packet.body, packet->body, len);
if (!frame->min_seq ||seq < ntohs(frame->min_seq)) {
frame->min_seq = packet->header.seq;
switch_core_inthash_insert(vb->node_hash, node->packet.header.seq, node);
vb_debug(vb, (packet->header.m ? 1 : 2), "PUT packet last_ts:%u ts:%u seq:%u%s\n",
ntohl(vb->highest_wrote_ts), ntohl(node->packet.header.ts), ntohs(node->packet.header.seq), packet->header.m ? " <MARK>" : "");
if (!vb->write_init || ntohs(packet->header.seq) > ntohs(vb->highest_wrote_seq) ||
(ntohs(vb->highest_wrote_seq) > USHRT_MAX - 10 && ntohs(packet->header.seq) <= 10) ) {
vb->highest_wrote_seq = packet->header.seq;
}
if (seq > ntohs(frame->max_seq)) {
frame->max_seq = packet->header.seq;
}
vb_debug(frame->parent, (packet->header.m ? 1 : 2), "PUT packet last_ts:%u ts:%u seq:%u%s\n",
ntohl(frame->parent->last_wrote_ts), ntohl(node->packet.header.ts), ntohs(node->packet.header.seq), packet->header.m ? " <MARK>" : "");
if (packet->header.m) {
frame->mark = 1;
if (vb->write_init && htons(packet->header.seq) >= htons(vb->highest_wrote_seq) && (ntohl(node->packet.header.ts) > ntohl(vb->highest_wrote_ts))) {
vb->complete_frames++;
vb_debug(vb, 2, "WRITE frame ts: %u complete=%u/%u n:%u\n", ntohl(node->packet.header.ts), vb->complete_frames , vb->frame_len, vb->visible_nodes);
vb->highest_wrote_ts = packet->header.ts;
} else if (!vb->write_init) {
vb->highest_wrote_ts = packet->header.ts;
}
if ((frame->parent->last_wrote_ts && frame->parent->last_wrote_ts != node->packet.header.ts)) {
frame->complete = 1;
frame->parent->complete_frames++;
if (!vb->write_init) vb->write_init = 1;
if (vb->complete_frames > vb->max_frame_len) {
drop_oldest_frame(vb);
}
frame->parent->last_wrote_ts = packet->header.ts;
frame->parent->last_wrote_seq = packet->header.seq;
}
static inline void hide_node(switch_vb_node_t *node)
{
if (node->visible) {
node->visible = 0;
node->parent->visible_nodes--;
}
}
static inline void hide_nodes(switch_vb_frame_t *frame)
{
switch_vb_node_t *np;
for (np = frame->node_list; np; np = np->next) {
hide_node(np);
}
}
static inline void hide_frame(switch_vb_frame_t *frame)
{
vb_debug(frame->parent, 2, "Hide frame ts: %u\n", ntohl(frame->ts));
if (frame->visible) {
frame->visible = 0;
frame->parent->visible_frames--;
}
if (frame->complete) {
frame->parent->complete_frames--;
frame->complete = 0;
}
frame->min_seq = frame->max_seq = 0;
hide_nodes(frame);
}
static inline switch_vb_frame_t *new_frame(switch_vb_t *vb, switch_rtp_packet_t *packet)
{
switch_vb_frame_t *fp = NULL, *last = NULL;
int new = 1;
if (vb->cur_write_frame) {
if (!vb->cur_write_frame->visible) {
vb->cur_write_frame = NULL;
return NULL;
} else if (vb->cur_write_frame->ts == packet->header.ts) {
fp = vb->cur_write_frame;
new = 0;
}
}
if (!fp) {
for (fp = vb->frame_list; fp; fp = fp->next) {
if (fp->ts == packet->header.ts) {
if (!fp->visible) {
return NULL;
} else {
new = 0;
break;
}
}
}
}
if (!fp) {
for (fp = vb->frame_list; fp; fp = fp->next) {
if (!fp->visible) {
break;
}
last = fp;
}
}
if (!fp) {
switch_zmalloc(fp, sizeof(*fp));
fp->parent = vb;
vb->total_frames++;
if (last) {
last->next = fp;
} else {
vb->frame_list = fp;
}
}
switch_assert(fp);
if (new) {
vb->visible_frames++;
fp->visible = 1;
fp->complete = 0;
fp->ts = packet->header.ts;
fp->min_seq = fp->max_seq = 0;
fp->mark = 0;
}
vb->cur_write_frame = fp;
return fp;
}
static inline int frame_contains_seq(switch_vb_frame_t *frame, uint16_t target_seq, switch_vb_node_t **nodep)
{
uint16_t seq = ntohs(target_seq);
switch_vb_node_t *np;
for (np = frame->node_list; np; np = np->next) {
if (!np->visible) {
continue;
}
//vb_debug(frame->parent, 10, " CMP %u %u/%u\n", ntohl(frame->ts), ntohs(np->packet.header.seq), seq);
if (ntohs(np->packet.header.seq) == seq) {
//vb_debug(frame->parent, 10, " MATCH %u %u v:%d\n", ntohs(np->packet.header.seq), seq, np->visible);
if (nodep) {
*nodep = np;
}
return 1;
}
}
return 0;
}
static inline void increment_seq(switch_vb_t *vb)
@ -278,155 +213,56 @@ static inline void set_read_seq(switch_vb_t *vb, uint16_t seq)
vb->target_seq = htons((ntohs(vb->last_target_seq) + 1));
}
static inline switch_status_t next_frame(switch_vb_t *vb, switch_vb_node_t **nodep)
{
switch_vb_frame_t *fp = NULL, *oldest = NULL, *frame_containing_seq = NULL;
if ((fp = vb->cur_read_frame)) {
if (fp->visible_nodes == 0) {
hide_frame(fp);
vb->cur_read_frame = NULL;
}
}
if ((fp = vb->cur_read_frame)) {
int ok = 1;
if (!fp->visible || fp->visible_nodes == 0) {
ok = 0;
} else {
if (vb->target_seq) {
if (frame_contains_seq(fp, vb->target_seq, nodep)) {
vb_debug(vb, 2, "CUR FRAME %u CONTAINS REQUESTED SEQ %d\n", ntohl(fp->ts), ntohs(vb->target_seq));
frame_containing_seq = fp;
goto end;
} else {
ok = 0;
}
}
}
if (!ok) {
vb_debug(vb, 2, "DONE WITH CUR FRAME %u v: %d c: %d\n", ntohl(fp->ts), fp->visible, fp->complete);
vb->cur_read_frame = NULL;
}
}
do {
*nodep = NULL;
for (fp = vb->frame_list; fp; fp = fp->next) {
if (!fp->visible || !fp->complete) {
continue;
}
if (vb->target_seq) {
if (frame_contains_seq(fp, vb->target_seq, nodep)) {
vb_debug(vb, 2, "FOUND FRAME %u CONTAINING SEQ %d\n", ntohl(fp->ts), ntohs(vb->target_seq));
frame_containing_seq = fp;
goto end;
}
}
if ((!oldest || htonl(oldest->ts) > htonl(fp->ts))) {
oldest = fp;
}
}
if (!frame_containing_seq && vb->target_seq) {
if (ntohs(vb->target_seq) - ntohs(vb->last_target_seq) > MAX_MISSING_SEQ) {
vb_debug(vb, 1, "FOUND NO FRAMES CONTAINING SEQ %d. Too many failures....\n", ntohs(vb->target_seq));
switch_vb_reset(vb, SWITCH_FALSE);
} else {
vb_debug(vb, 2, "FOUND NO FRAMES CONTAINING SEQ %d. Try next one\n", ntohs(vb->target_seq));
increment_seq(vb);
vb->cur_errs++;
}
}
} while (!frame_containing_seq && vb->target_seq);
end:
if (frame_containing_seq) {
vb->cur_read_frame = frame_containing_seq;
if (nodep && *nodep) {
hide_node(*nodep);
set_read_seq(vb, (*nodep)->packet.header.seq);
}
} else if (oldest) {
vb->cur_read_frame = oldest;
} else {
vb->cur_read_frame = NULL;
}
if (vb->cur_read_frame) {
return SWITCH_STATUS_SUCCESS;
}
return SWITCH_STATUS_NOTFOUND;
}
static inline switch_vb_node_t *frame_find_next_seq(switch_vb_frame_t *frame)
{
switch_vb_node_t *np;
for (np = frame->node_list; np; np = np->next) {
if (!np->visible) continue;
if (ntohs(np->packet.header.seq) == ntohs(frame->parent->target_seq)) {
hide_node(np);
set_read_seq(frame->parent, np->packet.header.seq);
return np;
}
}
return NULL;
}
static inline switch_vb_node_t *frame_find_lowest_seq(switch_vb_frame_t *frame)
static inline switch_vb_node_t *vb_find_lowest_seq(switch_vb_t *vb)
{
switch_vb_node_t *np, *lowest = NULL;
for (np = frame->node_list; np; np = np->next) {
for (np = vb->node_list; np; np = np->next) {
if (!np->visible) continue;
if (!lowest || ntohs(lowest->packet.header.seq) > ntohs(np->packet.header.seq)) {
hide_node(np);
lowest = np;
}
}
if (lowest) {
set_read_seq(frame->parent, lowest->packet.header.seq);
}
return lowest;
}
static inline switch_status_t next_frame_packet(switch_vb_t *vb, switch_vb_node_t **nodep)
static inline switch_status_t vb_next_packet(switch_vb_t *vb, switch_vb_node_t **nodep)
{
switch_vb_node_t *node = NULL;
switch_vb_node_t *np = NULL, *node = NULL;
switch_status_t status;
if ((status = next_frame(vb, &node) != SWITCH_STATUS_SUCCESS)) {
return status;
}
if (!node) {
if (vb->target_seq) {
vb_debug(vb, 2, "Search for next packet %u cur ts: %u\n", htons(vb->target_seq), htonl(vb->cur_read_frame->ts));
node = frame_find_next_seq(vb->cur_read_frame);
if (np) status = 0, status++;
if (!vb->target_seq) {
if ((node = vb_find_lowest_seq(vb))) {
vb_debug(vb, 2, "No target seq using seq: %u as a starting point\n", ntohs(node->packet.header.seq));
} else {
node = frame_find_lowest_seq(vb->cur_read_frame);
vb_debug(vb, 2, "Find lowest seq frame ts: %u seq: %u\n", ntohl(vb->cur_read_frame->ts), ntohs(node->packet.header.seq));
vb_debug(vb, 1, "%s", "No nodes available....\n");
}
} else if ((node = switch_core_inthash_find(vb->node_hash, vb->target_seq))) {
vb_debug(vb, 2, "FOUND desired seq: %u\n", ntohs(vb->target_seq));
} else {
int x;
vb_debug(vb, 2, "MISSING desired seq: %u\n", ntohs(vb->target_seq));
for (x = 0; x < 10; x++) {
increment_seq(vb);
if ((node = switch_core_inthash_find(vb->node_hash, vb->target_seq))) {
vb_debug(vb, 2, "FOUND incremental seq: %u\n", ntohs(vb->target_seq));
break;
} else {
vb_debug(vb, 2, "MISSING incremental seq: %u\n", ntohs(vb->target_seq));
}
}
}
*nodep = node;
if (node) {
set_read_seq(vb, node->packet.header.seq);
return SWITCH_STATUS_SUCCESS;
}
@ -434,9 +270,9 @@ static inline switch_status_t next_frame_packet(switch_vb_t *vb, switch_vb_node_
}
static inline void free_nodes(switch_vb_frame_t *frame)
static inline void free_nodes(switch_vb_t *vb)
{
switch_vb_node_t *np = frame->node_list, *cur;
switch_vb_node_t *np = vb->node_list, *cur;
while(np) {
cur = np;
@ -444,32 +280,9 @@ static inline void free_nodes(switch_vb_frame_t *frame)
free(cur);
}
frame->node_list = NULL;
vb->node_list = NULL;
}
static inline void free_frames(switch_vb_t *vb)
{
switch_vb_frame_t *fp = vb->frame_list, *cur = NULL;
while(fp) {
cur = fp;
fp = fp->next;
free_nodes(cur);
free(cur);
}
vb->frame_list = NULL;
}
static inline void do_flush(switch_vb_t *vb)
{
switch_vb_frame_t *fp = vb->frame_list;
while(fp) {
hide_frame(fp);
fp = fp->next;
}
}
SWITCH_DECLARE(int) switch_vb_poll(switch_vb_t *vb)
{
@ -490,21 +303,15 @@ SWITCH_DECLARE(void) switch_vb_reset(switch_vb_t *vb, switch_bool_t flush)
{
vb_debug(vb, 2, "RESET BUFFER flush: %d\n", (int)flush);
if (vb->cur_read_frame) {
vb->cur_read_frame = NULL;
}
vb->last_read_ts = 0;
vb->last_target_seq = 0;
vb->target_seq = 0;
if (flush) {
do_flush(vb);
//do_flush(vb);
}
}
SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len, switch_bool_t timer_compensation)
SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len)
{
switch_vb_t *vb;
switch_zmalloc(vb, sizeof(*vb));
@ -512,10 +319,8 @@ SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min
vb->min_frame_len = vb->frame_len = min_frame_len;
vb->max_frame_len = max_frame_len;
//vb->seq_out = (uint16_t) rand();
if (timer_compensation) { /* rewrite timestamps and seq as they are read to hide packet loss */
switch_core_timer_init(&vb->timer, "soft", 1, 90, NULL);
}
switch_core_inthash_init(&vb->missing_seq_hash);
switch_core_inthash_init(&vb->node_hash);
*vbp = vb;
@ -526,35 +331,105 @@ SWITCH_DECLARE(switch_status_t) switch_vb_destroy(switch_vb_t **vbp)
{
switch_vb_t *vb = *vbp;
*vbp = NULL;
switch_core_inthash_destroy(&vb->missing_seq_hash);
switch_core_inthash_destroy(&vb->node_hash);
if (vb->timer.timer_interface) {
switch_core_timer_destroy(&vb->timer);
free_nodes(vb);
free(vb);
return SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb)
{
switch_hash_index_t *hi = NULL;
uint32_t nack = 0;
uint16_t least = 0;
int i = 0;
void *val;
const void *var;
for (hi = switch_core_hash_first(vb->missing_seq_hash); hi; hi = switch_core_hash_next(&hi)) {
uint16_t seq;
switch_core_hash_this(hi, &var, NULL, &val);
seq = ntohs(*((uint16_t *) var));
vb_debug(vb, 3, "WTF ENTRY %u\n", seq);
if (!least || seq < least) {
least = seq;
}
}
free_frames(vb);
free(vb);
if (least && switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(least))) {
vb_debug(vb, 3, "Found smallest NACKABLE seq %u\n", least);
nack = (uint32_t) htons(least);
for (i = 1; i > 17; i++) {
if (switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(least + i))) {
vb_debug(vb, 3, "Found addtl NACKABLE seq %u\n", least + i);
nack |= (1 << (16 + i));
} else {
break;
}
}
}
return nack;
}
SWITCH_DECLARE(switch_status_t) switch_vb_push_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len)
{
add_node(vb, packet, len);
return SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len)
{
switch_vb_frame_t *frame;
uint32_t i;
uint16_t want = ntohs(vb->next_seq), got = ntohs(packet->header.seq);
if (!want) want = got;
#ifdef VB_PLOSS
int r = (rand() % 10000) + 1;
if (r <= 200) {
vb_debug(vb, 1, "Simulate dropped packet ......... ts: %u seq: %u\n", ntohl(packet->header.ts), ntohs(packet->header.seq));
return SWITCH_STATUS_SUCCESS;
}
#endif
if ((frame = new_frame(vb, packet))) {
add_node(frame, packet, len);
return SWITCH_STATUS_SUCCESS;
if (got > want) {
for (i = want; i < got; i++) {
vb_debug(vb, 2, "MARK SEQ MISSING %u\n", i);
switch_core_inthash_insert(vb->missing_seq_hash, (uint32_t)htons(i), (void *)SWITCH_TRUE);
}
} else {
if (switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(got))) {
vb_debug(vb, 2, "MARK SEQ FOUND %u\n", got);
}
}
return SWITCH_STATUS_IGNORE;
if (got >= want) {
vb->next_seq = htons(ntohs(packet->header.seq) + 1);
}
add_node(vb, packet, len);
return SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(switch_status_t) switch_vb_get_packet_by_seq(switch_vb_t *vb, uint16_t seq, switch_rtp_packet_t *packet, switch_size_t *len)
{
switch_vb_node_t *node;
if ((node = switch_core_inthash_find(vb->node_hash, seq))) {
vb_debug(vb, 2, "Found buffered seq: %u\n", ntohs(seq));
*packet = node->packet;
*len = node->len;
memcpy(packet->body, node->packet.body, node->len);
return SWITCH_STATUS_SUCCESS;
} else {
vb_debug(vb, 2, "Missing buffered seq: %u\n", ntohs(seq));
}
return SWITCH_STATUS_NOTFOUND;
}
SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t *len)
@ -562,15 +437,32 @@ SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp
switch_vb_node_t *node = NULL;
switch_status_t status;
vb->cur_errs = 0;
vb_debug(vb, 2, "GET PACKET %u/%u n:%d\n", vb->complete_frames , vb->frame_len, vb->visible_nodes);
if (vb->complete_frames < vb->frame_len) {
vb_debug(vb, 2, "BUFFERING %u/%u\n", vb->complete_frames , vb->frame_len);
return SWITCH_STATUS_MORE_DATA;
}
if ((status = next_frame_packet(vb, &node)) == SWITCH_STATUS_SUCCESS) {
vb_debug(vb, 2, "Found next frame cur ts: %u seq: %u\n", htonl(vb->cur_read_frame->ts), htons(node->packet.header.seq));
if ((status = vb_next_packet(vb, &node)) == SWITCH_STATUS_SUCCESS) {
vb_debug(vb, 2, "Found next frame cur ts: %u seq: %u\n", htonl(node->packet.header.ts), htons(node->packet.header.seq));
if (!vb->read_init || ntohs(node->packet.header.seq) > ntohs(vb->highest_read_seq) ||
(ntohs(vb->highest_read_seq) > USHRT_MAX - 10 && ntohs(node->packet.header.seq) <= 10) ) {
vb->highest_read_seq = node->packet.header.seq;
}
if (vb->read_init && htons(node->packet.header.seq) >= htons(vb->highest_read_seq) && (ntohl(node->packet.header.ts) > ntohl(vb->highest_read_ts))) {
vb->complete_frames--;
vb_debug(vb, 2, "READ frame ts: %u complete=%u/%u n:%u\n", ntohl(node->packet.header.ts), vb->complete_frames , vb->frame_len, vb->visible_nodes);
vb->highest_read_ts = node->packet.header.ts;
} else if (!vb->read_init) {
vb->highest_read_ts = node->packet.header.ts;
}
if (!vb->read_init) vb->read_init = 1;
} else {
switch_vb_reset(vb, SWITCH_FALSE);
@ -591,35 +483,11 @@ SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp
*packet = node->packet;
*len = node->len;
memcpy(packet->body, node->packet.body, node->len);
if (vb->cur_errs) {
vb_debug(vb, 1, "One or more Missing SEQ TS %u\n", ntohl(packet->header.ts));
status = SWITCH_STATUS_BREAK;
}
vb->last_read_ts = packet->header.ts;
vb->last_read_seq = packet->header.seq;
if (vb->timer.timer_interface) {
if (packet->header.m || !vb->timer.samplecount) {
switch_core_timer_sync(&vb->timer);
}
}
if (vb->cur_read_frame && vb->cur_read_frame->visible_nodes == 0 && !packet->header.m) {
/* force mark bit */
vb_debug(vb, 1, "LAST PACKET %u WITH NO MARK BIT, ADDIONG MARK BIT\n", ntohl(packet->header.ts));
packet->header.m = 1;
status = SWITCH_STATUS_BREAK;
}
hide_node(node);
vb_debug(vb, 1, "GET packet ts:%u seq:%u~%u%s\n", ntohl(packet->header.ts), ntohs(packet->header.seq), vb->seq_out, packet->header.m ? " <MARK>" : "");
//packet->header.seq = htons(vb->seq_out++);
if (vb->timer.timer_interface) {
packet->header.ts = htonl(vb->timer.samplecount);
}
return status;
}