FS-10167: Temporary commit for peer review

This commit is contained in:
Shane Bryldt 2017-07-25 11:00:45 -06:00
parent f1ae0b3841
commit c6e60de302
25 changed files with 1078 additions and 415 deletions

View File

@ -38,8 +38,7 @@ struct blade_mastermgr_s {
ks_pool_t *pool;
// @todo how does "exclusive" play into the controllers, does "exclusive" mean only one provider can exist for a given protocol and realm? what does non exclusive mean?
ks_hash_t *protocols; // protocols that have been published with blade.publish, and the details to locate a protocol provider with blade.locate
ks_hash_t *protocols_cleanup; // keyed by the nodeid, each value is a hash_t* of which contains string keys matching the "protocol@realm" keys to remove each nodeid from as a provider during cleanup
ks_hash_t *protocols; // protocols that have been published with blade.publish, and the details to locate a protocol controller with blade.locate
};
@ -76,9 +75,6 @@ KS_DECLARE(ks_status_t) blade_mastermgr_create(blade_mastermgr_t **bmmgrP, blade
ks_hash_create(&bmmgr->protocols, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bmmgr->pool);
ks_assert(bmmgr->protocols);
ks_hash_create(&bmmgr->protocols_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bmmgr->pool);
ks_assert(bmmgr->protocols_cleanup);
ks_pool_set_cleanup(pool, bmmgr, NULL, blade_mastermgr_cleanup);
*bmmgrP = bmmgr;
@ -113,6 +109,40 @@ KS_DECLARE(blade_handle_t *) blade_mastermgr_handle_get(blade_mastermgr_t *bmmgr
return bmmgr->handle;
}
KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const char *nodeid)
{
ks_hash_t *cleanup = NULL;
ks_hash_write_lock(bmmgr->protocols);
for (ks_hash_iterator_t *it = ks_hash_first(bmmgr->protocols, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const char *key = NULL;
blade_protocol_t *bp = NULL;
ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
if (blade_protocol_purge(bp, nodeid)) {
if (!cleanup) ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, bmmgr->pool);
ks_hash_insert(cleanup, key, bp);
}
}
if (cleanup) {
for (ks_hash_iterator_t *it = ks_hash_first(cleanup, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const char *key = NULL;
blade_protocol_t *bp = NULL;
ks_hash_this(it, (const void **)&key, NULL, (void **)&bp);
ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
ks_hash_remove(bmmgr->protocols, key);
}
ks_hash_destroy(&cleanup);
}
ks_hash_write_unlock(bmmgr->protocols);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(blade_protocol_t *) blade_mastermgr_protocol_lookup(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm)
{
blade_protocol_t *bp = NULL;
@ -135,7 +165,6 @@ KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr,
{
blade_protocol_t *bp = NULL;
char *key = NULL;
ks_hash_t *cleanup = NULL;
ks_assert(bmmgr);
ks_assert(protocol);
@ -159,60 +188,131 @@ KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr,
ks_hash_insert(bmmgr->protocols, (void *)ks_pstrdup(bmmgr->pool, key), bp);
}
cleanup = (ks_hash_t *)ks_hash_search(bmmgr->protocols_cleanup, (void *)controller, KS_UNLOCKED);
if (!cleanup) {
ks_hash_create(&cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bmmgr->pool);
ks_assert(cleanup);
ks_hash_insert(bmmgr->protocols_cleanup, (void *)ks_pstrdup(bmmgr->pool, controller), cleanup);
}
ks_hash_insert(cleanup, (void *)ks_pstrdup(bmmgr->pool, key), (void *)KS_TRUE);
blade_protocol_controllers_add(bp, controller);
ks_log(KS_LOG_DEBUG, "Protocol Controller Added: %s to %s\n", controller, key);
ks_pool_free(bmmgr->pool, &key);
ks_hash_write_unlock(bmmgr->protocols);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *controller)
KS_DECLARE(ks_status_t) blade_mastermgr_channel_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel)
{
ks_hash_t *cleanup = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
blade_protocol_t *bp = NULL;
char *key = NULL;
ks_assert(bmmgr);
ks_assert(controller);
ks_assert(protocol);
ks_assert(realm);
ks_assert(channel);
ks_hash_write_lock(bmmgr->protocols);
cleanup = (ks_hash_t *)ks_hash_search(bmmgr->protocols_cleanup, (void *)controller, KS_UNLOCKED);
if (cleanup) {
for (ks_hash_iterator_t *it = ks_hash_first(cleanup, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
if (!bp) {
ret = KS_STATUS_NOT_FOUND;
goto done;
}
blade_protocol_channel_add(bp, channel);
done:
ks_pool_free(bmmgr->pool, &key);
ks_hash_read_unlock(bmmgr->protocols);
return ret;
}
KS_DECLARE(ks_status_t) blade_mastermgr_channel_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_protocol_t *bp = NULL;
ks_hash_t *controllers = NULL;
char *key = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
ks_assert(bmmgr);
ks_assert(protocol);
ks_assert(realm);
ks_assert(channel);
bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, key, KS_UNLOCKED);
ks_assert(bp); // should not happen when a cleanup still has a provider tracked for a protocol
key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s\n", controller, key);
blade_protocol_controllers_remove(bp, controller);
controllers = blade_protocol_controllers_get(bp);
if (ks_hash_count(controllers) == 0) {
// @note this depends on locking something outside of the protocol that won't be destroyed, like the top level
// protocols hash, but assumes then that any reader keeps the top level hash read locked while using the protocol
// so it cannot be deleted
ks_log(KS_LOG_DEBUG, "Protocol Removed: %s\n", key);
ks_hash_remove(bmmgr->protocols, key);
bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
if (!bp) {
ret = KS_STATUS_NOT_FOUND;
goto done;
}
}
ks_hash_remove(bmmgr->protocols_cleanup, (void *)controller);
}
ks_hash_write_unlock(bmmgr->protocols);
return KS_STATUS_SUCCESS;
blade_protocol_channel_remove(bp, channel);
done:
ks_pool_free(bmmgr->pool, &key);
ks_hash_read_unlock(bmmgr->protocols);
return ret;
}
KS_DECLARE(ks_status_t) blade_mastermgr_channel_authorize(blade_mastermgr_t *bmmgr, ks_bool_t remove, const char *protocol, const char *realm, const char *channel, const char *controller, const char *target)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_protocol_t *bp = NULL;
char *key = NULL;
//ks_hash_t *cleanup = NULL;
ks_assert(bmmgr);
ks_assert(protocol);
ks_assert(realm);
ks_assert(channel);
ks_assert(controller);
ks_assert(target);
key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
if (!bp) {
ret = KS_STATUS_NOT_FOUND;
goto done;
}
ret = blade_protocol_channel_authorize(bp, remove, channel, controller, target);
done:
ks_pool_free(bmmgr->pool, &key);
ks_hash_read_unlock(bmmgr->protocols);
return ret;
}
KS_DECLARE(ks_bool_t) blade_mastermgr_channel_verify(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel, const char *target)
{
ks_bool_t ret = KS_FALSE;
blade_protocol_t *bp = NULL;
char *key = NULL;
//ks_hash_t *cleanup = NULL;
ks_assert(bmmgr);
ks_assert(protocol);
ks_assert(realm);
ks_assert(channel);
ks_assert(target);
key = ks_psprintf(bmmgr->pool, "%s@%s", protocol, realm);
bp = (blade_protocol_t *)ks_hash_search(bmmgr->protocols, (void *)key, KS_READLOCKED);
if (!bp) goto done;
ret = blade_protocol_channel_verify(bp, channel, target);
done:
ks_pool_free(bmmgr->pool, &key);
ks_hash_read_unlock(bmmgr->protocols);
return ret;
}
/* For Emacs:

View File

@ -39,6 +39,7 @@ struct blade_protocol_s {
const char *name;
const char *realm;
ks_hash_t *controllers;
ks_hash_t *channels;
// @todo descriptors (schema, etc) for each method within a protocol
};
@ -56,6 +57,7 @@ static void blade_protocol_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_poo
if (bp->name) ks_pool_free(bp->pool, &bp->name);
if (bp->realm) ks_pool_free(bp->pool, &bp->realm);
if (bp->controllers) ks_hash_destroy(&bp->controllers);
if (bp->channels) ks_hash_destroy(&bp->channels);
break;
case KS_MPCL_DESTROY:
break;
@ -76,9 +78,12 @@ KS_DECLARE(ks_status_t) blade_protocol_create(blade_protocol_t **bpP, ks_pool_t
bp->name = ks_pstrdup(pool, name);
bp->realm = ks_pstrdup(pool, realm);
ks_hash_create(&bp->controllers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
ks_hash_create(&bp->controllers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
ks_assert(bp->controllers);
ks_hash_create(&bp->channels, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
ks_assert(bp->channels);
ks_pool_set_cleanup(pool, bp, NULL, blade_protocol_cleanup);
*bpP = bp;
@ -100,11 +105,53 @@ KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP)
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_hash_t *) blade_protocol_controllers_get(blade_protocol_t *bp)
KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid)
{
ks_assert(bp);
return bp->controllers;
ks_bool_t ret = KS_FALSE;
ks_assert(bp);
ks_assert(nodeid);
// @todo iterate all channels, remove the nodeid from all authorized hashes
ks_hash_write_lock(bp->channels);
for (ks_hash_iterator_t *it = ks_hash_first(bp->channels, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const char *key = NULL;
ks_hash_t *authorizations = NULL;
ks_hash_this(it, (const void **)&key, NULL, (void **)&authorizations);
if (ks_hash_remove(authorizations, nodeid)) {
ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Removed: %s from %s@%s/%s\n", nodeid, bp->name, bp->realm, key);
}
}
ks_hash_write_unlock(bp->channels);
ks_hash_write_lock(bp->controllers);
if (ks_hash_remove(bp->controllers, (void *)nodeid)) {
ks_log(KS_LOG_DEBUG, "Protocol Controller Removed: %s from %s@%s\n", nodeid, bp->name, bp->realm);
}
ret = ks_hash_count(bp->controllers) == 0;
ks_hash_write_unlock(bp->controllers);
return ret;
}
KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp)
{
cJSON *controllers = cJSON_CreateObject();
ks_assert(bp);
for (ks_hash_iterator_t *it = ks_hash_first(bp->controllers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const char *key = NULL;
void *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
cJSON_AddItemToArray(controllers, cJSON_CreateString(key));
}
return controllers;
}
KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid)
@ -117,19 +164,108 @@ KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, con
key = ks_pstrdup(bp->pool, nodeid);
ks_hash_insert(bp->controllers, (void *)key, (void *)KS_TRUE);
return KS_STATUS_SUCCESS;
ks_log(KS_LOG_DEBUG, "Protocol Controller Added: %s to %s@%s\n", nodeid, bp->name, bp->realm);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_protocol_controllers_remove(blade_protocol_t *bp, const char *nodeid)
KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_hash_t *authorized = NULL;
char *key = NULL;
ks_assert(bp);
ks_assert(name);
ks_hash_write_lock(bp->channels);
if (ks_hash_search(bp->channels, name, KS_UNLOCKED)) {
ret = KS_STATUS_DUPLICATE_OPERATION;
goto done;
}
ks_hash_create(&authorized, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bp->pool);
key = ks_pstrdup(bp->pool, name);
ks_hash_insert(bp->channels, (void *)key, (void *)authorized);
ks_log(KS_LOG_DEBUG, "Protocol Channel Added: %s to %s@%s\n", key, bp->name, bp->realm);
done:
ks_hash_write_unlock(bp->channels);
return ret;
}
KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name)
{
ks_assert(bp);
ks_assert(nodeid);
ks_assert(name);
ks_hash_remove(bp->controllers, (void *)nodeid);
ks_hash_remove(bp->channels, (void *)name);
ks_log(KS_LOG_DEBUG, "Protocol Channel Removed: %s from %s@%s\n", name, bp->name, bp->realm);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_hash_t *authorizations = NULL;
ks_bool_t allowed = KS_FALSE;
ks_assert(bp);
ks_assert(channel);
ks_assert(controller);
ks_assert(target);
allowed = (ks_bool_t)(intptr_t)ks_hash_search(bp->controllers, (void *)controller, KS_READLOCKED);
ks_hash_read_unlock(bp->controllers);
if (!allowed) {
ret = KS_STATUS_NOT_ALLOWED;
goto done;
}
// @todo verify controller, get ks_hash_t* value based on channel, add target to the channels hash
authorizations = (ks_hash_t *)ks_hash_search(bp->channels, (void *)channel, KS_READLOCKED);
if (authorizations) {
if (remove) {
if (ks_hash_remove(authorizations, (void *)target)) {
ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Removed: %s from %s@%s/%s\n", target, bp->name, bp->realm, channel);
} else ret = KS_STATUS_NOT_FOUND;
}
else {
ks_hash_insert(authorizations, (void *)ks_pstrdup(bp->pool, target), (void *)KS_TRUE);
ks_log(KS_LOG_DEBUG, "Protocol Channel Authorization Added: %s to %s@%s/%s\n", target, bp->name, bp->realm, channel);
}
}
ks_hash_read_unlock(bp->channels);
if (!authorizations) ret = KS_STATUS_NOT_FOUND;
done:
return ret;
}
KS_DECLARE(ks_bool_t) blade_protocol_channel_verify(blade_protocol_t *bp, const char *channel, const char *target)
{
ks_bool_t ret = KS_FALSE;
ks_hash_t *authorizations = NULL;
ks_assert(bp);
ks_assert(channel);
ks_assert(target);
// @todo verify controller, get ks_hash_t* value based on channel, add target to the channels hash
authorizations = (ks_hash_t *)ks_hash_search(bp->channels, (void *)channel, KS_READLOCKED);
if (authorizations) ret = ks_hash_search(authorizations, target, KS_UNLOCKED) != NULL;
ks_hash_read_unlock(bp->channels);
return ret;
}
/* For Emacs:

View File

@ -157,12 +157,14 @@ KS_DECLARE(ks_status_t) blade_routemgr_route_remove(blade_routemgr_t *brmgr, con
blade_handle_rpcregister(brmgr->handle, target, KS_TRUE, NULL, NULL);
blade_subscriptionmgr_purge(blade_handle_subscriptionmgr_get(brmgr->handle), target);
// @note protocols are cleaned up here because routes can be removed that are not locally connected with a session but still
// have protocols published to the master node from further downstream, in which case if a route is announced upstream to be
// removed, a master node is still able to catch that here even when there is no direct session, but is also hit when there
// is a direct session being terminated
blade_mastermgr_controller_remove(blade_handle_mastermgr_get(brmgr->handle), target);
blade_mastermgr_purge(blade_handle_mastermgr_get(brmgr->handle), target);
return KS_STATUS_SUCCESS;
}

View File

@ -42,7 +42,7 @@ struct blade_rpc_s {
const char *realm;
blade_rpc_request_callback_t callback;
void *data;
cJSON *data;
};
struct blade_rpc_request_s {
@ -54,7 +54,7 @@ struct blade_rpc_request_s {
cJSON *message;
const char *message_id; // pulled from message for easier keying
blade_rpc_response_callback_t callback;
void *data;
cJSON *data;
// @todo ttl to wait for response before injecting an error response locally
};
@ -72,21 +72,22 @@ struct blade_rpc_response_s {
static void blade_rpc_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
{
//blade_rpc_t *brpc = (blade_rpc_t *)ptr;
blade_rpc_t *brpc = (blade_rpc_t *)ptr;
//ks_assert(brpc);
ks_assert(brpc);
switch (action) {
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
if (brpc->data) cJSON_Delete(brpc->data);
break;
case KS_MPCL_DESTROY:
break;
}
}
KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data)
{
blade_rpc_t *brpc = NULL;
ks_pool_t *pool = NULL;
@ -169,7 +170,7 @@ KS_DECLARE(blade_rpc_request_callback_t) blade_rpc_callback_get(blade_rpc_t *brp
return brpc->callback;
}
KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc)
KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc)
{
ks_assert(brpc);
@ -189,6 +190,7 @@ static void blade_rpc_request_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_
case KS_MPCL_TEARDOWN:
ks_pool_free(brpcreq->pool, (void **)&brpcreq->session_id);
cJSON_Delete(brpcreq->message);
if (brpcreq->data) cJSON_Delete(brpcreq->data);
break;
case KS_MPCL_DESTROY:
break;
@ -201,7 +203,7 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
const char *session_id,
cJSON *json,
blade_rpc_response_callback_t callback,
void *data)
cJSON *data)
{
blade_rpc_request_t *brpcreq = NULL;
@ -276,7 +278,7 @@ KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_r
return brpcreq->callback;
}
KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq)
KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq)
{
ks_assert(brpcreq);
return brpcreq->data;

View File

@ -582,7 +582,7 @@ ks_status_t blade_session_onstate_run(blade_session_t *bs)
}
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data)
{
blade_rpc_request_t *brpcreq = NULL;
const char *method = NULL;
@ -706,6 +706,12 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
blade_session_send(bs_router, json, NULL, NULL);
blade_session_read_unlock(bs_router);
// @todo if this is a subscribe request to remove subscriptions, it must carry a field unsubscribed-channels for which
// subscriptions should be removed along the request path, regardless of whether it's the consumer requesting or the
// master due to a deauthorization, without respect to waiting for the response as it should be a gaurenteed operation
// even when requested by the subscriber. This unsubscribed-channels field is simply treated as a special field, regardless
// of the actual method of the request.
return KS_STATUS_SUCCESS;
}
}
@ -763,6 +769,11 @@ ks_status_t blade_session_process(blade_session_t *bs, cJSON *json)
blade_session_send(bs_router, json, NULL, NULL);
blade_session_read_unlock(bs_router);
// @todo if this is a subscribe response to add a subscriber with the master as the responder-nodeid, it must have a
// subscribed-channels field which should be added for the requester-nodeid, this will ensure the subscriptions are
// added with respect to the master response and not immediately upon request. This subscribed-channels field is
// simply treated as a special field, regardless of the actual method of the request which is unavailable here.
return KS_STATUS_SUCCESS;
}
}

View File

@ -200,8 +200,6 @@ KS_DECLARE(ks_status_t) blade_sessionmgr_session_remove(blade_sessionmgr_t *bsmg
ks_log(KS_LOG_DEBUG, "Session Removed: %s\n", id);
blade_subscriptionmgr_subscriber_cleanup(blade_handle_subscriptionmgr_get(bsmgr->handle), id);
if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), id)) {
blade_upstreammgr_localid_set(blade_handle_upstreammgr_get(bsmgr->handle), NULL);
blade_upstreammgr_masterid_set(blade_handle_upstreammgr_get(bsmgr->handle), NULL);

View File

@ -47,12 +47,14 @@ struct blade_handle_s {
blade_sessionmgr_t *sessionmgr;
};
ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data);
ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data);
ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data);
static void blade_handle_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
@ -225,6 +227,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
blade_rpc_create(&brpc, bh, "blade.publish", NULL, NULL, blade_rpcpublish_request_handler, NULL);
blade_rpcmgr_corerpc_add(bh->rpcmgr, brpc);
blade_rpc_create(&brpc, bh, "blade.authorize", NULL, NULL, blade_rpcauthorize_request_handler, NULL);
blade_rpcmgr_corerpc_add(bh->rpcmgr, brpc);
blade_rpc_create(&brpc, bh, "blade.locate", NULL, NULL, blade_rpclocate_request_handler, NULL);
blade_rpcmgr_corerpc_add(bh->rpcmgr, brpc);
@ -348,7 +353,7 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
// which is important for implementation of blade.execute where errors can be relayed back to the requester properly
// blade.register request generator
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -385,7 +390,7 @@ done:
}
// blade.register request handler
ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, void *data)
ks_bool_t blade_rpcregister_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -449,7 +454,7 @@ done:
// blade.publish request generator
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -459,7 +464,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *
const char *id = NULL;
ks_assert(bh);
ks_assert(name);
ks_assert(protocol);
ks_assert(realm);
// @todo consideration for the Master trying to publish a protocol, with no upstream
@ -474,7 +479,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.publish");
// fill in the req_params
cJSON_AddStringToObject(req_params, "protocol", name);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &id);
@ -489,6 +494,12 @@ KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *
cJSON_AddStringToObject(req_params, "responder-nodeid", id);
ks_pool_free(pool, &id);
// @todo may want to switch this system to use a blade_rpcpublish_args_t with validation on the contents on this list internally
// and to produce the entire json block internally in case the channel args change to include additional information like an encryption key,
// however if passing encryption keys then they should be asymetrically encrypted using a public key provided by the master so that
// the channel keys can be transmitted without intermediate nodes being able to snoop them
if (channels) cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
// @todo add a parameter containing a block of json for schema definitions for each of the methods being published
ks_log(KS_LOG_DEBUG, "Session (%s) publish request started\n", blade_session_id_get(bs));
@ -503,12 +514,13 @@ done:
}
// blade.publish request handler
ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *data)
ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
cJSON *req_params_channels = NULL;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
const char *req_params_requester_nodeid = NULL;
@ -576,10 +588,41 @@ ks_bool_t blade_rpcpublish_request_handler(blade_rpc_request_t *brpcreq, void *d
goto done;
}
req_params_channels = cJSON_GetObjectItem(req_params, "channels");
if (req_params_channels) {
int size = 0;
if (req_params_channels->type != cJSON_Array) {
ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' type, expected array\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
size = cJSON_GetArraySize(req_params_channels);
for (int index = 0; index < size; ++index) {
cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
if (element->type != cJSON_String) {
ks_log(KS_LOG_DEBUG, "Session (%s) publish request invalid 'channels' element type, expected string\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
}
}
ks_log(KS_LOG_DEBUG, "Session (%s) publish request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
blade_mastermgr_controller_add(bh->mastermgr, req_params_protocol, req_params_realm, req_params_requester_nodeid);
if (req_params_channels) {
int size = cJSON_GetArraySize(req_params_channels);
for (int index = 0; index < size; ++index) {
cJSON *element = cJSON_GetArrayItem(req_params_channels, index);
blade_mastermgr_channel_add(bh->mastermgr, req_params_protocol, req_params_realm, element->valuestring);
}
}
// build the actual response finally
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
@ -600,11 +643,8 @@ done:
}
// blade.locate request generator
// @todo discuss system to support caching locate results, and internally subscribing to receive event updates related to protocols which have been located
// to ensure local caches remain synced when protocol controllers change, but this requires additional filters for event propagating to avoid broadcasting
// every protocol update to everyone which may actually be a better way than an explicit locate request
KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data)
// blade.authorize request generator
KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -614,7 +654,230 @@ KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *n
const char *id = NULL;
ks_assert(bh);
ks_assert(name);
ks_assert(nodeid);
ks_assert(protocol);
ks_assert(realm);
ks_assert(channels);
// @todo consideration for the Master trying to publish a protocol, with no upstream
if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
pool = blade_handle_pool_get(bh);
ks_assert(pool);
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.authorize");
// fill in the req_params
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
if (remove) cJSON_AddTrueToObject(req_params, "remove");
cJSON_AddStringToObject(req_params, "authorized-nodeid", nodeid);
blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &id);
ks_assert(id);
cJSON_AddStringToObject(req_params, "requester-nodeid", id);
ks_pool_free(pool, &id);
blade_upstreammgr_masterid_copy(bh->upstreammgr, pool, &id);
ks_assert(id);
cJSON_AddStringToObject(req_params, "responder-nodeid", id);
ks_pool_free(pool, &id);
cJSON_AddItemToObject(req_params, "channels", cJSON_Duplicate(channels, 1));
// @todo add a parameter containing a block of json for schema definitions for each of the methods being published
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request started\n", blade_session_id_get(bs));
ret = blade_session_send(bs, req, callback, data);
done:
if (req) cJSON_Delete(req);
if (bs) blade_session_read_unlock(bs);
return ret;
}
// blade.authorize request handler
ks_bool_t blade_rpcauthorize_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
cJSON *req_params_channels = NULL;
cJSON *req_params_remove = NULL;
cJSON *channel = NULL;
ks_bool_t remove = KS_FALSE;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
const char *req_params_authorized_nodeid = NULL;
const char *req_params_requester_nodeid = NULL;
const char *req_params_responder_nodeid = NULL;
cJSON *res = NULL;
cJSON *res_result = NULL;
cJSON *res_result_authorized_channels = NULL;
cJSON *res_result_unauthorized_channels = NULL;
cJSON *res_result_failed_channels = NULL;
ks_assert(brpcreq);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
ks_assert(bs);
req = blade_rpc_request_message_get(brpcreq);
ks_assert(req);
req_params = cJSON_GetObjectItem(req, "params");
if (!req_params) {
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'params' object\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'protocol'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
if (!req_params_realm) {
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'realm'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params realm");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_remove = cJSON_GetObjectItem(req_params, "remove");
if (req_params_remove && req_params_remove->type == cJSON_True) remove = KS_TRUE;
req_params_authorized_nodeid = cJSON_GetObjectCstr(req_params, "authorized-nodeid");
if (!req_params_authorized_nodeid) {
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'authorized-nodeid'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params authorized-nodeid");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
// @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now
req_params_requester_nodeid = cJSON_GetObjectCstr(req_params, "requester-nodeid");
if (!req_params_requester_nodeid) {
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'requester-nodeid'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params requester-nodeid");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_responder_nodeid = cJSON_GetObjectCstr(req_params, "responder-nodeid");
if (!req_params_responder_nodeid) {
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'responder-nodeid'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params responder-nodeid");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_channels = cJSON_GetObjectItem(req_params, "channels");
if (!req_params_channels) {
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request missing 'channels'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channels");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
if (req_params_channels->type != cJSON_Array) {
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'channels' type, expected array\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
cJSON_ArrayForEach(channel, req_params_channels) {
if (channel->type != cJSON_String) {
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'channels' element type, expected string\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params channels");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
}
if (!blade_upstreammgr_masterid_compare(bh->upstreammgr, req_params_responder_nodeid)) {
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request invalid 'responder-nodeid' (%s)\n", blade_session_id_get(bs), req_params_responder_nodeid);
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Invalid params responder-nodeid");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
ks_log(KS_LOG_DEBUG, "Session (%s) authorize request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
// build the actual response finally
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
cJSON_ArrayForEach(channel, req_params_channels) {
if (blade_mastermgr_channel_authorize(bh->mastermgr, remove, req_params_protocol, req_params_realm, channel->valuestring, req_params_requester_nodeid, req_params_authorized_nodeid) == KS_STATUS_SUCCESS) {
if (remove) {
if (!res_result_unauthorized_channels) res_result_unauthorized_channels = cJSON_CreateArray();
cJSON_AddItemToArray(res_result_unauthorized_channels, cJSON_CreateString(channel->valuestring));
// @todo unauthorizing channels should force a subscribe remove request for the target if they are subscribed, to prevent further events from reaching the target
// this will require the master node to invoke the subscription removal as opposed to the target who normally invokes subscribe
} else {
if (!res_result_authorized_channels) res_result_authorized_channels = cJSON_CreateArray();
cJSON_AddItemToArray(res_result_authorized_channels, cJSON_CreateString(channel->valuestring));
}
} else {
if (!res_result_failed_channels) res_result_failed_channels = cJSON_CreateArray();
cJSON_AddItemToArray(res_result_failed_channels, cJSON_CreateString(channel->valuestring));
}
}
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
cJSON_AddStringToObject(res_result, "authorized-nodeid", req_params_authorized_nodeid);
cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
if (res_result_authorized_channels) cJSON_AddItemToObject(res_result, "authorized-channels", res_result_authorized_channels);
if (res_result_unauthorized_channels) cJSON_AddItemToObject(res_result, "unauthorized-channels", res_result_unauthorized_channels);
if (res_result_failed_channels) cJSON_AddItemToObject(res_result, "failed-channels", res_result_failed_channels);
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
done:
if (res) cJSON_Delete(res);
if (bs) blade_session_read_unlock(bs);
return KS_FALSE;
}
// blade.locate request generator
// @todo discuss system to support caching locate results, and internally subscribing to receive event updates related to protocols which have been located
// to ensure local caches remain synced when protocol controllers change, but this requires additional filters for event propagating to avoid broadcasting
// every protocol update to everyone which may actually be a better way than an explicit locate request
KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
ks_pool_t *pool = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
const char *id = NULL;
ks_assert(bh);
ks_assert(protocol);
ks_assert(realm);
if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
@ -628,7 +891,7 @@ KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *n
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.locate");
// fill in the req_params
cJSON_AddStringToObject(req_params, "protocol", name);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &id);
@ -655,7 +918,7 @@ done:
}
// blade.locate request handler
ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *data)
ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -667,7 +930,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
const char *req_params_responder_nodeid = NULL;
cJSON *res = NULL;
cJSON *res_result = NULL;
cJSON *res_result_controllers;
cJSON *res_result_controllers = NULL;
blade_protocol_t *bp = NULL;
ks_assert(brpcreq);
@ -732,20 +995,8 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
ks_log(KS_LOG_DEBUG, "Session (%s) locate request (%s to %s) processing\n", blade_session_id_get(bs), req_params_requester_nodeid, req_params_responder_nodeid);
res_result_controllers = cJSON_CreateObject();
bp = blade_mastermgr_protocol_lookup(bh->mastermgr, req_params_protocol, req_params_realm);
if (bp) {
ks_hash_t *controllers = blade_protocol_controllers_get(bp);
for (ks_hash_iterator_t *it = ks_hash_first(controllers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const char *key = NULL;
void *value = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
cJSON_AddItemToArray(res_result_controllers, cJSON_CreateString(key));
}
}
if (bp) res_result_controllers = blade_protocol_controllers_pack(bp);
// build the actual response finally
@ -755,7 +1006,7 @@ ks_bool_t blade_rpclocate_request_handler(blade_rpc_request_t *brpcreq, void *da
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
cJSON_AddStringToObject(res_result, "requester-nodeid", req_params_requester_nodeid);
cJSON_AddStringToObject(res_result, "responder-nodeid", req_params_responder_nodeid);
cJSON_AddItemToObject(res_result, "controllers", res_result_controllers);
if (res_result_controllers) cJSON_AddItemToObject(res_result, "controllers", res_result_controllers);
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
@ -770,7 +1021,7 @@ done:
// blade.execute request generator
KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -813,12 +1064,6 @@ KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *
ks_log(KS_LOG_DEBUG, "Session (%s) execute request started\n", blade_session_id_get(bs));
// @todo change what blade_rpc_request_t carries for tracking data, use a tuple instead which makes it
// easier to free the tuple and potentially associated data if a request needs to be destroyed without
// the callback being called to know that the data is a tuple to destroy, meanwhile a tuple offers a
// spare pointer for universally wrapping callback + data pairs in a case like this
// in which case do not create the tuple here, just pass 2 data pointers to send and let it store them
// in the internal tuple
ret = blade_session_send(bs, req, callback, data);
done:
@ -829,7 +1074,7 @@ done:
}
// blade.execute request handler
ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, void *data)
ks_bool_t blade_rpcexecute_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
ks_bool_t ret = KS_FALSE;
blade_handle_t *bh = NULL;
@ -1059,19 +1304,20 @@ KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJ
// blade.subscribe request generator
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data)
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
const char *localid = NULL;
ks_bool_t propagate = KS_FALSE;
blade_subscription_t *bsub = NULL;
cJSON *temp_data = NULL;
ks_assert(bh);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
ks_assert(subscribe_channels || unsubscribe_channels);
// @note this is always produced by a subscriber, and sent upstream, master will only use the internal raw call
if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
@ -1080,28 +1326,32 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char
blade_upstreammgr_localid_copy(bh->upstreammgr, bh->pool, &localid);
ks_assert(localid);
if (remove) {
propagate = blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, &bsub, event, protocol, realm, localid);
} else {
propagate = blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, &bsub, event, protocol, realm, localid);
ks_assert(event_callback);
if (unsubscribe_channels) {
cJSON *channel = NULL;
cJSON_ArrayForEach(channel, unsubscribe_channels) {
blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, &bsub, protocol, realm, channel->valuestring, localid);
}
}
temp_data = cJSON_CreateObject();
if (callback) cJSON_AddItemToObject(temp_data, "callback", cJSON_CreatePtr((uintptr_t)callback));
if (data) cJSON_AddItemToObject(temp_data, "data", data);
if (channel_callback) cJSON_AddItemToObject(temp_data, "channel-callback", cJSON_CreatePtr((uintptr_t)channel_callback));
if (channel_data) cJSON_AddItemToObject(temp_data, "channel-data", channel_data);
ret = blade_handle_rpcsubscribe_raw(bh, protocol, realm, subscribe_channels, unsubscribe_channels, localid, KS_FALSE, blade_rpcsubscribe_response_handler, temp_data);
ks_pool_free(bh->pool, &localid);
if (!remove && bsub) {
blade_subscription_callback_set(bsub, event_callback);
blade_subscription_callback_data_set(bsub, event_data);
}
if (propagate) ret = blade_handle_rpcsubscribe_raw(bh, event, protocol, realm, remove, callback, data);
done:
if (bs) blade_session_read_unlock(bs);
return ret;
}
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
@ -1110,11 +1360,23 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const
cJSON *req_params = NULL;
ks_assert(bh);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
ks_assert(subscribe_channels || unsubscribe_channels);
ks_assert(subscriber);
if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
if (downstream) {
// @note if a master is sending a downstream update, it may only use unsubscribe_channels, cannot force a subscription without a subscriber callback
if (subscribe_channels) {
ret = KS_STATUS_NOT_ALLOWED;
goto done;
}
if (!(bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bh), subscriber))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
}
else if (!(bs = blade_upstreammgr_session_get(bh->upstreammgr))) {
ret = KS_STATUS_DISCONNECTED;
goto done;
}
@ -1124,14 +1386,17 @@ KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const
blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe");
cJSON_AddStringToObject(req_params, "event", event);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
if (remove) cJSON_AddTrueToObject(req_params, "remove");
cJSON_AddStringToObject(req_params, "subscriber-nodeid", subscriber);
if (downstream) cJSON_AddTrueToObject(req_params, "downstream");
if (subscribe_channels) cJSON_AddItemToObject(req_params, "subscribe-channels", cJSON_Duplicate(subscribe_channels, 1));
if (unsubscribe_channels) cJSON_AddItemToObject(req_params, "unsubscribe-channels", cJSON_Duplicate(unsubscribe_channels, 1));
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request started\n", blade_session_id_get(bs));
ret = blade_session_send(bs, req, callback, data);
ret = blade_session_send(bs, req, blade_rpcsubscribe_response_handler, data);
done:
if (req) cJSON_Delete(req);
@ -1141,21 +1406,25 @@ done:
}
// blade.subscribe request handler
ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void *data)
ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
ks_pool_t *pool = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
const char *req_params_event = NULL;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
cJSON *req_params_remove = NULL;
ks_bool_t remove = KS_FALSE;
const char *req_params_subscriber_nodeid = NULL;
cJSON *req_params_downstream = NULL;
ks_bool_t downstream = KS_FALSE;
cJSON *req_params_subscribe_channels = NULL;
cJSON *req_params_unsubscribe_channels = NULL;
ks_bool_t masterlocal = KS_FALSE;
cJSON *res = NULL;
cJSON *res_result = NULL;
ks_bool_t propagate = KS_FALSE;
cJSON *res_result_subscribe_channels = NULL;
cJSON *res_result_failed_channels = NULL;
ks_assert(brpcreq);
@ -1179,14 +1448,6 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
goto done;
}
req_params_event = cJSON_GetObjectCstr(req_params, "event");
if (!req_params_event) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'event'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'protocol'\n", blade_session_id_get(bs));
@ -1203,30 +1464,82 @@ ks_bool_t blade_rpcsubscribe_request_handler(blade_rpc_request_t *brpcreq, void
goto done;
}
req_params_remove = cJSON_GetObjectItem(req_params, "remove");
remove = req_params_remove && req_params_remove->type == cJSON_True;
req_params_subscriber_nodeid = cJSON_GetObjectCstr(req_params, "subscriber-nodeid");
if (!req_params_subscriber_nodeid) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'subscriber-nodeid'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params subscriber-nodeid");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
// @todo this may not be required, may be able to assume direction based on the session the message is received from, if came from upstream
// then it is heading downstream, otherwise it is heading upstream
req_params_downstream = cJSON_GetObjectItem(req_params, "downstream");
downstream = req_params_downstream && req_params_downstream->type == cJSON_True;
req_params_subscribe_channels = cJSON_GetObjectItem(req_params, "subscribe-channels");
req_params_unsubscribe_channels = cJSON_GetObjectItem(req_params, "unsubscribe-channels");
if (!req_params_subscribe_channels && !req_params_unsubscribe_channels) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'subscribe-channels' or 'unsubscribe-channels'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params subscribe-channels or unsubscribe-channels");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
// @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs));
if (remove) {
propagate = blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
} else {
propagate = blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
if (req_params_unsubscribe_channels) {
cJSON *channel = NULL;
cJSON_ArrayForEach(channel, req_params_unsubscribe_channels) {
blade_subscriptionmgr_subscriber_remove(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
}
}
if (propagate) blade_handle_rpcsubscribe_raw(bh, req_params_event, req_params_protocol, req_params_realm, remove, NULL, NULL);
masterlocal = blade_upstreammgr_masterlocal(blade_handle_upstreammgr_get(bh));
// build the actual response finally
if (masterlocal || blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bh), req_params_subscriber_nodeid)) {
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
cJSON_AddStringToObject(res_result, "event", req_params_event);
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
cJSON_AddStringToObject(res_result, "subscriber-nodeid", req_params_subscriber_nodeid);
if (downstream) cJSON_AddTrueToObject(res_result, "downstream");
if (req_params_subscribe_channels) {
// @note this can only be received by the master due to other validation logic in requests which prevents the master from sending a request containing subscribe-channels
cJSON *channel = NULL;
cJSON_ArrayForEach(channel, req_params_subscribe_channels) {
if (blade_mastermgr_channel_verify(bh->mastermgr, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid)) {
blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, NULL, req_params_protocol, req_params_realm, channel->valuestring, req_params_subscriber_nodeid);
if (!res_result_subscribe_channels) res_result_subscribe_channels = cJSON_CreateArray();
cJSON_AddItemToArray(res_result_subscribe_channels, cJSON_CreateString(channel->valuestring));
} else {
if (!res_result_failed_channels) res_result_failed_channels = cJSON_CreateArray();
cJSON_AddItemToArray(res_result_failed_channels, cJSON_CreateString(channel->valuestring));
}
}
}
if (res_result_subscribe_channels) cJSON_AddItemToObject(res_result, "subscribe-channels", res_result_subscribe_channels);
if (res_result_failed_channels) cJSON_AddItemToObject(res_result, "failed-channels", res_result_failed_channels);
// @note unsubscribe-channels get handled during the request path, so the response handlers do not need to reiterate them for any forseeable reason, and if they are needed they
// could be pulled from the original request associated to the response
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
} else {
cJSON *temp_data = cJSON_CreateObject();
// @note track this so that when this local node gets a response to this propagated request we know what messageid to propagate the response with
cJSON_AddStringToObject(temp_data, "messageid", blade_rpc_request_messageid_get(brpcreq));
blade_handle_rpcsubscribe_raw(bh, req_params_protocol, req_params_realm, req_params_subscribe_channels, req_params_unsubscribe_channels, req_params_subscriber_nodeid, downstream, blade_rpcsubscribe_response_handler, temp_data);
cJSON_Delete(temp_data);
}
done:
@ -1236,31 +1549,141 @@ done:
return KS_FALSE;
}
// blade.subscribe response handler
ks_bool_t blade_rpcsubscribe_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
{
ks_bool_t ret = KS_FALSE;
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
blade_rpc_response_callback_t original_callback = NULL;
cJSON *original_data = NULL;
blade_rpc_request_callback_t channel_callback = NULL;
cJSON *channel_data = NULL;
const char *messageid = NULL;
cJSON *res = NULL;
cJSON *res_result = NULL;
const char *res_result_protocol = NULL;
const char *res_result_realm = NULL;
const char *res_result_subscriber_nodeid = NULL;
cJSON *res_result_downstream = NULL;
ks_bool_t downstream = KS_FALSE;
cJSON *res_result_subscribe_channels = NULL;
cJSON *res_result_failed_channels = NULL;
ks_assert(brpcres);
ks_assert(data);
bh = blade_rpc_response_handle_get(brpcres);
ks_assert(bh);
bs = blade_sessionmgr_session_lookup(bh->sessionmgr, blade_rpc_response_sessionid_get(brpcres));
ks_assert(bs);
original_data = cJSON_GetObjectItem(data, "data");
original_callback = (blade_rpc_response_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "callback");
channel_data = cJSON_GetObjectItem(data, "channel-data");
channel_callback = (blade_rpc_request_callback_t)(uintptr_t)cJSON_GetObjectPtr(data, "channel-callback");
// @note when messageid exists, it means this message is only intended to be examined and relayed, the local node is not the subscriber
messageid = cJSON_GetObjectCstr(data, "messageid");
res = blade_rpc_response_message_get(brpcres);
ks_assert(res);
res_result = cJSON_GetObjectItem(res, "result");
if (!res_result) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'result' object\n", blade_session_id_get(bs));
goto done;
}
// @todo the following 3 fields, protocol, realm, and subscriber-nodeid may not be required to carry in the response as they could be
// obtained from the original request tied to the response, change this later
res_result_protocol = cJSON_GetObjectCstr(res_result, "protocol");
if (!res_result_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'protocol'\n", blade_session_id_get(bs));
goto done;
}
res_result_realm = cJSON_GetObjectCstr(res_result, "realm");
if (!res_result_realm) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'realm'\n", blade_session_id_get(bs));
goto done;
}
res_result_subscriber_nodeid = cJSON_GetObjectCstr(res_result, "subscriber-nodeid");
if (!res_result_subscriber_nodeid) {
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe response missing 'subscriber-nodeid'\n", blade_session_id_get(bs));
goto done;
}
res_result_downstream = cJSON_GetObjectItem(res_result, "downstream");
downstream = res_result_downstream && res_result_downstream->type == cJSON_True;
res_result_subscribe_channels = cJSON_GetObjectItem(res_result, "subscribe-channels");
res_result_failed_channels = cJSON_GetObjectItem(res_result, "failed-channels");
if (res_result_subscribe_channels) {
// @note only reach here when the master has responded to authorize subscriptions to channels, so all nodes along the path must
// add the subscriber
cJSON *channel = NULL;
blade_subscription_t *bsub = NULL;
cJSON_ArrayForEach(channel, res_result_subscribe_channels) {
blade_subscriptionmgr_subscriber_add(bh->subscriptionmgr, &bsub, res_result_protocol, res_result_realm, channel->valuestring, res_result_subscriber_nodeid);
// @note these will only get assigned on the last response, received by the subscriber
if (channel_callback) blade_subscription_callback_set(bsub, channel_callback);
if (channel_data) blade_subscription_callback_data_set(bsub, channel_data);
}
}
// @note this will only happen on the last response, received by the subscriber
if (original_callback) ret = original_callback(brpcres, original_data);
if (messageid) {
blade_session_t *relay = NULL;
if (downstream) {
if (!(relay = blade_upstreammgr_session_get(bh->upstreammgr))) {
goto done;
}
} else {
if (!(relay = blade_routemgr_route_lookup(bh->routemgr, res_result_subscriber_nodeid))) {
goto done;
}
}
blade_rpc_response_raw_create(&res, &res_result, messageid);
cJSON_AddStringToObject(res_result, "protocol", res_result_protocol);
cJSON_AddStringToObject(res_result, "realm", res_result_realm);
cJSON_AddStringToObject(res_result, "subscriber-nodeid", res_result_subscriber_nodeid);
if (downstream) cJSON_AddTrueToObject(res_result, "downstream");
if (res_result_subscribe_channels) cJSON_AddItemToObject(res_result, "subscribe-channels", cJSON_Duplicate(res_result_subscribe_channels, 1));
if (res_result_failed_channels) cJSON_AddItemToObject(res_result, "failed-channels", cJSON_Duplicate(res_result_failed_channels, 1));
blade_session_send(relay, res, NULL, NULL);
cJSON_Delete(res);
blade_session_read_unlock(relay);
}
done:
blade_session_read_unlock(bs);
return ret;
}
// blade.broadcast request generator
KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *broadcaster_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_pool_t *pool = NULL;
const char *localid = NULL;
ks_assert(bh);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
// this will ensure any downstream subscriber sessions, and upstream session if available will be broadcasted to
pool = blade_handle_pool_get(bh);
if (!broadcaster_nodeid) {
blade_upstreammgr_localid_copy(bh->upstreammgr, pool, &localid);
ks_assert(localid);
broadcaster_nodeid = localid;
}
ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, broadcaster_nodeid, NULL, event, protocol, realm, params, callback, data);
if (localid) ks_pool_free(pool, &localid);
ret = blade_subscriptionmgr_broadcast(bh->subscriptionmgr, NULL, protocol, realm, channel, event, params, callback, data);
// @todo must check if the local node is also subscribed to receive the event, this is a special edge case which has some extra considerations
// if the local node is subscribed to receive the event, it should be received here as a special case, otherwise the broadcast request handler
@ -1270,17 +1693,17 @@ KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char
}
// blade.broadcast request handler
ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void *data)
ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
ks_bool_t ret = KS_FALSE;
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
const char *req_params_broadcaster_nodeid = NULL;
const char *req_params_event = NULL;
const char *req_params_protocol = NULL;
const char *req_params_realm = NULL;
const char *req_params_channel = NULL;
const char *req_params_event = NULL;
cJSON *req_params_params = NULL;
blade_subscription_t *bsub = NULL;
blade_rpc_request_callback_t callback = NULL;
@ -1306,22 +1729,6 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
goto done;
}
req_params_broadcaster_nodeid = cJSON_GetObjectCstr(req_params, "broadcaster-nodeid");
if (!req_params_broadcaster_nodeid) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'broadcaster-nodeid'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params broadcaster-nodeid");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_event = cJSON_GetObjectCstr(req_params, "event");
if (!req_params_event) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
if (!req_params_protocol) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'protocol'\n", blade_session_id_get(bs));
@ -1338,11 +1745,27 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
goto done;
}
req_params_channel = cJSON_GetObjectCstr(req_params, "channel");
if (!req_params_channel) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'channel'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params channel");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_event = cJSON_GetObjectCstr(req_params, "event");
if (!req_params_event) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request missing 'event'\n", blade_session_id_get(bs));
blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
blade_session_send(bs, res, NULL, NULL);
goto done;
}
req_params_params = cJSON_GetObjectItem(req_params, "params");
blade_subscriptionmgr_broadcast(bh->subscriptionmgr, req_params_broadcaster_nodeid, blade_session_id_get(bs), req_params_event, req_params_protocol, req_params_realm, req_params_params, NULL, NULL);
blade_subscriptionmgr_broadcast(bh->subscriptionmgr, blade_session_id_get(bs), req_params_protocol, req_params_realm, req_params_channel, req_params_event, req_params_params, NULL, NULL);
bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_event, req_params_protocol, req_params_realm);
bsub = blade_subscriptionmgr_subscription_lookup(bh->subscriptionmgr, req_params_protocol, req_params_realm, req_params_channel);
if (bsub) {
const char *localid = NULL;
ks_pool_t *pool = NULL;
@ -1362,10 +1785,11 @@ ks_bool_t blade_rpcbroadcast_request_handler(blade_rpc_request_t *brpcreq, void
// build the actual response finally
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
cJSON_AddStringToObject(res_result, "broadcaster-nodeid", req_params_broadcaster_nodeid);
cJSON_AddStringToObject(res_result, "event", req_params_event);
// @todo this is not neccessary, can obtain this from the original request
cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
cJSON_AddStringToObject(res_result, "realm", req_params_realm);
cJSON_AddStringToObject(res_result, "channel", req_params_channel);
cJSON_AddStringToObject(res_result, "event", req_params_event);
// request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
blade_session_send(bs, res, NULL, NULL);
@ -1378,23 +1802,6 @@ done:
return ret;
}
KS_DECLARE(const char *) blade_rpcbroadcast_request_broadcaster_nodeid_get(blade_rpc_request_t *brpcreq)
{
cJSON *req = NULL;
cJSON *req_params = NULL;
const char *req_broadcaster_nodeid = NULL;
ks_assert(brpcreq);
req = blade_rpc_request_message_get(brpcreq);
ks_assert(req);
req_params = cJSON_GetObjectItem(req, "params");
if (req_params) req_broadcaster_nodeid = cJSON_GetObjectCstr(req_params, "broadcaster-nodeid");
return req_broadcaster_nodeid;
}
KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq)
{
cJSON *req = NULL;

View File

@ -36,9 +36,9 @@
struct blade_subscription_s {
ks_pool_t *pool;
const char *event;
const char *protocol;
const char *realm;
const char *channel;
ks_hash_t *subscribers;
blade_rpc_request_callback_t callback;
@ -56,9 +56,9 @@ static void blade_subscription_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks
case KS_MPCL_ANNOUNCE:
break;
case KS_MPCL_TEARDOWN:
if (bsub->event) ks_pool_free(bsub->pool, &bsub->event);
if (bsub->protocol) ks_pool_free(bsub->pool, &bsub->protocol);
if (bsub->realm) ks_pool_free(bsub->pool, &bsub->subscribers);
if (bsub->channel) ks_pool_free(bsub->pool, &bsub->channel);
if (bsub->subscribers) ks_hash_destroy(&bsub->subscribers);
break;
case KS_MPCL_DESTROY:
@ -66,21 +66,21 @@ static void blade_subscription_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks
}
}
KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm)
KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *protocol, const char *realm, const char *channel)
{
blade_subscription_t *bsub = NULL;
ks_assert(bsubP);
ks_assert(pool);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
ks_assert(channel);
bsub = ks_pool_alloc(pool, sizeof(blade_subscription_t));
bsub->pool = pool;
bsub->event = ks_pstrdup(pool, event);
bsub->protocol = ks_pstrdup(pool, protocol);
bsub->realm = ks_pstrdup(pool, realm);
bsub->channel = ks_pstrdup(pool, channel);
ks_hash_create(&bsub->subscribers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bsub->pool);
ks_assert(bsub->subscribers);
@ -106,14 +106,6 @@ KS_DECLARE(ks_status_t) blade_subscription_destroy(blade_subscription_t **bsubP)
return KS_STATUS_SUCCESS;
}
KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
return bsub->event;
}
KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
@ -130,6 +122,14 @@ KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub
}
KS_DECLARE(const char *) blade_subscription_channel_get(blade_subscription_t *bsub)
{
ks_assert(bsub);
return bsub->channel;
}
KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub)
{
ks_assert(bsub);

View File

@ -128,17 +128,17 @@ KS_DECLARE(blade_handle_t *) blade_subscriptionmgr_handle_get(blade_subscription
// return bs;
//}
KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *event, const char *protocol, const char *realm)
KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel)
{
blade_subscription_t *bsub = NULL;
char *key = NULL;
ks_assert(bsmgr);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
ks_assert(channel);
key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)key, KS_READLOCKED);
// @todo if (bsub) blade_subscription_read_lock(bsub);
@ -149,7 +149,7 @@ KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(bla
return bsub;
}
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target)
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber)
{
char *key = NULL;
blade_subscription_t *bsub = NULL;
@ -157,40 +157,40 @@ KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr
ks_bool_t propagate = KS_FALSE;
ks_assert(bsmgr);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
ks_assert(target);
ks_assert(channel);
ks_assert(subscriber);
key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
ks_hash_write_lock(bsmgr->subscriptions);
bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)key, KS_UNLOCKED);
if (!bsub) {
blade_subscription_create(&bsub, bsmgr->pool, event, protocol, realm);
blade_subscription_create(&bsub, bsmgr->pool, protocol, realm, channel);
ks_assert(bsub);
ks_hash_insert(bsmgr->subscriptions, (void *)ks_pstrdup(bsmgr->pool, key), bsub);
propagate = KS_TRUE;
}
bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)target, KS_UNLOCKED);
bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)subscriber, KS_UNLOCKED);
if (!bsub_cleanup) {
ks_hash_create(&bsub_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bsmgr->pool);
ks_assert(bsub_cleanup);
ks_log(KS_LOG_DEBUG, "Subscription Added: %s\n", key);
ks_hash_insert(bsmgr->subscriptions_cleanup, (void *)ks_pstrdup(bsmgr->pool, target), (void *)bsub_cleanup);
ks_hash_insert(bsmgr->subscriptions_cleanup, (void *)ks_pstrdup(bsmgr->pool, subscriber), (void *)bsub_cleanup);
}
ks_hash_insert(bsub_cleanup, (void *)ks_pstrdup(bsmgr->pool, key), (void *)KS_TRUE);
blade_subscription_subscribers_add(bsub, target);
blade_subscription_subscribers_add(bsub, subscriber);
ks_hash_write_unlock(bsmgr->subscriptions);
ks_log(KS_LOG_DEBUG, "Subscriber Added: %s to %s\n", target, key);
ks_log(KS_LOG_DEBUG, "Subscriber Added: %s to %s\n", subscriber, key);
ks_pool_free(bsmgr->pool, &key);
@ -199,7 +199,7 @@ KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr
return propagate;
}
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target)
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber)
{
char *key = NULL;
blade_subscription_t *bsub = NULL;
@ -207,28 +207,28 @@ KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscription
ks_bool_t propagate = KS_FALSE;
ks_assert(bsmgr);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
ks_assert(target);
ks_assert(channel);
ks_assert(subscriber);
key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
ks_hash_write_lock(bsmgr->subscriptions);
bsub = (blade_subscription_t *)ks_hash_search(bsmgr->subscriptions, (void *)key, KS_UNLOCKED);
if (bsub) {
bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)target, KS_UNLOCKED);
bsub_cleanup = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)subscriber, KS_UNLOCKED);
ks_assert(bsub_cleanup);
ks_hash_remove(bsub_cleanup, key);
if (ks_hash_count(bsub_cleanup) == 0) {
ks_hash_remove(bsmgr->subscriptions_cleanup, (void *)target);
ks_hash_remove(bsmgr->subscriptions_cleanup, (void *)subscriber);
}
ks_log(KS_LOG_DEBUG, "Subscriber Removed: %s from %s\n", target, key);
blade_subscription_subscribers_remove(bsub, target);
ks_log(KS_LOG_DEBUG, "Subscriber Removed: %s from %s\n", subscriber, key);
blade_subscription_subscribers_remove(bsub, subscriber);
if (ks_hash_count(blade_subscription_subscribers_get(bsub)) == 0) {
ks_log(KS_LOG_DEBUG, "Subscription Removed: %s\n", key);
@ -246,7 +246,7 @@ KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscription
return propagate;
}
KS_DECLARE(void) blade_subscriptionmgr_subscriber_cleanup(blade_subscriptionmgr_t *bsmgr, const char *target)
KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target)
{
ks_bool_t unsubbed = KS_FALSE;
@ -255,9 +255,9 @@ KS_DECLARE(void) blade_subscriptionmgr_subscriber_cleanup(blade_subscriptionmgr_
while (!unsubbed) {
ks_hash_t *subscriptions = NULL;
const char *event = NULL;
const char *protocol = NULL;
const char *realm = NULL;
const char *channel = NULL;
ks_hash_read_lock(bsmgr->subscriptions);
subscriptions = (ks_hash_t *)ks_hash_search(bsmgr->subscriptions_cleanup, (void *)target, KS_UNLOCKED);
@ -276,36 +276,43 @@ KS_DECLARE(void) blade_subscriptionmgr_subscriber_cleanup(blade_subscriptionmgr_
ks_assert(bsub);
// @note allocate these to avoid lifecycle issues when the last subscriber is removed causing the subscription to be removed
event = ks_pstrdup(bsmgr->pool, blade_subscription_event_get(bsub));
protocol = ks_pstrdup(bsmgr->pool, blade_subscription_protocol_get(bsub));
realm = ks_pstrdup(bsmgr->pool, blade_subscription_realm_get(bsub));
channel = ks_pstrdup(bsmgr->pool, blade_subscription_channel_get(bsub));
}
ks_hash_read_unlock(bsmgr->subscriptions);
if (!unsubbed) {
if (blade_subscriptionmgr_subscriber_remove(bsmgr, NULL, event, protocol, realm, target)) {
blade_handle_rpcsubscribe_raw(bsmgr->handle, event, protocol, realm, KS_TRUE, NULL, NULL);
}
ks_pool_free(bsmgr->pool, &event);
blade_subscriptionmgr_subscriber_remove(bsmgr, NULL, protocol, realm, channel, target);
ks_pool_free(bsmgr->pool, &protocol);
ks_pool_free(bsmgr->pool, &realm);
ks_pool_free(bsmgr->pool, &channel);
}
}
}
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *broadcaster_nodeid, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data)
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data)
{
const char *bsub_key = NULL;
blade_subscription_t *bsub = NULL;
blade_session_t *bs = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
ks_assert(bsmgr);
ks_assert(broadcaster_nodeid);
ks_assert(event);
ks_assert(protocol);
ks_assert(realm);
ks_assert(channel);
bsub_key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, event);
bsub_key = ks_psprintf(bsmgr->pool, "%s@%s/%s", protocol, realm, channel);
blade_rpc_request_raw_create(bsmgr->pool, &req, &req_params, NULL, "blade.broadcast");
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
cJSON_AddStringToObject(req_params, "channel", channel);
cJSON_AddStringToObject(req_params, "event", event);
if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
ks_hash_read_lock(bsmgr->subscriptions);
@ -318,32 +325,20 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
for (ks_hash_iterator_t *it = ks_hash_first(subscribers, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
void *key = NULL;
void *value = NULL;
cJSON *req = NULL;
cJSON *req_params = NULL;
ks_hash_this(it, (const void **)&key, NULL, &value);
if (excluded_nodeid && !ks_safe_strcasecmp(excluded_nodeid, (const char *)key)) continue;
// @todo broadcast producer is also a local subscriber... requires special consideration with no session to request through
if (blade_upstreammgr_localid_compare(blade_handle_upstreammgr_get(bsmgr->handle), (const char *)key)) continue;
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bsmgr->handle), (const char *)key);
bs = blade_routemgr_route_lookup(blade_handle_routemgr_get(bsmgr->handle), (const char *)key);
if (bs) {
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
blade_rpc_request_raw_create(bsmgr->pool, &req, &req_params, NULL, "blade.broadcast");
cJSON_AddStringToObject(req_params, "broadcaster-nodeid", broadcaster_nodeid);
cJSON_AddStringToObject(req_params, "event", event);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
blade_session_send(bs, req, callback, data);
cJSON_Delete(req);
blade_session_read_unlock(bs);
}
}
@ -351,33 +346,21 @@ KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t
ks_hash_read_unlock(bsmgr->subscriptions);
ks_pool_free(bsmgr->pool, &bsub_key);
bs = blade_upstreammgr_session_get(blade_handle_upstreammgr_get(bsmgr->handle));
if (bs) {
if (!excluded_nodeid || ks_safe_strcasecmp(blade_session_id_get(bs), excluded_nodeid)) {
cJSON *req = NULL;
cJSON *req_params = NULL;
ks_log(KS_LOG_DEBUG, "Session (%s) broadcast request started\n", blade_session_id_get(bs));
blade_rpc_request_raw_create(bsmgr->pool, &req, &req_params, NULL, "blade.broadcast");
cJSON_AddStringToObject(req_params, "broadcaster-nodeid", broadcaster_nodeid);
cJSON_AddStringToObject(req_params, "event", event);
cJSON_AddStringToObject(req_params, "protocol", protocol);
cJSON_AddStringToObject(req_params, "realm", realm);
if (params) cJSON_AddItemToObject(req_params, "params", cJSON_Duplicate(params, 1));
ks_log(KS_LOG_DEBUG, "Broadcasting: %s through %s\n", bsub_key, blade_session_id_get(bs));
blade_session_send(bs, req, callback, data);
cJSON_Delete(req);
}
blade_session_read_unlock(bs);
}
cJSON_Delete(req);
ks_pool_free(bsmgr->pool, &bsub_key);
return KS_STATUS_SUCCESS;
}

View File

@ -250,6 +250,21 @@ KS_DECLARE(ks_status_t) blade_upstreammgr_masterid_copy(blade_upstreammgr_t *bum
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_bool_t) blade_upstreammgr_masterlocal(blade_upstreammgr_t *bumgr)
{
ks_bool_t ret = KS_FALSE;
ks_assert(bumgr);
ks_rwl_read_lock(bumgr->masterid_rwl);
ks_rwl_read_lock(bumgr->localid_rwl);
ret = bumgr->masterid && bumgr->localid && !ks_safe_strcasecmp(bumgr->masterid, bumgr->localid);
ks_rwl_read_unlock(bumgr->localid_rwl);
ks_rwl_read_unlock(bumgr->masterid_rwl);
return ret;
}
KS_DECLARE(ks_status_t) blade_upstreammgr_realm_add(blade_upstreammgr_t *bumgr, const char *realm)
{
char *key = NULL;

View File

@ -39,9 +39,13 @@ KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_mastermgr_create(blade_mastermgr_t **bmmgrP, blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_mastermgr_destroy(blade_mastermgr_t **bmmgrP);
KS_DECLARE(blade_handle_t *) blade_mastermgr_handle_get(blade_mastermgr_t *bmmgr);
KS_DECLARE(ks_status_t) blade_mastermgr_purge(blade_mastermgr_t *bmmgr, const char *nodeid);
KS_DECLARE(blade_protocol_t *) blade_mastermgr_protocol_lookup(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm);
KS_DECLARE(ks_status_t) blade_mastermgr_controller_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *controller);
KS_DECLARE(ks_status_t) blade_mastermgr_controller_remove(blade_mastermgr_t *bmmgr, const char *controller);
KS_DECLARE(ks_status_t) blade_mastermgr_channel_add(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_status_t) blade_mastermgr_channel_remove(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_status_t) blade_mastermgr_channel_authorize(blade_mastermgr_t *bmmgr, ks_bool_t remove, const char *protocol, const char *realm, const char *channel, const char *controller, const char *target);
KS_DECLARE(ks_bool_t) blade_mastermgr_channel_verify(blade_mastermgr_t *bmmgr, const char *protocol, const char *realm, const char *channel, const char *target);
KS_END_EXTERN_C
#endif

View File

@ -38,9 +38,13 @@
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_protocol_create(blade_protocol_t **bpP, ks_pool_t *pool, const char *name, const char *realm);
KS_DECLARE(ks_status_t) blade_protocol_destroy(blade_protocol_t **bpP);
KS_DECLARE(ks_hash_t *) blade_protocol_controllers_get(blade_protocol_t *bp);
KS_DECLARE(ks_bool_t) blade_protocol_purge(blade_protocol_t *bp, const char *nodeid);
KS_DECLARE(ks_status_t) blade_protocol_controllers_add(blade_protocol_t *bp, const char *nodeid);
KS_DECLARE(ks_status_t) blade_protocol_controllers_remove(blade_protocol_t *bp, const char *nodeid);
KS_DECLARE(cJSON *) blade_protocol_controllers_pack(blade_protocol_t *bp);
KS_DECLARE(ks_status_t) blade_protocol_channel_add(blade_protocol_t *bp, const char *name);
KS_DECLARE(ks_status_t) blade_protocol_channel_remove(blade_protocol_t *bp, const char *name);
KS_DECLARE(ks_status_t) blade_protocol_channel_authorize(blade_protocol_t *bp, ks_bool_t remove, const char *channel, const char *controller, const char *target);
KS_DECLARE(ks_bool_t) blade_protocol_channel_verify(blade_protocol_t *bp, const char *channel, const char *target);
KS_END_EXTERN_C
#endif

View File

@ -36,14 +36,14 @@
#include <blade.h>
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_rpc_create(blade_rpc_t **brpcP, blade_handle_t *bh, const char *method, const char *protocol, const char *realm, blade_rpc_request_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_rpc_destroy(blade_rpc_t **brpcP);
KS_DECLARE(blade_handle_t *) blade_rpc_handle_get(blade_rpc_t *brpc);
KS_DECLARE(const char *) blade_rpc_method_get(blade_rpc_t *brpc);
KS_DECLARE(const char *) blade_rpc_protocol_get(blade_rpc_t *brpc);
KS_DECLARE(const char *) blade_rpc_realm_get(blade_rpc_t *brpc);
KS_DECLARE(blade_rpc_request_callback_t) blade_rpc_callback_get(blade_rpc_t *brpc);
KS_DECLARE(void *) blade_rpc_data_get(blade_rpc_t *brpc);
KS_DECLARE(cJSON *) blade_rpc_data_get(blade_rpc_t *brpc);
KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
blade_handle_t *bh,
@ -51,14 +51,15 @@ KS_DECLARE(ks_status_t) blade_rpc_request_create(blade_rpc_request_t **brpcreqP,
const char *session_id,
cJSON *json,
blade_rpc_response_callback_t callback,
void *data);
cJSON *data);
KS_DECLARE(ks_status_t) blade_rpc_request_destroy(blade_rpc_request_t **brpcreqP);
KS_DECLARE(ks_status_t) blade_rpc_request_duplicate(blade_rpc_request_t **brpcreqP, blade_rpc_request_t *brpcreq);
KS_DECLARE(blade_handle_t *) blade_rpc_request_handle_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(const char *) blade_rpc_request_sessionid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpc_request_message_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(const char *) blade_rpc_request_messageid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(blade_rpc_response_callback_t) blade_rpc_request_callback_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(void *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpc_request_data_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(ks_status_t) blade_rpc_request_raw_create(ks_pool_t *pool, cJSON **json, cJSON **params, const char **id, const char *method);

View File

@ -62,7 +62,7 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs);
KS_DECLARE(ks_bool_t) blade_session_terminating(blade_session_t *bs);
KS_DECLARE(const char *) blade_session_connection_get(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_connection_set(blade_session_t *bs, const char *id);
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);
KS_DECLARE(ks_status_t) blade_session_receiving_push(blade_session_t *bs, cJSON *json);

View File

@ -59,24 +59,25 @@ KS_DECLARE(blade_sessionmgr_t *) blade_handle_sessionmgr_get(blade_handle_t *bh)
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcregister(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcpublish(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *name, const char *realm, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcauthorize(blade_handle_t *bh, const char *nodeid, ks_bool_t remove, const char *protocol, const char *realm, cJSON *channels, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpclocate(blade_handle_t *bh, const char *protocol, const char *realm, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpcexecute(blade_handle_t *bh, const char *nodeid, const char *method, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(const char *) blade_rpcexecute_request_requester_nodeid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(const char *) blade_rpcexecute_request_responder_nodeid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpcexecute_request_params_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(cJSON *) blade_rpcexecute_response_result_get(blade_rpc_response_t *brpcres);
KS_DECLARE(void) blade_rpcexecute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data);
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, blade_rpc_response_callback_t callback, cJSON *data, blade_rpc_request_callback_t channel_callback, cJSON *channel_data);
KS_DECLARE(ks_status_t) blade_handle_rpcsubscribe_raw(blade_handle_t *bh, const char *protocol, const char *realm, cJSON *subscribe_channels, cJSON *unsubscribe_channels, const char *subscriber, ks_bool_t downstream, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *broadcaster_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(const char *) blade_rpcbroadcast_request_broadcaster_nodeid_get(blade_rpc_request_t *brpcreq);
KS_DECLARE(ks_status_t) blade_handle_rpcbroadcast(blade_handle_t *bh, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
KS_DECLARE(cJSON *) blade_rpcbroadcast_request_params_get(blade_rpc_request_t *brpcreq);
KS_END_EXTERN_C

View File

@ -36,11 +36,11 @@
#include <blade.h>
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm);
KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_status_t) blade_subscription_destroy(blade_subscription_t **bsubP);
KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub);
KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub);
KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub);
KS_DECLARE(const char *) blade_subscription_channel_get(blade_subscription_t *bsub);
KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub);
KS_DECLARE(ks_status_t) blade_subscription_subscribers_add(blade_subscription_t *bsub, const char *nodeid);
KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription_t *bsub, const char *nodeid);

View File

@ -39,11 +39,11 @@ KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_subscriptionmgr_create(blade_subscriptionmgr_t **bsmgrP, blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_destroy(blade_subscriptionmgr_t **bsmgrP);
KS_DECLARE(blade_handle_t *) blade_subscriptionmgr_handle_get(blade_subscriptionmgr_t *bsmgr);
KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *event, const char *protocol, const char *realm);
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target);
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *target);
KS_DECLARE(void) blade_subscriptionmgr_subscriber_cleanup(blade_subscriptionmgr_t *bsmgr, const char *target);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *broadcaster_nodeid, const char *excluded_nodeid, const char *event, const char *protocol, const char *realm, cJSON *params, blade_rpc_response_callback_t callback, void *data);
KS_DECLARE(blade_subscription_t *) blade_subscriptionmgr_subscription_lookup(blade_subscriptionmgr_t *bsmgr, const char *protocol, const char *realm, const char *channel);
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_add(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
KS_DECLARE(ks_bool_t) blade_subscriptionmgr_subscriber_remove(blade_subscriptionmgr_t *bsmgr, blade_subscription_t **bsubP, const char *protocol, const char *realm, const char *channel, const char *subscriber);
KS_DECLARE(void) blade_subscriptionmgr_purge(blade_subscriptionmgr_t *bsmgr, const char *target);
KS_DECLARE(ks_status_t) blade_subscriptionmgr_broadcast(blade_subscriptionmgr_t *bsmgr, const char *excluded_nodeid, const char *protocol, const char *realm, const char *channel, const char *event, cJSON *params, blade_rpc_response_callback_t callback, cJSON *data);
KS_END_EXTERN_C
#endif

View File

@ -62,8 +62,8 @@ typedef struct blade_connectionmgr_s blade_connectionmgr_t;
typedef struct blade_sessionmgr_s blade_sessionmgr_t;
typedef struct blade_session_callback_data_s blade_session_callback_data_t;
typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, void *data);
typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, void *data);
typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, cJSON *data);
typedef ks_bool_t (*blade_rpc_response_callback_t)(blade_rpc_response_t *brpcres, cJSON *data);
typedef enum {

View File

@ -47,6 +47,7 @@ KS_DECLARE(blade_session_t *) blade_upstreammgr_session_get(blade_upstreammgr_t
KS_DECLARE(ks_status_t) blade_upstreammgr_masterid_set(blade_upstreammgr_t *bumgr, const char *id);
KS_DECLARE(ks_bool_t) blade_upstreammgr_masterid_compare(blade_upstreammgr_t *bumgr, const char *id);
KS_DECLARE(ks_status_t) blade_upstreammgr_masterid_copy(blade_upstreammgr_t *bumgr, ks_pool_t *pool, const char **id);
KS_DECLARE(ks_bool_t) blade_upstreammgr_masterlocal(blade_upstreammgr_t *bumgr);
//KS_DECLARE(ks_hash_t *) blade_upstreammgr_realm_lookup(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_upstreammgr_realm_add(blade_upstreammgr_t *bumgr, const char *realm);
KS_DECLARE(ks_status_t) blade_upstreammgr_realm_remove(blade_upstreammgr_t *bumgr, const char *realm);

View File

@ -290,10 +290,15 @@ void command_execute(blade_handle_t *bh, char *args)
void command_subscribe(blade_handle_t *bh, char *args)
{
cJSON *channels = NULL;
ks_assert(bh);
ks_assert(args);
blade_handle_rpcsubscribe(bh, "test.event", "test", "mydomain.com", KS_FALSE, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
cJSON_Delete(channels);
}
/* For Emacs:

View File

@ -81,11 +81,12 @@ ks_bool_t test_echo_request_handler(blade_rpc_request_t *brpcreq, void *data)
cJSON_AddStringToObject(result, "text", text);
blade_rpcexecute_response_send(brpcreq, result);
cJSON_Delete(result);
return KS_FALSE;
}
ks_bool_t test_event_response_handler(blade_rpc_response_t *brpcres, void *data)
ks_bool_t test_broadcast_response_handler(blade_rpc_response_t *brpcres, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -98,7 +99,7 @@ ks_bool_t test_event_response_handler(blade_rpc_response_t *brpcres, void *data)
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_response_sessionid_get(brpcres));
ks_assert(bs);
ks_log(KS_LOG_DEBUG, "Session (%s) test.event response processing\n", blade_session_id_get(bs));
ks_log(KS_LOG_DEBUG, "Session (%s) test broadcast response processing\n", blade_session_id_get(bs));
blade_session_read_unlock(bs);
@ -243,7 +244,7 @@ void command_publish(blade_handle_t *bh, char *args)
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
// @todo build up json-based method schema for each protocolrpc registered above, and pass into blade_handle_rpcpublish() to attach to the request, to be stored in the blade_protocol_t tracked by the master node
blade_handle_rpcpublish(bh, "test", "mydomain.com", blade_publish_response_handler, NULL);
blade_handle_rpcpublish(bh, "test", "mydomain.com", NULL, blade_publish_response_handler, NULL);
}
void command_broadcast(blade_handle_t *bh, char *args)
@ -251,7 +252,7 @@ void command_broadcast(blade_handle_t *bh, char *args)
ks_assert(bh);
ks_assert(args);
blade_handle_rpcbroadcast(bh, NULL, "test.event", "test", "mydomain.com", NULL, test_event_response_handler, NULL);
blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "event", NULL, test_broadcast_response_handler, NULL);
}

View File

@ -33,7 +33,7 @@ static const struct command_def_s command_defs[] = {
const char *g_testcon_nodeid = NULL;
ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, void *data)
ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -87,7 +87,7 @@ ks_bool_t test_locate_response_handler(blade_rpc_response_t *brpcres, void *data
return KS_FALSE;
}
ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, void *data)
ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -111,7 +111,7 @@ ks_bool_t test_join_response_handler(blade_rpc_response_t *brpcres, void *data)
return KS_FALSE;
}
ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, void *data)
ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -135,7 +135,7 @@ ks_bool_t test_leave_response_handler(blade_rpc_response_t *brpcres, void *data)
return KS_FALSE;
}
ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, void *data)
ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
@ -159,11 +159,10 @@ ks_bool_t test_talk_response_handler(blade_rpc_response_t *brpcres, void *data)
return KS_FALSE;
}
ks_bool_t test_join_broadcast_handler(blade_rpc_request_t *brpcreq, void *data)
ks_bool_t test_broadcast_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
const char *broadcaster_nodeid = NULL;
cJSON *params = NULL;
//cJSON *result = NULL;
@ -178,70 +177,7 @@ ks_bool_t test_join_broadcast_handler(blade_rpc_request_t *brpcreq, void *data)
params = blade_rpcbroadcast_request_params_get(brpcreq);
ks_assert(params);
broadcaster_nodeid = blade_rpcbroadcast_request_broadcaster_nodeid_get(brpcreq);
ks_assert(broadcaster_nodeid);
ks_log(KS_LOG_DEBUG, "Session (%s) test.join (%s) broadcast processing\n", blade_session_id_get(bs), broadcaster_nodeid);
blade_session_read_unlock(bs);
return KS_FALSE;
}
ks_bool_t test_leave_broadcast_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
const char *broadcaster_nodeid = NULL;
cJSON *params = NULL;
//cJSON *result = NULL;
ks_assert(brpcreq);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
ks_assert(bs);
params = blade_rpcbroadcast_request_params_get(brpcreq);
ks_assert(params);
broadcaster_nodeid = blade_rpcbroadcast_request_broadcaster_nodeid_get(brpcreq);
ks_assert(broadcaster_nodeid);
ks_log(KS_LOG_DEBUG, "Session (%s) test.leave (%s) broadcast processing\n", blade_session_id_get(bs), broadcaster_nodeid);
blade_session_read_unlock(bs);
return KS_FALSE;
}
ks_bool_t test_talk_broadcast_handler(blade_rpc_request_t *brpcreq, void *data)
{
blade_handle_t *bh = NULL;
blade_session_t *bs = NULL;
const char *broadcaster_nodeid = NULL;
cJSON *params = NULL;
//cJSON *result = NULL;
ks_assert(brpcreq);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
ks_assert(bs);
broadcaster_nodeid = blade_rpcbroadcast_request_broadcaster_nodeid_get(brpcreq);
ks_assert(broadcaster_nodeid);
params = blade_rpcbroadcast_request_params_get(brpcreq);
ks_assert(params);
// @todo pull out text from params
ks_log(KS_LOG_DEBUG, "Session (%s) test.talk (%s) broadcast processing\n", blade_session_id_get(bs), broadcaster_nodeid);
ks_log(KS_LOG_DEBUG, "Session (%s) test broadcast processing\n", blade_session_id_get(bs));
blade_session_read_unlock(bs);
@ -391,6 +327,7 @@ void command_locate(blade_handle_t *bh, char *args)
void command_join(blade_handle_t *bh, char *args)
{
cJSON *params = NULL;
cJSON *channels = NULL;
ks_assert(bh);
ks_assert(args);
@ -402,17 +339,19 @@ void command_join(blade_handle_t *bh, char *args)
params = cJSON_CreateObject();
blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.join", "test", "mydomain.com", params, test_join_response_handler, NULL);
cJSON_Delete(params);
blade_handle_rpcsubscribe(bh, "test.join", "test", "mydomain.com", KS_FALSE, NULL, NULL, test_join_broadcast_handler, NULL);
blade_handle_rpcsubscribe(bh, "test.leave", "test", "mydomain.com", KS_FALSE, NULL, NULL, test_leave_broadcast_handler, NULL);
blade_handle_rpcsubscribe(bh, "test.talk", "test", "mydomain.com", KS_FALSE, NULL, NULL, test_talk_broadcast_handler, NULL);
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
blade_handle_rpcsubscribe(bh, "test", "mydomain.com", channels, NULL, NULL, NULL, test_broadcast_handler, NULL);
cJSON_Delete(channels);
}
void command_leave(blade_handle_t *bh, char *args)
{
cJSON *params = NULL;
cJSON *channels = NULL;
ks_assert(bh);
ks_assert(args);
@ -423,12 +362,13 @@ void command_leave(blade_handle_t *bh, char *args)
}
params = cJSON_CreateObject();
blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.leave", "test", "mydomain.com", params, test_leave_response_handler, NULL);
cJSON_Delete(params);
blade_handle_rpcsubscribe(bh, "test.join", "test", "mydomain.com", KS_TRUE, NULL, NULL, NULL, NULL);
blade_handle_rpcsubscribe(bh, "test.leave", "test", "mydomain.com", KS_TRUE, NULL, NULL, NULL, NULL);
blade_handle_rpcsubscribe(bh, "test.talk", "test", "mydomain.com", KS_TRUE, NULL, NULL, NULL, NULL);
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("test"));
blade_handle_rpcsubscribe(bh, "test", "mydomain.com", NULL, channels, NULL, NULL, NULL, NULL);
cJSON_Delete(channels);
}
void command_talk(blade_handle_t *bh, char *args)
@ -448,10 +388,9 @@ void command_talk(blade_handle_t *bh, char *args)
}
params = cJSON_CreateObject();
cJSON_AddStringToObject(params, "text", args);
blade_handle_rpcexecute(bh, g_testcon_nodeid, "test.talk", "test", "mydomain.com", params, test_talk_response_handler, NULL);
cJSON_Delete(params);
}
/* For Emacs:

View File

@ -88,7 +88,7 @@ ks_status_t testproto_destroy(testproto_t **testP)
return KS_STATUS_SUCCESS;
}
ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, void *data)
ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, cJSON *data)
{
//testproto_t *test = NULL;
blade_handle_t *bh = NULL;
@ -97,7 +97,7 @@ ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, void *dat
ks_assert(brpcres);
ks_assert(data);
//test = (testproto_t *)data;
//test = (testproto_t *)cJSON_GetPtrValue(data);
bh = blade_rpc_response_handle_get(brpcres);
ks_assert(bh);
@ -112,7 +112,7 @@ ks_bool_t test_publish_response_handler(blade_rpc_response_t *brpcres, void *dat
return KS_FALSE;
}
ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data)
ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
testproto_t *test = NULL;
blade_handle_t *bh = NULL;
@ -120,48 +120,70 @@ ks_bool_t test_join_request_handler(blade_rpc_request_t *brpcreq, void *data)
const char *requester_nodeid = NULL;
const char *key = NULL;
cJSON *params = NULL;
cJSON *channels = NULL;
cJSON *result = NULL;
ks_assert(brpcreq);
ks_assert(data);
test = (testproto_t *)data;
test = (testproto_t *)cJSON_GetPtrValue(data);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
// session for execute response
bs = blade_sessionmgr_session_lookup(blade_handle_sessionmgr_get(bh), blade_rpc_request_sessionid_get(brpcreq));
ks_assert(bs);
requester_nodeid = blade_rpcexecute_request_requester_nodeid_get(brpcreq);
ks_assert(requester_nodeid);
// inner rpcexecute parameters
params = blade_rpcexecute_request_params_get(brpcreq);
ks_assert(params);
ks_log(KS_LOG_DEBUG, "Session (%s) test.join request processing\n", blade_session_id_get(bs));
// add to participants
key = ks_pstrdup(test->pool, requester_nodeid);
ks_assert(key);
// @todo to properly maintain protocol details tied to a specific node like this participants list requires a way to know if a specific node of interest goes offline to cleanup associated details
// refer back to work notes on ideas about this
ks_hash_write_lock(test->participants);
ks_hash_insert(test->participants, (void *)key, (void *)KS_TRUE);
ks_hash_write_unlock(test->participants);
blade_session_read_unlock(bs);
// authorize channels with the master for the requester
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
blade_handle_rpcauthorize(bh, requester_nodeid, KS_FALSE, "test", "mydomain.com", channels, NULL, NULL);
cJSON_Delete(channels);
// send rpcexecute response to the requester
result = cJSON_CreateObject();
blade_rpcexecute_response_send(brpcreq, result);
cJSON_Delete(result);
blade_session_read_unlock(bs);
// broadcast to authorized nodes that have subscribed, that the requester has joined
params = cJSON_CreateObject();
blade_handle_rpcbroadcast(bh, requester_nodeid, "test.join", "test", "mydomain.com", params, NULL, NULL);
cJSON_AddStringToObject(params, "joiner-nodeid", requester_nodeid);
blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "join", params, NULL, NULL);
cJSON_Delete(params);
return KS_FALSE;
}
ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data)
ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
testproto_t *test = NULL;
blade_handle_t *bh = NULL;
@ -174,7 +196,7 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data)
ks_assert(brpcreq);
ks_assert(data);
test = (testproto_t *)data;
test = (testproto_t *)cJSON_GetPtrValue(data);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
@ -202,12 +224,14 @@ ks_bool_t test_leave_request_handler(blade_rpc_request_t *brpcreq, void *data)
params = cJSON_CreateObject();
blade_handle_rpcbroadcast(bh, requester_nodeid, "test.leave", "test", "mydomain.com", params, NULL, NULL);
cJSON_AddStringToObject(params, "leaver-nodeid", requester_nodeid);
blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "leave", params, NULL, NULL);
return KS_FALSE;
}
ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, void *data)
ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, cJSON *data)
{
//testproto_t *test = NULL;
blade_handle_t *bh = NULL;
@ -220,7 +244,7 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, void *data)
ks_assert(brpcreq);
ks_assert(data);
//test = (testproto_t *)data;
//test = (testproto_t *)cJSON_GetPtrValue(data);
bh = blade_rpc_request_handle_get(brpcreq);
ks_assert(bh);
@ -249,7 +273,9 @@ ks_bool_t test_talk_request_handler(blade_rpc_request_t *brpcreq, void *data)
cJSON_AddStringToObject(params, "text", text);
blade_handle_rpcbroadcast(bh, requester_nodeid, "test.talk", "test", "mydomain.com", params, NULL, NULL);
cJSON_AddStringToObject(params, "talker-nodeid", requester_nodeid);
blade_handle_rpcbroadcast(bh, "test", "mydomain.com", "channel", "talk", params, NULL, NULL);
return KS_FALSE;
}
@ -314,19 +340,24 @@ int main(int argc, char **argv)
blade_identity_destroy(&target);
if (connected) {
cJSON *channels = NULL;
// @todo use session state change callback to know when the session is ready and the realm(s) available from blade.connect, this hack temporarily ensures it's ready before trying to publish upstream
ks_sleep_ms(3000);
blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, test);
blade_rpc_create(&brpc, bh, "test.join", "test", "mydomain.com", test_join_request_handler, cJSON_CreatePtr((uintptr_t)test));
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, test);
blade_rpc_create(&brpc, bh, "test.leave", "test", "mydomain.com", test_leave_request_handler, cJSON_CreatePtr((uintptr_t)test));
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, test);
blade_rpc_create(&brpc, bh, "test.talk", "test", "mydomain.com", test_talk_request_handler, cJSON_CreatePtr((uintptr_t)test));
blade_rpcmgr_protocolrpc_add(blade_handle_rpcmgr_get(bh), brpc);
blade_handle_rpcpublish(bh, "test", "mydomain.com", test_publish_response_handler, test);
channels = cJSON_CreateArray();
cJSON_AddItemToArray(channels, cJSON_CreateString("channel"));
blade_handle_rpcpublish(bh, "test", "mydomain.com", channels, test_publish_response_handler, cJSON_CreatePtr((uintptr_t)test));
}
}

View File

@ -45,6 +45,9 @@ extern "C"
KS_DECLARE(cJSON *) cJSON_CreateStringPrintf(const char *fmt, ...);
KS_DECLARE(const char *)cJSON_GetObjectCstr(const cJSON *object, const char *string);
KS_DECLARE(cJSON *) cJSON_CreatePtr(uintptr_t pointer);
KS_DECLARE(uintptr_t) cJSON_GetPtrValue(const cJSON *object);
KS_DECLARE(uintptr_t) cJSON_GetObjectPtr(const cJSON *object, const char *string);
static inline cJSON *ks_json_add_child_obj(cJSON *json, const char *name, cJSON *obj)
{

View File

@ -28,6 +28,25 @@ KS_DECLARE(const char *)cJSON_GetObjectCstr(const cJSON *object, const char *str
return cj->valuestring;
}
KS_DECLARE(cJSON *) cJSON_CreatePtr(uintptr_t pointer)
{
// @todo check for 32bit and use integer storage instead
return cJSON_CreateStringPrintf("%p", (void *)pointer);
}
KS_DECLARE(uintptr_t) cJSON_GetPtrValue(const cJSON *object)
{
// @todo check for 32bit and use integer storage instead
void *pointer = NULL;
if (object && object->type == cJSON_String) sscanf_s(object->valuestring, "%p", &pointer);
return (uintptr_t)pointer;
}
KS_DECLARE(uintptr_t) cJSON_GetObjectPtr(const cJSON *object, const char *string)
{
return cJSON_GetPtrValue(cJSON_GetObjectItem(object, string));
}
/* For Emacs:
* Local Variables:
* mode:c