FS-9952: Added initial support for registering transports, and initial untested code for parsing identities

This commit is contained in:
Shane Bryldt 2017-02-13 19:07:33 +00:00 committed by Mike Jerris
parent 2b3e2646e3
commit 942ae77bde
7 changed files with 138 additions and 6 deletions

View File

@ -37,7 +37,12 @@ struct blade_identity_s {
ks_pool_t *pool;
const char *uri;
// @todo breakdown of uri into constituent parts
const char *components;
const char *name;
const char *domain;
const char *resource;
ks_hash_t *parameters;
};
@ -63,6 +68,11 @@ KS_DECLARE(ks_status_t) blade_identity_destroy(blade_identity_t **biP)
ks_assert(*biP);
bi = *biP;
if (bi->uri) {
ks_pool_free(bi->pool, &bi->uri);
ks_pool_free(bi->pool, &bi->components);
}
if (bi->parameters) ks_hash_destroy(&bi->parameters);
ks_pool_free(bi->pool, biP);
@ -71,14 +81,50 @@ KS_DECLARE(ks_status_t) blade_identity_destroy(blade_identity_t **biP)
KS_DECLARE(ks_status_t) blade_identity_parse(blade_identity_t *bi, const char *uri)
{
char *tmp = NULL;
char *tmp2 = NULL;
ks_assert(bi);
ks_assert(uri);
if (bi->uri) ks_pool_free(bi->pool, &bi->uri);
if (bi->uri) {
ks_pool_free(bi->pool, &bi->uri);
ks_pool_free(bi->pool, &bi->components);
}
bi->uri = ks_pstrdup(bi->pool, uri);
bi->components = tmp = ks_pstrdup(bi->pool, uri);
// @todo parse into components
bi->name = tmp;
if (!(tmp = strchr(tmp, '@'))) return KS_STATUS_FAIL;
*tmp++ = '\0';
bi->domain = tmp2 = tmp;
if ((tmp = strchr(tmp, '/'))) {
*tmp++ = '\0';
bi->resource = tmp2 = tmp;
} else tmp = tmp2;
if ((tmp = strchr(tmp, '?'))) {
*tmp++ = '\0';
while (tmp) {
char *key = tmp;
char *val = NULL;
if (!(tmp = strchr(tmp, '='))) return KS_STATUS_FAIL;
*tmp++ = '\0';
val = tmp;
if ((tmp = strchr(tmp, '&'))) {
*tmp++ = '\0';
}
if (!bi->parameters) {
ks_hash_create(&bi->parameters, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, bi->pool);
ks_assert(bi->parameters);
}
ks_hash_insert(bi->parameters, key, val);
}
}
return KS_STATUS_SUCCESS;
}

View File

@ -130,6 +130,8 @@ static blade_module_callbacks_t g_module_wss_callbacks =
static blade_transport_callbacks_t g_transport_wss_callbacks =
{
"wss",
blade_transport_wss_on_connect,
blade_transport_wss_on_rank,
blade_transport_wss_on_send,
@ -346,8 +348,6 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s
bm_wss = (blade_module_wss_t *)blade_module_data_get(bm);
// @todo register wss transport to the blade_handle_t
if (blade_module_wss_config(bm_wss, config) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "blade_module_wss_config failed\n");
return KS_STATUS_FAIL;
@ -374,6 +374,8 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_startup(blade_module_t *bm, config_s
KS_PRI_NORMAL,
bm_wss->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
blade_handle_transport_register(bm_wss->handle, bm_wss->transport_callbacks);
return KS_STATUS_SUCCESS;
}
@ -387,7 +389,7 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_shutdown(blade_module_t *bm)
bm_wss = (blade_module_wss_t *)blade_module_data_get(bm);
// @todo unregister wss transport from the blade_handle_t
blade_handle_transport_unregister(bm_wss->handle, bm_wss->transport_callbacks);
if (bm_wss->listeners_thread) {
bm_wss->shutdown = KS_TRUE;

View File

@ -47,6 +47,7 @@ struct blade_handle_s {
config_setting_t *config_service;
config_setting_t *config_datastore;
ks_hash_t *transports;
ks_q_t *messages_discarded;
blade_datastore_t *datastore;
@ -76,6 +77,8 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
ks_q_destroy(&bh->messages_discarded);
}
ks_hash_destroy(&bh->transports);
if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool);
ks_pool_free(bh->pool, &bh);
@ -109,6 +112,9 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo
bh->pool = pool;
bh->tpool = tpool;
ks_hash_create(&bh->transports, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, bh->pool);
ks_assert(bh->transports);
// @todo check thresholds from config, for now just ensure it doesn't grow out of control, allow 100 discarded messages
ks_q_create(&bh->messages_discarded, bh->pool, 100);
ks_assert(bh->messages_discarded);
@ -184,6 +190,46 @@ KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh)
return bh->tpool;
}
KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_transport_callbacks_t *callbacks)
{
ks_assert(bh);
ks_assert(callbacks);
ks_hash_write_lock(bh->transports);
ks_hash_insert(bh->transports, (void *)callbacks->name, callbacks);
ks_hash_write_unlock(bh->transports);
ks_log(KS_LOG_DEBUG, "Transport Registered: %s\n", callbacks->name);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, blade_transport_callbacks_t *callbacks)
{
ks_assert(bh);
ks_assert(callbacks);
ks_hash_write_lock(bh->transports);
ks_hash_remove(bh->transports, (void *)callbacks->name);
ks_hash_write_unlock(bh->transports);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target)
{
ks_assert(bh);
ks_assert(target);
ks_hash_read_lock(bh->transports);
// @todo find transport for target, check if target specifies explicit transport parameter first, otherwise use onrank and keep highest ranked callbacks
ks_hash_read_unlock(bh->transports);
// transport_callbacks->onconnect(bcP, target);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length)
{
blade_message_t *msg = NULL;

View File

@ -48,6 +48,9 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh);
KS_DECLARE(ks_pool_t *) blade_handle_pool_get(blade_handle_t *bh);
KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_transport_callbacks_t *callbacks);
KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, blade_transport_callbacks_t *callbacks);
KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length);
KS_DECLARE(ks_status_t) blade_handle_message_discard(blade_handle_t *bh, blade_message_t **message);

View File

@ -105,6 +105,8 @@ typedef ks_status_t (*blade_transport_receive_callback_t)(blade_connection_t *bc
typedef blade_connection_state_hook_t (*blade_transport_state_callback_t)(blade_connection_t *bc, blade_connection_state_condition_t condition);
struct blade_transport_callbacks_s {
const char *name;
blade_transport_connect_callback_t onconnect;
blade_transport_rank_callback_t onrank;
blade_transport_send_callback_t onsend;

View File

@ -1,5 +1,9 @@
blade:
{
identity = "directory@domain";
directory:
{
};
datastore:
{
database:

View File

@ -0,0 +1,29 @@
blade:
{
identity = "peer@domain";
directory:
{
uris = ( "directory@domain?transport=wss&host=127.0.0.1&port=2100" );
};
datastore:
{
database:
{
path = ":mem:";
};
};
wss:
{
endpoints:
{
ipv4 = ( { address = "0.0.0.0", port = 2101 } );
ipv6 = ( { address = "::", port = 2101 } );
backlog = 128;
};
# SSL group is optional, disabled when absent
ssl:
{
# todo: server SSL stuffs here
};
};
};