FS-9775: Refactored sending of messages to utilize a more scalable and throttled queue for outgoing messages

This commit is contained in:
Shane Bryldt 2016-12-06 15:15:12 +00:00 committed by Mike Jerris
parent b51038f618
commit 3e12cca293
5 changed files with 436 additions and 264 deletions

View File

@ -8,30 +8,31 @@ KS_BEGIN_EXTERN_C
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht);
KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht);
KS_DECLARE(void) ks_dht2_idle(ks_dht2_t *dht);
KS_DECLARE(void) ks_dht2_idle_expirations(ks_dht2_t *dht);
KS_DECLARE(void) ks_dht2_idle_send(ks_dht2_t *dht);
KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr);
KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht,
ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
long long errorcode,
const char *errorstr);
KS_DECLARE(ks_status_t) ks_dht2_send_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr);
KS_DECLARE(ks_status_t) ks_dht2_send_findnode(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_nodeid_raw_t *targetid);
KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr);
KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr);
KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht,
ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length);
KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_query_findnode(ks_dht2_t *dht, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_response_findnode(ks_dht2_t *dht, ks_dht2_message_t *message);
/**
*

View File

@ -79,6 +79,7 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t
ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
ks_dht2_register_query(dht, "ping", ks_dht2_process_query_ping);
ks_dht2_register_query(dht, "find_node", ks_dht2_process_query_findnode);
ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
// @todo register 301 error for internal get/put CAS hash mismatch retry handler
@ -90,7 +91,9 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t
dht->endpoints_size = 0;
ks_hash_create(&dht->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, dht->pool);
dht->endpoints_poll = NULL;
ks_q_create(&dht->send_q, dht->pool, 0);
dht->send_q_unsent = NULL;
dht->recv_buffer_length = 0;
dht->transactionid_next = 1; //rand();
@ -112,6 +115,20 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
dht->transactions_hash = NULL;
}
dht->recv_buffer_length = 0;
if (dht->send_q) {
ks_dht2_message_t *msg;
while (ks_q_pop_timeout(dht->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) {
ks_dht2_message_deinit(msg);
ks_dht2_message_free(msg);
}
ks_q_destroy(&dht->send_q);
dht->send_q = NULL;
}
if (dht->send_q_unsent) {
ks_dht2_message_deinit(dht->send_q_unsent);
ks_dht2_message_free(dht->send_q_unsent);
dht->send_q_unsent = NULL;
}
for (int32_t i = 0; i < dht->endpoints_size; ++i) {
ks_dht2_endpoint_t *ep = dht->endpoints[i];
ks_dht2_endpoint_deinit(ep);
@ -280,7 +297,7 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr,
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout)
KS_DECLARE(void) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout)
{
int32_t result;
@ -294,30 +311,21 @@ KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout)
}
result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout);
if (result < 0) {
return KS_STATUS_FAIL;
}
if (result == 0) {
ks_dht2_idle(dht);
return KS_STATUS_TIMEOUT;
}
for (int32_t i = 0; i < dht->endpoints_size; ++i) {
if (dht->endpoints_poll[i].revents & POLLIN) {
ks_sockaddr_t raddr = KS_SA_INIT;
dht->recv_buffer_length = KS_DHT_RECV_BUFFER_SIZE;
if (result > 0) {
for (int32_t i = 0; i < dht->endpoints_size; ++i) {
if (dht->endpoints_poll[i].revents & POLLIN) {
ks_sockaddr_t raddr = KS_SA_INIT;
dht->recv_buffer_length = KS_DHT_RECV_BUFFER_SIZE;
raddr.family = dht->endpoints[i]->addr.family;
if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) {
ks_dht2_process(dht, &raddr);
raddr.family = dht->endpoints[i]->addr.family;
if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) {
ks_dht2_process(dht, &raddr);
}
}
}
}
ks_dht2_idle(dht);
return KS_STATUS_SUCCESS;
}
/**
@ -333,21 +341,19 @@ KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht)
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht)
KS_DECLARE(void) ks_dht2_idle(ks_dht2_t *dht)
{
ks_assert(dht);
if (ks_dht2_idle_expirations(dht) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
return KS_STATUS_SUCCESS;
ks_dht2_idle_expirations(dht);
ks_dht2_idle_send(dht);
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht)
KS_DECLARE(void) ks_dht2_idle_expirations(ks_dht2_t *dht)
{
ks_hash_iterator_t *it = NULL;
ks_time_t now = ks_time_now_sec();
@ -375,6 +381,240 @@ KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht)
}
}
ks_hash_write_unlock(dht->transactions_hash);
}
/**
*
*/
KS_DECLARE(void) ks_dht2_idle_send(ks_dht2_t *dht)
{
ks_dht2_message_t *message;
ks_bool_t bail = KS_FALSE;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
while (!bail) {
message = NULL;
if (dht->send_q_unsent) {
message = dht->send_q_unsent;
dht->send_q_unsent = NULL;
}
if (!message) {
bail = ks_q_pop_timeout(dht->send_q, (void **)&message, 1) != KS_STATUS_SUCCESS || !message;
}
if (!bail) {
bail = (ret = ks_dht2_send(dht, message)) != KS_STATUS_SUCCESS;
if (ret == KS_STATUS_BREAK) {
dht->send_q_unsent = message;
} else if (ret == KS_STATUS_SUCCESS) {
ks_dht2_message_deinit(message);
ks_dht2_message_free(message);
}
}
}
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_dht2_message_t *message)
{
// @todo lookup standard def for IPV6 max size
char ip[48];
ks_dht2_endpoint_t *ep;
// @todo calculate max IPV6 payload size?
char buf[1000];
ks_size_t buf_len;
ks_assert(dht);
ks_assert(message);
ks_assert(message->data);
// @todo blacklist check
ks_ip_route(ip, sizeof(ip), message->raddr.host);
if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
ks_sockaddr_t addr;
ks_addr_set(&addr, ip, dht->autoroute_port, message->raddr.family);
if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
}
if (!ep) {
ks_log(KS_LOG_DEBUG, "No route available to %s\n", message->raddr.host);
return KS_STATUS_FAIL;
}
buf_len = ben_encode2(buf, sizeof(buf), message->data);
ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", message->raddr.host, message->raddr.port);
ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
return ks_socket_sendto(ep->sock, (void *)buf, &buf_len, &message->raddr);
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht,
ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
long long errorcode,
const char *errorstr)
{
ks_dht2_message_t *error = NULL;
struct bencode *e = NULL;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(transactionid);
ks_assert(errorstr);
if (ks_dht2_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht2_message_init(error, raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht2_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) {
goto done;
}
ben_list_append(e, ben_int(errorcode));
ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
ks_q_push(dht->send_q, (void *)error);
ret = KS_STATUS_SUCCESS;
done:
if (ret != KS_STATUS_SUCCESS && error) {
ks_dht2_message_deinit(error);
ks_dht2_message_free(error);
}
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_setup_query(ks_dht2_t *dht,
ks_sockaddr_t *raddr,
const char *query,
ks_dht2_message_callback_t callback,
ks_dht2_message_t **message,
struct bencode **args)
{
uint32_t transactionid;
ks_dht2_transaction_t *trans = NULL;
ks_dht2_message_t *msg = NULL;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(query);
ks_assert(callback);
ks_assert(message);
*message = NULL;
// @todo atomic increment or mutex
transactionid = dht->transactionid_next++;
if (ks_dht2_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht2_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht2_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht2_message_init(msg, raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht2_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) {
goto done;
}
*message = msg;
ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans);
ret = KS_STATUS_SUCCESS;
done:
if (ret != KS_STATUS_SUCCESS) {
if (trans) {
ks_dht2_transaction_deinit(trans);
ks_dht2_transaction_free(trans);
}
if (msg) {
ks_dht2_message_deinit(msg);
ks_dht2_message_free(msg);
}
*message = NULL;
}
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_send_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr)
{
ks_dht2_message_t *message = NULL;
struct bencode *a = NULL;
ks_assert(dht);
ks_assert(raddr);
if (ks_dht2_setup_query(dht, raddr, "ping", ks_dht2_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
ben_dict_set(a, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH));
ks_log(KS_LOG_DEBUG, "Sending message query ping\n");
ks_q_push(dht->send_q, (void *)message);
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_send_findnode(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_nodeid_raw_t *targetid)
{
ks_dht2_message_t *message = NULL;
struct bencode *a = NULL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(targetid);
if (ks_dht2_setup_query(dht, raddr, "find_node", ks_dht2_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
ben_dict_set(a, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH));
ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_LENGTH));
ks_log(KS_LOG_DEBUG, "Sending message query find_node\n");
ks_q_push(dht->send_q, (void *)message);
//ks_dht2_send(dht, raddr, message);
return KS_STATUS_SUCCESS;
}
@ -403,7 +643,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
return KS_STATUS_FAIL;
}
if (ks_dht2_message_init(&message, KS_FALSE) != KS_STATUS_SUCCESS) {
if (ks_dht2_message_init(&message, raddr, KS_FALSE) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
@ -414,7 +654,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) {
ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
} else {
ret = callback(dht, raddr, &message);
ret = callback(dht, &message);
}
done:
@ -426,97 +666,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
{
// @todo lookup standard def for IPV6 max size
char ip[48];
ks_dht2_endpoint_t *ep;
// @todo calculate max IPV6 payload size?
char buf[1000];
ks_size_t buf_len;
ks_assert(dht);
ks_assert(raddr);
ks_assert(message);
ks_assert(message->data);
// @todo blacklist check
ks_ip_route(ip, sizeof(ip), raddr->host);
if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
ks_sockaddr_t addr;
ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
}
if (!ep) {
ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
return KS_STATUS_FAIL;
}
buf_len = ben_encode2(buf, sizeof(buf), message->data);
ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", raddr->host, raddr->port);
ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
if (ks_socket_sendto(ep->sock, (void *)buf, &buf_len, raddr) != KS_STATUS_SUCCESS) {
ks_log(KS_LOG_DEBUG, "Socket error\n");
return KS_STATUS_FAIL;
}
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht,
ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
long long errorcode,
const char *errorstr)
{
ks_dht2_message_t error;
struct bencode *e;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(transactionid);
ks_assert(errorstr);
if (ks_dht2_message_prealloc(&error, dht->pool) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht2_message_init(&error, KS_TRUE) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht2_message_error(&error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) {
goto done;
}
// @note e joins response.data and will be freed with it
ben_list_append(e, ben_int(errorcode));
ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
ret = ks_dht2_send(dht, raddr, &error);
done:
ks_dht2_message_deinit(&error);
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_dht2_message_t *message)
{
struct bencode *q;
struct bencode *a;
@ -527,7 +677,6 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(message);
// @todo start of ks_dht2_message_parse_query
@ -560,7 +709,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad
if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) {
ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
} else {
ret = callback(dht, raddr, message);
ret = callback(dht, message);
}
return ret;
@ -569,7 +718,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *rad
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_dht2_message_t *message)
{
struct bencode *r;
ks_dht2_transaction_t *transaction;
@ -578,7 +727,6 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(message);
// @todo start of ks_dht2_message_parse_response
@ -599,17 +747,17 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *
if (!transaction) {
ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
} else if (!ks_addr_cmp(raddr, &transaction->raddr)) {
} else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
ks_log(KS_LOG_DEBUG,
"Message response rejected due to spoofing from %s %d, expected %s %d\n",
raddr->host,
raddr->port,
message->raddr.host,
message->raddr.port,
transaction->raddr.host,
transaction->raddr.port);
} else {
// @todo mark transaction for later removal
transaction->finished = KS_TRUE;
ret = transaction->callback(dht, raddr, message);
ret = transaction->callback(dht, message);
}
return ret;
@ -618,7 +766,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_dht2_message_t *message)
{
struct bencode *e;
struct bencode *ec;
@ -633,7 +781,6 @@ KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *rad
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(message);
// @todo start of ks_dht2_message_parse_error
@ -666,11 +813,11 @@ KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *rad
if (!transaction) {
ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid);
} else if (!ks_addr_cmp(raddr, &transaction->raddr)) {
} else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) {
ks_log(KS_LOG_DEBUG,
"Message error rejected due to spoofing from %s %d, expected %s %d\n",
raddr->host,
raddr->port,
message->raddr.host,
message->raddr.port,
transaction->raddr.host,
transaction->raddr.port);
} else {
@ -679,7 +826,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *rad
transaction->finished = KS_TRUE;
if ((callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) {
ret = callback(dht, raddr, message);
ret = callback(dht, message);
} else {
ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
ret = KS_STATUS_SUCCESS;
@ -692,16 +839,16 @@ KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *rad
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_dht2_message_t *message)
{
struct bencode *id;
const char *idv;
//const char *idv;
ks_size_t idv_len;
ks_dht2_nodeid_t nid;
ks_dht2_message_t *response = NULL;
struct bencode *r = NULL;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(message);
ks_assert(message->args);
@ -711,39 +858,131 @@ KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t
return KS_STATUS_FAIL;
}
idv = ben_str_val(id);
//idv = ben_str_val(id);
idv_len = ben_str_len(id);
if (idv_len != KS_DHT_NODEID_LENGTH) {
ks_log(KS_LOG_DEBUG, "Message args 'id' value has an unexpected size of %d\n", idv_len);
return KS_STATUS_FAIL;
}
if (ks_dht2_nodeid_prealloc(&nid, dht->pool) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht2_nodeid_init(&nid, (const ks_dht2_nodeid_raw_t *)idv) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
// @todo add/touch bucket entry for remote node
ks_log(KS_LOG_DEBUG, "Message query ping is valid\n");
ret = ks_dht2_send_response_ping(dht, raddr, message->transactionid, message->transactionid_length);
if (ks_dht2_message_alloc(&response, dht->pool) != KS_STATUS_SUCCESS) {
goto done;
}
ks_dht2_nodeid_deinit(&nid);
if (ks_dht2_message_init(response, &message->raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht2_message_response(response, message->transactionid, message->transactionid_length, &r) != KS_STATUS_SUCCESS) {
goto done;
}
ben_dict_set(r, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH));
ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
ks_q_push(dht->send_q, (void *)response);
ret = KS_STATUS_SUCCESS;
done:
if (ret != KS_STATUS_SUCCESS && response) {
ks_dht2_message_deinit(response);
ks_dht2_message_free(response);
}
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
KS_DECLARE(ks_status_t) ks_dht2_process_query_findnode(ks_dht2_t *dht, ks_dht2_message_t *message)
{
struct bencode *id;
struct bencode *target;
//const char *idv;
//const char *targetv;
ks_size_t idv_len;
ks_size_t targetv_len;
ks_dht2_message_t *response = NULL;
struct bencode *r = NULL;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(message);
ks_assert(message->args);
id = ben_dict_get_by_str(message->args, "id");
if (!id) {
ks_log(KS_LOG_DEBUG, "Message args missing required key 'id'\n");
return KS_STATUS_FAIL;
}
//idv = ben_str_val(id);
idv_len = ben_str_len(id);
if (idv_len != KS_DHT_NODEID_LENGTH) {
ks_log(KS_LOG_DEBUG, "Message args 'id' value has an unexpected size of %d\n", idv_len);
return KS_STATUS_FAIL;
}
target = ben_dict_get_by_str(message->args, "target");
if (!target) {
ks_log(KS_LOG_DEBUG, "Message args missing required key 'target'\n");
return KS_STATUS_FAIL;
}
//targetv = ben_str_val(target);
targetv_len = ben_str_len(target);
if (targetv_len != KS_DHT_NODEID_LENGTH) {
ks_log(KS_LOG_DEBUG, "Message args 'target' value has an unexpected size of %d\n", targetv_len);
return KS_STATUS_FAIL;
}
ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n");
if (ks_dht2_message_alloc(&response, dht->pool) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht2_message_init(response, &message->raddr, KS_TRUE) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht2_message_response(response, message->transactionid, message->transactionid_length, &r) != KS_STATUS_SUCCESS) {
goto done;
}
ben_dict_set(r, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH));
ks_log(KS_LOG_DEBUG, "Sending message response find_node\n");
ks_q_push(dht->send_q, (void *)response);
ret = KS_STATUS_SUCCESS;
done:
if (ret != KS_STATUS_SUCCESS && response) {
ks_dht2_message_deinit(response);
ks_dht2_message_free(response);
}
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_dht2_message_t *message)
{
ks_assert(dht);
ks_assert(raddr);
ks_assert(message);
// @todo add/touch bucket entry for remote node
ks_log(KS_LOG_DEBUG, "Message response ping is reached\n");
return KS_STATUS_SUCCESS;
@ -752,96 +991,16 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockadd
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr)
KS_DECLARE(ks_status_t) ks_dht2_process_response_findnode(ks_dht2_t *dht, ks_dht2_message_t *message)
{
uint32_t transactionid;
ks_dht2_transaction_t *transaction = NULL;
ks_dht2_message_t query;
struct bencode *a;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(message);
// @todo atomic increment or mutex...
transactionid = dht->transactionid_next++;
// @todo add/touch bucket entry for remote node and other nodes returned
if (ks_dht2_transaction_alloc(&transaction, dht->pool) != KS_STATUS_SUCCESS) {
goto done;
}
ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n");
if (ks_dht2_transaction_init(transaction, raddr, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht2_message_prealloc(&query, dht->pool) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht2_message_init(&query, KS_TRUE) != KS_STATUS_SUCCESS) {
goto done;
}
if (ks_dht2_message_query(&query, transactionid, "ping", &a) != KS_STATUS_SUCCESS) {
goto done;
}
// @todo transaction expiration and raddr for validation
ks_hash_insert(dht->transactions_hash, (void *)&transaction->transactionid, transaction);
// @note a joins response.data and will be freed with it
ben_dict_set(a, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH));
ks_log(KS_LOG_DEBUG, "Sending message query ping with transaction id %d\n", transactionid);
ret = ks_dht2_send(dht, raddr, &query);
done:
if (transaction && ret != KS_STATUS_SUCCESS) {
ks_dht2_transaction_deinit(transaction);
ks_dht2_transaction_free(transaction);
}
ks_dht2_message_deinit(&query);
return ret;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht,
ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length)
{
ks_dht2_message_t response;
struct bencode *r;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(transactionid);
if (ks_dht2_message_prealloc(&response, dht->pool) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht2_message_init(&response, KS_TRUE) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht2_message_response(&response, transactionid, transactionid_length, &r) != KS_STATUS_SUCCESS) {
goto done;
}
// @note r joins response.data and will be freed with it
ben_dict_set(r, ben_blob("id", 2), ben_blob(dht->nodeid.id, KS_DHT_NODEID_LENGTH));
ks_log(KS_LOG_DEBUG, "Sending message response ping\n");
ret = ks_dht2_send(dht, raddr, &response);
done:
ks_dht2_message_deinit(&response);
return ret;
return KS_STATUS_SUCCESS;
}
/* For Emacs:

View File

@ -28,7 +28,7 @@ typedef struct ks_dht2_endpoint_s ks_dht2_endpoint_t;
typedef struct ks_dht2_transaction_s ks_dht2_transaction_t;
typedef ks_status_t (*ks_dht2_message_callback_t)(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
typedef ks_status_t (*ks_dht2_message_callback_t)(ks_dht2_t *dht, ks_dht2_message_t *message);
struct ks_dht2_nodeid_raw_s {
uint8_t id[KS_DHT_NODEID_LENGTH];
@ -41,6 +41,7 @@ struct ks_dht2_nodeid_s {
struct ks_dht2_message_s {
ks_pool_t *pool;
ks_sockaddr_t raddr;
struct bencode *data;
uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE];
ks_size_t transactionid_length;
@ -85,6 +86,8 @@ struct ks_dht2_s {
ks_hash_t *endpoints_hash;
struct pollfd *endpoints_poll;
ks_q_t *send_q;
ks_dht2_message_t *send_q_unsent;
uint8_t recv_buffer[KS_DHT_RECV_BUFFER_SIZE];
ks_size_t recv_buffer_length;
@ -106,7 +109,7 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht);
KS_DECLARE(ks_status_t) ks_dht2_autoroute(ks_dht2_t *dht, ks_bool_t autoroute, ks_port_t port);
KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr, ks_dht2_endpoint_t **endpoint);
KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout);
KS_DECLARE(void) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout);
KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback);
@ -129,7 +132,7 @@ KS_DECLARE(ks_status_t) ks_dht2_message_alloc(ks_dht2_message_t **message, ks_po
KS_DECLARE(ks_status_t) ks_dht2_message_prealloc(ks_dht2_message_t *message, ks_pool_t *pool);
KS_DECLARE(ks_status_t) ks_dht2_message_free(ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_bool_t alloc_data);
KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_sockaddr_t *raddr, ks_bool_t alloc_data);
KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_message_parse(ks_dht2_message_t *message, const uint8_t *buffer, ks_size_t buffer_length);

View File

@ -47,11 +47,12 @@ KS_DECLARE(ks_status_t) ks_dht2_message_free(ks_dht2_message_t *message)
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_bool_t alloc_data)
KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, ks_sockaddr_t *raddr, ks_bool_t alloc_data)
{
ks_assert(message);
ks_assert(message->pool);
message->raddr = *raddr;
message->data = NULL;
message->args = NULL;
message->transactionid_length = 0;
@ -70,6 +71,7 @@ KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message)
{
ks_assert(message);
message->raddr = (const ks_sockaddr_t){ 0 };
message->args = NULL;
message->type[0] = '\0';
message->transactionid_length = 0;

View File

@ -6,11 +6,11 @@
#define TEST_DHT1_REGISTER_TYPE_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze"
#define TEST_DHT1_PROCESS_QUERY_PING_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe"
ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
ks_status_t dht_z_callback(ks_dht2_t *dht, ks_dht2_message_t *message)
{
diag("dht_z_callback\n");
ok(message->transactionid[0] == '4' && message->transactionid[1] == '2');
ks_dht2_send_error(dht, raddr, message->transactionid, message->transactionid_length, 201, "Generic test error");
ks_dht2_send_error(dht, &message->raddr, message->transactionid, message->transactionid_length, 201, "Generic test error");
return KS_STATUS_SUCCESS;
}
@ -91,6 +91,8 @@ int main() {
ok(err == KS_STATUS_SUCCESS);
}
diag("Custom type tests\n");
buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER);
memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen);
dht1->recv_buffer_length = buflen;
@ -98,25 +100,30 @@ int main() {
err = ks_dht2_process(dht1, &raddr);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht2_pulse(&dht2, 1000);
ok(err == KS_STATUS_SUCCESS);
ks_dht2_pulse(dht1, 100);
ks_dht2_pulse(&dht2, 100);
//buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER);
//memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen);
//dht1->recv_buffer_length = buflen;
//err = ks_dht2_process(dht1, &raddr);
//ok(err == KS_STATUS_SUCCESS);
err = ks_dht2_send_query_ping(dht1, &raddr);
ok(err == KS_STATUS_SUCCESS);
diag("Ping tests\n");
ks_dht2_send_ping(dht1, &raddr);
ks_dht2_pulse(dht1, 100);
ks_dht2_pulse(&dht2, 100);
err = ks_dht2_pulse(&dht2, 1000);
ok(err == KS_STATUS_SUCCESS);
err = ks_dht2_pulse(dht1, 1000);
ok(err == KS_STATUS_SUCCESS);
ks_dht2_pulse(dht1, 100);
diag("Cleanup\n");
/* Cleanup and shutdown */