FS-9775: Drive ping logic from dhtrt_process_table

This commit is contained in:
colm 2016-12-20 10:35:40 -05:00 committed by Mike Jerris
parent 51c1b7a719
commit 4338c1b941
4 changed files with 179 additions and 48 deletions

View File

@ -537,7 +537,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
* If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint.
*/
if (ep->addr.family == AF_INET) {
if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_create_node(dht->rt_ipv4,
ep->nodeid,
KS_DHT_LOCAL,
@ -545,7 +545,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
ep->addr.port,
&ep->node)) != KS_STATUS_SUCCESS) goto done;
} else {
if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_create_node(dht->rt_ipv6,
ep->nodeid,
KS_DHT_LOCAL,

View File

@ -406,7 +406,10 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message,
* route table methods
*
*/
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool);
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
ks_dht_t *dht,
ks_pool_t *pool,
ks_thread_pool_t* tpool);
KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table);
KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table,

View File

@ -39,9 +39,11 @@
/* change for testing */
#define KS_DHT_BUCKETSIZE 20
#define KS_DHTRT_INACTIVETIME (15*60)
#define KS_DHTRT_INACTIVETIME (10*60)
#define KS_DHTRT_EXPIREDTIME (15*60)
#define KS_DHTRT_MAXPING 3
#define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60)
#define KS_DHTRT_PROCESSTABLE_SHORTINTERVAL (120)
#define KS_DHTRT_RECYCLE_NODE_THRESHOLD 100
/* peer flags */
@ -49,6 +51,7 @@
#define DHTPEER_EXPIRED 1
#define DHTPEER_ACTIVE 2
typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE];
/* internal structures */
@ -79,7 +82,6 @@ typedef struct ks_dhtrt_bucket_header_s {
struct ks_dhtrt_bucket_header_s * left;
struct ks_dhtrt_bucket_header_s * right;
ks_dhtrt_bucket_t * bucket;
ks_dhtrt_bucket_t * bucketv6;
ks_time_t tyme; /* last processed time */
unsigned char mask[KS_DHT_NODEID_SIZE]; /* node id mask */
unsigned char flags;
@ -93,9 +95,11 @@ typedef struct ks_dhtrt_deletednode_s {
typedef struct ks_dhtrt_internal_s {
uint8_t localid[KS_DHT_NODEID_SIZE];
ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */
ks_dht_t *dht;
ks_thread_pool_t *tpool;
ks_rwl_t *lock; /* lock for safe traversal of the tree */
ks_time_t last_process_table;
ks_time_t next_process_table_delta;
ks_mutex_t *deleted_node_lock;
ks_dhtrt_deletednode_t *deleted_node;
ks_dhtrt_deletednode_t *free_node_ex;
@ -130,6 +134,8 @@ ks_dhtrt_bucket_t *ks_dhtrt_create_bucket(ks_pool_t *pool);
static
ks_dhtrt_bucket_header_t *ks_dhtrt_find_bucketheader(ks_dhtrt_routetable_t *table, ks_dhtrt_nodeid_t id);
static
ks_dhtrt_bucket_header_t *ks_dhtrt_find_relatedbucketheader(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id);
static
ks_dhtrt_bucket_entry_t *ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id);
static
@ -178,7 +184,7 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid,
unsigned int max);
static
void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry);
void ks_dhtrt_ping(ks_dhtrt_internal_t *table, ks_dhtrt_bucket_entry_t *entry);
@ -187,12 +193,15 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry);
/* very verbose */
/* # define KS_DHT_DEBUGPRINTFX_ */
/* debug locking */
/* # define KS_DHT_DEBUGLOCKPRINTF_ */
#define KS_DHT_DEBUGLOCKPRINTF_
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
ks_dht_t *dht,
ks_pool_t *pool,
ks_thread_pool_t* tpool)
{
(void)ks_dhtrt_find_relatedbucketheader;
unsigned char initmask[KS_DHT_NODEID_SIZE];
memset(initmask, 0xff, sizeof(initmask));
@ -202,6 +211,8 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
ks_rwl_create(&internal->lock, pool);
internal->tpool = tpool;
internal->dht = dht;
internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL;
ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool);
table->internal = internal;
@ -305,15 +316,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100];
ks_log(KS_LOG_DEBUG, "Delete node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
//printf("delete node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
//fflush(stdout);
#endif
ks_rwl_write_lock(bucket->lock);
s = ks_dhtrt_delete_id(bucket, node->nodeid.id);
#ifdef KS_DHT_DEBUGLOCKPRINTF_
ks_log(KS_LOG_DEBUG, "Delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
//printf("delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
//fflush(stdout);
#endif
ks_rwl_write_unlock(bucket->lock);
@ -767,13 +774,20 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
/* */
ks_dhtrt_internal_t *internal = table->internal;
int ping_count = 0;
ks_time_t t0 = ks_time_now_sec();
if (t0 - internal->last_process_table < KS_DHTRT_PROCESSTABLE_INTERVAL) {
/*
printf("process_table: %" PRId64 " %" PRId64 "\n", t0 - internal->last_process_table, internal->next_process_table_delta);
*/
if (t0 - internal->last_process_table < internal->next_process_table_delta) {
return;
}
internal->last_process_table = t0;
ks_log(KS_LOG_DEBUG,"process_table in progress\n");
ks_rwl_read_lock(internal->lock); /* grab read lock */
@ -805,22 +819,40 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
/* more than n pings outstanding? */
if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
if (e->flags == DHTPEER_DUBIOUS) {
continue;
}
if ( e->flags != DHTPEER_EXPIRED &&
e->outstanding_pings >= KS_DHTRT_MAXPING ) {
#ifdef KS_DHT_DEBUGPRINTF_
ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n",
ks_dhtrt_printableid(e->id, buf));
#endif
e->flags = DHTPEER_EXPIRED;
++b->expired_count;
continue;
}
if (e->flags == DHTPEER_DUBIOUS) {
ks_dhtrt_ping(e);
/* if there are any outstanding pings - send another */
if (e->outstanding_pings > 0) {
ks_dhtrt_ping(internal, e);
++ping_count;
continue;
}
ks_time_t tdiff = t0 - e->tyme;
if (tdiff > KS_DHTRT_INACTIVETIME) {
e->flags = DHTPEER_DUBIOUS;
ks_dhtrt_ping(e);
if (tdiff > KS_DHTRT_EXPIREDTIME) {
e->flags = DHTPEER_DUBIOUS; /* mark as dubious */
ks_dhtrt_ping(internal, e); /* final effort to activate */
continue;
}
if (tdiff > KS_DHTRT_INACTIVETIME) { /* inactive for suspicious length */
ks_dhtrt_ping(internal, e); /* kick */
++ping_count;
continue;
}
} /* end if not local */
@ -857,6 +889,14 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
ks_dhtrt_process_deleted(table);
if (ping_count == 0) {
internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_INTERVAL;
}
else {
internal->next_process_table_delta = KS_DHTRT_PROCESSTABLE_SHORTINTERVAL;
}
ks_log(KS_LOG_DEBUG,"process_table complete\n");
return;
}
@ -943,7 +983,10 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
memset(buffer, 0, 100);
if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer);
else strcpy(buffer, "<free>");
ks_log(KS_LOG_DEBUG, " slot %d: %d %s\n", ix, b->entries[ix].flags, buffer);
ks_log(KS_LOG_DEBUG, " slot %d: %d %d %s\n", ix,
b->entries[ix].flags,
b->entries[ix].outstanding_pings,
buffer);
}
ks_log(KS_LOG_DEBUG, " --------------------------\n\n");
@ -979,7 +1022,6 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt
char buffer[100];
ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s\n", ks_dhtrt_printableid(mask, buffer));
if (parent) ks_log(KS_LOG_DEBUG, " ... from parent mask: %s\n", ks_dhtrt_printableid(parent->mask, buffer));
printf("\n");
#endif
return header;
}
@ -1018,6 +1060,32 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_find_bucketheader(ks_dhtrt_routetable_t *tabl
return NULL;
}
static
ks_dhtrt_bucket_header_t *ks_dhtrt_find_relatedbucketheader(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t id)
{
/*
using the passed bucket header as a starting point find the right bucket.
This is a shortcut used in query to shorten the search path for queries extending beyond a single bucket.
*/
while (header) {
if ( header->bucket ) {
return header;
}
/* left hand side is more restrictive (closer) so should be tried first */
if (header->left != 0 && (ks_dhtrt_ismasked(id, header->left->mask))) {
header = header->left;
} else {
header = header->right;
}
}
return NULL;
}
static
ks_dhtrt_bucket_entry_t *ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t *header, ks_dhtrt_nodeid_t nodeid)
{
@ -1392,18 +1460,17 @@ ks_dht_node_t* ks_dhtrt_make_node(ks_dhtrt_routetable_t* table)
return node;
}
void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) {
void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry) {
++entry->outstanding_pings;
/* @todo */
/* set the appropriate command in the node and queue if for processing */
/*ks_dht_node_t *node = entry->gptr; */
/* ++entry->outstanding_pings; */
#ifdef KS_DHT_DEBUGPRINTF_
char buf[100];
printf(" ping queued for nodeid %s count %d\n",
ks_log(KS_LOG_DEBUG, "Ping queued for nodeid %s count %d\n",
ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
#endif
ks_dht_node_t* node = entry->gptr;
ks_dht_ping(internal->dht, &node->addr, NULL);
return;
}

View File

@ -5,6 +5,7 @@
//#include "ks.h"
#include "../src/dht/ks_dht.h"
ks_dht_t* dht;
ks_dhtrt_routetable_t* rt;
ks_pool_t* pool;
ks_thread_pool_t* tpool;
@ -30,10 +31,10 @@ void test01()
printf("*** testbuckets - test01 start\n"); fflush(stdout);
ks_dhtrt_routetable_t* rt;
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_initroute(&rt, dht, pool, tpool);
ks_dhtrt_deinitroute(&rt);
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_initroute(&rt, dht, pool, tpool);
ks_dht_nodeid_t nodeid, homeid;
memset(homeid.id, 0xdd, KS_DHT_NODEID_SIZE);
homeid.id[19] = 0;
@ -255,14 +256,18 @@ void test04()
ks_status_t status;
for (int i=0,i2=0; i<10000; ++i) {
if (i%40 == 0) {
++nodeid.id[0];
if(i2%40 == 0) {
++nodeid.id[1];
for (int i=0,i2=0,i3=0; i<10000; ++i, ++i2, ++i3) {
if (i%20 == 0) {
nodeid.id[0] = nodeid.id[0] / 2;
if(i2%20 == 0) {
nodeid.id[1] = nodeid.id[1] / 2;
i2 = 0;
if(i3%20 == 0) {
nodeid.id[2] = nodeid.id[2] / 2;
}
}
else {
++nodeid.id[2];
++nodeid.id[3];
}
}
else {
@ -531,6 +536,54 @@ void test50()
}
/* test process_table */
void test51()
{
printf("*** testbuckets - test51 start\n"); fflush(stdout);
ks_dht_node_t* peer;
ks_dht_nodeid_t nodeid, nodeid2;
memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE);
memset(nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
char ipv6[] = "1234:1234:1234:1234";
char ipv4[] = "123.123.123.123";
unsigned short port = 7000;
enum ks_afflags_t both = ifboth;
ks_status_t status;
for (int i=0,i2=0; i<2; ++i, ++i2) {
if (i%20 == 0) {
nodeid.id[0] = nodeid.id[0] / 2;
if(i2%20 == 0) {
i2 = 0;
nodeid.id[1] = nodeid.id[1] / 2;
}
else {
++nodeid.id[2];
}
}
else {
++nodeid.id[1];
}
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
}
for(int ix=0; ix<50; ++ix) {
ks_dhtrt_process_table(rt);
ks_sleep(1000 * 1000 * 120);
printf("*** pulse ks_dhtrt_process_table\n");
if ( ix%2 == 0) ks_dhtrt_dump(rt, 7);
}
printf("*** testbuckets - test51 complete\n"); fflush(stdout);
return;
}
int main(int argc, char* argv[]) {
@ -554,8 +607,10 @@ int main(int argc, char* argv[]) {
ks_init();
ks_global_set_default_logger(7);
ks_dht_create(&dht, NULL, NULL);
ks_thread_pool_create(&tpool, KS_DHT_TPOOL_MIN, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
ks_thread_pool_create(&tpool, 0, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE);
ks_status_t status;
char *str = NULL;
@ -572,61 +627,67 @@ int main(int argc, char* argv[]) {
printf("init/deinit routeable\n"); fflush(stdout);
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_initroute(&rt, dht, pool, tpool);
ks_dhtrt_deinitroute(&rt);
for(int tix=0; tix<argc; ++tix) {
if (tests[tix] == 1) {
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_initroute(&rt, dht, pool, tpool);
test01();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 2) {
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_initroute(&rt, dht, pool, tpool);
test02();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 3) {
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_initroute(&rt, dht, pool, tpool);
test03();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 4) {
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_initroute(&rt, dht, pool, tpool);
test04();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 5) {
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_initroute(&rt, dht, pool, tpool);
test05();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 6) {
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_initroute(&rt, dht, pool, tpool);
test06();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 50) {
ks_dhtrt_initroute(&rt, pool, tpool);
ks_dhtrt_initroute(&rt, dht, pool, tpool);
test50();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 51) {
ks_dhtrt_initroute(&rt, dht, pool, tpool);
test51();
ks_dhtrt_deinitroute(&rt);
continue;
}