From 66fdf5fa192bb948312efb399f43ec7798610f90 Mon Sep 17 00:00:00 2001 From: colm <colm@freeswitch1> Date: Fri, 6 Jan 2017 17:25:52 -0500 Subject: [PATCH] FS-9775: Implement serialization, deserialization & repopulation for dht table --- libs/libks/src/dht/ks_dht.h | 4 + libs/libks/src/dht/ks_dht_bucket.c | 197 ++++++++++++++++++++++++++++- libs/libks/test/testbuckets.c | 91 ++++++++++++- 3 files changed, 289 insertions(+), 3 deletions(-) diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index a4ca006c58..8d183a4c4a 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -570,6 +570,10 @@ KS_DECLARE(ks_status_t) ks_dhtrt_release_querynodes(ks_dhtrt_querynodes_t KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t* table); +KS_DECLARE(uint32_t) ks_dhtrt_serialize(ks_dhtrt_routetable_t* table, void** ptr); +KS_DECLARE(ks_status_t) ks_dhtrt_deserialize(ks_dhtrt_routetable_t* table, void* ptr); + + /* debugging aids */ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t* table, int level); diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index 72063c8630..50e7526b5a 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -106,6 +106,8 @@ typedef struct ks_dhtrt_internal_s { ks_dhtrt_deletednode_t *deleted_node; ks_dhtrt_deletednode_t *free_node_ex; uint32_t deleted_count; + uint32_t bucket_count; + uint32_t header_count; } ks_dhtrt_internal_t; typedef struct ks_dhtrt_xort_s { @@ -203,7 +205,6 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_dht_t *dht, ks_pool_t *pool) { -(void)ks_dhtrt_find_relatedbucketheader; unsigned char initmask[KS_DHT_NODEID_SIZE]; memset(initmask, 0xff, sizeof(initmask)); @@ -224,6 +225,8 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, initial_header->flags = BHF_LEFT; /* fake left to allow splitting */ internal->buckets = initial_header; initial_header->bucket = ks_dhtrt_create_bucket(pool); + internal->header_count = 1; + internal->bucket_count = 1; table->pool = pool; *tableP = table; @@ -281,6 +284,8 @@ KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **tableP) return; } + + KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid, enum ks_dht_nodetype_t type, @@ -500,6 +505,8 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no newleft->left1bit = newright; ks_dhtrt_split_bucket(header, newleft, newright); + internal->header_count += 2; + ++internal->bucket_count; /* ok now we need to try again to see if the bucket has capacity */ /* which bucket do care about */ @@ -1796,6 +1803,194 @@ static char *ks_dhtrt_printableid(uint8_t *id, char *buffer) } +/* + * + * serialization and deserialization + * --------------------------------- +*/ + +typedef struct ks_dhtrt_serialized_bucket_s +{ + uint16_t count; + char eye[4]; + ks_dhtrt_nodeid_t id; +} ks_dhtrt_serialized_bucket_t; + +typedef struct ks_dhtrt_serialized_routetable_s +{ + uint32_t size; + uint8_t version; + uint8_t count; + char eye[4]; +} ks_dhtrt_serialized_routetable_t; + +#define DHTRT_SERIALIZATION_VERSION 1 + +static void ks_dhtrt_serialize_node(ks_dht_node_t *source, ks_dht_node_t *dest) +{ + memcpy(dest, source, sizeof(ks_dht_node_t)); + memset(&dest->table, 0, sizeof(void*)); + memset(&dest->reflock, 0, sizeof(void*)); +} + +static void ks_dhtrt_serialize_bucket(ks_dhtrt_routetable_t *table, + ks_dhtrt_serialized_routetable_t *stable, + ks_dhtrt_bucket_header_t* header, + unsigned char* buffer) +{ + uint8_t tzero = 0; + ks_dhtrt_serialized_bucket_t *s = (ks_dhtrt_serialized_bucket_t*)buffer; + + memcpy(s->eye, "HEAD", 4); + memcpy(s->id, header->mask, KS_DHT_NODEID_SIZE); + buffer += sizeof(ks_dhtrt_serialized_bucket_t); + stable->size += sizeof(ks_dhtrt_serialized_bucket_t); + + if (header->bucket != 0) { + ks_dhtrt_bucket_t* bucket = header->bucket; + + memcpy(&s->count, &bucket->count, sizeof(uint8_t)); + + for (int i=0; i< KS_DHT_BUCKETSIZE; ++i) { + if (bucket->entries[i].inuse == 1) { + ks_dhtrt_serialize_node(bucket->entries[i].gptr, (ks_dht_node_t*)buffer); + buffer += sizeof(ks_dht_node_t); + stable->size += sizeof(ks_dht_node_t); + } + } + } + else { + memcpy(&s->count, &tzero, sizeof(uint8_t)); + } +} + +static void ks_dhtrt_serialize_table(ks_dhtrt_routetable_t *table, + ks_dhtrt_serialized_routetable_t *stable, + unsigned char *buffer) +{ + ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8]; + int stackix=0; + + ks_dhtrt_internal_t *internal = table->internal; + ks_dhtrt_bucket_header_t *header = internal->buckets; + + while (header) { + stack[stackix++] = header; + + ++stable->count; + + ks_dhtrt_serialize_bucket(table, stable, header, buffer); + buffer = (unsigned char*)stable + stable->size; + + header = header->left; + + if (header == 0 && stackix > 1) { + stackix -= 2; + header = stack[stackix]; + header = header->right; + } + } + return; +} + +KS_DECLARE(uint32_t) ks_dhtrt_serialize(ks_dhtrt_routetable_t *table, void **ptr) +{ + ks_dhtrt_internal_t *internal = table->internal; + ks_rwl_write_lock(internal->lock); /* grab write lock */ + + uint32_t buffer_size = 3200 * sizeof(ks_dht_node_t); + buffer_size += internal->header_count * sizeof(ks_dhtrt_serialized_bucket_t); + buffer_size += sizeof(ks_dhtrt_serialized_routetable_t); + unsigned char *buffer = (*ptr) = ks_pool_alloc(table->pool, buffer_size); + + ks_dhtrt_serialized_routetable_t *stable = (ks_dhtrt_serialized_routetable_t*)buffer; + stable->size = sizeof(ks_dhtrt_serialized_routetable_t); + stable->version = DHTRT_SERIALIZATION_VERSION; + memcpy(stable->eye, "DHRT", 4); + + buffer += sizeof(ks_dhtrt_serialized_routetable_t); + + ks_dhtrt_serialize_table(table, stable, buffer); + ks_rwl_write_unlock(internal->lock); /* write unlock */ + return stable->size; +} + + +static void ks_dhtrt_deserialize_node(ks_dhtrt_routetable_t *table, + ks_dht_node_t *source, + ks_dht_node_t *dest) +{ + memcpy(dest, source, sizeof(ks_dht_node_t)); + dest->table = table; + ks_rwl_create(&dest->reflock, table->pool); +} + + +KS_DECLARE(ks_status_t) ks_dhtrt_deserialize(ks_dhtrt_routetable_t *table, void* buffer) +{ + ks_dhtrt_internal_t *internal = table->internal; + ks_rwl_write_lock(internal->lock); /* grab write lock */ + unsigned char *ptr = (unsigned char*)buffer; + + ks_dhtrt_serialized_routetable_t *stable = (ks_dhtrt_serialized_routetable_t*)buffer; + ptr += sizeof(ks_dhtrt_serialized_routetable_t); + + /* unpack and chain the buckets */ + for (int i=0; i<stable->count; ++i) { + + ks_dhtrt_serialized_bucket_t *s = (ks_dhtrt_serialized_bucket_t*)ptr; + + if (memcmp(s->eye, "HEAD", 4)) { + assert(0); + ks_rwl_write_unlock(internal->lock); /* write unlock */ + return KS_STATUS_FAIL; + } + + ptr += sizeof(ks_dhtrt_serialized_bucket_t); + + /* currently adding the nodes individually + * need a better way to do this that is compatible with the pending + * changes for supernode support + */ + + char buf[51]; + ks_log(KS_LOG_DEBUG, "deserialize bucket [%s] count %d\n", ks_dhtrt_printableid(s->id, buf), s->count); + + int mid = s->count >>1; + ks_dht_node_t *fnode = NULL; + ks_dht_node_t *node = NULL; + + for(int i0=0; i0<s->count; ++i0) { + /* recreate the node */ + ks_dht_node_t *node = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t)); + if (i0 == mid) fnode = node; + ks_dhtrt_deserialize_node(table, (ks_dht_node_t*)ptr, node); + ptr += sizeof(ks_dht_node_t); + ks_dhtrt_insert_node(table, node, 0); + } + + /* + * now the bucket is complete - now trigger a find. + * This staggers the series of finds. We only do this for populated tables here. + * Once the table is loaded, process_table will as normal start the ping/find process to + * update and populate the table. + */ + + if (s->count > 0) { + if (fnode) { + ks_dhtrt_find(table, internal, &fnode->nodeid); + } + else if (node) { + ks_dhtrt_find(table, internal, &node->nodeid); + } + } + } + + ks_rwl_write_unlock(internal->lock); /* write unlock */ + return KS_STATUS_SUCCESS; +} + + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/test/testbuckets.c b/libs/libks/test/testbuckets.c index 18890f251a..fd1a07c043 100644 --- a/libs/libks/test/testbuckets.c +++ b/libs/libks/test/testbuckets.c @@ -620,8 +620,6 @@ void test09() char ipv4[] = "123.123.123.123"; unsigned short port = 7000; - /* build a delete queue */ - int cix=0; for(int i0=0, i1=0; i0<150; ++i0, ++i1) { @@ -655,6 +653,88 @@ void test09() +typedef struct ks_dhtrt_serialized_routetable_s +{ + uint32_t size; + uint8_t version; + uint8_t count; + char eye[4]; +} ks_dhtrt_serialized_routetable_t; + + +void test10() +{ + printf("**** testbuckets - test10 start\n"); fflush(stdout); + + ks_dht_node_t *peer; + memset(g_nodeid1.id, 0xef, KS_DHT_NODEID_SIZE); + memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE); + + char ipv6[] = "1234:1234:1234:1234"; + char ipv4[] = "123.123.123.123"; + unsigned short port = 7000; + + int cix=0; + + for(int i0=0, i1=0; i0<2500; ++i0, ++i1) { + if (i0%20 == 0) { + g_nodeid2.id[cix]>>=1; + //ks_dhtrt_dump(rt, 7); + if ( g_nodeid2.id[cix] == 0) ++cix; + g_nodeid2.id[19] = 0; + } + else { + ++g_nodeid2.id[19]; + } + ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, KS_DHTRT_CREATE_DEFAULT, &peer); + ks_dhtrt_touch_node(rt, g_nodeid2); + ks_dhtrt_release_node(peer); + } + + /* this should expire all nodes after 15 minutes and 3 pings */ + void *buffer = NULL; + uint32_t size = ks_dhtrt_serialize(rt, &buffer); + + + if (size > 0) { + ks_dhtrt_serialized_routetable_t* p = (ks_dhtrt_serialized_routetable_t*)buffer; + printf("\n\ntest10: version %d bucket count %d size %d\n\n", p->version, p->count, p->size); + ks_dhtrt_dump(rt, 7); + } + else { + printf("test10: error on serialize\n"); + return; + } + + + ks_dhtrt_routetable_t* rt2; + ks_dhtrt_initroute(&rt2, dht, pool); + ks_dhtrt_deserialize(rt2, buffer); + ks_dhtrt_dump(rt2, 7); + + ks_dht_nodeid_t id; + memset(id.id, 0xef, 20); + id.id[0] = 0x0e; + id.id[19] = 0x05; + + ks_dhtrt_touch_node(rt2, id); + ks_dht_node_t* n = ks_dhtrt_find_node(rt2, id); + + if (n == NULL) { + printf("test10: failed Unable to find reloaded node \n"); + exit(200); + } + + + ks_dhtrt_deinitroute(&rt2); + + printf("test10: complete\n"); + + return; + +} + + @@ -1152,6 +1232,13 @@ int main(int argc, char *argv[]) { continue; } + if (tests[tix] == 10) { + ks_dhtrt_initroute(&rt, dht, pool); + test10(); + ks_dhtrt_deinitroute(&rt); + continue; + } + if (tests[tix] == 30) {