FS-9952: More work on the connection and transport code, couple things left to do but nearly ready for testing upto starting session negotiations

This commit is contained in:
Shane Bryldt 2017-02-10 02:17:20 +00:00 committed by Mike Jerris
parent eb4ece83b9
commit de1ddf187b
4 changed files with 398 additions and 136 deletions

View File

@ -37,16 +37,18 @@ struct blade_connection_s {
blade_handle_t *handle; blade_handle_t *handle;
ks_pool_t *pool; ks_pool_t *pool;
void *transport_init_data;
void *transport_data; void *transport_data;
blade_transport_callbacks_t *transport_callbacks; blade_transport_callbacks_t *transport_callbacks;
ks_bool_t shutdown; ks_bool_t shutdown;
// @todo add auto generated UUID // @todo add auto generated UUID
blade_connection_direction_t direction;
ks_thread_t *state_thread; ks_thread_t *state_thread;
blade_connection_state_t state; blade_connection_state_t state;
ks_q_t *sending; ks_q_t *sending;
ks_q_t *receiving; //ks_q_t *receiving;
}; };
void *blade_connection_state_thread(ks_thread_t *thread, void *data); void *blade_connection_state_thread(ks_thread_t *thread, void *data);
@ -54,7 +56,7 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data);
KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
blade_handle_t *bh, blade_handle_t *bh,
void *transport_data, void *transport_init_data,
blade_transport_callbacks_t *transport_callbacks) blade_transport_callbacks_t *transport_callbacks)
{ {
blade_connection_t *bc = NULL; blade_connection_t *bc = NULL;
@ -62,7 +64,6 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
ks_assert(bcP); ks_assert(bcP);
ks_assert(bh); ks_assert(bh);
ks_assert(transport_data);
ks_assert(transport_callbacks); ks_assert(transport_callbacks);
pool = blade_handle_pool_get(bh); pool = blade_handle_pool_get(bh);
@ -70,10 +71,10 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
bc = ks_pool_alloc(pool, sizeof(blade_connection_t)); bc = ks_pool_alloc(pool, sizeof(blade_connection_t));
bc->handle = bh; bc->handle = bh;
bc->pool = pool; bc->pool = pool;
bc->transport_data = transport_data; bc->transport_init_data = transport_init_data;
bc->transport_callbacks = transport_callbacks; bc->transport_callbacks = transport_callbacks;
ks_q_create(&bc->sending, pool, 0); ks_q_create(&bc->sending, pool, 0);
ks_q_create(&bc->receiving, pool, 0); //ks_q_create(&bc->receiving, pool, 0);
*bcP = bc; *bcP = bc;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
@ -91,17 +92,18 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP)
blade_connection_shutdown(bc); blade_connection_shutdown(bc);
ks_q_destroy(&bc->sending); ks_q_destroy(&bc->sending);
ks_q_destroy(&bc->receiving); //ks_q_destroy(&bc->receiving);
ks_pool_free(bc->pool, bcP); ks_pool_free(bc->pool, bcP);
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc) KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction)
{ {
ks_assert(bc); ks_assert(bc);
bc->direction = direction;
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE); blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE);
if (ks_thread_create_ex(&bc->state_thread, if (ks_thread_create_ex(&bc->state_thread,
@ -112,7 +114,6 @@ KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc)
KS_PRI_NORMAL, KS_PRI_NORMAL,
bc->pool) != KS_STATUS_SUCCESS) { bc->pool) != KS_STATUS_SUCCESS) {
// @todo error logging // @todo error logging
blade_connection_disconnect(bc);
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
@ -136,6 +137,13 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc)
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc)
{
ks_assert(bc);
return bc->transport_init_data;
}
KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc) KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc)
{ {
ks_assert(bc); ks_assert(bc);
@ -143,19 +151,72 @@ KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc)
return bc->transport_data; return bc->transport_data;
} }
KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state) KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data)
{ {
ks_assert(bc); ks_assert(bc);
bc->transport_callbacks->onstate(bc, state, BLADE_CONNECTION_STATE_CONDITION_PRE); bc->transport_data = transport_data;
}
blade_transport_state_callback_t blade_connection_state_callback_lookup(blade_connection_t *bc, blade_connection_state_t state)
{
blade_transport_state_callback_t callback = NULL;
ks_assert(bc);
switch (state) {
case BLADE_CONNECTION_STATE_DISCONNECT:
if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_disconnect_inbound;
else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_disconnect_outbound;
break;
case BLADE_CONNECTION_STATE_NEW:
if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_new_inbound;
else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_new_outbound;
break;
case BLADE_CONNECTION_STATE_CONNECT:
if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_connect_inbound;
else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_connect_outbound;
break;
case BLADE_CONNECTION_STATE_ATTACH:
if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_attach_inbound;
else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_attach_outbound;
break;
case BLADE_CONNECTION_STATE_DETACH:
if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_detach_inbound;
else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_detach_outbound;
break;
case BLADE_CONNECTION_STATE_READY:
if (bc->direction == BLADE_CONNECTION_DIRECTION_INBOUND) callback = bc->transport_callbacks->onstate_ready_inbound;
else if(bc->direction == BLADE_CONNECTION_DIRECTION_OUTBOUND) callback = bc->transport_callbacks->onstate_ready_outbound;
break;
default: break;
}
return callback;
}
KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state)
{
blade_transport_state_callback_t callback = NULL;
blade_connection_state_hook_t hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
ks_assert(bc);
callback = blade_connection_state_callback_lookup(bc, state);
if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_PRE);
bc->state = state; bc->state = state;
if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc);
} }
KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc) KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc)
{ {
ks_assert(bc); ks_assert(bc);
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT); if (bc->state != BLADE_CONNECTION_STATE_DETACH && bc->state != BLADE_CONNECTION_STATE_DISCONNECT)
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DETACH);
} }
KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, blade_identity_t *target, cJSON *json) KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, blade_identity_t *target, cJSON *json)
@ -178,26 +239,30 @@ KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, bla
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
KS_DECLARE(ks_status_t) blade_connection_receiving_push(blade_connection_t *bc, cJSON *json) // @todo may not need receiving queue on connection, by the time we are queueing we should have a session to receive into
{ //KS_DECLARE(ks_status_t) blade_connection_receiving_push(blade_connection_t *bc, cJSON *json)
ks_assert(bc); //{
ks_assert(json); // ks_assert(bc);
// ks_assert(json);
return ks_q_push(bc->receiving, json); // return ks_q_push(bc->receiving, json);
} //}
KS_DECLARE(ks_status_t) blade_connection_receiving_pop(blade_connection_t *bc, cJSON **json) //KS_DECLARE(ks_status_t) blade_connection_receiving_pop(blade_connection_t *bc, cJSON **json)
{ //{
ks_assert(bc); // ks_assert(bc);
ks_assert(json); // ks_assert(json);
return ks_q_trypop(bc->receiving, (void **)json); // return ks_q_trypop(bc->receiving, (void **)json);
} //}
void *blade_connection_state_thread(ks_thread_t *thread, void *data) void *blade_connection_state_thread(ks_thread_t *thread, void *data)
{ {
blade_connection_t *bc = NULL; blade_connection_t *bc = NULL;
blade_connection_state_hook_t hook; blade_connection_state_t state;
blade_transport_state_callback_t callback = NULL;
blade_connection_state_hook_t hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
cJSON *json = NULL;
ks_assert(thread); ks_assert(thread);
ks_assert(data); ks_assert(data);
@ -205,20 +270,34 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
bc = (blade_connection_t *)data; bc = (blade_connection_t *)data;
while (!bc->shutdown) { while (!bc->shutdown) {
// @todo need to get messages from the transport into receiving queue, and pop messages from sending queue to write out using transport
// sending is relatively easy, but receiving cannot occur universally due to cases like kws_init() blocking and expecting data to be on the wire // @todo pop from connection sending queue and call transport callback to write one message (passing target identity too)
// and other transports may have similar behaviours, but CONNECTIN, ATTACH, and READY require async message passing into application layer // and delete the cJSON object here after returning from callback
// and sending whenever the response hits the queue
// @todo it's possible that onstate could handle receiving and sending messages during the appropriate states, but this means some states // @todo seems like connection will not need a receiving queue as the session will exist prior to async transmissions
// like CONNECTIN which may send and receive multiple messages require BYPASSing until the application layer updates the state or disconnects
state = bc->state;
hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
callback = blade_connection_state_callback_lookup(bc, state);
// @todo should this just go in the ready state callback? it's generalized here, so the callback for READY doesn't really
// need to do anything
if (state == BLADE_CONNECTION_STATE_READY && bc->transport_callbacks->onreceive(bc, &json) == KS_STATUS_SUCCESS && json) {
// @todo push json to session receiving queue
}
hook = bc->transport_callbacks->onstate(bc, bc->state, BLADE_CONNECTION_STATE_CONDITION_POST); if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST);
if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT)
blade_connection_disconnect(bc); if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT && (state == BLADE_CONNECTION_STATE_DETACH || state == BLADE_CONNECTION_STATE_DISCONNECT))
hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
if (hook == BLADE_CONNECTION_STATE_HOOK_DISCONNECT) blade_connection_disconnect(bc);
else if (hook == BLADE_CONNECTION_STATE_HOOK_SUCCESS) { else if (hook == BLADE_CONNECTION_STATE_HOOK_SUCCESS) {
// @todo pop from sending queue, and pass to transport callback to send out switch (state) {
switch (bc->state) { case BLADE_CONNECTION_STATE_DISCONNECT:
return NULL;
case BLADE_CONNECTION_STATE_NEW: case BLADE_CONNECTION_STATE_NEW:
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CONNECT); blade_connection_state_set(bc, BLADE_CONNECTION_STATE_CONNECT);
break; break;
@ -226,10 +305,14 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_ATTACH); blade_connection_state_set(bc, BLADE_CONNECTION_STATE_ATTACH);
break; break;
case BLADE_CONNECTION_STATE_ATTACH: case BLADE_CONNECTION_STATE_ATTACH:
// @todo receive message with nullable session id for reconnect and some sort of secure token for a reconnect challenge?
// determine how much of session management is handled here... do we process these session negotiation messages without
// passing it up to the application layer? or does the application layer give back a session and build the response?
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY); blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY);
break; break;
case BLADE_CONNECTION_STATE_DETACH: case BLADE_CONNECTION_STATE_DETACH:
blade_connection_disconnect(bc); // @todo detach from session if this connection is attached
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);
break; break;
default: break; default: break;
} }

View File

@ -37,6 +37,7 @@
typedef struct blade_module_wss_s blade_module_wss_t; typedef struct blade_module_wss_s blade_module_wss_t;
typedef struct blade_transport_wss_s blade_transport_wss_t; typedef struct blade_transport_wss_s blade_transport_wss_t;
typedef struct blade_transport_wss_init_s blade_transport_wss_init_t;
struct blade_module_wss_s { struct blade_module_wss_s {
blade_handle_t *handle; blade_handle_t *handle;
@ -70,42 +71,81 @@ struct blade_transport_wss_s {
kws_t *kws; kws_t *kws;
}; };
struct blade_transport_wss_init_s {
blade_module_wss_t *module;
ks_pool_t *pool;
ks_socket_t sock;
};
ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh); ks_status_t blade_module_wss_create(blade_module_wss_t **bm_wssP, blade_handle_t *bh);
ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP); ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP);
ks_status_t blade_module_wss_onload(blade_module_t **bmP, blade_handle_t *bh); ks_status_t blade_module_wss_on_load(blade_module_t **bmP, blade_handle_t *bh);
ks_status_t blade_module_wss_onunload(blade_module_t *bm); ks_status_t blade_module_wss_on_unload(blade_module_t *bm);
ks_status_t blade_module_wss_onstartup(blade_module_t *bm, config_setting_t *config); ks_status_t blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *config);
ks_status_t blade_module_wss_onshutdown(blade_module_t *bm); ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm);
ks_status_t blade_module_wss_listen(blade_module_wss_t *bm, ks_sockaddr_t *addr); ks_status_t blade_module_wss_listen(blade_module_wss_t *bm, ks_sockaddr_t *addr);
void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data); void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data);
ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock); ks_status_t blade_transport_wss_create(blade_transport_wss_t **bt_wssP, blade_module_wss_t *bm_wss, ks_socket_t sock);
ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP); ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP);
ks_status_t blade_transport_wss_onconnect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target); ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target);
blade_connection_rank_t blade_transport_wss_onrank(blade_connection_t *bc, blade_identity_t *target); blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target);
blade_connection_state_hook_t blade_transport_wss_onstate(blade_connection_t *bc, blade_connection_state_t state, blade_connection_state_condition_t condition);
ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t *target, cJSON *json);
ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json);
blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition);
blade_connection_state_hook_t blade_transport_wss_on_state_new_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
blade_connection_state_hook_t blade_transport_wss_on_state_new_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
blade_connection_state_hook_t blade_transport_wss_on_state_connect_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition);
blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connection_t *bc, blade_connection_state_condition_t condition);
blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connection_t *bc, blade_connection_state_condition_t condition);
ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock);
ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP);
static blade_module_callbacks_t g_module_wss_callbacks = static blade_module_callbacks_t g_module_wss_callbacks =
{ {
blade_module_wss_onload, blade_module_wss_on_load,
blade_module_wss_onunload, blade_module_wss_on_unload,
blade_module_wss_onstartup, blade_module_wss_on_startup,
blade_module_wss_onshutdown, blade_module_wss_on_shutdown,
}; };
static blade_transport_callbacks_t g_transport_wss_callbacks = static blade_transport_callbacks_t g_transport_wss_callbacks =
{ {
blade_transport_wss_onconnect, blade_transport_wss_on_connect,
blade_transport_wss_onrank, blade_transport_wss_on_rank,
blade_transport_wss_onstate, blade_transport_wss_on_send,
blade_transport_wss_on_receive,
blade_transport_wss_on_state_disconnect,
blade_transport_wss_on_state_disconnect,
blade_transport_wss_on_state_new_inbound,
blade_transport_wss_on_state_new_outbound,
blade_transport_wss_on_state_connect_inbound,
blade_transport_wss_on_state_connect_outbound,
blade_transport_wss_on_state_attach_inbound,
blade_transport_wss_on_state_attach_outbound,
blade_transport_wss_on_state_detach,
blade_transport_wss_on_state_detach,
blade_transport_wss_on_state_ready,
blade_transport_wss_on_state_ready,
}; };
@ -144,7 +184,7 @@ ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP)
bm_wss = *bm_wssP; bm_wss = *bm_wssP;
blade_module_wss_onshutdown(bm_wss->module); blade_module_wss_on_shutdown(bm_wss->module);
blade_module_destroy(&bm_wss->module); blade_module_destroy(&bm_wss->module);
@ -156,7 +196,7 @@ ks_status_t blade_module_wss_destroy(blade_module_wss_t **bm_wssP)
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
ks_status_t blade_module_wss_onload(blade_module_t **bmP, blade_handle_t *bh) ks_status_t blade_module_wss_on_load(blade_module_t **bmP, blade_handle_t *bh)
{ {
blade_module_wss_t *bm_wss = NULL; blade_module_wss_t *bm_wss = NULL;
@ -171,7 +211,7 @@ ks_status_t blade_module_wss_onload(blade_module_t **bmP, blade_handle_t *bh)
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
ks_status_t blade_module_wss_onunload(blade_module_t *bm) ks_status_t blade_module_wss_on_unload(blade_module_t *bm)
{ {
blade_module_wss_t *bm_wss = NULL; blade_module_wss_t *bm_wss = NULL;
@ -293,7 +333,7 @@ ks_status_t blade_module_wss_config(blade_module_wss_t *bm_wss, config_setting_t
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
ks_status_t blade_module_wss_onstartup(blade_module_t *bm, config_setting_t *config) ks_status_t blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *config)
{ {
blade_module_wss_t *bm_wss = NULL; blade_module_wss_t *bm_wss = NULL;
@ -331,7 +371,7 @@ ks_status_t blade_module_wss_onstartup(blade_module_t *bm, config_setting_t *con
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
ks_status_t blade_module_wss_onshutdown(blade_module_t *bm) ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm)
{ {
blade_module_wss_t *bm_wss = NULL; blade_module_wss_t *bm_wss = NULL;
blade_transport_wss_t *bt_wss = NULL; blade_transport_wss_t *bt_wss = NULL;
@ -423,6 +463,7 @@ ks_status_t blade_module_wss_listen(blade_module_wss_t *bm_wss, ks_sockaddr_t *a
void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
{ {
blade_module_wss_t *bm_wss = NULL; blade_module_wss_t *bm_wss = NULL;
blade_transport_wss_init_t *bt_wss_init = NULL;
blade_transport_wss_t *bt_wss = NULL; blade_transport_wss_t *bt_wss = NULL;
blade_connection_t *bc = NULL; blade_connection_t *bc = NULL;
@ -448,27 +489,32 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
continue; continue;
} }
blade_transport_wss_create(&bt_wss, bm_wss, sock); blade_transport_wss_init_create(&bt_wss_init, bm_wss, sock);
ks_assert(bt_wss); ks_assert(bt_wss_init);
blade_connection_create(&bc, bm_wss->handle, bt_wss, bm_wss->transport_callbacks); blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks);
ks_assert(bc); ks_assert(bc);
blade_connection_startup(bc); if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_INBOUND) != KS_STATUS_SUCCESS) {
blade_connection_destroy(&bc);
blade_transport_wss_init_destroy(&bt_wss_init);
ks_socket_close(&sock);
continue;
}
list_append(&bm_wss->connected, bc); list_append(&bm_wss->connected, bc);
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW); blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW);
} }
} }
while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) { while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) {
bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
list_delete(&bm_wss->connected, bc); list_delete(&bm_wss->connected, bc);
if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init);
blade_connection_destroy(&bc); blade_connection_destroy(&bc);
blade_transport_wss_destroy(&bt_wss); if (bt_wss) blade_transport_wss_destroy(&bt_wss);
} }
} }
@ -512,7 +558,7 @@ ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP)
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
ks_status_t blade_transport_wss_onconnect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target) ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target)
{ {
ks_assert(bcP); ks_assert(bcP);
ks_assert(bm); ks_assert(bm);
@ -525,14 +571,44 @@ ks_status_t blade_transport_wss_onconnect(blade_connection_t **bcP, blade_module
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
blade_connection_rank_t blade_transport_wss_onrank(blade_connection_t *bc, blade_identity_t *target) blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target)
{ {
ks_assert(bc); ks_assert(bc);
ks_assert(target); ks_assert(target);
return BLADE_CONNECTION_RANK_POOR;
}
ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json)
{
char *json_str = cJSON_PrintUnformatted(json);
ks_size_t json_str_len = 0;
if (!json_str) {
// @todo error logging
return KS_STATUS_FAIL;
}
json_str_len = strlen(json_str) + 1; // @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this
kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len);
free(json_str);
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t *target, cJSON *json)
{
blade_transport_wss_t *bt_wss = NULL;
ks_assert(bc);
ks_assert(json);
ks_log(KS_LOG_DEBUG, "Send Callback\n");
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
return blade_transport_wss_write(bt_wss, json);
}
ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json) ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json)
{ {
// @todo get exact timeout from service config? // @todo get exact timeout from service config?
@ -559,12 +635,6 @@ ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
//if (blade_handle_message_claim(blade_service_handle(peer->service), &message, frame_data, frame_data_len) != KS_STATUS_SUCCESS || !message) {
// @todo error logging
// return KS_STATUS_FAIL;
//}
// @todo convert frame_data to cJSON safely, make sure data is null-terminated at frame_data_len
if (!(*json = cJSON_Parse((char *)frame_data))) { if (!(*json = cJSON_Parse((char *)frame_data))) {
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
@ -572,77 +642,170 @@ ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json) ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json)
{
//blade_message_get(message, &target, &json);
char *json_str = cJSON_PrintUnformatted(json);
ks_size_t json_str_len = 0;
if (!json_str) {
// @todo error logging
return KS_STATUS_FAIL;
}
json_str_len = strlen(json_str) + 1;
kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len);
return KS_STATUS_SUCCESS;
}
blade_connection_state_hook_t blade_transport_wss_onstate(blade_connection_t *bc, blade_connection_state_t state, blade_connection_state_condition_t condition)
{ {
blade_transport_wss_t *bt_wss = NULL; blade_transport_wss_t *bt_wss = NULL;
//cJSON *json = NULL;
ks_assert(bc); ks_assert(bc);
ks_assert(json);
ks_log(KS_LOG_DEBUG, "Receive Callback\n");
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
switch (state) { return blade_transport_wss_read(bt_wss, json);
case BLADE_CONNECTION_STATE_DISCONNECT: }
{
if (condition == BLADE_CONNECTION_STATE_CONDITION_POST) {
ks_q_push(bt_wss->module->disconnected, bc);
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NONE);
}
break;
}
case BLADE_CONNECTION_STATE_NEW:
{
if (condition == BLADE_CONNECTION_STATE_CONDITION_POST) {
// @todo: SSL init stuffs based on data from peer->service->config_websockets_ssl to pass into kws_init
if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, NULL, KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) {
// @todo error logging
return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
}
}
break;
}
case BLADE_CONNECTION_STATE_CONNECT:
{
// @todo abstract read message and write message, so these can be called from connection and processed from there
//if (blade_transport_wss_read(bt_wss, &json) != KS_STATUS_SUCCESS) return BLADE_CONNECTION_STATEHOOK_DISCONNECT;
//if (json) { blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition)
// @todo processing connectin messages for identity registration {
// cJSON_Delete(json); blade_transport_wss_t *bt_wss = NULL;
//blade_connection_receiving_push(conn, json);
//} ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
ks_q_push(bt_wss->module->disconnected, bc);
// @todo wrap identity + json into an envelope for queueing through the connection
//while (blade_connection_sending_pop(bc, (void **)&json) == KS_STATUS_SUCCESS && json) {
// ks_status_t ret = blade_transport_wss_write(bt_wss, json);
// cJSON_Delete(json);
// if (ret != KS_STATUS_SUCCESS) return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
//}
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
//break;
}
default: break;
}
return BLADE_CONNECTION_STATE_HOOK_SUCCESS; return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
} }
blade_connection_state_hook_t blade_transport_wss_on_state_new_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
blade_transport_wss_t *bt_wss = NULL;
blade_transport_wss_init_t *bt_wss_init = NULL;
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc);
blade_transport_wss_create(&bt_wss, bt_wss_init->module, bt_wss_init->sock);
ks_assert(bt_wss);
blade_connection_transport_set(bc, bt_wss);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
blade_connection_state_hook_t blade_transport_wss_on_state_new_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
blade_connection_state_hook_t blade_transport_wss_on_state_connect_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
blade_transport_wss_t *bt_wss = NULL;
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
if (condition == BLADE_CONNECTION_STATE_CONDITION_PRE) return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
// @todo: SSL init stuffs based on data from config to pass into kws_init
if (kws_init(&bt_wss->kws, bt_wss->sock, NULL, NULL, KWS_BLOCK, bt_wss->pool) != KS_STATUS_SUCCESS) {
// @todo error logging
return BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
}
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
// @todo Establish sessid and discover existing session or create and register new session through BLADE commands
// Set session state to CONNECT if its new or RECONNECT if existing
// start session and its thread if its new
return BLADE_CONNECTION_STATE_HOOK_BYPASS;
}
blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
ks_assert(bc);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
ks_status_t blade_transport_wss_init_create(blade_transport_wss_init_t **bt_wssiP, blade_module_wss_t *bm_wss, ks_socket_t sock)
{
blade_transport_wss_init_t *bt_wssi = NULL;
ks_assert(bt_wssiP);
ks_assert(bm_wss);
ks_assert(sock != KS_SOCK_INVALID);
bt_wssi = ks_pool_alloc(bm_wss->pool, sizeof(blade_transport_wss_init_t));
bt_wssi->module = bm_wss;
bt_wssi->pool = bm_wss->pool;
bt_wssi->sock = sock;
*bt_wssiP = bt_wssi;
return KS_STATUS_SUCCESS;
}
ks_status_t blade_transport_wss_init_destroy(blade_transport_wss_init_t **bt_wssiP)
{
blade_transport_wss_init_t *bt_wssi = NULL;
ks_assert(bt_wssiP);
ks_assert(*bt_wssiP);
bt_wssi = *bt_wssiP;
ks_pool_free(bt_wssi->pool, bt_wssiP);
return KS_STATUS_SUCCESS;
}
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables:
* mode:c * mode:c

View File

@ -41,9 +41,11 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP,
void *transport_data, void *transport_data,
blade_transport_callbacks_t *transport_callbacks); blade_transport_callbacks_t *transport_callbacks);
KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP); KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP);
KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc); KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction);
KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc); KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc);
KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc);
KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc); KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc);
KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data);
KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state); KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state);
KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc); KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc);
KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target); KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target);

View File

@ -62,8 +62,8 @@ typedef enum {
} blade_connection_state_t; } blade_connection_state_t;
typedef enum { typedef enum {
BLADE_CONNECTION_DIRECTION_IN, BLADE_CONNECTION_DIRECTION_INBOUND,
BLADE_CONNECTION_DIRECTION_OUT, BLADE_CONNECTION_DIRECTION_OUTBOUND,
} blade_connection_direction_t; } blade_connection_direction_t;
typedef enum { typedef enum {
@ -99,14 +99,28 @@ struct blade_module_callbacks_s {
typedef ks_status_t (*blade_transport_connect_callback_t)(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target); typedef ks_status_t (*blade_transport_connect_callback_t)(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target);
typedef blade_connection_rank_t (*blade_transport_rank_callback_t)(blade_connection_t *bc, blade_identity_t *target); typedef blade_connection_rank_t (*blade_transport_rank_callback_t)(blade_connection_t *bc, blade_identity_t *target);
typedef blade_connection_state_hook_t (*blade_transport_state_callback_t)(blade_connection_t *bc, typedef ks_status_t (*blade_transport_send_callback_t)(blade_connection_t *bc, blade_identity_t *target, cJSON *json);
blade_connection_state_t state, typedef ks_status_t (*blade_transport_receive_callback_t)(blade_connection_t *bc, cJSON **json);
blade_connection_state_condition_t condition); typedef blade_connection_state_hook_t (*blade_transport_state_callback_t)(blade_connection_t *bc, blade_connection_state_condition_t condition);
struct blade_transport_callbacks_s { struct blade_transport_callbacks_s {
blade_transport_connect_callback_t onconnect; blade_transport_connect_callback_t onconnect;
blade_transport_rank_callback_t onrank; blade_transport_rank_callback_t onrank;
blade_transport_state_callback_t onstate; blade_transport_send_callback_t onsend;
blade_transport_receive_callback_t onreceive;
blade_transport_state_callback_t onstate_disconnect_inbound;
blade_transport_state_callback_t onstate_disconnect_outbound;
blade_transport_state_callback_t onstate_new_inbound;
blade_transport_state_callback_t onstate_new_outbound;
blade_transport_state_callback_t onstate_connect_inbound;
blade_transport_state_callback_t onstate_connect_outbound;
blade_transport_state_callback_t onstate_attach_inbound;
blade_transport_state_callback_t onstate_attach_outbound;
blade_transport_state_callback_t onstate_detach_inbound;
blade_transport_state_callback_t onstate_detach_outbound;
blade_transport_state_callback_t onstate_ready_inbound;
blade_transport_state_callback_t onstate_ready_outbound;
}; };