FS-10167: Adjusted modules to utilize an isolated pool with auto cleanup per module, which also contains the implementation specific module data. Also changed the thread model of the listener for the wss module to utilize the thread pool, alleviating ownership issues during cleanup.

This commit is contained in:
Shane Bryldt 2017-04-18 17:02:34 -06:00
parent 21f5635037
commit 2e02f3b498
8 changed files with 96 additions and 91 deletions

View File

@ -41,7 +41,6 @@ struct blade_connection_s {
blade_transport_callbacks_t *transport_callbacks;
blade_connection_direction_t direction;
ks_thread_t *state_thread;
volatile blade_connection_state_t state;
const char *id;

View File

@ -41,43 +41,47 @@ struct blade_module_s {
blade_module_callbacks_t *module_callbacks;
};
static void blade_module_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
{
blade_module_t *bm = (blade_module_t *)ptr;
KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, void *module_data, blade_module_callbacks_t *module_callbacks)
ks_assert(bm);
switch (action) {
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
break;
case KS_MPCL_DESTROY:
break;
}
}
KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, ks_pool_t *pool, void *module_data, blade_module_callbacks_t *module_callbacks)
{
blade_module_t *bm = NULL;
ks_pool_t *pool = NULL;
ks_assert(bmP);
ks_assert(bh);
ks_assert(pool);
ks_assert(module_data);
ks_assert(module_callbacks);
pool = blade_handle_pool_get(bh);
bm = ks_pool_alloc(pool, sizeof(blade_module_t));
bm->handle = bh;
bm->pool = pool;
bm->module_data = module_data;
bm->module_callbacks = module_callbacks;
ks_assert(ks_pool_set_cleanup(pool, bm, NULL, blade_module_cleanup) == KS_STATUS_SUCCESS);
ks_log(KS_LOG_DEBUG, "Created\n");
*bmP = bm;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_module_destroy(blade_module_t **bmP)
{
blade_module_t *bm = NULL;
ks_assert(bmP);
ks_assert(*bmP);
bm = *bmP;
ks_pool_free(bm->pool, bmP);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(blade_handle_t *) blade_module_handle_get(blade_module_t *bm)
{
ks_assert(bm);

View File

@ -71,6 +71,24 @@ static blade_module_callbacks_t g_module_chat_callbacks =
};
static void blade_module_chat_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
{
blade_module_chat_t *bm_chat = (blade_module_chat_t *)ptr;
ks_assert(bm_chat);
switch (action) {
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
//ks_list_destroy(&bm_chat->participants);
blade_module_chat_on_shutdown(bm_chat->module);
break;
case KS_MPCL_DESTROY:
break;
}
}
ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handle_t *bh)
{
@ -80,7 +98,8 @@ ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handl
ks_assert(bm_chatP);
ks_assert(bh);
pool = blade_handle_pool_get(bh);
ks_pool_open(&pool);
ks_assert(pool);
bm_chat = ks_pool_alloc(pool, sizeof(blade_module_chat_t));
bm_chat->handle = bh;
@ -91,35 +110,36 @@ ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handl
ks_list_create(&bm_chat->participants, pool);
ks_assert(bm_chat->participants);
blade_module_create(&bm_chat->module, bh, bm_chat, &g_module_chat_callbacks);
blade_module_create(&bm_chat->module, bh, pool, bm_chat, &g_module_chat_callbacks);
bm_chat->module_callbacks = &g_module_chat_callbacks;
*bm_chatP = bm_chat;
ks_assert(ks_pool_set_cleanup(pool, bm_chat, NULL, blade_module_chat_cleanup) == KS_STATUS_SUCCESS);
ks_log(KS_LOG_DEBUG, "Created\n");
*bm_chatP = bm_chat;
return KS_STATUS_SUCCESS;
}
ks_status_t blade_module_chat_destroy(blade_module_chat_t **bm_chatP)
{
blade_module_chat_t *bm_chat = NULL;
ks_pool_t *pool = NULL;
ks_assert(bm_chatP);
ks_assert(*bm_chatP);
bm_chat = *bm_chatP;
blade_module_chat_on_shutdown(bm_chat->module);
ks_list_destroy(&bm_chat->participants);
blade_module_destroy(&bm_chat->module);
ks_pool_free(bm_chat->pool, bm_chatP);
pool = bm_chat->pool;
//ks_pool_free(bm_chat->pool, bm_chatP);
ks_pool_close(&pool);
ks_log(KS_LOG_DEBUG, "Destroyed\n");
*bm_chatP = NULL;
return KS_STATUS_SUCCESS;
}

View File

@ -42,7 +42,6 @@ typedef struct blade_transport_wss_s blade_transport_wss_t;
struct blade_module_wss_s {
blade_handle_t *handle;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
blade_module_t *module;
blade_module_callbacks_t *module_callbacks;
blade_transport_callbacks_t *transport_callbacks;
@ -53,9 +52,8 @@ struct blade_module_wss_s {
int32_t config_wss_endpoints_ipv6_length;
int32_t config_wss_endpoints_backlog;
ks_bool_t shutdown;
volatile ks_bool_t shutdown;
ks_thread_t *listeners_thread;
struct pollfd *listeners_poll;
int32_t listeners_count;
};
@ -86,7 +84,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data);
ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_t *pool, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id);
//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id);
blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target);
@ -138,6 +135,22 @@ static blade_transport_callbacks_t g_transport_wss_callbacks =
};
static void blade_module_wss_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
{
blade_module_wss_t *bm_wss = (blade_module_wss_t *)ptr;
ks_assert(bm_wss);
switch (action) {
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
blade_module_wss_on_shutdown(bm_wss->module);
break;
case KS_MPCL_DESTROY:
break;
}
}
ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh)
{
@ -147,17 +160,19 @@ ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t
ks_assert(bm_wssP);
ks_assert(bh);
pool = blade_handle_pool_get(bh);
ks_pool_open(&pool);
ks_assert(pool);
bm_wss = ks_pool_alloc(pool, sizeof(blade_module_wss_t));
bm_wss->handle = bh;
bm_wss->pool = pool;
bm_wss->tpool = blade_handle_tpool_get(bh);
blade_module_create(&bm_wss->module, bh, bm_wss, &g_module_wss_callbacks);
blade_module_create(&bm_wss->module, bh, pool, bm_wss, &g_module_wss_callbacks);
bm_wss->module_callbacks = &g_module_wss_callbacks;
bm_wss->transport_callbacks = &g_transport_wss_callbacks;
ks_assert(ks_pool_set_cleanup(pool, bm_wss, NULL, blade_module_wss_cleanup) == KS_STATUS_SUCCESS);
ks_log(KS_LOG_DEBUG, "Created\n");
*bm_wssP = bm_wss;
@ -168,20 +183,21 @@ ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t
ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP)
{
blade_module_wss_t *bm_wss = NULL;
ks_pool_t *pool = NULL;
ks_assert(bm_wssP);
ks_assert(*bm_wssP);
bm_wss = *bm_wssP;
blade_module_wss_on_shutdown(bm_wss->module);
blade_module_destroy(&bm_wss->module);
ks_pool_free(bm_wss->pool, bm_wssP);
pool = bm_wss->pool;
//ks_pool_free(bm_wss->pool, bm_wssP);
ks_pool_close(&pool);
ks_log(KS_LOG_DEBUG, "Destroyed\n");
*bm_wssP = NULL;
return KS_STATUS_SUCCESS;
}
@ -353,14 +369,12 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s
}
}
if (bm_wss->listeners_count > 0 &&
ks_thread_create_ex(&bm_wss->listeners_thread,
blade_module_wss_listeners_thread,
bm_wss,
KS_THREAD_FLAG_DEFAULT,
KS_THREAD_DEFAULT_STACK,
KS_PRI_NORMAL,
bm_wss->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_thread_pool_add_job(blade_handle_tpool_get(bm_wss->handle), blade_module_wss_listeners_thread, bm_wss) != KS_STATUS_SUCCESS) {
// @todo error logging
return KS_STATUS_FAIL;
}
blade_handle_transport_register(bm_wss->handle, bm, BLADE_MODULE_WSS_TRANSPORT_NAME, bm_wss->transport_callbacks);
@ -372,31 +386,25 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s
KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm)
{
blade_module_wss_t *bm_wss = NULL;
ks_bool_t stopped = KS_FALSE;
ks_assert(bm);
bm_wss = (blade_module_wss_t *)blade_module_data_get(bm);
blade_handle_transport_unregister(bm_wss->handle, BLADE_MODULE_WSS_TRANSPORT_NAME);
if (bm_wss->listeners_thread) {
if (bm_wss->listeners_count > 0) {
bm_wss->shutdown = KS_TRUE;
ks_thread_join(bm_wss->listeners_thread);
ks_pool_free(bm_wss->pool, &bm_wss->listeners_thread);
bm_wss->shutdown = KS_FALSE;
stopped = KS_TRUE;
while (bm_wss->shutdown) ks_sleep_ms(1);
}
blade_handle_transport_unregister(bm_wss->handle, BLADE_MODULE_WSS_TRANSPORT_NAME);
for (int32_t index = 0; index < bm_wss->listeners_count; ++index) {
ks_socket_t sock = bm_wss->listeners_poll[index].fd;
ks_socket_shutdown(sock, SHUT_RDWR);
ks_socket_close(&sock);
}
bm_wss->listeners_count = 0;
if (bm_wss->listeners_poll) ks_pool_free(bm_wss->pool, &bm_wss->listeners_poll);
if (stopped) ks_log(KS_LOG_DEBUG, "Stopped\n");
ks_log(KS_LOG_DEBUG, "Stopped\n");
return KS_STATUS_SUCCESS;
}
@ -465,10 +473,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
ks_log(KS_LOG_DEBUG, "Started\n");
while (!bm_wss->shutdown) {
//if (bm_wss->listeners_count == 0) {
// ks_sleep_ms(500);
// continue;
//}
// @todo take exact timeout from a setting in config_wss_endpoints
if (ks_poll(bm_wss->listeners_poll, bm_wss->listeners_count, 100) > 0) {
for (int32_t index = 0; index < bm_wss->listeners_count; ++index) {
@ -521,6 +525,8 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
}
ks_log(KS_LOG_DEBUG, "Stopped\n");
bm_wss->shutdown = KS_FALSE;
return NULL;
}
@ -567,24 +573,6 @@ ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_
return KS_STATUS_SUCCESS;
}
//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP)
//{
// blade_transport_wss_t *bt_wss = NULL;
//
// ks_assert(bt_wssP);
// ks_assert(*bt_wssP);
//
// bt_wss = *bt_wssP;
//
// ks_pool_free(bt_wss->pool, bt_wssP);
//
// ks_log(KS_LOG_DEBUG, "Destroyed\n");
//
// *bt_wssP = NULL;
//
// return KS_STATUS_SUCCESS;
//}
ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id)
{
ks_status_t ret = KS_STATUS_SUCCESS;

View File

@ -475,7 +475,7 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
while (!shutdown) {
// Entering the call below, the mutex is expected to be locked and will be unlocked by the call
ks_cond_timedwait(bs->cond, 500);
ks_cond_timedwait(bs->cond, 100);
// Leaving the call above, the mutex will be locked after being signalled, timing out, or woken up for any reason
state = bs->state;
@ -486,6 +486,8 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) {
blade_connection_sending_push(bc, json);
blade_connection_read_unlock(bc);
} else {
// @todo review this, possible the connection is dropped after popping a message, which results in it just being deleted without sending
}
cJSON_Delete(json);
}
@ -503,15 +505,12 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
break;
case BLADE_SESSION_STATE_CONNECT:
ks_log(KS_LOG_DEBUG, "Session (%s) state connect\n", bs->id);
//ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_ATTACH:
ks_log(KS_LOG_DEBUG, "Session (%s) state attach\n", bs->id);
//ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_DETACH:
ks_log(KS_LOG_DEBUG, "Session (%s) state detach\n", bs->id);
//ks_sleep_ms(1000);
break;
case BLADE_SESSION_STATE_READY:
blade_session_state_on_ready(bs);

View File

@ -36,8 +36,7 @@
#include <blade.h>
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, void *module_data, blade_module_callbacks_t *module_callbacks);
KS_DECLARE(ks_status_t) blade_module_destroy(blade_module_t **bmP);
KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, ks_pool_t *pool, void *module_data, blade_module_callbacks_t *module_callbacks);
KS_DECLARE(blade_handle_t *) blade_module_handle_get(blade_module_t *bm);
KS_DECLARE(void *) blade_module_data_get(blade_module_t *bm);

View File

@ -87,8 +87,6 @@ int main(int argc, char **argv)
blade_handle_session_state_callback_unregister(bh, session_state_callback_id);
blade_module_wss_on_shutdown(mod_wss);
blade_module_wss_on_unload(mod_wss);
blade_handle_destroy(&bh);

View File

@ -80,8 +80,6 @@ int main(int argc, char **argv)
//blade_module_chat_on_shutdown(mod_chat);
//blade_module_chat_on_unload(mod_chat);
blade_module_wss_on_shutdown(mod_wss);
blade_module_wss_on_unload(mod_wss);
blade_handle_destroy(&bh);