conference recording

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@3139 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2006-10-21 21:19:40 +00:00
parent 1f29ce2749
commit b2f9fae6d1
2 changed files with 290 additions and 72 deletions

View File

@ -73,7 +73,8 @@ typedef enum {
MFLAG_CAN_SPEAK = (1 << 1),
MFLAG_CAN_HEAR = (1 << 2),
MFLAG_KICKED = (1 << 3),
MFLAG_ITHREAD = (1 << 4)
MFLAG_ITHREAD = (1 << 4),
MFLAG_NOCHANNEL = (1 << 5)
} member_flag_t;
@ -170,6 +171,7 @@ struct conference_member {
switch_mutex_t *audio_out_mutex;
switch_codec_t read_codec;
switch_codec_t write_codec;
char *rec_path;
uint8_t *frame;
uint8_t *mux_frame;
uint32_t buflen;
@ -186,6 +188,15 @@ struct conference_member {
struct conference_member *next;
};
/* Record Node */
struct conference_record {
conference_obj_t *conference;
char *path;
switch_memory_pool_t *pool;
};
typedef struct conference_record conference_record_t;
/* Function Prototypes */
static uint32_t next_member_id(void);
static conference_relationship_t *member_get_relationship(conference_member_t *member, conference_member_t *other_member);
@ -220,6 +231,7 @@ static uint32_t conference_member_stop_file(conference_member_t *member, file_st
static conference_obj_t *conference_new(char *name, switch_xml_t profile, switch_memory_pool_t *pool);
static void switch_change_sln_volume(int16_t *data, uint32_t samples, int32_t vol);
static switch_status_t chat_send(char *proto, char *from, char *to, char *subject, char *body, char *hint);
static void launch_conference_record_thread(conference_obj_t *conference, char *path);
/* Return a Distinct ID # */
static uint32_t next_member_id(void)
@ -294,6 +306,11 @@ static conference_member_t *conference_member_get(conference_obj_t *conference,
conference_member_t *member = NULL;
for(member = conference->members; member; member = member->next) {
if (switch_test_flag(member, MFLAG_NOCHANNEL)) {
continue;
}
if (member->id == id) {
break;
}
@ -302,6 +319,17 @@ static conference_member_t *conference_member_get(conference_obj_t *conference,
return member;
}
static void conference_record_stop(conference_obj_t *conference, char *path)
{
conference_member_t *member = NULL;
for(member = conference->members; member; member = member->next) {
if (switch_test_flag(member, MFLAG_NOCHANNEL) && (!path || !strcmp(path, member->rec_path))) {
switch_clear_flag_locked(member, MFLAG_RUNNING);
}
}
}
/* Add a custom relationship to a member */
static conference_relationship_t *member_add_relationship(conference_member_t *member, uint32_t id)
{
@ -359,39 +387,41 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe
member->next = conference->members;
member->energy_level = conference->energy_level;
conference->members = member;
conference->count++;
if (switch_event_create(&event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "proto", CONF_CHAT_PROTO);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s", conference->name);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s@%s", conference->name, conference->domain);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d caller%s)", conference->count, conference->count == 1 ? "" : "s");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_type", "presence");
switch_event_fire(&event);
}
if (conference->enter_sound) {
conference_play_file(conference, conference->enter_sound, CONF_DEFAULT_LEADIN);
}
if (!switch_test_flag(member, MFLAG_NOCHANNEL)) {
conference->count++;
if (switch_event_create(&event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "proto", CONF_CHAT_PROTO);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s", conference->name);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s@%s", conference->name, conference->domain);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d caller%s)", conference->count, conference->count == 1 ? "" : "s");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_type", "presence");
switch_event_fire(&event);
}
if (conference->count == 1 && conference->alone_sound) {
conference_play_file(conference, conference->alone_sound, 0);
}
if (conference->min && conference->count >= conference->min) {
switch_set_flag(conference, CFLAG_ENFORCE_MIN);
}
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, CONF_EVENT_MAINT) == SWITCH_STATUS_SUCCESS) {
switch_channel_t *channel = switch_core_session_get_channel(member->session);
switch_channel_event_set_data(channel, event);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Conference-Name", conference->name);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Action", "add-member");
switch_event_fire(&event);
}
if (conference->enter_sound) {
conference_play_file(conference, conference->enter_sound, CONF_DEFAULT_LEADIN);
}
if (conference->count == 1 && conference->alone_sound) {
conference_play_file(conference, conference->alone_sound, 0);
}
if (conference->min && conference->count >= conference->min) {
switch_set_flag(conference, CFLAG_ENFORCE_MIN);
}
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, CONF_EVENT_MAINT) == SWITCH_STATUS_SUCCESS) {
switch_channel_t *channel = switch_core_session_get_channel(member->session);
switch_channel_event_set_data(channel, event);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Conference-Name", conference->name);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Action", "add-member");
switch_event_fire(&event);
}
}
switch_mutex_unlock(member->flag_mutex);
switch_mutex_unlock(member->audio_out_mutex);
switch_mutex_unlock(member->audio_in_mutex);
@ -425,39 +455,41 @@ static void conference_del_member(conference_obj_t *conference, conference_membe
last = imember;
}
conference->count--;
member->conference = NULL;
if (switch_event_create(&event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "proto", CONF_CHAT_PROTO);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s", conference->name);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s@%s", conference->name, conference->domain);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d caller%s)", conference->count, conference->count == 1 ? "" : "s");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_type", "presence");
switch_event_fire(&event);
}
if ((conference->min && switch_test_flag(conference, CFLAG_ENFORCE_MIN) && conference->count < conference->min)
|| (switch_test_flag(conference, CFLAG_DYNAMIC) && conference->count == 0) ) {
switch_set_flag(conference, CFLAG_DESTRUCT);
} else {
if (conference->exit_sound) {
conference_play_file(conference, conference->exit_sound, 0);
if (!switch_test_flag(member, MFLAG_NOCHANNEL)) {
conference->count--;
if (switch_event_create(&event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "proto", CONF_CHAT_PROTO);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s", conference->name);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s@%s", conference->name, conference->domain);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d caller%s)", conference->count, conference->count == 1 ? "" : "s");
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_type", "presence");
switch_event_fire(&event);
}
if ((conference->min && switch_test_flag(conference, CFLAG_ENFORCE_MIN) && conference->count < conference->min)
|| (switch_test_flag(conference, CFLAG_DYNAMIC) && conference->count == 0) ) {
switch_set_flag(conference, CFLAG_DESTRUCT);
} else {
if (conference->exit_sound) {
conference_play_file(conference, conference->exit_sound, 0);
}
if (conference->count == 1 && conference->alone_sound) {
conference_play_file(conference, conference->alone_sound, 0);
}
}
if (conference->count == 1 && conference->alone_sound) {
conference_play_file(conference, conference->alone_sound, 0);
}
}
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, CONF_EVENT_MAINT) == SWITCH_STATUS_SUCCESS) {
switch_channel_t *channel = switch_core_session_get_channel(member->session);
switch_channel_event_set_data(channel, event);
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, CONF_EVENT_MAINT) == SWITCH_STATUS_SUCCESS) {
switch_channel_t *channel = switch_core_session_get_channel(member->session);
switch_channel_event_set_data(channel, event);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Conference-Name", conference->name);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Action", "del-member");
switch_event_fire(&event);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Conference-Name", conference->name);
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Action", "del-member");
switch_event_fire(&event);
}
}
switch_mutex_unlock(member->flag_mutex);
switch_mutex_unlock(member->audio_out_mutex);
switch_mutex_unlock(member->audio_in_mutex);
@ -488,7 +520,9 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v
return NULL;
}
switch_mutex_lock(globals.hash_mutex);
globals.threads++;
switch_mutex_unlock(globals.hash_mutex);
while(globals.running && !switch_test_flag(conference, CFLAG_DESTRUCT)) {
uint8_t file_frame[CONF_BUFFER_SIZE] = {0};
@ -672,23 +706,32 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v
switch_mutex_lock(conference->mutex);
for(imember = conference->members; imember; imember = imember->next) {
switch_channel_t *channel = switch_core_session_get_channel(imember->session);
// add this little bit to preserve the bridge cause code in case of an early media call that
// never answers
if (switch_test_flag(conference, CFLAG_ANSWERED))
switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING);
else
// put actual cause code from outbound channel hangup here
switch_channel_hangup(channel, conference->bridge_hangup_cause);
switch_channel_t *channel;
if (!switch_test_flag(imember, MFLAG_NOCHANNEL)) {
channel = switch_core_session_get_channel(imember->session);
// add this little bit to preserve the bridge cause code in case of an early media call that
// never answers
if (switch_test_flag(conference, CFLAG_ANSWERED)) {
switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING);
} else {
// put actual cause code from outbound channel hangup here
switch_channel_hangup(channel, conference->bridge_hangup_cause);
}
}
switch_clear_flag_locked(imember, MFLAG_RUNNING);
}
switch_mutex_unlock(conference->mutex);
/* Wait till everybody is out */
switch_clear_flag_locked(conference, CFLAG_RUNNING);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write Lock ON\n");
switch_thread_rwlock_wrlock(conference->rwlock);
switch_thread_rwlock_unlock(conference->rwlock);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write Lock OFF\n");
switch_mutex_lock(globals.hash_mutex);
switch_core_hash_delete(globals.conference_hash, conference->name);
@ -710,7 +753,10 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v
switch_event_fire(&event);
}
switch_mutex_lock(globals.hash_mutex);
globals.threads--;
switch_mutex_unlock(globals.hash_mutex);
return NULL;
}
@ -1027,6 +1073,117 @@ static void conference_loop(conference_member_t *member)
}
}
/* Sub-Routine called by a record entity inside a conference */
static void *SWITCH_THREAD_FUNC conference_record_thread_run(switch_thread_t *thread, void *obj)
{
switch_frame_t write_frame = {0};
uint8_t data[SWITCH_RECCOMMENDED_BUFFER_SIZE];
switch_file_handle_t fh = {0};
conference_member_t smember = {0}, *member;
conference_record_t *rec = (conference_record_t *) obj;
uint32_t divider = 1000 / rec->conference->interval;
uint32_t samples = (rec->conference->rate / divider);
uint32_t bytes = samples * 2;
uint32_t mux_used;
char *vval;
switch_mutex_lock(globals.hash_mutex);
globals.threads++;
switch_mutex_unlock(globals.hash_mutex);
member = &smember;
member->flags = MFLAG_CAN_HEAR | MFLAG_NOCHANNEL | MFLAG_RUNNING;
write_frame.data = data;
write_frame.buflen = sizeof(data);
assert(rec->conference != NULL);
member->conference = rec->conference;
member->native_rate = rec->conference->rate;
member->rec_path = rec->path;
fh.channels = 1;
fh.samplerate = rec->conference->rate;
member->id = next_member_id();
member->pool = rec->pool;
switch_mutex_init(&member->flag_mutex, SWITCH_MUTEX_NESTED, rec->pool);
switch_mutex_init(&member->audio_in_mutex, SWITCH_MUTEX_NESTED, rec->pool);
switch_mutex_init(&member->audio_out_mutex, SWITCH_MUTEX_NESTED, rec->pool);
/* Setup an audio buffer for the incoming audio */
if (switch_buffer_create_dynamic(&member->audio_buffer, CONF_DBLOCK_SIZE, CONF_DBUFFER_SIZE, 0) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error Creating Audio Buffer!\n");
goto end;
}
/* Setup an audio buffer for the outgoing audio */
if (switch_buffer_create_dynamic(&member->mux_buffer, CONF_DBLOCK_SIZE, CONF_DBUFFER_SIZE, 0) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error Creating Audio Buffer!\n");
goto end;
}
if (conference_add_member(rec->conference, member) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Joining Conference\n");
goto end;
}
if (switch_core_file_open(&fh,
rec->path,
SWITCH_FILE_FLAG_WRITE | SWITCH_FILE_DATA_SHORT,
rec->pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening File [%s]\n", rec->path);
goto end;
}
if ((vval = switch_mprintf("Conference %s", rec->conference->name))) {
switch_core_file_set_string(&fh, SWITCH_AUDIO_COL_STR_TITLE, vval);
switch_safe_free(vval);
}
while(switch_test_flag(member, MFLAG_RUNNING) && switch_test_flag(rec->conference, CFLAG_RUNNING) && rec->conference->count) {
if ((mux_used = (uint32_t) switch_buffer_inuse(member->mux_buffer)) >= bytes) {
/* Flush the output buffer and write all the data (presumably muxed) back to the channel */
switch_mutex_lock(member->audio_out_mutex);
write_frame.data = data;
while ((write_frame.datalen = (uint32_t)switch_buffer_read(member->mux_buffer, write_frame.data, mux_used))) {
if (!switch_test_flag((&fh), SWITCH_FILE_PAUSE)) {
switch_size_t len = (switch_size_t) mux_used / 2;
switch_core_file_write(&fh, write_frame.data, &len);
}
}
switch_mutex_unlock(member->audio_out_mutex);
} else {
switch_yield(20000);
}
} /* Rinse ... Repeat */
conference_del_member(rec->conference, member);
switch_buffer_destroy(&member->audio_buffer);
switch_buffer_destroy(&member->mux_buffer);
switch_clear_flag_locked(member, MFLAG_RUNNING);
switch_core_file_close(&fh);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Recording Stopped\n");
end:
if (rec->pool) {
switch_memory_pool_t *pool = rec->pool;
rec = NULL;
switch_core_destroy_memory_pool(&pool);
}
switch_mutex_lock(globals.hash_mutex);
globals.threads--;
switch_mutex_unlock(globals.hash_mutex);
return NULL;
}
/* Make files stop playing in a conference either the current one or all of them */
static uint32_t conference_stop_file(conference_obj_t *conference, file_stop_t stop)
{
@ -1329,12 +1486,22 @@ static void conference_list(conference_obj_t *conference, switch_stream_handle_t
switch_mutex_lock(conference->member_mutex);
for (member = conference->members; member; member = member->next) {
switch_channel_t *channel = switch_core_session_get_channel(member->session);
switch_caller_profile_t *profile = switch_channel_get_caller_profile(channel);
char *uuid = switch_core_session_get_uuid(member->session);
char *name = switch_channel_get_name(channel);
switch_channel_t *channel;
switch_caller_profile_t *profile;
char *uuid;
char *name;
uint32_t count = 0;
if (switch_test_flag(member, MFLAG_NOCHANNEL)) {
continue;
}
uuid = switch_core_session_get_uuid(member->session);
channel = switch_core_session_get_channel(member->session);
profile = switch_channel_get_caller_profile(channel);
name = switch_channel_get_name(channel);
stream->write_function(stream, "%u%s%s%s%s%s%s%s%s%s",
member->id,delim,
name,delim,
@ -1366,8 +1533,15 @@ static void conference_list_pretty(conference_obj_t *conference, switch_stream_h
stream->write_function(stream, "<pre>Current Callers:\n");
for (member = conference->members; member; member = member->next) {
switch_channel_t *channel = switch_core_session_get_channel(member->session);
switch_caller_profile_t *profile = switch_channel_get_caller_profile(channel);
switch_channel_t *channel;
switch_caller_profile_t *profile;
if (switch_test_flag(member, MFLAG_NOCHANNEL)) {
continue;
}
channel = switch_core_session_get_channel(member->session);
profile = switch_channel_get_caller_profile(channel);
stream->write_function(stream, "*) %s (%s)\n",
profile->caller_id_name,
@ -1846,6 +2020,15 @@ static switch_status_t conf_function(char *buf, switch_core_session_t *session,
stream->write_function(stream, "usage undeaf <id>\n");
goto done;
}
} else if (!strcasecmp(argv[1], "record")) {
if (argc > 2) {
launch_conference_record_thread(conference, argv[2]);
} else {
stream->write_function(stream, "-ERR No Path Specified!\n");
}
} else if (!strcasecmp(argv[1], "norecord")) {
conference_record_stop(conference, argv[1]);
} else if (!strcasecmp(argv[1], "kick")) {
if (argc > 2) {
uint32_t id = atoi(argv[2]);
@ -2477,6 +2660,8 @@ static void conference_function(switch_core_session_t *session, char *data)
goto codec_done1;
}
/* Prepare MUTEXS */
member.id = next_member_id();
member.pool = pool;
@ -2557,6 +2742,34 @@ static void launch_conference_thread(conference_obj_t *conference)
switch_thread_create(&thread, thd_attr, conference_thread_run, conference, conference->pool);
}
static void launch_conference_record_thread(conference_obj_t *conference, char *path)
{
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
switch_memory_pool_t *pool;
conference_record_t *rec;
/* Setup a memory pool to use. */
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Pool Failure\n");
}
/* Create a node object*/
if (!(rec = switch_core_alloc(pool, sizeof(*rec)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Alloc Failure\n");
switch_core_destroy_memory_pool(&pool);
}
rec->conference = conference;
rec->path = switch_core_strdup(pool, path);
rec->pool = pool;
switch_threadattr_create(&thd_attr, rec->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, conference_record_thread_run, rec, rec->pool);
}
static void *SWITCH_THREAD_FUNC input_thread_run(switch_thread_t *thread, void *obj)
{
conference_member_t *member = obj;
@ -3087,6 +3300,7 @@ SWITCH_MOD_DECLARE(switch_status_t) switch_module_shutdown(void)
if (globals.running) {
globals.running = 0;
while (globals.threads) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for %d threads\n", globals.threads);
switch_yield(100000);
}
}

View File

@ -73,6 +73,8 @@ static switch_status_t sndfile_file_open(switch_file_handle_t *handle, char *pat
}
if (mode & SFM_WRITE) {
sf_count_t frames = 0 ;
context->sfinfo.channels = handle->channels;
context->sfinfo.samplerate = handle->samplerate;
if (handle->samplerate == 8000 || handle->samplerate == 16000) {
@ -83,6 +85,8 @@ static switch_status_t sndfile_file_open(switch_file_handle_t *handle, char *pat
context->sfinfo.format |= SF_FORMAT_PCM_32;
}
sf_command (context->handle, SFC_FILE_TRUNCATE, &frames, sizeof (frames)) ;
/* Could add more else if() but i am too lazy atm.. */
if (!strcasecmp(ext, "wav")) {
context->sfinfo.format |= SF_FORMAT_WAV;