From a698651018292fafb024a91c4e8226451629c2ea Mon Sep 17 00:00:00 2001 From: colm Date: Fri, 30 Dec 2016 20:30:11 -0500 Subject: [PATCH] FS-9775: DHT Repopulate empty buckets --- libs/libks/src/dht/ks_dht-int.h | 5 + libs/libks/src/dht/ks_dht.c | 37 ++++++ libs/libks/src/dht/ks_dht.h | 9 ++ libs/libks/src/dht/ks_dht_bucket.c | 175 ++++++++++++++++------------- libs/libks/src/dht/ks_dht_job.c | 17 +++ libs/libks/test/testbuckets.c | 128 ++++++++++++++++++++- 6 files changed, 289 insertions(+), 82 deletions(-) diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 1c33b91478..76d797ef1c 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -296,6 +296,11 @@ KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job, ks_dht_token_t *token, int64_t cas, ks_dht_storageitem_t *item); +KS_DECLARE(void) ks_dht_job_build_search_findnode(ks_dht_job_t *job, + ks_dht_nodeid_t *target, + uint32_t family, + ks_dht_job_callback_t query_callback, + ks_dht_job_callback_t finish_callback); KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job); diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index 8392ba4017..a40c1cf8c5 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -3046,6 +3046,43 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t return ret; } +KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job) +{ /* job query callback: starts the find-node search described by job->query_family and job->query_target, passing no search callback and no search out-pointer */ + return ks_dht_search_findnode(dht, + job->query_family, + &job->query_target, + NULL, + NULL); +} + +KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht, + ks_dhtrt_routetable_t *rt, + ks_dht_nodeid_t *target, + ks_dht_job_callback_t callback) +{ /* builds a find-node search job for target; the family is AF_INET6 when rt is dht->rt_ipv6 and AF_INET otherwise; returns the status of ks_dht_job_create */ + ks_dht_job_t *job = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(dht); + ks_assert(rt); + ks_assert(target); + + ks_sockaddr_t taddr = { 0 }; /* placeholder only needed to satisfy the api; zeroed so no indeterminate bytes are handed to ks_dht_job_create */ + + if ((ret = ks_dht_job_create(&job, dht->pool, &taddr, 3)) ==
KS_STATUS_SUCCESS) { + + int32_t family = AF_INET; + + if (rt == dht->rt_ipv6) { + family = AF_INET6; + } + + ks_dht_job_build_search_findnode(job, target, family, ks_dht_exec_search_findnode, callback); /* NOTE(review): nothing visible here hands the built job to a queue; confirm it is submitted and not leaked */ + } + + return ret; +} + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 7f869f67ea..019688682d 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -145,6 +145,7 @@ struct ks_dht_job_s { int64_t query_cas; ks_dht_token_t query_token; ks_dht_storageitem_t *query_storageitem; + uint32_t query_family; // job specific response parameters ks_dht_nodeid_t response_id; @@ -480,6 +481,14 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht, ks_dht_search_callback_t callback, ks_dht_search_t **search); +KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht, + ks_dhtrt_routetable_t *rt, + ks_dht_nodeid_t *target, + ks_dht_job_callback_t callback); + +KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job); + + /** * route table methods diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index 9fde747e7f..382c8155e4 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -58,6 +58,7 @@ typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE]; /* internal structures */ typedef struct ks_dhtrt_bucket_entry_s { ks_time_t tyme; + ks_time_t ping_tyme; uint8_t id[KS_DHT_NODEID_SIZE]; ks_dht_node_t *gptr; /* ptr to peer */ uint8_t inuse; @@ -186,9 +187,9 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid, unsigned int max); static -void ks_dhtrt_ping(ks_dhtrt_internal_t *table, ks_dhtrt_bucket_entry_t *entry); +void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry); static -void ks_dhtrt_find(ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *nodeid); +void ks_dhtrt_find(ks_dhtrt_routetable_t *table, ks_dhtrt_internal_t *internal, 
ks_dht_nodeid_t *nodeid); /* debugging */ @@ -484,20 +485,20 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no return KS_STATUS_FAIL; } - /* shift right x bits : todo 1 bit for the moment */ + /* shift right 1 bit */ ks_dhtrt_shiftright(newmask); /* create the new bucket structures */ ks_dhtrt_bucket_header_t *newleft = ks_dhtrt_create_bucketheader(table->pool, header, newmask); - header->right1bit = newleft; - newleft->left1bit = header; - newleft->bucket = ks_dhtrt_create_bucket(table->pool); newleft->flags = BHF_LEFT; /* flag as left hand side - therefore splitable */ ks_dhtrt_bucket_header_t *newright = ks_dhtrt_create_bucketheader(table->pool, header, header->mask); + newright->right1bit = newleft; + newleft->left1bit = newright; + ks_dhtrt_split_bucket(header, newleft, newright); /* ok now we need to try again to see if the bucket has capacity */ @@ -938,6 +939,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) ks_time_t t0 = ks_time_now_sec(); if (t0 - internal->last_process_table < internal->next_process_table_delta) { + /*printf("process table: next scan not scheduled\n");*/ return; } @@ -967,87 +969,90 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) if (b->count == 0) { - if (t0 - b->findtyme >= KS_DHTRT_EXPIREDTIME) { /* bucket has been empty for a while */ + if (t0 - b->findtyme >= (ks_time_t)KS_DHTRT_EXPIREDTIME) { /* bucket has been empty for a while */ + ks_dht_nodeid_t targetid; - if (header->left1bit) { - ks_dhtrt_midmask(header->left1bit->mask, header->mask, targetid.id); - } - else if (header->right1bit) { + + if (header->right1bit) { ks_dhtrt_midmask(header->mask, header->right1bit->mask, targetid.id); } else { - ks_dhtrt_shiftright(targetid.id); + ks_dhtrt_nodeid_t rightid; + memcpy(rightid, header->mask, KS_DHT_NODEID_SIZE); + ks_dhtrt_shiftright(rightid); + ks_dhtrt_midmask(header->mask, rightid, targetid.id); } - ks_dhtrt_find(internal, &targetid); - continue; 
+ + ks_dhtrt_find(table, internal, &targetid); + b->findtyme = t0; } } + else { + for (int ix=0; ix < KS_DHT_BUCKETSIZE; ++ix) { /* NOTE(review): loop header and entry lookup were restored after extraction loss; confirm the bound macro name against the tree */ + ks_dhtrt_bucket_entry_t *e = &b->entries[ix]; - for (int ix=0; ix < KS_DHT_BUCKETSIZE; ++ix) { - ks_dhtrt_bucket_entry_t *e = &b->entries[ix]; + if (e->inuse == 1) { - if (e->inuse == 1) { + ks_time_t tdiff = t0 - e->tyme; - if (e->gptr->type != KS_DHT_LOCAL) { /* 'local' nodes do not get expired */ + if (e->gptr->type != KS_DHT_LOCAL) { /* 'local' nodes do not get expired */ - /* more than n pings outstanding? */ + /* more than n pings outstanding? */ - if (e->flags == DHTPEER_DUBIOUS) { - continue; - } - - if ( e->flags != DHTPEER_EXPIRED && - e->outstanding_pings >= KS_DHTRT_MAXPING ) { - ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n", - ks_dhtrt_printableid(e->id, buf)); - e->flags = DHTPEER_EXPIRED; - ++b->expired_count; - e->outstanding_pings = 0; /* extinguish all hope: do not retry again */ - continue; - } - - /* if not on shortest interval and there are any outstanding pings - send another */ - if ( internal->next_process_table_delta == KS_DHTRT_PROCESSTABLE_SHORTINTERVAL120 && - e->outstanding_pings > 0) { - ks_dhtrt_ping(internal, e); - - if (e->outstanding_pings == 2) { - ++ping2_count; /* return in 60 seconds for final check */ - } - else { - ++ping_count; + if (e->flags == DHTPEER_DUBIOUS) { + continue; /* nothin' to see here */ } - continue; - } + /* expire nodes that stayed silent past the expiry time and through every ping retry */ + if ( e->flags != DHTPEER_EXPIRED && + tdiff >= KS_DHTRT_EXPIREDTIME && /* beyond expired time */ + e->outstanding_pings >= KS_DHTRT_MAXPING ) { /* has been retried */ + ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n", + ks_dhtrt_printableid(e->id, buf)); + e->flags = DHTPEER_EXPIRED; + ++b->expired_count; + e->outstanding_pings = 0; /* extinguish all hope: do not retry again */ + continue; + } - /* if on shortest interval and there are two outstanding pings - send another and final */ - if ( internal->next_process_table_delta == KS_DHTRT_PROCESSTABLE_SHORTINTERVAL60 && - e->outstanding_pings >= 2) { - ks_dhtrt_ping(internal, e); - ++ping_count; - continue; - } 
- - ks_time_t tdiff = t0 - e->tyme; + /* re ping in-doubt nodes */ + if ( e->outstanding_pings > 0) { + ks_time_t tping = t0 - e->ping_tyme; /* time since we last pinged */ - if (tdiff > KS_DHTRT_EXPIREDTIME) { - e->flags = DHTPEER_DUBIOUS; /* mark as dubious */ - ks_dhtrt_ping(internal, e); /* final effort to activate */ - continue; - } + if (e->outstanding_pings == KS_DHTRT_MAXPING - 1) { /* final ping */ + ks_dhtrt_ping(internal, e); + e->ping_tyme = t0; + ++ping2_count; + } + else if (tping >= KS_DHTRT_PROCESSTABLE_SHORTINTERVAL120) { + ks_dhtrt_ping(internal, e); + e->ping_tyme = t0; + ++ping_count; + } + continue; + } - if (tdiff > KS_DHTRT_INACTIVETIME) { /* inactive for suspicious length */ - ks_dhtrt_ping(internal, e); /* kick */ - ++ping_count; - continue; - } + /* look for newly expired nodes */ + if (tdiff > KS_DHTRT_EXPIREDTIME) { + e->flags = DHTPEER_DUBIOUS; /* mark as dubious */ + ks_dhtrt_ping(internal, e); /* final effort to activate */ + e->ping_tyme = t0; + continue; + } - } /* end if not local */ + if (tdiff > KS_DHTRT_INACTIVETIME) { /* inactive for suspicious length */ + ks_dhtrt_ping(internal, e); /* kick */ + e->ping_tyme = t0; + ++ping_count; + continue; + } - } /* end if e->inuse */ + } /* end if not local */ - } /* end for each bucket_entry */ + } /* end if e->inuse */ + + } /* end for each bucket_entry */ + } /* if bucket->count == 0 .... 
else */ #ifdef KS_DHT_DEBUGLOCKPRINTF_ char buf[100]; @@ -1664,6 +1669,7 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry char buf[100]; ks_log(KS_LOG_DEBUG, "Ping queued for nodeid %s count %d\n", ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings); + /*printf("ping: %s\n", buf); fflush(stdout);*/ #endif ks_dht_node_t* node = entry->gptr; ks_log(KS_LOG_DEBUG, "Node addr %s %d\n", node->addr.host, node->addr.port); @@ -1673,19 +1679,14 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry } static -void ks_dhtrt_find(ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *nodeid) { +void ks_dhtrt_find(ks_dhtrt_routetable_t *table, ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *target) { /* logs the target id, then asks the DHT to queue a find-node search for it on this route table with no finish callback */ -#ifdef KS_DHT_DEBUGPRINTF_ char buf[100]; - ks_log(KS_LOG_DEBUG, "Find queued for mask %s\n", ks_dhtrt_printableid(nodeid->id, buf)); -#endif - + ks_log(KS_LOG_DEBUG, "Find queued for target %s\n", ks_dhtrt_printableid(target->id, buf)); + ks_dht_queue_search_findnode(internal->dht, table, target, NULL); /* the returned status is discarded, so a failed queue attempt goes unreported */ return; } - - - /* strictly for shifting the bucketheader mask so format must be a right filled mask (hex: ..ffffffff) @@ -1723,7 +1724,8 @@ void ks_dhtrt_shiftleft(uint8_t *id) { static void ks_dhtrt_midmask(uint8_t *leftid, uint8_t *rightid, uint8_t *midpt) { - int i = 0; + uint8_t i = 0; + memset(midpt, 0, KS_DHT_NODEID_SIZE); /* clear the whole id: sizeof applied to the macro measured an int, not the id length */ for ( ; i < KS_DHT_NODEID_SIZE; ++i) { @@ -1731,17 +1733,28 @@ void ks_dhtrt_midmask(uint8_t *leftid, uint8_t *rightid, uint8_t *midpt) { if (leftid[i] == 0 && rightid[i] == 0) { continue; } - break; /* first non zero */ + else if (leftid[i] == 0 || rightid[i] == 0) { + midpt[i] = leftid[i] | rightid[i]; + continue; + } + else { + if (leftid[i] == rightid[i]) { + midpt[i] = leftid[i] >> 1; + i++; + } + else { + uint16_t x = leftid[i] + rightid[i]; + x >>= 1; + midpt[i++] = (uint8_t)x; + } + break; + } } if (i == KS_DHT_NODEID_SIZE) { return; } - uint16_t x = leftid[i] + rightid[i]; 
- x >>= 1; - midpt[i++] = (uint8_t)x; - if ( i < KS_DHT_NODEID_SIZE ) { memcpy(&midpt[i], &rightid[i], KS_DHT_NODEID_SIZE-i); } diff --git a/libs/libks/src/dht/ks_dht_job.c b/libs/libks/src/dht/ks_dht_job.c index 9720282a7f..7479a305fd 100644 --- a/libs/libks/src/dht/ks_dht_job.c +++ b/libs/libks/src/dht/ks_dht_job.c @@ -98,6 +98,23 @@ KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job, job->query_storageitem = item; } +KS_DECLARE(void) ks_dht_job_build_search_findnode(ks_dht_job_t *job, + ks_dht_nodeid_t *target, + uint32_t family, + ks_dht_job_callback_t query_callback, + ks_dht_job_callback_t finish_callback) +{ /* fills in a job for a find-node search: stores both callbacks, copies *target into query_target, records the address family and clears job->search; job, target and family are asserted non-zero */ + ks_assert(job); + ks_assert(target); + ks_assert(family); + + job->search = NULL; + job->query_callback = query_callback; + job->finish_callback = finish_callback; + job->query_target = *target; + job->query_family = family; +} + KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job) { ks_dht_job_t *j; diff --git a/libs/libks/test/testbuckets.c b/libs/libks/test/testbuckets.c index f0cc32129e..18890f251a 100644 --- a/libs/libks/test/testbuckets.c +++ b/libs/libks/test/testbuckets.c @@ -548,6 +548,117 @@ void test07() } +void test08() +{ /* runs 150 create/touch/release iterations, looks each id up again and deletes it, then calls ks_dhtrt_process_table 45 times with a sleep after each call */ + printf("**** testbuckets - test08 start\n"); fflush(stdout); + + ks_dht_node_t *peer; + memset(g_nodeid1.id, 0xef, KS_DHT_NODEID_SIZE); + memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE); + + char ipv6[] = "1234:1234:1234:1234"; + char ipv4[] = "123.123.123.123"; + unsigned short port = 7000; + + /* build a delete queue */ + + int cix=0; + + for(int i0=0, i1=0; i0<150; ++i0, ++i1) { + if (i0%20 == 0) { + g_nodeid2.id[cix]>>=1; + //ks_dhtrt_dump(rt, 7); + if ( g_nodeid2.id[cix] == 0) ++cix; + g_nodeid2.id[19] = 0; + } + else { + ++g_nodeid2.id[19]; + } + ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, KS_DHTRT_CREATE_DEFAULT, &peer); + ks_dhtrt_touch_node(rt, g_nodeid2); + ks_dhtrt_release_node(peer); + } + + cix = 0; + + memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE); + for (int i0=0, i1=0; i0<150; 
++i0, ++i1) { + if (i0%20 == 0) { + g_nodeid2.id[cix]>>=1; + if ( g_nodeid2.id[cix] == 0) ++cix; + g_nodeid2.id[19] = 0; + } + else { + ++g_nodeid2.id[19]; + } + ks_dht_node_t* n = ks_dhtrt_find_node(rt, g_nodeid2); /* NOTE(review): n is used below without a NULL check and is released before it is deleted; confirm both are intended */ + ks_dhtrt_release_node(n); + ks_dhtrt_delete_node(rt, n); + } + + /* this should drive the search_findnode */ + + for(int i=0; i<45; ++i) { + printf("firing process table\n"); + ks_dhtrt_process_table(rt); + ks_sleep(1000 * 1000 * 60); /* sleep one minute */ + } + + printf("**** testbuckets - test08 ended\n"); fflush(stdout); +} + + +void test09() +{ /* runs 150 create/touch/release iterations, then calls ks_dhtrt_process_table 45 times with a sleep after each call */ + printf("**** testbuckets - test09 start\n"); fflush(stdout); + + ks_dht_node_t *peer; + memset(g_nodeid1.id, 0xef, KS_DHT_NODEID_SIZE); + memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE); + + char ipv6[] = "1234:1234:1234:1234"; + char ipv4[] = "123.123.123.123"; + unsigned short port = 7000; + + /* populate the route table (nothing is deleted in this test) */ + + int cix=0; + + for(int i0=0, i1=0; i0<150; ++i0, ++i1) { + if (i0%20 == 0) { + g_nodeid2.id[cix]>>=1; + //ks_dhtrt_dump(rt, 7); + if ( g_nodeid2.id[cix] == 0) ++cix; + g_nodeid2.id[19] = 0; + } + else { + ++g_nodeid2.id[19]; + } + ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, KS_DHTRT_CREATE_DEFAULT, &peer); + ks_dhtrt_touch_node(rt, g_nodeid2); + ks_dhtrt_release_node(peer); + } + + /* this should expire all nodes after 15 minutes and 3 pings */ + + printf("\n\n\n\n"); + + for(int i=0; i<45; ++i) { + printf("firing process table\n"); + ks_dhtrt_process_table(rt); + ks_sleep(1000 * 1000 * 30); /* sleep 30 seconds */ + } + + printf("**** testbuckets - test09 ended\n"); fflush(stdout); +} + + + + + + + + static int gindex = 1; static ks_mutex_t *glock; static int gstop = 0; @@ -1020,7 +1131,6 @@ int main(int argc, char *argv[]) { continue; } - if (tests[tix] == 7) { ks_dhtrt_initroute(&rt, dht, pool); test07(); @@ -1028,6 +1138,22 @@ int main(int argc, char *argv[]) { continue; } + if (tests[tix] == 8) { + ks_dhtrt_initroute(&rt, dht, pool); + test08(); + 
ks_dhtrt_deinitroute(&rt); + continue; + } + + if (tests[tix] == 9) { + ks_dhtrt_initroute(&rt, dht, pool); + test09(); + ks_dhtrt_deinitroute(&rt); + continue; + } + + + if (tests[tix] == 30) { ks_dhtrt_initroute(&rt, dht, pool); test30();