FS-9775: Tweaks, bug fixes, etc. Committing in preparation for introducing into libblade.

This commit is contained in:
Shane Bryldt 2017-01-05 16:19:55 +00:00 committed by Mike Jerris
parent 66fdf5fa19
commit aaa13f3ba6
5 changed files with 185 additions and 151 deletions

View File

@ -21,6 +21,14 @@ KS_BEGIN_EXTERN_C
*/
KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint);
/**
* Called internally to receive datagrams from endpoints.
* Handles datagrams by dispatching each through a threadpool.
* @param dht pointer to the dht instance
* @param timeout time in ms to wait for an incoming datagram on any endpoint
*/
KS_DECLARE(void) ks_dht_pulse_endpoints(ks_dht_t *dht, int32_t timeout);
/**
* Called internally to expire or reannounce storage item data.
* Handles reannouncing and purging of expiring storage items.
@ -334,14 +342,6 @@ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message);
*/
KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
uint8_t *transactionid,
ks_size_t transactionid_length,
struct bencode **args);
/**
*

View File

@ -229,36 +229,46 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
d = *dht;
// @todo ks_dht_shutdown to stop further incoming data and pulse to finish processing and flushing data
/**
* Cleanup the storageitems hash and it's contents if it is allocated.
* Cleanup the type, query, and error registries if they have been allocated.
* No dependancies during destruction, entries are function pointers with no cleanup required.
*/
if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash);
d->storageitems_pulse = 0;
if (d->registry_type) ks_hash_destroy(&d->registry_type);
if (d->registry_query) ks_hash_destroy(&d->registry_query);
if (d->registry_error) ks_hash_destroy(&d->registry_error);
/**
* Cleanup the endpoint management and entries if they have been allocated.
* No dependancies during destruction, entries are destroyed through hash destructor.
* Sockets are closed during entry destruction, and route table references to local nodes are released.
*/
if (d->endpoints) ks_pool_free(d->pool, &d->endpoints);
if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll);
if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
/**
* Zero out the opaque write token variables.
* Cleanup the transaction management and entries if they have been allocated.
* No dependancies during destruction, entries are destroyed through hash destructor.
*/
d->token_secret_current = 0;
d->token_secret_previous = 0;
d->token_secret_expiration = 0;
d->tokens_pulse = 0;
/**
* Cleanup the route tables if they are allocated.
*/
if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4);
if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6);
/**
* Cleanup the transactions mutex and hash if they are allocated.
*/
d->transactionid_next = 0;
if (d->transactionid_mutex) ks_mutex_destroy(&d->transactionid_mutex);
if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash);
d->transactions_pulse = 0;
/**
* Cleanup the jobs mutex and jobs if they are allocated.
* Cleanup the message send queue and entries if they have been allocated.
* No dependancies during destruction, entries must be destroyed here.
*/
if (d->send_q) {
ks_dht_message_t *msg;
while (ks_q_pop_timeout(d->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) ks_dht_message_destroy(&msg);
ks_q_destroy(&d->send_q);
}
if (d->send_q_unsent) ks_dht_message_destroy(&d->send_q_unsent);
/**
* Cleanup the jobs management and entries if they have been allocated.
* Route table node and storage item references are released, entries must be destroyed here.
*/
for (ks_dht_job_t *job = d->jobs_first, *jobn = NULL; job; job = jobn) {
jobn = job->next;
@ -267,65 +277,25 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
if (d->jobs_mutex) ks_mutex_destroy(&d->jobs_mutex);
/**
* Probably don't need this, recv_buffer_length is temporary and may change
* Cleanup the storageitems hash and it's contents if it is allocated.
* No dependancies during destruction, entries are destroyed through hash destructor.
*/
d->recv_buffer_length = 0;
if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash);
/**
* Cleanup the send queue and it's contents if it is allocated.
* Cleanup the route tables if they are allocated.
* No nodes should be referenced anymore by this point.
*/
if (d->send_q) {
ks_dht_message_t *msg;
while (ks_q_pop_timeout(d->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) ks_dht_message_destroy(&msg);
ks_q_destroy(&d->send_q);
}
/**
* Cleanup the cached popped message if it is set.
*/
if (d->send_q_unsent) ks_dht_message_destroy(&d->send_q_unsent);
if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4);
if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6);
/**
* Probably don't need this
*/
d->endpoints_length = 0;
d->endpoints_size = 0;
/**
* Cleanup the array of endpoint pointers if it is allocated.
*/
if (d->endpoints) ks_pool_free(d->pool, &d->endpoints);
/**
* Cleanup the array of endpoint polling data if it is allocated.
*/
if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll);
/**
* Cleanup the endpoints hash if it is allocated, and any endpoints that have been allocated.
*/
if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash);
/**
* Cleanup the type, query, and error registries if they have been allocated.
*/
if (d->registry_type) ks_hash_destroy(&d->registry_type);
if (d->registry_query) ks_hash_destroy(&d->registry_query);
if (d->registry_error) ks_hash_destroy(&d->registry_error);
/**
* Probably don't need this
*/
d->autoroute = KS_FALSE;
d->autoroute_port = 0;
/**
* If the thread pool was allocated internally, destroy it.
* If this fails, something catastrophically bad happened like memory corruption.
*/
if (d->tpool_alloc) ks_thread_pool_destroy(&d->tpool);
d->tpool_alloc = KS_FALSE;
/**
* Temporarily store the allocator level variables because freeing the dht instance will invalidate it.
*/
@ -335,16 +305,11 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht)
/**
* Free the dht instance from the pool, after this the dht instance memory is invalid.
*/
ks_pool_free(d->pool, &d);
/**
* At this point dht instance is invalidated so NULL the pointer.
*/
*dht = d = NULL;
ks_pool_free(d->pool, dht);
d = NULL;
/**
* If the pool was allocated internally, destroy it using the temporary variables stored earlier.
* If this fails, something catastrophically bad happened like memory corruption.
*/
if (pool_alloc) ks_pool_close(&pool);
}
@ -587,20 +552,39 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
{
ks_dht_datagram_t *datagram = NULL;
ks_sockaddr_t raddr;
ks_assert(dht);
ks_assert(timeout >= 0 && timeout <= 1000);
// this should be called with a timeout of less than 1000ms, preferrably around 100ms
ks_dht_pulse_endpoints(dht, timeout);
if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
ks_dht_pulse_storageitems(dht);
ks_dht_pulse_jobs(dht);
ks_dht_pulse_send(dht);
ks_dht_pulse_transactions(dht);
ks_dht_pulse_tokens(dht);
}
KS_DECLARE(void) ks_dht_pulse_endpoints(ks_dht_t *dht, int32_t timeout)
{
ks_assert(dht);
ks_assert(timeout >= 0 && timeout <= 1000);
if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0;
if (ks_poll(dht->endpoints_poll, dht->endpoints_length, timeout) > 0) {
for (int32_t i = 0; i < dht->endpoints_length; ++i) {
ks_dht_datagram_t *datagram = NULL;
ks_sockaddr_t raddr = (const ks_sockaddr_t){ 0 };
if (!(dht->endpoints_poll[i].revents & POLLIN)) continue;
raddr = (const ks_sockaddr_t){ 0 };
dht->recv_buffer_length = sizeof(dht->recv_buffer);
raddr.family = dht->endpoints[i]->addr.family;
if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) != KS_STATUS_SUCCESS) continue;
@ -616,19 +600,6 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
if (ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram);
}
}
if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4);
if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6);
ks_dht_pulse_storageitems(dht);
ks_dht_pulse_jobs(dht);
ks_dht_pulse_send(dht);
ks_dht_pulse_transactions(dht);
ks_dht_pulse_tokens(dht);
}
KS_DECLARE(void) ks_dht_pulse_storageitems(ks_dht_t *dht)
@ -1702,6 +1673,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo
ks_size_t nodes_count = 0;
ks_dht_nodeid_t distance;
int32_t results_index = -1;
ks_bool_t finished = KS_FALSE;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
@ -1713,7 +1685,10 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo
ks_mutex_lock(search->mutex);
search->searching--;
if (job->result != KS_DHT_JOB_RESULT_SUCCESS) goto done;
if (job->result != KS_DHT_JOB_RESULT_SUCCESS) {
finished = KS_TRUE;
goto done;
}
ks_dht_utility_nodeid_xor(&distance, &job->response_id->nodeid, &search->target);
if (search->results_length < KS_DHT_SEARCH_RESULTS_MAX_SIZE) {
@ -1781,11 +1756,12 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo
ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target);
}
}
finished = search->searching == 0;
done:
ks_mutex_unlock(search->mutex);
if (search->searching == 0) {
if (finished) {
if (search->callback) search->callback(dht, job);
ks_dht_search_destroy(&search);
}
@ -1795,9 +1771,9 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo
KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job)
{
ks_bool_t locked_search = KS_FALSE;
ks_dht_search_t *search = NULL;
ks_dhtrt_querynodes_t query;
ks_bool_t finished = KS_FALSE;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
@ -1807,7 +1783,6 @@ KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job)
search = (ks_dht_search_t *)job->data;
ks_mutex_lock(search->mutex);
locked_search = KS_TRUE;
// find closest good nodes to target locally and store as the closest results
query.nodeid = search->target;
@ -1828,11 +1803,12 @@ KS_DECLARE(ks_status_t) ks_dht_query_search(ks_dht_t *dht, ks_dht_job_t *job)
ks_dht_findnode(dht, &node->addr, ks_dht_search_findnode_callback, search, &search->target);
}
ks_dhtrt_release_querynodes(&query);
finished = search->searching == 0;
// done:
if (locked_search) ks_mutex_unlock(search->mutex);
ks_mutex_unlock(search->mutex);
if (search->searching == 0) {
if (finished) {
if (search->callback) search->callback(dht, job);
ks_dht_search_destroy(&search);
}
@ -1846,6 +1822,7 @@ KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
ks_dhtrt_routetable_t *table,
ks_dht_nodeid_t *target)
{
char target_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_dht_search_t *search = NULL;
ks_dht_job_t *job = NULL;
@ -1853,6 +1830,8 @@ KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
ks_assert(table);
ks_assert(target);
ks_log(KS_LOG_INFO, "[%s] Searching\n", ks_dht_hex(target->id, target_buf, KS_DHT_NODEID_SIZE));
ks_dht_search_create(&search, dht->pool, table, target, callback, data);
ks_assert(search);
@ -1866,6 +1845,7 @@ KS_DECLARE(void) ks_dht_search(ks_dht_t *dht,
KS_DECLARE(ks_status_t) ks_dht_publish_get_callback(ks_dht_t *dht, ks_dht_job_t *job)
{
ks_dht_publish_t *publish = NULL;
ks_bool_t finished = KS_FALSE;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
@ -1874,22 +1854,20 @@ KS_DECLARE(ks_status_t) ks_dht_publish_get_callback(ks_dht_t *dht, ks_dht_job_t
publish = (ks_dht_publish_t *)job->data;
// @todo callbacks need job to contain cascaded publish->data before calling
if (job->result != KS_DHT_JOB_RESULT_SUCCESS) {
job->data = publish->data;
if (publish->callback) publish->callback(dht, job);
finished = KS_TRUE;
goto done;
}
if (!job->response_hasitem || (publish->item->mutable && job->response_seq < publish->item->seq)) {
ks_dht_put(dht, &job->raddr, publish->callback, publish->data, &job->response_token, publish->cas, publish->item);
} else if (publish->callback) {
job->data = publish->data;
publish->callback(dht, job);
}
} else finished = KS_TRUE;
done:
if (finished) {
job->data = publish->data;
if (publish->callback) publish->callback(dht, job);
}
ks_dht_publish_destroy(&publish);
return ret;
}
@ -1901,6 +1879,7 @@ KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht,
int64_t cas,
ks_dht_storageitem_t *item)
{
char target_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_dht_publish_t *publish = NULL;
const uint8_t *salt = NULL;
size_t salt_length = 0;
@ -1910,6 +1889,8 @@ KS_DECLARE(void) ks_dht_publish(ks_dht_t *dht,
ks_assert(cas >= 0);
ks_assert(item);
ks_log(KS_LOG_INFO, "[%s] Publishing to %s %d\n", ks_dht_hex(item->id.id, target_buf, KS_DHT_NODEID_SIZE), raddr->host, raddr->port);
if (item->salt) {
salt = (const uint8_t *)ben_str_val(item->salt);
salt_length = ben_str_len(item->salt);
@ -1977,8 +1958,6 @@ KS_DECLARE(ks_status_t) ks_dht_distribute_search_callback(ks_dht_t *dht, ks_dht_
ks_dht_distribute_destroy(&distribute);
}
ks_dht_search_destroy(&search);
return ret;
}
@ -1989,12 +1968,15 @@ KS_DECLARE(void) ks_dht_distribute(ks_dht_t *dht,
int64_t cas,
ks_dht_storageitem_t *item)
{
char target_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_dht_distribute_t *distribute = NULL;
ks_assert(dht);
ks_assert(table);
ks_assert(cas >= 0);
ks_assert(item);
ks_log(KS_LOG_INFO, "[%s] Distributing\n", ks_dht_hex(item->id.id, target_buf, KS_DHT_NODEID_SIZE));
ks_dht_distribute_create(&distribute, dht->pool, callback, data, cas, item);
ks_assert(distribute);
@ -2201,7 +2183,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job)
&message,
NULL)) != KS_STATUS_SUCCESS) goto done;
//ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
ks_log(KS_LOG_INFO,
"[%s %d] Ping query to %s %d\n",
message->endpoint->addr.host,
message->endpoint->addr.port,
message->raddr.host,
message->raddr.port);
ks_q_push(dht->send_q, (void *)message);
done:
@ -2217,6 +2205,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
ks_assert(message);
ks_assert(message->args);
ks_log(KS_LOG_INFO,
"[%s %d] Ping query from %s %d\n",
message->endpoint->addr.host,
message->endpoint->addr.port,
message->raddr.host,
message->raddr.port);
//ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
if ((ret = ks_dht_response_setup(dht,
@ -2241,6 +2236,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t
ks_assert(dht);
ks_assert(job);
ks_log(KS_LOG_INFO,
"[%s %d] Ping response from %s %d\n",
job->response->endpoint->addr.host,
job->response->endpoint->addr.port,
job->response->raddr.host,
job->response->raddr.port);
//ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
// done:
@ -2297,7 +2299,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job)
ben_dict_set(a, ben_blob("want", 4), want);
}
//ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
ks_log(KS_LOG_INFO,
"[%s %d] Findnode query to %s %d\n",
message->endpoint->addr.host,
message->endpoint->addr.port,
message->raddr.host,
message->raddr.port);
ks_q_push(dht->send_q, (void *)message);
done:
@ -2324,6 +2332,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
ks_assert(message);
ks_assert(message->args);
ks_log(KS_LOG_INFO,
"[%s %d] Findnode query from %s %d\n",
message->endpoint->addr.host,
message->endpoint->addr.port,
message->raddr.host,
message->raddr.port);
if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
@ -2449,6 +2464,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
ks_assert(dht);
ks_assert(job);
ks_log(KS_LOG_INFO,
"[%s %d] Findnode response from %s %d\n",
job->response->endpoint->addr.host,
job->response->endpoint->addr.port,
job->response->raddr.host,
job->response->raddr.port);
n = ben_dict_get_by_str(job->response->args, "nodes");
if (n && dht->rt_ipv4) {
//n4 = KS_TRUE;
@ -2551,7 +2573,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
if (item && item->mutable && item->seq > 0) ben_dict_set(a, ben_blob("seq", 3), ben_int(item->seq));
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
//ks_log(KS_LOG_DEBUG, "Sending message query get\n");
ks_log(KS_LOG_INFO,
"[%s %d] Get query to %s %d\n",
message->endpoint->addr.host,
message->endpoint->addr.port,
message->raddr.host,
message->raddr.port);
if (item) {
ks_dht_storageitem_dereference(item);
ks_mutex_unlock(item->mutex);
@ -2583,6 +2611,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
ks_assert(message);
ks_assert(message->args);
ks_log(KS_LOG_INFO,
"[%s %d] Get query from %s %d\n",
message->endpoint->addr.host,
message->endpoint->addr.port,
message->raddr.host,
message->raddr.port);
if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
message->endpoint,
@ -2609,10 +2644,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
}
ks_hash_read_unlock(dht->storageitems_hash);
// If the item is mutable and available locally and a specific sequence was requested and the local item is not newer then do not send k, sig, or v back
sequence_snuffed = item && sequence >= 0 && item->seq <= sequence;
// @todo if sequence is explicitly provided then requester has the data, so if the local sequence is lower
// maybe send a get query to the requester to update the local data
query.nodeid = *target;
query.type = KS_DHT_REMOTE;
@ -2734,6 +2766,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t
ks_assert(dht);
ks_assert(job);
ks_log(KS_LOG_INFO,
"[%s %d] Get response from %s %d\n",
job->response->endpoint->addr.host,
job->response->endpoint->addr.port,
job->response->raddr.host,
job->response->raddr.port);
if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
job->response_token = *token;
@ -2928,7 +2967,13 @@ KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job)
ben_dict_set(a, ben_blob("token", 5), ben_blob(job->query_token.token, KS_DHT_TOKEN_SIZE));
ben_dict_set(a, ben_blob("v", 1), ben_clone(job->query_storageitem->v));
//ks_log(KS_LOG_DEBUG, "Sending message query put\n");
ks_log(KS_LOG_INFO,
"[%s %d] Put query to %s %d\n",
message->endpoint->addr.host,
message->endpoint->addr.port,
message->raddr.host,
message->raddr.port);
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
@ -2958,6 +3003,12 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
ks_assert(message);
ks_assert(message->args);
ks_log(KS_LOG_INFO,
"[%s %d] Put query from %s %d\n",
message->endpoint->addr.host,
message->endpoint->addr.port,
message->raddr.host,
message->raddr.port);
if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) {
ks_dht_error(dht,
@ -3186,7 +3237,14 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t
ks_assert(dht);
ks_assert(job);
ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
ks_log(KS_LOG_INFO,
"[%s %d] Put response from %s %d\n",
job->response->endpoint->addr.host,
job->response->endpoint->addr.port,
job->response->raddr.host,
job->response->raddr.port);
//ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
// done:
return ret;

View File

@ -150,7 +150,6 @@ struct ks_dht_job_s {
int64_t query_cas;
ks_dht_token_t query_token;
ks_dht_storageitem_t *query_storageitem;
int32_t query_family;
// error response parameters
int64_t error_code;

View File

@ -110,29 +110,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
uint8_t *transactionid,
ks_size_t transactionid_length,
struct bencode **args)
{
struct bencode *r;
ks_assert(message);
ks_assert(transactionid);
ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
ben_dict_set(message->data, ben_blob("y", 1), ben_blob("r", 1));
// @note r joins message->data and will be freed with it
r = ben_dict();
ks_assert(r);
ben_dict_set(message->data, ben_blob("r", 1), r);
if (args) *args = r;
return KS_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -93,7 +93,7 @@ int main() {
err = ks_init();
ok(!err);
ks_global_set_default_logger(7);
ks_global_set_default_logger(KS_LOG_LEVEL_INFO);
err = ks_find_local_ip(v4, sizeof(v4), &mask, AF_INET, NULL);
ok(err == KS_STATUS_SUCCESS);
@ -172,7 +172,7 @@ int main() {
ks_dht_ping(dht2, &raddr1, NULL, NULL); // (QUERYING)
ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING)
ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response
ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet