From 1a95ef664b0078b05a51dba5a4d3ea38f3016232 Mon Sep 17 00:00:00 2001 From: Seven Du Date: Fri, 12 Apr 2019 12:04:04 +0800 Subject: [PATCH] FS-11721 add a send queue to buffer msg messages before msrp socket is ready --- src/include/switch_msrp.h | 1 + src/switch_msrp.c | 63 ++++++++++++++++++++++++++++++++++----- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/src/include/switch_msrp.h b/src/include/switch_msrp.h index c86a166877..c1e2192537 100644 --- a/src/include/switch_msrp.h +++ b/src/include/switch_msrp.h @@ -118,6 +118,7 @@ struct switch_msrp_session_s{ uint8_t frame_data[SWITCH_RTP_MAX_BUF_LEN]; int running; void *user_data; + switch_queue_t *send_queue; }; SWITCH_DECLARE(switch_status_t) switch_msrp_init(void); diff --git a/src/switch_msrp.c b/src/switch_msrp.c index 29b3bbd6fd..1692a554db 100644 --- a/src/switch_msrp.c +++ b/src/switch_msrp.c @@ -331,7 +331,9 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_destroy() { switch_status_t st = SWITCH_STATUS_SUCCESS; switch_socket_t *sock; + globals.running = 0; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "destroying thread\n"); sock = globals.msock.sock; @@ -387,6 +389,14 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_session_destroy(switch_msrp_session_ switch_yield(20000); } + if ((*ms)->send_queue) { + switch_msrp_msg_t *msg = NULL; + + while (switch_queue_trypop((*ms)->send_queue, (void **)&msg) == SWITCH_STATUS_SUCCESS) { + switch_msrp_msg_destroy(&msg); + } + } + switch_mutex_destroy((*ms)->mutex); ms = NULL; return SWITCH_STATUS_SUCCESS; @@ -1467,7 +1477,7 @@ void random_string(char *buf, uint16_t size) } #define MSRP_TRANS_ID_LEN 16 -SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *ms, switch_msrp_msg_t *msrp_msg, const char *file, const char *func, int line) +static switch_status_t switch_msrp_do_send(switch_msrp_session_t *ms, switch_msrp_msg_t *msrp_msg, const char *file, const char *func, int line) { char transaction_id[MSRP_TRANS_ID_LEN + 1] = { 0 }; char buf[MSRP_BUFF_SIZE]; @@ -1482,11 +1492,6 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t * return SWITCH_STATUS_SUCCESS; } - if (!ms->running) { - switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "MSRP not ready! Discard one message\n"); - return SWITCH_STATUS_SUCCESS; - } - if (!from_path) { switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "NO FROM PATH\n"); return SWITCH_STATUS_SUCCESS; @@ -1524,6 +1529,47 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t * return ms->csock ? msrp_socket_send(ms->csock, buf, &len) : SWITCH_STATUS_FALSE; } +SWITCH_DECLARE (switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *ms, switch_msrp_msg_t *msrp_msg, const char *file, const char *func, int line) +{ + switch_msrp_msg_t *msg = NULL; + switch_status_t status = SWITCH_STATUS_SUCCESS; + + if (!ms->running) { + if (!ms->send_queue) { + switch_queue_create(&ms->send_queue, 100, ms->pool); + } + + switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "MSRP not ready! Buffering one message %" SWITCH_SIZE_T_FMT " bytes\n", msrp_msg->payload_bytes); + + if (globals.debug && msrp_msg->payload) { + switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "MSRP not ready! Buffered one message [%s]\n", msrp_msg->payload); + } + + msg = switch_msrp_msg_dup(msrp_msg); + + status = switch_queue_trypush(ms->send_queue, msg); + + if (status != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_ERROR, "MSRP queue FULL! Discard one message %" SWITCH_SIZE_T_FMT " bytes\n", msg->payload_bytes); + switch_msrp_msg_destroy(&msg); + } + + return status; + } + + if (ms->send_queue) { + while (status == SWITCH_STATUS_SUCCESS && switch_queue_trypop(ms->send_queue, (void **)&msg) == SWITCH_STATUS_SUCCESS) { + status = switch_msrp_do_send(ms, msg, file, func, line); + } + + switch_queue_term(ms->send_queue); + ms->send_queue = NULL; + } + + status = switch_msrp_do_send(ms, msrp_msg, file, func, line); + + return status; +} SWITCH_DECLARE(switch_msrp_msg_t *) switch_msrp_msg_create() { @@ -1553,8 +1599,10 @@ SWITCH_DECLARE(switch_msrp_msg_t *) switch_msrp_msg_dup(switch_msrp_msg_t *msg) new_msg->code_number = msg->code_number; new_msg->payload_bytes = msg->payload_bytes; - if (msg->payload) { + if (msg->payload_bytes > 0 && msg->payload) { + new_msg->payload = malloc(msg->payload_bytes + 1); memcpy(new_msg->payload, msg->payload, msg->payload_bytes); + *(new_msg->payload + msg->payload_bytes) = '\0'; } return new_msg; @@ -1562,7 +1610,6 @@ SWITCH_DECLARE(switch_msrp_msg_t *) switch_msrp_msg_dup(switch_msrp_msg_t *msg) SWITCH_DECLARE(void) switch_msrp_msg_destroy(switch_msrp_msg_t **msg) { - switch_msrp_msg_t *msrp_msg = *msg; if (msrp_msg->headers) { switch_event_destroy(&msrp_msg->headers);