From ca19bee71c2ea479fe3db31d1f0ee50c57592d15 Mon Sep 17 00:00:00 2001
From: Shane Bryldt <astaelan@gmail.com>
Date: Sat, 10 Jun 2017 20:30:58 -0600
Subject: [PATCH] FS-10167: Update to the preliminary blade.subscribe support,
 added registration of local callback for processing an event received through
 upcoming blade.broadcast

---
 libs/libblade/src/blade_stack.c               | 40 +++++++++++++------
 libs/libblade/src/blade_subscription.c        | 31 ++++++++++++++
 libs/libblade/src/include/blade_stack.h       |  6 +--
 .../libblade/src/include/blade_subscription.h |  4 ++
 libs/libblade/test/bladec.c                   | 22 +++++++++-
 5 files changed, 86 insertions(+), 17 deletions(-)

diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c
index dbfff34eb1..973f3a416f 100644
--- a/libs/libblade/src/blade_stack.c
+++ b/libs/libblade/src/blade_stack.c
@@ -874,7 +874,7 @@ KS_DECLARE(blade_rpc_t *) blade_handle_protocolrpc_lookup(blade_handle_t *bh, co
 	return brpc;
 }
 
-KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid)
+KS_DECLARE(ks_bool_t) blade_handle_subscriber_add(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid)
 {
 	char *key = NULL;
 	blade_subscription_t *bsub = NULL;
@@ -919,12 +919,12 @@ KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const ch
 
 	ks_pool_free(bh->pool, &key);
 
-	if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_FALSE, NULL, NULL);
+	if (bsubP) *bsubP = bsub;
 
-	return KS_STATUS_SUCCESS;
+	return propagate;
 }
 
-KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid)
+KS_DECLARE(ks_bool_t) blade_handle_subscriber_remove(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid)
 {
 	char *key = NULL;
 	blade_subscription_t *bsub = NULL;
@@ -966,9 +966,9 @@ KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const
 
 	ks_pool_free(bh->pool, &key);
 
-	if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_TRUE, NULL, NULL);
+	if (bsubP) *bsubP = bsub;
 
-	return KS_STATUS_SUCCESS;
+	return propagate;
 }
 
 
@@ -1129,7 +1129,9 @@ KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs)
 		ks_hash_read_unlock(bh->subscriptions);
 
 		if (!unsubbed) {
-			blade_handle_subscriber_remove(bh, event, protocol, realm, id);
+			if (blade_handle_subscriber_remove(bh, NULL, event, protocol, realm, id)) {
+				blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_TRUE, NULL, NULL);
+			}
 			ks_pool_free(bh->pool, &event);
 			ks_pool_free(bh->pool, &protocol);
 			ks_pool_free(bh->pool, &realm);
@@ -1968,10 +1970,12 @@ KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcr
 
 
 // blade.subscribe request generator
-KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data)
 {
 	ks_status_t ret = KS_STATUS_SUCCESS;
 	blade_session_t *bs = NULL;
+	ks_bool_t propagate = KS_FALSE;
+	blade_subscription_t *bsub = NULL;
 
 	ks_assert(bh);
 	ks_assert(event);
@@ -1982,12 +1986,19 @@ KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char
 		ret = KS_STATUS_DISCONNECTED;
 		goto done;
 	}
-
+	
 	if (remove) {
-		blade_handle_subscriber_remove(bh, event, protocol, realm, bh->local_nodeid);
+		propagate = blade_handle_subscriber_remove(bh, &bsub, event, protocol, realm, bh->local_nodeid);
 	} else {
-		blade_handle_subscriber_add(bh, event, protocol, realm, bh->local_nodeid);
+		propagate = blade_handle_subscriber_add(bh, &bsub, event, protocol, realm, bh->local_nodeid);
+		ks_assert(event_callback);
 	}
+	if (bsub) {
+		blade_subscription_callback_set(bsub, event_callback);
+		blade_subscription_callback_data_set(bsub, event_data);
+	}
+
+	if (propagate) ret = blade_protocol_subscribe_raw(bh, event, protocol, realm, remove, callback, data);
 
 done:
 	if (bs) blade_session_read_unlock(bs);
@@ -2049,6 +2060,7 @@ ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq,
 	ks_bool_t remove = KS_FALSE;
 	cJSON *res = NULL;
 	cJSON *res_result = NULL;
+	ks_bool_t propagate = KS_FALSE;
 
 	ks_assert(brpcreq);
 
@@ -2104,11 +2116,13 @@ ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq,
 	ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs));
 
 	if (remove) {
-		blade_handle_subscriber_remove(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
+		propagate = blade_handle_subscriber_remove(bh, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
 	} else {
-		blade_handle_subscriber_add(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
+		propagate = blade_handle_subscriber_add(bh, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
 	}
 
+	if (propagate) blade_protocol_subscribe_raw(bh, req_params_event, req_params_protocol, req_params_realm, remove, NULL, NULL);
+
 	// build the actual response finally
 	blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
 
diff --git a/libs/libblade/src/blade_subscription.c b/libs/libblade/src/blade_subscription.c
index b17e351642..599e7d01c0 100644
--- a/libs/libblade/src/blade_subscription.c
+++ b/libs/libblade/src/blade_subscription.c
@@ -40,6 +40,9 @@ struct blade_subscription_s {
 	const char *protocol;
 	const char *realm;
 	ks_hash_t *subscribers;
+
+	blade_rpc_request_callback_t callback;
+	void *callback_data;
 };
 
 
@@ -160,6 +163,34 @@ KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription
 
 }
 
+KS_DECLARE(blade_rpc_request_callback_t) blade_subscription_callback_get(blade_subscription_t *bsub)
+{
+	ks_assert(bsub);
+
+	return bsub->callback;
+}
+
+KS_DECLARE(void) blade_subscription_callback_set(blade_subscription_t *bsub, blade_rpc_request_callback_t callback)
+{
+	ks_assert(bsub);
+
+	bsub->callback = callback;
+}
+
+KS_DECLARE(void *) blade_subscription_callback_data_get(blade_subscription_t *bsub)
+{
+	ks_assert(bsub);
+
+	return bsub->callback_data;
+}
+
+KS_DECLARE(void) blade_subscription_callback_data_set(blade_subscription_t *bsub, void *data)
+{
+	ks_assert(bsub);
+
+	bsub->callback_data = data;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c
diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h
index c460b8d839..4f9e69467b 100644
--- a/libs/libblade/src/include/blade_stack.h
+++ b/libs/libblade/src/include/blade_stack.h
@@ -78,8 +78,8 @@ KS_DECLARE(ks_status_t) blade_handle_protocolrpc_register(blade_rpc_t *brpc);
 KS_DECLARE(ks_status_t) blade_handle_protocolrpc_unregister(blade_rpc_t *brpc);
 KS_DECLARE(blade_rpc_t *) blade_handle_protocolrpc_lookup(blade_handle_t *bh, const char *method, const char *protocol, const char *realm);
 
-KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid);
-KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid);
+KS_DECLARE(ks_bool_t) blade_handle_subscriber_add(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid);
+KS_DECLARE(ks_bool_t) blade_handle_subscriber_remove(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid);
 
 
 KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
@@ -109,7 +109,7 @@ KS_DECLARE(cJSON *) blade_protocol_execute_request_params_get(blade_rpc_request_
 KS_DECLARE(cJSON *) blade_protocol_execute_response_result_get(blade_rpc_response_t *brpcres);
 KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
 
-KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data);
 KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
 
 KS_END_EXTERN_C
diff --git a/libs/libblade/src/include/blade_subscription.h b/libs/libblade/src/include/blade_subscription.h
index c6528aa068..a0057e8472 100644
--- a/libs/libblade/src/include/blade_subscription.h
+++ b/libs/libblade/src/include/blade_subscription.h
@@ -44,6 +44,10 @@ KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub
 KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub);
 KS_DECLARE(ks_status_t) blade_subscription_subscribers_add(blade_subscription_t *bsub, const char *nodeid);
 KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription_t *bsub, const char *nodeid);
+KS_DECLARE(blade_rpc_request_callback_t) blade_subscription_callback_get(blade_subscription_t *bsub);
+KS_DECLARE(void) blade_subscription_callback_set(blade_subscription_t *bsub, blade_rpc_request_callback_t callback);
+KS_DECLARE(void *) blade_subscription_callback_data_get(blade_subscription_t *bsub);
+KS_DECLARE(void) blade_subscription_callback_data_set(blade_subscription_t *bsub, void *data);
 KS_END_EXTERN_C
 
 #endif
diff --git a/libs/libblade/test/bladec.c b/libs/libblade/test/bladec.c
index c2e9011e37..1d38aa3d53 100644
--- a/libs/libblade/test/bladec.c
+++ b/libs/libblade/test/bladec.c
@@ -131,6 +131,26 @@ ks_bool_t blade_subscribe_response_handler(blade_rpc_response_t *brpcres, void *
 	return KS_FALSE;
 }
 
+ks_bool_t test_event_request_handler(blade_rpc_request_t *brpcreq, void *data)
+{
+	blade_handle_t *bh = NULL;
+	blade_session_t *bs = NULL;
+
+	ks_assert(brpcreq);
+
+	bh = blade_rpc_request_handle_get(brpcreq);
+	ks_assert(bh);
+
+	bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq));
+	ks_assert(bs);
+
+	ks_log(KS_LOG_DEBUG, "Session (%s) test.event request processing\n", blade_session_id_get(bs));
+
+	blade_session_read_unlock(bs);
+
+	return KS_FALSE;
+}
+
 int main(int argc, char **argv)
 {
 	blade_handle_t *bh = NULL;
@@ -273,7 +293,7 @@ void command_subscribe(blade_handle_t *bh, char *args)
 	ks_assert(bh);
 	ks_assert(args);
 
-	blade_protocol_subscribe(bh, "test.event", "test", "mydomain.com", KS_FALSE, blade_subscribe_response_handler, NULL);
+	blade_protocol_subscribe(bh, "test.event", "test", "mydomain.com", KS_FALSE, blade_subscribe_response_handler, NULL, test_event_request_handler, NULL);
 }
 
 /* For Emacs: