diff --git a/libs/libblade/src/blade_connection.c b/libs/libblade/src/blade_connection.c index 66d354e00f..ec68cda642 100644 --- a/libs/libblade/src/blade_connection.c +++ b/libs/libblade/src/blade_connection.c @@ -41,7 +41,6 @@ struct blade_connection_s { blade_transport_callbacks_t *transport_callbacks; blade_connection_direction_t direction; - ks_thread_t *state_thread; volatile blade_connection_state_t state; const char *id; diff --git a/libs/libblade/src/blade_module.c b/libs/libblade/src/blade_module.c index 403748b0fc..2101420d44 100644 --- a/libs/libblade/src/blade_module.c +++ b/libs/libblade/src/blade_module.c @@ -41,43 +41,47 @@ struct blade_module_s { blade_module_callbacks_t *module_callbacks; }; +static void blade_module_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type) +{ + blade_module_t *bm = (blade_module_t *)ptr; -KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, void *module_data, blade_module_callbacks_t *module_callbacks) + ks_assert(bm); + + switch (action) { + case KS_MPCL_ANNOUNCE: + break; + case KS_MPCL_TEARDOWN: + break; + case KS_MPCL_DESTROY: + break; + } +} + +KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, ks_pool_t *pool, void *module_data, blade_module_callbacks_t *module_callbacks) { blade_module_t *bm = NULL; - ks_pool_t *pool = NULL; ks_assert(bmP); ks_assert(bh); + ks_assert(pool); ks_assert(module_data); ks_assert(module_callbacks); - pool = blade_handle_pool_get(bh); - bm = ks_pool_alloc(pool, sizeof(blade_module_t)); bm->handle = bh; bm->pool = pool; bm->module_data = module_data; bm->module_callbacks = module_callbacks; + + ks_assert(ks_pool_set_cleanup(pool, bm, NULL, blade_module_cleanup) == KS_STATUS_SUCCESS); + + ks_log(KS_LOG_DEBUG, "Created\n"); + *bmP = bm; return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_module_destroy(blade_module_t **bmP) -{ - blade_module_t *bm = NULL; - - ks_assert(bmP); - ks_assert(*bmP); - - bm = *bmP; - - ks_pool_free(bm->pool, bmP); - - return KS_STATUS_SUCCESS; -} - KS_DECLARE(blade_handle_t *) blade_module_handle_get(blade_module_t *bm) { ks_assert(bm); diff --git a/libs/libblade/src/blade_module_chat.c b/libs/libblade/src/blade_module_chat.c index 9b085b1f75..ea5579dc9d 100644 --- a/libs/libblade/src/blade_module_chat.c +++ b/libs/libblade/src/blade_module_chat.c @@ -71,6 +71,24 @@ static blade_module_callbacks_t g_module_chat_callbacks = }; +static void blade_module_chat_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type) +{ + blade_module_chat_t *bm_chat = (blade_module_chat_t *)ptr; + + ks_assert(bm_chat); + + switch (action) { + case KS_MPCL_ANNOUNCE: + break; + case KS_MPCL_TEARDOWN: + //ks_list_destroy(&bm_chat->participants); + blade_module_chat_on_shutdown(bm_chat->module); + break; + case KS_MPCL_DESTROY: + break; + } +} + ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handle_t *bh) { @@ -80,7 +98,8 @@ ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handl ks_assert(bm_chatP); ks_assert(bh); - pool = blade_handle_pool_get(bh); + ks_pool_open(&pool); + ks_assert(pool); bm_chat = ks_pool_alloc(pool, sizeof(blade_module_chat_t)); bm_chat->handle = bh; @@ -91,35 +110,36 @@ ks_status_t blade_module_chat_create(blade_module_chat_t **bm_chatP, blade_handl ks_list_create(&bm_chat->participants, pool); ks_assert(bm_chat->participants); - blade_module_create(&bm_chat->module, bh, bm_chat, &g_module_chat_callbacks); + blade_module_create(&bm_chat->module, bh, pool, bm_chat, &g_module_chat_callbacks); bm_chat->module_callbacks = &g_module_chat_callbacks; - *bm_chatP = bm_chat; + ks_assert(ks_pool_set_cleanup(pool, bm_chat, NULL, blade_module_chat_cleanup) == KS_STATUS_SUCCESS); ks_log(KS_LOG_DEBUG, "Created\n"); + *bm_chatP = bm_chat; + return KS_STATUS_SUCCESS; } ks_status_t blade_module_chat_destroy(blade_module_chat_t **bm_chatP) { blade_module_chat_t *bm_chat = NULL; + ks_pool_t *pool = NULL; ks_assert(bm_chatP); ks_assert(*bm_chatP); bm_chat = *bm_chatP; - blade_module_chat_on_shutdown(bm_chat->module); - - ks_list_destroy(&bm_chat->participants); - - blade_module_destroy(&bm_chat->module); - - ks_pool_free(bm_chat->pool, bm_chatP); + pool = bm_chat->pool; + //ks_pool_free(bm_chat->pool, bm_chatP); + ks_pool_close(&pool); ks_log(KS_LOG_DEBUG, "Destroyed\n"); + *bm_chatP = NULL; + return KS_STATUS_SUCCESS; } diff --git a/libs/libblade/src/blade_module_wss.c b/libs/libblade/src/blade_module_wss.c index 881346c0ff..e23dd9aa56 100644 --- a/libs/libblade/src/blade_module_wss.c +++ b/libs/libblade/src/blade_module_wss.c @@ -42,7 +42,6 @@ typedef struct blade_transport_wss_s blade_transport_wss_t; struct blade_module_wss_s { blade_handle_t *handle; ks_pool_t *pool; - ks_thread_pool_t *tpool; blade_module_t *module; blade_module_callbacks_t *module_callbacks; blade_transport_callbacks_t *transport_callbacks; @@ -53,9 +52,8 @@ struct blade_module_wss_s { int32_t config_wss_endpoints_ipv6_length; int32_t config_wss_endpoints_backlog; - ks_bool_t shutdown; + volatile ks_bool_t shutdown; - ks_thread_t *listeners_thread; struct pollfd *listeners_poll; int32_t listeners_count; }; @@ -86,7 +84,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data); ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_t *pool, blade_module_wss_t *bm_wss, ks_socket_t sock, const char *session_id); -//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP); ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id); blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target); @@ -138,6 +135,22 @@ static blade_transport_callbacks_t g_transport_wss_callbacks = }; +static void blade_module_wss_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type) +{ + blade_module_wss_t *bm_wss = (blade_module_wss_t *)ptr; + + ks_assert(bm_wss); + + switch (action) { + case KS_MPCL_ANNOUNCE: + break; + case KS_MPCL_TEARDOWN: + blade_module_wss_on_shutdown(bm_wss->module); + break; + case KS_MPCL_DESTROY: + break; + } +} ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh) { @@ -147,17 +160,19 @@ ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t ks_assert(bm_wssP); ks_assert(bh); - pool = blade_handle_pool_get(bh); + ks_pool_open(&pool); + ks_assert(pool); bm_wss = ks_pool_alloc(pool, sizeof(blade_module_wss_t)); bm_wss->handle = bh; bm_wss->pool = pool; - bm_wss->tpool = blade_handle_tpool_get(bh); - blade_module_create(&bm_wss->module, bh, bm_wss, &g_module_wss_callbacks); + blade_module_create(&bm_wss->module, bh, pool, bm_wss, &g_module_wss_callbacks); bm_wss->module_callbacks = &g_module_wss_callbacks; bm_wss->transport_callbacks = &g_transport_wss_callbacks; + ks_assert(ks_pool_set_cleanup(pool, bm_wss, NULL, blade_module_wss_cleanup) == KS_STATUS_SUCCESS); + ks_log(KS_LOG_DEBUG, "Created\n"); *bm_wssP = bm_wss; @@ -168,20 +183,21 @@ ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP) { blade_module_wss_t *bm_wss = NULL; + ks_pool_t *pool = NULL; ks_assert(bm_wssP); ks_assert(*bm_wssP); bm_wss = *bm_wssP; - blade_module_wss_on_shutdown(bm_wss->module); - - blade_module_destroy(&bm_wss->module); - - ks_pool_free(bm_wss->pool, bm_wssP); + pool = bm_wss->pool; + //ks_pool_free(bm_wss->pool, bm_wssP); + ks_pool_close(&pool); ks_log(KS_LOG_DEBUG, "Destroyed\n"); + *bm_wssP = NULL; + return KS_STATUS_SUCCESS; } @@ -353,14 +369,12 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s } } + if (bm_wss->listeners_count > 0 && - ks_thread_create_ex(&bm_wss->listeners_thread, - blade_module_wss_listeners_thread, - bm_wss, - KS_THREAD_FLAG_DEFAULT, - KS_THREAD_DEFAULT_STACK, - KS_PRI_NORMAL, - bm_wss->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + ks_thread_pool_add_job(blade_handle_tpool_get(bm_wss->handle), blade_module_wss_listeners_thread, bm_wss) != KS_STATUS_SUCCESS) { + // @todo error logging + return KS_STATUS_FAIL; + } blade_handle_transport_register(bm_wss->handle, bm, BLADE_MODULE_WSS_TRANSPORT_NAME, bm_wss->transport_callbacks); @@ -372,31 +386,25 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm) { blade_module_wss_t *bm_wss = NULL; - ks_bool_t stopped = KS_FALSE; ks_assert(bm); bm_wss = (blade_module_wss_t *)blade_module_data_get(bm); - blade_handle_transport_unregister(bm_wss->handle, BLADE_MODULE_WSS_TRANSPORT_NAME); - - if (bm_wss->listeners_thread) { + if (bm_wss->listeners_count > 0) { bm_wss->shutdown = KS_TRUE; - ks_thread_join(bm_wss->listeners_thread); - ks_pool_free(bm_wss->pool, &bm_wss->listeners_thread); - bm_wss->shutdown = KS_FALSE; - stopped = KS_TRUE; + while (bm_wss->shutdown) ks_sleep_ms(1); } + blade_handle_transport_unregister(bm_wss->handle, BLADE_MODULE_WSS_TRANSPORT_NAME); + for (int32_t index = 0; index < bm_wss->listeners_count; ++index) { ks_socket_t sock = bm_wss->listeners_poll[index].fd; ks_socket_shutdown(sock, SHUT_RDWR); ks_socket_close(&sock); } - bm_wss->listeners_count = 0; - if (bm_wss->listeners_poll) ks_pool_free(bm_wss->pool, &bm_wss->listeners_poll); - if (stopped) ks_log(KS_LOG_DEBUG, "Stopped\n"); + ks_log(KS_LOG_DEBUG, "Stopped\n"); return KS_STATUS_SUCCESS; } @@ -465,10 +473,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) ks_log(KS_LOG_DEBUG, "Started\n"); while (!bm_wss->shutdown) { - //if (bm_wss->listeners_count == 0) { - // ks_sleep_ms(500); - // continue; - //} // @todo take exact timeout from a setting in config_wss_endpoints if (ks_poll(bm_wss->listeners_poll, bm_wss->listeners_count, 100) > 0) { for (int32_t index = 0; index < bm_wss->listeners_count; ++index) { @@ -521,6 +525,8 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) } ks_log(KS_LOG_DEBUG, "Stopped\n"); + bm_wss->shutdown = KS_FALSE; + return NULL; } @@ -567,24 +573,6 @@ ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, ks_pool_ return KS_STATUS_SUCCESS; } -//ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP) -//{ -// blade_transport_wss_t *bt_wss = NULL; -// -// ks_assert(bt_wssP); -// ks_assert(*bt_wssP); -// -// bt_wss = *bt_wssP; -// -// ks_pool_free(bt_wss->pool, bt_wssP); -// -// ks_log(KS_LOG_DEBUG, "Destroyed\n"); -// -// *bt_wssP = NULL; -// -// return KS_STATUS_SUCCESS; -//} - ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target, const char *session_id) { ks_status_t ret = KS_STATUS_SUCCESS; diff --git a/libs/libblade/src/blade_session.c b/libs/libblade/src/blade_session.c index 04aac39dfa..a2412613ec 100644 --- a/libs/libblade/src/blade_session.c +++ b/libs/libblade/src/blade_session.c @@ -475,7 +475,7 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data) while (!shutdown) { // Entering the call below, the mutex is expected to be locked and will be unlocked by the call - ks_cond_timedwait(bs->cond, 500); + ks_cond_timedwait(bs->cond, 100); // Leaving the call above, the mutex will be locked after being signalled, timing out, or woken up for any reason state = bs->state; @@ -486,6 +486,8 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data) if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) { blade_connection_sending_push(bc, json); blade_connection_read_unlock(bc); + } else { + // @todo review this, possible the connection is dropped after popping a message, which results in it just being deleted without sending } cJSON_Delete(json); } @@ -503,15 +505,12 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data) break; case BLADE_SESSION_STATE_CONNECT: ks_log(KS_LOG_DEBUG, "Session (%s) state connect\n", bs->id); - //ks_sleep_ms(1000); break; case BLADE_SESSION_STATE_ATTACH: ks_log(KS_LOG_DEBUG, "Session (%s) state attach\n", bs->id); - //ks_sleep_ms(1000); break; case BLADE_SESSION_STATE_DETACH: ks_log(KS_LOG_DEBUG, "Session (%s) state detach\n", bs->id); - //ks_sleep_ms(1000); break; case BLADE_SESSION_STATE_READY: blade_session_state_on_ready(bs); diff --git a/libs/libblade/src/include/blade_module.h b/libs/libblade/src/include/blade_module.h index e5fc558f16..ff2b942f48 100644 --- a/libs/libblade/src/include/blade_module.h +++ b/libs/libblade/src/include/blade_module.h @@ -36,8 +36,7 @@ #include KS_BEGIN_EXTERN_C -KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, void *module_data, blade_module_callbacks_t *module_callbacks); -KS_DECLARE(ks_status_t) blade_module_destroy(blade_module_t **bmP); +KS_DECLARE(ks_status_t) blade_module_create(blade_module_t **bmP, blade_handle_t *bh, ks_pool_t *pool, void *module_data, blade_module_callbacks_t *module_callbacks); KS_DECLARE(blade_handle_t *) blade_module_handle_get(blade_module_t *bm); KS_DECLARE(void *) blade_module_data_get(blade_module_t *bm); diff --git a/libs/libblade/test/bladec.c b/libs/libblade/test/bladec.c index 10c000f086..c5f8c01065 100644 --- a/libs/libblade/test/bladec.c +++ b/libs/libblade/test/bladec.c @@ -87,8 +87,6 @@ int main(int argc, char **argv) blade_handle_session_state_callback_unregister(bh, session_state_callback_id); - blade_module_wss_on_shutdown(mod_wss); - blade_module_wss_on_unload(mod_wss); blade_handle_destroy(&bh); diff --git a/libs/libblade/test/blades.c b/libs/libblade/test/blades.c index 638074db05..42af3a8448 100644 --- a/libs/libblade/test/blades.c +++ b/libs/libblade/test/blades.c @@ -80,8 +80,6 @@ int main(int argc, char **argv) //blade_module_chat_on_shutdown(mod_chat); //blade_module_chat_on_unload(mod_chat); - blade_module_wss_on_shutdown(mod_wss); - blade_module_wss_on_unload(mod_wss); blade_handle_destroy(&bh);