From 8be2251b76777bbbbe8d77cd12eac6d9b06a71eb Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Tue, 20 Dec 2016 22:07:11 +0000 Subject: [PATCH] FS-9775: Started working on "put", ran into a bug in job states which is fixed now, adjusted find_node response to add nodes to the job --- libs/libks/Makefile.am | 2 +- libs/libks/src/dht/ks_dht-int.h | 19 +- libs/libks/src/dht/ks_dht.c | 396 ++++++++++++++++++++++-- libs/libks/src/dht/ks_dht.h | 35 ++- libs/libks/src/dht/ks_dht_bucket.c | 1 + libs/libks/src/dht/ks_dht_job.c | 42 ++- libs/libks/src/dht/ks_dht_message.c | 2 +- libs/libks/src/dht/ks_dht_search.c | 2 +- libs/libks/src/dht/ks_dht_storageitem.c | 49 ++- libs/libks/src/dht/ks_dht_transaction.c | 2 +- libs/libks/test/testdht2.c | 2 - 11 files changed, 480 insertions(+), 72 deletions(-) diff --git a/libs/libks/Makefile.am b/libs/libks/Makefile.am index ce21d3d898..6d8c5c466f 100644 --- a/libs/libks/Makefile.am +++ b/libs/libks/Makefile.am @@ -3,7 +3,7 @@ EXTRA_DIST = SUBDIRS = . test AUTOMAKE_OPTIONS = subdir-objects -AM_CFLAGS += -I$(top_srcdir)/src -I$(top_srcdir)/src/include -I$(top_srcdir)/crypt +AM_CFLAGS += -I$(top_srcdir)/src -I$(top_srcdir)/src/include -I$(top_srcdir)/crypt -O0 AM_CPPFLAGS = $(AM_CFLAGS) lib_LTLIBRARIES = libks.la diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 166af867c1..0c475a16ff 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -218,6 +218,7 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht); KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job); KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job); KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job); +KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job); KS_DECLARE(void *)ks_dht_process(ks_thread_t *thread, void *data); @@ -260,6 +261,18 @@ KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback, ks_dht_nodeid_t *target); +KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job, + ks_dht_job_callback_t query_callback, + ks_dht_job_callback_t finish_callback, + ks_dht_nodeid_t *target, + uint8_t *salt, + ks_size_t salt_length); +KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job, + ks_dht_job_callback_t query_callback, + ks_dht_job_callback_t finish_callback, + ks_dht_nodeid_t *target, + uint8_t *salt, + ks_size_t salt_length); KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job); @@ -290,13 +303,13 @@ KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending /** * */ -KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v); +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v); KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item, ks_pool_t *pool, + ks_dht_nodeid_t *target, struct bencode *v, ks_dht_storageitem_key_t *k, - uint8_t *salt, - ks_size_t salt_length, + struct bencode *salt, int64_t sequence, ks_dht_storageitem_signature_t *signature); KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item); diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index ff457c4078..b14a151237 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -102,7 +102,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread /** * Default expirations to not be checked for one pulse. */ - d->pulse_expirations = ks_time_now() + (KS_DHT_PULSE_EXPIRATIONS * 1000); + d->pulse_expirations = ks_time_now() + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC); /** * Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full. @@ -171,12 +171,12 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets. */ d->token_secret_current = d->token_secret_previous = rand(); - d->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000); + d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC); /** * Create the hash to store arbitrary data for BEP44. */ - ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); + ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool); ks_assert(d->storageitems_hash); /** @@ -678,7 +678,7 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) ks_assert(dht); if (dht->pulse_expirations > now) return; - dht->pulse_expirations = now + (KS_DHT_PULSE_EXPIRATIONS * 1000); + dht->pulse_expirations = now + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC); ks_hash_write_lock(dht->transactions_hash); for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { @@ -689,9 +689,9 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) ks_hash_this(it, &key, NULL, (void **)&value); if (value->finished) remove = KS_TRUE; else if (value->expiration <= now) { - // if the transaction expires, so does the attached job, it may try again with a new transaction + // if the transaction expires, so does the attached job, but the job may try again with a new transaction value->job->state = KS_DHT_JOB_STATE_EXPIRING; - ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid); + ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d, %d %d\n", value->transactionid, now, value->expiration); remove = KS_TRUE; } if (remove) { @@ -704,8 +704,10 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) if (dht->rt_ipv4) ks_dht_pulse_expirations_searches(dht, dht->searches4_hash); if (dht->rt_ipv6) ks_dht_pulse_expirations_searches(dht, dht->searches6_hash); + // @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?) + if (dht->token_secret_expiration && dht->token_secret_expiration <= now) { - dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000); + dht->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC); dht->token_secret_previous = dht->token_secret_current; dht->token_secret_current = rand(); } @@ -931,6 +933,82 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const return KS_STATUS_SUCCESS; } +KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *args, + ks_bool_t optional, + const char *key, + ks_dht_storageitem_key_t **sikey) +{ + struct bencode *k; + const char *kv; + ks_size_t kv_len; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(args); + ks_assert(key); + ks_assert(sikey); + + *sikey = NULL; + + k = ben_dict_get_by_str(args, key); + if (!k) { + if (!optional) { + ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key); + ret = KS_STATUS_ARG_INVALID; + } + goto done; + } + + kv = ben_str_val(k); + kv_len = ben_str_len(k); + if (kv_len != KS_DHT_STORAGEITEM_KEY_SIZE) { + ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, kv_len); + return KS_STATUS_ARG_INVALID; + } + + *sikey = (ks_dht_storageitem_key_t *)kv; + + done: + return ret; +} + +KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_signature(struct bencode *args, + ks_bool_t optional, + const char *key, + ks_dht_storageitem_signature_t **signature) +{ + struct bencode *sig; + const char *sigv; + ks_size_t sigv_len; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(args); + ks_assert(key); + ks_assert(signature); + + *signature = NULL; + + sig = ben_dict_get_by_str(args, key); + if (!sig) { + if (!optional) { + ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key); + ret = KS_STATUS_ARG_INVALID; + } + goto done; + } + + sigv = ben_str_val(sig); + sigv_len = ben_str_len(sig); + if (sigv_len != KS_DHT_STORAGEITEM_SIGNATURE_SIZE) { + ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, sigv_len); + return KS_STATUS_ARG_INVALID; + } + + *signature = (ks_dht_storageitem_signature_t *)sigv; + + done: + return ret; +} + KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token) { @@ -968,6 +1046,44 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *ra return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0; } +KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(struct bencode *value, ks_dht_nodeid_t *target) +{ + SHA_CTX sha; + const uint8_t *v; + size_t v_len; + + ks_assert(value); + ks_assert(target); + + v = (const uint8_t *)ben_str_val(value); + v_len = ben_str_len(value); + + if (!SHA1_Init(&sha) || + !SHA1_Update(&sha, v, v_len) || + !SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_key_t *k, struct bencode *salt, ks_dht_nodeid_t *target) +{ + SHA_CTX sha; + + ks_assert(k); + ks_assert(target); + + + if (!SHA1_Init(&sha) || + !SHA1_Update(&sha, k->key, KS_DHT_STORAGEITEM_KEY_SIZE)) return KS_STATUS_FAIL; + if (salt) { + const uint8_t *s = (const uint8_t *)ben_str_val(salt); + size_t s_len = ben_str_len(salt); + if (s_len > 0 && !SHA1_Update(&sha, s, s_len)) return KS_STATUS_FAIL; + } + if (!SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL; + + return KS_STATUS_SUCCESS; +} KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) { @@ -1026,7 +1142,6 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht, ks_mutex_unlock(dht->tid_mutex); if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done; - if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; // if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done; @@ -1248,6 +1363,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t tid = (uint32_t *)message->transactionid; transactionid = ntohl(*tid); + ks_log(KS_LOG_DEBUG, "Message response transaction id %d\n", transactionid); transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED); ks_hash_read_unlock(dht->transactions_hash); @@ -1264,6 +1380,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t transaction->job->state = KS_DHT_JOB_STATE_PROCESSING; message->transaction = transaction; if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING; + transaction->job->response = NULL; // message is destroyed after we return, stop using it transaction->finished = KS_TRUE; } @@ -1523,6 +1640,7 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht) jobn = job->next; switch (job->state) { case KS_DHT_JOB_STATE_QUERYING: + job->state = KS_DHT_JOB_STATE_RESPONDING; if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING; break; case KS_DHT_JOB_STATE_RESPONDING: @@ -1675,12 +1793,12 @@ KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job) &message, &a)) != KS_STATUS_SUCCESS) goto done; - //memcpy(transaction->target.id, job->target.id, KS_DHT_NODEID_SIZE); - transaction->target = job->target; + //memcpy(transaction->target.id, job->query_target.id, KS_DHT_NODEID_SIZE); + //transaction->target = job->query_target; - ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE)); + ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE)); // Only request both v4 and v6 if we have both interfaces bound and are looking for our own node id, aka bootstrapping - if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->target.id, KS_DHT_NODEID_SIZE)) { + if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->query_target.id, KS_DHT_NODEID_SIZE)) { struct bencode *want = ben_list(); ben_list_append_str(want, "n4"); ben_list_append_str(want, "n6"); @@ -1812,13 +1930,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j ks_assert(job); n = ben_dict_get_by_str(job->response->args, "nodes"); - if (n) { + if (n && dht->rt_ipv4) { //n4 = KS_TRUE; nodes = (const uint8_t *)ben_str_val(n); nodes_size = ben_str_len(n); } n = ben_dict_get_by_str(job->response->args, "nodes6"); - if (n) { + if (n && dht->rt_ipv6) { //n6 = KS_TRUE; nodes6 = (const uint8_t *)ben_str_val(n); nodes6_size = ben_str_len(n); @@ -1827,7 +1945,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j searches = job->response->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash; ks_hash_read_lock(searches); - search = ks_hash_search(searches, job->response->transaction->target.id, KS_UNLOCKED); + search = ks_hash_search(searches, job->query_target.id, KS_UNLOCKED); if (search) { ks_dht_search_pending_t *pending = NULL; @@ -1852,8 +1970,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); - ks_dhtrt_release_node(node); + job->response_nodes[job->response_nodes_count++] = node; + // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6 if (search && job->response->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) { ks_dht_nodeid_t distance; int32_t results_index = -1; @@ -1913,8 +2032,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); - ks_dhtrt_release_node(node); + job->response_nodes6[job->response_nodes6_count++] = node; + // @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6 if (search && job->response->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) { ks_dht_nodeid_t distance; int32_t results_index = -1; @@ -1961,12 +2081,35 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n"); + job->state = KS_DHT_JOB_STATE_COMPLETING; + done: if(search) ks_mutex_unlock(search->mutex); return ret; } -// @todo ks_dht_get + +KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + ks_dht_nodeid_t *target, + uint8_t *salt, + ks_size_t salt_length) +{ + ks_dht_job_t *job = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(target); + + if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; + ks_dht_job_build_get(job, ks_dht_query_get, callback, target, salt, salt_length); + ks_dht_jobs_add(dht, job); + + done: + return ret; +} KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job) { @@ -1985,7 +2128,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job) &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available - ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE)); + ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE)); ks_log(KS_LOG_DEBUG, "Sending message query get\n"); ks_q_push(dht->send_q, (void *)message); @@ -2024,7 +2167,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token); - item = ks_hash_search(dht->storageitems_hash, target->id, KS_READLOCKED); + ks_hash_read_lock(dht->storageitems_hash); + item = ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED); ks_hash_read_unlock(dht->storageitems_hash); sequence_snuffed = item && sequence >= 0 && item->seq <= sequence; @@ -2100,28 +2244,224 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t *job) { - ks_dht_token_t *token; + ks_dht_storageitem_t *item = NULL; + ks_dht_token_t *token = NULL; + ks_dht_storageitem_key_t *k = NULL; + ks_dht_storageitem_signature_t *sig = NULL; + struct bencode *seq; + int64_t sequence = -1; + struct bencode *v = NULL; + //ks_size_t v_len = 0; + struct bencode *n; + ks_dht_node_t *node = NULL; + const uint8_t *nodes = NULL; + const uint8_t *nodes6 = NULL; + size_t nodes_size = 0; + size_t nodes6_size = 0; + size_t nodes_len = 0; + size_t nodes6_len = 0; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + ks_bool_t storageitems_locked = KS_FALSE; + ks_dht_storageitem_t *olditem = NULL; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(job); - // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done; + job->response_token = *token; - // @todo add extract function for mutable ks_dht_storageitem_key_t - // @todo add extract function for mutable ks_dht_storageitem_signature_t + if ((ret = ks_dht_utility_extract_storageitem_key(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_utility_extract_storageitem_signature(job->response->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done; - // @todo add/touch bucket entries for other nodes/nodes6 returned + seq = ben_dict_get_by_str(job->response->args, "seq"); + if (seq) sequence = ben_int_val(seq); + + if (seq && ((k && !sig) || (!k && sig))) { + ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data"); + ret = KS_STATUS_ARG_INVALID; + goto done; + } + + v = ben_dict_get_by_str(job->response->args, "v"); + //if (v) v_len = ben_str_len(v); + + n = ben_dict_get_by_str(job->response->args, "nodes"); + if (n && dht->rt_ipv4) { + nodes = (const uint8_t *)ben_str_val(n); + nodes_size = ben_str_len(n); + } + n = ben_dict_get_by_str(job->response->args, "nodes6"); + if (n && dht->rt_ipv6) { + nodes6 = (const uint8_t *)ben_str_val(n); + nodes6_size = ben_str_len(n); + } ks_log(KS_LOG_DEBUG, "Message response get is reached\n"); + while (nodes_len < nodes_size) { + ks_dht_nodeid_t nid; + ks_sockaddr_t addr; + + addr.family = AF_INET; + if ((ret = ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done; + + ks_log(KS_LOG_DEBUG, + "Expanded ipv4 nodeinfo for %s (%s %d)\n", + ks_dht_hexid(&nid, id_buf), + addr.host, + addr.port); + + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); + ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); + job->response_nodes[job->response_nodes_count++] = node; + } + while (nodes6_len < nodes6_size) { + ks_dht_nodeid_t nid; + ks_sockaddr_t addr; + + addr.family = AF_INET6; + if ((ret = ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done; + + ks_log(KS_LOG_DEBUG, + "Expanded ipv6 nodeinfo for %s (%s %d)\n", + ks_dht_hexid(&nid, id_buf), + addr.host, + addr.port); + + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); + ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); + job->response_nodes6[job->response_nodes6_count++] = node; + } + + ks_hash_write_lock(dht->storageitems_hash); + storageitems_locked = KS_TRUE; + olditem = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED); + + if (v) { + ks_dht_nodeid_t tmptarget; + + if (!seq) { + // immutable + if ((ret = ks_dht_storageitem_target_immutable(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done; + if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) { + ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n"); + ret = KS_STATUS_FAIL; + goto done; + } + if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); + else if ((ret = ks_dht_storageitem_create_immutable(&item, + dht->pool, + &tmptarget, + v)) != KS_STATUS_SUCCESS) goto done; + } else { + // mutable + struct bencode *tmp = NULL; + uint8_t *tmpsig = NULL; + size_t tmpsig_len = 0; + int32_t res = 0; + + if ((ret = ks_dht_storageitem_target_mutable(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done; + if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) { + ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n"); + ret = KS_STATUS_FAIL; + goto done; + } + + tmp = ben_dict(); + if (job->query_salt) ben_dict_set(tmp, ben_blob("salt", 4), ben_clone(job->query_salt)); + ben_dict_set(tmp, ben_blob("seq", 3), ben_clone(seq)); + ben_dict_set(tmp, ben_blob("v", 1), ben_clone(v)); + tmpsig = ben_encode(&tmpsig_len, tmp); + ben_free(tmp); + + res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, k->key); + + free(tmpsig); + + if (res) { + ks_log(KS_LOG_DEBUG, "Immutable data signature failed to verify\n"); + ret = KS_STATUS_FAIL; + goto done; + } + + if (olditem) { + if (olditem->seq >= sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); + else { + ks_hash_remove(dht->storageitems_hash, olditem->id.id); + olditem = NULL; + } + } + if (!olditem && (ret = ks_dht_storageitem_create_mutable(&item, + dht->pool, + &tmptarget, + v, + k, + job->query_salt, + sequence, + sig)) != KS_STATUS_SUCCESS) goto done; + } + if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done; + } else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); + + if (item) job->response_storageitem = item; + else if (olditem) job->response_storageitem = olditem; + + job->state = KS_DHT_JOB_STATE_COMPLETING; + + done: + if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash); + if (ret != KS_STATUS_SUCCESS) { + if (item) ks_dht_storageitem_destroy(&item); + } + return ret; +} + +KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + ks_dht_nodeid_t *target, + uint8_t *salt, + ks_size_t salt_length) +{ + ks_dht_job_t *job = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(target); + + if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; + ks_dht_job_build_put(job, ks_dht_query_put, callback, target, salt, salt_length); + ks_dht_jobs_add(dht, job); + done: return ret; } -// @todo ks_dht_put -// @todo ks_dht_query_put +KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job) +{ + ks_dht_message_t *message = NULL; + struct bencode *a = NULL; + + ks_assert(dht); + ks_assert(job); + + if (ks_dht_query_setup(dht, + job, + "put", + ks_dht_process_response_put, + NULL, + &message, + &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + + + ks_log(KS_LOG_DEBUG, "Sending message query put\n"); + ks_q_push(dht->send_q, (void *)message); + + return KS_STATUS_SUCCESS; +} KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message) { @@ -2159,6 +2499,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t ks_log(KS_LOG_DEBUG, "Message response put is reached\n"); + job->state = KS_DHT_JOB_STATE_COMPLETING; + // done: return ret; } diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 4e4f07456b..64fd8937af 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -22,6 +22,8 @@ KS_BEGIN_EXTERN_C #define KS_DHT_NODEID_SIZE 20 +#define KS_DHT_RESPONSE_NODES_MAX_SIZE 8 + #define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20 #define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20 #define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20 @@ -34,6 +36,7 @@ KS_BEGIN_EXTERN_C #define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES #define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64 #define KS_DHT_STORAGEITEM_SIGNATURE_SIZE crypto_sign_BYTES +#define KS_DHT_STORAGEITEM_EXPIRATION 7200 #define KS_DHT_TOKEN_SIZE SHA_DIGEST_LENGTH #define KS_DHT_TOKENSECRET_EXPIRATION 300 @@ -92,6 +95,10 @@ struct ks_dht_node_s { ks_rwl_t *reflock; }; +struct ks_dht_token_s { + uint8_t token[KS_DHT_TOKEN_SIZE]; +}; + enum ks_dht_job_state_t { KS_DHT_JOB_STATE_QUERYING, KS_DHT_JOB_STATE_RESPONDING, @@ -124,7 +131,16 @@ struct ks_dht_job_s { //ks_dht_nodeid_t response_id; // job specific query parameters - ks_dht_nodeid_t target; + ks_dht_nodeid_t query_target; + struct bencode *query_salt; + + // job specific response parameters + ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE]; + ks_size_t response_nodes_count; + ks_dht_node_t *response_nodes6[KS_DHT_RESPONSE_NODES_MAX_SIZE]; + ks_size_t response_nodes6_count; + ks_dht_token_t response_token; + ks_dht_storageitem_t *response_storageitem; }; struct ks_dhtrt_routetable_s { @@ -142,10 +158,6 @@ struct ks_dhtrt_querynodes_s { ks_dht_node_t* nodes[ KS_DHTRT_MAXQUERYSIZE ]; /* out: array of peers (ks_dht_node_t* nodes[incount]) */ }; -struct ks_dht_token_s { - uint8_t token[KS_DHT_TOKEN_SIZE]; -}; - struct ks_dht_storageitem_key_s { uint8_t key[KS_DHT_STORAGEITEM_KEY_SIZE]; }; @@ -181,7 +193,7 @@ struct ks_dht_transaction_s { ks_pool_t *pool; ks_dht_job_t *job; uint32_t transactionid; - ks_dht_nodeid_t target; // @todo look at moving this into job now + //ks_dht_nodeid_t target; // @todo look at moving this into job now ks_dht_job_callback_t callback; ks_time_t expiration; ks_bool_t finished; @@ -209,14 +221,13 @@ struct ks_dht_search_pending_s { struct ks_dht_storageitem_s { ks_pool_t *pool; ks_dht_nodeid_t id; - // @todo ks_time_t expiration; + ks_time_t expiration; struct bencode *v; ks_bool_t mutable; ks_dht_storageitem_key_t pk; ks_dht_storageitem_key_t sk; - uint8_t salt[KS_DHT_STORAGEITEM_SALT_MAX_SIZE]; - ks_size_t salt_length; + struct bencode *salt; int64_t seq; ks_dht_storageitem_signature_t sig; }; @@ -352,6 +363,12 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout); KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback); KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target); +KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + ks_dht_nodeid_t *target, + uint8_t *salt, + ks_size_t salt_length); /** * Create a network search of the closest nodes to a target. diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index a04717ada8..f0d2bbadd0 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -1469,6 +1469,7 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings); #endif ks_dht_node_t* node = entry->gptr; + ks_log(KS_LOG_DEBUG, "Node addr %s %d\n", node->addr.host, node->addr.port); ks_dht_ping(internal->dht, &node->addr, NULL); return; diff --git a/libs/libks/src/dht/ks_dht_job.c b/libs/libks/src/dht/ks_dht_job.c index c99556aa39..d92b5a62a7 100644 --- a/libs/libks/src/dht/ks_dht_job.c +++ b/libs/libks/src/dht/ks_dht_job.c @@ -38,6 +38,7 @@ KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job, KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback) { ks_assert(job); + ks_assert(query_callback); job->query_callback = query_callback; job->finish_callback = finish_callback; @@ -49,11 +50,46 @@ KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job, ks_dht_nodeid_t *target) { ks_assert(job); + ks_assert(query_callback); ks_assert(target); job->query_callback = query_callback; job->finish_callback = finish_callback; - job->target = *target; + job->query_target = *target; +} + +KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job, + ks_dht_job_callback_t query_callback, + ks_dht_job_callback_t finish_callback, + ks_dht_nodeid_t *target, + uint8_t *salt, + ks_size_t salt_length) +{ + ks_assert(job); + ks_assert(query_callback); + ks_assert(target); + + job->query_callback = query_callback; + job->finish_callback = finish_callback; + job->query_target = *target; + if (salt && salt_length > 0) job->query_salt = ben_blob(salt, salt_length); +} + +KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job, + ks_dht_job_callback_t query_callback, + ks_dht_job_callback_t finish_callback, + ks_dht_nodeid_t *target, + uint8_t *salt, + ks_size_t salt_length) +{ + ks_assert(job); + ks_assert(query_callback); + ks_assert(target); + + job->query_callback = query_callback; + job->finish_callback = finish_callback; + job->query_target = *target; + if (salt && salt_length > 0) job->query_salt = ben_blob(salt, salt_length); } KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job) @@ -65,6 +101,10 @@ KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job) j = *job; + if (j->query_salt) ben_free(j->query_salt); + for (int32_t i = 0; i < j->response_nodes_count; ++i) ks_dhtrt_release_node(j->response_nodes[i]); + for (int32_t i = 0; i < j->response_nodes6_count; ++i) ks_dhtrt_release_node(j->response_nodes6[i]); + ks_pool_free(j->pool, job); } diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index 7997b41c04..e9385cb74b 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -88,7 +88,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui memcpy(message->transactionid, tv, tv_len); message->transactionid_length = tv_len; // @todo hex output of transactionid - //ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", *transactionid); + //ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", message->transactionid); y = ben_dict_get_by_str(message->data, "y"); if (!y) { diff --git a/libs/libks/src/dht/ks_dht_search.c b/libs/libks/src/dht/ks_dht_search.c index e15d726f3b..3dc838545c 100644 --- a/libs/libks/src/dht/ks_dht_search.c +++ b/libs/libks/src/dht/ks_dht_search.c @@ -92,7 +92,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **p p->pool = pool; p->nodeid = *nodeid; - p->expiration = ks_time_now() + (KS_DHT_SEARCH_EXPIRATION * 1000); + p->expiration = ks_time_now() + ((ks_time_t)KS_DHT_SEARCH_EXPIRATION * KS_USEC_PER_SEC); p->finished = KS_FALSE; // done: diff --git a/libs/libks/src/dht/ks_dht_storageitem.c b/libs/libks/src/dht/ks_dht_storageitem.c index ce34bde9f9..f34783c28f 100644 --- a/libs/libks/src/dht/ks_dht_storageitem.c +++ b/libs/libks/src/dht/ks_dht_storageitem.c @@ -2,12 +2,9 @@ #include "ks_dht-int.h" #include "sodium.h" -KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v) +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v) { ks_dht_storageitem_t *si; - SHA_CTX sha; - size_t enc_len = 0; - uint8_t *enc = NULL; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(item); @@ -19,16 +16,18 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t ks_assert(si); si->pool = pool; + si->id = *target; si->mutable = KS_FALSE; + si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); si->v = ben_clone(v); ks_assert(si->v); - - enc = ben_encode(&enc_len, si->v); - ks_assert(enc); - SHA1_Init(&sha); - SHA1_Update(&sha, enc, enc_len); - SHA1_Final(si->id.id, &sha); - free(enc); + + //enc = ben_encode(&enc_len, si->v); + //ks_assert(enc); + //SHA1_Init(&sha); + //SHA1_Update(&sha, enc, enc_len); + //SHA1_Final(si->id.id, &sha); + //free(enc); // done: if (ret != KS_STATUS_SUCCESS) { @@ -39,15 +38,14 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item, ks_pool_t *pool, + ks_dht_nodeid_t *target, struct bencode *v, ks_dht_storageitem_key_t *k, - uint8_t *salt, - ks_size_t salt_length, + struct bencode *salt, int64_t sequence, ks_dht_storageitem_signature_t *signature) { ks_dht_storageitem_t *si; - SHA_CTX sha; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(item); @@ -55,30 +53,25 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t * ks_assert(v); ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE); ks_assert(k); - ks_assert(!(!salt && salt_length > 0)); - ks_assert(!(salt_length > KS_DHT_STORAGEITEM_SIGNATURE_SIZE)); ks_assert(signature); *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t)); ks_assert(si); si->pool = pool; + si->id = *target; si->mutable = KS_TRUE; + si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); si->v = ben_clone(v); ks_assert(si->v); - memcpy(si->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE); - if (salt && salt_length > 0) { - memcpy(si->salt, salt, salt_length); - si->salt_length = salt_length; + si->pk = *k; + if (salt) { + si->salt = ben_clone(salt); + ks_assert(si->salt); } si->seq = sequence; - memcpy(si->sig.sig, signature->sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE); - - SHA1_Init(&sha); - SHA1_Update(&sha, si->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE); - if (si->salt && si->salt_length > 0) SHA1_Update(&sha, si->salt, si->salt_length); - SHA1_Final(si->id.id, &sha); + si->sig = *signature; // done: if (ret != KS_STATUS_SUCCESS) { @@ -103,6 +96,10 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item) ben_free(si->v); si->v = NULL; } + if (si->salt) { + ben_free(si->salt); + si->salt = NULL; + } ks_pool_free(si->pool, item); } diff --git a/libs/libks/src/dht/ks_dht_transaction.c b/libs/libks/src/dht/ks_dht_transaction.c index 0f0458329c..39dcd6a4d1 100644 --- a/libs/libks/src/dht/ks_dht_transaction.c +++ b/libs/libks/src/dht/ks_dht_transaction.c @@ -21,7 +21,7 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transac t->job = job; t->transactionid = transactionid; t->callback = callback; - t->expiration = ks_time_now() + (KS_DHT_TRANSACTION_EXPIRATION * 1000); + t->expiration = ks_time_now() + ((ks_time_t)KS_DHT_TRANSACTION_EXPIRATION * KS_USEC_PER_SEC); // done: if (ret != KS_STATUS_SUCCESS) { diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index d160a0cc45..7472c41ce7 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -132,7 +132,6 @@ int main() { //err = ks_dht_process(dht1, &raddr); //ok(err == KS_STATUS_SUCCESS); - diag("Ping test\n"); //ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1 @@ -163,7 +162,6 @@ int main() { diag("Find_Node test\n"); - //ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1 ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid); ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1