Skip to content

Commit

Permalink
Use BULKI to serialize kvtags on the server
Browse files Browse the repository at this point in the history
  • Loading branch information
houjun committed Oct 29, 2024
1 parent 2e5de75 commit 60981a2
Showing 1 changed file with 123 additions and 5 deletions.
128 changes: 123 additions & 5 deletions src/server/pdc_server_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
#include "mercury_hash_table.h"
#include "pdc_malloc.h"
#include "string_utils.h"
#include "bulki.h"
#include "bulki_serde.h"

#define BLOOM_TYPE_T counting_bloom_t
#define BLOOM_NEW new_counting_bloom
Expand All @@ -74,6 +76,8 @@ double server_update_time_g = 0.0;
double server_hash_insert_time_g = 0.0;
double server_bloom_init_time_g = 0.0;
uint32_t n_metadata_g = 0;
uint32_t metadata_total_size_g = 0;
uint32_t metadata_total_count_g = 0;

pbool_t
PDC_region_is_identical(region_info_transfer_t reg1, region_info_transfer_t reg2)
Expand Down Expand Up @@ -2900,6 +2904,21 @@ PDC_Server_add_kvtag_sqlite3(metadata_add_kvtag_in_t *in, metadata_add_tag_out_t
return ret_value;
}

static uint64_t
PDC_get_kvtag_size(pdc_kvtag_t *kvtag)
{
uint64_t size = 0;

if (NULL == kvtag)
return size;

size += sizeof(pdc_kvtag_t);
size += strlen(kvtag->name) + 1;
size += kvtag->size;

return size;
}

static perr_t
PDC_Server_add_kvtag_someta(metadata_add_kvtag_in_t *in, metadata_add_tag_out_t *out)
{
Expand All @@ -2917,6 +2936,8 @@ PDC_Server_add_kvtag_someta(metadata_add_kvtag_in_t *in, metadata_add_tag_out_t
if (target != NULL) {
PDC_add_kvtag_to_list(&target->kvtag_list_head, &in->kvtag);
out->ret = 1;
metadata_total_size_g += PDC_get_kvtag_size(&in->kvtag);
metadata_total_count_g++;
} // if (lookup_value != NULL)
else {
// Object not found
Expand All @@ -2928,6 +2949,8 @@ PDC_Server_add_kvtag_someta(metadata_add_kvtag_in_t *in, metadata_add_tag_out_t
cont_lookup_value = hash_table_lookup(container_hash_table_g, &hash_key);
if (cont_lookup_value != NULL) {
PDC_add_kvtag_to_list(&cont_lookup_value->kvtag_list_head, &in->kvtag);
metadata_total_size_g += PDC_get_kvtag_size(&in->kvtag);
metadata_total_count_g++;
out->ret = 1;
}
else {
Expand Down Expand Up @@ -3194,6 +3217,98 @@ PDC_Server_get_kvtag_someta(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_
return ret_value;
}

// Serialize all kvtags in current server to nsplit buffers, each with
// approx the same number of kvtags so they can be sent to (node-local) clients
// for parallel search
static perr_t
PDC_Server_seralize_kvtag_someta(int nbuf, void **bufs, uint64_t *buf_sizes)
{
perr_t ret_value = SUCCEED;
pdc_hash_table_entry_head *head;
pdc_metadata_t * elt;
pdc_kvtag_list_t * kvtag_list_elt;
HashTableIterator hash_table_iter;
int n_entry, nkvtag_in_buf = 0, nkvtag_per_buf = 0, buf_i = 0;
int is_prev_objid = 0;
HashTablePair pair;
BULKI_Entity *key, *obj_key, *val, *obj_val;
BULKI *bulki;

nkvtag_per_buf = ceil(metadata_total_count_g / nbuf);

if (metadata_hash_table_g != NULL) {

n_entry = hash_table_num_entries(metadata_hash_table_g);
hash_table_iterate(metadata_hash_table_g, &hash_table_iter);

// Init first BULKI buf
bulki = BULKI_init(nkvtag_per_buf);

while (n_entry != 0 && hash_table_iter_has_more(&hash_table_iter)) {
pair = hash_table_iter_next(&hash_table_iter);
head = pair.value;
DL_FOREACH(head->metadata, elt)
{
obj_key = BULKI_ENTITY("_pdc_id", 1, PDC_STRING, PDC_CLS_ITEM);
obj_val = BULKI_ENTITY(&elt->obj_id, 1, PDC_UINT64, PDC_CLS_ITEM);
// Add obj_id before all kvtags of an obj
if (is_prev_objid == 0) {
BULKI_put(bulki, obj_key, obj_val);
is_prev_objid = 1;
}

DL_FOREACH(elt->kvtag_list_head, kvtag_list_elt)
{
if (nkvtag_in_buf >= nkvtag_per_buf) {
// Serialize the data after the current one reached limit
bufs[buf_i] = BULKI_serialize(bulki, &buf_sizes[buf_i]);
BULKI_free(bulki, 1);
buf_i++;
nkvtag_in_buf = 0;
}
if (nkvtag_in_buf == 0 && buf_i > 0) {
if (buf_i >= nbuf) {
printf("==PDC_SERVER[%d]: Error with %s, buf ptr overflow!\n",
pdc_server_rank_g, __func__);
ret_value = FAIL;
goto done;
}
// Init BULKI buf
bulki = BULKI_init(nkvtag_per_buf);
// Add obj_id before all kvtags of an obj
BULKI_put(bulki, obj_key, obj_val);
is_prev_objid = 1;
}

// Add to a BULKI buffer
key = BULKI_ENTITY(kvtag_list_elt->kvtag->name, 1, PDC_STRING, PDC_CLS_ITEM);
if (kvtag_list_elt->kvtag->type == PDC_STRING) {
val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, 1,
kvtag_list_elt->kvtag->type, PDC_CLS_ITEM);
}
else {
val = BULKI_ENTITY(kvtag_list_elt->kvtag->value, kvtag_list_elt->kvtag->size,
kvtag_list_elt->kvtag->type, PDC_CLS_ITEM);
}
BULKI_put(bulki, key, val);
is_prev_objid = 0;
nkvtag_in_buf++;
} // End for each kvtag in list
} // End for each metadata from hash table entry
} // End looping metadata hash table

// Serialize last buf
bufs[buf_i] = BULKI_serialize(bulki, &buf_sizes[buf_i]);
} // if (metadata_hash_table_g != NULL)
else {
printf("==PDC_SERVER: metadata_hash_table_g not initialized!\n");
ret_value = FAIL;
}

done:
return ret_value;
}

perr_t
PDC_Server_get_kvtag(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_t *out)
{
Expand Down Expand Up @@ -3276,17 +3391,18 @@ PDC_Server_get_kvtag(metadata_get_kvtag_in_t *in, metadata_get_kvtag_out_t *out)
FUNC_LEAVE(ret_value);
}

static perr_t
static uint64_t
PDC_del_kvtag_value_from_list(pdc_kvtag_list_t **list_head, char *key)
{
perr_t ret_value = SUCCEED;
uint64_t kvtag_size;
pdc_kvtag_list_t *elt;

FUNC_ENTER(NULL);

DL_FOREACH(*list_head, elt)
{
if (strcmp(elt->kvtag->name, key) == 0) {
kvtag_size = PDC_get_kvtag_size(elt->kvtag);
free(elt->kvtag->name);
free(elt->kvtag->value);
free(elt->kvtag);
Expand All @@ -3298,7 +3414,7 @@ PDC_del_kvtag_value_from_list(pdc_kvtag_list_t **list_head, char *key)

fflush(stdout);

FUNC_LEAVE(ret_value);
FUNC_LEAVE(kvtag_size);
}

static perr_t
Expand Down Expand Up @@ -3369,7 +3485,8 @@ PDC_Server_del_kvtag_someta(metadata_get_kvtag_in_t *in, metadata_add_tag_out_t
pdc_metadata_t *target;
target = find_metadata_by_id_from_list(lookup_value->metadata, obj_id);
if (target != NULL) {
ret_value = PDC_del_kvtag_value_from_list(&target->kvtag_list_head, in->key);
metadata_total_size_g -= PDC_del_kvtag_value_from_list(&target->kvtag_list_head, in->key);
metadata_total_count_g--;
out->ret = 1;
}
else {
Expand All @@ -3382,7 +3499,8 @@ PDC_Server_del_kvtag_someta(metadata_get_kvtag_in_t *in, metadata_add_tag_out_t
else {
cont_lookup_value = hash_table_lookup(container_hash_table_g, &hash_key);
if (cont_lookup_value != NULL) {
PDC_del_kvtag_value_from_list(&cont_lookup_value->kvtag_list_head, in->key);
metadata_total_size_g -= PDC_del_kvtag_value_from_list(&cont_lookup_value->kvtag_list_head, in->key);
metadata_total_count_g--;
out->ret = 1;
}
else {
Expand Down

0 comments on commit 60981a2

Please sign in to comment.