FS-9775: Started working on "put", ran into a bug in job states which is fixed now, adjusted find_node response to add nodes to the job

This commit is contained in:
Shane Bryldt 2016-12-20 22:07:11 +00:00 committed by Mike Jerris
parent 4338c1b941
commit 8be2251b76
11 changed files with 480 additions and 72 deletions

View File

@ -3,7 +3,7 @@ EXTRA_DIST =
SUBDIRS = . test
AUTOMAKE_OPTIONS = subdir-objects
AM_CFLAGS += -I$(top_srcdir)/src -I$(top_srcdir)/src/include -I$(top_srcdir)/crypt
AM_CFLAGS += -I$(top_srcdir)/src -I$(top_srcdir)/src/include -I$(top_srcdir)/crypt -O0
AM_CPPFLAGS = $(AM_CFLAGS)
lib_LTLIBRARIES = libks.la

View File

@ -218,6 +218,7 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht);
KS_DECLARE(ks_status_t) ks_dht_query_ping(ks_dht_t *dht, ks_dht_job_t *job);
KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job);
KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job);
KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job);
KS_DECLARE(void *)ks_dht_process(ks_thread_t *thread, void *data);
@ -260,6 +261,18 @@ KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_nodeid_t *target);
KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_nodeid_t *target,
uint8_t *salt,
ks_size_t salt_length);
KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_nodeid_t *target,
uint8_t *salt,
ks_size_t salt_length);
KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job);
@ -290,13 +303,13 @@ KS_DECLARE(void) ks_dht_search_pending_destroy(ks_dht_search_pending_t **pending
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v);
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v);
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
ks_pool_t *pool,
ks_dht_nodeid_t *target,
struct bencode *v,
ks_dht_storageitem_key_t *k,
uint8_t *salt,
ks_size_t salt_length,
struct bencode *salt,
int64_t sequence,
ks_dht_storageitem_signature_t *signature);
KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item);

View File

@ -102,7 +102,7 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
/**
* Default expirations to not be checked for one pulse.
*/
d->pulse_expirations = ks_time_now() + (KS_DHT_PULSE_EXPIRATIONS * 1000);
d->pulse_expirations = ks_time_now() + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
/**
* Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full.
@ -171,12 +171,12 @@ KS_DECLARE(ks_status_t) ks_dht_create(ks_dht_t **dht, ks_pool_t *pool, ks_thread
* The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets.
*/
d->token_secret_current = d->token_secret_previous = rand();
d->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
d->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
/**
* Create the hash to store arbitrary data for BEP44.
*/
ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_hash_create(&d->storageitems_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, d->pool);
ks_assert(d->storageitems_hash);
/**
@ -678,7 +678,7 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
ks_assert(dht);
if (dht->pulse_expirations > now) return;
dht->pulse_expirations = now + (KS_DHT_PULSE_EXPIRATIONS * 1000);
dht->pulse_expirations = now + ((ks_time_t)KS_DHT_PULSE_EXPIRATIONS * KS_USEC_PER_SEC);
ks_hash_write_lock(dht->transactions_hash);
for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
@ -689,9 +689,9 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
ks_hash_this(it, &key, NULL, (void **)&value);
if (value->finished) remove = KS_TRUE;
else if (value->expiration <= now) {
// if the transaction expires, so does the attached job, it may try again with a new transaction
// if the transaction expires, so does the attached job, but the job may try again with a new transaction
value->job->state = KS_DHT_JOB_STATE_EXPIRING;
ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d, %d %d\n", value->transactionid, now, value->expiration);
remove = KS_TRUE;
}
if (remove) {
@ -704,8 +704,10 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht)
if (dht->rt_ipv4) ks_dht_pulse_expirations_searches(dht, dht->searches4_hash);
if (dht->rt_ipv6) ks_dht_pulse_expirations_searches(dht, dht->searches6_hash);
// @todo storageitem keepalive and expiration (callback at half of expiration time to determine if we locally care about reannouncing?)
if (dht->token_secret_expiration && dht->token_secret_expiration <= now) {
dht->token_secret_expiration = ks_time_now() + (KS_DHT_TOKENSECRET_EXPIRATION * 1000);
dht->token_secret_expiration = ks_time_now() + ((ks_time_t)KS_DHT_TOKENSECRET_EXPIRATION * KS_USEC_PER_SEC);
dht->token_secret_previous = dht->token_secret_current;
dht->token_secret_current = rand();
}
@ -931,6 +933,82 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_key(struct bencode *args,
ks_bool_t optional,
const char *key,
ks_dht_storageitem_key_t **sikey)
{
struct bencode *k;
const char *kv;
ks_size_t kv_len;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(args);
ks_assert(key);
ks_assert(sikey);
*sikey = NULL;
k = ben_dict_get_by_str(args, key);
if (!k) {
if (!optional) {
ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
ret = KS_STATUS_ARG_INVALID;
}
goto done;
}
kv = ben_str_val(k);
kv_len = ben_str_len(k);
if (kv_len != KS_DHT_STORAGEITEM_KEY_SIZE) {
ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, kv_len);
return KS_STATUS_ARG_INVALID;
}
*sikey = (ks_dht_storageitem_key_t *)kv;
done:
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_utility_extract_storageitem_signature(struct bencode *args,
ks_bool_t optional,
const char *key,
ks_dht_storageitem_signature_t **signature)
{
struct bencode *sig;
const char *sigv;
ks_size_t sigv_len;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(args);
ks_assert(key);
ks_assert(signature);
*signature = NULL;
sig = ben_dict_get_by_str(args, key);
if (!sig) {
if (!optional) {
ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key);
ret = KS_STATUS_ARG_INVALID;
}
goto done;
}
sigv = ben_str_val(sig);
sigv_len = ben_str_len(sig);
if (sigv_len != KS_DHT_STORAGEITEM_SIGNATURE_SIZE) {
ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, sigv_len);
return KS_STATUS_ARG_INVALID;
}
*signature = (ks_dht_storageitem_signature_t *)sigv;
done:
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, const ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token)
{
@ -968,6 +1046,44 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, const ks_sockaddr_t *ra
return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0;
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_immutable(struct bencode *value, ks_dht_nodeid_t *target)
{
SHA_CTX sha;
const uint8_t *v;
size_t v_len;
ks_assert(value);
ks_assert(target);
v = (const uint8_t *)ben_str_val(value);
v_len = ben_str_len(value);
if (!SHA1_Init(&sha) ||
!SHA1_Update(&sha, v, v_len) ||
!SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_storageitem_target_mutable(ks_dht_storageitem_key_t *k, struct bencode *salt, ks_dht_nodeid_t *target)
{
SHA_CTX sha;
ks_assert(k);
ks_assert(target);
if (!SHA1_Init(&sha) ||
!SHA1_Update(&sha, k->key, KS_DHT_STORAGEITEM_KEY_SIZE)) return KS_STATUS_FAIL;
if (salt) {
const uint8_t *s = (const uint8_t *)ben_str_val(salt);
size_t s_len = ben_str_len(salt);
if (s_len > 0 && !SHA1_Update(&sha, s, s_len)) return KS_STATUS_FAIL;
}
if (!SHA1_Final(target->id, &sha)) return KS_STATUS_FAIL;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message)
{
@ -1026,7 +1142,6 @@ KS_DECLARE(ks_status_t) ks_dht_query_setup(ks_dht_t *dht,
ks_mutex_unlock(dht->tid_mutex);
if ((ret = ks_dht_transaction_create(&trans, dht->pool, job, transactionid, callback)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_message_create(&msg, dht->pool, ep, &job->raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done;
// if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done;
@ -1248,6 +1363,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
tid = (uint32_t *)message->transactionid;
transactionid = ntohl(*tid);
ks_log(KS_LOG_DEBUG, "Message response transaction id %d\n", transactionid);
transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
ks_hash_read_unlock(dht->transactions_hash);
@ -1264,6 +1380,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t
transaction->job->state = KS_DHT_JOB_STATE_PROCESSING;
message->transaction = transaction;
if ((ret = transaction->callback(dht, transaction->job)) != KS_STATUS_SUCCESS) transaction->job->state = KS_DHT_JOB_STATE_EXPIRING;
transaction->job->response = NULL; // message is destroyed after we return, stop using it
transaction->finished = KS_TRUE;
}
@ -1523,6 +1640,7 @@ KS_DECLARE(void) ks_dht_pulse_jobs(ks_dht_t *dht)
jobn = job->next;
switch (job->state) {
case KS_DHT_JOB_STATE_QUERYING:
job->state = KS_DHT_JOB_STATE_RESPONDING;
if (job->query_callback && job->query_callback(dht, job) != KS_STATUS_SUCCESS) job->state = KS_DHT_JOB_STATE_EXPIRING;
break;
case KS_DHT_JOB_STATE_RESPONDING:
@ -1675,12 +1793,12 @@ KS_DECLARE(ks_status_t) ks_dht_query_findnode(ks_dht_t *dht, ks_dht_job_t *job)
&message,
&a)) != KS_STATUS_SUCCESS) goto done;
//memcpy(transaction->target.id, job->target.id, KS_DHT_NODEID_SIZE);
transaction->target = job->target;
//memcpy(transaction->target.id, job->query_target.id, KS_DHT_NODEID_SIZE);
//transaction->target = job->query_target;
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE));
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
// Only request both v4 and v6 if we have both interfaces bound and are looking for our own node id, aka bootstrapping
if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->target.id, KS_DHT_NODEID_SIZE)) {
if (dht->rt_ipv4 && dht->rt_ipv6 && !memcmp(message->endpoint->nodeid.id, job->query_target.id, KS_DHT_NODEID_SIZE)) {
struct bencode *want = ben_list();
ben_list_append_str(want, "n4");
ben_list_append_str(want, "n6");
@ -1812,13 +1930,13 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
ks_assert(job);
n = ben_dict_get_by_str(job->response->args, "nodes");
if (n) {
if (n && dht->rt_ipv4) {
//n4 = KS_TRUE;
nodes = (const uint8_t *)ben_str_val(n);
nodes_size = ben_str_len(n);
}
n = ben_dict_get_by_str(job->response->args, "nodes6");
if (n) {
if (n && dht->rt_ipv6) {
//n6 = KS_TRUE;
nodes6 = (const uint8_t *)ben_str_val(n);
nodes6_size = ben_str_len(n);
@ -1827,7 +1945,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
searches = job->response->raddr.family == AF_INET ? dht->searches4_hash : dht->searches6_hash;
ks_hash_read_lock(searches);
search = ks_hash_search(searches, job->response->transaction->target.id, KS_UNLOCKED);
search = ks_hash_search(searches, job->query_target.id, KS_UNLOCKED);
if (search) {
ks_dht_search_pending_t *pending = NULL;
@ -1852,8 +1970,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
ks_dhtrt_release_node(node);
job->response_nodes[job->response_nodes_count++] = node;
// @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
if (search && job->response->raddr.family == AF_INET && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
ks_dht_nodeid_t distance;
int32_t results_index = -1;
@ -1913,8 +2032,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
ks_dhtrt_release_node(node);
job->response_nodes6[job->response_nodes6_count++] = node;
// @todo move search to it's own job, and make reusable for find_node and get, and others that return nodes/nodes6
if (search && job->response->raddr.family == AF_INET6 && !ks_hash_search(search->pending, nid.id, KS_UNLOCKED)) {
ks_dht_nodeid_t distance;
int32_t results_index = -1;
@ -1961,12 +2081,35 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_j
ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
job->state = KS_DHT_JOB_STATE_COMPLETING;
done:
if(search) ks_mutex_unlock(search->mutex);
return ret;
}
// @todo ks_dht_get
KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
ks_dht_nodeid_t *target,
uint8_t *salt,
ks_size_t salt_length)
{
ks_dht_job_t *job = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(raddr);
ks_assert(target);
if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
ks_dht_job_build_get(job, ks_dht_query_get, callback, target, salt, salt_length);
ks_dht_jobs_add(dht, job);
done:
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
{
@ -1985,7 +2128,7 @@ KS_DECLARE(ks_status_t) ks_dht_query_get(ks_dht_t *dht, ks_dht_job_t *job)
&a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
// @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->target.id, KS_DHT_NODEID_SIZE));
ben_dict_set(a, ben_blob("target", 6), ben_blob(job->query_target.id, KS_DHT_NODEID_SIZE));
ks_log(KS_LOG_DEBUG, "Sending message query get\n");
ks_q_push(dht->send_q, (void *)message);
@ -2024,7 +2167,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token);
item = ks_hash_search(dht->storageitems_hash, target->id, KS_READLOCKED);
ks_hash_read_lock(dht->storageitems_hash);
item = ks_hash_search(dht->storageitems_hash, target->id, KS_UNLOCKED);
ks_hash_read_unlock(dht->storageitems_hash);
sequence_snuffed = item && sequence >= 0 && item->seq <= sequence;
@ -2100,28 +2244,224 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t
KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_job_t *job)
{
ks_dht_token_t *token;
ks_dht_storageitem_t *item = NULL;
ks_dht_token_t *token = NULL;
ks_dht_storageitem_key_t *k = NULL;
ks_dht_storageitem_signature_t *sig = NULL;
struct bencode *seq;
int64_t sequence = -1;
struct bencode *v = NULL;
//ks_size_t v_len = 0;
struct bencode *n;
ks_dht_node_t *node = NULL;
const uint8_t *nodes = NULL;
const uint8_t *nodes6 = NULL;
size_t nodes_size = 0;
size_t nodes6_size = 0;
size_t nodes_len = 0;
size_t nodes6_len = 0;
char id_buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_bool_t storageitems_locked = KS_FALSE;
ks_dht_storageitem_t *olditem = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(job);
// @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided
if ((ret = ks_dht_utility_extract_token(job->response->args, "token", &token)) != KS_STATUS_SUCCESS) goto done;
job->response_token = *token;
// @todo add extract function for mutable ks_dht_storageitem_key_t
// @todo add extract function for mutable ks_dht_storageitem_signature_t
if ((ret = ks_dht_utility_extract_storageitem_key(job->response->args, KS_TRUE, "k", &k)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dht_utility_extract_storageitem_signature(job->response->args, KS_TRUE, "sig", &sig)) != KS_STATUS_SUCCESS) goto done;
// @todo add/touch bucket entries for other nodes/nodes6 returned
seq = ben_dict_get_by_str(job->response->args, "seq");
if (seq) sequence = ben_int_val(seq);
if (seq && ((k && !sig) || (!k && sig))) {
ks_log(KS_LOG_DEBUG, "Must provide both k and sig for mutable data");
ret = KS_STATUS_ARG_INVALID;
goto done;
}
v = ben_dict_get_by_str(job->response->args, "v");
//if (v) v_len = ben_str_len(v);
n = ben_dict_get_by_str(job->response->args, "nodes");
if (n && dht->rt_ipv4) {
nodes = (const uint8_t *)ben_str_val(n);
nodes_size = ben_str_len(n);
}
n = ben_dict_get_by_str(job->response->args, "nodes6");
if (n && dht->rt_ipv6) {
nodes6 = (const uint8_t *)ben_str_val(n);
nodes6_size = ben_str_len(n);
}
ks_log(KS_LOG_DEBUG, "Message response get is reached\n");
while (nodes_len < nodes_size) {
ks_dht_nodeid_t nid;
ks_sockaddr_t addr;
addr.family = AF_INET;
if ((ret = ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG,
"Expanded ipv4 nodeinfo for %s (%s %d)\n",
ks_dht_hexid(&nid, id_buf),
addr.host,
addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_dhtrt_create_node(dht->rt_ipv4, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
job->response_nodes[job->response_nodes_count++] = node;
}
while (nodes6_len < nodes6_size) {
ks_dht_nodeid_t nid;
ks_sockaddr_t addr;
addr.family = AF_INET6;
if ((ret = ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr)) != KS_STATUS_SUCCESS) goto done;
ks_log(KS_LOG_DEBUG,
"Expanded ipv6 nodeinfo for %s (%s %d)\n",
ks_dht_hexid(&nid, id_buf),
addr.host,
addr.port);
ks_log(KS_LOG_DEBUG, "Creating node %s\n", ks_dht_hexid(&nid, id_buf));
ks_dhtrt_create_node(dht->rt_ipv6, nid, KS_DHT_REMOTE, addr.host, addr.port, &node);
job->response_nodes6[job->response_nodes6_count++] = node;
}
ks_hash_write_lock(dht->storageitems_hash);
storageitems_locked = KS_TRUE;
olditem = ks_hash_search(dht->storageitems_hash, job->query_target.id, KS_UNLOCKED);
if (v) {
ks_dht_nodeid_t tmptarget;
if (!seq) {
// immutable
if ((ret = ks_dht_storageitem_target_immutable(v, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
ret = KS_STATUS_FAIL;
goto done;
}
if (olditem) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
else if ((ret = ks_dht_storageitem_create_immutable(&item,
dht->pool,
&tmptarget,
v)) != KS_STATUS_SUCCESS) goto done;
} else {
// mutable
struct bencode *tmp = NULL;
uint8_t *tmpsig = NULL;
size_t tmpsig_len = 0;
int32_t res = 0;
if ((ret = ks_dht_storageitem_target_mutable(k, job->query_salt, &tmptarget)) != KS_STATUS_SUCCESS) goto done;
if (memcmp(tmptarget.id, job->query_target.id, KS_DHT_NODEID_SIZE) != 0) {
ks_log(KS_LOG_DEBUG, "Immutable data hash does not match requested target id\n");
ret = KS_STATUS_FAIL;
goto done;
}
tmp = ben_dict();
if (job->query_salt) ben_dict_set(tmp, ben_blob("salt", 4), ben_clone(job->query_salt));
ben_dict_set(tmp, ben_blob("seq", 3), ben_clone(seq));
ben_dict_set(tmp, ben_blob("v", 1), ben_clone(v));
tmpsig = ben_encode(&tmpsig_len, tmp);
ben_free(tmp);
res = crypto_sign_verify_detached(sig->sig, tmpsig, tmpsig_len, k->key);
free(tmpsig);
if (res) {
ks_log(KS_LOG_DEBUG, "Immutable data signature failed to verify\n");
ret = KS_STATUS_FAIL;
goto done;
}
if (olditem) {
if (olditem->seq >= sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
else {
ks_hash_remove(dht->storageitems_hash, olditem->id.id);
olditem = NULL;
}
}
if (!olditem && (ret = ks_dht_storageitem_create_mutable(&item,
dht->pool,
&tmptarget,
v,
k,
job->query_salt,
sequence,
sig)) != KS_STATUS_SUCCESS) goto done;
}
if (item && (ret = ks_hash_insert(dht->storageitems_hash, item->id.id, item)) != KS_STATUS_SUCCESS) goto done;
} else if (seq && olditem && olditem->seq == sequence) olditem->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
if (item) job->response_storageitem = item;
else if (olditem) job->response_storageitem = olditem;
job->state = KS_DHT_JOB_STATE_COMPLETING;
done:
if (storageitems_locked) ks_hash_write_unlock(dht->storageitems_hash);
if (ret != KS_STATUS_SUCCESS) {
if (item) ks_dht_storageitem_destroy(&item);
}
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_put(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
ks_dht_nodeid_t *target,
uint8_t *salt,
ks_size_t salt_length)
{
ks_dht_job_t *job = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(raddr);
ks_assert(target);
if ((ret = ks_dht_job_create(&job, dht->pool, raddr, 3)) != KS_STATUS_SUCCESS) goto done;
ks_dht_job_build_put(job, ks_dht_query_put, callback, target, salt, salt_length);
ks_dht_jobs_add(dht, job);
done:
return ret;
}
// @todo ks_dht_put
// @todo ks_dht_query_put
KS_DECLARE(ks_status_t) ks_dht_query_put(ks_dht_t *dht, ks_dht_job_t *job)
{
ks_dht_message_t *message = NULL;
struct bencode *a = NULL;
ks_assert(dht);
ks_assert(job);
if (ks_dht_query_setup(dht,
job,
"put",
ks_dht_process_response_put,
NULL,
&message,
&a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
ks_log(KS_LOG_DEBUG, "Sending message query put\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message)
{
@ -2159,6 +2499,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t
ks_log(KS_LOG_DEBUG, "Message response put is reached\n");
job->state = KS_DHT_JOB_STATE_COMPLETING;
// done:
return ret;
}

View File

@ -22,6 +22,8 @@ KS_BEGIN_EXTERN_C
#define KS_DHT_NODEID_SIZE 20
#define KS_DHT_RESPONSE_NODES_MAX_SIZE 8
#define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20
#define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20
#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
@ -34,6 +36,7 @@ KS_BEGIN_EXTERN_C
#define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES
#define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64
#define KS_DHT_STORAGEITEM_SIGNATURE_SIZE crypto_sign_BYTES
#define KS_DHT_STORAGEITEM_EXPIRATION 7200
#define KS_DHT_TOKEN_SIZE SHA_DIGEST_LENGTH
#define KS_DHT_TOKENSECRET_EXPIRATION 300
@ -92,6 +95,10 @@ struct ks_dht_node_s {
ks_rwl_t *reflock;
};
struct ks_dht_token_s {
uint8_t token[KS_DHT_TOKEN_SIZE];
};
enum ks_dht_job_state_t {
KS_DHT_JOB_STATE_QUERYING,
KS_DHT_JOB_STATE_RESPONDING,
@ -124,7 +131,16 @@ struct ks_dht_job_s {
//ks_dht_nodeid_t response_id;
// job specific query parameters
ks_dht_nodeid_t target;
ks_dht_nodeid_t query_target;
struct bencode *query_salt;
// job specific response parameters
ks_dht_node_t *response_nodes[KS_DHT_RESPONSE_NODES_MAX_SIZE];
ks_size_t response_nodes_count;
ks_dht_node_t *response_nodes6[KS_DHT_RESPONSE_NODES_MAX_SIZE];
ks_size_t response_nodes6_count;
ks_dht_token_t response_token;
ks_dht_storageitem_t *response_storageitem;
};
struct ks_dhtrt_routetable_s {
@ -142,10 +158,6 @@ struct ks_dhtrt_querynodes_s {
ks_dht_node_t* nodes[ KS_DHTRT_MAXQUERYSIZE ]; /* out: array of peers (ks_dht_node_t* nodes[incount]) */
};
struct ks_dht_token_s {
uint8_t token[KS_DHT_TOKEN_SIZE];
};
struct ks_dht_storageitem_key_s {
uint8_t key[KS_DHT_STORAGEITEM_KEY_SIZE];
};
@ -181,7 +193,7 @@ struct ks_dht_transaction_s {
ks_pool_t *pool;
ks_dht_job_t *job;
uint32_t transactionid;
ks_dht_nodeid_t target; // @todo look at moving this into job now
//ks_dht_nodeid_t target; // @todo look at moving this into job now
ks_dht_job_callback_t callback;
ks_time_t expiration;
ks_bool_t finished;
@ -209,14 +221,13 @@ struct ks_dht_search_pending_s {
struct ks_dht_storageitem_s {
ks_pool_t *pool;
ks_dht_nodeid_t id;
// @todo ks_time_t expiration;
ks_time_t expiration;
struct bencode *v;
ks_bool_t mutable;
ks_dht_storageitem_key_t pk;
ks_dht_storageitem_key_t sk;
uint8_t salt[KS_DHT_STORAGEITEM_SALT_MAX_SIZE];
ks_size_t salt_length;
struct bencode *salt;
int64_t seq;
ks_dht_storageitem_signature_t sig;
};
@ -352,6 +363,12 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout);
KS_DECLARE(ks_status_t) ks_dht_ping(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback);
KS_DECLARE(ks_status_t) ks_dht_findnode(ks_dht_t *dht, const ks_sockaddr_t *raddr, ks_dht_job_callback_t callback, ks_dht_nodeid_t *target);
KS_DECLARE(ks_status_t) ks_dht_get(ks_dht_t *dht,
const ks_sockaddr_t *raddr,
ks_dht_job_callback_t callback,
ks_dht_nodeid_t *target,
uint8_t *salt,
ks_size_t salt_length);
/**
* Create a network search of the closest nodes to a target.

View File

@ -1469,6 +1469,7 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry
ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
#endif
ks_dht_node_t* node = entry->gptr;
ks_log(KS_LOG_DEBUG, "Node addr %s %d\n", node->addr.host, node->addr.port);
ks_dht_ping(internal->dht, &node->addr, NULL);
return;

View File

@ -38,6 +38,7 @@ KS_DECLARE(ks_status_t) ks_dht_job_create(ks_dht_job_t **job,
KS_DECLARE(void) ks_dht_job_build_ping(ks_dht_job_t *job, ks_dht_job_callback_t query_callback, ks_dht_job_callback_t finish_callback)
{
ks_assert(job);
ks_assert(query_callback);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
@ -49,11 +50,46 @@ KS_DECLARE(void) ks_dht_job_build_findnode(ks_dht_job_t *job,
ks_dht_nodeid_t *target)
{
ks_assert(job);
ks_assert(query_callback);
ks_assert(target);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
job->target = *target;
job->query_target = *target;
}
KS_DECLARE(void) ks_dht_job_build_get(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_nodeid_t *target,
uint8_t *salt,
ks_size_t salt_length)
{
ks_assert(job);
ks_assert(query_callback);
ks_assert(target);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
job->query_target = *target;
if (salt && salt_length > 0) job->query_salt = ben_blob(salt, salt_length);
}
KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback,
ks_dht_nodeid_t *target,
uint8_t *salt,
ks_size_t salt_length)
{
ks_assert(job);
ks_assert(query_callback);
ks_assert(target);
job->query_callback = query_callback;
job->finish_callback = finish_callback;
job->query_target = *target;
if (salt && salt_length > 0) job->query_salt = ben_blob(salt, salt_length);
}
KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
@ -65,6 +101,10 @@ KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
j = *job;
if (j->query_salt) ben_free(j->query_salt);
for (int32_t i = 0; i < j->response_nodes_count; ++i) ks_dhtrt_release_node(j->response_nodes[i]);
for (int32_t i = 0; i < j->response_nodes6_count; ++i) ks_dhtrt_release_node(j->response_nodes6[i]);
ks_pool_free(j->pool, job);
}

View File

@ -88,7 +88,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const ui
memcpy(message->transactionid, tv, tv_len);
message->transactionid_length = tv_len;
// @todo hex output of transactionid
//ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", *transactionid);
//ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", message->transactionid);
y = ben_dict_get_by_str(message->data, "y");
if (!y) {

View File

@ -92,7 +92,7 @@ KS_DECLARE(ks_status_t) ks_dht_search_pending_create(ks_dht_search_pending_t **p
p->pool = pool;
p->nodeid = *nodeid;
p->expiration = ks_time_now() + (KS_DHT_SEARCH_EXPIRATION * 1000);
p->expiration = ks_time_now() + ((ks_time_t)KS_DHT_SEARCH_EXPIRATION * KS_USEC_PER_SEC);
p->finished = KS_FALSE;
// done:

View File

@ -2,12 +2,9 @@
#include "ks_dht-int.h"
#include "sodium.h"
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, struct bencode *v)
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t **item, ks_pool_t *pool, ks_dht_nodeid_t *target, struct bencode *v)
{
ks_dht_storageitem_t *si;
SHA_CTX sha;
size_t enc_len = 0;
uint8_t *enc = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(item);
@ -19,16 +16,18 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t
ks_assert(si);
si->pool = pool;
si->id = *target;
si->mutable = KS_FALSE;
si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
si->v = ben_clone(v);
ks_assert(si->v);
enc = ben_encode(&enc_len, si->v);
ks_assert(enc);
SHA1_Init(&sha);
SHA1_Update(&sha, enc, enc_len);
SHA1_Final(si->id.id, &sha);
free(enc);
//enc = ben_encode(&enc_len, si->v);
//ks_assert(enc);
//SHA1_Init(&sha);
//SHA1_Update(&sha, enc, enc_len);
//SHA1_Final(si->id.id, &sha);
//free(enc);
// done:
if (ret != KS_STATUS_SUCCESS) {
@ -39,15 +38,14 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_immutable(ks_dht_storageitem_t
KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t **item,
ks_pool_t *pool,
ks_dht_nodeid_t *target,
struct bencode *v,
ks_dht_storageitem_key_t *k,
uint8_t *salt,
ks_size_t salt_length,
struct bencode *salt,
int64_t sequence,
ks_dht_storageitem_signature_t *signature)
{
ks_dht_storageitem_t *si;
SHA_CTX sha;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(item);
@ -55,30 +53,25 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_create_mutable(ks_dht_storageitem_t *
ks_assert(v);
ks_assert(SHA_DIGEST_LENGTH == KS_DHT_NODEID_SIZE);
ks_assert(k);
ks_assert(!(!salt && salt_length > 0));
ks_assert(!(salt_length > KS_DHT_STORAGEITEM_SIGNATURE_SIZE));
ks_assert(signature);
*item = si = ks_pool_alloc(pool, sizeof(ks_dht_storageitem_t));
ks_assert(si);
si->pool = pool;
si->id = *target;
si->mutable = KS_TRUE;
si->expiration = ks_time_now() + ((ks_time_t)KS_DHT_STORAGEITEM_EXPIRATION * KS_USEC_PER_SEC);
si->v = ben_clone(v);
ks_assert(si->v);
memcpy(si->pk.key, k->key, KS_DHT_STORAGEITEM_KEY_SIZE);
if (salt && salt_length > 0) {
memcpy(si->salt, salt, salt_length);
si->salt_length = salt_length;
si->pk = *k;
if (salt) {
si->salt = ben_clone(salt);
ks_assert(si->salt);
}
si->seq = sequence;
memcpy(si->sig.sig, signature->sig, KS_DHT_STORAGEITEM_SIGNATURE_SIZE);
SHA1_Init(&sha);
SHA1_Update(&sha, si->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE);
if (si->salt && si->salt_length > 0) SHA1_Update(&sha, si->salt, si->salt_length);
SHA1_Final(si->id.id, &sha);
si->sig = *signature;
// done:
if (ret != KS_STATUS_SUCCESS) {
@ -103,6 +96,10 @@ KS_DECLARE(void) ks_dht_storageitem_destroy(ks_dht_storageitem_t **item)
ben_free(si->v);
si->v = NULL;
}
if (si->salt) {
ben_free(si->salt);
si->salt = NULL;
}
ks_pool_free(si->pool, item);
}

View File

@ -21,7 +21,7 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_create(ks_dht_transaction_t **transac
t->job = job;
t->transactionid = transactionid;
t->callback = callback;
t->expiration = ks_time_now() + (KS_DHT_TRANSACTION_EXPIRATION * 1000);
t->expiration = ks_time_now() + ((ks_time_t)KS_DHT_TRANSACTION_EXPIRATION * KS_USEC_PER_SEC);
// done:
if (ret != KS_STATUS_SUCCESS) {

View File

@ -132,7 +132,6 @@ int main() {
//err = ks_dht_process(dht1, &raddr);
//ok(err == KS_STATUS_SUCCESS);
diag("Ping test\n");
//ks_dht_send_ping(dht2, ep2, &raddr1); // Queue bootstrap ping from dht2 to dht1
@ -163,7 +162,6 @@ int main() {
diag("Find_Node test\n");
//ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1
ks_dht_findnode(dht3, &raddr1, NULL, &ep2->nodeid);
ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1