From 41731d553abd96915056a3fb4e48d33efc951756 Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Sun, 18 Dec 2016 21:15:47 +0000 Subject: [PATCH] FS-9775: Overhauled query/response handling by implementing a reusable job system to handle the common plumbing --- libs/libks/Makefile.am | 2 +- libs/libks/src/dht/ks_dht-int.h | 68 +-- libs/libks/src/dht/ks_dht.c | 571 ++++++++++++++---------- libs/libks/src/dht/ks_dht.h | 74 ++- libs/libks/src/dht/ks_dht_job.c | 80 ++++ libs/libks/src/dht/ks_dht_message.c | 56 +-- libs/libks/src/dht/ks_dht_transaction.c | 8 +- libs/libks/test/testdht2.c | 14 +- 8 files changed, 525 insertions(+), 348 deletions(-) create mode 100644 libs/libks/src/dht/ks_dht_job.c diff --git a/libs/libks/Makefile.am b/libs/libks/Makefile.am index 64f366a26e..ce21d3d898 100644 --- a/libs/libks/Makefile.am +++ b/libs/libks/Makefile.am @@ -14,7 +14,7 @@ libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_datagram.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c -libks_la_SOURCES += src/dht/ks_dht_search.c src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c +libks_la_SOURCES += src/dht/ks_dht_job.c src/dht/ks_dht_search.c src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 24bec0c531..166af867c1 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -19,7 +19,7 @@ KS_BEGIN_EXTERN_C * @see ks_addr_set * @see ks_dht_bind */ -KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint); +KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint); /** * Called internally to expire various data. @@ -131,7 +131,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const * @param token pointer to the output token being generated * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL */ -KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token); +KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token); /** * Verify an opaque write token matches the provided remote address and target nodeid. @@ -142,7 +142,7 @@ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *ra * @param token pointer to the input token being compared * @return Either KS_TRUE if verification passes, otherwise KS_FALSE */ -KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token); +KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token); /** * Encodes a message for transmission as a UDP datagram and sends it. @@ -158,8 +158,7 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message); * Sets up the common parts of a query message. * Determines the local endpoint aware of autorouting, assigns the remote address, generates a transaction, and queues a callback. * @param dht pointer to the dht instance - * @param ep pointer to the endpoint, may be NULL to find an endpoint or autoroute one - * @param raddr pointer to the remote address + * @param job pointer to the job * @param query string value of the query type, for example "ping" * @param callback callback to be called when response to transaction is received * @param transaction dereferenced out pointer to the allocated transaction, may be NULL to ignore output @@ -174,11 +173,10 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message); * @see ks_dht_message_query * @see ks_hash_insert */ -KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, - ks_dht_endpoint_t *ep, - ks_sockaddr_t *raddr, +KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht, + ks_dht_job_t *job, const char *query, - ks_dht_message_callback_t callback, + ks_dht_job_callback_t callback, ks_dht_transaction_t **transaction, ks_dht_message_t **message, struct bencode **args); @@ -199,25 +197,27 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, * @see ks_dht_message_init * @see ks_dht_message_response */ -KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, +KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht, ks_dht_endpoint_t *ep, - ks_sockaddr_t *raddr, + const ks_sockaddr_t *raddr, uint8_t *transactionid, ks_size_t transactionid_length, ks_dht_message_t **message, struct bencode **args); -KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, - ks_dht_endpoint_t *ep, - ks_sockaddr_t *raddr, - uint8_t *transactionid, - ks_size_t transactionid_length, - long long errorcode, - const char *errorstr); -KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr); -KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid); -KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid); +KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht, + ks_dht_endpoint_t *ep, + const ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_length, + long long errorcode, + const char *errorstr); + +KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht); +KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job); +KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job); +KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job); KS_DECLARE(void *)ks_dht_process(ks_thread_t *thread, void *data); @@ -226,16 +226,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message); -KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message); +KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t *job); KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message); -KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message); +KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_job_t *job); KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message); -KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message); +KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t *job); KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message); -KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message); +KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t *job); /** @@ -248,6 +248,20 @@ KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram, const ks_sockaddr_t *raddr); KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram); +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job, + ks_pool_t *pool, + const ks_sockaddr_t *raddr, + int32_t attempts); +KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback); +KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job, + ks_dht_job_callback_t query_callback, + ks_dht_job_callback_t finish_callback, + ks_dht_nodeid_t *target); +KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job); + /** * @@ -292,9 +306,9 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item); */ KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transaction, ks_pool_t *pool, - ks_sockaddr_t *raddr, + ks_dht_job_t *job, uint32_t transactionid, - ks_dht_message_callback_t callback); + ks_dht_job_callback_t callback); KS_DECLARE(void) ks_dht_transaction_destroy(ks_dht_transaction_t **transaction); KS_END_EXTERN_C diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index aaaef29fc5..6c73318873 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -120,6 +120,15 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread */ d->recv_buffer_length = 0; + /** + * Initialize the jobs mutex + */ + ks_mutex_create(&d->jobs_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool); + ks_assert(d->jobs_mutex); + + d->jobs_first = NULL; + d->jobs_last = NULL; + /** * Initialize the transaction id mutex, should use atomic increment instead */ @@ -167,13 +176,13 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Create the hash to store arbitrary data for BEP44. */ - ks_hash_create(&d->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); - ks_assert(d->storage_hash); + ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_assert(d->storageitems_hash); /** - * The storage hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's. + * The storageitems hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's. */ - ks_hash_set_keysize(d->storage_hash, KS_DHT_NODEID_SIZE); + ks_hash_set_keysize(d->storageitems_hash, KS_DHT_NODEID_SIZE); // done: if (ret != KS_STATUS_SUCCESS) { @@ -198,17 +207,17 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) d = *dht; /** - * Cleanup the storage hash and it's contents if it is allocated. + * Cleanup the storageitems hash and it's contents if it is allocated. */ - if (d->storage_hash) { - for (it = ks_hash_first(d->storage_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + if (d->storageitems_hash) { + for (it = ks_hash_first(d->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { ks_dht_storageitem_t *val; ks_hash_this_val(it, (void **)&val); ks_dht_storageitem_destroy(&val); } - ks_hash_destroy(&d->storage_hash); + ks_hash_destroy(&d->storageitems_hash); } /** @@ -254,6 +263,15 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) if (d->tid_mutex) ks_mutex_destroy(&d->tid_mutex); if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash); + /** + * Cleanup the jobs mutex and jobs if they are allocated. + */ + for (ks_dht_job_t *job = d->jobs_first, *jobn = NULL; job; job = jobn) { + jobn = job->next; + ks_dht_job_destroy(&job); + } + if (d->jobs_mutex) ks_mutex_destroy(&d->jobs_mutex); + /** * Probably don't need this, recv_buffer_length is temporary and may change */ @@ -363,7 +381,7 @@ KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t dht->autoroute_port = port; } -KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint) +KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint) { // @todo lookup standard def for IPV6 max size char ip[48 + 1]; @@ -594,6 +612,8 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) } } + ks_dht_pulse_jobs(dht); + ks_dht_pulse_send(dht); ks_dht_pulse_expirations(dht); @@ -669,6 +689,8 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) ks_hash_this(it, &key, NULL, (void **)&value); if (value->finished) remove = KS_TRUE; else if (value->expiration <= now) { + // if the transaction expires, so does the attached job, it may try again with a new transaction + value->job->state = KS_DHT_JOB_STATE_EXPIRING; ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid); remove = KS_TRUE; } @@ -910,7 +932,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const } -KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token) +KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token) { SHA_CTX sha; uint16_t port = 0; @@ -933,7 +955,7 @@ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *ra return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token) +KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token) { ks_dht_token_t tok; @@ -949,8 +971,7 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, k KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) { - // @todo calculate max IPV6 payload size? - char buf[1001]; + char buf[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; ks_size_t buf_len; ks_assert(dht); @@ -973,22 +994,23 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) } -KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, - ks_dht_endpoint_t *ep, - ks_sockaddr_t *raddr, +KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht, + ks_dht_job_t *job, const char *query, - ks_dht_message_callback_t callback, + ks_dht_job_callback_t callback, ks_dht_transaction_t **transaction, ks_dht_message_t **message, struct bencode **args) { + ks_dht_endpoint_t *ep = NULL; uint32_t transactionid; ks_dht_transaction_t *trans = NULL; ks_dht_message_t *msg = NULL; - ks_status_t ret = KS_STATUS_FAIL; + struct bencode *a = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); - ks_assert(raddr); + ks_assert(job); ks_assert(query); ks_assert(callback); ks_assert(message); @@ -996,18 +1018,32 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, if (transaction) *transaction = NULL; *message = NULL; - if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret; + if ((ret = ks_dht_autoroute_check(dht, &job->raddr, &ep)) != KS_STATUS_SUCCESS) goto done; // @todo atomic increment ks_mutex_lock(dht->tid_mutex); transactionid = dht->transactionid_next++; ks_mutex_unlock(dht->tid_mutex); - if ((ret = ks_dht_transaction_create(&trans, dht->pool, raddr, transactionid, callback)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done; + // if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done; + transactionid = htonl(transactionid); + + ben_dict_set(msg->data, ben_blob("t", 1), ben_blob((uint8_t *)&transactionid, sizeof(uint32_t))); + ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("q", 1)); + ben_dict_set(msg->data, ben_blob("q", 1), ben_blob(query, strlen(query))); + + // @note a joins msg->data and will be freed with it + a = ben_dict(); + ks_assert(a); + ben_dict_set(msg->data, ben_blob("a", 1), a); + + if (args) *args = a; + + ben_dict_set(a, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE)); *message = msg; @@ -1015,8 +1051,6 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, if (transaction) *transaction = trans; - ret = KS_STATUS_SUCCESS; - done: if (ret != KS_STATUS_SUCCESS) { if (trans) ks_dht_transaction_destroy(&trans); @@ -1026,16 +1060,17 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, return ret; } -KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, +KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht, ks_dht_endpoint_t *ep, - ks_sockaddr_t *raddr, + const ks_sockaddr_t *raddr, uint8_t *transactionid, ks_size_t transactionid_length, ks_dht_message_t **message, struct bencode **args) { ks_dht_message_t *msg = NULL; - ks_status_t ret = KS_STATUS_FAIL; + struct bencode *r = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(raddr); @@ -1048,12 +1083,21 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done; + //if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done; + ben_dict_set(msg->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); + ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("r", 1)); + + // @note r joins msg->data and will be freed with it + r = ben_dict(); + ks_assert(r); + ben_dict_set(msg->data, ben_blob("r", 1), r); + + if (args) *args = r; + + ben_dict_set(r, ben_blob("id", 2), ben_blob(ep->nodeid.id, KS_DHT_NODEID_SIZE)); *message = msg; - ret = KS_STATUS_SUCCESS; - done: if (ret != KS_STATUS_SUCCESS) { if (msg) ks_dht_message_destroy(&msg); @@ -1103,14 +1147,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me struct bencode *a; const char *qv; ks_size_t qv_len; + ks_dht_nodeid_t *id; + ks_dht_node_t *node; char query[KS_DHT_MESSAGE_QUERY_MAX_SIZE]; ks_dht_message_callback_t callback; - ks_status_t ret = KS_STATUS_FAIL; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); - // @todo start of ks_dht_message_parse_query q = ben_dict_get_by_str(message->data, "q"); if (!q) { ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n"); @@ -1121,7 +1167,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me qv_len = ben_str_len(q); if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) { ks_log(KS_LOG_DEBUG, "Message query 'q' value has an unexpectedly large size of %d\n", qv_len); - return KS_STATUS_FAIL; + ret = KS_STATUS_FAIL; + goto done; } memcpy(query, qv, qv_len); @@ -1131,42 +1178,73 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me a = ben_dict_get_by_str(message->data, "a"); if (!a) { ks_log(KS_LOG_DEBUG, "Message query missing required key 'a'\n"); - return KS_STATUS_FAIL; + ret = KS_STATUS_FAIL; + goto done; } - // @todo end of ks_dht_message_parse_query message->args = a; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; + message->args_id = *id; + + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + if ((ret = ks_dhtrt_create_node(message->endpoint->node->table, + *id, + KS_DHT_REMOTE, + message->raddr.host, + message->raddr.port, + &node)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; + callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_READLOCKED); ks_hash_read_unlock(dht->registry_query); if (!callback) ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query); else ret = callback(dht, message); + done: return ret; } KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t *message) { struct bencode *r; + ks_dht_nodeid_t *id; + ks_dht_node_t *node; ks_dht_transaction_t *transaction; uint32_t *tid; uint32_t transactionid; - ks_status_t ret = KS_STATUS_FAIL; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); - // @todo start of ks_dht_message_parse_response r = ben_dict_get_by_str(message->data, "r"); if (!r) { ks_log(KS_LOG_DEBUG, "Message response missing required key 'r'\n"); - return KS_STATUS_FAIL; + ret = KS_STATUS_FAIL; + goto done; } - // @todo end of ks_dht_message_parse_response - + message->args = r; + + if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; + message->args_id = *id; + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + if ((ret = ks_dhtrt_create_node(message->endpoint->node->table, + *id, + KS_DHT_REMOTE, + message->raddr.host, + message->raddr.port, + &node)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; + + ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); + if ((ret = ks_dhtrt_touch_node(message->endpoint->node->table, *id)) != KS_STATUS_SUCCESS) goto done; + + tid = (uint32_t *)message->transactionid; transactionid = ntohl(*tid); @@ -1174,19 +1252,22 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t ks_hash_read_unlock(dht->transactions_hash); if (!transaction) ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid); - else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) { + else if (!ks_addr_cmp(&message->raddr, &transaction->job->raddr)) { ks_log(KS_LOG_DEBUG, "Message response rejected due to spoofing from %s %d, expected %s %d\n", message->raddr.host, message->raddr.port, - transaction->raddr.host, - transaction->raddr.port); + transaction->job->raddr.host, + transaction->job->raddr.port); } else { + transaction->job->response = message; + transaction->job->state = KS_DHT_JOB_STATE_PROCESSING; message->transaction = transaction; - ret = transaction->callback(dht, message); + if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING; transaction->finished = KS_TRUE; } + done: return ret; } @@ -1286,9 +1367,9 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_search_pending_destroy(&pending); goto done; } - if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_findnode(dht, &n->addr, NULL, target)) != KS_STATUS_SUCCESS) goto done; } - // @todo release closest local query node locks + ks_dhtrt_release_querynodes(&query); ks_mutex_unlock(s->mutex); locked_search = KS_FALSE; @@ -1305,13 +1386,13 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, } -KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, - ks_dht_endpoint_t *ep, - ks_sockaddr_t *raddr, - uint8_t *transactionid, - ks_size_t transactionid_length, - long long errorcode, - const char *errorstr) +KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht, + ks_dht_endpoint_t *ep, + const ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_length, + long long errorcode, + const char *errorstr) { ks_dht_message_t *error = NULL; struct bencode *e = NULL; @@ -1326,7 +1407,15 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, if ((ret = ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done; + //if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done; + ben_dict_set(error->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); + ben_dict_set(error->data, ben_blob("y", 1), ben_blob("e", 1)); + + // @note e joins error->data and will be freed with it + e = ben_list(); + ks_assert(e); + ben_dict_set(error->data, ben_blob("e", 1), e); + ben_list_append(e, ben_int(errorcode)); ben_list_append(e, ben_blob(errorstr, strlen(errorstr))); @@ -1357,7 +1446,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me ks_assert(dht); ks_assert(message); - // @todo start of ks_dht_message_parse_error e = ben_dict_get_by_str(message->data, "e"); if (!e) { ks_log(KS_LOG_DEBUG, "Message error missing required key 'e'\n"); @@ -1376,7 +1464,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me memcpy(error, et, es_len); error[es_len] = '\0'; - // @todo end of ks_dht_message_parse_error message->args = e; @@ -1392,13 +1479,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me goto done; } - if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) { + if (!ks_addr_cmp(&message->raddr, &transaction->job->raddr)) { ks_log(KS_LOG_DEBUG, "Message error rejected due to spoofing from %s %d, expected %s %d\n", message->raddr.host, message->raddr.port, - transaction->raddr.host, - transaction->raddr.port); + transaction->job->raddr.host, + transaction->job->raddr.port); ret = KS_STATUS_FAIL; goto done; } @@ -1415,26 +1502,92 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me return ret; } - -KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr) +KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job) { - ks_dht_message_t *message = NULL; - struct bencode *a = NULL; + ks_assert(dht); + ks_assert(job); + + ks_mutex_lock(dht->jobs_mutex); + if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job; + else dht->jobs_first = dht->jobs_last = job; + ks_mutex_unlock(dht->jobs_mutex); +} + +KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht) +{ + ks_assert(dht); + + ks_mutex_lock(dht->jobs_mutex); + for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) { + ks_bool_t remove = KS_FALSE; + jobn = job->next; + switch (job->state) { + case KS_DHT_JOB_STATE_QUERYING: + if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING; + break; + case KS_DHT_JOB_STATE_RESPONDING: + break; + case KS_DHT_JOB_STATE_EXPIRING: + job->attempts--; + if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING; + else { + if (job->finish_callback) job->finish_callback(dht, job); + remove = KS_TRUE; + } + break; + case KS_DHT_JOB_STATE_PROCESSING: + break; + case KS_DHT_JOB_STATE_COMPLETING: + if (job->finish_callback) job->finish_callback(dht, job); + remove = KS_TRUE; + break; + default: break; + } + + if (remove) { + if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL; + else if (!jobp) dht->jobs_first = jobn; + else if (!jobn) dht->jobs_last = jobp; + else jobp->next = jobn; + ks_dht_job_destroy(&job); + } else jobp = job; + } + ks_mutex_unlock(dht->jobs_mutex); +} + +KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback) +{ + ks_dht_job_t *job = NULL; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(raddr); - if ((ret = ks_dht_setup_query(dht, - ep, - raddr, + if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; + ks_dht_job_build_ping(job, ks_dht_query_ping, callback); + ks_dht_jobs_add(dht, job); + + // next step in ks_dht_pulse_jobs with QUERYING state + + done: + return ret; +} + +KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job) +{ + ks_dht_message_t *message = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(dht); + ks_assert(job); + + if ((ret = ks_dht_query_setup(dht, + job, "ping", ks_dht_process_response_ping, NULL, &message, - &a)) != KS_STATUS_SUCCESS) goto done; - - ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); + NULL)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Sending message query ping\n"); ks_q_push(dht->send_q, (void *)message); @@ -1445,37 +1598,22 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message) { - ks_dht_nodeid_t *id; ks_dht_message_t *response = NULL; - struct bencode *r = NULL; - ks_dhtrt_routetable_t *routetable = NULL; - ks_dht_node_t *node = NULL; - char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); ks_assert(message->args); - if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; - - routetable = message->endpoint->node->table; - - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); - if ((ret = ks_dht_setup_response(dht, + if ((ret = ks_dht_response_setup(dht, message->endpoint, &message->raddr, message->transactionid, message->transactionid_length, &response, - &r)) != KS_STATUS_SUCCESS) goto done; - - ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); + NULL)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Sending message response ping\n"); ks_q_push(dht->send_q, (void *)response); @@ -1484,36 +1622,42 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ return ret; } -KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message) +KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t *job) { - ks_dht_nodeid_t *id; - ks_dhtrt_routetable_t *routetable = NULL; - ks_dht_node_t *node = NULL; - char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); - ks_assert(message); - - if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; - - routetable = message->endpoint->node->table; - - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; - - ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done; + ks_assert(job); ks_log(KS_LOG_DEBUG, "Message response ping is reached\n"); + job->state = KS_DHT_JOB_STATE_COMPLETING; + + // done: + return ret; +} + + +KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target) +{ + ks_dht_job_t *job = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(target); + + if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; + ks_dht_job_build_findnode(job, ks_dht_query_findnode, callback, target); + ks_dht_jobs_add(dht, job); + + // next step in ks_dht_pulse_jobs with QUERYING state + done: return ret; } - -KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid) +KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job) { ks_dht_transaction_t *transaction = NULL; ks_dht_message_t *message = NULL; @@ -1521,24 +1665,22 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); - ks_assert(raddr); - ks_assert(targetid); + ks_assert(job); - if ((ret = ks_dht_setup_query(dht, - ep, - raddr, + if ((ret = ks_dht_query_setup(dht, + job, "find_node", ks_dht_process_response_findnode, &transaction, &message, &a)) != KS_STATUS_SUCCESS) goto done; - memcpy(transaction->target.id, targetid->id, KS_DHT_NODEID_SIZE); + //memcpy(transaction->target.id, job->target.id, KS_DHT_NODEID_SIZE); + transaction->target = job->target; - ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); - ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE)); + ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE)); // Only request both v4 and v6 if we have both interfaces bound and are looking for our own node id, aka bootstrapping - if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, targetid->id, KS_DHT_NODEID_SIZE)) { + if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->target.id, KS_DHT_NODEID_SIZE)) { struct bencode *want = ben_list(); ben_list_append_str(want, "n4"); ben_list_append_str(want, "n6"); @@ -1554,7 +1696,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message) { - ks_dht_nodeid_t *id; ks_dht_nodeid_t *target; struct bencode *want; ks_bool_t want4 = KS_FALSE; @@ -1565,8 +1706,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess uint8_t buffer6[1000]; ks_size_t buffer4_length = 0; ks_size_t buffer6_length = 0; - ks_dhtrt_routetable_t *routetable = NULL; - ks_dht_node_t *node = NULL; ks_dhtrt_querynodes_t query; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_status_t ret = KS_STATUS_SUCCESS; @@ -1575,8 +1714,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_assert(message); ks_assert(message->args); - if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done; want = ben_dict_get_by_str(message->args, "want"); @@ -1595,12 +1732,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess want6 = message->raddr.family == AF_INET6; } - routetable = message->endpoint->node->table; - - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n"); @@ -1622,7 +1753,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); } - // @todo release query nodes + ks_dhtrt_release_querynodes(&query); } if (want6) { query.family = AF_INET6; @@ -1639,9 +1770,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); } + ks_dhtrt_release_querynodes(&query); } - if ((ret = ks_dht_setup_response(dht, + if ((ret = ks_dht_response_setup(dht, message->endpoint, &message->raddr, message->transactionid, @@ -1649,7 +1781,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess &response, &r)) != KS_STATUS_SUCCESS) goto done; - ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length)); if (want6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length)); @@ -1660,9 +1791,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess return ret; } -KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message) +KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_job_t *job) { - ks_dht_nodeid_t *id; struct bencode *n; //ks_bool_t n4 = KS_FALSE; //ks_bool_t n6 = KS_FALSE; @@ -1672,50 +1802,37 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m size_t nodes6_size = 0; size_t nodes_len = 0; size_t nodes6_len = 0; - ks_dhtrt_routetable_t *routetable = NULL; - ks_dht_node_t *node = NULL; - char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_hash_t *searches = NULL; ks_dht_search_t *search = NULL; + ks_dht_node_t *node = NULL; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); - ks_assert(message); - ks_assert(message->transaction); + ks_assert(job); - if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; - - n = ben_dict_get_by_str(message->args, "nodes"); + n = ben_dict_get_by_str(job->response->args, "nodes"); if (n) { //n4 = KS_TRUE; nodes = (const uint8_t *)ben_str_val(n); nodes_size = ben_str_len(n); } - n = ben_dict_get_by_str(message->args, "nodes6"); + n = ben_dict_get_by_str(job->response->args, "nodes6"); if (n) { //n6 = KS_TRUE; nodes6 = (const uint8_t *)ben_str_val(n); nodes6_size = ben_str_len(n); } - routetable = message->endpoint->node->table; - - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; - - ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done; - - searches = message->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash; + searches = job->response->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash; ks_hash_read_lock(searches); - search = ks_hash_search(searches, message->transaction->target.id, KS_UNLOCKED); + search = ks_hash_search(searches, job->response->transaction->target.id, KS_UNLOCKED); if (search) { ks_dht_search_pending_t *pending = NULL; ks_mutex_lock(search->mutex); - pending = ks_hash_search(search->pending, id->id, KS_UNLOCKED); + pending = ks_hash_search(search->pending, job->response->args_id.id, KS_UNLOCKED); if (pending) pending->finished = KS_TRUE; } ks_hash_read_unlock(searches); @@ -1737,7 +1854,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); ks_dhtrt_release_node(node); - if (search && message->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) { + if (search && job->response->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) { ks_dht_nodeid_t distance; int32_t results_index = -1; @@ -1776,7 +1893,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_dht_search_pending_destroy(&pending); goto done; } - if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done; } } } @@ -1798,7 +1915,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); ks_dhtrt_release_node(node); - if (search && message->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) { + if (search && job->response->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) { ks_dht_nodeid_t distance; int32_t results_index = -1; @@ -1837,7 +1954,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_dht_search_pending_destroy(&pending); goto done; } - if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done; } } } @@ -1849,28 +1966,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m return ret; } +// @todo ks_dht_get -KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid) +KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job) { ks_dht_message_t *message = NULL; struct bencode *a = NULL; ks_assert(dht); - ks_assert(raddr); - ks_assert(targetid); + ks_assert(job); - if (ks_dht_setup_query(dht, - ep, - raddr, + if (ks_dht_query_setup(dht, + job, "get", ks_dht_process_response_get, NULL, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available - ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE)); + ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Sending message query get\n"); ks_q_push(dht->send_q, (void *)message); @@ -1880,7 +1995,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message) { - ks_dht_nodeid_t *id; ks_dht_nodeid_t *target; struct bencode *seq; int64_t sequence = -1; @@ -1889,8 +2003,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ks_dht_storageitem_t *item = NULL; ks_dht_message_t *response = NULL; struct bencode *r = NULL; - ks_dhtrt_routetable_t *routetable = NULL; - ks_dht_node_t *node = NULL; + ks_dhtrt_querynodes_t query; + uint8_t buffer4[1000]; + uint8_t buffer6[1000]; + ks_size_t buffer4_length = 0; + ks_size_t buffer6_length = 0; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_status_t ret = KS_STATUS_SUCCESS; @@ -1898,34 +2015,61 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ks_assert(message); ks_assert(message->args); - if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done; seq = ben_dict_get_by_str(message->args, "seq"); if (seq) sequence = ben_int_val(seq); - routetable = message->endpoint->node->table; - - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Message query get is valid\n"); ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token); - item = ks_hash_search(dht->storage_hash, (void *)target, KS_READLOCKED); - ks_hash_read_unlock(dht->storage_hash); + item = ks_hash_search(dht->storageitems_hash, target->id, KS_READLOCKED); + ks_hash_read_unlock(dht->storageitems_hash); sequence_snuffed = item && sequence >= 0 && item->seq <= sequence; - // @todo if sequence is provided then requester has the data so if the local sequence is lower, maybe create job to update local data from the requester? + // @todo if sequence is provided then requester has the data, so if the local sequence is lower maybe send a get to the requester to update local data? - // @todo find closest ipv4 and ipv6 nodes to target + query.nodeid = *target; + query.type = KS_DHT_REMOTE; + query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE + if (dht->rt_ipv4) { + query.family = AF_INET; - // @todo compact ipv4 and ipv6 nodes into separate buffers + ks_dhtrt_findclosest_nodes(dht->rt_ipv4, &query); + for (int32_t i = 0; i < query.count; ++i) { + ks_dht_node_t *qn = query.nodes[i]; - if ((ret = ks_dht_setup_response(dht, + if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid, + &qn->addr, + buffer4, + &buffer4_length, + sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done; + + ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); + } + ks_dhtrt_release_querynodes(&query); + } + if (dht->rt_ipv6) { + query.family = AF_INET6; + + ks_dhtrt_findclosest_nodes(dht->rt_ipv6, &query); + for (int32_t i = 0; i < query.count; ++i) { + ks_dht_node_t *qn = query.nodes[i]; + + if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid, + &qn->addr, + buffer6, + &buffer6_length, + sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done; + + ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); + } + ks_dhtrt_release_querynodes(&query); + } + + + if ((ret = ks_dht_response_setup(dht, message->endpoint, &message->raddr, message->transactionid, @@ -1933,7 +2077,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t &response, &r)) != KS_STATUS_SUCCESS) goto done; - ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ben_dict_set(r, ben_blob("token", 5), ben_blob(token.token, KS_DHT_TOKEN_SIZE)); if (item) { if (item->mutable) { @@ -1945,7 +2088,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t } if (!sequence_snuffed) ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v)); } - // @todo nodes, nodes6 + if (dht->rt_ipv4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length)); + if (dht->rt_ipv6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length)); ks_log(KS_LOG_DEBUG, "Sending message response get\n"); ks_q_push(dht->send_q, (void *)response); @@ -1954,34 +2098,20 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t return ret; } -KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message) +KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t *job) { - ks_dht_nodeid_t *id; ks_dht_token_t *token; - ks_dhtrt_routetable_t *routetable = NULL; - ks_dht_node_t *node = NULL; - char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); - ks_assert(message); + ks_assert(job); // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided - if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; - - if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done; // @todo add extract function for mutable ks_dht_storageitem_key_t // @todo add extract function for mutable ks_dht_storageitem_signature_t - routetable = message->endpoint->node->table; - - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; - - ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done; // @todo add/touch bucket entries for other nodes/nodes6 returned ks_log(KS_LOG_DEBUG, "Message response get is reached\n"); @@ -1990,34 +2120,22 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag return ret; } - -// @todo ks_dht_send_put +// @todo ks_dht_put +// @todo ks_dht_query_put KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message) { - ks_dht_nodeid_t *id; ks_dht_message_t *response = NULL; struct bencode *r = NULL; - ks_dhtrt_routetable_t *routetable = NULL; - ks_dht_node_t *node = NULL; - char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(message); ks_assert(message->args); - if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; - - routetable = message->endpoint->node->table; - - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Message query put is valid\n"); - if ((ret = ks_dht_setup_response(dht, + if ((ret = ks_dht_response_setup(dht, message->endpoint, &message->raddr, message->transactionid, @@ -2025,8 +2143,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t &response, &r)) != KS_STATUS_SUCCESS) goto done; - //ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); - ks_log(KS_LOG_DEBUG, "Sending message response put\n"); ks_q_push(dht->send_q, (void *)response); @@ -2034,31 +2150,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t return ret; } -KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message) +KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t *job) { - ks_dht_nodeid_t *id; - ks_dhtrt_routetable_t *routetable = NULL; - ks_dht_node_t *node = NULL; - char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); - ks_assert(message); - - if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; - - routetable = message->endpoint->node->table; - - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; - - ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); - if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done; + ks_assert(job); ks_log(KS_LOG_DEBUG, "Message response put is reached\n"); - done: + // done: return ret; } diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 51b4f84d9f..1f320b997a 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -42,6 +42,7 @@ KS_BEGIN_EXTERN_C typedef struct ks_dht_s ks_dht_t; typedef struct ks_dht_datagram_s ks_dht_datagram_t; +typedef struct ks_dht_job_s ks_dht_job_t; typedef struct ks_dht_nodeid_s ks_dht_nodeid_t; typedef struct ks_dht_token_s ks_dht_token_t; typedef struct ks_dht_storageitem_key_s ks_dht_storageitem_key_t; @@ -57,6 +58,7 @@ typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t; typedef struct ks_dht_storageitem_s ks_dht_storageitem_t; +typedef ks_status_t (*ks_dht_job_callback_t)(ks_dht_t *dht, ks_dht_job_t *job); typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message); typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search); @@ -90,6 +92,41 @@ struct ks_dht_node_s { ks_rwl_t *reflock; }; +enum ks_dht_job_state_t { + KS_DHT_JOB_STATE_QUERYING, + KS_DHT_JOB_STATE_RESPONDING, + KS_DHT_JOB_STATE_EXPIRING, + KS_DHT_JOB_STATE_PROCESSING, + KS_DHT_JOB_STATE_COMPLETING, +}; + +//enum ks_dht_job_type_t { +// KS_DHT_JOB_TYPE_NONE = 0, +// KS_DHT_JOB_TYPE_PING, +// KS_DHT_JOB_TYPE_FINDNODE, +//}; + +struct ks_dht_job_s { + ks_pool_t *pool; + ks_dht_t *dht; + ks_dht_job_t *next; + + enum ks_dht_job_state_t state; + + ks_sockaddr_t raddr; // will obtain local endpoint node id when creating message using raddr + int32_t attempts; + + //enum ks_dht_job_type_t type; + ks_dht_job_callback_t query_callback; + ks_dht_job_callback_t finish_callback; + + ks_dht_message_t *response; + //ks_dht_nodeid_t response_id; + + // job specific query parameters + ks_dht_nodeid_t target; +}; + struct ks_dhtrt_routetable_s { void* internal; ks_pool_t* pool; @@ -127,6 +164,7 @@ struct ks_dht_message_s { ks_dht_transaction_t *transaction; char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE]; struct bencode *args; + ks_dht_nodeid_t args_id; }; struct ks_dht_endpoint_s { @@ -141,10 +179,10 @@ struct ks_dht_endpoint_s { struct ks_dht_transaction_s { ks_pool_t *pool; - ks_sockaddr_t raddr; + ks_dht_job_t *job; uint32_t transactionid; - ks_dht_nodeid_t target; - ks_dht_message_callback_t callback; + ks_dht_nodeid_t target; // @todo look at moving this into job now + ks_dht_job_callback_t callback; ks_time_t expiration; ks_bool_t finished; }; @@ -209,6 +247,10 @@ struct ks_dht_s { uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error ks_size_t recv_buffer_length; + ks_mutex_t *jobs_mutex; + ks_dht_job_t *jobs_first; + ks_dht_job_t *jobs_last; + ks_mutex_t *tid_mutex; volatile uint32_t transactionid_next; ks_hash_t *transactions_hash; @@ -222,7 +264,8 @@ struct ks_dht_s { volatile uint32_t token_secret_current; volatile uint32_t token_secret_previous; ks_time_t token_secret_expiration; - ks_hash_t *storage_hash; + + ks_hash_t *storageitems_hash; }; /** @@ -307,6 +350,9 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid */ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout); +KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback); +KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target); + /** * Create a network search of the closest nodes to a target. * @param dht pointer to the dht instance @@ -327,13 +373,15 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_search_callback_t callback, ks_dht_search_t **search); + + /** * */ KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message, ks_pool_t *pool, ks_dht_endpoint_t *endpoint, - ks_sockaddr_t *raddr, + const ks_sockaddr_t *raddr, ks_bool_t alloc_data); /** * @@ -345,14 +393,6 @@ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message); */ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length); -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message, - uint32_t transactionid, - const char *query, - struct bencode **args); - /** * */ @@ -361,14 +401,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, ks_size_t transactionid_length, struct bencode **args); -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message, - uint8_t *transactionid, - ks_size_t transactionid_length, - struct bencode **args); - /** * route table methods diff --git a/libs/libks/src/dht/ks_dht_job.c b/libs/libks/src/dht/ks_dht_job.c new file mode 100644 index 0000000000..c99556aa39 --- /dev/null +++ b/libs/libks/src/dht/ks_dht_job.c @@ -0,0 +1,80 @@ +#include "ks_dht.h" +#include "ks_dht-int.h" + +KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job, + ks_pool_t *pool, + const ks_sockaddr_t *raddr, + int32_t attempts) +{ + ks_dht_job_t *j; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(job); + ks_assert(pool); + //ks_assert(dht); + ks_assert(raddr); + ks_assert(attempts > 0 && attempts <= 10); + + *job = j = ks_pool_alloc(pool, sizeof(ks_dht_job_t)); + ks_assert(j); + + j->pool = pool; + j->state = KS_DHT_JOB_STATE_QUERYING; + j->raddr = *raddr; + j->attempts = attempts; + + //ks_mutex_lock(dht->jobs_mutex); + //if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = j; + //else dht->jobs_first = dht->jobs_last = j; + //ks_mutex_unlock(dht->jobs_mutex); + + // done: + if (ret != KS_STATUS_SUCCESS) { + if (j) ks_dht_job_destroy(job); + } + return ret; +} + +KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback) +{ + ks_assert(job); + + job->query_callback = query_callback; + job->finish_callback = finish_callback; +} + +KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job, + ks_dht_job_callback_t query_callback, + ks_dht_job_callback_t finish_callback, + ks_dht_nodeid_t *target) +{ + ks_assert(job); + ks_assert(target); + + job->query_callback = query_callback; + job->finish_callback = finish_callback; + job->target = *target; +} + +KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job) +{ + ks_dht_job_t *j; + + ks_assert(job); + ks_assert(*job); + + j = *job; + + ks_pool_free(j->pool, job); +} + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index 75780f46c5..7997b41c04 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -4,7 +4,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message, ks_pool_t *pool, ks_dht_endpoint_t *endpoint, - ks_sockaddr_t *raddr, + const ks_sockaddr_t *raddr, ks_bool_t alloc_data) { ks_dht_message_t *m; @@ -110,33 +110,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message, - uint32_t transactionid, - const char *query, - struct bencode **args) -{ - struct bencode *a; - uint32_t tid; - - ks_assert(message); - ks_assert(query); - - tid = htonl(transactionid); - - ben_dict_set(message->data, ben_blob("t", 1), ben_blob((uint8_t *)&tid, sizeof(uint32_t))); - ben_dict_set(message->data, ben_blob("y", 1), ben_blob("q", 1)); - ben_dict_set(message->data, ben_blob("q", 1), ben_blob(query, strlen(query))); - - // @note r joins message->data and will be freed with it - a = ben_dict(); - ks_assert(a); - ben_dict_set(message->data, ben_blob("a", 1), a); - - if (args) *args = a; - - return KS_STATUS_SUCCESS; -} - KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, uint8_t *transactionid, ks_size_t transactionid_length, @@ -160,33 +133,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, return KS_STATUS_SUCCESS; } -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message, - uint8_t *transactionid, - ks_size_t transactionid_length, - struct bencode **args) -{ - struct bencode *e; - - ks_assert(message); - ks_assert(transactionid); - - ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); - ben_dict_set(message->data, ben_blob("y", 1), ben_blob("e", 1)); - - // @note r joins message->data and will be freed with it - e = ben_list(); - ks_assert(e); - ben_dict_set(message->data, ben_blob("e", 1), e); - - if (args) *args = e; - - return KS_STATUS_SUCCESS; -} - - /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/src/dht/ks_dht_transaction.c b/libs/libks/src/dht/ks_dht_transaction.c index 6d6b7baf35..0f0458329c 100644 --- a/libs/libks/src/dht/ks_dht_transaction.c +++ b/libs/libks/src/dht/ks_dht_transaction.c @@ -3,22 +3,22 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transaction, ks_pool_t *pool, - ks_sockaddr_t *raddr, + ks_dht_job_t *job, uint32_t transactionid, - ks_dht_message_callback_t callback) + ks_dht_job_callback_t callback) { ks_dht_transaction_t *t; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(transaction); ks_assert(pool); - ks_assert(raddr); + ks_assert(job); *transaction = t = ks_pool_alloc(pool, sizeof(ks_dht_transaction_t)); ks_assert(t); t->pool = pool; - t->raddr = *raddr; + t->job = job; t->transactionid = transactionid; t->callback = callback; t->expiration = ks_time_now() + (KS_DHT_TRANSACTION_EXPIRATION * 1000); diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 36dc09e431..d160a0cc45 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -10,7 +10,7 @@ ks_status_t dht_z_callback(ks_dht_t *dht, ks_dht_message_t *message) { diag("dht_z_callback\n"); ok(message->transactionid[0] == '4' && message->transactionid[1] == '2'); - ks_dht_send_error(dht, message->endpoint, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error"); + ks_dht_error(dht, message->endpoint, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error"); return KS_STATUS_SUCCESS; } @@ -135,18 +135,21 @@ int main() { diag("Ping test\n"); - ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1 + //ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1 + ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING) - ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 + ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING) ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet - ks_dht_pulse(dht2, 100); // Receive and process ping response from dht1 + ks_dht_pulse(dht2, 100); // Receive and process ping response from dht1 (PROCESSING then COMPLETING) ok(ks_dhtrt_find_node(dht2->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good + ks_dht_pulse(dht2, 100); // (COMPLETING) + diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up for (int i = 0; i < 10; ++i) { //diag("DHT 1\n"); @@ -160,7 +163,8 @@ int main() { diag("Find_Node test\n"); - ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1 + //ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1 + ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid); ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1