diff --git a/libs/libblade/src/blade_session.c b/libs/libblade/src/blade_session.c index b91dfb0fac..955e058557 100644 --- a/libs/libblade/src/blade_session.c +++ b/libs/libblade/src/blade_session.c @@ -643,6 +643,7 @@ KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, bla ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) { + blade_handle_t *bh = NULL; blade_jsonrpc_request_t *bjsonrpcreq = NULL; blade_jsonrpc_response_t *bjsonrpcres = NULL; const char *jsonrpc = NULL; @@ -655,6 +656,8 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) ks_log(KS_LOG_DEBUG, "Session (%s) processing\n", bs->id); + bh = blade_session_handle_get(bs); + ks_assert(bh); jsonrpc = cJSON_GetObjectCstr(json, "jsonrpc"); if (!jsonrpc || strcmp(jsonrpc, "2.0")) { @@ -679,9 +682,48 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) // 2) Receiving a request (server: method callee or provider) blade_jsonrpc_t *bjsonrpc = NULL; blade_jsonrpc_request_callback_t callback = NULL; + cJSON *params = NULL; ks_log(KS_LOG_DEBUG, "Session (%s) receiving request (%s) for %s\n", bs->id, id, method); + params = cJSON_GetObjectItem(json, "params"); + if (params) { + const char *params_requester_nodeid = cJSON_GetObjectCstr(params, "requester-nodeid"); + const char *params_responder_nodeid = cJSON_GetObjectCstr(params, "responder-nodeid"); + if (params_requester_nodeid && params_responder_nodeid && !blade_handle_local_nodeid_compare(bh, params_responder_nodeid)) { + // not meant for local processing, continue with standard unicast routing for requests + blade_session_t *bs_router = blade_handle_route_lookup(bh, params_responder_nodeid); + if (!bs_router) { + bs_router = blade_handle_sessions_upstream(bh); + if (!bs_router) { + cJSON *res = NULL; + cJSON *res_error = NULL; + + ks_log(KS_LOG_DEBUG, "Session (%s) request (%s => %s) but upstream session unavailable\n", blade_session_id_get(bs), params_requester_nodeid, params_responder_nodeid); + blade_jsonrpc_error_raw_create(&res, &res_error, id, -32603, "Upstream session unavailable"); + + // needed in case this error must propagate further than the session which sent it + cJSON_AddStringToObject(res_error, "requester-nodeid", params_requester_nodeid); + cJSON_AddStringToObject(res_error, "responder-nodeid", params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded + + blade_session_send(bs, res, NULL); + return KS_STATUS_DISCONNECTED; + } + } + + if (bs_router == bs) { + // @todo avoid circular by sending back an error instead, really should not happen but check for posterity in case a node is misbehaving for some reason + } + + ks_log(KS_LOG_DEBUG, "Session (%s) request (%s => %s) routing (%s)\n", blade_session_id_get(bs), params_requester_nodeid, params_responder_nodeid, blade_session_id_get(bs_router)); + blade_session_send(bs_router, json, NULL); + blade_session_read_unlock(bs_router); + + return KS_STATUS_SUCCESS; + } + } + + // reach here if the request was not captured for routing, this SHOULD always mean the message is to be processed by local handlers bjsonrpc = blade_handle_jsonrpc_lookup(bs->handle, method); if (!bjsonrpc) { @@ -702,9 +744,42 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) // @note This is scenario 4 // 4) Receiving a response or error (client: method caller or consumer) blade_jsonrpc_response_callback_t callback = NULL; + cJSON *error = NULL; + cJSON *result = NULL; + cJSON *object = NULL; ks_log(KS_LOG_DEBUG, "Session (%s) receiving response (%s)\n", bs->id, id); + error = cJSON_GetObjectItem(json, "error"); + result = cJSON_GetObjectItem(json, "result"); + object = error ? error : result; + + if (object) { + const char *object_requester_nodeid = cJSON_GetObjectCstr(object, "requester-nodeid"); + const char *object_responder_nodeid = cJSON_GetObjectCstr(object, "responder-nodeid"); + if (object_requester_nodeid && object_responder_nodeid && !blade_handle_local_nodeid_compare(bh, object_requester_nodeid)) { + // not meant for local processing, continue with standard unicast routing for responses + blade_session_t *bs_router = blade_handle_route_lookup(bh, object_requester_nodeid); + if (!bs_router) { + bs_router = blade_handle_sessions_upstream(bh); + if (!bs_router) { + ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) but upstream session unavailable\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid); + return KS_STATUS_DISCONNECTED; + } + } + + if (bs_router == bs) { + // @todo avoid circular, really should not happen but check for posterity in case a node is misbehaving for some reason + } + + ks_log(KS_LOG_DEBUG, "Session (%s) response (%s <= %s) routing (%s)\n", blade_session_id_get(bs), object_requester_nodeid, object_responder_nodeid, blade_session_id_get(bs_router)); + blade_session_send(bs_router, json, NULL); + blade_session_read_unlock(bs_router); + + return KS_STATUS_SUCCESS; + } + } + bjsonrpcreq = blade_handle_requests_lookup(bs->handle, id); if (!bjsonrpcreq) { // @todo hangup session entirely? diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index ca3ab13046..5d141fa1bd 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -564,9 +564,9 @@ KS_DECLARE(ks_status_t) blade_handle_route_remove(blade_handle_t *bh, const char ks_hash_remove(bh->routes, (void *)nodeid); // @todo when a route is removed, upstream needs to be notified, for whatever reason the node is no longer available through - // this node so the routes leading here need to be cleared by passing a "blade.route" upstream to remove the routes, this - // should actually happen only for local sessions, and blade.route should be always passed upstream AND processed locally, so - // we don't want to duplicate blade.route calls already being passed up if this route is not a local session + // this node so the routes leading here need to be cleared by passing a "blade.register" upstream to remove the routes, this + // should actually happen only for local sessions, and blade.register should be always passed upstream AND processed locally, so + // we don't want to duplicate blade.register calls already being passed up if this route is not a local session // @note everything below here is for master-only cleanup when a node is no longer routable @@ -1172,38 +1172,6 @@ ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq, goto done; } - // errors sent above this point are meant to be handled by the first node which receives the request, should not occur after the first node validates - // errors (and the response) sent after this point must include the requester-nodeid and responder-nodeid for proper routing - - if (!blade_handle_local_nodeid_compare(bh, req_params_responder_nodeid)) { - // not meant for local processing, continue with routing which on a publish request, it always goes upstream to the master node - blade_session_t *bsu = blade_handle_sessions_upstream(bh); - if (!bsu) { - cJSON *res_error = NULL; - - ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) but upstream session unavailable\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid); - blade_jsonrpc_error_raw_create(&res, &res_error, blade_jsonrpc_request_messageid_get(breq), -32603, "Upstream session unavailable"); - - // needed in case this error must propagate further than the session which sent it - cJSON_AddStringToObject(res_error, "requester-nodeid", req_params_requester_nodeid); - cJSON_AddStringToObject(res_error, "responder-nodeid", req_params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded - - blade_session_send(bs, res, NULL); - goto done; - } - - // @todo this creates a new request that is tracked locally, in order to receive the response in a callback to route it correctly, this could be simplified - // by using a couple special fields to indicate common routing approaches based on a routing block in common for every message, thus being able to bypass this - // and still be able to properly route responses without a specific response handler on every intermediate router, in which case messages that are only being - // routed would not enter into these handlers and would not leave a footprint passing through routers - ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) routing upstream (%s)\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid, blade_session_id_get(bsu)); - blade_session_send(bsu, req, blade_protocol_publish_response_handler); - blade_session_read_unlock(bsu); - - goto done; - } - - // this local node must be responder-nodeid for the request, so process the request ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid); bp_key = ks_psprintf(bh->pool, "%s@%s", req_params_protocol, req_params_realm); @@ -1265,8 +1233,8 @@ ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres cJSON *res_error = NULL; cJSON *res_result = NULL; cJSON *res_object = NULL; - const char *requester_nodeid = NULL; - const char *responder_nodeid = NULL; + //const char *requester_nodeid = NULL; + //const char *responder_nodeid = NULL; ks_assert(bres); @@ -1288,24 +1256,9 @@ ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres } res_object = res_error ? res_error : res_result; - requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid"); - responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid"); + //requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid"); + //responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid"); - if (requester_nodeid && responder_nodeid && !blade_handle_local_nodeid_compare(bh, requester_nodeid)) { - blade_session_t *bsd = blade_handle_sessions_lookup(bh, requester_nodeid); - if (!bsd) { - ks_log(KS_LOG_DEBUG, "Session (%s) publish response (%s to %s) but downstream session unavailable\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid); - goto done; - } - - ks_log(KS_LOG_DEBUG, "Session (%s) publish response (%s to %s) routing downstream (%s)\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid, blade_session_id_get(bsd)); - blade_session_send(bsd, res, NULL); - blade_session_read_unlock(bsd); - - goto done; - } - - // this local node must be requester-nodeid for the response, or the response lacks routing nodeids, so process the response ks_log(KS_LOG_DEBUG, "Session (%s) publish response processing\n", blade_session_id_get(bs)); if (res_error) { @@ -1446,38 +1399,6 @@ ks_bool_t blade_protocol_locate_request_handler(blade_jsonrpc_request_t *breq, v goto done; } - // errors sent above this point are meant to be handled by the first node which receives the request, should not occur after the first node validates - // errors (and the response) sent after this point must include the requester-nodeid and responder-nodeid for proper routing - - if (!blade_handle_local_nodeid_compare(bh, req_params_responder_nodeid)) { - // not meant for local processing, continue with routing which on a locate request, it always goes upstream to the master node - blade_session_t *bsu = blade_handle_sessions_upstream(bh); - if (!bsu) { - cJSON *res_error = NULL; - - ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) but upstream session unavailable\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid); - blade_jsonrpc_error_raw_create(&res, &res_error, blade_jsonrpc_request_messageid_get(breq), -32603, "Upstream session unavailable"); - - // needed in case this error must propagate further than the session which sent it - cJSON_AddStringToObject(res_error, "requester-nodeid", req_params_requester_nodeid); - cJSON_AddStringToObject(res_error, "responder-nodeid", req_params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded - - blade_session_send(bs, res, NULL); - goto done; - } - - // @todo this creates a new request that is tracked locally, in order to receive the response in a callback to route it correctly, this could be simplified - // by using a couple special fields to indicate common routing approaches based on a routing block in common for every message, thus being able to bypass this - // and still be able to properly route responses without a specific response handler on every intermediate router, in which case messages that are only being - // routed would not enter into these handlers and would not leave a footprint passing through routers - ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) routing upstream (%s)\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid, blade_session_id_get(bsu)); - blade_session_send(bsu, req, blade_protocol_locate_response_handler); - blade_session_read_unlock(bsu); - - goto done; - } - - // this local node must be responder-nodeid for the request, so process the request ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid); res_result_providers = cJSON_CreateObject(); @@ -1560,21 +1481,6 @@ ks_bool_t blade_protocol_locate_response_handler(blade_jsonrpc_response_t *bres) requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid"); responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid"); - if (requester_nodeid && responder_nodeid && !blade_handle_local_nodeid_compare(bh, requester_nodeid)) { - blade_session_t *bsd = blade_handle_sessions_lookup(bh, requester_nodeid); - if (!bsd) { - ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) but downstream session unavailable\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid); - goto done; - } - - ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) routing downstream (%s)\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid, blade_session_id_get(bsd)); - blade_session_send(bsd, res, NULL); - blade_session_read_unlock(bsd); - - goto done; - } - - // this local node must be requester-nodeid for the response, or the response lacks routing nodeids, so process the response ks_log(KS_LOG_DEBUG, "Session (%s) locate response processing\n", blade_session_id_get(bs)); if (res_error) { diff --git a/libs/libblade/src/blade_transport_wss.c b/libs/libblade/src/blade_transport_wss.c index bd05598623..ddf7e7be99 100644 --- a/libs/libblade/src/blade_transport_wss.c +++ b/libs/libblade/src/blade_transport_wss.c @@ -843,7 +843,7 @@ blade_connection_state_hook_t blade_transport_wss_onstate_startup_inbound(blade_ blade_session_route_add(bs, nodeid); // This is the main routing entry to make an identity routable through a session when a message is received for a given identity in this table, these allow efficiently determine which session // a message should pass through when it does not match the local node identities from blade_handle_identity_register(), and must be matched with a call to blade_session_route_add() for cleanup, - // additionally when a "blade.route" is received the identity it carries affects these routes along with the sessionid of the downstream session it came through, and "blade.register" would also + // additionally when a "blade.register" is received the identity it carries affects these routes along with the sessionid of the downstream session it came through, and "blade.register" would also // result in the new identities being added as routes however new entire wildcard subrealm registration would require a special process for matching any identities from those subrealms blade_handle_route_add(bh, nodeid, nodeid);