FS-10739: [libblade] Loopback session support and reworking sessions to use independent ids, blade.connect now also responds with both the sessionid and the nodeid separately

This commit is contained in:
Shane Bryldt 2017-10-19 02:32:55 -06:00
parent 15455f7060
commit b3e84ac146
11 changed files with 180 additions and 123 deletions

View File

@ -129,15 +129,11 @@ KS_DECLARE(ks_status_t) blade_routemgr_local_set(blade_routemgr_t *brmgr, const
ks_rwl_write_lock(brmgr->local_lock);
if (brmgr->local_nodeid) {
ret = KS_STATUS_DUPLICATE_OPERATION;
goto done;
}
if (brmgr->local_nodeid) ks_pool_free(&brmgr->local_nodeid);
if (nodeid) brmgr->local_nodeid = ks_pstrdup(ks_pool_get(brmgr), nodeid);
ks_log(KS_LOG_DEBUG, "Local NodeID: %s\n", nodeid);
done:
ks_rwl_write_unlock(brmgr->local_lock);
return ret;
@ -220,21 +216,6 @@ KS_DECLARE(ks_bool_t) blade_routemgr_local_pack(blade_routemgr_t *brmgr, cJSON *
return ret;
}
KS_DECLARE(blade_session_t *) blade_routemgr_upstream_lookup(blade_routemgr_t *brmgr)
{
blade_session_t *bs = NULL;
ks_assert(brmgr);
ks_rwl_read_lock(brmgr->local_lock);
if (brmgr->local_nodeid) bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(brmgr->handle), brmgr->local_nodeid);
ks_rwl_read_unlock(brmgr->local_lock);
return bs;
}
KS_DECLARE(ks_status_t) blade_routemgr_master_set(blade_routemgr_t *brmgr, const char *nodeid)
{
ks_assert(brmgr);
@ -329,26 +310,35 @@ KS_DECLARE(blade_session_t *) blade_routemgr_route_lookup(blade_routemgr_t *brmg
ks_assert(brmgr);
ks_assert(target);
router = (const char *)ks_hash_search(brmgr->routes, (void *)target, KS_READLOCKED);
if (!router) {
// @todo this is all really inefficient, but we need the string to be parsed and recombined to ensure correctness for key matching
blade_identity_t *identity = NULL;
ks_pool_t *pool = ks_pool_get(brmgr);
// Short circuit any nodeid or identity that matches the local node by returning the loopback session explicitly
// @todo this could be potentially be avoided if the local nodeid and all local identities are added as routes to the loopback sessionid
if (blade_routemgr_local_check(brmgr, target)) bs = blade_sessionmgr_loopback_lookup(blade_handle_sessionmgr_get(brmgr->handle));
else {
// If the target is a downstream nodeid then it will be found in the route table immediately and return the sessionid of the downstream session to route through
router = (const char *)ks_hash_search(brmgr->routes, (void *)target, KS_READLOCKED);
if (!router) {
// If the target is a downstream node identity then it will be found in the identity table and return the nodeid which can then be used to lookup the route
blade_identity_t *identity = NULL;
ks_pool_t *pool = ks_pool_get(brmgr);
blade_identity_create(&identity, pool);
if (blade_identity_parse(identity, target) == KS_STATUS_SUCCESS) {
char *key = ks_psprintf(pool, "%s@%s/%s", blade_identity_user_get(identity), blade_identity_host_get(identity), blade_identity_path_get(identity));
blade_identity_create(&identity, pool);
if (blade_identity_parse(identity, target) == KS_STATUS_SUCCESS) {
char *key = ks_psprintf(pool, "%s@%s/%s", blade_identity_user_get(identity), blade_identity_host_get(identity), blade_identity_path_get(identity));
router = (const char *)ks_hash_search(brmgr->identities, (void *)key, KS_READLOCKED);
ks_hash_read_unlock(brmgr->identities);
router = (const char *)ks_hash_search(brmgr->identities, (void *)key, KS_READLOCKED);
ks_hash_read_unlock(brmgr->identities);
ks_pool_free(&key);
ks_pool_free(&key);
if (router) router = (const char *)ks_hash_search(brmgr->routes, (void *)router, KS_UNLOCKED);
}
blade_identity_destroy(&identity);
}
blade_identity_destroy(&identity);
// When a router is found it is the sessionid of the downstream session, lookup the session to route through
if (router) bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(brmgr->handle), router);
ks_hash_read_unlock(brmgr->routes);
}
if (router) bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(brmgr->handle), router);
ks_hash_read_unlock(brmgr->routes);
return bs;
}

View File

@ -36,11 +36,13 @@
struct blade_session_s {
blade_handle_t *handle;
volatile blade_session_state_t state;
blade_session_flags_t flags;
const char *id;
ks_rwl_t *lock;
volatile blade_session_state_t state;
ks_cond_t *cond;
const char *connection;
@ -82,7 +84,7 @@ static void blade_session_cleanup(void *ptr, void *arg, ks_pool_cleanup_action_t
}
KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, const char *id)
KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, blade_session_flags_t flags, const char *sessionid)
{
blade_session_t *bs = NULL;
ks_pool_t *pool = NULL;
@ -95,8 +97,9 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle
bs = ks_pool_alloc(pool, sizeof(blade_session_t));
bs->handle = bh;
bs->flags = flags;
if (id) bs->id = ks_pstrdup(pool, id);
if (sessionid) bs->id = ks_pstrdup(pool, sessionid);
else {
uuid_t id;
ks_uuid(&id);
@ -106,6 +109,8 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle
ks_rwl_create(&bs->lock, pool);
ks_assert(bs->lock);
bs->state = BLADE_SESSION_STATE_NONE;
ks_cond_create(&bs->cond, pool);
ks_assert(bs->cond);
@ -158,8 +163,6 @@ KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs)
tpool = blade_handle_tpool_get(bs->handle);
ks_assert(tpool);
blade_session_state_set(bs, BLADE_SESSION_STATE_NONE);
if (ks_thread_pool_add_job(tpool, blade_session_state_thread, bs) != KS_STATUS_SUCCESS) {
// @todo error logging
return KS_STATUS_FAIL;
@ -207,6 +210,18 @@ KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs)
return bs->handle;
}
KS_DECLARE(ks_bool_t) blade_session_loopback(blade_session_t *bs)
{
ks_assert(bs);
return (bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK;
}
KS_DECLARE(ks_bool_t) blade_session_upstream(blade_session_t *bs)
{
ks_assert(bs);
return (bs->flags & BLADE_SESSION_FLAGS_UPSTREAM) == BLADE_SESSION_FLAGS_UPSTREAM;
}
KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs)
{
ks_assert(bs);
@ -397,8 +412,11 @@ KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *j
ks_assert(bs);
ks_assert(json);
json_copy = cJSON_Duplicate(json, 1);
if ((ret = ks_q_push(bs->sending, json_copy)) == KS_STATUS_SUCCESS) ks_cond_try_signal(bs->cond);
if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK) ret = blade_session_receiving_push(bs, json);
else {
json_copy = cJSON_Duplicate(json, 1);
if ((ret = ks_q_push(bs->sending, json_copy)) == KS_STATUS_SUCCESS) ks_cond_try_signal(bs->cond);
}
return ret;
}
@ -485,7 +503,8 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
default: break;
}
if (!bs->connection &&
if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) != BLADE_SESSION_FLAGS_LOOPBACK &&
!bs->connection &&
bs->ttl > 0 &&
!blade_session_terminating(bs) &&
ks_time_now() >= bs->ttl) {
@ -537,7 +556,7 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs)
ks_assert(bs);
while (bs->connection && blade_session_receiving_pop(bs, &json) == KS_STATUS_SUCCESS && json) {
while (blade_session_receiving_pop(bs, &json) == KS_STATUS_SUCCESS && json) {
blade_session_process(bs, json);
cJSON_Delete(json);
}
@ -645,7 +664,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
// not meant for local processing, continue with standard unicast routing for requests
blade_session_t *bs_router = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), params_responder_nodeid);
if (!bs_router) {
bs_router = blade_routemgr_upstream_lookup(blade_handle_routemgr_get(bh));
bs_router = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bh));
if (!bs_router) {
cJSON *res = NULL;
cJSON *res_error = NULL;
@ -719,7 +738,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
// not meant for local processing, continue with standard unicast routing for responses
blade_session_t *bs_router = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), object_requester_nodeid);
if (!bs_router) {
bs_router = blade_routemgr_upstream_lookup(blade_handle_routemgr_get(bh));
bs_router = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bh));
if (!bs_router) {
ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) but upstream session unavailable\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid);
return KS_STATUS_DISCONNECTED;

View File

@ -36,6 +36,8 @@
struct blade_sessionmgr_s {
blade_handle_t *handle;
blade_session_t *loopback;
blade_session_t *upstream;
ks_hash_t *sessions; // id, blade_session_t*
ks_hash_t *callbacks; // id, blade_session_callback_data_t*
};
@ -130,12 +132,37 @@ KS_DECLARE(blade_handle_t *) blade_sessionmgr_handle_get(blade_sessionmgr_t *bsm
return bsmgr->handle;
}
KS_DECLARE(ks_status_t) blade_sessionmgr_startup(blade_sessionmgr_t *bsmgr, config_setting_t *config)
{
ks_assert(bsmgr);
blade_session_create(&bsmgr->loopback, bsmgr->handle, BLADE_SESSION_FLAGS_LOOPBACK, NULL);
ks_assert(bsmgr->loopback);
ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bsmgr->loopback));
if (blade_session_startup(bsmgr->loopback) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", blade_session_id_get(bsmgr->loopback));
blade_session_destroy(&bsmgr->loopback);
return KS_STATUS_FAIL;
}
ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bsmgr->loopback));
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr)
{
ks_hash_iterator_t *it = NULL;
ks_assert(bsmgr);
if (bsmgr->loopback) {
blade_session_hangup(bsmgr->loopback);
ks_sleep_ms(100);
}
ks_hash_read_lock(bsmgr->sessions);
for (it = ks_hash_first(bsmgr->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
@ -151,6 +178,22 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr)
return KS_STATUS_SUCCESS;
}
KS_DECLARE(blade_session_t *) blade_sessionmgr_loopback_lookup(blade_sessionmgr_t *bsmgr)
{
ks_assert(bsmgr);
blade_session_read_lock(bsmgr->loopback, KS_TRUE);
return bsmgr->loopback;
}
KS_DECLARE(blade_session_t *) blade_sessionmgr_upstream_lookup(blade_sessionmgr_t *bsmgr)
{
ks_assert(bsmgr);
if (bsmgr->upstream) blade_session_read_lock(bsmgr->upstream, KS_TRUE);
return bsmgr->upstream;
}
KS_DECLARE(blade_session_t *) blade_sessionmgr_session_lookup(blade_sessionmgr_t *bsmgr, const char *id)
{
blade_session_t *bs = NULL;
@ -177,6 +220,8 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_session_add(blade_sessionmgr_t *bsmgr,
ks_log(KS_LOG_DEBUG, "Session Added: %s\n", key);
if (blade_session_upstream(bs)) bsmgr->upstream = bs;
return KS_STATUS_SUCCESS;
}
@ -196,7 +241,7 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_session_remove(blade_sessionmgr_t *bsmg
ks_log(KS_LOG_DEBUG, "Session Removed: %s\n", id);
routemgr = blade_handle_routemgr_get(bsmgr->handle);
if (blade_routemgr_local_check(routemgr, id)) {
if (blade_session_upstream(bs)) {
blade_routemgr_local_set(routemgr, NULL);
blade_routemgr_master_set(routemgr, NULL);

View File

@ -216,6 +216,8 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
blade_transportmgr_startup(bh->transportmgr, config);
blade_sessionmgr_startup(bh->sessionmgr, config);
blade_mastermgr_startup(bh->mastermgr, config);
blade_restmgr_startup(bh->restmgr, config);
@ -306,7 +308,7 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
ks_assert(target);
// @todo mini state machine to deal with upstream establishment to avoid attempting multiple upstream connects at the same time?
if ((bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
if ((bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
blade_session_read_unlock(bs);
return KS_STATUS_DUPLICATE_OPERATION;
}
@ -362,7 +364,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcroute(blade_handle_t *bh, const char *no
ks_assert(nodeid);
if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
@ -479,7 +481,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char
ks_assert(bh);
ks_assert(identity);
if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
@ -514,7 +516,7 @@ ks_status_t blade_handle_rpcregister_raw(blade_handle_t *bh, const char *identit
ks_assert(bh);
ks_assert(identity);
if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
@ -720,7 +722,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, blade_rpcpub
ks_assert(protocol);
// @todo consideration for the Master trying to publish a protocol, with no upstream
if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
@ -969,7 +971,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char
ks_assert(channels);
// @todo consideration for the Master trying to publish a protocol, with no upstream
if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
@ -1167,7 +1169,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *p
ks_assert(bh);
ks_assert(protocol);
if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
@ -1303,7 +1305,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *
ks_assert(protocol);
if (!(bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), nodeid))) {
if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
@ -1582,7 +1584,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh,
ks_assert(channels);
// @note this is always produced by a subscriber, and sent upstream, master will only use the internal raw call
if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
@ -1640,7 +1642,7 @@ ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh,
goto done;
}
}
else if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) {
else if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
@ -1895,7 +1897,7 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, voi
if (temp_data && temp_data->original_requestid) {
blade_session_t *relay = NULL;
if (downstream) {
if (!(relay = blade_routemgr_upstream_lookup(bh->routemgr))) {
if (!(relay = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) {
goto done;
}
} else {
@ -1929,18 +1931,10 @@ done:
// blade.broadcast request generator
KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(bh);
ks_assert(protocol);
ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, BLADE_RPCBROADCAST_COMMAND_EVENT, NULL, protocol, channel, event, params, callback, data);
// @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations
// if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler
// is where this normally occurs, however this is not a simple case as the callback expects a blade_rpc_request_t parameter containing context
return ret;
return blade_subscriptionmgr_broadcast(bh->subscriptionmgr, BLADE_RPCBROADCAST_COMMAND_EVENT, NULL, protocol, channel, event, params, callback, data);
}
// @todo blade_handle_rpcbroadcast_raw() to encapsulate adding subcommands to broadcast to support protocol removal, protocol channel removal, and normal event broadcast
@ -2023,21 +2017,12 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
req_params_params = cJSON_GetObjectItem(req_params, "params");
blade_subscriptionmgr_broadcast(bh->subscriptionmgr, command, blade_session_id_get(bs), req_params_protocol, req_params_channel, req_params_event, req_params_params, NULL, NULL);
if (command == BLADE_RPCBROADCAST_COMMAND_EVENT) {
if (!blade_session_loopback(bs)) blade_subscriptionmgr_broadcast(bh->subscriptionmgr, command, blade_session_id_get(bs), req_params_protocol, req_params_channel, req_params_event, req_params_params, NULL, NULL);
else if (command == BLADE_RPCBROADCAST_COMMAND_EVENT) {
bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_channel);
if (bsub) {
const char *localid = NULL;
blade_routemgr_local_copy(bh->routemgr, &localid);
ks_assert(localid);
if (ks_hash_search(blade_subscription_subscribers_get(bsub), (void *)localid, KS_UNLOCKED)) {
callback = blade_subscription_callback_get(bsub);
if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
}
ks_pool_free(&localid);
callback = blade_subscription_callback_get(bsub);
if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub));
}
}

View File

@ -338,7 +338,7 @@ KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, con
}
}
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_sessionid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data)
{
ks_pool_t *pool = NULL;
const char *bsub_key = NULL;
@ -376,13 +376,13 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
ks_hash_this(it, (const void **)&key, NULL, &value);
if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
//if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
// @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key)) continue;
//if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key)) continue;
bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key);
if (bs) {
if (excluded_sessionid && !ks_safe_strcasecmp(excluded_sessionid, blade_session_id_get(bs))) continue;
if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
if (!ks_hash_search(routers, (void *)blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, (void *)blade_session_id_get(bs), (void *)bs);
else blade_session_read_unlock(bs);
@ -420,13 +420,13 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
ks_hash_this(it2, (const void **)&key2, NULL, &value2);
if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key2)) continue;
//if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key2)) continue;
// @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2)) continue;
//if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2)) continue;
bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2);
if (bs) {
if (excluded_sessionid && !ks_safe_strcasecmp(excluded_sessionid, blade_session_id_get(bs))) continue;
if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
if (!ks_hash_search(routers, (void *)blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, (void *)blade_session_id_get(bs), (void *)bs);
else blade_session_read_unlock(bs);
@ -444,9 +444,9 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
}
bs = blade_routemgr_upstream_lookup(blade_handle_routemgr_get(bsmgr->handle));
bs = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bsmgr->handle));
if (bs) {
if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) {
if (!excluded_sessionid || ks_safe_strcasecmp(excluded_sessionid, blade_session_id_get(bs))) {
if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool);
ks_hash_insert(routers, (void *)blade_session_id_get(bs), (void *)bs);
}

View File

@ -832,6 +832,8 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
const char *jsonrpc = NULL;
const char *id = NULL;
const char *method = NULL;
const char *sessionid = NULL;
uuid_t uuid;
const char *nodeid = NULL;
ks_time_t timeout;
@ -901,15 +903,17 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
json_params = cJSON_GetObjectItem(json_req, "params");
if (json_params) {
nodeid = cJSON_GetObjectCstr(json_params, "session-id");
if (nodeid) {
// @todo validate uuid format by parsing, not currently available in uuid functions, send -32602 (invalid params) if invalid
ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", nodeid);
sessionid = cJSON_GetObjectCstr(json_params, "sessionid");
if (sessionid) {
ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", sessionid);
}
}
if (nodeid) {
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), nodeid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
ks_uuid(&uuid);
nodeid = ks_uuid_str(ks_pool_get(bc), &uuid);
if (sessionid) {
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), sessionid);
if (bs) {
if (blade_session_terminating(bs)) {
blade_session_read_unlock(bs);
@ -917,21 +921,22 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
bs = NULL;
} else {
ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs));
// @todo validate against IP address or something to ensure reconnects are acceptable
}
}
}
if (!bs) {
blade_session_create(&bs, bh, NULL);
blade_session_create(&bs, bh, BLADE_SESSION_FLAGS_NONE, NULL);
ks_assert(bs);
nodeid = blade_session_id_get(bs);
ks_log(KS_LOG_DEBUG, "Session (%s) created\n", nodeid);
sessionid = blade_session_id_get(bs);
ks_log(KS_LOG_DEBUG, "Session (%s) created\n", sessionid);
blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise
if (blade_session_startup(bs) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", nodeid);
ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", sessionid);
blade_transport_wss_rpc_error_send(bc, id, -32603, "Internal error, session could not be started");
blade_session_read_unlock(bs);
blade_session_destroy(&bs);
@ -941,7 +946,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
// This is an inbound connection, thus it is always creating a downstream session
ks_log(KS_LOG_DEBUG, "Session (%s) started\n", nodeid);
ks_log(KS_LOG_DEBUG, "Session (%s) started\n", sessionid);
blade_sessionmgr_session_add(blade_handle_sessionmgr_get(bh), bs);
// This is primarily to cleanup the routes added to the blade_handle for main routing when a session terminates, these don't have a lot of use otherwise but it will keep the main route table
@ -951,16 +956,17 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
// a message should pass through when it does not match the local node id from blade_routemgr_t, and must be matched with a call to blade_session_route_add() for cleanup, additionally when
// a "blade.route" is received the identity it carries affects these routes along with the sessionid of the downstream session it came through, and "blade.route" would also
// result in the new identities being added as routes however federation registration would require a special process to maintain proper routing
blade_routemgr_route_add(blade_handle_routemgr_get(bh), nodeid, nodeid);
blade_routemgr_route_add(blade_handle_routemgr_get(bh), nodeid, sessionid);
}
blade_rpc_response_raw_create(&json_res, &json_result, id);
ks_assert(json_res);
cJSON_AddStringToObject(json_result, "sessionid", sessionid);
cJSON_AddStringToObject(json_result, "nodeid", nodeid);
if (!blade_routemgr_master_pack(blade_handle_routemgr_get(bh), json_result, "master-nodeid")) {
ks_log(KS_LOG_DEBUG, "Master nodeid unavailable\n");
ks_log(KS_LOG_DEBUG, "Master nodeid unavailable, upstream is not established\n");
blade_transport_wss_rpc_error_send(bc, id, -32602, "Master nodeid unavailable");
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
@ -969,7 +975,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_
// This starts the final process for associating the connection to the session, including for reconnecting to an existing session, this simply
// associates the session to this connection, upon return the remainder of the association for the session to the connection is handled along
// with making sure both this connection and the session state machines are in running states
blade_connection_session_set(bc, nodeid);
blade_connection_session_set(bc, sessionid);
// @todo end of reusable handler for "blade.connect" request
@ -1004,6 +1010,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
const char *id = NULL;
cJSON *json_error = NULL;
cJSON *json_result = NULL;
const char *sessionid = NULL;
const char *nodeid = NULL;
const char *master_nodeid = NULL;
blade_session_t *bs = NULL;
@ -1032,7 +1039,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
blade_rpc_request_raw_create(pool, &json_req, &json_params, &mid, "blade.connect");
ks_assert(json_req);
if (btwssl->session_id) cJSON_AddStringToObject(json_params, "session-id", btwssl->session_id);
if (btwssl->session_id) cJSON_AddStringToObject(json_params, "sessionid", btwssl->session_id);
ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", (btwssl->session_id ? btwssl->session_id : "none"));
@ -1086,6 +1093,13 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
goto done;
}
sessionid = cJSON_GetObjectCstr(json_result, "sessionid");
if (!sessionid) {
ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'sessionid'\n");
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
nodeid = cJSON_GetObjectCstr(json_result, "nodeid");
if (!nodeid) {
ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'nodeid'\n");
@ -1100,15 +1114,19 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
goto done;
}
// @todo validate uuid format by parsing, not currently available in uuid functions
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), nodeid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
bs = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bh));
if (bs) {
ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs));
if (ks_safe_strcasecmp(blade_session_id_get(bs), sessionid)) {
ks_log(KS_LOG_DEBUG, "Already have upstream session with different sessionid, could not establish session\n");
blade_session_read_unlock(bs);
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
ks_log(KS_LOG_DEBUG, "Session (%s) reestablishing\n", blade_session_id_get(bs));
}
if (!bs) {
blade_session_create(&bs, bh, nodeid);
blade_session_create(&bs, bh, BLADE_SESSION_FLAGS_UPSTREAM, sessionid);
ks_assert(bs);
ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bs));
@ -1123,20 +1141,11 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade
goto done;
}
// This is an outbound connection, thus it is always creating an upstream session, defined by the sessionid matching the local_nodeid in the handle
if (blade_routemgr_local_set(blade_handle_routemgr_get(bh), nodeid) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "Session (%s) abandoned, upstream already available\n", blade_session_id_get(bs));
blade_session_read_unlock(bs);
blade_session_hangup(bs);
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
// This is an outbound connection, thus it is always creating an upstream session
ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bs));
blade_sessionmgr_session_add(blade_handle_sessionmgr_get(bh), bs);
blade_routemgr_local_set(blade_handle_routemgr_get(bh), nodeid);
blade_routemgr_master_set(blade_handle_routemgr_get(bh), master_nodeid);
}

View File

@ -43,7 +43,6 @@ KS_DECLARE(ks_status_t) blade_routemgr_local_set(blade_routemgr_t *brmgr, const
KS_DECLARE(ks_bool_t) blade_routemgr_local_check(blade_routemgr_t *brmgr, const char *target);
KS_DECLARE(ks_bool_t) blade_routemgr_local_copy(blade_routemgr_t *brmgr, const char **nodeid);
KS_DECLARE(ks_bool_t) blade_routemgr_local_pack(blade_routemgr_t *brmgr, cJSON *json, const char *key);
KS_DECLARE(blade_session_t *) blade_routemgr_upstream_lookup(blade_routemgr_t *brmgr);
KS_DECLARE(ks_status_t) blade_routemgr_master_set(blade_routemgr_t *brmgr, const char *nodeid);
KS_DECLARE(ks_bool_t) blade_routemgr_master_check(blade_routemgr_t *brmgr, const char *target);
KS_DECLARE(ks_bool_t) blade_routemgr_master_pack(blade_routemgr_t *brmgr, cJSON *json, const char *key);

View File

@ -36,11 +36,13 @@
#include <blade.h>
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, const char *id);
KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, blade_session_flags_t flags, const char *sessionid);
KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP);
KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs);
KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs);
KS_DECLARE(ks_bool_t) blade_session_loopback(blade_session_t *bs);
KS_DECLARE(ks_bool_t) blade_session_upstream(blade_session_t *bs);
KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs);
KS_DECLARE(blade_session_state_t) blade_session_state_get(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_route_add(blade_session_t *bs, const char *nodeid);

View File

@ -39,7 +39,10 @@ KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_sessionmgr_create(blade_sessionmgr_t **bsmgrP, blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_sessionmgr_destroy(blade_sessionmgr_t **bsmgrP);
KS_DECLARE(blade_handle_t *) blade_sessionmgr_handle_get(blade_sessionmgr_t *bsmgr);
KS_DECLARE(ks_status_t) blade_sessionmgr_startup(blade_sessionmgr_t *bsmgr, config_setting_t *config);
KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr);
KS_DECLARE(blade_session_t *) blade_sessionmgr_loopback_lookup(blade_sessionmgr_t *bsmgr);
KS_DECLARE(blade_session_t *) blade_sessionmgr_upstream_lookup(blade_sessionmgr_t *bsmgr);
KS_DECLARE(blade_session_t *) blade_sessionmgr_session_lookup(blade_sessionmgr_t *bsmgr, const char *id);
KS_DECLARE(ks_status_t) blade_sessionmgr_session_add(blade_sessionmgr_t *bsmgr, blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_sessionmgr_session_remove(blade_sessionmgr_t *bsmgr, blade_session_t *bs);

View File

@ -44,7 +44,7 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_subscription_remove(blade_subscrip
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *channel, const char *subscriber);
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *channel, const char *subscriber);
KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_sessionid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_END_EXTERN_C
#endif

View File

@ -95,6 +95,11 @@ typedef enum {
BLADE_CONNECTION_STATE_HOOK_BYPASS,
} blade_connection_state_hook_t;
typedef enum {
BLADE_SESSION_FLAGS_NONE = 0 << 0,
BLADE_SESSION_FLAGS_LOOPBACK = 1 << 0,
BLADE_SESSION_FLAGS_UPSTREAM = 1 << 1,
} blade_session_flags_t;
typedef enum {
BLADE_SESSION_STATE_CONDITION_PRE,