From 183116452ba98cfd615ae97b440fb96ebb6f68ac Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Tue, 3 Jan 2017 07:09:02 +0000 Subject: [PATCH] FS-9775: A bunch of stuff related to chaining multiple jobs, bug fixes, few other changes --- libs/libks/Makefile.am | 3 +- libs/libks/src/dht/ks_dht-int.h | 44 +- libs/libks/src/dht/ks_dht.c | 783 ++++++++++++++---------- libs/libks/src/dht/ks_dht.h | 164 +++-- libs/libks/src/dht/ks_dht_bucket.c | 4 +- libs/libks/src/dht/ks_dht_distribute.c | 65 ++ libs/libks/src/dht/ks_dht_job.c | 39 +- libs/libks/src/dht/ks_dht_publish.c | 62 ++ libs/libks/src/dht/ks_dht_search.c | 15 +- libs/libks/src/dht/ks_dht_storageitem.c | 33 + libs/libks/src/ks_thread_pool.c | 8 +- libs/libks/test/testdht2.c | 216 ++++--- 12 files changed, 925 insertions(+), 511 deletions(-) create mode 100644 libs/libks/src/dht/ks_dht_distribute.c create mode 100644 libs/libks/src/dht/ks_dht_publish.c diff --git a/libs/libks/Makefile.am b/libs/libks/Makefile.am index 6d8c5c466f..ae74a81dbd 100644 --- a/libs/libks/Makefile.am +++ b/libs/libks/Makefile.am @@ -14,7 +14,8 @@ libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_datagram.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c -libks_la_SOURCES += src/dht/ks_dht_job.c src/dht/ks_dht_search.c src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c +libks_la_SOURCES += src/dht/ks_dht_job.c src/dht/ks_dht_search.c src/dht/ks_dht_publish.c src/dht/ks_dht_distribute.c src/dht/ks_dht_storageitem.c +libks_la_SOURCES += src/dht/ks_dht_bucket.c libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 76d797ef1c..2e8ce749e3 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -22,11 +22,11 @@ KS_BEGIN_EXTERN_C KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint); /** - * Called internally to expire search data. - * Handles completing and purging of finished searches. + * Called internally to expire or reannounce storage item data. + * Handles reannouncing and purging of expiring storage items. * @param dht pointer to the dht instance */ -KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht); +KS_DECLARE(void) ks_dht_pulse_storageitems(ks_dht_t *dht); /** * Called internally to process job state machine. @@ -276,19 +276,18 @@ KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram); KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job, ks_pool_t *pool, const ks_sockaddr_t *raddr, - int32_t attempts); + int32_t attempts, + void *data); KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback); KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job, - ks_dht_search_t *search, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback, ks_dht_nodeid_t *target); KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job, - ks_dht_search_t *search, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback, ks_dht_nodeid_t *target, - uint8_t *salt, + const uint8_t *salt, ks_size_t salt_length); KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, @@ -296,6 +295,9 @@ KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job, ks_dht_token_t *token, int64_t cas, ks_dht_storageitem_t *item); +KS_DECLARE(void) ks_dht_job_build_search(ks_dht_job_t *job, + ks_dht_job_callback_t query_callback, + ks_dht_job_callback_t finish_callback); KS_DECLARE(void) ks_dht_job_build_search_findnode(ks_dht_job_t *job, ks_dht_nodeid_t *target, uint32_t family, @@ -344,9 +346,35 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, /** * */ -KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback); +KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, + ks_pool_t *pool, + ks_dhtrt_routetable_t *table, + const ks_dht_nodeid_t *target, + ks_dht_job_callback_t callback, + void *data); KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search); +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_publish_create(ks_dht_publish_t **publish, + ks_pool_t *pool, + ks_dht_job_callback_t callback, + void *data, + int64_t cas, + ks_dht_storageitem_t *item); +KS_DECLARE(void) ks_dht_publish_destroy(ks_dht_publish_t **publish); + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_distribute_create(ks_dht_distribute_t **distribute, + ks_pool_t *pool, + ks_dht_storageitem_callback_t callback, + void *data, + int64_t cas, + ks_dht_storageitem_t *item); +KS_DECLARE(void) ks_dht_distribute_destroy(ks_dht_distribute_t **distribute); /** * diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index a40c1cf8c5..7007ddb8eb 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -175,17 +175,21 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread d->rt_ipv6 = NULL; /** - * Create the mutex to handle searches list. + * Default tokens expirations to not be checked for one pulse. */ - ks_mutex_create(&d->searches_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool); - ks_assert(d->searches_mutex); - + d->tokens_pulse = ks_time_now() + ((ks_time_t)KS_DHT_TOKENS_PULSE * KS_USEC_PER_SEC); + /** * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets. */ d->token_secret_current = d->token_secret_previous = rand(); d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKEN_EXPIRATION * KS_USEC_PER_SEC); + /** + * Default storageitems expirations to not be checked for one pulse. + */ + d->storageitems_pulse = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEMS_PULSE * KS_USEC_PER_SEC); + /** * Create the hash to store arbitrary data for BEP44. */ @@ -229,6 +233,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) * Cleanup the storageitems hash and it's contents if it is allocated. */ if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash); + d->storageitems_pulse = 0; /** * Zero out the opaque write token variables. @@ -236,15 +241,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) d->token_secret_current = 0; d->token_secret_previous = 0; d->token_secret_expiration = 0; - - /** - * Cleanup the search mutex and searches if they are allocated. - */ - if (d->searches_mutex) ks_mutex_destroy(&d->searches_mutex); - for (ks_dht_search_t *search = d->searches_first, *searchn = NULL; search; search = searchn) { - searchn = search->next; - ks_dht_search_destroy(&search); - } + d->tokens_pulse = 0; /** * Cleanup the route tables if they are allocated. @@ -258,6 +255,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) d->transactionid_next = 0; if (d->transactionid_mutex) ks_mutex_destroy(&d->transactionid_mutex); if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash); + d->transactions_pulse = 0; /** * Cleanup the jobs mutex and jobs if they are allocated. @@ -290,8 +288,6 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) /** * Probably don't need this */ - d->transactions_pulse = 0; - d->endpoints_length = 0; d->endpoints_size = 0; @@ -624,10 +620,7 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4); if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6); - ks_dht_pulse_searches(dht); - - // @todo pulse_storageitems for keepalive and expiration - // hold keepalive counter on items to determine what to reannounce vs expire + ks_dht_pulse_storageitems(dht); ks_dht_pulse_jobs(dht); @@ -638,43 +631,58 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) ks_dht_pulse_tokens(dht); } -KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht) +KS_DECLARE(void) ks_dht_pulse_storageitems(ks_dht_t *dht) { - ks_dht_search_t *searches_first = NULL; - ks_dht_search_t *searches_last = NULL; - + ks_hash_iterator_t *it = NULL; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_time_t now = ks_time_now(); + ks_assert(dht); - ks_mutex_lock(dht->searches_mutex); - for (ks_dht_search_t *search = dht->searches_first, *searchn = NULL, *searchp = NULL; search; search = searchn) { - ks_bool_t done = KS_FALSE; - searchn = search->next; + if (dht->storageitems_pulse > now) return; + dht->storageitems_pulse = now + ((ks_time_t)KS_DHT_STORAGEITEMS_PULSE * KS_USEC_PER_SEC); - ks_mutex_lock(search->mutex); - done = ks_hash_count(search->searching) == 0; - - if (done) { - if (!searchp && !searchn) dht->searches_first = dht->searches_last = NULL; - else if (!searchp) dht->searches_first = searchn; - else if (!searchn) { - dht->searches_last = searchp; - dht->searches_last->next = NULL; + ks_hash_write_lock(dht->storageitems_hash); + for (it = ks_hash_first(dht->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + const void *key = NULL; + ks_dht_storageitem_t *value = NULL; + ks_bool_t remove = KS_FALSE; + + ks_hash_this(it, &key, NULL, (void **)&value); + + ks_mutex_lock(value->mutex); + + if (value->keepalive <= now) { + value->keepalive = now + ((ks_time_t)KS_DHT_STORAGEITEM_KEEPALIVE * KS_USEC_PER_SEC); + if (value->refc > 0) { + value->expiration = now + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); + ks_log(KS_LOG_DEBUG, "Item keepalive %s\n", ks_dht_hex(value->id.id, id_buf, KS_DHT_NODEID_SIZE)); + if (dht->rt_ipv4) ks_dht_distribute(dht, NULL, NULL, dht->rt_ipv4, 0, value); + if (dht->rt_ipv6) ks_dht_distribute(dht, NULL, NULL, dht->rt_ipv6, 0, value); } - else searchp->next = searchn; + } + + remove = value->refc == 0 && value->expiration <= now; + if (remove) ks_hash_remove(dht->storageitems_hash, (void *)key); - search->next = NULL; - if (searches_last) searches_last = searches_last->next = search; - else searches_first = searches_last = search; - } else searchp = search; - ks_mutex_unlock(search->mutex); - } - ks_mutex_unlock(dht->searches_mutex); + ks_mutex_unlock(value->mutex); - for (ks_dht_search_t *search = searches_first, *searchn = NULL; search; search = searchn) { - searchn = search->next; - if (search->callback) search->callback(dht, search); - ks_dht_search_destroy(&search); + if (remove) { + ks_log(KS_LOG_DEBUG, "Item expired %s\n", ks_dht_hex(value->id.id, id_buf, KS_DHT_NODEID_SIZE)); + ks_dht_storageitem_destroy(&value); + } } + ks_hash_write_unlock(dht->storageitems_hash); +} + +KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job) +{ + ks_assert(dht); + ks_assert(job); + ks_mutex_lock(dht->jobs_mutex); + if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job; + else dht->jobs_first = dht->jobs_last = job; + ks_mutex_unlock(dht->jobs_mutex); } KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht) @@ -686,21 +694,24 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht) ks_mutex_lock(dht->jobs_mutex); for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) { - ks_bool_t done = KS_FALSE; jobn = job->next; if (job->state == KS_DHT_JOB_STATE_QUERYING) { job->state = KS_DHT_JOB_STATE_RESPONDING; - if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING; + if (job->query_callback(dht, job) != KS_STATUS_SUCCESS) { + job->result = KS_DHT_JOB_RESULT_FAILURE; + job->state = KS_DHT_JOB_STATE_COMPLETING; + } } if (job->state == KS_DHT_JOB_STATE_EXPIRING) { job->attempts--; if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING; - else done = KS_TRUE; + else { + job->result = KS_DHT_JOB_RESULT_EXPIRED; + job->state = KS_DHT_JOB_STATE_COMPLETING; + } } - if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE; - - if (done) { + if (job->state == KS_DHT_JOB_STATE_COMPLETING) { if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL; else if (!jobp) dht->jobs_first = jobn; else if (!jobn) { @@ -1359,8 +1370,11 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht, transactionid = dht->transactionid_next++; ks_mutex_unlock(dht->transactionid_mutex); - if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; + ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback); + ks_assert(trans); + + ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE); + ks_assert(msg); // if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done; transactionid = htonl(transactionid); @@ -1416,16 +1430,8 @@ KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht, if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) { - ks_dht_error(dht, - ep, - raddr, - transactionid, - transactionid_length, - 202, - "Internal message create error"); - goto done; - } + ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE); + ks_assert(msg); ben_dict_set(msg->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("r", 1)); @@ -1472,7 +1478,8 @@ KS_DECLARE(void *) ks_dht_process(ks_thread_t *thread, void *data) // @todo blacklist check for bad actor nodes - if (ks_dht_message_create(&message, datagram->dht->pool, datagram->endpoint, &datagram->raddr, KS_FALSE) != KS_STATUS_SUCCESS) goto done; + ks_dht_message_create(&message, datagram->dht->pool, datagram->endpoint, &datagram->raddr, KS_FALSE); + ks_assert(message); if (ks_dht_message_parse(message, datagram->buffer, datagram->buffer_length) != KS_STATUS_SUCCESS) goto done; @@ -1649,7 +1656,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t message->raddr.port, KS_DHTRT_CREATE_TOUCH, &node)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE)); if ((ret = ks_dhtrt_touch_node(message->endpoint->node->table, *id)) != KS_STATUS_SUCCESS) goto done; @@ -1673,185 +1679,329 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t transaction->job->raddr.host, transaction->job->raddr.port); } else { + ks_dhtrt_sharelock_node(node); transaction->job->response = message; - transaction->job->response_id = message->args_id; + transaction->job->response_id = node; message->transaction = transaction; - if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING; - else transaction->job->state = KS_DHT_JOB_STATE_COMPLETING; + if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->result = KS_DHT_JOB_RESULT_FAILURE; + transaction->job->state = KS_DHT_JOB_STATE_COMPLETING; transaction->job->response = NULL; // message is destroyed after we return, stop using it transaction->finished = KS_TRUE; } done: + if (node) ks_dhtrt_release_node(node); return ret; } KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_job_t *job) { + ks_dht_search_t *search = NULL; ks_dht_node_t **nodes = NULL; ks_size_t nodes_count = 0; - ks_dht_node_t *node = NULL; + ks_dht_nodeid_t distance; + int32_t results_index = -1; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(job); - ks_assert(job->search); + ks_assert(job->data); - ks_mutex_lock(job->search->mutex); - ks_hash_remove(job->search->searching, job->response_id.id); + search = (ks_dht_search_t *)job->data; + + ks_mutex_lock(search->mutex); + search->searching--; + if (job->result != KS_DHT_JOB_RESULT_SUCCESS) goto done; + + ks_dht_utility_nodeid_xor(&distance, &job->response_id->nodeid, &search->target); + if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) { + results_index = search->results_length; + search->results_length++; + } else { + for (int32_t index = 0; index < search->results_length; ++index) { + // Check if responding node is closer than the current result + if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) { + // Only existing results which are further from the target than the responding node are considered for replacement + + // If this is the first node that is further then keep it and keep looking for existing results which are further than this result + // If additional results are further, and the current result is further than a previous result, use the current result as furthest to replace + if (results_index < 0) results_index = index; + else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index; + } + } + } + + if (results_index >= 0) { + // The results are either not full yet, or this responding node is closer than the furthest existing result + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + + ks_log(KS_LOG_DEBUG, + "Set closer node id %s (%s) in search of target id %s at results index %d\n", + ks_dht_hex(job->response_id->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), + ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE), + ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE), + results_index); + + if (search->results[results_index]) ks_dhtrt_release_node(search->results[results_index]); + ks_dhtrt_sharelock_node(job->response_id); + + search->results[results_index] = job->response_id; + search->distances[results_index] = distance; + } + nodes = job->raddr.family == AF_INET ? job->response_nodes : job->response_nodes6; nodes_count = job->raddr.family == AF_INET ? job->response_nodes_count : job->response_nodes6_count; for (int32_t i = 0; i < nodes_count; ++i) { - ks_dht_nodeid_t distance; - int32_t results_index = -1; - - node = nodes[i]; + ks_bool_t closer = KS_FALSE; + ks_dht_node_t *node = nodes[i]; - if (ks_hash_search(job->search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue; + // skip duplicates already searched + if (ks_hash_search(search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue; - ks_dht_utility_nodeid_xor(&distance, &node->nodeid, &job->search->target); - if (job->search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) { - results_index = job->search->results_length; - job->search->results_length++; - } else { - for (int32_t index = 0; index < job->search->results_length; ++index) { - // Check if new node is closer than this previous result - if (memcmp(distance.id, job->search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) { - // If this is the first node that is further then keep it - // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result - if (results_index < 0) results_index = index; - else if (memcmp(job->search->distances[index].id, job->search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index; - } - } + // calculate distance of new node from target + ks_dht_utility_nodeid_xor(&distance, &node->nodeid, &search->target); + + // if the results are not full, or the new node is closer than any result then the new node should be checked + if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) closer = KS_TRUE; + for (int32_t index = 0; !closer && index < search->results_length; ++index) { + // Check if new node is closer than this current result + closer = memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0; } - if (results_index >= 0) { - char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; - char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; - char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1]; - - ks_log(KS_LOG_DEBUG, - "Set closer node id %s (%s) in search of target id %s at results index %d\n", - ks_dht_hex(node->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), - ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE), - ks_dht_hex(job->search->target.id, id3_buf, KS_DHT_NODEID_SIZE), - results_index); - - if (job->search->results[results_index]) ks_dhtrt_release_node(job->search->results[results_index]); - job->search->results[results_index] = node; - job->search->distances[results_index] = distance; - - ks_hash_insert(job->search->searched, node->nodeid.id, (void *)KS_TRUE); - ks_hash_insert(job->search->searching, node->nodeid.id, (void *)KS_TRUE); + if (closer) { + // track new node as searched and searching then send off a findnode query to validate it as a result and end up back here for new closer nodes + ks_hash_insert(search->searched, node->nodeid.id, (void *)KS_TRUE); + search->searching++; - ks_dhtrt_sharelock_node(node); - - if ((ret = ks_dht_findnode(dht, job->search, &node->addr, ks_dht_search_findnode_callback, &job->search->target)) != KS_STATUS_SUCCESS) goto done; + ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target); } } - + done: - ks_mutex_unlock(job->search->mutex); + ks_mutex_unlock(search->mutex); + + if (search->searching == 0) { + if (search->callback) search->callback(dht, job); + ks_dht_search_destroy(&search); + } return ret; } -KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht, - int32_t family, - ks_dht_nodeid_t *target, - ks_dht_search_callback_t callback, - ks_dht_search_t **search) +KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job) { - ks_bool_t locked_searches = KS_FALSE; ks_bool_t locked_search = KS_FALSE; - ks_dhtrt_routetable_t *rt = NULL; - ks_dht_search_t *s = NULL; + ks_dht_search_t *search = NULL; ks_dhtrt_querynodes_t query; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); - ks_assert(family == AF_INET || family == AF_INET6); - ks_assert(target); + ks_assert(job); + ks_assert(job->data); - if (search) *search = NULL; + search = (ks_dht_search_t *)job->data; - if (family == AF_INET) { - if (!dht->rt_ipv4) { - ret = KS_STATUS_FAIL; - goto done; - } - rt = dht->rt_ipv4; - } else { - if (!dht->rt_ipv6) { - ret = KS_STATUS_FAIL; - goto done; - } - rt = dht->rt_ipv6; - } - - ks_mutex_lock(dht->searches_mutex); - locked_searches = KS_TRUE; - - if ((ret = ks_dht_search_create(&s, dht->pool, target, callback)) != KS_STATUS_SUCCESS) goto done; - - if (dht->searches_last) dht->searches_last = dht->searches_last->next = s; - else dht->searches_first = dht->searches_last = s; - - ks_mutex_lock(s->mutex); + ks_mutex_lock(search->mutex); locked_search = KS_TRUE; - // release searches lock now, but search is still locked - ks_mutex_unlock(dht->searches_mutex); - locked_searches = KS_FALSE; - // find closest good nodes to target locally and store as the closest results - query.nodeid = *target; + query.nodeid = search->target; query.type = KS_DHT_REMOTE; query.max = KS_DHT_SEARCH_RESULTS_MAX_SIZE; - query.family = family; + query.family = search->table == dht->rt_ipv4 ? AF_INET : AF_INET6; query.count = 0; - ks_dhtrt_findclosest_nodes(rt, &query); + ks_dhtrt_findclosest_nodes(search->table, &query); for (int32_t i = 0; i < query.count; ++i) { - ks_dht_node_t *n = query.nodes[i]; - ks_bool_t searched = KS_FALSE; + ks_dht_node_t *node = query.nodes[i]; - // always take the initial local closest good nodes as results, they are already good nodes that are closest with no results yet - s->results[s->results_length] = n; - ks_dht_utility_nodeid_xor(&s->distances[s->results_length], &n->nodeid, &s->target); - s->results_length++; - - searched = ks_hash_search(s->searched, n->nodeid.id, KS_UNLOCKED) != 0; - if (searched) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check - - ks_hash_insert(s->searched, n->nodeid.id, (void *)KS_TRUE); - ks_hash_insert(s->searching, n->nodeid.id, (void *)KS_TRUE); - - ks_dhtrt_sharelock_node(n); + // skip duplicates already searched, this really shouldn't happen on a new search but we sanity check + if (ks_hash_search(search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue; - if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) { - ks_dhtrt_release_querynodes(&query); - goto done; - } + ks_hash_insert(search->searched, node->nodeid.id, (void *)KS_TRUE); + search->searching++; + + ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target); } ks_dhtrt_release_querynodes(&query); - ks_mutex_unlock(s->mutex); - locked_search = KS_FALSE; - if (search) *search = s; + // done: + if (locked_search) ks_mutex_unlock(search->mutex); + + if (search->searching == 0) { + if (search->callback) search->callback(dht, job); + ks_dht_search_destroy(&search); + } + + return ret; +} + +KS_DECLARE(void) ks_dht_search(ks_dht_t *dht, + ks_dht_job_callback_t callback, + void *data, + ks_dhtrt_routetable_t *table, + ks_dht_nodeid_t *target) +{ + ks_dht_search_t *search = NULL; + ks_dht_job_t *job = NULL; + + ks_assert(dht); + ks_assert(table); + ks_assert(target); + + ks_dht_search_create(&search, dht->pool, table, target, callback, data); + ks_assert(search); + + ks_dht_job_create(&job, dht->pool, NULL, 3, search); + ks_assert(job); + + ks_dht_job_build_search(job, ks_dht_query_search, NULL); + ks_dht_jobs_add(dht, job); +} + +KS_DECLARE(ks_status_t) ks_dht_publish_get_callback(ks_dht_t *dht, ks_dht_job_t *job) +{ + ks_dht_publish_t *publish = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(dht); + ks_assert(job); + ks_assert(job->data); + + publish = (ks_dht_publish_t *)job->data; + + // @todo callbacks need job to contain cascaded publish->data before calling + if (job->result != KS_DHT_JOB_RESULT_SUCCESS) { + job->data = publish->data; + if (publish->callback) publish->callback(dht, job); + goto done; + } + + if (!job->response_hasitem || (publish->item->mutable && job->response_seq < publish->item->seq)) { + ks_dht_put(dht, &job->raddr, publish->callback, publish->data, &job->response_token, publish->cas, publish->item); + } else if (publish->callback) { + job->data = publish->data; + publish->callback(dht, job); + } done: - if (locked_searches) ks_mutex_unlock(dht->searches_mutex); - if (locked_search) ks_mutex_unlock(s->mutex); - if (ret != KS_STATUS_SUCCESS) { - //if (s) ks_dht_search_destroy(&s); - *search = NULL; - } + + ks_dht_publish_destroy(&publish); return ret; } +KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + void *data, + int64_t cas, + ks_dht_storageitem_t *item) +{ + ks_dht_publish_t *publish = NULL; + const uint8_t *salt = NULL; + size_t salt_length = 0; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(cas >= 0); + ks_assert(item); + + if (item->salt) { + salt = (const uint8_t *)ben_str_val(item->salt); + salt_length = ben_str_len(item->salt); + } + + ks_dht_publish_create(&publish, dht->pool, callback, data, cas, item); + ks_assert(publish); + + ks_dht_get(dht, raddr, ks_dht_publish_get_callback, publish, &item->id, salt, salt_length); +} + +KS_DECLARE(ks_status_t) ks_dht_distribute_publish_callback(ks_dht_t *dht, ks_dht_job_t *job) +{ + ks_dht_distribute_t *distribute = NULL; + ks_bool_t finished = KS_FALSE; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(dht); + ks_assert(job); + ks_assert(job->data); + + distribute = (ks_dht_distribute_t *)job->data; + ks_mutex_lock(distribute->mutex); + distribute->publishing--; + finished = distribute->publishing == 0; + ks_mutex_unlock(distribute->mutex); + + if (finished) { + if (distribute->callback) distribute->callback(dht, distribute->item); + ks_dht_distribute_destroy(&distribute); + } + + return ret; +} + +KS_DECLARE(ks_status_t) ks_dht_distribute_search_callback(ks_dht_t *dht, ks_dht_job_t *job) +{ + ks_dht_search_t *search = NULL; + ks_dht_distribute_t *distribute = NULL; + ks_bool_t finished = KS_FALSE; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(dht); + ks_assert(job); + ks_assert(job->data); + + search = (ks_dht_search_t *)job->data; + ks_assert(search->data); + + distribute = (ks_dht_distribute_t *)search->data; + + ks_mutex_lock(distribute->mutex); + for (int32_t index = 0; index < search->results_length; ++index) { + ks_dht_node_t *node = search->results[index]; + if (node->type == KS_DHT_LOCAL) continue; + + distribute->publishing++; + ks_dht_publish(dht, &node->addr, ks_dht_distribute_publish_callback, distribute, distribute->cas, distribute->item); + } + finished = distribute->publishing == 0; + ks_mutex_unlock(distribute->mutex); + + if (finished) { + if (distribute->callback) distribute->callback(dht, distribute->item); + ks_dht_distribute_destroy(&distribute); + } + + ks_dht_search_destroy(&search); + + return ret; +} + +KS_DECLARE(void) ks_dht_distribute(ks_dht_t *dht, + ks_dht_storageitem_callback_t callback, + void *data, + ks_dhtrt_routetable_t *table, + int64_t cas, + ks_dht_storageitem_t *item) +{ + ks_dht_distribute_t *distribute = NULL; + + ks_assert(dht); + ks_assert(table); + ks_assert(cas >= 0); + ks_assert(item); + + ks_dht_distribute_create(&distribute, dht->pool, callback, data, cas, item); + ks_assert(distribute); + + ks_dht_search(dht, ks_dht_distribute_search_callback, distribute, table, &item->id); +} + KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht) { ks_assert(dht); @@ -1878,10 +2028,15 @@ KS_DECLARE(void) ks_dht_storageitems_write_unlock(ks_dht_t *dht) KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target) { + ks_dht_storageitem_t *item = NULL; + ks_assert(dht); ks_assert(target); - return ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED); + item = ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED); + if (item) ks_dht_storageitem_reference(item); + + return item; } KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item) @@ -1912,13 +2067,12 @@ KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht, if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; + ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE); + ks_assert(error); - //if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done; ben_dict_set(error->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); ben_dict_set(error->data, ben_blob("y", 1), ben_blob("e", 1)); - // @note e joins error->data and will be freed with it e = ben_list(); ks_assert(e); ben_dict_set(error->data, ben_blob("e", 1), e); @@ -1997,7 +2151,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me ret = KS_STATUS_FAIL; goto done; } - + transaction->job->result = KS_DHT_JOB_RESULT_ERROR; + transaction->job->error_code = errorcode; + transaction->job->error_description = ben_clone(es); + transaction->job->state = KS_DHT_JOB_STATE_COMPLETING; transaction->finished = KS_TRUE; ks_hash_read_lock(dht->registry_error); @@ -2011,35 +2168,21 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me return ret; } -KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job) -{ - ks_assert(dht); - ks_assert(job); - ks_mutex_lock(dht->jobs_mutex); - if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job; - else dht->jobs_first = dht->jobs_last = job; - ks_mutex_unlock(dht->jobs_mutex); -} - -KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback) +KS_DECLARE(void) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, void *data) { ks_dht_job_t *job = NULL; - ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(raddr); //ks_log(KS_LOG_DEBUG, "Starting ping!\n"); - if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; + ks_dht_job_create(&job, dht->pool, raddr, 3, data); + ks_assert(job); + ks_dht_job_build_ping(job, ks_dht_query_ping, callback); ks_dht_jobs_add(dht, job); - - // next step in ks_dht_pulse_jobs with QUERYING state - - done: - return ret; } KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job) @@ -2105,27 +2248,23 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t } -KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, - ks_dht_search_t *search, - const ks_sockaddr_t *raddr, - ks_dht_job_callback_t callback, - ks_dht_nodeid_t *target) +KS_DECLARE(void) ks_dht_findnode(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + void *data, + ks_dht_nodeid_t *target) { ks_dht_job_t *job = NULL; - ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(raddr); ks_assert(target); - if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; - ks_dht_job_build_findnode(job, search, ks_dht_query_findnode, callback, target); + ks_dht_job_create(&job, dht->pool, raddr, 3, data); + ks_assert(job); + + ks_dht_job_build_findnode(job, ks_dht_query_findnode, callback, target); ks_dht_jobs_add(dht, job); - - // next step in ks_dht_pulse_jobs with QUERYING state - - done: - return ret; } KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job) @@ -2366,27 +2505,25 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j } -KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht, - ks_dht_search_t *search, - const ks_sockaddr_t *raddr, - ks_dht_job_callback_t callback, - ks_dht_nodeid_t *target, - uint8_t *salt, - ks_size_t salt_length) +KS_DECLARE(void) ks_dht_get(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + void *data, + ks_dht_nodeid_t *target, + const uint8_t *salt, + ks_size_t salt_length) { ks_dht_job_t *job = NULL; - ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(raddr); ks_assert(target); - if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; - ks_dht_job_build_get(job, search, ks_dht_query_get, callback, target, salt, salt_length); + ks_dht_job_create(&job, dht->pool, raddr, 3, data); + ks_assert(job); + + ks_dht_job_build_get(job, ks_dht_query_get, callback, target, salt, salt_length); ks_dht_jobs_add(dht, job); - - done: - return ret; } KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job) @@ -2407,13 +2544,18 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job) &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_hash_read_lock(dht->storageitems_hash); - item = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED); + item = ks_dht_storageitems_find(dht, &job->query_target); + if (item) ks_mutex_lock(item->mutex); ks_hash_read_unlock(dht->storageitems_hash); if (item && item->mutable && item->seq > 0) ben_dict_set(a, ben_blob("seq", 3), ben_int(item->seq)); ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE)); //ks_log(KS_LOG_DEBUG, "Sending message query get\n"); + if (item) { + ks_dht_storageitem_dereference(item); + ks_mutex_unlock(item->mutex); + } ks_q_push(dht->send_q, (void *)message); return KS_STATUS_SUCCESS; @@ -2460,11 +2602,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token); ks_hash_read_lock(dht->storageitems_hash); - item = ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED); + item = ks_dht_storageitems_find(dht, target); + if (item) { + ks_mutex_lock(item->mutex); + item->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); + } ks_hash_read_unlock(dht->storageitems_hash); + // If the item is mutable and available locally and a specific sequence was requested and the local item is not newer then do not send k, sig, or v back sequence_snuffed = item && sequence >= 0 && item->seq <= sequence; - // @todo if sequence is provided then requester has the data, so if the local sequence is lower maybe send a get to the requester to update local data? + // @todo if sequence is explicitly provided then requester has the data, so if the local sequence is lower + // maybe send a get query to the requester to update the local data query.nodeid = *target; query.type = KS_DHT_REMOTE; @@ -2553,6 +2701,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ks_q_push(dht->send_q, (void *)response); done: + if (item) { + ks_dht_storageitem_dereference(item); + ks_mutex_unlock(item->mutex); + } return ret; } @@ -2589,7 +2741,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t if ((ret = ks_dht_utility_extract_storageitem_signature(job->response->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done; seq = ben_dict_get_by_str(job->response->args, "seq"); - if (seq) sequence = ben_int_val(seq); + if (seq) { + sequence = ben_int_val(seq); + job->response_hasitem = KS_TRUE; + job->response_seq = sequence; + } if (seq && ((k && !sig) || (!k && sig))) { ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data"); @@ -2598,6 +2754,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t } v = ben_dict_get_by_str(job->response->args, "v"); + if (v) job->response_hasitem = KS_TRUE; //if (v) v_len = ben_str_len(v); n = ben_dict_get_by_str(job->response->args, "nodes"); @@ -2650,7 +2807,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t ks_hash_write_lock(dht->storageitems_hash); storageitems_locked = KS_TRUE; - olditem = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED); + olditem = ks_dht_storageitems_find(dht, &job->query_target); + if (olditem) ks_mutex_lock(olditem->mutex); if (v) { ks_dht_nodeid_t tmptarget; @@ -2664,11 +2822,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t goto done; } if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); - else if ((ret = ks_dht_storageitem_create_immutable_internal(&item, - dht->pool, - &tmptarget, - v, - KS_TRUE)) != KS_STATUS_SUCCESS) goto done; + else { + ks_dht_storageitem_create_immutable_internal(&item, dht->pool, &tmptarget, v, KS_TRUE); + ks_assert(item); + } } else { // mutable if ((ret = ks_dht_storageitem_target_mutable_internal(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done; @@ -2692,19 +2849,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t if (ben_cmp(olditem->v, v) != 0) { goto done; } - } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig); + } else { + ks_dht_storageitem_update_mutable(olditem, v, sequence, sig); + if (olditem->callback) olditem->callback(dht, olditem); + } olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); } - else if ((ret = ks_dht_storageitem_create_mutable_internal(&item, - dht->pool, - &tmptarget, - v, - KS_TRUE, - k, - job->query_salt, - KS_TRUE, - sequence, - sig)) != KS_STATUS_SUCCESS) goto done; + else { + ks_dht_storageitem_create_mutable_internal(&item, dht->pool, &tmptarget, v, KS_TRUE, k, job->query_salt, KS_TRUE, sequence, sig); + ks_assert(item); + } } if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item); } else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); @@ -2712,35 +2866,40 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t if (item) job->response_storageitem = item; else if (olditem) job->response_storageitem = olditem; + if (job->response_storageitem) ks_dht_storageitem_reference(job->response_storageitem); + done: + if (olditem) { + ks_dht_storageitem_dereference(olditem); + ks_mutex_unlock(olditem->mutex); + } + if (item) ks_dht_storageitem_dereference(item); if (ret != KS_STATUS_SUCCESS) { } if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash); return ret; } -// @todo add reference counting system to storageitem_t to know what to keep alive with reannouncements versus allowing to expire -KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht, - const ks_sockaddr_t *raddr, - ks_dht_job_callback_t callback, - ks_dht_token_t *token, - int64_t cas, - ks_dht_storageitem_t *item) +KS_DECLARE(void) ks_dht_put(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + void *data, + ks_dht_token_t *token, + int64_t cas, + ks_dht_storageitem_t *item) { ks_dht_job_t *job = NULL; - ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(raddr); ks_assert(token); ks_assert(item); - if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; + ks_dht_job_create(&job, dht->pool, raddr, 3, data); + ks_assert(job); + ks_dht_job_build_put(job, ks_dht_query_put, callback, token, cas, item); ks_dht_jobs_add(dht, job); - - done: - return ret; } KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job) @@ -2907,7 +3066,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t ks_hash_write_lock(dht->storageitems_hash); storageitems_locked = KS_TRUE; - olditem = ks_hash_search(dht->storageitems_hash, target.id, KS_UNLOCKED); + olditem = ks_dht_storageitems_find(dht, &target); + if (olditem) ks_mutex_lock(olditem->mutex); if (!ks_dht_token_verify(dht, &message->raddr, &target, token)) { ks_log(KS_LOG_DEBUG, "Invalid token\n"); @@ -2928,19 +3088,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t if (!seq) { // immutable if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); - else if ((ret = ks_dht_storageitem_create_immutable_internal(&item, - dht->pool, - &target, - v, - KS_TRUE)) != KS_STATUS_SUCCESS) { - ks_dht_error(dht, - message->endpoint, - &message->raddr, - message->transactionid, - message->transactionid_length, - 202, - "Internal storage item create immutable error"); - goto done; + else { + ks_dht_storageitem_create_immutable_internal(&item, dht->pool, &target, v, KS_TRUE); + ks_assert(item); } } else { // mutable @@ -2989,27 +3139,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t "Message query put sequence is equal to current but values are different"); goto done; } - } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig); + } else { + ks_dht_storageitem_update_mutable(olditem, v, sequence, sig); + if (olditem->callback) olditem->callback(dht, olditem); + } olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); } - else if ((ret = ks_dht_storageitem_create_mutable_internal(&item, - dht->pool, - &target, - v, - KS_TRUE, - k, - salt, - KS_TRUE, - sequence, - sig)) != KS_STATUS_SUCCESS) { - ks_dht_error(dht, - message->endpoint, - &message->raddr, - message->transactionid, - message->transactionid_length, - 202, - "Internal storage item create mutable error"); - goto done; + else { + ks_dht_storageitem_create_mutable_internal(&item, dht->pool, &target, v, KS_TRUE, k, salt, KS_TRUE, sequence, sig); + ks_assert(item); } } if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item); @@ -3024,8 +3162,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t //ks_log(KS_LOG_DEBUG, "Sending message response put\n"); ks_q_push(dht->send_q, (void *)response); + + //if (dht->rt_ipv4) ks_dht_distribute(dht, AF_INET, NULL, NULL, 0, olditem ? olditem : item); + //if (dht->rt_ipv6) ks_dht_distribute(dht, AF_INET6, NULL, NULL, 0, olditem ? olditem : item); done: + if (olditem) { + ks_dht_storageitem_dereference(olditem); + ks_mutex_unlock(olditem->mutex); + } + if (item) ks_dht_storageitem_dereference(item); if (ret != KS_STATUS_SUCCESS) { if (item) ks_hash_remove(dht->storageitems_hash, item->id.id); } @@ -3046,43 +3192,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t return ret; } -KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job) -{ - return ks_dht_search_findnode(dht, - job->query_family, - &job->query_target, - NULL, - NULL); -} - -KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht, - ks_dhtrt_routetable_t *rt, - ks_dht_nodeid_t *target, - ks_dht_job_callback_t callback) -{ - ks_dht_job_t *job = NULL; - ks_status_t ret = KS_STATUS_SUCCESS; - - ks_assert(dht); - ks_assert(rt); - ks_assert(target); - - ks_sockaddr_t taddr; /* just to satisfy the api */ - - if ((ret = ks_dht_job_create(&job, dht->pool, &taddr, 3)) == KS_STATUS_SUCCESS) { - - int32_t family = AF_INET; - - if (rt == dht->rt_ipv6) { - family = AF_INET6; - } - - ks_dht_job_build_search_findnode(job, target, family, ks_dht_exec_search_findnode, callback); - } - - return ret; -} - /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 019688682d..a4ca006c58 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -31,7 +31,6 @@ KS_BEGIN_EXTERN_C #define KS_DHT_TRANSACTION_EXPIRATION 10 #define KS_DHT_TRANSACTIONS_PULSE 1 -#define KS_DHT_SEARCH_EXPIRATION 10 #define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE #define KS_DHT_STORAGEITEM_PKEY_SIZE crypto_sign_PUBLICKEYBYTES @@ -39,6 +38,8 @@ KS_BEGIN_EXTERN_C #define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64 #define KS_DHT_STORAGEITEM_SIGNATURE_SIZE crypto_sign_BYTES #define KS_DHT_STORAGEITEM_EXPIRATION 7200 +#define KS_DHT_STORAGEITEM_KEEPALIVE 300 +#define KS_DHT_STORAGEITEMS_PULSE 10 #define KS_DHT_TOKEN_SIZE SHA_DIGEST_LENGTH #define KS_DHT_TOKEN_EXPIRATION 300 @@ -58,7 +59,8 @@ typedef struct ks_dht_message_s ks_dht_message_t; typedef struct ks_dht_endpoint_s ks_dht_endpoint_t; typedef struct ks_dht_transaction_s ks_dht_transaction_t; typedef struct ks_dht_search_s ks_dht_search_t; -typedef struct ks_dht_search_pending_s ks_dht_search_pending_t; +typedef struct ks_dht_publish_s ks_dht_publish_t; +typedef struct ks_dht_distribute_s ks_dht_distribute_t; typedef struct ks_dht_node_s ks_dht_node_t; typedef struct ks_dhtrt_routetable_s ks_dhtrt_routetable_t; typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t; @@ -67,7 +69,9 @@ typedef struct ks_dht_storageitem_s ks_dht_storageitem_t; typedef ks_status_t (*ks_dht_job_callback_t)(ks_dht_t *dht, ks_dht_job_t *job); typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message); -typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search); +//typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search); +typedef ks_status_t (*ks_dht_storageitem_callback_t)(ks_dht_t *dht, ks_dht_storageitem_t *item); + struct ks_dht_datagram_s { ks_pool_t *pool; @@ -115,11 +119,12 @@ enum ks_dht_job_state_t { KS_DHT_JOB_STATE_COMPLETING, }; -//enum ks_dht_job_type_t { -// KS_DHT_JOB_TYPE_NONE = 0, -// KS_DHT_JOB_TYPE_PING, -// KS_DHT_JOB_TYPE_FINDNODE, -//}; +enum ks_dht_job_result_t { + KS_DHT_JOB_RESULT_SUCCESS = 0, + KS_DHT_JOB_RESULT_EXPIRED, + KS_DHT_JOB_RESULT_ERROR, + KS_DHT_JOB_RESULT_FAILURE, +}; struct ks_dht_job_s { ks_pool_t *pool; @@ -127,8 +132,7 @@ struct ks_dht_job_s { ks_dht_job_t *next; enum ks_dht_job_state_t state; - - ks_dht_search_t *search; + enum ks_dht_job_result_t result; ks_sockaddr_t raddr; // will obtain local endpoint node id when creating message using raddr int32_t attempts; @@ -137,6 +141,7 @@ struct ks_dht_job_s { ks_dht_job_callback_t query_callback; ks_dht_job_callback_t finish_callback; + void *data; ks_dht_message_t *response; // job specific query parameters @@ -145,15 +150,22 @@ struct ks_dht_job_s { int64_t query_cas; ks_dht_token_t query_token; ks_dht_storageitem_t *query_storageitem; - uint32_t query_family; + int32_t query_family; + // error response parameters + int64_t error_code; + struct bencode *error_description; + // job specific response parameters - ks_dht_nodeid_t response_id; + ks_dht_node_t *response_id; ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE]; ks_size_t response_nodes_count; ks_dht_node_t *response_nodes6[KS_DHT_RESPONSE_NODES_MAX_SIZE]; ks_size_t response_nodes6_count; + ks_dht_token_t response_token; + int64_t response_seq; + ks_bool_t response_hasitem; ks_dht_storageitem_t *response_storageitem; }; @@ -219,25 +231,48 @@ struct ks_dht_transaction_s { struct ks_dht_search_s { ks_pool_t *pool; - ks_dht_search_t *next; + ks_dhtrt_routetable_t *table; ks_dht_nodeid_t target; - ks_dht_search_callback_t callback; + ks_dht_job_callback_t callback; + void *data; ks_mutex_t *mutex; ks_hash_t *searched; - ks_hash_t *searching; + int32_t searching; ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; ks_dht_nodeid_t distances[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; ks_size_t results_length; }; +struct ks_dht_publish_s { + ks_pool_t *pool; + ks_dht_job_callback_t callback; + void *data; + int64_t cas; + ks_dht_storageitem_t *item; +}; + +struct ks_dht_distribute_s { + ks_pool_t *pool; + ks_dht_storageitem_callback_t callback; + void *data; + ks_mutex_t *mutex; + int32_t publishing; + int64_t cas; + ks_dht_storageitem_t *item; +}; + struct ks_dht_storageitem_s { ks_pool_t *pool; ks_dht_nodeid_t id; ks_time_t expiration; + ks_time_t keepalive; struct bencode *v; - - ks_bool_t mutable; + ks_mutex_t *mutex; + volatile int32_t refc; + ks_dht_storageitem_callback_t callback; + + ks_bool_t mutable; ks_dht_storageitem_pkey_t pk; ks_dht_storageitem_skey_t sk; struct bencode *salt; @@ -282,15 +317,12 @@ struct ks_dht_s { ks_dhtrt_routetable_t *rt_ipv4; ks_dhtrt_routetable_t *rt_ipv6; - ks_mutex_t *searches_mutex; - ks_dht_search_t *searches_first; - ks_dht_search_t *searches_last; - ks_time_t tokens_pulse; volatile uint32_t token_secret_current; volatile uint32_t token_secret_previous; ks_time_t token_secret_expiration; + ks_time_t storageitems_pulse; ks_hash_t *storageitems_hash; }; @@ -395,7 +427,22 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate(ks_dht_storageitem int64_t sequence, const uint8_t *value, ks_size_t value_length); - + +/** + * + */ +KS_DECLARE(void) ks_dht_storageitem_reference(ks_dht_storageitem_t *item); + +/** + * + */ +KS_DECLARE(void) ks_dht_storageitem_dereference(ks_dht_storageitem_t *item); + +/** + * + */ +KS_DECLARE(void) ks_dht_storageitem_callback(ks_dht_storageitem_t *item, ks_dht_storageitem_callback_t callback); + /** * */ @@ -429,37 +476,38 @@ KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storage /** * */ -KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback); +KS_DECLARE(void) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, void *data); /** * */ -KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, - ks_dht_search_t *search, - const ks_sockaddr_t *raddr, - ks_dht_job_callback_t callback, - ks_dht_nodeid_t *target); +KS_DECLARE(void) ks_dht_findnode(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + void *data, + ks_dht_nodeid_t *target); /** * */ -KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht, - ks_dht_search_t *search, - const ks_sockaddr_t *raddr, - ks_dht_job_callback_t callback, - ks_dht_nodeid_t *target, - uint8_t *salt, - ks_size_t salt_length); +KS_DECLARE(void) ks_dht_get(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + void *data, + ks_dht_nodeid_t *target, + const uint8_t *salt, + ks_size_t salt_length); /** * */ -KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht, - const ks_sockaddr_t *raddr, - ks_dht_job_callback_t callback, - ks_dht_token_t *token, - int64_t cas, - ks_dht_storageitem_t *item); +KS_DECLARE(void) ks_dht_put(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + void *data, + ks_dht_token_t *token, + int64_t cas, + ks_dht_storageitem_t *item); /** * Create a network search of the closest nodes to a target. @@ -468,27 +516,29 @@ KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht, * @param target pointer to the nodeid for the target to be searched * @param callback an optional callback to add to the search when it is finished * @param search dereferenced out pointer to the allocated search, may be NULL to ignore search output - * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL * @see ks_dht_search_create - * @see ks_dht_search_callback_add * @see ks_hash_insert - * @see ks_dht_search_pending_create - * @see ks_dht_send_findnode + * @see ks_dht_findnode */ -KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht, - int32_t family, - ks_dht_nodeid_t *target, - ks_dht_search_callback_t callback, - ks_dht_search_t **search); - -KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht, - ks_dhtrt_routetable_t *rt, - ks_dht_nodeid_t *target, - ks_dht_job_callback_t callback); - -KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job); +KS_DECLARE(void) ks_dht_search(ks_dht_t *dht, + ks_dht_job_callback_t callback, + void *data, + ks_dhtrt_routetable_t *table, + ks_dht_nodeid_t *target); +KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + void *data, + int64_t cas, + ks_dht_storageitem_t *item); +KS_DECLARE(void) ks_dht_distribute(ks_dht_t *dht, + ks_dht_storageitem_callback_t callback, + void *data, + ks_dhtrt_routetable_t *table, + int64_t cas, + ks_dht_storageitem_t *item); /** * route table methods diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index 382c8155e4..72063c8630 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -1673,7 +1673,7 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry #endif ks_dht_node_t* node = entry->gptr; ks_log(KS_LOG_DEBUG, "Node addr %s %d\n", node->addr.host, node->addr.port); - ks_dht_ping(internal->dht, &node->addr, NULL); + ks_dht_ping(internal->dht, &node->addr, NULL, NULL); return; } @@ -1683,7 +1683,7 @@ void ks_dhtrt_find(ks_dhtrt_routetable_t *table, ks_dhtrt_internal_t *internal, char buf[100]; ks_log(KS_LOG_DEBUG, "Find queued for target %s\n", ks_dhtrt_printableid(target->id, buf)); - ks_dht_queue_search_findnode(internal->dht, table, target, NULL); + ks_dht_search(internal->dht, NULL, NULL, table, target); return; } diff --git a/libs/libks/src/dht/ks_dht_distribute.c b/libs/libks/src/dht/ks_dht_distribute.c new file mode 100644 index 0000000000..24669ebfb8 --- /dev/null +++ b/libs/libks/src/dht/ks_dht_distribute.c @@ -0,0 +1,65 @@ +#include "ks_dht.h" +#include "ks_dht-int.h" +#include "sodium.h" + +KS_DECLARE(ks_status_t) ks_dht_distribute_create(ks_dht_distribute_t **distribute, + ks_pool_t *pool, + ks_dht_storageitem_callback_t callback, + void *data, + int64_t cas, + ks_dht_storageitem_t *item) +{ + ks_dht_distribute_t *d; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(distribute); + ks_assert(pool); + ks_assert(cas >= 0); + ks_assert(item); + + *distribute = d = ks_pool_alloc(pool, sizeof(ks_dht_distribute_t)); + ks_assert(d); + + d->pool = pool; + + d->callback = callback; + d->data = data; + ks_mutex_create(&d->mutex, KS_MUTEX_FLAG_DEFAULT, d->pool); + ks_assert(d->mutex); + d->cas = cas; + d->item = item; + + ks_dht_storageitem_reference(d->item); + + // done: + if (ret != KS_STATUS_SUCCESS) { + if (d) ks_dht_distribute_destroy(distribute); + } + return ret; +} + +KS_DECLARE(void) ks_dht_distribute_destroy(ks_dht_distribute_t **distribute) +{ + ks_dht_distribute_t *d; + + ks_assert(distribute); + ks_assert(*distribute); + + d = *distribute; + + if (d->mutex) ks_mutex_destroy(&d->mutex); + ks_dht_storageitem_dereference(d->item); + + ks_pool_free(d->pool, distribute); +} + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libks/src/dht/ks_dht_job.c b/libs/libks/src/dht/ks_dht_job.c index 7479a305fd..5aeaf08ec9 100644 --- a/libs/libks/src/dht/ks_dht_job.c +++ b/libs/libks/src/dht/ks_dht_job.c @@ -4,7 +4,8 @@ KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job, ks_pool_t *pool, const ks_sockaddr_t *raddr, - int32_t attempts) + int32_t attempts, + void *data) { ks_dht_job_t *j; ks_status_t ret = KS_STATUS_SUCCESS; @@ -12,7 +13,6 @@ KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job, ks_assert(job); ks_assert(pool); //ks_assert(dht); - ks_assert(raddr); ks_assert(attempts > 0 && attempts <= 10); *job = j = ks_pool_alloc(pool, sizeof(ks_dht_job_t)); @@ -20,13 +20,9 @@ KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job, j->pool = pool; j->state = KS_DHT_JOB_STATE_QUERYING; - j->raddr = *raddr; + if (raddr) j->raddr = *raddr; j->attempts = attempts; - - //ks_mutex_lock(dht->jobs_mutex); - //if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = j; - //else dht->jobs_first = dht->jobs_last = j; - //ks_mutex_unlock(dht->jobs_mutex); + j->data = data; // done: if (ret != KS_STATUS_SUCCESS) { @@ -45,7 +41,6 @@ KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t } KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job, - ks_dht_search_t *search, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback, ks_dht_nodeid_t *target) @@ -54,25 +49,22 @@ KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job, ks_assert(query_callback); ks_assert(target); - job->search = search; job->query_callback = query_callback; job->finish_callback = finish_callback; job->query_target = *target; } KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job, - ks_dht_search_t *search, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback, ks_dht_nodeid_t *target, - uint8_t *salt, + const uint8_t *salt, ks_size_t salt_length) { ks_assert(job); ks_assert(query_callback); ks_assert(target); - job->search = search; job->query_callback = query_callback; job->finish_callback = finish_callback; job->query_target = *target; @@ -96,23 +88,18 @@ KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job, job->query_token = *token; job->query_cas = cas; job->query_storageitem = item; + ks_dht_storageitem_reference(job->query_storageitem); } -KS_DECLARE(void) ks_dht_job_build_search_findnode(ks_dht_job_t *job, - ks_dht_nodeid_t *target, - uint32_t family, - ks_dht_job_callback_t query_callback, - ks_dht_job_callback_t finish_callback) +KS_DECLARE(void) ks_dht_job_build_search(ks_dht_job_t *job, + ks_dht_job_callback_t query_callback, + ks_dht_job_callback_t finish_callback) { ks_assert(job); - ks_assert(target); - ks_assert(family); + ks_assert(query_callback); - job->search = NULL; job->query_callback = query_callback; job->finish_callback = finish_callback; - job->query_target = *target; - job->query_family = family; } KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job) @@ -125,9 +112,15 @@ KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job) j = *job; if (j->query_salt) ben_free(j->query_salt); + if (j->response_id) ks_dhtrt_release_node(j->response_id); for (int32_t i = 0; i < j->response_nodes_count; ++i) ks_dhtrt_release_node(j->response_nodes[i]); for (int32_t i = 0; i < j->response_nodes6_count; ++i) ks_dhtrt_release_node(j->response_nodes6[i]); + if (j->query_storageitem) ks_dht_storageitem_dereference(j->query_storageitem); + if (j->response_storageitem) ks_dht_storageitem_dereference(j->response_storageitem); + + if (j->error_description) ben_free(j->error_description); + ks_pool_free(j->pool, job); } diff --git a/libs/libks/src/dht/ks_dht_publish.c b/libs/libks/src/dht/ks_dht_publish.c new file mode 100644 index 0000000000..8affdd5a40 --- /dev/null +++ b/libs/libks/src/dht/ks_dht_publish.c @@ -0,0 +1,62 @@ +#include "ks_dht.h" +#include "ks_dht-int.h" +#include "sodium.h" + +KS_DECLARE(ks_status_t) ks_dht_publish_create(ks_dht_publish_t **publish, + ks_pool_t *pool, + ks_dht_job_callback_t callback, + void *data, + int64_t cas, + ks_dht_storageitem_t *item) +{ + ks_dht_publish_t *p; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(publish); + ks_assert(pool); + ks_assert(cas >= 0); + ks_assert(item); + + *publish = p = ks_pool_alloc(pool, sizeof(ks_dht_publish_t)); + ks_assert(p); + + p->pool = pool; + + p->callback = callback; + p->data = data; + p->cas = cas; + p->item = item; + + ks_dht_storageitem_reference(p->item); + + // done: + if (ret != KS_STATUS_SUCCESS) { + if (p) ks_dht_publish_destroy(publish); + } + return ret; +} + +KS_DECLARE(void) ks_dht_publish_destroy(ks_dht_publish_t **publish) +{ + ks_dht_publish_t *p; + + ks_assert(publish); + ks_assert(*publish); + + p = *publish; + + ks_dht_storageitem_dereference(p->item); + + ks_pool_free(p->pool, publish); +} + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libks/src/dht/ks_dht_search.c b/libs/libks/src/dht/ks_dht_search.c index 313856d09a..5569d78425 100644 --- a/libs/libks/src/dht/ks_dht_search.c +++ b/libs/libks/src/dht/ks_dht_search.c @@ -2,13 +2,19 @@ #include "ks_dht-int.h" #include "sodium.h" -KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback) +KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, + ks_pool_t *pool, + ks_dhtrt_routetable_t *table, + const ks_dht_nodeid_t *target, + ks_dht_job_callback_t callback, + void *data) { ks_dht_search_t *s; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(search); ks_assert(pool); + ks_assert(table); ks_assert(target); *search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t)); @@ -19,17 +25,17 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t ks_mutex_create(&s->mutex, KS_MUTEX_FLAG_DEFAULT, s->pool); ks_assert(s->mutex); + s->table = table; memcpy(s->target.id, target->id, KS_DHT_NODEID_SIZE); s->callback = callback; + s->data = data; ks_hash_create(&s->searched, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool); ks_assert(s->searched); ks_hash_set_keysize(s->searched, KS_DHT_NODEID_SIZE); - ks_hash_create(&s->searching, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool); - ks_assert(s->searching); - ks_hash_set_keysize(s->searching, KS_DHT_NODEID_SIZE); + s->searching = 0; // done: if (ret != KS_STATUS_SUCCESS) { @@ -47,7 +53,6 @@ KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search) s = *search; - if (s->searching) ks_hash_destroy(&s->searching); if (s->searched) ks_hash_destroy(&s->searched); if (s->mutex) ks_mutex_destroy(&s->mutex); diff --git a/libs/libks/src/dht/ks_dht_storageitem.c b/libs/libks/src/dht/ks_dht_storageitem.c index 42bffa0319..175e000968 100644 --- a/libs/libks/src/dht/ks_dht_storageitem.c +++ b/libs/libks/src/dht/ks_dht_storageitem.c @@ -23,9 +23,12 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable_internal(ks_dht_stor si->id = *target; si->mutable = KS_FALSE; si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); + si->keepalive = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_KEEPALIVE * KS_USEC_PER_SEC); si->v = clone_v ? ben_clone(v) : v; ks_assert(si->v); + si->refc = 1; + // done: if (ret != KS_STATUS_SUCCESS) { if (si) ks_dht_storageitem_destroy(item); @@ -81,9 +84,12 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable_internal(ks_dht_storag si->id = *target; si->mutable = KS_TRUE; si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); + si->keepalive = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_KEEPALIVE * KS_USEC_PER_SEC); si->v = clone_v ? ben_clone(v) : v; ks_assert(si->v); + si->refc = 1; + ks_mutex_create(&si->mutex, KS_MUTEX_FLAG_DEFAULT, si->pool); ks_assert(si->mutex); @@ -169,6 +175,33 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item) ks_pool_free(si->pool, item); } +KS_DECLARE(void) ks_dht_storageitem_reference(ks_dht_storageitem_t *item) +{ + ks_assert(item); + + ks_mutex_lock(item->mutex); + item->refc++; + ks_mutex_unlock(item->mutex); +} + +KS_DECLARE(void) ks_dht_storageitem_dereference(ks_dht_storageitem_t *item) +{ + ks_assert(item); + + ks_mutex_lock(item->mutex); + item->refc--; + ks_mutex_unlock(item->mutex); + + ks_assert(item->refc >= 0); +} + +KS_DECLARE(void) ks_dht_storageitem_callback(ks_dht_storageitem_t *item, ks_dht_storageitem_callback_t callback) +{ + ks_assert(item); + + item->callback = callback; +} + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/src/ks_thread_pool.c b/libs/libks/src/ks_thread_pool.c index 8368ebdd24..0ce9fdff84 100644 --- a/libs/libks/src/ks_thread_pool.c +++ b/libs/libks/src/ks_thread_pool.c @@ -102,10 +102,10 @@ static int check_queue(ks_thread_pool_t *tp, ks_bool_t adding) need--; } - + /* ks_log(KS_LOG_DEBUG, "WORKER check: adding %d need %d running %d dying %d total %d max %d\n", adding, need, tp->running_thread_count, tp->dying_thread_count, tp->thread_count, tp->max); - + */ return need; } @@ -129,10 +129,10 @@ static void *worker_thread(ks_thread_t *thread, void *data) ks_status_t status; status = ks_q_pop_timeout(tp->q, &pop, 1000); - + /* ks_log(KS_LOG_DEBUG, "WORKER %d idle_sec %d running %d dying %d total %d max %d\n", my_id, idle_sec, tp->running_thread_count, tp->dying_thread_count, tp->thread_count, tp->max); - + */ check_queue(tp, KS_FALSE); if (status == KS_STATUS_TIMEOUT) { diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 28b79126d4..fd4ab7964c 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -6,6 +6,18 @@ ks_dht_storageitem_skey_t sk; ks_dht_storageitem_pkey_t pk; +ks_status_t dht2_updated_callback(ks_dht_t *dht, ks_dht_storageitem_t *item) +{ + diag("dht2_updated_callback\n"); + return KS_STATUS_SUCCESS; +} + +ks_status_t dht2_distribute_callback(ks_dht_t *dht, ks_dht_storageitem_t *item) +{ + diag("dht2_distribute_callback\n"); + return KS_STATUS_SUCCESS; +} + ks_status_t dht2_put_callback(ks_dht_t *dht, ks_dht_job_t *job) { diag("dht2_put_callback\n"); @@ -28,13 +40,14 @@ ks_status_t dht2_get_token_callback(ks_dht_t *dht, ks_dht_job_t *job) mutable->sk = sk; ks_dht_storageitems_insert(dht, mutable); - ks_dht_put(dht, &job->raddr, dht2_put_callback, &job->response_token, 0, mutable); + ks_dht_put(dht, &job->raddr, dht2_put_callback, NULL, &job->response_token, 0, mutable); return KS_STATUS_SUCCESS; } -ks_status_t dht2_search_findnode_callback(ks_dht_t *dht, ks_dht_search_t *search) +ks_status_t dht2_search_callback(ks_dht_t *dht, ks_dht_job_t *job) { - diag("dht2_search_findnode_callback %d\n", search->results_length); + ks_dht_search_t *search = (ks_dht_search_t *)job->data; + diag("dht2_search_callback %d\n", search->results_length); return KS_STATUS_SUCCESS; } @@ -54,11 +67,12 @@ int main() { ks_sockaddr_t raddr1; //ks_sockaddr_t raddr2; //ks_sockaddr_t raddr3; - //ks_dht_nodeid_t target; + ks_dht_nodeid_t target; //ks_dht_storageitem_t *immutable = NULL; - //ks_dht_storageitem_t *mutable = NULL; - //const char *v = "Hello World!"; - //size_t v_len = strlen(v); + ks_dht_storageitem_t *mutable1 = NULL; + ks_dht_storageitem_t *mutable2 = NULL; + const char *v = "Hello World!"; + size_t v_len = strlen(v); //ks_dht_storageitem_skey_t sk; //= { { 0xe0, 0x6d, 0x31, 0x83, 0xd1, 0x41, 0x59, 0x22, 0x84, 0x33, 0xed, 0x59, 0x92, 0x21, 0xb8, 0x0b, //0xd0, 0xa5, 0xce, 0x83, 0x52, 0xe4, 0xbd, 0xf0, 0x26, 0x2f, 0x76, 0x78, 0x6e, 0xf1, 0xc7, 0x4d, //0xb7, 0xe7, 0xa9, 0xfe, 0xa2, 0xc0, 0xeb, 0x26, 0x9d, 0x61, 0xe3, 0xb3, 0x8e, 0x45, 0x0a, 0x22, @@ -67,7 +81,7 @@ int main() { //0x24, 0x32, 0xfc, 0xd9, 0x04, 0xa4, 0x35, 0x11, 0x87, 0x6d, 0xf5, 0xcd, 0xf3, 0xe7, 0xe5, 0x48 } }; //uint8_t sk1[KS_DHT_STORAGEITEM_SKEY_SIZE]; //uint8_t pk1[KS_DHT_STORAGEITEM_PKEY_SIZE]; - //ks_dht_storageitem_signature_t sig; + ks_dht_storageitem_signature_t sig; //char sk_buf[KS_DHT_STORAGEITEM_SKEY_SIZE * 2 + 1]; //char pk_buf[KS_DHT_STORAGEITEM_PKEY_SIZE * 2 + 1]; //const char *test1vector = "3:seqi1e1:v12:Hello World!"; @@ -155,7 +169,7 @@ int main() { diag("Ping test\n"); - ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING) + ks_dht_ping(dht2, &raddr1, NULL, NULL); // (QUERYING) ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING) @@ -178,7 +192,7 @@ int main() { ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good - ks_dht_ping(dht3, &raddr1, NULL); // (QUERYING) + ks_dht_ping(dht3, &raddr1, NULL, NULL); // (QUERYING) ks_dht_pulse(dht3, 100); // Send queued ping from dht3 to dht1 (RESPONDING) @@ -193,80 +207,19 @@ int main() { ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING) diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up - for (int i = 0; i < 20; ++i) { + for (int i = 0; i < 10; ++i) { ks_dht_pulse(dht1, 100); ks_dht_pulse(dht2, 100); ks_dht_pulse(dht3, 100); } ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good - //diag("Get test\n"); - - - /* - ks_dht_storageitem_target_immutable((uint8_t *)v, v_len, &target); - ks_dht_storageitem_create_immutable(&immutable, dht1->pool, &target, (uint8_t *)v, v_len); - ks_dht_storageitems_insert(dht1, immutable); - */ - - /* - crypto_sign_keypair(pk.key, sk.key); - - ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len); - ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target); - ks_dht_storageitem_create_mutable(&mutable, dht1->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig); - mutable->sk = sk; - ks_dht_storageitems_insert(dht1, mutable); - - ks_dht_get(dht2, &raddr1, dht2_get_callback, &target, NULL, 0); - - ks_dht_pulse(dht2, 100); // send get query - - ks_dht_pulse(dht1, 100); // receive get query and send get response - - ks_dht_pulse(dht2, 100); // receive get response - - ok(ks_dht_storageitems_find(dht2, &target) != NULL); // item should be verified and stored - - ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING) - */ - - /* - diag("Put test\n"); - - crypto_sign_keypair(pk.key, sk.key); - - ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target); - - ks_dht_get(dht2, NULL, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job - - ks_dht_pulse(dht2, 100); // send get query - - ks_dht_pulse(dht1, 100); // receive get query and send get response - - ks_dht_pulse(dht2, 100); // receive get response - - ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING), send put query - - ks_dht_pulse(dht1, 100); // receive put query and send put response - - ks_dht_pulse(dht2, 100); // receive put response - - ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING) - - for (int i = 0; i < 10; ++i) { - ks_dht_pulse(dht1, 100); - ks_dht_pulse(dht2, 100); - ks_dht_pulse(dht3, 100); - } - */ - // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid /* diag("Find_Node test\n"); - ks_dht_findnode(dht3, NULL, &raddr1, NULL, &ep2->nodeid); + ks_dht_findnode(dht3, NULL, &raddr1, NULL, NULL, &ep2->nodeid); ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1 @@ -290,14 +243,129 @@ int main() { */ diag("Search test\n"); - ks_dht_search_findnode(dht3, AF_INET, &ep2->nodeid, dht2_search_findnode_callback, NULL); + + ks_dht_search(dht3, dht2_search_callback, NULL, dht3->rt_ipv4, &ep2->nodeid); diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up + for (int i = 0; i < 20; ++i) { + ks_dht_pulse(dht1, 100); + ks_dht_pulse(dht2, 100); + ks_dht_pulse(dht3, 100); + } + + //diag("Get test\n"); + + + /* + ks_dht_storageitem_target_immutable((uint8_t *)v, v_len, &target); + ks_dht_storageitem_create_immutable(&immutable, dht1->pool, &target, (uint8_t *)v, v_len); + ks_dht_storageitems_insert(dht1, immutable); + */ + + /* + crypto_sign_keypair(pk.key, sk.key); + + ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len); + ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target); + ks_dht_storageitem_create_mutable(&mutable, dht1->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig); + mutable->sk = sk; + ks_dht_storageitems_insert(dht1, mutable); + + ks_dht_get(dht2, &raddr1, dht2_get_callback, NULL, &target, NULL, 0); + + ks_dht_pulse(dht2, 100); // send get query + + ks_dht_pulse(dht1, 100); // receive get query and send get response + + ks_dht_pulse(dht2, 100); // receive get response + + ok(ks_dht_storageitems_find(dht2, &target) != NULL); // item should be verified and stored + + ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING) + */ + + /* + diag("Put test\n"); + + crypto_sign_keypair(pk.key, sk.key); + + ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target); + + ks_dht_get(dht2, &raddr1, dht2_get_token_callback, NULL, &target, NULL, 0); // create job + + for (int i = 0; i < 20; ++i) { + ks_dht_pulse(dht1, 100); + ks_dht_pulse(dht2, 100); + ks_dht_pulse(dht3, 100); + } + */ + + /* + diag("Publish test\n"); + + crypto_sign_keypair(pk.key, sk.key); + + ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target); + + ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len); + + ks_dht_storageitem_create_mutable(&mutable, dht2->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig); + mutable->sk = sk; + ks_dht_storageitems_insert(dht2, mutable); + + ks_dht_publish(dht2, &raddr1, dht2_put_callback, NULL, 0, mutable); // create job + + for (int i = 0; i < 20; ++i) { + ks_dht_pulse(dht1, 100); + ks_dht_pulse(dht2, 100); + ks_dht_pulse(dht3, 100); + } + */ + + + diag("Distribute test\n"); + + crypto_sign_keypair(pk.key, sk.key); + + ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target); + + ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len); + + ks_dht_storageitem_create_mutable(&mutable2, dht2->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig); + mutable2->sk = sk; + ks_dht_storageitems_insert(dht2, mutable2); + + ks_dht_distribute(dht2, dht2_distribute_callback, NULL, dht2->rt_ipv4, 0, mutable2); // create job + for (int i = 0; i < 30; ++i) { ks_dht_pulse(dht1, 100); ks_dht_pulse(dht2, 100); ks_dht_pulse(dht3, 100); } + ks_dht_storageitem_dereference(mutable2); + ok(mutable2->refc == 0); + mutable1 = ks_dht_storageitems_find(dht1, &target); + ok(mutable1 != NULL); + + ks_dht_storageitem_callback(mutable1, dht2_updated_callback); + ks_dht_storageitem_callback(mutable2, dht2_updated_callback); + + ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 2, (uint8_t *)v, v_len); + mutable1->seq = 2; + mutable1->sig = sig; + + //ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 2, (uint8_t *)v, v_len); + //mutable2->seq = 2; + //mutable2->sig = sig; + + ks_dht_distribute(dht2, dht2_distribute_callback, NULL, dht2->rt_ipv4, 0, mutable2); + for (int i = 0; i < 30; ++i) { + ks_dht_pulse(dht1, 100); + ks_dht_pulse(dht2, 100); + ks_dht_pulse(dht3, 100); + } + ks_dht_storageitem_dereference(mutable1); + /* Cleanup and shutdown */ diag("Cleanup\n");