From 0d626bc715bdb44af7047cd0173170b9ac5cd079 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Fri, 19 Dec 2014 00:03:37 -0600 Subject: [PATCH] FS-7501: more factoring on vid buffer --- src/include/switch_utils.h | 1 + src/mod/codecs/mod_vpx/mod_vpx.c | 3 +- src/switch_core_media.c | 6 + src/switch_rtp.c | 7 +- src/switch_utils.c | 49 +++++++ src/switch_vidderbuffer.c | 241 +++++++++++++++++++++---------- 6 files changed, 225 insertions(+), 82 deletions(-) diff --git a/src/include/switch_utils.h b/src/include/switch_utils.h index 120e319838..e922ff262a 100644 --- a/src/include/switch_utils.h +++ b/src/include/switch_utils.h @@ -513,6 +513,7 @@ SWITCH_DECLARE(char *) get_addr6(char *buf, switch_size_t len, struct sockaddr_i SWITCH_DECLARE(int) get_addr_int(switch_sockaddr_t *sa); SWITCH_DECLARE(int) switch_cmp_addr(switch_sockaddr_t *sa1, switch_sockaddr_t *sa2); +SWITCH_DECLARE(int) switch_cp_addr(switch_sockaddr_t *sa1, switch_sockaddr_t *sa2); /*! \brief get the port number of an ip address diff --git a/src/mod/codecs/mod_vpx/mod_vpx.c b/src/mod/codecs/mod_vpx/mod_vpx.c index aa93c46caa..200213c2a8 100644 --- a/src/mod/codecs/mod_vpx/mod_vpx.c +++ b/src/mod/codecs/mod_vpx/mod_vpx.c @@ -517,6 +517,7 @@ static switch_status_t switch_vpx_decode(switch_codec_t *codec, switch_frame_t * (!frame->m) && (!context->last_received_complete_picture)) { // possible packet loss switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Packet Loss, skip previous received frame (to avoid crash?)\n"); + usleep(500000);abort(); switch_goto_status(SWITCH_STATUS_RESTART, end); } @@ -563,7 +564,7 @@ static switch_status_t switch_vpx_decode(switch_codec_t *codec, switch_frame_t * frame->img = (switch_image_t *) vpx_codec_get_frame(decoder, &iter); if (!(frame->img) || corrupted) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "VPX invalid packet\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "VPX invalid packet image: %d corrupted: %d\n", !!frame->img, corrupted); switch_goto_status(SWITCH_STATUS_RESTART, end); } diff --git a/src/switch_core_media.c b/src/switch_core_media.c index 4b4db8d904..5ed98ec8e5 100644 --- a/src/switch_core_media.c +++ b/src/switch_core_media.c @@ -9831,10 +9831,12 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_video_frame(switch_core loops++; if (switch_channel_down_nosig(session->channel)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "FUCKING DOWN\n"); return SWITCH_STATUS_FALSE; } if (switch_channel_test_flag(session->channel, CF_VIDEO_PAUSE)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "FUCKING VID PAUSED\n"); *frame = &runtime.dummy_cng_frame; switch_yield(20000); return SWITCH_STATUS_SUCCESS; @@ -9853,14 +9855,17 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_video_frame(switch_core if (status == SWITCH_STATUS_INUSE) { *frame = &runtime.dummy_cng_frame; switch_yield(20000); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "FUCKING DUMMY\n"); return SWITCH_STATUS_SUCCESS; } if (status != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "FUCKING BAD STATUS %d\n", status); goto done; } if (!(*frame)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "FUCKING NO FRAME\n"); goto done; } @@ -9882,6 +9887,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_video_frame(switch_core if (switch_test_flag(*frame, SFF_CNG)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "FUCKING CNG\n"); status = SWITCH_STATUS_SUCCESS; goto done; } diff --git a/src/switch_rtp.c b/src/switch_rtp.c index 936d71992d..bead97cd81 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -5107,6 +5107,7 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t rtp_session->recv_msg.header.pt = jb_frame->pt; rtp_session->recv_msg.header.seq = htons(jb_frame->seq); status = SWITCH_STATUS_SUCCESS; + switch_cp_addr(rtp_session->from_addr, rtp_session->remote_addr); if (!xcheck_jitter) { check_jitter(rtp_session); xcheck_jitter = *bytes; @@ -5121,18 +5122,22 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t switch(vstatus) { case SWITCH_STATUS_RESTART: switch_core_session_request_video_refresh(rtp_session->session); + *bytes = 0; break; case SWITCH_STATUS_MORE_DATA: status = SWITCH_STATUS_FALSE; *bytes = 0; break; - default: + case SWITCH_STATUS_SUCCESS: status = SWITCH_STATUS_SUCCESS; + switch_cp_addr(rtp_session->from_addr, rtp_session->remote_addr); if (!xcheck_jitter) { check_jitter(rtp_session); xcheck_jitter = *bytes; } break; + default: + break; } } diff --git a/src/switch_utils.c b/src/switch_utils.c index c0963aeb9e..c9ae2090b6 100644 --- a/src/switch_utils.c +++ b/src/switch_utils.c @@ -1976,6 +1976,55 @@ SWITCH_DECLARE(int) switch_cmp_addr(switch_sockaddr_t *sa1, switch_sockaddr_t *s return 0; } + +SWITCH_DECLARE(int) switch_cp_addr(switch_sockaddr_t *sa1, switch_sockaddr_t *sa2) +{ + struct sockaddr_in *s1; + struct sockaddr_in *s2; + + struct sockaddr_in6 *s16; + struct sockaddr_in6 *s26; + + struct sockaddr *ss1; + struct sockaddr *ss2; + + if (!(sa1 && sa2)) + return 0; + + s1 = (struct sockaddr_in *) &sa1->sa; + s2 = (struct sockaddr_in *) &sa2->sa; + + s16 = (struct sockaddr_in6 *) &sa1->sa; + s26 = (struct sockaddr_in6 *) &sa2->sa; + + ss1 = (struct sockaddr *) &sa1->sa; + ss2 = (struct sockaddr *) &sa2->sa; + + if (ss1->sa_family != ss2->sa_family) + return 0; + + switch (ss1->sa_family) { + case AF_INET: + s1->sin_addr.s_addr = s2->sin_addr.s_addr; + s1->sin_port = s2->sin_port; + return 1; + case AF_INET6: + if (s16->sin6_addr.s6_addr && s26->sin6_addr.s6_addr) { + int i; + + s16->sin6_port = s26->sin6_port; + + for (i = 0; i < 4; i++) { + *((int32_t *) s16->sin6_addr.s6_addr + i) = *((int32_t *) s26->sin6_addr.s6_addr + i); + } + + return 1; + } + } + + return 0; +} + SWITCH_DECLARE(char *) get_addr6(char *buf, switch_size_t len, struct sockaddr_in6 *sa, socklen_t salen) { switch_assert(buf); diff --git a/src/switch_vidderbuffer.c b/src/switch_vidderbuffer.c index c083f99f4b..7e95c77984 100644 --- a/src/switch_vidderbuffer.c +++ b/src/switch_vidderbuffer.c @@ -31,6 +31,7 @@ #include #include +#define MAX_MISSING_SEQ 20 #define vb_debug(_vb, _level, _format, ...) if (_vb->debug_level >= _level) switch_log_printf(SWITCH_CHANNEL_LOG_CLEAN, SWITCH_LOG_ALERT, "VB:%p level:%d line:%d ->" _format, (void *) _vb, _level, __LINE__, __VA_ARGS__) typedef struct switch_vb_node_s { @@ -45,8 +46,10 @@ typedef struct switch_vb_frame_s { struct switch_vb_s *parent; struct switch_vb_node_s *node_list; uint32_t ts; + uint32_t visible_nodes; uint8_t visible; uint8_t complete; + uint8_t mark; struct switch_vb_frame_s *next; uint16_t min_seq; uint16_t max_seq; @@ -55,8 +58,11 @@ typedef struct switch_vb_frame_s { struct switch_vb_s { struct switch_vb_frame_s *frame_list; struct switch_vb_frame_s *cur_read_frame; + struct switch_vb_frame_s *cur_write_frame; uint32_t last_read_ts; uint32_t last_read_seq; + uint32_t last_wrote_ts; + uint32_t last_wrote_seq; uint16_t target_seq; uint16_t seq_out; uint32_t visible_frames; @@ -96,7 +102,8 @@ static inline switch_vb_node_t *new_node(switch_vb_frame_t *frame) switch_assert(np); np->visible = 1; - + np->parent->visible_nodes++; + return np; } @@ -117,18 +124,29 @@ static inline void add_node(switch_vb_frame_t *frame, switch_rtp_packet_t *packe frame->max_seq = packet->header.seq; } - vb_debug(frame->parent, (packet->header.m ? 1 : 2), "PUT packet ts:%u seq:%u %s\n", - ntohl(node->packet.header.ts), ntohs(node->packet.header.seq), packet->header.m ? "FINAL" : "PARTIAL"); + vb_debug(frame->parent, (packet->header.m ? 1 : 2), "PUT packet last_ts:%u ts:%u seq:%u %s\n", + ntohl(frame->parent->last_wrote_ts), ntohl(node->packet.header.ts), ntohs(node->packet.header.seq), packet->header.m ? "FINAL" : "PARTIAL"); + if (packet->header.m) { + frame->mark = 1; + } + + if ((frame->parent->last_wrote_ts && frame->parent->last_wrote_ts != node->packet.header.ts)) { frame->complete = 1; frame->parent->complete_frames++; } + + frame->parent->last_wrote_ts = packet->header.ts; + frame->parent->last_wrote_seq = packet->header.seq; } static inline void hide_node(switch_vb_node_t *node) { - node->visible = 0; + if (node->visible) { + node->visible = 0; + node->parent->visible_nodes--; + } } static inline void hide_nodes(switch_vb_frame_t *frame) @@ -144,30 +162,45 @@ static inline void hide_frame(switch_vb_frame_t *frame) { vb_debug(frame->parent, 2, "Hide frame ts: %u\n", ntohl(frame->ts)); - frame->visible = 0; - frame->min_seq = frame->max_seq = 0; - frame->parent->visible_frames--; + if (frame->visible) { + frame->visible = 0; + frame->parent->visible_frames--; + } if (frame->complete) { frame->parent->complete_frames--; frame->complete = 0; } + frame->min_seq = frame->max_seq = 0; + hide_nodes(frame); } static inline switch_vb_frame_t *new_frame(switch_vb_t *vb, switch_rtp_packet_t *packet) { - switch_vb_frame_t *fp, *last = NULL; + switch_vb_frame_t *fp = NULL, *last = NULL; int new = 1; - for (fp = vb->frame_list; fp; fp = fp->next) { - if (fp->ts == packet->header.ts) { - if (fp->complete || !fp->visible) { - return NULL; - } else { - new = 0; - break; + if (vb->cur_write_frame) { + if (!vb->cur_write_frame->visible) { + vb->cur_write_frame = NULL; + return NULL; + } else if (vb->cur_write_frame->ts == packet->header.ts) { + fp = vb->cur_write_frame; + new = 0; + } + } + + if (!fp) { + for (fp = vb->frame_list; fp; fp = fp->next) { + if (fp->ts == packet->header.ts) { + if (!fp->visible) { + return NULL; + } else { + new = 0; + break; + } } } } @@ -201,8 +234,11 @@ static inline switch_vb_frame_t *new_frame(switch_vb_t *vb, switch_rtp_packet_t fp->complete = 0; fp->ts = packet->header.ts; fp->min_seq = fp->max_seq = 0; + fp->mark = 0; } + vb->cur_write_frame = fp; + return fp; } @@ -212,7 +248,12 @@ static inline int frame_contains_seq(switch_vb_frame_t *frame, uint16_t target_s int16_t seq = ntohs(target_seq); if (frame->min_seq && frame->max_seq && seq >= ntohs(frame->min_seq) && seq <= ntohs(frame->max_seq)) { - return 1; + switch_vb_node_t *np; + + for (np = frame->node_list; np; np = np->next) { + if (!np->visible) continue; + if (ntohs(np->packet.header.seq) == seq) return 1; + } } return 0; @@ -261,36 +302,85 @@ static inline int check_frame(switch_vb_frame_t *frame, switch_bool_t seq_check) } +static inline void increment_seq(switch_vb_t *vb) +{ + vb->target_seq = htons((ntohs(vb->target_seq) + 1)); +} + +static inline void set_read_seq(switch_vb_t *vb, uint16_t seq) +{ + vb->last_read_seq = seq; + vb->target_seq = htons((ntohs(vb->last_read_seq) + 1)); +} static inline switch_status_t next_frame(switch_vb_t *vb) { switch_vb_frame_t *fp = NULL, *oldest = NULL, *frame_containing_seq = NULL; - vb->cur_read_frame = NULL; - - for (fp = vb->frame_list; fp; fp = fp->next) { - - if (!fp->visible || !fp->complete) { - continue; + if ((fp = vb->cur_read_frame)) { + if (fp->visible_nodes == 0) { + hide_frame(fp); + vb->cur_read_frame = NULL; } + } - if (vb->target_seq) { - if (frame_contains_seq(fp, vb->target_seq)) { - vb_debug(fp->parent, 2, "FOUND FRAME CONTAINING SEQ %d\n", ntohs(vb->target_seq)); - frame_containing_seq = fp; - break; + + if ((fp = vb->cur_read_frame)) { + int ok = 1; + + if (!fp->visible || !fp->complete || fp->visible_nodes == 0) { + ok = 0; + } else { + if (vb->target_seq) { + if (frame_contains_seq(fp, vb->target_seq)) { + vb_debug(vb, 2, "FOUND CUR FRAME %u CONTAINING SEQ %d\n", ntohl(fp->ts), ntohs(vb->target_seq)); + frame_containing_seq = fp; + goto end; + } else { + ok = 0; + } } } - if (!check_frame(fp, SWITCH_FALSE)) { - return SWITCH_STATUS_RESTART; - } - - if ((!oldest || htonl(oldest->ts) > htonl(fp->ts))) { - oldest = fp; + if (!ok) { + vb_debug(vb, 2, "DONE WITH CUR FRAME %u v: %d c: %d\n", ntohl(fp->ts), fp->visible, fp->complete); + vb->cur_read_frame = NULL; } } + + do { + + for (fp = vb->frame_list; fp; fp = fp->next) { + + if (!fp->visible || !fp->complete) { + continue; + } + + if (vb->target_seq) { + if (frame_contains_seq(fp, vb->target_seq)) { + vb_debug(vb, 2, "FOUND FRAME %u CONTAINING SEQ %d\n", ntohl(fp->ts), ntohs(vb->target_seq)); + frame_containing_seq = fp; + goto end; + } + } + + if ((!oldest || htonl(oldest->ts) > htonl(fp->ts))) { + oldest = fp; + } + } + + if (!frame_containing_seq && vb->target_seq) { + if (ntohs(vb->target_seq) - ntohs(vb->last_read_seq) > MAX_MISSING_SEQ) { + vb_debug(vb, 1, "FOUND NO FRAMES CONTAINING SEQ %d. Too many failures....\n", ntohs(vb->target_seq)); + switch_vb_reset(vb, SWITCH_FALSE); + } else { + vb_debug(vb, 2, "FOUND NO FRAMES CONTAINING SEQ %d. Try next one\n", ntohs(vb->target_seq)); + increment_seq(vb); + } + } + } while (!frame_containing_seq && vb->target_seq); + end: if (frame_containing_seq) { vb->cur_read_frame = frame_containing_seq; @@ -305,12 +395,6 @@ static inline switch_status_t next_frame(switch_vb_t *vb) return SWITCH_STATUS_NOTFOUND; } -static inline void set_read_seq(switch_vb_t *vb, uint16_t seq) -{ - vb->last_read_seq = seq; - vb->target_seq = htons((ntohs(vb->last_read_seq) + 1)); -} - static inline switch_vb_node_t *frame_find_next_seq(switch_vb_frame_t *frame) { switch_vb_node_t *np; @@ -349,19 +433,31 @@ static inline switch_vb_node_t *frame_find_lowest_seq(switch_vb_frame_t *frame) return lowest; } -static inline switch_vb_node_t *next_frame_packet(switch_vb_t *vb) +static inline switch_status_t next_frame_packet(switch_vb_t *vb, switch_vb_node_t **nodep) { switch_vb_node_t *node; + switch_status_t status; - if (vb->last_read_seq) { + if ((status = next_frame(vb) != SWITCH_STATUS_SUCCESS)) { + return status; + } + + if (vb->target_seq) { vb_debug(vb, 2, "Search for next packet %u cur ts: %u\n", htons(vb->target_seq), htonl(vb->cur_read_frame->ts)); node = frame_find_next_seq(vb->cur_read_frame); } else { - vb_debug(vb, 2, "Find lowest seq frame ts: %u\n", ntohl(vb->cur_read_frame->ts)); node = frame_find_lowest_seq(vb->cur_read_frame); + vb_debug(vb, 2, "Find lowest seq frame ts: %u seq: %u\n", ntohl(vb->cur_read_frame->ts), ntohs(node->packet.header.seq)); } - return node; + *nodep = node; + + if (node) { + return SWITCH_STATUS_SUCCESS; + } + + return SWITCH_STATUS_NOTFOUND; + } static inline void free_nodes(switch_vb_frame_t *frame) @@ -418,8 +514,10 @@ SWITCH_DECLARE(void) switch_vb_debug_level(switch_vb_t *vb, uint8_t level) SWITCH_DECLARE(void) switch_vb_reset(switch_vb_t *vb, switch_bool_t flush) { + vb_debug(vb, 2, "RESET BUFFER flush: %d\n", (int)flush); + + if (vb->cur_read_frame) { - hide_frame(vb->cur_read_frame); vb->cur_read_frame = NULL; } @@ -469,7 +567,7 @@ SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp { switch_vb_frame_t *frame; -#ifndef VB_PLOSS +#ifdef VB_PLOSS int r = (rand() % 100000) + 1; if (r <= 20) { vb_debug(vb, 1, "Simulate dropped packet ......... ts: %u seq: %u\n", ntohl(packet->header.ts), ntohs(packet->header.seq)); @@ -488,37 +586,28 @@ SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t *len) { switch_vb_node_t *node = NULL; - int fail = 0; + switch_status_t status; if (vb->complete_frames < vb->frame_len) { vb_debug(vb, 2, "BUFFERING %u/%u\n", vb->complete_frames , vb->frame_len); return SWITCH_STATUS_MORE_DATA; } - do { - if (vb->cur_read_frame) { - if (!(node = next_frame_packet(vb))) { - vb_debug(vb, 2, "Cannot find next packet %u cur ts: %u\n", htons(vb->target_seq), htonl(vb->cur_read_frame->ts)); - switch_vb_reset(vb, SWITCH_FALSE); - fail++; - } - } else { - switch_status_t status = next_frame(vb); - - switch(status) { - case SWITCH_STATUS_RESTART: - vb_debug(vb, 2, "%s", "Error encountered ask for new keyframe\n"); - return SWITCH_STATUS_RESTART; - case SWITCH_STATUS_NOTFOUND: - vb_debug(vb, 2, "%s", "No frames found wait for more\n"); - return SWITCH_STATUS_MORE_DATA; - default: - vb_debug(vb, 2, "Found next frame cur ts: %u\n", htonl(vb->cur_read_frame->ts)); - break; - } - } + if ((status = next_frame_packet(vb, &node)) == SWITCH_STATUS_SUCCESS) { + vb_debug(vb, 2, "Found next frame cur ts: %u seq: %u\n", htonl(vb->cur_read_frame->ts), htons(node->packet.header.seq)); + } else { + switch_vb_reset(vb, SWITCH_FALSE); - } while (!node && fail < 2); + switch(status) { + case SWITCH_STATUS_RESTART: + vb_debug(vb, 2, "%s", "Error encountered ask for new keyframe\n"); + return SWITCH_STATUS_RESTART; + case SWITCH_STATUS_NOTFOUND: + default: + vb_debug(vb, 2, "%s", "No frames found wait for more\n"); + return SWITCH_STATUS_MORE_DATA; + } + } if (node) { *packet = node->packet; @@ -533,22 +622,14 @@ SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp } } - packet->header.ts = htonl(vb->timer.samplecount); - - vb_debug(vb, 1, "GET packet ts:%u seq:%u~%u\n", ntohl(packet->header.ts), ntohs(packet->header.seq), vb->seq_out); + vb_debug(vb, 1, "GET packet ts:%u seq:%u~%u m:%d\n", ntohl(packet->header.ts), ntohs(packet->header.seq), vb->seq_out, packet->header.m); if (vb->timer.timer_interface) { packet->header.seq = htons(vb->seq_out++); + packet->header.ts = htonl(vb->timer.samplecount); } - - if (vb->cur_read_frame && node->packet.header.m) { - hide_frame(vb->cur_read_frame); - vb->cur_read_frame = NULL; - } - + return SWITCH_STATUS_SUCCESS; - } else if (fail) { - return SWITCH_STATUS_NOTFOUND; } return SWITCH_STATUS_MORE_DATA;