diff --git a/libs/libblade/src/blade_session.c b/libs/libblade/src/blade_session.c index 41333a2bb9..ed7b3e45a2 100644 --- a/libs/libblade/src/blade_session.c +++ b/libs/libblade/src/blade_session.c @@ -257,25 +257,25 @@ KS_DECLARE(ks_hash_t *) blade_session_realms_get(blade_session_t *bs) return bs->realms; } -KS_DECLARE(ks_status_t) blade_session_route_add(blade_session_t *bs, const char *identity) +KS_DECLARE(ks_status_t) blade_session_route_add(blade_session_t *bs, const char *nodeid) { char *key = NULL; ks_assert(bs); - ks_assert(identity); + ks_assert(nodeid); - key = ks_pstrdup(bs->pool, identity); + key = ks_pstrdup(bs->pool, nodeid); ks_hash_insert(bs->routes, (void *)key, (void *)KS_TRUE); return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_session_route_remove(blade_session_t *bs, const char *identity) +KS_DECLARE(ks_status_t) blade_session_route_remove(blade_session_t *bs, const char *nodeid) { ks_assert(bs); - ks_assert(identity); + ks_assert(nodeid); - ks_hash_remove(bs->routes, (void *)identity); + ks_hash_remove(bs->routes, (void *)nodeid); return KS_STATUS_SUCCESS; } @@ -774,12 +774,11 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json) blade_handle_requests_remove(brpcreq); callback = blade_rpc_request_callback_get(brpcreq); - ks_assert(callback); blade_rpc_response_create(&brpcres, bs->handle, bs->pool, bs->id, brpcreq, json); ks_assert(brpcres); - disconnect = callback(brpcres, blade_rpc_request_callback_data_get(brpcreq)); + if (callback) disconnect = callback(brpcres, blade_rpc_request_callback_data_get(brpcreq)); blade_rpc_response_destroy(&brpcres); } diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index dda42e3233..c50f4f8e60 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -87,6 +87,7 @@ struct blade_handle_s { }; +ks_bool_t blade_protocol_register_request_handler(blade_rpc_request_t *brpcreq, void *data); ks_bool_t blade_protocol_publish_request_handler(blade_rpc_request_t *brpcreq, void *data); ks_bool_t blade_protocol_locate_request_handler(blade_rpc_request_t *brpcreq, void *data); ks_bool_t blade_protocol_execute_request_handler(blade_rpc_request_t *brpcreq, void *data); @@ -340,6 +341,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_ } // register internal core rpcs for blade.xxx + blade_rpc_create(&brpc, bh, "blade.register", NULL, NULL, blade_protocol_register_request_handler, NULL); + blade_handle_corerpc_register(brpc); + blade_rpc_create(&brpc, bh, "blade.publish", NULL, NULL, blade_protocol_publish_request_handler, NULL); blade_handle_corerpc_register(brpc); @@ -448,7 +452,7 @@ KS_DECLARE(ks_status_t) blade_handle_local_nodeid_set(blade_handle_t *bh, const goto done; } - if (bh->master_nodeid) ks_pool_free(bh->pool, &bh->local_nodeid); + if (bh->local_nodeid) ks_pool_free(bh->pool, &bh->local_nodeid); if (nodeid) bh->local_nodeid = ks_pstrdup(bh->pool, nodeid); ks_log(KS_LOG_DEBUG, "Local NodeID: %s\n", nodeid); @@ -563,8 +567,7 @@ KS_DECLARE(ks_status_t) blade_handle_route_add(blade_handle_t *bh, const char *n ks_log(KS_LOG_DEBUG, "Route Added: %s through %s\n", key, value); - // @todo when a route is added, upstream needs to be notified that the identity can be found through the session to the - // upstream router, and likewise up the chain to the Master Router Node, to create a complete route from anywhere else + blade_protocol_register(bh, nodeid, KS_FALSE, NULL, NULL); return KS_STATUS_SUCCESS; } @@ -579,10 +582,7 @@ KS_DECLARE(ks_status_t) blade_handle_route_remove(blade_handle_t *bh, const char ks_hash_remove(bh->routes, (void *)nodeid); - // @todo when a route is removed, upstream needs to be notified, for whatever reason the node is no longer available through - // this node so the routes leading here need to be cleared by passing a "blade.register" upstream to remove the routes, this - // should actually happen only for local sessions, and blade.register should be always passed upstream AND processed locally, so - // we don't want to duplicate blade.register calls already being passed up if this route is not a local session + blade_protocol_register(bh, nodeid, KS_TRUE, NULL, NULL); // @note everything below here is for master-only cleanup when a node is no longer routable @@ -1038,7 +1038,7 @@ KS_DECLARE(blade_session_t *) blade_handle_sessions_upstream(blade_handle_t *bh) ks_assert(bh); ks_rwl_read_lock(bh->local_nodeid_rwl); - bs = blade_handle_sessions_lookup(bh, bh->local_nodeid); + if (bh->local_nodeid) bs = blade_handle_sessions_lookup(bh, bh->local_nodeid); ks_rwl_read_unlock(bh->local_nodeid_rwl); return bs; @@ -1126,6 +1126,107 @@ KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *b // @todo all higher level errors should be handled by each of the calls internally so that a normal result response can be sent with an error block inside the result // which is important for implementation of blade.execute where errors can be relayed back to the requester properly +// blade.register request generator +KS_DECLARE(ks_status_t) blade_protocol_register(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + blade_session_t *bs = NULL; + ks_pool_t *pool = NULL; + cJSON *req = NULL; + cJSON *req_params = NULL; + + ks_assert(bh); + ks_assert(nodeid); + + if (!(bs = blade_handle_sessions_upstream(bh))) { + ret = KS_STATUS_DISCONNECTED; + goto done; + } + + pool = blade_handle_pool_get(bh); + ks_assert(pool); + + blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.register"); + + // fill in the req_params + cJSON_AddStringToObject(req_params, "nodeid", nodeid); + if (remove) cJSON_AddTrueToObject(req_params, "remove"); + + ks_log(KS_LOG_DEBUG, "Session (%s) register request (%s %s) started\n", blade_session_id_get(bs), remove ? "removing" : "adding", nodeid); + + ret = blade_session_send(bs, req, callback, data); + +done: + if (req) cJSON_Delete(req); + if (bs) blade_session_read_unlock(bs); + + return ret; +} + +// blade.register request handler +ks_bool_t blade_protocol_register_request_handler(blade_rpc_request_t *brpcreq, void *data) +{ + blade_handle_t *bh = NULL; + blade_session_t *bs = NULL; + cJSON *req = NULL; + cJSON *req_params = NULL; + const char *req_params_nodeid = NULL; + cJSON *req_params_remove = NULL; + ks_bool_t remove = KS_FALSE; + cJSON *res = NULL; + cJSON *res_result = NULL; + + ks_assert(brpcreq); + + bh = blade_rpc_request_handle_get(brpcreq); + ks_assert(bh); + + bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq)); + ks_assert(bs); + + req = blade_rpc_request_message_get(brpcreq); + ks_assert(req); + + req_params = cJSON_GetObjectItem(req, "params"); + if (!req_params) { + ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'params' object\n", blade_session_id_get(bs)); + blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object"); + blade_session_send(bs, res, NULL, NULL); + goto done; + } + + req_params_nodeid = cJSON_GetObjectCstr(req_params, "nodeid"); + if (!req_params_nodeid) { + ks_log(KS_LOG_DEBUG, "Session (%s) register request missing 'nodeid'\n", blade_session_id_get(bs)); + blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params nodeid"); + blade_session_send(bs, res, NULL, NULL); + goto done; + } + req_params_remove = cJSON_GetObjectItem(req_params, "remove"); + remove = req_params_remove && req_params_remove->type == cJSON_True; + + ks_log(KS_LOG_DEBUG, "Session (%s) register request (%s %s) processing\n", blade_session_id_get(bs), remove ? "removing" : "adding", req_params_nodeid); + + if (remove) { + blade_session_route_remove(bs, req_params_nodeid); + blade_handle_route_remove(bh, req_params_nodeid); + } else { + blade_session_route_add(bs, req_params_nodeid); + blade_handle_route_add(bh, req_params_nodeid, blade_session_id_get(bs)); + } + + blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq)); + blade_session_send(bs, res, NULL, NULL); + +done: + + if (res) cJSON_Delete(res); + if (bs) blade_session_read_unlock(bs); + + return KS_FALSE; +} + + // blade.publish request generator KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data) { @@ -1167,10 +1268,6 @@ KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *n ret = blade_session_send(bs, req, callback, data); - // @todo upon return, a provider should register the methods for this protocol to be locally available - // prior to receiving a response, if the response is an error then unregister, but if it is successful - // then the node is already primed to receive any immediate requests - done: if (req) cJSON_Delete(req); if (bs) blade_session_read_unlock(bs); @@ -1724,96 +1821,6 @@ KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcr blade_session_read_unlock(bs); } -// blade.locate response handler -//ks_bool_t blade_protocol_locate_response_handler(blade_rpc_response_t *brpcres, void *data) -//{ -// ks_bool_t ret = KS_FALSE; -// blade_handle_t *bh = NULL; -// blade_session_t *bs = NULL; -// blade_rpc_response_callback_wrapper_t *wrapper = NULL; -// blade_rpc_response_callback_t callback = NULL; -// cJSON *res = NULL; -// cJSON *res_error = NULL; -// cJSON *res_result = NULL; -// cJSON *res_object = NULL; -// //cJSON *res_result_providers = NULL; -// const char *requester_nodeid = NULL; -// const char *responder_nodeid = NULL; -// //const char *res_result_protocol = NULL; -// //const char *res_result_realm = NULL; -// -// ks_assert(brpcres); -// ks_assert(data); -// -// wrapper = (blade_rpc_response_callback_wrapper_t *)data; -// callback = wrapper->callback; -// data = wrapper->data; -// ks_pool_free(wrapper->pool, &wrapper); -// -// bh = blade_rpc_response_handle_get(brpcres); -// ks_assert(bh); -// -// bs = blade_handle_sessions_lookup(bh, blade_rpc_response_sessionid_get(brpcres)); -// ks_assert(bs); -// -// res = blade_rpc_response_message_get(brpcres); -// ks_assert(res); -// -// res_error = cJSON_GetObjectItem(res, "error"); -// res_result = cJSON_GetObjectItem(res, "result"); -// -// if (!res_error && !res_result) { -// ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'error' or 'result' object\n", blade_session_id_get(bs)); -// goto done; -// } -// res_object = res_error ? res_error : res_result; -// -// requester_nodeid = cJSON_GetObjectCstr(res_object, "requester-nodeid"); -// responder_nodeid = cJSON_GetObjectCstr(res_object, "responder-nodeid"); -// -// ks_log(KS_LOG_DEBUG, "Session (%s) locate response processing\n", blade_session_id_get(bs)); -// -// if (callback) ret = callback(brpcres, data); -// -// //if (res_error) { -// // // @todo process error response -// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response error... add details\n", blade_session_id_get(bs)); -// // goto done; -// //} -// -// // process result response -// -// //res_result_protocol = cJSON_GetObjectCstr(res_result, "protocol"); -// //if (!res_result_protocol) { -// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'protocol'\n", blade_session_id_get(bs)); -// // goto done; -// //} -// -// //res_result_realm = cJSON_GetObjectCstr(res_result, "realm"); -// //if (!res_result_realm) { -// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'realm'\n", blade_session_id_get(bs)); -// // goto done; -// //} -// -// //res_result_providers = cJSON_GetObjectItem(res_result, "providers"); -// //if (!res_result_providers) { -// // ks_log(KS_LOG_DEBUG, "Session (%s) locate response missing 'providers'\n", blade_session_id_get(bs)); -// // goto done; -// //} -// -// //for (int index = 0; index < cJSON_GetArraySize(res_result_providers); ++index) { -// // cJSON *elem = cJSON_GetArrayItem(res_result_providers, index); -// // if (elem->type == cJSON_String) { -// // ks_log(KS_LOG_DEBUG, "Session (%s) locate (%s@%s) provider (%s)\n", blade_session_id_get(bs), res_result_protocol, res_result_realm, elem->valuestring); -// // } -// //} -// -//done: -// if (bs) blade_session_read_unlock(bs); -// -// return ret; -//} - /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libblade/src/include/blade_session.h b/libs/libblade/src/include/blade_session.h index 612fdbccea..b84ac257ad 100644 --- a/libs/libblade/src/include/blade_session.h +++ b/libs/libblade/src/include/blade_session.h @@ -46,8 +46,8 @@ KS_DECLARE(blade_session_state_t) blade_session_state_get(blade_session_t *bs); KS_DECLARE(ks_status_t) blade_session_realm_add(blade_session_t *bs, const char *realm); KS_DECLARE(ks_status_t) blade_session_realm_remove(blade_session_t *bs, const char *realm); KS_DECLARE(ks_hash_t *) blade_session_realms_get(blade_session_t *bs); -KS_DECLARE(ks_status_t) blade_session_route_add(blade_session_t *bs, const char *identity); -KS_DECLARE(ks_status_t) blade_session_route_remove(blade_session_t *bs, const char *identity); +KS_DECLARE(ks_status_t) blade_session_route_add(blade_session_t *bs, const char *nodeid); +KS_DECLARE(ks_status_t) blade_session_route_remove(blade_session_t *bs, const char *nodeid); KS_DECLARE(cJSON *) blade_session_properties_get(blade_session_t *bs); KS_DECLARE(ks_status_t) blade_session_read_lock(blade_session_t *bs, ks_bool_t block); KS_DECLARE(ks_status_t) blade_session_read_unlock(blade_session_t *bs); diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index 7095ecb192..46c449f251 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -95,6 +95,8 @@ KS_DECLARE(ks_status_t) blade_handle_session_state_callback_register(blade_handl KS_DECLARE(ks_status_t) blade_handle_session_state_callback_unregister(blade_handle_t *bh, const char *id); KS_DECLARE(void) blade_handle_session_state_callbacks_execute(blade_session_t *bs, blade_session_state_condition_t condition); +KS_DECLARE(ks_status_t) blade_protocol_register(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data); + KS_DECLARE(ks_status_t) blade_protocol_publish(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data); KS_DECLARE(ks_status_t) blade_protocol_locate(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data);