FS-9775: Exclude non-active nodes from dhtrt_find_node

This commit is contained in:
colm 2016-12-19 16:14:23 -05:00 committed by Mike Jerris
parent 41731d553a
commit 51c1b7a719
2 changed files with 204 additions and 63 deletions

View File

@ -98,7 +98,7 @@ typedef struct ks_dhtrt_internal_s {
ks_time_t last_process_table;
ks_mutex_t *deleted_node_lock;
ks_dhtrt_deletednode_t *deleted_node;
ks_dhtrt_deletednode_t *free_nodes;
ks_dhtrt_deletednode_t *free_node_ex;
uint32_t deleted_count;
} ks_dhtrt_internal_t;
@ -184,10 +184,14 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry);
/* debugging */
#define KS_DHT_DEBUGPRINTF_
/* # define KS_DHT_DEBUGPRINTFX_ very verbose */
/* # define KS_DHT_DEBUGLOCKPRINTF_ debug locking */
/* very verbose */
/* # define KS_DHT_DEBUGPRINTFX_ */
/* debug locking */
/* # define KS_DHT_DEBUGLOCKPRINTF_ */
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool)
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
ks_pool_t *pool,
ks_thread_pool_t* tpool)
{
unsigned char initmask[KS_DHT_NODEID_SIZE];
memset(initmask, 0xff, sizeof(initmask));
@ -232,6 +236,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
unsigned short port,
ks_dht_node_t **node)
{
ks_dht_node_t *tnode;
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab write lock and insert */
@ -246,13 +251,15 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
bentry->flags = DHTPEER_ACTIVE;
}
(*node) = bentry->gptr;
tnode = bentry->gptr;
ks_rwl_read_lock( tnode->reflock);
ks_rwl_read_unlock(internal->lock);
(*node) = tnode;
return KS_STATUS_SUCCESS;
}
ks_rwl_read_unlock(internal->lock);
ks_dht_node_t *tnode = ks_dhtrt_make_node(table);
tnode = ks_dhtrt_make_node(table);
tnode->table = table;
for (int i = 0; i < 5; ++i) {
@ -274,7 +281,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
}
ks_status_t s = ks_dhtrt_insert_node(table, tnode);
if (tnode && s == KS_STATUS_SUCCESS) {
ks_rwl_read_lock( tnode->reflock);
}
(*node) = tnode;
return s;
@ -466,7 +477,7 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100];
ks_log(KS_LOG_DEBUG, "Insert node: read LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
ks_log(KS_LOG_DEBUG, "Find node: read LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
//fflush(stdout);
#endif
@ -477,7 +488,7 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
ks_rwl_read_lock(node->reflock);
}
#ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "Insert node: read UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
ks_log(KS_LOG_DEBUG, "Find node: read UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
//fflush(stdout);
#endif
ks_rwl_read_unlock(bucket->lock);
@ -857,10 +868,19 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL;
#ifdef KS_DHT_DEBUGPRINTF_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: internal->deleted_count %d\n", internal->deleted_count);
#endif
/* reclaim excess memory */
while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) {
ks_dht_node_t* node = deleted->node;
#ifdef KS_DHT_DEBUGPRINTFX__
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock\n");
#endif
if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) {
ks_rwl_destroy(&(node->reflock));
ks_pool_free(table->pool, &node);
@ -868,7 +888,9 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
deleted = deleted->next;
ks_pool_free(table->pool, &temp);
--internal->deleted_count;
#ifdef KS_DHT_DEBUGPRINTFX_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted: internal->deleted_count %d\n", internal->deleted_count);
#endif
if (prev != NULL) {
prev->next = deleted;
}
@ -878,10 +900,18 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
}
else {
#ifdef KS_DHT_DEBUGPRINTFX__
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock failed\n");
#endif
prev = deleted;
deleted = prev->next;
}
}
#ifdef KS_DHT_DEBUGPRINTF_
ks_log(KS_LOG_DEBUG, "ALLOC process_deleted exit: internal->deleted_count %d\n", internal->deleted_count);
#endif
ks_mutex_unlock(internal->deleted_node_lock);
}
@ -913,7 +943,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
memset(buffer, 0, 100);
if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer);
else strcpy(buffer, "<free>");
ks_log(KS_LOG_DEBUG, " slot %d: %s\n", ix, buffer);
ks_log(KS_LOG_DEBUG, " slot %d: %d %s\n", ix, b->entries[ix].flags, buffer);
}
ks_log(KS_LOG_DEBUG, " --------------------------\n\n");
@ -947,8 +977,8 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt
#ifdef KS_DHT_DEBUGPRINTF_
char buffer[100];
ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s ", ks_dhtrt_printableid(mask, buffer));
if (parent) ks_log(KS_LOG_DEBUG, "from parent mask: %s ", ks_dhtrt_printableid(parent->mask, buffer));
ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s\n", ks_dhtrt_printableid(mask, buffer));
if (parent) ks_log(KS_LOG_DEBUG, " ... from parent mask: %s\n", ks_dhtrt_printableid(parent->mask, buffer));
printf("\n");
#endif
return header;
@ -1049,7 +1079,7 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original,
char buffer[100];
ks_log(KS_LOG_DEBUG, "\nsplitting bucket orginal: %s\n", ks_dhtrt_printableid(original->mask, buffer));
ks_log(KS_LOG_DEBUG, " into (left) mask: %s size: %d\n", ks_dhtrt_printableid(left->mask, buffer), left->bucket->count);
ks_log(KS_LOG_DEBUG, " and (right) mask: %s size: %d\n\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count);
ks_log(KS_LOG_DEBUG, " and (right) mask: %s size: %d\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count);
#endif
return;
}
@ -1138,7 +1168,7 @@ ks_dht_node_t *ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
#ifdef KS_DHT_DEBUGPRINTFX_
char bufferx[100];
if ( bucket->entries[ix].inuse == 1) {
if ( bucket->entries[ix].inuse == 1 && bucket->entries[ix].flags == DHTPEER_ACTIVE ) {
ks_log(KS_LOG_DEBUG, "bucket->entries[%d].id = %s inuse=%x\n", ix,
ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
bucket->entries[ix].inuse );
@ -1162,7 +1192,7 @@ ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
#ifdef KS_DHT_DEBUGPRINTFX_
char bufferx[100];_
char bufferx[100];
ks_log(KS_LOG_DEBUG, "bucket->entries[%d].id = %s inuse=%c\n", ix,
ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
bucket->entries[ix].inuse );
@ -1315,19 +1345,22 @@ void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t*
{
ks_dhtrt_internal_t* internal = table->internal;
ks_mutex_lock(internal->deleted_node_lock);
ks_dhtrt_deletednode_t* deleted = internal->free_nodes;
ks_dhtrt_deletednode_t* deleted = internal->free_node_ex; /* grab a free stub */
if (deleted) {
internal->free_nodes = deleted->next;
internal->free_node_ex = deleted->next;
}
else {
deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t));
}
deleted->node = node;
deleted->next = internal->deleted_node;
internal->deleted_node = deleted;
deleted->next = internal->deleted_node;
internal->deleted_node = deleted; /* add to deleted queue */
++internal->deleted_count;
#ifdef KS_DHT_DEBUGPRINTFX_
ks_log(KS_LOG_DEBUG, "ALLOC: Queue for delete %d\n", internal->deleted_count);
#endif
ks_mutex_unlock(internal->deleted_node_lock);
}
@ -1340,12 +1373,15 @@ ks_dht_node_t* ks_dhtrt_make_node(ks_dhtrt_routetable_t* table)
/* to to reuse a deleted node */
if (internal->deleted_count) {
ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
node = deleted->node;
node = deleted->node; /* take the node */
memset(node, 0, sizeof(ks_dht_node_t));
deleted->node = 0; /* avoid accidents */
internal->deleted_node = deleted->next;
deleted->next = internal->free_nodes;
internal->free_nodes = deleted;
--internal->deleted_count;
deleted->next = internal->free_node_ex; /* save the stub for reuse */
--internal->deleted_count;
#ifdef KS_DHT_DEBUGPRINTFX_
ks_log(KS_LOG_DEBUG, "ALLOC: Reusing a node struct %d\n", internal->deleted_count);
#endif
}
ks_mutex_unlock(internal->deleted_node_lock);

View File

@ -447,27 +447,108 @@ void test06()
}
/* test resue of node memory */
void test50()
{
printf("*** testbuckets - test50 start\n"); fflush(stdout);
ks_dht_node_t* peer;
ks_dht_nodeid_t nodeid, nodeid2;
memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE);
memset(nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
char ipv6[] = "1234:1234:1234:1234";
char ipv4[] = "123.123.123.123";
unsigned short port = 7000;
enum ks_afflags_t both = ifboth;
ks_status_t status;
for (int i=0,i2=0; i<200; ++i, ++i2) {
if (i%20 == 0) {
nodeid.id[0] = nodeid.id[0] / 2;
if(i2%20 == 0) {
i2 = 0;
nodeid.id[1] = nodeid.id[1] / 2;
}
else {
++nodeid.id[2];
}
}
else {
++nodeid.id[1];
}
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
}
memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE);
for (int i=0,i2=0; i<200; ++i, ++i2) {
if (i%20 == 0) {
nodeid.id[0] = nodeid.id[0] / 2;
if(i2%20 == 0) {
i2 = 0;
nodeid.id[1] = nodeid.id[1] / 2;
}
else {
++nodeid.id[2];
}
}
else {
++nodeid.id[1];
}
ks_dht_node_t *n = ks_dhtrt_find_node(rt, nodeid);
if (n != NULL) {
ks_dhtrt_release_node(n);
ks_dhtrt_delete_node(rt, n);
}
}
ks_dhtrt_process_table(rt);
memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE);
for (int i=0,i2=0; i<200; ++i, ++i2) {
if (i%20 == 0) {
nodeid.id[0] = nodeid.id[0] / 2;
if(i2%20 == 0) {
i2 = 0;
nodeid.id[1] = nodeid.id[1] / 2;
}
else {
++nodeid.id[2];
}
}
else {
++nodeid.id[1];
}
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
}
printf("*** testbuckets - test50 start\n"); fflush(stdout);
return;
}
int main(int argc, char* argv[]) {
printf("testdhtbuckets - start\n");
int tests[10];
int tests[100];
if (argc == 0) {
tests[0] = 1;
tests[1] = 1;
tests[2] = 1;
tests[3] = 1;
tests[4] = 1;
tests[5] = 1;
tests[6] = 0;
tests[7] = 0;
tests[8] = 0;
tests[9] = 0;
}
else {
for(int tix=1; tix<10 && tix<argc; ++tix) {
for(int tix=1; tix<100 && tix<argc; ++tix) {
long i = strtol(argv[tix], NULL, 0);
tests[i] = 1;
tests[tix] = i;
}
}
@ -494,41 +575,65 @@ int main(int argc, char* argv[]) {
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_deinitroute(&rt);
if (tests[1] == 1) {
ks_dhtrt_initroute(&rt, pool, tpool);
test01();
ks_dhtrt_deinitroute(&rt);
for(int tix=0; tix<argc; ++tix) {
if (tests[tix] == 1) {
ks_dhtrt_initroute(&rt, pool, tpool);
test01();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 2) {
ks_dhtrt_initroute(&rt, pool, tpool);
test02();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 3) {
ks_dhtrt_initroute(&rt, pool, tpool);
test03();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 4) {
ks_dhtrt_initroute(&rt, pool, tpool);
test04();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 5) {
ks_dhtrt_initroute(&rt, pool, tpool);
test05();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 6) {
ks_dhtrt_initroute(&rt, pool, tpool);
test06();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 50) {
ks_dhtrt_initroute(&rt, pool, tpool);
test50();
ks_dhtrt_deinitroute(&rt);
continue;
}
}
if (tests[2] == 1) {
ks_dhtrt_initroute(&rt, pool, tpool);
test02();
ks_dhtrt_deinitroute(&rt);
}
if (tests[3] == 1) {
ks_dhtrt_initroute(&rt, pool, tpool);
test03();
ks_dhtrt_deinitroute(&rt);
}
if (tests[4] == 1) {
ks_dhtrt_initroute(&rt, pool, tpool);
test04();
ks_dhtrt_deinitroute(&rt);
}
if (tests[5] == 1) {
ks_dhtrt_initroute(&rt, pool, tpool);
test05();
ks_dhtrt_deinitroute(&rt);
}
if (tests[6] == 1) {
ks_dhtrt_initroute(&rt, pool, tpool);
test06();
ks_dhtrt_deinitroute(&rt);
}
return 0;
}