From 5dfd6d1b8f8f2dae3041c845c6b7b20610a6c60e Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Tue, 27 Dec 2016 04:27:35 +0000 Subject: [PATCH] FS-9775: Bug fixes and exposed interface changes while implementing tests for get/put which are functional and pass initial tests now. Deep searching needs to be revamped now to complete the full announcing process. --- libs/libks/src/dht/ks_dht-int.h | 73 ++- libs/libks/src/dht/ks_dht.c | 615 ++++++++++++++++++------ libs/libks/src/dht/ks_dht.h | 127 +++-- libs/libks/src/dht/ks_dht_job.c | 14 +- libs/libks/src/dht/ks_dht_message.c | 2 +- libs/libks/src/dht/ks_dht_search.c | 6 +- libs/libks/src/dht/ks_dht_storageitem.c | 137 ++++-- libs/libks/test/testdht2.c | 174 +++++-- 8 files changed, 860 insertions(+), 288 deletions(-) diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 0c475a16ff..89e3643ee5 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -41,7 +41,7 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht); * @param buffer pointer to the buffer able to contain at least (KS_DHT_NODEID_SIZE * 2) + 1 characters * @return The pointer to the front of the populated string buffer */ -KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer); +KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len); /** * Compacts address information as per the DHT specifications. @@ -270,9 +270,9 @@ KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job, KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback, - ks_dht_nodeid_t *target, - uint8_t *salt, - ks_size_t salt_length); + ks_dht_token_t *token, + int64_t cas, + ks_dht_storageitem_t *item); KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job); @@ -286,7 +286,33 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_create(ks_dht_endpoint_t **endpoint, ks_socket_t sock); KS_DECLARE(void) ks_dht_endpoint_destroy(ks_dht_endpoint_t **endpoint); +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message, + ks_pool_t *pool, + ks_dht_endpoint_t *endpoint, + const ks_sockaddr_t *raddr, + ks_bool_t alloc_data); +/** + * + */ +KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message); +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length); + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, + uint8_t *transactionid, + ks_size_t transactionid_length, + struct bencode **args); + + /** * */ @@ -299,19 +325,50 @@ KS_DECLARE(void) ks_dht_search_expire(ks_dht_search_t *search, ks_hash_t *pendin KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **pending, ks_pool_t *pool, const ks_dht_nodeid_t *nodeid); KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending); +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable_internal(struct bencode *value, ks_dht_nodeid_t *target); /** * */ -KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v); +KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable_internal(ks_dht_storageitem_pkey_t *pk, struct bencode *salt, ks_dht_nodeid_t *target); + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable_internal(ks_dht_storageitem_t **item, + ks_pool_t *pool, + ks_dht_nodeid_t *target, + struct bencode *v, + ks_bool_t clone_v); +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, + ks_pool_t *pool, + ks_dht_nodeid_t *target, + const uint8_t *value, + ks_size_t value_length); +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable_internal(ks_dht_storageitem_t **item, + ks_pool_t *pool, + ks_dht_nodeid_t *target, + struct bencode *v, + ks_bool_t clone_v, + ks_dht_storageitem_pkey_t *pk, + struct bencode *salt, + ks_bool_t clone_salt, + int64_t sequence, + ks_dht_storageitem_signature_t *signature); KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, - struct bencode *v, - ks_dht_storageitem_key_t *k, - struct bencode *salt, + const uint8_t *value, + ks_size_t value_length, + ks_dht_storageitem_pkey_t *pk, + const uint8_t *salt, + ks_size_t salt_length, int64_t sequence, ks_dht_storageitem_signature_t *signature); +KS_DECLARE(void) ks_dht_storageitem_update_mutable(ks_dht_storageitem_t *item, struct bencode *v, int64_t sequence, ks_dht_storageitem_signature_t *signature); KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item); /** diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index 50ed2c6d77..b54e2ad2c4 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -211,10 +211,9 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) */ if (d->storageitems_hash) { for (it = ks_hash_first(d->storageitems_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - ks_dht_storageitem_t *val; - - ks_hash_this_val(it, (void **)&val); - + const void *key = NULL; + ks_dht_storageitem_t *val = NULL; + ks_hash_this(it, &key, NULL, (void **)&val); ks_dht_storageitem_destroy(&val); } ks_hash_destroy(&d->storageitems_hash); @@ -232,18 +231,18 @@ KS_DECLARE(void) ks_dht_destroy(ks_dht_t **dht) */ if (d->searches6_hash) { for (it = ks_hash_first(d->searches6_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - ks_dht_search_t *val; - - ks_hash_this_val(it, (void **)&val); + const void *key = NULL; + ks_dht_search_t *val = NULL; + ks_hash_this(it, &key, NULL, (void **)&val); ks_dht_search_destroy(&val); } ks_hash_destroy(&d->searches6_hash); } if (d->searches4_hash) { for (it = ks_hash_first(d->searches4_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - ks_dht_search_t *val; - - ks_hash_this_val(it, (void **)&val); + const void *key = NULL; + ks_dht_search_t *val = NULL; + ks_hash_this(it, &key, NULL, (void **)&val); ks_dht_search_destroy(&val); } ks_hash_destroy(&d->searches4_hash); @@ -652,8 +651,8 @@ KS_DECLARE(void) ks_dht_pulse_expirations_searches(ks_dht_t *dht, ks_hash_t *sea char id2_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_log(KS_LOG_DEBUG, "Search for %s pending find_node to %s has expired without response\n", - ks_dht_hexid(&value->target, id_buf), - ks_dht_hexid(&v->nodeid, id2_buf)); + ks_dht_hex(value->target.id, id_buf, KS_DHT_NODEID_SIZE), + ks_dht_hex(v->nodeid.id, id2_buf, KS_DHT_NODEID_SIZE)); v->finished = KS_TRUE; continue; } @@ -691,7 +690,7 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) else if (value->expiration <= now) { // if the transaction expires, so does the attached job, but the job may try again with a new transaction value->job->state = KS_DHT_JOB_STATE_EXPIRING; - ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d, %d %d\n", value->transactionid, now, value->expiration); + ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid); remove = KS_TRUE; } if (remove) { @@ -736,16 +735,16 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht) } } -KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer) +KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len) { char *t = buffer; - ks_assert(id); + ks_assert(data); ks_assert(buffer); - memset(buffer, 0, KS_DHT_NODEID_SIZE * 2 + 1); + memset(buffer, 0, len * 2 + 1); - for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i, t += 2) sprintf(t, "%02X", id->id[i]); + for (int i = 0; i < len; ++i, t += 2) sprintf(t, "%02X", data[i]); return buffer; } @@ -933,10 +932,10 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *args, - ks_bool_t optional, - const char *key, - ks_dht_storageitem_key_t **sikey) +KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_pkey(struct bencode *args, + ks_bool_t optional, + const char *key, + ks_dht_storageitem_pkey_t **pkey) { struct bencode *k; const char *kv; @@ -945,9 +944,9 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *a ks_assert(args); ks_assert(key); - ks_assert(sikey); + ks_assert(pkey); - *sikey = NULL; + *pkey = NULL; k = ben_dict_get_by_str(args, key); if (!k) { @@ -960,12 +959,12 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *a kv = ben_str_val(k); kv_len = ben_str_len(k); - if (kv_len != KS_DHT_STORAGEITEM_KEY_SIZE) { + if (kv_len != KS_DHT_STORAGEITEM_PKEY_SIZE) { ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, kv_len); return KS_STATUS_ARG_INVALID; } - *sikey = (ks_dht_storageitem_key_t *)kv; + *pkey = (ks_dht_storageitem_pkey_t *)kv; done: return ret; @@ -1046,35 +1045,54 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *ra return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0; } -KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(struct bencode *value, ks_dht_nodeid_t *target) +KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable_internal(struct bencode *value, ks_dht_nodeid_t *target) { SHA_CTX sha; - const uint8_t *v; + uint8_t *v = NULL; size_t v_len; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(value); ks_assert(target); - v = (const uint8_t *)ben_str_val(value); - v_len = ben_str_len(value); - + v = ben_encode(&v_len, value); if (!SHA1_Init(&sha) || !SHA1_Update(&sha, v, v_len) || - !SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL; + !SHA1_Final(target->id, &sha)) { + ret = KS_STATUS_FAIL; + } + free(v); - return KS_STATUS_SUCCESS; + return ret; } -KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_key_t *k, struct bencode *salt, ks_dht_nodeid_t *target) +KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(const uint8_t *value, ks_size_t value_length, ks_dht_nodeid_t *target) +{ + struct bencode *v = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(value); + ks_assert(value_length > 0); + ks_assert(target); + + v = ben_blob(value, value_length); + ret = ks_dht_storageitem_target_immutable_internal(v, target); + ben_free(v); + + return ret; +} + +KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable_internal(ks_dht_storageitem_pkey_t *pk, struct bencode *salt, ks_dht_nodeid_t *target) { SHA_CTX sha; + //char buf1[KS_DHT_NODEID_SIZE * 2 + 1]; - ks_assert(k); + ks_assert(pk); ks_assert(target); if (!SHA1_Init(&sha) || - !SHA1_Update(&sha, k->key, KS_DHT_STORAGEITEM_KEY_SIZE)) return KS_STATUS_FAIL; + !SHA1_Update(&sha, pk->key, KS_DHT_STORAGEITEM_PKEY_SIZE)) return KS_STATUS_FAIL; if (salt) { const uint8_t *s = (const uint8_t *)ben_str_val(salt); size_t s_len = ben_str_len(salt); @@ -1082,9 +1100,156 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_key } if (!SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL; + //ks_log(KS_LOG_DEBUG, "Mutable ID: %s\n", ks_dht_hex(target->id, buf1, KS_DHT_NODEID_SIZE)); return KS_STATUS_SUCCESS; } +KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_pkey_t *pk, const uint8_t *salt, ks_size_t salt_length, ks_dht_nodeid_t *target) +{ + struct bencode *s = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(pk); + ks_assert(target); + + if (salt && salt_length > 0) s = ben_blob(salt, salt_length); + ret = ks_dht_storageitem_target_mutable_internal(pk, s, target); + if (s) ben_free(s); + + return ret; +} + +KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_encode(uint8_t **encoded, + ks_size_t *encoded_length, + struct bencode *salt, + struct bencode *seq, + struct bencode *v) +{ + char *enc = NULL; + char *salt_enc = NULL; + size_t salt_enc_length = 0; + char *seq_enc = NULL; + size_t seq_enc_length = 0; + char *v_enc = NULL; + size_t v_enc_length = 0; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(encoded); + ks_assert(encoded_length); + ks_assert(seq); + ks_assert(v); + + if (salt) salt_enc = ben_encode(&salt_enc_length, salt); + seq_enc = ben_encode(&seq_enc_length, seq); + v_enc = ben_encode(&v_enc_length, v); + + *encoded_length = (salt ? 6 : 0) + // 4:salt + salt_enc_length + + 5 + // 3:seq + seq_enc_length + + 3 + // 1:v + v_enc_length; + enc = malloc((*encoded_length) + 1); + *encoded = (uint8_t *)enc; + enc[0] = '\0'; + + if (salt) { + strncat(enc, "4:salt", 6); + strncat(enc, salt_enc, salt_enc_length); + } + strncat(enc, "3:seq", 5); + strncat(enc, seq_enc, seq_enc_length); + strncat(enc, "1:v", 3); + strncat(enc, v_enc, v_enc_length); + + return ret; +} + +KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate_internal(ks_dht_storageitem_signature_t *sig, + ks_dht_storageitem_skey_t *sk, + struct bencode *salt, + struct bencode *seq, + struct bencode *v) +{ + uint8_t *tmpsig = NULL; + size_t tmpsig_len = 0; + ks_status_t ret = KS_STATUS_SUCCESS; + //char buf1[KS_DHT_STORAGEITEM_SIGNATURE_SIZE * 2 + 1]; + + ks_assert(sig); + ks_assert(sk); + ks_assert(seq); + ks_assert(v); + + if ((ret = ks_dht_storageitem_signature_encode(&tmpsig, &tmpsig_len, salt, seq, v)) != KS_STATUS_SUCCESS) goto done; + + if (crypto_sign_detached(sig->sig, NULL, tmpsig, tmpsig_len, sk->key) != 0) { + ret = KS_STATUS_FAIL; + goto done; + } + //ks_log(KS_LOG_DEBUG, "Signed: %s\n", ks_dht_hex(sig->sig, buf1, KS_DHT_STORAGEITEM_SIGNATURE_SIZE)); + + done: + if (tmpsig) free(tmpsig); + return ret; +} + +KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate(ks_dht_storageitem_signature_t *sig, + ks_dht_storageitem_skey_t *sk, + const uint8_t *salt, + ks_size_t salt_length, + int64_t sequence, + const uint8_t *value, + ks_size_t value_length) +{ + struct bencode *s = NULL; + struct bencode *seq = NULL; + struct bencode *v = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(sig); + ks_assert(sk); + ks_assert(sequence > 0); + ks_assert(value); + ks_assert(value_length > 0); + + if (salt && salt_length > 0) s = ben_blob(salt, salt_length); + seq = ben_int(sequence); + v = ben_blob(value, value_length); + + ret = ks_dht_storageitem_signature_generate_internal(sig, sk, s, seq, v); + + if (s) ben_free(s); + ben_free(seq); + ben_free(v); + + return ret; +} + +KS_DECLARE(ks_bool_t) ks_dht_storageitem_signature_verify(ks_dht_storageitem_signature_t *sig, + ks_dht_storageitem_pkey_t *pk, + struct bencode *salt, + struct bencode *seq, + struct bencode *v) +{ + uint8_t *tmpsig = NULL; + size_t tmpsig_len = 0; + int32_t res = 0; + + ks_assert(sig); + ks_assert(pk); + ks_assert(seq); + ks_assert(v); + + if (ks_dht_storageitem_signature_encode(&tmpsig, &tmpsig_len, salt, seq, v) != KS_STATUS_SUCCESS) return KS_FALSE; + + res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, pk->key); + + if (tmpsig) free(tmpsig); + + return res == 0; +} + KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) { char buf[KS_DHT_DATAGRAM_BUFFER_SIZE + 1]; @@ -1288,7 +1453,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me memcpy(query, qv, qv_len); query[qv_len] = '\0'; - ks_log(KS_LOG_DEBUG, "Message query is '%s'\n", query); + //ks_log(KS_LOG_DEBUG, "Message query is '%s'\n", query); a = ben_dict_get_by_str(message->data, "a"); if (!a) { @@ -1302,7 +1467,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; message->args_id = *id; - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE)); if ((ret = ks_dhtrt_create_node(message->endpoint->node->table, *id, KS_DHT_REMOTE, @@ -1347,7 +1512,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t if ((ret = ks_dht_utility_extract_nodeid(message->args, "id", &id)) != KS_STATUS_SUCCESS) goto done; message->args_id = *id; - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(id, id_buf)); + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE)); if ((ret = ks_dhtrt_create_node(message->endpoint->node->table, *id, KS_DHT_REMOTE, @@ -1356,7 +1521,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t &node)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_release_node(node)) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hexid(id, id_buf)); + ks_log(KS_LOG_DEBUG, "Touching node %s\n", ks_dht_hex(id->id, id_buf, KS_DHT_NODEID_SIZE)); if ((ret = ks_dhtrt_touch_node(message->endpoint->node->table, *id)) != KS_STATUS_SUCCESS) goto done; @@ -1377,9 +1542,10 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t transaction->job->raddr.port); } else { transaction->job->response = message; - transaction->job->state = KS_DHT_JOB_STATE_PROCESSING; message->transaction = transaction; if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING; + else transaction->job->state = KS_DHT_JOB_STATE_COMPLETING; + transaction->job->response = NULL; // message is destroyed after we return, stop using it transaction->finished = KS_TRUE; } @@ -1502,6 +1668,46 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, return ret; } +KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht) +{ + ks_assert(dht); + ks_hash_read_lock(dht->storageitems_hash); +} + +KS_DECLARE(void) ks_dht_storageitems_read_unlock(ks_dht_t *dht) +{ + ks_assert(dht); + ks_hash_read_unlock(dht->storageitems_hash); +} + +KS_DECLARE(void) ks_dht_storageitems_write_lock(ks_dht_t *dht) +{ + ks_assert(dht); + ks_hash_write_lock(dht->storageitems_hash); +} + +KS_DECLARE(void) ks_dht_storageitems_write_unlock(ks_dht_t *dht) +{ + ks_assert(dht); + ks_hash_write_lock(dht->storageitems_hash); +} + +KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target) +{ + ks_assert(dht); + ks_assert(target); + + return ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED); +} + +KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item) +{ + ks_assert(dht); + ks_assert(item); + + return ks_hash_insert(dht->storageitems_hash, item->id.id, item); +} + KS_DECLARE(ks_status_t) ks_dht_error(ks_dht_t *dht, ks_dht_endpoint_t *ep, @@ -1632,45 +1838,45 @@ KS_DECLARE(void) ks_dht_jobs_add(ks_dht_t *dht, ks_dht_job_t *job) KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht) { + ks_dht_job_t *first = NULL; + ks_dht_job_t *last = NULL; + ks_assert(dht); ks_mutex_lock(dht->jobs_mutex); for (ks_dht_job_t *job = dht->jobs_first, *jobn = NULL, *jobp = NULL; job; job = jobn) { - ks_bool_t remove = KS_FALSE; + ks_bool_t done = KS_FALSE; jobn = job->next; - switch (job->state) { - case KS_DHT_JOB_STATE_QUERYING: + if (job->state == KS_DHT_JOB_STATE_QUERYING) { job->state = KS_DHT_JOB_STATE_RESPONDING; if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING; - break; - case KS_DHT_JOB_STATE_RESPONDING: - break; - case KS_DHT_JOB_STATE_EXPIRING: + } + if (job->state == KS_DHT_JOB_STATE_EXPIRING) { job->attempts--; if (job->attempts > 0) job->state = KS_DHT_JOB_STATE_QUERYING; - else { - if (job->finish_callback) job->finish_callback(dht, job); - remove = KS_TRUE; - } - break; - case KS_DHT_JOB_STATE_PROCESSING: - break; - case KS_DHT_JOB_STATE_COMPLETING: - if (job->finish_callback) job->finish_callback(dht, job); - remove = KS_TRUE; - break; - default: break; + else done = KS_TRUE; } + if (job->state == KS_DHT_JOB_STATE_COMPLETING) done = KS_TRUE; - if (remove) { + if (done) { if (!jobp && !jobn) dht->jobs_first = dht->jobs_last = NULL; else if (!jobp) dht->jobs_first = jobn; else if (!jobn) dht->jobs_last = jobp; else jobp->next = jobn; - ks_dht_job_destroy(&job); + + job->next = NULL; + if (last) last = last->next = job; + else first = last = job; } else jobp = job; } ks_mutex_unlock(dht->jobs_mutex); + + for (ks_dht_job_t *job = first, *jobn = NULL; job; job = jobn) { + jobn = job->next; + // this cannot occur inside of the main loop, may add new jobs invalidating list pointers + if (job->finish_callback) job->finish_callback(dht, job); + ks_dht_job_destroy(&job); + } } KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback) @@ -1681,6 +1887,8 @@ KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, k ks_assert(dht); ks_assert(raddr); + //ks_log(KS_LOG_DEBUG, "Starting ping!\n"); + if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; ks_dht_job_build_ping(job, ks_dht_query_ping, callback); ks_dht_jobs_add(dht, job); @@ -1707,7 +1915,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job) &message, NULL)) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Sending message query ping\n"); + //ks_log(KS_LOG_DEBUG, "Sending message query ping\n"); ks_q_push(dht->send_q, (void *)message); done: @@ -1723,7 +1931,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ ks_assert(message); ks_assert(message->args); - ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); + //ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); if ((ret = ks_dht_response_setup(dht, message->endpoint, @@ -1733,7 +1941,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ &response, NULL)) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Sending message response ping\n"); + //ks_log(KS_LOG_DEBUG, "Sending message response ping\n"); ks_q_push(dht->send_q, (void *)response); done: @@ -1747,9 +1955,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_job_t ks_assert(dht); ks_assert(job); - ks_log(KS_LOG_DEBUG, "Message response ping is reached\n"); - - job->state = KS_DHT_JOB_STATE_COMPLETING; + //ks_log(KS_LOG_DEBUG, "Message response ping is reached\n"); // done: return ret; @@ -1805,7 +2011,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job) ben_dict_set(a, ben_blob("want", 4), want); } - ks_log(KS_LOG_DEBUG, "Sending message query find_node\n"); + //ks_log(KS_LOG_DEBUG, "Sending message query find_node\n"); ks_q_push(dht->send_q, (void *)message); done: @@ -1850,7 +2056,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess want6 = message->raddr.family == AF_INET6; } - ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n"); + //ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n"); query.nodeid = *target; @@ -1869,7 +2075,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess &buffer4_length, sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); + ks_log(KS_LOG_DEBUG, + "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port); } ks_dhtrt_release_querynodes(&query); } @@ -1886,7 +2093,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess &buffer6_length, sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); + ks_log(KS_LOG_DEBUG, + "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port); } ks_dhtrt_release_querynodes(&query); } @@ -1902,7 +2110,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length)); if (want6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length)); - ks_log(KS_LOG_DEBUG, "Sending message response find_node\n"); + //ks_log(KS_LOG_DEBUG, "Sending message response find_node\n"); ks_q_push(dht->send_q, (void *)response); done: @@ -1964,11 +2172,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j ks_log(KS_LOG_DEBUG, "Expanded ipv4 nodeinfo for %s (%s %d)\n", - ks_dht_hexid(&nid, id_buf), + ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE), addr.host, addr.port); - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); job->response_nodes[job->response_nodes_count++] = node; @@ -2000,9 +2208,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j ks_log(KS_LOG_DEBUG, "Set closer node id %s (%s) in search of target id %s at results index %d\n", - ks_dht_hexid(&nid, id_buf), - ks_dht_hexid(&distance, id2_buf), - ks_dht_hexid(&search->target, id3_buf), + ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE), + ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE), + ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE), results_index); search->results[results_index] = nid; search->distances[results_index] = distance; @@ -2026,11 +2234,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j ks_log(KS_LOG_DEBUG, "Expanded ipv6 nodeinfo for %s (%s %d)\n", - ks_dht_hexid(&nid, id_buf), + ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE), addr.host, addr.port); - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); job->response_nodes6[job->response_nodes6_count++] = node; @@ -2062,9 +2270,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j ks_log(KS_LOG_DEBUG, "Set closer node id %s (%s) in search of target id %s at results index %d\n", - ks_dht_hexid(&nid, id_buf), - ks_dht_hexid(&distance, id2_buf), - ks_dht_hexid(&search->target, id3_buf), + ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE), + ks_dht_hex(distance.id, id2_buf, KS_DHT_NODEID_SIZE), + ks_dht_hex(search->target.id, id3_buf, KS_DHT_NODEID_SIZE), results_index); search->results[results_index] = nid; search->distances[results_index] = distance; @@ -2079,9 +2287,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j } } - ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n"); - - job->state = KS_DHT_JOB_STATE_COMPLETING; + //ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n"); done: if(search) ks_mutex_unlock(search->mutex); @@ -2130,7 +2336,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job) // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE)); - ks_log(KS_LOG_DEBUG, "Sending message query get\n"); + //ks_log(KS_LOG_DEBUG, "Sending message query get\n"); ks_q_push(dht->send_q, (void *)message); return KS_STATUS_SUCCESS; @@ -2163,7 +2369,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t seq = ben_dict_get_by_str(message->args, "seq"); if (seq) sequence = ben_int_val(seq); - ks_log(KS_LOG_DEBUG, "Message query get is valid\n"); + //ks_log(KS_LOG_DEBUG, "Message query get is valid\n"); ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token); @@ -2190,7 +2396,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t &buffer4_length, sizeof(buffer4))) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); + ks_log(KS_LOG_DEBUG, + "Compacted ipv4 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port); } ks_dhtrt_release_querynodes(&query); } @@ -2207,7 +2414,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t &buffer6_length, sizeof(buffer6))) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hexid(&qn->nodeid, id_buf), qn->addr.host, qn->addr.port); + ks_log(KS_LOG_DEBUG, + "Compacted ipv6 nodeinfo for %s (%s %d)\n", ks_dht_hex(qn->nodeid.id, id_buf, KS_DHT_NODEID_SIZE), qn->addr.host, qn->addr.port); } ks_dhtrt_release_querynodes(&query); } @@ -2225,7 +2433,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t if (item) { if (item->mutable) { if (!sequence_snuffed) { - ben_dict_set(r, ben_blob("k", 1), ben_blob(item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE)); + ben_dict_set(r, ben_blob("k", 1), ben_blob(item->pk.key, KS_DHT_STORAGEITEM_PKEY_SIZE)); ben_dict_set(r, ben_blob("sig", 3), ben_blob(item->sig.sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE)); } ben_dict_set(r, ben_blob("seq", 3), ben_int(item->seq)); @@ -2235,7 +2443,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t if (dht->rt_ipv4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length)); if (dht->rt_ipv6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length)); - ks_log(KS_LOG_DEBUG, "Sending message response get\n"); + //ks_log(KS_LOG_DEBUG, "Sending message response get\n"); ks_q_push(dht->send_q, (void *)response); done: @@ -2246,7 +2454,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t { ks_dht_storageitem_t *item = NULL; ks_dht_token_t *token = NULL; - ks_dht_storageitem_key_t *k = NULL; + ks_dht_storageitem_pkey_t *k = NULL; ks_dht_storageitem_signature_t *sig = NULL; struct bencode *seq; int64_t sequence = -1; @@ -2271,7 +2479,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done; job->response_token = *token; - if ((ret = ks_dht_utility_extract_storageitem_key(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_utility_extract_storageitem_pkey(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dht_utility_extract_storageitem_signature(job->response->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done; seq = ben_dict_get_by_str(job->response->args, "seq"); @@ -2297,7 +2505,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t nodes6_size = ben_str_len(n); } - ks_log(KS_LOG_DEBUG, "Message response get is reached\n"); + //ks_log(KS_LOG_DEBUG, "Message response get is reached\n"); while (nodes_len < nodes_size) { ks_dht_nodeid_t nid; @@ -2308,11 +2516,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t ks_log(KS_LOG_DEBUG, "Expanded ipv4 nodeinfo for %s (%s %d)\n", - ks_dht_hexid(&nid, id_buf), + ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE), addr.host, addr.port); - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); job->response_nodes[job->response_nodes_count++] = node; } @@ -2325,11 +2533,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t ks_log(KS_LOG_DEBUG, "Expanded ipv6 nodeinfo for %s (%s %d)\n", - ks_dht_hexid(&nid, id_buf), + ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE), addr.host, addr.port); - ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf)); + ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hex(nid.id, id_buf, KS_DHT_NODEID_SIZE)); ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node); job->response_nodes6[job->response_nodes6_count++] = node; } @@ -2343,72 +2551,62 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t if (!seq) { // immutable - if ((ret = ks_dht_storageitem_target_immutable(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_storageitem_target_immutable_internal(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done; if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) { ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n"); ret = KS_STATUS_FAIL; goto done; } if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); - else if ((ret = ks_dht_storageitem_create_immutable(&item, - dht->pool, - &tmptarget, - v)) != KS_STATUS_SUCCESS) goto done; + else if ((ret = ks_dht_storageitem_create_immutable_internal(&item, + dht->pool, + &tmptarget, + v, + KS_TRUE)) != KS_STATUS_SUCCESS) goto done; } else { // mutable - struct bencode *tmp = NULL; - uint8_t *tmpsig = NULL; - size_t tmpsig_len = 0; - int32_t res = 0; - - if ((ret = ks_dht_storageitem_target_mutable(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_storageitem_target_mutable_internal(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done; if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) { - ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n"); + ks_log(KS_LOG_DEBUG, "Mutable data hash does not match requested target id\n"); ret = KS_STATUS_FAIL; goto done; } - tmp = ben_dict(); - if (job->query_salt) ben_dict_set(tmp, ben_blob("salt", 4), ben_clone(job->query_salt)); - ben_dict_set(tmp, ben_blob("seq", 3), ben_clone(seq)); - ben_dict_set(tmp, ben_blob("v", 1), ben_clone(v)); - tmpsig = ben_encode(&tmpsig_len, tmp); - ben_free(tmp); - - res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, k->key); - - free(tmpsig); - - if (res) { - ks_log(KS_LOG_DEBUG, "Immutable data signature failed to verify\n"); + if (!ks_dht_storageitem_signature_verify(sig, k, job->query_salt, seq, v)) { + ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n"); ret = KS_STATUS_FAIL; goto done; } - + ks_log(KS_LOG_DEBUG, "Signature verified for %s\n", ks_dht_hex(tmptarget.id, id_buf, KS_DHT_NODEID_SIZE)); if (olditem) { - if (olditem->seq >= sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); - else { - ks_hash_remove(dht->storageitems_hash, olditem->id.id); - olditem = NULL; + if (olditem->seq > sequence) { + goto done; } + if (olditem->seq == sequence) { + if (ben_cmp(olditem->v, v) != 0) { + goto done; + } + } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig); + olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); } - if (!olditem && (ret = ks_dht_storageitem_create_mutable(&item, - dht->pool, - &tmptarget, - v, - k, - job->query_salt, - sequence, - sig)) != KS_STATUS_SUCCESS) goto done; + else if ((ret = ks_dht_storageitem_create_mutable_internal(&item, + dht->pool, + &tmptarget, + v, + KS_TRUE, + k, + job->query_salt, + KS_TRUE, + sequence, + sig)) != KS_STATUS_SUCCESS) goto done; } if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done; + item = ks_hash_search(dht->storageitems_hash, item->id.id, KS_UNLOCKED); } else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); if (item) job->response_storageitem = item; else if (olditem) job->response_storageitem = olditem; - job->state = KS_DHT_JOB_STATE_COMPLETING; - done: if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash); if (ret != KS_STATUS_SUCCESS) { @@ -2417,22 +2615,25 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t return ret; } +// @todo add a public function to add storageitem_t's to the store before calling this for authoring new data, reuse function in the "get" handlers +// @todo add reference counting system to storageitem_t to know what to keep alive with reannouncements versus allowing to expire KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, - ks_dht_nodeid_t *target, - uint8_t *salt, - ks_size_t salt_length) + ks_dht_token_t *token, + int64_t cas, + ks_dht_storageitem_t *item) { ks_dht_job_t *job = NULL; ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(raddr); - ks_assert(target); + ks_assert(token); + ks_assert(item); if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done; - ks_dht_job_build_put(job, ks_dht_query_put, callback, target, salt, salt_length); + ks_dht_job_build_put(job, ks_dht_query_put, callback, token, cas, item); ks_dht_jobs_add(dht, job); done: @@ -2455,9 +2656,17 @@ KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job) &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (job->query_storageitem->mutable) { + if (job->query_cas > 0) ben_dict_set(a, ben_blob("cas", 3), ben_int(job->query_cas)); + ben_dict_set(a, ben_blob("k", 1), ben_blob(job->query_storageitem->pk.key, KS_DHT_STORAGEITEM_PKEY_SIZE)); + if (job->query_storageitem->salt) ben_dict_set(a, ben_blob("salt", 4), ben_clone(job->query_storageitem->salt)); + ben_dict_set(a, ben_blob("seq", 3), ben_int(job->query_storageitem->seq)); + ben_dict_set(a, ben_blob("sig", 3), ben_blob(job->query_storageitem->sig.sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE)); + } + ben_dict_set(a, ben_blob("token", 5), ben_blob(job->query_token.token, KS_DHT_TOKEN_SIZE)); + ben_dict_set(a, ben_blob("v", 1), ben_clone(job->query_storageitem->v)); - - ks_log(KS_LOG_DEBUG, "Sending message query put\n"); + //ks_log(KS_LOG_DEBUG, "Sending message query put\n"); ks_q_push(dht->send_q, (void *)message); return KS_STATUS_SUCCESS; @@ -2465,6 +2674,20 @@ KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job) KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message) { + ks_dht_token_t *token = NULL; + ks_dht_storageitem_pkey_t *k = NULL; + ks_dht_storageitem_signature_t *sig = NULL; + struct bencode *salt = NULL; + struct bencode *seq = NULL; + int64_t sequence = -1; + struct bencode *cas = NULL; + int64_t cas_seq = -1; + struct bencode *v = NULL; + //ks_size_t v_len = 0; + ks_bool_t storageitems_locked = KS_FALSE; + ks_dht_storageitem_t *item = NULL; + ks_dht_storageitem_t *olditem = NULL; + ks_dht_nodeid_t target; ks_dht_message_t *response = NULL; struct bencode *r = NULL; ks_status_t ret = KS_STATUS_SUCCESS; @@ -2473,7 +2696,99 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t ks_assert(message); ks_assert(message->args); - ks_log(KS_LOG_DEBUG, "Message query put is valid\n"); + + if ((ret = ks_dht_utility_extract_token(message->args, "token", &token)) != KS_STATUS_SUCCESS) goto done; + + if ((ret = ks_dht_utility_extract_storageitem_pkey(message->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_utility_extract_storageitem_signature(message->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done; + + salt = ben_dict_get_by_str(message->args, "salt"); + + seq = ben_dict_get_by_str(message->args, "seq"); + if (seq) sequence = ben_int_val(seq); + + cas = ben_dict_get_by_str(message->args, "cas"); + if (cas) cas_seq = ben_int_val(cas); + + if (seq && (!k || !sig)) { + ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data\n"); + ret = KS_STATUS_ARG_INVALID; + goto done; + } + + v = ben_dict_get_by_str(message->args, "v"); + if (!v) { + ks_log(KS_LOG_DEBUG, "Must provide v\n"); + ret = KS_STATUS_ARG_INVALID; + goto done; + } + //v_len = ben_str_len(v); + + if (!seq) { + // immutable + if ((ret = ks_dht_storageitem_target_immutable_internal(v, &target)) != KS_STATUS_SUCCESS) goto done; + } else { + // mutable + if ((ret = ks_dht_storageitem_target_mutable_internal(k, salt, &target)) != KS_STATUS_SUCCESS) goto done; + } + olditem = ks_hash_search(dht->storageitems_hash, target.id, KS_UNLOCKED); + + if (!ks_dht_token_verify(dht, &message->raddr, &target, token)) { + ks_log(KS_LOG_DEBUG, "Invalid token\n"); + ret = KS_STATUS_FAIL; + goto done; + } + + //ks_log(KS_LOG_DEBUG, "Message query put is valid\n"); + + ks_hash_write_lock(dht->storageitems_hash); + storageitems_locked = KS_TRUE; + + if (!seq) { + // immutable + if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); + else if ((ret = ks_dht_storageitem_create_immutable_internal(&item, + dht->pool, + &target, + v, + KS_TRUE)) != KS_STATUS_SUCCESS) goto done; + } else { + // mutable + if (!ks_dht_storageitem_signature_verify(sig, k, salt, seq, v)) { + ks_log(KS_LOG_DEBUG, "Mutable data signature failed to verify\n"); + ret = KS_STATUS_FAIL; + goto done; + } + + if (olditem) { + if (cas && olditem->seq != cas_seq) { + // @todo send 301 error instead of the response + goto done; + } + if (olditem->seq > sequence) { + // @todo send 302 error instead of the response + goto done; + } + if (olditem->seq == sequence) { + if (ben_cmp(olditem->v, v) != 0) { + // @todo send 201? error instead of the response + goto done; + } + } else ks_dht_storageitem_update_mutable(olditem, v, sequence, sig); + olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); + } + else if ((ret = ks_dht_storageitem_create_mutable_internal(&item, + dht->pool, + &target, + v, + KS_TRUE, + k, + salt, + KS_TRUE, + sequence, + sig)) != KS_STATUS_SUCCESS) goto done; + } + if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dht_response_setup(dht, message->endpoint, @@ -2483,10 +2798,14 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t &response, &r)) != KS_STATUS_SUCCESS) goto done; - ks_log(KS_LOG_DEBUG, "Sending message response put\n"); + //ks_log(KS_LOG_DEBUG, "Sending message response put\n"); ks_q_push(dht->send_q, (void *)response); done: + if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash); + if (ret != KS_STATUS_SUCCESS) { + if (item) ks_dht_storageitem_destroy(&item); + } return ret; } @@ -2499,8 +2818,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t ks_log(KS_LOG_DEBUG, "Message response put is reached\n"); - job->state = KS_DHT_JOB_STATE_COMPLETING; - // done: return ret; } diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index aacd74b43c..7a2caca643 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -29,11 +29,12 @@ KS_BEGIN_EXTERN_C #define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20 #define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256 -#define KS_DHT_TRANSACTION_EXPIRATION 30 +#define KS_DHT_TRANSACTION_EXPIRATION 10 #define KS_DHT_SEARCH_EXPIRATION 10 #define KS_DHT_SEARCH_RESULTS_MAX_SIZE 8 // @todo replace with KS_DHTRT_BUCKET_SIZE -#define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES +#define KS_DHT_STORAGEITEM_PKEY_SIZE crypto_sign_PUBLICKEYBYTES +#define KS_DHT_STORAGEITEM_SKEY_SIZE crypto_sign_SECRETKEYBYTES #define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64 #define KS_DHT_STORAGEITEM_SIGNATURE_SIZE crypto_sign_BYTES #define KS_DHT_STORAGEITEM_EXPIRATION 7200 @@ -48,7 +49,8 @@ typedef struct ks_dht_datagram_s ks_dht_datagram_t; typedef struct ks_dht_job_s ks_dht_job_t; typedef struct ks_dht_nodeid_s ks_dht_nodeid_t; typedef struct ks_dht_token_s ks_dht_token_t; -typedef struct ks_dht_storageitem_key_s ks_dht_storageitem_key_t; +typedef struct ks_dht_storageitem_pkey_s ks_dht_storageitem_pkey_t; +typedef struct ks_dht_storageitem_skey_s ks_dht_storageitem_skey_t; typedef struct ks_dht_storageitem_signature_s ks_dht_storageitem_signature_t; typedef struct ks_dht_message_s ks_dht_message_t; typedef struct ks_dht_endpoint_s ks_dht_endpoint_t; @@ -103,7 +105,6 @@ enum ks_dht_job_state_t { KS_DHT_JOB_STATE_QUERYING, KS_DHT_JOB_STATE_RESPONDING, KS_DHT_JOB_STATE_EXPIRING, - KS_DHT_JOB_STATE_PROCESSING, KS_DHT_JOB_STATE_COMPLETING, }; @@ -133,6 +134,9 @@ struct ks_dht_job_s { // job specific query parameters ks_dht_nodeid_t query_target; struct bencode *query_salt; + int64_t query_cas; + ks_dht_token_t query_token; + ks_dht_storageitem_t *query_storageitem; // job specific response parameters ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE]; @@ -158,8 +162,12 @@ struct ks_dhtrt_querynodes_s { ks_dht_node_t* nodes[ KS_DHTRT_MAXQUERYSIZE ]; /* out: array of peers (ks_dht_node_t* nodes[incount]) */ }; -struct ks_dht_storageitem_key_s { - uint8_t key[KS_DHT_STORAGEITEM_KEY_SIZE]; +struct ks_dht_storageitem_pkey_s { + uint8_t key[KS_DHT_STORAGEITEM_PKEY_SIZE]; +}; + +struct ks_dht_storageitem_skey_s { + uint8_t key[KS_DHT_STORAGEITEM_SKEY_SIZE]; }; struct ks_dht_storageitem_signature_s { @@ -225,8 +233,9 @@ struct ks_dht_storageitem_s { struct bencode *v; ks_bool_t mutable; - ks_dht_storageitem_key_t pk; - ks_dht_storageitem_key_t sk; + ks_mutex_t *mutex; + ks_dht_storageitem_pkey_t pk; + ks_dht_storageitem_skey_t sk; struct bencode *salt; int64_t seq; ks_dht_storageitem_signature_t sig; @@ -361,14 +370,88 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid */ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout); + +KS_DECLARE(char *) ks_dht_hex(const uint8_t *data, char *buffer, ks_size_t len); +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(const uint8_t *value, ks_size_t value_length, ks_dht_nodeid_t *target); + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_pkey_t *pk, const uint8_t *salt, ks_size_t salt_length, ks_dht_nodeid_t *target); + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_storageitem_signature_generate(ks_dht_storageitem_signature_t *sig, + ks_dht_storageitem_skey_t *sk, + const uint8_t *salt, + ks_size_t salt_length, + int64_t sequence, + const uint8_t *value, + ks_size_t value_length); + +/** + * + */ +KS_DECLARE(void) ks_dht_storageitems_read_lock(ks_dht_t *dht); + +/** + * + */ +KS_DECLARE(void) ks_dht_storageitems_read_unlock(ks_dht_t *dht); + +/** + * + */ +KS_DECLARE(void) ks_dht_storageitems_write_lock(ks_dht_t *dht); + +/** + * + */ +KS_DECLARE(void) ks_dht_storageitems_write_unlock(ks_dht_t *dht); + +/** + * + */ +KS_DECLARE(ks_dht_storageitem_t *) ks_dht_storageitems_find(ks_dht_t *dht, ks_dht_nodeid_t *target); + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_storageitems_insert(ks_dht_t *dht, ks_dht_storageitem_t *item); + +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback); + +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target); + +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target, uint8_t *salt, ks_size_t salt_length); + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht, + const ks_sockaddr_t *raddr, + ks_dht_job_callback_t callback, + ks_dht_token_t *token, + int64_t cas, + ks_dht_storageitem_t *item); /** * Create a network search of the closest nodes to a target. @@ -391,34 +474,6 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_search_t **search); - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_create(ks_dht_message_t **message, - ks_pool_t *pool, - ks_dht_endpoint_t *endpoint, - const ks_sockaddr_t *raddr, - ks_bool_t alloc_data); -/** - * - */ -KS_DECLARE(void) ks_dht_message_destroy(ks_dht_message_t **message); - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length); - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, - uint8_t *transactionid, - ks_size_t transactionid_length, - struct bencode **args); - - /** * route table methods * diff --git a/libs/libks/src/dht/ks_dht_job.c b/libs/libks/src/dht/ks_dht_job.c index d92b5a62a7..97e30b460d 100644 --- a/libs/libks/src/dht/ks_dht_job.c +++ b/libs/libks/src/dht/ks_dht_job.c @@ -78,18 +78,20 @@ KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job, KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback, - ks_dht_nodeid_t *target, - uint8_t *salt, - ks_size_t salt_length) + ks_dht_token_t *token, + int64_t cas, + ks_dht_storageitem_t *item) { ks_assert(job); ks_assert(query_callback); - ks_assert(target); + ks_assert(token); + ks_assert(item); job->query_callback = query_callback; job->finish_callback = finish_callback; - job->query_target = *target; - if (salt && salt_length > 0) job->query_salt = ben_blob(salt, salt_length); + job->query_token = *token; + job->query_cas = cas; + job->query_storageitem = item; } KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job) diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index e9385cb74b..a88240f652 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -105,7 +105,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui memcpy(message->type, yv, yv_len); message->type[yv_len] = '\0'; - ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", message->type); + //ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", message->type); return KS_STATUS_SUCCESS; } diff --git a/libs/libks/src/dht/ks_dht_search.c b/libs/libks/src/dht/ks_dht_search.c index 3dc838545c..31e62062b0 100644 --- a/libs/libks/src/dht/ks_dht_search.c +++ b/libs/libks/src/dht/ks_dht_search.c @@ -44,9 +44,9 @@ KS_DECLARE(void) ks_dht_search_destroy(ks_dht_search_t **search) if (s->pending) { for (it = ks_hash_first(s->pending, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - ks_dht_search_pending_t *val; - - ks_hash_this_val(it, (void **)&val); + const void *key = NULL; + ks_dht_search_pending_t *val = NULL; + ks_hash_this(it, &key, NULL, (void **)&val); ks_dht_search_pending_destroy(&val); } ks_hash_destroy(&s->pending); diff --git a/libs/libks/src/dht/ks_dht_storageitem.c b/libs/libks/src/dht/ks_dht_storageitem.c index f34783c28f..42bffa0319 100644 --- a/libs/libks/src/dht/ks_dht_storageitem.c +++ b/libs/libks/src/dht/ks_dht_storageitem.c @@ -2,7 +2,11 @@ #include "ks_dht-int.h" #include "sodium.h" -KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v) +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable_internal(ks_dht_storageitem_t **item, + ks_pool_t *pool, + ks_dht_nodeid_t *target, + struct bencode *v, + ks_bool_t clone_v) { ks_dht_storageitem_t *si; ks_status_t ret = KS_STATUS_SUCCESS; @@ -19,15 +23,77 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t si->id = *target; si->mutable = KS_FALSE; si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); - si->v = ben_clone(v); + si->v = clone_v ? ben_clone(v) : v; ks_assert(si->v); - //enc = ben_encode(&enc_len, si->v); - //ks_assert(enc); - //SHA1_Init(&sha); - //SHA1_Update(&sha, enc, enc_len); - //SHA1_Final(si->id.id, &sha); - //free(enc); + // done: + if (ret != KS_STATUS_SUCCESS) { + if (si) ks_dht_storageitem_destroy(item); + } + return ret; +} + +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, + ks_pool_t *pool, + ks_dht_nodeid_t *target, + const uint8_t *value, + ks_size_t value_length) +{ + struct bencode *v = NULL; + + ks_assert(item); + ks_assert(pool); + ks_assert(value); + ks_assert(value_length > 0); + ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE); + + v = ben_blob(value, value_length); + ks_assert(v); + + return ks_dht_storageitem_create_immutable_internal(item, pool, target, v, KS_FALSE); +} + +KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable_internal(ks_dht_storageitem_t **item, + ks_pool_t *pool, + ks_dht_nodeid_t *target, + struct bencode *v, + ks_bool_t clone_v, + ks_dht_storageitem_pkey_t *pk, + struct bencode *salt, + ks_bool_t clone_salt, + int64_t sequence, + ks_dht_storageitem_signature_t *signature) +{ + ks_dht_storageitem_t *si; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(item); + ks_assert(pool); + ks_assert(v); + ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE); + ks_assert(pk); + ks_assert(signature); + + *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t)); + ks_assert(si); + + si->pool = pool; + si->id = *target; + si->mutable = KS_TRUE; + si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); + si->v = clone_v ? ben_clone(v) : v; + ks_assert(si->v); + + ks_mutex_create(&si->mutex, KS_MUTEX_FLAG_DEFAULT, si->pool); + ks_assert(si->mutex); + + si->pk = *pk; + if (salt) { + si->salt = clone_salt ? ben_clone(salt) : salt; + ks_assert(si->salt); + } + si->seq = sequence; + si->sig = *signature; // done: if (ret != KS_STATUS_SUCCESS) { @@ -39,45 +105,43 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, - struct bencode *v, - ks_dht_storageitem_key_t *k, - struct bencode *salt, + const uint8_t *value, + ks_size_t value_length, + ks_dht_storageitem_pkey_t *pk, + const uint8_t *salt, + ks_size_t salt_length, int64_t sequence, ks_dht_storageitem_signature_t *signature) { - ks_dht_storageitem_t *si; - ks_status_t ret = KS_STATUS_SUCCESS; + struct bencode *v = NULL; + struct bencode *s = NULL; ks_assert(item); ks_assert(pool); - ks_assert(v); + ks_assert(value); + ks_assert(value_length > 0); ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE); - ks_assert(k); + ks_assert(pk); ks_assert(signature); - *item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t)); - ks_assert(si); + v = ben_blob(value, value_length); + if (salt && salt_length > 0) s = ben_blob(salt, salt_length); + return ks_dht_storageitem_create_mutable_internal(item, pool, target, v, KS_FALSE, pk, s, KS_FALSE, sequence, signature); +} - si->pool = pool; - si->id = *target; - si->mutable = KS_TRUE; - si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC); - si->v = ben_clone(v); - ks_assert(si->v); +KS_DECLARE(void) ks_dht_storageitem_update_mutable(ks_dht_storageitem_t *item, struct bencode *v, int64_t sequence, ks_dht_storageitem_signature_t *signature) +{ + ks_assert(item); + ks_assert(v); + ks_assert(sequence); + ks_assert(signature); - si->pk = *k; - if (salt) { - si->salt = ben_clone(salt); - ks_assert(si->salt); - } - si->seq = sequence; - si->sig = *signature; - - // done: - if (ret != KS_STATUS_SUCCESS) { - if (si) ks_dht_storageitem_destroy(item); - } - return ret; + ks_mutex_lock(item->mutex); + ben_free(item->v); + item->v = ben_clone(v); + item->seq = sequence; + item->sig = *signature; + ks_mutex_unlock(item->mutex); } /** @@ -96,6 +160,7 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item) ben_free(si->v); si->v = NULL; } + if (si->mutex) ks_mutex_destroy(&si->mutex); if (si->salt) { ben_free(si->salt); si->salt = NULL; diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 7472c41ce7..60d0d71de0 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -3,14 +3,32 @@ #include <../dht/ks_dht-int.h> #include -#define TEST_DHT1_REGISTER_TYPE_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze" -#define TEST_DHT1_PROCESS_QUERY_PING_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe" +ks_dht_storageitem_skey_t sk; +ks_dht_storageitem_pkey_t pk; -ks_status_t dht_z_callback(ks_dht_t *dht, ks_dht_message_t *message) +ks_status_t dht2_put_callback(ks_dht_t *dht, ks_dht_job_t *job) { - diag("dht_z_callback\n"); - ok(message->transactionid[0] == '4' && message->transactionid[1] == '2'); - ks_dht_error(dht, message->endpoint, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error"); + diag("dht2_put_callback\n"); + return KS_STATUS_SUCCESS; +} + +ks_status_t dht2_get_token_callback(ks_dht_t *dht, ks_dht_job_t *job) +{ + char buf[KS_DHT_TOKEN_SIZE * 2 + 1]; + const char *v = "Hello World!"; + size_t v_len = strlen(v); + ks_dht_storageitem_signature_t sig; + ks_dht_storageitem_t *mutable = NULL; + + diag("dht2_get_token_callback %s\n", ks_dht_hex(job->response_token.token, buf, KS_DHT_TOKEN_SIZE)); + + ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len); + // @todo check if exists + ks_dht_storageitem_create_mutable(&mutable, dht->pool, &job->query_target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig); + mutable->sk = sk; + ks_dht_storageitems_insert(dht, mutable); + + ks_dht_put(dht, &job->raddr, dht2_put_callback, &job->response_token, 0, mutable); return KS_STATUS_SUCCESS; } @@ -30,6 +48,27 @@ int main() { ks_sockaddr_t raddr1; //ks_sockaddr_t raddr2; //ks_sockaddr_t raddr3; + ks_dht_nodeid_t target; + //ks_dht_storageitem_t *immutable = NULL; + //ks_dht_storageitem_t *mutable = NULL; + //const char *v = "Hello World!"; + //size_t v_len = strlen(v); + //ks_dht_storageitem_skey_t sk; //= { { 0xe0, 0x6d, 0x31, 0x83, 0xd1, 0x41, 0x59, 0x22, 0x84, 0x33, 0xed, 0x59, 0x92, 0x21, 0xb8, 0x0b, + //0xd0, 0xa5, 0xce, 0x83, 0x52, 0xe4, 0xbd, 0xf0, 0x26, 0x2f, 0x76, 0x78, 0x6e, 0xf1, 0xc7, 0x4d, + //0xb7, 0xe7, 0xa9, 0xfe, 0xa2, 0xc0, 0xeb, 0x26, 0x9d, 0x61, 0xe3, 0xb3, 0x8e, 0x45, 0x0a, 0x22, + //0xe7, 0x54, 0x94, 0x1a, 0xc7, 0x84, 0x79, 0xd6, 0xc5, 0x4e, 0x1f, 0xaf, 0x60, 0x37, 0x88, 0x1d } }; + //ks_dht_storageitem_pkey_t pk; //= { { 0x77, 0xff, 0x84, 0x90, 0x5a, 0x91, 0x93, 0x63, 0x67, 0xc0, 0x13, 0x60, 0x80, 0x31, 0x04, 0xf9, + //0x24, 0x32, 0xfc, 0xd9, 0x04, 0xa4, 0x35, 0x11, 0x87, 0x6d, 0xf5, 0xcd, 0xf3, 0xe7, 0xe5, 0x48 } }; + //uint8_t sk1[KS_DHT_STORAGEITEM_SKEY_SIZE]; + //uint8_t pk1[KS_DHT_STORAGEITEM_PKEY_SIZE]; + //ks_dht_storageitem_signature_t sig; + //char sk_buf[KS_DHT_STORAGEITEM_SKEY_SIZE * 2 + 1]; + //char pk_buf[KS_DHT_STORAGEITEM_PKEY_SIZE * 2 + 1]; + //const char *test1vector = "3:seqi1e1:v12:Hello World!"; + //const char *test1vector = "4:salt6:foobar3:seqi1e1:v12:Hello World!"; + //size_t test1vector_len = strlen(test1vector); + //uint8_t test1vector_sig[KS_DHT_STORAGEITEM_SIGNATURE_SIZE]; + //char test1vector_buf[KS_DHT_STORAGEITEM_SIGNATURE_SIZE * 2 + 1]; err = ks_init(); ok(!err); @@ -62,9 +101,6 @@ int main() { err = ks_dht_create(&dht3, NULL, NULL); ok(err == KS_STATUS_SUCCESS); - - ks_dht_register_type(dht1, "z", dht_z_callback); - if (have_v4) { err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET); ok(err == KS_STATUS_SUCCESS); @@ -111,30 +147,9 @@ int main() { ok(err == KS_STATUS_SUCCESS); } - //diag("Custom type tests\n"); - - //buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER); - //memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen); - //dht1->recv_buffer_length = buflen; - - //err = ks_dht_process(dht1, ep1, &raddr); - //ok(err == KS_STATUS_SUCCESS); - - //ks_dht_pulse(dht1, 100); - - //ks_dht_pulse(&dht2, 100); - - - //buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER); - //memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen); - //dht1->recv_buffer_length = buflen; - - //err = ks_dht_process(dht1, &raddr); - //ok(err == KS_STATUS_SUCCESS); - + /* diag("Ping test\n"); - //ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1 ks_dht_ping(dht2, &raddr1, NULL); // (QUERYING) ks_dht_pulse(dht2, 100); // Send queued ping from dht2 to dht1 (RESPONDING) @@ -147,7 +162,7 @@ int main() { ok(ks_dhtrt_find_node(dht2->rt_ipv4, ep1->nodeid) != NULL); // The node should be good, and thus be returned as good - ks_dht_pulse(dht2, 100); // (COMPLETING) + ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING) diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up for (int i = 0; i < 10; ++i) { @@ -157,31 +172,92 @@ int main() { ks_dht_pulse(dht2, 100); } ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good + */ - // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid - - diag("Find_Node test\n"); - - ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid); - - ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1 - - ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response - - ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet - - ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1 - - ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet + //diag("Get test\n"); - diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up + + /* + ks_dht_storageitem_target_immutable((uint8_t *)v, v_len, &target); + ks_dht_storageitem_create_immutable(&immutable, dht1->pool, &target, (uint8_t *)v, v_len); + ks_dht_storageitems_insert(dht1, immutable); + */ + + /* + crypto_sign_keypair(pk.key, sk.key); + + ks_dht_storageitem_signature_generate(&sig, &sk, NULL, 0, 1, (uint8_t *)v, v_len); + ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target); + ks_dht_storageitem_create_mutable(&mutable, dht1->pool, &target, (uint8_t *)v, v_len, &pk, NULL, 0, 1, &sig); + mutable->sk = sk; + ks_dht_storageitems_insert(dht1, mutable); + + ks_dht_get(dht2, &raddr1, dht2_get_callback, &target, NULL, 0); + + ks_dht_pulse(dht2, 100); // send get query + + ks_dht_pulse(dht1, 100); // receive get query and send get response + + ks_dht_pulse(dht2, 100); // receive get response + + ok(ks_dht_storageitems_find(dht2, &target) != NULL); // item should be verified and stored + + ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING) + */ + + diag("Put test\n"); + + crypto_sign_keypair(pk.key, sk.key); + + ks_dht_storageitem_target_mutable(&pk, NULL, 0, &target); + + ks_dht_get(dht2, &raddr1, dht2_get_token_callback, &target, NULL, 0); // create job + + ks_dht_pulse(dht2, 100); // send get query + + ks_dht_pulse(dht1, 100); // receive get query and send get response + + ks_dht_pulse(dht2, 100); // receive get response + + ks_dht_pulse(dht2, 100); // Call finish callback and purge the job (COMPLETING), send put query + + ks_dht_pulse(dht1, 100); // receive put query and send put response + + ks_dht_pulse(dht2, 100); // receive put response + + ks_dht_pulse(dht2, 100); // Call finish callback and purse the job (COMPLETING) + for (int i = 0; i < 10; ++i) { //diag("DHT 1\n"); ks_dht_pulse(dht1, 100); //diag("DHT 2\n"); ks_dht_pulse(dht2, 100); } - ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good + + // Test bootstrap find_node from dht3 to dht1 to find dht2 nodeid + + //diag("Find_Node test\n"); + + //ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid); + + //ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1 + + //ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response + + //ok(ks_dhtrt_find_node(dht1->rt_ipv4, ep3->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet + + //ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1 + + //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) == NULL); // The node should be dubious, and thus not be returned as good yet + + //diag("Pulsing for route table pings\n"); // Wait for route table pinging to catch up + //for (int i = 0; i < 10; ++i) { + //diag("DHT 1\n"); + //ks_dht_pulse(dht1, 100); + //diag("DHT 2\n"); + //ks_dht_pulse(dht2, 100); + //} + //ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); // The node should be good by now, and thus be returned as good /* Cleanup and shutdown */