diff --git a/src/include/switch_vidderbuffer.h b/src/include/switch_vidderbuffer.h index 6f2b02e7a5..d59ad3c46b 100644 --- a/src/include/switch_vidderbuffer.h +++ b/src/include/switch_vidderbuffer.h @@ -45,6 +45,7 @@ SWITCH_DECLARE(void) switch_vb_reset(switch_vb_t *vb); SWITCH_DECLARE(void) switch_vb_debug_level(switch_vb_t *vb, uint8_t level); SWITCH_DECLARE(int) switch_vb_frame_count(switch_vb_t *vb); SWITCH_DECLARE(int) switch_vb_poll(switch_vb_t *vb); +SWITCH_DECLARE(switch_status_t) switch_vb_push_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len); SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len); SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t *len); SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb); diff --git a/src/switch_rtp.c b/src/switch_rtp.c index 06670b90bf..a615b1449b 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -7018,11 +7018,10 @@ static int rtp_common_write(switch_rtp_t *rtp_session, if (rtp_session->flags[SWITCH_RTP_FLAG_NACK]) { if (!rtp_session->vbw) { - switch_vb_create(&rtp_session->vbw, 500, 500, rtp_session->pool); - switch_vb_set_flag(rtp_session->vbw, SVB_QUEUE_ONLY); + switch_vb_create(&rtp_session->vbw, 150, 150, rtp_session->pool); //switch_vb_debug_level(rtp_session->vbw, 10); } - switch_vb_put_packet(rtp_session->vbw, (switch_rtp_packet_t *)send_msg, bytes); + switch_vb_push_packet(rtp_session->vbw, (switch_rtp_packet_t *)send_msg, bytes); } #ifdef RTP_WRITE_PLOSS diff --git a/src/switch_utils.c b/src/switch_utils.c index d270081b20..e53c66d079 100644 --- a/src/switch_utils.c +++ b/src/switch_utils.c @@ -96,6 +96,7 @@ SWITCH_DECLARE(switch_status_t) switch_frame_alloc(switch_frame_t **frame, switc typedef struct switch_frame_node_s { switch_frame_t *frame; int inuse; + struct switch_frame_node_s *prev; struct switch_frame_node_s *next; } switch_frame_node_t; @@ -103,19 +104,37 @@ struct switch_frame_buffer_s { switch_frame_node_t *head; switch_memory_pool_t *pool; switch_mutex_t *mutex; + uint32_t total; }; static switch_frame_t *find_free_frame(switch_frame_buffer_t *fb, switch_frame_t *orig) { switch_frame_node_t *np; + int x = 0; switch_mutex_lock(fb->mutex); - for (np = fb->head; np; np = np->next) { - if (!np->inuse && ((orig->packet && np->frame->packet) || (!orig->packet && !np->frame->packet))) { - break; - } - } + for (np = fb->head; np; np = np->next) { + x++; + + if (!np->inuse && ((orig->packet && np->frame->packet) || (!orig->packet && !np->frame->packet))) { + + if (np == fb->head) { + fb->head = np->next; + } else if (np->prev) { + np->prev->next = np->next; + } + + if (np->next) { + np->next->prev = np->prev; + } + + fb->total--; + np->prev = np->next = NULL; + break; + } + } + if (!np) { np = switch_core_alloc(fb->pool, sizeof(*np)); np->frame = switch_core_alloc(fb->pool, sizeof(*np->frame)); @@ -126,11 +145,8 @@ static switch_frame_t *find_free_frame(switch_frame_buffer_t *fb, switch_frame_t np->frame->data = switch_core_alloc(fb->pool, SWITCH_RTP_MAX_BUF_LEN); np->frame->buflen = SWITCH_RTP_MAX_BUF_LEN; } - np->next = fb->head; - fb->head = np; } - np->frame->samples = orig->samples; np->frame->rate = orig->rate; np->frame->channels = orig->channels; @@ -177,10 +193,24 @@ SWITCH_DECLARE(switch_status_t) switch_frame_buffer_free(switch_frame_buffer_t * old_frame = *frameP; *frameP = NULL; - + node = (switch_frame_node_t *) old_frame->extra_data; node->inuse = 0; switch_img_free(&node->frame->img); + + fb->total++; + + if (fb->head) { + fb->head->prev = node; + } + + node->next = fb->head; + node->prev = NULL; + fb->head = node; + + switch_assert(node->next != node); + switch_assert(node->prev != node); + switch_mutex_unlock(fb->mutex); diff --git a/src/switch_vidderbuffer.c b/src/switch_vidderbuffer.c index 1cce952e4e..1b0c1b7e4d 100644 --- a/src/switch_vidderbuffer.c +++ b/src/switch_vidderbuffer.c @@ -44,6 +44,7 @@ typedef struct switch_vb_node_s { switch_rtp_packet_t packet; uint32_t len; uint8_t visible; + struct switch_vb_node_s *prev; struct switch_vb_node_s *next; } switch_vb_node_t; @@ -67,6 +68,7 @@ struct switch_vb_s { switch_inthash_t *missing_seq_hash; switch_inthash_t *node_hash; switch_mutex_t *mutex; + switch_mutex_t *list_mutex; switch_memory_pool_t *pool; int free_pool; switch_vb_flag_t flags; @@ -74,25 +76,26 @@ struct switch_vb_s { static inline switch_vb_node_t *new_node(switch_vb_t *vb) { - switch_vb_node_t *np, *last = NULL; + switch_vb_node_t *np; + + switch_mutex_lock(vb->list_mutex); for (np = vb->node_list; np; np = np->next) { if (!np->visible) { break; } - last = np; } if (!np) { np = switch_core_alloc(vb->pool, sizeof(*np)); - - if (last) { - last->next = np; - } else { - vb->node_list = np; + + np->next = vb->node_list; + if (np->next) { + np->next->prev = np; } - + vb->node_list = np; + } switch_assert(np); @@ -101,41 +104,89 @@ static inline switch_vb_node_t *new_node(switch_vb_t *vb) vb->visible_nodes++; np->parent = vb; + switch_mutex_unlock(vb->list_mutex); + return np; } -#if 0 -static inline switch_vb_node_t *find_seq(switch_vb_t *vb, uint16_t seq) +static inline void push_to_top(switch_vb_t *vb, switch_vb_node_t *node) { - switch_vb_node_t *np; - for (np = vb->node_list; np; np = np->next) { - if (!np->visible) continue; + if (node == vb->node_list) { + vb->node_list = node->next; + } else if (node->prev) { + node->prev->next = node->next; + } - if (ntohs(np->packet.header.seq) == ntohs(seq)) { - return np; + if (node->next) { + node->next->prev = node->prev; + } + + node->next = vb->node_list; + node->prev = NULL; + + if (node->next) { + node->next->prev = node; + } + + vb->node_list = node; + + switch_assert(node->next != node); + switch_assert(node->prev != node); +} + +static inline void hide_node(switch_vb_node_t *node, switch_bool_t pop) +{ + switch_vb_t *vb = node->parent; + + switch_mutex_lock(vb->list_mutex); + + if (node->visible) { + node->visible = 0; + vb->visible_nodes--; + + if (pop) { + push_to_top(vb, node); } } - return NULL; -} -#endif + switch_core_inthash_delete(vb->node_hash, node->packet.header.seq); -static inline void hide_node(switch_vb_node_t *node) + switch_mutex_unlock(vb->list_mutex); +} + +static inline void sort_free_nodes(switch_vb_t *vb) { - if (node->visible) { - node->visible = 0; - node->parent->visible_nodes--; + switch_vb_node_t *np, *this_np; + int start = 0; + + switch_mutex_lock(vb->list_mutex); + np = vb->node_list; + + while(np) { + this_np = np; + np = np->next; + + if (this_np->visible) { + start++; + } + + if (start && !this_np->visible) { + push_to_top(vb, this_np); + } } - switch_core_inthash_delete(node->parent->node_hash, node->packet.header.seq); + + switch_mutex_unlock(vb->list_mutex); } static inline void hide_nodes(switch_vb_t *vb) { switch_vb_node_t *np; + switch_mutex_lock(vb->list_mutex); for (np = vb->node_list; np; np = np->next) { - hide_node(np); + hide_node(np, SWITCH_FALSE); } + switch_mutex_unlock(vb->list_mutex); } static inline void drop_ts(switch_vb_t *vb, uint32_t ts) @@ -143,15 +194,22 @@ static inline void drop_ts(switch_vb_t *vb, uint32_t ts) switch_vb_node_t *np; int x = 0; + switch_mutex_lock(vb->list_mutex); for (np = vb->node_list; np; np = np->next) { if (!np->visible) continue; if (ts == np->packet.header.ts) { - hide_node(np); + hide_node(np, SWITCH_FALSE); x++; } } + if (x) { + sort_free_nodes(vb); + } + + switch_mutex_unlock(vb->list_mutex); + if (x) vb->complete_frames--; } @@ -159,6 +217,7 @@ static inline uint32_t vb_find_lowest_ts(switch_vb_t *vb) { switch_vb_node_t *np, *lowest = NULL; + switch_mutex_lock(vb->list_mutex); for (np = vb->node_list; np; np = np->next) { if (!np->visible) continue; @@ -166,6 +225,7 @@ static inline uint32_t vb_find_lowest_ts(switch_vb_t *vb) lowest = np; } } + switch_mutex_unlock(vb->list_mutex); return lowest ? lowest->packet.header.ts : 0; } @@ -239,6 +299,7 @@ static inline switch_vb_node_t *vb_find_lowest_seq(switch_vb_t *vb) { switch_vb_node_t *np, *lowest = NULL; + switch_mutex_lock(vb->list_mutex); for (np = vb->node_list; np; np = np->next) { if (!np->visible) continue; @@ -246,6 +307,7 @@ static inline switch_vb_node_t *vb_find_lowest_seq(switch_vb_t *vb) lowest = np; } } + switch_mutex_unlock(vb->list_mutex); return lowest; } @@ -294,7 +356,9 @@ static inline switch_status_t vb_next_packet(switch_vb_t *vb, switch_vb_node_t * static inline void free_nodes(switch_vb_t *vb) { + switch_mutex_lock(vb->list_mutex); vb->node_list = NULL; + switch_mutex_unlock(vb->list_mutex); } SWITCH_DECLARE(void) switch_vb_set_flag(switch_vb_t *vb, switch_vb_flag_t flag) @@ -370,7 +434,7 @@ SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min switch_core_inthash_init(&vb->missing_seq_hash); switch_core_inthash_init(&vb->node_hash); switch_mutex_init(&vb->mutex, SWITCH_MUTEX_NESTED, pool); - + switch_mutex_init(&vb->list_mutex, SWITCH_MUTEX_NESTED, pool); *vbp = vb; @@ -569,7 +633,7 @@ SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp *packet = node->packet; *len = node->len; memcpy(packet->body, node->packet.body, node->len); - hide_node(node); + hide_node(node, SWITCH_TRUE); vb_debug(vb, 1, "GET packet ts:%u seq:%u %s\n", ntohl(packet->header.ts), ntohs(packet->header.seq), packet->header.m ? " " : "");