From aa47b4bec2c028e1911df71d95a029c6075cee91 Mon Sep 17 00:00:00 2001 From: colm <colm@freeswitch1> Date: Fri, 9 Dec 2016 12:44:16 -0500 Subject: [PATCH] FS-9775: Match up datatypes, alloc node_t, remove ks_dht_bucket.h --- libs/libks/Makefile.am | 2 +- libs/libks/src/dht/ks_dht.h | 59 ++- libs/libks/src/dht/ks_dht_bucket.c | 618 +++++++++++++---------------- libs/libks/src/dht/ks_dht_bucket.h | 108 ----- 4 files changed, 330 insertions(+), 457 deletions(-) delete mode 100644 libs/libks/src/dht/ks_dht_bucket.h diff --git a/libs/libks/Makefile.am b/libs/libks/Makefile.am index e83c984eee..2ce72b33d9 100644 --- a/libs/libks/Makefile.am +++ b/libs/libks/Makefile.am @@ -30,7 +30,7 @@ library_include_HEADERS += src/include/ks_dso.h src/include/ks_platform.h src/in library_include_HEADERS += src/include/ks_printf.h src/include/ks_hash.h src/include/ks_ssl.h src/include/kws.h library_include_HEADERS += src/utp/utp_internal.h src/utp/utp.h src/utp/utp_types.h src/utp/utp_callbacks.h src/utp/utp_templates.h library_include_HEADERS += src/utp/utp_hash.h src/utp/utp_packedsockaddr.h src/utp/utp_utils.h src/include/ks_utp.h -library_include_HEADERS += src/dht/ks_dht.h src/dht/ks_dht-int.h src/dht/ks_dht_bucket.h +library_include_HEADERS += src/dht/ks_dht.h src/dht/ks_dht-int.h tests: libks.la $(MAKE) -C test tests diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index a86881d31a..c1251bac93 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -3,7 +3,6 @@ #include "ks.h" #include "ks_bencode.h" -#include "ks_dht_bucket.h" KS_BEGIN_EXTERN_C @@ -25,7 +24,9 @@ typedef struct ks_dht_nodeid_s ks_dht_nodeid_t; typedef struct ks_dht_message_s ks_dht_message_t; typedef struct ks_dht_endpoint_s ks_dht_endpoint_t; typedef struct ks_dht_transaction_s ks_dht_transaction_t; - +typedef struct ks_dht_node_s ks_dht_node_t; +typedef struct ks_dhtrt_routetable_s ks_dhtrt_routetable_t; +typedef struct ks_dhtrt_querynodes_s ks_dhtrt_querynodes_t; typedef ks_status_t (*ks_dht_message_callback_t)(ks_dht_t *dht, ks_dht_message_t *message); @@ -36,6 +37,29 @@ struct ks_dht_nodeid_s { uint8_t id[KS_DHT_NODEID_SIZE]; }; +enum ipfamily { ifv4=AF_INET, ifv6=AF_INET6, ifboth=AF_INET+AF_INET6}; + +struct ks_dht_node_s { + ks_dht_nodeid_t nodeid; + ks_sockaddr_t addr; + enum ipfamily family; /* in: AF_INET or AF_INET6 or both */ + ks_dhtrt_routetable_t* table; +}; + +struct ks_dhtrt_routetable_s { + void* internal; + ks_pool_t* pool; + ks_logger_t logger; +}; + +struct ks_dhtrt_querynodes_s { + ks_dht_nodeid_t nodeid; /* in: id to query */ + enum ipfamily family; /* in: AF_INET or AF_INET6 or both */ + uint8_t max; /* in: maximum to return */ + uint8_t count; /* out: number returned */ + ks_dht_node_t* nodes[ KS_DHT_MESSAGE_QUERY_MAX_SIZE]; /* out: array of peers (ks_dht_node_t* nodes[incount]) */ +}; + struct ks_dht_message_s { ks_pool_t *pool; ks_dht_endpoint_t *endpoint; @@ -91,8 +115,8 @@ struct ks_dht_s { volatile uint32_t transactionid_next; ks_hash_t *transactions_hash; - ks_dhtrt_routetable *rt_ipv4; - ks_dhtrt_routetable *rt_ipv6; + ks_dhtrt_routetable_t *rt_ipv4; + ks_dhtrt_routetable_t *rt_ipv6; }; /** @@ -156,6 +180,33 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_init(ks_dht_transaction_t *transactio uint32_t transactionid, ks_dht_message_callback_t callback); KS_DECLARE(ks_status_t) ks_dht_transaction_deinit(ks_dht_transaction_t *transaction); + + +/** + * route table methods + * + */ +KS_DECLARE(ks_dhtrt_routetable_t*) ks_dhtrt_initroute( ks_pool_t *pool, ks_dht_nodeid_t nodeid); +KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t* table ); + +KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table, + ks_dht_nodeid_t nodeid, + char* ip, unsigned short port, + ks_dht_node_t** node); + +KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t* table, ks_dht_node_t* node); + +KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t nodeid); +KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t nodeid); + +KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t* table, ks_dhtrt_querynodes_t* query); +KS_DECLARE(ks_dht_node_t*) ks_dhtrt_find_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t id); + +KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t* table); + +/* debugging aids */ +KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t* table, int level); + KS_END_EXTERN_C diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index aea4f1dcec..6ce5ff9bf6 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, Anthony Miiessaly II + * Copyright (c) 2016, FreeSWITCH * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,7 +34,7 @@ #pragma GCC optimize ("O0") -#include "ks_dht_bucket.h" +#include "ks_dht.h" /* change for testing */ @@ -47,143 +47,119 @@ #define DHTPEER_SUSPECT 2 #define DHTPEER_EXPIRED 3 -/* internal structures */ -typedef struct ks_dhtrt_rw_lock_s { - ks_pool_t* pool; - ks_mutex_t* mutex; - ks_cond_t* rcond; - volatile uint16_t read_count; - ks_cond_t* wcond; - volatile uint16_t write_count; /* hopefully never more than 1 ! */ -} ks_dhtrt_rw_lock; +typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE]; +/* internal structures */ typedef struct ks_dhtrt_bucket_entry_s { - ks_time_t tyme; - unsigned char id[KS_DHT_IDSIZE]; - ks_dhtrt_node* gptr; /* ptr to peer */ + ks_time_t tyme; + uint8_t id[KS_DHT_NODEID_SIZE]; + ks_dht_node_t* gptr; /* ptr to peer */ uint8_t inuse; uint8_t outstanding_pings; uint8_t flags; /* active, suspect, expired */ -} ks_dhtrt_bucket_entry; +} ks_dhtrt_bucket_entry_t; typedef struct ks_dhtrt_bucket_s { - ks_dhtrt_bucket_entry entries[KS_DHT_BUCKETSIZE]; + ks_dhtrt_bucket_entry_t entries[KS_DHT_BUCKETSIZE]; uint8_t count; uint8_t expired_count; -} ks_dhtrt_bucket; + ks_rwl_t* lock; + uint8_t locked; +} ks_dhtrt_bucket_t; #define BHF_LEFT 0x80 -typedef struct ks_dhtrt_bucket_header { - struct ks_dhtrt_bucket_header* parent; - struct ks_dhtrt_bucket_header* left; - struct ks_dhtrt_bucket_header* right; - ks_dhtrt_bucket* bucket; +typedef struct ks_dhtrt_bucket_header_s { + struct ks_dhtrt_bucket_header_s* parent; + struct ks_dhtrt_bucket_header_s* left; + struct ks_dhtrt_bucket_header_s* right; + ks_dhtrt_bucket_t* bucket; ks_time_t tyme; /* last processed time */ - unsigned char mask[KS_DHT_IDSIZE]; /* node id mask */ + unsigned char mask[KS_DHT_NODEID_SIZE]; /* node id mask */ unsigned char flags; -} ks_dhtrt_bucket_header; +} ks_dhtrt_bucket_header_t; typedef struct ks_dhtrt_internal_s { - ks_dhtrt_bucket_header* buckets; /* root bucketheader */ - /* */ - -} ks_dhtrt_internal; + uint8_t localid[KS_DHT_NODEID_SIZE]; + ks_dhtrt_bucket_header_t* buckets; /* root bucketheader */ + ks_rwl_t* lock; /* lock for safe traversal of the tree */ + uint8_t locked; +} ks_dhtrt_internal_t; typedef struct ks_dhtrt_xort_s { unsigned int ix; /* index of bucket array */ - unsigned char xor[KS_DHT_IDSIZE]; /* corresponding xor value */ + unsigned char xor[KS_DHT_NODEID_SIZE]; /* corresponding xor value */ unsigned int nextix; -} ks_dhtrt_xort; +} ks_dhtrt_xort_t; typedef struct ks_dhtrt_sortedxors_s { - ks_dhtrt_bucket_header* bheader; - ks_dhtrt_xort xort[KS_DHT_BUCKETSIZE]; - unsigned char hixor[KS_DHT_IDSIZE]; + ks_dhtrt_bucket_header_t* bheader; + ks_dhtrt_xort_t xort[KS_DHT_BUCKETSIZE]; + unsigned char hixor[KS_DHT_NODEID_SIZE]; unsigned int startix; unsigned int count; struct ks_dhtrt_sortedxors_s* next; -} ks_dhtrt_sortedxors; +} ks_dhtrt_sortedxors_t; /* --- static functions ---- */ static -ks_dhtrt_bucket_header* ks_dhtrt_create_bucketheader( +ks_dhtrt_bucket_header_t* ks_dhtrt_create_bucketheader( ks_pool_t *pool, - ks_dhtrt_bucket_header* parent, + ks_dhtrt_bucket_header_t* parent, unsigned char* mask); static -ks_dhtrt_bucket* ks_dhtrt_create_bucket(ks_pool_t* pool); +ks_dhtrt_bucket_t* ks_dhtrt_create_bucket(ks_pool_t* pool); static -ks_dhtrt_bucket_header* ks_dhtrt_find_bucketheader(ks_dhtrt_routetable* table, unsigned char* id); +ks_dhtrt_bucket_header_t* ks_dhtrt_find_bucketheader(ks_dhtrt_routetable_t* table, ks_dhtrt_nodeid_t id); static -ks_dhtrt_bucket_entry* ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header* header, ks_dhtrt_nodeid id); +ks_dhtrt_bucket_entry_t* ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t* header, ks_dhtrt_nodeid_t id); static -void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header* original, ks_dhtrt_bucket_header* left, ks_dhtrt_bucket_header* right); +void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t* original, ks_dhtrt_bucket_header_t* left, ks_dhtrt_bucket_header_t* right); static -ks_dht_node_t* ks_dhtrt_find_nodeid(ks_dhtrt_bucket* bucket, ks_dhtrt_nodeid nodeid); +ks_dht_node_t* ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t* bucket, ks_dhtrt_nodeid_t nodeid); static void -ks_dhtrt_shiftright(unsigned char* id); +ks_dhtrt_shiftright(uint8_t* id); static -void ks_dhtrt_shiftleft(unsigned char* id); -static int -ks_dhtrt_xorcmp(const unsigned char *id1, const unsigned char *id2, const unsigned char *ref); +void ks_dhtrt_shiftleft(uint8_t* id); static void -ks_dhtrt_xor(const unsigned char *id1, const unsigned char *id2, unsigned char *xor); +ks_dhtrt_xor(const uint8_t* id1, const uint8_t* id2, uint8_t* xor); static int -ks_dhtrt_ismasked(const unsigned char *id1, const unsigned char *mask); +ks_dhtrt_ismasked(const uint8_t* id1, const uint8_t* mask); static -ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable* table, ks_dhtrt_node* node); +ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t* table, ks_dht_node_t* node); static -ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* node); +ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t* bucket, ks_dht_node_t* node); static -void ks_dhtrt_delete_id(ks_dhtrt_bucket* bucket, ks_dhtrt_nodeid id); +void ks_dhtrt_delete_id(ks_dhtrt_bucket_t* bucket, ks_dhtrt_nodeid_t id); static -char* ks_dhtrt_printableid(const unsigned char* id, char* buffer); +char* ks_dhtrt_printableid(uint8_t* id, char* buffer); static -unsigned char ks_dhtrt_isactive(ks_dhtrt_bucket_entry* entry); +unsigned char ks_dhtrt_isactive(ks_dhtrt_bucket_entry_t* entry); static -uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes* query, ks_dhtrt_sortedxors* xort); +uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t* query, ks_dhtrt_sortedxors_t* xort); static uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid, - ks_dhtrt_bucket_header* header, - ks_dhtrt_sortedxors* xors, + ks_dhtrt_bucket_header_t* header, + ks_dhtrt_sortedxors_t* xors, unsigned char* hixor, unsigned int max); static -void ks_dhtrt_ping(ks_dhtrt_bucket_entry* entry); - -static -ks_status_t ks_dhtrt_initrwlock( ks_dhtrt_rw_lock* lock); -static -void ks_dhtrt_deinitrwlock( ks_dhtrt_rw_lock* lock); - -static -void ks_dhtrt_getreadlock( ks_dhtrt_rw_lock* lock); -static -ks_status_t ks_dhtrt_tryreadlock( ks_dhtrt_rw_lock* lock); -static -void ks_dhtrt_releasereadlock( ks_dhtrt_rw_lock* lock); -static -void ks_dhtrt_getwritelock( ks_dhtrt_rw_lock* lock); -static -ks_status_t ks_dhtrt_trywritelock( ks_dhtrt_rw_lock* lock); -static -void ks_dhtrt_releasewritelock( ks_dhtrt_rw_lock* lock); +void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t* entry); /* debugging */ #define KS_DHT_DEBUGPRINTF_ - +/* # define KS_DHT_DEBUGPRINTFX_ very verbose */ /* @@ -196,21 +172,22 @@ void ks_dhtrt_releasewritelock( ks_dhtrt_rw_lock* lock); */ -KS_DECLARE(ks_dhtrt_routetable*) ks_dhtrt_initroute( ks_pool_t *pool, ks_dhtrt_nodeid localid) +KS_DECLARE(ks_dhtrt_routetable_t*) ks_dhtrt_initroute( ks_pool_t *pool, ks_dht_nodeid_t nodeid) { - ks_log(KS_LOG_ERROR, "hello world\n"); - unsigned char initmask[KS_DHT_IDSIZE]; + unsigned char initmask[KS_DHT_NODEID_SIZE]; memset(initmask, 0xff, sizeof(initmask)); - ks_dhtrt_routetable* table = ks_pool_alloc(pool, sizeof(ks_dhtrt_routetable)); - memset(table, 0, sizeof(ks_dhtrt_routetable)); + ks_dhtrt_routetable_t* table = ks_pool_alloc(pool, sizeof(ks_dhtrt_routetable_t)); + memset(table, 0, sizeof(ks_dhtrt_routetable_t)); - ks_dhtrt_internal* internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal)); - memset(internal, 0, sizeof(ks_dhtrt_internal)); + ks_dhtrt_internal_t* internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t)); + memset(internal, 0, sizeof(ks_dhtrt_internal_t)); + /*ks_rwl_create(&internal->lock, pool);*/ + if (nodeid.id != 0) memcpy(internal->localid, nodeid.id, KS_DHT_NODEID_SIZE); table->internal = internal; /* initialize root bucket */ - ks_dhtrt_bucket_header* initial_header = ks_dhtrt_create_bucketheader(pool, 0, initmask); + ks_dhtrt_bucket_header_t* initial_header = ks_dhtrt_create_bucketheader(pool, 0, initmask); initial_header->flags = BHF_LEFT; /* fake left to allow splitting */ internal->buckets = initial_header; initial_header->bucket = ks_dhtrt_create_bucket(pool); @@ -218,52 +195,67 @@ KS_DECLARE(ks_dhtrt_routetable*) ks_dhtrt_initroute( ks_pool_t *pool, ks_dhtrt_n return table; } -KS_DECLARE(void) ks_dhtrt_deinitroute( ks_dhtrt_routetable* table ) +KS_DECLARE(void) ks_dhtrt_deinitroute( ks_dhtrt_routetable_t* table ) { - /*todo*/ + /* @todo*/ ks_pool_free(table->pool, table); return; } -KS_DECLARE(ks_dhtrt_node*) ks_dhtrt_create_node(ks_dhtrt_routetable* table, ks_dhtrt_nodeid nodeid, ks_dht_node_t* node) +KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t* table, + ks_dht_nodeid_t nodeid, + char* ip, + unsigned short port, + ks_dht_node_t** node) { - /* first see if it exists */ - ks_dhtrt_node* peer = ks_dhtrt_find_node(table, nodeid); - if (peer != 0) { /* humm not sure - this might be an error */ - return peer; - } - peer = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_node)); - memset(peer, 0, sizeof(ks_dhtrt_node)); - memcpy(peer->id, nodeid, KS_DHT_IDSIZE); - ks_status_t status = ks_dhtrt_insert_node(table, peer); - if (status == KS_STATUS_FAIL) { - ks_pool_free(table->pool, peer); - return 0; - } - peer->handle = node; - return peer; + ks_dht_node_t* tnode = ks_dhtrt_find_node(table, nodeid); + if (tnode != 0) return KS_STATUS_FAIL; /* protect against duplicates */ + /* @todo - replace with reusable memory pool */ + tnode = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t)); + + tnode->table = table; + for(int i=0; i<5; ++i) { + if (ip[i] == ':') { tnode->family = AF_INET6; break;} + else if (ip[i] == '.') { tnode->family = AF_INET; break; } + } + memcpy(tnode->nodeid.id, nodeid.id, KS_DHT_NODEID_SIZE); + + if ( (ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) || + (ks_dhtrt_insert_node(table, tnode) != KS_STATUS_SUCCESS) ) { + ks_pool_free(table->pool, tnode); + return KS_STATUS_FAIL; + } + + (*node) = tnode; + return KS_STATUS_SUCCESS; } - -KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable* table, ks_dhtrt_node* peer) +KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t* table, ks_dht_node_t* node) { - ks_dhtrt_bucket_header* header = ks_dhtrt_find_bucketheader(table, peer->id); + ks_dhtrt_bucket_header_t* header = ks_dhtrt_find_bucketheader(table, node->nodeid.id); if (header != 0) { - ks_dhtrt_bucket* bucket = header->bucket; + ks_dhtrt_bucket_t* bucket = header->bucket; if (bucket != 0) { /* we were not able to find a bucket*/ - ks_dhtrt_delete_id(bucket, peer->id); + ks_dhtrt_delete_id(bucket, node->nodeid.id); } } - ks_pool_free(table->pool, peer); + ks_pool_free(table->pool, node); return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) ks_dhtrt_insert_node(ks_dhtrt_routetable* table, ks_dhtrt_node* peer) +static +ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t* table, ks_dht_node_t* node) { - ks_dhtrt_bucket* bucket = 0; + ks_dhtrt_bucket_t* bucket = 0; int insanity = 0; + + /* first see if it exists */ + ks_dht_node_t* peer = ks_dhtrt_find_node(table, node->nodeid); + if (peer != 0) { + return KS_STATUS_FAIL; + } - ks_dhtrt_bucket_header* header = ks_dhtrt_find_bucketheader(table, peer->id); + ks_dhtrt_bucket_header_t* header = ks_dhtrt_find_bucketheader(table, node->nodeid.id); bucket = header->bucket; assert(bucket != 0); /* we were not able to find a bucket*/ @@ -273,7 +265,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_insert_node(ks_dhtrt_routetable* table, ks_dhtr /* first - seek a stale entry to eject */ if (bucket->expired_count) { - ks_status_t s = ks_dhtrt_insert_id(bucket, peer); + ks_status_t s = ks_dhtrt_insert_id(bucket, node); if (s == KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS; } @@ -286,19 +278,19 @@ KS_DECLARE(ks_status_t) ks_dhtrt_insert_node(ks_dhtrt_routetable* table, ks_dhtr if ( !(header->flags & BHF_LEFT) ) { /* only the left handside node can be split */ #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(peer->id, buffer)); + printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, buffer)); #endif return KS_STATUS_FAIL; } /* bucket must be split */ /* work out new mask */ - unsigned char newmask[KS_DHT_IDSIZE]; - memcpy(newmask, header->mask, KS_DHT_IDSIZE); - if (newmask[KS_DHT_IDSIZE-1] == 0) { /* no more bits to shift - is this possible */ + unsigned char newmask[KS_DHT_NODEID_SIZE]; + memcpy(newmask, header->mask, KS_DHT_NODEID_SIZE); + if (newmask[KS_DHT_NODEID_SIZE-1] == 0) { /* no more bits to shift - is this possible */ #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(peer->id, buffer)); + printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(peer->nodeid.id, buffer)); #endif return KS_STATUS_FAIL; } @@ -307,15 +299,15 @@ KS_DECLARE(ks_status_t) ks_dhtrt_insert_node(ks_dhtrt_routetable* table, ks_dhtr ks_dhtrt_shiftright(newmask); /* create the new bucket structures */ - ks_dhtrt_bucket_header* newleft = ks_dhtrt_create_bucketheader(table->pool, header, newmask); + ks_dhtrt_bucket_header_t* newleft = ks_dhtrt_create_bucketheader(table->pool, header, newmask); newleft->bucket = ks_dhtrt_create_bucket(table->pool); newleft->flags = BHF_LEFT; /* flag as left hand side - therefore splitable */ - ks_dhtrt_bucket_header* newright = ks_dhtrt_create_bucketheader(table->pool, header, header->mask); + ks_dhtrt_bucket_header_t* newright = ks_dhtrt_create_bucketheader(table->pool, header, header->mask); ks_dhtrt_split_bucket(header, newleft, newright); /* ok now we need to try again to see if the bucket has capacity */ /* which bucket do care about */ - if (ks_dhtrt_ismasked(peer->id, newleft->mask)) { + if (ks_dhtrt_ismasked(node->nodeid.id, newleft->mask)) { bucket = newleft->bucket; header = newleft; } @@ -328,28 +320,28 @@ KS_DECLARE(ks_status_t) ks_dhtrt_insert_node(ks_dhtrt_routetable* table, ks_dhtr #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - printf("inserting nodeid %s ", ks_dhtrt_printableid(peer->id, buffer)); + printf("inserting nodeid %s ", ks_dhtrt_printableid(node->nodeid.id, buffer)); printf("into bucket %s\n", ks_dhtrt_printableid(header->mask, buffer)); #endif /* by this point we have a viable bucket */ - return ks_dhtrt_insert_id(bucket, peer); + return ks_dhtrt_insert_id(bucket, node); } -KS_DECLARE(ks_dht_node_t*) ks_dhtrt_find_node(ks_dhtrt_routetable* table, ks_dhtrt_nodeid nodeid) { - ks_dhtrt_bucket_header* header = ks_dhtrt_find_bucketheader(table, nodeid); - if (header != 0) return 0; - ks_dhtrt_bucket* bucket = header->bucket; - if (bucket != 0) return 0; /* probably a logic error ?*/ - return ks_dhtrt_find_nodeid(bucket, nodeid); +KS_DECLARE(ks_dht_node_t*) ks_dhtrt_find_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t nodeid) { + ks_dhtrt_bucket_header_t* header = ks_dhtrt_find_bucketheader(table, nodeid.id); + if (header == 0) return 0; + ks_dhtrt_bucket_t* bucket = header->bucket; + if (bucket == 0) return 0; /* probably a logic error ?*/ + return ks_dhtrt_find_nodeid(bucket, nodeid.id); } -KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable* table, ks_dhtrt_nodeid nodeid) +KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t nodeid) { - ks_dhtrt_bucket_header* header = ks_dhtrt_find_bucketheader(table, nodeid); + ks_dhtrt_bucket_header_t* header = ks_dhtrt_find_bucketheader(table, nodeid.id); if (header == 0) return KS_STATUS_FAIL; if (header->bucket == 0) return KS_STATUS_FAIL; - ks_dhtrt_bucket_entry* e = ks_dhtrt_find_bucketentry(header, nodeid); + ks_dhtrt_bucket_entry_t* e = ks_dhtrt_find_bucketentry(header, nodeid.id); if (e != 0) { e->tyme = ks_time_now(); e->outstanding_pings = 0; @@ -360,11 +352,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable* table, ks_dhtr return KS_STATUS_FAIL; } -KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable* table, ks_dhtrt_nodeid nodeid) +KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t nodeid) { - ks_dhtrt_bucket_header* header = ks_dhtrt_find_bucketheader(table, nodeid); + ks_dhtrt_bucket_header_t* header = ks_dhtrt_find_bucketheader(table, nodeid.id); if (header == 0) return KS_STATUS_FAIL; - ks_dhtrt_bucket_entry* e = ks_dhtrt_find_bucketentry(header, nodeid); + ks_dhtrt_bucket_entry_t* e = ks_dhtrt_find_bucketentry(header, nodeid.id); if (e != 0) { e->flags = DHTPEER_EXPIRED; return KS_STATUS_SUCCESS; @@ -372,7 +364,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable* table, ks_dht return KS_STATUS_FAIL; } -KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable* table, ks_dhtrt_querynodes* query) +KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t* table, ks_dhtrt_querynodes_t* query) { query->count = 0; uint8_t max = query->max; @@ -381,24 +373,24 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable* table, ks_dh if (max == 0) return 0; /* sanity check */ - ks_dhtrt_bucket_header* header = ks_dhtrt_find_bucketheader(table, query->id); + ks_dhtrt_bucket_header_t* header = ks_dhtrt_find_bucketheader(table, query->nodeid.id); #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - printf("finding %d closest nodes for nodeid %s\n", max, ks_dhtrt_printableid(query->id, buffer)); + printf("finding %d closest nodes for nodeid %s\n", max, ks_dhtrt_printableid(query->nodeid.id, buffer)); printf(" starting at mask: %s\n", ks_dhtrt_printableid(header->mask, buffer)); #endif - ks_dhtrt_sortedxors xort0; + ks_dhtrt_sortedxors_t xort0; memset(&xort0, 0 , sizeof(xort0)); - ks_dhtrt_nodeid initid; - memset(initid, 0xff, KS_DHT_IDSIZE); + ks_dhtrt_nodeid_t initid; + memset(initid, 0xff, KS_DHT_NODEID_SIZE); xort0.bheader = header; /* step 1 - look at immediate bucket */ /* --------------------------------- */ - cnt = ks_dhtrt_findclosest_bucketnodes(query->id, header, &xort0, initid ,max); + cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, header, &xort0, initid ,max); max -= cnt; total += cnt; @@ -412,11 +404,11 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable* table, ks_dh /* step2 - look at sibling */ /* ----------------------- */ - ks_dhtrt_sortedxors xort1; + ks_dhtrt_sortedxors_t xort1; xort0.next = &xort1; memset(&xort1, 0 , sizeof(xort1)); - memcpy(initid, &xort0.hixor, KS_DHT_IDSIZE); - ks_dhtrt_bucket_header* parent = header->parent; + memcpy(initid, &xort0.hixor, KS_DHT_NODEID_SIZE); + ks_dhtrt_bucket_header_t* parent = header->parent; if (header == parent->left) { xort1.bheader = header = parent->right; } @@ -429,7 +421,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable* table, ks_dh } } - cnt = ks_dhtrt_findclosest_bucketnodes(query->id, header, &xort1, initid ,max); + cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, header, &xort1, initid ,max); max -= cnt; total += cnt; @@ -444,20 +436,20 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable* table, ks_dh /* step3 and beyond ... work left and right until the count is satisfied */ /* ---------------------------------------------------------------------- */ - memcpy(initid, &xort0.hixor, KS_DHT_IDSIZE); + memcpy(initid, &xort0.hixor, KS_DHT_NODEID_SIZE); - unsigned char leftid[KS_DHT_IDSIZE]; - unsigned char rightid[KS_DHT_IDSIZE]; - memcpy(leftid, xort0.bheader->mask, KS_DHT_IDSIZE); - memcpy(rightid, xort1.bheader->mask, KS_DHT_IDSIZE); + unsigned char leftid[KS_DHT_NODEID_SIZE]; + unsigned char rightid[KS_DHT_NODEID_SIZE]; + memcpy(leftid, xort0.bheader->mask, KS_DHT_NODEID_SIZE); + memcpy(rightid, xort1.bheader->mask, KS_DHT_NODEID_SIZE); int insanity = 0; - ks_dhtrt_bucket_header* lheader; - ks_dhtrt_bucket_header* rheader; - ks_dhtrt_sortedxors* prev = &xort1; - ks_dhtrt_sortedxors* tofree = 0; - ks_dhtrt_sortedxors* xortn; - ks_dhtrt_sortedxors* xortn1; + ks_dhtrt_bucket_header_t* lheader; + ks_dhtrt_bucket_header_t* rheader; + ks_dhtrt_sortedxors_t* prev = &xort1; + ks_dhtrt_sortedxors_t* tofree = 0; + ks_dhtrt_sortedxors_t* xortn; + ks_dhtrt_sortedxors_t* xortn1; do { lheader = 0; @@ -468,12 +460,12 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable* table, ks_dh ks_dhtrt_shiftleft(leftid); lheader = ks_dhtrt_find_bucketheader(table, leftid); if (lheader) { - xortn = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors)); - memset(xortn, 0, sizeof(ks_dhtrt_sortedxors)); + xortn = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t)); + memset(xortn, 0, sizeof(ks_dhtrt_sortedxors_t)); if (tofree == 0) tofree = xortn; prev->next = xortn; prev = xortn; - cnt += ks_dhtrt_findclosest_bucketnodes(query->id, lheader, xortn, leftid ,max); + cnt += ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, lheader, xortn, leftid ,max); max -= cnt; #ifdef KS_DHT_DEBUGPRINTF_ printf(" stage3: seaching left bucket header %s yielded %d nodes, total=%d\n", @@ -482,15 +474,15 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable* table, ks_dh } } - if (max > 0 && rightid[KS_DHT_IDSIZE-1] != 0x00) { + if (max > 0 && rightid[KS_DHT_NODEID_SIZE-1] != 0x00) { ks_dhtrt_shiftright(rightid); rheader = ks_dhtrt_find_bucketheader(table, rightid); if (rheader) { - xortn1 = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors)); - memset(xortn1, 0, sizeof(ks_dhtrt_sortedxors)); + xortn1 = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t)); + memset(xortn1, 0, sizeof(ks_dhtrt_sortedxors_t)); prev->next = xortn1; prev = xortn1; - cnt = ks_dhtrt_findclosest_bucketnodes(query->id, rheader, xortn1, rightid , max); + cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, rheader, xortn1, rightid , max); max -= cnt; #ifdef KS_DHT_DEBUGPRINTF_ printf(" stage3: seaching right bucket header %s yielded %d nodes, total=%d\n", @@ -511,14 +503,14 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable* table, ks_dh ks_dhtrt_load_query(query, &xort0); /* free up the xort structs on heap */ while(tofree) { - ks_dhtrt_sortedxors* x = tofree->next; + ks_dhtrt_sortedxors_t* x = tofree->next; ks_pool_free(table->pool, tofree); tofree = x->next; } return query->count; } -KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable* table) +KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t* table) { /* walk the table and update the status of all known knodes */ /* anything that is suspect automatically becomes expired */ @@ -532,18 +524,18 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable* table) /* inactive again it is considered inactive */ /* */ - ks_dhtrt_internal* internal = table->internal; - ks_dhtrt_bucket_header* header = internal->buckets; - ks_dhtrt_bucket_header* stack[KS_DHT_IDSIZE * 8]; + ks_dhtrt_internal_t* internal = table->internal; + ks_dhtrt_bucket_header_t* header = internal->buckets; + ks_dhtrt_bucket_header_t* stack[KS_DHT_NODEID_SIZE * 8]; int stackix=0; ks_time_t t0 = ks_time_now(); while(header) { stack[stackix++] = header; if (header->bucket) { - ks_dhtrt_bucket* b = header->bucket; + ks_dhtrt_bucket_t* b = header->bucket; for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) { - ks_dhtrt_bucket_entry* e = &b->entries[ix]; + ks_dhtrt_bucket_entry_t* e = &b->entries[ix]; if (e->inuse == 1) { /* more than n pings outstanding? */ if (e->outstanding_pings >= KS_DHTRT_MAXPING) { @@ -574,23 +566,22 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable* table) } -KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable* table, int level) { +KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t* table, int level) { /* dump buffer headers */ char buffer[100]; memset(buffer, 0, 100); - ks_dhtrt_internal* internal = table->internal; - ks_dhtrt_bucket_header* header = internal->buckets; - ks_dhtrt_bucket_header* stack[KS_DHT_IDSIZE * 8]; + ks_dhtrt_internal_t* internal = table->internal; + ks_dhtrt_bucket_header_t* header = internal->buckets; + ks_dhtrt_bucket_header_t* stack[KS_DHT_NODEID_SIZE * 8]; int stackix = 0; - while(header) { stack[stackix++] = header; /* walk and report left handsize */ memset(buffer, 0, 100); /*ks_log*/ printf("bucket header: [%s]\n", ks_dhtrt_printableid(header->mask, buffer) ); if (header->bucket) { - ks_dhtrt_bucket* b = header->bucket; + ks_dhtrt_bucket_t* b = header->bucket; printf(" bucket holds %d entries\n", b->count); if (level == 7) { @@ -615,40 +606,15 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable* table, int level) { return; } - - - - -/* stupid routines to avoid unused warnings */ -void colm() { - ks_dhtrt_shiftright(0); - ks_dhtrt_xor(0, 0, 0); - ks_dhtrt_xorcmp(0, 0, 0); - ks_dhtrt_split_bucket(0, 0, 0); - ks_dhtrt_shiftleft(0); - ks_dhtrt_initrwlock( 0); - ks_dhtrt_deinitrwlock( 0); - - ks_dhtrt_getreadlock( 0); - ks_dhtrt_getwritelock( 0); - ks_dhtrt_tryreadlock( 0); - ks_dhtrt_trywritelock( 0); - ks_dhtrt_releasereadlock( 0); - ks_dhtrt_releasewritelock( 0); - -} - - - /* internal functions */ static -ks_dhtrt_bucket_header* ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt_bucket_header* parent, unsigned char* mask) +ks_dhtrt_bucket_header_t* ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt_bucket_header_t* parent, uint8_t* mask) { - ks_dhtrt_bucket_header* header = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_header)); - memset(header, 0, sizeof(ks_dhtrt_bucket_header)); + ks_dhtrt_bucket_header_t* header = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_header_t)); + memset(header, 0, sizeof(ks_dhtrt_bucket_header_t)); memcpy(header->mask, mask, sizeof(header->mask)); header->parent = parent; @@ -662,22 +628,23 @@ ks_dhtrt_bucket_header* ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt_b } static -ks_dhtrt_bucket* ks_dhtrt_create_bucket(ks_pool_t *pool) +ks_dhtrt_bucket_t* ks_dhtrt_create_bucket(ks_pool_t *pool) { - ks_dhtrt_bucket* bucket = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket)); - memset(bucket, 0, sizeof(ks_dhtrt_bucket)); + ks_dhtrt_bucket_t* bucket = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_t)); + memset(bucket, 0, sizeof(ks_dhtrt_bucket_t)); + /*ks_rwl_create(&bucket->lock, pool);*/ return bucket; } static -ks_dhtrt_bucket_header* ks_dhtrt_find_bucketheader(ks_dhtrt_routetable* table, unsigned char* id) +ks_dhtrt_bucket_header_t* ks_dhtrt_find_bucketheader(ks_dhtrt_routetable_t* table, ks_dhtrt_nodeid_t id) { /* find the right bucket. if a bucket header has a bucket, it does not children so it must be the bucket to use */ - ks_dhtrt_internal* internal = table->internal; - ks_dhtrt_bucket_header* header = internal->buckets; + ks_dhtrt_internal_t* internal = table->internal; + ks_dhtrt_bucket_header_t* header = internal->buckets; while(header) { if ( header->bucket ) { return header; @@ -692,16 +659,16 @@ ks_dhtrt_bucket_header* ks_dhtrt_find_bucketheader(ks_dhtrt_routetable* table, u } static -ks_dhtrt_bucket_entry* ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header* header, ks_dhtrt_nodeid nodeid) +ks_dhtrt_bucket_entry_t* ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t* header, ks_dhtrt_nodeid_t nodeid) { - ks_dhtrt_bucket* bucket = header->bucket; + ks_dhtrt_bucket_t* bucket = header->bucket; if (bucket == 0) return 0; for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) { #ifdef KS_DHT_DEBUGPRINTF_ #endif if ( bucket->entries[ix].inuse == 1 && - (!memcmp(nodeid, bucket->entries[ix].id, KS_DHT_IDSIZE)) ) { + (!memcmp(nodeid, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) { return &(bucket->entries[ix]); } } @@ -710,21 +677,29 @@ ks_dhtrt_bucket_entry* ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header* header, static -void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header* original, ks_dhtrt_bucket_header* left, ks_dhtrt_bucket_header* right) +void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t* original, + ks_dhtrt_bucket_header_t* left, + ks_dhtrt_bucket_header_t* right) { /* so split the bucket in two based on the masks in the new header */ /* the existing bucket - with the remaining ids will be taken by the right hand side */ - ks_dhtrt_bucket* source = original->bucket; - ks_dhtrt_bucket* dest = left->bucket; + ks_dhtrt_bucket_t* source = original->bucket; + ks_dhtrt_bucket_t* dest = left->bucket; int lix = 0; int rix = 0; + + /* ****************** */ + /* bucket write lock */ + /* ****************** */ + /*ks_rwl_write_lock(source->lock);*/ + source->locked=1; for( ; rix<KS_DHT_BUCKETSIZE; ++rix) { if (ks_dhtrt_ismasked(source->entries[rix].id, left->mask)) { /* move it to the left */ - memcpy(dest->entries[lix].id, source->entries[rix].id, KS_DHT_IDSIZE); + memcpy(dest->entries[lix].id, source->entries[rix].id, KS_DHT_NODEID_SIZE); dest->entries[lix].gptr = source->entries[rix].gptr; dest->entries[lix].inuse = 1; ++lix; @@ -735,6 +710,12 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header* original, ks_dhtrt_bucket_hea --source->count; } } + /* *********************** */ + /* end bucket write lock */ + /* *********************** */ + source->locked=0; + /*ks_rwl_write_unlock(source->lock);*/ + /* give original bucket to the new left hand side header */ right->bucket = source; original->bucket = 0; @@ -756,7 +737,7 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header* original, ks_dhtrt_bucket_hea * so at least the static array does away with the need for locking. */ static -ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer) +ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t* bucket, ks_dht_node_t* node) { /* sanity checks */ if (!bucket || bucket->count >= KS_DHT_BUCKETSIZE) { @@ -776,10 +757,10 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer) else if (free == KS_DHT_BUCKETSIZE && bucket->entries[ix].flags == DHTPEER_EXPIRED) { expiredix = ix; } - else if (!memcmp(bucket->entries[ix].id, peer->id, KS_DHT_IDSIZE)) { + else if (!memcmp(bucket->entries[ix].id, node->nodeid.id, KS_DHT_NODEID_SIZE)) { #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - printf("duplicate peer %s found at %d ", ks_dhtrt_printableid(peer->id, buffer), ix); + printf("duplicate peer %s found at %d ", ks_dhtrt_printableid(node->nodeid.id, buffer), ix); #endif bucket->entries[ix].tyme = ks_time_now(); bucket->entries[ix].flags &= DHTPEER_ACTIVE; @@ -787,6 +768,11 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer) } } + /* ****************** */ + /* bucket write lock */ + /* ****************** */ + /*ks_rwl_write_lock(bucket->lock);*/ + bucket->locked = 1; if (free == KS_DHT_BUCKETSIZE && expiredix<KS_DHT_BUCKETSIZE ) { /* bump this one - but only if we have no other option */ free = expiredix; @@ -795,53 +781,63 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket* bucket, ks_dhtrt_node* peer) if ( free<KS_DHT_BUCKETSIZE ) { bucket->entries[free].inuse = 1; - bucket->entries[free].gptr = peer; + bucket->entries[free].gptr = node; bucket->entries[free].tyme = ks_time_now(); bucket->entries[free].flags &= DHTPEER_ACTIVE; + ++bucket->count; - memcpy(bucket->entries[free].id, peer->id, KS_DHT_IDSIZE); + memcpy(bucket->entries[free].id, node->nodeid.id, KS_DHT_NODEID_SIZE); + bucket->locked = 0; + /*ks_rwl_write_unlock(bucket->lock);*/ #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - printf("inserting peer %s ", ks_dhtrt_printableid(peer->id, buffer)); - printf("into bucket mask at index %d\n", free); -#endif + printf("Inserting node %s\n", ks_dhtrt_printableid(node->nodeid.id, buffer)); +#endif return KS_STATUS_SUCCESS; } - + bucket->locked = 0; + /*ks_rwl_write_unlock(bucket->lock);*/ + /* ********************** */ + /* end bucket write lock */ + /* ********************** */ + return KS_STATUS_FAIL; } static -ks_dht_node_t* ks_dhtrt_find_nodeid(ks_dhtrt_bucket* bucket, ks_dhtrt_nodeid nodeid) +ks_dht_node_t* ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t* bucket, ks_dhtrt_nodeid_t id) { #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - printf("\nfind noeid for: %s\n", ks_dhtrt_printableid(nodeid, buffer)); + printf("find nodeid for: %s\n", ks_dhtrt_printableid(id, buffer)); #endif for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) { -#ifdef KS_DHT_DEBUGPRINTF_ - printf("\nbucket->entries[%d].id = %s inuse=%c\n", ix, - ks_dhtrt_printableid(bucket->entries[ix].id, buffer), +#ifdef KS_DHT_DEBUGPRINTFX_ + char bufferx[100]; + if ( bucket->entries[ix].inuse == 1) { + printf("\nbucket->entries[%d].id = %s inuse=%x\n", ix, + ks_dhtrt_printableid(bucket->entries[ix].id, bufferx), bucket->entries[ix].inuse ); + } #endif if ( bucket->entries[ix].inuse == 1 && - (!memcmp(nodeid, bucket->entries[ix].id, KS_DHT_IDSIZE)) ) { - return bucket->entries[ix].gptr->handle; + (!memcmp(id, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) { + return bucket->entries[ix].gptr; } } return 0; } static -void ks_dhtrt_delete_id(ks_dhtrt_bucket* bucket, ks_dhtrt_nodeid nodeid) +void ks_dhtrt_delete_id(ks_dhtrt_bucket_t* bucket, ks_dhtrt_nodeid_t id) { #ifdef KS_DHT_DEBUGPRINTF_ - char buffer[100]; - printf("\ndeleting node for: %s\n", ks_dhtrt_printableid(nodeid, buffer)); -#endif + char buffer[100]; + printf("\ndeleting node for: %s\n", ks_dhtrt_printableid(id, buffer)); +#endif for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) { #ifdef KS_DHT_DEBUGPRINTF_ @@ -850,7 +846,7 @@ void ks_dhtrt_delete_id(ks_dhtrt_bucket* bucket, ks_dhtrt_nodeid nodeid) bucket->entries[ix].inuse ); #endif if ( bucket->entries[ix].inuse == 1 && - (!memcmp(nodeid, bucket->entries[ix].id, KS_DHT_IDSIZE)) ) { + (!memcmp(id, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) { bucket->entries[ix].inuse = 0; bucket->entries[ix].gptr = 0; bucket->entries[ix].flags = 0; @@ -862,21 +858,21 @@ void ks_dhtrt_delete_id(ks_dhtrt_bucket* bucket, ks_dhtrt_nodeid nodeid) static -uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid, - ks_dhtrt_bucket_header* header, - ks_dhtrt_sortedxors* xors, +uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id, + ks_dhtrt_bucket_header_t* header, + ks_dhtrt_sortedxors_t* xors, unsigned char* hixor, /*todo: remove */ unsigned int max) { uint8_t count = 0; /* count of nodes added this time */ xors->startix = KS_DHT_BUCKETSIZE; xors->count = 0; - unsigned char xorvalue[KS_DHT_IDSIZE]; + unsigned char xorvalue[KS_DHT_NODEID_SIZE]; /* just ugh! - there must be a better way to do this */ /* walk the entire bucket calculating the xor value on the way */ /* add valid & relevant entries to the xor values */ - ks_dhtrt_bucket* bucket = header->bucket; + ks_dhtrt_bucket_t* bucket = header->bucket; if (bucket == 0) { /* sanity */ #ifdef KS_DHT_DEBUGPRINTF_ @@ -892,11 +888,11 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid, ks_dhtrt_isactive( &(bucket->entries[ix])) ) { /* calculate xor value */ - ks_dhtrt_xor(nodeid, bucket->entries[ix].id, xorvalue ); + ks_dhtrt_xor(bucket->entries[ix].id, id, xorvalue ); /* do we need to hold this one */ if ( count < max || /* yes: we have not filled the quota yet */ - (memcmp(xorvalue, hixor, KS_DHT_IDSIZE) < 0)) { /* or is closer node than one already selected */ + (memcmp(xorvalue, hixor, KS_DHT_NODEID_SIZE) < 0)) { /* or is closer node than one already selected */ /* now sort the new xorvalue into the results structure */ /* this now becomes worst case O(n*2) logic - is there a better way */ @@ -905,7 +901,7 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid, unsigned int prev_xorix = KS_DHT_BUCKETSIZE; for(int ix2=0; ix2<count; ++ix2) { - if (memcmp(xorvalue, xors->xort[xorix].xor, KS_DHT_IDSIZE) > 0) { + if (memcmp(xorvalue, xors->xort[xorix].xor, KS_DHT_NODEID_SIZE) > 0) { break; /* insert before xorix, after prev_xoris */ } @@ -917,7 +913,7 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid, count -> array slot to added newly identified node insert_point -> the array slot before which we need to insert the newly identified node */ - memcpy(xors->xort[count].xor, xorvalue, KS_DHT_IDSIZE); + memcpy(xors->xort[count].xor, xorvalue, KS_DHT_NODEID_SIZE); xors->xort[count].ix = ix; xors->xort[count].nextix = xorix; /* correct forward chain */ @@ -936,9 +932,9 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid, } static -uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes* query, ks_dhtrt_sortedxors* xort) +uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t* query, ks_dhtrt_sortedxors_t* xort) { - ks_dhtrt_sortedxors* current = xort; + ks_dhtrt_sortedxors_t* current = xort; uint8_t loaded = 0; while(current) { #ifdef KS_DHT_DEBUGPRINTF_ @@ -949,7 +945,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes* query, ks_dhtrt_sortedxors* xor int xorix = current->startix; for (uint8_t ix = 0; ix<= current->count && loaded < query->max; ++ix ) { unsigned int z = current->xort[xorix].ix; - query->nodes[ix] = current->bheader->bucket->entries[z].gptr->handle; + query->nodes[ix] = current->bheader->bucket->entries[z].gptr; ++loaded; } if (loaded >= query->max) break; @@ -959,7 +955,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes* query, ks_dhtrt_sortedxors* xor return loaded; } -void ks_dhtrt_ping(ks_dhtrt_bucket_entry* entry) { +void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t* entry) { ++entry->outstanding_pings; /* @todo */ /* set the appropriate command in the node and queue if for processing */ @@ -978,16 +974,16 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry* entry) { so format must be a right filled mask (hex: ..ffffffff) */ static -void ks_dhtrt_shiftright(unsigned char* id) +void ks_dhtrt_shiftright(uint8_t* id) { unsigned char b0 = 0; unsigned char b1 = 0; - for(int i = KS_DHT_IDSIZE-1; i >= 0; --i) { + for(int i = KS_DHT_NODEID_SIZE-1; i >= 0; --i) { if (id[i] == 0) break; /* beyond mask- we are done */ b1 = id[i] & 0x01; id[i] >>= 1; - if (i != (KS_DHT_IDSIZE-1)) { + if (i != (KS_DHT_NODEID_SIZE-1)) { id[i+1] |= (b0 << 7); } b0 = b1; @@ -996,9 +992,9 @@ void ks_dhtrt_shiftright(unsigned char* id) } static -void ks_dhtrt_shiftleft(unsigned char* id) { +void ks_dhtrt_shiftleft(uint8_t* id) { - for(int i = KS_DHT_IDSIZE-1; i >= 0; --i) { + for(int i = KS_DHT_NODEID_SIZE-1; i >= 0; --i) { if (id[i] == 0xff) continue; id[i] <<= 1; id[i] |= 0x01; @@ -1008,28 +1004,37 @@ void ks_dhtrt_shiftleft(unsigned char* id) { } /* Determine whether id1 or id2 is closer to ref */ -static int ks_dhtrt_xorcmp(const unsigned char *id1, const unsigned char *id2, const unsigned char *ref) + +/* +@todo: remove ? simple memcpy seems to do the job ? + +static int +ks_dhtrt_xorcmp(const uint8_t* id1, const uint8_t* id2, const uint8_t* ref); + +static int ks_dhtrt_xorcmp(const uint8_t* id1, const uint8_t* id2, const uint8_t* ref) { int i; - for (i = 0; i < KS_DHT_IDSIZE; i++) { - unsigned char xor1, xor2; + for (i = 0; i < KS_DHT_NODEID_SIZE; i++) { + uint8_t xor1, xor2; if (id1[i] == id2[i]) { continue; } xor1 = id1[i] ^ ref[i]; xor2 = id2[i] ^ ref[i]; if (xor1 < xor2) { - return -1; /* id1 is closer */ + return -1; / * id1 is closer * / } - return 1; /* id2 is closer */ + return 1; / * id2 is closer * / } - return 0; /* id2 and id2 are identical ! */ + return 0; / * id2 and id2 are identical ! * / } +*/ + /* create an xor value from two ids */ -static void ks_dhtrt_xor(const unsigned char *id1, const unsigned char *id2, unsigned char *xor) +static void ks_dhtrt_xor(const uint8_t* id1, const uint8_t* id2, uint8_t* xor) { - for (int i = 0; i < KS_DHT_IDSIZE; ++i) { + for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i) { if (id1[i] == id2[i]) { xor[i] = 0; } @@ -1039,9 +1044,9 @@ static void ks_dhtrt_xor(const unsigned char *id1, const unsigned char *id2, uns } /* is id masked by mask 1 => yes, 0=> no */ -static int ks_dhtrt_ismasked(const unsigned char *id, const unsigned char *mask) +static int ks_dhtrt_ismasked(const uint8_t* id, const unsigned char *mask) { - for (int i = 0; i < KS_DHT_IDSIZE; ++i) { + for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i) { if (mask[i] == 0 && id[i] != 0) return 0; else if (mask[i] == 0xff) return 1; else if (id[i] > mask[i]) return 0; @@ -1049,92 +1054,17 @@ static int ks_dhtrt_ismasked(const unsigned char *id, const unsigned char *mask) return 1; } -static -ks_status_t ks_dhtrt_initrwlock( ks_dhtrt_rw_lock* lock) -{ - ks_status_t s = ks_mutex_create(&lock->mutex, 0, lock->pool); - if (s != KS_STATUS_SUCCESS) return s; - s = ks_cond_create_ex(&lock->rcond, lock->pool, lock->mutex); - if (s != KS_STATUS_SUCCESS) return s; - s = ks_cond_create_ex(&lock->wcond, lock->pool, lock->mutex); - return s; -} - -static -void ks_dhtrt_deinitrwlock( ks_dhtrt_rw_lock* lock) -{ - ks_cond_destroy(&lock->rcond); - ks_cond_destroy(&lock->wcond); - ks_mutex_destroy(&lock->mutex); - memset(lock, 0, sizeof(ks_dhtrt_rw_lock)); -} - -static -void ks_dhtrt_getreadlock( ks_dhtrt_rw_lock* lock) -{ - ks_mutex_lock(lock->mutex); - while (lock->write_count > 0) { - ks_cond_wait(lock->rcond); - } - ++lock->read_count; - ks_mutex_unlock(lock->mutex); -} - -static -ks_status_t ks_dhtrt_tryreadlock( ks_dhtrt_rw_lock* lock) -{ - return KS_STATUS_FAIL; -} - -static -void ks_dhtrt_releasereadlock( ks_dhtrt_rw_lock* lock) -{ - ks_mutex_lock(lock->mutex); - --lock->read_count; - if (lock->read_count == 0) - ks_cond_signal(lock->wcond); - ks_mutex_unlock(lock->mutex); -} - -static -void ks_dhtrt_getwritelock( ks_dhtrt_rw_lock* lock) -{ - ks_mutex_lock(lock->mutex); - while (lock->read_count > 0) { - ks_cond_wait(lock->wcond); - } - ++lock->write_count; - ks_mutex_unlock(lock->mutex); -} - -static -ks_status_t ks_dhtrt_trywritelock( ks_dhtrt_rw_lock* lock) -{ - return KS_STATUS_FAIL; -} - -static -void ks_dhtrt_releasewritelock( ks_dhtrt_rw_lock* lock) -{ - ks_mutex_lock(lock->mutex); - --lock->write_count; - assert(lock->write_count==0); - ks_cond_broadcast(lock->rcond); - ks_mutex_unlock(lock->mutex); -} - - -static char* ks_dhtrt_printableid(const unsigned char* id, char* buffer) +static char* ks_dhtrt_printableid(uint8_t* id, char* buffer) { char* t = buffer; - memset(buffer, 0, KS_DHT_IDSIZE*2); - for (int i = 0; i < KS_DHT_IDSIZE; ++i, buffer+=2) { + memset(buffer, 0, KS_DHT_NODEID_SIZE*2); + for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i, buffer+=2) { sprintf(buffer, "%02x", id[i]); } return t; } -unsigned char ks_dhtrt_isactive(ks_dhtrt_bucket_entry* entry) +unsigned char ks_dhtrt_isactive(ks_dhtrt_bucket_entry_t* entry) { /* todo */ return 1; diff --git a/libs/libks/src/dht/ks_dht_bucket.h b/libs/libks/src/dht/ks_dht_bucket.h deleted file mode 100644 index 26f7704962..0000000000 --- a/libs/libks/src/dht/ks_dht_bucket.h +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (c) 2016 Colm Quinn - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * * Neither the name of the original author; nor the names of any contributors - * may be used to endorse or promote products derived from this software - * without specific prior written permission. - * - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER - * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _KS_DHT_BUCKETS_H_ -#define _KS_DHT_BUCKETS_H_ - -#ifdef __cplusplus -#define KS_BEGIN_EXTERN_C extern "C" { -#define KS_END_EXTERN_C } -#else -#define KS_BEGIN_EXTERN_C -#define KS_END_EXTERN_C -#endif - -#include "ks.h" - -KS_BEGIN_EXTERN_C - -/* @todo: temporary - replace with real definiton when available */ -typedef void ks_dht_node_t; - - -enum ks_dhtrt_nodestate_t {DHTRT_UNKNOWN, DHTRT_ACTIVE, DHTRT_SUSPECT, DHTRT_EXPIRED}; - -typedef ks_status_t (*ks_dhtrt_callback)(ks_dht_node_t*, enum ks_dhtrt_nodestate_t); - - -/* for testing */ -#define KS_DHT_BUCKETSIZE 20 -#define KS_DHT_IDSIZE 20 - - -typedef struct ks_dhtrt_node_s { - unsigned char id[KS_DHT_IDSIZE]; - ks_dht_node_t* handle; -} ks_dhtrt_node; - -typedef struct ks_dhtrt_routetable_s { - void* internal; /* ks_dhtrt_internal */ - ks_pool_t* pool; /* */ - ks_logger_t logger; -} ks_dhtrt_routetable; - -typedef struct ks_dhtrt_querynodes_s { - unsigned char id[KS_DHT_IDSIZE]; /* in: id to query */ - uint8_t max; /* in: maximum to return */ - uint8_t count; /* out: number returned */ - ks_dht_node_t* nodes[KS_DHT_BUCKETSIZE]; /* out: array of peers (ks_dht_node_t* peer[incount]) */ -} ks_dhtrt_querynodes; - - -typedef unsigned char ks_dhtrt_nodeid[KS_DHT_IDSIZE]; - -/* methods */ - -ks_dhtrt_routetable* ks_dhtrt_initroute( ks_pool_t *pool, ks_dhtrt_nodeid localid); -ks_status_t ks_dhtrt_registercallback(ks_dhtrt_callback, enum ks_dhtrt_nodestate_t); -void ks_dhtrt_deinitroute(ks_dhtrt_routetable* table ); - -ks_dhtrt_node* ks_dhtrt_create_node(ks_dhtrt_routetable* table, ks_dhtrt_nodeid nodeid, ks_dht_node_t* node); -ks_status_t ks_dhtrt_delete_node(ks_dhtrt_routetable* table, ks_dhtrt_node* node); - -ks_status_t ks_dhtrt_touch_node(ks_dhtrt_routetable* table, ks_dhtrt_nodeid nodeid); -ks_status_t ks_dhtrt_expire_node(ks_dhtrt_routetable* table, ks_dhtrt_nodeid nodeid); - -uint8_t ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable* table, ks_dhtrt_querynodes* query); -ks_dht_node_t* ks_dhtrt_find_node(ks_dhtrt_routetable* table, ks_dhtrt_nodeid id); - - -/* debugging aids */ -void ks_dhtrt_dump(ks_dhtrt_routetable* table, int level); -void ks_dhtrt_process_table(ks_dhtrt_routetable* table); - - -KS_END_EXTERN_C - -#endif -