mirror of
https://github.com/signalwire/freeswitch.git
synced 2025-03-13 20:50:41 +00:00
FS-11721 add a send queue to buffer msg messages before msrp socket is ready
This commit is contained in:
parent
7432091fdd
commit
1a95ef664b
@ -118,6 +118,7 @@ struct switch_msrp_session_s{
|
|||||||
uint8_t frame_data[SWITCH_RTP_MAX_BUF_LEN];
|
uint8_t frame_data[SWITCH_RTP_MAX_BUF_LEN];
|
||||||
int running;
|
int running;
|
||||||
void *user_data;
|
void *user_data;
|
||||||
|
switch_queue_t *send_queue;
|
||||||
};
|
};
|
||||||
|
|
||||||
SWITCH_DECLARE(switch_status_t) switch_msrp_init(void);
|
SWITCH_DECLARE(switch_status_t) switch_msrp_init(void);
|
||||||
|
@ -331,7 +331,9 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_destroy()
|
|||||||
{
|
{
|
||||||
switch_status_t st = SWITCH_STATUS_SUCCESS;
|
switch_status_t st = SWITCH_STATUS_SUCCESS;
|
||||||
switch_socket_t *sock;
|
switch_socket_t *sock;
|
||||||
|
|
||||||
globals.running = 0;
|
globals.running = 0;
|
||||||
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "destroying thread\n");
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "destroying thread\n");
|
||||||
|
|
||||||
sock = globals.msock.sock;
|
sock = globals.msock.sock;
|
||||||
@ -387,6 +389,14 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_session_destroy(switch_msrp_session_
|
|||||||
switch_yield(20000);
|
switch_yield(20000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((*ms)->send_queue) {
|
||||||
|
switch_msrp_msg_t *msg = NULL;
|
||||||
|
|
||||||
|
while (switch_queue_trypop((*ms)->send_queue, (void **)&msg) == SWITCH_STATUS_SUCCESS) {
|
||||||
|
switch_msrp_msg_destroy(&msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
switch_mutex_destroy((*ms)->mutex);
|
switch_mutex_destroy((*ms)->mutex);
|
||||||
ms = NULL;
|
ms = NULL;
|
||||||
return SWITCH_STATUS_SUCCESS;
|
return SWITCH_STATUS_SUCCESS;
|
||||||
@ -1467,7 +1477,7 @@ void random_string(char *buf, uint16_t size)
|
|||||||
}
|
}
|
||||||
|
|
||||||
#define MSRP_TRANS_ID_LEN 16
|
#define MSRP_TRANS_ID_LEN 16
|
||||||
SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *ms, switch_msrp_msg_t *msrp_msg, const char *file, const char *func, int line)
|
static switch_status_t switch_msrp_do_send(switch_msrp_session_t *ms, switch_msrp_msg_t *msrp_msg, const char *file, const char *func, int line)
|
||||||
{
|
{
|
||||||
char transaction_id[MSRP_TRANS_ID_LEN + 1] = { 0 };
|
char transaction_id[MSRP_TRANS_ID_LEN + 1] = { 0 };
|
||||||
char buf[MSRP_BUFF_SIZE];
|
char buf[MSRP_BUFF_SIZE];
|
||||||
@ -1482,11 +1492,6 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *
|
|||||||
return SWITCH_STATUS_SUCCESS;
|
return SWITCH_STATUS_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ms->running) {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "MSRP not ready! Discard one message\n");
|
|
||||||
return SWITCH_STATUS_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!from_path) {
|
if (!from_path) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "NO FROM PATH\n");
|
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "NO FROM PATH\n");
|
||||||
return SWITCH_STATUS_SUCCESS;
|
return SWITCH_STATUS_SUCCESS;
|
||||||
@ -1524,6 +1529,47 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *
|
|||||||
return ms->csock ? msrp_socket_send(ms->csock, buf, &len) : SWITCH_STATUS_FALSE;
|
return ms->csock ? msrp_socket_send(ms->csock, buf, &len) : SWITCH_STATUS_FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SWITCH_DECLARE (switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *ms, switch_msrp_msg_t *msrp_msg, const char *file, const char *func, int line)
|
||||||
|
{
|
||||||
|
switch_msrp_msg_t *msg = NULL;
|
||||||
|
switch_status_t status = SWITCH_STATUS_SUCCESS;
|
||||||
|
|
||||||
|
if (!ms->running) {
|
||||||
|
if (!ms->send_queue) {
|
||||||
|
switch_queue_create(&ms->send_queue, 100, ms->pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "MSRP not ready! Buffering one message %" SWITCH_SIZE_T_FMT " bytes\n", msrp_msg->payload_bytes);
|
||||||
|
|
||||||
|
if (globals.debug && msrp_msg->payload) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "MSRP not ready! Buffered one message [%s]\n", msrp_msg->payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = switch_msrp_msg_dup(msrp_msg);
|
||||||
|
|
||||||
|
status = switch_queue_trypush(ms->send_queue, msg);
|
||||||
|
|
||||||
|
if (status != SWITCH_STATUS_SUCCESS) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_ERROR, "MSRP queue FULL! Discard one message %" SWITCH_SIZE_T_FMT " bytes\n", msg->payload_bytes);
|
||||||
|
switch_msrp_msg_destroy(&msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ms->send_queue) {
|
||||||
|
while (status == SWITCH_STATUS_SUCCESS && switch_queue_trypop(ms->send_queue, (void **)&msg) == SWITCH_STATUS_SUCCESS) {
|
||||||
|
status = switch_msrp_do_send(ms, msg, file, func, line);
|
||||||
|
}
|
||||||
|
|
||||||
|
switch_queue_term(ms->send_queue);
|
||||||
|
ms->send_queue = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
status = switch_msrp_do_send(ms, msrp_msg, file, func, line);
|
||||||
|
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
SWITCH_DECLARE(switch_msrp_msg_t *) switch_msrp_msg_create()
|
SWITCH_DECLARE(switch_msrp_msg_t *) switch_msrp_msg_create()
|
||||||
{
|
{
|
||||||
@ -1553,8 +1599,10 @@ SWITCH_DECLARE(switch_msrp_msg_t *) switch_msrp_msg_dup(switch_msrp_msg_t *msg)
|
|||||||
new_msg->code_number = msg->code_number;
|
new_msg->code_number = msg->code_number;
|
||||||
new_msg->payload_bytes = msg->payload_bytes;
|
new_msg->payload_bytes = msg->payload_bytes;
|
||||||
|
|
||||||
if (msg->payload) {
|
if (msg->payload_bytes > 0 && msg->payload) {
|
||||||
|
new_msg->payload = malloc(msg->payload_bytes + 1);
|
||||||
memcpy(new_msg->payload, msg->payload, msg->payload_bytes);
|
memcpy(new_msg->payload, msg->payload, msg->payload_bytes);
|
||||||
|
*(new_msg->payload + msg->payload_bytes) = '\0';
|
||||||
}
|
}
|
||||||
|
|
||||||
return new_msg;
|
return new_msg;
|
||||||
@ -1562,7 +1610,6 @@ SWITCH_DECLARE(switch_msrp_msg_t *) switch_msrp_msg_dup(switch_msrp_msg_t *msg)
|
|||||||
|
|
||||||
SWITCH_DECLARE(void) switch_msrp_msg_destroy(switch_msrp_msg_t **msg)
|
SWITCH_DECLARE(void) switch_msrp_msg_destroy(switch_msrp_msg_t **msg)
|
||||||
{
|
{
|
||||||
|
|
||||||
switch_msrp_msg_t *msrp_msg = *msg;
|
switch_msrp_msg_t *msrp_msg = *msg;
|
||||||
if (msrp_msg->headers) {
|
if (msrp_msg->headers) {
|
||||||
switch_event_destroy(&msrp_msg->headers);
|
switch_event_destroy(&msrp_msg->headers);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user