fix some contention in rtmp
This commit is contained in:
parent
a6e636c2d5
commit
4da26f57ff
|
@ -40,7 +40,8 @@
|
|||
|
||||
SWITCH_MODULE_LOAD_FUNCTION(mod_rtmp_load);
|
||||
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rtmp_shutdown);
|
||||
SWITCH_MODULE_DEFINITION(mod_rtmp, mod_rtmp_load, mod_rtmp_shutdown, NULL);
|
||||
SWITCH_MODULE_RUNTIME_FUNCTION(mod_rtmp_runtime);
|
||||
SWITCH_MODULE_DEFINITION(mod_rtmp, mod_rtmp_load, mod_rtmp_shutdown, mod_rtmp_runtime);
|
||||
|
||||
static switch_status_t config_profile(rtmp_profile_t *profile, switch_bool_t reload);
|
||||
static switch_xml_config_item_t *get_instructions(rtmp_profile_t *profile);
|
||||
|
@ -739,7 +740,7 @@ rtmp_session_t *rtmp_session_locate(const char *uuid)
|
|||
{
|
||||
rtmp_session_t *rsession = switch_core_hash_find_rdlock(rtmp_globals.session_hash, uuid, rtmp_globals.session_rwlock);
|
||||
|
||||
if (!rsession || rsession->state == RS_DESTROY) {
|
||||
if (!rsession || rsession->state >= RS_DESTROY) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -819,54 +820,96 @@ switch_status_t rtmp_session_request(rtmp_profile_t *profile, rtmp_session_t **n
|
|||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
static void rtmp_garbage_colletor(void)
|
||||
{
|
||||
switch_hash_index_t *hi;
|
||||
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP Garbage Collection\n");
|
||||
|
||||
|
||||
switch_thread_rwlock_wrlock(rtmp_globals.session_rwlock);
|
||||
|
||||
top:
|
||||
|
||||
for (hi = switch_hash_first(NULL, rtmp_globals.session_hash); hi; hi = switch_hash_next(hi)) {
|
||||
void *val;
|
||||
const void *key;
|
||||
switch_ssize_t keylen;
|
||||
rtmp_session_t *rsession;
|
||||
|
||||
switch_hash_this(hi, &key, &keylen, &val);
|
||||
rsession = (rtmp_session_t *) val;
|
||||
|
||||
if (rsession->state == RS_DESTROY) {
|
||||
if (rtmp_real_session_destroy(&rsession) == SWITCH_STATUS_SUCCESS) {
|
||||
goto top;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch_thread_rwlock_unlock(rtmp_globals.session_rwlock);
|
||||
}
|
||||
|
||||
switch_status_t rtmp_session_destroy(rtmp_session_t **rsession)
|
||||
{
|
||||
switch_status_t status = SWITCH_STATUS_FALSE;
|
||||
|
||||
switch_mutex_lock(rtmp_globals.mutex);
|
||||
if (rsession && *rsession) {
|
||||
(*rsession)->state = RS_DESTROY;
|
||||
*rsession = NULL;
|
||||
status = SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
switch_mutex_unlock(rtmp_globals.mutex);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
switch_status_t rtmp_real_session_destroy(rtmp_session_t **rsession)
|
||||
{
|
||||
switch_hash_index_t *hi;
|
||||
switch_event_t *event;
|
||||
|
||||
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DISCONNECT) == SWITCH_STATUS_SUCCESS) {
|
||||
rtmp_event_fill(*rsession, event);
|
||||
switch_event_fire(&event);
|
||||
}
|
||||
|
||||
switch_core_hash_delete_wrlock(rtmp_globals.session_hash, (*rsession)->uuid, rtmp_globals.session_rwlock);
|
||||
switch_core_hash_delete_wrlock((*rsession)->profile->session_hash, (*rsession)->uuid, (*rsession)->profile->session_rwlock);
|
||||
rtmp_clear_registration(*rsession, NULL, NULL);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "RTMP session ended [%s]\n", (*rsession)->uuid);
|
||||
|
||||
(*rsession)->state = RS_DESTROY;
|
||||
int sess = 0;
|
||||
|
||||
switch_thread_rwlock_rdlock((*rsession)->session_rwlock);
|
||||
for (hi = switch_hash_first(NULL, (*rsession)->session_hash); hi; hi = switch_hash_next(hi)) {
|
||||
void *val;
|
||||
const void *key;
|
||||
switch_ssize_t keylen;
|
||||
rtmp_private_t *tech_pvt;
|
||||
switch_channel_t *channel;
|
||||
switch_core_session_t *session;
|
||||
|
||||
switch_hash_this(hi, &key, &keylen, &val);
|
||||
tech_pvt = (rtmp_private_t *)val;
|
||||
|
||||
/* At this point we don't know if the session still exists, so request a fresh pointer to it from the core. */
|
||||
if ( (session = switch_core_session_locate((char *)key)) != NULL ) {
|
||||
/*
|
||||
* This is here so that if the FS session still exists and has the FS session write(or read) lock, then we won't destroy the rsession
|
||||
* until the FS session is finished with it. But if the rsession is able to get the FS session
|
||||
* write lock, before the FS session is hungup, then once the FS session does get the write lock
|
||||
* the rsession pointer will be null, and the FS session will never try and touch the already destroyed rsession.
|
||||
*/
|
||||
|
||||
/* If there are any sessions attached, abort the destroy operation */
|
||||
if ((session = switch_core_session_locate((char *)key)) != NULL ) {
|
||||
channel = switch_core_session_get_channel(session);
|
||||
tech_pvt = switch_core_session_get_private(session);
|
||||
if ( tech_pvt && tech_pvt->rtmp_session ) {
|
||||
tech_pvt->rtmp_session = NULL;
|
||||
}
|
||||
switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
|
||||
switch_core_session_rwunlock(session);
|
||||
sess++;
|
||||
}
|
||||
}
|
||||
switch_thread_rwlock_unlock((*rsession)->session_rwlock);
|
||||
|
||||
if (sess) {
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p still busy.\n", (*rsession)->uuid, (void *) *rsession);
|
||||
return SWITCH_STATUS_FALSE;
|
||||
}
|
||||
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p will be destroyed.\n", (*rsession)->uuid, (void *) *rsession);
|
||||
|
||||
|
||||
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DISCONNECT) == SWITCH_STATUS_SUCCESS) {
|
||||
rtmp_event_fill(*rsession, event);
|
||||
switch_event_fire(&event);
|
||||
}
|
||||
|
||||
switch_core_hash_delete(rtmp_globals.session_hash, (*rsession)->uuid);
|
||||
switch_core_hash_delete_wrlock((*rsession)->profile->session_hash, (*rsession)->uuid, (*rsession)->profile->session_rwlock);
|
||||
rtmp_clear_registration(*rsession, NULL, NULL);
|
||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "RTMP session ended [%s]\n", (*rsession)->uuid);
|
||||
|
||||
|
||||
switch_mutex_lock((*rsession)->profile->mutex);
|
||||
if ( (*rsession)->profile->calls < 1 ) {
|
||||
(*rsession)->profile->calls = 0;
|
||||
|
@ -1144,7 +1187,7 @@ static void rtmp_clear_reg_auth(rtmp_session_t *rsession, const char *auth, cons
|
|||
switch_thread_rwlock_wrlock(rsession->profile->reg_rwlock);
|
||||
if ((reg = switch_core_hash_find(rsession->profile->reg_hash, auth))) {
|
||||
for (; reg; reg = reg->next) {
|
||||
if (!strcmp(reg->uuid, rsession->uuid) && (zstr(nickname) || !strcmp(reg->nickname, nickname))) {
|
||||
if (!zstr(reg->uuid) && !strcmp(reg->uuid, rsession->uuid) && (zstr(nickname) || !strcmp(reg->nickname, nickname))) {
|
||||
switch_event_t *event;
|
||||
if (prev) {
|
||||
prev->next = reg->next;
|
||||
|
@ -1502,6 +1545,20 @@ done:
|
|||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
static const char *state2name(int state)
|
||||
{
|
||||
switch(state) {
|
||||
case RS_HANDSHAKE:
|
||||
return "HANDSHAKE";
|
||||
case RS_HANDSHAKE2:
|
||||
return "HANDSHAKE2";
|
||||
case RS_ESTABLISHED:
|
||||
return "ESTABLISHED";
|
||||
default:
|
||||
return "DESTROY (PENDING)";
|
||||
}
|
||||
}
|
||||
|
||||
#define RTMP_FUNCTION_SYNTAX "profile [profilename] [start | stop | rescan | restart]\nstatus profile [profilename]\nstatus profile [profilename] [reg | sessions]\nsession [session_id] [kill | login [user@domain] | logout [user@domain]]"
|
||||
SWITCH_STANDARD_API(rtmp_function)
|
||||
{
|
||||
|
@ -1578,7 +1635,7 @@ SWITCH_STANDARD_API(rtmp_function)
|
|||
{
|
||||
switch_hash_index_t *hi;
|
||||
stream->write_function(stream, "\nSessions:\n");
|
||||
stream->write_function(stream, "uuid,address,user,domain,flashVer\n");
|
||||
stream->write_function(stream, "uuid,address,user,domain,flashVer,state\n");
|
||||
switch_thread_rwlock_rdlock(profile->session_rwlock);
|
||||
for (hi = switch_hash_first(NULL, profile->session_hash); hi; hi = switch_hash_next(hi)) {
|
||||
void *val;
|
||||
|
@ -1588,11 +1645,11 @@ SWITCH_STANDARD_API(rtmp_function)
|
|||
switch_hash_this(hi, &key, &keylen, &val);
|
||||
|
||||
item = (rtmp_session_t *)val;
|
||||
stream->write_function(stream, "%s,%s:%d,%s,%s,%s\n",
|
||||
item->uuid, item->remote_address, item->remote_port,
|
||||
item->account ? item->account->user : NULL,
|
||||
item->account ? item->account->domain : NULL,
|
||||
item->flashVer);
|
||||
stream->write_function(stream, "%s,%s:%d,%s,%s,%s,%s\n",
|
||||
item->uuid, item->remote_address, item->remote_port,
|
||||
item->account ? item->account->user : NULL,
|
||||
item->account ? item->account->domain : NULL,
|
||||
item->flashVer, state2name(item->state));
|
||||
|
||||
}
|
||||
switch_thread_rwlock_unlock(profile->session_rwlock);
|
||||
|
@ -1870,6 +1927,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rtmp_load)
|
|||
}
|
||||
}
|
||||
|
||||
rtmp_globals.running = 1;
|
||||
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1899,9 +1958,22 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rtmp_shutdown)
|
|||
switch_core_hash_destroy(&rtmp_globals.session_hash);
|
||||
switch_core_hash_destroy(&rtmp_globals.invoke_hash);
|
||||
|
||||
rtmp_globals.running = 0;
|
||||
|
||||
return SWITCH_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
SWITCH_MODULE_RUNTIME_FUNCTION(mod_rtmp_runtime)
|
||||
{
|
||||
|
||||
while(rtmp_globals.running) {
|
||||
rtmp_garbage_colletor();
|
||||
switch_yield(10000000);
|
||||
}
|
||||
|
||||
return SWITCH_STATUS_TERM;
|
||||
}
|
||||
|
||||
/* For Emacs:
|
||||
* Local Variables:
|
||||
* mode:c
|
||||
|
|
|
@ -336,6 +336,7 @@ struct mod_rtmp_globals {
|
|||
switch_hash_t *session_hash;
|
||||
switch_thread_rwlock_t *session_rwlock;
|
||||
switch_hash_t *invoke_hash;
|
||||
int running;
|
||||
};
|
||||
|
||||
extern struct mod_rtmp_globals rtmp_globals;
|
||||
|
@ -605,6 +606,7 @@ void rtmp_profile_release(rtmp_profile_t *profile);
|
|||
switch_status_t rtmp_tcp_init(rtmp_profile_t *profile, const char *bindaddr, rtmp_io_t **new_io, switch_memory_pool_t *pool);
|
||||
switch_status_t rtmp_session_request(rtmp_profile_t *profile, rtmp_session_t **newsession);
|
||||
switch_status_t rtmp_session_destroy(rtmp_session_t **session);
|
||||
switch_status_t rtmp_real_session_destroy(rtmp_session_t **session);
|
||||
|
||||
/**** Protocol ****/
|
||||
void rtmp_set_chunksize(rtmp_session_t *rsession, uint32_t chunksize);
|
||||
|
|
Loading…
Reference in New Issue