From aaa13f3ba6d23f9abaad9102f230cc43329024c5 Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Thu, 5 Jan 2017 16:19:55 +0000 Subject: [PATCH] FS-9775: Tweaks, bug fixes, etc. Committing in preparation for introducing into libblade. --- libs/libks/src/dht/ks_dht-int.h | 16 +- libs/libks/src/dht/ks_dht.c | 292 +++++++++++++++++----------- libs/libks/src/dht/ks_dht.h | 1 - libs/libks/src/dht/ks_dht_message.c | 23 --- libs/libks/test/testdht2.c | 4 +- 5 files changed, 185 insertions(+), 151 deletions(-) diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 2e8ce749e3..6d06d4682b 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -21,6 +21,14 @@ KS_BEGIN_EXTERN_C */ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint); +/** + * Called internally to receive datagrams from endpoints. + * Handles datagrams by dispatching each through a threadpool. + * @param dht pointer to the dht instance + * @param timeout time in ms to wait for an incoming datagram on any endpoint + */ +KS_DECLARE(void) ks_dht_pulse_endpoints(ks_dht_t *dht, int32_t timeout); + /** * Called internally to expire or reannounce storage item data. * Handles reannouncing and purging of expiring storage items. @@ -334,14 +342,6 @@ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message); */ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length); -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, - uint8_t *transactionid, - ks_size_t transactionid_length, - struct bencode **args); - /** * diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index 7007ddb8eb..4cffb7cef7 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -229,36 +229,46 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) d = *dht; + // @todo ks_dht_shutdown to stop further incoming data and pulse to finish processing and flushing data + /** - * Cleanup the storageitems hash and it's contents if it is allocated. + * Cleanup the type, query, and error registries if they have been allocated. + * No dependancies during destruction, entries are function pointers with no cleanup required. */ - if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash); - d->storageitems_pulse = 0; + if (d->registry_type) ks_hash_destroy(&d->registry_type); + if (d->registry_query) ks_hash_destroy(&d->registry_query); + if (d->registry_error) ks_hash_destroy(&d->registry_error); + + /** + * Cleanup the endpoint management and entries if they have been allocated. + * No dependancies during destruction, entries are destroyed through hash destructor. + * Sockets are closed during entry destruction, and route table references to local nodes are released. + */ + if (d->endpoints) ks_pool_free(d->pool, &d->endpoints); + if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll); + if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash); /** - * Zero out the opaque write token variables. + * Cleanup the transaction management and entries if they have been allocated. + * No dependancies during destruction, entries are destroyed through hash destructor. */ - d->token_secret_current = 0; - d->token_secret_previous = 0; - d->token_secret_expiration = 0; - d->tokens_pulse = 0; - - /** - * Cleanup the route tables if they are allocated. - */ - if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4); - if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6); - - /** - * Cleanup the transactions mutex and hash if they are allocated. - */ - d->transactionid_next = 0; if (d->transactionid_mutex) ks_mutex_destroy(&d->transactionid_mutex); if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash); - d->transactions_pulse = 0; /** - * Cleanup the jobs mutex and jobs if they are allocated. + * Cleanup the message send queue and entries if they have been allocated. + * No dependancies during destruction, entries must be destroyed here. + */ + if (d->send_q) { + ks_dht_message_t *msg; + while (ks_q_pop_timeout(d->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) ks_dht_message_destroy(&msg); + ks_q_destroy(&d->send_q); + } + if (d->send_q_unsent) ks_dht_message_destroy(&d->send_q_unsent); + + /** + * Cleanup the jobs management and entries if they have been allocated. + * Route table node and storage item references are released, entries must be destroyed here. */ for (ks_dht_job_t *job = d->jobs_first, *jobn = NULL; job; job = jobn) { jobn = job->next; @@ -267,65 +277,25 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) if (d->jobs_mutex) ks_mutex_destroy(&d->jobs_mutex); /** - * Probably don't need this, recv_buffer_length is temporary and may change + * Cleanup the storageitems hash and it's contents if it is allocated. + * No dependancies during destruction, entries are destroyed through hash destructor. */ - d->recv_buffer_length = 0; + if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash); /** - * Cleanup the send queue and it's contents if it is allocated. + * Cleanup the route tables if they are allocated. + * No nodes should be referenced anymore by this point. */ - if (d->send_q) { - ks_dht_message_t *msg; - while (ks_q_pop_timeout(d->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) ks_dht_message_destroy(&msg); - ks_q_destroy(&d->send_q); - } - - /** - * Cleanup the cached popped message if it is set. - */ - if (d->send_q_unsent) ks_dht_message_destroy(&d->send_q_unsent); + if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4); + if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6); - /** - * Probably don't need this - */ - d->endpoints_length = 0; - d->endpoints_size = 0; - - /** - * Cleanup the array of endpoint pointers if it is allocated. - */ - if (d->endpoints) ks_pool_free(d->pool, &d->endpoints); - - /** - * Cleanup the array of endpoint polling data if it is allocated. - */ - if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll); - - /** - * Cleanup the endpoints hash if it is allocated, and any endpoints that have been allocated. - */ - if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash); - - /** - * Cleanup the type, query, and error registries if they have been allocated. - */ - if (d->registry_type) ks_hash_destroy(&d->registry_type); - if (d->registry_query) ks_hash_destroy(&d->registry_query); - if (d->registry_error) ks_hash_destroy(&d->registry_error); - - /** - * Probably don't need this - */ - d->autoroute = KS_FALSE; - d->autoroute_port = 0; /** * If the thread pool was allocated internally, destroy it. - * If this fails, something catastrophically bad happened like memory corruption. */ if (d->tpool_alloc) ks_thread_pool_destroy(&d->tpool); - d->tpool_alloc = KS_FALSE; + /** * Temporarily store the allocator level variables because freeing the dht instance will invalidate it. */ @@ -335,16 +305,11 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) /** * Free the dht instance from the pool, after this the dht instance memory is invalid. */ - ks_pool_free(d->pool, &d); - - /** - * At this point dht instance is invalidated so NULL the pointer. - */ - *dht = d = NULL; + ks_pool_free(d->pool, dht); + d = NULL; /** * If the pool was allocated internally, destroy it using the temporary variables stored earlier. - * If this fails, something catastrophically bad happened like memory corruption. */ if (pool_alloc) ks_pool_close(&pool); } @@ -587,20 +552,39 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) { - ks_dht_datagram_t *datagram = NULL; - ks_sockaddr_t raddr; - ks_assert(dht); ks_assert(timeout >= 0 && timeout <= 1000); // this should be called with a timeout of less than 1000ms, preferrably around 100ms + ks_dht_pulse_endpoints(dht, timeout); + + if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4); + if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6); + + ks_dht_pulse_storageitems(dht); + + ks_dht_pulse_jobs(dht); + + ks_dht_pulse_send(dht); + + ks_dht_pulse_transactions(dht); + + ks_dht_pulse_tokens(dht); +} + +KS_DECLARE(void) ks_dht_pulse_endpoints(ks_dht_t *dht, int32_t timeout) +{ + ks_assert(dht); + ks_assert(timeout >= 0 && timeout <= 1000); + if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0; if (ks_poll(dht->endpoints_poll, dht->endpoints_length, timeout) > 0) { for (int32_t i = 0; i < dht->endpoints_length; ++i) { + ks_dht_datagram_t *datagram = NULL; + ks_sockaddr_t raddr = (const ks_sockaddr_t){ 0 }; if (!(dht->endpoints_poll[i].revents & POLLIN)) continue; - raddr = (const ks_sockaddr_t){ 0 }; dht->recv_buffer_length = sizeof(dht->recv_buffer); raddr.family = dht->endpoints[i]->addr.family; if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) != KS_STATUS_SUCCESS) continue; @@ -616,19 +600,6 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) if (ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram); } } - - if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4); - if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6); - - ks_dht_pulse_storageitems(dht); - - ks_dht_pulse_jobs(dht); - - ks_dht_pulse_send(dht); - - ks_dht_pulse_transactions(dht); - - ks_dht_pulse_tokens(dht); } KS_DECLARE(void) ks_dht_pulse_storageitems(ks_dht_t *dht) @@ -1702,6 +1673,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo ks_size_t nodes_count = 0; ks_dht_nodeid_t distance; int32_t results_index = -1; + ks_bool_t finished = KS_FALSE; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); @@ -1713,7 +1685,10 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo ks_mutex_lock(search->mutex); search->searching--; - if (job->result != KS_DHT_JOB_RESULT_SUCCESS) goto done; + if (job->result != KS_DHT_JOB_RESULT_SUCCESS) { + finished = KS_TRUE; + goto done; + } ks_dht_utility_nodeid_xor(&distance, &job->response_id->nodeid, &search->target); if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) { @@ -1781,11 +1756,12 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target); } } + finished = search->searching == 0; done: ks_mutex_unlock(search->mutex); - if (search->searching == 0) { + if (finished) { if (search->callback) search->callback(dht, job); ks_dht_search_destroy(&search); } @@ -1795,9 +1771,9 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job) { - ks_bool_t locked_search = KS_FALSE; ks_dht_search_t *search = NULL; ks_dhtrt_querynodes_t query; + ks_bool_t finished = KS_FALSE; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); @@ -1807,7 +1783,6 @@ KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job) search = (ks_dht_search_t *)job->data; ks_mutex_lock(search->mutex); - locked_search = KS_TRUE; // find closest good nodes to target locally and store as the closest results query.nodeid = search->target; @@ -1828,11 +1803,12 @@ KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job) ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target); } ks_dhtrt_release_querynodes(&query); + finished = search->searching == 0; // done: - if (locked_search) ks_mutex_unlock(search->mutex); + ks_mutex_unlock(search->mutex); - if (search->searching == 0) { + if (finished) { if (search->callback) search->callback(dht, job); ks_dht_search_destroy(&search); } @@ -1846,6 +1822,7 @@ KS_DECLARE(void) ks_dht_search(ks_dht_t *dht, ks_dhtrt_routetable_t *table, ks_dht_nodeid_t *target) { + char target_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_dht_search_t *search = NULL; ks_dht_job_t *job = NULL; @@ -1853,6 +1830,8 @@ KS_DECLARE(void) ks_dht_search(ks_dht_t *dht, ks_assert(table); ks_assert(target); + ks_log(KS_LOG_INFO, "[%s] Searching\n", ks_dht_hex(target->id, target_buf, KS_DHT_NODEID_SIZE)); + ks_dht_search_create(&search, dht->pool, table, target, callback, data); ks_assert(search); @@ -1866,6 +1845,7 @@ KS_DECLARE(void) ks_dht_search(ks_dht_t *dht, KS_DECLARE(ks_status_t) ks_dht_publish_get_callback(ks_dht_t *dht, ks_dht_job_t *job) { ks_dht_publish_t *publish = NULL; + ks_bool_t finished = KS_FALSE; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); @@ -1874,22 +1854,20 @@ KS_DECLARE(ks_status_t) ks_dht_publish_get_callback(ks_dht_t *dht, ks_dht_job_t publish = (ks_dht_publish_t *)job->data; - // @todo callbacks need job to contain cascaded publish->data before calling if (job->result != KS_DHT_JOB_RESULT_SUCCESS) { - job->data = publish->data; - if (publish->callback) publish->callback(dht, job); + finished = KS_TRUE; goto done; } if (!job->response_hasitem || (publish->item->mutable && job->response_seq < publish->item->seq)) { ks_dht_put(dht, &job->raddr, publish->callback, publish->data, &job->response_token, publish->cas, publish->item); - } else if (publish->callback) { - job->data = publish->data; - publish->callback(dht, job); - } + } else finished = KS_TRUE; done: - + if (finished) { + job->data = publish->data; + if (publish->callback) publish->callback(dht, job); + } ks_dht_publish_destroy(&publish); return ret; } @@ -1901,6 +1879,7 @@ KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht, int64_t cas, ks_dht_storageitem_t *item) { + char target_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_dht_publish_t *publish = NULL; const uint8_t *salt = NULL; size_t salt_length = 0; @@ -1910,6 +1889,8 @@ KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht, ks_assert(cas >= 0); ks_assert(item); + ks_log(KS_LOG_INFO, "[%s] Publishing to %s %d\n", ks_dht_hex(item->id.id, target_buf, KS_DHT_NODEID_SIZE), raddr->host, raddr->port); + if (item->salt) { salt = (const uint8_t *)ben_str_val(item->salt); salt_length = ben_str_len(item->salt); @@ -1977,8 +1958,6 @@ KS_DECLARE(ks_status_t) ks_dht_distribute_search_callback(ks_dht_t *dht, ks_dht_ ks_dht_distribute_destroy(&distribute); } - ks_dht_search_destroy(&search); - return ret; } @@ -1989,12 +1968,15 @@ KS_DECLARE(void) ks_dht_distribute(ks_dht_t *dht, int64_t cas, ks_dht_storageitem_t *item) { + char target_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_dht_distribute_t *distribute = NULL; ks_assert(dht); ks_assert(table); ks_assert(cas >= 0); ks_assert(item); + + ks_log(KS_LOG_INFO, "[%s] Distributing\n", ks_dht_hex(item->id.id, target_buf, KS_DHT_NODEID_SIZE)); ks_dht_distribute_create(&distribute, dht->pool, callback, data, cas, item); ks_assert(distribute); @@ -2201,7 +2183,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job) &message, NULL)) != KS_STATUS_SUCCESS) goto done; - //ks_log(KS_LOG_DEBUG, "Sending message query ping\n"); + ks_log(KS_LOG_INFO, + "[%s %d] Ping query to %s %d\n", + message->endpoint->addr.host, + message->endpoint->addr.port, + message->raddr.host, + message->raddr.port); + ks_q_push(dht->send_q, (void *)message); done: @@ -2217,6 +2205,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ ks_assert(message); ks_assert(message->args); + ks_log(KS_LOG_INFO, + "[%s %d] Ping query from %s %d\n", + message->endpoint->addr.host, + message->endpoint->addr.port, + message->raddr.host, + message->raddr.port); + //ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); if ((ret = ks_dht_response_setup(dht, @@ -2241,6 +2236,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t ks_assert(dht); ks_assert(job); + ks_log(KS_LOG_INFO, + "[%s %d] Ping response from %s %d\n", + job->response->endpoint->addr.host, + job->response->endpoint->addr.port, + job->response->raddr.host, + job->response->raddr.port); + //ks_log(KS_LOG_DEBUG, "Message response ping is reached\n"); // done: @@ -2297,7 +2299,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job) ben_dict_set(a, ben_blob("want", 4), want); } - //ks_log(KS_LOG_DEBUG, "Sending message query find_node\n"); + ks_log(KS_LOG_INFO, + "[%s %d] Findnode query to %s %d\n", + message->endpoint->addr.host, + message->endpoint->addr.port, + message->raddr.host, + message->raddr.port); + ks_q_push(dht->send_q, (void *)message); done: @@ -2324,6 +2332,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_assert(message); ks_assert(message->args); + ks_log(KS_LOG_INFO, + "[%s %d] Findnode query from %s %d\n", + message->endpoint->addr.host, + message->endpoint->addr.port, + message->raddr.host, + message->raddr.port); + if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) { ks_dht_error(dht, message->endpoint, @@ -2449,6 +2464,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j ks_assert(dht); ks_assert(job); + ks_log(KS_LOG_INFO, + "[%s %d] Findnode response from %s %d\n", + job->response->endpoint->addr.host, + job->response->endpoint->addr.port, + job->response->raddr.host, + job->response->raddr.port); + n = ben_dict_get_by_str(job->response->args, "nodes"); if (n && dht->rt_ipv4) { //n4 = KS_TRUE; @@ -2551,7 +2573,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job) if (item && item->mutable && item->seq > 0) ben_dict_set(a, ben_blob("seq", 3), ben_int(item->seq)); ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE)); - //ks_log(KS_LOG_DEBUG, "Sending message query get\n"); + ks_log(KS_LOG_INFO, + "[%s %d] Get query to %s %d\n", + message->endpoint->addr.host, + message->endpoint->addr.port, + message->raddr.host, + message->raddr.port); + if (item) { ks_dht_storageitem_dereference(item); ks_mutex_unlock(item->mutex); @@ -2583,6 +2611,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ks_assert(message); ks_assert(message->args); + ks_log(KS_LOG_INFO, + "[%s %d] Get query from %s %d\n", + message->endpoint->addr.host, + message->endpoint->addr.port, + message->raddr.host, + message->raddr.port); + if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) { ks_dht_error(dht, message->endpoint, @@ -2609,10 +2644,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t } ks_hash_read_unlock(dht->storageitems_hash); - // If the item is mutable and available locally and a specific sequence was requested and the local item is not newer then do not send k, sig, or v back sequence_snuffed = item && sequence >= 0 && item->seq <= sequence; - // @todo if sequence is explicitly provided then requester has the data, so if the local sequence is lower - // maybe send a get query to the requester to update the local data query.nodeid = *target; query.type = KS_DHT_REMOTE; @@ -2734,6 +2766,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t ks_assert(dht); ks_assert(job); + ks_log(KS_LOG_INFO, + "[%s %d] Get response from %s %d\n", + job->response->endpoint->addr.host, + job->response->endpoint->addr.port, + job->response->raddr.host, + job->response->raddr.port); + if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done; job->response_token = *token; @@ -2928,7 +2967,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job) ben_dict_set(a, ben_blob("token", 5), ben_blob(job->query_token.token, KS_DHT_TOKEN_SIZE)); ben_dict_set(a, ben_blob("v", 1), ben_clone(job->query_storageitem->v)); - //ks_log(KS_LOG_DEBUG, "Sending message query put\n"); + ks_log(KS_LOG_INFO, + "[%s %d] Put query to %s %d\n", + message->endpoint->addr.host, + message->endpoint->addr.port, + message->raddr.host, + message->raddr.port); + ks_q_push(dht->send_q, (void *)message); return KS_STATUS_SUCCESS; @@ -2958,6 +3003,12 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t ks_assert(message); ks_assert(message->args); + ks_log(KS_LOG_INFO, + "[%s %d] Put query from %s %d\n", + message->endpoint->addr.host, + message->endpoint->addr.port, + message->raddr.host, + message->raddr.port); if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) { ks_dht_error(dht, @@ -3186,7 +3237,14 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t ks_assert(dht); ks_assert(job); - ks_log(KS_LOG_DEBUG, "Message response put is reached\n"); + ks_log(KS_LOG_INFO, + "[%s %d] Put response from %s %d\n", + job->response->endpoint->addr.host, + job->response->endpoint->addr.port, + job->response->raddr.host, + job->response->raddr.port); + + //ks_log(KS_LOG_DEBUG, "Message response put is reached\n"); // done: return ret; diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 8d183a4c4a..3652c47a39 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -150,7 +150,6 @@ struct ks_dht_job_s { int64_t query_cas; ks_dht_token_t query_token; ks_dht_storageitem_t *query_storageitem; - int32_t query_family; // error response parameters int64_t error_code; diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index a88240f652..5cf29b7677 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -110,29 +110,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, - uint8_t *transactionid, - ks_size_t transactionid_length, - struct bencode **args) -{ - struct bencode *r; - - ks_assert(message); - ks_assert(transactionid); - - ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); - ben_dict_set(message->data, ben_blob("y", 1), ben_blob("r", 1)); - - // @note r joins message->data and will be freed with it - r = ben_dict(); - ks_assert(r); - ben_dict_set(message->data, ben_blob("r", 1), r); - - if (args) *args = r; - - return KS_STATUS_SUCCESS; -} - /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index fd4ab7964c..59eee76cf4 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -93,7 +93,7 @@ int main() { err = ks_init(); ok(!err); - ks_global_set_default_logger(7); + ks_global_set_default_logger(KS_LOG_LEVEL_INFO); err = ks_find_local_ip(v4, sizeof(v4), &mask, AF_INET, NULL); ok(err == KS_STATUS_SUCCESS); @@ -172,7 +172,7 @@ int main() { ks_dht_ping(dht2, &raddr1, NULL, NULL); // (QUERYING) ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING) - + ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet