[mod_verto] move websocket implementation to libks

This commit is contained in:
Anthony Minessale 2020-03-25 03:49:19 +04:00 committed by Andrey Volk
parent e237d08073
commit ed98516666
8 changed files with 195 additions and 1425 deletions

View File

@ -2,6 +2,7 @@
<settings>
<param name="debug" value="0"/>
<!-- <param name="kslog" value="true"/> -->
<!-- seconds to wait before hanging up a disconnected channel -->
<!-- <param name="detach-timeout-sec" value="120"/> -->
<!-- enable broadcasting all FreeSWITCH events in Verto -->

View File

@ -2,10 +2,10 @@ include $(top_srcdir)/build/modmake.rulesam
MODNAME=mod_verto
mod_LTLIBRARIES = mod_verto.la
mod_verto_la_SOURCES = mod_verto.c ws.c mcast/mcast.c
mod_verto_la_CFLAGS = -D__EXTENSIONS__ -D_GNU_SOURCE $(AM_CFLAGS)
mod_verto_la_SOURCES = mod_verto.c mcast/mcast.c
mod_verto_la_CFLAGS = -D__EXTENSIONS__ -D_GNU_SOURCE $(AM_CFLAGS) $(KS_CFLAGS)
mod_verto_la_CPPFLAGS = -I. -I$(switch_srcdir)/src/mod/endpoints/mod_verto/mcast
mod_verto_la_LIBADD = $(switch_builddir)/libfreeswitch.la
mod_verto_la_LIBADD = $(switch_builddir)/libfreeswitch.la $(KS_LIBS)
mod_verto_la_LDFLAGS = -avoid-version -module -no-undefined -shared
if HAVE_PERL

View File

@ -47,6 +47,7 @@
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<Import Project="..\..\..\..\w32\openssl.props" />
<Import Project="$(SolutionDir)\w32\libks.props" Condition=" '$(libksPropsImported)' == '' " />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="PropertySheets">
@ -135,12 +136,6 @@
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="mod_verto.c" />
<ClCompile Include="ws.c">
<DisableSpecificWarnings Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">6386;4267;4244;6031;6340;6246;6011;6387;%(DisableSpecificWarnings)</DisableSpecificWarnings>
<DisableSpecificWarnings Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">6386;4267;4244;6031;6340;6246;6011;6387;%(DisableSpecificWarnings)</DisableSpecificWarnings>
<DisableSpecificWarnings Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">6386;4267;4244;6031;6340;6246;6011;6387;%(DisableSpecificWarnings)</DisableSpecificWarnings>
<DisableSpecificWarnings Condition="'$(Configuration)|$(Platform)'=='Release|x64'">6386;4267;4244;6031;6340;6246;6011;6387;%(DisableSpecificWarnings)</DisableSpecificWarnings>
</ClCompile>
<ClCompile Include="mcast\mcast.c" />
<ClCompile Include="mcast\mcast_cpp.cpp" />
</ItemGroup>

View File

@ -33,7 +33,6 @@
#include <switch_json.h>
#include <switch_stun.h>
/* Prototypes */
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_verto_shutdown);
SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load);
@ -41,11 +40,11 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_verto_runtime);
SWITCH_MODULE_DEFINITION(mod_verto, mod_verto_load, mod_verto_shutdown, mod_verto_runtime);
#define HTTP_CHUNK_SIZE 1024 * 32
#define EP_NAME "verto.rtc"
//#define WSS_STANDALONE 1
#include "ws.h"
#include "ks.h"
//////////////////////////
#include <mod_verto.h>
#ifndef WIN32
#include <sys/param.h>
@ -151,21 +150,21 @@ static void verto_deinit_ssl(verto_profile_t *profile)
}
}
static void close_file(ws_socket_t *sock)
static void close_file(ks_socket_t *sock)
{
if (*sock != ws_sock_invalid) {
if (*sock != KS_SOCK_INVALID) {
#ifndef WIN32
close(*sock);
#else
closesocket(*sock);
#endif
*sock = ws_sock_invalid;
*sock = KS_SOCK_INVALID;
}
}
static void close_socket(ws_socket_t *sock)
static void close_socket(ks_socket_t *sock)
{
if (*sock != ws_sock_invalid) {
if (*sock != KS_SOCK_INVALID) {
shutdown(*sock, 2);
close_file(sock);
}
@ -608,7 +607,7 @@ static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t
free(log_text);
}
switch_mutex_lock(jsock->write_mutex);
r = ws_write_frame(&jsock->ws, WSOC_TEXT, json_text, strlen(json_text));
r = kws_write_frame(jsock->ws, WSOC_TEXT, json_text, strlen(json_text));
switch_mutex_unlock(jsock->write_mutex);
switch_safe_free(json_text);
}
@ -1480,52 +1479,36 @@ static void jsock_check_event_queue(jsock_t *jsock)
/* DO NOT use this unless you know what you are doing, you are WARNNED!!! */
static uint8_t *http_stream_read(switch_stream_handle_t *handle, int *len)
{
switch_http_request_t *r = (switch_http_request_t *) handle->data;
kws_request_t *r = (kws_request_t *) handle->data;
jsock_t *jsock = r->user_data;
wsh_t *wsh = &jsock->ws;
kws_t *wsh = jsock->ws;
uint8_t *buffer = NULL;
if (!jsock->profile->running) {
*len = 0;
return NULL;
}
*len = (int)(r->bytes_buffered - r->bytes_read);
if (*len > 0) { // we already read part of the body
uint8_t *data = (uint8_t *)wsh->buffer + r->bytes_read;
r->bytes_read = r->bytes_buffered;
return data;
}
if (r->content_length && (r->bytes_read - r->bytes_header) >= r->content_length) {
*len = HTTP_CHUNK_SIZE;
if ((*len = (int)kws_read_buffer(wsh, &buffer, *len, 1)) < 0) {
*len = 0;
return NULL;
}
*len = (int)(r->content_length - (r->bytes_read - r->bytes_header));
*len = *len > sizeof(wsh->buffer) ? wsh->buflen : *len;
if ((*len = (int)ws_raw_read(wsh, wsh->buffer, *len, wsh->block)) < 0) {
*len = 0;
return NULL;
}
r->bytes_read += *len;
return (uint8_t *)wsh->buffer;
return buffer;
}
static switch_status_t http_stream_raw_write(switch_stream_handle_t *handle, uint8_t *data, switch_size_t datalen)
{
switch_http_request_t *r = (switch_http_request_t *) handle->data;
kws_request_t *r = (kws_request_t *) handle->data;
jsock_t *jsock = r->user_data;
return ws_raw_write(&jsock->ws, data, (uint32_t)datalen) ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE;
return kws_raw_write(jsock->ws, data, (uint32_t)datalen) ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE;
}
static switch_status_t http_stream_write(switch_stream_handle_t *handle, const char *fmt, ...)
{
switch_http_request_t *r = (switch_http_request_t *) handle->data;
kws_request_t *r = (kws_request_t *) handle->data;
jsock_t *jsock = r->user_data;
int ret = 1;
char *data;
@ -1537,7 +1520,7 @@ static switch_status_t http_stream_write(switch_stream_handle_t *handle, const c
if (data) {
if (ret) {
ret =(int) ws_raw_write(&jsock->ws, data, (uint32_t)strlen(data));
ret =(int) kws_raw_write(jsock->ws, data, (uint32_t)strlen(data));
}
switch_safe_free(data);
}
@ -1545,7 +1528,7 @@ static switch_status_t http_stream_write(switch_stream_handle_t *handle, const c
return ret ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE;
}
static void http_static_handler(switch_http_request_t *request, verto_vhost_t *vhost)
static void http_static_handler(kws_request_t *request, verto_vhost_t *vhost)
{
jsock_t *jsock = request->user_data;
char path[512];
@ -1553,11 +1536,16 @@ static void http_static_handler(switch_http_request_t *request, verto_vhost_t *v
char *ext;
uint8_t chunk[4096];
const char *mime_type = "text/html", *new_type;
switch_time_exp_t tm;
char date[80] = "";
switch_time_t ts = switch_micro_time_now();
switch_time_exp_lt(&tm, ts);
switch_rfc822_date(date, ts);
if (strncmp(request->method, "GET", 3) && strncmp(request->method, "HEAD", 4)) {
char *data = "HTTP/1.1 415 Method Not Allowed\r\n"
"Content-Length: 0\r\n\r\n";
ws_raw_write(&jsock->ws, data, strlen(data));
kws_raw_write(jsock->ws, data, strlen(data));
return;
}
@ -1587,12 +1575,12 @@ static void http_static_handler(switch_http_request_t *request, verto_vhost_t *v
"Server: FreeSWITCH-%s-mod_verto\r\n"
"Content-Type: %s\r\n"
"Content-Length: %" SWITCH_SIZE_T_FMT "\r\n\r\n",
switch_event_get_header(request->headers, "Event-Date-GMT"),
date,
switch_version_full(),
mime_type,
flen);
ws_raw_write(&jsock->ws, chunk, strlen((char *)chunk));
kws_raw_write(jsock->ws, chunk, strlen((char *)chunk));
for (;;) {
switch_status_t status;
@ -1604,42 +1592,49 @@ static void http_static_handler(switch_http_request_t *request, verto_vhost_t *v
break;
}
ws_raw_write(&jsock->ws, chunk, flen);
kws_raw_write(jsock->ws, chunk, flen);
}
switch_file_close(fd);
} else {
char *data = "HTTP/1.1 404 Not Found\r\n"
"Content-Length: 0\r\n\r\n";
ws_raw_write(&jsock->ws, data, strlen(data));
kws_raw_write(jsock->ws, data, strlen(data));
}
}
static void request_headers_to_event(switch_event_t *event, kws_request_t *request)
{
int i;
for (i = 0; i < KWS_MAX_HEADERS; i++) {
if (!request->headers_k[i]) break;
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, request->headers_k[i], request->headers_v[i]);
}
}
static void http_run(jsock_t *jsock)
{
switch_http_request_t request = { 0 };
kws_request_t *request = NULL;
switch_stream_handle_t stream = { 0 };
char *err = NULL;
char *ext;
verto_vhost_t *vhost;
switch_bool_t keepalive;
ks_bool_t keepalive;
new_req:
request.user_data = jsock;
if (switch_event_create(&stream.param_event, SWITCH_EVENT_CHANNEL_DATA) != SWITCH_STATUS_SUCCESS) {
goto err;
}
request.headers = stream.param_event;
if (switch_http_parse_header(jsock->ws.buffer, (uint32_t)jsock->ws.datalen, &request) != SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&stream.param_event);
if (kws_parse_header(jsock->ws, &request) != KS_STATUS_SUCCESS) {
goto err;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s [%4" SWITCH_SIZE_T_FMT "] %s\n", jsock->name, jsock->ws.datalen, request.uri);
request->user_data = jsock;
if (!strncmp(request.method, "OPTIONS", 7)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s %s\n", jsock->name, request->uri);
if (!strncmp(request->method, "OPTIONS", 7)) {
char data[512];
switch_snprintf(data, sizeof(data),
"HTTP/1.1 200 OK\r\n"
@ -1647,23 +1642,32 @@ new_req:
"Date: %s\r\n"
"Allow: HEAD,GET,POST,PUT,DELETE,PATCH,OPTIONS\r\n"
"Server: FreeSWITCH-%s-mod_verto\r\n\r\n",
switch_event_get_header(request.headers, "Event-Date-GMT"),
switch_event_get_header(stream.param_event, "Event-Date-GMT"),
switch_version_full());
ws_raw_write(&jsock->ws, data, strlen(data));
kws_raw_write(jsock->ws, data, strlen(data));
goto done;
}
if (!strncmp(request.method, "POST", 4) && request.content_length && request.content_type &&
!strncmp(request.content_type, "application/x-www-form-urlencoded", 33)) {
if (request->content_length && request->content_length > 5l * 1024 * 1024 * 1024 - 1) {
char *data = "HTTP/1.1 413 Request Entity Too Large\r\n"
"Content-Length: 0\r\n\r\n";
kws_raw_write(jsock->ws, data, strlen(data));
request->keepalive = 0;
goto done;
}
if (!strncmp(request->method, "POST", 4) && request->content_length && request->content_type &&
!strncmp(request->content_type, "application/x-www-form-urlencoded", 33)) {
char *buffer = NULL;
switch_ssize_t len = 0, bytes = 0;
if (request.content_length > 2 * 1024 * 1024 - 1) {
if (request->content_length && request->content_length > 10 * 1024 * 1024 - 1) {
char *data = "HTTP/1.1 413 Request Entity Too Large\r\n"
"Content-Length: 0\r\n\r\n";
ws_raw_write(&jsock->ws, data, strlen(data));
kws_raw_write(jsock->ws, data, strlen(data));
request->keepalive = 0;
goto done;
}
@ -1671,14 +1675,12 @@ new_req:
goto request_err;
}
if ((bytes = request.bytes_buffered - request.bytes_read) > 0) {
memcpy(buffer, jsock->ws.buffer + request.bytes_read, bytes);
}
while(bytes < (switch_ssize_t)request->content_length) {
len = request->content_length - bytes;
while(bytes < (switch_ssize_t)request.content_length) {
len = request.content_length - bytes;
#define WS_BLOCK 1
if ((len = ws_raw_read(&jsock->ws, buffer + bytes, len, jsock->ws.block)) < 0) {
if ((len = kws_raw_read(jsock->ws, buffer + bytes, len, WS_BLOCK)) < 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Read error %" SWITCH_SSIZE_T_FMT"\n", len);
goto done;
}
@ -1688,19 +1690,20 @@ new_req:
*(buffer + bytes) = '\0';
switch_http_parse_qs(&request, buffer);
kws_parse_qs(request, buffer);
free(buffer);
}
// switch_http_dump_request(&request);
// kws_request_dump(request);
stream.data = &request;
stream.data = request;
stream.read_function = http_stream_read;
stream.write_function = http_stream_write;
stream.raw_write_function = http_stream_raw_write;
request_headers_to_event(stream.param_event, request);
switch_event_add_header_string(request.headers, SWITCH_STACK_BOTTOM, "Request-Method", request.method);
switch_event_add_header_string(request.headers, SWITCH_STACK_BOTTOM, "HTTP-Request-URI", request.uri);
switch_event_add_header_string(stream.param_event, SWITCH_STACK_BOTTOM, "Request-Method", request->method);
switch_event_add_header_string(stream.param_event, SWITCH_STACK_BOTTOM, "HTTP-Request-URI", request->uri);
if (!jsock->profile->vhosts) goto err;
@ -1711,11 +1714,11 @@ new_req:
int code = CODE_AUTH_REQUIRED;
char message[128] = "Authentication Required";
cJSON *params = NULL;
char *www_auth;
const char *www_auth;
char auth_buffer[512];
char *auth_user = NULL, *auth_pass = NULL;
www_auth = switch_event_get_header(request.headers, "Authorization");
www_auth = request->authorization;
if (zstr(www_auth)) {
switch_snprintf(auth_buffer, sizeof(auth_buffer),
@ -1723,7 +1726,7 @@ new_req:
"WWW-Authenticate: Basic realm=\"%s\"\r\n"
"Content-Length: 0\r\n\r\n",
vhost->auth_realm);
ws_raw_write(&jsock->ws, auth_buffer, strlen(auth_buffer));
kws_raw_write(jsock->ws, auth_buffer, strlen(auth_buffer));
goto done;
}
@ -1758,7 +1761,7 @@ new_req:
"WWW-Authenticate: Basic realm=\"%s\"\r\n"
"Content-Length: 0\r\n\r\n",
vhost->auth_realm);
ws_raw_write(&jsock->ws, auth_buffer, strlen(auth_buffer));
kws_raw_write(jsock->ws, auth_buffer, strlen(auth_buffer));
cJSON_Delete(params);
goto done;
} else {
@ -1767,7 +1770,7 @@ new_req:
authed:
switch_set_flag(jsock, JPFLAG_AUTHED);
switch_event_add_header_string(request.headers, SWITCH_STACK_BOTTOM, "HTTP-USER", auth_user);
switch_event_add_header_string(stream.param_event, SWITCH_STACK_BOTTOM, "HTTP-USER", auth_user);
}
if (vhost->rewrites) {
@ -1779,10 +1782,10 @@ authed:
while(rule) {
char *expression = rule->name;
if ((proceed = switch_regex_perform(request.uri, expression, &re, ovector, sizeof(ovector) / sizeof(ovector[0])))) {
if ((proceed = switch_regex_perform(request->uri, expression, &re, ovector, sizeof(ovector) / sizeof(ovector[0])))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
"%d request [%s] matched expr [%s]\n", proceed, request.uri, expression);
request.uri = rule->value;
"%d request [%s] matched expr [%s]\n", proceed, request->uri, expression);
request->uri = rule->value;
break;
}
@ -1790,79 +1793,48 @@ authed:
}
}
switch_event_add_header_string(request.headers, SWITCH_STACK_BOTTOM, "HTTP-URI", request.uri);
switch_event_add_header_string(stream.param_event, SWITCH_STACK_BOTTOM, "HTTP-URI", request->uri);
if ((ext = strrchr(request.uri, '.'))) {
if ((ext = strrchr(request->uri, '.'))) {
char path[1024];
if (!strncmp(ext, ".lua", 4)) {
switch_snprintf(path, sizeof(path), "%s%s", vhost->script_root, request.uri);
switch_snprintf(path, sizeof(path), "%s%s", vhost->script_root, request->uri);
switch_api_execute("lua", path, NULL, &stream);
} else {
http_static_handler(&request, vhost);
http_static_handler(request, vhost);
}
} else {
http_static_handler(&request, vhost);
http_static_handler(request, vhost);
}
done:
keepalive = request.keepalive;
switch_http_free_request(&request);
keepalive = request->keepalive;
kws_request_free(&request);
if (keepalive) {
wsh_t *wsh = &jsock->ws;
memset(&request, 0, sizeof(request));
wsh->datalen = 0;
*wsh->buffer = '\0';
kws_t *wsh = jsock->ws;
while(jsock->profile->running) {
int pflags;
if (wsh->ssl && SSL_pending(wsh->ssl) > 0) {
pflags = SWITCH_POLL_READ;
} else {
pflags = switch_wait_sock(jsock->client_socket, 3000, SWITCH_POLL_READ | SWITCH_POLL_ERROR | SWITCH_POLL_HUP);
}
int pflags = kws_wait_sock(wsh, 3000, KS_POLL_READ | KS_POLL_ERROR | KS_POLL_HUP);
if (jsock->drop) { die("%s Dropping Connection\n", jsock->name); }
if (pflags < 0 && (errno != EINTR)) { die_errnof("%s POLL FAILED with %d", jsock->name, pflags); }
if (pflags == 0) { /* keepalive socket poll timeout */ break; }
if (pflags > 0 && (pflags & SWITCH_POLL_HUP)) { log_and_exit(SWITCH_LOG_INFO, "%s POLL HANGUP DETECTED (peer closed its end of socket)\n", jsock->name); }
if (pflags > 0 && (pflags & SWITCH_POLL_ERROR)) { die("%s POLL ERROR\n", jsock->name); }
if (pflags > 0 && (pflags & SWITCH_POLL_INVALID)) { die("%s POLL INVALID SOCKET (not opened or already closed)\n", jsock->name); }
if (pflags > 0 && (pflags & SWITCH_POLL_READ)) {
ssize_t bytes;
bytes = ws_raw_read(wsh, wsh->buffer + wsh->datalen, wsh->buflen - wsh->datalen - 1, wsh->block);
if (bytes < 0) {
die("%s BAD READ %" SWITCH_SIZE_T_FMT "\n", jsock->name, bytes);
break;
}
if (bytes == 0) {
bytes = ws_raw_read(wsh, wsh->buffer + wsh->datalen, wsh->buflen - wsh->datalen - 1, wsh->block);
if (bytes < 0) {
die("%s BAD READ %" SWITCH_SIZE_T_FMT "\n", jsock->name, bytes);
break;
}
if (bytes == 0) { // socket broken ?
break;
}
}
wsh->datalen += bytes;
*(wsh->buffer + wsh->datalen) = '\0';
if (strstr(wsh->buffer, "\r\n\r\n") || strstr(wsh->buffer, "\n\n")) {
if (pflags > 0 && (pflags & KS_POLL_HUP)) { log_and_exit(SWITCH_LOG_INFO, "%s POLL HANGUP DETECTED (peer closed its end of socket)\n", jsock->name); }
if (pflags > 0 && (pflags & KS_POLL_ERROR)) { die("%s POLL ERROR\n", jsock->name); }
if (pflags > 0 && (pflags & KS_POLL_INVALID)) { die("%s POLL INVALID SOCKET (not opened or already closed)\n", jsock->name); }
if (pflags > 0 && (pflags & KS_POLL_READ)) {
if (kws_keepalive(wsh) == KS_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "socket %s is going to handle a new request\n", jsock->name);
goto new_req;
} else {
// switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Read Error\n");
break;
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "unhandled flag: %d\n", pflags);
}
}
}
@ -1870,12 +1842,12 @@ done:
return;
request_err:
switch_http_free_request(&request);
kws_request_free(&request);
err:
err = "HTTP/1.1 500 Internal Server Error\r\n"
"Content-Length: 0\r\n\r\n";
ws_raw_write(&jsock->ws, err, strlen(err));
kws_raw_write(jsock->ws, err, strlen(err));
error:
return;
@ -1883,40 +1855,46 @@ error:
static void client_run(jsock_t *jsock)
{
if (ws_init(&jsock->ws, jsock->client_socket, (jsock->ptype & PTYPE_CLIENT_SSL) ? jsock->profile->ssl_ctx : NULL, 0, 1, !!jsock->profile->vhosts) < 0) {
int flags = KWS_BLOCK;
if (jsock->profile->vhosts) {
http_run(jsock);
ws_close(&jsock->ws, WS_NONE);
goto error;
} else {
flags |= KWS_STAY_OPEN;
flags |= KWS_HTTP;
}
ks_pool_open(&jsock->kpool);
if (kws_init(&jsock->ws, jsock->client_socket, (jsock->ptype & PTYPE_CLIENT_SSL) ? jsock->profile->ssl_ctx : NULL, 0, flags, jsock->kpool) != KS_STATUS_SUCCESS) {
log_and_exit(SWITCH_LOG_NOTICE, "%s WS SETUP FAILED\n", jsock->name);
}
if (kws_test_flag(jsock->ws, KWS_HTTP)) {
http_run(jsock);
kws_close(jsock->ws, WS_NONE);
goto end;
}
while(jsock->profile->running) {
int pflags;
if (jsock->ws.ssl && SSL_pending(jsock->ws.ssl) > 0) {
pflags = SWITCH_POLL_READ;
} else {
pflags = switch_wait_sock(jsock->client_socket, 50, SWITCH_POLL_READ | SWITCH_POLL_ERROR | SWITCH_POLL_HUP);
}
if (!jsock->ws) { die("%s Setup Error\n", jsock->name); }
pflags = kws_wait_sock(jsock->ws, 50, KS_POLL_READ | KS_POLL_ERROR | KS_POLL_HUP);
if (jsock->drop) { die("%s Dropping Connection\n", jsock->name); }
if (pflags < 0 && (errno != EINTR)) { die_errnof("%s POLL FAILED with %d", jsock->name, pflags); }
if (pflags == 0) {/* socket poll timeout */ jsock_check_event_queue(jsock); }
if (pflags > 0 && (pflags & SWITCH_POLL_HUP)) { log_and_exit(SWITCH_LOG_INFO, "%s POLL HANGUP DETECTED (peer closed its end of socket)\n", jsock->name); }
if (pflags > 0 && (pflags & SWITCH_POLL_ERROR)) { die("%s POLL ERROR\n", jsock->name); }
if (pflags > 0 && (pflags & SWITCH_POLL_INVALID)) { die("%s POLL INVALID SOCKET (not opened or already closed)\n", jsock->name); }
if (pflags > 0 && (pflags & SWITCH_POLL_READ)) {
if (pflags > 0 && (pflags & KS_POLL_HUP)) { log_and_exit(SWITCH_LOG_INFO, "%s POLL HANGUP DETECTED (peer closed its end of socket)\n", jsock->name); }
if (pflags > 0 && (pflags & KS_POLL_ERROR)) { die("%s POLL ERROR\n", jsock->name); }
if (pflags > 0 && (pflags & KS_POLL_INVALID)) { die("%s POLL INVALID SOCKET (not opened or already closed)\n", jsock->name); }
if (pflags > 0 && (pflags & KS_POLL_READ)) {
switch_ssize_t bytes;
ws_opcode_t oc;
kws_opcode_t oc;
uint8_t *data;
bytes = ws_read_frame(&jsock->ws, &oc, &data);
bytes = kws_read_frame(jsock->ws, &oc, &data);
if (bytes < 0) {
if (bytes == -WS_RECV_CLOSE) {
if (bytes == -1000) {
log_and_exit(SWITCH_LOG_INFO, "%s Client sent close request\n", jsock->name);
} else {
die("%s BAD READ %" SWITCH_SSIZE_T_FMT "\n", jsock->name, bytes);
@ -1945,7 +1923,7 @@ static void client_run(jsock_t *jsock)
a = switch_time_now();
do {
bytes = ws_read_frame(&jsock->ws, &oc, &data);
bytes = kws_read_frame(jsock->ws, &oc, &data);
s = (char *) data;
} while (bytes && data && s[0] == '#' && s[3] == 'B');
b = switch_time_now();
@ -1955,7 +1933,7 @@ static void client_run(jsock_t *jsock)
if (s[0] != '#') goto nm;
switch_snprintf(repl, sizeof(repl), "#SPU %ld", (long)((b - a) / 1000));
ws_write_frame(&jsock->ws, WSOC_TEXT, repl, strlen(repl));
kws_write_frame(jsock->ws, WSOC_TEXT, repl, strlen(repl));
loops = size / 1024;
rem = size % 1024;
switch_snprintf(repl, sizeof(repl), "#SPB ");
@ -1965,10 +1943,10 @@ static void client_run(jsock_t *jsock)
int ddur = 0;
a = switch_time_now();
for (i = 0; i < loops; i++) {
ws_write_frame(&jsock->ws, WSOC_TEXT, repl, 1024);
kws_write_frame(jsock->ws, WSOC_TEXT, repl, 1024);
}
if (rem) {
ws_write_frame(&jsock->ws, WSOC_TEXT, repl, rem);
kws_write_frame(jsock->ws, WSOC_TEXT, repl, rem);
}
b = switch_time_now();
ddur += (int)((b - a) / 1000);
@ -1979,7 +1957,7 @@ static void client_run(jsock_t *jsock)
dur /= j+1;
switch_snprintf(repl, sizeof(repl), "#SPD %d", dur);
ws_write_frame(&jsock->ws, WSOC_TEXT, repl, strlen(repl));
kws_write_frame(jsock->ws, WSOC_TEXT, repl, strlen(repl));
}
}
@ -2001,10 +1979,10 @@ static void client_run(jsock_t *jsock)
}
error:
end:
detach_jsock(jsock);
ws_destroy(&jsock->ws);
kws_destroy(&jsock->ws);
ks_pool_close(&jsock->kpool);
return;
}
@ -2049,7 +2027,7 @@ static void *SWITCH_THREAD_FUNC client_thread(switch_thread_t *thread, void *obj
switch_event_destroy(&jsock->vars);
switch_event_destroy(&jsock->user_vars);
if (jsock->client_socket != ws_sock_invalid) {
if (jsock->client_socket != KS_SOCK_INVALID) {
close_socket(&jsock->client_socket);
}
@ -4105,7 +4083,7 @@ static switch_bool_t verto__broadcast_func(const char *method, cJSON *params, js
switch_event_channel_broadcast(event_channel, &jevent, modname, verto_globals.event_channel_id);
}
if (jsock->profile->mcast_pub.sock != ws_sock_invalid) {
if (jsock->profile->mcast_pub.sock != KS_SOCK_INVALID) {
if ((json_text = cJSON_PrintUnformatted(params))) {
if (mcast_socket_send(&jsock->profile->mcast_pub, json_text, strlen(json_text) + 1) <= 0) {
@ -4257,7 +4235,7 @@ static void jrpc_init(void)
static int start_jsock(verto_profile_t *profile, ws_socket_t sock, int family)
static int start_jsock(verto_profile_t *profile, ks_socket_t sock, int family)
{
jsock_t *jsock = NULL;
int flag = 1;
@ -4369,7 +4347,7 @@ static int start_jsock(verto_profile_t *profile, ws_socket_t sock, int family)
error:
if (jsock) {
if (jsock->client_socket != ws_sock_invalid) {
if (jsock->client_socket != KS_SOCK_INVALID) {
close_socket(&jsock->client_socket);
}
@ -4379,9 +4357,9 @@ static int start_jsock(verto_profile_t *profile, ws_socket_t sock, int family)
return -1;
}
static ws_socket_t prepare_socket(ips_t *ips)
static ks_socket_t prepare_socket(ips_t *ips)
{
ws_socket_t sock = ws_sock_invalid;
ks_socket_t sock = KS_SOCK_INVALID;
#ifndef WIN32
int reuse_addr = 1;
#else
@ -4435,14 +4413,14 @@ static ws_socket_t prepare_socket(ips_t *ips)
close_file(&sock);
return ws_sock_invalid;
return KS_SOCK_INVALID;
}
static void handle_mcast_sub(verto_profile_t *profile)
{
int bytes;
if (profile->mcast_sub.sock == ws_sock_invalid) {
if (profile->mcast_sub.sock == KS_SOCK_INVALID) {
return;
}
@ -4580,7 +4558,7 @@ static void runtime(verto_profile_t *profile)
for (i = 0; i < profile->i; i++) {
//if ((profile->server_socket[i] = prepare_socket(profile->ip[i].local_ip_addr, profile->ip[i].local_port)) < 0) {
if ((profile->server_socket[i] = prepare_socket(&profile->ip[i])) != ws_sock_invalid) {
if ((profile->server_socket[i] = prepare_socket(&profile->ip[i])) != KS_SOCK_INVALID) {
listeners++;
}
}
@ -4617,11 +4595,11 @@ static void runtime(verto_profile_t *profile)
error:
if (profile->mcast_sub.sock != ws_sock_invalid) {
if (profile->mcast_sub.sock != KS_SOCK_INVALID) {
mcast_socket_close(&profile->mcast_sub);
}
if (profile->mcast_pub.sock != ws_sock_invalid) {
if (profile->mcast_pub.sock != KS_SOCK_INVALID) {
mcast_socket_close(&profile->mcast_pub);
}
@ -4805,8 +4783,8 @@ static switch_status_t parse_config(const char *cf)
profile->local_network = "localnet.auto";
profile->mcast_sub.sock = ws_sock_invalid;
profile->mcast_pub.sock = ws_sock_invalid;
profile->mcast_sub.sock = KS_SOCK_INVALID;
profile->mcast_pub.sock = KS_SOCK_INVALID;
for (param = switch_xml_child(xprofile, "param"); param; param = param->next) {
@ -5053,6 +5031,10 @@ static switch_status_t parse_config(const char *cf)
if (tmp > 0) {
verto_globals.detach_timeout = tmp;
}
} else if (!strcasecmp(var, "kslog")) {
if (val) {
verto_globals.kslog_on = switch_true(val);
}
}
}
}
@ -5119,7 +5101,7 @@ static switch_status_t cmd_status(char **argv, int argc, switch_stream_handle_t
for (i = 0; i < profile->i; i++) {
char *tmpurl = switch_mprintf(strchr(profile->ip[i].local_ip, ':') ? "%s:[%s]:%d" : "%s:%s:%d",
(profile->ip[i].secure == 1) ? "wss" : "ws", profile->ip[i].local_ip, profile->ip[i].local_port);
stream->write_function(stream, "%25s\t%s\t %40s\t%s\n", profile->name, "profile", tmpurl, (profile->server_socket[i] != ws_sock_invalid) ? "RUNNING" : "DOWN");
stream->write_function(stream, "%25s\t%s\t %40s\t%s\n", profile->name, "profile", tmpurl, (profile->server_socket[i] != KS_SOCK_INVALID) ? "RUNNING" : "DOWN");
switch_safe_free(tmpurl);
}
cp++;
@ -6178,6 +6160,25 @@ static void event_handler(switch_event_t *event)
}
static void mod_verto_ks_logger(const char *file, const char *func, int line, int level, const char *fmt, ...)
{
char fmt_buf[32768];
va_list ap;
size_t len;
va_start(ap, fmt);
len = snprintf(fmt_buf, sizeof(fmt_buf), "%s\n", fmt); // add return that is missing
if (level == SWITCH_LOG_DEBUG) level = SWITCH_LOG_DEBUG1;
if (len < sizeof(fmt_buf)) {
switch_log_vprintf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, level, fmt_buf, ap);
} else {
switch_log_vprintf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, level, fmt, ap);
}
va_end(ap);
}
/* Macro expands to: switch_status_t mod_verto_load(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) */
SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
{
@ -6188,6 +6189,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
switch_cache_db_handle_t *dbh;
//switch_application_interface_t *app_interface = NULL;
ks_ssl_init_skip(KS_TRUE);
ks_init();
if (switch_event_reserve_subclass(MY_EVENT_LOGIN) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't register subclass %s!\n", MY_EVENT_LOGIN);
@ -6251,6 +6254,11 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
if (r) return SWITCH_STATUS_TERM;
if (verto_globals.kslog_on == SWITCH_TRUE) {
ks_global_set_logger(mod_verto_ks_logger);
ks_log(KS_LOG_INFO, "ks log registered in mod_verto\n");
}
jrpc_init();
/* connect my internal structure to the blank pointer passed to me */
@ -6315,6 +6323,9 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_verto_shutdown)
switch_core_hash_destroy(&verto_globals.event_channel_hash);
switch_core_hash_destroy(&verto_globals.jsock_hash);
ks_global_set_logger(NULL);
ks_shutdown();
return SWITCH_STATUS_SUCCESS;
}
@ -6340,7 +6351,6 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_verto_runtime)
return SWITCH_STATUS_TERM;
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -62,6 +62,8 @@
#include <openssl/ssl.h>
#include "mcast.h"
#include "ks.h"
#define MAX_QUEUE_LEN 100000
#define MAX_MISSED 500
@ -99,10 +101,11 @@ typedef enum {
struct verto_profile_s;
struct jsock_s {
ws_socket_t client_socket;
ks_socket_t client_socket;
switch_memory_pool_t *pool;
switch_thread_t *thread;
wsh_t ws;
ks_pool_t *kpool;
kws_t *ws;
unsigned char buf[65535];
char *name;
jsock_type_t ptype;
@ -114,7 +117,7 @@ struct jsock_s {
uint8_t drop;
uint8_t nodelete;
ws_socket_t local_sock;
ks_socket_t local_sock;
SSL *ssl;
jpflag_t flags;
@ -230,7 +233,7 @@ struct verto_profile_s {
jsock_t *jsock_head;
int jsock_count;
ws_socket_t server_socket[MAX_BIND];
ks_socket_t server_socket[MAX_BIND];
int running;
int ssl_ready;
@ -307,6 +310,7 @@ struct globals_s {
int profile_threads;
int enable_presence;
int enable_fs_events;
switch_bool_t kslog_on;
switch_hash_t *jsock_hash;
switch_mutex_t *jsock_mutex;

File diff suppressed because it is too large Load Diff

View File

@ -1,143 +0,0 @@
#ifndef _WS_H
#define _WS_H
//#define WSS_STANDALONE 1
#define WEBSOCKET_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
#define B64BUFFLEN 1024
#include <sys/types.h>
#ifndef _MSC_VER
#include <arpa/inet.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <unistd.h>
#else
#pragma warning(disable:4996)
#endif
#include <string.h>
#include <string.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
//#include "sha1.h"
#include <openssl/ssl.h>
#if defined(_MSC_VER) || defined(__APPLE__) || defined(__FreeBSD__) || (defined(__SVR4) && defined(__sun))
#define __bswap_64(x) \
x = (x>>56) | \
((x<<40) & 0x00FF000000000000) | \
((x<<24) & 0x0000FF0000000000) | \
((x<<8) & 0x000000FF00000000) | \
((x>>8) & 0x00000000FF000000) | \
((x>>24) & 0x0000000000FF0000) | \
((x>>40) & 0x000000000000FF00) | \
(x<<56)
#endif
#ifdef _MSC_VER
#ifndef strncasecmp
#define strncasecmp _strnicmp
#endif
#ifdef _WIN64
#define WS_SSIZE_T __int64
#elif _MSC_VER >= 1400
#define WS_SSIZE_T __int32 __w64
#else
#define WS_SSIZE_T __int32
#endif
typedef WS_SSIZE_T ssize_t;
#endif
struct ws_globals_s {
const SSL_METHOD *ssl_method;
SSL_CTX *ssl_ctx;
char cert[512];
char key[512];
};
//extern struct ws_globals_s ws_globals;
#ifndef WIN32
typedef int ws_socket_t;
#else
typedef SOCKET ws_socket_t;
#endif
#define ws_sock_invalid (ws_socket_t)-1
typedef enum {
WS_NONE = 0,
WS_RECV_CLOSE = 1000,
WS_PROTO_ERR = 1002,
WS_DATA_TOO_BIG = 1009
} ws_cause_t;
typedef enum {
WSOC_CONTINUATION = 0x0,
WSOC_TEXT = 0x1,
WSOC_BINARY = 0x2,
WSOC_CLOSE = 0x8,
WSOC_PING = 0x9,
WSOC_PONG = 0xA
} ws_opcode_t;
typedef struct wsh_s {
ws_socket_t sock;
char *buffer;
char *bbuffer;
char *body;
char *uri;
size_t buflen;
size_t bbuflen;
ssize_t datalen;
ssize_t wdatalen;
char *payload;
ssize_t plen;
ssize_t rplen;
ssize_t packetlen;
SSL *ssl;
int handshake;
uint8_t down;
int secure;
uint8_t close_sock;
SSL_CTX *ssl_ctx;
int block;
int sanity;
int secure_established;
int logical_established;
int stay_open;
int x;
void *write_buffer;
size_t write_buffer_len;
} wsh_t;
ssize_t ws_send_buf(wsh_t *wsh, ws_opcode_t oc);
ssize_t ws_feed_buf(wsh_t *wsh, void *data, size_t bytes);
ssize_t ws_raw_read(wsh_t *wsh, void *data, size_t bytes, int block);
ssize_t ws_raw_write(wsh_t *wsh, void *data, size_t bytes);
ssize_t ws_read_frame(wsh_t *wsh, ws_opcode_t *oc, uint8_t **data);
ssize_t ws_write_frame(wsh_t *wsh, ws_opcode_t oc, void *data, size_t bytes);
int ws_init(wsh_t *wsh, ws_socket_t sock, SSL_CTX *ssl_ctx, int close_sock, int block, int stay_open);
ssize_t ws_close(wsh_t *wsh, int16_t reason);
void ws_destroy(wsh_t *wsh);
void init_ssl(void);
void deinit_ssl(void);
int xp_errno(void);
int xp_is_blocking(int errcode);
#ifndef _MSC_VER
static inline uint64_t get_unaligned_uint64(const void *p)
{
const struct { uint64_t d; } __attribute__((packed)) *pp = p;
return pp->d;
}
#endif
#endif

View File

@ -67,7 +67,7 @@
<ItemDefinitionGroup>
<ClCompile>
<AdditionalIncludeDirectories>$(libksDir)\src\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories>$(libksDir)\src\include;$(libksDir)\src\include\libks;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>__PRETTY_FUNCTION__=__FUNCSIG__;WIN32;_WINDOWS;SWCLT_VERSION_MAJOR=1;SWCLT_VERSION_MINOR=0;SWCLT_VERSION_REVISION=0;_WIN32_WINNT=0x0600;_WINSOCK_DEPRECATED_NO_WARNINGS=1;WIN32_LEAN_AND_MEAN=1;KS_PLAT_WIN=1;NOMAXMIN=1;_CRT_SECURE_NO_WARNINGS=1;SWCLT_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile>
<Link>