From eac02b764b1e6e7f72e2367d4b891d616091ed07 Mon Sep 17 00:00:00 2001
From: colm
Date: Wed, 21 Dec 2016 18:43:22 -0500
Subject: [PATCH] FS-9775: Implement deinit dht routetable

---
 libs/libks/src/dht/ks_dht_bucket.c | 132 +++++++++++++++++++++++------
 libs/libks/test/testbuckets.c      |  15 +++-
 2 files changed, 120 insertions(+), 27 deletions(-)

diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c
index 6341d24296..c3f46602ac 100644
--- a/libs/libks/src/dht/ks_dht_bucket.c
+++ b/libs/libks/src/dht/ks_dht_bucket.c
@@ -44,7 +44,7 @@
 #define KS_DHTRT_MAXPING  3
 #define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60)
 #define KS_DHTRT_PROCESSTABLE_SHORTINTERVAL (120)
-#define KS_DHTRT_RECYCLE_NODE_THRESHOLD  0
+#define KS_DHTRT_RECYCLE_NODE_THRESHOLD  100
 
 /* peer flags */
 #define DHTPEER_DUBIOUS 0
@@ -157,7 +157,7 @@ int ks_dhtrt_ismasked(const uint8_t *id1, const uint8_t *mask);
 static
 void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t *table, ks_dht_node_t* node);
 static
-void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table);
+void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all);
 static
 ks_dht_node_t *ks_dhtrt_make_node(ks_dhtrt_routetable_t *table);
@@ -195,7 +195,7 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *table, ks_dhtrt_bucket_entry_t *entry);
 /* very verbose */
 /* # define KS_DHT_DEBUGPRINTFX_ */
 /* debug locking */
-#define KS_DHT_DEBUGLOCKPRINTF_
+/* #define KS_DHT_DEBUGLOCKPRINTF_ */
 
 KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
                                            ks_dht_t *dht,
@@ -231,13 +231,52 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
 	return KS_STATUS_SUCCESS;
 }
 
-KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table)
+KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **tableP)
 {
 	/* @todo*/
+	ks_dhtrt_routetable_t* table = *tableP;
 
-	ks_pool_t *pool = (*table)->pool;
+	if (!table || !table->internal) {
+		return;
+	}
 
-	ks_pool_free(pool, &(*table));
+	ks_dhtrt_internal_t* internal = table->internal;
+	ks_rwl_write_lock(internal->lock);       /* grab write lock */
+	ks_dhtrt_process_deleted(table, 1);
+	table->internal = NULL;                  /* make sure no other threads get in */
+	ks_pool_t *pool = table->pool;
+
+
+	ks_dhtrt_bucket_header_t *header = internal->buckets;
+	ks_dhtrt_bucket_header_t *last_header;
+	ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
+	int stackix = 0;
+
+	while (header) {
+		stack[stackix++] = header;
+
+		if (header->bucket) {
+			ks_pool_free(pool, &header->bucket);
+			header->bucket = NULL;
+		}
+		last_header = header;
+		header = header->left;
+
+		if (header == 0 && stackix > 1) {
+			stackix -= 2;
+			header = stack[stackix];
+			header = header->right;
+#ifdef KS_DHT_DEBUGLOCKPRINTFX_
+			char buf[100];
+			ks_log(KS_LOG_DEBUG, "deinit: freeing bucket header %s\n", ks_dhtrt_printableid(last_header->mask, buf));
+#endif
+			ks_pool_free(pool, &last_header);
+		}
+	}
+
+	ks_pool_free(pool, &internal);
+	ks_pool_free(pool, &table);
+	*tableP = NULL;
 
 	return;
 }
@@ -249,8 +288,13 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
                                              unsigned short port,
                                              ks_dht_node_t **node)
 {
+	if (!table || !table->internal) {
+		return KS_STATUS_FAIL;
+	}
+
 	ks_dht_node_t *tnode;
 	ks_dhtrt_internal_t* internal = table->internal;
+	ks_rwl_read_lock(internal->lock);        /* grab read lock before insert */
 
 	ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
@@ -306,8 +350,13 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
 KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
 {
+	if (!table || !table->internal) {
+		return KS_STATUS_FAIL;
+	}
+
 	ks_status_t s = KS_STATUS_FAIL;
 	ks_dhtrt_internal_t* internal = table->internal;
+	ks_rwl_read_lock(internal->lock);        /* grab read lock */
 
 	ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
@@ -339,8 +388,13 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh
 static
 ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
 {
+	if (!table || !table->internal) {
+		return KS_STATUS_FAIL;
+	}
+
 	ks_dhtrt_internal_t* internal = table->internal;
-	ks_dhtrt_bucket_t *bucket = 0;
+	ks_dhtrt_bucket_t *bucket = 0;
+
 	int insanity = 0;
 
 	ks_rwl_write_lock(internal->lock);
@@ -465,10 +519,13 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
 
 KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
 {
+	if (!table || !table->internal) {
+		return NULL;
+	}
 
-	ks_dht_node_t* node = NULL;
-
+	ks_dht_node_t* node = NULL;
 	ks_dhtrt_internal_t* internal = table->internal;
+	ks_rwl_read_lock(internal->lock);        /* grab read lock */
 
 	ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
@@ -504,8 +561,13 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
 
 KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
 {
+	if (!table || !table->internal) {
+		return KS_STATUS_FAIL;
+	}
+
 	ks_status_t s = KS_STATUS_FAIL;
-	ks_dhtrt_internal_t* internal = table->internal;
+	ks_dhtrt_internal_t* internal = table->internal;
+	ks_rwl_read_lock(internal->lock);        /* grab read lock */
 
 	ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
@@ -542,8 +604,13 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh
 
 KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
 {
+	if (!table || !table->internal) {
+		return KS_STATUS_FAIL;
+	}
+
 	ks_status_t s = KS_STATUS_FAIL;
 	ks_dhtrt_internal_t *internal = table->internal;
+	ks_rwl_read_lock(internal->lock);        /* grab read lock */
 
 	ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
@@ -563,8 +630,14 @@ KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dh
 
 KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query)
 {
-	uint8_t count = 0;
-	ks_dhtrt_internal_t *internal = table->internal;
+	if (!table || !table->internal) {
+		query->count = 0;
+		return 0;
+	}
+
+	uint8_t count = 0;
+	ks_dhtrt_internal_t *internal = table->internal;
+	ks_rwl_read_lock(internal->lock);        /* grab read lock */
 
 	count = ks_dhtrt_findclosest_locked_nodes(table, query);
 	ks_rwl_read_unlock(internal->lock);      /* release read lock */
@@ -822,7 +895,11 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
 	/* inactive again it is considered inactive                            */
 	/*                                                                     */
 
-	ks_dhtrt_internal_t *internal = table->internal;
+	if (!table || !table->internal) {
+		return;
+	}
+
+	ks_dhtrt_internal_t *internal = table->internal;
 	int ping_count = 0;
 	ks_time_t t0 = ks_time_now_sec();
@@ -875,8 +952,9 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
 
 				if ( e->flags != DHTPEER_EXPIRED && e->outstanding_pings >= KS_DHTRT_MAXPING ) {
 #ifdef KS_DHT_DEBUGPRINTF_
+					char buf1[100];
ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n", - ks_dhtrt_printableid(e->id, buf)); + ks_dhtrt_printableid(e->id, buf1)); #endif e->flags = DHTPEER_EXPIRED; ++b->expired_count; @@ -912,8 +990,8 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) } /* end for each bucket_entry */ #ifdef KS_DHT_DEBUGLOCKPRINTF_ - char buf1[100]; - ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1)); + char buf[100]; + ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); #endif ks_rwl_write_unlock(b->lock); @@ -921,8 +999,8 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) } /* end of if trywrite_lock successful */ else { #ifdef KS_DHT_DEBUGPRINTF_ - char buf2[100]; - ks_log(KS_LOG_DEBUG,"process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2)); + char buf1[100]; + ks_log(KS_LOG_DEBUG,"process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf1)); #endif } } @@ -937,7 +1015,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) } ks_rwl_read_unlock(internal->lock); /* release read lock */ - ks_dhtrt_process_deleted(table); + ks_dhtrt_process_deleted(table, 0); if (ping_count == 0) { internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL; @@ -950,7 +1028,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) return; } -void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table) +void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table, int8_t all) { ks_dhtrt_internal_t* internal = table->internal; ks_mutex_lock(internal->deleted_node_lock); @@ -958,18 +1036,22 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table) ks_dhtrt_deletednode_t *deleted = internal->deleted_node; ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL; -#ifdef KS_DHT_DEBUGPRINTF_ +#ifdef KS_DHT_DEBUGPRINTFX_ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: internal->deleted_count %d\n", internal->deleted_count); #endif /* reclaim excess memory */ - printf("%d %d %p\n", internal->deleted_count, KS_DHTRT_RECYCLE_NODE_THRESHOLD, (void*)deleted); fflush(stdout); + uint32_t threshold = KS_DHTRT_RECYCLE_NODE_THRESHOLD; - while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) { + if (all) { + threshold = 1; + } + + while(internal->deleted_count > threshold && deleted) { ks_dht_node_t* node = deleted->node; -#ifdef KS_DHT_DEBUGPRINTF_ +#ifdef KS_DHT_DEBUGPRINTFX_ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock\n"); #endif @@ -992,7 +1074,7 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table) } else { -#ifdef KS_DHT_DEBUGPRINTF_ +#ifdef KS_DHT_DEBUGPRINTFX_ ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock failed\n"); #endif prev = deleted; diff --git a/libs/libks/test/testbuckets.c b/libs/libks/test/testbuckets.c index a463b3380c..86d4c85b61 100644 --- a/libs/libks/test/testbuckets.c +++ b/libs/libks/test/testbuckets.c @@ -357,6 +357,8 @@ ks_dht_node_t* g_peer; static void *testnodelocking_ex1(ks_thread_t *thread, void *data) { + //lock=3 on entry + ks_dhtrt_release_node(g_peer); //lock=2 ks_dhtrt_release_node(g_peer); //lock=1 ks_dhtrt_release_node(g_peer); //lock=0 return NULL; @@ -364,9 +366,17 @@ static void *testnodelocking_ex1(ks_thread_t *thread, void *data) static void *testnodelocking_ex2(ks_thread_t *thread, void *data) { - ks_dht_node_t* peer2 = ks_dhtrt_find_node(rt, g_nodeid1); //lock=4 + // lock=4 on entry + 
+	ks_dht_node_t* peer2 = ks_dhtrt_find_node(rt, g_nodeid1);   //lock=5
+	ks_dhtrt_release_node(peer2);        //lock=4
+	ks_dhtrt_sharelock_node(peer2);      //lock=5
+	ks_dhtrt_release_node(peer2);        //lock=4
+	ks_dhtrt_sharelock_node(peer2);      //lock=5
+	ks_dhtrt_release_node(peer2);        //lock=4
 	ks_dhtrt_release_node(peer2);        //lock=3
-	ks_dhtrt_release_node(peer2);        //lock=2
+	ks_dhtrt_find_node(rt, g_nodeid1);   //lock=4
+	ks_dhtrt_release_node(peer2);        //lock=3
+
 	return NULL;
 }
 
@@ -391,6 +401,7 @@ void test06()
 
 	peer2 = ks_dhtrt_find_node(rt, g_nodeid1);   //lock=4
 	ks_dhtrt_release_node(peer2);                //lock=3
+	ks_dhtrt_sharelock_node(peer2);              //lock=4
 	g_peer = peer2;
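
Note (not part of the patch): the sketch below illustrates the caller-side lifecycle this change enables, using only entry points whose signatures appear in the hunks above (ks_dhtrt_find_node, ks_dhtrt_release_node, ks_dhtrt_delete_node, ks_dhtrt_process_table, ks_dhtrt_deinitroute). The header names and the assumption that the table was set up earlier via ks_dhtrt_initroute() are placeholders, not taken from this diff.

/* Illustrative sketch only -- not part of FS-9775. Includes are assumed. */
#include "ks.h"
#include "ks_dht.h"

static void route_table_teardown_sketch(ks_dhtrt_routetable_t *rt, ks_dht_nodeid_t id)
{
	/* rt was initialised elsewhere; id names a node already in the table. */
	ks_dht_node_t *node = ks_dhtrt_find_node(rt, id);   /* appears to take a shared lock on the node */

	if (node) {
		ks_dhtrt_release_node(node);      /* drop our shared lock                    */
		ks_dhtrt_delete_node(rt, node);   /* queue the node on the deleted list      */
	}

	ks_dhtrt_process_table(rt);   /* periodic pass: trims the deleted list down to
	                                 KS_DHTRT_RECYCLE_NODE_THRESHOLD (now 100)       */

	ks_dhtrt_deinitroute(&rt);    /* new in this patch: processes the deleted list,
	                                 frees every bucket and bucket header, frees the
	                                 internal struct and the table, and NULLs rt     */

	/* The guards added in this patch make post-deinit calls fail cleanly
	   instead of dereferencing a freed table. */
	if (ks_dhtrt_find_node(rt, id) == NULL) {
		/* expected: rt is NULL here, so the guard returns NULL */
	}
}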