From 2ca53944259b8b80225c02aef257bf6fec990fb9 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Thu, 11 Jun 2015 10:40:26 -0500 Subject: [PATCH] FS-7621 backport to 1.4 --- src/mod/formats/mod_shout/mod_shout.c | 248 ++++++++++++++++++++++---- 1 file changed, 212 insertions(+), 36 deletions(-) diff --git a/src/mod/formats/mod_shout/mod_shout.c b/src/mod/formats/mod_shout/mod_shout.c index b384d1a8a4..763ca5ec1b 100644 --- a/src/mod/formats/mod_shout/mod_shout.c +++ b/src/mod/formats/mod_shout/mod_shout.c @@ -117,7 +117,6 @@ struct shout_context { int dlen; FILE *fp; size_t samplerate; - uint8_t thread_running; uint8_t shout_init; uint32_t prebuf; int lame_ready; @@ -128,6 +127,9 @@ struct shout_context { switch_size_t mp3buflen; switch_thread_rwlock_t *rwlock; int buffer_seconds; + switch_thread_t *read_stream_thread; + switch_thread_t *write_stream_thread; + curl_socket_t curlfd; }; typedef struct shout_context shout_context_t; @@ -137,6 +139,7 @@ static void decode_fd(shout_context_t *context, void *data, size_t bytes); static inline void free_context(shout_context_t *context) { size_t ret; + switch_status_t st; if (context) { switch_mutex_lock(context->audio_mutex); @@ -144,16 +147,17 @@ static inline void free_context(shout_context_t *context) switch_mutex_unlock(context->audio_mutex); if (context->stream_url) { - int sanity = 0; - - while (context->thread_running) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for stream to terminate: %s\n", context->stream_url); - switch_yield(500000); - if (++sanity > 10) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Giving up waiting for stream to terminate: %s\n", context->stream_url); - break; - } + if (context->curlfd > -1) { + shutdown(context->curlfd, 2); } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for stream to terminate: %s\n", context->stream_url); + if (context->read_stream_thread) { + switch_thread_join(&st, context->read_stream_thread); + } + } + + if (context->write_stream_thread) { + switch_thread_join(&st, context->write_stream_thread); } switch_thread_rwlock_wrlock(context->rwlock); @@ -368,6 +372,10 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data) uint32_t buf_size = 1024 * 128; /* do not make this 64 or less, stutter will ensue after first 64k buffer is dry */ switch_size_t used; + if (context->err) { + goto error; + } + if (!context->stream_channels) { long rate = 0; int channels = 0; @@ -398,6 +406,10 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data) switch_yield(500000); } + if (context->err) { + goto error; + } + if (mpg123_feed(context->mh, ptr, realsize) != MPG123_OK) { goto error; } @@ -442,6 +454,22 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data) return 0; } +static int progress_callback(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow) +{ + shout_context_t *context = (shout_context_t *) clientp; + return context->err; +} + + +static int sockopt_callback(void *clientp, curl_socket_t curlfd, + curlsocktype purpose) +{ + shout_context_t *context = (shout_context_t *) clientp; + + context->curlfd = curlfd; + + return CURL_SOCKOPT_OK; +} #define MY_BUF_LEN 1024 * 32 #define MY_BLOCK_SIZE MY_BUF_LEN @@ -452,9 +480,11 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void shout_context_t *context = (shout_context_t *) obj; switch_thread_rwlock_rdlock(context->rwlock); - + context->curlfd = -1; curl_handle = switch_curl_easy_init(); switch_curl_easy_setopt(curl_handle, CURLOPT_URL, context->stream_url); + curl_easy_setopt(curl_handle, CURLOPT_PROGRESSFUNCTION, progress_callback); + curl_easy_setopt(curl_handle, CURLOPT_PROGRESSDATA, (void *)context); switch_curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, 1); switch_curl_easy_setopt(curl_handle, CURLOPT_MAXREDIRS, 10); switch_curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, stream_callback); @@ -465,7 +495,11 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, 100); /* handle trickle connections */ switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, 30); switch_curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, context->curl_error_buff); + curl_easy_setopt(curl_handle, CURLOPT_SOCKOPTFUNCTION, sockopt_callback); + curl_easy_setopt(curl_handle, CURLOPT_SOCKOPTDATA, (void *)context); + cc = switch_curl_easy_perform(curl_handle); + if (cc && cc != CURLE_WRITE_ERROR) { /* write error is ok, we just exited from callback early */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "CURL returned error:[%d] %s : %s [%s]\n", cc, switch_curl_easy_strerror(cc), context->curl_error_buff, context->stream_url); @@ -474,21 +508,17 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Read Thread Done\n"); context->eof++; - context->thread_running = 0; switch_thread_rwlock_unlock(context->rwlock); return NULL; } static void launch_read_stream_thread(shout_context_t *context) { - switch_thread_t *thread; switch_threadattr_t *thd_attr = NULL; - context->thread_running = 1; switch_threadattr_create(&thd_attr, context->memory_pool); - switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_thread_create(&thread, thd_attr, read_stream_thread, context, context->memory_pool); + switch_thread_create(&context->read_stream_thread, thd_attr, read_stream_thread, context, context->memory_pool); } #define error_check() if (context->err) goto error; @@ -499,20 +529,13 @@ static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, voi switch_thread_rwlock_rdlock(context->rwlock); - if (context->thread_running) { - context->thread_running++; - } else { - switch_thread_rwlock_unlock(context->rwlock); - return NULL; - } - if (!context->lame_ready) { lame_init_params(context->gfp); lame_print_config(context->gfp); context->lame_ready = 1; } - while (!context->err && context->thread_running) { + while (!context->err) { unsigned char mp3buf[20480] = ""; int16_t audio[9600] = { 0 }; switch_size_t audio_read = 0; @@ -575,31 +598,21 @@ static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, voi error: switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write Thread Done\n"); switch_thread_rwlock_unlock(context->rwlock); - context->thread_running = 0; + return NULL; } static void launch_write_stream_thread(shout_context_t *context) { - switch_thread_t *thread; switch_threadattr_t *thd_attr = NULL; - int sanity = 10; if (context->err) { return; } - context->thread_running = 1; switch_threadattr_create(&thd_attr, context->memory_pool); - switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_thread_create(&thread, thd_attr, write_stream_thread, context, context->memory_pool); - - while (context->thread_running && context->thread_running != 2) { - switch_yield(100000); - if (!--sanity) - break; - } + switch_thread_create(&context->write_stream_thread, thd_attr, write_stream_thread, context, context->memory_pool); } #define TC_BUFFER_SIZE 1024 * 32 @@ -1549,10 +1562,137 @@ static switch_status_t load_config(void) return SWITCH_STATUS_SUCCESS; } +/* codec interface */ + +struct mp3_context { + lame_global_flags *gfp; +}; + +static switch_status_t switch_mp3_init(switch_codec_t *codec, switch_codec_flag_t flags, const switch_codec_settings_t *codec_settings) +{ + struct mp3_context *context = NULL; + int encoding, decoding; + + encoding = (flags & SWITCH_CODEC_FLAG_ENCODE); + decoding = (flags & SWITCH_CODEC_FLAG_DECODE); + + if (!(encoding || decoding) || (!(context = switch_core_alloc(codec->memory_pool, sizeof(struct mp3_context))))) { + return SWITCH_STATUS_FALSE; + } else { + const switch_codec_implementation_t *impl = codec->implementation; + + if (codec->fmtp_in) { + codec->fmtp_out = switch_core_strdup(codec->memory_pool, codec->fmtp_in); + } + + memset(context, 0, sizeof(struct mp3_context)); + + context->gfp = lame_init(); + + id3tag_init(context->gfp); + id3tag_v2_only(context->gfp); + id3tag_pad_v2(context->gfp); + + lame_set_num_channels(context->gfp, 1); + lame_set_in_samplerate(context->gfp, impl->actual_samples_per_second); + lame_set_out_samplerate(context->gfp, impl->actual_samples_per_second); + + if (impl->number_of_channels == 2) { + lame_set_mode(context->gfp, STEREO); + } else if (impl->number_of_channels == 1) { + lame_set_mode(context->gfp, MONO); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%d channels not supported\n", impl->number_of_channels); + } + + lame_set_brate(context->gfp, 16 * (impl->actual_samples_per_second / 8000) * impl->number_of_channels); + lame_set_quality(context->gfp, 2); + lame_set_errorf(context->gfp, log_error); + lame_set_debugf(context->gfp, log_debug); + lame_set_msgf(context->gfp, log_msg); + + lame_init_params(context->gfp); + lame_print_config(context->gfp); + + if (encoding) { + lame_set_bWriteVbrTag(context->gfp, 0); + lame_mp3_tags_fid(context->gfp, NULL); + lame_set_disable_reservoir(context->gfp, 1); + } + + if (decoding) { + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "MP3 framesize: %d\n", lame_get_framesize(context->gfp)); + + codec->private_info = context; + + return SWITCH_STATUS_SUCCESS; + } +} + +static switch_status_t switch_mp3_destroy(switch_codec_t *codec) +{ + struct mp3_context *context = codec->private_info; + + if (context && context->gfp) lame_close(context->gfp); + + codec->private_info = NULL; + + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t switch_mp3_encode(switch_codec_t *codec, + switch_codec_t *other_codec, + void *decoded_data, + uint32_t decoded_data_len, + uint32_t decoded_rate, + void *encoded_data, uint32_t *encoded_data_len, + uint32_t *encoded_rate, + unsigned int *flag) +{ + struct mp3_context *context = codec->private_info; + int len; + + if (!context) { + return SWITCH_STATUS_FALSE; + } + + if (codec->implementation->number_of_channels == 2) { + len = lame_encode_buffer_interleaved(context->gfp, decoded_data, decoded_data_len / 4, encoded_data, *encoded_data_len); + } else { + len = lame_encode_buffer(context->gfp, decoded_data, NULL, decoded_data_len / 2, encoded_data, *encoded_data_len); + } + + if (len < 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "encode error %d\n", len); + return SWITCH_STATUS_FALSE; + } + + *encoded_data_len = len; + + return SWITCH_STATUS_SUCCESS; +} + +static switch_status_t switch_mp3_decode(switch_codec_t *codec, + switch_codec_t *other_codec, + void *encoded_data, + uint32_t encoded_data_len, + uint32_t encoded_rate, void *decoded_data, uint32_t *decoded_data_len, uint32_t *decoded_rate, + unsigned int *flag) +{ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "decode not implemented!\n"); + return SWITCH_STATUS_FALSE; +} + SWITCH_MODULE_LOAD_FUNCTION(mod_shout_load) { switch_api_interface_t *shout_api_interface; switch_file_interface_t *file_interface; + switch_codec_interface_t *codec_interface; + int mpf = 10000, spf = 80, bpf = 160, count = 1; + int RATES[] = {8000, 11025, 16000, 22050, 32000, 44100, 48000}; + int i; supported_formats[0] = "shout"; supported_formats[1] = "mp3"; @@ -1576,6 +1716,42 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_shout_load) SWITCH_ADD_API(shout_api_interface, "telecast", "telecast", telecast_api_function, TELECAST_SYNTAX); + SWITCH_ADD_CODEC(codec_interface, "MP3"); + + for (count = 1; count <=4; count++) { + for (i = 0; i < sizeof(RATES) / sizeof(RATES[0]); i++) { + switch_core_codec_add_implementation(pool, codec_interface, SWITCH_CODEC_TYPE_AUDIO, + 98, /* the IANA code number */ + "MP3", /* the IANA code name */ + NULL, /* default fmtp to send (can be overridden by the init function) */ + RATES[i], /* samples transferred per second */ + RATES[i], /* actual samples transferred per second */ + 16 * RATES[i] / 8000, /* bits transferred per second */ + mpf * count, /* number of microseconds per frame */ + spf * count * RATES[i] / 8000, /* number of samples per frame */ + bpf * count * RATES[i] / 8000, /* number of bytes per frame decompressed */ + 0, /* number of bytes per frame compressed */ + 1, /* number of channels represented */ + 1, /* number of frames per network packet */ + switch_mp3_init, switch_mp3_encode, switch_mp3_decode, switch_mp3_destroy); + + switch_core_codec_add_implementation(pool, codec_interface, SWITCH_CODEC_TYPE_AUDIO, + 98, /* the IANA code number */ + "MP3", /* the IANA code name */ + NULL, /* default fmtp to send (can be overridden by the init function) */ + RATES[i], /* samples transferred per second */ + RATES[i], /* actual samples transferred per second */ + 16 * RATES[i] / 8000 * 2, /* bits transferred per second */ + mpf * count, /* number of microseconds per frame */ + spf * count * RATES[i] / 8000, /* number of samples per frame */ + bpf * count * RATES[i] / 8000 * 2, /* number of bytes per frame decompressed */ + 0, /* number of bytes per frame compressed */ + 2, /* number of channels represented */ + 1, /* number of frames per network packet */ + switch_mp3_init, switch_mp3_encode, switch_mp3_decode, switch_mp3_destroy); + } + } + /* indicate that the module should continue to be loaded */ return SWITCH_STATUS_SUCCESS; }