diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 22b80530df..24bec0c531 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -267,6 +267,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search); KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback); +KS_DECLARE(void) ks_dht_search_expire(ks_dht_search_t *search, ks_hash_t *pending, int32_t *active); KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid); KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending); diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index a1b4c65fc3..aaaef29fc5 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -83,13 +83,6 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread ks_assert(d->registry_error); // @todo register 301 error for internal get/put CAS hash mismatch retry handler - /** - * Default these to FALSE, binding will set them TRUE when a respective address is bound. - * @todo these may not be useful anymore they are from legacy code - */ - d->bind_ipv4 = KS_FALSE; - d->bind_ipv6 = KS_FALSE; - /** * Initialize the data used to track endpoints to NULL, binding will handle latent allocations. * The endpoints and endpoints_poll arrays are maintained in parallel to optimize polling. @@ -154,13 +147,16 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Create the hash to store searches. */ - ks_hash_create(&d->search_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); - ks_assert(d->search_hash); + ks_hash_create(&d->searches4_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_assert(d->searches4_hash); + ks_hash_create(&d->searches6_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_assert(d->searches6_hash); /** - * The search hash uses arbitrary key size, which requires the key size be provided. + * The searches hash uses arbitrary key size, which requires the key size be provided. */ - ks_hash_set_keysize(d->search_hash, KS_DHT_NODEID_SIZE); + ks_hash_set_keysize(d->searches4_hash, KS_DHT_NODEID_SIZE); + ks_hash_set_keysize(d->searches6_hash, KS_DHT_NODEID_SIZE); /** * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets. @@ -225,14 +221,23 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) /** * Cleanup the search hash and it's contents if it is allocated. */ - if (d->search_hash) { - for (it = ks_hash_first(d->search_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + if (d->searches6_hash) { + for (it = ks_hash_first(d->searches6_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { ks_dht_search_t *val; ks_hash_this_val(it, (void **)&val); ks_dht_search_destroy(&val); } - ks_hash_destroy(&d->search_hash); + ks_hash_destroy(&d->searches6_hash); + } + if (d->searches4_hash) { + for (it = ks_hash_first(d->searches4_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + ks_dht_search_t *val; + + ks_hash_this_val(it, (void **)&val); + ks_dht_search_destroy(&val); + } + ks_hash_destroy(&d->searches4_hash); } /** @@ -285,30 +290,18 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) /** * Cleanup the array of endpoint pointers if it is allocated. */ - if (d->endpoints) { - ks_pool_free(d->pool, &d->endpoints); - d->endpoints = NULL; - } + if (d->endpoints) ks_pool_free(d->pool, &d->endpoints); /** * Cleanup the array of endpoint polling data if it is allocated. */ - if (d->endpoints_poll) { - ks_pool_free(d->pool, &d->endpoints_poll); - d->endpoints_poll = NULL; - } + if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll); /** * Cleanup the endpoints hash if it is allocated. */ if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash); - /** - * Probably don't need this - */ - d->bind_ipv4 = KS_FALSE; - d->bind_ipv6 = KS_FALSE; - /** * Cleanup the type, query, and error registries if they have been allocated. */ @@ -474,12 +467,6 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid return KS_STATUS_FAIL; } - /** - * Legacy code, this can probably go away - */ - dht->bind_ipv4 |= addr->family == AF_INET; - dht->bind_ipv6 |= addr->family == AF_INET6; - /** * Attempt to open a UDP datagram socket for the given address family. */ @@ -494,8 +481,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid /** * Attempt to bind the socket to the desired local address. */ - // @todo shouldn't ks_addr_bind take a const addr *? - if ((ret = ks_addr_bind(sock, (ks_sockaddr_t *)addr)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_addr_bind(sock, addr)) != KS_STATUS_SUCCESS) goto done; /** * Allocate the endpoint to track the local socket. @@ -616,6 +602,54 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6); } +KS_DECLARE(void) ks_dht_pulse_expirations_searches(ks_dht_t *dht, ks_hash_t *searches) +{ + ks_hash_iterator_t *it = NULL; + ks_time_t now = ks_time_now(); + + ks_assert(dht); + ks_assert(searches); + + ks_hash_write_lock(searches); + for (it = ks_hash_first(searches, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + const void *key = NULL; + ks_dht_search_t *value = NULL; + int32_t active = 0; + + ks_hash_this(it, &key, NULL, (void **)&value); + + ks_mutex_lock(value->mutex); + for (ks_hash_iterator_t *i = ks_hash_first(value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) { + const void *k = NULL; + ks_dht_search_pending_t *v = NULL; + + ks_hash_this(i, &k, NULL, (void **)&v); + + if (v->finished) continue; + + if (v->expiration <= now) { + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_log(KS_LOG_DEBUG, + "Search for %s pending find_node to %s has expired without response\n", + ks_dht_hexid(&value->target, id_buf), + ks_dht_hexid(&v->nodeid, id2_buf)); + v->finished = KS_TRUE; + continue; + } + active++; + } + ks_mutex_unlock(value->mutex); + + if (active == 0) { + for (int32_t index = 0; index < value->callbacks_size; ++index) value->callbacks[index](dht, value); + ks_hash_remove(searches, (void *)key); + ks_dht_search_destroy(&value); + } + } + ks_hash_write_unlock(searches); +} + KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) { ks_hash_iterator_t *it = NULL; @@ -645,44 +679,8 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) } ks_hash_write_unlock(dht->transactions_hash); - ks_hash_write_lock(dht->search_hash); - for (it = ks_hash_first(dht->search_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - const void *search_key = NULL; - ks_dht_search_t *search_value = NULL; - - ks_hash_this(it, &search_key, NULL, (void **)&search_value); - - ks_hash_write_lock(search_value->pending); - for (ks_hash_iterator_t *i = ks_hash_first(search_value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) { - const void *pending_key = NULL; - ks_dht_search_pending_t *pending_value = NULL; - ks_bool_t pending_remove = KS_FALSE; - - ks_hash_this(i, &pending_key, NULL, (void **)&pending_value); - - if (pending_value->finished) pending_remove = KS_TRUE; - else if (pending_value->expiration <= now) { - char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; - char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; - ks_log(KS_LOG_DEBUG, - "Search for %s pending find_node to %s has expired without response\n", - ks_dht_hexid(&search_value->target, id_buf), - ks_dht_hexid(&pending_value->nodeid, id2_buf)); - pending_remove = KS_TRUE; - } - if (pending_remove) { - ks_hash_remove(search_value->pending, (void *)pending_key); - ks_dht_search_pending_destroy(&pending_value); - } - } - ks_hash_write_unlock(search_value->pending); - if (ks_hash_count(search_value->pending) == 0) { - for (int32_t index = 0; index < search_value->callbacks_size; ++index) search_value->callbacks[index](dht, search_value); - ks_hash_remove(dht->search_hash, (void *)search_key); - ks_dht_search_destroy(&search_value); - } - } - ks_hash_write_unlock(dht->search_hash); + if (dht->rt_ipv4) ks_dht_pulse_expirations_searches(dht, dht->searches4_hash); + if (dht->rt_ipv6) ks_dht_pulse_expirations_searches(dht, dht->searches6_hash); if (dht->token_secret_expiration && dht->token_secret_expiration <= now) { dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000); @@ -771,7 +769,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t * memcpy(buffer + (*buffer_length), paddr, sizeof(uint32_t)); *buffer_length += addr_len; - memcpy(buffer + (*buffer_length), (const void *)&port, sizeof(uint16_t)); + memcpy(buffer + (*buffer_length), &port, sizeof(uint16_t)); *buffer_length += sizeof(uint16_t); return KS_STATUS_SUCCESS; @@ -801,8 +799,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer, port = *((uint16_t *)(buffer + *buffer_length)); *buffer_length += sizeof(uint16_t); - // @todo ks_addr_set_raw second parameter should be const? - return ks_addr_set_raw(address, (void *)paddr, port, address->family); + return ks_addr_set_raw(address, paddr, port, address->family); } KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid, @@ -824,7 +821,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *n return KS_STATUS_NO_MEM; } - memcpy(buffer + (*buffer_length), (void *)nodeid, KS_DHT_NODEID_SIZE); + memcpy(buffer + (*buffer_length), nodeid->id, KS_DHT_NODEID_SIZE); *buffer_length += KS_DHT_NODEID_SIZE; return ks_dht_utility_compact_addressinfo(address, buffer, buffer_length, buffer_size); @@ -953,7 +950,7 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, k KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) { // @todo calculate max IPV6 payload size? - char buf[1000]; + char buf[1001]; ks_size_t buf_len; ks_assert(dht); @@ -963,8 +960,11 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) // @todo blacklist check - // @todo use different encode function to check if all data was encoded, do not send large incomplete messages buf_len = ben_encode2(buf, sizeof(buf), message->data); + if (buf_len >= sizeof(buf)) { + ks_log(KS_LOG_DEBUG, "Dropping message that is too large\n"); + return KS_STATUS_FAIL; + } ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port); ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data)); @@ -1192,13 +1192,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, - int family, + int32_t family, ks_dht_nodeid_t *target, ks_dht_search_callback_t callback, ks_dht_search_t **search) { + ks_bool_t locked_searches = KS_FALSE; ks_bool_t locked_search = KS_FALSE; - ks_bool_t locked_pending = KS_FALSE; + ks_hash_t *searches = NULL; + ks_dhtrt_routetable_t *rt = NULL; ks_dht_search_t *s = NULL; ks_bool_t inserted = KS_FALSE; ks_bool_t allocated = KS_FALSE; @@ -1211,12 +1213,27 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, if (search) *search = NULL; - // @todo start write lock on search_hash and hold until after inserting - // check hash for target to see if search already exists - ks_hash_write_lock(dht->search_hash); - locked_search = KS_TRUE; + if (family == AF_INET) { + if (!dht->rt_ipv4) { + ret = KS_STATUS_FAIL; + goto done; + } + searches = dht->searches4_hash; + rt = dht->rt_ipv4; + } else { + if (!dht->rt_ipv6) { + ret = KS_STATUS_FAIL; + goto done; + } + searches = dht->searches6_hash; + rt = dht->rt_ipv6; + } - s = ks_hash_search(dht->search_hash, target->id, KS_UNLOCKED); + // check hash for target to see if search already exists + ks_hash_write_lock(searches); + locked_searches = KS_TRUE; + + s = ks_hash_search(searches, target->id, KS_UNLOCKED); // if search does not exist, create new search and store in hash by target if (!s) { @@ -1230,49 +1247,56 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, // if the search is old then bail out and return successfully if (!allocated) goto done; - if ((ret = ks_hash_insert(dht->search_hash, s->target.id, s)) == KS_STATUS_SUCCESS) goto done; + // everything past this point until final cleanup is only for when a search of the target does not already exist + + if ((ret = ks_hash_insert(searches, s->target.id, s)) == KS_STATUS_SUCCESS) goto done; inserted = KS_TRUE; - // lock pending before unlocking the search hash to prevent this search from being used before we finish setting it up - ks_hash_write_lock(s->pending); - locked_pending = KS_TRUE; + // lock search before unlocking the searches_hash to prevent this search from being used before we finish setting it up + ks_mutex_lock(s->mutex); + locked_search = KS_TRUE; - // release search hash lock now, but pending is still locked - ks_hash_write_unlock(dht->search_hash); - locked_search = KS_FALSE; + // release searches_hash lock now, but search is still locked + ks_hash_write_unlock(searches); + locked_searches = KS_FALSE; // find closest good nodes to target locally and store as the closest results - query.nodeid = *target; + query.nodeid = *target; query.type = KS_DHT_REMOTE; query.max = KS_DHT_SEARCH_RESULTS_MAX_SIZE; query.family = family; query.count = 0; - ks_dhtrt_findclosest_nodes(family == AF_INET ? dht->rt_ipv4 : dht->rt_ipv6, &query); + ks_dhtrt_findclosest_nodes(rt, &query); for (int32_t i = 0; i < query.count; ++i) { ks_dht_node_t *n = query.nodes[i]; ks_dht_search_pending_t *pending = NULL; - - s->results[i] = n->nodeid; - ks_dht_utility_nodeid_xor(&s->distances[i], &n->nodeid, &s->target); - // add to pending with expiration + + // always take the initial local closest good nodes as results, they are already good nodes that are closest with no results yet + s->results[s->results_length] = n->nodeid; + ks_dht_utility_nodeid_xor(&s->distances[s->results_length], &n->nodeid, &s->target); + s->results_length++; + + pending = ks_hash_search(s->pending, n->nodeid.id, KS_UNLOCKED); + if (pending) continue; // skip duplicates, this really shouldn't happen on a new search but we sanity check + + // add to pending with expiration, if any of this fails it's almost catastrophic so just bail out and fail the entire search attempt + // there are no probable causes for a failure but check them anyway if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_hash_insert(s->pending, n->nodeid.id, pending)) != KS_STATUS_SUCCESS) { ks_dht_search_pending_destroy(&pending); goto done; } if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done; - // increment here in case we end up bailing out; execute with what it has or destroy the search? - s->results_length++; } - // @todo release query nodes - ks_hash_write_unlock(s->pending); - locked_pending = KS_FALSE; + // @todo release closest local query node locks + ks_mutex_unlock(s->mutex); + locked_search = KS_FALSE; if (search) *search = s; done: - if (locked_search) ks_hash_write_unlock(dht->search_hash); - if (locked_pending) ks_hash_write_unlock(s->pending); + if (locked_searches) ks_hash_write_unlock(searches); + if (locked_search) ks_mutex_unlock(s->mutex); if (ret != KS_STATUS_SUCCESS) { if (!inserted && s) ks_dht_search_destroy(&s); *search = NULL; @@ -1513,7 +1537,13 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE)); - // @todo produce "want" value if both families are bound + // Only request both v4 and v6 if we have both interfaces bound and are looking for our own node id, aka bootstrapping + if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, targetid->id, KS_DHT_NODEID_SIZE)) { + struct bencode *want = ben_list(); + ben_list_append_str(want, "n4"); + ben_list_append_str(want, "n6"); + ben_dict_set(a, ben_blob("want", 4), want); + } ks_log(KS_LOG_DEBUG, "Sending message query find_node\n"); ks_q_push(dht->send_q, (void *)message); @@ -1579,7 +1609,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE if (want4) { query.family = AF_INET; - ks_dhtrt_findclosest_nodes(routetable, &query); + ks_dhtrt_findclosest_nodes(dht->rt_ipv4, &query); for (int32_t i = 0; i < query.count; ++i) { ks_dht_node_t *qn = query.nodes[i]; @@ -1592,10 +1622,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); } + // @todo release query nodes } if (want6) { query.family = AF_INET6; - ks_dhtrt_findclosest_nodes(routetable, &query); + ks_dhtrt_findclosest_nodes(dht->rt_ipv6, &query); for (int32_t i = 0; i < query.count; ++i) { ks_dht_node_t *qn = query.nodes[i]; @@ -1633,6 +1664,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m { ks_dht_nodeid_t *id; struct bencode *n; + //ks_bool_t n4 = KS_FALSE; + //ks_bool_t n6 = KS_FALSE; const uint8_t *nodes = NULL; const uint8_t *nodes6 = NULL; size_t nodes_size = 0; @@ -1642,6 +1675,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_hash_t *searches = NULL; ks_dht_search_t *search = NULL; ks_status_t ret = KS_STATUS_SUCCESS; @@ -1653,11 +1687,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m n = ben_dict_get_by_str(message->args, "nodes"); if (n) { + //n4 = KS_TRUE; nodes = (const uint8_t *)ben_str_val(n); nodes_size = ben_str_len(n); } n = ben_dict_get_by_str(message->args, "nodes6"); if (n) { + //n6 = KS_TRUE; nodes6 = (const uint8_t *)ben_str_val(n); nodes6_size = ben_str_len(n); } @@ -1671,14 +1707,18 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done; - ks_hash_read_lock(dht->search_hash); - search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_UNLOCKED); - ks_hash_read_unlock(dht->search_hash); + searches = message->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash; + + ks_hash_read_lock(searches); + search = ks_hash_search(searches, message->transaction->target.id, KS_UNLOCKED); if (search) { - ks_dht_search_pending_t *pending = ks_hash_search(search->pending, id->id, KS_READLOCKED); - ks_hash_read_unlock(search->pending); + ks_dht_search_pending_t *pending = NULL; + + ks_mutex_lock(search->mutex); + pending = ks_hash_search(search->pending, id->id, KS_UNLOCKED); if (pending) pending->finished = KS_TRUE; } + ks_hash_read_unlock(searches); while (nodes_len < nodes_size) { ks_dht_nodeid_t nid; @@ -1697,7 +1737,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); ks_dhtrt_release_node(node); - if (search) { + if (search && message->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) { ks_dht_nodeid_t distance; int32_t results_index = -1; @@ -1707,10 +1747,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m search->results_length++; } else { for (int32_t index = 0; index < search->results_length; ++index) { - // Check if new node is closer than this existing result + // Check if new node is closer than this previous result if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) { // If this is the first node that is further then keep it - // Else if two or more nodes are further, and this existing result is further than the previous one then keep it + // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result if (results_index < 0) results_index = index; else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index; } @@ -1757,12 +1797,55 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); ks_dhtrt_release_node(node); + + if (search && message->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) { + ks_dht_nodeid_t distance; + int32_t results_index = -1; + + ks_dht_utility_nodeid_xor(&distance, &nid, &search->target); + if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) { + results_index = search->results_length; + search->results_length++; + } else { + for (int32_t index = 0; index < search->results_length; ++index) { + // Check if new node is closer than this previous result + if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) { + // If this is the first node that is further then keep it + // Else if two or more nodes are further, and this previous result is further than the current one then keep the current result + if (results_index < 0) results_index = index; + else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index; + } + } + } + + if (results_index >= 0) { + char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_dht_search_pending_t *pending = NULL; + + ks_log(KS_LOG_DEBUG, + "Set closer node id %s (%s) in search of target id %s at results index %d\n", + ks_dht_hexid(&nid, id_buf), + ks_dht_hexid(&distance, id2_buf), + ks_dht_hexid(&search->target, id3_buf), + results_index); + search->results[results_index] = nid; + search->distances[results_index] = distance; + + if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) { + ks_dht_search_pending_destroy(&pending); + goto done; + } + if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done; + } + } } - // @todo repeat above for ipv6 table ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n"); done: + if(search) ks_mutex_unlock(search->mutex); return ret; } diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index c9f60ef181..51b4f84d9f 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -149,17 +149,6 @@ struct ks_dht_transaction_s { ks_bool_t finished; }; -// Check if search already exists for the target id, if so add another callback, must be a popular target id -// Otherwise create new search, set target id, add callback, and insert the search into the dht search_hash with target id key -// Get closest local nodes to target id, check against results, send_findnode for closer nodes and add to pending hash with queried node id -// Upon receiving find_node response, check target id against dht search_hash, check responding node id against pending hash, set finished for purging -// Update results if responding node id is closer than any current result, or the results are not full -// Check response nodes against results, send_findnode for closer nodes and add to pending hash with an expiration -// Pulse expirations purges expired and finished from pending hash, once hash is empty callbacks are called providing results array -// Note: -// During the lifetime of a search, the ks_dht_node_t's must be kept alive -// Do a query touch on nodes prior to being added to pending, this should reset timeout and keep the nodes alive long enough even if they are dubious -// Nodes which land in results are known good with recent response to find_nodes and should be around for a while before route table worries about cleanup struct ks_dht_search_s { ks_pool_t *pool; ks_mutex_t *mutex; @@ -208,9 +197,6 @@ struct ks_dht_s { ks_hash_t *registry_query; ks_hash_t *registry_error; - ks_bool_t bind_ipv4; - ks_bool_t bind_ipv6; - ks_dht_endpoint_t **endpoints; int32_t endpoints_size; ks_hash_t *endpoints_hash; @@ -230,7 +216,8 @@ struct ks_dht_s { ks_dhtrt_routetable_t *rt_ipv4; ks_dhtrt_routetable_t *rt_ipv6; - ks_hash_t *search_hash; + ks_hash_t *searches4_hash; + ks_hash_t *searches6_hash; volatile uint32_t token_secret_current; volatile uint32_t token_secret_previous; @@ -300,7 +287,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, * @param dht pointer to the dht instance * @param nodeid pointer to a nodeid for this endpoint, may be NULL to generate one randomly * @param addr pointer to the local address information - * @param dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint output + * @param endpoint dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint output * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ... * @see ks_socket_option * @see ks_addr_bind @@ -320,6 +307,25 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid */ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout); +/** + * Create a network search of the closest nodes to a target. + * @param dht pointer to the dht instance + * @param family either AF_INET or AF_INET6 for the appropriate network to search + * @param target pointer to the nodeid for the target to be searched + * @param callback an optional callback to add to the search when it is finished + * @param search dereferenced out pointer to the allocated search, may be NULL to ignore search output + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL + * @see ks_dht_search_create + * @see ks_dht_search_callback_add + * @see ks_hash_insert + * @see ks_dht_search_pending_create + * @see ks_dht_send_findnode + */ +KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, + int32_t family, + ks_dht_nodeid_t *target, + ks_dht_search_callback_t callback, + ks_dht_search_t **search); /** * diff --git a/libs/libks/src/dht/ks_dht_datagram.c b/libs/libks/src/dht/ks_dht_datagram.c index a13c090d18..fe98cffc3c 100644 --- a/libs/libks/src/dht/ks_dht_datagram.c +++ b/libs/libks/src/dht/ks_dht_datagram.c @@ -31,8 +31,7 @@ KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram, // done: if (ret != KS_STATUS_SUCCESS) { - if (dg) ks_dht_datagram_destroy(&dg); - *datagram = NULL; + ks_dht_datagram_destroy(datagram); } return ret; } @@ -46,9 +45,7 @@ KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram) dg = *datagram; - ks_pool_free(dg->pool, &dg); - - *datagram = NULL; + ks_pool_free(dg->pool, datagram); } /* For Emacs: diff --git a/libs/libks/src/dht/ks_dht_endpoint.c b/libs/libks/src/dht/ks_dht_endpoint.c index 4907d15f34..d8c2965fc4 100644 --- a/libs/libks/src/dht/ks_dht_endpoint.c +++ b/libs/libks/src/dht/ks_dht_endpoint.c @@ -30,8 +30,7 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint, // done: if (ret != KS_STATUS_SUCCESS) { - if (ep) ks_dht_endpoint_destroy(&ep); - *endpoint = NULL; + if (ep) ks_dht_endpoint_destroy(endpoint); } return ret; } @@ -49,9 +48,8 @@ KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint) ep = *endpoint; if (ep->sock != KS_SOCK_INVALID) ks_socket_close(&ep->sock); - ks_pool_free(ep->pool, &ep); - *endpoint = NULL; + ks_pool_free(ep->pool, endpoint); } /* For Emacs: diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index 3430c2853d..75780f46c5 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -26,8 +26,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message, // done: if (ret != KS_STATUS_SUCCESS) { - if (m) ks_dht_message_destroy(&m); - *message = NULL; + ks_dht_message_destroy(message); } return ret; } @@ -45,9 +44,8 @@ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message) ben_free(m->data); m->data = NULL; } - ks_pool_free(m->pool, &(*message)); - *message = NULL; + ks_pool_free(m->pool, message); } diff --git a/libs/libks/src/dht/ks_dht_search.c b/libs/libks/src/dht/ks_dht_search.c index 1945fcd294..e15d726f3b 100644 --- a/libs/libks/src/dht/ks_dht_search.c +++ b/libs/libks/src/dht/ks_dht_search.c @@ -27,8 +27,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t // done: if (ret != KS_STATUS_SUCCESS) { - if (s) ks_dht_search_destroy(&s); - *search = NULL; + if (s) ks_dht_search_destroy(search); } return ret; } @@ -58,9 +57,7 @@ KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search) } if (s->mutex) ks_mutex_destroy(&s->mutex); - ks_pool_free(s->pool, &s); - - *search = NULL; + ks_pool_free(s->pool, search); } KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback) @@ -100,10 +97,9 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **p // done: if (ret != KS_STATUS_SUCCESS) { - if (p) ks_dht_search_pending_destroy(&p); - *pending = NULL; + if (p) ks_dht_search_pending_destroy(pending); } - return KS_STATUS_SUCCESS; + return ret; } KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending) @@ -115,9 +111,7 @@ KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending p = *pending; - ks_pool_free(p->pool, &p); - - *pending = NULL; + ks_pool_free(p->pool, pending); } /* For Emacs: diff --git a/libs/libks/src/dht/ks_dht_storageitem.c b/libs/libks/src/dht/ks_dht_storageitem.c index c20cdd1e9c..ce34bde9f9 100644 --- a/libs/libks/src/dht/ks_dht_storageitem.c +++ b/libs/libks/src/dht/ks_dht_storageitem.c @@ -32,8 +32,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t // done: if (ret != KS_STATUS_SUCCESS) { - if (si) ks_dht_storageitem_destroy(&si); - *item = NULL; + if (si) ks_dht_storageitem_destroy(item); } return ret; } @@ -83,8 +82,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t * // done: if (ret != KS_STATUS_SUCCESS) { - if (si) ks_dht_storageitem_destroy(&si); - *item = NULL; + if (si) ks_dht_storageitem_destroy(item); } return ret; } @@ -105,9 +103,8 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item) ben_free(si->v); si->v = NULL; } - ks_pool_free(si->pool, &si); - *item = NULL; + ks_pool_free(si->pool, item); } /* For Emacs: diff --git a/libs/libks/src/dht/ks_dht_transaction.c b/libs/libks/src/dht/ks_dht_transaction.c index 91db11447d..6d6b7baf35 100644 --- a/libs/libks/src/dht/ks_dht_transaction.c +++ b/libs/libks/src/dht/ks_dht_transaction.c @@ -25,8 +25,7 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transac // done: if (ret != KS_STATUS_SUCCESS) { - if (t) ks_dht_transaction_destroy(&t); - *transaction = NULL; + if (t) ks_dht_transaction_destroy(transaction); } return ret; } @@ -40,9 +39,7 @@ KS_DECLARE(void) ks_dht_transaction_destroy(ks_dht_transaction_t **transaction) t = *transaction; - ks_pool_free(t->pool, &t); - - *transaction = NULL; + ks_pool_free(t->pool, transaction); } /* For Emacs: diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 76ce6abe22..36dc09e431 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -149,9 +149,9 @@ int main() { diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up for (int i = 0; i < 10; ++i) { - diag("DHT 1\n"); + //diag("DHT 1\n"); ks_dht_pulse(dht1, 100); - diag("DHT 2\n"); + //diag("DHT 2\n"); ks_dht_pulse(dht2, 100); } ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good @@ -174,9 +174,9 @@ int main() { diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up for (int i = 0; i < 10; ++i) { - diag("DHT 1\n"); + //diag("DHT 1\n"); ks_dht_pulse(dht1, 100); - diag("DHT 2\n"); + //diag("DHT 2\n"); ks_dht_pulse(dht2, 100); } ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good