From 942ae77bde1c6d1d5a77fbb5729d8b333fba4909 Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Mon, 13 Feb 2017 19:07:33 +0000 Subject: [PATCH] FS-9952: Added initial support for registering transports, and initial untested code for parsing identities --- libs/libblade/src/blade_identity.c | 52 +++++++++++++++++++++++-- libs/libblade/src/blade_module_wss.c | 8 ++-- libs/libblade/src/blade_stack.c | 46 ++++++++++++++++++++++ libs/libblade/src/include/blade_stack.h | 3 ++ libs/libblade/src/include/blade_types.h | 2 + libs/libblade/test/bladec.cfg | 4 ++ libs/libblade/test/bladec2.cfg | 29 ++++++++++++++ 7 files changed, 138 insertions(+), 6 deletions(-) create mode 100644 libs/libblade/test/bladec2.cfg diff --git a/libs/libblade/src/blade_identity.c b/libs/libblade/src/blade_identity.c index 034923c7de..fafbfb5ae8 100644 --- a/libs/libblade/src/blade_identity.c +++ b/libs/libblade/src/blade_identity.c @@ -37,7 +37,12 @@ struct blade_identity_s { ks_pool_t *pool; const char *uri; - // @todo breakdown of uri into constituent parts + + const char *components; + const char *name; + const char *domain; + const char *resource; + ks_hash_t *parameters; }; @@ -63,6 +68,11 @@ KS_DECLARE(ks_status_t) blade_identity_destroy(blade_identity_t **biP) ks_assert(*biP); bi = *biP; + if (bi->uri) { + ks_pool_free(bi->pool, &bi->uri); + ks_pool_free(bi->pool, &bi->components); + } + if (bi->parameters) ks_hash_destroy(&bi->parameters); ks_pool_free(bi->pool, biP); @@ -71,14 +81,50 @@ KS_DECLARE(ks_status_t) blade_identity_destroy(blade_identity_t **biP) KS_DECLARE(ks_status_t) blade_identity_parse(blade_identity_t *bi, const char *uri) { + char *tmp = NULL; + char *tmp2 = NULL; + ks_assert(bi); ks_assert(uri); - if (bi->uri) ks_pool_free(bi->pool, &bi->uri); + if (bi->uri) { + ks_pool_free(bi->pool, &bi->uri); + ks_pool_free(bi->pool, &bi->components); + } bi->uri = ks_pstrdup(bi->pool, uri); + bi->components = tmp = ks_pstrdup(bi->pool, uri); - // @todo parse into components + bi->name = tmp; + if (!(tmp = strchr(tmp, '@'))) return KS_STATUS_FAIL; + *tmp++ = '\0'; + bi->domain = tmp2 = tmp; + if ((tmp = strchr(tmp, '/'))) { + *tmp++ = '\0'; + bi->resource = tmp2 = tmp; + } else tmp = tmp2; + + if ((tmp = strchr(tmp, '?'))) { + *tmp++ = '\0'; + + while (tmp) { + char *key = tmp; + char *val = NULL; + if (!(tmp = strchr(tmp, '='))) return KS_STATUS_FAIL; + *tmp++ = '\0'; + val = tmp; + if ((tmp = strchr(tmp, '&'))) { + *tmp++ = '\0'; + } + + if (!bi->parameters) { + ks_hash_create(&bi->parameters, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, bi->pool); + ks_assert(bi->parameters); + } + ks_hash_insert(bi->parameters, key, val); + } + } + return KS_STATUS_SUCCESS; } diff --git a/libs/libblade/src/blade_module_wss.c b/libs/libblade/src/blade_module_wss.c index 109082b232..b98bbb0814 100644 --- a/libs/libblade/src/blade_module_wss.c +++ b/libs/libblade/src/blade_module_wss.c @@ -130,6 +130,8 @@ static blade_module_callbacks_t g_module_wss_callbacks = static blade_transport_callbacks_t g_transport_wss_callbacks = { + "wss", + blade_transport_wss_on_connect, blade_transport_wss_on_rank, blade_transport_wss_on_send, @@ -346,8 +348,6 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s bm_wss = (blade_module_wss_t *)blade_module_data_get(bm); - // @todo register wss transport to the blade_handle_t - if (blade_module_wss_config(bm_wss, config) != KS_STATUS_SUCCESS) { ks_log(KS_LOG_DEBUG, "blade_module_wss_config failed\n"); return KS_STATUS_FAIL; @@ -374,6 +374,8 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s KS_PRI_NORMAL, bm_wss->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + blade_handle_transport_register(bm_wss->handle, bm_wss->transport_callbacks); + return KS_STATUS_SUCCESS; } @@ -387,7 +389,7 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm) bm_wss = (blade_module_wss_t *)blade_module_data_get(bm); - // @todo unregister wss transport from the blade_handle_t + blade_handle_transport_unregister(bm_wss->handle, bm_wss->transport_callbacks); if (bm_wss->listeners_thread) { bm_wss->shutdown = KS_TRUE; diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index aeac8682db..02f2dc6df3 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -47,6 +47,7 @@ struct blade_handle_s { config_setting_t *config_service; config_setting_t *config_datastore; + ks_hash_t *transports; ks_q_t *messages_discarded; blade_datastore_t *datastore; @@ -76,6 +77,8 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP) ks_q_destroy(&bh->messages_discarded); } + ks_hash_destroy(&bh->transports); + if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool); ks_pool_free(bh->pool, &bh); @@ -109,6 +112,9 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo bh->pool = pool; bh->tpool = tpool; + ks_hash_create(&bh->transports, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, bh->pool); + ks_assert(bh->transports); + // @todo check thresholds from config, for now just ensure it doesn't grow out of control, allow 100 discarded messages ks_q_create(&bh->messages_discarded, bh->pool, 100); ks_assert(bh->messages_discarded); @@ -184,6 +190,46 @@ KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh) return bh->tpool; } +KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_transport_callbacks_t *callbacks) +{ + ks_assert(bh); + ks_assert(callbacks); + + ks_hash_write_lock(bh->transports); + ks_hash_insert(bh->transports, (void *)callbacks->name, callbacks); + ks_hash_write_unlock(bh->transports); + + ks_log(KS_LOG_DEBUG, "Transport Registered: %s\n", callbacks->name); + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, blade_transport_callbacks_t *callbacks) +{ + ks_assert(bh); + ks_assert(callbacks); + + ks_hash_write_lock(bh->transports); + ks_hash_remove(bh->transports, (void *)callbacks->name); + ks_hash_write_unlock(bh->transports); + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target) +{ + ks_assert(bh); + ks_assert(target); + + ks_hash_read_lock(bh->transports); + // @todo find transport for target, check if target specifies explicit transport parameter first, otherwise use onrank and keep highest ranked callbacks + ks_hash_read_unlock(bh->transports); + + // transport_callbacks->onconnect(bcP, target); + + return KS_STATUS_SUCCESS; +} + KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length) { blade_message_t *msg = NULL; diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index 04daa932d6..78b4429376 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -48,6 +48,9 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh); KS_DECLARE(ks_pool_t *) blade_handle_pool_get(blade_handle_t *bh); KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh); +KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_transport_callbacks_t *callbacks); +KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, blade_transport_callbacks_t *callbacks); + KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length); KS_DECLARE(ks_status_t) blade_handle_message_discard(blade_handle_t *bh, blade_message_t **message); diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index 6df447ec60..aad7e4aaa8 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -105,6 +105,8 @@ typedef ks_status_t (*blade_transport_receive_callback_t)(blade_connection_t *bc typedef blade_connection_state_hook_t (*blade_transport_state_callback_t)(blade_connection_t *bc, blade_connection_state_condition_t condition); struct blade_transport_callbacks_s { + const char *name; + blade_transport_connect_callback_t onconnect; blade_transport_rank_callback_t onrank; blade_transport_send_callback_t onsend; diff --git a/libs/libblade/test/bladec.cfg b/libs/libblade/test/bladec.cfg index 95a7f24397..2233b34608 100644 --- a/libs/libblade/test/bladec.cfg +++ b/libs/libblade/test/bladec.cfg @@ -1,5 +1,9 @@ blade: { + identity = "directory@domain"; + directory: + { + }; datastore: { database: diff --git a/libs/libblade/test/bladec2.cfg b/libs/libblade/test/bladec2.cfg new file mode 100644 index 0000000000..1167439083 --- /dev/null +++ b/libs/libblade/test/bladec2.cfg @@ -0,0 +1,29 @@ +blade: +{ + identity = "peer@domain"; + directory: + { + uris = ( "directory@domain?transport=wss&host=127.0.0.1&port=2100" ); + }; + datastore: + { + database: + { + path = ":mem:"; + }; + }; + wss: + { + endpoints: + { + ipv4 = ( { address = "0.0.0.0", port = 2101 } ); + ipv6 = ( { address = "::", port = 2101 } ); + backlog = 128; + }; + # SSL group is optional, disabled when absent + ssl: + { + # todo: server SSL stuffs here + }; + }; +};