diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index fc71cddacf..1c33b91478 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -22,11 +22,18 @@ KS_BEGIN_EXTERN_C KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint); /** - * Called internally to expire various data. - * Handles purging of expired and finished transactions, rotating token secrets, etc. + * Called internally to expire search data. + * Handles completing and purging of finished searches. * @param dht pointer to the dht instance */ -KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht); +KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht); + +/** + * Called internally to process job state machine. + * Handles completing and purging of finished jobs. + * @param dht pointer to the dht instance + */ +KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht); /** * Called internally to send queued messages. @@ -35,6 +42,20 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht); */ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht); +/** + * Called internally to expire transactions. + * Handles purging of expired and finished transactions. + * @param dht pointer to the dht instance + */ +KS_DECLARE(void) ks_dht_pulse_transactions(ks_dht_t *dht); + +/** + * Called internally to expire and cycle tokens. + * Handles cycling new secret entropy for token generation. + * @param dht pointer to the dht instance + */ +KS_DECLARE(void) ks_dht_pulse_tokens(ks_dht_t *dht); + /** * Converts a ks_dht_nodeid_t into it's hex string representation. * @param id pointer to the nodeid diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index 056584943e..8392ba4017 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -2,6 +2,12 @@ #include "ks_dht-int.h" #include "sodium.h" +void ks_dht_endpoint_destructor(void *ptr) { ks_dht_endpoint_destroy((ks_dht_endpoint_t **)&ptr); } + +void ks_dht_transaction_destructor(void *ptr) { ks_dht_transaction_destroy((ks_dht_transaction_t **)&ptr); } + +void ks_dht_storageitem_destructor(void *ptr) { ks_dht_storageitem_destroy((ks_dht_storageitem_t **)&ptr); } + KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread_pool_t *tpool) { ks_bool_t pool_alloc = !pool; @@ -52,7 +58,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Create the message type registry. */ - ks_hash_create(&d->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_hash_create(&d->registry_type, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); ks_assert(d->registry_type); /** @@ -65,7 +71,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Create the message query registry. */ - ks_hash_create(&d->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_hash_create(&d->registry_query, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); ks_assert(d->registry_query); /** @@ -79,7 +85,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Create the message error registry. */ - ks_hash_create(&d->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_hash_create(&d->registry_error, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); ks_assert(d->registry_error); // @todo register 301 error for internal get/put CAS hash mismatch retry handler @@ -88,6 +94,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread * The endpoints and endpoints_poll arrays are maintained in parallel to optimize polling. */ d->endpoints = NULL; + d->endpoints_length = 0; d->endpoints_size = 0; d->endpoints_poll = NULL; @@ -96,13 +103,20 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread * This also provides the basis for autorouting to find unbound interfaces and bind them at runtime. * This hash uses the host ip string concatenated with a colon and the port, ie: "123.123.123.123:123" or ipv6 equivilent */ - ks_hash_create(&d->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, d->pool); + ks_hash_create_ex(&d->endpoints_hash, + 2, + NULL, + NULL, + KS_HASH_MODE_CASE_INSENSITIVE, + KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, + ks_dht_endpoint_destructor, + d->pool); ks_assert(d->endpoints_hash); /** - * Default expirations to not be checked for one pulse. + * Default transactions expirations to not be checked for one pulse. */ - d->pulse_expirations = ks_time_now() + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC); + d->transactions_pulse = ks_time_now() + ((ks_time_t)KS_DHT_TRANSACTIONS_PULSE * KS_USEC_PER_SEC); /** * Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full. @@ -132,8 +146,8 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Initialize the transaction id mutex, should use atomic increment instead */ - ks_mutex_create(&d->tid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool); - ks_assert(d->tid_mutex); + ks_mutex_create(&d->transactionid_mutex, KS_MUTEX_FLAG_DEFAULT, d->pool); + ks_assert(d->transactionid_mutex); /** * Initialize the first transaction id randomly, this doesn't really matter. @@ -144,7 +158,14 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread * Create the hash to track pending transactions on queries that are pending responses. * It should be impossible to receive a duplicate transaction id in the hash before it expires, but if it does an error is preferred. */ - ks_hash_create(&d->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, d->pool); + ks_hash_create_ex(&d->transactions_hash, + 16, + NULL, + NULL, + KS_HASH_MODE_INT, + KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, + ks_dht_transaction_destructor, + d->pool); ks_assert(d->transactions_hash); /** @@ -163,12 +184,19 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets. */ d->token_secret_current = d->token_secret_previous = rand(); - d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC); + d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKEN_EXPIRATION * KS_USEC_PER_SEC); /** * Create the hash to store arbitrary data for BEP44. */ - ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_hash_create_ex(&d->storageitems_hash, + 16, + NULL, + NULL, + KS_HASH_MODE_ARBITRARY, + KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, + ks_dht_storageitem_destructor, + d->pool); ks_assert(d->storageitems_hash); /** @@ -191,7 +219,6 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) ks_dht_t *d = NULL; ks_pool_t *pool = NULL; ks_bool_t pool_alloc = KS_FALSE; - ks_hash_iterator_t *it = NULL; ks_assert(dht); ks_assert(*dht); @@ -201,15 +228,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) /** * Cleanup the storageitems hash and it's contents if it is allocated. */ - if (d->storageitems_hash) { - for (it = ks_hash_first(d->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - const void *key = NULL; - ks_dht_storageitem_t *val = NULL; - ks_hash_this(it, &key, NULL, (void **)&val); - ks_dht_storageitem_destroy(&val); - } - ks_hash_destroy(&d->storageitems_hash); - } + if (d->storageitems_hash) ks_hash_destroy(&d->storageitems_hash); /** * Zero out the opaque write token variables. @@ -229,7 +248,6 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) /** * Cleanup the route tables if they are allocated. - * @todo check if endpoints need to be destroyed first to release the readlock on their node */ if (d->rt_ipv4) ks_dhtrt_deinitroute(&d->rt_ipv4); if (d->rt_ipv6) ks_dhtrt_deinitroute(&d->rt_ipv6); @@ -238,7 +256,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) * Cleanup the transactions mutex and hash if they are allocated. */ d->transactionid_next = 0; - if (d->tid_mutex) ks_mutex_destroy(&d->tid_mutex); + if (d->transactionid_mutex) ks_mutex_destroy(&d->transactionid_mutex); if (d->transactions_hash) ks_hash_destroy(&d->transactions_hash); /** @@ -272,15 +290,9 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) /** * Probably don't need this */ - d->pulse_expirations = 0; + d->transactions_pulse = 0; - /** - * Cleanup any endpoints that have been allocated. - */ - for (int32_t i = 0; i < d->endpoints_size; ++i) { - ks_dht_endpoint_t *ep = d->endpoints[i]; - ks_dht_endpoint_destroy(&ep); - } + d->endpoints_length = 0; d->endpoints_size = 0; /** @@ -294,7 +306,7 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) if (d->endpoints_poll) ks_pool_free(d->pool, &d->endpoints_poll); /** - * Cleanup the endpoints hash if it is allocated. + * Cleanup the endpoints hash if it is allocated, and any endpoints that have been allocated. */ if (d->endpoints_hash) ks_hash_destroy(&d->endpoints_hash); @@ -383,8 +395,9 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_ /** * Check if the endpoint has already been bound for the address we want to route through. */ - ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED); - if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret; + ks_hash_read_lock(dht->endpoints_hash); + ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED); + ks_hash_read_unlock(dht->endpoints_hash); /** * If the endpoint has not been bound, and autorouting is enabled then try to bind the new address. @@ -411,38 +424,44 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, const ks_sockaddr_ return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) +KS_DECLARE(void) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) { ks_assert(dht); ks_assert(value); ks_assert(callback); - return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback); + ks_hash_write_lock(dht->registry_type); + ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback); + ks_hash_write_unlock(dht->registry_type); } -KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) +KS_DECLARE(void) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) { ks_assert(dht); ks_assert(value); ks_assert(callback); - return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback); + ks_hash_write_lock(dht->registry_query); + ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback); + ks_hash_write_unlock(dht->registry_query); } -KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) +KS_DECLARE(void) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) { ks_assert(dht); ks_assert(value); ks_assert(callback); - return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback); + ks_hash_write_lock(dht->registry_error); + ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback); + ks_hash_write_unlock(dht->registry_error); } KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint) { - ks_dht_endpoint_t *ep = NULL; ks_socket_t sock = KS_SOCK_INVALID; + ks_dht_endpoint_t *ep = NULL; int32_t epindex = 0; ks_status_t ret = KS_STATUS_SUCCESS; @@ -456,17 +475,21 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid */ if (endpoint) *endpoint = NULL; - ep = ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_READLOCKED); - ks_hash_read_unlock(dht->endpoints_hash); - if (ep) { + ks_hash_write_lock(dht->endpoints_hash); + + if (ks_hash_search(dht->endpoints_hash, (void *)addr->host, KS_UNLOCKED)) { ks_log(KS_LOG_DEBUG, "Attempted to bind to %s more than once.\n", addr->host); - return KS_STATUS_FAIL; + ret = KS_STATUS_FAIL; + goto done; } /** * Attempt to open a UDP datagram socket for the given address family. */ - if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) return KS_STATUS_FAIL; + if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) { + ret = KS_STATUS_FAIL; + goto done; + } /** * Set some common socket options for non-blocking IO and forced binding when already in use @@ -485,32 +508,37 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ks_dht_endpoint_create(&ep, dht->pool, nodeid, addr, sock); ks_assert(ep); + /** + * Add the new endpoint into the endpoints hash for quick lookups. + */ + ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep); + /** * Resize the endpoints array to take another endpoint pointer. */ - epindex = dht->endpoints_size++; - dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool, - (void *)dht->endpoints, - sizeof(ks_dht_endpoint_t *) * dht->endpoints_size); - ks_assert(dht->endpoints); + epindex = dht->endpoints_length++; + if (dht->endpoints_length > dht->endpoints_size) { + dht->endpoints_size = dht->endpoints_length; + dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool, + (void *)dht->endpoints, + sizeof(ks_dht_endpoint_t *) * dht->endpoints_size); + ks_assert(dht->endpoints); + /** + * Resize the endpoints_poll array to keep in parallel with endpoints array. + */ + dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool, + (void *)dht->endpoints_poll, + sizeof(struct pollfd) * dht->endpoints_size); + ks_assert(dht->endpoints_poll); + } + /** + * Populate the new endpoint data + */ dht->endpoints[epindex] = ep; - - /** - * Add the new endpoint into the endpoints hash for quick lookups. - * @todo insert returns 0 when OOM, ks_pool_alloc will abort so insert can only succeed - */ - if ((ret = ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) != KS_STATUS_SUCCESS) goto done; - - /** - * Resize the endpoints_poll array to keep in parallel with endpoints array, populate new entry with the right data. - */ - dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool, - (void *)dht->endpoints_poll, - sizeof(struct pollfd) * dht->endpoints_size); - ks_assert(dht->endpoints_poll); dht->endpoints_poll[epindex].fd = ep->sock; dht->endpoints_poll[epindex].events = POLLIN | POLLERR; + /** * If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint. */ @@ -551,12 +579,13 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid */ if (ep) { ks_hash_remove(dht->endpoints_hash, ep->addr.host); - ks_dht_endpoint_destroy(&ep); + dht->endpoints_length--; } else if (sock != KS_SOCK_INVALID) ks_socket_close(&sock); if (endpoint) *endpoint = NULL; } + ks_hash_write_unlock(dht->endpoints_hash); return ret; } @@ -566,13 +595,13 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) ks_sockaddr_t raddr; ks_assert(dht); - ks_assert(timeout > 0); + ks_assert(timeout >= 0 && timeout <= 1000); + // this should be called with a timeout of less than 1000ms, preferrably around 100ms if (dht->send_q_unsent || ks_q_size(dht->send_q) > 0) timeout = 0; - // @todo confirm how poll/wsapoll react to zero size and NULL array - if (ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout) > 0) { - for (int32_t i = 0; i < dht->endpoints_size; ++i) { + if (ks_poll(dht->endpoints_poll, dht->endpoints_length, timeout) > 0) { + for (int32_t i = 0; i < dht->endpoints_length; ++i) { if (!(dht->endpoints_poll[i].revents & POLLIN)) continue; raddr = (const ks_sockaddr_t){ 0 }; @@ -592,49 +621,30 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) } } + if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4); + if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6); + + ks_dht_pulse_searches(dht); + + // @todo pulse_storageitems for keepalive and expiration + // hold keepalive counter on items to determine what to reannounce vs expire + ks_dht_pulse_jobs(dht); ks_dht_pulse_send(dht); - ks_dht_pulse_expirations(dht); + ks_dht_pulse_transactions(dht); - if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4); - if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6); + ks_dht_pulse_tokens(dht); } -KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) +KS_DECLARE(void) ks_dht_pulse_searches(ks_dht_t *dht) { - ks_hash_iterator_t *it = NULL; ks_dht_search_t *searches_first = NULL; ks_dht_search_t *searches_last = NULL; - ks_time_t now = ks_time_now(); - + ks_assert(dht); - if (dht->pulse_expirations > now) return; - dht->pulse_expirations = now + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC); - - ks_hash_write_lock(dht->transactions_hash); - for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - const void *key = NULL; - ks_dht_transaction_t *value = NULL; - ks_bool_t remove = KS_FALSE; - - ks_hash_this(it, &key, NULL, (void **)&value); - if (value->finished) remove = KS_TRUE; - else if (value->expiration <= now) { - // if the transaction expires, so does the attached job, but the job may try again with a new transaction - value->job->state = KS_DHT_JOB_STATE_EXPIRING; - ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid); - remove = KS_TRUE; - } - if (remove) { - ks_hash_remove(dht->transactions_hash, (void *)key); - ks_dht_transaction_destroy(&value); - } - } - ks_hash_write_unlock(dht->transactions_hash); - ks_mutex_lock(dht->searches_mutex); for (ks_dht_search_t *search = dht->searches_first, *searchn = NULL, *searchp = NULL; search; search = searchn) { ks_bool_t done = KS_FALSE; @@ -665,14 +675,53 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) if (search->callback) search->callback(dht, search); ks_dht_search_destroy(&search); } +} - if (dht->token_secret_expiration && dht->token_secret_expiration <= now) { - dht->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC); - dht->token_secret_previous = dht->token_secret_current; - dht->token_secret_current = rand(); +KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht) +{ + ks_dht_job_t *first = NULL; + ks_dht_job_t *last = NULL; + + ks_assert(dht); + + ks_mutex_lock(dht->jobs_mutex); + for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) { + ks_bool_t done = KS_FALSE; + jobn = job->next; + + if (job->state == KS_DHT_JOB_STATE_QUERYING) { + job->state = KS_DHT_JOB_STATE_RESPONDING; + if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING; + } + if (job->state == KS_DHT_JOB_STATE_EXPIRING) { + job->attempts--; + if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING; + else done = KS_TRUE; + } + if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE; + + if (done) { + if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL; + else if (!jobp) dht->jobs_first = jobn; + else if (!jobn) { + dht->jobs_last = jobp; + dht->jobs_last->next = NULL; + } + else jobp->next = jobn; + + job->next = NULL; + if (last) last = last->next = job; + else first = last = job; + } else jobp = job; } + ks_mutex_unlock(dht->jobs_mutex); - // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?) + for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) { + jobn = job->next; + // this cannot occur inside of the main loop, may add new jobs invalidating list pointers + if (job->finish_callback) job->finish_callback(dht, job); + ks_dht_job_destroy(&job); + } } KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht) @@ -698,6 +747,51 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht) } } +KS_DECLARE(void) ks_dht_pulse_transactions(ks_dht_t *dht) +{ + ks_hash_iterator_t *it = NULL; + ks_time_t now = ks_time_now(); + + ks_assert(dht); + + if (dht->transactions_pulse > now) return; + dht->transactions_pulse = now + ((ks_time_t)KS_DHT_TRANSACTIONS_PULSE * KS_USEC_PER_SEC); + + ks_hash_write_lock(dht->transactions_hash); + for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + const void *key = NULL; + ks_dht_transaction_t *value = NULL; + ks_bool_t remove = KS_FALSE; + + ks_hash_this(it, &key, NULL, (void **)&value); + if (value->finished) remove = KS_TRUE; + else if (value->expiration <= now) { + // if the transaction expires, so does the attached job, but the job may try again with a new transaction + value->job->state = KS_DHT_JOB_STATE_EXPIRING; + ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid); + remove = KS_TRUE; + } + if (remove) ks_hash_remove(dht->transactions_hash, (void *)key); + } + ks_hash_write_unlock(dht->transactions_hash); +} + +KS_DECLARE(void) ks_dht_pulse_tokens(ks_dht_t *dht) +{ + ks_time_t now = ks_time_now(); + + ks_assert(dht); + + if (dht->tokens_pulse > now) return; + dht->tokens_pulse = now + ((ks_time_t)KS_DHT_TOKENS_PULSE * KS_USEC_PER_SEC); + + if (dht->token_secret_expiration && dht->token_secret_expiration <= now) { + dht->token_secret_expiration = now + ((ks_time_t)KS_DHT_TOKEN_EXPIRATION * KS_USEC_PER_SEC); + dht->token_secret_previous = dht->token_secret_current; + dht->token_secret_current = rand(); + } +} + KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len) { char *t = buffer; @@ -736,8 +830,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t * ks_assert(buffer_size); ks_assert(address->family == AF_INET || address->family == AF_INET6); - // @todo change parameters to dereferenced pointer and forward buffer pointer directly - addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8); if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) { @@ -775,8 +867,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer, ks_assert(address); ks_assert(address->family == AF_INET ||address->family == AF_INET6); - // @todo change parameters to dereferenced pointer and forward buffer pointer directly - addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8); if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_NO_MEM; @@ -800,8 +890,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *n ks_assert(buffer_size); ks_assert(address->family == AF_INET || address->family == AF_INET6); - // @todo change parameters to dereferenced pointer and forward buffer pointer directly - if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) { ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n"); return KS_STATUS_NO_MEM; @@ -825,8 +913,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer, ks_assert(address); ks_assert(address->family == AF_INET ||address->family == AF_INET6); - // @todo change parameters to dereferenced pointer and forward buffer pointer directly - if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_NO_MEM; memcpy(nodeid->id, buffer + *buffer_length, KS_DHT_NODEID_SIZE); @@ -1269,10 +1355,9 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht, if ((ret = ks_dht_autoroute_check(dht, &job->raddr, &ep)) != KS_STATUS_SUCCESS) goto done; - // @todo atomic increment - ks_mutex_lock(dht->tid_mutex); + ks_mutex_lock(dht->transactionid_mutex); transactionid = dht->transactionid_next++; - ks_mutex_unlock(dht->tid_mutex); + ks_mutex_unlock(dht->transactionid_mutex); if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; @@ -1295,7 +1380,9 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht, *message = msg; - if ((ret = ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) != KS_STATUS_SUCCESS) goto done; + ks_hash_write_lock(dht->transactions_hash); + ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans); + ks_hash_write_unlock(dht->transactions_hash); if (transaction) *transaction = trans; @@ -1327,11 +1414,19 @@ KS_DECLARE(ks_status_t) ks_dht_response_setup(ks_dht_t *dht, *message = NULL; - if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret; + if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_message_create(&msg, dht->pool, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + ep, + raddr, + transactionid, + transactionid_length, + 202, + "Internal message create error"); + goto done; + } - //if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done; ben_dict_set(msg->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); ben_dict_set(msg->data, ben_blob("y", 1), ben_blob("r", 1)); @@ -1381,7 +1476,8 @@ KS_DECLARE(void *) ks_dht_process(ks_thread_t *thread, void *data) if (ks_dht_message_parse(message, datagram->buffer, datagram->buffer_length) != KS_STATUS_SUCCESS) goto done; - callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message->type, KS_READLOCKED); + ks_hash_read_lock(datagram->dht->registry_type); + callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(datagram->dht->registry_type, message->type, KS_UNLOCKED); ks_hash_read_unlock(datagram->dht->registry_type); if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message->type); @@ -1413,13 +1509,28 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me q = ben_dict_get_by_str(message->data, "q"); if (!q) { ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n"); - return KS_STATUS_FAIL; + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query missing required key 'q'"); + ret = KS_STATUS_FAIL; + goto done; } qv = ben_str_val(q); qv_len = ben_str_len(q); if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) { ks_log(KS_LOG_DEBUG, "Message query 'q' value has an unexpectedly large size of %d\n", qv_len); + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query 'q' value is too large"); ret = KS_STATUS_FAIL; goto done; } @@ -1431,13 +1542,29 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me a = ben_dict_get_by_str(message->data, "a"); if (!a) { ks_log(KS_LOG_DEBUG, "Message query missing required key 'a'\n"); + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query missing required key 'a'"); ret = KS_STATUS_FAIL; goto done; } message->args = a; - if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query args missing required key 'id'"); + goto done; + } message->args_id = *id; ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE)); @@ -1447,13 +1574,41 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me message->raddr.host, message->raddr.port, KS_DHTRT_CREATE_PING, - &node)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; - - callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_READLOCKED); + &node)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 202, + "Internal route table create node error"); + goto done; + } + if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 202, + "Internal route table release node error"); + goto done; + } + + ks_hash_read_lock(dht->registry_query); + callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED); ks_hash_read_unlock(dht->registry_query); - if (!callback) ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query); + if (!callback) { + ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query); + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 204, + "Message query method is not registered"); + } else ret = callback(dht, message); done: @@ -1504,7 +1659,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t transactionid = ntohl(*tid); ks_log(KS_LOG_DEBUG, "Message response transaction id %d\n", transactionid); - transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED); + + ks_hash_read_lock(dht->transactions_hash); + transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_UNLOCKED); ks_hash_read_unlock(dht->transactions_hash); if (!transaction) ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid); @@ -1582,12 +1739,15 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode_callback(ks_dht_t *dht, ks_dht_jo ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE), ks_dht_hex(job->search->target.id, id3_buf, KS_DHT_NODEID_SIZE), results_index); - // @todo add lock on node + + if (job->search->results[results_index]) ks_dhtrt_release_node(job->search->results[results_index]); job->search->results[results_index] = node; job->search->distances[results_index] = distance; ks_hash_insert(job->search->searched, node->nodeid.id, (void *)KS_TRUE); ks_hash_insert(job->search->searching, node->nodeid.id, (void *)KS_TRUE); + + ks_dhtrt_sharelock_node(node); if ((ret = ks_dht_findnode(dht, job->search, &node->addr, ks_dht_search_findnode_callback, &job->search->target)) != KS_STATUS_SUCCESS) goto done; } @@ -1668,10 +1828,15 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht, ks_hash_insert(s->searched, n->nodeid.id, (void *)KS_TRUE); ks_hash_insert(s->searching, n->nodeid.id, (void *)KS_TRUE); + + ks_dhtrt_sharelock_node(n); - if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_findnode(dht, s, &n->addr, ks_dht_search_findnode_callback, target)) != KS_STATUS_SUCCESS) { + ks_dhtrt_release_querynodes(&query); + goto done; + } } - //ks_dhtrt_release_querynodes(&query); + ks_dhtrt_release_querynodes(&query); ks_mutex_unlock(s->mutex); locked_search = KS_FALSE; @@ -1812,7 +1977,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me tid = (uint32_t *)message->transactionid; transactionid = ntohl(*tid); - transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED); + ks_hash_read_lock(dht->transactions_hash); + transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_UNLOCKED); ks_hash_read_unlock(dht->transactions_hash); if (!transaction) { @@ -1834,7 +2000,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me transaction->finished = KS_TRUE; - callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED); + ks_hash_read_lock(dht->registry_error); + callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED); ks_hash_read_unlock(dht->registry_error); if (callback) ret = callback(dht, message); @@ -1854,52 +2021,6 @@ KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job) ks_mutex_unlock(dht->jobs_mutex); } -KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht) -{ - ks_dht_job_t *first = NULL; - ks_dht_job_t *last = NULL; - - ks_assert(dht); - - ks_mutex_lock(dht->jobs_mutex); - for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) { - ks_bool_t done = KS_FALSE; - jobn = job->next; - - if (job->state == KS_DHT_JOB_STATE_QUERYING) { - job->state = KS_DHT_JOB_STATE_RESPONDING; - if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING; - } - if (job->state == KS_DHT_JOB_STATE_EXPIRING) { - job->attempts--; - if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING; - else done = KS_TRUE; - } - if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE; - - if (done) { - if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL; - else if (!jobp) dht->jobs_first = jobn; - else if (!jobn) { - dht->jobs_last = jobp; - dht->jobs_last->next = NULL; - } - else jobp->next = jobn; - - job->next = NULL; - if (last) last = last->next = job; - else first = last = job; - } else jobp = job; - } - ks_mutex_unlock(dht->jobs_mutex); - - for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) { - jobn = job->next; - // this cannot occur inside of the main loop, may add new jobs invalidating list pointers - if (job->finish_callback) job->finish_callback(dht, job); - ks_dht_job_destroy(&job); - } -} KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback) { @@ -2064,11 +2185,19 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess ks_assert(message); ks_assert(message->args); - if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query findnode args missing required key 'target'"); + goto done; + } want = ben_dict_get_by_str(message->args, "want"); if (want) { - // @todo use ben_list_for_each size_t want_len = ben_list_len(want); for (size_t i = 0; i < want_len; ++i) { struct bencode *iv = ben_list_get(want, i); @@ -2087,7 +2216,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess query.nodeid = *target; query.type = KS_DHT_REMOTE; - query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE + query.max = 8; // @todo should be like KS_DHTRT_BUCKET_SIZE if (want4) { query.family = AF_INET; ks_dhtrt_findclosest_nodes(dht->rt_ipv4, &query); @@ -2099,7 +2228,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess &qn->addr, buffer4, &buffer4_length, - sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done; + sizeof(buffer4))) != KS_STATUS_SUCCESS) { + ks_dhtrt_release_querynodes(&query); + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 202, + "Internal compact v4 nodeinfo error"); + goto done; + } ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port); @@ -2117,7 +2256,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess &qn->addr, buffer6, &buffer6_length, - sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done; + sizeof(buffer6))) != KS_STATUS_SUCCESS) { + ks_dhtrt_release_querynodes(&query); + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 202, + "Internal compact v6 nodeinfo error"); + goto done; + } ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port); @@ -2244,6 +2393,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job) { ks_dht_message_t *message = NULL; struct bencode *a = NULL; + ks_dht_storageitem_t *item = NULL; ks_assert(dht); ks_assert(job); @@ -2256,7 +2406,11 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job) &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available + ks_hash_read_lock(dht->storageitems_hash); + item = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED); + ks_hash_read_unlock(dht->storageitems_hash); + + if (item && item->mutable && item->seq > 0) ben_dict_set(a, ben_blob("seq", 3), ben_int(item->seq)); ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE)); //ks_log(KS_LOG_DEBUG, "Sending message query get\n"); @@ -2287,7 +2441,16 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ks_assert(message); ks_assert(message->args); - if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_utility_extract_nodeid(message->args, "target", &target)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query get args missing required key 'target'"); + goto done; + } seq = ben_dict_get_by_str(message->args, "seq"); if (seq) sequence = ben_int_val(seq); @@ -2317,7 +2480,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t &qn->addr, buffer4, &buffer4_length, - sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done; + sizeof(buffer4))) != KS_STATUS_SUCCESS) { + ks_dhtrt_release_querynodes(&query); + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 202, + "Internal compact v4 nodeinfo error"); + goto done; + } ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port); @@ -2335,7 +2508,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t &qn->addr, buffer6, &buffer6_length, - sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done; + sizeof(buffer6))) != KS_STATUS_SUCCESS) { + ks_dhtrt_release_querynodes(&query); + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 202, + "Internal compact v6 nodeinfo error"); + goto done; + } ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port); @@ -2523,22 +2706,19 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t sequence, sig)) != KS_STATUS_SUCCESS) goto done; } - if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done; - item = ks_hash_search(dht->storageitems_hash, item->id.id, KS_UNLOCKED); + if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item); } else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); if (item) job->response_storageitem = item; else if (olditem) job->response_storageitem = olditem; done: - if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash); if (ret != KS_STATUS_SUCCESS) { - if (item) ks_dht_storageitem_destroy(&item); } + if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash); return ret; } -// @todo add a public function to add storageitem_t's to the store before calling this for authoring new data, reuse function in the "get" handlers // @todo add reference counting system to storageitem_t to know what to keep alive with reannouncements versus allowing to expire KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht, const ks_sockaddr_t *raddr, @@ -2620,12 +2800,49 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t ks_assert(message->args); - if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query put args missing required key 'token'"); + goto done; + } - if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query put 'k' is malformed"); + goto done; + } + if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query put 'sig' is malformed"); + goto done; + } salt = ben_dict_get_by_str(message->args, "salt"); + if (salt && ben_str_len(salt) > KS_DHT_STORAGEITEM_SALT_MAX_SIZE) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 207, + "Message query put 'salt' is too large"); + goto done; + } seq = ben_dict_get_by_str(message->args, "seq"); if (seq) sequence = ben_int_val(seq); @@ -2635,6 +2852,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t if (seq && (!k || !sig)) { ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data\n"); + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query put for mutable data must include both 'k' and 'sig'"); ret = KS_STATUS_ARG_INVALID; goto done; } @@ -2642,6 +2866,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t v = ben_dict_get_by_str(message->args, "v"); if (!v) { ks_log(KS_LOG_DEBUG, "Must provide v\n"); + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query put args missing required key 'v'"); ret = KS_STATUS_ARG_INVALID; goto done; } @@ -2649,23 +2880,50 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t if (!seq) { // immutable - if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 202, + "Internal storage item target immutable error"); + goto done; + } } else { // mutable - if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 202, + "Internal storage item target mutable error"); + goto done; + } } + + ks_hash_write_lock(dht->storageitems_hash); + storageitems_locked = KS_TRUE; + olditem = ks_hash_search(dht->storageitems_hash, target.id, KS_UNLOCKED); if (!ks_dht_token_verify(dht, &message->raddr, &target, token)) { ks_log(KS_LOG_DEBUG, "Invalid token\n"); + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 203, + "Message query put token is invalid"); ret = KS_STATUS_FAIL; goto done; } //ks_log(KS_LOG_DEBUG, "Message query put is valid\n"); - ks_hash_write_lock(dht->storageitems_hash); - storageitems_locked = KS_TRUE; if (!seq) { // immutable @@ -2674,27 +2932,61 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t dht->pool, &target, v, - KS_TRUE)) != KS_STATUS_SUCCESS) goto done; + KS_TRUE)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 202, + "Internal storage item create immutable error"); + goto done; + } } else { // mutable if (!ks_dht_storageitem_signature_verify(sig, k, salt, seq, v)) { ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n"); + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 206, + "Message query put signature is invalid"); ret = KS_STATUS_FAIL; goto done; } if (olditem) { if (cas && olditem->seq != cas_seq) { - // @todo send 301 error instead of the response + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 301, + "Message query put cas mismatch"); goto done; } if (olditem->seq > sequence) { - // @todo send 302 error instead of the response + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 302, + "Message query put sequence is less than current"); goto done; } if (olditem->seq == sequence) { if (ben_cmp(olditem->v, v) != 0) { - // @todo send 201? error instead of the response + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 201, + "Message query put sequence is equal to current but values are different"); goto done; } } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig); @@ -2709,9 +3001,18 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t salt, KS_TRUE, sequence, - sig)) != KS_STATUS_SUCCESS) goto done; + sig)) != KS_STATUS_SUCCESS) { + ks_dht_error(dht, + message->endpoint, + &message->raddr, + message->transactionid, + message->transactionid_length, + 202, + "Internal storage item create mutable error"); + goto done; + } } - if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done; + if (item) ks_hash_insert(dht->storageitems_hash, item->id.id, item); if ((ret = ks_dht_response_setup(dht, message->endpoint, @@ -2725,10 +3026,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t ks_q_push(dht->send_q, (void *)response); done: - if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash); if (ret != KS_STATUS_SUCCESS) { - if (item) ks_dht_storageitem_destroy(&item); + if (item) ks_hash_remove(dht->storageitems_hash, item->id.id); } + if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash); return ret; } diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 7e8598b53f..7f869f67ea 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -18,7 +18,6 @@ KS_BEGIN_EXTERN_C #define KS_DHT_DATAGRAM_BUFFER_SIZE 1000 //#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF -#define KS_DHT_PULSE_EXPIRATIONS 1 #define KS_DHT_NODEID_SIZE 20 @@ -30,6 +29,8 @@ KS_BEGIN_EXTERN_C #define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256 #define KS_DHT_TRANSACTION_EXPIRATION 10 +#define KS_DHT_TRANSACTIONS_PULSE 1 + #define KS_DHT_SEARCH_EXPIRATION 10 #define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE @@ -40,7 +41,8 @@ KS_BEGIN_EXTERN_C #define KS_DHT_STORAGEITEM_EXPIRATION 7200 #define KS_DHT_TOKEN_SIZE SHA_DIGEST_LENGTH -#define KS_DHT_TOKENSECRET_EXPIRATION 300 +#define KS_DHT_TOKEN_EXPIRATION 300 +#define KS_DHT_TOKENS_PULSE 1 #define KS_DHTRT_MAXQUERYSIZE 20 @@ -257,12 +259,11 @@ struct ks_dht_s { ks_hash_t *registry_error; ks_dht_endpoint_t **endpoints; + int32_t endpoints_length; int32_t endpoints_size; ks_hash_t *endpoints_hash; struct pollfd *endpoints_poll; - ks_time_t pulse_expirations; - ks_q_t *send_q; ks_dht_message_t *send_q_unsent; uint8_t recv_buffer[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; // Add 1, if we receive it then overflow error @@ -272,7 +273,8 @@ struct ks_dht_s { ks_dht_job_t *jobs_first; ks_dht_job_t *jobs_last; - ks_mutex_t *tid_mutex; + ks_time_t transactions_pulse; + ks_mutex_t *transactionid_mutex; volatile uint32_t transactionid_next; ks_hash_t *transactions_hash; @@ -283,6 +285,7 @@ struct ks_dht_s { ks_dht_search_t *searches_first; ks_dht_search_t *searches_last; + ks_time_t tokens_pulse; volatile uint32_t token_secret_current; volatile uint32_t token_secret_previous; ks_time_t token_secret_expiration; @@ -323,9 +326,8 @@ KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t * @param dht pointer to the dht instance * @param value string of the type text under the 'y' key of a message * @param callback the callback to be called when a message matches - * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL */ -KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback); +KS_DECLARE(void) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback); /** * Register a callback for a specific message query. @@ -333,9 +335,8 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k * @param dht pointer to the dht instance * @param value string of the type text under the 'q' key of a message * @param callback the callback to be called when a message matches - * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL */ -KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback); +KS_DECLARE(void) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback); /** * Register a callback for a specific message error. @@ -343,9 +344,8 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, * @param dht pointer to the dht instance * @param value string of the errorcode under the first item of the 'e' key of a message * @param callback the callback to be called when a message matches - * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL */ -KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback); +KS_DECLARE(void) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback); /** * Bind a local address and port for receiving UDP datagrams. diff --git a/libs/libks/src/dht/ks_dht_search.c b/libs/libks/src/dht/ks_dht_search.c index 2417fd0c4b..313856d09a 100644 --- a/libs/libks/src/dht/ks_dht_search.c +++ b/libs/libks/src/dht/ks_dht_search.c @@ -23,11 +23,11 @@ KS_DECLARE(ks_status_t) ks_dht_search_create(ks_dht_search_t **search, ks_pool_t s->callback = callback; - ks_hash_create(&s->searched, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool); + ks_hash_create(&s->searched, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool); ks_assert(s->searched); ks_hash_set_keysize(s->searched, KS_DHT_NODEID_SIZE); - ks_hash_create(&s->searching, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool); + ks_hash_create(&s->searching, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, s->pool); ks_assert(s->searching); ks_hash_set_keysize(s->searching, KS_DHT_NODEID_SIZE);