chan_websocket: Add ability to place a MARK in the media stream.

Also cleaned up a few unused #if blocks, and started sending a few ERROR
events back to the apps.

Resolves: #1574

DeveloperNote: Apps can now send a `MARK_MEDIA` command with an optional
`correlation_id` parameter to chan_websocket which will be placed in the
media frame queue. When that frame is dequeued after all intervening media
has been played to the core, chan_websocket will send a
`MEDIA_MARK_PROCESSED` event to the app with the same correlation_id
(if any).
This commit is contained in:
George Joseph
2025-11-05 14:27:32 -07:00
committed by github-actions[bot]
parent f08020df80
commit 4fa314242b

View File

@@ -112,22 +112,13 @@ struct websocket_pvt {
#define HANGUP_CHANNEL "HANGUP"
#define START_MEDIA_BUFFERING "START_MEDIA_BUFFERING"
#define STOP_MEDIA_BUFFERING "STOP_MEDIA_BUFFERING"
#define MARK_MEDIA "MARK_MEDIA"
#define FLUSH_MEDIA "FLUSH_MEDIA"
#define GET_DRIVER_STATUS "GET_STATUS"
#define REPORT_QUEUE_DRAINED "REPORT_QUEUE_DRAINED"
#define PAUSE_MEDIA "PAUSE_MEDIA"
#define CONTINUE_MEDIA "CONTINUE_MEDIA"
#if 0
#define MEDIA_START "MEDIA_START"
#define MEDIA_XON "MEDIA_XON"
#define MEDIA_XOFF "MEDIA_XOFF"
#define QUEUE_DRAINED "QUEUE_DRAINED"
#define DRIVER_STATUS "STATUS"
#define MEDIA_BUFFERING_COMPLETED "MEDIA_BUFFERING_COMPLETED"
#define DTMF_END "DTMF_END"
#endif
#define QUEUE_LENGTH_MAX 1000
#define QUEUE_LENGTH_XOFF_LEVEL 900
#define QUEUE_LENGTH_XON_LEVEL 800
@@ -272,6 +263,36 @@ static char *_create_event_MEDIA_BUFFERING_COMPLETED(struct websocket_pvt *insta
return payload;
}
/*!
* \internal
* \brief Print the MEDIA_MARK_PROCESSED event.
* \warning Do not call directly.
*/
static char *_create_event_MEDIA_MARK_PROCESSED(struct websocket_pvt *instance,
const char *id)
{
char *payload = NULL;
if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) {
struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
"event", "MEDIA_MARK_PROCESSED",
"channel_id", ast_channel_uniqueid(instance->channel),
"correlation_id", S_OR(id, "")
);
if (!msg) {
return NULL;
}
payload = ast_json_dump_string_format(msg, AST_JSON_COMPACT);
ast_json_unref(msg);
} else {
ast_asprintf(&payload, "%s%s%s",
"MEDIA_MARK_PROCESSED",
S_COR(id, " ",""), S_OR(id, ""));
}
return payload;
}
/*!
* \internal
* \brief Print the DTMF_END event.
@@ -345,15 +366,27 @@ static char *_create_event_STATUS(struct websocket_pvt *instance)
* \brief Print the ERROR event.
* \warning Do not call directly.
*/
static char *_create_event_ERROR(struct websocket_pvt *instance,
const char *error_text)
static __attribute__ ((format (gnu_printf, 2, 3))) char *_create_event_ERROR(
struct websocket_pvt *instance, const char *format, ...)
{
char *payload = NULL;
char *error_text = NULL;
va_list ap;
int res = 0;
va_start(ap, format);
res = ast_vasprintf(&error_text, format, ap);
va_end(ap);
if (res < 0 || !error_text) {
return NULL;
}
if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) {
struct ast_json *msg = ast_json_pack("{s:s, s:s, s:s}",
"event", "ERROR",
"channel_id", ast_channel_uniqueid(instance->channel),
"error_text", error_text);
ast_free(error_text);
if (!msg) {
return NULL;
}
@@ -362,6 +395,7 @@ static char *_create_event_ERROR(struct websocket_pvt *instance,
} else {
ast_asprintf(&payload, "%s channel_id:%s error_text:%s",
"ERROR", ast_channel_uniqueid(instance->channel), error_text);
ast_free(error_text);
}
return payload;
@@ -722,6 +756,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
} else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) {
if (instance->passthrough) {
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
ast_channel_name(instance->channel), command);
return 0;
@@ -743,6 +778,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
}
if (instance->passthrough) {
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
ast_channel_name(instance->channel), command);
return 0;
@@ -767,10 +803,40 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
res = queue_option_frame(instance, option);
ast_free(option);
} else if (ast_strings_equal(command, MARK_MEDIA)) {
const char *id;
char *option;
SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK,
AST_LIST_UNLOCK);
if (instance->passthrough) {
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
ast_channel_name(instance->channel), command);
return 0;
}
if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) {
id = ast_json_object_string_get(json, "correlation_id");
} else {
id = data;
}
ast_debug(4, "%s: %s %s\n",
ast_channel_name(instance->channel), MARK_MEDIA, id);
option = create_event(instance, MEDIA_MARK_PROCESSED, id);
if (!option) {
return -1;
}
res = queue_option_frame(instance, option);
ast_free(option);
} else if (ast_strings_equal(command, FLUSH_MEDIA)) {
struct ast_frame *frame = NULL;
if (instance->passthrough) {
send_event(instance, ERROR, "FLUSH_MEDIA not supported in passthrough mode");
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
ast_channel_name(instance->channel), command);
return 0;
@@ -787,6 +853,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
} else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) {
if (instance->passthrough) {
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
ast_channel_name(instance->channel), command);
return 0;
@@ -801,6 +868,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
} else if (ast_strings_equal(command, PAUSE_MEDIA)) {
if (instance->passthrough) {
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
ast_channel_name(instance->channel), command);
return 0;
@@ -811,6 +879,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
} else if (ast_strings_equal(command, CONTINUE_MEDIA)) {
if (instance->passthrough) {
send_event(instance, ERROR, "%s not supported in passthrough mode", command);
ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n",
ast_channel_name(instance->channel), command);
return 0;
@@ -1515,7 +1584,8 @@ static struct ast_channel *webchan_request(const char *type,
);
struct ast_flags opts = { 0, };
char *opt_args[OPT_ARG_ARRAY_SIZE];
const char *requestor_name = requestor ? ast_channel_name(requestor) : "no channel";
const char *requestor_name = requestor ? ast_channel_name(requestor) :
(assignedids && !ast_strlen_zero(assignedids->uniqueid) ? assignedids->uniqueid : "<unknown>");
RAII_VAR(struct webchan_conf_global *, global_cfg, NULL, ao2_cleanup);
global_cfg = ast_sorcery_retrieve_by_id(sorcery, "global", "global");
@@ -1547,16 +1617,12 @@ static struct ast_channel *webchan_request(const char *type,
if (ast_test_flag(&opts, OPT_WS_CODEC)
&& !ast_strlen_zero(opt_args[OPT_ARG_WS_CODEC])) {
ast_debug(3, "%s: Using specified format %s\n",
requestor_name, opt_args[OPT_ARG_WS_CODEC]);
fmt = ast_format_cache_get(opt_args[OPT_ARG_WS_CODEC]);
} else {
/*
* If codec wasn't specified in the dial string,
* use the first format in the capabilities.
*/
ast_debug(3, "%s: Using format %s from requesting channel\n",
requestor_name, opt_args[OPT_ARG_WS_CODEC]);
fmt = ast_format_cap_get_format(cap, 0);
}
@@ -1566,6 +1632,10 @@ static struct ast_channel *webchan_request(const char *type,
goto failure;
}
ast_debug(3, "%s: Using format %s from %s\n",
requestor_name, ast_format_get_name(fmt),
ast_test_flag(&opts, OPT_WS_CODEC) ? "dialstring" : "requester");
instance = websocket_new(requestor_name, args.connection_id, fmt);
if (!instance) {
ast_log(LOG_ERROR, "%s: Failed to allocate WebSocket channel pvt\n",