diff --git a/libs/libblade/src/blade_rpc.c b/libs/libblade/src/blade_rpc.c index ca1a3712cc..29510344f1 100644 --- a/libs/libblade/src/blade_rpc.c +++ b/libs/libblade/src/blade_rpc.c @@ -42,7 +42,7 @@ struct blade_rpc_s { const char *realm; blade_rpc_request_callback_t callback; - cJSON *data; + void *data; }; struct blade_rpc_request_s { @@ -54,7 +54,7 @@ struct blade_rpc_request_s { cJSON *message; const char *message_id; // pulled from message for easier keying blade_rpc_response_callback_t callback; - cJSON *data; + void *data; // @todo ttl to wait for response before injecting an error response locally }; @@ -72,22 +72,22 @@ struct blade_rpc_response_s { static void blade_rpc_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type) { - blade_rpc_t *brpc = (blade_rpc_t *)ptr; + //blade_rpc_t *brpc = (blade_rpc_t *)ptr; - ks_assert(brpc); + //ks_assert(brpc); switch (action) { case KS_MPCL_ANNOUNCE: break; case KS_MPCL_TEARDOWN: - if (brpc->data) cJSON_Delete(brpc->data); + // @todo delete data if present, requires update to ks_pool for self tracking the pool in allocation header break; case KS_MPCL_DESTROY: break; } } -KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data) +KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data) { blade_rpc_t *brpc = NULL; ks_pool_t *pool = NULL; @@ -170,7 +170,7 @@ KS_DECLARE(blade_rpc_request_callback_t) blade_rpc_callback_get(blade_rpc_t *brp return brpc->callback; } -KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc) +KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc) { ks_assert(brpc); @@ -190,7 +190,7 @@ static void blade_rpc_request_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_ case KS_MPCL_TEARDOWN: ks_pool_free(brpcreq->pool, (void **)&brpcreq->session_id); cJSON_Delete(brpcreq->message); - if (brpcreq->data) cJSON_Delete(brpcreq->data); + // @todo delete data if present, requires update to ks_pool for self tracking the pool in allocation header break; case KS_MPCL_DESTROY: break; @@ -203,7 +203,7 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP, const char *session_id, cJSON *json, blade_rpc_response_callback_t callback, - cJSON *data) + void *data) { blade_rpc_request_t *brpcreq = NULL; @@ -278,7 +278,7 @@ KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_r return brpcreq->callback; } -KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq) +KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq) { ks_assert(brpcreq); return brpcreq->data; diff --git a/libs/libblade/src/blade_session.c b/libs/libblade/src/blade_session.c index aecd52c273..92fb4144b1 100644 --- a/libs/libblade/src/blade_session.c +++ b/libs/libblade/src/blade_session.c @@ -582,7 +582,7 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs) } -KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data) +KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data) { blade_rpc_request_t *brpcreq = NULL; const char *method = NULL; diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index 0f153e951f..c2538b5470 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -47,14 +47,14 @@ struct blade_handle_s { blade_sessionmgr_t *sessionmgr; }; -ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data); -ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data); -ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data); -ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data); -ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data); -ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data); -ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data); -ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data); +ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data); +ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data); +ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void *data); +ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data); +ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data); +ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data); +ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, void *data); +ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data); static void blade_handle_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type) @@ -352,8 +352,20 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio // @todo all higher level errors should be handled by each of the calls internally so that a normal result response can be sent with an error block inside the result // which is important for implementation of blade.execute where errors can be relayed back to the requester properly +typedef struct blade_rpcsubscribe_data_s blade_rpcsubscribe_data_t; +struct blade_rpcsubscribe_data_s { + ks_pool_t *pool; + blade_rpc_response_callback_t original_callback; + void *original_data; + blade_rpc_request_callback_t channel_callback; + void *channel_data; + const char *relayed_messageid; +}; + +ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data); + // blade.register request generator -KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data) +KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data) { ks_status_t ret = KS_STATUS_SUCCESS; blade_session_t *bs = NULL; @@ -390,7 +402,7 @@ done: } // blade.register request handler -ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) +ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data) { blade_handle_t *bh = NULL; blade_session_t *bs = NULL; @@ -454,7 +466,7 @@ done: // blade.publish request generator -KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data) +KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data) { ks_status_t ret = KS_STATUS_SUCCESS; blade_session_t *bs = NULL; @@ -514,7 +526,7 @@ done: } // blade.publish request handler -ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) +ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data) { blade_handle_t *bh = NULL; blade_session_t *bs = NULL; @@ -644,7 +656,7 @@ done: // blade.authorize request generator -KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data) +KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data) { ks_status_t ret = KS_STATUS_SUCCESS; blade_session_t *bs = NULL; @@ -704,7 +716,7 @@ done: } // blade.authorize request handler -ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) +ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, void *data) { blade_handle_t *bh = NULL; blade_session_t *bs = NULL; @@ -830,8 +842,6 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON if (remove) { if (!res_result_unauthorized_channels) res_result_unauthorized_channels = cJSON_CreateArray(); cJSON_AddItemToArray(res_result_unauthorized_channels, cJSON_CreateString(channel->valuestring)); - // @todo unauthorizing channels should force a subscribe remove request for the target if they are subscribed, to prevent further events from reaching the target - // this will require the master node to invoke the subscription removal as opposed to the target who normally invokes subscribe } else { if (!res_result_authorized_channels) res_result_authorized_channels = cJSON_CreateArray(); cJSON_AddItemToArray(res_result_authorized_channels, cJSON_CreateString(channel->valuestring)); @@ -854,6 +864,10 @@ ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup blade_session_send(bs, res, NULL, NULL); + if (res_result_unauthorized_channels) { + blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, NULL, res_result_unauthorized_channels, req_params_authorized_nodeid, KS_TRUE, NULL, NULL); + } + done: if (res) cJSON_Delete(res); @@ -867,7 +881,7 @@ done: // @todo discuss system to support caching locate results, and internally subscribing to receive event updates related to protocols which have been located // to ensure local caches remain synced when protocol controllers change, but this requires additional filters for event propagating to avoid broadcasting // every protocol update to everyone which may actually be a better way than an explicit locate request -KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data) +KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, void *data) { ks_status_t ret = KS_STATUS_SUCCESS; blade_session_t *bs = NULL; @@ -918,7 +932,7 @@ done: } // blade.locate request handler -ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) +ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data) { blade_handle_t *bh = NULL; blade_session_t *bs = NULL; @@ -1021,7 +1035,7 @@ done: // blade.execute request generator -KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data) +KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data) { ks_status_t ret = KS_STATUS_SUCCESS; blade_session_t *bs = NULL; @@ -1074,7 +1088,7 @@ done: } // blade.execute request handler -ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) +ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data) { ks_bool_t ret = KS_FALSE; blade_handle_t *bh = NULL; @@ -1303,20 +1317,40 @@ KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJ } +static void blade_rpcsubscribe_data_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type) +{ + blade_rpcsubscribe_data_t *brpcsd = (blade_rpcsubscribe_data_t *)ptr; + + ks_assert(brpcsd); + + switch (action) { + case KS_MPCL_ANNOUNCE: + break; + case KS_MPCL_TEARDOWN: + if (brpcsd->relayed_messageid) ks_pool_free(brpcsd->pool, &brpcsd->relayed_messageid); + break; + case KS_MPCL_DESTROY: + break; + } +} + // blade.subscribe request generator -KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data) +KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data) { ks_status_t ret = KS_STATUS_SUCCESS; + ks_pool_t *pool = NULL; blade_session_t *bs = NULL; const char *localid = NULL; - blade_subscription_t *bsub = NULL; - cJSON *temp_data = NULL; + blade_rpcsubscribe_data_t *temp_data = NULL; ks_assert(bh); ks_assert(protocol); ks_assert(realm); ks_assert(subscribe_channels || unsubscribe_channels); + pool = blade_handle_pool_get(bh); + ks_assert(pool); + // @note this is always produced by a subscriber, and sent upstream, master will only use the internal raw call if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) { ret = KS_STATUS_DISCONNECTED; @@ -1326,21 +1360,16 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char blade_upstreammgr_localid_copy(bh->upstreammgr, bh->pool, &localid); ks_assert(localid); - if (unsubscribe_channels) { - cJSON *channel = NULL; - cJSON_ArrayForEach(channel, unsubscribe_channels) { - blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, &bsub, protocol, realm, channel->valuestring, localid); - } - } - - temp_data = cJSON_CreateObject(); + // @note since this is allocated in the handle's pool, if the handle is shutdown during a pending request, then the data + // memory will be cleaned up with the handle, otherwise should be cleaned up in the response callback + temp_data = (blade_rpcsubscribe_data_t *)ks_pool_alloc(pool, sizeof(blade_rpcsubscribe_data_t)); + temp_data->pool = pool; + temp_data->original_callback = callback; + temp_data->original_data = data; + temp_data->channel_callback = channel_callback; + temp_data->channel_data = channel_data; + ks_pool_set_cleanup(pool, temp_data, NULL, blade_rpcsubscribe_data_cleanup); - if (callback) cJSON_AddItemToObject(temp_data, "callback", cJSON_CreatePtr((uintptr_t)callback)); - if (data) cJSON_AddItemToObject(temp_data, "data", data); - - if (channel_callback) cJSON_AddItemToObject(temp_data, "channel-callback", cJSON_CreatePtr((uintptr_t)channel_callback)); - if (channel_data) cJSON_AddItemToObject(temp_data, "channel-data", channel_data); - ret = blade_handle_rpcsubscribe_raw(bh, protocol, realm, subscribe_channels, unsubscribe_channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data); ks_pool_free(bh->pool, &localid); @@ -1351,7 +1380,7 @@ done: return ret; } -KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data) +ks_status_t blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, blade_rpcsubscribe_data_t *data) { ks_status_t ret = KS_STATUS_SUCCESS; blade_session_t *bs = NULL; @@ -1384,6 +1413,13 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const pool = blade_handle_pool_get(bh); ks_assert(pool); + if (unsubscribe_channels) { + cJSON *channel = NULL; + cJSON_ArrayForEach(channel, unsubscribe_channels) { + blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, protocol, realm, channel->valuestring, subscriber); + } + } + blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe"); cJSON_AddStringToObject(req_params, "protocol", protocol); @@ -1406,7 +1442,7 @@ done: } // blade.subscribe request handler -ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) +ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data) { blade_handle_t *bh = NULL; blade_session_t *bs = NULL; @@ -1491,16 +1527,18 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs)); - if (req_params_unsubscribe_channels) { - cJSON *channel = NULL; - cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) { - blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid); - } - } - masterlocal = blade_upstreammgr_masterlocal(blade_handle_upstreammgr_get(bh)); if (masterlocal || blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bh), req_params_subscriber_nodeid)) { + // @note This is normally handled by blade_handle_rpcsubscribe_raw() to ensure authorization removals are processed during the request path + // including on the node they start on, whether that is the master or the subscriber + if (req_params_unsubscribe_channels) { + cJSON *channel = NULL; + cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) { + blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid); + } + } + blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq)); cJSON_AddStringToObject(res_result, "protocol", req_params_protocol); @@ -1532,13 +1570,12 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup blade_session_send(bs, res, NULL, NULL); } else { - cJSON *temp_data = cJSON_CreateObject(); - - // @note track this so that when this local node gets a response to this propagated request we know what messageid to propagate the response with - cJSON_AddStringToObject(temp_data, "messageid", blade_rpc_request_messageid_get(brpcreq)); + blade_rpcsubscribe_data_t *temp_data = (blade_rpcsubscribe_data_t *)ks_pool_alloc(pool, sizeof(blade_rpcsubscribe_data_t)); + temp_data->pool = pool; + temp_data->relayed_messageid = ks_pstrdup(pool, blade_rpc_request_messageid_get(brpcreq)); + ks_pool_set_cleanup(pool, temp_data, NULL, blade_rpcsubscribe_data_cleanup); blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, req_params_subscribe_channels, req_params_unsubscribe_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data); - cJSON_Delete(temp_data); } done: @@ -1550,16 +1587,13 @@ done: } // blade.subscribe response handler -ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data) +ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, void *data) { ks_bool_t ret = KS_FALSE; + blade_rpc_request_t *brpcreq = NULL; blade_handle_t *bh = NULL; blade_session_t *bs = NULL; - blade_rpc_response_callback_t original_callback = NULL; - cJSON *original_data = NULL; - blade_rpc_request_callback_t channel_callback = NULL; - cJSON *channel_data = NULL; - const char *messageid = NULL; + blade_rpcsubscribe_data_t *temp_data = NULL; cJSON *res = NULL; cJSON *res_result = NULL; const char *res_result_protocol = NULL; @@ -1573,19 +1607,15 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS ks_assert(brpcres); ks_assert(data); + brpcreq = blade_rpc_response_request_get(brpcres); + bh = blade_rpc_response_handle_get(brpcres); ks_assert(bh); bs = blade_sessionmgr_session_lookup(bh->sessionmgr, blade_rpc_response_sessionid_get(brpcres)); ks_assert(bs); - original_data = cJSON_GetObjectItem(data, "data"); - original_callback = (blade_rpc_response_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "callback"); - channel_data = cJSON_GetObjectItem(data, "channel-data"); - channel_callback = (blade_rpc_request_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "channel-callback"); - - // @note when messageid exists, it means this message is only intended to be examined and relayed, the local node is not the subscriber - messageid = cJSON_GetObjectCstr(data, "messageid"); + temp_data = (blade_rpcsubscribe_data_t *)data; res = blade_rpc_response_message_get(brpcres); ks_assert(res); @@ -1631,15 +1661,15 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS cJSON_ArrayForEach(channel, res_result_subscribe_channels) { blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, &bsub, res_result_protocol, res_result_realm, channel->valuestring, res_result_subscriber_nodeid); // @note these will only get assigned on the last response, received by the subscriber - if (channel_callback) blade_subscription_callback_set(bsub, channel_callback); - if (channel_data) blade_subscription_callback_data_set(bsub, channel_data); + if (temp_data && temp_data->channel_callback) blade_subscription_callback_set(bsub, temp_data->channel_callback); + if (temp_data && temp_data->channel_data) blade_subscription_callback_data_set(bsub, temp_data->channel_data); } } // @note this will only happen on the last response, received by the subscriber - if (original_callback) ret = original_callback(brpcres, original_data); + if (temp_data && temp_data->original_callback) ret = temp_data->original_callback(brpcres, temp_data->original_data); - if (messageid) { + if (temp_data && temp_data->relayed_messageid) { blade_session_t *relay = NULL; if (downstream) { if (!(relay = blade_upstreammgr_session_get(bh->upstreammgr))) { @@ -1651,7 +1681,7 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS } } - blade_rpc_response_raw_create(&res, &res_result, messageid); + blade_rpc_response_raw_create(&res, &res_result, temp_data->relayed_messageid); cJSON_AddStringToObject(res_result, "protocol", res_result_protocol); cJSON_AddStringToObject(res_result, "realm", res_result_realm); @@ -1668,13 +1698,14 @@ ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJS } done: + if (temp_data) ks_pool_free(temp_data->pool, &temp_data); blade_session_read_unlock(bs); return ret; } // blade.broadcast request generator -KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data) +KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data) { ks_status_t ret = KS_STATUS_SUCCESS; @@ -1693,7 +1724,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char } // blade.broadcast request handler -ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) +ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data) { ks_bool_t ret = KS_FALSE; blade_handle_t *bh = NULL; diff --git a/libs/libblade/src/blade_subscriptionmgr.c b/libs/libblade/src/blade_subscriptionmgr.c index ea40e5c114..fc34b32d71 100644 --- a/libs/libblade/src/blade_subscriptionmgr.c +++ b/libs/libblade/src/blade_subscriptionmgr.c @@ -292,7 +292,7 @@ KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, con } } -KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data) +KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data) { const char *bsub_key = NULL; blade_subscription_t *bsub = NULL; diff --git a/libs/libblade/src/include/blade_rpc.h b/libs/libblade/src/include/blade_rpc.h index f68b09f8c1..a966088af4 100644 --- a/libs/libblade/src/include/blade_rpc.h +++ b/libs/libblade/src/include/blade_rpc.h @@ -36,14 +36,14 @@ #include KS_BEGIN_EXTERN_C -KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data); +KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data); KS_DECLARE(ks_status_t) blade_rpc_destroy(blade_rpc_t **brpcP); KS_DECLARE(blade_handle_t *) blade_rpc_handle_get(blade_rpc_t *brpc); KS_DECLARE(const char *) blade_rpc_method_get(blade_rpc_t *brpc); KS_DECLARE(const char *) blade_rpc_protocol_get(blade_rpc_t *brpc); KS_DECLARE(const char *) blade_rpc_realm_get(blade_rpc_t *brpc); KS_DECLARE(blade_rpc_request_callback_t) blade_rpc_callback_get(blade_rpc_t *brpc); -KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc); +KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc); KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP, blade_handle_t *bh, @@ -51,7 +51,7 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP, const char *session_id, cJSON *json, blade_rpc_response_callback_t callback, - cJSON *data); + void *data); KS_DECLARE(ks_status_t) blade_rpc_request_destroy(blade_rpc_request_t **brpcreqP); KS_DECLARE(ks_status_t) blade_rpc_request_duplicate(blade_rpc_request_t **brpcreqP, blade_rpc_request_t *brpcreq); KS_DECLARE(blade_handle_t *) blade_rpc_request_handle_get(blade_rpc_request_t *brpcreq); @@ -59,7 +59,7 @@ KS_DECLARE(const char *) blade_rpc_request_sessionid_get(blade_rpc_request_t *br KS_DECLARE(cJSON *) blade_rpc_request_message_get(blade_rpc_request_t *brpcreq); KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *brpcreq); KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq); -KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq); +KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq); KS_DECLARE(ks_status_t) blade_rpc_request_raw_create(ks_pool_t *pool, cJSON **json, cJSON **params, const char **id, const char *method); diff --git a/libs/libblade/src/include/blade_session.h b/libs/libblade/src/include/blade_session.h index 710464e859..b84ac257ad 100644 --- a/libs/libblade/src/include/blade_session.h +++ b/libs/libblade/src/include/blade_session.h @@ -62,7 +62,7 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs); KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs); KS_DECLARE(const char *) blade_session_connection_get(blade_session_t *bs); KS_DECLARE(ks_status_t) blade_session_connection_set(blade_session_t *bs, const char *id); -KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data); +KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data); KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json); KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json); KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json); diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index 80f6671053..211a2d1d8a 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -59,25 +59,24 @@ KS_DECLARE(blade_sessionmgr_t *) blade_handle_sessionmgr_get(blade_handle_t *bh) KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id); -KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data); +KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data); -KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data); +KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data); -KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data); +KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, void *data); -KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data); +KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, void *data); -KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data); +KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data); KS_DECLARE(const char *) blade_rpcexecute_request_requester_nodeid_get(blade_rpc_request_t *brpcreq); KS_DECLARE(const char *) blade_rpcexecute_request_responder_nodeid_get(blade_rpc_request_t *brpcreq); KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brpcreq); KS_DECLARE(cJSON *) blade_rpcexecute_response_result_get(blade_rpc_response_t *brpcres); KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJSON *result); -KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data); -KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data); +KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t channel_callback, void *channel_data); -KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data); +KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data); KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq); KS_END_EXTERN_C diff --git a/libs/libblade/src/include/blade_subscriptionmgr.h b/libs/libblade/src/include/blade_subscriptionmgr.h index 9b4b0bda2c..f903d23012 100644 --- a/libs/libblade/src/include/blade_subscriptionmgr.h +++ b/libs/libblade/src/include/blade_subscriptionmgr.h @@ -43,7 +43,7 @@ KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(bla KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber); KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber); KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target); -KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data); +KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, void *data); KS_END_EXTERN_C #endif diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index df7f7ae8c1..2c1eee0396 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -62,8 +62,8 @@ typedef struct blade_connectionmgr_s blade_connectionmgr_t; typedef struct blade_sessionmgr_s blade_sessionmgr_t; typedef struct blade_session_callback_data_s blade_session_callback_data_t; -typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, cJSON *data); -typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, cJSON *data); +typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, void *data); +typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, void *data); typedef enum { diff --git a/libs/libblade/test/testcli.c b/libs/libblade/test/testcli.c index be09279ee7..d618b8c823 100644 --- a/libs/libblade/test/testcli.c +++ b/libs/libblade/test/testcli.c @@ -18,6 +18,8 @@ struct command_def_s { void command_quit(blade_handle_t *bh, char *args); void command_locate(blade_handle_t *bh, char *args); void command_join(blade_handle_t *bh, char *args); +void command_subscribe(blade_handle_t *bh, char *args); +void command_unsubscribe(blade_handle_t *bh, char *args); void command_leave(blade_handle_t *bh, char *args); void command_talk(blade_handle_t *bh, char *args); @@ -25,6 +27,8 @@ static const struct command_def_s command_defs[] = { { "quit", command_quit }, { "locate", command_locate }, { "join", command_join }, + { "subscribe", command_subscribe }, + { "unsubscribe", command_unsubscribe }, { "leave", command_leave }, { "talk", command_talk }, @@ -33,7 +37,7 @@ static const struct command_def_s command_defs[] = { const char *g_testcon_nodeid = NULL; -ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, cJSON *data) +ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, void *data) { blade_handle_t *bh = NULL; blade_session_t *bs = NULL; @@ -87,7 +91,7 @@ ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, cJSON *dat return KS_FALSE; } -ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, cJSON *data) +ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, void *data) { blade_handle_t *bh = NULL; blade_session_t *bs = NULL; @@ -111,7 +115,7 @@ ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, cJSON *data) return KS_FALSE; } -ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, cJSON *data) +ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, void *data) { blade_handle_t *bh = NULL; blade_session_t *bs = NULL; @@ -135,7 +139,7 @@ ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, cJSON *data return KS_FALSE; } -ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, cJSON *data) +ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, void *data) { blade_handle_t *bh = NULL; blade_session_t *bs = NULL; @@ -159,7 +163,35 @@ ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, cJSON *data) return KS_FALSE; } -ks_bool_t test_broadcast_handler(blade_rpc_request_t *brpcreq, cJSON *data) +ks_bool_t test_subscribe_response_handler(blade_rpc_response_t *brpcres, void *data) +{ + blade_handle_t *bh = NULL; + blade_session_t *bs = NULL; + cJSON *res = NULL; + cJSON *res_result = NULL; + + ks_assert(brpcres); + + bh = blade_rpc_response_handle_get(brpcres); + ks_assert(bh); + + bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_response_sessionid_get(brpcres)); + ks_assert(bs); + + res = blade_rpc_response_message_get(brpcres); + ks_assert(res); + + res_result = cJSON_GetObjectItem(res, "result"); + ks_assert(res_result); + + ks_log(KS_LOG_DEBUG, "Session (%s) blade.subscribe response processing\n", blade_session_id_get(bs)); + + blade_session_read_unlock(bs); + + return KS_FALSE; +} + +ks_bool_t test_channel_handler(blade_rpc_request_t *brpcreq, void *data) { blade_handle_t *bh = NULL; blade_session_t *bs = NULL; @@ -177,7 +209,7 @@ ks_bool_t test_broadcast_handler(blade_rpc_request_t *brpcreq, cJSON *data) params = blade_rpcbroadcast_request_params_get(brpcreq); ks_assert(params); - ks_log(KS_LOG_DEBUG, "Session (%s) test broadcast processing\n", blade_session_id_get(bs)); + ks_log(KS_LOG_DEBUG, "Session (%s) test channel event processing\n", blade_session_id_get(bs)); blade_session_read_unlock(bs); @@ -327,7 +359,6 @@ void command_locate(blade_handle_t *bh, char *args) void command_join(blade_handle_t *bh, char *args) { cJSON *params = NULL; - cJSON *channels = NULL; ks_assert(bh); ks_assert(args); @@ -337,14 +368,38 @@ void command_join(blade_handle_t *bh, char *args) return; } - params = cJSON_CreateObject(); blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", "mydomain.com", params, test_join_response_handler, NULL); cJSON_Delete(params); +} + +void command_subscribe(blade_handle_t *bh, char *args) +{ + cJSON *channels = NULL; + + if (!g_testcon_nodeid) { + ks_log(KS_LOG_DEBUG, "Protocol controller has not been located\n"); + return; + } channels = cJSON_CreateArray(); - cJSON_AddItemToArray(channels, cJSON_CreateString("test")); - blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_broadcast_handler, NULL); + cJSON_AddItemToArray(channels, cJSON_CreateString("channel")); + blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_channel_handler, NULL); + cJSON_Delete(channels); +} + +void command_unsubscribe(blade_handle_t *bh, char *args) +{ + cJSON *channels = NULL; + + if (!g_testcon_nodeid) { + ks_log(KS_LOG_DEBUG, "Protocol controller has not been located\n"); + return; + } + + channels = cJSON_CreateArray(); + cJSON_AddItemToArray(channels, cJSON_CreateString("channel")); + blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, test_subscribe_response_handler, NULL, test_channel_handler, NULL); cJSON_Delete(channels); } @@ -364,11 +419,6 @@ void command_leave(blade_handle_t *bh, char *args) params = cJSON_CreateObject(); blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", "mydomain.com", params, test_leave_response_handler, NULL); cJSON_Delete(params); - - channels = cJSON_CreateArray(); - cJSON_AddItemToArray(channels, cJSON_CreateString("test")); - blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, NULL, NULL, NULL, NULL); - cJSON_Delete(channels); } void command_talk(blade_handle_t *bh, char *args) diff --git a/libs/libblade/test/testcon.c b/libs/libblade/test/testcon.c index 52eab40696..ec3a50c705 100644 --- a/libs/libblade/test/testcon.c +++ b/libs/libblade/test/testcon.c @@ -88,7 +88,7 @@ ks_status_t testproto_destroy(testproto_t **testP) return KS_STATUS_SUCCESS; } -ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *data) +ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, void *data) { //testproto_t *test = NULL; blade_handle_t *bh = NULL; @@ -97,7 +97,7 @@ ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *da ks_assert(brpcres); ks_assert(data); - //test = (testproto_t *)cJSON_GetPtrValue(data); + //test = (testproto_t *)data; bh = blade_rpc_response_handle_get(brpcres); ks_assert(bh); @@ -112,7 +112,7 @@ ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *da return KS_FALSE; } -ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) +ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data) { testproto_t *test = NULL; blade_handle_t *bh = NULL; @@ -126,7 +126,7 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) ks_assert(brpcreq); ks_assert(data); - test = (testproto_t *)cJSON_GetPtrValue(data); + test = (testproto_t *)data; bh = blade_rpc_request_handle_get(brpcreq); ks_assert(bh); @@ -183,7 +183,7 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) return KS_FALSE; } -ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) +ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data) { testproto_t *test = NULL; blade_handle_t *bh = NULL; @@ -191,12 +191,13 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) const char *requester_nodeid = NULL; //const char *key = NULL; cJSON *params = NULL; + cJSON *channels = NULL; cJSON *result = NULL; ks_assert(brpcreq); ks_assert(data); - test = (testproto_t *)cJSON_GetPtrValue(data); + test = (testproto_t *)data; bh = blade_rpc_request_handle_get(brpcreq); ks_assert(bh); @@ -216,22 +217,36 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) ks_hash_remove(test->participants, (void *)requester_nodeid); ks_hash_write_unlock(test->participants); - blade_session_read_unlock(bs); + // deauthorize channels with the master for the requester + channels = cJSON_CreateArray(); + cJSON_AddItemToArray(channels, cJSON_CreateString("channel")); + blade_handle_rpcauthorize(bh, requester_nodeid, KS_TRUE, "test", "mydomain.com", channels, NULL, NULL); + + cJSON_Delete(channels); + + // send rpcexecute response to the requester result = cJSON_CreateObject(); blade_rpcexecute_response_send(brpcreq, result); + cJSON_Delete(result); + + blade_session_read_unlock(bs); + + // broadcast to authorized nodes that have subscribed, that the requester has left params = cJSON_CreateObject(); cJSON_AddStringToObject(params, "leaver-nodeid", requester_nodeid); blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "leave", params, NULL, NULL); + cJSON_Delete(params); + return KS_FALSE; } -ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) +ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, void *data) { //testproto_t *test = NULL; blade_handle_t *bh = NULL; @@ -244,7 +259,7 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) ks_assert(brpcreq); ks_assert(data); - //test = (testproto_t *)cJSON_GetPtrValue(data); + //test = (testproto_t *)data; bh = blade_rpc_request_handle_get(brpcreq); ks_assert(bh); @@ -263,12 +278,16 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) ks_log(KS_LOG_DEBUG, "Session (%s) test.talk (%s) request processing\n", blade_session_id_get(bs), requester_nodeid); - blade_session_read_unlock(bs); - + // send rpcexecute response to the requester result = cJSON_CreateObject(); blade_rpcexecute_response_send(brpcreq, result); + cJSON_Delete(result); + + blade_session_read_unlock(bs); + + // broadcast to authorized nodes that have subscribed, that the requester has said something params = cJSON_CreateObject(); cJSON_AddStringToObject(params, "text", text); @@ -277,6 +296,8 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data) blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "talk", params, NULL, NULL); + cJSON_Delete(params); + return KS_FALSE; } @@ -345,19 +366,21 @@ int main(int argc, char **argv) // @todo use session state change callback to know when the session is ready and the realm(s) available from blade.connect, this hack temporarily ensures it's ready before trying to publish upstream ks_sleep_ms(3000); - blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, cJSON_CreatePtr((uintptr_t)test)); + blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, (void *)test); blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc); - blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, cJSON_CreatePtr((uintptr_t)test)); + blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, (void *)test); blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc); - blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, cJSON_CreatePtr((uintptr_t)test)); + blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, (void *)test); blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc); channels = cJSON_CreateArray(); cJSON_AddItemToArray(channels, cJSON_CreateString("channel")); - blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, cJSON_CreatePtr((uintptr_t)test)); + blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, (void *)test); + + cJSON_Delete(channels); } }