diff --git a/libs/libblade/src/blade_routemgr.c b/libs/libblade/src/blade_routemgr.c index 91bd99a209..af3149d921 100644 --- a/libs/libblade/src/blade_routemgr.c +++ b/libs/libblade/src/blade_routemgr.c @@ -129,15 +129,11 @@ KS_DECLARE(ks_status_t) blade_routemgr_local_set(blade_routemgr_t *brmgr, const ks_rwl_write_lock(brmgr->local_lock); - if (brmgr->local_nodeid) { - ret = KS_STATUS_DUPLICATE_OPERATION; - goto done; - } + if (brmgr->local_nodeid) ks_pool_free(&brmgr->local_nodeid); if (nodeid) brmgr->local_nodeid = ks_pstrdup(ks_pool_get(brmgr), nodeid); ks_log(KS_LOG_DEBUG, "Local NodeID: %s\n", nodeid); -done: ks_rwl_write_unlock(brmgr->local_lock); return ret; @@ -220,21 +216,6 @@ KS_DECLARE(ks_bool_t) blade_routemgr_local_pack(blade_routemgr_t *brmgr, cJSON * return ret; } -KS_DECLARE(blade_session_t *) blade_routemgr_upstream_lookup(blade_routemgr_t *brmgr) -{ - blade_session_t *bs = NULL; - - ks_assert(brmgr); - - ks_rwl_read_lock(brmgr->local_lock); - - if (brmgr->local_nodeid) bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(brmgr->handle), brmgr->local_nodeid); - - ks_rwl_read_unlock(brmgr->local_lock); - - return bs; -} - KS_DECLARE(ks_status_t) blade_routemgr_master_set(blade_routemgr_t *brmgr, const char *nodeid) { ks_assert(brmgr); @@ -329,26 +310,35 @@ KS_DECLARE(blade_session_t *) blade_routemgr_route_lookup(blade_routemgr_t *brmg ks_assert(brmgr); ks_assert(target); - router = (const char *)ks_hash_search(brmgr->routes, (void *)target, KS_READLOCKED); - if (!router) { - // @todo this is all really inefficient, but we need the string to be parsed and recombined to ensure correctness for key matching - blade_identity_t *identity = NULL; - ks_pool_t *pool = ks_pool_get(brmgr); + // Short circuit any nodeid or identity that matches the local node by returning the loopback session explicitly + // @todo this could be potentially be avoided if the local nodeid and all local identities are added as routes to the loopback sessionid + if (blade_routemgr_local_check(brmgr, target)) bs = blade_sessionmgr_loopback_lookup(blade_handle_sessionmgr_get(brmgr->handle)); + else { + // If the target is a downstream nodeid then it will be found in the route table immediately and return the sessionid of the downstream session to route through + router = (const char *)ks_hash_search(brmgr->routes, (void *)target, KS_READLOCKED); + if (!router) { + // If the target is a downstream node identity then it will be found in the identity table and return the nodeid which can then be used to lookup the route + blade_identity_t *identity = NULL; + ks_pool_t *pool = ks_pool_get(brmgr); - blade_identity_create(&identity, pool); - if (blade_identity_parse(identity, target) == KS_STATUS_SUCCESS) { - char *key = ks_psprintf(pool, "%s@%s/%s", blade_identity_user_get(identity), blade_identity_host_get(identity), blade_identity_path_get(identity)); + blade_identity_create(&identity, pool); + if (blade_identity_parse(identity, target) == KS_STATUS_SUCCESS) { + char *key = ks_psprintf(pool, "%s@%s/%s", blade_identity_user_get(identity), blade_identity_host_get(identity), blade_identity_path_get(identity)); - router = (const char *)ks_hash_search(brmgr->identities, (void *)key, KS_READLOCKED); - ks_hash_read_unlock(brmgr->identities); + router = (const char *)ks_hash_search(brmgr->identities, (void *)key, KS_READLOCKED); + ks_hash_read_unlock(brmgr->identities); - ks_pool_free(&key); + ks_pool_free(&key); + + if (router) router = (const char *)ks_hash_search(brmgr->routes, (void *)router, KS_UNLOCKED); + } + + blade_identity_destroy(&identity); } - - blade_identity_destroy(&identity); + // When a router is found it is the sessionid of the downstream session, lookup the session to route through + if (router) bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(brmgr->handle), router); + ks_hash_read_unlock(brmgr->routes); } - if (router) bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(brmgr->handle), router); - ks_hash_read_unlock(brmgr->routes); return bs; } diff --git a/libs/libblade/src/blade_session.c b/libs/libblade/src/blade_session.c index 99626209e2..d449ff725b 100644 --- a/libs/libblade/src/blade_session.c +++ b/libs/libblade/src/blade_session.c @@ -36,11 +36,13 @@ struct blade_session_s { blade_handle_t *handle; - volatile blade_session_state_t state; + blade_session_flags_t flags; const char *id; ks_rwl_t *lock; + volatile blade_session_state_t state; + ks_cond_t *cond; const char *connection; @@ -82,7 +84,7 @@ static void blade_session_cleanup(void *ptr, void *arg, ks_pool_cleanup_action_t } -KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, const char *id) +KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, blade_session_flags_t flags, const char *sessionid) { blade_session_t *bs = NULL; ks_pool_t *pool = NULL; @@ -95,8 +97,9 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle bs = ks_pool_alloc(pool, sizeof(blade_session_t)); bs->handle = bh; + bs->flags = flags; - if (id) bs->id = ks_pstrdup(pool, id); + if (sessionid) bs->id = ks_pstrdup(pool, sessionid); else { uuid_t id; ks_uuid(&id); @@ -106,6 +109,8 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle ks_rwl_create(&bs->lock, pool); ks_assert(bs->lock); + bs->state = BLADE_SESSION_STATE_NONE; + ks_cond_create(&bs->cond, pool); ks_assert(bs->cond); @@ -158,8 +163,6 @@ KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs) tpool = blade_handle_tpool_get(bs->handle); ks_assert(tpool); - blade_session_state_set(bs, BLADE_SESSION_STATE_NONE); - if (ks_thread_pool_add_job(tpool, blade_session_state_thread, bs) != KS_STATUS_SUCCESS) { // @todo error logging return KS_STATUS_FAIL; @@ -207,6 +210,18 @@ KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs) return bs->handle; } +KS_DECLARE(ks_bool_t) blade_session_loopback(blade_session_t *bs) +{ + ks_assert(bs); + return (bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK; +} + +KS_DECLARE(ks_bool_t) blade_session_upstream(blade_session_t *bs) +{ + ks_assert(bs); + return (bs->flags & BLADE_SESSION_FLAGS_UPSTREAM) == BLADE_SESSION_FLAGS_UPSTREAM; +} + KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs) { ks_assert(bs); @@ -397,8 +412,11 @@ KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *j ks_assert(bs); ks_assert(json); - json_copy = cJSON_Duplicate(json, 1); - if ((ret = ks_q_push(bs->sending, json_copy)) == KS_STATUS_SUCCESS) ks_cond_try_signal(bs->cond); + if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) == BLADE_SESSION_FLAGS_LOOPBACK) ret = blade_session_receiving_push(bs, json); + else { + json_copy = cJSON_Duplicate(json, 1); + if ((ret = ks_q_push(bs->sending, json_copy)) == KS_STATUS_SUCCESS) ks_cond_try_signal(bs->cond); + } return ret; } @@ -485,7 +503,8 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data) default: break; } - if (!bs->connection && + if ((bs->flags & BLADE_SESSION_FLAGS_LOOPBACK) != BLADE_SESSION_FLAGS_LOOPBACK && + !bs->connection && bs->ttl > 0 && !blade_session_terminating(bs) && ks_time_now() >= bs->ttl) { @@ -537,7 +556,7 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs) ks_assert(bs); - while (bs->connection && blade_session_receiving_pop(bs, &json) == KS_STATUS_SUCCESS && json) { + while (blade_session_receiving_pop(bs, &json) == KS_STATUS_SUCCESS && json) { blade_session_process(bs, json); cJSON_Delete(json); } @@ -645,7 +664,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) // not meant for local processing, continue with standard unicast routing for requests blade_session_t *bs_router = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), params_responder_nodeid); if (!bs_router) { - bs_router = blade_routemgr_upstream_lookup(blade_handle_routemgr_get(bh)); + bs_router = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bh)); if (!bs_router) { cJSON *res = NULL; cJSON *res_error = NULL; @@ -719,7 +738,7 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) // not meant for local processing, continue with standard unicast routing for responses blade_session_t *bs_router = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), object_requester_nodeid); if (!bs_router) { - bs_router = blade_routemgr_upstream_lookup(blade_handle_routemgr_get(bh)); + bs_router = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bh)); if (!bs_router) { ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) but upstream session unavailable\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid); return KS_STATUS_DISCONNECTED; diff --git a/libs/libblade/src/blade_sessionmgr.c b/libs/libblade/src/blade_sessionmgr.c index 593317c37d..c4b2eefadb 100644 --- a/libs/libblade/src/blade_sessionmgr.c +++ b/libs/libblade/src/blade_sessionmgr.c @@ -36,6 +36,8 @@ struct blade_sessionmgr_s { blade_handle_t *handle; + blade_session_t *loopback; + blade_session_t *upstream; ks_hash_t *sessions; // id, blade_session_t* ks_hash_t *callbacks; // id, blade_session_callback_data_t* }; @@ -130,12 +132,37 @@ KS_DECLARE(blade_handle_t *) blade_sessionmgr_handle_get(blade_sessionmgr_t *bsm return bsmgr->handle; } +KS_DECLARE(ks_status_t) blade_sessionmgr_startup(blade_sessionmgr_t *bsmgr, config_setting_t *config) +{ + ks_assert(bsmgr); + + blade_session_create(&bsmgr->loopback, bsmgr->handle, BLADE_SESSION_FLAGS_LOOPBACK, NULL); + ks_assert(bsmgr->loopback); + + ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bsmgr->loopback)); + + if (blade_session_startup(bsmgr->loopback) != KS_STATUS_SUCCESS) { + ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", blade_session_id_get(bsmgr->loopback)); + blade_session_destroy(&bsmgr->loopback); + return KS_STATUS_FAIL; + } + + ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bsmgr->loopback)); + + return KS_STATUS_SUCCESS; +} + KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr) { ks_hash_iterator_t *it = NULL; ks_assert(bsmgr); + if (bsmgr->loopback) { + blade_session_hangup(bsmgr->loopback); + ks_sleep_ms(100); + } + ks_hash_read_lock(bsmgr->sessions); for (it = ks_hash_first(bsmgr->sessions, KS_UNLOCKED); it; it = ks_hash_next(&it)) { void *key = NULL; @@ -151,6 +178,22 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr) return KS_STATUS_SUCCESS; } +KS_DECLARE(blade_session_t *) blade_sessionmgr_loopback_lookup(blade_sessionmgr_t *bsmgr) +{ + ks_assert(bsmgr); + + blade_session_read_lock(bsmgr->loopback, KS_TRUE); + return bsmgr->loopback; +} + +KS_DECLARE(blade_session_t *) blade_sessionmgr_upstream_lookup(blade_sessionmgr_t *bsmgr) +{ + ks_assert(bsmgr); + + if (bsmgr->upstream) blade_session_read_lock(bsmgr->upstream, KS_TRUE); + return bsmgr->upstream; +} + KS_DECLARE(blade_session_t *) blade_sessionmgr_session_lookup(blade_sessionmgr_t *bsmgr, const char *id) { blade_session_t *bs = NULL; @@ -177,6 +220,8 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_session_add(blade_sessionmgr_t *bsmgr, ks_log(KS_LOG_DEBUG, "Session Added: %s\n", key); + if (blade_session_upstream(bs)) bsmgr->upstream = bs; + return KS_STATUS_SUCCESS; } @@ -196,7 +241,7 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_session_remove(blade_sessionmgr_t *bsmg ks_log(KS_LOG_DEBUG, "Session Removed: %s\n", id); routemgr = blade_handle_routemgr_get(bsmgr->handle); - if (blade_routemgr_local_check(routemgr, id)) { + if (blade_session_upstream(bs)) { blade_routemgr_local_set(routemgr, NULL); blade_routemgr_master_set(routemgr, NULL); diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index bc7d96a32e..70e1943deb 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -216,6 +216,8 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_ blade_transportmgr_startup(bh->transportmgr, config); + blade_sessionmgr_startup(bh->sessionmgr, config); + blade_mastermgr_startup(bh->mastermgr, config); blade_restmgr_startup(bh->restmgr, config); @@ -306,7 +308,7 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio ks_assert(target); // @todo mini state machine to deal with upstream establishment to avoid attempting multiple upstream connects at the same time? - if ((bs = blade_routemgr_upstream_lookup(bh->routemgr))) { + if ((bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) { blade_session_read_unlock(bs); return KS_STATUS_DUPLICATE_OPERATION; } @@ -362,7 +364,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcroute(blade_handle_t *bh, const char *no ks_assert(nodeid); - if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) { + if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) { ret = KS_STATUS_DISCONNECTED; goto done; } @@ -479,7 +481,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char ks_assert(bh); ks_assert(identity); - if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) { + if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) { ret = KS_STATUS_DISCONNECTED; goto done; } @@ -514,7 +516,7 @@ ks_status_t blade_handle_rpcregister_raw(blade_handle_t *bh, const char *identit ks_assert(bh); ks_assert(identity); - if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) { + if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) { ret = KS_STATUS_DISCONNECTED; goto done; } @@ -720,7 +722,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, blade_rpcpub ks_assert(protocol); // @todo consideration for the Master trying to publish a protocol, with no upstream - if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) { + if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) { ret = KS_STATUS_DISCONNECTED; goto done; } @@ -969,7 +971,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char ks_assert(channels); // @todo consideration for the Master trying to publish a protocol, with no upstream - if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) { + if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) { ret = KS_STATUS_DISCONNECTED; goto done; } @@ -1167,7 +1169,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *p ks_assert(bh); ks_assert(protocol); - if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) { + if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) { ret = KS_STATUS_DISCONNECTED; goto done; } @@ -1303,7 +1305,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char * ks_assert(protocol); if (!(bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), nodeid))) { - if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) { + if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) { ret = KS_STATUS_DISCONNECTED; goto done; } @@ -1582,7 +1584,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, ks_assert(channels); // @note this is always produced by a subscriber, and sent upstream, master will only use the internal raw call - if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) { + if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) { ret = KS_STATUS_DISCONNECTED; goto done; } @@ -1640,7 +1642,7 @@ ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, goto done; } } - else if (!(bs = blade_routemgr_upstream_lookup(bh->routemgr))) { + else if (!(bs = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) { ret = KS_STATUS_DISCONNECTED; goto done; } @@ -1895,7 +1897,7 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, voi if (temp_data && temp_data->original_requestid) { blade_session_t *relay = NULL; if (downstream) { - if (!(relay = blade_routemgr_upstream_lookup(bh->routemgr))) { + if (!(relay = blade_sessionmgr_upstream_lookup(bh->sessionmgr))) { goto done; } } else { @@ -1929,18 +1931,10 @@ done: // blade.broadcast request generator KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data) { - ks_status_t ret = KS_STATUS_SUCCESS; - ks_assert(bh); ks_assert(protocol); - ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, BLADE_RPCBROADCAST_COMMAND_EVENT, NULL, protocol, channel, event, params, callback, data); - - // @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations - // if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler - // is where this normally occurs, however this is not a simple case as the callback expects a blade_rpc_request_t parameter containing context - - return ret; + return blade_subscriptionmgr_broadcast(bh->subscriptionmgr, BLADE_RPCBROADCAST_COMMAND_EVENT, NULL, protocol, channel, event, params, callback, data); } // @todo blade_handle_rpcbroadcast_raw() to encapsulate adding subcommands to broadcast to support protocol removal, protocol channel removal, and normal event broadcast @@ -2023,21 +2017,12 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void req_params_params = cJSON_GetObjectItem(req_params, "params"); - blade_subscriptionmgr_broadcast(bh->subscriptionmgr, command, blade_session_id_get(bs), req_params_protocol, req_params_channel, req_params_event, req_params_params, NULL, NULL); - - if (command == BLADE_RPCBROADCAST_COMMAND_EVENT) { + if (!blade_session_loopback(bs)) blade_subscriptionmgr_broadcast(bh->subscriptionmgr, command, blade_session_id_get(bs), req_params_protocol, req_params_channel, req_params_event, req_params_params, NULL, NULL); + else if (command == BLADE_RPCBROADCAST_COMMAND_EVENT) { bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_channel); if (bsub) { - const char *localid = NULL; - - blade_routemgr_local_copy(bh->routemgr, &localid); - ks_assert(localid); - - if (ks_hash_search(blade_subscription_subscribers_get(bsub), (void *)localid, KS_UNLOCKED)) { - callback = blade_subscription_callback_get(bsub); - if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub)); - } - ks_pool_free(&localid); + callback = blade_subscription_callback_get(bsub); + if (callback) ret = callback(brpcreq, blade_subscription_callback_data_get(bsub)); } } diff --git a/libs/libblade/src/blade_subscriptionmgr.c b/libs/libblade/src/blade_subscriptionmgr.c index 64105239cd..59cabdbf27 100644 --- a/libs/libblade/src/blade_subscriptionmgr.c +++ b/libs/libblade/src/blade_subscriptionmgr.c @@ -338,7 +338,7 @@ KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, con } } -KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data) +KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_sessionid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data) { ks_pool_t *pool = NULL; const char *bsub_key = NULL; @@ -376,13 +376,13 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t ks_hash_this(it, (const void **)&key, NULL, &value); - if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue; + //if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue; - // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through - if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key)) continue; + //if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key)) continue; bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key); if (bs) { + if (excluded_sessionid && !ks_safe_strcasecmp(excluded_sessionid, blade_session_id_get(bs))) continue; if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool); if (!ks_hash_search(routers, (void *)blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, (void *)blade_session_id_get(bs), (void *)bs); else blade_session_read_unlock(bs); @@ -420,13 +420,13 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t ks_hash_this(it2, (const void **)&key2, NULL, &value2); - if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key2)) continue; + //if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key2)) continue; - // @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through - if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2)) continue; + //if (blade_routemgr_local_check(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2)) continue; bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key2); if (bs) { + if (excluded_sessionid && !ks_safe_strcasecmp(excluded_sessionid, blade_session_id_get(bs))) continue; if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool); if (!ks_hash_search(routers, (void *)blade_session_id_get(bs), KS_UNLOCKED)) ks_hash_insert(routers, (void *)blade_session_id_get(bs), (void *)bs); else blade_session_read_unlock(bs); @@ -444,9 +444,9 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t } - bs = blade_routemgr_upstream_lookup(blade_handle_routemgr_get(bsmgr->handle)); + bs = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bsmgr->handle)); if (bs) { - if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) { + if (!excluded_sessionid || ks_safe_strcasecmp(excluded_sessionid, blade_session_id_get(bs))) { if (!routers) ks_hash_create(&routers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, pool); ks_hash_insert(routers, (void *)blade_session_id_get(bs), (void *)bs); } diff --git a/libs/libblade/src/blade_transport_wss.c b/libs/libblade/src/blade_transport_wss.c index ea123f4a0e..57aa2c9e02 100644 --- a/libs/libblade/src/blade_transport_wss.c +++ b/libs/libblade/src/blade_transport_wss.c @@ -832,6 +832,8 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_ const char *jsonrpc = NULL; const char *id = NULL; const char *method = NULL; + const char *sessionid = NULL; + uuid_t uuid; const char *nodeid = NULL; ks_time_t timeout; @@ -901,15 +903,17 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_ json_params = cJSON_GetObjectItem(json_req, "params"); if (json_params) { - nodeid = cJSON_GetObjectCstr(json_params, "session-id"); - if (nodeid) { - // @todo validate uuid format by parsing, not currently available in uuid functions, send -32602 (invalid params) if invalid - ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", nodeid); + sessionid = cJSON_GetObjectCstr(json_params, "sessionid"); + if (sessionid) { + ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", sessionid); } } - if (nodeid) { - bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), nodeid); // bs comes out read locked if not null to prevent it being cleaned up before we are done + ks_uuid(&uuid); + nodeid = ks_uuid_str(ks_pool_get(bc), &uuid); + + if (sessionid) { + bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), sessionid); if (bs) { if (blade_session_terminating(bs)) { blade_session_read_unlock(bs); @@ -917,21 +921,22 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_ bs = NULL; } else { ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs)); + // @todo validate against IP address or something to ensure reconnects are acceptable } } } if (!bs) { - blade_session_create(&bs, bh, NULL); + blade_session_create(&bs, bh, BLADE_SESSION_FLAGS_NONE, NULL); ks_assert(bs); - nodeid = blade_session_id_get(bs); - ks_log(KS_LOG_DEBUG, "Session (%s) created\n", nodeid); + sessionid = blade_session_id_get(bs); + ks_log(KS_LOG_DEBUG, "Session (%s) created\n", sessionid); blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise if (blade_session_startup(bs) != KS_STATUS_SUCCESS) { - ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", nodeid); + ks_log(KS_LOG_DEBUG, "Session (%s) startup failed\n", sessionid); blade_transport_wss_rpc_error_send(bc, id, -32603, "Internal error, session could not be started"); blade_session_read_unlock(bs); blade_session_destroy(&bs); @@ -941,7 +946,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_ // This is an inbound connection, thus it is always creating a downstream session - ks_log(KS_LOG_DEBUG, "Session (%s) started\n", nodeid); + ks_log(KS_LOG_DEBUG, "Session (%s) started\n", sessionid); blade_sessionmgr_session_add(blade_handle_sessionmgr_get(bh), bs); // This is primarily to cleanup the routes added to the blade_handle for main routing when a session terminates, these don't have a lot of use otherwise but it will keep the main route table @@ -951,16 +956,17 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_ // a message should pass through when it does not match the local node id from blade_routemgr_t, and must be matched with a call to blade_session_route_add() for cleanup, additionally when // a "blade.route" is received the identity it carries affects these routes along with the sessionid of the downstream session it came through, and "blade.route" would also // result in the new identities being added as routes however federation registration would require a special process to maintain proper routing - blade_routemgr_route_add(blade_handle_routemgr_get(bh), nodeid, nodeid); + blade_routemgr_route_add(blade_handle_routemgr_get(bh), nodeid, sessionid); } blade_rpc_response_raw_create(&json_res, &json_result, id); ks_assert(json_res); + cJSON_AddStringToObject(json_result, "sessionid", sessionid); cJSON_AddStringToObject(json_result, "nodeid", nodeid); if (!blade_routemgr_master_pack(blade_handle_routemgr_get(bh), json_result, "master-nodeid")) { - ks_log(KS_LOG_DEBUG, "Master nodeid unavailable\n"); + ks_log(KS_LOG_DEBUG, "Master nodeid unavailable, upstream is not established\n"); blade_transport_wss_rpc_error_send(bc, id, -32602, "Master nodeid unavailable"); ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; goto done; @@ -969,7 +975,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_ // This starts the final process for associating the connection to the session, including for reconnecting to an existing session, this simply // associates the session to this connection, upon return the remainder of the association for the session to the connection is handled along // with making sure both this connection and the session state machines are in running states - blade_connection_session_set(bc, nodeid); + blade_connection_session_set(bc, sessionid); // @todo end of reusable handler for "blade.connect" request @@ -1004,6 +1010,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade const char *id = NULL; cJSON *json_error = NULL; cJSON *json_result = NULL; + const char *sessionid = NULL; const char *nodeid = NULL; const char *master_nodeid = NULL; blade_session_t *bs = NULL; @@ -1032,7 +1039,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade blade_rpc_request_raw_create(pool, &json_req, &json_params, &mid, "blade.connect"); ks_assert(json_req); - if (btwssl->session_id) cJSON_AddStringToObject(json_params, "session-id", btwssl->session_id); + if (btwssl->session_id) cJSON_AddStringToObject(json_params, "sessionid", btwssl->session_id); ks_log(KS_LOG_DEBUG, "Session (%s) requested\n", (btwssl->session_id ? btwssl->session_id : "none")); @@ -1086,6 +1093,13 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade goto done; } + sessionid = cJSON_GetObjectCstr(json_result, "sessionid"); + if (!sessionid) { + ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'sessionid'\n"); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } + nodeid = cJSON_GetObjectCstr(json_result, "nodeid"); if (!nodeid) { ks_log(KS_LOG_DEBUG, "Received message 'result' is missing 'nodeid'\n"); @@ -1100,15 +1114,19 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade goto done; } - - // @todo validate uuid format by parsing, not currently available in uuid functions - bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), nodeid); // bs comes out read locked if not null to prevent it being cleaned up before we are done + bs = blade_sessionmgr_upstream_lookup(blade_handle_sessionmgr_get(bh)); if (bs) { - ks_log(KS_LOG_DEBUG, "Session (%s) located\n", blade_session_id_get(bs)); + if (ks_safe_strcasecmp(blade_session_id_get(bs), sessionid)) { + ks_log(KS_LOG_DEBUG, "Already have upstream session with different sessionid, could not establish session\n"); + blade_session_read_unlock(bs); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } + ks_log(KS_LOG_DEBUG, "Session (%s) reestablishing\n", blade_session_id_get(bs)); } if (!bs) { - blade_session_create(&bs, bh, nodeid); + blade_session_create(&bs, bh, BLADE_SESSION_FLAGS_UPSTREAM, sessionid); ks_assert(bs); ks_log(KS_LOG_DEBUG, "Session (%s) created\n", blade_session_id_get(bs)); @@ -1123,20 +1141,11 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_outbound(blade goto done; } - // This is an outbound connection, thus it is always creating an upstream session, defined by the sessionid matching the local_nodeid in the handle - - if (blade_routemgr_local_set(blade_handle_routemgr_get(bh), nodeid) != KS_STATUS_SUCCESS) { - ks_log(KS_LOG_DEBUG, "Session (%s) abandoned, upstream already available\n", blade_session_id_get(bs)); - blade_session_read_unlock(bs); - blade_session_hangup(bs); - ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; - goto done; - } - + // This is an outbound connection, thus it is always creating an upstream session ks_log(KS_LOG_DEBUG, "Session (%s) started\n", blade_session_id_get(bs)); - blade_sessionmgr_session_add(blade_handle_sessionmgr_get(bh), bs); + blade_routemgr_local_set(blade_handle_routemgr_get(bh), nodeid); blade_routemgr_master_set(blade_handle_routemgr_get(bh), master_nodeid); } diff --git a/libs/libblade/src/include/blade_routemgr.h b/libs/libblade/src/include/blade_routemgr.h index c75426c183..d1e930b657 100644 --- a/libs/libblade/src/include/blade_routemgr.h +++ b/libs/libblade/src/include/blade_routemgr.h @@ -43,7 +43,6 @@ KS_DECLARE(ks_status_t) blade_routemgr_local_set(blade_routemgr_t *brmgr, const KS_DECLARE(ks_bool_t) blade_routemgr_local_check(blade_routemgr_t *brmgr, const char *target); KS_DECLARE(ks_bool_t) blade_routemgr_local_copy(blade_routemgr_t *brmgr, const char **nodeid); KS_DECLARE(ks_bool_t) blade_routemgr_local_pack(blade_routemgr_t *brmgr, cJSON *json, const char *key); -KS_DECLARE(blade_session_t *) blade_routemgr_upstream_lookup(blade_routemgr_t *brmgr); KS_DECLARE(ks_status_t) blade_routemgr_master_set(blade_routemgr_t *brmgr, const char *nodeid); KS_DECLARE(ks_bool_t) blade_routemgr_master_check(blade_routemgr_t *brmgr, const char *target); KS_DECLARE(ks_bool_t) blade_routemgr_master_pack(blade_routemgr_t *brmgr, cJSON *json, const char *key); diff --git a/libs/libblade/src/include/blade_session.h b/libs/libblade/src/include/blade_session.h index 4bdbe8a85c..fb7a2c64ce 100644 --- a/libs/libblade/src/include/blade_session.h +++ b/libs/libblade/src/include/blade_session.h @@ -36,11 +36,13 @@ #include KS_BEGIN_EXTERN_C -KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, const char *id); +KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh, blade_session_flags_t flags, const char *sessionid); KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP); KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs); KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs); KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs); +KS_DECLARE(ks_bool_t) blade_session_loopback(blade_session_t *bs); +KS_DECLARE(ks_bool_t) blade_session_upstream(blade_session_t *bs); KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs); KS_DECLARE(blade_session_state_t) blade_session_state_get(blade_session_t *bs); KS_DECLARE(ks_status_t) blade_session_route_add(blade_session_t *bs, const char *nodeid); diff --git a/libs/libblade/src/include/blade_sessionmgr.h b/libs/libblade/src/include/blade_sessionmgr.h index 62f2783d8d..9666142a14 100644 --- a/libs/libblade/src/include/blade_sessionmgr.h +++ b/libs/libblade/src/include/blade_sessionmgr.h @@ -39,7 +39,10 @@ KS_BEGIN_EXTERN_C KS_DECLARE(ks_status_t) blade_sessionmgr_create(blade_sessionmgr_t **bsmgrP, blade_handle_t *bh); KS_DECLARE(ks_status_t) blade_sessionmgr_destroy(blade_sessionmgr_t **bsmgrP); KS_DECLARE(blade_handle_t *) blade_sessionmgr_handle_get(blade_sessionmgr_t *bsmgr); +KS_DECLARE(ks_status_t) blade_sessionmgr_startup(blade_sessionmgr_t *bsmgr, config_setting_t *config); KS_DECLARE(ks_status_t) blade_sessionmgr_shutdown(blade_sessionmgr_t *bsmgr); +KS_DECLARE(blade_session_t *) blade_sessionmgr_loopback_lookup(blade_sessionmgr_t *bsmgr); +KS_DECLARE(blade_session_t *) blade_sessionmgr_upstream_lookup(blade_sessionmgr_t *bsmgr); KS_DECLARE(blade_session_t *) blade_sessionmgr_session_lookup(blade_sessionmgr_t *bsmgr, const char *id); KS_DECLARE(ks_status_t) blade_sessionmgr_session_add(blade_sessionmgr_t *bsmgr, blade_session_t *bs); KS_DECLARE(ks_status_t) blade_sessionmgr_session_remove(blade_sessionmgr_t *bsmgr, blade_session_t *bs); diff --git a/libs/libblade/src/include/blade_subscriptionmgr.h b/libs/libblade/src/include/blade_subscriptionmgr.h index 0cc811985e..ace7db8887 100644 --- a/libs/libblade/src/include/blade_subscriptionmgr.h +++ b/libs/libblade/src/include/blade_subscriptionmgr.h @@ -44,7 +44,7 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_subscription_remove(blade_subscrip KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *channel, const char *subscriber); KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *channel, const char *subscriber); KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target); -KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_nodeid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data); +KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, blade_rpcbroadcast_command_t command, const char *excluded_sessionid, const char *protocol, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data); KS_END_EXTERN_C #endif diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index 93d61b6f95..2003e3dfbb 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -95,6 +95,11 @@ typedef enum { BLADE_CONNECTION_STATE_HOOK_BYPASS, } blade_connection_state_hook_t; +typedef enum { + BLADE_SESSION_FLAGS_NONE = 0 << 0, + BLADE_SESSION_FLAGS_LOOPBACK = 1 << 0, + BLADE_SESSION_FLAGS_UPSTREAM = 1 << 1, +} blade_session_flags_t; typedef enum { BLADE_SESSION_STATE_CONDITION_PRE,