diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index b14a151237..50ed2c6d77 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -537,7 +537,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid * If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint. */ if (ep->addr.family == AF_INET) { - if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done; + if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht, dht->pool)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_create_node(dht->rt_ipv4, ep->nodeid, KS_DHT_LOCAL, @@ -545,7 +545,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ep->addr.port, &ep->node)) != KS_STATUS_SUCCESS) goto done; } else { - if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done; + if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht, dht->pool)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_create_node(dht->rt_ipv6, ep->nodeid, KS_DHT_LOCAL, diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 3546baf525..aacd74b43c 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -89,7 +89,7 @@ enum ks_dht_nodetype_t { KS_DHT_REMOTE=0x01, struct ks_dht_node_s { ks_dht_nodeid_t nodeid; ks_sockaddr_t addr; - enum ks_afflags_t family; /* AF_INET or AF_INET6 */ +// enum ks_afflags_t family; /* AF_INET or AF_INET6 */ enum ks_dht_nodetype_t type; /* local or remote */ ks_dhtrt_routetable_t* table; ks_rwl_t *reflock; @@ -425,8 +425,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, */ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_dht_t *dht, - ks_pool_t *pool, - ks_thread_pool_t* tpool); + ks_pool_t *pool); KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table); KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table, diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index 1778e297f3..32e2183e5e 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -59,8 +59,6 @@ typedef struct ks_dhtrt_bucket_entry_s { ks_time_t tyme; uint8_t id[KS_DHT_NODEID_SIZE]; ks_dht_node_t *gptr; /* ptr to peer */ - enum ks_dht_nodetype_t type; - enum ks_afflags_t family; uint8_t inuse; uint8_t outstanding_pings; uint8_t flags; /* active, suspect, expired */ @@ -98,7 +96,6 @@ typedef struct ks_dhtrt_internal_s { uint8_t localid[KS_DHT_NODEID_SIZE]; ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */ ks_dht_t *dht; - ks_thread_pool_t *tpool; ks_rwl_t *lock; /* lock for safe traversal of the tree */ ks_time_t last_process_table; ks_time_t next_process_table_delta; @@ -169,8 +166,6 @@ static ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id); static char *ks_dhtrt_printableid(uint8_t *id, char *buffer); -static -unsigned char ks_dhtrt_isactive(ks_dhtrt_bucket_entry_t *entry); static uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query); @@ -199,8 +194,7 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *table, ks_dhtrt_bucket_entry_t *entry); KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_dht_t *dht, - ks_pool_t *pool, - ks_thread_pool_t* tpool) + ks_pool_t *pool) { (void)ks_dhtrt_find_relatedbucketheader; @@ -212,7 +206,6 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t)); ks_rwl_create(&internal->lock, pool); - internal->tpool = tpool; internal->dht = dht; internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL; ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool); @@ -301,6 +294,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, assert(header != NULL); /* should always find a header */ ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id); + if (bentry != 0) { bentry->tyme = ks_time_now_sec(); @@ -319,18 +313,20 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, tnode = ks_dhtrt_make_node(table); tnode->table = table; + enum ks_afflags_t family; + for (int i = 0; i < 5; ++i) { if (ip[i] == ':') { - tnode->family = AF_INET6; break; + family = AF_INET6; break; } else if (ip[i] == '.') { - tnode->family = AF_INET; break; + family = AF_INET; break; } } memcpy(tnode->nodeid.id, nodeid.id, KS_DHT_NODEID_SIZE); tnode->type = type; - if (( ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) || + if (( ks_addr_set(&tnode->addr, ip, port, family) != KS_STATUS_SUCCESS) || ( ks_rwl_create(&tnode->reflock, table->pool) != KS_STATUS_SUCCESS)) { ks_pool_free(table->pool, &tnode); ks_rwl_read_unlock(internal->lock); @@ -1031,7 +1027,7 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all) ks_dhtrt_deletednode_t *deleted = internal->deleted_node; ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL; -#ifdef KS_DHT_DEBUGPRINTFX_ +#ifdef KS_DHT_DEBUGPRINTF_ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: internal->deleted_count %d\n", internal->deleted_count); #endif @@ -1040,14 +1036,14 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all) uint32_t threshold = KS_DHTRT_RECYCLE_NODE_THRESHOLD; if (all) { - threshold = 1; + threshold = 0; } while(internal->deleted_count > threshold && deleted) { ks_dht_node_t* node = deleted->node; #ifdef KS_DHT_DEBUGPRINTFX_ - ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock\n"); + ks_log(KS_LOG_DEBUG, "ALLOC process_deleted : try write lock\n"); #endif if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) { @@ -1057,8 +1053,8 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all) deleted = deleted->next; ks_pool_free(table->pool, &temp); --internal->deleted_count; -#ifdef KS_DHT_DEBUGPRINTF_ - ks_log(KS_LOG_DEBUG, "ALLOC process_deleted: internal->deleted_count %d\n", internal->deleted_count); +#ifdef KS_DHT_DEBUGPRINTFX__ + ks_log(KS_LOG_DEBUG, "ALLOC process_deleted: internal->deleted_count reduced to %d\n", internal->deleted_count); #endif if (prev != NULL) { prev->next = deleted; @@ -1069,8 +1065,8 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all) } else { -#ifdef KS_DHT_DEBUGPRINTFX_ - ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock failed\n"); +#ifdef KS_DHT_DEBUGPRINTF_ + ks_log(KS_LOG_DEBUG, "ALLOC process_deleted : try write lock failed\n"); #endif prev = deleted; deleted = prev->next; @@ -1117,7 +1113,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) { b->entries[ix].flags, b->entries[ix].outstanding_pings, n->type, - n->family, + n->addr.family, buffer); } else { @@ -1263,8 +1259,6 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original, /* move it to the left */ memcpy(dest->entries[lix].id, source->entries[rix].id, KS_DHT_NODEID_SIZE); dest->entries[lix].gptr = source->entries[rix].gptr; - dest->entries[lix].family = source->entries[rix].family; - dest->entries[lix].type = source->entries[rix].type; dest->entries[lix].inuse = 1; ++lix; ++dest->count; @@ -1341,8 +1335,6 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node) if ( freeentries[free].inuse = 1; bucket->entries[free].gptr = node; - bucket->entries[free].type = node->type; - bucket->entries[free].family = node->family; bucket->entries[free].tyme = ks_time_now_sec(); bucket->entries[free].flags = DHTPEER_DUBIOUS; @@ -1454,11 +1446,10 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id, for (uint8_t ix=0; ixentries[ix].inuse == 1 && /* in use */ - bucket->entries[ix].flags == DHTPEER_ACTIVE && /* not dubious or expired */ - (family == ifboth || bucket->entries[ix].family == family) && /* match if family */ - (bucket->entries[ix].type & type) && /* match type */ - ks_dhtrt_isactive( &(bucket->entries[ix])) ) { + if ( bucket->entries[ix].inuse == 1 && /* in use */ + bucket->entries[ix].flags == DHTPEER_ACTIVE && /* not dubious or expired */ + (family == ifboth || bucket->entries[ix].gptr->addr.family == family) && /* match if family */ + (bucket->entries[ix].gptr->type & type) ) { /* match type */ /* calculate xor value */ ks_dhtrt_xor(bucket->entries[ix].id, id, xorvalue ); @@ -1561,7 +1552,7 @@ void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t* deleted->next = internal->deleted_node; internal->deleted_node = deleted; /* add to deleted queue */ ++internal->deleted_count; -#ifdef KS_DHT_DEBUGPRINTFX_ +#ifdef KS_DHT_DEBUGPRINTF_ ks_log(KS_LOG_DEBUG, "ALLOC: Queue for delete %d\n", internal->deleted_count); #endif ks_mutex_unlock(internal->deleted_node_lock); diff --git a/libs/libks/test/testbuckets.c b/libs/libks/test/testbuckets.c index 29bd657643..421371be38 100644 --- a/libs/libks/test/testbuckets.c +++ b/libs/libks/test/testbuckets.c @@ -31,10 +31,10 @@ void test01() printf("**** testbuckets - test01 start\n"); fflush(stdout); ks_dhtrt_routetable_t *rt; - ks_dhtrt_initroute(&rt, dht, pool, tpool); + ks_dhtrt_initroute(&rt, dht, pool); ks_dhtrt_deinitroute(&rt); - ks_dhtrt_initroute(&rt, dht, pool, tpool); + ks_dhtrt_initroute(&rt, dht, pool); ks_dht_nodeid_t nodeid, homeid; memset(homeid.id, 0xdd, KS_DHT_NODEID_SIZE); homeid.id[19] = 0; @@ -183,43 +183,43 @@ void test03() for (int i=0; i<200; ++i) { if (i%10 == 0) { - ++nodeid.id[0]; - nodeid.id[1] = 0; + ++nodeid.id[0]; + nodeid.id[1] = 0; } else { - ++nodeid.id[1]; + ++nodeid.id[1]; } ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); - if (s0 == KS_STATUS_SUCCESS) { - ks_dhtrt_touch_node(rt, nodeid); - ++ipv4_remote; + if (s0 == KS_STATUS_SUCCESS) { + ks_dhtrt_touch_node(rt, nodeid); + ++ipv4_remote; } } for (int i=0; i<2; ++i) { - if (i%10 == 0) { - ++nodeid.id[0]; - nodeid.id[1] = 0; + if (i%10 == 0) { + ++nodeid.id[0]; + nodeid.id[1] = 0; } else { - ++nodeid.id[1]; + ++nodeid.id[1]; } ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer); if (s0 == KS_STATUS_SUCCESS) { - ks_dhtrt_touch_node(rt, nodeid); - ++ipv4_local; + ks_dhtrt_touch_node(rt, nodeid); + ++ipv4_local; } } for (int i=0; i<201; ++i) { if (i%10 == 0) { - ++nodeid.id[0]; - nodeid.id[1] = 0; + ++nodeid.id[0]; + nodeid.id[1] = 0; } else { - ++nodeid.id[1]; + ++nodeid.id[1]; } ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer); ks_dhtrt_touch_node(rt, nodeid); @@ -274,20 +274,20 @@ void test04() for (int i=0,i2=0,i3=0; i<10000; ++i, ++i2, ++i3) { if (i%20 == 0) { - nodeid.id[0] = nodeid.id[0] / 2; - if (i2%20 == 0) { - nodeid.id[1] = nodeid.id[1] / 2; - i2 = 0; - if (i3%20 == 0) { - nodeid.id[2] = nodeid.id[2] / 2; - } - } - else { - ++nodeid.id[3]; - } + nodeid.id[0] = nodeid.id[0] / 2; + if (i2%20 == 0) { + nodeid.id[1] = nodeid.id[1] / 2; + i2 = 0; + if (i3%20 == 0) { + nodeid.id[2] = nodeid.id[2] / 2; + } + } + else { + ++nodeid.id[3]; + } } else { - ++nodeid.id[1]; + ++nodeid.id[1]; } ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); ks_dhtrt_touch_node(rt, nodeid); @@ -395,8 +395,8 @@ void test06() ks_dhtrt_touch_node(rt, g_nodeid1); ks_dht_node_t *peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=2 - peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=3 - peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=4 + peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=3 + peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=4 ks_dhtrt_release_node(peer2); //lock=3 ks_dhtrt_sharelock_node(peer2); //lock=4 @@ -423,6 +423,50 @@ void test06() return; } +void test07() +{ + printf("**** testbuckets - test07 start\n"); fflush(stdout); + + ks_dht_node_t *peer; + memset(g_nodeid1.id, 0xef, KS_DHT_NODEID_SIZE); + memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE); + + char ipv6[] = "1234:1234:1234:1234"; + char ipv4[] = "123.123.123.123"; + unsigned short port = 7000; + + /* build a delete queue */ + + for(int i0=0, i1=0; i0<150; ++i0, ++i1) { + if (i0%20 == 0) { + g_nodeid2.id[0]>>=1; + } + else { + ++ g_nodeid2.id[19]; + } + ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, &peer); + ks_dhtrt_touch_node(rt, g_nodeid2); + ks_dhtrt_release_node(peer); + } + + memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE); + for (int i0=0, i1=0; i0<150; ++i0, ++i1) { + if (i0%20 == 0) { + g_nodeid2.id[0]>>=1; + } + else { + ++ g_nodeid2.id[19]; + } + ks_dht_node_t* n = ks_dhtrt_find_node(rt, g_nodeid2); + ks_dhtrt_release_node(n); + ks_dhtrt_delete_node(rt, n); + } + + ks_dhtrt_process_table(rt); + + printf("**** test07 should delete 100 nodes, leaving 50\n"); fflush(stdout); + printf("**** testbuckets - test07 ended\n"); fflush(stdout); +} static int gindex = 1; @@ -461,8 +505,8 @@ static void *test60ex2(ks_thread_t *thread, void *data) ks_sleep(10000); for (int i=0; i