FS-9775: Implement serialization, deserialization & repopulation for dht table

This commit is contained in:
colm 2017-01-06 17:25:52 -05:00 committed by Mike Jerris
parent 183116452b
commit 66fdf5fa19
3 changed files with 289 additions and 3 deletions

View File

@ -570,6 +570,10 @@ KS_DECLARE(ks_status_t) ks_dhtrt_release_querynodes(ks_dhtrt_querynodes_t
KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t* table);
KS_DECLARE(uint32_t) ks_dhtrt_serialize(ks_dhtrt_routetable_t* table, void** ptr);
KS_DECLARE(ks_status_t) ks_dhtrt_deserialize(ks_dhtrt_routetable_t* table, void* ptr);
/* debugging aids */
KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t* table, int level);

View File

@ -106,6 +106,8 @@ typedef struct ks_dhtrt_internal_s {
ks_dhtrt_deletednode_t *deleted_node;
ks_dhtrt_deletednode_t *free_node_ex;
uint32_t deleted_count;
uint32_t bucket_count;
uint32_t header_count;
} ks_dhtrt_internal_t;
typedef struct ks_dhtrt_xort_s {
@ -203,7 +205,6 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
ks_dht_t *dht,
ks_pool_t *pool)
{
(void)ks_dhtrt_find_relatedbucketheader;
unsigned char initmask[KS_DHT_NODEID_SIZE];
memset(initmask, 0xff, sizeof(initmask));
@ -224,6 +225,8 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP,
initial_header->flags = BHF_LEFT; /* fake left to allow splitting */
internal->buckets = initial_header;
initial_header->bucket = ks_dhtrt_create_bucket(pool);
internal->header_count = 1;
internal->bucket_count = 1;
table->pool = pool;
*tableP = table;
@ -281,6 +284,8 @@ KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **tableP)
return;
}
KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
ks_dht_nodeid_t nodeid,
enum ks_dht_nodetype_t type,
@ -500,6 +505,8 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
newleft->left1bit = newright;
ks_dhtrt_split_bucket(header, newleft, newright);
internal->header_count += 2;
++internal->bucket_count;
/* ok now we need to try again to see if the bucket has capacity */
/* which bucket do care about */
@ -1796,6 +1803,194 @@ static char *ks_dhtrt_printableid(uint8_t *id, char *buffer)
}
/*
*
* serialization and deserialization
* ---------------------------------
*/
typedef struct ks_dhtrt_serialized_bucket_s
{
uint16_t count;
char eye[4];
ks_dhtrt_nodeid_t id;
} ks_dhtrt_serialized_bucket_t;
typedef struct ks_dhtrt_serialized_routetable_s
{
uint32_t size;
uint8_t version;
uint8_t count;
char eye[4];
} ks_dhtrt_serialized_routetable_t;
#define DHTRT_SERIALIZATION_VERSION 1
static void ks_dhtrt_serialize_node(ks_dht_node_t *source, ks_dht_node_t *dest)
{
memcpy(dest, source, sizeof(ks_dht_node_t));
memset(&dest->table, 0, sizeof(void*));
memset(&dest->reflock, 0, sizeof(void*));
}
static void ks_dhtrt_serialize_bucket(ks_dhtrt_routetable_t *table,
ks_dhtrt_serialized_routetable_t *stable,
ks_dhtrt_bucket_header_t* header,
unsigned char* buffer)
{
uint8_t tzero = 0;
ks_dhtrt_serialized_bucket_t *s = (ks_dhtrt_serialized_bucket_t*)buffer;
memcpy(s->eye, "HEAD", 4);
memcpy(s->id, header->mask, KS_DHT_NODEID_SIZE);
buffer += sizeof(ks_dhtrt_serialized_bucket_t);
stable->size += sizeof(ks_dhtrt_serialized_bucket_t);
if (header->bucket != 0) {
ks_dhtrt_bucket_t* bucket = header->bucket;
memcpy(&s->count, &bucket->count, sizeof(uint8_t));
for (int i=0; i< KS_DHT_BUCKETSIZE; ++i) {
if (bucket->entries[i].inuse == 1) {
ks_dhtrt_serialize_node(bucket->entries[i].gptr, (ks_dht_node_t*)buffer);
buffer += sizeof(ks_dht_node_t);
stable->size += sizeof(ks_dht_node_t);
}
}
}
else {
memcpy(&s->count, &tzero, sizeof(uint8_t));
}
}
static void ks_dhtrt_serialize_table(ks_dhtrt_routetable_t *table,
ks_dhtrt_serialized_routetable_t *stable,
unsigned char *buffer)
{
ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
int stackix=0;
ks_dhtrt_internal_t *internal = table->internal;
ks_dhtrt_bucket_header_t *header = internal->buckets;
while (header) {
stack[stackix++] = header;
++stable->count;
ks_dhtrt_serialize_bucket(table, stable, header, buffer);
buffer = (unsigned char*)stable + stable->size;
header = header->left;
if (header == 0 && stackix > 1) {
stackix -= 2;
header = stack[stackix];
header = header->right;
}
}
return;
}
KS_DECLARE(uint32_t) ks_dhtrt_serialize(ks_dhtrt_routetable_t *table, void **ptr)
{
ks_dhtrt_internal_t *internal = table->internal;
ks_rwl_write_lock(internal->lock); /* grab write lock */
uint32_t buffer_size = 3200 * sizeof(ks_dht_node_t);
buffer_size += internal->header_count * sizeof(ks_dhtrt_serialized_bucket_t);
buffer_size += sizeof(ks_dhtrt_serialized_routetable_t);
unsigned char *buffer = (*ptr) = ks_pool_alloc(table->pool, buffer_size);
ks_dhtrt_serialized_routetable_t *stable = (ks_dhtrt_serialized_routetable_t*)buffer;
stable->size = sizeof(ks_dhtrt_serialized_routetable_t);
stable->version = DHTRT_SERIALIZATION_VERSION;
memcpy(stable->eye, "DHRT", 4);
buffer += sizeof(ks_dhtrt_serialized_routetable_t);
ks_dhtrt_serialize_table(table, stable, buffer);
ks_rwl_write_unlock(internal->lock); /* write unlock */
return stable->size;
}
static void ks_dhtrt_deserialize_node(ks_dhtrt_routetable_t *table,
ks_dht_node_t *source,
ks_dht_node_t *dest)
{
memcpy(dest, source, sizeof(ks_dht_node_t));
dest->table = table;
ks_rwl_create(&dest->reflock, table->pool);
}
KS_DECLARE(ks_status_t) ks_dhtrt_deserialize(ks_dhtrt_routetable_t *table, void* buffer)
{
ks_dhtrt_internal_t *internal = table->internal;
ks_rwl_write_lock(internal->lock); /* grab write lock */
unsigned char *ptr = (unsigned char*)buffer;
ks_dhtrt_serialized_routetable_t *stable = (ks_dhtrt_serialized_routetable_t*)buffer;
ptr += sizeof(ks_dhtrt_serialized_routetable_t);
/* unpack and chain the buckets */
for (int i=0; i<stable->count; ++i) {
ks_dhtrt_serialized_bucket_t *s = (ks_dhtrt_serialized_bucket_t*)ptr;
if (memcmp(s->eye, "HEAD", 4)) {
assert(0);
ks_rwl_write_unlock(internal->lock); /* write unlock */
return KS_STATUS_FAIL;
}
ptr += sizeof(ks_dhtrt_serialized_bucket_t);
/* currently adding the nodes individually
* need a better way to do this that is compatible with the pending
* changes for supernode support
*/
char buf[51];
ks_log(KS_LOG_DEBUG, "deserialize bucket [%s] count %d\n", ks_dhtrt_printableid(s->id, buf), s->count);
int mid = s->count >>1;
ks_dht_node_t *fnode = NULL;
ks_dht_node_t *node = NULL;
for(int i0=0; i0<s->count; ++i0) {
/* recreate the node */
ks_dht_node_t *node = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t));
if (i0 == mid) fnode = node;
ks_dhtrt_deserialize_node(table, (ks_dht_node_t*)ptr, node);
ptr += sizeof(ks_dht_node_t);
ks_dhtrt_insert_node(table, node, 0);
}
/*
* now the bucket is complete - now trigger a find.
* This staggers the series of finds. We only do this for populated tables here.
* Once the table is loaded, process_table will as normal start the ping/find process to
* update and populate the table.
*/
if (s->count > 0) {
if (fnode) {
ks_dhtrt_find(table, internal, &fnode->nodeid);
}
else if (node) {
ks_dhtrt_find(table, internal, &node->nodeid);
}
}
}
ks_rwl_write_unlock(internal->lock); /* write unlock */
return KS_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -620,8 +620,6 @@ void test09()
char ipv4[] = "123.123.123.123";
unsigned short port = 7000;
/* build a delete queue */
int cix=0;
for(int i0=0, i1=0; i0<150; ++i0, ++i1) {
@ -655,6 +653,88 @@ void test09()
typedef struct ks_dhtrt_serialized_routetable_s
{
uint32_t size;
uint8_t version;
uint8_t count;
char eye[4];
} ks_dhtrt_serialized_routetable_t;
void test10()
{
printf("**** testbuckets - test10 start\n"); fflush(stdout);
ks_dht_node_t *peer;
memset(g_nodeid1.id, 0xef, KS_DHT_NODEID_SIZE);
memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
char ipv6[] = "1234:1234:1234:1234";
char ipv4[] = "123.123.123.123";
unsigned short port = 7000;
int cix=0;
for(int i0=0, i1=0; i0<2500; ++i0, ++i1) {
if (i0%20 == 0) {
g_nodeid2.id[cix]>>=1;
//ks_dhtrt_dump(rt, 7);
if ( g_nodeid2.id[cix] == 0) ++cix;
g_nodeid2.id[19] = 0;
}
else {
++g_nodeid2.id[19];
}
ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, KS_DHTRT_CREATE_DEFAULT, &peer);
ks_dhtrt_touch_node(rt, g_nodeid2);
ks_dhtrt_release_node(peer);
}
/* this should expire all nodes after 15 minutes and 3 pings */
void *buffer = NULL;
uint32_t size = ks_dhtrt_serialize(rt, &buffer);
if (size > 0) {
ks_dhtrt_serialized_routetable_t* p = (ks_dhtrt_serialized_routetable_t*)buffer;
printf("\n\ntest10: version %d bucket count %d size %d\n\n", p->version, p->count, p->size);
ks_dhtrt_dump(rt, 7);
}
else {
printf("test10: error on serialize\n");
return;
}
ks_dhtrt_routetable_t* rt2;
ks_dhtrt_initroute(&rt2, dht, pool);
ks_dhtrt_deserialize(rt2, buffer);
ks_dhtrt_dump(rt2, 7);
ks_dht_nodeid_t id;
memset(id.id, 0xef, 20);
id.id[0] = 0x0e;
id.id[19] = 0x05;
ks_dhtrt_touch_node(rt2, id);
ks_dht_node_t* n = ks_dhtrt_find_node(rt2, id);
if (n == NULL) {
printf("test10: failed Unable to find reloaded node \n");
exit(200);
}
ks_dhtrt_deinitroute(&rt2);
printf("test10: complete\n");
return;
}
@ -1152,6 +1232,13 @@ int main(int argc, char *argv[]) {
continue;
}
if (tests[tix] == 10) {
ks_dhtrt_initroute(&rt, dht, pool);
test10();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 30) {