diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 15fcb32548..4f835ed18d 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -8,30 +8,31 @@ KS_BEGIN_EXTERN_C /** * */ -KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht); -KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht); +KS_DECLARE(void) ks_dht2_idle(ks_dht2_t *dht); +KS_DECLARE(void) ks_dht2_idle_expirations(ks_dht2_t *dht); +KS_DECLARE(void) ks_dht2_idle_send(ks_dht2_t *dht); -KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr); -KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, uint8_t *transactionid, ks_size_t transactionid_length, long long errorcode, const char *errorstr); +KS_DECLARE(ks_status_t) ks_dht2_send_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr); +KS_DECLARE(ks_status_t) ks_dht2_send_findnode(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_nodeid_raw_t *targetid); -KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); -KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); -KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr); -KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); -KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_dht2_message_t *message); -KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr); -KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht, - ks_sockaddr_t *raddr, - uint8_t *transactionid, - ks_size_t transactionid_length); +KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_process_query_findnode(ks_dht2_t *dht, ks_dht2_message_t *message); + +KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_process_response_findnode(ks_dht2_t *dht, ks_dht2_message_t *message); /** * diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index d0460d5425..746c9e62fa 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -79,6 +79,7 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_dht2_register_query(dht, "ping", ks_dht2_process_query_ping); + ks_dht2_register_query(dht, "find_node", ks_dht2_process_query_findnode); ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); // @todo register 301 error for internal get/put CAS hash mismatch retry handler @@ -90,7 +91,9 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t dht->endpoints_size = 0; ks_hash_create(&dht->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, dht->pool); dht->endpoints_poll = NULL; - + + ks_q_create(&dht->send_q, dht->pool, 0); + dht->send_q_unsent = NULL; dht->recv_buffer_length = 0; dht->transactionid_next = 1; //rand(); @@ -112,6 +115,20 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht) dht->transactions_hash = NULL; } dht->recv_buffer_length = 0; + if (dht->send_q) { + ks_dht2_message_t *msg; + while (ks_q_pop_timeout(dht->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) { + ks_dht2_message_deinit(msg); + ks_dht2_message_free(msg); + } + ks_q_destroy(&dht->send_q); + dht->send_q = NULL; + } + if (dht->send_q_unsent) { + ks_dht2_message_deinit(dht->send_q_unsent); + ks_dht2_message_free(dht->send_q_unsent); + dht->send_q_unsent = NULL; + } for (int32_t i = 0; i < dht->endpoints_size; ++i) { ks_dht2_endpoint_t *ep = dht->endpoints[i]; ks_dht2_endpoint_deinit(ep); @@ -280,7 +297,7 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr, /** * */ -KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout) +KS_DECLARE(void) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout) { int32_t result; @@ -294,30 +311,21 @@ KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout) } result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout); - if (result < 0) { - return KS_STATUS_FAIL; - } - - if (result == 0) { - ks_dht2_idle(dht); - return KS_STATUS_TIMEOUT; - } - - for (int32_t i = 0; i < dht->endpoints_size; ++i) { - if (dht->endpoints_poll[i].revents & POLLIN) { - ks_sockaddr_t raddr = KS_SA_INIT; - dht->recv_buffer_length = KS_DHT_RECV_BUFFER_SIZE; + if (result > 0) { + for (int32_t i = 0; i < dht->endpoints_size; ++i) { + if (dht->endpoints_poll[i].revents & POLLIN) { + ks_sockaddr_t raddr = KS_SA_INIT; + dht->recv_buffer_length = KS_DHT_RECV_BUFFER_SIZE; - raddr.family = dht->endpoints[i]->addr.family; - if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) { - ks_dht2_process(dht, &raddr); + raddr.family = dht->endpoints[i]->addr.family; + if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) { + ks_dht2_process(dht, &raddr); + } } } } ks_dht2_idle(dht); - - return KS_STATUS_SUCCESS; } /** @@ -333,21 +341,19 @@ KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht) /** * */ -KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht) +KS_DECLARE(void) ks_dht2_idle(ks_dht2_t *dht) { ks_assert(dht); - if (ks_dht2_idle_expirations(dht) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - return KS_STATUS_SUCCESS; + ks_dht2_idle_expirations(dht); + + ks_dht2_idle_send(dht); } /** * */ -KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht) +KS_DECLARE(void) ks_dht2_idle_expirations(ks_dht2_t *dht) { ks_hash_iterator_t *it = NULL; ks_time_t now = ks_time_now_sec(); @@ -375,6 +381,240 @@ KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht) } } ks_hash_write_unlock(dht->transactions_hash); +} + +/** + * + */ +KS_DECLARE(void) ks_dht2_idle_send(ks_dht2_t *dht) +{ + ks_dht2_message_t *message; + ks_bool_t bail = KS_FALSE; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(dht); + + while (!bail) { + message = NULL; + if (dht->send_q_unsent) { + message = dht->send_q_unsent; + dht->send_q_unsent = NULL; + } + if (!message) { + bail = ks_q_pop_timeout(dht->send_q, (void **)&message, 1) != KS_STATUS_SUCCESS || !message; + } + if (!bail) { + bail = (ret = ks_dht2_send(dht, message)) != KS_STATUS_SUCCESS; + if (ret == KS_STATUS_BREAK) { + dht->send_q_unsent = message; + } else if (ret == KS_STATUS_SUCCESS) { + ks_dht2_message_deinit(message); + ks_dht2_message_free(message); + } + } + } +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_dht2_message_t *message) +{ + // @todo lookup standard def for IPV6 max size + char ip[48]; + ks_dht2_endpoint_t *ep; + // @todo calculate max IPV6 payload size? + char buf[1000]; + ks_size_t buf_len; + + ks_assert(dht); + ks_assert(message); + ks_assert(message->data); + + // @todo blacklist check + + ks_ip_route(ip, sizeof(ip), message->raddr.host); + + if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) { + ks_sockaddr_t addr; + ks_addr_set(&addr, ip, dht->autoroute_port, message->raddr.family); + if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + } + + if (!ep) { + ks_log(KS_LOG_DEBUG, "No route available to %s\n", message->raddr.host); + return KS_STATUS_FAIL; + } + + buf_len = ben_encode2(buf, sizeof(buf), message->data); + + ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port); + ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data)); + + return ks_socket_sendto(ep->sock, (void *)buf, &buf_len, &message->raddr); +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht, + ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_length, + long long errorcode, + const char *errorstr) +{ + ks_dht2_message_t *error = NULL; + struct bencode *e = NULL; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(transactionid); + ks_assert(errorstr); + + if (ks_dht2_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + if (ks_dht2_message_init(error, raddr, KS_TRUE) != KS_STATUS_SUCCESS) { + goto done; + } + + if (ks_dht2_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) { + goto done; + } + + ben_list_append(e, ben_int(errorcode)); + ben_list_append(e, ben_blob(errorstr, strlen(errorstr))); + + ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode); + ks_q_push(dht->send_q, (void *)error); + + ret = KS_STATUS_SUCCESS; + + done: + if (ret != KS_STATUS_SUCCESS && error) { + ks_dht2_message_deinit(error); + ks_dht2_message_free(error); + } + return ret; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_setup_query(ks_dht2_t *dht, + ks_sockaddr_t *raddr, + const char *query, + ks_dht2_message_callback_t callback, + ks_dht2_message_t **message, + struct bencode **args) +{ + uint32_t transactionid; + ks_dht2_transaction_t *trans = NULL; + ks_dht2_message_t *msg = NULL; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(query); + ks_assert(callback); + ks_assert(message); + + *message = NULL; + + // @todo atomic increment or mutex + transactionid = dht->transactionid_next++; + + if (ks_dht2_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) { + goto done; + } + + if (ks_dht2_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) { + goto done; + } + + if (ks_dht2_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) { + goto done; + } + + if (ks_dht2_message_init(msg, raddr, KS_TRUE) != KS_STATUS_SUCCESS) { + goto done; + } + + if (ks_dht2_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) { + goto done; + } + + *message = msg; + + ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans); + + ret = KS_STATUS_SUCCESS; + + done: + if (ret != KS_STATUS_SUCCESS) { + if (trans) { + ks_dht2_transaction_deinit(trans); + ks_dht2_transaction_free(trans); + } + if (msg) { + ks_dht2_message_deinit(msg); + ks_dht2_message_free(msg); + } + *message = NULL; + } + return ret; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_send_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr) +{ + ks_dht2_message_t *message = NULL; + struct bencode *a = NULL; + + ks_assert(dht); + ks_assert(raddr); + + if (ks_dht2_setup_query(dht, raddr, "ping", ks_dht2_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + ben_dict_set(a, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH)); + + ks_log(KS_LOG_DEBUG, "Sending message query ping\n"); + ks_q_push(dht->send_q, (void *)message); + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_send_findnode(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_nodeid_raw_t *targetid) +{ + ks_dht2_message_t *message = NULL; + struct bencode *a = NULL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(targetid); + + if (ks_dht2_setup_query(dht, raddr, "find_node", ks_dht2_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + ben_dict_set(a, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH)); + ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_LENGTH)); + + ks_log(KS_LOG_DEBUG, "Sending message query find_node\n"); + ks_q_push(dht->send_q, (void *)message); + //ks_dht2_send(dht, raddr, message); return KS_STATUS_SUCCESS; } @@ -403,7 +643,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) return KS_STATUS_FAIL; } - if (ks_dht2_message_init(&message, KS_FALSE) != KS_STATUS_SUCCESS) { + if (ks_dht2_message_init(&message, raddr, KS_FALSE) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; } @@ -414,7 +654,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) { ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); } else { - ret = callback(dht, raddr, &message); + ret = callback(dht, &message); } done: @@ -426,97 +666,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) /** * */ -KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) -{ - // @todo lookup standard def for IPV6 max size - char ip[48]; - ks_dht2_endpoint_t *ep; - // @todo calculate max IPV6 payload size? - char buf[1000]; - ks_size_t buf_len; - - ks_assert(dht); - ks_assert(raddr); - ks_assert(message); - ks_assert(message->data); - - // @todo blacklist check - - ks_ip_route(ip, sizeof(ip), raddr->host); - - if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) { - ks_sockaddr_t addr; - ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family); - if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - } - - if (!ep) { - ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host); - return KS_STATUS_FAIL; - } - - buf_len = ben_encode2(buf, sizeof(buf), message->data); - - ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", raddr->host, raddr->port); - ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data)); - - if (ks_socket_sendto(ep->sock, (void *)buf, &buf_len, raddr) != KS_STATUS_SUCCESS) { - ks_log(KS_LOG_DEBUG, "Socket error\n"); - return KS_STATUS_FAIL; - } - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht, - ks_sockaddr_t *raddr, - uint8_t *transactionid, - ks_size_t transactionid_length, - long long errorcode, - const char *errorstr) -{ - ks_dht2_message_t error; - struct bencode *e; - ks_status_t ret = KS_STATUS_FAIL; - - ks_assert(dht); - ks_assert(raddr); - ks_assert(transactionid); - ks_assert(errorstr); - - if (ks_dht2_message_prealloc(&error, dht->pool) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - if (ks_dht2_message_init(&error, KS_TRUE) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - if (ks_dht2_message_error(&error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) { - goto done; - } - - // @note e joins response.data and will be freed with it - ben_list_append(e, ben_int(errorcode)); - ben_list_append(e, ben_blob(errorstr, strlen(errorstr))); - - ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode); - ret = ks_dht2_send(dht, raddr, &error); - - done: - ks_dht2_message_deinit(&error); - return ret; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_dht2_message_t *message) { struct bencode *q; struct bencode *a; @@ -527,7 +677,6 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad ks_status_t ret = KS_STATUS_FAIL; ks_assert(dht); - ks_assert(raddr); ks_assert(message); // @todo start of ks_dht2_message_parse_query @@ -560,7 +709,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) { ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query); } else { - ret = callback(dht, raddr, message); + ret = callback(dht, message); } return ret; @@ -569,7 +718,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad /** * */ -KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_dht2_message_t *message) { struct bencode *r; ks_dht2_transaction_t *transaction; @@ -578,7 +727,6 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t * ks_status_t ret = KS_STATUS_FAIL; ks_assert(dht); - ks_assert(raddr); ks_assert(message); // @todo start of ks_dht2_message_parse_response @@ -599,17 +747,17 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t * if (!transaction) { ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid); - } else if (!ks_addr_cmp(raddr, &transaction->raddr)) { + } else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) { ks_log(KS_LOG_DEBUG, "Message response rejected due to spoofing from %s %d, expected %s %d\n", - raddr->host, - raddr->port, + message->raddr.host, + message->raddr.port, transaction->raddr.host, transaction->raddr.port); } else { // @todo mark transaction for later removal transaction->finished = KS_TRUE; - ret = transaction->callback(dht, raddr, message); + ret = transaction->callback(dht, message); } return ret; @@ -618,7 +766,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t * /** * */ -KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_dht2_message_t *message) { struct bencode *e; struct bencode *ec; @@ -633,7 +781,6 @@ KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *rad ks_status_t ret = KS_STATUS_FAIL; ks_assert(dht); - ks_assert(raddr); ks_assert(message); // @todo start of ks_dht2_message_parse_error @@ -666,11 +813,11 @@ KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *rad if (!transaction) { ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid); - } else if (!ks_addr_cmp(raddr, &transaction->raddr)) { + } else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) { ks_log(KS_LOG_DEBUG, "Message error rejected due to spoofing from %s %d, expected %s %d\n", - raddr->host, - raddr->port, + message->raddr.host, + message->raddr.port, transaction->raddr.host, transaction->raddr.port); } else { @@ -679,7 +826,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *rad transaction->finished = KS_TRUE; if ((callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) { - ret = callback(dht, raddr, message); + ret = callback(dht, message); } else { ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error); ret = KS_STATUS_SUCCESS; @@ -692,16 +839,16 @@ KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *rad /** * */ -KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_dht2_message_t *message) { struct bencode *id; - const char *idv; + //const char *idv; ks_size_t idv_len; - ks_dht2_nodeid_t nid; + ks_dht2_message_t *response = NULL; + struct bencode *r = NULL; ks_status_t ret = KS_STATUS_FAIL; ks_assert(dht); - ks_assert(raddr); ks_assert(message); ks_assert(message->args); @@ -711,39 +858,131 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t return KS_STATUS_FAIL; } - idv = ben_str_val(id); + //idv = ben_str_val(id); idv_len = ben_str_len(id); if (idv_len != KS_DHT_NODEID_LENGTH) { ks_log(KS_LOG_DEBUG, "Message args 'id' value has an unexpected size of %d\n", idv_len); return KS_STATUS_FAIL; } - if (ks_dht2_nodeid_prealloc(&nid, dht->pool) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - if (ks_dht2_nodeid_init(&nid, (const ks_dht2_nodeid_raw_t *)idv) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + // @todo add/touch bucket entry for remote node ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); - ret = ks_dht2_send_response_ping(dht, raddr, message->transactionid, message->transactionid_length); + + if (ks_dht2_message_alloc(&response, dht->pool) != KS_STATUS_SUCCESS) { + goto done; + } - ks_dht2_nodeid_deinit(&nid); + if (ks_dht2_message_init(response, &message->raddr, KS_TRUE) != KS_STATUS_SUCCESS) { + goto done; + } + if (ks_dht2_message_response(response, message->transactionid, message->transactionid_length, &r) != KS_STATUS_SUCCESS) { + goto done; + } + + ben_dict_set(r, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH)); + + ks_log(KS_LOG_DEBUG, "Sending message response ping\n"); + ks_q_push(dht->send_q, (void *)response); + + ret = KS_STATUS_SUCCESS; + + done: + if (ret != KS_STATUS_SUCCESS && response) { + ks_dht2_message_deinit(response); + ks_dht2_message_free(response); + } return ret; } /** * */ -KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +KS_DECLARE(ks_status_t) ks_dht2_process_query_findnode(ks_dht2_t *dht, ks_dht2_message_t *message) +{ + struct bencode *id; + struct bencode *target; + //const char *idv; + //const char *targetv; + ks_size_t idv_len; + ks_size_t targetv_len; + ks_dht2_message_t *response = NULL; + struct bencode *r = NULL; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(message); + ks_assert(message->args); + + id = ben_dict_get_by_str(message->args, "id"); + if (!id) { + ks_log(KS_LOG_DEBUG, "Message args missing required key 'id'\n"); + return KS_STATUS_FAIL; + } + + //idv = ben_str_val(id); + idv_len = ben_str_len(id); + if (idv_len != KS_DHT_NODEID_LENGTH) { + ks_log(KS_LOG_DEBUG, "Message args 'id' value has an unexpected size of %d\n", idv_len); + return KS_STATUS_FAIL; + } + + target = ben_dict_get_by_str(message->args, "target"); + if (!target) { + ks_log(KS_LOG_DEBUG, "Message args missing required key 'target'\n"); + return KS_STATUS_FAIL; + } + + //targetv = ben_str_val(target); + targetv_len = ben_str_len(target); + if (targetv_len != KS_DHT_NODEID_LENGTH) { + ks_log(KS_LOG_DEBUG, "Message args 'target' value has an unexpected size of %d\n", targetv_len); + return KS_STATUS_FAIL; + } + + + ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n"); + + + if (ks_dht2_message_alloc(&response, dht->pool) != KS_STATUS_SUCCESS) { + goto done; + } + + if (ks_dht2_message_init(response, &message->raddr, KS_TRUE) != KS_STATUS_SUCCESS) { + goto done; + } + + if (ks_dht2_message_response(response, message->transactionid, message->transactionid_length, &r) != KS_STATUS_SUCCESS) { + goto done; + } + + ben_dict_set(r, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH)); + + ks_log(KS_LOG_DEBUG, "Sending message response find_node\n"); + ks_q_push(dht->send_q, (void *)response); + + ret = KS_STATUS_SUCCESS; + + done: + if (ret != KS_STATUS_SUCCESS && response) { + ks_dht2_message_deinit(response); + ks_dht2_message_free(response); + } + return ret; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_dht2_message_t *message) { ks_assert(dht); - ks_assert(raddr); ks_assert(message); + // @todo add/touch bucket entry for remote node + ks_log(KS_LOG_DEBUG, "Message response ping is reached\n"); return KS_STATUS_SUCCESS; @@ -752,96 +991,16 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockadd /** * */ -KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr) +KS_DECLARE(ks_status_t) ks_dht2_process_response_findnode(ks_dht2_t *dht, ks_dht2_message_t *message) { - uint32_t transactionid; - ks_dht2_transaction_t *transaction = NULL; - ks_dht2_message_t query; - struct bencode *a; - ks_status_t ret = KS_STATUS_FAIL; - ks_assert(dht); - ks_assert(raddr); + ks_assert(message); - // @todo atomic increment or mutex... - transactionid = dht->transactionid_next++; + // @todo add/touch bucket entry for remote node and other nodes returned - if (ks_dht2_transaction_alloc(&transaction, dht->pool) != KS_STATUS_SUCCESS) { - goto done; - } + ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n"); - if (ks_dht2_transaction_init(transaction, raddr, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) { - goto done; - } - - if (ks_dht2_message_prealloc(&query, dht->pool) != KS_STATUS_SUCCESS) { - goto done; - } - - if (ks_dht2_message_init(&query, KS_TRUE) != KS_STATUS_SUCCESS) { - goto done; - } - - if (ks_dht2_message_query(&query, transactionid, "ping", &a) != KS_STATUS_SUCCESS) { - goto done; - } - - // @todo transaction expiration and raddr for validation - - ks_hash_insert(dht->transactions_hash, (void *)&transaction->transactionid, transaction); - - // @note a joins response.data and will be freed with it - ben_dict_set(a, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH)); - - ks_log(KS_LOG_DEBUG, "Sending message query ping with transaction id %d\n", transactionid); - ret = ks_dht2_send(dht, raddr, &query); - - done: - if (transaction && ret != KS_STATUS_SUCCESS) { - ks_dht2_transaction_deinit(transaction); - ks_dht2_transaction_free(transaction); - } - ks_dht2_message_deinit(&query); - return ret; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht, - ks_sockaddr_t *raddr, - uint8_t *transactionid, - ks_size_t transactionid_length) -{ - ks_dht2_message_t response; - struct bencode *r; - ks_status_t ret = KS_STATUS_FAIL; - - ks_assert(dht); - ks_assert(raddr); - ks_assert(transactionid); - - if (ks_dht2_message_prealloc(&response, dht->pool) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - if (ks_dht2_message_init(&response, KS_TRUE) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - if (ks_dht2_message_response(&response, transactionid, transactionid_length, &r) != KS_STATUS_SUCCESS) { - goto done; - } - - // @note r joins response.data and will be freed with it - ben_dict_set(r, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH)); - - ks_log(KS_LOG_DEBUG, "Sending message response ping\n"); - ret = ks_dht2_send(dht, raddr, &response); - - done: - ks_dht2_message_deinit(&response); - return ret; + return KS_STATUS_SUCCESS; } /* For Emacs: diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 8985387ef1..7270bc7d67 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -28,7 +28,7 @@ typedef struct ks_dht2_endpoint_s ks_dht2_endpoint_t; typedef struct ks_dht2_transaction_s ks_dht2_transaction_t; -typedef ks_status_t (*ks_dht2_message_callback_t)(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); +typedef ks_status_t (*ks_dht2_message_callback_t)(ks_dht2_t *dht, ks_dht2_message_t *message); struct ks_dht2_nodeid_raw_s { uint8_t id[KS_DHT_NODEID_LENGTH]; @@ -41,6 +41,7 @@ struct ks_dht2_nodeid_s { struct ks_dht2_message_s { ks_pool_t *pool; + ks_sockaddr_t raddr; struct bencode *data; uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE]; ks_size_t transactionid_length; @@ -85,6 +86,8 @@ struct ks_dht2_s { ks_hash_t *endpoints_hash; struct pollfd *endpoints_poll; + ks_q_t *send_q; + ks_dht2_message_t *send_q_unsent; uint8_t recv_buffer[KS_DHT_RECV_BUFFER_SIZE]; ks_size_t recv_buffer_length; @@ -106,7 +109,7 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht); KS_DECLARE(ks_status_t) ks_dht2_autoroute(ks_dht2_t *dht, ks_bool_t autoroute, ks_port_t port); KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr, ks_dht2_endpoint_t **endpoint); -KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout); +KS_DECLARE(void) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout); KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback); @@ -129,7 +132,7 @@ KS_DECLARE(ks_status_t) ks_dht2_message_alloc(ks_dht2_message_t **message, ks_po KS_DECLARE(ks_status_t) ks_dht2_message_prealloc(ks_dht2_message_t *message, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht2_message_free(ks_dht2_message_t *message); -KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_bool_t alloc_data); +KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_sockaddr_t *raddr, ks_bool_t alloc_data); KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_message_parse(ks_dht2_message_t *message, const uint8_t *buffer, ks_size_t buffer_length); diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index ed88a0fe63..2f770e6cf6 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -47,11 +47,12 @@ KS_DECLARE(ks_status_t) ks_dht2_message_free(ks_dht2_message_t *message) /** * */ -KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_bool_t alloc_data) +KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_sockaddr_t *raddr, ks_bool_t alloc_data) { ks_assert(message); ks_assert(message->pool); + message->raddr = *raddr; message->data = NULL; message->args = NULL; message->transactionid_length = 0; @@ -70,6 +71,7 @@ KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message) { ks_assert(message); + message->raddr = (const ks_sockaddr_t){ 0 }; message->args = NULL; message->type[0] = '\0'; message->transactionid_length = 0; diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index f54b776511..2a7b91ac1e 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -6,11 +6,11 @@ #define TEST_DHT1_REGISTER_TYPE_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze" #define TEST_DHT1_PROCESS_QUERY_PING_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe" -ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +ks_status_t dht_z_callback(ks_dht2_t *dht, ks_dht2_message_t *message) { diag("dht_z_callback\n"); ok(message->transactionid[0] == '4' && message->transactionid[1] == '2'); - ks_dht2_send_error(dht, raddr, message->transactionid, message->transactionid_length, 201, "Generic test error"); + ks_dht2_send_error(dht, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error"); return KS_STATUS_SUCCESS; } @@ -91,6 +91,8 @@ int main() { ok(err == KS_STATUS_SUCCESS); } + diag("Custom type tests\n"); + buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER); memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen); dht1->recv_buffer_length = buflen; @@ -98,25 +100,30 @@ int main() { err = ks_dht2_process(dht1, &raddr); ok(err == KS_STATUS_SUCCESS); - err = ks_dht2_pulse(&dht2, 1000); - ok(err == KS_STATUS_SUCCESS); + ks_dht2_pulse(dht1, 100); + ks_dht2_pulse(&dht2, 100); + + //buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER); //memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen); //dht1->recv_buffer_length = buflen; //err = ks_dht2_process(dht1, &raddr); //ok(err == KS_STATUS_SUCCESS); + - err = ks_dht2_send_query_ping(dht1, &raddr); - ok(err == KS_STATUS_SUCCESS); + diag("Ping tests\n"); + + ks_dht2_send_ping(dht1, &raddr); + + ks_dht2_pulse(dht1, 100); + + ks_dht2_pulse(&dht2, 100); - err = ks_dht2_pulse(&dht2, 1000); - ok(err == KS_STATUS_SUCCESS); - - err = ks_dht2_pulse(dht1, 1000); - ok(err == KS_STATUS_SUCCESS); + ks_dht2_pulse(dht1, 100); + diag("Cleanup\n"); /* Cleanup and shutdown */