From e4176f5873ebec86d1e005aa1dd6c1728d6e2c36 Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Fri, 2 Dec 2016 19:57:45 +0000 Subject: [PATCH] FS-9775: Initial work towards sending messages, refactored into less headers, test updated --- libs/libks/Makefile.am | 5 +- libs/libks/src/dht/ks_dht-int.h | 25 +- libs/libks/src/dht/ks_dht.c | 282 +++++++++++++++++++++-- libs/libks/src/dht/ks_dht.h | 116 +++++++++- libs/libks/src/dht/ks_dht_endpoint-int.h | 29 --- libs/libks/src/dht/ks_dht_endpoint.c | 4 +- libs/libks/src/dht/ks_dht_endpoint.h | 29 --- libs/libks/src/dht/ks_dht_message.c | 97 +++++++- libs/libks/src/dht/ks_dht_message.h | 43 ---- libs/libks/src/dht/ks_dht_nodeid.c | 3 +- libs/libks/src/dht/ks_dht_nodeid.h | 42 ---- libs/libks/src/dht/ks_dht_transaction.c | 87 +++++++ libs/libks/test/testdht2.c | 14 +- 13 files changed, 587 insertions(+), 189 deletions(-) delete mode 100644 libs/libks/src/dht/ks_dht_endpoint-int.h delete mode 100644 libs/libks/src/dht/ks_dht_endpoint.h delete mode 100644 libs/libks/src/dht/ks_dht_message.h delete mode 100644 libs/libks/src/dht/ks_dht_nodeid.h create mode 100644 libs/libks/src/dht/ks_dht_transaction.c diff --git a/libs/libks/Makefile.am b/libs/libks/Makefile.am index e81a564a06..341925e6d4 100644 --- a/libs/libks/Makefile.am +++ b/libs/libks/Makefile.am @@ -13,7 +13,7 @@ libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/k libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c libks_la_SOURCES += src/utp/utp_api.cpp src/utp/utp_callbacks.cpp src/utp/utp_hash.cpp src/utp/utp_internal.cpp libks_la_SOURCES += src/utp/utp_packedsockaddr.cpp src/utp/utp_utils.cpp src/ks_bencode.c -libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_nodeid.c src/dht/ks_dht_message.c +libks_la_SOURCES += src/dht/ks_dht.c src/dht/ks_dht_endpoint.c src/dht/ks_dht_nodeid.c src/dht/ks_dht_message.c src/dht/ks_dht_transaction.c libks_la_SOURCES += crypt/aeskey.c crypt/aestab.c crypt/sha2.c crypt/twofish.c crypt/aes_modes.c crypt/aescrypt.c crypt/twofish_cfb.c #aes.h aescpp.h brg_endian.h aesopt.h aestab.h brg_types.h sha2.h twofish.h @@ -29,8 +29,7 @@ library_include_HEADERS += src/include/ks_dso.h src/include/ks_dht.h src/include library_include_HEADERS += src/include/ks_printf.h src/include/ks_hash.h src/include/ks_ssl.h src/include/kws.h library_include_HEADERS += src/utp/utp_internal.h src/utp/utp.h src/utp/utp_types.h src/utp/utp_callbacks.h src/utp/utp_templates.h library_include_HEADERS += src/utp/utp_hash.h src/utp/utp_packedsockaddr.h src/utp/utp_utils.h src/include/ks_utp.h -library_include_HEADERS += src/dht/ks_dht.h src/dht/ks_dht-int.h src/dht/ks_dht_endpoint.h src/dht/ks_dht_endpoint-int.h -library_include_HEADERS += src/dht/ks_dht_nodeid.h src/dht/ks_dht_message.h +library_include_HEADERS += src/dht/ks_dht.h src/dht/ks_dht-int.h tests: libks.la $(MAKE) -C test tests diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 9f10f5599e..81ed2f558d 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -5,12 +5,35 @@ KS_BEGIN_EXTERN_C - +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht); KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr); KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); + KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); + +KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr); +KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht, + ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_length); + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_endpoint_alloc(ks_dht2_endpoint_t **endpoint, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht2_endpoint_prealloc(ks_dht2_endpoint_t *endpoint, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht2_endpoint_free(ks_dht2_endpoint_t *endpoint); + +KS_DECLARE(ks_status_t) ks_dht2_endpoint_init(ks_dht2_endpoint_t *endpoint, const ks_sockaddr_t *addr, ks_socket_t sock); +KS_DECLARE(ks_status_t) ks_dht2_endpoint_deinit(ks_dht2_endpoint_t *endpoint); + KS_END_EXTERN_C diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index e0288b61f4..18f0d5e996 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -1,6 +1,5 @@ #include "ks_dht.h" #include "ks_dht-int.h" -#include "ks_dht_endpoint-int.h" #include "sodium.h" /** @@ -62,6 +61,9 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t ks_assert(dht); ks_assert(dht->pool); + dht->autoroute = KS_FALSE; + dht->autoroute_port = 0; + if (ks_dht2_nodeid_prealloc(&dht->nodeid, dht->pool) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; } @@ -72,6 +74,7 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_dht2_register_type(dht, "q", ks_dht2_process_query); + ks_dht2_register_type(dht, "r", ks_dht2_process_response); // @todo ks_hash_insert the r/e callbacks into type registry ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); @@ -86,6 +89,9 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t dht->endpoints_poll = NULL; dht->recv_buffer_length = 0; + + dht->transactionid_next = rand() % 0xFFFF; + ks_hash_create(&dht->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, dht->pool); return KS_STATUS_SUCCESS; } @@ -97,6 +103,11 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht) { ks_assert(dht); + dht->transactionid_next = 0; + if (dht->transactions_hash) { + ks_hash_destroy(&dht->transactions_hash); + dht->transactions_hash = NULL; + } dht->recv_buffer_length = 0; for (int32_t i = 0; i < dht->endpoints_size; ++i) { ks_dht2_endpoint_t *ep = dht->endpoints[i]; @@ -130,6 +141,9 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht) } ks_dht2_nodeid_deinit(&dht->nodeid); + + dht->autoroute = KS_FALSE; + dht->autoroute_port = 0; return KS_STATUS_SUCCESS; } @@ -137,7 +151,26 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht) /** * */ -KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback) +KS_DECLARE(ks_status_t) ks_dht2_autoroute(ks_dht2_t *dht, ks_bool_t autoroute, ks_port_t port) +{ + ks_assert(dht); + + if (!autoroute) { + port = 0; + } else if (port == 0) { + return KS_STATUS_FAIL; + } + + dht->autoroute = autoroute; + dht->autoroute_port = port; + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback) { ks_assert(dht); ks_assert(value); @@ -149,7 +182,7 @@ KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, /** * */ -KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback) +KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback) { ks_assert(dht); ks_assert(value); @@ -161,7 +194,7 @@ KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value /** * */ -KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr) +KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr, ks_dht2_endpoint_t **endpoint) { ks_dht2_endpoint_t *ep; ks_socket_t sock; @@ -172,6 +205,9 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr) ks_assert(addr->family == AF_INET || addr->family == AF_INET6); ks_assert(addr->port); + if (endpoint) { + *endpoint = NULL; + } dht->bind_ipv4 |= addr->family == AF_INET; dht->bind_ipv6 |= addr->family == AF_INET6; @@ -215,7 +251,11 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr) sizeof(struct pollfd) * dht->endpoints_size); dht->endpoints_poll[epindex].fd = ep->sock; dht->endpoints_poll[epindex].events = POLLIN | POLLERR; - + + if (endpoint) { + *endpoint = ep; + } + return KS_STATUS_SUCCESS; } @@ -260,6 +300,63 @@ KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout) return KS_STATUS_SUCCESS; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +{ + // @todo lookup standard def for IPV6 max size + char ip[48]; + ks_dht2_endpoint_t *ep; + // @todo calculate max IPV6 payload size? + char buf[1000]; + ks_size_t buf_len; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(message); + ks_assert(message->data); + + // @todo blacklist check + + ks_ip_route(ip, sizeof(ip), raddr->host); + + if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) { + ks_sockaddr_t addr; + ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family); + if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + } + + if (!ep) { + ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host); + return KS_STATUS_FAIL; + } + + buf_len = ben_encode2(buf, sizeof(buf), message->data); + + ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", raddr->host, raddr->port); + ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data)); + + if (ks_socket_sendto(ep->sock, (void *)buf, &buf_len, raddr) != KS_STATUS_SUCCESS) { + ks_log(KS_LOG_DEBUG, "Socket error\n"); + return KS_STATUS_FAIL; + } + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht) +{ + ks_assert(dht); + + return KS_STATUS_SUCCESS; +} + /** * */ @@ -276,7 +373,7 @@ KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht) KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) { ks_dht2_message_t message; - ks_dht2_registry_callback_t callback; + ks_dht2_message_callback_t callback; ks_status_t ret = KS_STATUS_FAIL; ks_assert(dht); @@ -294,16 +391,21 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) return KS_STATUS_FAIL; } - if (ks_dht2_message_init(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) { + if (ks_dht2_message_init(&message, KS_FALSE) != KS_STATUS_SUCCESS) { return KS_STATUS_FAIL; } + + if (ks_dht2_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) { + goto done; + } - if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) { + if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) { ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); } else { ret = callback(dht, raddr, &message); } + done: ks_dht2_message_deinit(&message); return ret; @@ -319,23 +421,24 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad const char *qv; ks_size_t qv_len; char query[KS_DHT_MESSAGE_QUERY_MAX_SIZE]; - ks_dht2_registry_callback_t callback; + ks_dht2_message_callback_t callback; ks_status_t ret = KS_STATUS_FAIL; ks_assert(dht); ks_assert(raddr); ks_assert(message); + // @todo start of ks_dht2_message_parse_query q = ben_dict_get_by_str(message->data, "q"); if (!q) { - ks_log(KS_LOG_DEBUG, "Message missing required key 'q'\n"); + ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n"); return KS_STATUS_FAIL; } qv = ben_str_val(q); qv_len = ben_str_len(q); if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) { - ks_log(KS_LOG_DEBUG, "Message 'q' value has an unexpectedly large size of %d\n", qv_len); + ks_log(KS_LOG_DEBUG, "Message query 'q' value has an unexpectedly large size of %d\n", qv_len); return KS_STATUS_FAIL; } @@ -345,13 +448,14 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad a = ben_dict_get_by_str(message->data, "a"); if (!a) { - ks_log(KS_LOG_DEBUG, "Message missing required key 'a'\n"); + ks_log(KS_LOG_DEBUG, "Message query missing required key 'a'\n"); return KS_STATUS_FAIL; } + // @todo end of ks_dht2_message_parse_query message->args = a; - if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) { + if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) { ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query); } else { ret = callback(dht, raddr, message); @@ -360,6 +464,46 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad return ret; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +{ + struct bencode *r; + ks_dht2_transaction_t *transaction; + uint32_t transactionid; + uint16_t *tid; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(message); + + // @todo start of ks_dht2_message_parse_response + r = ben_dict_get_by_str(message->data, "r"); + if (!r) { + ks_log(KS_LOG_DEBUG, "Message response missing required key 'r'\n"); + return KS_STATUS_FAIL; + } + // todo end of ks_dht2_message_parse_response + + message->args = r; + + tid = (uint16_t *)message->transactionid; + transactionid = ntohs(*tid); + + transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED); + ks_hash_read_unlock(dht->transactions_hash); + + if (!transaction) { + ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid); + } else { + ret = transaction->callback(dht, raddr, message); + } + + return ret; +} + /** * */ @@ -369,6 +513,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t const char *idv; ks_size_t idv_len; ks_dht2_nodeid_t nid; + ks_status_t ret = KS_STATUS_FAIL; ks_assert(dht); ks_assert(raddr); @@ -397,13 +542,122 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t } //ks_log(KS_LOG_DEBUG, "Message query ping id is '%s'\n", id->id); - ks_log(KS_LOG_DEBUG, "Mesage query ping is valid\n"); + ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); + + ret = ks_dht2_send_response_ping(dht, raddr, message->transactionid, message->transactionid_length); ks_dht2_nodeid_deinit(&nid); + return ret; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +{ + ks_assert(dht); + ks_assert(raddr); + ks_assert(message); + return KS_STATUS_SUCCESS; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr) +{ + uint32_t transactionid; + ks_dht2_transaction_t *transaction = NULL; + ks_dht2_message_t query; + struct bencode *a; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(raddr); + + // @todo atomic increment or mutex... + transactionid = dht->transactionid_next++; + + if (ks_dht2_transaction_alloc(&transaction, dht->pool) != KS_STATUS_SUCCESS) { + goto done; + } + + if (ks_dht2_transaction_init(transaction, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) { + goto done; + } + + if (ks_dht2_message_prealloc(&query, dht->pool) != KS_STATUS_SUCCESS) { + goto done; + } + + if (ks_dht2_message_init(&query, KS_TRUE) != KS_STATUS_SUCCESS) { + goto done; + } + + if (ks_dht2_message_query(&query, transactionid, "ping", &a) != KS_STATUS_SUCCESS) { + goto done; + } + + // @todo transaction expiration and raddr + + // @todo transactions_hash mutex? + ks_hash_insert(dht->transactions_hash, (void *)&transactionid, transaction); + + // @note a joins response.data and will be freed with it + ben_dict_set(a, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH)); + + ks_log(KS_LOG_DEBUG, "Sending message query ping\n"); + ret = ks_dht2_send(dht, raddr, &query); + + done: + if (transaction && ret != KS_STATUS_SUCCESS) { + ks_dht2_transaction_deinit(transaction); + ks_dht2_transaction_free(transaction); + } + ks_dht2_message_deinit(&query); + return ret; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht, + ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_length) +{ + ks_dht2_message_t response; + struct bencode *r; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(raddr); + + if (ks_dht2_message_prealloc(&response, dht->pool) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + if (ks_dht2_message_init(&response, KS_TRUE) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + if (ks_dht2_message_response(&response, transactionid, transactionid_length, &r) != KS_STATUS_SUCCESS) { + goto done; + } + + // @note r joins response.data and will be freed with it + ben_dict_set(r, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH)); + + ks_log(KS_LOG_DEBUG, "Sending message response ping\n"); + ret = ks_dht2_send(dht, raddr, &response); + + done: + ks_dht2_message_deinit(&response); + return ret; +} + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index e3ce9ead77..7629d2da3b 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -4,9 +4,6 @@ #include "ks.h" #include "ks_bencode.h" -#include "ks_dht_endpoint.h" -#include "ks_dht_message.h" -#include "ks_dht_nodeid.h" KS_BEGIN_EXTERN_C @@ -14,11 +11,62 @@ KS_BEGIN_EXTERN_C #define KS_DHT_DEFAULT_PORT 5309 #define KS_DHT_RECV_BUFFER_SIZE 0xFFFF +#define KS_DHT_NODEID_LENGTH 20 + +#define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20 +#define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20 +#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20 + + typedef struct ks_dht2_s ks_dht2_t; +typedef struct ks_dht2_nodeid_s ks_dht2_nodeid_t; +typedef struct ks_dht2_nodeid_raw_s ks_dht2_nodeid_raw_t; +typedef struct ks_dht2_message_s ks_dht2_message_t; +typedef struct ks_dht2_endpoint_s ks_dht2_endpoint_t; +typedef struct ks_dht2_transaction_s ks_dht2_transaction_t; + + +typedef ks_status_t (*ks_dht2_message_callback_t)(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); + +struct ks_dht2_nodeid_raw_s { + uint8_t id[KS_DHT_NODEID_LENGTH]; +}; + +struct ks_dht2_nodeid_s { + ks_pool_t *pool; + uint8_t id[KS_DHT_NODEID_LENGTH]; +}; + +struct ks_dht2_message_s { + ks_pool_t *pool; + struct bencode *data; + uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE]; + ks_size_t transactionid_length; + char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE]; + struct bencode *args; +}; + +struct ks_dht2_endpoint_s { + ks_pool_t *pool; + ks_sockaddr_t addr; + ks_socket_t sock; +}; + +struct ks_dht2_transaction_s { + ks_pool_t *pool; + uint16_t transactionid; + ks_dht2_message_callback_t callback; + // @todo expiration data +}; + + struct ks_dht2_s { ks_pool_t *pool; ks_bool_t pool_alloc; + ks_bool_t autoroute; + ks_port_t autoroute_port; + ks_dht2_nodeid_t nodeid; ks_hash_t *registry_type; @@ -34,11 +82,14 @@ struct ks_dht2_s { uint8_t recv_buffer[KS_DHT_RECV_BUFFER_SIZE]; ks_size_t recv_buffer_length; + + uint16_t transactionid_next; + ks_hash_t *transactions_hash; }; -typedef ks_status_t (*ks_dht2_registry_callback_t)(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); - - +/** + * + */ KS_DECLARE(ks_status_t) ks_dht2_alloc(ks_dht2_t **dht, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht2_prealloc(ks_dht2_t *dht, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht2_free(ks_dht2_t *dht); @@ -47,15 +98,62 @@ KS_DECLARE(ks_status_t) ks_dht2_free(ks_dht2_t *dht); KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t *nodeid); KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht); +KS_DECLARE(ks_status_t) ks_dht2_autoroute(ks_dht2_t *dht, ks_bool_t autoroute, ks_port_t port); -KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr); +KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr, ks_dht2_endpoint_t **endpoint); KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout); -KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback); -KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback); +KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback); +KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback); +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_nodeid_alloc(ks_dht2_nodeid_t **nodeid, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht2_nodeid_prealloc(ks_dht2_nodeid_t *nodeid, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht2_nodeid_free(ks_dht2_nodeid_t *nodeid); +KS_DECLARE(ks_status_t) ks_dht2_nodeid_init(ks_dht2_nodeid_t *nodeid, const ks_dht2_nodeid_raw_t *id); +KS_DECLARE(ks_status_t) ks_dht2_nodeid_deinit(ks_dht2_nodeid_t *nodeid); + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_message_alloc(ks_dht2_message_t **message, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht2_message_prealloc(ks_dht2_message_t *message, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht2_message_free(ks_dht2_message_t *message); + +KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_bool_t alloc_data); +KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message); + +KS_DECLARE(ks_status_t) ks_dht2_message_parse(ks_dht2_message_t *message, const uint8_t *buffer, ks_size_t buffer_length); + +KS_DECLARE(ks_status_t) ks_dht2_message_query(ks_dht2_message_t *message, + uint16_t transactionid, + const char *query, + struct bencode **args); +KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message, + uint8_t *transactionid, + ks_size_t transactionid_length, + struct bencode **args); + +/** + * + */ + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_transaction_alloc(ks_dht2_transaction_t **transaction, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht2_transaction_prealloc(ks_dht2_transaction_t *trasnaction, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transaction); + +KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction, + uint16_t transactionid, + ks_dht2_message_callback_t callback); +KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transaction); + KS_END_EXTERN_C #endif /* KS_DHT_H */ diff --git a/libs/libks/src/dht/ks_dht_endpoint-int.h b/libs/libks/src/dht/ks_dht_endpoint-int.h deleted file mode 100644 index 2d0971419c..0000000000 --- a/libs/libks/src/dht/ks_dht_endpoint-int.h +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef KS_DHT_ENDPOINT_INT_H -#define KS_DHT_ENDPOINT_INT_H - -#include "ks.h" - -KS_BEGIN_EXTERN_C - -KS_DECLARE(ks_status_t) ks_dht2_endpoint_alloc(ks_dht2_endpoint_t **endpoint, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht2_endpoint_prealloc(ks_dht2_endpoint_t *endpoint, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht2_endpoint_free(ks_dht2_endpoint_t *endpoint); - -KS_DECLARE(ks_status_t) ks_dht2_endpoint_init(ks_dht2_endpoint_t *endpoint, const ks_sockaddr_t *addr, ks_socket_t sock); -KS_DECLARE(ks_status_t) ks_dht2_endpoint_deinit(ks_dht2_endpoint_t *endpoint); - -KS_END_EXTERN_C - -#endif /* KS_DHT_ENDPOINT_H */ - - -/* For Emacs: -* Local Variables: -* mode:c -* indent-tabs-mode:t -* tab-width:4 -* c-basic-offset:4 -* End: -* For VIM: -* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: -*/ diff --git a/libs/libks/src/dht/ks_dht_endpoint.c b/libs/libks/src/dht/ks_dht_endpoint.c index 286d8f43c1..d33a62dc07 100644 --- a/libs/libks/src/dht/ks_dht_endpoint.c +++ b/libs/libks/src/dht/ks_dht_endpoint.c @@ -1,5 +1,5 @@ -#include "ks_dht_endpoint.h" -#include "ks_dht_endpoint-int.h" +#include "ks_dht.h" +#include "ks_dht-int.h" /** * diff --git a/libs/libks/src/dht/ks_dht_endpoint.h b/libs/libks/src/dht/ks_dht_endpoint.h deleted file mode 100644 index 6d9df0551d..0000000000 --- a/libs/libks/src/dht/ks_dht_endpoint.h +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef KS_DHT_ENDPOINT_H -#define KS_DHT_ENDPOINT_H - -#include "ks.h" - -KS_BEGIN_EXTERN_C - -typedef struct ks_dht2_endpoint_s ks_dht2_endpoint_t; -struct ks_dht2_endpoint_s { - ks_pool_t *pool; - ks_sockaddr_t addr; - ks_socket_t sock; -}; - -KS_END_EXTERN_C - -#endif /* KS_DHT_ENDPOINT_H */ - - -/* For Emacs: -* Local Variables: -* mode:c -* indent-tabs-mode:t -* tab-width:4 -* c-basic-offset:4 -* End: -* For VIM: -* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: -*/ diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index f7ee960809..921038b87b 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -1,4 +1,5 @@ #include "ks_dht.h" +#include "ks_dht-int.h" /** * @@ -46,7 +47,44 @@ KS_DECLARE(ks_status_t) ks_dht2_message_free(ks_dht2_message_t *message) /** * */ -KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, const uint8_t *buffer, ks_size_t buffer_length) +KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_bool_t alloc_data) +{ + ks_assert(message); + ks_assert(message->pool); + + message->data = NULL; + message->args = NULL; + message->transactionid_length = 0; + message->type[0] = '\0'; + if (alloc_data) { + message->data = ben_dict(); + } + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message) +{ + ks_assert(message); + + message->args = NULL; + message->type[0] = '\0'; + message->transactionid_length = 0; + if (message->data) { + ben_free(message->data); + message->data = NULL; + } + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_message_parse(ks_dht2_message_t *message, const uint8_t *buffer, ks_size_t buffer_length) { struct bencode *t; struct bencode *y; @@ -58,8 +96,7 @@ KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, const u ks_assert(message); ks_assert(message->pool); ks_assert(buffer); - - message->args = NULL; + ks_assert(!message->data); message->data = ben_decode((const void *)buffer, buffer_length); if (!message->data) { @@ -115,16 +152,56 @@ KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, const u /** * */ -KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message) +KS_DECLARE(ks_status_t) ks_dht2_message_query(ks_dht2_message_t *message, + uint16_t transactionid, + const char *query, + struct bencode **args) { + struct bencode *a; + uint16_t tid; + ks_assert(message); + ks_assert(query); - message->args = NULL; - message->type[0] = '\0'; - message->transactionid_length = 0; - if (message->data) { - ben_free(message->data); - message->data = NULL; + tid = htons(transactionid); + + ben_dict_set(message->data, ben_blob("t", 1), ben_blob((uint8_t *)&tid, 2)); + ben_dict_set(message->data, ben_blob("y", 1), ben_blob("q", 1)); + ben_dict_set(message->data, ben_blob("q", 1), ben_blob(query, strlen(query))); + + // @note r joins message->data and will be freed with it + a = ben_dict(); + ben_dict_set(message->data, ben_blob("a", 1), a); + + if (args) { + *args = a; + } + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message, + uint8_t *transactionid, + ks_size_t transactionid_length, + struct bencode **args) +{ + struct bencode *r; + + ks_assert(message); + ks_assert(transactionid); + + ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); + ben_dict_set(message->data, ben_blob("y", 1), ben_blob("r", 1)); + + // @note r joins message->data and will be freed with it + r = ben_dict(); + ben_dict_set(message->data, ben_blob("r", 1), r); + + if (args) { + *args = r; } return KS_STATUS_SUCCESS; diff --git a/libs/libks/src/dht/ks_dht_message.h b/libs/libks/src/dht/ks_dht_message.h deleted file mode 100644 index 1530dc94f1..0000000000 --- a/libs/libks/src/dht/ks_dht_message.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef KS_DHT_MESSAGE_H -#define KS_DHT_MESSAGE_H - -#include "ks.h" - -KS_BEGIN_EXTERN_C - -#define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20 -#define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20 -#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20 - -typedef struct ks_dht2_message_s ks_dht2_message_t; -struct ks_dht2_message_s { - ks_pool_t *pool; - struct bencode *data; - uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE]; - ks_size_t transactionid_length; - char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE]; - struct bencode *args; -}; - -KS_DECLARE(ks_status_t) ks_dht2_message_alloc(ks_dht2_message_t **message, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht2_message_prealloc(ks_dht2_message_t *message, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht2_message_free(ks_dht2_message_t *message); - -KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, const uint8_t *buffer, ks_size_t buffer_length); -KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message); - -KS_END_EXTERN_C - -#endif /* KS_DHT_MESSAGE_H */ - -/* For Emacs: - * Local Variables: - * mode:c - * indent-tabs-mode:t - * tab-width:4 - * c-basic-offset:4 - * End: - * For VIM: - * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: - */ - diff --git a/libs/libks/src/dht/ks_dht_nodeid.c b/libs/libks/src/dht/ks_dht_nodeid.c index af5999c4b3..be4b188463 100644 --- a/libs/libks/src/dht/ks_dht_nodeid.c +++ b/libs/libks/src/dht/ks_dht_nodeid.c @@ -1,4 +1,5 @@ -#include "ks_dht_nodeid.h" +#include "ks_dht.h" +#include "ks_dht-int.h" #include "sodium.h" /** diff --git a/libs/libks/src/dht/ks_dht_nodeid.h b/libs/libks/src/dht/ks_dht_nodeid.h deleted file mode 100644 index 39705e9e10..0000000000 --- a/libs/libks/src/dht/ks_dht_nodeid.h +++ /dev/null @@ -1,42 +0,0 @@ -#ifndef KS_DHT_NODEID_H -#define KS_DHT_NODEID_H - -#include "ks.h" - -KS_BEGIN_EXTERN_C - -#define KS_DHT_NODEID_LENGTH 20 - -typedef struct ks_dht2_nodeid_raw_s ks_dht2_nodeid_raw_t; -struct ks_dht2_nodeid_raw_s { - uint8_t id[KS_DHT_NODEID_LENGTH]; -}; - -typedef struct ks_dht2_nodeid_s ks_dht2_nodeid_t; -struct ks_dht2_nodeid_s { - ks_pool_t *pool; - uint8_t id[KS_DHT_NODEID_LENGTH]; -}; - -KS_DECLARE(ks_status_t) ks_dht2_nodeid_alloc(ks_dht2_nodeid_t **nodeid, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht2_nodeid_prealloc(ks_dht2_nodeid_t *nodeid, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht2_nodeid_free(ks_dht2_nodeid_t *nodeid); - -KS_DECLARE(ks_status_t) ks_dht2_nodeid_init(ks_dht2_nodeid_t *nodeid, const ks_dht2_nodeid_raw_t *id); -KS_DECLARE(ks_status_t) ks_dht2_nodeid_deinit(ks_dht2_nodeid_t *nodeid); - -KS_END_EXTERN_C - -#endif /* KS_DHT_NODEID_H */ - -/* For Emacs: - * Local Variables: - * mode:c - * indent-tabs-mode:t - * tab-width:4 - * c-basic-offset:4 - * End: - * For VIM: - * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: - */ - diff --git a/libs/libks/src/dht/ks_dht_transaction.c b/libs/libks/src/dht/ks_dht_transaction.c new file mode 100644 index 0000000000..4946e1ceec --- /dev/null +++ b/libs/libks/src/dht/ks_dht_transaction.c @@ -0,0 +1,87 @@ +#include "ks_dht.h" +#include "ks_dht-int.h" + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_transaction_alloc(ks_dht2_transaction_t **transaction, ks_pool_t *pool) +{ + ks_dht2_transaction_t *tran; + + ks_assert(transaction); + ks_assert(pool); + + *transaction = tran = ks_pool_alloc(pool, sizeof(ks_dht2_transaction_t)); + tran->pool = pool; + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_transaction_prealloc(ks_dht2_transaction_t *transaction, ks_pool_t *pool) +{ + ks_assert(transaction); + ks_assert(pool); + + transaction->pool = pool; + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transaction) +{ + ks_assert(transaction); + + ks_dht2_transaction_deinit(transaction); + ks_pool_free(transaction->pool, transaction); + + return KS_STATUS_SUCCESS; +} + + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction, + uint16_t transactionid, + ks_dht2_message_callback_t callback) +{ + ks_assert(transaction); + ks_assert(transaction->pool); + ks_assert(callback); + + transaction->transactionid = transactionid; + transaction->callback = callback; + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transaction) +{ + ks_assert(transaction); + + transaction->transactionid = 0; + transaction->callback = NULL; + + return KS_STATUS_SUCCESS; +} + + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 7fd5d31e0e..d2dfbbd8a6 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -1,7 +1,6 @@ #include #include <../dht/ks_dht.h> #include <../dht/ks_dht-int.h> -#include <../dht/ks_dht_endpoint-int.h> #include #define TEST_DHT1_REGISTER_TYPE_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze" @@ -65,13 +64,13 @@ int main() { err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET); ok(err == KS_STATUS_SUCCESS); - err = ks_dht2_bind(dht1, &addr); + err = ks_dht2_bind(dht1, &addr, NULL); ok(err == KS_STATUS_SUCCESS); err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 1, AF_INET); ok(err == KS_STATUS_SUCCESS); - err = ks_dht2_bind(&dht2, &addr); + err = ks_dht2_bind(&dht2, &addr, NULL); ok(err == KS_STATUS_SUCCESS); raddr = addr; @@ -81,13 +80,13 @@ int main() { err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT, AF_INET6); ok(err == KS_STATUS_SUCCESS); - err = ks_dht2_bind(dht1, &addr); + err = ks_dht2_bind(dht1, &addr, NULL); ok(err == KS_STATUS_SUCCESS); err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 1, AF_INET6); ok(err == KS_STATUS_SUCCESS); - err = ks_dht2_bind(&dht2, &addr); + err = ks_dht2_bind(&dht2, &addr, NULL); ok(err == KS_STATUS_SUCCESS); } @@ -105,8 +104,11 @@ int main() { err = ks_dht2_process(dht1, &raddr); ok(err == KS_STATUS_SUCCESS); - + err = ks_dht2_pulse(&dht2, 1000); + ok(err == KS_STATUS_SUCCESS); + + diag("Cleanup\n"); /* Cleanup and shutdown */ err = ks_dht2_deinit(&dht2);