diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 93afb44a70..22b80530df 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -220,7 +220,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid); KS_DECLARE(void *)ks_dht_process(ks_thread_t *thread, void *data); -KS_DECLARE(ks_status_t) ks_dht_process_(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr); KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t *message); @@ -242,80 +241,60 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag /** * */ -KS_DECLARE(ks_status_t) ks_dht_datagram_alloc(ks_dht_datagram_t **datagram, ks_pool_t *pool); -KS_DECLARE(void) ks_dht_datagram_prealloc(ks_dht_datagram_t *datagram, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht_datagram_free(ks_dht_datagram_t **datagram); +KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram, + ks_pool_t *pool, + ks_dht_t *dht, + ks_dht_endpoint_t *endpoint, + const ks_sockaddr_t *raddr); +KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram); -KS_DECLARE(ks_status_t) ks_dht_datagram_init(ks_dht_datagram_t *datagram, - ks_dht_t *dht, - ks_dht_endpoint_t *endpoint, - const ks_sockaddr_t *raddr); -KS_DECLARE(ks_status_t) ks_dht_datagram_deinit(ks_dht_datagram_t *datagram); /** * */ -KS_DECLARE(ks_status_t) ks_dht_endpoint_alloc(ks_dht_endpoint_t **endpoint, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht_endpoint_prealloc(ks_dht_endpoint_t *endpoint, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht_endpoint_free(ks_dht_endpoint_t **endpoint); +KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint, + ks_pool_t *pool, + const ks_dht_nodeid_t *nodeid, + const ks_sockaddr_t *addr, + ks_socket_t sock); +KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint); -KS_DECLARE(ks_status_t) ks_dht_endpoint_init(ks_dht_endpoint_t *endpoint, - const ks_dht_nodeid_t *nodeid, - const ks_sockaddr_t *addr, - ks_socket_t sock); -KS_DECLARE(ks_status_t) ks_dht_endpoint_deinit(ks_dht_endpoint_t *endpoint); /** * */ -KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t *pool); -KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search); - -KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target); -KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search); +KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target); +KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search); KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback); -KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pending, ks_pool_t *pool); -KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending); +KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid); +KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending); -KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node); -KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending); /** * */ -KS_DECLARE(ks_status_t) ks_dht_storageitem_alloc(ks_dht_storageitem_t **item, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht_storageitem_prealloc(ks_dht_storageitem_t *item, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht_storageitem_free(ks_dht_storageitem_t **item); +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v); +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item, + ks_pool_t *pool, + struct bencode *v, + ks_dht_storageitem_key_t *k, + uint8_t *salt, + ks_size_t salt_length, + int64_t sequence, + ks_dht_storageitem_signature_t *signature); +KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item); -KS_DECLARE(ks_status_t) ks_dht_storageitem_init(ks_dht_storageitem_t *item, struct bencode *v); -KS_DECLARE(ks_status_t) ks_dht_storageitem_deinit(ks_dht_storageitem_t *item); - -KS_DECLARE(ks_status_t) ks_dht_storageitem_create(ks_dht_storageitem_t *item, ks_bool_t mutable); -KS_DECLARE(ks_status_t) ks_dht_storageitem_immutable(ks_dht_storageitem_t *item); -KS_DECLARE(ks_status_t) ks_dht_storageitem_mutable(ks_dht_storageitem_t *item, - ks_dht_storageitem_key_t *k, - uint8_t *salt, - ks_size_t salt_length, - int64_t sequence, - ks_dht_storageitem_signature_t *signature); /** * */ -//KS_DECLARE(ks_status_t) ks_dht_node_alloc(ks_dht_node_t **node, ks_pool_t *pool); -//KS_DECLARE(ks_status_t) ks_dht_node_prealloc(ks_dht_node_t *node, ks_pool_t *pool); -//KS_DECLARE(ks_status_t) ks_dht_node_free(ks_dht_node_t *node); - -//KS_DECLARE(ks_status_t) ks_dht_node_init(ks_dht_node_t *node, const ks_dht_nodeid_t *id, const ks_sockaddr_t *addr); -//KS_DECLARE(ks_status_t) ks_dht_node_deinit(ks_dht_node_t *node); - -//KS_DECLARE(ks_status_t) ks_dht_node_address_check(ks_dht_node_t *node, const ks_sockaddr_t *addr); -//KS_DECLARE(ks_bool_t) ks_dht_node_address_exists(ks_dht_node_t *node, const ks_sockaddr_t *addr); -//KS_DECLARE(ks_status_t) ks_dht_node_address_add(ks_dht_node_t *node, const ks_sockaddr_t *addr); - +KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transaction, + ks_pool_t *pool, + ks_sockaddr_t *raddr, + uint32_t transactionid, + ks_dht_message_callback_t callback); +KS_DECLARE(void) ks_dht_transaction_destroy(ks_dht_transaction_t **transaction); KS_END_EXTERN_C diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index ee713121bb..9128275ac3 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -2,24 +2,29 @@ #include "ks_dht-int.h" #include "sodium.h" -KS_DECLARE(ks_status_t) ks_dht_alloc(ks_dht_t **dht, ks_pool_t *pool) +KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread_pool_t *tpool) { ks_bool_t pool_alloc = !pool; - ks_dht_t *d; + ks_dht_t *d = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); + *dht = NULL; + /** * Create a new internally managed pool if one wasn't provided, and returns KS_STATUS_NO_MEM if pool was not created. */ - if (pool_alloc) ks_pool_open(&pool); - if (!pool) return KS_STATUS_NO_MEM; - + if (pool_alloc && (ret = ks_pool_open(&pool)) != KS_STATUS_SUCCESS) goto done; + /** * Allocate the dht instance from the pool, and returns KS_STATUS_NO_MEM if the dht was not created. */ *dht = d = ks_pool_alloc(pool, sizeof(ks_dht_t)); - if (!d) return KS_STATUS_NO_MEM; + if (!d) { + ret = KS_STATUS_NO_MEM; + goto done; + } /** * Keep track of the pool used for future allocations and cleanup. @@ -28,350 +33,339 @@ KS_DECLARE(ks_status_t) ks_dht_alloc(ks_dht_t **dht, ks_pool_t *pool) d->pool = pool; d->pool_alloc = pool_alloc; - return KS_STATUS_SUCCESS; -} - -KS_DECLARE(void) ks_dht_prealloc(ks_dht_t *dht, ks_pool_t *pool) -{ - ks_assert(dht); - ks_assert(pool); - - /** - * Treat preallocate function like allocate, zero the memory like pool allocations do. - */ - memset(dht, 0, sizeof(ks_dht_t)); - - /** - * Keep track of the pool used for future allocations, pool must - */ - dht->pool = pool; - dht->pool_alloc = KS_FALSE; -} - -KS_DECLARE(ks_status_t) ks_dht_free(ks_dht_t **dht) -{ - ks_pool_t *pool = NULL; - ks_bool_t pool_alloc = KS_FALSE; - ks_status_t ret = KS_STATUS_SUCCESS; - - ks_assert(dht); - ks_assert(*dht); - - /** - * Call ks_dht_deinit to ensure everything has been cleaned up internally. - * The pool member variables must not be messed with in deinit, they are managed at the allocator layer. - */ - if ((ret = ks_dht_deinit(*dht)) != KS_STATUS_SUCCESS) return ret; - - /** - * Temporarily store the allocator level variables because freeing the dht instance will invalidate it. - */ - pool = (*dht)->pool; - pool_alloc = (*dht)->pool_alloc; - - /** - * Free the dht instance from the pool, after this the dht instance memory is invalid. - */ - if ((ret = ks_pool_free((*dht)->pool, *dht)) != KS_STATUS_SUCCESS) return ret; - - /** - * At this point dht instance is invalidated so NULL the pointer. - */ - *dht = NULL; - - /** - * If the pool was allocated internally, destroy it using the temporary variables stored earlier. - * If this fails, something catastrophically bad happened like memory corruption. - */ - if (pool_alloc && (ret = ks_pool_close(&pool)) != KS_STATUS_SUCCESS) return ret; - - - return KS_STATUS_SUCCESS; -} - - -KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht, ks_thread_pool_t *tpool) -{ - ks_status_t ret = KS_STATUS_SUCCESS; - - ks_assert(dht); - ks_assert(dht->pool); - /** * Create a new internally managed thread pool if one wasn't provided. */ + d->tpool = tpool; if (!tpool) { - if ((ret = ks_thread_pool_create(&tpool, + d->tpool_alloc = KS_TRUE; + if ((ret = ks_thread_pool_create(&d->tpool, KS_DHT_TPOOL_MIN, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, - KS_DHT_TPOOL_IDLE)) != KS_STATUS_SUCCESS) return ret; - dht->tpool_alloc = KS_TRUE; + KS_DHT_TPOOL_IDLE)) != KS_STATUS_SUCCESS) goto done; } - dht->tpool = tpool; /** * Default autorouting to disabled. */ - dht->autoroute = KS_FALSE; - dht->autoroute_port = 0; + d->autoroute = KS_FALSE; + d->autoroute_port = 0; /** * Create the message type registry. */ - if ((ret = ks_hash_create(&dht->registry_type, + if ((ret = ks_hash_create(&d->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, - dht->pool)) != KS_STATUS_SUCCESS) return ret; + d->pool)) != KS_STATUS_SUCCESS) goto done; /** * Register the message type callbacks for query (q), response (r), and error (e) */ - ks_dht_register_type(dht, "q", ks_dht_process_query); - ks_dht_register_type(dht, "r", ks_dht_process_response); - ks_dht_register_type(dht, "e", ks_dht_process_error); + ks_dht_register_type(d, "q", ks_dht_process_query); + ks_dht_register_type(d, "r", ks_dht_process_response); + ks_dht_register_type(d, "e", ks_dht_process_error); /** * Create the message query registry. */ - if ((ret = ks_hash_create(&dht->registry_query, + if ((ret = ks_hash_create(&d->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, - dht->pool)) != KS_STATUS_SUCCESS) return ret; + d->pool)) != KS_STATUS_SUCCESS) goto done; /** * Register the message query callbacks for ping, find_node, etc. */ - ks_dht_register_query(dht, "ping", ks_dht_process_query_ping); - ks_dht_register_query(dht, "find_node", ks_dht_process_query_findnode); - ks_dht_register_query(dht, "get", ks_dht_process_query_get); - ks_dht_register_query(dht, "put", ks_dht_process_query_put); + ks_dht_register_query(d, "ping", ks_dht_process_query_ping); + ks_dht_register_query(d, "find_node", ks_dht_process_query_findnode); + ks_dht_register_query(d, "get", ks_dht_process_query_get); + ks_dht_register_query(d, "put", ks_dht_process_query_put); /** * Create the message error registry. */ - if ((ret = ks_hash_create(&dht->registry_error, + if ((ret = ks_hash_create(&d->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, - dht->pool)) != KS_STATUS_SUCCESS) return ret; + d->pool)) != KS_STATUS_SUCCESS) goto done; // @todo register 301 error for internal get/put CAS hash mismatch retry handler /** * Default these to FALSE, binding will set them TRUE when a respective address is bound. * @todo these may not be useful anymore they are from legacy code */ - dht->bind_ipv4 = KS_FALSE; - dht->bind_ipv6 = KS_FALSE; + d->bind_ipv4 = KS_FALSE; + d->bind_ipv6 = KS_FALSE; /** * Initialize the data used to track endpoints to NULL, binding will handle latent allocations. * The endpoints and endpoints_poll arrays are maintained in parallel to optimize polling. */ - dht->endpoints = NULL; - dht->endpoints_size = 0; - dht->endpoints_poll = NULL; + d->endpoints = NULL; + d->endpoints_size = 0; + d->endpoints_poll = NULL; /** * Create the endpoints hash for fast lookup, this is used to route externally provided remote addresses when the local endpoint is unknown. * This also provides the basis for autorouting to find unbound interfaces and bind them at runtime. * This hash uses the host ip string concatenated with a colon and the port, ie: "123.123.123.123:123" or ipv6 equivilent */ - if ((ret = ks_hash_create(&dht->endpoints_hash, + if ((ret = ks_hash_create(&d->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, - dht->pool)) != KS_STATUS_SUCCESS) return ret; + d->pool)) != KS_STATUS_SUCCESS) goto done; /** * Default expirations to not be checked for one pulse. */ - dht->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS; + d->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS; /** * Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full. */ - if ((ret = ks_q_create(&dht->send_q, dht->pool, 0)) != KS_STATUS_SUCCESS) return ret; + if ((ret = ks_q_create(&d->send_q, d->pool, 0)) != KS_STATUS_SUCCESS) goto done; /** * If a message is popped from the queue for sending but the system buffers are too full, this is used to temporarily store the message. */ - dht->send_q_unsent = NULL; + d->send_q_unsent = NULL; /** * The dht uses a single internal large receive buffer for receiving all frames, this may change in the future to offload processing to a threadpool. */ - dht->recv_buffer_length = 0; + d->recv_buffer_length = 0; + + /** + * Initialize the transaction id mutex, should use atomic increment instead + */ + if ((ret = ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool)) != KS_STATUS_SUCCESS) goto done; /** * Initialize the first transaction id randomly, this doesn't really matter. */ - dht->transactionid_next = 1; //rand(); + d->transactionid_next = 1; //rand(); /** * Create the hash to track pending transactions on queries that are pending responses. * It should be impossible to receive a duplicate transaction id in the hash before it expires, but if it does an error is preferred. */ - if ((ret = ks_hash_create(&dht->transactions_hash, + if ((ret = ks_hash_create(&d->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, - dht->pool)) != KS_STATUS_SUCCESS) return ret; + d->pool)) != KS_STATUS_SUCCESS) goto done; /** * The internal route tables will be latent allocated when binding. */ - dht->rt_ipv4 = NULL; - dht->rt_ipv6 = NULL; + d->rt_ipv4 = NULL; + d->rt_ipv6 = NULL; + /** + * Create the hash to store searches. + */ + if ((ret = ks_hash_create(&d->search_hash, + KS_HASH_MODE_ARBITRARY, + KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, + d->pool)) != KS_STATUS_SUCCESS) goto done; + /** + * The search hash uses arbitrary key size, which requires the key size be provided. + */ + ks_hash_set_keysize(d->search_hash, KS_DHT_NODEID_SIZE); + /** * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets. */ - dht->token_secret_current = dht->token_secret_previous = rand(); - dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION; + d->token_secret_current = d->token_secret_previous = rand(); + d->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION; /** * Create the hash to store arbitrary data for BEP44. */ - if ((ret = ks_hash_create(&dht->storage_hash, + if ((ret = ks_hash_create(&d->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, - dht->pool)) != KS_STATUS_SUCCESS) return ret; + d->pool)) != KS_STATUS_SUCCESS) goto done; /** * The storage hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's. */ - ks_hash_set_keysize(dht->storage_hash, KS_DHT_NODEID_SIZE); + ks_hash_set_keysize(d->storage_hash, KS_DHT_NODEID_SIZE); - return KS_STATUS_SUCCESS; + done: + if (ret != KS_STATUS_SUCCESS) { + if (d) ks_dht_destroy(&d); + else if (pool_alloc && pool) ks_pool_close(&pool); + + *dht = NULL; + } + return ret; } -KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht) +KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) { - ks_hash_iterator_t *it; - ks_status_t ret = KS_STATUS_SUCCESS; - + ks_dht_t *d = NULL; + ks_pool_t *pool = NULL; + ks_bool_t pool_alloc = KS_FALSE; + ks_hash_iterator_t *it = NULL; + ks_assert(dht); + ks_assert(*dht); + + d = *dht; /** * Cleanup the storage hash and it's contents if it is allocated. */ - if (dht->storage_hash) { - for (it = ks_hash_first(dht->storage_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - const void *key; + if (d->storage_hash) { + for (it = ks_hash_first(d->storage_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { ks_dht_storageitem_t *val; - ks_hash_this(it, &key, NULL, (void **)&val); - if ((ret = ks_dht_storageitem_deinit(val)) != KS_STATUS_SUCCESS) return ret; - if ((ret = ks_dht_storageitem_free(&val)) != KS_STATUS_SUCCESS) return ret; + + ks_hash_this_val(it, (void **)&val); + + ks_dht_storageitem_destroy(&val); } - ks_hash_destroy(&dht->storage_hash); + ks_hash_destroy(&d->storage_hash); } /** * Zero out the opaque write token variables. */ - dht->token_secret_current = 0; - dht->token_secret_previous = 0; - dht->token_secret_expiration = 0; + d->token_secret_current = 0; + d->token_secret_previous = 0; + d->token_secret_expiration = 0; + + /** + * Cleanup the search hash and it's contents if it is allocated. + */ + if (d->search_hash) { + for (it = ks_hash_first(d->search_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + ks_dht_search_t *val; + + ks_hash_this_val(it, (void **)&val); + ks_dht_search_destroy(&val); + } + ks_hash_destroy(&d->search_hash); + } /** * Cleanup the route tables if they are allocated. + * @todo check if endpoints need to be destroyed first to release the readlock on their node */ - if (dht->rt_ipv4) ks_dhtrt_deinitroute(&dht->rt_ipv4); - if (dht->rt_ipv6) ks_dhtrt_deinitroute(&dht->rt_ipv6); + if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4); + if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6); /** - * Cleanup the transactions hash if it is allocated. + * Cleanup the transactions mutex and hash if they are allocated. */ - dht->transactionid_next = 0; - if (dht->transactions_hash) ks_hash_destroy(&dht->transactions_hash); + d->transactionid_next = 0; + if (d->tid_mutex) ks_mutex_destroy(&d->tid_mutex); + if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash); /** * Probably don't need this, recv_buffer_length is temporary and may change */ - dht->recv_buffer_length = 0; + d->recv_buffer_length = 0; /** * Cleanup the send queue and it's contents if it is allocated. */ - if (dht->send_q) { + if (d->send_q) { ks_dht_message_t *msg; - while (ks_q_pop_timeout(dht->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) { - if ((ret = ks_dht_message_deinit(msg)) != KS_STATUS_SUCCESS) return ret; - if ((ret = ks_dht_message_free(&msg)) != KS_STATUS_SUCCESS) return ret; - } - if ((ret = ks_q_destroy(&dht->send_q)) != KS_STATUS_SUCCESS) return ret; + while (ks_q_pop_timeout(d->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) ks_dht_message_destroy(&msg); + ks_q_destroy(&d->send_q); } /** * Cleanup the cached popped message if it is set. */ - if (dht->send_q_unsent) { - if ((ret = ks_dht_message_deinit(dht->send_q_unsent)) != KS_STATUS_SUCCESS) return ret; - if ((ret = ks_dht_message_free(&dht->send_q_unsent)) != KS_STATUS_SUCCESS) return ret; - } + if (d->send_q_unsent) ks_dht_message_destroy(&d->send_q_unsent); /** * Probably don't need this */ - dht->pulse_expirations = 0; + d->pulse_expirations = 0; /** * Cleanup any endpoints that have been allocated. */ - for (int32_t i = 0; i < dht->endpoints_size; ++i) { - ks_dht_endpoint_t *ep = dht->endpoints[i]; - if ((ret = ks_dht_endpoint_deinit(ep)) != KS_STATUS_SUCCESS) return ret; - if ((ret = ks_dht_endpoint_free(&ep)) != KS_STATUS_SUCCESS) return ret; + for (int32_t i = 0; i < d->endpoints_size; ++i) { + ks_dht_endpoint_t *ep = d->endpoints[i]; + ks_dht_endpoint_destroy(&ep); } - dht->endpoints_size = 0; + d->endpoints_size = 0; /** * Cleanup the array of endpoint pointers if it is allocated. */ - if (dht->endpoints) { - if ((ret = ks_pool_free(dht->pool, dht->endpoints)) != KS_STATUS_SUCCESS) return ret; - dht->endpoints = NULL; + if (d->endpoints) { + ks_pool_free(d->pool, d->endpoints); + d->endpoints = NULL; } /** * Cleanup the array of endpoint polling data if it is allocated. */ - if (dht->endpoints_poll) { - if ((ret = ks_pool_free(dht->pool, dht->endpoints_poll)) != KS_STATUS_SUCCESS) return ret; - dht->endpoints_poll = NULL; + if (d->endpoints_poll) { + ks_pool_free(d->pool, d->endpoints_poll); + d->endpoints_poll = NULL; } /** * Cleanup the endpoints hash if it is allocated. */ - if (dht->endpoints_hash) ks_hash_destroy(&dht->endpoints_hash); + if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash); /** * Probably don't need this */ - dht->bind_ipv4 = KS_FALSE; - dht->bind_ipv6 = KS_FALSE; + d->bind_ipv4 = KS_FALSE; + d->bind_ipv6 = KS_FALSE; /** * Cleanup the type, query, and error registries if they have been allocated. */ - if (dht->registry_type) ks_hash_destroy(&dht->registry_type); - if (dht->registry_query) ks_hash_destroy(&dht->registry_query); - if (dht->registry_error) ks_hash_destroy(&dht->registry_error); + if (d->registry_type) ks_hash_destroy(&d->registry_type); + if (d->registry_query) ks_hash_destroy(&d->registry_query); + if (d->registry_error) ks_hash_destroy(&d->registry_error); /** * Probably don't need this */ - dht->autoroute = KS_FALSE; - dht->autoroute_port = 0; + d->autoroute = KS_FALSE; + d->autoroute_port = 0; /** * If the thread pool was allocated internally, destroy it. * If this fails, something catastrophically bad happened like memory corruption. */ - if (dht->tpool_alloc && (ret = ks_thread_pool_destroy(&dht->tpool)) != KS_STATUS_SUCCESS) return ret; - dht->tpool_alloc = KS_FALSE; + if (d->tpool_alloc) ks_thread_pool_destroy(&d->tpool); + d->tpool_alloc = KS_FALSE; - return KS_STATUS_SUCCESS; + /** + * Temporarily store the allocator level variables because freeing the dht instance will invalidate it. + */ + pool = d->pool; + pool_alloc = d->pool_alloc; + + /** + * Free the dht instance from the pool, after this the dht instance memory is invalid. + */ + ks_pool_free(d->pool, d); + + /** + * At this point dht instance is invalidated so NULL the pointer. + */ + *dht = d = NULL; + + /** + * If the pool was allocated internally, destroy it using the temporary variables stored earlier. + * If this fails, something catastrophically bad happened like memory corruption. + */ + if (pool_alloc) ks_pool_close(&pool); } + KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t port) { @@ -520,12 +514,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid /** * Allocate the endpoint to track the local socket. */ - if ((ret = ks_dht_endpoint_alloc(&ep, dht->pool)) != KS_STATUS_SUCCESS) goto done; - - /** - * Initialize the node, may provide NULL nodeid to have one generated internally. - */ - if ((ret = ks_dht_endpoint_init(ep, nodeid, addr, sock)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_endpoint_create(&ep, dht->pool, nodeid, addr, sock)) != KS_STATUS_SUCCESS) goto done; /** * Resize the endpoints array to take another endpoint pointer. @@ -591,14 +580,13 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid if (ret != KS_STATUS_SUCCESS) { /** * If any failures occur, we need to make sure the socket is properly closed. - * This will be done in ks_dht_endpoint_deinit only if the socket was assigned during a successful ks_dht_endpoint_init. + * This will be done in ks_dht_endpoint_destroy only if the socket was assigned during a successful ks_dht_endpoint_create. * Then return whatever failure condition resulted in landed here. */ - if (sock != KS_SOCK_INVALID && ep && ep->sock == KS_SOCK_INVALID) ks_socket_close(&sock); - if (ep) { - ks_dht_endpoint_deinit(ep); - ks_dht_endpoint_free(&ep); - } + if (ep) ks_dht_endpoint_destroy(&ep); + else if (sock != KS_SOCK_INVALID) ks_socket_close(&sock); + + if (endpoint) *endpoint = NULL; } return ret; } @@ -607,49 +595,37 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) { ks_dht_datagram_t *datagram = NULL; int32_t result; + ks_sockaddr_t raddr; ks_assert(dht); - ks_assert (timeout >= 0); + ks_assert (timeout > 0); - // @todo why was old DHT code checking for poll descriptor resizing here? - - if (timeout == 0) { - // @todo deal with default timeout, should return quickly but not hog the CPU polling - } + if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0; result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout); if (result > 0) { for (int32_t i = 0; i < dht->endpoints_size; ++i) { - if (dht->endpoints_poll[i].revents & POLLIN) { - ks_sockaddr_t raddr = KS_SA_INIT; - dht->recv_buffer_length = sizeof(dht->recv_buffer); - - raddr.family = dht->endpoints[i]->addr.family; - if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) { - if (dht->recv_buffer_length == sizeof(dht->recv_buffer)) { - ks_log(KS_LOG_DEBUG, "Dropped oversize datagram from %s %d\n", raddr.host, raddr.port); - } else { - // @todo check for recycled datagrams - if (ks_dht_datagram_alloc(&datagram, dht->pool) == KS_STATUS_SUCCESS) { - if (ks_dht_datagram_init(datagram, dht, dht->endpoints[i], &raddr) != KS_STATUS_SUCCESS) { - // @todo add to recycled datagrams - ks_dht_datagram_free(&datagram); - } else if (ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) { - // @todo add to recycled datagrams - ks_dht_datagram_deinit(datagram); - ks_dht_datagram_free(&datagram); - } - } - } - } + if (!(dht->endpoints_poll[i].revents & POLLIN)) continue; + + raddr = (const ks_sockaddr_t){ 0 }; + dht->recv_buffer_length = sizeof(dht->recv_buffer); + raddr.family = dht->endpoints[i]->addr.family; + if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) != KS_STATUS_SUCCESS) continue; + + if (dht->recv_buffer_length == sizeof(dht->recv_buffer)) { + ks_log(KS_LOG_DEBUG, "Dropped oversize datagram from %s %d\n", raddr.host, raddr.port); + continue; } + + if (ks_dht_datagram_create(&datagram, dht->pool, dht, dht->endpoints[i], &raddr) == KS_STATUS_SUCCESS && + ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram); } } - ks_dht_pulse_expirations(dht); - ks_dht_pulse_send(dht); + ks_dht_pulse_expirations(dht); + if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4); if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6); } @@ -678,8 +654,8 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) remove = KS_TRUE; } if (remove) { - ks_hash_remove(dht->transactions_hash, (char *)key); - ks_pool_free(value->pool, value); + ks_hash_remove(dht->transactions_hash, (void *)key); + ks_dht_transaction_destroy(&value); } } ks_hash_write_unlock(dht->transactions_hash); @@ -709,10 +685,7 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht) if (!bail) { bail = (ret = ks_dht_send(dht, message)) != KS_STATUS_SUCCESS; if (ret == KS_STATUS_BREAK) dht->send_q_unsent = message; - else if (ret == KS_STATUS_SUCCESS) { - ks_dht_message_deinit(message); - ks_dht_message_free(&message); - } + else ks_dht_message_destroy(&message); } } } @@ -731,6 +704,15 @@ KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer) return buffer; } +KS_DECLARE(void) ks_dht_utility_nodeid_xor(ks_dht_nodeid_t *dest, ks_dht_nodeid_t *src1, ks_dht_nodeid_t *src2) +{ + ks_assert(dest); + ks_assert(src1); + ks_assert(src2); + + for (int32_t i = 0; i < KS_DHT_NODEID_SIZE; ++i) dest->id[i] = src1->id[i] ^ src2->id[i]; +} + KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address, uint8_t *buffer, ks_size_t *buffer_length, @@ -992,16 +974,14 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret; - // @todo atomic increment or mutex + // @todo atomic increment + ks_mutex_lock(dht->tid_mutex); transactionid = dht->transactionid_next++; + ks_mutex_unlock(dht->tid_mutex); - if ((ret = ks_dht_transaction_alloc(&trans, dht->pool)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_transaction_create(&trans, dht->pool, raddr, transactionid, callback)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_transaction_init(trans, raddr, transactionid, callback)) != KS_STATUS_SUCCESS) goto done; - - if ((ret = ks_dht_message_alloc(&msg, dht->pool)) != KS_STATUS_SUCCESS) goto done; - - if ((ret = ks_dht_message_init(msg, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done; @@ -1018,14 +998,8 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, done: if (ret != KS_STATUS_SUCCESS) { - if (trans) { - ks_dht_transaction_deinit(trans); - ks_dht_transaction_free(&trans); - } - if (msg) { - ks_dht_message_deinit(msg); - ks_dht_message_free(&msg); - } + if (trans) ks_dht_transaction_destroy(&trans); + if (msg) ks_dht_message_destroy(&msg); *message = NULL; } return ret; @@ -1051,9 +1025,7 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret; - if ((ret = ks_dht_message_alloc(&msg, dht->pool)) != KS_STATUS_SUCCESS) goto done; - - if ((ret = ks_dht_message_init(msg, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done; @@ -1062,9 +1034,8 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, ret = KS_STATUS_SUCCESS; done: - if (ret != KS_STATUS_SUCCESS && msg) { - ks_dht_message_deinit(msg); - ks_dht_message_free(&msg); + if (ret != KS_STATUS_SUCCESS) { + if (msg) ks_dht_message_destroy(&msg); *message = NULL; } return ret; @@ -1074,7 +1045,7 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, KS_DECLARE(void *) ks_dht_process(ks_thread_t *thread, void *data) { ks_dht_datagram_t *datagram = (ks_dht_datagram_t *)data; - ks_dht_message_t message; + ks_dht_message_t *message = NULL; ks_dht_message_callback_t callback; ks_assert(thread); @@ -1083,67 +1054,27 @@ KS_DECLARE(void *) ks_dht_process(ks_thread_t *thread, void *data) ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", datagram->raddr.host, datagram->raddr.port); if (datagram->raddr.family != AF_INET && datagram->raddr.family != AF_INET6) { ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n"); - return NULL; + goto done; } // @todo blacklist check for bad actor nodes - if (ks_dht_message_prealloc(&message, datagram->dht->pool) != KS_STATUS_SUCCESS) return NULL; + if (ks_dht_message_create(&message, datagram->dht->pool, datagram->endpoint, &datagram->raddr, KS_FALSE) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_init(&message, datagram->endpoint, &datagram->raddr, KS_FALSE) != KS_STATUS_SUCCESS) return NULL; + if (ks_dht_message_parse(message, datagram->buffer, datagram->buffer_length) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_parse(&message, datagram->buffer, datagram->buffer_length) != KS_STATUS_SUCCESS) goto done; - - callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message.type, KS_READLOCKED); + callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message->type, KS_READLOCKED); ks_hash_read_unlock(datagram->dht->registry_type); - if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); - else callback(datagram->dht, &message); + if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message->type); + else callback(datagram->dht, message); done: - ks_dht_message_deinit(&message); - - // @todo recycle datagram - ks_dht_datagram_deinit(datagram); - ks_dht_datagram_free(&datagram); - + if (message) ks_dht_message_destroy(&message); + if (datagram) ks_dht_datagram_destroy(&datagram); return NULL; } -KS_DECLARE(ks_status_t) ks_dht_process_(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr) -{ - ks_dht_message_t message; - ks_dht_message_callback_t callback; - ks_status_t ret = KS_STATUS_FAIL; - - ks_assert(dht); - ks_assert(raddr); - - ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", raddr->host, raddr->port); - if (raddr->family != AF_INET && raddr->family != AF_INET6) { - ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n"); - return KS_STATUS_FAIL; - } - - // @todo blacklist check for bad actor nodes - - if (ks_dht_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - - if (ks_dht_message_init(&message, ep, raddr, KS_FALSE) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - - if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) goto done; - - callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_READLOCKED); - ks_hash_read_unlock(dht->registry_type); - - if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); - else ret = callback(dht, &message); - - done: - ks_dht_message_deinit(&message); - - return ret; -} KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *message) { @@ -1230,6 +1161,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t transaction->raddr.host, transaction->raddr.port); } else { + message->transaction = transaction; ret = transaction->callback(dht, message); transaction->finished = KS_TRUE; } @@ -1258,12 +1190,11 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, // check hash for target to see if search already exists s = ks_hash_search(dht->search_hash, target->id, KS_READLOCKED); - ks_hash_read_unlock(dht->search_hash); + ks_hash_read_unlock(dht->search_hash); // @todo hold lock until finished adding new entry? // if search does not exist, create new search and store in hash by target if (!s) { - if ((ret = ks_dht_search_alloc(&s, dht->pool)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_search_init(s, target)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_search_create(&s, dht->pool, target)) != KS_STATUS_SUCCESS) goto done; allocated = KS_TRUE; } else inserted = KS_TRUE; @@ -1278,26 +1209,27 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, query.type = KS_DHT_REMOTE; query.max = KS_DHT_SEARCH_RESULTS_MAX_SIZE; query.family = family; + query.count = 0; ks_dhtrt_findclosest_nodes(family == AF_INET ? dht->rt_ipv4 : dht->rt_ipv6, &query); for (int32_t i = 0; i < query.count; ++i) { ks_dht_node_t *n = query.nodes[i]; ks_dht_search_pending_t *pending = NULL; - s->results[i] = n; + + s->results[i] = n->nodeid; + ks_dht_utility_nodeid_xor(&s->distances[i], &n->nodeid, &s->target); // add to pending with expiration - if ((ret = ks_dht_search_pending_alloc(&pending, s->pool)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_search_pending_init(pending, n)) != KS_STATUS_SUCCESS) { - ks_dht_search_pending_free(&pending); + if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done; + if (!ks_hash_insert(s->pending, n->nodeid.id, pending)) { + ks_dht_search_pending_destroy(&pending); + ret = KS_STATUS_FAIL; goto done; } - if (!ks_hash_insert(s->pending, n->nodeid.id, n)) { - ks_dht_search_pending_deinit(pending); - ks_dht_search_pending_free(&pending); - goto done; - } - // @todo call send_findnode, but transactions need to track the target id from a find_node query since find_node response does not contain it + if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done; } s->results_length = query.count; - + // @todo release query nodes + + // @todo if entry has been added since we checked above this may fail, try adding callback instead of failing? or retain lock from earlier if (!ks_hash_insert(dht->search_hash, s->target.id, s)) { ret = KS_STATUS_FAIL; goto done; @@ -1305,13 +1237,9 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, inserted = KS_TRUE; if (search) *search = s; - ret = KS_STATUS_SUCCESS; done: - if (ret != KS_STATUS_SUCCESS && !inserted && s) { - ks_dht_search_deinit(s); - ks_dht_search_free(&s); - } + if (ret != KS_STATUS_SUCCESS && !inserted && s) ks_dht_search_destroy(&s); return ret; } @@ -1335,9 +1263,7 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - - if (ks_dht_message_init(error, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done; + if (ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) goto done; @@ -1350,10 +1276,7 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, ret = KS_STATUS_SUCCESS; done: - if (ret != KS_STATUS_SUCCESS && error) { - ks_dht_message_deinit(error); - ks_dht_message_free(&error); - } + if (ret != KS_STATUS_SUCCESS && error) ks_dht_message_destroy(&error); return ret; } @@ -1671,14 +1594,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_dhtrt_routetable_t *routetable = NULL; ks_dht_node_t *node = NULL; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_dht_search_t *search = NULL; ks_assert(dht); ks_assert(message); - - // @todo pass in the ks_dht_transaction_t from the original query, available one call higher, to get the target id for search updating - // @todo make a utility function to produce a xor of two nodeid's for distance checks based on memcmp on the existing results and new response nodes - // @todo lookup search by target from transaction, lookup responding node id in search pending hash, set entry to finished for purging - // @todo check response nodes for closer nodes than results contain, skip duplicates, add pending and call send_findnode for new closer results + ks_assert(message->transaction); if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; @@ -1702,6 +1622,14 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_READLOCKED); + ks_hash_read_unlock(dht->search_hash); + if (search) { + ks_dht_search_pending_t *pending = ks_hash_search(search->pending, id->id, KS_READLOCKED); + ks_hash_read_unlock(search->pending); + if (pending) pending->finished = KS_TRUE; + } + while (nodes_len < nodes_size) { ks_dht_nodeid_t nid; ks_sockaddr_t addr; @@ -1718,6 +1646,49 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); ks_dhtrt_release_node(node); + + if (search) { + ks_dht_nodeid_t distance; + int32_t results_index = -1; + + ks_dht_utility_nodeid_xor(&distance, &nid, &search->target); + if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) { + results_index = search->results_length; + search->results_length++; + } else { + for (int32_t index = 0; index < search->results_length; ++index) { + // Check if new node is closer than this existing result + if (memcmp(distance.id, search->distances[index].id, KS_DHT_NODEID_SIZE) < 0) { + // If this is the first node that is further then keep it + // Else if two or more nodes are further, and this existing result is further than the previous one then keep it + if (results_index < 0) results_index = index; + else if (memcmp(search->distances[index].id, search->distances[results_index].id, KS_DHT_NODEID_SIZE) > 0) results_index = index; + } + } + } + + if (results_index >= 0) { + char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + char id3_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_dht_search_pending_t *pending = NULL; + + ks_log(KS_LOG_DEBUG, + "Set closer node id %s (%s) in search of target id %s at results index %d\n", + ks_dht_hexid(&nid, id_buf), + ks_dht_hexid(&distance, id2_buf), + ks_dht_hexid(&search->target, id3_buf), + results_index); + search->results[results_index] = nid; + search->distances[results_index] = distance; + + if (ks_dht_search_pending_create(&pending, search->pool, &nid) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (!ks_hash_insert(search->pending, nid.id, pending)) { + ks_dht_search_pending_destroy(&pending); + return KS_STATUS_FAIL; + } + if (ks_dht_send_findnode(dht, NULL, &addr, &search->target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + } + } } while (nodes6_len < nodes6_size) { diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 1ceb99490e..0b4007b5c1 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -124,6 +124,7 @@ struct ks_dht_message_s { struct bencode *data; uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE]; ks_size_t transactionid_length; + ks_dht_transaction_t *transaction; char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE]; struct bencode *args; }; @@ -166,13 +167,14 @@ struct ks_dht_search_s { ks_dht_search_callback_t *callbacks; ks_size_t callbacks_size; ks_hash_t *pending; - ks_dht_node_t *results[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; // @todo change this to track the nodeid only, and obtain the nodes only if/when needed + ks_dht_nodeid_t results[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; + ks_dht_nodeid_t distances[KS_DHT_SEARCH_RESULTS_MAX_SIZE]; ks_size_t results_length; }; struct ks_dht_search_pending_s { ks_pool_t *pool; - ks_dht_node_t *node; // @todo change this to track the nodeid only, and obtain the node only if/when needed + ks_dht_nodeid_t nodeid; ks_time_t expiration; ks_bool_t finished; }; @@ -221,6 +223,7 @@ struct ks_dht_s { uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error ks_size_t recv_buffer_length; + ks_mutex_t *tid_mutex; volatile uint32_t transactionid_next; ks_hash_t *transactions_hash; @@ -236,61 +239,21 @@ struct ks_dht_s { }; /** - * Allocator function for ks_dht_t. - * Should be used when a ks_dht_t is allocated on the heap, and may provide an external memory pool or allocate one internally. + * Constructor function for ks_dht_t. + * Will allocate and initialize internal state including registration of message handlers. * @param dht dereferenced out pointer to the allocated dht instance - * @param pool pointer to the memory pool used by the dht instance, may be NULL to create a new pool internally - * @param The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM + * @param pool pointer to the memory pool used by the dht instance, may be NULL to create a new memory pool internally + * @param tpool pointer to a thread pool used by the dht instance, may be NULL to create a new thread pool internally + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM */ -KS_DECLARE(ks_status_t) ks_dht_alloc(ks_dht_t **dht, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread_pool_t *tpool); -/** - * Preallocator function for ks_dht_t. - * Should be used when a ks_dht_t is preallocated on the stack or within another structure, and must provide an external memory pool. - * @param dht pointer to the dht instance - * @param pool pointer to the memory pool used by the dht instance - */ -KS_DECLARE(void) ks_dht_prealloc(ks_dht_t *dht, ks_pool_t *pool); - -/** - * Deallocator function for ks_dht_t. - * Must be used when a ks_dht_t is allocated using ks_dht_alloc, will also destroy memory pool if it was created internally. - * @param dht dereferenced in/out pointer to the dht instance, NULL upon return - * @return The ks_status_t result: KS_STATUS_SUCCESS, ... - * @see ks_dht_deinit - * @see ks_pool_free - * @see ks_pool_close - */ -KS_DECLARE(ks_status_t) ks_dht_free(ks_dht_t **dht); - -/** - * Constructor function for ks_dht_t. - * Must be used regardless of how ks_dht_t is allocated, will allocate and initialize internal state including registration of message handlers. - * @param dht pointer to the dht instance - * @param tpool pointer to a thread pool, may be NULL to create a new thread pool internally - * @return The ks_status_t result: KS_STATUS_SUCCESS, ... - * @see ks_hash_create - * @see ks_dht_register_type - * @see ks_q_create - */ -KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht, ks_thread_pool_t *tpool); - /** * Destructor function for ks_dht_t. - * Must be used regardless of how ks_dht_t is allocated, will deallocate and deinitialize internal state. - * @param dht pointer to the dht instance - * @return The ks_status_t result: KS_STATUS_SUCCESS, ... - * @see ks_dht_storageitem_deinit - * @see ks_dht_storageitem_free - * @see ks_hash_destroy - * @see ks_dht_message_deinit - * @see ks_dht_message_free - * @see ks_q_destroy - * @see ks_dht_endpoint_deinit - * @see ks_dht_endpoint_free - * @see ks_pool_free + * Will deinitialize and deallocate internal state. + * @param dht dereferenced in/out pointer to the dht instance, NULL upon return */ -KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht); +KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht); /** * Enable or disable (default) autorouting support. @@ -361,27 +324,15 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout); /** * */ -KS_DECLARE(ks_status_t) ks_dht_message_alloc(ks_dht_message_t **message, ks_pool_t *pool); - +KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message, + ks_pool_t *pool, + ks_dht_endpoint_t *endpoint, + ks_sockaddr_t *raddr, + ks_bool_t alloc_data); /** * */ -KS_DECLARE(ks_status_t) ks_dht_message_prealloc(ks_dht_message_t *message, ks_pool_t *pool); - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_free(ks_dht_message_t **message); - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_init(ks_dht_message_t *message, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_bool_t alloc_data); - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_deinit(ks_dht_message_t *message); +KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message); /** * @@ -412,32 +363,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message, ks_size_t transactionid_length, struct bencode **args); -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_transaction_alloc(ks_dht_transaction_t **transaction, ks_pool_t *pool); - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_transaction_prealloc(ks_dht_transaction_t *transaction, ks_pool_t *pool); - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_transaction_free(ks_dht_transaction_t **transaction); - -KS_DECLARE(ks_status_t) ks_dht_transaction_init(ks_dht_transaction_t *transaction, - ks_sockaddr_t *raddr, - uint32_t transactionid, - ks_dht_message_callback_t callback); - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_transaction_deinit(ks_dht_transaction_t *transaction); - - /** * route table methods diff --git a/libs/libks/src/dht/ks_dht_datagram.c b/libs/libks/src/dht/ks_dht_datagram.c index e27b7493c1..8b6140f2b2 100644 --- a/libs/libks/src/dht/ks_dht_datagram.c +++ b/libs/libks/src/dht/ks_dht_datagram.c @@ -2,75 +2,58 @@ #include "ks_dht-int.h" #include "sodium.h" -KS_DECLARE(ks_status_t) ks_dht_datagram_alloc(ks_dht_datagram_t **datagram, ks_pool_t *pool) +KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram, + ks_pool_t *pool, + ks_dht_t *dht, + ks_dht_endpoint_t *endpoint, + const ks_sockaddr_t *raddr) { ks_dht_datagram_t *dg; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(datagram); ks_assert(pool); - - *datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t)); - dg->pool = pool; - - return KS_STATUS_SUCCESS; -} - -KS_DECLARE(void) ks_dht_datagram_prealloc(ks_dht_datagram_t *datagram, ks_pool_t *pool) -{ - ks_assert(datagram); - ks_assert(pool); - - memset(datagram, 0, sizeof(ks_dht_datagram_t)); - - datagram->pool = pool; -} - -KS_DECLARE(ks_status_t) ks_dht_datagram_free(ks_dht_datagram_t **datagram) -{ - ks_assert(datagram); - ks_assert(*datagram); - - ks_dht_datagram_deinit(*datagram); - ks_pool_free((*datagram)->pool, *datagram); - - *datagram = NULL; - - return KS_STATUS_SUCCESS; -} - - -KS_DECLARE(ks_status_t) ks_dht_datagram_init(ks_dht_datagram_t *datagram, ks_dht_t *dht, ks_dht_endpoint_t *endpoint, const ks_sockaddr_t *raddr) -{ - ks_assert(datagram); - ks_assert(datagram->pool); ks_assert(dht); ks_assert(endpoint); ks_assert(raddr); ks_assert(raddr->family == AF_INET || raddr->family == AF_INET6); + + *datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t)); + if (!dg) { + ret = KS_STATUS_NO_MEM; + goto done; + } + dg->pool = pool; - datagram->dht = dht; - datagram->endpoint = endpoint; - datagram->raddr = *raddr; + dg->dht = dht; + dg->endpoint = endpoint; + dg->raddr = *raddr; - memcpy(datagram->buffer, dht->recv_buffer, dht->recv_buffer_length); - datagram->buffer_length = dht->recv_buffer_length; + memcpy(dg->buffer, dht->recv_buffer, dht->recv_buffer_length); + dg->buffer_length = dht->recv_buffer_length; - return KS_STATUS_SUCCESS; + done: + if (ret != KS_STATUS_SUCCESS) { + if (dg) ks_dht_datagram_destroy(&dg); + *datagram = NULL; + } + return ret; } -KS_DECLARE(ks_status_t) ks_dht_datagram_deinit(ks_dht_datagram_t *datagram) +KS_DECLARE(void) ks_dht_datagram_destroy(ks_dht_datagram_t **datagram) { + ks_dht_datagram_t *dg; + ks_assert(datagram); + ks_assert(*datagram); - datagram->buffer_length = 0; - datagram->raddr = (const ks_sockaddr_t){ 0 }; - datagram->endpoint = NULL; - datagram->dht = NULL; + dg = *datagram; - return KS_STATUS_SUCCESS; + ks_pool_free(dg->pool, dg); + + *datagram = NULL; } - /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/src/dht/ks_dht_endpoint.c b/libs/libks/src/dht/ks_dht_endpoint.c index cf07c8ccb6..61184bfcbf 100644 --- a/libs/libks/src/dht/ks_dht_endpoint.c +++ b/libs/libks/src/dht/ks_dht_endpoint.c @@ -5,87 +5,60 @@ /** * */ -KS_DECLARE(ks_status_t) ks_dht_endpoint_alloc(ks_dht_endpoint_t **endpoint, ks_pool_t *pool) +KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint, + ks_pool_t *pool, + const ks_dht_nodeid_t *nodeid, + const ks_sockaddr_t *addr, + ks_socket_t sock) +{ + ks_dht_endpoint_t *ep; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(endpoint); + ks_assert(pool); + ks_assert(addr); + ks_assert(addr->family == AF_INET || addr->family == AF_INET6); + + *endpoint = ep = ks_pool_alloc(pool, sizeof(ks_dht_endpoint_t)); + if (!ep) { + ret = KS_STATUS_NO_MEM; + goto done; + } + ep->pool = pool; + if (!nodeid) randombytes_buf(ep->nodeid.id, KS_DHT_NODEID_SIZE); + else memcpy(ep->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE); + ep->addr = *addr; + ep->sock = sock; + + done: + if (ret != KS_STATUS_SUCCESS) { + if (ep) ks_dht_endpoint_destroy(&ep); + *endpoint = NULL; + } + return ret; +} + +/** + * + */ +KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint) { ks_dht_endpoint_t *ep; - ks_assert(endpoint); - ks_assert(pool); - - *endpoint = ep = ks_pool_alloc(pool, sizeof(ks_dht_endpoint_t)); - ep->pool = pool; - ep->sock = KS_SOCK_INVALID; - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_endpoint_prealloc(ks_dht_endpoint_t *endpoint, ks_pool_t *pool) -{ - ks_assert(endpoint); - ks_assert(pool); - - memset(endpoint, 0, sizeof(ks_dht_endpoint_t)); - - endpoint->pool = pool; - endpoint->sock = KS_SOCK_INVALID; - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_endpoint_free(ks_dht_endpoint_t **endpoint) -{ ks_assert(endpoint); ks_assert(*endpoint); - ks_dht_endpoint_deinit(*endpoint); - ks_pool_free((*endpoint)->pool, *endpoint); + ep = *endpoint; + + if (ep->node) { + // @todo release the node? + } + if (ep->sock != KS_SOCK_INVALID) ks_socket_close(&ep->sock); + ks_pool_free(ep->pool, ep); *endpoint = NULL; - - return KS_STATUS_SUCCESS; } - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_endpoint_init(ks_dht_endpoint_t *endpoint, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_socket_t sock) -{ - ks_assert(endpoint); - ks_assert(endpoint->pool); - ks_assert(addr); - ks_assert(addr->family == AF_INET || addr->family == AF_INET6); - - if (!nodeid) randombytes_buf(endpoint->nodeid.id, KS_DHT_NODEID_SIZE); - else memcpy(endpoint->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE); - - endpoint->addr = *addr; - endpoint->sock = sock; - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_endpoint_deinit(ks_dht_endpoint_t *endpoint) -{ - ks_assert(endpoint); - - endpoint->node = NULL; - if (endpoint->sock != KS_SOCK_INVALID) ks_socket_close(&endpoint->sock); - endpoint->addr = (const ks_sockaddr_t){ 0 }; - - return KS_STATUS_SUCCESS; -} - - /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index 8c71a9f6e0..63ea519fc0 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -4,86 +4,59 @@ /** * */ -KS_DECLARE(ks_status_t) ks_dht_message_alloc(ks_dht_message_t **message, ks_pool_t *pool) +KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message, + ks_pool_t *pool, + ks_dht_endpoint_t *endpoint, + ks_sockaddr_t *raddr, + ks_bool_t alloc_data) { - ks_dht_message_t *msg; + ks_dht_message_t *m; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(message); ks_assert(pool); - *message = msg = ks_pool_alloc(pool, sizeof(ks_dht_message_t)); - msg->pool = pool; + *message = m = ks_pool_alloc(pool, sizeof(ks_dht_message_t)); + if (!m) { + ret = KS_STATUS_NO_MEM; + goto done; + } + m->pool = pool; - return KS_STATUS_SUCCESS; + m->endpoint = endpoint; + m->raddr = *raddr; + if (alloc_data) m->data = ben_dict(); + + done: + if (ret != KS_STATUS_SUCCESS) { + if (m) ks_dht_message_destroy(&m); + *message = NULL; + } + return ret; } /** * */ -KS_DECLARE(ks_status_t) ks_dht_message_prealloc(ks_dht_message_t *message, ks_pool_t *pool) +KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message) { - ks_assert(message); - ks_assert(pool); + ks_dht_message_t *m; - memset(message, 0, sizeof(ks_dht_message_t)); - - message->pool = pool; - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_free(ks_dht_message_t **message) -{ ks_assert(message); ks_assert(*message); - ks_dht_message_deinit(*message); - ks_pool_free((*message)->pool, *message); + m = *message; + + if (m->data) { + ben_free(m->data); + m->data = NULL; + } + ks_pool_free(m->pool, *message); *message = NULL; - - return KS_STATUS_SUCCESS; } -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_init(ks_dht_message_t *message, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_bool_t alloc_data) -{ - ks_assert(message); - ks_assert(message->pool); - - message->endpoint = ep; - message->raddr = *raddr; - if (alloc_data) message->data = ben_dict(); - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_deinit(ks_dht_message_t *message) -{ - ks_assert(message); - - message->endpoint = NULL; - message->raddr = (const ks_sockaddr_t){ 0 }; - message->args = NULL; - message->type[0] = '\0'; - message->transactionid_length = 0; - if (message->data) { - ben_free(message->data); - message->data = NULL; - } - - return KS_STATUS_SUCCESS; -} - /** * */ @@ -104,7 +77,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui message->data = ben_decode((const void *)buffer, buffer_length); if (!message->data) { ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n"); - goto failure; + return KS_STATUS_FAIL; } ks_log(KS_LOG_DEBUG, "Message decoded\n"); @@ -113,14 +86,14 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui t = ben_dict_get_by_str(message->data, "t"); if (!t) { ks_log(KS_LOG_DEBUG, "Message missing required key 't'\n"); - goto failure; + return KS_STATUS_FAIL; } tv = ben_str_val(t); tv_len = ben_str_len(t); if (tv_len > KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE) { ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpectedly large size of %d\n", tv_len); - goto failure; + return KS_STATUS_FAIL; } memcpy(message->transactionid, tv, tv_len); @@ -131,14 +104,14 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui y = ben_dict_get_by_str(message->data, "y"); if (!y) { ks_log(KS_LOG_DEBUG, "Message missing required key 'y'\n"); - goto failure; + return KS_STATUS_FAIL; } yv = ben_str_val(y); yv_len = ben_str_len(y); if (yv_len >= KS_DHT_MESSAGE_TYPE_MAX_SIZE) { ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpectedly large size of %d\n", yv_len); - goto failure; + return KS_STATUS_FAIL; } memcpy(message->type, yv, yv_len); @@ -146,10 +119,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", message->type); return KS_STATUS_SUCCESS; - - failure: - ks_dht_message_deinit(message); - return KS_STATUS_FAIL; } /** diff --git a/libs/libks/src/dht/ks_dht_search.c b/libs/libks/src/dht/ks_dht_search.c index 3d52bfd1d0..ce314e6155 100644 --- a/libs/libks/src/dht/ks_dht_search.c +++ b/libs/libks/src/dht/ks_dht_search.c @@ -5,104 +5,70 @@ /** * */ -KS_DECLARE(ks_status_t) ks_dht_search_alloc(ks_dht_search_t **search, ks_pool_t *pool) +KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target) { ks_dht_search_t *s; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(search); ks_assert(pool); - + ks_assert(target); + *search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t)); + if (!s) { + ret = KS_STATUS_NO_MEM; + goto done; + } s->pool = pool; + if ((ret = ks_mutex_create(&s->mutex, KS_MUTEX_FLAG_DEFAULT, s->pool)) != KS_STATUS_SUCCESS) goto done; + memcpy(s->target.id, target->id, KS_DHT_NODEID_SIZE); + + if ((ret = ks_hash_create(&s->pending, + KS_HASH_MODE_ARBITRARY, + KS_HASH_FLAG_RWLOCK, + s->pool)) != KS_STATUS_SUCCESS) goto done; + ks_hash_set_keysize(s->pending, KS_DHT_NODEID_SIZE); + + done: + if (ret != KS_STATUS_SUCCESS) { + if (s) ks_dht_search_destroy(&s); + *search = NULL; + } return KS_STATUS_SUCCESS; } /** * */ -KS_DECLARE(void) ks_dht_search_prealloc(ks_dht_search_t *search, ks_pool_t *pool) +KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search) { - ks_assert(search); - ks_assert(pool); + ks_dht_search_t *s; + ks_hash_iterator_t *it; - memset(search, 0, sizeof(ks_dht_search_t)); - - search->pool = pool; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_search_free(ks_dht_search_t **search) -{ - ks_status_t ret = KS_STATUS_SUCCESS; - ks_assert(search); ks_assert(*search); - if ((ret = ks_dht_search_deinit(*search)) != KS_STATUS_SUCCESS) return ret; - if ((ret = ks_pool_free((*search)->pool, *search)) != KS_STATUS_SUCCESS) return ret; + s = *search; - *search = NULL; - - return KS_STATUS_SUCCESS; -} - - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_search_init(ks_dht_search_t *search, const ks_dht_nodeid_t *target) -{ - ks_status_t ret = KS_STATUS_SUCCESS; - - ks_assert(search); - ks_assert(search->pool); - ks_assert(target); - - if ((ret = ks_mutex_create(&search->mutex, KS_MUTEX_FLAG_DEFAULT, search->pool)) != KS_STATUS_SUCCESS) return ret; - memcpy(search->target.id, target->id, KS_DHT_NODEID_SIZE); - - if ((ret = ks_hash_create(&search->pending, - KS_HASH_MODE_ARBITRARY, - KS_HASH_FLAG_RWLOCK, - search->pool)) != KS_STATUS_SUCCESS) return ret; - ks_hash_set_keysize(search->pending, KS_DHT_NODEID_SIZE); - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_search_deinit(ks_dht_search_t *search) -{ - ks_hash_iterator_t *it; - ks_status_t ret = KS_STATUS_SUCCESS; - - ks_assert(search); - - search->results_length = 0; - if (search->pending) { - for (it = ks_hash_first(search->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - const void *key; + if (s->pending) { + for (it = ks_hash_first(s->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) { ks_dht_search_pending_t *val; - ks_hash_this(it, &key, NULL, (void **)&val); - if ((ret = ks_dht_search_pending_deinit(val)) != KS_STATUS_SUCCESS) return ret; - if ((ret = ks_dht_search_pending_free(&val)) != KS_STATUS_SUCCESS) return ret; + ks_hash_this_val(it, (void **)&val); + ks_dht_search_pending_destroy(&val); } - ks_hash_destroy(&search->pending); + ks_hash_destroy(&s->pending); } - search->callbacks_size = 0; - if (search->callbacks) { - if ((ret = ks_pool_free(search->pool, search->callbacks)) != KS_STATUS_SUCCESS) return ret; - search->callbacks = NULL; + if (s->callbacks) { + ks_pool_free(s->pool, s->callbacks); + s->callbacks = NULL; } - if (search->mutex && (ret = ks_mutex_destroy(&search->mutex)) != KS_STATUS_SUCCESS) return ret; + if (s->mutex) ks_mutex_destroy(&s->mutex); - return KS_STATUS_SUCCESS; + ks_pool_free(s->pool, s); + + *search = NULL; } KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_dht_search_callback_t callback) @@ -111,77 +77,58 @@ KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_d if (callback) { int32_t index; - // @todo lock mutex + + ks_mutex_lock(search->mutex); index = search->callbacks_size++; search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool, (void *)search->callbacks, sizeof(ks_dht_search_callback_t) * search->callbacks_size); + if (!search->callbacks) return KS_STATUS_NO_MEM; search->callbacks[index] = callback; - // @todo unlock mutex + ks_mutex_unlock(search->mutex); } return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) ks_dht_search_pending_alloc(ks_dht_search_pending_t **pending, ks_pool_t *pool) +KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid) { ks_dht_search_pending_t *p; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(pending); ks_assert(pool); *pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t)); + if (!p) { + ret = KS_STATUS_NO_MEM; + goto done; + } p->pool = pool; + p->nodeid = *nodeid; + p->expiration = ks_time_now_sec() + KS_DHT_SEARCH_EXPIRATION; + p->finished = KS_FALSE; + + done: + if (ret != KS_STATUS_SUCCESS) { + if (p) ks_dht_search_pending_destroy(&p); + *pending = NULL; + } return KS_STATUS_SUCCESS; } -KS_DECLARE(void) ks_dht_search_pending_prealloc(ks_dht_search_pending_t *pending, ks_pool_t *pool) +KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending) { - ks_assert(pending); - ks_assert(pool); + ks_dht_search_pending_t *p; - memset(pending, 0, sizeof(ks_dht_search_pending_t)); - - pending->pool = pool; -} - -KS_DECLARE(ks_status_t) ks_dht_search_pending_free(ks_dht_search_pending_t **pending) -{ - ks_status_t ret = KS_STATUS_SUCCESS; - ks_assert(pending); ks_assert(*pending); - if ((ret = ks_dht_search_pending_deinit(*pending)) != KS_STATUS_SUCCESS) return ret; - if ((ret = ks_pool_free((*pending)->pool, *pending)) != KS_STATUS_SUCCESS) return ret; + p = *pending; + + ks_pool_free(p->pool, p); *pending = NULL; - - return KS_STATUS_SUCCESS; -} - -KS_DECLARE(ks_status_t) ks_dht_search_pending_init(ks_dht_search_pending_t *pending, ks_dht_node_t *node) -{ - ks_assert(pending); - ks_assert(pending->pool); - ks_assert(node); - - pending->node = node; - pending->expiration = ks_time_now_sec() + KS_DHT_SEARCH_EXPIRATION; - pending->finished = KS_FALSE; - - return KS_STATUS_SUCCESS; -} - -KS_DECLARE(ks_status_t) ks_dht_search_pending_deinit(ks_dht_search_pending_t *pending) -{ - ks_assert(pending); - - pending->node = NULL; - pending->expiration = 0; - pending->finished = KS_FALSE; - - return KS_STATUS_SUCCESS; } /* For Emacs: diff --git a/libs/libks/src/dht/ks_dht_storageitem.c b/libs/libks/src/dht/ks_dht_storageitem.c index d76247bcf7..58c3018a6a 100644 --- a/libs/libks/src/dht/ks_dht_storageitem.c +++ b/libs/libks/src/dht/ks_dht_storageitem.c @@ -5,192 +5,123 @@ /** * */ -KS_DECLARE(ks_status_t) ks_dht_storageitem_alloc(ks_dht_storageitem_t **item, ks_pool_t *pool) +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v) { ks_dht_storageitem_t *si; - - ks_assert(item); - ks_assert(pool); - - *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t)); - si->pool = pool; - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_storageitem_prealloc(ks_dht_storageitem_t *item, ks_pool_t *pool) -{ - ks_assert(item); - ks_assert(pool); - - memset(item, 0, sizeof(ks_dht_storageitem_t)); - - item->pool = pool; - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_storageitem_free(ks_dht_storageitem_t **item) -{ - ks_assert(item); - ks_assert(*item); - - ks_dht_storageitem_deinit(*item); - ks_pool_free((*item)->pool, *item); - - *item = NULL; - - return KS_STATUS_SUCCESS; -} - - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_storageitem_init(ks_dht_storageitem_t *item, struct bencode *v) -{ - ks_assert(item); - ks_assert(item->pool); - ks_assert(v); - ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE); - - item->v = ben_clone(v); - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_storageitem_deinit(ks_dht_storageitem_t *item) -{ - ks_assert(item); - - if (item->v) { - ben_free(item->v); - item->v = NULL; - } - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_storageitem_create(ks_dht_storageitem_t *item, ks_bool_t mutable) -{ - SHA_CTX sha; - - ks_assert(item); - ks_assert(item->pool); - ks_assert(item->v); - - item->mutable = mutable; - - if (!mutable) { - size_t enc_len = 0; - uint8_t *enc = ben_encode(&enc_len, item->v); - SHA1_Init(&sha); - SHA1_Update(&sha, enc, enc_len); - SHA1_Final(item->id.id, &sha); - free(enc); - } else { - size_t enc_len = 0; - uint8_t *enc = NULL; - struct bencode *sig = ben_dict(); - - crypto_sign_keypair(item->pk.key, item->sk.key); - randombytes_buf(item->salt, KS_DHT_STORAGEITEM_SALT_MAX_SIZE); - item->salt_length = KS_DHT_STORAGEITEM_SALT_MAX_SIZE; - item->seq = 1; - - ben_dict_set(sig, ben_blob("salt", 4), ben_blob(item->salt, item->salt_length)); - ben_dict_set(sig, ben_blob("seq", 3), ben_int(item->seq)); - ben_dict_set(sig, ben_blob("v", 1), ben_clone(item->v)); - enc = ben_encode(&enc_len, sig); - ben_free(sig); - - SHA1_Init(&sha); - SHA1_Update(&sha, enc, enc_len); - SHA1_Final(item->sig.sig, &sha); - - free(enc); - - SHA1_Init(&sha); - SHA1_Update(&sha, item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE); - SHA1_Update(&sha, item->salt, item->salt_length); - SHA1_Final(item->id.id, &sha); - } - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_storageitem_immutable(ks_dht_storageitem_t *item) -{ SHA_CTX sha; size_t enc_len = 0; uint8_t *enc = NULL; - - ks_assert(item); - ks_assert(item->v); + ks_status_t ret = KS_STATUS_SUCCESS; - item->mutable = KS_FALSE; + ks_assert(item); + ks_assert(pool); + ks_assert(v); + ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE); + + *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t)); + if (!si) { + ret = KS_STATUS_NO_MEM; + goto done; + } + si->pool = pool; + + si->mutable = KS_FALSE; - enc = ben_encode(&enc_len, item->v); + si->v = ben_clone(v); + if (!si->v) { + ret = KS_STATUS_NO_MEM; + goto done; + } + + enc = ben_encode(&enc_len, si->v); SHA1_Init(&sha); SHA1_Update(&sha, enc, enc_len); - SHA1_Final(item->id.id, &sha); + SHA1_Final(si->id.id, &sha); free(enc); - return KS_STATUS_SUCCESS; + done: + if (ret != KS_STATUS_SUCCESS) { + if (si) ks_dht_storageitem_destroy(&si); + *item = NULL; + } + return ret; +} + +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item, + ks_pool_t *pool, + struct bencode *v, + ks_dht_storageitem_key_t *k, + uint8_t *salt, + ks_size_t salt_length, + int64_t sequence, + ks_dht_storageitem_signature_t *signature) +{ + ks_dht_storageitem_t *si; + SHA_CTX sha; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(item); + ks_assert(pool); + ks_assert(v); + ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE); + ks_assert(k); + ks_assert(!(!salt && salt_length > 0)); + ks_assert(!(salt_length > KS_DHT_STORAGEITEM_SIGNATURE_SIZE)); + ks_assert(signature); + + *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t)); + if (!si) { + ret = KS_STATUS_NO_MEM; + goto done; + } + si->pool = pool; + + si->v = ben_clone(v); + + si->mutable = KS_TRUE; + + memcpy(si->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE); + if (salt && salt_length > 0) { + memcpy(si->salt, salt, salt_length); + si->salt_length = salt_length; + } + si->seq = sequence; + memcpy(si->sig.sig, signature->sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE); + + SHA1_Init(&sha); + SHA1_Update(&sha, si->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE); + if (si->salt && si->salt_length > 0) SHA1_Update(&sha, si->salt, si->salt_length); + SHA1_Final(si->id.id, &sha); + + done: + if (ret != KS_STATUS_SUCCESS) { + if (si) ks_dht_storageitem_destroy(&si); + *item = NULL; + } + return ret; } /** * */ -KS_DECLARE(ks_status_t) ks_dht_storageitem_mutable(ks_dht_storageitem_t *item, - ks_dht_storageitem_key_t *k, - uint8_t *salt, - ks_size_t salt_length, - int64_t sequence, - ks_dht_storageitem_signature_t *signature) +KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item) { - SHA_CTX sha; + ks_dht_storageitem_t *si; ks_assert(item); - ks_assert(item->v); - ks_assert(k); - ks_assert(!(!salt && salt_length > 0)); - ks_assert(salt_length > KS_DHT_STORAGEITEM_SIGNATURE_SIZE); - ks_assert(signature); + ks_assert(*item); - item->mutable = KS_TRUE; + si = *item; - memcpy(item->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE); - if (salt && salt_length > 0) { - memcpy(item->salt, salt, salt_length); - item->salt_length = salt_length; + if (si->v) { + ben_free(si->v); + si->v = NULL; } - item->seq = sequence; - memcpy(item->sig.sig, signature->sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE); + ks_pool_free(si->pool, si); - SHA1_Init(&sha); - SHA1_Update(&sha, item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE); - if (item->salt && item->salt_length > 0) SHA1_Update(&sha, item->salt, item->salt_length); - SHA1_Final(item->id.id, &sha); - - return KS_STATUS_SUCCESS; + *item = NULL; } - + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/src/dht/ks_dht_transaction.c b/libs/libks/src/dht/ks_dht_transaction.c index 604cb0353a..18978bf689 100644 --- a/libs/libks/src/dht/ks_dht_transaction.c +++ b/libs/libks/src/dht/ks_dht_transaction.c @@ -1,92 +1,52 @@ #include "ks_dht.h" #include "ks_dht-int.h" -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_transaction_alloc(ks_dht_transaction_t **transaction, ks_pool_t *pool) +KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transaction, + ks_pool_t *pool, + ks_sockaddr_t *raddr, + uint32_t transactionid, + ks_dht_message_callback_t callback) { - ks_dht_transaction_t *tran; + ks_dht_transaction_t *t; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(transaction); ks_assert(pool); + ks_assert(raddr); - *transaction = tran = ks_pool_alloc(pool, sizeof(ks_dht_transaction_t)); - tran->pool = pool; + *transaction = t = ks_pool_alloc(pool, sizeof(ks_dht_transaction_t)); + if (!t) { + ret = KS_STATUS_NO_MEM; + goto done; + } + t->pool = pool; - return KS_STATUS_SUCCESS; + t->raddr = *raddr; + t->transactionid = transactionid; + t->callback = callback; + t->expiration = ks_time_now_sec() + KS_DHT_TRANSACTION_EXPIRATION_DELAY; + + done: + if (ret != KS_STATUS_SUCCESS) { + if (t) ks_dht_transaction_destroy(&t); + *transaction = NULL; + } + return ret; } -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_transaction_prealloc(ks_dht_transaction_t *transaction, ks_pool_t *pool) +KS_DECLARE(void) ks_dht_transaction_destroy(ks_dht_transaction_t **transaction) { - ks_assert(transaction); - ks_assert(pool); + ks_dht_transaction_t *t; - memset(transaction, 0, sizeof(ks_dht_transaction_t)); - - transaction->pool = pool; - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_transaction_free(ks_dht_transaction_t **transaction) -{ ks_assert(transaction); ks_assert(*transaction); - ks_dht_transaction_deinit(*transaction); - ks_pool_free((*transaction)->pool, *transaction); + t = *transaction; + + ks_pool_free(t->pool, t); *transaction = NULL; - - return KS_STATUS_SUCCESS; } - - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_transaction_init(ks_dht_transaction_t *transaction, - ks_sockaddr_t *raddr, - uint32_t transactionid, - ks_dht_message_callback_t callback) -{ - ks_assert(transaction); - ks_assert(raddr); - ks_assert(transaction->pool); - ks_assert(callback); - - transaction->raddr = *raddr; - transaction->transactionid = transactionid; - transaction->callback = callback; - transaction->expiration = ks_time_now_sec() + KS_DHT_TRANSACTION_EXPIRATION_DELAY; - transaction->finished = KS_FALSE; - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_transaction_deinit(ks_dht_transaction_t *transaction) -{ - ks_assert(transaction); - - transaction->raddr = (const ks_sockaddr_t){ 0 }; - transaction->transactionid = 0; - transaction->callback = NULL; - transaction->expiration = 0; - transaction->finished = KS_FALSE; - - return KS_STATUS_SUCCESS; -} - /* For Emacs: * Local Variables: diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 20200a8b93..76ce6abe22 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -19,7 +19,7 @@ int main() { ks_status_t err; int mask = 0; ks_dht_t *dht1 = NULL; - ks_dht_t dht2; + ks_dht_t *dht2 = NULL; ks_dht_t *dht3 = NULL; ks_dht_endpoint_t *ep1; ks_dht_endpoint_t *ep2; @@ -53,23 +53,15 @@ int main() { diag("Binding to %s on ipv6\n", v6); } - err = ks_dht_alloc(&dht1, NULL); + err = ks_dht_create(&dht1, NULL, NULL); ok(err == KS_STATUS_SUCCESS); - err = ks_dht_init(dht1, NULL); + err = ks_dht_create(&dht2, NULL, NULL); ok(err == KS_STATUS_SUCCESS); - ks_dht_prealloc(&dht2, dht1->pool); - - err = ks_dht_init(&dht2, NULL); - ok(err == KS_STATUS_SUCCESS); - - err = ks_dht_alloc(&dht3, NULL); + err = ks_dht_create(&dht3, NULL, NULL); ok(err == KS_STATUS_SUCCESS); - err = ks_dht_init(dht3, NULL); - ok(err == KS_STATUS_SUCCESS); - ks_dht_register_type(dht1, "z", dht_z_callback); @@ -85,7 +77,7 @@ int main() { err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 1, AF_INET); ok(err == KS_STATUS_SUCCESS); - err = ks_dht_bind(&dht2, NULL, &addr, &ep2); + err = ks_dht_bind(dht2, NULL, &addr, &ep2); ok(err == KS_STATUS_SUCCESS); //raddr2 = addr; @@ -109,7 +101,7 @@ int main() { err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 1, AF_INET6); ok(err == KS_STATUS_SUCCESS); - err = ks_dht_bind(&dht2, NULL, &addr, NULL); + err = ks_dht_bind(dht2, NULL, &addr, NULL); ok(err == KS_STATUS_SUCCESS); err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 2, AF_INET6); @@ -143,24 +135,24 @@ int main() { diag("Ping test\n"); - ks_dht_send_ping(&dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1 + ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1 - ks_dht_pulse(&dht2, 100); // Send queued ping from dht2 to dht1 + ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet - ks_dht_pulse(&dht2, 100); // Receive and process ping response from dht1 + ks_dht_pulse(dht2, 100); // Receive and process ping response from dht1 - ok(ks_dhtrt_find_node(dht2.rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good + ok(ks_dhtrt_find_node(dht2->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good - diag("Pulsing for route table pings\n"); // Wait a second for route table pinging to catch up + diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up for (int i = 0; i < 10; ++i) { diag("DHT 1\n"); ks_dht_pulse(dht1, 100); diag("DHT 2\n"); - ks_dht_pulse(&dht2, 100); + ks_dht_pulse(dht2, 100); } ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good @@ -180,35 +172,26 @@ int main() { ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet - diag("Pulsing for route table pings\n"); // Wait a second for route table pinging to catch up + diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up for (int i = 0; i < 10; ++i) { diag("DHT 1\n"); ks_dht_pulse(dht1, 100); diag("DHT 2\n"); - ks_dht_pulse(&dht2, 100); + ks_dht_pulse(dht2, 100); } ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good - diag("Cleanup\n"); + /* Cleanup and shutdown */ + diag("Cleanup\n"); - err = ks_dht_deinit(dht3); - ok(err == KS_STATUS_SUCCESS); + ks_dht_destroy(&dht3); - err = ks_dht_free(&dht3); - ok(err == KS_STATUS_SUCCESS); + ks_dht_destroy(&dht2); - err = ks_dht_deinit(&dht2); - ok(err == KS_STATUS_SUCCESS); + ks_dht_destroy(&dht1); - err = ks_dht_deinit(dht1); - ok(err == KS_STATUS_SUCCESS); - - err = ks_dht_free(&dht1); - ok(err == KS_STATUS_SUCCESS); - - err = ks_shutdown(); - ok(err == KS_STATUS_SUCCESS); + ks_shutdown(); done_testing(); }