FS-9775: Most of search functionality is finished, needs testing when route table is ready, still reviewing a few things related to recent lock changes

This commit is contained in:
Shane Bryldt 2016-12-16 01:58:21 +00:00 committed by Mike Jerris
parent 68e5321da0
commit 73e4c22255
8 changed files with 329 additions and 320 deletions

View File

@ -15,16 +15,16 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
/** /**
* Create a new internally managed pool if one wasn't provided, and returns KS_STATUS_NO_MEM if pool was not created. * Create a new internally managed pool if one wasn't provided, and returns KS_STATUS_NO_MEM if pool was not created.
*/ */
if (pool_alloc && (ret = ks_pool_open(&pool)) != KS_STATUS_SUCCESS) goto done; if (pool_alloc) {
ks_pool_open(&pool);
ks_assert(pool);
}
/** /**
* Allocate the dht instance from the pool, and returns KS_STATUS_NO_MEM if the dht was not created. * Allocate the dht instance from the pool, and returns KS_STATUS_NO_MEM if the dht was not created.
*/ */
*dht = d = ks_pool_alloc(pool, sizeof(ks_dht_t)); *dht = d = ks_pool_alloc(pool, sizeof(ks_dht_t));
if (!d) { ks_assert(d);
ret = KS_STATUS_NO_MEM;
goto done;
}
/** /**
* Keep track of the pool used for future allocations and cleanup. * Keep track of the pool used for future allocations and cleanup.
@ -39,12 +39,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
d->tpool = tpool; d->tpool = tpool;
if (!tpool) { if (!tpool) {
d->tpool_alloc = KS_TRUE; d->tpool_alloc = KS_TRUE;
if ((ret = ks_thread_pool_create(&d->tpool, ks_thread_pool_create(&d->tpool, KS_DHT_TPOOL_MIN, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
KS_DHT_TPOOL_MIN, ks_assert(d->tpool);
KS_DHT_TPOOL_MAX,
KS_DHT_TPOOL_STACK,
KS_PRI_NORMAL,
KS_DHT_TPOOL_IDLE)) != KS_STATUS_SUCCESS) goto done;
} }
/** /**
@ -56,10 +52,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
/** /**
* Create the message type registry. * Create the message type registry.
*/ */
if ((ret = ks_hash_create(&d->registry_type, ks_hash_create(&d->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
KS_HASH_MODE_DEFAULT, ks_assert(d->registry_type);
KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
d->pool)) != KS_STATUS_SUCCESS) goto done;
/** /**
* Register the message type callbacks for query (q), response (r), and error (e) * Register the message type callbacks for query (q), response (r), and error (e)
@ -71,10 +65,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
/** /**
* Create the message query registry. * Create the message query registry.
*/ */
if ((ret = ks_hash_create(&d->registry_query, ks_hash_create(&d->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
KS_HASH_MODE_DEFAULT, ks_assert(d->registry_query);
KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
d->pool)) != KS_STATUS_SUCCESS) goto done;
/** /**
* Register the message query callbacks for ping, find_node, etc. * Register the message query callbacks for ping, find_node, etc.
@ -87,10 +79,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
/** /**
* Create the message error registry. * Create the message error registry.
*/ */
if ((ret = ks_hash_create(&d->registry_error, ks_hash_create(&d->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
KS_HASH_MODE_DEFAULT, ks_assert(d->registry_error);
KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
d->pool)) != KS_STATUS_SUCCESS) goto done;
// @todo register 301 error for internal get/put CAS hash mismatch retry handler // @todo register 301 error for internal get/put CAS hash mismatch retry handler
/** /**
@ -113,20 +103,19 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
* This also provides the basis for autorouting to find unbound interfaces and bind them at runtime. * This also provides the basis for autorouting to find unbound interfaces and bind them at runtime.
* This hash uses the host ip string concatenated with a colon and the port, ie: "123.123.123.123:123" or ipv6 equivilent * This hash uses the host ip string concatenated with a colon and the port, ie: "123.123.123.123:123" or ipv6 equivilent
*/ */
if ((ret = ks_hash_create(&d->endpoints_hash, ks_hash_create(&d->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, d->pool);
KS_HASH_MODE_DEFAULT, ks_assert(d->endpoints_hash);
KS_HASH_FLAG_RWLOCK,
d->pool)) != KS_STATUS_SUCCESS) goto done;
/** /**
* Default expirations to not be checked for one pulse. * Default expirations to not be checked for one pulse.
*/ */
d->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS; d->pulse_expirations = ks_time_now() + (KS_DHT_PULSE_EXPIRATIONS * 1000);
/** /**
* Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full. * Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full.
*/ */
if ((ret = ks_q_create(&d->send_q, d->pool, 0)) != KS_STATUS_SUCCESS) goto done; ks_q_create(&d->send_q, d->pool, 0);
ks_assert(d->send_q);
/** /**
* If a message is popped from the queue for sending but the system buffers are too full, this is used to temporarily store the message. * If a message is popped from the queue for sending but the system buffers are too full, this is used to temporarily store the message.
@ -141,7 +130,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
/** /**
* Initialize the transaction id mutex, should use atomic increment instead * Initialize the transaction id mutex, should use atomic increment instead
*/ */
if ((ret = ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool)) != KS_STATUS_SUCCESS) goto done; ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool);
ks_assert(d->tid_mutex);
/** /**
* Initialize the first transaction id randomly, this doesn't really matter. * Initialize the first transaction id randomly, this doesn't really matter.
@ -152,10 +142,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
* Create the hash to track pending transactions on queries that are pending responses. * Create the hash to track pending transactions on queries that are pending responses.
* It should be impossible to receive a duplicate transaction id in the hash before it expires, but if it does an error is preferred. * It should be impossible to receive a duplicate transaction id in the hash before it expires, but if it does an error is preferred.
*/ */
if ((ret = ks_hash_create(&d->transactions_hash, ks_hash_create(&d->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, d->pool);
KS_HASH_MODE_INT, ks_assert(d->transactions_hash);
KS_HASH_FLAG_RWLOCK,
d->pool)) != KS_STATUS_SUCCESS) goto done;
/** /**
* The internal route tables will be latent allocated when binding. * The internal route tables will be latent allocated when binding.
@ -166,10 +154,9 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
/** /**
* Create the hash to store searches. * Create the hash to store searches.
*/ */
if ((ret = ks_hash_create(&d->search_hash, ks_hash_create(&d->search_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
KS_HASH_MODE_ARBITRARY, ks_assert(d->search_hash);
KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
d->pool)) != KS_STATUS_SUCCESS) goto done;
/** /**
* The search hash uses arbitrary key size, which requires the key size be provided. * The search hash uses arbitrary key size, which requires the key size be provided.
*/ */
@ -179,21 +166,20 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
* The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets. * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
*/ */
d->token_secret_current = d->token_secret_previous = rand(); d->token_secret_current = d->token_secret_previous = rand();
d->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION; d->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
/** /**
* Create the hash to store arbitrary data for BEP44. * Create the hash to store arbitrary data for BEP44.
*/ */
if ((ret = ks_hash_create(&d->storage_hash, ks_hash_create(&d->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
KS_HASH_MODE_ARBITRARY, ks_assert(d->storage_hash);
KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK,
d->pool)) != KS_STATUS_SUCCESS) goto done;
/** /**
* The storage hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's. * The storage hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's.
*/ */
ks_hash_set_keysize(d->storage_hash, KS_DHT_NODEID_SIZE); ks_hash_set_keysize(d->storage_hash, KS_DHT_NODEID_SIZE);
done: // done:
if (ret != KS_STATUS_SUCCESS) { if (ret != KS_STATUS_SUCCESS) {
if (d) ks_dht_destroy(&d); if (d) ks_dht_destroy(&d);
else if (pool_alloc && pool) ks_pool_close(&pool); else if (pool_alloc && pool) ks_pool_close(&pool);
@ -442,7 +428,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k
ks_assert(value); ks_assert(value);
ks_assert(callback); ks_assert(callback);
return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback);
} }
KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
@ -451,7 +437,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value,
ks_assert(value); ks_assert(value);
ks_assert(callback); ks_assert(callback);
return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback);
} }
KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback)
@ -460,7 +446,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value,
ks_assert(value); ks_assert(value);
ks_assert(callback); ks_assert(callback);
return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback);
} }
@ -482,7 +468,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
if (endpoint) *endpoint = NULL; if (endpoint) *endpoint = NULL;
ep = ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_READLOCKED); ep = ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_READLOCKED);
if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret; ks_hash_read_unlock(dht->endpoints_hash);
if (ep) { if (ep) {
ks_log(KS_LOG_DEBUG, "Attempted to bind to %s more than once.\n", addr->host); ks_log(KS_LOG_DEBUG, "Attempted to bind to %s more than once.\n", addr->host);
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
@ -514,7 +500,8 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
/** /**
* Allocate the endpoint to track the local socket. * Allocate the endpoint to track the local socket.
*/ */
if ((ret = ks_dht_endpoint_create(&ep, dht->pool, nodeid, addr, sock)) != KS_STATUS_SUCCESS) goto done; ks_dht_endpoint_create(&ep, dht->pool, nodeid, addr, sock);
ks_assert(ep);
/** /**
* Resize the endpoints array to take another endpoint pointer. * Resize the endpoints array to take another endpoint pointer.
@ -523,15 +510,14 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool, dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool,
(void *)dht->endpoints, (void *)dht->endpoints,
sizeof(ks_dht_endpoint_t *) * dht->endpoints_size); sizeof(ks_dht_endpoint_t *) * dht->endpoints_size);
ks_assert(dht->endpoints);
dht->endpoints[epindex] = ep; dht->endpoints[epindex] = ep;
/** /**
* Add the new endpoint into the endpoints hash for quick lookups. * Add the new endpoint into the endpoints hash for quick lookups.
* @todo insert returns 0 when OOM, ks_pool_alloc will abort so insert can only succeed
*/ */
if (!ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) { if ((ret = ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) != KS_STATUS_SUCCESS) goto done;
ret = KS_STATUS_FAIL;
goto done;
}
/** /**
* Resize the endpoints_poll array to keep in parallel with endpoints array, populate new entry with the right data. * Resize the endpoints_poll array to keep in parallel with endpoints array, populate new entry with the right data.
@ -539,6 +525,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool, dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool,
(void *)dht->endpoints_poll, (void *)dht->endpoints_poll,
sizeof(struct pollfd) * dht->endpoints_size); sizeof(struct pollfd) * dht->endpoints_size);
ks_assert(dht->endpoints_poll);
dht->endpoints_poll[epindex].fd = ep->sock; dht->endpoints_poll[epindex].fd = ep->sock;
dht->endpoints_poll[epindex].events = POLLIN | POLLERR; dht->endpoints_poll[epindex].events = POLLIN | POLLERR;
@ -553,9 +540,6 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
ep->addr.host, ep->addr.host,
ep->addr.port, ep->addr.port,
&ep->node)) != KS_STATUS_SUCCESS) goto done; &ep->node)) != KS_STATUS_SUCCESS) goto done;
/**
* Do not release the ep->node, keep it alive until cleanup
*/
} else { } else {
if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done; if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_create_node(dht->rt_ipv6, if ((ret = ks_dhtrt_create_node(dht->rt_ipv6,
@ -564,18 +548,16 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
ep->addr.host, ep->addr.host,
ep->addr.port, ep->addr.port,
&ep->node)) != KS_STATUS_SUCCESS) goto done; &ep->node)) != KS_STATUS_SUCCESS) goto done;
}
/** /**
* Do not release the ep->node, keep it alive until cleanup * Do not release the ep->node, keep it alive until cleanup
*/ */
}
/** /**
* If the endpoint output is being captured, assign it and return successfully. * If the endpoint output is being captured, assign it and return successfully.
*/ */
if (endpoint) *endpoint = ep; if (endpoint) *endpoint = ep;
ret = KS_STATUS_SUCCESS;
done: done:
if (ret != KS_STATUS_SUCCESS) { if (ret != KS_STATUS_SUCCESS) {
/** /**
@ -583,7 +565,10 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
* This will be done in ks_dht_endpoint_destroy only if the socket was assigned during a successful ks_dht_endpoint_create. * This will be done in ks_dht_endpoint_destroy only if the socket was assigned during a successful ks_dht_endpoint_create.
* Then return whatever failure condition resulted in landed here. * Then return whatever failure condition resulted in landed here.
*/ */
if (ep) ks_dht_endpoint_destroy(&ep); if (ep) {
ks_hash_remove(dht->endpoints_hash, ep->addr.host);
ks_dht_endpoint_destroy(&ep);
}
else if (sock != KS_SOCK_INVALID) ks_socket_close(&sock); else if (sock != KS_SOCK_INVALID) ks_socket_close(&sock);
if (endpoint) *endpoint = NULL; if (endpoint) *endpoint = NULL;
@ -594,7 +579,6 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
{ {
ks_dht_datagram_t *datagram = NULL; ks_dht_datagram_t *datagram = NULL;
int32_t result;
ks_sockaddr_t raddr; ks_sockaddr_t raddr;
ks_assert(dht); ks_assert(dht);
@ -602,8 +586,8 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0; if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0;
result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout); // @todo confirm how poll/wsapoll react to zero size and NULL array
if (result > 0) { if (ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout) > 0) {
for (int32_t i = 0; i < dht->endpoints_size; ++i) { for (int32_t i = 0; i < dht->endpoints_size; ++i) {
if (!(dht->endpoints_poll[i].revents & POLLIN)) continue; if (!(dht->endpoints_poll[i].revents & POLLIN)) continue;
@ -617,8 +601,10 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
continue; continue;
} }
if (ks_dht_datagram_create(&datagram, dht->pool, dht, dht->endpoints[i], &raddr) == KS_STATUS_SUCCESS && ks_dht_datagram_create(&datagram, dht->pool, dht, dht->endpoints[i], &raddr);
ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram); ks_assert(datagram);
if (ks_thread_pool_add_job(dht->tpool, ks_dht_process, datagram) != KS_STATUS_SUCCESS) ks_dht_datagram_destroy(&datagram);
} }
} }
@ -633,13 +619,12 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout)
KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
{ {
ks_hash_iterator_t *it = NULL; ks_hash_iterator_t *it = NULL;
ks_time_t now = ks_time_now_sec(); ks_time_t now = ks_time_now();
ks_assert(dht); ks_assert(dht);
if (dht->pulse_expirations <= now) { if (dht->pulse_expirations > now) return;
dht->pulse_expirations = now + KS_DHT_PULSE_EXPIRATIONS; dht->pulse_expirations = now + (KS_DHT_PULSE_EXPIRATIONS * 1000);
}
ks_hash_write_lock(dht->transactions_hash); ks_hash_write_lock(dht->transactions_hash);
for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
@ -660,8 +645,47 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
} }
ks_hash_write_unlock(dht->transactions_hash); ks_hash_write_unlock(dht->transactions_hash);
ks_hash_write_lock(dht->search_hash);
for (it = ks_hash_first(dht->search_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const void *search_key = NULL;
ks_dht_search_t *search_value = NULL;
ks_hash_this(it, &search_key, NULL, (void **)&search_value);
ks_hash_write_lock(search_value->pending);
for (ks_hash_iterator_t *i = ks_hash_first(search_value->pending, KS_UNLOCKED); i; i = ks_hash_next(&i)) {
const void *pending_key = NULL;
ks_dht_search_pending_t *pending_value = NULL;
ks_bool_t pending_remove = KS_FALSE;
ks_hash_this(i, &pending_key, NULL, (void **)&pending_value);
if (pending_value->finished) pending_remove = KS_TRUE;
else if (pending_value->expiration <= now) {
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_log(KS_LOG_DEBUG,
"Search for %s pending find_node to %s has expired without response\n",
ks_dht_hexid(&search_value->target, id_buf),
ks_dht_hexid(&pending_value->nodeid, id2_buf));
pending_remove = KS_TRUE;
}
if (pending_remove) {
ks_hash_remove(search_value->pending, (void *)pending_key);
ks_dht_search_pending_destroy(&pending_value);
}
}
ks_hash_write_unlock(search_value->pending);
if (ks_hash_count(search_value->pending) == 0) {
for (int32_t index = 0; index < search_value->callbacks_size; ++index) search_value->callbacks[index](dht, search_value);
ks_hash_remove(dht->search_hash, (void *)search_key);
ks_dht_search_destroy(&search_value);
}
}
ks_hash_write_unlock(dht->search_hash);
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) { if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION; dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
dht->token_secret_previous = dht->token_secret_current; dht->token_secret_previous = dht->token_secret_current;
dht->token_secret_current = rand(); dht->token_secret_current = rand();
} }
@ -987,10 +1011,7 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht,
*message = msg; *message = msg;
if (!ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) { if ((ret = ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) != KS_STATUS_SUCCESS) goto done;
ret = KS_STATUS_FAIL;
goto done;
}
if (transaction) *transaction = trans; if (transaction) *transaction = trans;
@ -1176,11 +1197,13 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
ks_dht_search_callback_t callback, ks_dht_search_callback_t callback,
ks_dht_search_t **search) ks_dht_search_t **search)
{ {
ks_bool_t locked_search = KS_FALSE;
ks_bool_t locked_pending = KS_FALSE;
ks_dht_search_t *s = NULL; ks_dht_search_t *s = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_bool_t inserted = KS_FALSE; ks_bool_t inserted = KS_FALSE;
ks_bool_t allocated = KS_FALSE; ks_bool_t allocated = KS_FALSE;
ks_dhtrt_querynodes_t query; ks_dhtrt_querynodes_t query;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(family == AF_INET || family == AF_INET6); ks_assert(family == AF_INET || family == AF_INET6);
@ -1188,9 +1211,12 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
if (search) *search = NULL; if (search) *search = NULL;
// @todo start write lock on search_hash and hold until after inserting
// check hash for target to see if search already exists // check hash for target to see if search already exists
s = ks_hash_search(dht->search_hash, target->id, KS_READLOCKED); ks_hash_write_lock(dht->search_hash);
ks_hash_read_unlock(dht->search_hash); // @todo hold lock until finished adding new entry? locked_search = KS_TRUE;
s = ks_hash_search(dht->search_hash, target->id, KS_UNLOCKED);
// if search does not exist, create new search and store in hash by target // if search does not exist, create new search and store in hash by target
if (!s) { if (!s) {
@ -1204,6 +1230,17 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
// if the search is old then bail out and return successfully // if the search is old then bail out and return successfully
if (!allocated) goto done; if (!allocated) goto done;
if ((ret = ks_hash_insert(dht->search_hash, s->target.id, s)) == KS_STATUS_SUCCESS) goto done;
inserted = KS_TRUE;
// lock pending before unlocking the search hash to prevent this search from being used before we finish setting it up
ks_hash_write_lock(s->pending);
locked_pending = KS_TRUE;
// release search hash lock now, but pending is still locked
ks_hash_write_unlock(dht->search_hash);
locked_search = KS_FALSE;
// find closest good nodes to target locally and store as the closest results // find closest good nodes to target locally and store as the closest results
query.nodeid = *target; query.nodeid = *target;
query.type = KS_DHT_REMOTE; query.type = KS_DHT_REMOTE;
@ -1219,27 +1256,27 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht,
ks_dht_utility_nodeid_xor(&s->distances[i], &n->nodeid, &s->target); ks_dht_utility_nodeid_xor(&s->distances[i], &n->nodeid, &s->target);
// add to pending with expiration // add to pending with expiration
if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dht_search_pending_create(&pending, s->pool, &n->nodeid)) != KS_STATUS_SUCCESS) goto done;
if (!ks_hash_insert(s->pending, n->nodeid.id, pending)) { if ((ret = ks_hash_insert(s->pending, n->nodeid.id, pending)) != KS_STATUS_SUCCESS) {
ks_dht_search_pending_destroy(&pending); ks_dht_search_pending_destroy(&pending);
ret = KS_STATUS_FAIL;
goto done; goto done;
} }
if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dht_send_findnode(dht, NULL, &n->addr, target)) != KS_STATUS_SUCCESS) goto done;
// increment here in case we end up bailing out; execute with what it has or destroy the search?
s->results_length++;
} }
s->results_length = query.count;
// @todo release query nodes // @todo release query nodes
ks_hash_write_unlock(s->pending);
// @todo if entry has been added since we checked above this may fail, try adding callback instead of failing? or retain lock from earlier locked_pending = KS_FALSE;
if (!ks_hash_insert(dht->search_hash, s->target.id, s)) {
ret = KS_STATUS_FAIL;
goto done;
}
inserted = KS_TRUE;
if (search) *search = s; if (search) *search = s;
done: done:
if (ret != KS_STATUS_SUCCESS && !inserted && s) ks_dht_search_destroy(&s); if (locked_search) ks_hash_write_unlock(dht->search_hash);
if (locked_pending) ks_hash_write_unlock(s->pending);
if (ret != KS_STATUS_SUCCESS) {
if (!inserted && s) ks_dht_search_destroy(&s);
*search = NULL;
}
return ret; return ret;
} }
@ -1254,18 +1291,18 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
{ {
ks_dht_message_t *error = NULL; ks_dht_message_t *error = NULL;
struct bencode *e = NULL; struct bencode *e = NULL;
ks_status_t ret = KS_STATUS_FAIL; ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(raddr); ks_assert(raddr);
ks_assert(transactionid); ks_assert(transactionid);
ks_assert(errorstr); ks_assert(errorstr);
if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_message_create(&error, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dht_message_error(error, transactionid, transactionid_length, &e)) != KS_STATUS_SUCCESS) goto done;
ben_list_append(e, ben_int(errorcode)); ben_list_append(e, ben_int(errorcode));
ben_list_append(e, ben_blob(errorstr, strlen(errorstr))); ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
@ -1273,8 +1310,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht,
ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode); ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
ks_q_push(dht->send_q, (void *)error); ks_q_push(dht->send_q, (void *)error);
ret = KS_STATUS_SUCCESS;
done: done:
if (ret != KS_STATUS_SUCCESS && error) ks_dht_message_destroy(&error); if (ret != KS_STATUS_SUCCESS && error) ks_dht_message_destroy(&error);
return ret; return ret;
@ -1292,7 +1327,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
ks_dht_transaction_t *transaction; ks_dht_transaction_t *transaction;
uint32_t *tid; uint32_t *tid;
uint32_t transactionid; uint32_t transactionid;
ks_status_t ret = KS_STATUS_FAIL; ks_dht_message_callback_t callback;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
@ -1308,7 +1344,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
es_len = ben_str_len(es); es_len = ben_str_len(es);
if (es_len >= KS_DHT_MESSAGE_ERROR_MAX_SIZE) { if (es_len >= KS_DHT_MESSAGE_ERROR_MAX_SIZE) {
ks_log(KS_LOG_DEBUG, "Message error value has an unexpectedly large size of %d\n", es_len); ks_log(KS_LOG_DEBUG, "Message error value has an unexpectedly large size of %d\n", es_len);
return KS_STATUS_FAIL; ret = KS_STATUS_FAIL;
goto done;
} }
errorcode = ben_int_val(ec); errorcode = ben_int_val(ec);
et = ben_str_val(es); et = ben_str_val(es);
@ -1327,27 +1364,30 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me
if (!transaction) { if (!transaction) {
ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid); ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid);
} else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) { ret = KS_STATUS_FAIL;
goto done;
}
if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
ks_log(KS_LOG_DEBUG, ks_log(KS_LOG_DEBUG,
"Message error rejected due to spoofing from %s %d, expected %s %d\n", "Message error rejected due to spoofing from %s %d, expected %s %d\n",
message->raddr.host, message->raddr.host,
message->raddr.port, message->raddr.port,
transaction->raddr.host, transaction->raddr.host,
transaction->raddr.port); transaction->raddr.port);
} else { ret = KS_STATUS_FAIL;
ks_dht_message_callback_t callback; goto done;
}
transaction->finished = KS_TRUE; transaction->finished = KS_TRUE;
callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED); callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED);
ks_hash_read_unlock(dht->registry_error); ks_hash_read_unlock(dht->registry_error);
if (callback) ret = callback(dht, message); if (callback) ret = callback(dht, message);
else { else ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
ret = KS_STATUS_SUCCESS;
}
}
done:
return ret; return ret;
} }
@ -1356,25 +1396,27 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k
{ {
ks_dht_message_t *message = NULL; ks_dht_message_t *message = NULL;
struct bencode *a = NULL; struct bencode *a = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(raddr); ks_assert(raddr);
if (ks_dht_setup_query(dht, if ((ret = ks_dht_setup_query(dht,
ep, ep,
raddr, raddr,
"ping", "ping",
ks_dht_process_response_ping, ks_dht_process_response_ping,
NULL, NULL,
&message, &message,
&a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; &a)) != KS_STATUS_SUCCESS) goto done;
ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query ping\n"); ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
ks_q_push(dht->send_q, (void *)message); ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS; done:
return ret;
} }
KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message) KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message)
@ -1385,37 +1427,37 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_
ks_dhtrt_routetable_t *routetable = NULL; ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL; ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
ks_assert(message->args); ks_assert(message->args);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
routetable = message->endpoint->node->table; routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
if (ks_dht_setup_response(dht, if ((ret = ks_dht_setup_response(dht,
message->endpoint, message->endpoint,
&message->raddr, &message->raddr,
message->transactionid, message->transactionid,
message->transactionid_length, message->transactionid_length,
&response, &response,
&r) != KS_STATUS_SUCCESS) { &r)) != KS_STATUS_SUCCESS) goto done;
return KS_STATUS_FAIL;
}
ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message response ping\n"); ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
ks_q_push(dht->send_q, (void *)response); ks_q_push(dht->send_q, (void *)response);
return KS_STATUS_SUCCESS; done:
return ret;
} }
KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message) KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message)
@ -1424,24 +1466,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa
ks_dhtrt_routetable_t *routetable = NULL; ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL; ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
routetable = message->endpoint->node->table; routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message response ping is reached\n"); ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
return KS_STATUS_SUCCESS; done:
return ret;
} }
@ -1450,19 +1494,20 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e
ks_dht_transaction_t *transaction = NULL; ks_dht_transaction_t *transaction = NULL;
ks_dht_message_t *message = NULL; ks_dht_message_t *message = NULL;
struct bencode *a = NULL; struct bencode *a = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(raddr); ks_assert(raddr);
ks_assert(targetid); ks_assert(targetid);
if (ks_dht_setup_query(dht, if ((ret = ks_dht_setup_query(dht,
ep, ep,
raddr, raddr,
"find_node", "find_node",
ks_dht_process_response_findnode, ks_dht_process_response_findnode,
&transaction, &transaction,
&message, &message,
&a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; &a)) != KS_STATUS_SUCCESS) goto done;
memcpy(transaction->target.id, targetid->id, KS_DHT_NODEID_SIZE); memcpy(transaction->target.id, targetid->id, KS_DHT_NODEID_SIZE);
@ -1473,7 +1518,8 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e
ks_log(KS_LOG_DEBUG, "Sending message query find_node\n"); ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
ks_q_push(dht->send_q, (void *)message); ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS; done:
return ret;
} }
KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message) KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message)
@ -1493,14 +1539,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
ks_dht_node_t *node = NULL; ks_dht_node_t *node = NULL;
ks_dhtrt_querynodes_t query; ks_dhtrt_querynodes_t query;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
ks_assert(message->args); ks_assert(message->args);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
want = ben_dict_get_by_str(message->args, "want"); want = ben_dict_get_by_str(message->args, "want");
if (want) { if (want) {
@ -1521,8 +1568,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
routetable = message->endpoint->node->table; routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n"); ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
@ -1537,11 +1584,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
for (int32_t i = 0; i < query.count; ++i) { for (int32_t i = 0; i < query.count; ++i) {
ks_dht_node_t *qn = query.nodes[i]; ks_dht_node_t *qn = query.nodes[i];
if (ks_dht_utility_compact_nodeinfo(&qn->nodeid, if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid,
&qn->addr, &qn->addr,
buffer4, buffer4,
&buffer4_length, &buffer4_length,
sizeof(buffer4)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
} }
@ -1553,23 +1600,23 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
for (int32_t i = 0; i < query.count; ++i) { for (int32_t i = 0; i < query.count; ++i) {
ks_dht_node_t *qn = query.nodes[i]; ks_dht_node_t *qn = query.nodes[i];
if (ks_dht_utility_compact_nodeinfo(&qn->nodeid, if ((ret = ks_dht_utility_compact_nodeinfo(&qn->nodeid,
&qn->addr, &qn->addr,
buffer6, buffer6,
&buffer6_length, &buffer6_length,
sizeof(buffer6)) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port);
} }
} }
if (ks_dht_setup_response(dht, if ((ret = ks_dht_setup_response(dht,
message->endpoint, message->endpoint,
&message->raddr, &message->raddr,
message->transactionid, message->transactionid,
message->transactionid_length, message->transactionid_length,
&response, &response,
&r) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; &r)) != KS_STATUS_SUCCESS) goto done;
ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length)); if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length));
@ -1578,7 +1625,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess
ks_log(KS_LOG_DEBUG, "Sending message response find_node\n"); ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
ks_q_push(dht->send_q, (void *)response); ks_q_push(dht->send_q, (void *)response);
return KS_STATUS_SUCCESS; done:
return ret;
} }
KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message) KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message)
@ -1595,12 +1643,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
ks_dht_node_t *node = NULL; ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_dht_search_t *search = NULL; ks_dht_search_t *search = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
ks_assert(message->transaction); ks_assert(message->transaction);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
n = ben_dict_get_by_str(message->args, "nodes"); n = ben_dict_get_by_str(message->args, "nodes");
if (n) { if (n) {
@ -1616,13 +1665,14 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
routetable = message->endpoint->node->table; routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_READLOCKED); ks_hash_read_lock(dht->search_hash);
search = ks_hash_search(dht->search_hash, message->transaction->target.id, KS_UNLOCKED);
ks_hash_read_unlock(dht->search_hash); ks_hash_read_unlock(dht->search_hash);
if (search) { if (search) {
ks_dht_search_pending_t *pending = ks_hash_search(search->pending, id->id, KS_READLOCKED); ks_dht_search_pending_t *pending = ks_hash_search(search->pending, id->id, KS_READLOCKED);
@ -1635,7 +1685,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
ks_sockaddr_t addr; ks_sockaddr_t addr;
addr.family = AF_INET; addr.family = AF_INET;
if (ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, ks_log(KS_LOG_DEBUG,
"Expanded ipv4 nodeinfo for %s (%s %d)\n", "Expanded ipv4 nodeinfo for %s (%s %d)\n",
@ -1681,12 +1731,12 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
search->results[results_index] = nid; search->results[results_index] = nid;
search->distances[results_index] = distance; search->distances[results_index] = distance;
if (ks_dht_search_pending_create(&pending, search->pool, &nid) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_search_pending_create(&pending, search->pool, &nid)) != KS_STATUS_SUCCESS) goto done;
if (!ks_hash_insert(search->pending, nid.id, pending)) { if ((ret = ks_hash_insert(search->pending, nid.id, pending)) != KS_STATUS_SUCCESS) {
ks_dht_search_pending_destroy(&pending); ks_dht_search_pending_destroy(&pending);
return KS_STATUS_FAIL; goto done;
} }
if (ks_dht_send_findnode(dht, NULL, &addr, &search->target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_send_findnode(dht, NULL, &addr, &search->target)) != KS_STATUS_SUCCESS) goto done;
} }
} }
} }
@ -1696,7 +1746,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
ks_sockaddr_t addr; ks_sockaddr_t addr;
addr.family = AF_INET6; addr.family = AF_INET6;
if (ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, ks_log(KS_LOG_DEBUG,
"Expanded ipv6 nodeinfo for %s (%s %d)\n", "Expanded ipv6 nodeinfo for %s (%s %d)\n",
@ -1712,7 +1762,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m
ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n"); ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
return KS_STATUS_SUCCESS; done:
return ret;
} }
@ -1758,14 +1809,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
ks_dhtrt_routetable_t *routetable = NULL; ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL; ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
ks_assert(message->args); ks_assert(message->args);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done;
seq = ben_dict_get_by_str(message->args, "seq"); seq = ben_dict_get_by_str(message->args, "seq");
if (seq) sequence = ben_int_val(seq); if (seq) sequence = ben_int_val(seq);
@ -1773,8 +1825,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
routetable = message->endpoint->node->table; routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message query get is valid\n"); ks_log(KS_LOG_DEBUG, "Message query get is valid\n");
@ -1790,15 +1842,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
// @todo compact ipv4 and ipv6 nodes into separate buffers // @todo compact ipv4 and ipv6 nodes into separate buffers
if (ks_dht_setup_response(dht, if ((ret = ks_dht_setup_response(dht,
message->endpoint, message->endpoint,
&message->raddr, &message->raddr,
message->transactionid, message->transactionid,
message->transactionid_length, message->transactionid_length,
&response, &response,
&r) != KS_STATUS_SUCCESS) { &r)) != KS_STATUS_SUCCESS) goto done;
return KS_STATUS_FAIL;
}
ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ben_dict_set(r, ben_blob("token", 5), ben_blob(token.token, KS_DHT_TOKEN_SIZE)); ben_dict_set(r, ben_blob("token", 5), ben_blob(token.token, KS_DHT_TOKEN_SIZE));
@ -1817,7 +1867,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
ks_log(KS_LOG_DEBUG, "Sending message response get\n"); ks_log(KS_LOG_DEBUG, "Sending message response get\n");
ks_q_push(dht->send_q, (void *)response); ks_q_push(dht->send_q, (void *)response);
return KS_STATUS_SUCCESS; done:
return ret;
} }
KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message) KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message)
@ -1827,14 +1878,15 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag
ks_dhtrt_routetable_t *routetable = NULL; ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL; ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
// @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
// @todo add extract function for mutable ks_dht_storageitem_key_t // @todo add extract function for mutable ks_dht_storageitem_key_t
// @todo add extract function for mutable ks_dht_storageitem_signature_t // @todo add extract function for mutable ks_dht_storageitem_signature_t
@ -1842,16 +1894,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag
routetable = message->endpoint->node->table; routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
// @todo add/touch bucket entries for other nodes/nodes6 returned // @todo add/touch bucket entries for other nodes/nodes6 returned
ks_log(KS_LOG_DEBUG, "Message response get is reached\n"); ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
return KS_STATUS_SUCCESS; done:
return ret;
} }
@ -1865,37 +1918,37 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t
ks_dhtrt_routetable_t *routetable = NULL; ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL; ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
ks_assert(message->args); ks_assert(message->args);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
routetable = message->endpoint->node->table; routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message query put is valid\n"); ks_log(KS_LOG_DEBUG, "Message query put is valid\n");
if (ks_dht_setup_response(dht, if ((ret = ks_dht_setup_response(dht,
message->endpoint, message->endpoint,
&message->raddr, &message->raddr,
message->transactionid, message->transactionid,
message->transactionid_length, message->transactionid_length,
&response, &response,
&r) != KS_STATUS_SUCCESS) { &r)) != KS_STATUS_SUCCESS) goto done;
return KS_STATUS_FAIL;
}
//ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); //ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message response put\n"); ks_log(KS_LOG_DEBUG, "Sending message response put\n");
ks_q_push(dht->send_q, (void *)response); ks_q_push(dht->send_q, (void *)response);
return KS_STATUS_SUCCESS; done:
return ret;
} }
KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message) KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message)
@ -1904,24 +1957,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_messag
ks_dhtrt_routetable_t *routetable = NULL; ks_dhtrt_routetable_t *routetable = NULL;
ks_dht_node_t *node = NULL; ks_dht_node_t *node = NULL;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht); ks_assert(dht);
ks_assert(message); ks_assert(message);
if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done;
routetable = message->endpoint->node->table; routetable = message->endpoint->node->table;
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_create_node(routetable, *id, KS_DHT_REMOTE, message->raddr.host, message->raddr.port, &node)) != KS_STATUS_SUCCESS) goto done;
if (ks_dhtrt_release_node(node) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf));
if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; if ((ret = ks_dhtrt_touch_node(routetable, *id)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG, "Message response put is reached\n"); ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
return KS_STATUS_SUCCESS; done:
return ret;
} }
/* For Emacs: /* For Emacs:

View File

@ -27,7 +27,7 @@ KS_BEGIN_EXTERN_C
#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20 #define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
#define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256 #define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
#define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30 #define KS_DHT_TRANSACTION_EXPIRATION 30
#define KS_DHT_SEARCH_EXPIRATION 10 #define KS_DHT_SEARCH_EXPIRATION 10
#define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE #define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE

View File

@ -19,12 +19,9 @@ KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram,
ks_assert(raddr->family == AF_INET || raddr->family == AF_INET6); ks_assert(raddr->family == AF_INET || raddr->family == AF_INET6);
*datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t)); *datagram = dg = ks_pool_alloc(pool, sizeof(ks_dht_datagram_t));
if (!dg) { ks_assert(dg);
ret = KS_STATUS_NO_MEM;
goto done;
}
dg->pool = pool;
dg->pool = pool;
dg->dht = dht; dg->dht = dht;
dg->endpoint = endpoint; dg->endpoint = endpoint;
dg->raddr = *raddr; dg->raddr = *raddr;
@ -32,7 +29,7 @@ KS_DECLARE(ks_status_t) ks_dht_datagram_create(ks_dht_datagram_t **datagram,
memcpy(dg->buffer, dht->recv_buffer, dht->recv_buffer_length); memcpy(dg->buffer, dht->recv_buffer, dht->recv_buffer_length);
dg->buffer_length = dht->recv_buffer_length; dg->buffer_length = dht->recv_buffer_length;
done: // done:
if (ret != KS_STATUS_SUCCESS) { if (ret != KS_STATUS_SUCCESS) {
if (dg) ks_dht_datagram_destroy(&dg); if (dg) ks_dht_datagram_destroy(&dg);
*datagram = NULL; *datagram = NULL;

View File

@ -20,17 +20,15 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint,
ks_assert(addr->family == AF_INET || addr->family == AF_INET6); ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
*endpoint = ep = ks_pool_alloc(pool, sizeof(ks_dht_endpoint_t)); *endpoint = ep = ks_pool_alloc(pool, sizeof(ks_dht_endpoint_t));
if (!ep) { ks_assert(ep);
ret = KS_STATUS_NO_MEM;
goto done;
}
ep->pool = pool; ep->pool = pool;
if (!nodeid) randombytes_buf(ep->nodeid.id, KS_DHT_NODEID_SIZE); if (!nodeid) randombytes_buf(ep->nodeid.id, KS_DHT_NODEID_SIZE);
else memcpy(ep->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE); else memcpy(ep->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE);
ep->addr = *addr; ep->addr = *addr;
ep->sock = sock; ep->sock = sock;
done: // done:
if (ret != KS_STATUS_SUCCESS) { if (ret != KS_STATUS_SUCCESS) {
if (ep) ks_dht_endpoint_destroy(&ep); if (ep) ks_dht_endpoint_destroy(&ep);
*endpoint = NULL; *endpoint = NULL;
@ -50,9 +48,6 @@ KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint)
ep = *endpoint; ep = *endpoint;
if (ep->node) {
// @todo release the node?
}
if (ep->sock != KS_SOCK_INVALID) ks_socket_close(&ep->sock); if (ep->sock != KS_SOCK_INVALID) ks_socket_close(&ep->sock);
ks_pool_free(ep->pool, ep); ks_pool_free(ep->pool, ep);

View File

@ -1,9 +1,6 @@
#include "ks_dht.h" #include "ks_dht.h"
#include "ks_dht-int.h" #include "ks_dht-int.h"
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message, KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
ks_pool_t *pool, ks_pool_t *pool,
ks_dht_endpoint_t *endpoint, ks_dht_endpoint_t *endpoint,
@ -17,17 +14,17 @@ KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
ks_assert(pool); ks_assert(pool);
*message = m = ks_pool_alloc(pool, sizeof(ks_dht_message_t)); *message = m = ks_pool_alloc(pool, sizeof(ks_dht_message_t));
if (!m) { ks_assert(m);
ret = KS_STATUS_NO_MEM;
goto done;
}
m->pool = pool;
m->pool = pool;
m->endpoint = endpoint; m->endpoint = endpoint;
m->raddr = *raddr; m->raddr = *raddr;
if (alloc_data) m->data = ben_dict(); if (alloc_data) {
m->data = ben_dict();
ks_assert(m->data);
}
done: // done:
if (ret != KS_STATUS_SUCCESS) { if (ret != KS_STATUS_SUCCESS) {
if (m) ks_dht_message_destroy(&m); if (m) ks_dht_message_destroy(&m);
*message = NULL; *message = NULL;
@ -35,9 +32,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message,
return ret; return ret;
} }
/**
*
*/
KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message) KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message)
{ {
ks_dht_message_t *m; ks_dht_message_t *m;
@ -57,9 +51,6 @@ KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message)
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length) KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length)
{ {
struct bencode *t; struct bencode *t;
@ -121,9 +112,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message, KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
uint32_t transactionid, uint32_t transactionid,
const char *query, const char *query,
@ -143,6 +131,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
// @note r joins message->data and will be freed with it // @note r joins message->data and will be freed with it
a = ben_dict(); a = ben_dict();
ks_assert(a);
ben_dict_set(message->data, ben_blob("a", 1), a); ben_dict_set(message->data, ben_blob("a", 1), a);
if (args) *args = a; if (args) *args = a;
@ -150,9 +139,6 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message,
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
uint8_t *transactionid, uint8_t *transactionid,
ks_size_t transactionid_length, ks_size_t transactionid_length,
@ -168,6 +154,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
// @note r joins message->data and will be freed with it // @note r joins message->data and will be freed with it
r = ben_dict(); r = ben_dict();
ks_assert(r);
ben_dict_set(message->data, ben_blob("r", 1), r); ben_dict_set(message->data, ben_blob("r", 1), r);
if (args) *args = r; if (args) *args = r;
@ -193,6 +180,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
// @note r joins message->data and will be freed with it // @note r joins message->data and will be freed with it
e = ben_list(); e = ben_list();
ks_assert(e);
ben_dict_set(message->data, ben_blob("e", 1), e); ben_dict_set(message->data, ben_blob("e", 1), e);
if (args) *args = e; if (args) *args = e;

View File

@ -2,9 +2,6 @@
#include "ks_dht-int.h" #include "ks_dht-int.h"
#include "sodium.h" #include "sodium.h"
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target) KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t *pool, const ks_dht_nodeid_t *target)
{ {
ks_dht_search_t *s; ks_dht_search_t *s;
@ -15,32 +12,27 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t
ks_assert(target); ks_assert(target);
*search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t)); *search = s = ks_pool_alloc(pool, sizeof(ks_dht_search_t));
if (!s) { ks_assert(s);
ret = KS_STATUS_NO_MEM;
goto done;
}
s->pool = pool; s->pool = pool;
if ((ret = ks_mutex_create(&s->mutex, KS_MUTEX_FLAG_DEFAULT, s->pool)) != KS_STATUS_SUCCESS) goto done; ks_mutex_create(&s->mutex, KS_MUTEX_FLAG_DEFAULT, s->pool);
ks_assert(s->mutex);
memcpy(s->target.id, target->id, KS_DHT_NODEID_SIZE); memcpy(s->target.id, target->id, KS_DHT_NODEID_SIZE);
if ((ret = ks_hash_create(&s->pending, ks_hash_create(&s->pending, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK, s->pool);
KS_HASH_MODE_ARBITRARY, ks_assert(s->pending);
KS_HASH_FLAG_RWLOCK,
s->pool)) != KS_STATUS_SUCCESS) goto done;
ks_hash_set_keysize(s->pending, KS_DHT_NODEID_SIZE); ks_hash_set_keysize(s->pending, KS_DHT_NODEID_SIZE);
done: // done:
if (ret != KS_STATUS_SUCCESS) { if (ret != KS_STATUS_SUCCESS) {
if (s) ks_dht_search_destroy(&s); if (s) ks_dht_search_destroy(&s);
*search = NULL; *search = NULL;
} }
return KS_STATUS_SUCCESS; return ret;
} }
/**
*
*/
KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search) KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search)
{ {
ks_dht_search_t *s; ks_dht_search_t *s;
@ -83,7 +75,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_callback_add(ks_dht_search_t *search, ks_d
search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool, search->callbacks = (ks_dht_search_callback_t *)ks_pool_resize(search->pool,
(void *)search->callbacks, (void *)search->callbacks,
sizeof(ks_dht_search_callback_t) * search->callbacks_size); sizeof(ks_dht_search_callback_t) * search->callbacks_size);
if (!search->callbacks) return KS_STATUS_NO_MEM; ks_assert(search->callbacks);
search->callbacks[index] = callback; search->callbacks[index] = callback;
ks_mutex_unlock(search->mutex); ks_mutex_unlock(search->mutex);
} }
@ -99,17 +91,14 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **p
ks_assert(pool); ks_assert(pool);
*pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t)); *pending = p = ks_pool_alloc(pool, sizeof(ks_dht_search_pending_t));
if (!p) { ks_assert(p);
ret = KS_STATUS_NO_MEM;
goto done;
}
p->pool = pool;
p->pool = pool;
p->nodeid = *nodeid; p->nodeid = *nodeid;
p->expiration = ks_time_now_sec() + KS_DHT_SEARCH_EXPIRATION; p->expiration = ks_time_now() + (KS_DHT_SEARCH_EXPIRATION * 1000);
p->finished = KS_FALSE; p->finished = KS_FALSE;
done: // done:
if (ret != KS_STATUS_SUCCESS) { if (ret != KS_STATUS_SUCCESS) {
if (p) ks_dht_search_pending_destroy(&p); if (p) ks_dht_search_pending_destroy(&p);
*pending = NULL; *pending = NULL;

View File

@ -2,9 +2,6 @@
#include "ks_dht-int.h" #include "ks_dht-int.h"
#include "sodium.h" #include "sodium.h"
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v) KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v)
{ {
ks_dht_storageitem_t *si; ks_dht_storageitem_t *si;
@ -19,27 +16,21 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t
ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE); ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
*item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t)); *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t));
if (!si) { ks_assert(si);
ret = KS_STATUS_NO_MEM;
goto done;
}
si->pool = pool; si->pool = pool;
si->mutable = KS_FALSE; si->mutable = KS_FALSE;
si->v = ben_clone(v); si->v = ben_clone(v);
if (!si->v) { ks_assert(si->v);
ret = KS_STATUS_NO_MEM;
goto done;
}
enc = ben_encode(&enc_len, si->v); enc = ben_encode(&enc_len, si->v);
ks_assert(enc);
SHA1_Init(&sha); SHA1_Init(&sha);
SHA1_Update(&sha, enc, enc_len); SHA1_Update(&sha, enc, enc_len);
SHA1_Final(si->id.id, &sha); SHA1_Final(si->id.id, &sha);
free(enc); free(enc);
done: // done:
if (ret != KS_STATUS_SUCCESS) { if (ret != KS_STATUS_SUCCESS) {
if (si) ks_dht_storageitem_destroy(&si); if (si) ks_dht_storageitem_destroy(&si);
*item = NULL; *item = NULL;
@ -70,15 +61,12 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
ks_assert(signature); ks_assert(signature);
*item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t)); *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t));
if (!si) { ks_assert(si);
ret = KS_STATUS_NO_MEM;
goto done;
}
si->pool = pool; si->pool = pool;
si->v = ben_clone(v);
si->mutable = KS_TRUE; si->mutable = KS_TRUE;
si->v = ben_clone(v);
ks_assert(si->v);
memcpy(si->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE); memcpy(si->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE);
if (salt && salt_length > 0) { if (salt && salt_length > 0) {
@ -93,7 +81,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
if (si->salt && si->salt_length > 0) SHA1_Update(&sha, si->salt, si->salt_length); if (si->salt && si->salt_length > 0) SHA1_Update(&sha, si->salt, si->salt_length);
SHA1_Final(si->id.id, &sha); SHA1_Final(si->id.id, &sha);
done: // done:
if (ret != KS_STATUS_SUCCESS) { if (ret != KS_STATUS_SUCCESS) {
if (si) ks_dht_storageitem_destroy(&si); if (si) ks_dht_storageitem_destroy(&si);
*item = NULL; *item = NULL;

View File

@ -15,18 +15,15 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transac
ks_assert(raddr); ks_assert(raddr);
*transaction = t = ks_pool_alloc(pool, sizeof(ks_dht_transaction_t)); *transaction = t = ks_pool_alloc(pool, sizeof(ks_dht_transaction_t));
if (!t) { ks_assert(t);
ret = KS_STATUS_NO_MEM;
goto done;
}
t->pool = pool;
t->pool = pool;
t->raddr = *raddr; t->raddr = *raddr;
t->transactionid = transactionid; t->transactionid = transactionid;
t->callback = callback; t->callback = callback;
t->expiration = ks_time_now_sec() + KS_DHT_TRANSACTION_EXPIRATION_DELAY; t->expiration = ks_time_now() + (KS_DHT_TRANSACTION_EXPIRATION * 1000);
done: // done:
if (ret != KS_STATUS_SUCCESS) { if (ret != KS_STATUS_SUCCESS) {
if (t) ks_dht_transaction_destroy(&t); if (t) ks_dht_transaction_destroy(&t);
*transaction = NULL; *transaction = NULL;