FS-7585: spaces to tabs and clean up trilling spaces

This commit is contained in:
Seven Du 2015-05-26 23:45:05 +08:00 committed by Michael Jerris
parent 2eda7eb3c3
commit e187479784
5 changed files with 638 additions and 638 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
/* /*
* mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application * mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2011, Barracuda Networks Inc. * Copyright (C) 2011, Barracuda Networks Inc.
* *
@ -21,7 +21,7 @@
* the Initial Developer. All Rights Reserved. * the Initial Developer. All Rights Reserved.
* *
* Contributor(s): * Contributor(s):
* *
* Mathieu Rene <mrene@avgs.ca> * Mathieu Rene <mrene@avgs.ca>
* *
* mod_rtmp.h -- RTMP Endpoint Module * mod_rtmp.h -- RTMP Endpoint Module
@ -37,7 +37,7 @@
#include "io.h" #include "io.h"
#include "types.h" #include "types.h"
//#define RTMP_DEBUG_IO //#define RTMP_DEBUG_IO
#define RTMP_DONT_HOLD #define RTMP_DONT_HOLD
#define RTMP_THREE_WAY_UUID_VARIABLE "rtmp_three_way_uuid" #define RTMP_THREE_WAY_UUID_VARIABLE "rtmp_three_way_uuid"
@ -162,9 +162,9 @@ codecID (byte & 0x0f) » 0 2: Sorensen H.263, 3: Screen video, 4: On2 VP6, 5:
frameType (byte & 0xf0) » 4 1: keyframe, 2: inter frame, 3: disposable inter frame frameType (byte & 0xf0) » 4 1: keyframe, 2: inter frame, 3: disposable inter frame
0x12: META 0x12: META
The contents of a meta packet are two AMF packets. The contents of a meta packet are two AMF packets.
The first is almost always a short uint16_be length-prefixed UTF-8 string (AMF type 0×02), The first is almost always a short uint16_be length-prefixed UTF-8 string (AMF type 0×02),
and the second is typically a mixed array (AMF type 0×08). However, the second chunk typically contains a variety of types, and the second is typically a mixed array (AMF type 0×08). However, the second chunk typically contains a variety of types,
so a full AMF parser should be used. so a full AMF parser should be used.
*/ */
@ -198,7 +198,7 @@ static inline rtmp_audio_format_t rtmp_audio_codec_get_format(uint8_t codec) {
static inline uint8_t rtmp_audio_codec(int channels, int bits, int rate, rtmp_audio_format_t format) { static inline uint8_t rtmp_audio_codec(int channels, int bits, int rate, rtmp_audio_format_t format) {
uint8_t codec = 0; uint8_t codec = 0;
switch (channels) { switch (channels) {
case 1: case 1:
break; break;
@ -207,7 +207,7 @@ static inline uint8_t rtmp_audio_codec(int channels, int bits, int rate, rtmp_au
default: default:
return 0; return 0;
} }
switch (bits) { switch (bits) {
case 8: case 8:
break; break;
@ -215,8 +215,8 @@ static inline uint8_t rtmp_audio_codec(int channels, int bits, int rate, rtmp_au
codec |= 2; codec |= 2;
default: default:
return 0; return 0;
} }
switch (rate) { switch (rate) {
case 0: case 0:
case 5500: case 5500:
@ -232,7 +232,7 @@ static inline uint8_t rtmp_audio_codec(int channels, int bits, int rate, rtmp_au
default: default:
return 0; return 0;
} }
switch(format) { switch(format) {
case RTMP_AUDIO_PCM: case RTMP_AUDIO_PCM:
break; break;
@ -254,7 +254,7 @@ static inline uint8_t rtmp_audio_codec(int channels, int bits, int rate, rtmp_au
default: default:
return 0; return 0;
} }
return codec; return codec;
} }
@ -280,12 +280,12 @@ typedef switch_status_t (*rtmp_invoke_function_t)(RTMP_INVOKE_FUNCTION_ARGS);
#define amf0_is_boolean(_x) (_x && (_x)->type == AMF0_TYPE_BOOLEAN) #define amf0_is_boolean(_x) (_x && (_x)->type == AMF0_TYPE_BOOLEAN)
#define amf0_is_object(_x) (_x && (_x)->type == AMF0_TYPE_OBJECT) #define amf0_is_object(_x) (_x && (_x)->type == AMF0_TYPE_OBJECT)
static inline char *amf0_get_string(amf0_data *x) static inline char *amf0_get_string(amf0_data *x)
{ {
return (amf0_is_string(x) ? (char*)amf0_string_get_uint8_ts(x) : NULL); return (amf0_is_string(x) ? (char*)amf0_string_get_uint8_ts(x) : NULL);
} }
static inline int amf0_get_number(amf0_data *x) static inline int amf0_get_number(amf0_data *x)
{ {
return (amf0_is_number(x) ? amf0_number_get_value(x) : 0); return (amf0_is_number(x) ? amf0_number_get_value(x) : 0);
} }
@ -321,7 +321,7 @@ typedef enum {
typedef enum { typedef enum {
SFLAG_AUDIO = (1 << 0), /* < Send audio */ SFLAG_AUDIO = (1 << 0), /* < Send audio */
SFLAG_VIDEO = (1 << 1) /* < Send video */ SFLAG_VIDEO = (1 << 1) /* < Send video */
} SFLAGS; } SFLAGS;
typedef enum { typedef enum {
PFLAG_RUNNING = (1 << 0) PFLAG_RUNNING = (1 << 0)
@ -357,11 +357,11 @@ struct rtmp_profile {
const char *bind_address; /* < Bind address */ const char *bind_address; /* < Bind address */
const char *io_name; /* < Name of I/O module (from config) */ const char *io_name; /* < Name of I/O module (from config) */
int chunksize; /* < Override default chunksize (from config) */ int chunksize; /* < Override default chunksize (from config) */
int buffer_len; /* < Receive buffer length the flash clients should use */ int buffer_len; /* < Receive buffer length the flash clients should use */
switch_hash_t *reg_hash; /* < Registration hashtable */ switch_hash_t *reg_hash; /* < Registration hashtable */
switch_thread_rwlock_t *reg_rwlock; /* < Registration hash rwlock */ switch_thread_rwlock_t *reg_rwlock; /* < Registration hash rwlock */
switch_bool_t auth_calls; /* < Require authentiation */ switch_bool_t auth_calls; /* < Require authentiation */
}; };
@ -417,52 +417,52 @@ struct rtmp_session {
rtmp_profile_t *profile; rtmp_profile_t *profile;
char uuid[SWITCH_UUID_FORMATTED_LENGTH+1]; char uuid[SWITCH_UUID_FORMATTED_LENGTH+1];
void *io_private; void *io_private;
rtmp_session_state_t state; rtmp_session_state_t state;
int parse_state; int parse_state;
uint16_t parse_remain; /* < Remaining bytes required before changing parse state */ uint16_t parse_remain; /* < Remaining bytes required before changing parse state */
int hdrsize; /* < The current header size */ int hdrsize; /* < The current header size */
int amfnumber; /* < The current AMF number */ int amfnumber; /* < The current AMF number */
rtmp_state_t amfstate[64]; rtmp_state_t amfstate[64];
rtmp_state_t amfstate_out[64]; rtmp_state_t amfstate_out[64];
switch_mutex_t *socket_mutex; switch_mutex_t *socket_mutex;
switch_mutex_t *count_mutex; switch_mutex_t *count_mutex;
int active_sessions; int active_sessions;
unsigned char hsbuf[2048]; unsigned char hsbuf[2048];
int hspos; int hspos;
uint16_t in_chunksize; uint16_t in_chunksize;
uint16_t out_chunksize; uint16_t out_chunksize;
/* Connect params */ /* Connect params */
const char *flashVer; const char *flashVer;
const char *swfUrl; const char *swfUrl;
const char *tcUrl; const char *tcUrl;
const char *app; const char *app;
const char *pageUrl; const char *pageUrl;
uint32_t capabilities; uint32_t capabilities;
uint32_t audioCodecs; uint32_t audioCodecs;
uint32_t videoCodecs; uint32_t videoCodecs;
uint32_t videoFunction; uint32_t videoFunction;
switch_thread_rwlock_t *rwlock; switch_thread_rwlock_t *rwlock;
rtmp_private_t *tech_pvt; /* < Active call's tech_pvt */ rtmp_private_t *tech_pvt; /* < Active call's tech_pvt */
#ifdef RTMP_DEBUG_IO #ifdef RTMP_DEBUG_IO
FILE *io_debug_in; FILE *io_debug_in;
FILE *io_debug_out; FILE *io_debug_out;
#endif #endif
const char *remote_address; const char *remote_address;
switch_port_t remote_port; switch_port_t remote_port;
switch_hash_t *session_hash; /* < Hash of call uuids and tech_pvt */ switch_hash_t *session_hash; /* < Hash of call uuids and tech_pvt */
switch_thread_rwlock_t *session_rwlock; /* < RWLock protecting session_hash */ switch_thread_rwlock_t *session_rwlock; /* < RWLock protecting session_hash */
rtmp_account_t *account; rtmp_account_t *account;
switch_thread_rwlock_t *account_rwlock; switch_thread_rwlock_t *account_rwlock;
uint32_t flags; uint32_t flags;
@ -471,18 +471,18 @@ struct rtmp_session {
uint64_t recv_ack_window; /* < ACK Window */ uint64_t recv_ack_window; /* < ACK Window */
uint64_t recv_ack_sent; /* < Bytes ack'd */ uint64_t recv_ack_sent; /* < Bytes ack'd */
uint64_t recv; /* < Bytes received */ uint64_t recv; /* < Bytes received */
uint32_t send_ack_window; uint32_t send_ack_window;
uint32_t send_ack; uint32_t send_ack;
uint32_t send; uint32_t send;
switch_time_t send_ack_ts; switch_time_t send_ack_ts;
uint32_t send_bw; /* < Current send bandwidth (in bytes/sec) */ uint32_t send_bw; /* < Current send bandwidth (in bytes/sec) */
uint32_t next_streamid; /* < The next stream id that will be used */ uint32_t next_streamid; /* < The next stream id that will be used */
uint32_t active_streamid; /* < The stream id returned by the last call to createStream */ uint32_t active_streamid; /* < The stream id returned by the last call to createStream */
uint32_t media_streamid; /* < The stream id that was used for the last "play" command, uint32_t media_streamid; /* < The stream id that was used for the last "play" command,
where we should send media */ where we should send media */
}; };
@ -490,32 +490,32 @@ struct rtmp_private {
unsigned int flags; unsigned int flags;
switch_codec_t read_codec; switch_codec_t read_codec;
switch_codec_t write_codec; switch_codec_t write_codec;
switch_frame_t read_frame; switch_frame_t read_frame;
unsigned char databuf[SWITCH_RECOMMENDED_BUFFER_SIZE]; /* < Buffer for read_frame */ unsigned char databuf[SWITCH_RECOMMENDED_BUFFER_SIZE]; /* < Buffer for read_frame */
switch_caller_profile_t *caller_profile; switch_caller_profile_t *caller_profile;
switch_mutex_t *mutex; switch_mutex_t *mutex;
switch_mutex_t *flag_mutex; switch_mutex_t *flag_mutex;
switch_core_session_t *session; switch_core_session_t *session;
switch_channel_t *channel; switch_channel_t *channel;
rtmp_session_t *rtmp_session; rtmp_session_t *rtmp_session;
int read_channel; /* RTMP channel #s for read and write */ int read_channel; /* RTMP channel #s for read and write */
int write_channel; int write_channel;
uint8_t audio_codec; uint8_t audio_codec;
uint8_t video_codec; uint8_t video_codec;
switch_time_t stream_start_ts; switch_time_t stream_start_ts;
switch_timer_t timer; switch_timer_t timer;
switch_buffer_t *readbuf; switch_buffer_t *readbuf;
switch_mutex_t *readbuf_mutex; switch_mutex_t *readbuf_mutex;
const char *display_callee_id_name; const char *display_callee_id_name;
const char *display_callee_id_number; const char *display_callee_id_number;
const char *auth_user; const char *auth_user;
const char *auth_domain; const char *auth_domain;
const char *auth; const char *auth;
@ -536,10 +536,10 @@ struct rtmp_reg {
}; };
typedef enum { typedef enum {
MSG_FULLHEADER = 1 MSG_FULLHEADER = 1
} rtmp_message_send_flag_t; } rtmp_message_send_flag_t;
/* Invokable functions from flash */ /* Invokable functions from flash */
RTMP_INVOKE_FUNCTION(rtmp_i_connect); RTMP_INVOKE_FUNCTION(rtmp_i_connect);
@ -591,9 +591,9 @@ switch_status_t rtmp_on_routing(switch_core_session_t *session);
switch_status_t rtmp_on_exchange_media(switch_core_session_t *session); switch_status_t rtmp_on_exchange_media(switch_core_session_t *session);
switch_status_t rtmp_on_soft_execute(switch_core_session_t *session); switch_status_t rtmp_on_soft_execute(switch_core_session_t *session);
switch_call_cause_t rtmp_outgoing_channel(switch_core_session_t *session, switch_event_t *var_event, switch_call_cause_t rtmp_outgoing_channel(switch_core_session_t *session, switch_event_t *var_event,
switch_caller_profile_t *outbound_profile, switch_caller_profile_t *outbound_profile,
switch_core_session_t **new_session, switch_memory_pool_t **pool, switch_originate_flag_t flags, switch_core_session_t **new_session, switch_memory_pool_t **pool, switch_originate_flag_t flags,
switch_call_cause_t *cancel_cause); switch_call_cause_t *cancel_cause);
switch_status_t rtmp_read_frame(switch_core_session_t *session, switch_frame_t **frame, switch_io_flag_t flags, int stream_id); switch_status_t rtmp_read_frame(switch_core_session_t *session, switch_frame_t **frame, switch_io_flag_t flags, int stream_id);
switch_status_t rtmp_write_frame(switch_core_session_t *session, switch_frame_t *frame, switch_io_flag_t flags, int stream_id); switch_status_t rtmp_write_frame(switch_core_session_t *session, switch_frame_t *frame, switch_io_flag_t flags, int stream_id);
switch_status_t rtmp_kill_channel(switch_core_session_t *session, int sig); switch_status_t rtmp_kill_channel(switch_core_session_t *session, int sig);

View File

@ -1,4 +1,4 @@
/* /*
* mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application * mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2011-2012, Barracuda Networks Inc. * Copyright (C) 2011-2012, Barracuda Networks Inc.
* *
@ -21,7 +21,7 @@
* the Initial Developer. All Rights Reserved. * the Initial Developer. All Rights Reserved.
* *
* Contributor(s): * Contributor(s):
* *
* Mathieu Rene <mrene@avgs.ca> * Mathieu Rene <mrene@avgs.ca>
* Joao Mesquita <jmesquita@freeswitch.org> * Joao Mesquita <jmesquita@freeswitch.org>
* William King <william.king@quentustech.com> * William King <william.king@quentustech.com>
@ -69,13 +69,13 @@ void rtmp_handle_control(rtmp_session_t *rsession, int amfnumber)
char *p = buf; char *p = buf;
int type = state->buf[0] << 8 | state->buf[1]; int type = state->buf[0] << 8 | state->buf[1];
int i; int i;
for (i = 2; i < state->origlen; i++) { for (i = 2; i < state->origlen; i++) {
p += sprintf(p, "%02x ", state->buf[i] & 0xFF); p += sprintf(p, "%02x ", state->buf[i] & 0xFF);
} }
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Control (%d): %s\n", type, buf); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Control (%d): %s\n", type, buf);
switch(type) { switch(type) {
case RTMP_CTRL_STREAM_BEGIN: case RTMP_CTRL_STREAM_BEGIN:
break; break;
@ -93,7 +93,7 @@ void rtmp_handle_control(rtmp_session_t *rsession, int amfnumber)
{ {
uint32_t now = ((switch_micro_time_now()/1000) & 0xFFFFFFFF); uint32_t now = ((switch_micro_time_now()/1000) & 0xFFFFFFFF);
uint32_t sent = state->buf[2] << 24 | state->buf[3] << 16 | state->buf[4] << 8 | state->buf[5]; uint32_t sent = state->buf[2] << 24 | state->buf[3] << 16 | state->buf[4] << 8 | state->buf[5];
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Ping reply: %d ms\n", (int)(now - sent)); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Ping reply: %d ms\n", (int)(now - sent));
} }
break; break;
@ -112,7 +112,7 @@ void rtmp_handle_invoke(rtmp_session_t *rsession, int amfnumber)
int i = 0; int i = 0;
buffer_helper_t helper = { state->buf, 0, state->origlen }; buffer_helper_t helper = { state->buf, 0, state->origlen };
int64_t transaction_id; int64_t transaction_id;
const char *command; const char *command;
int argc = 0; int argc = 0;
amf0_data *argv[100] = { 0 }; amf0_data *argv[100] = { 0 };
rtmp_invoke_function_t function; rtmp_invoke_function_t function;
@ -132,9 +132,9 @@ void rtmp_handle_invoke(rtmp_session_t *rsession, int amfnumber)
} }
amf0_data_free(dump); amf0_data_free(dump);
} }
printf("<<<<< END AMF MSG\n"); printf("<<<<< END AMF MSG\n");
#endif #endif
#ifdef RTMP_DEBUG_IO #ifdef RTMP_DEBUG_IO
{ {
helper.pos = 0; helper.pos = 0;
@ -156,8 +156,8 @@ void rtmp_handle_invoke(rtmp_session_t *rsession, int amfnumber)
fprintf(rsession->io_debug_in, "<<<<< END AMF MSG\n"); fprintf(rsession->io_debug_in, "<<<<< END AMF MSG\n");
fflush(rsession->io_debug_in); fflush(rsession->io_debug_in);
} }
#endif #endif
helper.pos = 0; helper.pos = 0;
while (argc < switch_arraylen(argv) && (argv[argc++] = amf0_data_read(my_buffer_read, &helper))); while (argc < switch_arraylen(argv) && (argv[argc++] = amf0_data_read(my_buffer_read, &helper)));
@ -165,10 +165,10 @@ void rtmp_handle_invoke(rtmp_session_t *rsession, int amfnumber)
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Bogus INVOKE request\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Bogus INVOKE request\n");
return; return;
} }
transaction_id = amf0_get_number(argv[i++]); transaction_id = amf0_get_number(argv[i++]);
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "[amfnumber=%d] Got INVOKE for %s\n", amfnumber, switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "[amfnumber=%d] Got INVOKE for %s\n", amfnumber,
command); command);
if ((function = (rtmp_invoke_function_t)(intptr_t)switch_core_hash_find(rtmp_globals.invoke_hash, command))) { if ((function = (rtmp_invoke_function_t)(intptr_t)switch_core_hash_find(rtmp_globals.invoke_hash, command))) {
@ -192,13 +192,13 @@ switch_status_t rtmp_check_auth(rtmp_session_t *rsession, const char *user, cons
switch_xml_t xml = NULL, x_param, x_params; switch_xml_t xml = NULL, x_param, x_params;
switch_bool_t allow_empty_password = SWITCH_FALSE; switch_bool_t allow_empty_password = SWITCH_FALSE;
const char *passwd = NULL; const char *passwd = NULL;
switch_bool_t disallow_multiple_registration = SWITCH_FALSE; switch_bool_t disallow_multiple_registration = SWITCH_FALSE;
switch_event_t *locate_params; switch_event_t *locate_params;
switch_event_create(&locate_params, SWITCH_EVENT_GENERAL); switch_event_create(&locate_params, SWITCH_EVENT_GENERAL);
switch_assert(locate_params); switch_assert(locate_params);
switch_event_add_header_string(locate_params, SWITCH_STACK_BOTTOM, "source", "mod_rtmp"); switch_event_add_header_string(locate_params, SWITCH_STACK_BOTTOM, "source", "mod_rtmp");
/* Locate user */ /* Locate user */
if (switch_xml_locate_user_merged("id", user, domain, NULL, &xml, locate_params) != SWITCH_STATUS_SUCCESS) { if (switch_xml_locate_user_merged("id", user, domain, NULL, &xml, locate_params) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed. No such user %s@%s\n", user, domain); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed. No such user %s@%s\n", user, domain);
@ -216,52 +216,52 @@ switch_status_t rtmp_check_auth(rtmp_session_t *rsession, const char *user, cons
if (!strcasecmp(var, "allow-empty-password")) { if (!strcasecmp(var, "allow-empty-password")) {
allow_empty_password = switch_true(val); allow_empty_password = switch_true(val);
} }
if (!strcasecmp(var, "disallow-multiple-registration")) { if (!strcasecmp(var, "disallow-multiple-registration")) {
disallow_multiple_registration = switch_true(val); disallow_multiple_registration = switch_true(val);
} }
} }
} }
if (zstr(passwd)) { if (zstr(passwd)) {
if (allow_empty_password) { if (allow_empty_password) {
status = SWITCH_STATUS_SUCCESS; status = SWITCH_STATUS_SUCCESS;
} else { } else {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed for %s@%s: empty password not allowed\n", user, switch_str_nil(domain)); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed for %s@%s: empty password not allowed\n", user, switch_str_nil(domain));
} }
goto done; goto done;
} }
auth = switch_core_sprintf(rsession->pool, "%s:%s@%s:%s", rsession->uuid, user, domain, passwd); auth = switch_core_sprintf(rsession->pool, "%s:%s@%s:%s", rsession->uuid, user, domain, passwd);
switch_md5_string(md5, auth, strlen(auth)); switch_md5_string(md5, auth, strlen(auth));
if (!strncmp(md5, authmd5, SWITCH_MD5_DIGEST_STRING_SIZE)) { if (!strncmp(md5, authmd5, SWITCH_MD5_DIGEST_STRING_SIZE)) {
status = SWITCH_STATUS_SUCCESS; status = SWITCH_STATUS_SUCCESS;
} else { } else {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed for %s@%s\n", user, domain); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed for %s@%s\n", user, domain);
} }
if (disallow_multiple_registration) { if (disallow_multiple_registration) {
switch_hash_index_t *hi; switch_hash_index_t *hi;
switch_thread_rwlock_rdlock(rsession->profile->session_rwlock); switch_thread_rwlock_rdlock(rsession->profile->session_rwlock);
for (hi = switch_core_hash_first(rsession->profile->session_hash); hi; hi = switch_core_hash_next(&hi)) { for (hi = switch_core_hash_first(rsession->profile->session_hash); hi; hi = switch_core_hash_next(&hi)) {
void *val; void *val;
const void *key; const void *key;
switch_ssize_t keylen; switch_ssize_t keylen;
rtmp_session_t *item; rtmp_session_t *item;
switch_core_hash_this(hi, &key, &keylen, &val); switch_core_hash_this(hi, &key, &keylen, &val);
item = (rtmp_session_t *)val; item = (rtmp_session_t *)val;
if (rtmp_session_check_user(item, user, domain) == SWITCH_STATUS_SUCCESS) { if (rtmp_session_check_user(item, user, domain) == SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Logging out %s@%s on RTMP sesssion [%s]\n", user, domain, item->uuid); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Logging out %s@%s on RTMP sesssion [%s]\n", user, domain, item->uuid);
if (rtmp_session_logout(item, user, domain) != SWITCH_STATUS_SUCCESS) { if (rtmp_session_logout(item, user, domain) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Unable to logout %s@%s on RTMP sesssion [%s]\n", user, domain, item->uuid); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Unable to logout %s@%s on RTMP sesssion [%s]\n", user, domain, item->uuid);
} }
} }
} }
switch_thread_rwlock_unlock(rsession->profile->session_rwlock); switch_thread_rwlock_unlock(rsession->profile->session_rwlock);
} }
done: done:
if (xml) { if (xml) {
switch_xml_free(xml); switch_xml_free(xml);
@ -275,7 +275,7 @@ done:
switch_status_t amf_object_to_event(amf0_data *obj, switch_event_t **event) switch_status_t amf_object_to_event(amf0_data *obj, switch_event_t **event)
{ {
switch_status_t status = SWITCH_STATUS_SUCCESS; switch_status_t status = SWITCH_STATUS_SUCCESS;
if (obj && obj->type == AMF0_TYPE_OBJECT) { if (obj && obj->type == AMF0_TYPE_OBJECT) {
amf0_node *node; amf0_node *node;
if (!*event) { if (!*event) {
@ -287,7 +287,7 @@ switch_status_t amf_object_to_event(amf0_data *obj, switch_event_t **event)
for (node = amf0_object_first(obj); node; node = amf0_object_next(node)) { for (node = amf0_object_first(obj); node; node = amf0_object_next(node)) {
const char *name = amf0_get_string(amf0_object_get_name(node)); const char *name = amf0_get_string(amf0_object_get_name(node));
const char *value = amf0_get_string(amf0_object_get_data(node)); const char *value = amf0_get_string(amf0_object_get_data(node));
if (!zstr(name) && !zstr(value)) { if (!zstr(name) && !zstr(value)) {
if (!strcmp(name, "_body")) { if (!strcmp(name, "_body")) {
switch_event_add_body(*event, "%s", value); switch_event_add_body(*event, "%s", value);
@ -299,41 +299,41 @@ switch_status_t amf_object_to_event(amf0_data *obj, switch_event_t **event)
} else { } else {
status = SWITCH_STATUS_FALSE; status = SWITCH_STATUS_FALSE;
} }
return status; return status;
} }
switch_status_t amf_event_to_object(amf0_data **obj, switch_event_t *event) switch_status_t amf_event_to_object(amf0_data **obj, switch_event_t *event)
{ {
switch_event_header_t *hp; switch_event_header_t *hp;
const char *body; const char *body;
switch_assert(event); switch_assert(event);
switch_assert(obj); switch_assert(obj);
if (!*obj) { if (!*obj) {
*obj = amf0_object_new(); *obj = amf0_object_new();
} }
for (hp = event->headers; hp; hp = hp->next) { for (hp = event->headers; hp; hp = hp->next) {
amf0_object_add(*obj, hp->name, amf0_str(hp->value)); amf0_object_add(*obj, hp->name, amf0_str(hp->value));
} }
body = switch_event_get_body(event); body = switch_event_get_body(event);
if (!zstr(body)) { if (!zstr(body)) {
amf0_object_add(*obj, "_body", amf0_str(body)); amf0_object_add(*obj, "_body", amf0_str(body));
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
void rtmp_set_chunksize(rtmp_session_t *rsession, uint32_t chunksize) void rtmp_set_chunksize(rtmp_session_t *rsession, uint32_t chunksize)
{ {
if (rsession->out_chunksize != chunksize) { if (rsession->out_chunksize != chunksize) {
unsigned char buf[] = { unsigned char buf[] = {
INT32(chunksize) INT32(chunksize)
}; };
rtmp_send_message(rsession, 2 /*amfnumber*/, 0, RTMP_TYPE_CHUNKSIZE, 0, buf, sizeof(buf), MSG_FULLHEADER); rtmp_send_message(rsession, 2 /*amfnumber*/, 0, RTMP_TYPE_CHUNKSIZE, 0, buf, sizeof(buf), MSG_FULLHEADER);
rsession->out_chunksize = chunksize; rsession->out_chunksize = chunksize;
} }
@ -343,11 +343,11 @@ void rtmp_get_user_variables(switch_event_t **event, switch_core_session_t *sess
{ {
switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_t *channel = switch_core_session_get_channel(session);
switch_event_header_t *he; switch_event_header_t *he;
if (!*event && switch_event_create(event, SWITCH_EVENT_CLONE) != SWITCH_STATUS_SUCCESS) { if (!*event && switch_event_create(event, SWITCH_EVENT_CLONE) != SWITCH_STATUS_SUCCESS) {
return; return;
} }
if ((he = switch_channel_variable_first(channel))) { if ((he = switch_channel_variable_first(channel))) {
for (; he; he = he->next) { for (; he; he = he->next) {
if (!strncmp(he->name, RTMP_USER_VARIABLE_PREFIX, strlen(RTMP_USER_VARIABLE_PREFIX))) { if (!strncmp(he->name, RTMP_USER_VARIABLE_PREFIX, strlen(RTMP_USER_VARIABLE_PREFIX))) {
@ -362,7 +362,7 @@ void rtmp_get_user_variables(switch_event_t **event, switch_core_session_t *sess
void rtmp_get_user_variables_event(switch_event_t **event, switch_event_t *var_event) void rtmp_get_user_variables_event(switch_event_t **event, switch_event_t *var_event)
{ {
switch_event_header_t *he; switch_event_header_t *he;
if (!*event && switch_event_create(event, SWITCH_EVENT_CLONE) != SWITCH_STATUS_SUCCESS) { if (!*event && switch_event_create(event, SWITCH_EVENT_CLONE) != SWITCH_STATUS_SUCCESS) {
return; return;
} }
@ -380,7 +380,7 @@ void rtmp_get_user_variables_event(switch_event_t **event, switch_event_t *var_e
void rtmp_session_send_onattach(rtmp_session_t *rsession) void rtmp_session_send_onattach(rtmp_session_t *rsession)
{ {
const char *uuid = ""; const char *uuid = "";
if (rsession->tech_pvt) { if (rsession->tech_pvt) {
uuid = switch_core_session_get_uuid(rsession->tech_pvt->session); uuid = switch_core_session_get_uuid(rsession->tech_pvt->session);
} }
@ -390,7 +390,7 @@ void rtmp_session_send_onattach(rtmp_session_t *rsession)
amf0_number_new(0), amf0_number_new(0),
amf0_null_new(), amf0_null_new(),
amf0_str(uuid), NULL); amf0_str(uuid), NULL);
} }
void rtmp_send_display_update(switch_core_session_t *session) void rtmp_send_display_update(switch_core_session_t *session)
@ -415,18 +415,18 @@ void rtmp_send_incoming_call(switch_core_session_t *session, switch_event_t *var
switch_caller_profile_t *caller_profile = switch_channel_get_caller_profile(channel); switch_caller_profile_t *caller_profile = switch_channel_get_caller_profile(channel);
switch_event_t *event = NULL; switch_event_t *event = NULL;
amf0_data *obj = NULL; amf0_data *obj = NULL;
if (var_event) { if (var_event) {
rtmp_get_user_variables_event(&event, var_event); rtmp_get_user_variables_event(&event, var_event);
} else { } else {
rtmp_get_user_variables(&event, session); rtmp_get_user_variables(&event, session);
} }
if (event) { if (event) {
amf_event_to_object(&obj, event); amf_event_to_object(&obj, event);
switch_event_destroy(&event); switch_event_destroy(&event);
} }
rtmp_send_invoke_free(rsession, 3, 0, 0, rtmp_send_invoke_free(rsession, 3, 0, 0,
amf0_str("incomingCall"), amf0_str("incomingCall"),
amf0_number_new(0), amf0_number_new(0),
@ -443,7 +443,7 @@ void rtmp_send_onhangup(switch_core_session_t *session)
rtmp_private_t *tech_pvt = switch_core_session_get_private(session); rtmp_private_t *tech_pvt = switch_core_session_get_private(session);
rtmp_session_t *rsession = tech_pvt->rtmp_session; rtmp_session_t *rsession = tech_pvt->rtmp_session;
switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_t *channel = switch_core_session_get_channel(session);
rtmp_send_invoke_free(rsession, 3, 0, 0, rtmp_send_invoke_free(rsession, 3, 0, 0,
amf0_str("onHangup"), amf0_str("onHangup"),
amf0_number_new(0), amf0_number_new(0),
@ -455,18 +455,18 @@ void rtmp_send_onhangup(switch_core_session_t *session)
void rtmp_send_event(rtmp_session_t *rsession, switch_event_t *event) void rtmp_send_event(rtmp_session_t *rsession, switch_event_t *event)
{ {
amf0_data *obj = NULL; amf0_data *obj = NULL;
switch_assert(event != NULL); switch_assert(event != NULL);
switch_assert(rsession != NULL); switch_assert(rsession != NULL);
if (amf_event_to_object(&obj, event) == SWITCH_STATUS_SUCCESS) { if (amf_event_to_object(&obj, event) == SWITCH_STATUS_SUCCESS) {
rtmp_send_invoke_free(rsession, 3, 0, 0, amf0_str("event"), amf0_number_new(0), amf0_null_new(), obj, NULL); rtmp_send_invoke_free(rsession, 3, 0, 0, amf0_str("event"), amf0_number_new(0), amf0_null_new(), obj, NULL);
} }
} }
void rtmp_ping(rtmp_session_t *rsession) void rtmp_ping(rtmp_session_t *rsession)
{ {
uint32_t now = (uint32_t)((switch_micro_time_now() / 1000) & 0xFFFFFFFF); uint32_t now = (uint32_t)((switch_micro_time_now() / 1000) & 0xFFFFFFFF);
unsigned char buf[] = { unsigned char buf[] = {
INT16(RTMP_CTRL_PING_REQUEST), INT16(RTMP_CTRL_PING_REQUEST),
INT32(now) INT32(now)
@ -536,7 +536,7 @@ switch_status_t rtmp_send_invoke_v(rtmp_session_t *rsession, uint8_t amfnumber,
amf0_data *data; amf0_data *data;
unsigned char buf[AMF_MAX_SIZE]; unsigned char buf[AMF_MAX_SIZE];
buffer_helper_t helper = { buf, 0, AMF_MAX_SIZE }; buffer_helper_t helper = { buf, 0, AMF_MAX_SIZE };
while ((data = va_arg(list, amf0_data*))) { while ((data = va_arg(list, amf0_data*))) {
//amf0_data_dump(stdout, data, 0); //amf0_data_dump(stdout, data, 0);
//printf("\n"); //printf("\n");
@ -558,21 +558,21 @@ switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, u
switch_size_t hdrsize = 1; switch_size_t hdrsize = 1;
switch_status_t status = SWITCH_STATUS_SUCCESS; switch_status_t status = SWITCH_STATUS_SUCCESS;
rtmp_state_t *state = &rsession->amfstate_out[amfnumber]; rtmp_state_t *state = &rsession->amfstate_out[amfnumber];
if ((rsession->send_ack + rsession->send_ack_window) < rsession->send && if ((rsession->send_ack + rsession->send_ack_window) < rsession->send &&
(type == RTMP_TYPE_VIDEO || type == RTMP_TYPE_AUDIO)) { (type == RTMP_TYPE_VIDEO || type == RTMP_TYPE_AUDIO)) {
/* We're sending too fast, drop the frame */ /* We're sending too fast, drop the frame */
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
"DROP %s FRAME [amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n", "DROP %s FRAME [amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n",
type == RTMP_TYPE_AUDIO ? "AUDIO" : "VIDEO", amfnumber, type, stream_id, len); type == RTMP_TYPE_AUDIO ? "AUDIO" : "VIDEO", amfnumber, type, stream_id, len);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
if (type != RTMP_TYPE_AUDIO && type != RTMP_TYPE_VIDEO && type != RTMP_TYPE_ACK) { if (type != RTMP_TYPE_AUDIO && type != RTMP_TYPE_VIDEO && type != RTMP_TYPE_ACK) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
"[amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n", amfnumber, type, stream_id, len); "[amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n", amfnumber, type, stream_id, len);
} }
#ifdef RTMP_DEBUG_IO #ifdef RTMP_DEBUG_IO
{ {
fprintf(rsession->io_debug_out, "[amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n", amfnumber, type, stream_id, len); fprintf(rsession->io_debug_out, "[amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n", amfnumber, type, stream_id, len);
@ -595,10 +595,10 @@ switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, u
fprintf(rsession->io_debug_out, "<<<<< END AMF MSG\n"); fprintf(rsession->io_debug_out, "<<<<< END AMF MSG\n");
} }
fflush(rsession->io_debug_out); fflush(rsession->io_debug_out);
} }
#endif #endif
/* Find out what is the smallest header we can use */ /* Find out what is the smallest header we can use */
if (!(flags & MSG_FULLHEADER) && stream_id > 0 && state->stream_id == stream_id && timestamp >= state->ts) { if (!(flags & MSG_FULLHEADER) && stream_id > 0 && state->stream_id == stream_id && timestamp >= state->ts) {
if (state->type == type && state->origlen == (int)len) { if (state->type == type && state->origlen == (int)len) {
@ -630,7 +630,7 @@ switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, u
header[2] = (timestamp >> 8) & 0xFF; header[2] = (timestamp >> 8) & 0xFF;
header[3] = timestamp & 0xFF; header[3] = timestamp & 0xFF;
} }
state->ts = timestamp; state->ts = timestamp;
state->type = type; state->type = type;
state->origlen = len; state->origlen = len;
@ -642,14 +642,14 @@ switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, u
switch_goto_status(SWITCH_STATUS_FALSE, end); switch_goto_status(SWITCH_STATUS_FALSE, end);
} }
rsession->send += hdrsize; rsession->send += hdrsize;
/* Write one chunk of data */ /* Write one chunk of data */
if (rsession->profile->io->write(rsession, (unsigned char*)message, &chunksize) != SWITCH_STATUS_SUCCESS) { if (rsession->profile->io->write(rsession, (unsigned char*)message, &chunksize) != SWITCH_STATUS_SUCCESS) {
switch_goto_status(SWITCH_STATUS_FALSE, end); switch_goto_status(SWITCH_STATUS_FALSE, end);
} }
rsession->send += chunksize; rsession->send += chunksize;
pos += chunksize; pos += chunksize;
/* Send more chunks if we need to */ /* Send more chunks if we need to */
while (((signed)len - (signed)pos) > 0) { while (((signed)len - (signed)pos) > 0) {
switch_mutex_unlock(rsession->socket_mutex); switch_mutex_unlock(rsession->socket_mutex);
@ -660,9 +660,9 @@ switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, u
switch_goto_status(SWITCH_STATUS_FALSE, end); switch_goto_status(SWITCH_STATUS_FALSE, end);
} }
rsession->send += hdrsize; rsession->send += hdrsize;
chunksize = (len - pos) < rsession->out_chunksize ? (len - pos) : rsession->out_chunksize; chunksize = (len - pos) < rsession->out_chunksize ? (len - pos) : rsession->out_chunksize;
if (rsession->profile->io->write(rsession, message + pos, &chunksize) != SWITCH_STATUS_SUCCESS) { if (rsession->profile->io->write(rsession, message + pos, &chunksize) != SWITCH_STATUS_SUCCESS) {
switch_goto_status(SWITCH_STATUS_FALSE, end); switch_goto_status(SWITCH_STATUS_FALSE, end);
} }
@ -682,58 +682,58 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
if (rsession->state == RS_HANDSHAKE) { if (rsession->state == RS_HANDSHAKE) {
s = 1537 - rsession->hspos; s = 1537 - rsession->hspos;
if (rsession->profile->io->read(rsession, rsession->hsbuf + rsession->hspos, &s) != SWITCH_STATUS_SUCCESS) { if (rsession->profile->io->read(rsession, rsession->hsbuf + rsession->hspos, &s) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
rsession->hspos += s; rsession->hspos += s;
/* Receive C0 and C1 */ /* Receive C0 and C1 */
if (rsession->hspos < 1537) { if (rsession->hspos < 1537) {
/* Not quite there yet */ /* Not quite there yet */
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
/* Send reply (S0 + S1) */ /* Send reply (S0 + S1) */
memset(buf, 0, sizeof(buf)); memset(buf, 0, sizeof(buf));
*buf = '\x03'; *buf = '\x03';
s = 1537; s = 1537;
rsession->profile->io->write(rsession, (unsigned char*)buf, &s); rsession->profile->io->write(rsession, (unsigned char*)buf, &s);
/* Send S2 */ /* Send S2 */
s = 1536; s = 1536;
rsession->profile->io->write(rsession, rsession->hsbuf, &s); rsession->profile->io->write(rsession, rsession->hsbuf, &s);
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Sent handshake response\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Sent handshake response\n");
rsession->state++; rsession->state++;
rsession->hspos = 0; rsession->hspos = 0;
} else if (rsession->state == RS_HANDSHAKE2) { } else if (rsession->state == RS_HANDSHAKE2) {
s = 1536 - rsession->hspos; s = 1536 - rsession->hspos;
/* Receive C2 */ /* Receive C2 */
if (rsession->profile->io->read(rsession, rsession->hsbuf + rsession->hspos, &s) != SWITCH_STATUS_SUCCESS) { if (rsession->profile->io->read(rsession, rsession->hsbuf + rsession->hspos, &s) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
rsession->hspos += s; rsession->hspos += s;
if (rsession->hspos < 1536) { if (rsession->hspos < 1536) {
/* Not quite there yet */ /* Not quite there yet */
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
rsession->state++; rsession->state++;
//s = 1536; //s = 1536;
//rsession->profile->io->write(rsession, (char*)buf, &s); //rsession->profile->io->write(rsession, (char*)buf, &s);
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Done with handshake\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Done with handshake\n");
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} else if (rsession->state == RS_ESTABLISHED) { } else if (rsession->state == RS_ESTABLISHED) {
/* Process RTMP packet */ /* Process RTMP packet */
@ -745,9 +745,9 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
rsession->recv += s; rsession->recv += s;
switch(buf[0] >> 6) { switch(buf[0] >> 6) {
case 0: case 0:
rsession->hdrsize = 12; rsession->hdrsize = 12;
@ -770,7 +770,7 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
if (rsession->amfnumber > 64) { if (rsession->amfnumber > 64) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Header size: %d AMF Number: %d\n", rsession->hdrsize, rsession->amfnumber); //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Header size: %d AMF Number: %d\n", rsession->hdrsize, rsession->amfnumber);
rsession->parse_state++; rsession->parse_state++;
if (rsession->hdrsize == 1) { if (rsession->hdrsize == 1) {
@ -779,39 +779,39 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
} }
rsession->parse_remain = 0; rsession->parse_remain = 0;
break; break;
case 1: case 1:
{ {
/* Read full header and decode */ /* Read full header and decode */
rtmp_state_t *state = &rsession->amfstate[rsession->amfnumber]; rtmp_state_t *state = &rsession->amfstate[rsession->amfnumber];
uint8_t *hdr = (uint8_t*)state->header.sz; uint8_t *hdr = (uint8_t*)state->header.sz;
unsigned char *readbuf = (unsigned char*)hdr; unsigned char *readbuf = (unsigned char*)hdr;
if (!rsession->parse_remain) { if (!rsession->parse_remain) {
rsession->parse_remain = s = rsession->hdrsize - 1; rsession->parse_remain = s = rsession->hdrsize - 1;
} else { } else {
s = rsession->parse_remain; s = rsession->parse_remain;
readbuf += (rsession->hdrsize - 1) - s; readbuf += (rsession->hdrsize - 1) - s;
} }
if ( !(s < 12 && s > 0) ) { /** XXX **/ if ( !(s < 12 && s > 0) ) { /** XXX **/
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Protocol error: Invalid header size\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Protocol error: Invalid header size\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (rsession->profile->io->read(rsession, readbuf, &s) != SWITCH_STATUS_SUCCESS) { if (rsession->profile->io->read(rsession, readbuf, &s) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
rsession->parse_remain -= s; rsession->parse_remain -= s;
if (rsession->parse_remain > 0) { if (rsession->parse_remain > 0) {
/* More data please */ /* More data please */
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
rsession->recv += s; rsession->recv += s;
if (rsession->hdrsize == 12) { if (rsession->hdrsize == 12) {
state->ts = (hdr[0] << 16) | (hdr[1] << 8) | (hdr[2]); state->ts = (hdr[0] << 16) | (hdr[1] << 8) | (hdr[2]);
state->ts_delta = 0; state->ts_delta = 0;
@ -823,7 +823,7 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
/* Type 3: Re-use timestamp delta if we have one */ /* Type 3: Re-use timestamp delta if we have one */
state->ts += state->ts_delta; state->ts += state->ts_delta;
} }
if (rsession->hdrsize >= 8) { if (rsession->hdrsize >= 8) {
/* Reset length counter since its included in the header */ /* Reset length counter since its included in the header */
state->remainlen = state->origlen = (hdr[3] << 16) | (hdr[4] << 8) | (hdr[5]); state->remainlen = state->origlen = (hdr[3] << 16) | (hdr[4] << 8) | (hdr[5]);
@ -833,31 +833,31 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
if (rsession->hdrsize == 12) { if (rsession->hdrsize == 12) {
state->stream_id = (hdr[10] << 24) | (hdr[9] << 16) | (hdr[8] << 8) | hdr[7]; state->stream_id = (hdr[10] << 24) | (hdr[9] << 16) | (hdr[8] << 8) | hdr[7];
} }
if (rsession->hdrsize >= 8 && state->origlen == 0) { if (rsession->hdrsize >= 8 && state->origlen == 0) {
/* Happens we sometimes get a 0 length packet */ /* Happens we sometimes get a 0 length packet */
rsession->parse_state = 0; rsession->parse_state = 0;
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
/* FIXME: Handle extended timestamps */ /* FIXME: Handle extended timestamps */
if (state->ts == 0x00ffffff) { if (state->ts == 0x00ffffff) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
rsession->parse_state++; rsession->parse_state++;
} }
case 2: case 2:
{ {
rtmp_state_t *state = &rsession->amfstate[rsession->amfnumber]; rtmp_state_t *state = &rsession->amfstate[rsession->amfnumber];
if (rsession->parse_remain > 0) { if (rsession->parse_remain > 0) {
s = rsession->parse_remain; s = rsession->parse_remain;
} else { } else {
s = state->remainlen < rsession->in_chunksize ? state->remainlen : rsession->in_chunksize; s = state->remainlen < rsession->in_chunksize ? state->remainlen : rsession->in_chunksize;
rsession->parse_remain = s; rsession->parse_remain = s;
} }
if (!s) { if (!s) {
/* Restart from beginning */ /* Restart from beginning */
s = state->remainlen = state->origlen; s = state->remainlen = state->origlen;
@ -869,31 +869,31 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
} }
/* Sanity check */ /* Sanity check */
if ((state->buf_pos + s) > AMF_MAX_SIZE) { if ((state->buf_pos + s) > AMF_MAX_SIZE) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "WTF %"SWITCH_SIZE_T_FMT" %"SWITCH_SIZE_T_FMT"\n", switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "WTF %"SWITCH_SIZE_T_FMT" %"SWITCH_SIZE_T_FMT"\n",
state->buf_pos, s); state->buf_pos, s);
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error: exceeding max AMF packet size\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error: exceeding max AMF packet size\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (s > rsession->in_chunksize) { if (s > rsession->in_chunksize) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error: invalid chunksize\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error: invalid chunksize\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (rsession->profile->io->read(rsession, state->buf + state->buf_pos, &s) != SWITCH_STATUS_SUCCESS) { if (rsession->profile->io->read(rsession, state->buf + state->buf_pos, &s) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
rsession->recv += s; rsession->recv += s;
state->remainlen -= s; state->remainlen -= s;
rsession->parse_remain -= s; rsession->parse_remain -= s;
state->buf_pos += s; state->buf_pos += s;
if (rsession->parse_remain > 0) { if (rsession->parse_remain > 0) {
/* Need more data */ /* Need more data */
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
@ -922,13 +922,13 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
switch_thread_rwlock_wrlock(rsession->rwlock); switch_thread_rwlock_wrlock(rsession->rwlock);
if (rsession->tech_pvt) { if (rsession->tech_pvt) {
uint16_t len = state->origlen; uint16_t len = state->origlen;
if (!rsession->tech_pvt->readbuf) { if (!rsession->tech_pvt->readbuf) {
switch_thread_rwlock_unlock(rsession->rwlock); switch_thread_rwlock_unlock(rsession->rwlock);
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
switch_mutex_lock(rsession->tech_pvt->readbuf_mutex); switch_mutex_lock(rsession->tech_pvt->readbuf_mutex);
if (rsession->tech_pvt->maxlen && switch_buffer_inuse(rsession->tech_pvt->readbuf) > (switch_size_t)(rsession->tech_pvt->maxlen * 40)) { if (rsession->tech_pvt->maxlen && switch_buffer_inuse(rsession->tech_pvt->readbuf) > (switch_size_t)(rsession->tech_pvt->maxlen * 40)) {
rsession->tech_pvt->over_size++; rsession->tech_pvt->over_size++;
@ -936,8 +936,8 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
rsession->tech_pvt->over_size = 0; rsession->tech_pvt->over_size = 0;
} }
if (rsession->tech_pvt->over_size > 10) { if (rsession->tech_pvt->over_size > 10) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
"%s buffer > %u for 10 consecutive packets... Flushing buffer\n", "%s buffer > %u for 10 consecutive packets... Flushing buffer\n",
switch_core_session_get_name(rsession->tech_pvt->session), rsession->tech_pvt->maxlen * 40); switch_core_session_get_name(rsession->tech_pvt->session), rsession->tech_pvt->maxlen * 40);
switch_buffer_zero(rsession->tech_pvt->readbuf); switch_buffer_zero(rsession->tech_pvt->readbuf);
#ifdef RTMP_DEBUG_IO #ifdef RTMP_DEBUG_IO
@ -965,15 +965,15 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
switch_time_t now = switch_micro_time_now(); switch_time_t now = switch_micro_time_now();
uint32_t ack = (state->buf[0] << 24) | (state->buf[1] << 16) | (state->buf[2] << 8) | (state->buf[3]); uint32_t ack = (state->buf[0] << 24) | (state->buf[1] << 16) | (state->buf[2] << 8) | (state->buf[3]);
uint32_t delta = rsession->send_ack_ts == 0 ? 0 : now - rsession->send_ack_ts; uint32_t delta = rsession->send_ack_ts == 0 ? 0 : now - rsession->send_ack_ts;
delta /= 1000000; /* microseconds -> seconds */ delta /= 1000000; /* microseconds -> seconds */
if (delta) { if (delta) {
rsession->send_bw = (ack - rsession->send_ack) / delta; rsession->send_bw = (ack - rsession->send_ack) / delta;
} }
rsession->send_ack = ack; rsession->send_ack = ack;
rsession->send_ack_ts = switch_micro_time_now(); rsession->send_ack_ts = switch_micro_time_now();
break; break;
} }
default: default:
@ -984,7 +984,7 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
} }
rsession->parse_state = 0; rsession->parse_state = 0;
/* Send an ACK if we need to */ /* Send an ACK if we need to */
if (rsession->recv - rsession->recv_ack_sent >= rsession->recv_ack_window) { if (rsession->recv - rsession->recv_ack_sent >= rsession->recv_ack_window) {
unsigned char ackbuf[] = { INT32(rsession->recv) }; unsigned char ackbuf[] = { INT32(rsession->recv) };
@ -992,11 +992,11 @@ switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
rtmp_send_message(rsession, 2/*chunkstream*/, 0/*ts*/, RTMP_TYPE_ACK, 0/*msg stream id */, ackbuf, sizeof(ackbuf), 0 /*flags*/); rtmp_send_message(rsession, 2/*chunkstream*/, 0/*ts*/, RTMP_TYPE_ACK, 0/*msg stream id */, ackbuf, sizeof(ackbuf), 0 /*flags*/);
rsession->recv_ack_sent = rsession->recv; rsession->recv_ack_sent = rsession->recv;
} }
} }
} }
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }

View File

@ -1,4 +1,4 @@
/* /*
* mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application * mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2011-2012, Barracuda Networks Inc. * Copyright (C) 2011-2012, Barracuda Networks Inc.
* *
@ -21,7 +21,7 @@
* the Initial Developer. All Rights Reserved. * the Initial Developer. All Rights Reserved.
* *
* Contributor(s): * Contributor(s):
* *
* Mathieu Rene <mrene@avgs.ca> * Mathieu Rene <mrene@avgs.ca>
* *
* rtmp.c -- RTMP Signalling functions * rtmp.c -- RTMP Signalling functions
@ -35,7 +35,7 @@
#include "io.h" #include "io.h"
#include "types.h" #include "types.h"
/* RTMP_INVOKE_FUNCTION is a macro that expands to: /* RTMP_INVOKE_FUNCTION is a macro that expands to:
switch_status_t function(rtmp_session_t *rsession, rtmp_state_t *state, int amfnumber, int transaction_id, int argc, amf0_data *argv[]) switch_status_t function(rtmp_session_t *rsession, rtmp_state_t *state, int amfnumber, int transaction_id, int argc, amf0_data *argv[])
*/ */
@ -43,22 +43,22 @@ RTMP_INVOKE_FUNCTION(rtmp_i_connect)
{ {
amf0_data *object1 = amf0_object_new(), *object2 = amf0_object_new(), *params = argv[0], *d; amf0_data *object1 = amf0_object_new(), *object2 = amf0_object_new(), *params = argv[0], *d;
const char *s; const char *s;
if ((d = amf0_object_get(params, "app")) && (s = amf0_get_string(d))) { if ((d = amf0_object_get(params, "app")) && (s = amf0_get_string(d))) {
rsession->app = switch_core_strdup(rsession->pool, s); rsession->app = switch_core_strdup(rsession->pool, s);
} }
if ((d = amf0_object_get(params, "flashVer")) && (s = amf0_get_string(d))) { if ((d = amf0_object_get(params, "flashVer")) && (s = amf0_get_string(d))) {
rsession->flashVer = switch_core_strdup(rsession->pool, s); rsession->flashVer = switch_core_strdup(rsession->pool, s);
} }
if ((d = amf0_object_get(params, "swfUrl")) && (s = amf0_get_string(d))) { if ((d = amf0_object_get(params, "swfUrl")) && (s = amf0_get_string(d))) {
rsession->swfUrl = switch_core_strdup(rsession->pool, s); rsession->swfUrl = switch_core_strdup(rsession->pool, s);
} }
if ((d = amf0_object_get(params, "tcUrl")) && (s = amf0_get_string(d))) { if ((d = amf0_object_get(params, "tcUrl")) && (s = amf0_get_string(d))) {
rsession->tcUrl = switch_core_strdup(rsession->pool, s); rsession->tcUrl = switch_core_strdup(rsession->pool, s);
} }
if ((d = amf0_object_get(params, "pageUrl")) && (s = amf0_get_string(d))) { if ((d = amf0_object_get(params, "pageUrl")) && (s = amf0_get_string(d))) {
rsession->pageUrl = switch_core_strdup(rsession->pool, s); rsession->pageUrl = switch_core_strdup(rsession->pool, s);
} }
if ((d = amf0_object_get(params, "capabilities"))) { if ((d = amf0_object_get(params, "capabilities"))) {
@ -82,61 +82,61 @@ RTMP_INVOKE_FUNCTION(rtmp_i_connect)
amf0_object_add(object2, "description", amf0_str("Connection succeeded")); amf0_object_add(object2, "description", amf0_str("Connection succeeded"));
amf0_object_add(object2, "clientId", amf0_number_new(217834719)); amf0_object_add(object2, "clientId", amf0_number_new(217834719));
amf0_object_add(object2, "objectEncoding", amf0_number_new(0)); amf0_object_add(object2, "objectEncoding", amf0_number_new(0));
rtmp_set_chunksize(rsession, rsession->profile->chunksize); rtmp_set_chunksize(rsession, rsession->profile->chunksize);
{ {
unsigned char ackbuf[] = { INT32(RTMP_DEFAULT_ACK_WINDOW) }; unsigned char ackbuf[] = { INT32(RTMP_DEFAULT_ACK_WINDOW) };
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_WINDOW_ACK_SIZE, 0, ackbuf, sizeof(ackbuf), MSG_FULLHEADER); rtmp_send_message(rsession, 2, 0, RTMP_TYPE_WINDOW_ACK_SIZE, 0, ackbuf, sizeof(ackbuf), MSG_FULLHEADER);
} }
{ {
unsigned char ackbuf[] = { INT32(RTMP_DEFAULT_ACK_WINDOW), 0x1 /* Soft limit */}; unsigned char ackbuf[] = { INT32(RTMP_DEFAULT_ACK_WINDOW), 0x1 /* Soft limit */};
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_SET_PEER_BW, 0, ackbuf, sizeof(ackbuf), MSG_FULLHEADER); rtmp_send_message(rsession, 2, 0, RTMP_TYPE_SET_PEER_BW, 0, ackbuf, sizeof(ackbuf), MSG_FULLHEADER);
} }
{ {
unsigned char buf[] = { unsigned char buf[] = {
INT16(RTMP_CTRL_STREAM_BEGIN), INT16(RTMP_CTRL_STREAM_BEGIN),
INT32(0) INT32(0)
}; };
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0); rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0);
} }
/* respond with a success message */ /* respond with a success message */
rtmp_send_invoke_free(rsession, amfnumber, 0, 0, rtmp_send_invoke_free(rsession, amfnumber, 0, 0,
amf0_str("_result"), amf0_str("_result"),
amf0_number_new(1), amf0_number_new(1),
object1, object1,
object2, object2,
NULL); NULL);
rtmp_send_invoke_free(rsession, 3, 0, 0, rtmp_send_invoke_free(rsession, 3, 0, 0,
amf0_str("connected"), amf0_str("connected"),
amf0_number_new(0), amf0_number_new(0),
amf0_null_new(), amf0_null_new(),
amf0_str(rsession->uuid), NULL); amf0_str(rsession->uuid), NULL);
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Sent connect reply\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Sent connect reply\n");
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
RTMP_INVOKE_FUNCTION(rtmp_i_createStream) RTMP_INVOKE_FUNCTION(rtmp_i_createStream)
{ {
rtmp_send_invoke_free(rsession, amfnumber, 0, 0, rtmp_send_invoke_free(rsession, amfnumber, 0, 0,
amf0_str("_result"), amf0_str("_result"),
amf0_number_new(transaction_id), amf0_number_new(transaction_id),
amf0_null_new(), amf0_null_new(),
amf0_number_new(rsession->next_streamid), amf0_number_new(rsession->next_streamid),
NULL); NULL);
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Replied to createStream (%u)\n", rsession->next_streamid); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Replied to createStream (%u)\n", rsession->next_streamid);
rsession->next_streamid++; rsession->next_streamid++;
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -148,7 +148,7 @@ RTMP_INVOKE_FUNCTION(rtmp_i_noop)
RTMP_INVOKE_FUNCTION(rtmp_i_receiveaudio) RTMP_INVOKE_FUNCTION(rtmp_i_receiveaudio)
{ {
switch_bool_t enabled = argv[1] ? amf0_boolean_get_value(argv[1]) : SWITCH_FALSE; switch_bool_t enabled = argv[1] ? amf0_boolean_get_value(argv[1]) : SWITCH_FALSE;
if (enabled) { if (enabled) {
switch_set_flag(rsession, SFLAG_AUDIO); switch_set_flag(rsession, SFLAG_AUDIO);
@ -156,14 +156,14 @@ RTMP_INVOKE_FUNCTION(rtmp_i_receiveaudio)
switch_clear_flag(rsession, SFLAG_AUDIO); switch_clear_flag(rsession, SFLAG_AUDIO);
} }
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "%sending audio\n", enabled ? "S" : "Not s"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "%sending audio\n", enabled ? "S" : "Not s");
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
RTMP_INVOKE_FUNCTION(rtmp_i_receivevideo) RTMP_INVOKE_FUNCTION(rtmp_i_receivevideo)
{ {
switch_bool_t enabled = argv[1] ? amf0_boolean_get_value(argv[1]) : SWITCH_FALSE; switch_bool_t enabled = argv[1] ? amf0_boolean_get_value(argv[1]) : SWITCH_FALSE;
if (enabled) { if (enabled) {
switch_set_flag(rsession, SFLAG_VIDEO); switch_set_flag(rsession, SFLAG_VIDEO);
@ -174,9 +174,9 @@ RTMP_INVOKE_FUNCTION(rtmp_i_receivevideo)
switch_clear_flag(rsession, SFLAG_VIDEO); switch_clear_flag(rsession, SFLAG_VIDEO);
} }
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "%sending video\n", enabled ? "S" : "Not s"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "%sending video\n", enabled ? "S" : "Not s");
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -184,24 +184,24 @@ RTMP_INVOKE_FUNCTION(rtmp_i_play)
{ {
amf0_data *obj = amf0_object_new(); amf0_data *obj = amf0_object_new();
amf0_data *object = amf0_object_new(); amf0_data *object = amf0_object_new();
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Got play for %s on stream %d\n", switch_str_nil(amf0_get_string(argv[1])), switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Got play for %s on stream %d\n", switch_str_nil(amf0_get_string(argv[1])),
state->stream_id); state->stream_id);
/* Set outgoing chunk size to 1024 bytes */ /* Set outgoing chunk size to 1024 bytes */
rtmp_set_chunksize(rsession, 1024); rtmp_set_chunksize(rsession, 1024);
rsession->media_streamid = state->stream_id; rsession->media_streamid = state->stream_id;
/* Send StreamBegin on the current stream */ /* Send StreamBegin on the current stream */
{ {
unsigned char buf[] = { unsigned char buf[] = {
INT16(RTMP_CTRL_STREAM_BEGIN), INT16(RTMP_CTRL_STREAM_BEGIN),
INT32(rsession->media_streamid) INT32(rsession->media_streamid)
}; };
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0); rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0);
} }
{ {
unsigned char buf[] = { unsigned char buf[] = {
@ -210,7 +210,7 @@ RTMP_INVOKE_FUNCTION(rtmp_i_play)
INT32(rsession->profile->buffer_len) INT32(rsession->profile->buffer_len)
}; };
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0); rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0);
} }
/* Send onStatus */ /* Send onStatus */
amf0_object_add(object, "level", amf0_str("status")); amf0_object_add(object, "level", amf0_str("status"));
@ -218,13 +218,13 @@ RTMP_INVOKE_FUNCTION(rtmp_i_play)
amf0_object_add(object, "description", amf0_str("description")); amf0_object_add(object, "description", amf0_str("description"));
amf0_object_add(object, "details", amf0_str("details")); amf0_object_add(object, "details", amf0_str("details"));
amf0_object_add(object, "clientid", amf0_number_new(217834719)); amf0_object_add(object, "clientid", amf0_number_new(217834719));
rtmp_send_invoke_free(rsession, RTMP_DEFAULT_STREAM_NOTIFY, 0, rsession->media_streamid, rtmp_send_invoke_free(rsession, RTMP_DEFAULT_STREAM_NOTIFY, 0, rsession->media_streamid,
amf0_str("onStatus"), amf0_str("onStatus"),
amf0_number_new(1), amf0_number_new(1),
amf0_null_new(), amf0_null_new(),
object, NULL); object, NULL);
object = amf0_object_new(); object = amf0_object_new();
amf0_object_add(object, "level", amf0_str("status")); amf0_object_add(object, "level", amf0_str("status"));
@ -240,51 +240,51 @@ RTMP_INVOKE_FUNCTION(rtmp_i_play)
object, NULL); object, NULL);
amf0_object_add(obj, "code", amf0_str("NetStream.Data.Start")); amf0_object_add(obj, "code", amf0_str("NetStream.Data.Start"));
rtmp_send_notify_free(rsession, RTMP_DEFAULT_STREAM_NOTIFY, 0, rsession->media_streamid, rtmp_send_notify_free(rsession, RTMP_DEFAULT_STREAM_NOTIFY, 0, rsession->media_streamid,
amf0_str("onStatus"), amf0_str("onStatus"),
obj, NULL); obj, NULL);
rtmp_send_notify_free(rsession, RTMP_DEFAULT_STREAM_NOTIFY, 0, rsession->media_streamid, rtmp_send_notify_free(rsession, RTMP_DEFAULT_STREAM_NOTIFY, 0, rsession->media_streamid,
amf0_str("|RtmpSampleAccess"), amf0_str("|RtmpSampleAccess"),
amf0_boolean_new(1), amf0_boolean_new(1),
amf0_boolean_new(1), NULL); amf0_boolean_new(1), NULL);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
RTMP_INVOKE_FUNCTION(rtmp_i_publish) RTMP_INVOKE_FUNCTION(rtmp_i_publish)
{ {
unsigned char buf[] = { unsigned char buf[] = {
INT16(RTMP_CTRL_STREAM_BEGIN), INT16(RTMP_CTRL_STREAM_BEGIN),
INT32(state->stream_id) INT32(state->stream_id)
}; };
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0); rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0);
rtmp_send_invoke_free(rsession, amfnumber, 0, 0, rtmp_send_invoke_free(rsession, amfnumber, 0, 0,
amf0_str("_result"), amf0_str("_result"),
amf0_number_new(transaction_id), amf0_number_new(transaction_id),
amf0_null_new(), amf0_null_new(),
amf0_null_new(), amf0_null_new(),
NULL); NULL);
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Got publish on stream %u.\n", state->stream_id); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Got publish on stream %u.\n", state->stream_id);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
RTMP_INVOKE_FUNCTION(rtmp_i_makeCall) RTMP_INVOKE_FUNCTION(rtmp_i_makeCall)
{ {
switch_core_session_t *newsession = NULL; switch_core_session_t *newsession = NULL;
char *number = NULL; char *number = NULL;
if ((number = amf0_get_string(argv[1]))) { if ((number = amf0_get_string(argv[1]))) {
switch_event_t *event = NULL; switch_event_t *event = NULL;
char *auth, *user = NULL, *domain = NULL; char *auth, *user = NULL, *domain = NULL;
if ((auth = amf0_get_string(argv[2])) && !zstr(auth)) { if ((auth = amf0_get_string(argv[2])) && !zstr(auth)) {
switch_split_user_domain(auth, &user, &domain); switch_split_user_domain(auth, &user, &domain);
if (rtmp_session_check_user(rsession, user, domain) != SWITCH_STATUS_SUCCESS) { if (rtmp_session_check_user(rsession, user, domain) != SWITCH_STATUS_SUCCESS) {
@ -296,20 +296,20 @@ RTMP_INVOKE_FUNCTION(rtmp_i_makeCall)
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Unauthorized call to %s, client is not logged in\n", number); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Unauthorized call to %s, client is not logged in\n", number);
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (amf0_is_object(argv[3])) { if (amf0_is_object(argv[3])) {
amf_object_to_event(argv[3], &event); amf_object_to_event(argv[3], &event);
} }
if (rtmp_session_create_call(rsession, &newsession, 0, RTMP_DEFAULT_STREAM_AUDIO, number, user, domain, event) != SWITCH_CAUSE_SUCCESS) { if (rtmp_session_create_call(rsession, &newsession, 0, RTMP_DEFAULT_STREAM_AUDIO, number, user, domain, event) != SWITCH_CAUSE_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Couldn't create call.\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Couldn't create call.\n");
} }
if (event) { if (event) {
switch_event_destroy(&event); switch_event_destroy(&event);
} }
} }
if (newsession) { if (newsession) {
rtmp_private_t *new_pvt = switch_core_session_get_private(newsession); rtmp_private_t *new_pvt = switch_core_session_get_private(newsession);
rtmp_send_invoke_free(rsession, 3, 0, 0, rtmp_send_invoke_free(rsession, 3, 0, 0,
@ -320,10 +320,10 @@ RTMP_INVOKE_FUNCTION(rtmp_i_makeCall)
amf0_str(switch_str_nil(number)), amf0_str(switch_str_nil(number)),
amf0_str(switch_str_nil(new_pvt->auth)), amf0_str(switch_str_nil(new_pvt->auth)),
NULL); NULL);
rtmp_attach_private(rsession, switch_core_session_get_private(newsession)); rtmp_attach_private(rsession, switch_core_session_get_private(newsession));
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -333,19 +333,19 @@ RTMP_INVOKE_FUNCTION(rtmp_i_sendDTMF)
switch_dtmf_t dtmf = { 0 }; switch_dtmf_t dtmf = { 0 };
switch_channel_t *channel; switch_channel_t *channel;
char *digits; char *digits;
if (!rsession->tech_pvt) { if (!rsession->tech_pvt) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
channel = switch_core_session_get_channel(rsession->tech_pvt->session); channel = switch_core_session_get_channel(rsession->tech_pvt->session);
if (amf0_is_number(argv[2])) { if (amf0_is_number(argv[2])) {
dtmf.duration = amf0_get_number(argv[2]); dtmf.duration = amf0_get_number(argv[2]);
} else if (!zstr(amf0_get_string(argv[2]))) { } else if (!zstr(amf0_get_string(argv[2]))) {
dtmf.duration = atoi(amf0_get_string(argv[2])); dtmf.duration = atoi(amf0_get_string(argv[2]));
} }
if ((digits = amf0_get_string(argv[1]))) { if ((digits = amf0_get_string(argv[1]))) {
size_t len = strlen(digits); size_t len = strlen(digits);
size_t j; size_t j;
@ -354,7 +354,7 @@ RTMP_INVOKE_FUNCTION(rtmp_i_sendDTMF)
switch_channel_queue_dtmf(channel, &dtmf); switch_channel_queue_dtmf(channel, &dtmf);
} }
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -362,19 +362,19 @@ RTMP_INVOKE_FUNCTION(rtmp_i_sendDTMF)
RTMP_INVOKE_FUNCTION(rtmp_i_login) RTMP_INVOKE_FUNCTION(rtmp_i_login)
{ {
char *user, *auth, *domain, *ddomain = NULL; char *user, *auth, *domain, *ddomain = NULL;
user = amf0_get_string(argv[1]); user = amf0_get_string(argv[1]);
auth = amf0_get_string(argv[2]); auth = amf0_get_string(argv[2]);
if (zstr(user) || zstr(auth)) { if (zstr(user) || zstr(auth)) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if ((domain = strchr(user, '@'))) { if ((domain = strchr(user, '@'))) {
*domain++ = '\0'; *domain++ = '\0';
} }
if (zstr(domain)) { if (zstr(domain)) {
ddomain = switch_core_get_domain(SWITCH_TRUE); ddomain = switch_core_get_domain(SWITCH_TRUE);
domain = ddomain; domain = ddomain;
@ -382,7 +382,7 @@ RTMP_INVOKE_FUNCTION(rtmp_i_login)
if (rtmp_check_auth(rsession, user, domain, auth) == SWITCH_STATUS_SUCCESS) { if (rtmp_check_auth(rsession, user, domain, auth) == SWITCH_STATUS_SUCCESS) {
rtmp_session_login(rsession, user, domain); rtmp_session_login(rsession, user, domain);
} else { } else {
rtmp_send_invoke_free(rsession, 3, 0, 0, rtmp_send_invoke_free(rsession, 3, 0, 0,
amf0_str("onLogin"), amf0_str("onLogin"),
@ -392,7 +392,7 @@ RTMP_INVOKE_FUNCTION(rtmp_i_login)
amf0_null_new(), amf0_null_new(),
amf0_null_new(), NULL); amf0_null_new(), NULL);
} }
switch_safe_free(ddomain); switch_safe_free(ddomain);
@ -403,16 +403,16 @@ RTMP_INVOKE_FUNCTION(rtmp_i_logout)
{ {
char *auth = amf0_get_string(argv[1]); char *auth = amf0_get_string(argv[1]);
char *user = NULL, *domain = NULL; char *user = NULL, *domain = NULL;
/* Unregister from that user */ /* Unregister from that user */
rtmp_clear_registration(rsession, auth, NULL); rtmp_clear_registration(rsession, auth, NULL);
switch_split_user_domain(auth, &user, &domain); switch_split_user_domain(auth, &user, &domain);
if (!zstr(user) && !zstr(domain)) { if (!zstr(user) && !zstr(domain)) {
rtmp_session_logout(rsession, user, domain); rtmp_session_logout(rsession, user, domain);
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -422,11 +422,11 @@ RTMP_INVOKE_FUNCTION(rtmp_i_register)
const char *user = NULL, *domain = NULL; const char *user = NULL, *domain = NULL;
char *dup = NULL; char *dup = NULL;
switch_status_t status; switch_status_t status;
if (!rsession->account) { if (!rsession->account) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (!zstr(auth)) { if (!zstr(auth)) {
dup = strdup(auth); dup = strdup(auth);
switch_split_user_domain(dup, (char**)&user, (char**)&domain); switch_split_user_domain(dup, (char**)&user, (char**)&domain);
@ -435,23 +435,23 @@ RTMP_INVOKE_FUNCTION(rtmp_i_register)
user = rsession->account->user; user = rsession->account->user;
domain = rsession->account->domain; domain = rsession->account->domain;
} }
if (rtmp_session_check_user(rsession, user, domain) == SWITCH_STATUS_SUCCESS) { if (rtmp_session_check_user(rsession, user, domain) == SWITCH_STATUS_SUCCESS) {
rtmp_add_registration(rsession, auth, amf0_get_string(argv[2])); rtmp_add_registration(rsession, auth, amf0_get_string(argv[2]));
status = SWITCH_STATUS_SUCCESS; status = SWITCH_STATUS_SUCCESS;
} else { } else {
status = SWITCH_STATUS_FALSE; status = SWITCH_STATUS_FALSE;
} }
switch_safe_free(dup); switch_safe_free(dup);
return status; return status;
} }
RTMP_INVOKE_FUNCTION(rtmp_i_unregister) RTMP_INVOKE_FUNCTION(rtmp_i_unregister)
{ {
rtmp_clear_registration(rsession, amf0_get_string(argv[1]), amf0_get_string(argv[2])); rtmp_clear_registration(rsession, amf0_get_string(argv[1]), amf0_get_string(argv[2]));
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -459,7 +459,7 @@ RTMP_INVOKE_FUNCTION(rtmp_i_answer)
{ {
switch_channel_t *channel = NULL; switch_channel_t *channel = NULL;
char *uuid = amf0_get_string(argv[1]); char *uuid = amf0_get_string(argv[1]);
if (!zstr(uuid)) { if (!zstr(uuid)) {
rtmp_private_t *new_tech_pvt = rtmp_locate_private(rsession, uuid); rtmp_private_t *new_tech_pvt = rtmp_locate_private(rsession, uuid);
if (new_tech_pvt) { if (new_tech_pvt) {
@ -468,16 +468,16 @@ RTMP_INVOKE_FUNCTION(rtmp_i_answer)
} }
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (!rsession->tech_pvt) { if (!rsession->tech_pvt) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
/* No UUID specified but we're attached to a channel, mark it as answered */ /* No UUID specified but we're attached to a channel, mark it as answered */
channel = switch_core_session_get_channel(rsession->tech_pvt->session); channel = switch_core_session_get_channel(rsession->tech_pvt->session);
switch_channel_mark_answered(channel); switch_channel_mark_answered(channel);
rtmp_attach_private(rsession, rsession->tech_pvt); rtmp_attach_private(rsession, rsession->tech_pvt);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -485,13 +485,13 @@ RTMP_INVOKE_FUNCTION(rtmp_i_attach)
{ {
rtmp_private_t *tech_pvt = NULL; rtmp_private_t *tech_pvt = NULL;
char *uuid = amf0_get_string(argv[1]); char *uuid = amf0_get_string(argv[1]);
if (!zstr(uuid)) { if (!zstr(uuid)) {
tech_pvt = rtmp_locate_private(rsession, uuid); tech_pvt = rtmp_locate_private(rsession, uuid);
} }
/* Will detach if an empty (or invalid) uuid is received */ /* Will detach if an empty (or invalid) uuid is received */
rtmp_attach_private(rsession, tech_pvt); rtmp_attach_private(rsession, tech_pvt);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -506,25 +506,25 @@ RTMP_INVOKE_FUNCTION(rtmp_i_hangup)
if (!zstr(uuid)) { if (!zstr(uuid)) {
rtmp_private_t *tech_pvt = rtmp_locate_private(rsession, uuid); rtmp_private_t *tech_pvt = rtmp_locate_private(rsession, uuid);
if (tech_pvt) { if (tech_pvt) {
channel = switch_core_session_get_channel(tech_pvt->session); channel = switch_core_session_get_channel(tech_pvt->session);
} }
} }
if (!channel) { if (!channel) {
if (!rsession->tech_pvt) { if (!rsession->tech_pvt) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
channel = switch_core_session_get_channel(rsession->tech_pvt->session); channel = switch_core_session_get_channel(rsession->tech_pvt->session);
} }
if (amf0_is_number(argv[2])) { if (amf0_is_number(argv[2])) {
cause = amf0_get_number(argv[2]); cause = amf0_get_number(argv[2]);
} else if ((scause = amf0_get_string(argv[2])) && !zstr(scause)) { } else if ((scause = amf0_get_string(argv[2])) && !zstr(scause)) {
cause = switch_channel_str2cause(scause); cause = switch_channel_str2cause(scause);
} }
switch_channel_hangup(channel, cause); switch_channel_hangup(channel, cause);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -533,21 +533,21 @@ RTMP_INVOKE_FUNCTION(rtmp_i_transfer)
char *uuid = amf0_get_string(argv[1]); char *uuid = amf0_get_string(argv[1]);
char *dest = amf0_get_string(argv[2]); char *dest = amf0_get_string(argv[2]);
rtmp_private_t *tech_pvt; rtmp_private_t *tech_pvt;
if (zstr(uuid) || zstr(dest)) { if (zstr(uuid) || zstr(dest)) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if ((tech_pvt = rtmp_locate_private(rsession, uuid))) { if ((tech_pvt = rtmp_locate_private(rsession, uuid))) {
const char *other_uuid = switch_channel_get_partner_uuid(tech_pvt->channel); const char *other_uuid = switch_channel_get_partner_uuid(tech_pvt->channel);
switch_core_session_t *session; switch_core_session_t *session;
if (!zstr(other_uuid) && (session = switch_core_session_locate(other_uuid))) { if (!zstr(other_uuid) && (session = switch_core_session_locate(other_uuid))) {
switch_ivr_session_transfer(session, dest, NULL, NULL); switch_ivr_session_transfer(session, dest, NULL, NULL);
switch_core_session_rwunlock(session); switch_core_session_rwunlock(session);
} }
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -556,22 +556,22 @@ RTMP_INVOKE_FUNCTION(rtmp_i_join)
char *uuid[] = { amf0_get_string(argv[1]), amf0_get_string(argv[2]) }; char *uuid[] = { amf0_get_string(argv[1]), amf0_get_string(argv[2]) };
const char *other_uuid[2]; const char *other_uuid[2];
rtmp_private_t *tech_pvt[2]; rtmp_private_t *tech_pvt[2];
if (zstr(uuid[0]) || zstr(uuid[1])) { if (zstr(uuid[0]) || zstr(uuid[1])) {
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
if (!(tech_pvt[0] = rtmp_locate_private(rsession, uuid[0])) || if (!(tech_pvt[0] = rtmp_locate_private(rsession, uuid[0])) ||
!(tech_pvt[1] = rtmp_locate_private(rsession, uuid[1]))) { !(tech_pvt[1] = rtmp_locate_private(rsession, uuid[1]))) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (tech_pvt[0] == tech_pvt[1]) { if (tech_pvt[0] == tech_pvt[1]) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if ((other_uuid[0] = switch_channel_get_partner_uuid(tech_pvt[0]->channel)) && if ((other_uuid[0] = switch_channel_get_partner_uuid(tech_pvt[0]->channel)) &&
(other_uuid[1] = switch_channel_get_partner_uuid(tech_pvt[1]->channel))) { (other_uuid[1] = switch_channel_get_partner_uuid(tech_pvt[1]->channel))) {
#ifndef RTMP_DONT_HOLD #ifndef RTMP_DONT_HOLD
if (switch_test_flag(tech_pvt[0], TFLAG_DETACHED)) { if (switch_test_flag(tech_pvt[0], TFLAG_DETACHED)) {
@ -581,10 +581,10 @@ RTMP_INVOKE_FUNCTION(rtmp_i_join)
switch_ivr_unhold(tech_pvt[1]->session); switch_ivr_unhold(tech_pvt[1]->session);
} }
#endif #endif
switch_ivr_uuid_bridge(other_uuid[0], other_uuid[1]); switch_ivr_uuid_bridge(other_uuid[0], other_uuid[1]);
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -602,9 +602,9 @@ RTMP_INVOKE_FUNCTION(rtmp_i_join)
- setup a state handler in other[1] to detect when it hangs up - setup a state handler in other[1] to detect when it hangs up
Check list: Check list:
tech_pvt[0] or other[0] hangs up tech_pvt[0] or other[0] hangs up
If we were attached to the call, switch the active call to tech_pvt[1] If we were attached to the call, switch the active call to tech_pvt[1]
tech_pvt[1] or other[1] hangs up tech_pvt[1] or other[1] hangs up
Clear up any 3-way indications on the tech_pvt[0] Clear up any 3-way indications on the tech_pvt[0]
*/ */
@ -612,7 +612,7 @@ tech_pvt[1] or other[1] hangs up
static switch_status_t three_way_on_soft_execute(switch_core_session_t *session); static switch_status_t three_way_on_soft_execute(switch_core_session_t *session);
#if 0 #if 0
static switch_status_t three_way_on_hangup(switch_core_session_t *session); static switch_status_t three_way_on_hangup(switch_core_session_t *session);
#endif #endif
static const switch_state_handler_table_t three_way_state_handlers_remote = { static const switch_state_handler_table_t three_way_state_handlers_remote = {
/*.on_init */ NULL, /*.on_init */ NULL,
@ -634,26 +634,26 @@ static switch_status_t three_way_on_soft_execute(switch_core_session_t *other_se
switch_core_session_t *my_session; switch_core_session_t *my_session;
switch_channel_t *my_channel; switch_channel_t *my_channel;
rtmp_private_t *tech_pvt; rtmp_private_t *tech_pvt;
if (zstr(uuid) || zstr(my_uuid)) { if (zstr(uuid) || zstr(my_uuid)) {
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
if (zstr(my_uuid) || !(my_session = switch_core_session_locate(my_uuid))) { if (zstr(my_uuid) || !(my_session = switch_core_session_locate(my_uuid))) {
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
if (!switch_core_session_check_interface(my_session, rtmp_globals.rtmp_endpoint_interface)) { if (!switch_core_session_check_interface(my_session, rtmp_globals.rtmp_endpoint_interface)) {
/* In case someone tempers with my variables, since we get tech_pvt from there */ /* In case someone tempers with my variables, since we get tech_pvt from there */
switch_core_session_rwunlock(my_session); switch_core_session_rwunlock(my_session);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
my_channel = switch_core_session_get_channel(my_session); my_channel = switch_core_session_get_channel(my_session);
tech_pvt = switch_core_session_get_private(my_session); tech_pvt = switch_core_session_get_private(my_session);
switch_ivr_eavesdrop_session(other_session, uuid, NULL, ED_MUX_READ | ED_MUX_WRITE); switch_ivr_eavesdrop_session(other_session, uuid, NULL, ED_MUX_READ | ED_MUX_WRITE);
/* 3-way call ended, whatever the reason /* 3-way call ended, whatever the reason
* We need to go back to our original state. */ * We need to go back to our original state. */
if (!switch_channel_up(other_channel)) { if (!switch_channel_up(other_channel)) {
@ -677,30 +677,30 @@ static switch_status_t three_way_on_soft_execute(switch_core_session_t *other_se
} }
} else { } else {
switch_channel_hangup(my_channel, SWITCH_CAUSE_NORMAL_CLEARING); switch_channel_hangup(my_channel, SWITCH_CAUSE_NORMAL_CLEARING);
} }
} }
} else if (switch_channel_ready(other_channel)) { } else if (switch_channel_ready(other_channel)) {
/* channel[1] didn't hangup, must be channel[0] then, rebridge this one with its original partner */ /* channel[1] didn't hangup, must be channel[0] then, rebridge this one with its original partner */
switch_ivr_uuid_bridge(switch_core_session_get_uuid(other_session), my_uuid); switch_ivr_uuid_bridge(switch_core_session_get_uuid(other_session), my_uuid);
} else { } else {
/* channel[1] being taken out of our control, take the other leg out of CS_HIBERNATE if its ready, or else leave it alone */ /* channel[1] being taken out of our control, take the other leg out of CS_HIBERNATE if its ready, or else leave it alone */
if (switch_channel_ready(my_channel)) { if (switch_channel_ready(my_channel)) {
switch_channel_set_state(my_channel, CS_EXECUTE); switch_channel_set_state(my_channel, CS_EXECUTE);
} }
} }
switch_channel_clear_state_handler(other_channel, &three_way_state_handlers_remote); switch_channel_clear_state_handler(other_channel, &three_way_state_handlers_remote);
switch_channel_set_variable(other_channel, SWITCH_SOFT_HOLDING_UUID_VARIABLE, NULL); switch_channel_set_variable(other_channel, SWITCH_SOFT_HOLDING_UUID_VARIABLE, NULL);
switch_channel_set_variable(my_channel, SWITCH_SOFT_HOLDING_UUID_VARIABLE, NULL); switch_channel_set_variable(my_channel, SWITCH_SOFT_HOLDING_UUID_VARIABLE, NULL);
switch_channel_set_variable(other_channel, RTMP_THREE_WAY_UUID_VARIABLE, NULL); switch_channel_set_variable(other_channel, RTMP_THREE_WAY_UUID_VARIABLE, NULL);
switch_clear_flag(tech_pvt, TFLAG_THREE_WAY); switch_clear_flag(tech_pvt, TFLAG_THREE_WAY);
if (my_session) { if (my_session) {
switch_core_session_rwunlock(my_session); switch_core_session_rwunlock(my_session);
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -712,39 +712,39 @@ RTMP_INVOKE_FUNCTION(rtmp_i_three_way)
const char *other_uuid[2]; const char *other_uuid[2];
switch_core_session_t *other_session[2] = { 0 }; switch_core_session_t *other_session[2] = { 0 };
switch_channel_t *other_channel[2] = { 0 }; switch_channel_t *other_channel[2] = { 0 };
if (zstr(uuid[0]) || zstr(uuid[1]) || if (zstr(uuid[0]) || zstr(uuid[1]) ||
!(tech_pvt[0] = rtmp_locate_private(rsession, uuid[0])) || !(tech_pvt[0] = rtmp_locate_private(rsession, uuid[0])) ||
!(tech_pvt[1] = rtmp_locate_private(rsession, uuid[1]))) { !(tech_pvt[1] = rtmp_locate_private(rsession, uuid[1]))) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
/* Make sure we don't 3-way with the same call, and that it doesnt turn into a 4-way, we aren't that permissive */ /* Make sure we don't 3-way with the same call, and that it doesnt turn into a 4-way, we aren't that permissive */
if (tech_pvt[0] == tech_pvt[1] || switch_test_flag(tech_pvt[0], TFLAG_THREE_WAY) || if (tech_pvt[0] == tech_pvt[1] || switch_test_flag(tech_pvt[0], TFLAG_THREE_WAY) ||
switch_test_flag(tech_pvt[1], TFLAG_THREE_WAY)) { switch_test_flag(tech_pvt[1], TFLAG_THREE_WAY)) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (!(other_uuid[0] = switch_channel_get_partner_uuid(tech_pvt[0]->channel)) || if (!(other_uuid[0] = switch_channel_get_partner_uuid(tech_pvt[0]->channel)) ||
!(other_uuid[1] = switch_channel_get_partner_uuid(tech_pvt[1]->channel))) { !(other_uuid[1] = switch_channel_get_partner_uuid(tech_pvt[1]->channel))) {
return SWITCH_STATUS_FALSE; /* Both calls aren't bridged */ return SWITCH_STATUS_FALSE; /* Both calls aren't bridged */
} }
if (!(other_session[0] = switch_core_session_locate(other_uuid[0])) || if (!(other_session[0] = switch_core_session_locate(other_uuid[0])) ||
!(other_session[1] = switch_core_session_locate(other_uuid[1]))) { !(other_session[1] = switch_core_session_locate(other_uuid[1]))) {
goto done; goto done;
} }
other_channel[0] = switch_core_session_get_channel(other_session[0]); other_channel[0] = switch_core_session_get_channel(other_session[0]);
other_channel[1] = switch_core_session_get_channel(other_session[1]); other_channel[1] = switch_core_session_get_channel(other_session[1]);
/* Save which uuid is the 3-way target */ /* Save which uuid is the 3-way target */
switch_channel_set_variable(other_channel[1], RTMP_THREE_WAY_UUID_VARIABLE, uuid[0]); switch_channel_set_variable(other_channel[1], RTMP_THREE_WAY_UUID_VARIABLE, uuid[0]);
switch_channel_set_variable(tech_pvt[1]->channel, RTMP_THREE_WAY_UUID_VARIABLE, uuid[0]); switch_channel_set_variable(tech_pvt[1]->channel, RTMP_THREE_WAY_UUID_VARIABLE, uuid[0]);
/* Attach redirect */ /* Attach redirect */
switch_set_flag(tech_pvt[1], TFLAG_THREE_WAY); switch_set_flag(tech_pvt[1], TFLAG_THREE_WAY);
/* Set soft_holding_uuid to the uuid of the other matching channel, so they can can be bridged back when the 3-way is over */ /* Set soft_holding_uuid to the uuid of the other matching channel, so they can can be bridged back when the 3-way is over */
switch_channel_set_variable(tech_pvt[1]->channel, SWITCH_SOFT_HOLDING_UUID_VARIABLE, other_uuid[1]); switch_channel_set_variable(tech_pvt[1]->channel, SWITCH_SOFT_HOLDING_UUID_VARIABLE, other_uuid[1]);
switch_channel_set_variable(other_channel[1], SWITCH_SOFT_HOLDING_UUID_VARIABLE, uuid[1]); switch_channel_set_variable(other_channel[1], SWITCH_SOFT_HOLDING_UUID_VARIABLE, uuid[1]);
@ -762,11 +762,11 @@ done:
if (other_session[0]) { if (other_session[0]) {
switch_core_session_rwunlock(other_session[0]); switch_core_session_rwunlock(other_session[0]);
} }
if (other_session[1]) { if (other_session[1]) {
switch_core_session_rwunlock(other_session[1]); switch_core_session_rwunlock(other_session[1]);
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -776,7 +776,7 @@ RTMP_INVOKE_FUNCTION(rtmp_i_sendevent)
switch_event_t *event = NULL; switch_event_t *event = NULL;
switch_status_t status = SWITCH_STATUS_SUCCESS; switch_status_t status = SWITCH_STATUS_SUCCESS;
const char *uuid = NULL; const char *uuid = NULL;
if (argv[1] && argv[1]->type == AMF0_TYPE_OBJECT) { if (argv[1] && argv[1]->type == AMF0_TYPE_OBJECT) {
obj = argv[1]; obj = argv[1];
} else if (argv[2] && argv[2]->type == AMF0_TYPE_OBJECT) { } else if (argv[2] && argv[2]->type == AMF0_TYPE_OBJECT) {
@ -786,14 +786,14 @@ RTMP_INVOKE_FUNCTION(rtmp_i_sendevent)
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Bad argument for sendevent"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Bad argument for sendevent");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (switch_event_create_subclass(&event, zstr(uuid) ? SWITCH_EVENT_CUSTOM : SWITCH_EVENT_MESSAGE, if (switch_event_create_subclass(&event, zstr(uuid) ? SWITCH_EVENT_CUSTOM : SWITCH_EVENT_MESSAGE,
zstr(uuid) ? RTMP_EVENT_CLIENTCUSTOM : NULL) != SWITCH_STATUS_SUCCESS) { zstr(uuid) ? RTMP_EVENT_CLIENTCUSTOM : NULL) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Couldn't create event\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Couldn't create event\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
rtmp_event_fill(rsession, event); rtmp_event_fill(rsession, event);
/* Build event using amf array */ /* Build event using amf array */
@ -801,7 +801,7 @@ RTMP_INVOKE_FUNCTION(rtmp_i_sendevent)
switch_event_destroy(&event); switch_event_destroy(&event);
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (!zstr(uuid)) { if (!zstr(uuid)) {
rtmp_private_t *session_pvt = rtmp_locate_private(rsession, uuid); rtmp_private_t *session_pvt = rtmp_locate_private(rsession, uuid);
if (session_pvt) { if (session_pvt) {
@ -814,18 +814,18 @@ RTMP_INVOKE_FUNCTION(rtmp_i_sendevent)
} }
} }
} }
switch_event_fire(&event); switch_event_fire(&event);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
RTMP_INVOKE_FUNCTION(rtmp_i_log) RTMP_INVOKE_FUNCTION(rtmp_i_log)
{ {
const char *data = amf0_get_string(argv[1]); const char *data = amf0_get_string(argv[1]);
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Log: %s\n", data); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Log: %s\n", data);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }

View File

@ -1,4 +1,4 @@
/* /*
* mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application * mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2011-2012, Barracuda Networks Inc. * Copyright (C) 2011-2012, Barracuda Networks Inc.
* *
@ -21,7 +21,7 @@
* the Initial Developer. All Rights Reserved. * the Initial Developer. All Rights Reserved.
* *
* Contributor(s): * Contributor(s):
* *
* Mathieu Rene <mrene@avgs.ca> * Mathieu Rene <mrene@avgs.ca>
* William King <william.king@quentustech.com> * William King <william.king@quentustech.com>
* *
@ -59,21 +59,21 @@ static void rtmp_tcp_alter_pollfd(rtmp_session_t *rsession, switch_bool_t pollou
{ {
rtmp_tcp_io_private_t *io_pvt = rsession->io_private; rtmp_tcp_io_private_t *io_pvt = rsession->io_private;
rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)rsession->profile->io; rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)rsession->profile->io;
if (pollout && (io_pvt->pollfd->reqevents & SWITCH_POLLOUT)) { if (pollout && (io_pvt->pollfd->reqevents & SWITCH_POLLOUT)) {
return; return;
} else if (!pollout && !(io_pvt->pollfd->reqevents & SWITCH_POLLOUT)) { } else if (!pollout && !(io_pvt->pollfd->reqevents & SWITCH_POLLOUT)) {
return; return;
} }
switch_pollset_remove(io->pollset, io_pvt->pollfd); switch_pollset_remove(io->pollset, io_pvt->pollfd);
io_pvt->pollfd->reqevents = SWITCH_POLLIN | SWITCH_POLLERR; io_pvt->pollfd->reqevents = SWITCH_POLLIN | SWITCH_POLLERR;
if (pollout) { if (pollout) {
io_pvt->pollfd->reqevents |= SWITCH_POLLOUT; io_pvt->pollfd->reqevents |= SWITCH_POLLOUT;
} }
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Pollout: %s\n", switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Pollout: %s\n",
pollout ? "true" : "false"); pollout ? "true" : "false");
switch_pollset_add(io->pollset, io_pvt->pollfd); switch_pollset_add(io->pollset, io_pvt->pollfd);
} }
@ -84,21 +84,21 @@ static switch_status_t rtmp_tcp_read(rtmp_session_t *rsession, unsigned char *bu
switch_status_t status = SWITCH_STATUS_SUCCESS; switch_status_t status = SWITCH_STATUS_SUCCESS;
#ifdef RTMP_DEBUG_IO #ifdef RTMP_DEBUG_IO
switch_size_t olen = *len; switch_size_t olen = *len;
#endif #endif
switch_assert(*len > 0 && *len < 1024000); switch_assert(*len > 0 && *len < 1024000);
do { do {
status = switch_socket_recv(io_pvt->socket, (char*)buf, len); status = switch_socket_recv(io_pvt->socket, (char*)buf, len);
} while(status != SWITCH_STATUS_SUCCESS && SWITCH_STATUS_IS_BREAK(status)); } while(status != SWITCH_STATUS_SUCCESS && SWITCH_STATUS_IS_BREAK(status));
#ifdef RTMP_DEBUG_IO #ifdef RTMP_DEBUG_IO
{ {
int i; int i;
fprintf(rsession->io_debug_in, "recv %p max=%"SWITCH_SIZE_T_FMT" got=%"SWITCH_SIZE_T_FMT"\n< ", (void*)buf, olen, *len); fprintf(rsession->io_debug_in, "recv %p max=%"SWITCH_SIZE_T_FMT" got=%"SWITCH_SIZE_T_FMT"\n< ", (void*)buf, olen, *len);
for (i = 0; i < *len; i++) { for (i = 0; i < *len; i++) {
fprintf(rsession->io_debug_in, "%02X ", (uint8_t)buf[i]); fprintf(rsession->io_debug_in, "%02X ", (uint8_t)buf[i]);
if (i != 0 && i % 32 == 0) { if (i != 0 && i % 32 == 0) {
@ -118,8 +118,8 @@ static switch_status_t rtmp_tcp_write(rtmp_session_t *rsession, const unsigned c
//rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)rsession->profile->io; //rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)rsession->profile->io;
rtmp_tcp_io_private_t *io_pvt = rsession->io_private; rtmp_tcp_io_private_t *io_pvt = rsession->io_private;
switch_status_t status; switch_status_t status;
switch_size_t orig_len = *len; switch_size_t orig_len = *len;
#ifdef RTMP_DEBUG_IO #ifdef RTMP_DEBUG_IO
{ {
int i; int i;
@ -128,48 +128,48 @@ static switch_status_t rtmp_tcp_write(rtmp_session_t *rsession, const unsigned c
for (i = 0; i < *len; i++) { for (i = 0; i < *len; i++) {
fprintf(rsession->io_debug_out, "%02X ", (uint8_t)buf[i]); fprintf(rsession->io_debug_out, "%02X ", (uint8_t)buf[i]);
if (i != 0 && i % 32 == 0) { if (i != 0 && i % 32 == 0) {
fprintf(rsession->io_debug_out, "\n> "); fprintf(rsession->io_debug_out, "\n> ");
} }
} }
fprintf(rsession->io_debug_out, "\n\n "); fprintf(rsession->io_debug_out, "\n\n ");
fflush(rsession->io_debug_out); fflush(rsession->io_debug_out);
} }
#endif #endif
if (io_pvt->sendq && switch_buffer_inuse(io_pvt->sendq) > 0) { if (io_pvt->sendq && switch_buffer_inuse(io_pvt->sendq) > 0) {
/* We already have queued data, append it to the sendq */ /* We already have queued data, append it to the sendq */
switch_buffer_write(io_pvt->sendq, buf, *len); switch_buffer_write(io_pvt->sendq, buf, *len);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
status = switch_socket_send_nonblock(io_pvt->socket, (char*)buf, len); status = switch_socket_send_nonblock(io_pvt->socket, (char*)buf, len);
if (*len > 0 && *len < orig_len) { if (*len > 0 && *len < orig_len) {
if (rsession->state >= RS_DESTROY) { if (rsession->state >= RS_DESTROY) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
/* We didnt send it all... add it to the sendq*/ /* We didnt send it all... add it to the sendq*/
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "%"SWITCH_SIZE_T_FMT" bytes added to sendq.\n", (orig_len - *len)); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "%"SWITCH_SIZE_T_FMT" bytes added to sendq.\n", (orig_len - *len));
switch_buffer_write(io_pvt->sendq, (buf + *len), orig_len - *len); switch_buffer_write(io_pvt->sendq, (buf + *len), orig_len - *len);
/* Make sure we poll-write */ /* Make sure we poll-write */
rtmp_tcp_alter_pollfd(rsession, SWITCH_TRUE); rtmp_tcp_alter_pollfd(rsession, SWITCH_TRUE);
} }
return status; return status;
} }
static switch_status_t rtmp_tcp_close(rtmp_session_t *rsession) static switch_status_t rtmp_tcp_close(rtmp_session_t *rsession)
{ {
rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)rsession->profile->io; rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)rsession->profile->io;
rtmp_tcp_io_private_t *io_pvt = rsession->io_private; rtmp_tcp_io_private_t *io_pvt = rsession->io_private;
if (io_pvt->socket) { if (io_pvt->socket) {
switch_mutex_lock(io->mutex); switch_mutex_lock(io->mutex);
switch_pollset_remove(io->pollset, io_pvt->pollfd); switch_pollset_remove(io->pollset, io_pvt->pollfd);
@ -182,7 +182,7 @@ static switch_status_t rtmp_tcp_close(rtmp_session_t *rsession)
if ( io_pvt->sendq ) { if ( io_pvt->sendq ) {
switch_buffer_destroy(&(io_pvt->sendq)); switch_buffer_destroy(&(io_pvt->sendq));
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -191,40 +191,40 @@ void *SWITCH_THREAD_FUNC rtmp_io_tcp_thread(switch_thread_t *thread, void *obj)
{ {
rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)obj; rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)obj;
io->base.running = 1; io->base.running = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s: I/O Thread starting\n", io->base.profile->name); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s: I/O Thread starting\n", io->base.profile->name);
while(io->base.running) { while(io->base.running) {
const switch_pollfd_t *fds; const switch_pollfd_t *fds;
int32_t numfds; int32_t numfds;
int32_t i; int32_t i;
switch_status_t status; switch_status_t status;
switch_mutex_lock(io->mutex); switch_mutex_lock(io->mutex);
status = switch_pollset_poll(io->pollset, 500000, &numfds, &fds); status = switch_pollset_poll(io->pollset, 500000, &numfds, &fds);
switch_mutex_unlock(io->mutex); switch_mutex_unlock(io->mutex);
if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_TIMEOUT) { if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_TIMEOUT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "pollset_poll failed\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "pollset_poll failed\n");
continue; continue;
} else if (status == SWITCH_STATUS_TIMEOUT) { } else if (status == SWITCH_STATUS_TIMEOUT) {
switch_cond_next(); switch_cond_next();
} }
for (i = 0; i < numfds; i++) { for (i = 0; i < numfds; i++) {
if (!fds[i].client_data) { if (!fds[i].client_data) {
switch_socket_t *newsocket; switch_socket_t *newsocket;
if (switch_socket_accept(&newsocket, io->listen_socket, io->base.pool) != SWITCH_STATUS_SUCCESS) { if (switch_socket_accept(&newsocket, io->listen_socket, io->base.pool) != SWITCH_STATUS_SUCCESS) {
if (io->base.running) { if (io->base.running) {
/* Don't spam the logs if we are shutting down */ /* Don't spam the logs if we are shutting down */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error [%s]\n", strerror(errno)); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error [%s]\n", strerror(errno));
} else { } else {
return NULL; return NULL;
} }
} else { } else {
rtmp_session_t *rsession; rtmp_session_t *rsession;
if (switch_socket_opt_set(newsocket, SWITCH_SO_NONBLOCK, TRUE)) { if (switch_socket_opt_set(newsocket, SWITCH_SO_NONBLOCK, TRUE)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n");
} }
@ -232,14 +232,14 @@ void *SWITCH_THREAD_FUNC rtmp_io_tcp_thread(switch_thread_t *thread, void *obj)
if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) { if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n");
} }
if (rtmp_session_request(io->base.profile, &rsession) != SWITCH_STATUS_SUCCESS) { if (rtmp_session_request(io->base.profile, &rsession) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "RTMP session request failed\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "RTMP session request failed\n");
switch_socket_close(newsocket); switch_socket_close(newsocket);
} else { } else {
switch_sockaddr_t *addr = NULL; switch_sockaddr_t *addr = NULL;
char ipbuf[200]; char ipbuf[200];
/* Create out private data and attach it to the rtmp session structure */ /* Create out private data and attach it to the rtmp session structure */
rtmp_tcp_io_private_t *pvt = switch_core_alloc(rsession->pool, sizeof(*pvt)); rtmp_tcp_io_private_t *pvt = switch_core_alloc(rsession->pool, sizeof(*pvt));
rsession->io_private = pvt; rsession->io_private = pvt;
@ -247,7 +247,7 @@ void *SWITCH_THREAD_FUNC rtmp_io_tcp_thread(switch_thread_t *thread, void *obj)
switch_socket_create_pollfd(&pvt->pollfd, newsocket, SWITCH_POLLIN | SWITCH_POLLERR, rsession, rsession->pool); switch_socket_create_pollfd(&pvt->pollfd, newsocket, SWITCH_POLLIN | SWITCH_POLLERR, rsession, rsession->pool);
switch_pollset_add(io->pollset, pvt->pollfd); switch_pollset_add(io->pollset, pvt->pollfd);
switch_buffer_create_dynamic(&pvt->sendq, 512, 1024, 0); switch_buffer_create_dynamic(&pvt->sendq, 512, 1024, 0);
/* Get the remote address/port info */ /* Get the remote address/port info */
switch_socket_addr_get(&addr, SWITCH_TRUE, newsocket); switch_socket_addr_get(&addr, SWITCH_TRUE, newsocket);
switch_get_addr(ipbuf, sizeof(ipbuf), addr); switch_get_addr(ipbuf, sizeof(ipbuf), addr);
@ -260,7 +260,7 @@ void *SWITCH_THREAD_FUNC rtmp_io_tcp_thread(switch_thread_t *thread, void *obj)
} else { } else {
rtmp_session_t *rsession = (rtmp_session_t*)fds[i].client_data; rtmp_session_t *rsession = (rtmp_session_t*)fds[i].client_data;
rtmp_tcp_io_private_t *io_pvt = (rtmp_tcp_io_private_t*)rsession->io_private; rtmp_tcp_io_private_t *io_pvt = (rtmp_tcp_io_private_t*)rsession->io_private;
if (fds[i].rtnevents & SWITCH_POLLOUT && switch_buffer_inuse(io_pvt->sendq) > 0) { if (fds[i].rtnevents & SWITCH_POLLOUT && switch_buffer_inuse(io_pvt->sendq) > 0) {
/* Send as much remaining data as possible */ /* Send as much remaining data as possible */
switch_size_t sendlen; switch_size_t sendlen;
@ -274,25 +274,25 @@ void *SWITCH_THREAD_FUNC rtmp_io_tcp_thread(switch_thread_t *thread, void *obj)
} }
} else if (fds[i].rtnevents & SWITCH_POLLIN && rtmp_handle_data(rsession) != SWITCH_STATUS_SUCCESS) { } else if (fds[i].rtnevents & SWITCH_POLLIN && rtmp_handle_data(rsession) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Closing socket\n"); switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Closing socket\n");
switch_mutex_lock(io->mutex); switch_mutex_lock(io->mutex);
switch_pollset_remove(io->pollset, io_pvt->pollfd); switch_pollset_remove(io->pollset, io_pvt->pollfd);
switch_mutex_unlock(io->mutex); switch_mutex_unlock(io->mutex);
switch_socket_close(io_pvt->socket); switch_socket_close(io_pvt->socket);
io_pvt->socket = NULL; io_pvt->socket = NULL;
io->base.close(rsession); io->base.close(rsession);
rtmp_session_destroy(&rsession); rtmp_session_destroy(&rsession);
} }
} }
} }
} }
io->base.running = -1; io->base.running = -1;
switch_socket_close(io->listen_socket); switch_socket_close(io->listen_socket);
return NULL; return NULL;
} }
@ -302,11 +302,11 @@ switch_status_t rtmp_tcp_init(rtmp_profile_t *profile, const char *bindaddr, rtm
switch_sockaddr_t *sa; switch_sockaddr_t *sa;
switch_threadattr_t *thd_attr = NULL; switch_threadattr_t *thd_attr = NULL;
rtmp_io_tcp_t *io_tcp; rtmp_io_tcp_t *io_tcp;
io_tcp = (rtmp_io_tcp_t*)switch_core_alloc(pool, sizeof(rtmp_io_tcp_t)); io_tcp = (rtmp_io_tcp_t*)switch_core_alloc(pool, sizeof(rtmp_io_tcp_t));
io_tcp->base.pool = pool; io_tcp->base.pool = pool;
io_tcp->ip = switch_core_strdup(pool, bindaddr); io_tcp->ip = switch_core_strdup(pool, bindaddr);
*new_io = (rtmp_io_t*)io_tcp; *new_io = (rtmp_io_t*)io_tcp;
io_tcp->base.profile = profile; io_tcp->base.profile = profile;
io_tcp->base.read = rtmp_tcp_read; io_tcp->base.read = rtmp_tcp_read;
@ -314,14 +314,14 @@ switch_status_t rtmp_tcp_init(rtmp_profile_t *profile, const char *bindaddr, rtm
io_tcp->base.close = rtmp_tcp_close; io_tcp->base.close = rtmp_tcp_close;
io_tcp->base.name = "tcp"; io_tcp->base.name = "tcp";
io_tcp->base.address = switch_core_strdup(pool, io_tcp->ip); io_tcp->base.address = switch_core_strdup(pool, io_tcp->ip);
if ((szport = strchr(io_tcp->ip, ':'))) { if ((szport = strchr(io_tcp->ip, ':'))) {
*szport++ = '\0'; *szport++ = '\0';
io_tcp->port = atoi(szport); io_tcp->port = atoi(szport);
} else { } else {
io_tcp->port = RTMP_DEFAULT_PORT; io_tcp->port = RTMP_DEFAULT_PORT;
} }
if (switch_sockaddr_info_get(&sa, io_tcp->ip, SWITCH_INET, io_tcp->port, 0, pool)) { if (switch_sockaddr_info_get(&sa, io_tcp->ip, SWITCH_INET, io_tcp->port, 0, pool)) {
goto fail; goto fail;
} }
@ -345,27 +345,27 @@ switch_status_t rtmp_tcp_init(rtmp_profile_t *profile, const char *bindaddr, rtm
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening on %s:%u (tcp)\n", io_tcp->ip, io_tcp->port); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening on %s:%u (tcp)\n", io_tcp->ip, io_tcp->port);
io_tcp->base.running = 1; io_tcp->base.running = 1;
if (switch_pollset_create(&io_tcp->pollset, 1000 /* max poll fds */, pool, 0) != SWITCH_STATUS_SUCCESS) { if (switch_pollset_create(&io_tcp->pollset, 1000 /* max poll fds */, pool, 0) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "pollset_create failed\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "pollset_create failed\n");
goto fail; goto fail;
} }
switch_socket_create_pollfd(&(io_tcp->listen_pollfd), io_tcp->listen_socket, SWITCH_POLLIN | SWITCH_POLLERR, NULL, pool); switch_socket_create_pollfd(&(io_tcp->listen_pollfd), io_tcp->listen_socket, SWITCH_POLLIN | SWITCH_POLLERR, NULL, pool);
if (switch_pollset_add(io_tcp->pollset, io_tcp->listen_pollfd) != SWITCH_STATUS_SUCCESS) { if (switch_pollset_add(io_tcp->pollset, io_tcp->listen_pollfd) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "pollset_add failed\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "pollset_add failed\n");
goto fail; goto fail;
} }
switch_mutex_init(&io_tcp->mutex, SWITCH_MUTEX_NESTED, pool); switch_mutex_init(&io_tcp->mutex, SWITCH_MUTEX_NESTED, pool);
switch_threadattr_create(&thd_attr, pool); switch_threadattr_create(&thd_attr, pool);
switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&io_tcp->thread, thd_attr, rtmp_io_tcp_thread, *new_io, pool); switch_thread_create(&io_tcp->thread, thd_attr, rtmp_io_tcp_thread, *new_io, pool);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
fail: fail:
if (io_tcp->listen_socket) { if (io_tcp->listen_socket) {