From c4ed130073f2ae485e8e40073f0ee3c449b3256e Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Wed, 28 Dec 2016 00:52:10 +0000 Subject: [PATCH] FS-9775: First tested pass on search functionality, not tested with deep searching at multiple levels --- libs/libks/src/dht/ks_dht-int.h | 9 +- libs/libks/src/dht/ks_dht.c | 409 ++++++++++++----------------- libs/libks/src/dht/ks_dht.h | 46 ++-- libs/libks/src/dht/ks_dht_job.c | 4 + libs/libks/src/dht/ks_dht_search.c | 84 +----- libs/libks/test/testdht2.c | 88 +++++-- 6 files changed, 270 insertions(+), 370 deletions(-) diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 89e3643ee5..fc71cddacf 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -258,10 +258,12 @@ KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job, int32_t attempts); KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback); KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job, + ks_dht_search_t *search, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback, ks_dht_nodeid_t *target); KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job, + ks_dht_search_t *search, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback, ks_dht_nodeid_t *target, @@ -316,14 +318,9 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, /** * */ -KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target); +KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback); KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search); -KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback); -KS_DECLARE(void) ks_dht_search_expire(ks_dht_search_t *search, ks_hash_t *pending, int32_t *active); - -KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid); -KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending); /** * diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index 28c9815947..056584943e 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -154,19 +154,11 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread d->rt_ipv6 = NULL; /** - * Create the hash to store searches. + * Create the mutex to handle searches list. */ - ks_hash_create(&d->searches4_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); - ks_assert(d->searches4_hash); - ks_hash_create(&d->searches6_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); - ks_assert(d->searches6_hash); + ks_mutex_create(&d->searches_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool); + ks_assert(d->searches_mutex); - /** - * The searches hash uses arbitrary key size, which requires the key size be provided. - */ - ks_hash_set_keysize(d->searches4_hash, KS_DHT_NODEID_SIZE); - ks_hash_set_keysize(d->searches6_hash, KS_DHT_NODEID_SIZE); - /** * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets. */ @@ -227,25 +219,12 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) d->token_secret_expiration = 0; /** - * Cleanup the search hash and it's contents if it is allocated. + * Cleanup the search mutex and searches if they are allocated. */ - if (d->searches6_hash) { - for (it = ks_hash_first(d->searches6_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - const void *key = NULL; - ks_dht_search_t *val = NULL; - ks_hash_this(it, &key, NULL, (void **)&val); - ks_dht_search_destroy(&val); - } - ks_hash_destroy(&d->searches6_hash); - } - if (d->searches4_hash) { - for (it = ks_hash_first(d->searches4_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - const void *key = NULL; - ks_dht_search_t *val = NULL; - ks_hash_this(it, &key, NULL, (void **)&val); - ks_dht_search_destroy(&val); - } - ks_hash_destroy(&d->searches4_hash); + if (d->searches_mutex) ks_mutex_destroy(&d->searches_mutex); + for (ks_dht_search_t *search = d->searches_first, *searchn = NULL; search; search = searchn) { + searchn = search->next; + ks_dht_search_destroy(&search); } /** @@ -623,57 +602,11 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6); } -KS_DECLARE(void) ks_dht_pulse_expirations_searches(ks_dht_t *dht, ks_hash_t *searches) -{ - ks_hash_iterator_t *it = NULL; - ks_time_t now = ks_time_now(); - - ks_assert(dht); - ks_assert(searches); - - ks_hash_write_lock(searches); - for (it = ks_hash_first(searches, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - const void *key = NULL; - ks_dht_search_t *value = NULL; - int32_t active = 0; - - ks_hash_this(it, &key, NULL, (void **)&value); - - ks_mutex_lock(value->mutex); - for (ks_hash_iterator_t *i = ks_hash_first(value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) { - const void *k = NULL; - ks_dht_search_pending_t *v = NULL; - - ks_hash_this(i, &k, NULL, (void **)&v); - - if (v->finished) continue; - - if (v->expiration <= now) { - char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; - char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; - ks_log(KS_LOG_DEBUG, - "Search for %s pending find_node to %s has expired without response\n", - ks_dht_hex(value->target.id, id_buf, KS_DHT_NODEID_SIZE), - ks_dht_hex(v->nodeid.id, id2_buf, KS_DHT_NODEID_SIZE)); - v->finished = KS_TRUE; - continue; - } - active++; - } - ks_mutex_unlock(value->mutex); - - if (active == 0) { - for (int32_t index = 0; index < value->callbacks_size; ++index) value->callbacks[index](dht, value); - ks_hash_remove(searches, (void *)key); - ks_dht_search_destroy(&value); - } - } - ks_hash_write_unlock(searches); -} - KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) { ks_hash_iterator_t *it = NULL; + ks_dht_search_t *searches_first = NULL; + ks_dht_search_t *searches_last = NULL; ks_time_t now = ks_time_now(); ks_assert(dht); @@ -702,16 +635,44 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) } ks_hash_write_unlock(dht->transactions_hash); - if (dht->rt_ipv4) ks_dht_pulse_expirations_searches(dht, dht->searches4_hash); - if (dht->rt_ipv6) ks_dht_pulse_expirations_searches(dht, dht->searches6_hash); + ks_mutex_lock(dht->searches_mutex); + for (ks_dht_search_t *search = dht->searches_first, *searchn = NULL, *searchp = NULL; search; search = searchn) { + ks_bool_t done = KS_FALSE; + searchn = search->next; - // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?) + ks_mutex_lock(search->mutex); + done = ks_hash_count(search->searching) == 0; + + if (done) { + if (!searchp && !searchn) dht->searches_first = dht->searches_last = NULL; + else if (!searchp) dht->searches_first = searchn; + else if (!searchn) { + dht->searches_last = searchp; + dht->searches_last->next = NULL; + } + else searchp->next = searchn; + + search->next = NULL; + if (searches_last) searches_last = searches_last->next = search; + else searches_first = searches_last = search; + } else searchp = search; + ks_mutex_unlock(search->mutex); + } + ks_mutex_unlock(dht->searches_mutex); + + for (ks_dht_search_t *search = searches_first, *searchn = NULL; search; search = searchn) { + searchn = search->next; + if (search->callback) search->callback(dht, search); + ks_dht_search_destroy(&search); + } if (dht->token_secret_expiration && dht->token_secret_expiration <= now) { dht->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC); dht->token_secret_previous = dht->token_secret_current; dht->token_secret_current = rand(); } + + // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?) } KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht) @@ -1270,7 +1231,12 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) return KS_STATUS_FAIL; } - ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port); + ks_log(KS_LOG_DEBUG, + "Sending message to %s %d on %s %d\n", + message->raddr.host, + message->raddr.port, + message->endpoint->addr.host, + message->endpoint->addr.port); ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data)); return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr); @@ -1398,7 +1364,12 @@ KS_DECLARE(void *) ks_dht_process(ks_thread_t *thread, void *data) ks_assert(thread); ks_assert(data); - ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", datagram->raddr.host, datagram->raddr.port); + ks_log(KS_LOG_DEBUG, + "Received message from %s %d on %s %d\n", + datagram->raddr.host, + datagram->raddr.port, + datagram->endpoint->addr.host, + datagram->endpoint->addr.port); if (datagram->raddr.family != AF_INET && datagram->raddr.family != AF_INET6) { ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n"); goto done; @@ -1475,7 +1446,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me KS_DHT_REMOTE, message->raddr.host, message->raddr.port, - KS_DHTRT_CREATE_DEFAULT, + KS_DHTRT_CREATE_PING, &node)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; @@ -1521,7 +1492,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t KS_DHT_REMOTE, message->raddr.host, message->raddr.port, - KS_DHTRT_CREATE_DEFAULT, + KS_DHTRT_CREATE_TOUCH, &node)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; @@ -1546,6 +1517,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t transaction->job->raddr.port); } else { transaction->job->response = message; + transaction->job->response_id = message->args_id; message->transaction = transaction; if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING; else transaction->job->state = KS_DHT_JOB_STATE_COMPLETING; @@ -1558,20 +1530,85 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t return ret; } +KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_job_t *job) +{ + ks_dht_node_t **nodes = NULL; + ks_size_t nodes_count = 0; + ks_dht_node_t *node = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; -KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, - int32_t family, - ks_dht_nodeid_t *target, - ks_dht_search_callback_t callback, - ks_dht_search_t **search) + ks_assert(dht); + ks_assert(job); + ks_assert(job->search); + + ks_mutex_lock(job->search->mutex); + ks_hash_remove(job->search->searching, job->response_id.id); + + nodes = job->raddr.family == AF_INET ? job->response_nodes : job->response_nodes6; + nodes_count = job->raddr.family == AF_INET ? job->response_nodes_count : job->response_nodes6_count; + + for (int32_t i = 0; i < nodes_count; ++i) { + ks_dht_nodeid_t distance; + int32_t results_index = -1; + + node = nodes[i]; + + if (ks_hash_search(job->search->searched, node->nodeid.id, KS_UNLOCKED) != 0) continue; + + ks_dht_utility_nodeid_xor(&distance, &node->nodeid, &job->search->target); + if (job->search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) { + results_index = job->search->results_length; + job->search->results_length++; + } else { + for (int32_t index = 0; index < job->search->results_length; ++index) { + // Check if new node is closer than this previous result + if (memcmp(distance.id, job->search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) { + // If this is the first node that is further then keep it + // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result + if (results_index < 0) results_index = index; + else if (memcmp(job->search->distances[index].id, job->search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index; + } + } + } + + if (results_index >= 0) { + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + + ks_log(KS_LOG_DEBUG, + "Set closer node id %s (%s) in search of target id %s at results index %d\n", + ks_dht_hex(node->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), + ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE), + ks_dht_hex(job->search->target.id, id3_buf, KS_DHT_NODEID_SIZE), + results_index); + // @todo add lock on node + job->search->results[results_index] = node; + job->search->distances[results_index] = distance; + + ks_hash_insert(job->search->searched, node->nodeid.id, (void *)KS_TRUE); + ks_hash_insert(job->search->searching, node->nodeid.id, (void *)KS_TRUE); + + if ((ret = ks_dht_findnode(dht, job->search, &node->addr, ks_dht_search_findnode_callback, &job->search->target)) != KS_STATUS_SUCCESS) goto done; + } + } + + done: + ks_mutex_unlock(job->search->mutex); + + return ret; +} + +KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht, + int32_t family, + ks_dht_nodeid_t *target, + ks_dht_search_callback_t callback, + ks_dht_search_t **search) { ks_bool_t locked_searches = KS_FALSE; ks_bool_t locked_search = KS_FALSE; - ks_hash_t *searches = NULL; ks_dhtrt_routetable_t *rt = NULL; ks_dht_search_t *s = NULL; - ks_bool_t inserted = KS_FALSE; - ks_bool_t allocated = KS_FALSE; ks_dhtrt_querynodes_t query; ks_status_t ret = KS_STATUS_SUCCESS; @@ -1586,46 +1623,28 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ret = KS_STATUS_FAIL; goto done; } - searches = dht->searches4_hash; rt = dht->rt_ipv4; } else { if (!dht->rt_ipv6) { ret = KS_STATUS_FAIL; goto done; } - searches = dht->searches6_hash; rt = dht->rt_ipv6; } - // check hash for target to see if search already exists - ks_hash_write_lock(searches); + ks_mutex_lock(dht->searches_mutex); locked_searches = KS_TRUE; - s = ks_hash_search(searches, target->id, KS_UNLOCKED); + if ((ret = ks_dht_search_create(&s, dht->pool, target, callback)) != KS_STATUS_SUCCESS) goto done; - // if search does not exist, create new search and store in hash by target - if (!s) { - if ((ret = ks_dht_search_create(&s, dht->pool, target)) != KS_STATUS_SUCCESS) goto done; - allocated = KS_TRUE; - } else inserted = KS_TRUE; - - // add callback regardless of whether the search is new or old - if ((ret = ks_dht_search_callback_add(s, callback)) != KS_STATUS_SUCCESS) goto done; - - // if the search is old then bail out and return successfully - if (!allocated) goto done; - - // everything past this point until final cleanup is only for when a search of the target does not already exist - - if ((ret = ks_hash_insert(searches, s->target.id, s)) == KS_STATUS_SUCCESS) goto done; - inserted = KS_TRUE; - - // lock search before unlocking the searches_hash to prevent this search from being used before we finish setting it up + if (dht->searches_last) dht->searches_last = dht->searches_last->next = s; + else dht->searches_first = dht->searches_last = s; + ks_mutex_lock(s->mutex); locked_search = KS_TRUE; - // release searches_hash lock now, but search is still locked - ks_hash_write_unlock(searches); + // release searches lock now, but search is still locked + ks_mutex_unlock(dht->searches_mutex); locked_searches = KS_FALSE; // find closest good nodes to target locally and store as the closest results @@ -1637,36 +1656,32 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dhtrt_findclosest_nodes(rt, &query); for (int32_t i = 0; i < query.count; ++i) { ks_dht_node_t *n = query.nodes[i]; - ks_dht_search_pending_t *pending = NULL; + ks_bool_t searched = KS_FALSE; // always take the initial local closest good nodes as results, they are already good nodes that are closest with no results yet - s->results[s->results_length] = n->nodeid; + s->results[s->results_length] = n; ks_dht_utility_nodeid_xor(&s->distances[s->results_length], &n->nodeid, &s->target); s->results_length++; - pending = ks_hash_search(s->pending, n->nodeid.id, KS_UNLOCKED); - if (pending) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check + searched = ks_hash_search(s->searched, n->nodeid.id, KS_UNLOCKED) != 0; + if (searched) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check - // add to pending with expiration, if any of this fails it's almost catastrophic so just bail out and fail the entire search attempt - // there are no probable causes for a failure but check them anyway - if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_hash_insert(s->pending, n->nodeid.id, pending)) != KS_STATUS_SUCCESS) { - ks_dht_search_pending_destroy(&pending); - goto done; - } - if ((ret = ks_dht_findnode(dht, &n->addr, NULL, target)) != KS_STATUS_SUCCESS) goto done; + ks_hash_insert(s->searched, n->nodeid.id, (void *)KS_TRUE); + ks_hash_insert(s->searching, n->nodeid.id, (void *)KS_TRUE); + + if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) goto done; } - ks_dhtrt_release_querynodes(&query); + //ks_dhtrt_release_querynodes(&query); ks_mutex_unlock(s->mutex); locked_search = KS_FALSE; if (search) *search = s; done: - if (locked_searches) ks_hash_write_unlock(searches); + if (locked_searches) ks_mutex_unlock(dht->searches_mutex); if (locked_search) ks_mutex_unlock(s->mutex); if (ret != KS_STATUS_SUCCESS) { - if (!inserted && s) ks_dht_search_destroy(&s); + //if (s) ks_dht_search_destroy(&s); *search = NULL; } return ret; @@ -1833,7 +1848,6 @@ KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job) { ks_assert(dht); ks_assert(job); - ks_mutex_lock(dht->jobs_mutex); if (dht->jobs_last) dht->jobs_last = dht->jobs_last->next = job; else dht->jobs_first = dht->jobs_last = job; @@ -1851,6 +1865,7 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht) for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) { ks_bool_t done = KS_FALSE; jobn = job->next; + if (job->state == KS_DHT_JOB_STATE_QUERYING) { job->state = KS_DHT_JOB_STATE_RESPONDING; if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING; @@ -1865,7 +1880,10 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht) if (done) { if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL; else if (!jobp) dht->jobs_first = jobn; - else if (!jobn) dht->jobs_last = jobp; + else if (!jobn) { + dht->jobs_last = jobp; + dht->jobs_last->next = NULL; + } else jobp->next = jobn; job->next = NULL; @@ -1966,7 +1984,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t } -KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target) +KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, + ks_dht_search_t *search, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + ks_dht_nodeid_t *target) { ks_dht_job_t *job = NULL; ks_status_t ret = KS_STATUS_SUCCESS; @@ -1976,7 +1998,7 @@ KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *radd ks_assert(target); if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; - ks_dht_job_build_findnode(job, ks_dht_query_findnode, callback, target); + ks_dht_job_build_findnode(job, search, ks_dht_query_findnode, callback, target); ks_dht_jobs_add(dht, job); // next step in ks_dht_pulse_jobs with QUERYING state @@ -2132,8 +2154,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j size_t nodes6_size = 0; size_t nodes_len = 0; size_t nodes6_len = 0; - ks_hash_t *searches = NULL; - ks_dht_search_t *search = NULL; ks_dht_node_t *node = NULL; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_status_t ret = KS_STATUS_SUCCESS; @@ -2154,19 +2174,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j nodes6_size = ben_str_len(n); } - searches = job->response->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash; - - ks_hash_read_lock(searches); - search = ks_hash_search(searches, job->query_target.id, KS_UNLOCKED); - if (search) { - ks_dht_search_pending_t *pending = NULL; - - ks_mutex_lock(search->mutex); - pending = ks_hash_search(search->pending, job->response->args_id.id, KS_UNLOCKED); - if (pending) pending->finished = KS_TRUE; - } - ks_hash_read_unlock(searches); - while (nodes_len < nodes_size) { ks_dht_nodeid_t nid; ks_sockaddr_t addr; @@ -2181,52 +2188,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j addr.port); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); - ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node); + ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node); job->response_nodes[job->response_nodes_count++] = node; - - // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6 - if (search && job->response->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) { - ks_dht_nodeid_t distance; - int32_t results_index = -1; - - ks_dht_utility_nodeid_xor(&distance, &nid, &search->target); - if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) { - results_index = search->results_length; - search->results_length++; - } else { - for (int32_t index = 0; index < search->results_length; ++index) { - // Check if new node is closer than this previous result - if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) { - // If this is the first node that is further then keep it - // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result - if (results_index < 0) results_index = index; - else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index; - } - } - } - - if (results_index >= 0) { - char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; - char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1]; - ks_dht_search_pending_t *pending = NULL; - - ks_log(KS_LOG_DEBUG, - "Set closer node id %s (%s) in search of target id %s at results index %d\n", - ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE), - ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE), - ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE), - results_index); - search->results[results_index] = nid; - search->distances[results_index] = distance; - - if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) { - ks_dht_search_pending_destroy(&pending); - goto done; - } - if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done; - } - } } while (nodes6_len < nodes6_size) { @@ -2243,63 +2206,19 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j addr.port); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); - ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node); + ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node); job->response_nodes6[job->response_nodes6_count++] = node; - - // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6 - if (search && job->response->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) { - ks_dht_nodeid_t distance; - int32_t results_index = -1; - - ks_dht_utility_nodeid_xor(&distance, &nid, &search->target); - if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) { - results_index = search->results_length; - search->results_length++; - } else { - for (int32_t index = 0; index < search->results_length; ++index) { - // Check if new node is closer than this previous result - if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) { - // If this is the first node that is further then keep it - // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result - if (results_index < 0) results_index = index; - else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index; - } - } - } - - if (results_index >= 0) { - char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; - char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1]; - ks_dht_search_pending_t *pending = NULL; - - ks_log(KS_LOG_DEBUG, - "Set closer node id %s (%s) in search of target id %s at results index %d\n", - ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE), - ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE), - ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE), - results_index); - search->results[results_index] = nid; - search->distances[results_index] = distance; - - if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) { - ks_dht_search_pending_destroy(&pending); - goto done; - } - if ((ret = ks_dht_findnode(dht, &addr, NULL, &search->target)) != KS_STATUS_SUCCESS) goto done; - } - } } //ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n"); done: - if(search) ks_mutex_unlock(search->mutex); return ret; } KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht, + ks_dht_search_t *search, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target, @@ -2314,7 +2233,7 @@ KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht, ks_assert(target); if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; - ks_dht_job_build_get(job, ks_dht_query_get, callback, target, salt, salt_length); + ks_dht_job_build_get(job, search, ks_dht_query_get, callback, target, salt, salt_length); ks_dht_jobs_add(dht, job); done: @@ -2525,7 +2444,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t addr.port); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); - ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node); + ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node); job->response_nodes[job->response_nodes_count++] = node; } while (nodes6_len < nodes6_size) { @@ -2542,7 +2461,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t addr.port); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); - ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_DEFAULT, &node); + ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, KS_DHTRT_CREATE_PING, &node); job->response_nodes6[job->response_nodes6_count++] = node; } diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 237495711e..7e8598b53f 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -18,7 +18,7 @@ KS_BEGIN_EXTERN_C #define KS_DHT_DATAGRAM_BUFFER_SIZE 1000 //#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF -#define KS_DHT_PULSE_EXPIRATIONS 10 +#define KS_DHT_PULSE_EXPIRATIONS 1 #define KS_DHT_NODEID_SIZE 20 @@ -126,6 +126,8 @@ struct ks_dht_job_s { enum ks_dht_job_state_t state; + ks_dht_search_t *search; + ks_sockaddr_t raddr; // will obtain local endpoint node id when creating message using raddr int32_t attempts; @@ -134,7 +136,6 @@ struct ks_dht_job_s { ks_dht_job_callback_t finish_callback; ks_dht_message_t *response; - //ks_dht_nodeid_t response_id; // job specific query parameters ks_dht_nodeid_t query_target; @@ -144,6 +145,7 @@ struct ks_dht_job_s { ks_dht_storageitem_t *query_storageitem; // job specific response parameters + ks_dht_nodeid_t response_id; ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE]; ks_size_t response_nodes_count; ks_dht_node_t *response_nodes6[KS_DHT_RESPONSE_NODES_MAX_SIZE]; @@ -214,23 +216,17 @@ struct ks_dht_transaction_s { struct ks_dht_search_s { ks_pool_t *pool; - ks_mutex_t *mutex; + ks_dht_search_t *next; ks_dht_nodeid_t target; - ks_dht_search_callback_t *callbacks; - ks_size_t callbacks_size; - ks_hash_t *pending; - ks_dht_nodeid_t results[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; + ks_dht_search_callback_t callback; + ks_mutex_t *mutex; + ks_hash_t *searched; + ks_hash_t *searching; + ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; ks_dht_nodeid_t distances[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; ks_size_t results_length; }; -struct ks_dht_search_pending_s { - ks_pool_t *pool; - ks_dht_nodeid_t nodeid; - ks_time_t expiration; - ks_bool_t finished; -}; - struct ks_dht_storageitem_s { ks_pool_t *pool; ks_dht_nodeid_t id; @@ -283,8 +279,9 @@ struct ks_dht_s { ks_dhtrt_routetable_t *rt_ipv4; ks_dhtrt_routetable_t *rt_ipv6; - ks_hash_t *searches4_hash; - ks_hash_t *searches6_hash; + ks_mutex_t *searches_mutex; + ks_dht_search_t *searches_first; + ks_dht_search_t *searches_last; volatile uint32_t token_secret_current; volatile uint32_t token_secret_previous; @@ -436,12 +433,17 @@ KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, k /** * */ -KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target); +KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, + ks_dht_search_t *search, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + ks_dht_nodeid_t *target); /** * */ KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht, + ks_dht_search_t *search, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target, @@ -472,11 +474,11 @@ KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht, * @see ks_dht_search_pending_create * @see ks_dht_send_findnode */ -KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, - int32_t family, - ks_dht_nodeid_t *target, - ks_dht_search_callback_t callback, - ks_dht_search_t **search); +KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht, + int32_t family, + ks_dht_nodeid_t *target, + ks_dht_search_callback_t callback, + ks_dht_search_t **search); /** diff --git a/libs/libks/src/dht/ks_dht_job.c b/libs/libks/src/dht/ks_dht_job.c index 97e30b460d..9720282a7f 100644 --- a/libs/libks/src/dht/ks_dht_job.c +++ b/libs/libks/src/dht/ks_dht_job.c @@ -45,6 +45,7 @@ KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t } KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job, + ks_dht_search_t *search, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback, ks_dht_nodeid_t *target) @@ -53,12 +54,14 @@ KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job, ks_assert(query_callback); ks_assert(target); + job->search = search; job->query_callback = query_callback; job->finish_callback = finish_callback; job->query_target = *target; } KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job, + ks_dht_search_t *search, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback, ks_dht_nodeid_t *target, @@ -69,6 +72,7 @@ KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job, ks_assert(query_callback); ks_assert(target); + job->search = search; job->query_callback = query_callback; job->finish_callback = finish_callback; job->query_target = *target; diff --git a/libs/libks/src/dht/ks_dht_search.c b/libs/libks/src/dht/ks_dht_search.c index 31e62062b0..2417fd0c4b 100644 --- a/libs/libks/src/dht/ks_dht_search.c +++ b/libs/libks/src/dht/ks_dht_search.c @@ -2,7 +2,7 @@ #include "ks_dht-int.h" #include "sodium.h" -KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target) +KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback) { ks_dht_search_t *s; ks_status_t ret = KS_STATUS_SUCCESS; @@ -21,9 +21,15 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t memcpy(s->target.id, target->id, KS_DHT_NODEID_SIZE); - ks_hash_create(&s->pending, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK, s->pool); - ks_assert(s->pending); - ks_hash_set_keysize(s->pending, KS_DHT_NODEID_SIZE); + s->callback = callback; + + ks_hash_create(&s->searched, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool); + ks_assert(s->searched); + ks_hash_set_keysize(s->searched, KS_DHT_NODEID_SIZE); + + ks_hash_create(&s->searching, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool); + ks_assert(s->searching); + ks_hash_set_keysize(s->searching, KS_DHT_NODEID_SIZE); // done: if (ret != KS_STATUS_SUCCESS) { @@ -35,85 +41,19 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search) { ks_dht_search_t *s; - ks_hash_iterator_t *it; ks_assert(search); ks_assert(*search); s = *search; - if (s->pending) { - for (it = ks_hash_first(s->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - const void *key = NULL; - ks_dht_search_pending_t *val = NULL; - ks_hash_this(it, &key, NULL, (void **)&val); - ks_dht_search_pending_destroy(&val); - } - ks_hash_destroy(&s->pending); - } - if (s->callbacks) { - ks_pool_free(s->pool, &s->callbacks); - s->callbacks = NULL; - } + if (s->searching) ks_hash_destroy(&s->searching); + if (s->searched) ks_hash_destroy(&s->searched); if (s->mutex) ks_mutex_destroy(&s->mutex); ks_pool_free(s->pool, search); } -KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback) -{ - ks_assert(search); - - if (callback) { - int32_t index; - - ks_mutex_lock(search->mutex); - index = search->callbacks_size++; - search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool, - (void *)search->callbacks, - sizeof(ks_dht_search_callback_t) * search->callbacks_size); - ks_assert(search->callbacks); - search->callbacks[index] = callback; - ks_mutex_unlock(search->mutex); - } - return KS_STATUS_SUCCESS; -} - -KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid) -{ - ks_dht_search_pending_t *p; - ks_status_t ret = KS_STATUS_SUCCESS; - - ks_assert(pending); - ks_assert(pool); - - *pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t)); - ks_assert(p); - - p->pool = pool; - p->nodeid = *nodeid; - p->expiration = ks_time_now() + ((ks_time_t)KS_DHT_SEARCH_EXPIRATION * KS_USEC_PER_SEC); - p->finished = KS_FALSE; - - // done: - if (ret != KS_STATUS_SUCCESS) { - if (p) ks_dht_search_pending_destroy(pending); - } - return ret; -} - -KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending) -{ - ks_dht_search_pending_t *p; - - ks_assert(pending); - ks_assert(*pending); - - p = *pending; - - ks_pool_free(p->pool, pending); -} - /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 60d0d71de0..28b79126d4 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -32,6 +32,12 @@ ks_status_t dht2_get_token_callback(ks_dht_t *dht, ks_dht_job_t *job) return KS_STATUS_SUCCESS; } +ks_status_t dht2_search_findnode_callback(ks_dht_t *dht, ks_dht_search_t *search) +{ + diag("dht2_search_findnode_callback %d\n", search->results_length); + return KS_STATUS_SUCCESS; +} + int main() { //ks_size_t buflen; ks_status_t err; @@ -48,7 +54,7 @@ int main() { ks_sockaddr_t raddr1; //ks_sockaddr_t raddr2; //ks_sockaddr_t raddr3; - ks_dht_nodeid_t target; + //ks_dht_nodeid_t target; //ks_dht_storageitem_t *immutable = NULL; //ks_dht_storageitem_t *mutable = NULL; //const char *v = "Hello World!"; @@ -147,7 +153,6 @@ int main() { ok(err == KS_STATUS_SUCCESS); } - /* diag("Ping test\n"); ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING) @@ -166,14 +171,35 @@ int main() { diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up for (int i = 0; i < 10; ++i) { - //diag("DHT 1\n"); ks_dht_pulse(dht1, 100); - //diag("DHT 2\n"); ks_dht_pulse(dht2, 100); + ks_dht_pulse(dht3, 100); } ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good - */ + + ks_dht_ping(dht3, &raddr1, NULL); // (QUERYING) + + ks_dht_pulse(dht3, 100); // Send queued ping from dht3 to dht1 (RESPONDING) + + ks_dht_pulse(dht1, 100); // Receive and process ping query from dht3, queue and send ping response + + ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet + + ks_dht_pulse(dht3, 100); // Receive and process ping response from dht1 (PROCESSING then COMPLETING) + + ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good + + ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING) + + diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up + for (int i = 0; i < 20; ++i) { + ks_dht_pulse(dht1, 100); + ks_dht_pulse(dht2, 100); + ks_dht_pulse(dht3, 100); + } + ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good + //diag("Get test\n"); @@ -205,13 +231,14 @@ int main() { ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING) */ + /* diag("Put test\n"); crypto_sign_keypair(pk.key, sk.key); ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target); - ks_dht_get(dht2, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job + ks_dht_get(dht2, NULL, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job ks_dht_pulse(dht2, 100); // send get query @@ -225,40 +252,51 @@ int main() { ks_dht_pulse(dht2, 100); // receive put response - ks_dht_pulse(dht2, 100); // Call finish callback and purse the job (COMPLETING) + ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING) for (int i = 0; i < 10; ++i) { - //diag("DHT 1\n"); ks_dht_pulse(dht1, 100); - //diag("DHT 2\n"); ks_dht_pulse(dht2, 100); + ks_dht_pulse(dht3, 100); } + */ // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid - //diag("Find_Node test\n"); + /* + diag("Find_Node test\n"); - //ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid); + ks_dht_findnode(dht3, NULL, &raddr1, NULL, &ep2->nodeid); - //ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1 + ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1 - //ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response + ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response - //ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet + ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet - //ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1 - - //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet + ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1 - //diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up - //for (int i = 0; i < 10; ++i) { - //diag("DHT 1\n"); - //ks_dht_pulse(dht1, 100); - //diag("DHT 2\n"); - //ks_dht_pulse(dht2, 100); - //} - //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good + ks_dht_pulse(dht3, 100); // Call finish callback and purge the job (COMPLETING) + ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet + + diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up + for (int i = 0; i < 10; ++i) { + ks_dht_pulse(dht1, 100); + ks_dht_pulse(dht2, 100); + ks_dht_pulse(dht3, 100); + } + ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good + */ + + diag("Search test\n"); + ks_dht_search_findnode(dht3, AF_INET, &ep2->nodeid, dht2_search_findnode_callback, NULL); + diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up + for (int i = 0; i < 30; ++i) { + ks_dht_pulse(dht1, 100); + ks_dht_pulse(dht2, 100); + ks_dht_pulse(dht3, 100); + } /* Cleanup and shutdown */ diag("Cleanup\n");