From b2f9fae6d17e171c9db3ba6c9d30b7721d878bd9 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Sat, 21 Oct 2006 21:19:40 +0000 Subject: [PATCH] conference recording git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@3139 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- .../mod_conference/mod_conference.c | 358 ++++++++++++++---- src/mod/formats/mod_sndfile/mod_sndfile.c | 4 + 2 files changed, 290 insertions(+), 72 deletions(-) diff --git a/src/mod/applications/mod_conference/mod_conference.c b/src/mod/applications/mod_conference/mod_conference.c index 70c7652b22..022a7d2844 100644 --- a/src/mod/applications/mod_conference/mod_conference.c +++ b/src/mod/applications/mod_conference/mod_conference.c @@ -73,7 +73,8 @@ typedef enum { MFLAG_CAN_SPEAK = (1 << 1), MFLAG_CAN_HEAR = (1 << 2), MFLAG_KICKED = (1 << 3), - MFLAG_ITHREAD = (1 << 4) + MFLAG_ITHREAD = (1 << 4), + MFLAG_NOCHANNEL = (1 << 5) } member_flag_t; @@ -170,6 +171,7 @@ struct conference_member { switch_mutex_t *audio_out_mutex; switch_codec_t read_codec; switch_codec_t write_codec; + char *rec_path; uint8_t *frame; uint8_t *mux_frame; uint32_t buflen; @@ -186,6 +188,15 @@ struct conference_member { struct conference_member *next; }; +/* Record Node */ +struct conference_record { + conference_obj_t *conference; + char *path; + switch_memory_pool_t *pool; +}; +typedef struct conference_record conference_record_t; + + /* Function Prototypes */ static uint32_t next_member_id(void); static conference_relationship_t *member_get_relationship(conference_member_t *member, conference_member_t *other_member); @@ -220,6 +231,7 @@ static uint32_t conference_member_stop_file(conference_member_t *member, file_st static conference_obj_t *conference_new(char *name, switch_xml_t profile, switch_memory_pool_t *pool); static void switch_change_sln_volume(int16_t *data, uint32_t samples, int32_t vol); static switch_status_t chat_send(char *proto, char *from, char *to, char *subject, char *body, char *hint); +static void launch_conference_record_thread(conference_obj_t *conference, char *path); /* Return a Distinct ID # */ static uint32_t next_member_id(void) @@ -294,6 +306,11 @@ static conference_member_t *conference_member_get(conference_obj_t *conference, conference_member_t *member = NULL; for(member = conference->members; member; member = member->next) { + + if (switch_test_flag(member, MFLAG_NOCHANNEL)) { + continue; + } + if (member->id == id) { break; } @@ -302,6 +319,17 @@ static conference_member_t *conference_member_get(conference_obj_t *conference, return member; } +static void conference_record_stop(conference_obj_t *conference, char *path) +{ + conference_member_t *member = NULL; + + for(member = conference->members; member; member = member->next) { + if (switch_test_flag(member, MFLAG_NOCHANNEL) && (!path || !strcmp(path, member->rec_path))) { + switch_clear_flag_locked(member, MFLAG_RUNNING); + } + } +} + /* Add a custom relationship to a member */ static conference_relationship_t *member_add_relationship(conference_member_t *member, uint32_t id) { @@ -359,39 +387,41 @@ static switch_status_t conference_add_member(conference_obj_t *conference, confe member->next = conference->members; member->energy_level = conference->energy_level; conference->members = member; - conference->count++; - - if (switch_event_create(&event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) { - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "proto", CONF_CHAT_PROTO); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s", conference->name); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s@%s", conference->name, conference->domain); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d caller%s)", conference->count, conference->count == 1 ? "" : "s"); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_type", "presence"); - switch_event_fire(&event); - } - if (conference->enter_sound) { - conference_play_file(conference, conference->enter_sound, CONF_DEFAULT_LEADIN); - } + if (!switch_test_flag(member, MFLAG_NOCHANNEL)) { + conference->count++; + if (switch_event_create(&event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) { + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "proto", CONF_CHAT_PROTO); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s", conference->name); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s@%s", conference->name, conference->domain); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d caller%s)", conference->count, conference->count == 1 ? "" : "s"); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_type", "presence"); + switch_event_fire(&event); + } + - if (conference->count == 1 && conference->alone_sound) { - conference_play_file(conference, conference->alone_sound, 0); - } - - if (conference->min && conference->count >= conference->min) { - switch_set_flag(conference, CFLAG_ENFORCE_MIN); - } - - if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, CONF_EVENT_MAINT) == SWITCH_STATUS_SUCCESS) { - switch_channel_t *channel = switch_core_session_get_channel(member->session); - switch_channel_event_set_data(channel, event); - - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Conference-Name", conference->name); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Action", "add-member"); - switch_event_fire(&event); - } + if (conference->enter_sound) { + conference_play_file(conference, conference->enter_sound, CONF_DEFAULT_LEADIN); + } + if (conference->count == 1 && conference->alone_sound) { + conference_play_file(conference, conference->alone_sound, 0); + } + + if (conference->min && conference->count >= conference->min) { + switch_set_flag(conference, CFLAG_ENFORCE_MIN); + } + + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, CONF_EVENT_MAINT) == SWITCH_STATUS_SUCCESS) { + switch_channel_t *channel = switch_core_session_get_channel(member->session); + switch_channel_event_set_data(channel, event); + + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Conference-Name", conference->name); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Action", "add-member"); + switch_event_fire(&event); + } + } switch_mutex_unlock(member->flag_mutex); switch_mutex_unlock(member->audio_out_mutex); switch_mutex_unlock(member->audio_in_mutex); @@ -425,39 +455,41 @@ static void conference_del_member(conference_obj_t *conference, conference_membe last = imember; } - conference->count--; + member->conference = NULL; - if (switch_event_create(&event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) { - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "proto", CONF_CHAT_PROTO); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s", conference->name); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s@%s", conference->name, conference->domain); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d caller%s)", conference->count, conference->count == 1 ? "" : "s"); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_type", "presence"); - switch_event_fire(&event); - } - - if ((conference->min && switch_test_flag(conference, CFLAG_ENFORCE_MIN) && conference->count < conference->min) - || (switch_test_flag(conference, CFLAG_DYNAMIC) && conference->count == 0) ) { - switch_set_flag(conference, CFLAG_DESTRUCT); - } else { - if (conference->exit_sound) { - conference_play_file(conference, conference->exit_sound, 0); + if (!switch_test_flag(member, MFLAG_NOCHANNEL)) { + conference->count--; + if (switch_event_create(&event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) { + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "proto", CONF_CHAT_PROTO); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "login", "%s", conference->name); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "from", "%s@%s", conference->name, conference->domain); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "status", "Active (%d caller%s)", conference->count, conference->count == 1 ? "" : "s"); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "event_type", "presence"); + switch_event_fire(&event); + } + + if ((conference->min && switch_test_flag(conference, CFLAG_ENFORCE_MIN) && conference->count < conference->min) + || (switch_test_flag(conference, CFLAG_DYNAMIC) && conference->count == 0) ) { + switch_set_flag(conference, CFLAG_DESTRUCT); + } else { + if (conference->exit_sound) { + conference_play_file(conference, conference->exit_sound, 0); + } + if (conference->count == 1 && conference->alone_sound) { + conference_play_file(conference, conference->alone_sound, 0); + } } - if (conference->count == 1 && conference->alone_sound) { - conference_play_file(conference, conference->alone_sound, 0); - } - } - if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, CONF_EVENT_MAINT) == SWITCH_STATUS_SUCCESS) { - switch_channel_t *channel = switch_core_session_get_channel(member->session); - switch_channel_event_set_data(channel, event); + if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, CONF_EVENT_MAINT) == SWITCH_STATUS_SUCCESS) { + switch_channel_t *channel = switch_core_session_get_channel(member->session); + switch_channel_event_set_data(channel, event); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Conference-Name", conference->name); - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Action", "del-member"); - switch_event_fire(&event); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Conference-Name", conference->name); + switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Action", "del-member"); + switch_event_fire(&event); + } } - switch_mutex_unlock(member->flag_mutex); switch_mutex_unlock(member->audio_out_mutex); switch_mutex_unlock(member->audio_in_mutex); @@ -488,7 +520,9 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v return NULL; } + switch_mutex_lock(globals.hash_mutex); globals.threads++; + switch_mutex_unlock(globals.hash_mutex); while(globals.running && !switch_test_flag(conference, CFLAG_DESTRUCT)) { uint8_t file_frame[CONF_BUFFER_SIZE] = {0}; @@ -672,23 +706,32 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v switch_mutex_lock(conference->mutex); for(imember = conference->members; imember; imember = imember->next) { - switch_channel_t *channel = switch_core_session_get_channel(imember->session); - // add this little bit to preserve the bridge cause code in case of an early media call that - // never answers - if (switch_test_flag(conference, CFLAG_ANSWERED)) - switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING); - else - // put actual cause code from outbound channel hangup here - switch_channel_hangup(channel, conference->bridge_hangup_cause); + switch_channel_t *channel; + + if (!switch_test_flag(imember, MFLAG_NOCHANNEL)) { + channel = switch_core_session_get_channel(imember->session); + + // add this little bit to preserve the bridge cause code in case of an early media call that + // never answers + if (switch_test_flag(conference, CFLAG_ANSWERED)) { + switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_CLEARING); + } else { + // put actual cause code from outbound channel hangup here + switch_channel_hangup(channel, conference->bridge_hangup_cause); + } + } + switch_clear_flag_locked(imember, MFLAG_RUNNING); } switch_mutex_unlock(conference->mutex); /* Wait till everybody is out */ - + switch_clear_flag_locked(conference, CFLAG_RUNNING); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write Lock ON\n"); switch_thread_rwlock_wrlock(conference->rwlock); switch_thread_rwlock_unlock(conference->rwlock); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write Lock OFF\n"); switch_mutex_lock(globals.hash_mutex); switch_core_hash_delete(globals.conference_hash, conference->name); @@ -710,7 +753,10 @@ static void *SWITCH_THREAD_FUNC conference_thread_run(switch_thread_t *thread, v switch_event_fire(&event); } + switch_mutex_lock(globals.hash_mutex); globals.threads--; + switch_mutex_unlock(globals.hash_mutex); + return NULL; } @@ -1027,6 +1073,117 @@ static void conference_loop(conference_member_t *member) } } + +/* Sub-Routine called by a record entity inside a conference */ +static void *SWITCH_THREAD_FUNC conference_record_thread_run(switch_thread_t *thread, void *obj) +{ + switch_frame_t write_frame = {0}; + uint8_t data[SWITCH_RECCOMMENDED_BUFFER_SIZE]; + switch_file_handle_t fh = {0}; + conference_member_t smember = {0}, *member; + conference_record_t *rec = (conference_record_t *) obj; + uint32_t divider = 1000 / rec->conference->interval; + uint32_t samples = (rec->conference->rate / divider); + uint32_t bytes = samples * 2; + uint32_t mux_used; + char *vval; + + switch_mutex_lock(globals.hash_mutex); + globals.threads++; + switch_mutex_unlock(globals.hash_mutex); + + member = &smember; + + member->flags = MFLAG_CAN_HEAR | MFLAG_NOCHANNEL | MFLAG_RUNNING; + + write_frame.data = data; + write_frame.buflen = sizeof(data); + assert(rec->conference != NULL); + + member->conference = rec->conference; + member->native_rate = rec->conference->rate; + member->rec_path = rec->path; + fh.channels = 1; + fh.samplerate = rec->conference->rate; + member->id = next_member_id(); + member->pool = rec->pool; + + switch_mutex_init(&member->flag_mutex, SWITCH_MUTEX_NESTED, rec->pool); + switch_mutex_init(&member->audio_in_mutex, SWITCH_MUTEX_NESTED, rec->pool); + switch_mutex_init(&member->audio_out_mutex, SWITCH_MUTEX_NESTED, rec->pool); + + /* Setup an audio buffer for the incoming audio */ + if (switch_buffer_create_dynamic(&member->audio_buffer, CONF_DBLOCK_SIZE, CONF_DBUFFER_SIZE, 0) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error Creating Audio Buffer!\n"); + goto end; + } + + /* Setup an audio buffer for the outgoing audio */ + if (switch_buffer_create_dynamic(&member->mux_buffer, CONF_DBLOCK_SIZE, CONF_DBUFFER_SIZE, 0) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error Creating Audio Buffer!\n"); + goto end; + } + + if (conference_add_member(rec->conference, member) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Joining Conference\n"); + goto end; + } + + + if (switch_core_file_open(&fh, + rec->path, + SWITCH_FILE_FLAG_WRITE | SWITCH_FILE_DATA_SHORT, + rec->pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening File [%s]\n", rec->path); + goto end; + } + + if ((vval = switch_mprintf("Conference %s", rec->conference->name))) { + switch_core_file_set_string(&fh, SWITCH_AUDIO_COL_STR_TITLE, vval); + switch_safe_free(vval); + } + + while(switch_test_flag(member, MFLAG_RUNNING) && switch_test_flag(rec->conference, CFLAG_RUNNING) && rec->conference->count) { + if ((mux_used = (uint32_t) switch_buffer_inuse(member->mux_buffer)) >= bytes) { + /* Flush the output buffer and write all the data (presumably muxed) back to the channel */ + switch_mutex_lock(member->audio_out_mutex); + write_frame.data = data; + while ((write_frame.datalen = (uint32_t)switch_buffer_read(member->mux_buffer, write_frame.data, mux_used))) { + if (!switch_test_flag((&fh), SWITCH_FILE_PAUSE)) { + switch_size_t len = (switch_size_t) mux_used / 2; + switch_core_file_write(&fh, write_frame.data, &len); + } + } + switch_mutex_unlock(member->audio_out_mutex); + } else { + switch_yield(20000); + } + } /* Rinse ... Repeat */ + + conference_del_member(rec->conference, member); + switch_buffer_destroy(&member->audio_buffer); + switch_buffer_destroy(&member->mux_buffer); + switch_clear_flag_locked(member, MFLAG_RUNNING); + switch_core_file_close(&fh); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Recording Stopped\n"); + + end: + + + if (rec->pool) { + switch_memory_pool_t *pool = rec->pool; + rec = NULL; + switch_core_destroy_memory_pool(&pool); + } + + switch_mutex_lock(globals.hash_mutex); + globals.threads--; + switch_mutex_unlock(globals.hash_mutex); + + return NULL; +} + + /* Make files stop playing in a conference either the current one or all of them */ static uint32_t conference_stop_file(conference_obj_t *conference, file_stop_t stop) { @@ -1329,12 +1486,22 @@ static void conference_list(conference_obj_t *conference, switch_stream_handle_t switch_mutex_lock(conference->member_mutex); for (member = conference->members; member; member = member->next) { - switch_channel_t *channel = switch_core_session_get_channel(member->session); - switch_caller_profile_t *profile = switch_channel_get_caller_profile(channel); - char *uuid = switch_core_session_get_uuid(member->session); - char *name = switch_channel_get_name(channel); + switch_channel_t *channel; + switch_caller_profile_t *profile; + char *uuid; + char *name; uint32_t count = 0; + if (switch_test_flag(member, MFLAG_NOCHANNEL)) { + continue; + } + + uuid = switch_core_session_get_uuid(member->session); + channel = switch_core_session_get_channel(member->session); + profile = switch_channel_get_caller_profile(channel); + name = switch_channel_get_name(channel); + + stream->write_function(stream, "%u%s%s%s%s%s%s%s%s%s", member->id,delim, name,delim, @@ -1366,8 +1533,15 @@ static void conference_list_pretty(conference_obj_t *conference, switch_stream_h stream->write_function(stream, "
Current Callers:\n");
 
 	for (member = conference->members; member; member = member->next) {
-		switch_channel_t *channel = switch_core_session_get_channel(member->session);
-		switch_caller_profile_t *profile = switch_channel_get_caller_profile(channel);
+		switch_channel_t *channel;
+		switch_caller_profile_t *profile;
+
+		if (switch_test_flag(member, MFLAG_NOCHANNEL)) {
+			continue;
+		}
+		channel = switch_core_session_get_channel(member->session);
+		profile = switch_channel_get_caller_profile(channel);
+
 
 		stream->write_function(stream, "*) %s (%s)\n", 
 							   profile->caller_id_name,
@@ -1846,6 +2020,15 @@ static switch_status_t conf_function(char *buf, switch_core_session_t *session,
 						stream->write_function(stream, "usage undeaf \n");
 						goto done;
 					}
+
+				} else if (!strcasecmp(argv[1], "record")) {
+					if (argc > 2) {
+						launch_conference_record_thread(conference, argv[2]);
+					} else {
+						stream->write_function(stream, "-ERR No Path Specified!\n");
+					}
+				} else if (!strcasecmp(argv[1], "norecord")) {
+					conference_record_stop(conference, argv[1]);
 				} else if (!strcasecmp(argv[1], "kick")) {
 					if (argc > 2) {
 						uint32_t id = atoi(argv[2]);
@@ -2477,6 +2660,8 @@ static void conference_function(switch_core_session_t *session, char *data)
 		goto codec_done1;
 	}
 	
+
+
 	/* Prepare MUTEXS */
 	member.id = next_member_id();
 	member.pool = pool;
@@ -2557,6 +2742,34 @@ static void launch_conference_thread(conference_obj_t *conference)
 	switch_thread_create(&thread, thd_attr, conference_thread_run, conference, conference->pool);
 }
 
+static void launch_conference_record_thread(conference_obj_t *conference, char *path)
+{
+	switch_thread_t *thread;
+	switch_threadattr_t *thd_attr = NULL;
+	switch_memory_pool_t *pool;
+	conference_record_t *rec;
+	
+	/* Setup a memory pool to use. */
+	if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Pool Failure\n");
+	}
+
+	/* Create a node object*/
+	if (!(rec = switch_core_alloc(pool, sizeof(*rec)))) {
+		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Alloc Failure\n");
+		switch_core_destroy_memory_pool(&pool);
+	}
+	
+	rec->conference = conference;
+	rec->path = switch_core_strdup(pool, path);
+	rec->pool = pool;
+
+	switch_threadattr_create(&thd_attr, rec->pool);
+	switch_threadattr_detach_set(thd_attr, 1);
+	switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+	switch_thread_create(&thread, thd_attr, conference_record_thread_run, rec, rec->pool);
+}
+
 static void *SWITCH_THREAD_FUNC input_thread_run(switch_thread_t *thread, void *obj)
 {
 	conference_member_t *member = obj;
@@ -3087,6 +3300,7 @@ SWITCH_MOD_DECLARE(switch_status_t) switch_module_shutdown(void)
 	if (globals.running) {
 		globals.running = 0;
 		while (globals.threads) {
+			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for %d threads\n", globals.threads);
 			switch_yield(100000);
 		}
 	}
diff --git a/src/mod/formats/mod_sndfile/mod_sndfile.c b/src/mod/formats/mod_sndfile/mod_sndfile.c
index 93a6ce445a..a0e2ce25ae 100644
--- a/src/mod/formats/mod_sndfile/mod_sndfile.c
+++ b/src/mod/formats/mod_sndfile/mod_sndfile.c
@@ -73,6 +73,8 @@ static switch_status_t sndfile_file_open(switch_file_handle_t *handle, char *pat
 	}
 
 	if (mode & SFM_WRITE) {
+		sf_count_t  frames = 0 ;
+
 		context->sfinfo.channels = handle->channels;
 		context->sfinfo.samplerate = handle->samplerate;
 		if (handle->samplerate == 8000 || handle->samplerate == 16000) {
@@ -83,6 +85,8 @@ static switch_status_t sndfile_file_open(switch_file_handle_t *handle, char *pat
 			context->sfinfo.format |= SF_FORMAT_PCM_32;
 		}
 
+        sf_command (context->handle, SFC_FILE_TRUNCATE, &frames, sizeof (frames)) ;
+
 		/* Could add more else if() but i am too lazy atm.. */
 		if (!strcasecmp(ext, "wav")) {
 			context->sfinfo.format |= SF_FORMAT_WAV;