Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor slotmap update functions #107

Merged
merged 7 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 46 additions & 95 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,7 @@ static int cluster_master_slave_mapping_with_name(valkeyClusterContext *cc,
/**
* Parse the "cluster slots" command reply to nodes dict.
*/
static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply,
int flags) {
static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) {
int ret;
cluster_slot *slot = NULL;
dict *nodes = NULL;
Expand All @@ -685,22 +684,20 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply,
valkeyClusterNode *master = NULL, *slave;
uint32_t i, idx;

if (reply == NULL) {
return NULL;
if (reply->type != VALKEY_REPLY_ARRAY) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type");
goto error;
}
if (reply->elements == 0) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "No slot information");
goto error;
}

nodes = dictCreate(&clusterNodesDictType, NULL);
if (nodes == NULL) {
goto oom;
}

if (reply->type != VALKEY_REPLY_ARRAY || reply->elements <= 0) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command(cluster slots) reply error: "
"reply is not an array.");
goto error;
}

for (i = 0; i < reply->elements; i++) {
elem_slots = reply->element[i];
if (elem_slots->type != VALKEY_REPLY_ARRAY ||
Expand Down Expand Up @@ -819,7 +816,7 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply,
}

slot = NULL;
} else if (flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) {
} else if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) {
slave = node_get_with_slots(cc, elem_ip, elem_port,
VALKEY_ROLE_SLAVE);
if (slave == NULL) {
Expand Down Expand Up @@ -864,8 +861,7 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply,
/**
* Parse the "cluster nodes" command reply to nodes dict.
*/
static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_len,
int flags) {
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
int ret;
dict *nodes = NULL;
dict *nodes_name = NULL;
Expand All @@ -880,13 +876,18 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_le
int k;
int len;

if (reply->type != VALKEY_REPLY_STRING) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type");
goto error;
}

nodes = dictCreate(&clusterNodesDictType, NULL);
if (nodes == NULL) {
goto oom;
}

start = str;
end = start + str_len;
start = reply->str;
end = start + reply->len;

line_start = start;

Expand Down Expand Up @@ -940,19 +941,20 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_le
freeValkeyClusterNode(master);
goto oom;
}

ret = dictAdd(nodes, key, master);
if (ret != DICT_OK) {
// Key already exists, but possibly an OOM error
valkeyClusterSetError(
cc, VALKEY_ERR_OTHER,
"The address already exists in the nodes");
if (dictFind(nodes, key) != NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Duplicate addresses in cluster nodes response");
sdsfree(key);
freeValkeyClusterNode(master);
goto error;
}
if (dictAdd(nodes, key, master) != DICT_OK) {
sdsfree(key);
freeValkeyClusterNode(master);
goto oom;
}

if (flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) {
if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) {
ret = cluster_master_slave_mapping_with_name(
cc, &nodes_name, master, master->name);
if (ret != VALKEY_OK) {
Expand Down Expand Up @@ -1004,7 +1006,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, char *str, int str_le

}
// add slave node
else if ((flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) &&
else if ((cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) &&
(role_len >= 5 && memcmp(role, "slave", 5) == 0)) {
slave = node_get_with_nodes(cc, part, count_part,
VALKEY_ROLE_SLAVE);
Expand Down Expand Up @@ -1064,92 +1066,41 @@ static int clusterUpdateRouteSendCommand(valkeyClusterContext *cc,
VALKEY_COMMAND_CLUSTER_SLOTS :
VALKEY_COMMAND_CLUSTER_NODES);
if (valkeyAppendCommand(c, cmd) != VALKEY_OK) {
const char *msg = (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS ?
"Command (cluster slots) send error." :
"Command (cluster nodes) send error.");
valkeyClusterSetError(cc, c->err, msg);
valkeyClusterSetError(cc, c->err, c->errstr);
return VALKEY_ERR;
}
/* Flush buffer to socket. */
if (valkeyBufferWrite(c, NULL) == VALKEY_ERR)
if (valkeyBufferWrite(c, NULL) == VALKEY_ERR) {
valkeyClusterSetError(cc, c->err, c->errstr);
return VALKEY_ERR;
}

return VALKEY_OK;
}

/* Receives and handles a CLUSTER SLOTS reply from node with context c. */
static int handleClusterSlotsReply(valkeyClusterContext *cc, valkeyContext *c) {
/* Receives and handles a CLUSTER SLOTS or CLUSTER NODES reply from node with
* context c. */
static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc,
valkeyContext *c) {
valkeyReply *reply = NULL;
int result = valkeyGetReply(c, (void **)&reply);
if (result != VALKEY_OK) {
if (c->err == VALKEY_ERR_TIMEOUT) {
valkeyClusterSetError(
cc, c->err,
"Command (cluster slots) reply error (socket timeout)");
} else {
valkeyClusterSetError(
cc, VALKEY_ERR_OTHER,
"Command (cluster slots) reply error (NULL).");
}
return VALKEY_ERR;
} else if (reply->type != VALKEY_REPLY_ARRAY) {
if (reply->type == VALKEY_REPLY_ERROR) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, reply->str);
} else {
valkeyClusterSetError(
cc, VALKEY_ERR_OTHER,
"Command (cluster slots) reply error: type is not array.");
}
freeReplyObject(reply);
if (valkeyGetReply(c, (void **)&reply) != VALKEY_OK) {
valkeyClusterSetError(cc, c->err, c->errstr);
return VALKEY_ERR;
}

dict *nodes = parse_cluster_slots(cc, reply, cc->flags);
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
}

/* Receives and handles a CLUSTER NODES reply from node with context c. */
static int handleClusterNodesReply(valkeyClusterContext *cc, valkeyContext *c) {
valkeyReply *reply = NULL;
int result = valkeyGetReply(c, (void **)&reply);
if (result != VALKEY_OK) {
if (c->err == VALKEY_ERR_TIMEOUT) {
valkeyClusterSetError(cc, c->err,
"Command (cluster nodes) reply error "
"(socket timeout)");
} else {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command (cluster nodes) reply error "
"(NULL).");
}
return VALKEY_ERR;
} else if (reply->type != VALKEY_REPLY_STRING) {
if (reply->type == VALKEY_REPLY_ERROR) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, reply->str);
} else {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command(cluster nodes) reply error: "
"type is not string.");
}
if (reply->type == VALKEY_REPLY_ERROR) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, reply->str);
freeReplyObject(reply);
return VALKEY_ERR;
}

dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags);
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
}

/* Receives and handles a CLUSTER SLOTS or CLUSTER NODES reply from node with
* context c. */
static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc,
valkeyContext *c) {
dict *nodes;
if (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS) {
return handleClusterSlotsReply(cc, c);
nodes = parse_cluster_slots(cc, reply);
} else {
return handleClusterNodesReply(cc, c);
nodes = parse_cluster_nodes(cc, reply);
}
freeReplyObject(reply);
return updateNodesAndSlotmap(cc, nodes);
}

/**
Expand Down Expand Up @@ -3025,7 +2976,7 @@ void clusterSlotsReplyCallback(valkeyAsyncContext *ac, void *r,
}

valkeyClusterContext *cc = acc->cc;
dict *nodes = parse_cluster_slots(cc, reply, cc->flags);
dict *nodes = parse_cluster_slots(cc, reply);
if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) {
/* Ignore failures for now */
}
Expand All @@ -3046,7 +2997,7 @@ void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r,
}

valkeyClusterContext *cc = acc->cc;
dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags);
dict *nodes = parse_cluster_nodes(cc, reply);
if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) {
/* Ignore failures for now */
}
Expand Down
2 changes: 2 additions & 0 deletions tests/ct_out_of_memory_handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ void test_alloc_failure_handling(void) {
prepare_allocation_test(cc, i);
result = valkeyClusterConnect2(cc);
assert(result == VALKEY_ERR);
ASSERT_STR_EQ(cc->errstr, "Out of memory");
}

prepare_allocation_test(cc, 128);
Expand Down Expand Up @@ -521,6 +522,7 @@ void test_alloc_failure_handling_async(void) {
prepare_allocation_test(acc->cc, i);
result = valkeyClusterConnect2(acc->cc);
assert(result == VALKEY_ERR);
ASSERT_STR_EQ(acc->cc->errstr, "Out of memory");
}

prepare_allocation_test(acc->cc, 126);
Expand Down
Loading