This commit is contained in:
Anthony Minessale 2011-12-07 11:27:50 -06:00
parent fd9158d069
commit f1d5120f63
4 changed files with 25 additions and 6 deletions

View File

@ -407,6 +407,7 @@ switch_status_t rtmp_read_frame(switch_core_session_t *session, switch_frame_t *
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
cng: cng:
data = (switch_byte_t *) tech_pvt->read_frame.data; data = (switch_byte_t *) tech_pvt->read_frame.data;
data[0] = 65; data[0] = 65;
data[1] = 0; data[1] = 0;
@ -414,7 +415,7 @@ cng:
tech_pvt->read_frame.flags = SFF_CNG; tech_pvt->read_frame.flags = SFF_CNG;
tech_pvt->read_frame.codec = &tech_pvt->read_codec; tech_pvt->read_frame.codec = &tech_pvt->read_codec;
switch_core_timer_sync(&tech_pvt->timer); //switch_core_timer_sync(&tech_pvt->timer);
*frame = &tech_pvt->read_frame; *frame = &tech_pvt->read_frame;

View File

@ -520,6 +520,7 @@ struct rtmp_private {
const char *auth; const char *auth;
uint16_t maxlen; uint16_t maxlen;
int over_size;
}; };
struct rtmp_reg; struct rtmp_reg;

View File

@ -105,7 +105,9 @@ void rtmp_handle_control(rtmp_session_t *rsession, int amfnumber)
void rtmp_handle_invoke(rtmp_session_t *rsession, int amfnumber) void rtmp_handle_invoke(rtmp_session_t *rsession, int amfnumber)
{ {
rtmp_state_t *state = &rsession->amfstate[amfnumber]; rtmp_state_t *state = &rsession->amfstate[amfnumber];
//amf0_data *dump; #ifdef RTMP_DEBUG_IO
amf0_data *dump;
#endif
int i = 0; int i = 0;
buffer_helper_t helper = { state->buf, 0, state->origlen }; buffer_helper_t helper = { state->buf, 0, state->origlen };
int64_t transaction_id; int64_t transaction_id;
@ -885,10 +887,18 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
uint16_t len = state->origlen; uint16_t len = state->origlen;
switch_mutex_lock(rsession->tech_pvt->readbuf_mutex); switch_mutex_lock(rsession->tech_pvt->readbuf_mutex);
if (rsession->tech_pvt->maxlen && switch_buffer_inuse(rsession->tech_pvt->readbuf) > rsession->tech_pvt->maxlen * 3) { if (rsession->tech_pvt->maxlen && switch_buffer_inuse(rsession->tech_pvt->readbuf) > rsession->tech_pvt->maxlen * 5) {
rsession->tech_pvt->over_size++;
} else {
rsession->tech_pvt->over_size = 0;
}
if (rsession->tech_pvt->over_size > 10) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
"%s buffer > %u for 10 consecutive packets... Flushing buffer\n",
switch_core_session_get_name(rsession->tech_pvt->session), rsession->tech_pvt->maxlen * 5);
switch_buffer_zero(rsession->tech_pvt->readbuf); switch_buffer_zero(rsession->tech_pvt->readbuf);
#ifdef RTMP_DEBUG_IO #ifdef RTMP_DEBUG_IO
fprintf(rsession->io_debug_in, "[chunk_stream=%d type=0x%x ts=%d stream_id=0x%x] FLUSH BUFFER [exceeded %u]\n", rsession->amfnumber, state->type, (int)state->ts, state->stream_id, rsession->tech_pvt->maxlen * 3); fprintf(rsession->io_debug_in, "[chunk_stream=%d type=0x%x ts=%d stream_id=0x%x] FLUSH BUFFER [exceeded %u]\n", rsession->amfnumber, state->type, (int)state->ts, state->stream_id, rsession->tech_pvt->maxlen * 5);
#endif #endif
} }
switch_buffer_write(rsession->tech_pvt->readbuf, &len, 2); switch_buffer_write(rsession->tech_pvt->readbuf, &len, 2);

View File

@ -197,10 +197,10 @@ void *SWITCH_THREAD_FUNC rtmp_io_tcp_thread(switch_thread_t *thread, void *obj)
switch_mutex_unlock(io->mutex); switch_mutex_unlock(io->mutex);
if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_TIMEOUT) { if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_TIMEOUT) {
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "pollset_poll failed\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "pollset_poll failed\n");
continue; continue;
} else if (status == SWITCH_STATUS_TIMEOUT) { } else if (status == SWITCH_STATUS_TIMEOUT) {
switch_yield(1); switch_cond_next();
} }
for (i = 0; i < numfds; i++) { for (i = 0; i < numfds; i++) {
@ -219,6 +219,10 @@ void *SWITCH_THREAD_FUNC rtmp_io_tcp_thread(switch_thread_t *thread, void *obj)
if (switch_socket_opt_set(newsocket, SWITCH_SO_NONBLOCK, TRUE)) { if (switch_socket_opt_set(newsocket, SWITCH_SO_NONBLOCK, TRUE)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n");
} }
if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n");
}
if (rtmp_session_request(io->base.profile, &newsession) != SWITCH_STATUS_SUCCESS) { if (rtmp_session_request(io->base.profile, &newsession) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "RTMP session request failed\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "RTMP session request failed\n");
@ -314,6 +318,9 @@ switch_status_t rtmp_tcp_init(rtmp_profile_t *profile, const char *bindaddr, rtm
if (switch_socket_opt_set(io_tcp->listen_socket, SWITCH_SO_REUSEADDR, 1)) { if (switch_socket_opt_set(io_tcp->listen_socket, SWITCH_SO_REUSEADDR, 1)) {
goto fail; goto fail;
} }
if (switch_socket_opt_set(io_tcp->listen_socket, SWITCH_SO_TCP_NODELAY, 1)) {
goto fail;
}
if (switch_socket_bind(io_tcp->listen_socket, sa)) { if (switch_socket_bind(io_tcp->listen_socket, sa)) {
goto fail; goto fail;
} }