diff --git a/libs/libks/Makefile.am b/libs/libks/Makefile.am index d1931c19bd..64f366a26e 100644 --- a/libs/libks/Makefile.am +++ b/libs/libks/Makefile.am @@ -13,8 +13,8 @@ libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/k libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c -libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c src/dht/ks_dht_search.c -libks_la_SOURCES += src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c +libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_datagram.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c +libks_la_SOURCES += src/dht/ks_dht_search.c src/dht/ks_dht_storageitem.c src/dht/ks_dht_bucket.c libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index ae7ae23870..93afb44a70 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -162,6 +162,7 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message); * @param raddr pointer to the remote address * @param query string value of the query type, for example "ping" * @param callback callback to be called when response to transaction is received + * @param transaction dereferenced out pointer to the allocated transaction, may be NULL to ignore output * @param message dereferenced out pointer to the allocated message * @param args dereferenced out pointer to the allocated bencode args, may be NULL to ignore output * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ... @@ -178,6 +179,7 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, ks_sockaddr_t *raddr, const char *query, ks_dht_message_callback_t callback, + ks_dht_transaction_t **transaction, ks_dht_message_t **message, struct bencode **args); @@ -217,7 +219,8 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid); KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid); -KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr); +KS_DECLARE(void *)ks_dht_process(ks_thread_t *thread, void *data); +KS_DECLARE(ks_status_t) ks_dht_process_(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr); KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t *message); @@ -235,6 +238,20 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message); + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_datagram_alloc(ks_dht_datagram_t **datagram, ks_pool_t *pool); +KS_DECLARE(void) ks_dht_datagram_prealloc(ks_dht_datagram_t *datagram, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht_datagram_free(ks_dht_datagram_t **datagram); + +KS_DECLARE(ks_status_t) ks_dht_datagram_init(ks_dht_datagram_t *datagram, + ks_dht_t *dht, + ks_dht_endpoint_t *endpoint, + const ks_sockaddr_t *raddr); +KS_DECLARE(ks_status_t) ks_dht_datagram_deinit(ks_dht_datagram_t *datagram); + /** * */ @@ -255,9 +272,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search); -KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, - const ks_dht_nodeid_t *target, - ks_dht_search_callback_t callback); +KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target); KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search); KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback); @@ -266,7 +281,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pe KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending); -KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node, ks_time_t expiration); +KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node); KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending); /** diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index 870c9daec2..ee713121bb 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -90,13 +90,27 @@ KS_DECLARE(ks_status_t) ks_dht_free(ks_dht_t **dht) } -KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht) +KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht, ks_thread_pool_t *tpool) { ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(dht->pool); + /** + * Create a new internally managed thread pool if one wasn't provided. + */ + if (!tpool) { + if ((ret = ks_thread_pool_create(&tpool, + KS_DHT_TPOOL_MIN, + KS_DHT_TPOOL_MAX, + KS_DHT_TPOOL_STACK, + KS_PRI_NORMAL, + KS_DHT_TPOOL_IDLE)) != KS_STATUS_SUCCESS) return ret; + dht->tpool_alloc = KS_TRUE; + } + dht->tpool = tpool; + /** * Default autorouting to disabled. */ @@ -349,6 +363,13 @@ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht) dht->autoroute = KS_FALSE; dht->autoroute_port = 0; + /** + * If the thread pool was allocated internally, destroy it. + * If this fails, something catastrophically bad happened like memory corruption. + */ + if (dht->tpool_alloc && (ret = ks_thread_pool_destroy(&dht->tpool)) != KS_STATUS_SUCCESS) return ret; + dht->tpool_alloc = KS_FALSE; + return KS_STATUS_SUCCESS; } @@ -372,7 +393,7 @@ KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint) { // @todo lookup standard def for IPV6 max size - char ip[48]; + char ip[48 + 1]; ks_dht_endpoint_t *ep = NULL; ks_status_t ret = KS_STATUS_SUCCESS; @@ -392,7 +413,6 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *rad /** * Check if the endpoint has already been bound for the address we want to route through. - * @todo ip:port for key to allow a single ip with multiple endpoints on different ports */ ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED); if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret; @@ -467,6 +487,13 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid */ if (endpoint) *endpoint = NULL; + ep = ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_READLOCKED); + if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret; + if (ep) { + ks_log(KS_LOG_DEBUG, "Attempted to bind to %s more than once.\n", addr->host); + return KS_STATUS_FAIL; + } + /** * Legacy code, this can probably go away */ @@ -511,7 +538,6 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid /** * Add the new endpoint into the endpoints hash for quick lookups. - * @todo ip:port for key to allow a single ip with multiple endpoints on different ports */ if (!ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) { ret = KS_STATUS_FAIL; @@ -538,6 +564,9 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ep->addr.host, ep->addr.port, &ep->node)) != KS_STATUS_SUCCESS) goto done; + /** + * Do not release the ep->node, keep it alive until cleanup + */ } else { if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_create_node(dht->rt_ipv6, @@ -546,6 +575,9 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ep->addr.host, ep->addr.port, &ep->node)) != KS_STATUS_SUCCESS) goto done; + /** + * Do not release the ep->node, keep it alive until cleanup + */ } /** @@ -573,6 +605,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) { + ks_dht_datagram_t *datagram = NULL; int32_t result; ks_assert(dht); @@ -589,12 +622,25 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) for (int32_t i = 0; i < dht->endpoints_size; ++i) { if (dht->endpoints_poll[i].revents & POLLIN) { ks_sockaddr_t raddr = KS_SA_INIT; - dht->recv_buffer_length = KS_DHT_RECV_BUFFER_SIZE; + dht->recv_buffer_length = sizeof(dht->recv_buffer); raddr.family = dht->endpoints[i]->addr.family; if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) { - // @todo copy data to a ks_dht_frame then create job to call ks_dht_process from threadpool - ks_dht_process(dht, dht->endpoints[i], &raddr); + if (dht->recv_buffer_length == sizeof(dht->recv_buffer)) { + ks_log(KS_LOG_DEBUG, "Dropped oversize datagram from %s %d\n", raddr.host, raddr.port); + } else { + // @todo check for recycled datagrams + if (ks_dht_datagram_alloc(&datagram, dht->pool) == KS_STATUS_SUCCESS) { + if (ks_dht_datagram_init(datagram, dht, dht->endpoints[i], &raddr) != KS_STATUS_SUCCESS) { + // @todo add to recycled datagrams + ks_dht_datagram_free(&datagram); + } else if (ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) { + // @todo add to recycled datagrams + ks_dht_datagram_deinit(datagram); + ks_dht_datagram_free(&datagram); + } + } + } } } } @@ -700,6 +746,8 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t * ks_assert(buffer_size); ks_assert(address->family == AF_INET || address->family == AF_INET6); + // @todo change parameters to dereferenced pointer and forward buffer pointer directly + addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8); if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) { @@ -737,6 +785,8 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer, ks_assert(address); ks_assert(address->family == AF_INET ||address->family == AF_INET6); + // @todo change parameters to dereferenced pointer and forward buffer pointer directly + addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8); if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_NO_MEM; @@ -761,6 +811,8 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *n ks_assert(buffer_size); ks_assert(address->family == AF_INET || address->family == AF_INET6); + // @todo change parameters to dereferenced pointer and forward buffer pointer directly + if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) { ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n"); return KS_STATUS_NO_MEM; @@ -784,6 +836,8 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer, ks_assert(address); ks_assert(address->family == AF_INET ||address->family == AF_INET6); + // @todo change parameters to dereferenced pointer and forward buffer pointer directly + if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_NO_MEM; memcpy(nodeid->id, buffer + *buffer_length, KS_DHT_NODEID_SIZE); @@ -903,6 +957,7 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) // @todo blacklist check + // @todo use different encode function to check if all data was encoded, do not send large incomplete messages buf_len = ben_encode2(buf, sizeof(buf), message->data); ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port); @@ -917,6 +972,7 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, ks_sockaddr_t *raddr, const char *query, ks_dht_message_callback_t callback, + ks_dht_transaction_t **transaction, ks_dht_message_t **message, struct bencode **args) { @@ -931,6 +987,7 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, ks_assert(callback); ks_assert(message); + if (transaction) *transaction = NULL; *message = NULL; if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret; @@ -955,6 +1012,8 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, goto done; } + if (transaction) *transaction = trans; + ret = KS_STATUS_SUCCESS; done: @@ -1012,7 +1071,46 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, } -KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr) +KS_DECLARE(void *) ks_dht_process(ks_thread_t *thread, void *data) +{ + ks_dht_datagram_t *datagram = (ks_dht_datagram_t *)data; + ks_dht_message_t message; + ks_dht_message_callback_t callback; + + ks_assert(thread); + ks_assert(data); + + ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", datagram->raddr.host, datagram->raddr.port); + if (datagram->raddr.family != AF_INET && datagram->raddr.family != AF_INET6) { + ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n"); + return NULL; + } + + // @todo blacklist check for bad actor nodes + + if (ks_dht_message_prealloc(&message, datagram->dht->pool) != KS_STATUS_SUCCESS) return NULL; + + if (ks_dht_message_init(&message, datagram->endpoint, &datagram->raddr, KS_FALSE) != KS_STATUS_SUCCESS) return NULL; + + if (ks_dht_message_parse(&message, datagram->buffer, datagram->buffer_length) != KS_STATUS_SUCCESS) goto done; + + callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message.type, KS_READLOCKED); + ks_hash_read_unlock(datagram->dht->registry_type); + + if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); + else callback(datagram->dht, &message); + + done: + ks_dht_message_deinit(&message); + + // @todo recycle datagram + ks_dht_datagram_deinit(datagram); + ks_dht_datagram_free(&datagram); + + return NULL; +} + +KS_DECLARE(ks_status_t) ks_dht_process_(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr) { ks_dht_message_t message; ks_dht_message_callback_t callback; @@ -1140,40 +1238,81 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t } -KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_nodeid_t *id) //, ks_dht_search_callback_t callback) +KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, + int family, + ks_dht_nodeid_t *target, + ks_dht_search_callback_t callback, + ks_dht_search_t **search) { + ks_dht_search_t *s = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; + ks_bool_t inserted = KS_FALSE; + ks_bool_t allocated = KS_FALSE; + ks_dhtrt_querynodes_t query; + ks_assert(dht); - ks_assert(id); + ks_assert(family == AF_INET || family == AF_INET6); + ks_assert(target); - // @todo check hash for id to see if search already exists + if (search) *search = NULL; - // @todo if search does not exist, create new search and store in hash by id + // check hash for target to see if search already exists + s = ks_hash_search(dht->search_hash, target->id, KS_READLOCKED); + ks_hash_read_unlock(dht->search_hash); - // @todo queue callback into search, if multiple tasks are searching the same id they can all be notified of results + // if search does not exist, create new search and store in hash by target + if (!s) { + if ((ret = ks_dht_search_alloc(&s, dht->pool)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_search_init(s, target)) != KS_STATUS_SUCCESS) goto done; + allocated = KS_TRUE; + } else inserted = KS_TRUE; - // @todo if search existed already and is already running then bail out and let it run + // add callback regardless of whether the search is new or old + if ((ret = ks_dht_search_callback_add(s, callback)) != KS_STATUS_SUCCESS) goto done; - // @todo find closest nodes to id locally, store as closest results, and queue in search pending a find_node call for closer nodes + // if the search is old then bail out and return successfully + if (!allocated) goto done; - // @todo pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout + // find closest good nodes to target locally and store as the closest results + query.nodeid = *target; + query.type = KS_DHT_REMOTE; + query.max = KS_DHT_SEARCH_RESULTS_MAX_SIZE; + query.family = family; + ks_dhtrt_findclosest_nodes(family == AF_INET ? dht->rt_ipv4 : dht->rt_ipv6, &query); + for (int32_t i = 0; i < query.count; ++i) { + ks_dht_node_t *n = query.nodes[i]; + ks_dht_search_pending_t *pending = NULL; + s->results[i] = n; + // add to pending with expiration + if ((ret = ks_dht_search_pending_alloc(&pending, s->pool)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_search_pending_init(pending, n)) != KS_STATUS_SUCCESS) { + ks_dht_search_pending_free(&pending); + goto done; + } + if (!ks_hash_insert(s->pending, n->nodeid.id, n)) { + ks_dht_search_pending_deinit(pending); + ks_dht_search_pending_free(&pending); + goto done; + } + // @todo call send_findnode, but transactions need to track the target id from a find_node query since find_node response does not contain it + } + s->results_length = query.count; + + if (!ks_hash_insert(dht->search_hash, s->target.id, s)) { + ret = KS_STATUS_FAIL; + goto done; + } + inserted = KS_TRUE; + if (search) *search = s; + ret = KS_STATUS_SUCCESS; - // @todo upon receiving response to find_node, check for an existing search by the id - - // @todo keep track of the closest K(8) nodes found to the id - - // @todo if there is closer node(s) in response, update furthest search result(s) and queue find_node calls for closer nodes - - // @todo if search queue is empty, call callbacks - - // @todo otherwise pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout - - - // @todo during pulse iterate searches and check for last popped timeout where find_node received no reply - - // @todo pop a pending find_node call, or call callbacks if empty - - return KS_STATUS_SUCCESS; + done: + if (ret != KS_STATUS_SUCCESS && !inserted && s) { + ks_dht_search_deinit(s); + ks_dht_search_free(&s); + } + return ret; } @@ -1298,7 +1437,14 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k ks_assert(dht); ks_assert(raddr); - if (ks_dht_setup_query(dht, ep, raddr, "ping", ks_dht_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (ks_dht_setup_query(dht, + ep, + raddr, + "ping", + ks_dht_process_response_ping, + NULL, + &message, + &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); @@ -1327,6 +1473,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); @@ -1364,7 +1511,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - + if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; @@ -1376,6 +1524,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid) { + ks_dht_transaction_t *transaction = NULL; ks_dht_message_t *message = NULL; struct bencode *a = NULL; @@ -1383,10 +1532,20 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e ks_assert(raddr); ks_assert(targetid); - if (ks_dht_setup_query(dht, ep, raddr, "find_node", ks_dht_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (ks_dht_setup_query(dht, + ep, + raddr, + "find_node", + ks_dht_process_response_findnode, + &transaction, + &message, + &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + memcpy(transaction->target.id, targetid->id, KS_DHT_NODEID_SIZE); ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE)); + // @todo produce "want" value if both families are bound ks_log(KS_LOG_DEBUG, "Sending message query find_node\n"); ks_q_push(dht->send_q, (void *)message); @@ -1422,11 +1581,12 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess want = ben_dict_get_by_str(message->args, "want"); if (want) { + // @todo use ben_list_for_each size_t want_len = ben_list_len(want); for (size_t i = 0; i < want_len; ++i) { struct bencode *iv = ben_list_get(want, i); - if (!ben_cmp_with_str(iv, "n4")) want4 = KS_TRUE; - if (!ben_cmp_with_str(iv, "n6")) want6 = KS_TRUE; + if (!ben_cmp_with_str(iv, "n4") && dht->rt_ipv4) want4 = KS_TRUE; + if (!ben_cmp_with_str(iv, "n6") && dht->rt_ipv6) want6 = KS_TRUE; } } @@ -1439,6 +1599,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n"); @@ -1451,18 +1612,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_dhtrt_findclosest_nodes(routetable, &query); for (int32_t i = 0; i < query.count; ++i) { - if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid, - &query.nodes[i]->addr, + ks_dht_node_t *qn = query.nodes[i]; + + if (ks_dht_utility_compact_nodeinfo(&qn->nodeid, + &qn->addr, buffer4, &buffer4_length, - sizeof(buffer4)) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - ks_log(KS_LOG_DEBUG, - "Compacted ipv4 nodeinfo for %s (%s %d)\n", - ks_dht_hexid(&query.nodes[i]->nodeid, id_buf), - query.nodes[i]->addr.host, - query.nodes[i]->addr.port); + sizeof(buffer4)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); } } if (want6) { @@ -1470,18 +1628,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_dhtrt_findclosest_nodes(routetable, &query); for (int32_t i = 0; i < query.count; ++i) { - if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid, - &query.nodes[i]->addr, + ks_dht_node_t *qn = query.nodes[i]; + + if (ks_dht_utility_compact_nodeinfo(&qn->nodeid, + &qn->addr, buffer6, &buffer6_length, - sizeof(buffer6)) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - ks_log(KS_LOG_DEBUG, - "Compacted ipv6 nodeinfo for %s (%s %d)\n", - ks_dht_hexid(&query.nodes[i]->nodeid, id_buf), - query.nodes[i]->addr.host, - query.nodes[i]->addr.port); + sizeof(buffer6)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); } } @@ -1491,9 +1646,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess message->transactionid, message->transactionid_length, &response, - &r) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + &r) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length)); @@ -1522,6 +1675,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_assert(dht); ks_assert(message); + // @todo pass in the ks_dht_transaction_t from the original query, available one call higher, to get the target id for search updating + // @todo make a utility function to produce a xor of two nodeid's for distance checks based on memcmp on the existing results and new response nodes + // @todo lookup search by target from transaction, lookup responding node id in search pending hash, set entry to finished for purging + // @todo check response nodes for closer nodes than results contain, skip duplicates, add pending and call send_findnode for new closer results + if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; n = ben_dict_get_by_str(message->args, "nodes"); @@ -1539,6 +1697,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; @@ -1558,6 +1717,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); + ks_dhtrt_release_node(node); } while (nodes6_len < nodes6_size) { @@ -1575,6 +1735,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); + ks_dhtrt_release_node(node); } // @todo repeat above for ipv6 table @@ -1593,7 +1754,14 @@ KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks ks_assert(raddr); ks_assert(targetid); - if (ks_dht_setup_query(dht, ep, raddr, "get", ks_dht_process_response_get, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (ks_dht_setup_query(dht, + ep, + raddr, + "get", + ks_dht_process_response_get, + NULL, + &message, + &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available @@ -1635,6 +1803,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Message query get is valid\n"); @@ -1703,6 +1872,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; @@ -1735,6 +1905,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Message query put is valid\n"); @@ -1772,6 +1943,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 538d19ea2f..ca5154c100 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -9,7 +9,15 @@ KS_BEGIN_EXTERN_C #define KS_DHT_DEFAULT_PORT 5309 -#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF + +#define KS_DHT_TPOOL_MIN 2 +#define KS_DHT_TPOOL_MAX 8 +#define KS_DHT_TPOOL_STACK (1024 * 256) +#define KS_DHT_TPOOL_IDLE 10 + +#define KS_DHT_DATAGRAM_BUFFER_SIZE 1000 + +//#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF #define KS_DHT_PULSE_EXPIRATIONS 10 #define KS_DHT_NODEID_SIZE 20 @@ -33,6 +41,7 @@ KS_BEGIN_EXTERN_C #define KS_DHTRT_MAXQUERYSIZE 20 typedef struct ks_dht_s ks_dht_t; +typedef struct ks_dht_datagram_s ks_dht_datagram_t; typedef struct ks_dht_nodeid_s ks_dht_nodeid_t; typedef struct ks_dht_token_s ks_dht_token_t; typedef struct ks_dht_storageitem_key_s ks_dht_storageitem_key_t; @@ -51,6 +60,15 @@ typedef struct ks_dht_storageitem_s ks_dht_storageitem_t; typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message); typedef ks_status_t (*ks_dht_search_callback_t)(ks_dht_t *dht, ks_dht_search_t *search); +struct ks_dht_datagram_s { + ks_pool_t *pool; + ks_dht_t *dht; + ks_dht_endpoint_t *endpoint; + ks_sockaddr_t raddr; + uint8_t buffer[KS_DHT_DATAGRAM_BUFFER_SIZE]; + ks_size_t buffer_length; +}; + /** * Note: This must remain a structure for casting from raw data */ @@ -115,6 +133,8 @@ struct ks_dht_endpoint_s { ks_dht_nodeid_t nodeid; ks_sockaddr_t addr; ks_socket_t sock; + // @todo make sure this node is unlocked, and never gets destroyed, should also never use local nodes in search results as they can be internal + // network addresses, not what others have contacted through ks_dht_node_t *node; }; @@ -122,6 +142,7 @@ struct ks_dht_transaction_s { ks_pool_t *pool; ks_sockaddr_t raddr; uint32_t transactionid; + ks_dht_nodeid_t target; ks_dht_message_callback_t callback; ks_time_t expiration; ks_bool_t finished; @@ -145,13 +166,13 @@ struct ks_dht_search_s { ks_dht_search_callback_t *callbacks; ks_size_t callbacks_size; ks_hash_t *pending; - ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; + ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; // @todo change this to track the nodeid only, and obtain the nodes only if/when needed ks_size_t results_length; }; struct ks_dht_search_pending_s { ks_pool_t *pool; - ks_dht_node_t *node; + ks_dht_node_t *node; // @todo change this to track the nodeid only, and obtain the node only if/when needed ks_time_t expiration; ks_bool_t finished; }; @@ -175,6 +196,9 @@ struct ks_dht_s { ks_pool_t *pool; ks_bool_t pool_alloc; + ks_thread_pool_t *tpool; + ks_bool_t tpool_alloc; + ks_bool_t autoroute; ks_port_t autoroute_port; @@ -194,7 +218,7 @@ struct ks_dht_s { ks_q_t *send_q; ks_dht_message_t *send_q_unsent; - uint8_t recv_buffer[KS_DHT_RECV_BUFFER_SIZE]; + uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error ks_size_t recv_buffer_length; volatile uint32_t transactionid_next; @@ -243,12 +267,13 @@ KS_DECLARE(ks_status_t) ks_dht_free(ks_dht_t **dht); * Constructor function for ks_dht_t. * Must be used regardless of how ks_dht_t is allocated, will allocate and initialize internal state including registration of message handlers. * @param dht pointer to the dht instance + * @param tpool pointer to a thread pool, may be NULL to create a new thread pool internally * @return The ks_status_t result: KS_STATUS_SUCCESS, ... * @see ks_hash_create * @see ks_dht_register_type * @see ks_q_create */ -KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht); +KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht, ks_thread_pool_t *tpool); /** * Destructor function for ks_dht_t. diff --git a/libs/libks/src/dht/ks_dht_datagram.c b/libs/libks/src/dht/ks_dht_datagram.c new file mode 100644 index 0000000000..e27b7493c1 --- /dev/null +++ b/libs/libks/src/dht/ks_dht_datagram.c @@ -0,0 +1,83 @@ +#include "ks_dht.h" +#include "ks_dht-int.h" +#include "sodium.h" + +KS_DECLARE(ks_status_t) ks_dht_datagram_alloc(ks_dht_datagram_t **datagram, ks_pool_t *pool) +{ + ks_dht_datagram_t *dg; + + ks_assert(datagram); + ks_assert(pool); + + *datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t)); + dg->pool = pool; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(void) ks_dht_datagram_prealloc(ks_dht_datagram_t *datagram, ks_pool_t *pool) +{ + ks_assert(datagram); + ks_assert(pool); + + memset(datagram, 0, sizeof(ks_dht_datagram_t)); + + datagram->pool = pool; +} + +KS_DECLARE(ks_status_t) ks_dht_datagram_free(ks_dht_datagram_t **datagram) +{ + ks_assert(datagram); + ks_assert(*datagram); + + ks_dht_datagram_deinit(*datagram); + ks_pool_free((*datagram)->pool, *datagram); + + *datagram = NULL; + + return KS_STATUS_SUCCESS; +} + + +KS_DECLARE(ks_status_t) ks_dht_datagram_init(ks_dht_datagram_t *datagram, ks_dht_t *dht, ks_dht_endpoint_t *endpoint, const ks_sockaddr_t *raddr) +{ + ks_assert(datagram); + ks_assert(datagram->pool); + ks_assert(dht); + ks_assert(endpoint); + ks_assert(raddr); + ks_assert(raddr->family == AF_INET || raddr->family == AF_INET6); + + datagram->dht = dht; + datagram->endpoint = endpoint; + datagram->raddr = *raddr; + + memcpy(datagram->buffer, dht->recv_buffer, dht->recv_buffer_length); + datagram->buffer_length = dht->recv_buffer_length; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) ks_dht_datagram_deinit(ks_dht_datagram_t *datagram) +{ + ks_assert(datagram); + + datagram->buffer_length = 0; + datagram->raddr = (const ks_sockaddr_t){ 0 }; + datagram->endpoint = NULL; + datagram->dht = NULL; + + return KS_STATUS_SUCCESS; +} + + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libks/src/dht/ks_dht_search.c b/libs/libks/src/dht/ks_dht_search.c index 4dfd3b2008..3d52bfd1d0 100644 --- a/libs/libks/src/dht/ks_dht_search.c +++ b/libs/libks/src/dht/ks_dht_search.c @@ -53,7 +53,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search) /** * */ -KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target, ks_dht_search_callback_t callback) +KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target) { ks_status_t ret = KS_STATUS_SUCCESS; @@ -64,8 +64,6 @@ KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht if ((ret = ks_mutex_create(&search->mutex, KS_MUTEX_FLAG_DEFAULT, search->pool)) != KS_STATUS_SUCCESS) return ret; memcpy(search->target.id, target->id, KS_DHT_NODEID_SIZE); - if (callback) ks_dht_search_callback_add(search, callback); - if ((ret = ks_hash_create(&search->pending, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK, @@ -112,11 +110,14 @@ KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_d ks_assert(search); if (callback) { - int32_t index = search->callbacks_size++; + int32_t index; + // @todo lock mutex + index = search->callbacks_size++; search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool, (void *)search->callbacks, sizeof(ks_dht_search_callback_t) * search->callbacks_size); search->callbacks[index] = callback; + // @todo unlock mutex } return KS_STATUS_SUCCESS; } @@ -159,14 +160,14 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pen return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node, ks_time_t expiration) +KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node) { ks_assert(pending); ks_assert(pending->pool); ks_assert(node); pending->node = node; - pending->expiration = expiration; + pending->expiration = ks_time_now_sec() + KS_DHT_SEARCH_EXPIRATION; pending->finished = KS_FALSE; return KS_STATUS_SUCCESS; diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 82a00e6364..20200a8b93 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -56,18 +56,18 @@ int main() { err = ks_dht_alloc(&dht1, NULL); ok(err == KS_STATUS_SUCCESS); - err = ks_dht_init(dht1); + err = ks_dht_init(dht1, NULL); ok(err == KS_STATUS_SUCCESS); ks_dht_prealloc(&dht2, dht1->pool); - err = ks_dht_init(&dht2); + err = ks_dht_init(&dht2, NULL); ok(err == KS_STATUS_SUCCESS); err = ks_dht_alloc(&dht3, NULL); ok(err == KS_STATUS_SUCCESS); - err = ks_dht_init(dht3); + err = ks_dht_init(dht3, NULL); ok(err == KS_STATUS_SUCCESS); @@ -174,6 +174,8 @@ int main() { ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response + ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet + ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1 ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet