diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index 888fd1d30f..ca3ab13046 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -88,6 +88,9 @@ struct blade_handle_s { ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq, void *data); ks_bool_t blade_protocol_publish_response_handler(blade_jsonrpc_response_t *bres); +ks_bool_t blade_protocol_locate_request_handler(blade_jsonrpc_request_t *breq, void *data); +ks_bool_t blade_protocol_locate_response_handler(blade_jsonrpc_response_t *bres); + typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t; struct blade_handle_session_state_callback_registration_s { @@ -328,6 +331,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_ blade_jsonrpc_create(&bjsonrpc, bh, "blade.publish", blade_protocol_publish_request_handler, NULL); blade_handle_jsonrpc_register(bjsonrpc); + blade_jsonrpc_create(&bjsonrpc, bh, "blade.locate", blade_protocol_locate_request_handler, NULL); + blade_handle_jsonrpc_register(bjsonrpc); + blade_transport_wss_create(&bt, bh); ks_assert(bt); bh->default_transport = bt; @@ -1042,7 +1048,7 @@ KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *b // blade.publish request generator -// @todo add additional async callback to be called upon a publish response to inform caller of the result? +// @todo add additional async callback to be called upon a publish response to inform caller of the result from the publish response handler KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm) { ks_status_t ret = KS_STATUS_SUCCESS; @@ -1073,8 +1079,14 @@ KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *n cJSON_AddStringToObject(req_params, "responder-nodeid", bh->master_nodeid); ks_rwl_read_unlock(bh->master_nodeid_rwl); + // @todo add a parameter containing a block of json for schema definitions for each of the methods being published + ks_log(KS_LOG_DEBUG, "Session (%s) publish request started\n", blade_session_id_get(bs)); - ret = blade_session_send(bs, req, blade_protocol_publish_response_handler); + ret = blade_session_send(bs, req, blade_protocol_publish_response_handler); // @todo add another callback to parameters so caller can be informed of blade.publish response + + // @todo upon return, a provider should register the methods for this protocol to be locally available + // prior to receiving a response, if the response is an error then unregister, but if it is successful + // then the node is already primed to receive any immediate requests done: if (req) cJSON_Delete(req); @@ -1229,6 +1241,7 @@ ks_bool_t blade_protocol_publish_request_handler(blade_jsonrpc_request_t *breq, blade_jsonrpc_response_raw_create(&res, &res_result, blade_jsonrpc_request_messageid_get(breq)); cJSON_AddStringToObject(res_result, "protocol", req_params_protocol); + cJSON_AddStringToObject(res_result, "realm", req_params_realm); cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid); cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid); @@ -1309,6 +1322,300 @@ done: return KS_FALSE; } + +// blade.locate request generator +// @todo add additional async callback to be called upon a locate response to inform caller of the result from the locate response handler +KS_DECLARE(ks_status_t) blade_protocol_locate(blade_handle_t *bh, const char *name, const char *realm) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + cJSON *req = NULL; + cJSON *req_params = NULL; + blade_session_t *bs = NULL; + + ks_assert(bh); + ks_assert(name); + ks_assert(realm); + + if (!(bs = blade_handle_sessions_upstream(bh))) { + ret = KS_STATUS_DISCONNECTED; + goto done; + } + + blade_jsonrpc_request_raw_create(blade_handle_pool_get(bh), &req, &req_params, NULL, "blade.locate"); + + // fill in the req_params + cJSON_AddStringToObject(req_params, "protocol", name); + cJSON_AddStringToObject(req_params, "realm", realm); + + ks_rwl_read_lock(bh->local_nodeid_rwl); + cJSON_AddStringToObject(req_params, "requester-nodeid", bh->local_nodeid); + ks_rwl_read_unlock(bh->local_nodeid_rwl); + + ks_rwl_read_lock(bh->master_nodeid_rwl); + cJSON_AddStringToObject(req_params, "responder-nodeid", bh->master_nodeid); + ks_rwl_read_unlock(bh->master_nodeid_rwl); + + ks_log(KS_LOG_DEBUG, "Session (%s) locate request started\n", blade_session_id_get(bs)); + ret = blade_session_send(bs, req, blade_protocol_locate_response_handler); // @todo add another callback to parameters so caller can be informed of blade.publish response + + // @todo upon return, a provider should register the methods for this protocol to be locally available + // prior to receiving a response, if the response is an error then unregister, but if it is successful + // then the node is already primed to receive any immediate requests + +done: + if (req) cJSON_Delete(req); + if (bs) blade_session_read_unlock(bs); + + return ret; +} + +// blade.locate request handler +ks_bool_t blade_protocol_locate_request_handler(blade_jsonrpc_request_t *breq, void *data) +{ + blade_handle_t *bh = NULL; + blade_session_t *bs = NULL; + cJSON *req = NULL; + cJSON *req_params = NULL; + const char *req_params_protocol = NULL; + const char *req_params_realm = NULL; + const char *req_params_requester_nodeid = NULL; + const char *req_params_responder_nodeid = NULL; + cJSON *res = NULL; + cJSON *res_result = NULL; + cJSON *res_result_providers; + blade_protocol_t *bp = NULL; + const char *bp_key = NULL; + + ks_assert(breq); + + bh = blade_jsonrpc_request_handle_get(breq); + ks_assert(bh); + + bs = blade_handle_sessions_lookup(bh, blade_jsonrpc_request_sessionid_get(breq)); + ks_assert(bs); + + req = blade_jsonrpc_request_message_get(breq); + ks_assert(req); + + req_params = cJSON_GetObjectItem(req, "params"); + if (!req_params) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'params' object\n", blade_session_id_get(bs)); + blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params object"); + blade_session_send(bs, res, NULL); + goto done; + } + + req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol"); + if (!req_params_protocol) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'protocol'\n", blade_session_id_get(bs)); + blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params protocol"); + blade_session_send(bs, res, NULL); + goto done; + } + + req_params_realm = cJSON_GetObjectCstr(req_params, "realm"); + if (!req_params_realm) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'realm'\n", blade_session_id_get(bs)); + blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params realm"); + blade_session_send(bs, res, NULL); + goto done; + } + + // @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now + + req_params_requester_nodeid = cJSON_GetObjectCstr(req_params, "requester-nodeid"); + if (!req_params_requester_nodeid) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'requester-nodeid'\n", blade_session_id_get(bs)); + blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params requester-nodeid"); + blade_session_send(bs, res, NULL); + goto done; + } + + req_params_responder_nodeid = cJSON_GetObjectCstr(req_params, "responder-nodeid"); + if (!req_params_responder_nodeid) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate request missing 'responder-nodeid'\n", blade_session_id_get(bs)); + blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Missing params responder-nodeid"); + blade_session_send(bs, res, NULL); + goto done; + } + + if (!blade_handle_master_nodeid_compare(bh, req_params_responder_nodeid)) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid); + blade_jsonrpc_error_raw_create(&res, NULL, blade_jsonrpc_request_messageid_get(breq), -32602, "Invalid params responder-nodeid"); + blade_session_send(bs, res, NULL); + goto done; + } + + // errors sent above this point are meant to be handled by the first node which receives the request, should not occur after the first node validates + // errors (and the response) sent after this point must include the requester-nodeid and responder-nodeid for proper routing + + if (!blade_handle_local_nodeid_compare(bh, req_params_responder_nodeid)) { + // not meant for local processing, continue with routing which on a locate request, it always goes upstream to the master node + blade_session_t *bsu = blade_handle_sessions_upstream(bh); + if (!bsu) { + cJSON *res_error = NULL; + + ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) but upstream session unavailable\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid); + blade_jsonrpc_error_raw_create(&res, &res_error, blade_jsonrpc_request_messageid_get(breq), -32603, "Upstream session unavailable"); + + // needed in case this error must propagate further than the session which sent it + cJSON_AddStringToObject(res_error, "requester-nodeid", req_params_requester_nodeid); + cJSON_AddStringToObject(res_error, "responder-nodeid", req_params_responder_nodeid); // @todo responder-nodeid should become the local_nodeid to inform of which node actually responded + + blade_session_send(bs, res, NULL); + goto done; + } + + // @todo this creates a new request that is tracked locally, in order to receive the response in a callback to route it correctly, this could be simplified + // by using a couple special fields to indicate common routing approaches based on a routing block in common for every message, thus being able to bypass this + // and still be able to properly route responses without a specific response handler on every intermediate router, in which case messages that are only being + // routed would not enter into these handlers and would not leave a footprint passing through routers + ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) routing upstream (%s)\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid, blade_session_id_get(bsu)); + blade_session_send(bsu, req, blade_protocol_locate_response_handler); + blade_session_read_unlock(bsu); + + goto done; + } + + // this local node must be responder-nodeid for the request, so process the request + ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid); + + res_result_providers = cJSON_CreateObject(); + + bp_key = ks_psprintf(bh->pool, "%s@%s", req_params_protocol, req_params_realm); + + ks_hash_read_lock(bh->protocols); + + bp = (blade_protocol_t *)ks_hash_search(bh->protocols, (void *)bp_key, KS_UNLOCKED); + if (bp) { + ks_hash_t *providers = blade_protocol_providers_get(bp); + for (ks_hash_iterator_t *it = ks_hash_first(providers, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + const char *key = NULL; + void *value = NULL; + + ks_hash_this(it, (const void **)&key, NULL, &value); + + cJSON_AddItemToArray(res_result_providers, cJSON_CreateString(key)); + } + } + + ks_hash_read_unlock(bh->protocols); + + + // build the actual response finally + blade_jsonrpc_response_raw_create(&res, &res_result, blade_jsonrpc_request_messageid_get(breq)); + + cJSON_AddStringToObject(res_result, "protocol", req_params_protocol); + cJSON_AddStringToObject(res_result, "realm", req_params_realm); + cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid); + cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid); + cJSON_AddItemToObject(res_result, "providers", res_result_providers); + + // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup + blade_session_send(bs, res, NULL); + +done: + + if (res) cJSON_Delete(res); + if (bs) blade_session_read_unlock(bs); + + return KS_FALSE; +} + +// blade.locate response handler +ks_bool_t blade_protocol_locate_response_handler(blade_jsonrpc_response_t *bres) +{ + blade_handle_t *bh = NULL; + blade_session_t *bs = NULL; + cJSON *res = NULL; + cJSON *res_error = NULL; + cJSON *res_result = NULL; + cJSON *res_object = NULL; + cJSON *res_result_providers = NULL; + const char *requester_nodeid = NULL; + const char *responder_nodeid = NULL; + const char *res_result_protocol = NULL; + const char *res_result_realm = NULL; + + ks_assert(bres); + + bh = blade_jsonrpc_response_handle_get(bres); + ks_assert(bh); + + bs = blade_handle_sessions_lookup(bh, blade_jsonrpc_response_sessionid_get(bres)); + ks_assert(bs); + + res = blade_jsonrpc_response_message_get(bres); + ks_assert(res); + + res_error = cJSON_GetObjectItem(res, "error"); + res_result = cJSON_GetObjectItem(res, "result"); + + if (!res_error && !res_result) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'error' or 'result' object\n", blade_session_id_get(bs)); + goto done; + } + res_object = res_error ? res_error : res_result; + + requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid"); + responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid"); + + if (requester_nodeid && responder_nodeid && !blade_handle_local_nodeid_compare(bh, requester_nodeid)) { + blade_session_t *bsd = blade_handle_sessions_lookup(bh, requester_nodeid); + if (!bsd) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) but downstream session unavailable\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid); + goto done; + } + + ks_log(KS_LOG_DEBUG, "Session (%s) locate response (%s to %s) routing downstream (%s)\n", blade_session_id_get(bs), requester_nodeid, responder_nodeid, blade_session_id_get(bsd)); + blade_session_send(bsd, res, NULL); + blade_session_read_unlock(bsd); + + goto done; + } + + // this local node must be requester-nodeid for the response, or the response lacks routing nodeids, so process the response + ks_log(KS_LOG_DEBUG, "Session (%s) locate response processing\n", blade_session_id_get(bs)); + + if (res_error) { + // @todo process error response + ks_log(KS_LOG_DEBUG, "Session (%s) locate response error... add details\n", blade_session_id_get(bs)); + goto done; + } + + // process result response + + res_result_protocol = cJSON_GetObjectCstr(res_result, "protocol"); + if (!res_result_protocol) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'protocol'\n", blade_session_id_get(bs)); + goto done; + } + + res_result_realm = cJSON_GetObjectCstr(res_result, "realm"); + if (!res_result_realm) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'realm'\n", blade_session_id_get(bs)); + goto done; + } + + res_result_providers = cJSON_GetObjectItem(res_result, "providers"); + if (!res_result_providers) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'providers'\n", blade_session_id_get(bs)); + goto done; + } + + for (int index = 0; index < cJSON_GetArraySize(res_result_providers); ++index) { + cJSON *elem = cJSON_GetArrayItem(res_result_providers, index); + if (elem->type == cJSON_String) { + ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) provider (%s)\n", blade_session_id_get(bs), res_result_protocol, res_result_realm, elem->valuestring); + } + } + +done: + if (bs) blade_session_read_unlock(bs); + + return KS_FALSE; +} + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index 7b9f56aed5..05c47acf0f 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -92,6 +92,7 @@ KS_DECLARE(ks_status_t) blade_handle_session_state_callback_unregister(blade_han KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *bs, blade_session_state_condition_t condition); KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm); +KS_DECLARE(ks_status_t) blade_protocol_locate(blade_handle_t *bh, const char *name, const char *realm); KS_END_EXTERN_C #endif diff --git a/libs/libblade/test/blades.c b/libs/libblade/test/blades.c index 4d7b13b09e..6402ecfe81 100644 --- a/libs/libblade/test/blades.c +++ b/libs/libblade/test/blades.c @@ -16,9 +16,11 @@ struct command_def_s { }; void command_quit(blade_handle_t *bh, char *args); +void command_locate(blade_handle_t *bh, char *args); static const struct command_def_s command_defs[] = { { "quit", command_quit }, + { "locate", command_locate }, { NULL, NULL } }; @@ -110,7 +112,7 @@ int main(int argc, char **argv) blade_identity_destroy(&target); - ks_sleep_ms(5000); + ks_sleep_ms(5000); // @todo use session state change callback to know when the session is ready, this ensures it's ready before trying to publish upstream blade_protocol_publish(bh, "test", "mydomain.com"); } @@ -191,6 +193,13 @@ void command_quit(blade_handle_t *bh, char *args) g_shutdown = KS_TRUE; } +void command_locate(blade_handle_t *bh, char *args) +{ + ks_assert(bh); + ks_assert(args); + + blade_protocol_locate(bh, "test", "mydomain.com"); +}