Skip to content

Commit

Permalink
feat(kv): optimize consensus cluster mode (#318)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Sep 13, 2024
1 parent 64d0941 commit 81c54d9
Showing 1 changed file with 69 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ConsensusProxyClusterModeProvider extends AbstractProxyClusterModeP
private static final Logger logger = LoggerFactory.getLogger(ConsensusProxyClusterModeProvider.class);

private final ReentrantLock lock = new ReentrantLock();
private final ReentrantLock initLock = new ReentrantLock();
private ConsensusLeaderSelector leaderSelector;
private volatile ProxyNode leader;
private volatile ProxyClusterSlotMap slotMap;
Expand Down Expand Up @@ -65,16 +66,46 @@ public void init() {
}
//add leader change listener
addLeaderChangeListener();
//init
//init leader or follower
initLeaderOrFollower();
//heartbeat to follower if current node is leader
startHeartbeatToSlave();
//heartbeat to leader if current node is follower
startHeartbeatToMaster();
}

private void addLeaderChangeListener() {
//add leader change listener
leaderSelector.addConsensusLeaderChangeListener(() -> {
initLock.lock();
try {
leader = leaderSelector.getLeader();
initLeaderOrFollower();
} finally {
initLock.unlock();
}
});
//schedule for reveal all the details
schedule.scheduleAtFixedRate(() -> {
initLock.lock();
try {
ProxyNode newLeader = leaderSelector.getLeader();
if (newLeader != null && leader != newLeader) {
leader = newLeader;
initLeaderOrFollower();
}
} finally {
initLock.unlock();
}
}, 10, 10, TimeUnit.SECONDS);
}

private void initLeaderOrFollower() {
if (currentNodeLeader()) {
initLeader();
} else {
initFollower();
}
//heartbeat to follower if current node is leader
startHeartbeatToSlave();
//heartbeat to leader if current node is follower
startHeartbeatToMaster();
}

private void initLeader() {
Expand Down Expand Up @@ -122,24 +153,6 @@ private void initFollower() {
updateSlotMap(this.slotMap, newSlotMap, "initFollower", false);
}

private void sleep(long ms) {
try {
TimeUnit.MILLISECONDS.sleep(ms);
} catch (InterruptedException e) {
logger.error(e.toString(), e);
}
}

private void addLeaderChangeListener() {
leaderSelector.addConsensusLeaderChangeListener(() -> {
leader = leaderSelector.getLeader();
if (currentNodeLeader()) {
initLeader();
} else {
initFollower();
}
});
}

private void startHeartbeatToSlave() {
int intervalSeconds = ClusterModeConfig.clusterModeHeartbeatIntervalSeconds();
Expand All @@ -154,7 +167,7 @@ private void startHeartbeatToMaster() {
private ProxyClusterSlotMap getSlotMapFromLeader() {
Reply reply;
try {
if (leader.equals(current())) {
if (currentNodeLeader()) {
return ProxyClusterSlotMapUtils.localSlotMap(current(), leaderSelector.getSlotMap());
}
reply = sendCmd(leader, ClusterModeCmd.send_get_slot_map_from_leader, "{}");
Expand Down Expand Up @@ -220,7 +233,7 @@ public Reply proxyHeartbeat(Command command) {
return new ErrorReply("ERR target not follower");
}
if (cmd == ClusterModeCmd.send_heartbeat_to_follower) {
return followerReceiveLeaderHeartbeat(source, data);
return followerReceiveLeaderHeartbeat(data);
}
if (cmd == ClusterModeCmd.send_slot_map_to_follower) {
return followerReceiveNewSlotMap(source, data);
Expand Down Expand Up @@ -259,31 +272,37 @@ private Reply followerReceiveNewSlotMap(ProxyNode leader, JSONObject data) {
return StatusReply.OK;
}

private Reply followerReceiveLeaderHeartbeat(ProxyNode leader, JSONObject data) {
private Reply followerReceiveLeaderHeartbeat(JSONObject data) {
String md5 = data.getString("md5");
if (md5 != null && slotMap != null && !slotMap.getMd5().equals(md5)) {
try {
executor.submit(() -> {
ProxyClusterSlotMap newSlotMap = getSlotMapFromLeader();
updateSlotMap(slotMap, newSlotMap, "heartbeat-md5-check|leader=" + leader, false);
});
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
updateSlotMapFromMaster();
}
JSONObject json = new JSONObject();
json.put("status", ClusterModeStatus.getStatus().getValue());
return new BulkReply(Utils.stringToBytes(json.toJSONString()));
}

private void updateSlotMapFromMaster() {
try {
executor.submit(() -> {
ProxyClusterSlotMap newSlotMap = getSlotMapFromLeader();
updateSlotMap(slotMap, newSlotMap, "heartbeat-md5-check|leader=" + leader, false);
});
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

//=====leader=====

private Reply leaderReceiveSlaveHeartbeat(ProxyNode follower, JSONObject data) {
pendingNodes.add(follower);
if (logger.isDebugEnabled()) {
logger.debug("leader receive follower heartbeat, follower = {}, data = {}", follower, data);
}
return StatusReply.OK;
JSONObject response = new JSONObject();
response.put("md5", slotMap.getMd5());
return new BulkReply(Utils.stringToBytes(response.toJSONString()));
}

//====heartbeat=====
Expand Down Expand Up @@ -343,6 +362,13 @@ private void sendHeartbeatToLeader0() {
if (reply instanceof ErrorReply) {
logger.error("send heartbeat to leader error, leader = {}, error = {}", targetLeader, ((ErrorReply) reply).getError());
}
if (reply instanceof BulkReply) {
JSONObject json = JSONObject.parseObject(Utils.bytesToString(((BulkReply) reply).getRaw()));
String md5 = json.getString("md5");
if (slotMap == null || !slotMap.getMd5().equals(md5)) {
updateSlotMapFromMaster();
}
}
} catch (Exception e) {
logger.error("sendHeartbeatToLeader0 error", e);
}
Expand Down Expand Up @@ -453,4 +479,12 @@ private void updateSlotMap(ProxyClusterSlotMap oldSlotMap, ProxyClusterSlotMap n
}
}

private void sleep(long ms) {
try {
TimeUnit.MILLISECONDS.sleep(ms);
} catch (InterruptedException e) {
logger.error(e.toString(), e);
}
}

}

0 comments on commit 81c54d9

Please sign in to comment.