diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index 6c73318873..ff457c4078 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -537,7 +537,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid * If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint. */ if (ep->addr.family == AF_INET) { - if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done; + if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_create_node(dht->rt_ipv4, ep->nodeid, KS_DHT_LOCAL, @@ -545,7 +545,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ep->addr.port, &ep->node)) != KS_STATUS_SUCCESS) goto done; } else { - if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done; + if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_create_node(dht->rt_ipv6, ep->nodeid, KS_DHT_LOCAL, diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 1f320b997a..4e4f07456b 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -406,7 +406,10 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, * route table methods * */ -KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool); +KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, + ks_dht_t *dht, + ks_pool_t *pool, + ks_thread_pool_t* tpool); KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table); KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table, diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index 5577b49dc9..a04717ada8 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -39,9 +39,11 @@ /* change for testing */ #define KS_DHT_BUCKETSIZE 20 -#define KS_DHTRT_INACTIVETIME (15*60) +#define KS_DHTRT_INACTIVETIME (10*60) +#define KS_DHTRT_EXPIREDTIME (15*60) #define KS_DHTRT_MAXPING 3 #define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60) +#define KS_DHTRT_PROCESSTABLE_SHORTINTERVAL (120) #define KS_DHTRT_RECYCLE_NODE_THRESHOLD 100 /* peer flags */ @@ -49,6 +51,7 @@ #define DHTPEER_EXPIRED 1 #define DHTPEER_ACTIVE 2 + typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE]; /* internal structures */ @@ -79,7 +82,6 @@ typedef struct ks_dhtrt_bucket_header_s { struct ks_dhtrt_bucket_header_s * left; struct ks_dhtrt_bucket_header_s * right; ks_dhtrt_bucket_t * bucket; - ks_dhtrt_bucket_t * bucketv6; ks_time_t tyme; /* last processed time */ unsigned char mask[KS_DHT_NODEID_SIZE]; /* node id mask */ unsigned char flags; @@ -93,9 +95,11 @@ typedef struct ks_dhtrt_deletednode_s { typedef struct ks_dhtrt_internal_s { uint8_t localid[KS_DHT_NODEID_SIZE]; ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */ + ks_dht_t *dht; ks_thread_pool_t *tpool; ks_rwl_t *lock; /* lock for safe traversal of the tree */ ks_time_t last_process_table; + ks_time_t next_process_table_delta; ks_mutex_t *deleted_node_lock; ks_dhtrt_deletednode_t *deleted_node; ks_dhtrt_deletednode_t *free_node_ex; @@ -130,6 +134,8 @@ ks_dhtrt_bucket_t *ks_dhtrt_create_bucket(ks_pool_t *pool); static ks_dhtrt_bucket_header_t *ks_dhtrt_find_bucketheader(ks_dhtrt_routetable_t *table, ks_dhtrt_nodeid_t id); static +ks_dhtrt_bucket_header_t *ks_dhtrt_find_relatedbucketheader(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id); +static ks_dhtrt_bucket_entry_t *ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id); static @@ -178,7 +184,7 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid, unsigned int max); static -void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry); +void ks_dhtrt_ping(ks_dhtrt_internal_t *table, ks_dhtrt_bucket_entry_t *entry); @@ -187,12 +193,15 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry); /* very verbose */ /* # define KS_DHT_DEBUGPRINTFX_ */ /* debug locking */ -/* # define KS_DHT_DEBUGLOCKPRINTF_ */ +#define KS_DHT_DEBUGLOCKPRINTF_ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, - ks_pool_t *pool, - ks_thread_pool_t* tpool) + ks_dht_t *dht, + ks_pool_t *pool, + ks_thread_pool_t* tpool) { +(void)ks_dhtrt_find_relatedbucketheader; + unsigned char initmask[KS_DHT_NODEID_SIZE]; memset(initmask, 0xff, sizeof(initmask)); @@ -202,6 +211,8 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_rwl_create(&internal->lock, pool); internal->tpool = tpool; + internal->dht = dht; + internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL; ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool); table->internal = internal; @@ -305,15 +316,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh #ifdef KS_DHT_DEBUGLOCKPRINTF_ char buf[100]; ks_log(KS_LOG_DEBUG, "Delete node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //printf("delete node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif ks_rwl_write_lock(bucket->lock); s = ks_dhtrt_delete_id(bucket, node->nodeid.id); #ifdef KS_DHT_DEBUGLOCKPRINTF_ ks_log(KS_LOG_DEBUG, "Delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //printf("delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif ks_rwl_write_unlock(bucket->lock); @@ -767,13 +774,20 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) /* */ ks_dhtrt_internal_t *internal = table->internal; + int ping_count = 0; ks_time_t t0 = ks_time_now_sec(); - if (t0 - internal->last_process_table < KS_DHTRT_PROCESSTABLE_INTERVAL) { + /* + printf("process_table: %" PRId64 " %" PRId64 "\n", t0 - internal->last_process_table, internal->next_process_table_delta); + */ + + if (t0 - internal->last_process_table < internal->next_process_table_delta) { return; } + internal->last_process_table = t0; + ks_log(KS_LOG_DEBUG,"process_table in progress\n"); ks_rwl_read_lock(internal->lock); /* grab read lock */ @@ -805,22 +819,40 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) /* more than n pings outstanding? */ - if (e->outstanding_pings >= KS_DHTRT_MAXPING) { + if (e->flags == DHTPEER_DUBIOUS) { + continue; + } + + if ( e->flags != DHTPEER_EXPIRED && + e->outstanding_pings >= KS_DHTRT_MAXPING ) { +#ifdef KS_DHT_DEBUGPRINTF_ + ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n", + ks_dhtrt_printableid(e->id, buf)); +#endif e->flags = DHTPEER_EXPIRED; ++b->expired_count; continue; } - if (e->flags == DHTPEER_DUBIOUS) { - ks_dhtrt_ping(e); - continue; + /* if there are any outstanding pings - send another */ + if (e->outstanding_pings > 0) { + ks_dhtrt_ping(internal, e); + ++ping_count; + continue; } ks_time_t tdiff = t0 - e->tyme; - if (tdiff > KS_DHTRT_INACTIVETIME) { - e->flags = DHTPEER_DUBIOUS; - ks_dhtrt_ping(e); + if (tdiff > KS_DHTRT_EXPIREDTIME) { + e->flags = DHTPEER_DUBIOUS; /* mark as dubious */ + ks_dhtrt_ping(internal, e); /* final effort to activate */ + continue; + } + + if (tdiff > KS_DHTRT_INACTIVETIME) { /* inactive for suspicious length */ + ks_dhtrt_ping(internal, e); /* kick */ + ++ping_count; + continue; } } /* end if not local */ @@ -830,8 +862,8 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) } /* end for each bucket_entry */ #ifdef KS_DHT_DEBUGLOCKPRINTF_ - char buf1[100]; - ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1)); + char buf1[100]; + ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1)); #endif ks_rwl_write_unlock(b->lock); @@ -857,6 +889,14 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) ks_dhtrt_process_deleted(table); + if (ping_count == 0) { + internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL; + } + else { + internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_SHORTINTERVAL; + } + ks_log(KS_LOG_DEBUG,"process_table complete\n"); + return; } @@ -943,7 +983,10 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) { memset(buffer, 0, 100); if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer); else strcpy(buffer, ""); - ks_log(KS_LOG_DEBUG, " slot %d: %d %s\n", ix, b->entries[ix].flags, buffer); + ks_log(KS_LOG_DEBUG, " slot %d: %d %d %s\n", ix, + b->entries[ix].flags, + b->entries[ix].outstanding_pings, + buffer); } ks_log(KS_LOG_DEBUG, " --------------------------\n\n"); @@ -979,7 +1022,6 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt char buffer[100]; ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s\n", ks_dhtrt_printableid(mask, buffer)); if (parent) ks_log(KS_LOG_DEBUG, " ... from parent mask: %s\n", ks_dhtrt_printableid(parent->mask, buffer)); - printf("\n"); #endif return header; } @@ -1018,6 +1060,32 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_find_bucketheader(ks_dhtrt_routetable_t *tabl return NULL; } +static +ks_dhtrt_bucket_header_t *ks_dhtrt_find_relatedbucketheader(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id) +{ + /* + using the passed bucket header as a starting point find the right bucket. + This is a shortcut used in query to shorten the search path for queries extending beyond a single bucket. + */ + + while (header) { + if ( header->bucket ) { + return header; + } + + /* left hand side is more restrictive (closer) so should be tried first */ + if (header->left != 0 && (ks_dhtrt_ismasked(id, header->left->mask))) { + header = header->left; + } else { + header = header->right; + } + } + + return NULL; +} + + + static ks_dhtrt_bucket_entry_t *ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t nodeid) { @@ -1392,18 +1460,17 @@ ks_dht_node_t* ks_dhtrt_make_node(ks_dhtrt_routetable_t* table) return node; } -void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) { +void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry) { ++entry->outstanding_pings; - /* @todo */ - /* set the appropriate command in the node and queue if for processing */ - /*ks_dht_node_t *node = entry->gptr; */ - /* ++entry->outstanding_pings; */ #ifdef KS_DHT_DEBUGPRINTF_ char buf[100]; - printf(" ping queued for nodeid %s count %d\n", + ks_log(KS_LOG_DEBUG, "Ping queued for nodeid %s count %d\n", ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings); #endif + ks_dht_node_t* node = entry->gptr; + ks_dht_ping(internal->dht, &node->addr, NULL); + return; } diff --git a/libs/libks/test/testbuckets.c b/libs/libks/test/testbuckets.c index 43fd1b2136..e5204faf1b 100644 --- a/libs/libks/test/testbuckets.c +++ b/libs/libks/test/testbuckets.c @@ -5,6 +5,7 @@ //#include "ks.h" #include "../src/dht/ks_dht.h" +ks_dht_t* dht; ks_dhtrt_routetable_t* rt; ks_pool_t* pool; ks_thread_pool_t* tpool; @@ -30,10 +31,10 @@ void test01() printf("*** testbuckets - test01 start\n"); fflush(stdout); ks_dhtrt_routetable_t* rt; - ks_dhtrt_initroute(&rt, pool, tpool); + ks_dhtrt_initroute(&rt, dht, pool, tpool); ks_dhtrt_deinitroute(&rt); - ks_dhtrt_initroute(&rt, pool, tpool); + ks_dhtrt_initroute(&rt, dht, pool, tpool); ks_dht_nodeid_t nodeid, homeid; memset(homeid.id, 0xdd, KS_DHT_NODEID_SIZE); homeid.id[19] = 0; @@ -255,14 +256,18 @@ void test04() ks_status_t status; - for (int i=0,i2=0; i<10000; ++i) { - if (i%40 == 0) { - ++nodeid.id[0]; - if(i2%40 == 0) { - ++nodeid.id[1]; + for (int i=0,i2=0,i3=0; i<10000; ++i, ++i2, ++i3) { + if (i%20 == 0) { + nodeid.id[0] = nodeid.id[0] / 2; + if(i2%20 == 0) { + nodeid.id[1] = nodeid.id[1] / 2; + i2 = 0; + if(i3%20 == 0) { + nodeid.id[2] = nodeid.id[2] / 2; + } } else { - ++nodeid.id[2]; + ++nodeid.id[3]; } } else { @@ -531,6 +536,54 @@ void test50() } +/* test process_table */ +void test51() +{ + printf("*** testbuckets - test51 start\n"); fflush(stdout); + + ks_dht_node_t* peer; + ks_dht_nodeid_t nodeid, nodeid2; + memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE); + memset(nodeid2.id, 0xef, KS_DHT_NODEID_SIZE); + + char ipv6[] = "1234:1234:1234:1234"; + char ipv4[] = "123.123.123.123"; + unsigned short port = 7000; + enum ks_afflags_t both = ifboth; + + ks_status_t status; + + for (int i=0,i2=0; i<2; ++i, ++i2) { + if (i%20 == 0) { + nodeid.id[0] = nodeid.id[0] / 2; + if(i2%20 == 0) { + i2 = 0; + nodeid.id[1] = nodeid.id[1] / 2; + } + else { + ++nodeid.id[2]; + } + } + else { + ++nodeid.id[1]; + } + ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); + ks_dhtrt_touch_node(rt, nodeid); + } + + for(int ix=0; ix<50; ++ix) { + ks_dhtrt_process_table(rt); + ks_sleep(1000 * 1000 * 120); + printf("*** pulse ks_dhtrt_process_table\n"); + if ( ix%2 == 0) ks_dhtrt_dump(rt, 7); + } + + + printf("*** testbuckets - test51 complete\n"); fflush(stdout); + + return; +} + int main(int argc, char* argv[]) { @@ -554,8 +607,10 @@ int main(int argc, char* argv[]) { ks_init(); ks_global_set_default_logger(7); + ks_dht_create(&dht, NULL, NULL); - ks_thread_pool_create(&tpool, KS_DHT_TPOOL_MIN, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE); + + ks_thread_pool_create(&tpool, 0, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE); ks_status_t status; char *str = NULL; @@ -572,61 +627,67 @@ int main(int argc, char* argv[]) { printf("init/deinit routeable\n"); fflush(stdout); - ks_dhtrt_initroute(&rt, pool, tpool); + ks_dhtrt_initroute(&rt, dht, pool, tpool); ks_dhtrt_deinitroute(&rt); for(int tix=0; tix