FS-9775: initialize dht nodes as dubious

This commit is contained in:
colm 2016-12-16 14:41:05 -05:00 committed by Mike Jerris
parent f7027cd343
commit df61ab87bf
2 changed files with 106 additions and 75 deletions

View File

@ -45,9 +45,9 @@
/* peer flags */ /* peer flags */
#define DHTPEER_ACTIVE 1 #define DHTPEER_DUBIOUS 0
#define DHTPEER_SUSPECT 2 #define DHTPEER_EXPIRED 1
#define DHTPEER_EXPIRED 3 #define DHTPEER_ACTIVE 2
typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE]; typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE];
@ -235,7 +235,6 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id); ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (bentry != 0) { if (bentry != 0) {
bentry->tyme = ks_time_now_sec(); bentry->tyme = ks_time_now_sec();
bentry->type = type;
(*node) = bentry->gptr; (*node) = bentry->gptr;
ks_rwl_read_unlock(internal->lock); ks_rwl_read_unlock(internal->lock);
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
@ -284,14 +283,16 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh
if (bucket != 0) { /* we found a bucket*/ if (bucket != 0) { /* we found a bucket*/
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100]; char buf[100];
printf("delete node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG, "Delete node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //printf("delete node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
//fflush(stdout);
#endif #endif
ks_rwl_write_lock(bucket->lock); ks_rwl_write_lock(bucket->lock);
s = ks_dhtrt_delete_id(bucket, node->nodeid.id); s = ks_dhtrt_delete_id(bucket, node->nodeid.id);
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
printf("delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG, "Delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //printf("delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
//fflush(stdout);
#endif #endif
ks_rwl_write_unlock(bucket->lock); ks_rwl_write_unlock(bucket->lock);
@ -335,8 +336,8 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
} }
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100]; char buf[100];
printf("insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG, "Insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
ks_rwl_write_lock(bucket->lock); ks_rwl_write_lock(bucket->lock);
@ -349,8 +350,8 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
ks_status_t s = ks_dhtrt_insert_id(bucket, node); ks_status_t s = ks_dhtrt_insert_id(bucket, node);
if (s == KS_STATUS_SUCCESS) { if (s == KS_STATUS_SUCCESS) {
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG, "insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
ks_rwl_write_unlock(bucket->lock); ks_rwl_write_unlock(bucket->lock);
ks_rwl_write_unlock(internal->lock); ks_rwl_write_unlock(internal->lock);
@ -367,11 +368,11 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
if ( !(header->flags & BHF_LEFT) ) { /* only the left handside node can be split */ if ( !(header->flags & BHF_LEFT) ) { /* only the left handside node can be split */
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char bufx[100]; char bufx[100];
printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, bufx)); ks_log(KS_LOG_DEBUG, "nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, bufx));
#endif #endif
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
ks_rwl_write_unlock(bucket->lock); ks_rwl_write_unlock(bucket->lock);
ks_rwl_write_unlock(internal->lock); ks_rwl_write_unlock(internal->lock);
@ -386,11 +387,11 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
if (newmask[KS_DHT_NODEID_SIZE-1] == 0) { /* no more bits to shift - is this possible */ if (newmask[KS_DHT_NODEID_SIZE-1] == 0) { /* no more bits to shift - is this possible */
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char bufx[100]; char bufx[100];
printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, bufx)); ks_log(KS_LOG_DEBUG," nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, bufx));
#endif #endif
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
ks_rwl_write_unlock(bucket->lock); ks_rwl_write_unlock(bucket->lock);
ks_rwl_write_unlock(internal->lock); ks_rwl_write_unlock(internal->lock);
@ -415,9 +416,9 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
if (ks_dhtrt_ismasked(node->nodeid.id, newleft->mask)) { if (ks_dhtrt_ismasked(node->nodeid.id, newleft->mask)) {
bucket = newleft->bucket; bucket = newleft->bucket;
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->right->mask, buf)); ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->right->mask, buf));
printf("insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(newleft->mask, buf)); ks_log(KS_LOG_DEBUG, "Insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(newleft->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
ks_rwl_write_lock(bucket->lock); /* lock new bucket */ ks_rwl_write_lock(bucket->lock); /* lock new bucket */
@ -433,16 +434,16 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char buffer[100]; char buffer[100];
printf("inserting nodeid %s ", ks_dhtrt_printableid(node->nodeid.id, buffer)); ks_log(KS_LOG_DEBUG, "Inserting nodeid %s\n", ks_dhtrt_printableid(node->nodeid.id, buffer));
printf("into bucket %s\n", ks_dhtrt_printableid(header->mask, buffer)); ks_log(KS_LOG_DEBUG, " ...into bucket %s\n", ks_dhtrt_printableid(header->mask, buffer));
#endif #endif
ks_status_t s = ks_dhtrt_insert_id(bucket, node); ks_status_t s = ks_dhtrt_insert_id(bucket, node);
ks_rwl_write_unlock(internal->lock); ks_rwl_write_unlock(internal->lock);
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
printf("insert node: UNLOCKING bucket %s\n", ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n",
ks_dhtrt_printableid(header->mask, buf)); ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
ks_rwl_write_unlock(bucket->lock); ks_rwl_write_unlock(bucket->lock);
return s; return s;
@ -466,8 +467,8 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100]; char buf[100];
printf("insert node: read LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG, "Insert node: read LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
ks_rwl_read_lock(bucket->lock); ks_rwl_read_lock(bucket->lock);
@ -477,8 +478,8 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
ks_rwl_read_lock(node->reflock); ks_rwl_read_lock(node->reflock);
} }
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
printf("insert node: read UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG, "Insert node: read UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
ks_rwl_read_unlock(bucket->lock); ks_rwl_read_unlock(bucket->lock);
} }
@ -501,8 +502,8 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh
ks_rwl_write_lock(header->bucket->lock); ks_rwl_write_lock(header->bucket->lock);
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100]; char buf[100];
printf("insert node: write bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG, "Touch node: write bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id); ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
@ -511,7 +512,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh
e->tyme = ks_time_now_sec(); e->tyme = ks_time_now_sec();
e->outstanding_pings = 0; e->outstanding_pings = 0;
if (e->flags == DHTPEER_EXPIRED) { if (e->flags == DHTPEER_EXPIRED) {
--header->bucket->expired_count; --header->bucket->expired_count;
} }
@ -519,8 +520,8 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh
s = KS_STATUS_SUCCESS; s = KS_STATUS_SUCCESS;
} }
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG, "Touch node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
ks_rwl_write_unlock(header->bucket->lock); ks_rwl_write_unlock(header->bucket->lock);
} }
@ -578,8 +579,8 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char buffer[100]; char buffer[100];
printf("finding %d closest nodes for nodeid %s\n", max, ks_dhtrt_printableid(query->nodeid.id, buffer)); ks_log(KS_LOG_DEBUG, "Finding %d closest nodes for nodeid %s\n", max, ks_dhtrt_printableid(query->nodeid.id, buffer));
printf(" starting at mask: %s\n", ks_dhtrt_printableid(header->mask, buffer)); ks_log(KS_LOG_DEBUG, " ...starting at mask: %s\n", ks_dhtrt_printableid(header->mask, buffer));
#endif #endif
ks_dhtrt_sortedxors_t xort0; ks_dhtrt_sortedxors_t xort0;
@ -597,7 +598,7 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
total += cnt; total += cnt;
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
printf(" bucket header %s yielded %d nodes; total=%d\n", buffer, cnt, total); ks_log(KS_LOG_DEBUG, "Bucket %s yielded %d nodes; total=%d\n", buffer, cnt, total);
#endif #endif
if (total >= query->max || if (total >= query->max ||
@ -630,7 +631,7 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
total += cnt; total += cnt;
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
printf(" stage2: sibling bucket header %s yielded %d nodes, total=%d\n", ks_log(KS_LOG_DEBUG," stage2: sibling bucket header %s yielded %d nodes, total=%d\n",
ks_dhtrt_printableid(header->mask, buffer), cnt, total); ks_dhtrt_printableid(header->mask, buffer), cnt, total);
#endif #endif
@ -677,7 +678,7 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
lheader, xortn, leftid ,max); lheader, xortn, leftid ,max);
max -= cnt; max -= cnt;
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
printf(" stage3: seaching left bucket header %s yielded %d nodes, total=%d\n", ks_log(KS_LOG_DEBUG," stage3: seaching left bucket header %s yielded %d nodes, total=%d\n",
ks_dhtrt_printableid(lheader->mask, buffer), cnt, total); ks_dhtrt_printableid(lheader->mask, buffer), cnt, total);
#endif #endif
} }
@ -695,7 +696,7 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
rheader, xortn1, rightid , max); rheader, xortn1, rightid , max);
max -= cnt; max -= cnt;
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
printf(" stage3: seaching right bucket header %s yielded %d nodes, total=%d\n", ks_log(KS_LOG_DEBUG," stage3: seaching right bucket header %s yielded %d nodes, total=%d\n",
ks_dhtrt_printableid(rheader->mask, buffer), cnt, total); ks_dhtrt_printableid(rheader->mask, buffer), cnt, total);
#endif #endif
} }
@ -778,8 +779,8 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100]; char buf[100];
printf("process_table: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); ks_log(KS_LOG_DEBUG,"process_table: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) { for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
@ -797,7 +798,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
continue; continue;
} }
if (e->flags == DHTPEER_SUSPECT) { if (e->flags == DHTPEER_DUBIOUS) {
ks_dhtrt_ping(e); ks_dhtrt_ping(e);
continue; continue;
} }
@ -805,7 +806,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
ks_time_t tdiff = t0 - e->tyme; ks_time_t tdiff = t0 - e->tyme;
if (tdiff > KS_DHTRT_INACTIVETIME) { if (tdiff > KS_DHTRT_INACTIVETIME) {
e->flags = DHTPEER_SUSPECT; e->flags = DHTPEER_DUBIOUS;
ks_dhtrt_ping(e); ks_dhtrt_ping(e);
} }
@ -817,8 +818,8 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf1[100]; char buf1[100];
printf("process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1)); ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
fflush(stdout); //fflush(stdout);
#endif #endif
ks_rwl_write_unlock(b->lock); ks_rwl_write_unlock(b->lock);
@ -827,8 +828,8 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
else { else {
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char buf2[100]; char buf2[100];
printf("process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2)); ks_log(KS_LOG_DEBUG,"process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2));
fflush(stdout); //fflush(stdout);
#endif #endif
} }
} }
@ -898,23 +899,23 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
stack[stackix++] = header; stack[stackix++] = header;
/* walk and report left handsize */ /* walk and report left handsize */
memset(buffer, 0, 100); memset(buffer, 0, 100);
/*ks_log*/ printf("bucket header: [%s]\n", ks_dhtrt_printableid(header->mask, buffer) ); ks_log(KS_LOG_DEBUG, "bucket header: [%s]\n", ks_dhtrt_printableid(header->mask, buffer) );
if (header->bucket) { if (header->bucket) {
ks_dhtrt_bucket_t *b = header->bucket; ks_dhtrt_bucket_t *b = header->bucket;
printf(" bucket holds %d entries\n", b->count); ks_log(KS_LOG_DEBUG, " bucket holds %d entries\n", b->count);
if (b->count > 0 && level == 7) { if (b->count > 0 && level == 7) {
printf(" --------------------------\n"); ks_log(KS_LOG_DEBUG, " --------------------------\n");
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) { for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
memset(buffer, 0, 100); memset(buffer, 0, 100);
if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer); if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer);
else strcpy(buffer, "<free>"); else strcpy(buffer, "<free>");
printf(" slot %d: %s\n", ix, buffer); ks_log(KS_LOG_DEBUG, " slot %d: %s\n", ix, buffer);
} }
printf(" --------------------------\n\n"); ks_log(KS_LOG_DEBUG, " --------------------------\n\n");
} }
} }
@ -945,8 +946,8 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char buffer[100]; char buffer[100];
printf("creating bucket header for mask: %s ", ks_dhtrt_printableid(mask, buffer)); ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s ", ks_dhtrt_printableid(mask, buffer));
if (parent) printf("from parent mask: %s ", ks_dhtrt_printableid(parent->mask, buffer)); if (parent) ks_log(KS_LOG_DEBUG, "from parent mask: %s ", ks_dhtrt_printableid(parent->mask, buffer));
printf("\n"); printf("\n");
#endif #endif
return header; return header;
@ -1045,9 +1046,9 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original,
original->right = right; original->right = right;
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char buffer[100]; char buffer[100];
printf("\nsplitting bucket orginal: %s\n", ks_dhtrt_printableid(original->mask, buffer)); ks_log(KS_LOG_DEBUG, "\nsplitting bucket orginal: %s\n", ks_dhtrt_printableid(original->mask, buffer));
printf(" into (left) mask: %s size: %d\n", ks_dhtrt_printableid(left->mask, buffer), left->bucket->count); ks_log(KS_LOG_DEBUG, " into (left) mask: %s size: %d\n", ks_dhtrt_printableid(left->mask, buffer), left->bucket->count);
printf(" and (right) mask: %s size: %d\n\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count); ks_log(KS_LOG_DEBUG, " and (right) mask: %s size: %d\n\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count);
#endif #endif
return; return;
} }
@ -1088,11 +1089,10 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
else if (!memcmp(bucket->entries[ix].id, node->nodeid.id, KS_DHT_NODEID_SIZE)) { else if (!memcmp(bucket->entries[ix].id, node->nodeid.id, KS_DHT_NODEID_SIZE)) {
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char buffer[100]; char buffer[100];
printf("duplicate peer %s found at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), ix); ks_log(KS_LOG_DEBUG, "duplicate peer %s found at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), ix);
#endif #endif
bucket->entries[ix].tyme = ks_time_now_sec(); bucket->entries[ix].tyme = ks_time_now_sec();
bucket->entries[ix].flags &= DHTPEER_ACTIVE; return KS_STATUS_SUCCESS; /* already exists : leave flags unchanged */
return KS_STATUS_SUCCESS; /* already exists */
} }
} }
@ -1108,7 +1108,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
bucket->entries[free].type = node->type; bucket->entries[free].type = node->type;
bucket->entries[free].family = node->family; bucket->entries[free].family = node->family;
bucket->entries[free].tyme = ks_time_now_sec(); bucket->entries[free].tyme = ks_time_now_sec();
bucket->entries[free].flags &= DHTPEER_ACTIVE; bucket->entries[free].flags = DHTPEER_DUBIOUS;
if (free != expiredix) { /* are we are taking a free slot rather than replacing an expired node? */ if (free != expiredix) { /* are we are taking a free slot rather than replacing an expired node? */
++bucket->count; /* yes: increment total count */ ++bucket->count; /* yes: increment total count */
@ -1117,7 +1117,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
memcpy(bucket->entries[free].id, node->nodeid.id, KS_DHT_NODEID_SIZE); memcpy(bucket->entries[free].id, node->nodeid.id, KS_DHT_NODEID_SIZE);
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char buffer[100]; char buffer[100];
printf("Inserting node %s at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), free); ks_log(KS_LOG_DEBUG, "Inserting node %s at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), free);
#endif #endif
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
@ -1130,7 +1130,7 @@ ks_dht_node_t *ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t
{ {
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char buffer[100]; char buffer[100];
printf("find nodeid for: %s\n", ks_dhtrt_printableid(id, buffer)); ks_log(KS_LOG_DEBUG, "Find nodeid for: %s\n", ks_dhtrt_printableid(id, buffer));
#endif #endif
@ -1138,7 +1138,7 @@ ks_dht_node_t *ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t
#ifdef KS_DHT_DEBUGPRINTFX_ #ifdef KS_DHT_DEBUGPRINTFX_
char bufferx[100]; char bufferx[100];
if ( bucket->entries[ix].inuse == 1) { if ( bucket->entries[ix].inuse == 1) {
printf("\nbucket->entries[%d].id = %s inuse=%x\n", ix, ks_log(KS_LOG_DEBUG, "bucket->entries[%d].id = %s inuse=%x\n", ix,
ks_dhtrt_printableid(bucket->entries[ix].id, bufferx), ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
bucket->entries[ix].inuse ); bucket->entries[ix].inuse );
} }
@ -1156,13 +1156,13 @@ ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
{ {
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char buffer[100]; char buffer[100];
printf("\ndeleting node for: %s\n", ks_dhtrt_printableid(id, buffer)); ks_log(KS_LOG_DEBUG, "deleting node for: %s\n", ks_dhtrt_printableid(id, buffer));
#endif #endif
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) { for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
#ifdef KS_DHT_DEBUGPRINTFX_ #ifdef KS_DHT_DEBUGPRINTFX_
char bufferx[100];_ char bufferx[100];_
printf("\nbucket->entries[%d].id = %s inuse=%c\n", ix, ks_log(KS_LOG_DEBUG, "bucket->entries[%d].id = %s inuse=%c\n", ix,
ks_dhtrt_printableid(bucket->entries[ix].id, bufferx), ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
bucket->entries[ix].inuse ); bucket->entries[ix].inuse );
#endif #endif
@ -1202,7 +1202,7 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id,
if (bucket == 0) { /* sanity */ if (bucket == 0) { /* sanity */
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char buf[100]; char buf[100];
printf("closestbucketnodes: intermediate tree node found %s\n", ks_log(KS_LOG_DEBUG, "closestbucketnodes: intermediate tree node found %s\n",
ks_dhtrt_printableid(header->mask, buf)); ks_dhtrt_printableid(header->mask, buf));
#endif #endif
@ -1211,16 +1211,16 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id,
ks_rwl_read_lock(bucket->lock); /* get a read lock : released in load_query when the results are copied */ ks_rwl_read_lock(bucket->lock); /* get a read lock : released in load_query when the results are copied */
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100]; char buf[100];
printf("closestbucketnodes: LOCKING bucket %s\n", ks_log(KS_LOG_DEBUG, "closestbucketnodes: LOCKING bucket %s\n",
ks_dhtrt_printableid(header->mask, buf)); ks_dhtrt_printableid(header->mask, buf));
fflush(stdout); //fflush(stdout);
#endif #endif
for (uint8_t ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) { for (uint8_t ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
if ( bucket->entries[ix].inuse == 1 && /* in use */ if ( bucket->entries[ix].inuse == 1 && /* in use */
bucket->entries[ix].flags != DHTPEER_EXPIRED && /* not expired */ bucket->entries[ix].flags == DHTPEER_ACTIVE && /* not dubious or expired */
(family == ifboth || bucket->entries[ix].family == family) && /* match if family */ (family == ifboth || bucket->entries[ix].family == family) && /* match if family */
(bucket->entries[ix].type & type) && /* match type */ (bucket->entries[ix].type & type) && /* match type */
ks_dhtrt_isactive( &(bucket->entries[ix])) ) { ks_dhtrt_isactive( &(bucket->entries[ix])) ) {
@ -1239,10 +1239,11 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id,
unsigned int prev_xorix = KS_DHT_BUCKETSIZE; unsigned int prev_xorix = KS_DHT_BUCKETSIZE;
for (int ix2=0; ix2<count; ++ix2) { for (int ix2=0; ix2<count; ++ix2) {
if (memcmp(xorvalue, xors->xort[xorix].xor, KS_DHT_NODEID_SIZE) > 0) { if (memcmp(xorvalue, xors->xort[xorix].xor, KS_DHT_NODEID_SIZE) > 0) {
break; /* insert before xorix, after prev_xoris */ break; /* insert before xorix, after prev_xoris */
} }
prev_xorix = xorix; prev_xorix = xorix;
xorix = xors->xort[xorix].nextix; xorix = xors->xort[xorix].nextix;
} }
@ -1255,6 +1256,7 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id,
xors->xort[count].ix = ix; xors->xort[count].ix = ix;
xors->xort[count].nextix = xorix; /* correct forward chain */ xors->xort[count].nextix = xorix; /* correct forward chain */
if (prev_xorix < KS_DHT_BUCKETSIZE) { /* correct backward chain */ if (prev_xorix < KS_DHT_BUCKETSIZE) { /* correct backward chain */
xors->xort[prev_xorix].nextix = count; xors->xort[prev_xorix].nextix = count;
} else { } else {
@ -1278,7 +1280,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t
while (current) { while (current) {
#ifdef KS_DHT_DEBUGPRINTF_ #ifdef KS_DHT_DEBUGPRINTF_
char buf[100]; char buf[100];
printf(" loadquery from bucket %s count %d\n", ks_log(KS_LOG_DEBUG, " loadquery from bucket %s count %d\n",
ks_dhtrt_printableid(current->bheader->mask,buf), current->count); ks_dhtrt_printableid(current->bheader->mask,buf), current->count);
#endif #endif
int xorix = current->startix; int xorix = current->startix;
@ -1294,7 +1296,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t
#ifdef KS_DHT_DEBUGLOCKPRINTF_ #ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf1[100]; char buf1[100];
printf("load_query: UNLOCKING bucket %s\n", ks_log(KS_LOG_DEBUG, "load_query: UNLOCKING bucket %s\n",
ks_dhtrt_printableid(current->bheader->mask, buf1)); ks_dhtrt_printableid(current->bheader->mask, buf1));
fflush(stdout); fflush(stdout);
#endif #endif

View File

@ -90,24 +90,38 @@ void test02()
nodeid.id[0] = 1; nodeid.id[0] = 1;
status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv6, port, &peer); status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv6, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
nodeid.id[0] = 2; nodeid.id[0] = 2;
status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer); status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
nodeid.id[0] = 3; nodeid.id[0] = 3;
status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer); status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
nodeid.id[0] = 4; nodeid.id[0] = 4;
status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv6, port, &peer); status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv6, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
nodeid.id[1] = 1; nodeid.id[1] = 1;
status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer); status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
nodeid.id[19] = 1; nodeid.id[19] = 1;
status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
nodeid.id[19] = 2; nodeid.id[19] = 2;
status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
nodeid.id[19] = 3; nodeid.id[19] = 3;
status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
nodeid.id[19] = 4; nodeid.id[19] = 4;
status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer); status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
nodeid.id[19] = 5;
status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
nodeid.id[19] = 6;
status = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
int qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both); int qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both);
printf("\n*** local query count expected 3, actual %d\n", qcount); fflush(stdout); printf("\n*** local query count expected 3, actual %d\n", qcount); fflush(stdout);
@ -132,6 +146,14 @@ void test02()
qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, ifv4); qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, ifv4);
printf("\n*** AF_INET count expected 4, actual %d\n", qcount); fflush(stdout); printf("\n*** AF_INET count expected 4, actual %d\n", qcount); fflush(stdout);
nodeid.id[19] = 5;
ks_dhtrt_touch_node(rt, nodeid);
nodeid.id[19] = 6;
ks_dhtrt_touch_node(rt, nodeid);
qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, ifv4);
printf("\n*** AF_INET (after touch) count expected 6, actual %d\n", qcount); fflush(stdout);
printf("*** testbuckets - test02 finished\n"); fflush(stdout); printf("*** testbuckets - test02 finished\n"); fflush(stdout);
return; return;
@ -162,6 +184,7 @@ void test03()
++nodeid.id[1]; ++nodeid.id[1];
} }
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
} }
for (int i=0; i<2; ++i) { for (int i=0; i<2; ++i) {
@ -173,6 +196,7 @@ void test03()
} }
ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer); ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
} }
for (int i=0; i<201; ++i) { for (int i=0; i<201; ++i) {
@ -183,6 +207,7 @@ void test03()
++nodeid.id[1]; ++nodeid.id[1];
} }
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer); ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
} }
@ -242,6 +267,7 @@ void test04()
++nodeid.id[1]; ++nodeid.id[1];
} }
ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer);
ks_dhtrt_touch_node(rt, nodeid);
} }
@ -368,6 +394,7 @@ static void *test06ex(ks_thread_t *thread, void *data)
++nodeid.id[19]; ++nodeid.id[19];
ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer); ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer);
ks_sleep(1000); ks_sleep(1000);
ks_dhtrt_touch_node(rt, nodeid);
} }
for (int i=0; i<test06nodes; ++i) { for (int i=0; i<test06nodes; ++i) {
@ -443,6 +470,8 @@ int main(int argc, char* argv[]) {
} }
ks_init(); ks_init();
ks_global_set_default_logger(7);
ks_status_t status; ks_status_t status;
char *str = NULL; char *str = NULL;