FS-9775: Overhauled query/response handling by implementing a reusable job system to handle the common plumbing

This commit is contained in:
Shane Bryldt 2016-12-18 21:15:47 +00:00 committed by Mike Jerris
parent 767326b047
commit 41731d553a
8 changed files with 525 additions and 348 deletions

View File

@ -14,7 +14,7 @@ libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c
libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp
libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c
libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_datagram.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c
libks_la_SOURCES += src/dht/ks_dht_search.c src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c
libks_la_SOURCES += src/dht/ks_dht_job.c src/dht/ks_dht_search.c src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c
libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c
#aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h

View File

@ -19,7 +19,7 @@ KS_BEGIN_EXTERN_C
* @see ks_addr_set
* @see ks_dht_bind
*/
KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint);
KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint);
/**
* Called internally to expire various data.
@ -131,7 +131,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const
* @param token pointer to the output token being generated
* @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL
*/
KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token);
KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token);
/**
* Verify an opaque write token matches the provided remote address and target nodeid.
@ -142,7 +142,7 @@ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *ra
* @param token pointer to the input token being compared
* @return Either KS_TRUE if verification passes, otherwise KS_FALSE
*/
KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token);
KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token);
/**
* Encodes a message for transmission as a UDP datagram and sends it.
@ -158,8 +158,7 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message);
* Sets up the common parts of a query message.
* Determines the local endpoint aware of autorouting, assigns the remote address, generates a transaction, and queues a callback.
* @param dht pointer to the dht instance
* @param ep pointer to the endpoint, may be NULL to find an endpoint or autoroute one
* @param raddr pointer to the remote address
* @param job pointer to the job
* @param query string value of the query type, for example "ping"
* @param callback callback to be called when response to transaction is received
* @param transaction dereferenced out pointer to the allocated transaction, may be NULL to ignore output
@ -174,11 +173,10 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message);
* @see ks_dht_message_query
* @see ks_hash_insert
*/
KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
ks_dht_job_t *job,
const char *query,
ks_dht_message_callback_t callback,
ks_dht_job_callback_t callback,
ks_dht_transaction_t **transaction,
ks_dht_message_t **message,
struct bencode **args);
@ -199,25 +197,27 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
* @see ks_dht_message_init
* @see ks_dht_message_response
*/
KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht,
KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
const ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
ks_dht_message_t **message,
struct bencode **args);
KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
long long errorcode,
const char *errorstr);
KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr);
KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid);
KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid);
KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht,
ks_dht_endpoint_t *ep,
const ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
long long errorcode,
const char *errorstr);
KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht);
KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job);
KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job);
KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job);
KS_DECLARE(void *)ks_dht_process(ks_thread_t *thread, void *data);
@ -226,16 +226,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t *job);
KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_job_t *job);
KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t *job);
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message);
KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t *job);
/**
@ -248,6 +248,20 @@ KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram,
const ks_sockaddr_t *raddr);
KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
ks_pool_t *pool,
const ks_sockaddr_t *raddr,
int32_t attempts);
KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback);
KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_nodeid_t *target);
KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job);
/**
*
@ -292,9 +306,9 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item);
*/
KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transaction,
ks_pool_t *pool,
ks_sockaddr_t *raddr,
ks_dht_job_t *job,
uint32_t transactionid,
ks_dht_message_callback_t callback);
ks_dht_job_callback_t callback);
KS_DECLARE(void) ks_dht_transaction_destroy(ks_dht_transaction_t **transaction);
KS_END_EXTERN_C

File diff suppressed because it is too large Load Diff

View File

@ -42,6 +42,7 @@ KS_BEGIN_EXTERN_C
typedef struct ks_dht_s ks_dht_t;
typedef struct ks_dht_datagram_s ks_dht_datagram_t;
typedef struct ks_dht_job_s ks_dht_job_t;
typedef struct ks_dht_nodeid_s ks_dht_nodeid_t;
typedef struct ks_dht_token_s ks_dht_token_t;
typedef struct ks_dht_storageitem_key_s ks_dht_storageitem_key_t;
@ -57,6 +58,7 @@ typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t;
typedef struct ks_dht_storageitem_s ks_dht_storageitem_t;
typedef ks_status_t (*ks_dht_job_callback_t)(ks_dht_t *dht, ks_dht_job_t *job);
typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message);
typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search);
@ -90,6 +92,41 @@ struct ks_dht_node_s {
ks_rwl_t *reflock;
};
enum ks_dht_job_state_t {
KS_DHT_JOB_STATE_QUERYING,
KS_DHT_JOB_STATE_RESPONDING,
KS_DHT_JOB_STATE_EXPIRING,
KS_DHT_JOB_STATE_PROCESSING,
KS_DHT_JOB_STATE_COMPLETING,
};
//enum ks_dht_job_type_t {
// KS_DHT_JOB_TYPE_NONE = 0,
// KS_DHT_JOB_TYPE_PING,
// KS_DHT_JOB_TYPE_FINDNODE,
//};
struct ks_dht_job_s {
ks_pool_t *pool;
ks_dht_t *dht;
ks_dht_job_t *next;
enum ks_dht_job_state_t state;
ks_sockaddr_t raddr; // will obtain local endpoint node id when creating message using raddr
int32_t attempts;
//enum ks_dht_job_type_t type;
ks_dht_job_callback_t query_callback;
ks_dht_job_callback_t finish_callback;
ks_dht_message_t *response;
//ks_dht_nodeid_t response_id;
// job specific query parameters
ks_dht_nodeid_t target;
};
struct ks_dhtrt_routetable_s {
void* internal;
ks_pool_t* pool;
@ -127,6 +164,7 @@ struct ks_dht_message_s {
ks_dht_transaction_t *transaction;
char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE];
struct bencode *args;
ks_dht_nodeid_t args_id;
};
struct ks_dht_endpoint_s {
@ -141,10 +179,10 @@ struct ks_dht_endpoint_s {
struct ks_dht_transaction_s {
ks_pool_t *pool;
ks_sockaddr_t raddr;
ks_dht_job_t *job;
uint32_t transactionid;
ks_dht_nodeid_t target;
ks_dht_message_callback_t callback;
ks_dht_nodeid_t target; // @todo look at moving this into job now
ks_dht_job_callback_t callback;
ks_time_t expiration;
ks_bool_t finished;
};
@ -209,6 +247,10 @@ struct ks_dht_s {
uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error
ks_size_t recv_buffer_length;
ks_mutex_t *jobs_mutex;
ks_dht_job_t *jobs_first;
ks_dht_job_t *jobs_last;
ks_mutex_t *tid_mutex;
volatile uint32_t transactionid_next;
ks_hash_t *transactions_hash;
@ -222,7 +264,8 @@ struct ks_dht_s {
volatile uint32_t token_secret_current;
volatile uint32_t token_secret_previous;
ks_time_t token_secret_expiration;
ks_hash_t *storage_hash;
ks_hash_t *storageitems_hash;
};
/**
@ -307,6 +350,9 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
*/
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback);
KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target);
/**
* Create a network search of the closest nodes to a target.
* @param dht pointer to the dht instance
@ -327,13 +373,15 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
ks_dht_search_callback_t callback,
ks_dht_search_t **search);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
ks_pool_t *pool,
ks_dht_endpoint_t *endpoint,
ks_sockaddr_t *raddr,
const ks_sockaddr_t *raddr,
ks_bool_t alloc_data);
/**
*
@ -345,14 +393,6 @@ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message);
*/
KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
uint32_t transactionid,
const char *query,
struct bencode **args);
/**
*
*/
@ -361,14 +401,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
ks_size_t transactionid_length,
struct bencode **args);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
uint8_t *transactionid,
ks_size_t transactionid_length,
struct bencode **args);
/**
* route table methods

View File

@ -0,0 +1,80 @@
#include "ks_dht.h"
#include "ks_dht-int.h"
KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
ks_pool_t *pool,
const ks_sockaddr_t *raddr,
int32_t attempts)
{
ks_dht_job_t *j;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(job);
ks_assert(pool);
//ks_assert(dht);
ks_assert(raddr);
ks_assert(attempts > 0 && attempts <= 10);
*job = j = ks_pool_alloc(pool, sizeof(ks_dht_job_t));
ks_assert(j);
j->pool = pool;
j->state = KS_DHT_JOB_STATE_QUERYING;
j->raddr = *raddr;
j->attempts = attempts;
//ks_mutex_lock(dht->jobs_mutex);
//if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = j;
//else dht->jobs_first = dht->jobs_last = j;
//ks_mutex_unlock(dht->jobs_mutex);
// done:
if (ret != KS_STATUS_SUCCESS) {
if (j) ks_dht_job_destroy(job);
}
return ret;
}
KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback)
{
ks_assert(job);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
}
KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_nodeid_t *target)
{
ks_assert(job);
ks_assert(target);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
job->target = *target;
}
KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
{
ks_dht_job_t *j;
ks_assert(job);
ks_assert(*job);
j = *job;
ks_pool_free(j->pool, job);
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -4,7 +4,7 @@
KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
ks_pool_t *pool,
ks_dht_endpoint_t *endpoint,
ks_sockaddr_t *raddr,
const ks_sockaddr_t *raddr,
ks_bool_t alloc_data)
{
ks_dht_message_t *m;
@ -110,33 +110,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
uint32_t transactionid,
const char *query,
struct bencode **args)
{
struct bencode *a;
uint32_t tid;
ks_assert(message);
ks_assert(query);
tid = htonl(transactionid);
ben_dict_set(message->data, ben_blob("t", 1), ben_blob((uint8_t *)&tid, sizeof(uint32_t)));
ben_dict_set(message->data, ben_blob("y", 1), ben_blob("q", 1));
ben_dict_set(message->data, ben_blob("q", 1), ben_blob(query, strlen(query)));
// @note r joins message->data and will be freed with it
a = ben_dict();
ks_assert(a);
ben_dict_set(message->data, ben_blob("a", 1), a);
if (args) *args = a;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
uint8_t *transactionid,
ks_size_t transactionid_length,
@ -160,33 +133,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
uint8_t *transactionid,
ks_size_t transactionid_length,
struct bencode **args)
{
struct bencode *e;
ks_assert(message);
ks_assert(transactionid);
ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
ben_dict_set(message->data, ben_blob("y", 1), ben_blob("e", 1));
// @note r joins message->data and will be freed with it
e = ben_list();
ks_assert(e);
ben_dict_set(message->data, ben_blob("e", 1), e);
if (args) *args = e;
return KS_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -3,22 +3,22 @@
KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transaction,
ks_pool_t *pool,
ks_sockaddr_t *raddr,
ks_dht_job_t *job,
uint32_t transactionid,
ks_dht_message_callback_t callback)
ks_dht_job_callback_t callback)
{
ks_dht_transaction_t *t;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(transaction);
ks_assert(pool);
ks_assert(raddr);
ks_assert(job);
*transaction = t = ks_pool_alloc(pool, sizeof(ks_dht_transaction_t));
ks_assert(t);
t->pool = pool;
t->raddr = *raddr;
t->job = job;
t->transactionid = transactionid;
t->callback = callback;
t->expiration = ks_time_now() + (KS_DHT_TRANSACTION_EXPIRATION * 1000);

View File

@ -10,7 +10,7 @@ ks_status_t dht_z_callback(ks_dht_t *dht, ks_dht_message_t *message)
{
diag("dht_z_callback\n");
ok(message->transactionid[0] == '4' && message->transactionid[1] == '2');
ks_dht_send_error(dht, message->endpoint, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error");
ks_dht_error(dht, message->endpoint, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error");
return KS_STATUS_SUCCESS;
}
@ -135,18 +135,21 @@ int main() {
diag("Ping test\n");
ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
//ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING)
ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1
ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING)
ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet
ks_dht_pulse(dht2, 100); // Receive and process ping response from dht1
ks_dht_pulse(dht2, 100); // Receive and process ping response from dht1 (PROCESSING then COMPLETING)
ok(ks_dhtrt_find_node(dht2->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good
ks_dht_pulse(dht2, 100); // (COMPLETING)
diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up
for (int i = 0; i < 10; ++i) {
//diag("DHT 1\n");
@ -160,7 +163,8 @@ int main() {
diag("Find_Node test\n");
ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1
//ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1
ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1