From dd6031544a6f0b44a88c8832d1edb39003ecf2f5 Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Tue, 4 Apr 2017 12:32:01 -0600 Subject: [PATCH] FS-10167: Fixed an issue with connection cleanup, the same approach should be taken with sessions to avoid any potential thread deadlock due to cleanup from the same thread which is running the session. --- libs/libblade/src/blade_connection.c | 10 +++- libs/libblade/src/blade_stack.c | 59 ++++++++++++++++++++ libs/libblade/src/include/blade_connection.h | 1 + libs/libblade/src/include/blade_types.h | 1 + 4 files changed, 68 insertions(+), 3 deletions(-) diff --git a/libs/libblade/src/blade_connection.c b/libs/libblade/src/blade_connection.c index d757845b6b..a3743aa363 100644 --- a/libs/libblade/src/blade_connection.c +++ b/libs/libblade/src/blade_connection.c @@ -302,6 +302,12 @@ KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connec if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc); } +KS_DECLARE(blade_connection_state_t) blade_connection_state_get(blade_connection_t *bc) +{ + ks_assert(bc); + return bc->state; +} + KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc) { ks_assert(bc); @@ -393,12 +399,10 @@ ks_status_t blade_connection_state_on_disconnect(blade_connection_t *bc) ks_assert(bc); - blade_handle_connections_remove(bc); - callback = blade_connection_state_callback_lookup(bc, BLADE_CONNECTION_STATE_DISCONNECT); if (callback) callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST); - blade_connection_destroy(&bc); + blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CLEANUP); return KS_STATUS_SUCCESS; } diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index 536d52ef26..9af67c4a22 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -47,6 +47,9 @@ struct blade_handle_s { config_setting_t *config_directory; config_setting_t *config_datastore; + ks_thread_t *worker_thread; + ks_bool_t shutdown; + ks_hash_t *transports; // registered transports exposed by modules, NOT active connections ks_hash_t *spaces; // registered method spaces exposed by modules // registered event callback registry @@ -68,6 +71,7 @@ struct blade_handle_s { ks_hash_t *requests; // outgoing requests waiting for a response keyed by the message id }; +void *blade_handle_worker_thread(ks_thread_t *thread, void *data); typedef struct blade_handle_transport_registration_s blade_handle_transport_registration_t; struct blade_handle_transport_registration_s { @@ -300,6 +304,17 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_ // @todo call onload and onstartup callbacks for modules from DSOs + if (ks_thread_create_ex(&bh->worker_thread, + blade_handle_worker_thread, + bh, + KS_THREAD_FLAG_DEFAULT, + KS_THREAD_DEFAULT_STACK, + KS_PRI_NORMAL, + bh->pool) != KS_STATUS_SUCCESS) { + // @todo error logging + return KS_STATUS_FAIL; + } + return KS_STATUS_SUCCESS; } @@ -309,6 +324,13 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh) ks_assert(bh); + if (bh->worker_thread) { + bh->shutdown = KS_TRUE; + ks_thread_join(bh->worker_thread); + ks_pool_free(bh->pool, &bh->worker_thread); + bh->shutdown = KS_FALSE; + } + while ((it = ks_hash_first(bh->requests, KS_UNLOCKED))) { void *key = NULL; blade_request_t *value = NULL; @@ -863,6 +885,43 @@ KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh, return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata); } +void *blade_handle_worker_thread(ks_thread_t *thread, void *data) +{ + blade_handle_t *bh = NULL; + blade_connection_t *bc = NULL; + ks_hash_iterator_t *it = NULL; + ks_q_t *cleanup = NULL; + + ks_assert(thread); + ks_assert(data); + + bh = (blade_handle_t *)data; + + ks_q_create(&cleanup, bh->pool, 0); + ks_assert(cleanup); + + while (!bh->shutdown) { + ks_hash_write_lock(bh->connections); + for (it = ks_hash_first(bh->connections, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + void *key = NULL; + blade_connection_t *value = NULL; + + ks_hash_this(it, (const void **)&key, NULL, (void **)&value); + + if (blade_connection_state_get(value) == BLADE_CONNECTION_STATE_CLEANUP) ks_q_push(cleanup, value); + } + ks_hash_write_unlock(bh->connections); + + while (ks_q_trypop(cleanup, (void **)&bc) == KS_STATUS_SUCCESS) { + blade_handle_connections_remove(bc); + blade_connection_destroy(&bc); + } + + ks_sleep_ms(500); + } + + return NULL; +} /* For Emacs: * Local Variables: diff --git a/libs/libblade/src/include/blade_connection.h b/libs/libblade/src/include/blade_connection.h index 8b8660f9cb..6f580828d1 100644 --- a/libs/libblade/src/include/blade_connection.h +++ b/libs/libblade/src/include/blade_connection.h @@ -54,6 +54,7 @@ KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc); KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc); KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data); KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state); +KS_DECLARE(blade_connection_state_t) blade_connection_state_get(blade_connection_t *bc); KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc); KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target); KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json); diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index e78c6dad0f..71326f0b3a 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -65,6 +65,7 @@ typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, co typedef enum { BLADE_CONNECTION_STATE_NONE, + BLADE_CONNECTION_STATE_CLEANUP, BLADE_CONNECTION_STATE_DISCONNECT, BLADE_CONNECTION_STATE_NEW, BLADE_CONNECTION_STATE_CONNECT,