From fa7695847a48df255a85bf48fe1765e74aa7ac68 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Thu, 26 Mar 2015 15:00:55 -0500 Subject: [PATCH] FS-7499: improve generic nack and vpx framing --- src/include/switch_core.h | 1 + src/include/switch_hashtable.h | 1 + src/include/switch_vidderbuffer.h | 7 + src/mod/codecs/mod_vpx/mod_vpx.c | 375 ++++++++++++++++-------------- src/switch_core_hash.c | 5 + src/switch_hashtable.c | 7 +- src/switch_rtp.c | 31 ++- src/switch_vidderbuffer.c | 67 ++++-- 8 files changed, 302 insertions(+), 192 deletions(-) diff --git a/src/include/switch_core.h b/src/include/switch_core.h index e379cc8ff4..39105d0aa9 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -1493,6 +1493,7 @@ SWITCH_DECLARE(switch_hash_index_t *) switch_core_hash_next(_In_ switch_hash_ind SWITCH_DECLARE(void) switch_core_hash_this(_In_ switch_hash_index_t *hi, _Out_opt_ptrdiff_cap_(klen) const void **key, _Out_opt_ switch_ssize_t *klen, _Out_ void **val); +SWITCH_DECLARE(void) switch_core_hash_this_val(switch_hash_index_t *hi, void *val); SWITCH_DECLARE(switch_status_t) switch_core_inthash_init(switch_inthash_t **hash); SWITCH_DECLARE(switch_status_t) switch_core_inthash_destroy(switch_inthash_t **hash); diff --git a/src/include/switch_hashtable.h b/src/include/switch_hashtable.h index 20c18276bc..80bc6ba5dc 100644 --- a/src/include/switch_hashtable.h +++ b/src/include/switch_hashtable.h @@ -196,6 +196,7 @@ SWITCH_DECLARE(switch_hashtable_iterator_t*) switch_hashtable_first_iter(switch_ #define switch_hashtable_first(_h) switch_hashtable_first_iter(_h, NULL) SWITCH_DECLARE(switch_hashtable_iterator_t*) switch_hashtable_next(switch_hashtable_iterator_t **iP); SWITCH_DECLARE(void) switch_hashtable_this(switch_hashtable_iterator_t *i, const void **key, switch_ssize_t *klen, void **val); +SWITCH_DECLARE(void) switch_hashtable_this_val(switch_hashtable_iterator_t *i, void *val); static inline uint32_t switch_hash_default_int(void *ky) { uint32_t x = *((uint32_t *)ky); diff --git a/src/include/switch_vidderbuffer.h b/src/include/switch_vidderbuffer.h index 3fc7a91e14..6f2b02e7a5 100644 --- a/src/include/switch_vidderbuffer.h +++ b/src/include/switch_vidderbuffer.h @@ -33,6 +33,11 @@ #ifndef SWITCH_VIDDERBUFFER_H #define SWITCH_VIDDERBUFFER_H +typedef enum { + SVB_QUEUE_ONLY = (1 << 0) +} switch_vb_flag_t; + + SWITCH_BEGIN_EXTERN_C SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len, switch_memory_pool_t *pool); SWITCH_DECLARE(switch_status_t) switch_vb_destroy(switch_vb_t **vbp); @@ -44,6 +49,8 @@ SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t *len); SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb); SWITCH_DECLARE(switch_status_t) switch_vb_get_packet_by_seq(switch_vb_t *vb, uint16_t seq, switch_rtp_packet_t *packet, switch_size_t *len); +SWITCH_DECLARE(void) switch_vb_set_flag(switch_vb_t *vb, switch_vb_flag_t flag); +SWITCH_DECLARE(void) switch_vb_clear_flag(switch_vb_t *vb, switch_vb_flag_t flag); SWITCH_END_EXTERN_C #endif diff --git a/src/mod/codecs/mod_vpx/mod_vpx.c b/src/mod/codecs/mod_vpx/mod_vpx.c index 8306c23c3a..d57b7915b3 100644 --- a/src/mod/codecs/mod_vpx/mod_vpx.c +++ b/src/mod/codecs/mod_vpx/mod_vpx.c @@ -41,7 +41,155 @@ #define SLICE_SIZE SWITCH_DEFAULT_VIDEO_SIZE #define KEY_FRAME_MIN_FREQ 1000000 -#define IS_VP8_KEY_FRAME(byte) (((byte) & 0x01) ^ 0x01) + +/* http://tools.ietf.org/html/draft-ietf-payload-vp8-10 + + The first octets after the RTP header are the VP8 payload descriptor, with the following structure. + + 0 1 2 3 4 5 6 7 + +-+-+-+-+-+-+-+-+ + |X|R|N|S|R| PID | (REQUIRED) + +-+-+-+-+-+-+-+-+ + X: |I|L|T|K| RSV | (OPTIONAL) + +-+-+-+-+-+-+-+-+ + I: |M| PictureID | (OPTIONAL) + +-+-+-+-+-+-+-+-+ + L: | TL0PICIDX | (OPTIONAL) + +-+-+-+-+-+-+-+-+ + T/K:|TID|Y| KEYIDX | (OPTIONAL) + +-+-+-+-+-+-+-+-+ + + + VP8 Payload Header + + 0 1 2 3 4 5 6 7 + +-+-+-+-+-+-+-+-+ + |Size0|H| VER |P| + +-+-+-+-+-+-+-+-+ + | Size1 | + +-+-+-+-+-+-+-+-+ + | Size2 | + +-+-+-+-+-+-+-+-+ + | Bytes 4..N of | + | VP8 payload | + : : + +-+-+-+-+-+-+-+-+ + | OPTIONAL RTP | + | padding | + : : + +-+-+-+-+-+-+-+-+ +*/ + + +#ifdef _MSC_VER +#pragma pack(push, r1, 1) +#endif + +#if SWITCH_BYTE_ORDER == __BIG_ENDIAN + +typedef struct { + unsigned extended:1; + unsigned reserved1:1; + unsigned non_referenced:1; + unsigned start:1; + unsigned reserved2:1; + unsigned pid:3; +} vp8_payload_descriptor_t; + +#ifdef WHAT_THEY_FUCKING_SAY +typedef struct { + unsigned have_pid:1; + unsigned have_layer_ind:1; + unsigned have_ref_ind:1; + unsigned start:1; + unsigned end:1; + unsigned have_ss:1; + unsigned have_su:1; + unsigned zero:1; +} vp9_payload_descriptor_t; + +#else +typedef struct { + unsigned dunno:6; + unsigned start:1; + unsigned key:1; +} vp9_payload_descriptor_t; +#endif + + +#else /* ELSE LITTLE */ + +typedef struct { + unsigned pid:3; + unsigned reserved2:1; + unsigned start:1; + unsigned non_referenced:1; + unsigned reserved1:1; + unsigned extended:1; +} vp8_payload_descriptor_t; + +#ifdef WHAT_THEY_FUCKING_SAY +typedef struct { + unsigned zero:1; + unsigned have_su:1; + unsigned have_ss:1; + unsigned end:1; + unsigned start:1; + unsigned have_ref_ind:1; + unsigned have_layer_ind:1; + unsigned have_pid:1; +} vp9_payload_descriptor_t; +#else +typedef struct { + unsigned key:1; + unsigned start:1; + unsigned dunno:6; +} vp9_payload_descriptor_t; +#endif + +#endif + +typedef union { + vp8_payload_descriptor_t vp8; + vp9_payload_descriptor_t vp9; +} vpx_payload_descriptor_t; + +#ifdef _MSC_VER +#pragma pack(pop, r1) +#endif + + +#define __IS_VP8_KEY_FRAME(byte) (((byte) & 0x01) ^ 0x01) +static inline int IS_VP8_KEY_FRAME(uint8_t *data) +{ + uint8_t S; + uint8_t DES; + uint8_t PID; + + DES = *data; + data++; + S = DES & 0x10; + PID = DES & 0x07; + + if (DES & 0x80) { // X + uint8_t X = *data; + data++; + if (X & 0x80) { // I + uint8_t M = (*data) & 0x80; + data++; + if (M) data++; + } + if (X & 0x40) data++; // L + if (X & 0x30) data++; // T/K + } + + if (S && PID == 0) { + return __IS_VP8_KEY_FRAME(*data); + } else { + return 0; + } +} + #define IS_VP9_KEY_FRAME(byte) ((byte) & 0x01) #define IS_VP9_START_PKT(byte) ((byte) & 0x02) @@ -116,16 +264,24 @@ static switch_status_t init_decoder(switch_codec_t *codec) return SWITCH_STATUS_FALSE; } + + context->last_ts = 0; + context->last_received_timestamp = 0; + context->last_received_complete_picture = 0; context->decoder_init = 1; - + context->got_key_frame = 0; // the types of post processing to be done, should be combination of "vp8_postproc_level" - ppcfg.post_proc_flag = VP8_DEMACROBLOCK | VP8_DEBLOCK; + ppcfg.post_proc_flag = VP8_DEBLOCK;//VP8_DEMACROBLOCK | VP8_DEBLOCK; // the strength of deblocking, valid range [0, 16] - ppcfg.deblocking_level = 3; + ppcfg.deblocking_level = 1; // Set deblocking settings vpx_codec_control(&context->decoder, VP8_SET_POSTPROC, &ppcfg); - switch_buffer_create_dynamic(&context->vpx_packet_buffer, 512, 512, 1024000); + if (context->vpx_packet_buffer) { + switch_buffer_zero(context->vpx_packet_buffer); + } else { + switch_buffer_create_dynamic(&context->vpx_packet_buffer, 512, 512, 0); + } } return SWITCH_STATUS_SUCCESS; @@ -177,7 +333,7 @@ static switch_status_t init_encoder(switch_codec_t *codec) config->rc_target_bitrate = context->bandwidth; config->g_lag_in_frames = 0; config->kf_max_dist = 2000; - config->g_threads = cpus;//(cpus > 1) ? 2 : 1; + config->g_threads = (cpus > 1) ? 2 : 1; if (context->is_vp9) { //config->rc_dropframe_thresh = 2; @@ -344,122 +500,6 @@ static switch_status_t switch_vpx_init(switch_codec_t *codec, switch_codec_flag_ return SWITCH_STATUS_SUCCESS; } -/* http://tools.ietf.org/html/draft-ietf-payload-vp8-10 - - The first octets after the RTP header are the VP8 payload descriptor, with the following structure. - - 0 1 2 3 4 5 6 7 - +-+-+-+-+-+-+-+-+ - |X|R|N|S|R| PID | (REQUIRED) - +-+-+-+-+-+-+-+-+ - X: |I|L|T|K| RSV | (OPTIONAL) - +-+-+-+-+-+-+-+-+ - I: |M| PictureID | (OPTIONAL) - +-+-+-+-+-+-+-+-+ - L: | TL0PICIDX | (OPTIONAL) - +-+-+-+-+-+-+-+-+ - T/K:|TID|Y| KEYIDX | (OPTIONAL) - +-+-+-+-+-+-+-+-+ - - - VP8 Payload Header - - 0 1 2 3 4 5 6 7 - +-+-+-+-+-+-+-+-+ - |Size0|H| VER |P| - +-+-+-+-+-+-+-+-+ - | Size1 | - +-+-+-+-+-+-+-+-+ - | Size2 | - +-+-+-+-+-+-+-+-+ - | Bytes 4..N of | - | VP8 payload | - : : - +-+-+-+-+-+-+-+-+ - | OPTIONAL RTP | - | padding | - : : - +-+-+-+-+-+-+-+-+ -*/ - - -#ifdef _MSC_VER -#pragma pack(push, r1, 1) -#endif - -#if SWITCH_BYTE_ORDER == __BIG_ENDIAN - -typedef struct { - unsigned extended:1; - unsigned reserved1:1; - unsigned non_referenced:1; - unsigned start:1; - unsigned reserved2:1; - unsigned pid:3; -} vp8_payload_descriptor_t; - -#ifdef WHAT_THEY_FUCKING_SAY -typedef struct { - unsigned have_pid:1; - unsigned have_layer_ind:1; - unsigned have_ref_ind:1; - unsigned start:1; - unsigned end:1; - unsigned have_ss:1; - unsigned have_su:1; - unsigned zero:1; -} vp9_payload_descriptor_t; - -#else -typedef struct { - unsigned dunno:6; - unsigned start:1; - unsigned key:1; -} vp9_payload_descriptor_t; -#endif - - -#else /* ELSE LITTLE */ - -typedef struct { - unsigned pid:3; - unsigned reserved2:1; - unsigned start:1; - unsigned non_referenced:1; - unsigned reserved1:1; - unsigned extended:1; -} vp8_payload_descriptor_t; - -#ifdef WHAT_THEY_FUCKING_SAY -typedef struct { - unsigned zero:1; - unsigned have_su:1; - unsigned have_ss:1; - unsigned end:1; - unsigned start:1; - unsigned have_ref_ind:1; - unsigned have_layer_ind:1; - unsigned have_pid:1; -} vp9_payload_descriptor_t; -#else -typedef struct { - unsigned key:1; - unsigned start:1; - unsigned dunno:6; -} vp9_payload_descriptor_t; -#endif - -#endif - -typedef union { - vp8_payload_descriptor_t vp8; - vp9_payload_descriptor_t vp9; -} vpx_payload_descriptor_t; - -#ifdef _MSC_VER -#pragma pack(pop, r1) -#endif - static switch_status_t consume_partition(vpx_context_t *context, switch_frame_t *frame) { vpx_payload_descriptor_t *payload_descriptor; @@ -611,9 +651,7 @@ static switch_status_t switch_vpx_encode(switch_codec_t *codec, switch_frame_t * dur, vpx_flags, VPX_DL_REALTIME) != VPX_CODEC_OK) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "VPX encode error %d:%s\n", - context->encoder.err, context->encoder.err_detail); - + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "VPX encode error %d:%s\n", context->encoder.err, context->encoder.err_detail); frame->datalen = 0; return SWITCH_STATUS_FALSE; } @@ -630,13 +668,13 @@ static switch_status_t buffer_vp8_packets(vpx_context_t *context, switch_frame_t uint8_t *data = frame->data; uint8_t S; uint8_t DES; - uint8_t PID; + //uint8_t PID; int len; DES = *data; data++; S = DES & 0x10; - PID = DES & 0x07; + //PID = DES & 0x07; if (DES & 0x80) { // X uint8_t X = *data; @@ -650,32 +688,26 @@ static switch_status_t buffer_vp8_packets(vpx_context_t *context, switch_frame_t if (X & 0x30) data++; // T/K } + if (!switch_buffer_inuse(context->vpx_packet_buffer) && !S) { + if (context->got_key_frame > 0) { + context->got_key_frame = 0; + } + return SWITCH_STATUS_MORE_DATA; + } + + if (S) { + switch_buffer_zero(context->vpx_packet_buffer); + } + len = frame->datalen - (data - (uint8_t *)frame->data); if (len <= 0) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid packet %d\n", len); return SWITCH_STATUS_RESTART; } - - if (S && (PID == 0)) { - int is_keyframe = IS_VP8_KEY_FRAME(*data); - - if (is_keyframe && context->got_key_frame <= 0) { - context->got_key_frame = 1; - } - } - - if (context->got_key_frame <= 0) { - if ((context->got_key_frame-- % 200) == 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Waiting for key frame\n"); - } - return SWITCH_STATUS_RESTART; - } switch_buffer_write(context->vpx_packet_buffer, data, len); - return SWITCH_STATUS_SUCCESS; - } static switch_status_t buffer_vp9_packets(vpx_context_t *context, switch_frame_t *frame) @@ -720,7 +752,7 @@ static switch_status_t switch_vpx_decode(switch_codec_t *codec, switch_frame_t * if (context->is_vp9) { is_keyframe = IS_VP9_KEY_FRAME(*(unsigned char *)frame->data); } else { // vp8 - is_keyframe = IS_VP8_KEY_FRAME(*(unsigned char *)frame->data); + is_keyframe = IS_VP8_KEY_FRAME((uint8_t *)frame->data); } if (context->need_decoder_reset != 0) { @@ -740,21 +772,26 @@ static switch_status_t switch_vpx_decode(switch_codec_t *codec, switch_frame_t * } decoder = &context->decoder; - + // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "len: %d ts: %u mark:%d\n", frame->datalen, frame->timestamp, frame->m); - if (!is_keyframe && context->last_received_timestamp && context->last_received_timestamp != frame->timestamp && - (!frame->m) && (!context->last_received_complete_picture)) { - // possible packet loss - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset\n"); - context->need_key_frame = 1; - context->last_ts = 0; - switch_goto_status(SWITCH_STATUS_RESTART, end); - } - context->last_received_timestamp = frame->timestamp; context->last_received_complete_picture = frame->m ? SWITCH_TRUE : SWITCH_FALSE; + if (is_keyframe) { + if (context->got_key_frame <= 0) { + context->got_key_frame = 1; + } else { + context->got_key_frame++; + } + } else if (context->got_key_frame <= 0) { + if ((--context->got_key_frame % 200) == 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Waiting for key frame\n"); + } + switch_goto_status(SWITCH_STATUS_MORE_DATA, end); + } + + status = context->is_vp9 ? buffer_vp9_packets(context, frame) : buffer_vp8_packets(context, frame); //printf("READ buf:%ld got_key:%d st:%d m:%d\n", switch_buffer_inuse(context->vpx_packet_buffer), context->got_key_frame, status, frame->m); @@ -785,7 +822,7 @@ static switch_status_t switch_vpx_decode(switch_codec_t *codec, switch_frame_t * if (err != VPX_CODEC_OK) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error decoding %" SWITCH_SIZE_T_FMT " bytes, [%d:%s:%s]\n", - len, err, vpx_codec_error(decoder), vpx_codec_error_detail(decoder)); + len, err, vpx_codec_error(decoder), vpx_codec_error_detail(decoder)); switch_goto_status(SWITCH_STATUS_RESTART, end); } @@ -793,29 +830,28 @@ static switch_status_t switch_vpx_decode(switch_codec_t *codec, switch_frame_t * switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "VPX control error!\n"); switch_goto_status(SWITCH_STATUS_RESTART, end); } - - frame->img = (switch_image_t *) vpx_codec_get_frame(decoder, &iter); - - if (!(frame->img) || corrupted) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "VPX invalid packet\n"); - switch_goto_status(SWITCH_STATUS_RESTART, end); + + if (corrupted) { + frame->img = NULL; + } else { + frame->img = (switch_image_t *) vpx_codec_get_frame(decoder, &iter); } - + switch_buffer_zero(context->vpx_packet_buffer); + + if (!frame->img) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "VPX invalid packet\n"); + status = SWITCH_STATUS_RESTART; + } } end: if (status == SWITCH_STATUS_RESTART) { - if (context->got_key_frame > 0) { - context->got_key_frame = 0; - } - switch_buffer_zero(context->vpx_packet_buffer); + context->need_decoder_reset = 1; } if (!frame->img || status == SWITCH_STATUS_RESTART) { - //switch_set_flag(frame, SFF_USE_VIDEO_TIMESTAMP); - //} else { status = SWITCH_STATUS_MORE_DATA; } @@ -823,7 +859,6 @@ end: switch_set_flag(frame, SFF_WAIT_KEY_FRAME); } - return status; } diff --git a/src/switch_core_hash.c b/src/switch_core_hash.c index e3f4aaf171..728cbdc877 100644 --- a/src/switch_core_hash.c +++ b/src/switch_core_hash.c @@ -231,6 +231,11 @@ SWITCH_DECLARE(void) switch_core_hash_this(switch_hash_index_t *hi, const void * switch_hashtable_this(hi, key, klen, val); } +SWITCH_DECLARE(void) switch_core_hash_this_val(switch_hash_index_t *hi, void *val) +{ + switch_hashtable_this_val(hi, val); +} + SWITCH_DECLARE(switch_status_t) switch_core_inthash_init(switch_inthash_t **hash) { diff --git a/src/switch_hashtable.c b/src/switch_hashtable.c index 597312b805..458b5abcab 100644 --- a/src/switch_hashtable.c +++ b/src/switch_hashtable.c @@ -332,7 +332,12 @@ SWITCH_DECLARE(switch_hashtable_iterator_t *) switch_hashtable_first_iter(switch return switch_hashtable_next(&iterator); } - +SWITCH_DECLARE(void) switch_hashtable_this_val(switch_hashtable_iterator_t *i, void *val) +{ + if (i->e) { + i->e->v = val; + } +} SWITCH_DECLARE(void) switch_hashtable_this(switch_hashtable_iterator_t *i, const void **key, switch_ssize_t *klen, void **val) { diff --git a/src/switch_rtp.c b/src/switch_rtp.c index 4ccc41cf5d..06670b90bf 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -958,7 +958,6 @@ static void handle_ice(switch_rtp_t *rtp_session, switch_rtp_ice_t *ice, void *d if (rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) { switch_core_session_video_reinit(rtp_session->session); } - switch_rtp_set_flag(rtp_session, SWITCH_RTP_FLAG_FLUSH); } } @@ -1109,7 +1108,6 @@ static void handle_ice(switch_rtp_t *rtp_session, switch_rtp_ice_t *ice, void *d if (rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) { switch_core_session_video_reinit(rtp_session->session); } - switch_rtp_set_flag(rtp_session, SWITCH_RTP_FLAG_FLUSH); } memset(stunbuf, 0, sizeof(stunbuf)); @@ -2413,7 +2411,12 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s *err = "Socket Error!"; goto done; } - + + if (rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) { + switch_socket_opt_set(new_sock, SWITCH_SO_RCVBUF, 786432); + switch_socket_opt_set(new_sock, SWITCH_SO_SNDBUF, 786432); + } + if (switch_socket_bind(new_sock, rtp_session->local_addr) != SWITCH_STATUS_SUCCESS) { char *em = switch_core_sprintf(rtp_session->pool, "Bind Error! %s:%d", host, port); *err = em; @@ -3961,6 +3964,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_activate_ice(switch_rtp_t *rtp_sessio switch_port_t port = 0; char bufc[30]; + switch_mutex_lock(rtp_session->ice_mutex); if (proto == IPR_RTP) { @@ -4045,7 +4049,9 @@ SWITCH_DECLARE(void) switch_rtp_flush(switch_rtp_t *rtp_session) return; } - switch_rtp_set_flag(rtp_session, SWITCH_RTP_FLAG_FLUSH); + if (!rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) { + switch_rtp_set_flag(rtp_session, SWITCH_RTP_FLAG_FLUSH); + } } SWITCH_DECLARE(void) switch_rtp_video_refresh(switch_rtp_t *rtp_session) @@ -4370,6 +4376,10 @@ SWITCH_DECLARE(void) switch_rtp_clear_flags(switch_rtp_t *rtp_session, switch_rt SWITCH_DECLARE(void) switch_rtp_set_flag(switch_rtp_t *rtp_session, switch_rtp_flag_t flag) { + if (flag == SWITCH_RTP_FLAG_FLUSH && rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) { + return; + } + switch_mutex_lock(rtp_session->flag_mutex); rtp_session->flags[flag] = 1; switch_mutex_unlock(rtp_session->flag_mutex); @@ -4605,6 +4615,7 @@ SWITCH_DECLARE(void) rtp_flush_read_buffer(switch_rtp_t *rtp_session, switch_rtp { if (rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA] || + rtp_session->flags[SWITCH_RTP_FLAG_VIDEO] || rtp_session->flags[SWITCH_RTP_FLAG_UDPTL]) { return; } @@ -5300,6 +5311,7 @@ static void handle_nack(switch_rtp_t *rtp_session, uint32_t nack) send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m); } + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "RE----SEND %u\n", ntohs(send_msg->header.seq)); switch_rtp_write_raw(rtp_session, (void *) send_msg, &bytes, SWITCH_FALSE); } else { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Cannot send NACK for seq %u\n", ntohs(seq)); @@ -5321,6 +5333,7 @@ static void handle_nack(switch_rtp_t *rtp_session, uint32_t nack) send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m); } + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "RE----SEND %u\n", ntohs(send_msg->header.seq)); switch_rtp_write_raw(rtp_session, (void *) &send_msg, &bytes, SWITCH_FALSE); } else { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Cannot send NACK for seq %u\n", ntohs(seq) + i); @@ -7005,7 +7018,8 @@ static int rtp_common_write(switch_rtp_t *rtp_session, if (rtp_session->flags[SWITCH_RTP_FLAG_NACK]) { if (!rtp_session->vbw) { - switch_vb_create(&rtp_session->vbw, 5, 5, rtp_session->pool); + switch_vb_create(&rtp_session->vbw, 500, 500, rtp_session->pool); + switch_vb_set_flag(rtp_session->vbw, SVB_QUEUE_ONLY); //switch_vb_debug_level(rtp_session->vbw, 10); } switch_vb_put_packet(rtp_session->vbw, (switch_rtp_packet_t *)send_msg, bytes); @@ -7027,7 +7041,12 @@ static int rtp_common_write(switch_rtp_t *rtp_session, } } #else - + //if (rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) { + // + // rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_READ]++; + // + // //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SEND %u\n", ntohs(send_msg->header.seq)); + //} if (switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, (void *) send_msg, &bytes) != SWITCH_STATUS_SUCCESS) { rtp_session->seq--; ret = -1; diff --git a/src/switch_vidderbuffer.c b/src/switch_vidderbuffer.c index 07c83d1933..af60c79e7a 100644 --- a/src/switch_vidderbuffer.c +++ b/src/switch_vidderbuffer.c @@ -34,6 +34,9 @@ #define MAX_MISSING_SEQ 20 #define vb_debug(_vb, _level, _format, ...) if (_vb->debug_level >= _level) switch_log_printf(SWITCH_CHANNEL_LOG_CLEAN, SWITCH_LOG_ALERT, "VB:%p level:%d line:%d ->" _format, (void *) _vb, _level, __LINE__, __VA_ARGS__) +const char *TOKEN_1 = "ONE"; +const char *TOKEN_2 = "TWO"; + struct switch_vb_s; typedef struct switch_vb_node_s { @@ -66,6 +69,7 @@ struct switch_vb_s { switch_mutex_t *mutex; switch_memory_pool_t *pool; int free_pool; + switch_vb_flag_t flags; }; static inline switch_vb_node_t *new_node(switch_vb_t *vb) @@ -293,6 +297,15 @@ static inline void free_nodes(switch_vb_t *vb) vb->node_list = NULL; } +SWITCH_DECLARE(void) switch_vb_set_flag(switch_vb_t *vb, switch_vb_flag_t flag) +{ + switch_set_flag(vb, flag); +} + +SWITCH_DECLARE(void) switch_vb_clear_flag(switch_vb_t *vb, switch_vb_flag_t flag) +{ + switch_clear_flag(vb, flag); +} SWITCH_DECLARE(int) switch_vb_poll(switch_vb_t *vb) { @@ -394,10 +407,17 @@ SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb) for (hi = switch_core_hash_first(vb->missing_seq_hash); hi; hi = switch_core_hash_next(&hi)) { uint16_t seq; - - switch_core_hash_this(hi, &var, NULL, &val); - seq = ntohs(*((uint16_t *) var)); + const char *token; + switch_core_hash_this(hi, &var, NULL, &val); + token = (const char *) val; + + if (token == TOKEN_2) { + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SKIP %u %s\n", ntohs(*((uint16_t *) var)), token); + continue; + } + seq = ntohs(*((uint16_t *) var)); + if (!least || seq < least) { least = seq; } @@ -406,9 +426,12 @@ SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb) if (least && switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(least))) { vb_debug(vb, 3, "Found smallest NACKABLE seq %u\n", least); nack = (uint32_t) htons(least); - + + switch_core_inthash_insert(vb->missing_seq_hash, nack, (void *) TOKEN_2); + for(i = 0; i < 16; i++) { if (switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(least + i + 1))) { + switch_core_inthash_insert(vb->missing_seq_hash, (uint32_t)htons(least + i + 1), (void *) TOKEN_2); vb_debug(vb, 3, "Found addtl NACKABLE seq %u\n", least + i + 1); blp |= (1 << i); } @@ -435,23 +458,37 @@ SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp { uint32_t i; uint16_t want = ntohs(vb->next_seq), got = ntohs(packet->header.seq); + int missing = 0; switch_mutex_lock(vb->mutex); if (!want) want = got; - - if (got > want) { - vb_debug(vb, 2, "GOT %u WANTED %u; MARK SEQS MISSING %u - %u\n", got, want, want, got - 1); - for (i = want; i < got; i++) { - switch_core_inthash_insert(vb->missing_seq_hash, (uint32_t)htons(i), (void *)SWITCH_TRUE); - } - } else { - switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(got)); - } - - if (got >= want) { + if (switch_test_flag(vb, SVB_QUEUE_ONLY)) { vb->next_seq = htons(got + 1); + } else { + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "WTF %u\n", got); + + if (switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(got))) { + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "POPPED RESEND %u\n", got); + missing = 1; + } + + if (!missing || want == got) { + if (got > want) { + //vb_debug(vb, 2, "GOT %u WANTED %u; MARK SEQS MISSING %u - %u\n", got, want, want, got - 1); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "XXXXXXXXXXXXXXXXXX WTF GOT %u WANTED %u; MARK SEQS MISSING %u - %u\n", got, want, want, got - 1); + for (i = want; i < got; i++) { + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "MISSING %u\n", i); + switch_core_inthash_insert(vb->missing_seq_hash, (uint32_t)htons(i), (void *)TOKEN_1); + } + + } + + if (got >= want || (want - got) > 1000) { + vb->next_seq = htons(got + 1); + } + } } add_node(vb, packet, len);