Skip to content

Commit

Permalink
feat(kv): redis cache/storage (#285)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Aug 12, 2024
1 parent 6a69d06 commit 4961999
Show file tree
Hide file tree
Showing 26 changed files with 96 additions and 256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,10 @@ private Commanders initCommanders() {
KvGcExecutor gcExecutor = new KvGcExecutor(kvClient, keyDesign, kvConfig);
gcExecutor.start();

boolean metaCacheEnable = RedisKvConf.getBoolean(namespace, "kv.key.meta.cache.enable", true);
CacheConfig cacheConfig = new CacheConfig(namespace);

CacheConfig cacheConfig = new CacheConfig(namespace, metaCacheEnable);

RedisTemplate redisTemplate = initRedisTemplate();
RedisTemplate cacheRedisTemplate = initRedisTemplate("kv.redis.cache");
RedisTemplate storageRedisTemplate = initRedisTemplate("kv.redis.storage");

KeyMetaServer keyMetaServer = new DefaultKeyMetaServer(kvClient, keyDesign, gcExecutor, cacheConfig);

Expand All @@ -239,13 +238,12 @@ private Commanders initCommanders() {
WriteBuffer<RedisSet> setWriteBuffer = WriteBuffer.newWriteBuffer(namespace, "set");

CommanderConfig commanderConfig = new CommanderConfig(kvClient, keyDesign, cacheConfig, kvConfig,
keyMetaServer, redisTemplate, gcExecutor, hashWriteBuffer, zsetWriteBuffer, setWriteBuffer);
keyMetaServer, cacheRedisTemplate, storageRedisTemplate, gcExecutor, hashWriteBuffer, zsetWriteBuffer, setWriteBuffer);

return new Commanders(commanderConfig);
}

private RedisTemplate initRedisTemplate() {
String key = "kv.redis.cache";
private RedisTemplate initRedisTemplate(String key) {
String type = RedisKvConf.getString(namespace, key + ".config.type", "local");
if (type.equalsIgnoreCase("local")) {
String url = RedisKvConf.getString(namespace, key + ".url", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public abstract class Commander {
protected final CacheConfig cacheConfig;
protected final KvConfig kvConfig;
protected final KeyMetaServer keyMetaServer;
protected final RedisTemplate redisTemplate;
protected final RedisTemplate cacheRedisTemplate;
protected final RedisTemplate storageRedisTemplate;
protected final KvGcExecutor gcExecutor;
protected final MpscSlotHashExecutor asyncWriteExecutor = KvExecutors.getInstance().getAsyncWriteExecutor();
protected final WriteBuffer<RedisHash> hashWriteBuffer;
Expand All @@ -46,7 +47,8 @@ public Commander(CommanderConfig commanderConfig) {
this.cacheConfig = commanderConfig.getCacheConfig();
this.kvConfig = commanderConfig.getKvConfig();
this.keyMetaServer = commanderConfig.getKeyMetaServer();
this.redisTemplate = commanderConfig.getRedisTemplate();
this.cacheRedisTemplate = commanderConfig.getCacheRedisTemplate();
this.storageRedisTemplate = commanderConfig.getStorageRedisTemplate();
this.gcExecutor = commanderConfig.getGcExecutor();
this.hashWriteBuffer = commanderConfig.getHashWriteBuffer();
this.zsetWriteBuffer = commanderConfig.getZsetWriteBuffer();
Expand All @@ -60,27 +62,11 @@ public Commander(CommanderConfig commanderConfig) {
protected abstract Reply execute(Command command);

protected final Reply sync(CompletableFuture<Reply> future) {
return redisTemplate.sync(future, cacheConfig.cacheTimeoutMillis());
return cacheRedisTemplate.sync(future, cacheConfig.cacheTimeoutMillis());
}

protected final List<Reply> sync(List<CompletableFuture<Reply>> futures) {
return redisTemplate.sync(futures, cacheConfig.cacheTimeoutMillis());
}

protected final Reply checkCache(byte[] script, byte[] cacheKey, byte[][] args) {
//cache
Reply reply = sync(redisTemplate.sendLua(script, new byte[][]{cacheKey}, args));
if (reply instanceof ErrorReply) {
return reply;
}
if (reply instanceof MultiBulkReply) {
Reply[] replies = ((MultiBulkReply) reply).getReplies();
String type = Utils.bytesToString(((BulkReply) replies[0]).getRaw());
if (type.equalsIgnoreCase("1")) {//cache hit
return replies[1];
}
}
return null;
return cacheRedisTemplate.sync(futures, cacheConfig.cacheTimeoutMillis());
}

protected final void submitAsyncWriteTask(byte[] cacheKey, Result result, Runnable runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,24 @@ public class CommanderConfig {
private final CacheConfig cacheConfig;
private final KvConfig kvConfig;
private final KeyMetaServer keyMetaServer;
private final RedisTemplate redisTemplate;
private final RedisTemplate cacheRedisTemplate;
private final RedisTemplate storageRedisTemplate;
private final KvGcExecutor gcExecutor;
private final WriteBuffer<RedisHash> hashWriteBuffer;
private final WriteBuffer<RedisZSet> zsetWriteBuffer;
private final WriteBuffer<RedisSet> setWriteBuffer;

public CommanderConfig(KVClient kvClient, KeyDesign keyDesign, CacheConfig cacheConfig,
KvConfig kvConfig, KeyMetaServer keyMetaServer,
RedisTemplate redisTemplate, KvGcExecutor gcExecutor,
RedisTemplate cacheRedisTemplate, RedisTemplate storageRedisTemplate, KvGcExecutor gcExecutor,
WriteBuffer<RedisHash> hashWriteBuffer, WriteBuffer<RedisZSet> zsetWriteBuffer, WriteBuffer<RedisSet> setWriteBuffer) {
this.kvClient = kvClient;
this.keyDesign = keyDesign;
this.cacheConfig = cacheConfig;
this.kvConfig = kvConfig;
this.keyMetaServer = keyMetaServer;
this.redisTemplate = redisTemplate;
this.cacheRedisTemplate = cacheRedisTemplate;
this.storageRedisTemplate = storageRedisTemplate;
this.gcExecutor = gcExecutor;
this.hashWriteBuffer = hashWriteBuffer;
this.zsetWriteBuffer = zsetWriteBuffer;
Expand All @@ -64,8 +66,12 @@ public KeyMetaServer getKeyMetaServer() {
return keyMetaServer;
}

public RedisTemplate getRedisTemplate() {
return redisTemplate;
public RedisTemplate getCacheRedisTemplate() {
return cacheRedisTemplate;
}

public RedisTemplate getStorageRedisTemplate() {
return storageRedisTemplate;
}

public KvGcExecutor getGcExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ protected Reply execute(Command command) {
EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
//redis
if (keyType == KeyType.zset && encodeVersion == EncodeVersion.version_1) {
redisTemplate.sendDel(cacheKey);
storageRedisTemplate.sendDel(cacheKey);
}
//local
if (keyType == KeyType.zset && cacheConfig.isZSetLocalCacheEnable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected Reply execute(Command command) {
EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
if (keyType == KeyType.zset && encodeVersion == EncodeVersion.version_1) {
byte[] cacheKey = keyDesign.cacheKey(keyMeta, key);
redisTemplate.sendPExpire(cacheKey, expireTime - System.currentTimeMillis() + 1000L);
storageRedisTemplate.sendPExpire(cacheKey, expireTime - System.currentTimeMillis() + 1000L);
}
return IntegerReply.REPLY_1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected Reply execute(Command command) {
EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
if (keyType == KeyType.zset && encodeVersion == EncodeVersion.version_1) {
byte[] cacheKey = keyDesign.cacheKey(keyMeta, key);
redisTemplate.sendPExpire(cacheKey, expireTime - System.currentTimeMillis() + 1000L);
storageRedisTemplate.sendPExpire(cacheKey, expireTime - System.currentTimeMillis() + 1000L);
}
return IntegerReply.REPLY_1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected Reply execute(Command command) {
EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
if (keyType == KeyType.zset && encodeVersion == EncodeVersion.version_1) {
byte[] cacheKey = keyDesign.cacheKey(keyMeta, key);
redisTemplate.sendPExpire(cacheKey, expireTime - System.currentTimeMillis() + 1000L);
storageRedisTemplate.sendPExpire(cacheKey, expireTime - System.currentTimeMillis() + 1000L);
}
return IntegerReply.REPLY_1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected Reply execute(Command command) {
EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
if (keyType == KeyType.zset && encodeVersion == EncodeVersion.version_1) {
byte[] cacheKey = keyDesign.cacheKey(keyMeta, key);
redisTemplate.sendPExpire(cacheKey, expireTime - System.currentTimeMillis() + 1000L);
storageRedisTemplate.sendPExpire(cacheKey, expireTime - System.currentTimeMillis() + 1000L);
}
return IntegerReply.REPLY_1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected Reply execute(Command command) {
EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
if (keyType == KeyType.zset && (encodeVersion == EncodeVersion.version_1)) {
byte[] cacheKey = keyDesign.cacheKey(keyMeta, key);
redisTemplate.sendDel(cacheKey);
storageRedisTemplate.sendDel(cacheKey);
}
}
return IntegerReply.parse(ret);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,14 @@ private Reply zaddVersion1(KeyMeta keyMeta, byte[] key, byte[] cacheKey, int mem
}
}
if (!memberIndexCacheWriteCommands.isEmpty()) {
List<Reply> replyList = sync(redisTemplate.sendCommand(memberIndexCacheWriteCommands));
List<Reply> replyList = sync(cacheRedisTemplate.sendCommand(memberIndexCacheWriteCommands));
for (Reply reply : replyList) {
if (reply instanceof ErrorReply) {
return reply;
}
}
}
return sync(redisTemplate.sendCommand(new Command(rewriteCmd)));
return sync(storageRedisTemplate.sendCommand(new Command(rewriteCmd)));
}

private void updateKeyMeta(KeyMeta keyMeta, byte[] key, int add) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected Reply execute(Command command) {
EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
if (encodeVersion == EncodeVersion.version_1) {
KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return sync(redisTemplate.sendCommand(new Command(new byte[][]{RedisCommand.ZCARD.raw(), cacheKey})));
return sync(storageRedisTemplate.sendCommand(new Command(new byte[][]{RedisCommand.ZCARD.raw(), cacheKey})));
} else {
return IntegerReply.parse(BytesUtils.toInt(keyMeta.getExtra()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected Reply execute(Command command) {

if (encodeVersion == EncodeVersion.version_1) {
KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return sync(redisTemplate.sendCommand(new Command(new byte[][]{RedisCommand.ZCOUNT.raw(), cacheKey, objects[2], objects[3]})));
return sync(storageRedisTemplate.sendCommand(new Command(new byte[][]{RedisCommand.ZCOUNT.raw(), cacheKey, objects[2], objects[3]})));
}

return ErrorReply.INTERNAL_ERROR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected Reply execute(Command command) {
cmd[i] = index.getRef();
}
KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return sync(redisTemplate.sendCommand(new Command(cmd)));
return sync(storageRedisTemplate.sendCommand(new Command(cmd)));
}
return ErrorReply.INTERNAL_ERROR;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected final Reply zrangeVersion1(KeyMeta keyMeta, byte[] key, byte[] cacheKe
System.arraycopy(objects, 0, cmd, 0, objects.length);
cmd[1] = cacheKey;

Reply reply = sync(redisTemplate.sendCommand(new Command(cmd)));
Reply reply = sync(storageRedisTemplate.sendCommand(new Command(cmd)));
if (reply instanceof ErrorReply) {
return reply;
}
Expand Down Expand Up @@ -80,7 +80,7 @@ protected final Reply checkReplyWithIndex(KeyMeta keyMeta, byte[] key, MultiBulk
commandList.add(cmd1);
commandList.add(cmd2);
}
List<Reply> replyList = sync(redisTemplate.sendCommand(commandList));
List<Reply> replyList = sync(cacheRedisTemplate.sendCommand(commandList));
for (int i = 0; i < replyList.size(); i++) {
Reply subReply = replyList.get(i);
if (subReply instanceof ErrorReply) {
Expand Down Expand Up @@ -126,7 +126,7 @@ protected final Reply checkReplyWithIndex(KeyMeta keyMeta, byte[] key, MultiBulk
missingMemberMap.remove(bytesKey);
}
if (!buildCacheCommands.isEmpty()) {
List<Reply> replyList = sync(redisTemplate.sendCommand(buildCacheCommands));
List<Reply> replyList = sync(cacheRedisTemplate.sendCommand(buildCacheCommands));
for (Reply reply1 : replyList) {
if (reply1 instanceof ErrorReply) {
return reply1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected Reply execute(Command command) {
Index index = Index.fromRaw(cmd[2]);
cmd[2] = index.getRef();
KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return sync(redisTemplate.sendCommand(new Command(cmd)));
return sync(storageRedisTemplate.sendCommand(new Command(cmd)));
}

return ErrorReply.INTERNAL_ERROR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,17 @@ private Reply zremVersion1(KeyMeta keyMeta, byte[] key, byte[] cacheKey, Set<Byt
i++;
}

List<Command> commandList = new ArrayList<>(3);
List<Command> commandList = new ArrayList<>(2);
commandList.add(new Command(cmd));
commandList.add(new Command(new byte[][]{RedisCommand.ZCARD.raw(), cacheKey}));

List<CompletableFuture<Reply>> futures = storageRedisTemplate.sendCommand(commandList);

CompletableFuture<Reply> future = null;
if (indexCacheDeleteCmd.size() > 1) {
commandList.add(new Command(indexCacheDeleteCmd.toArray(new byte[0][0])));
future = cacheRedisTemplate.sendCommand(new Command(indexCacheDeleteCmd.toArray(new byte[0][0])));
}

List<CompletableFuture<Reply>> futures = redisTemplate.sendCommand(commandList);

if (result.isKvWriteDelayEnable()) {
submitAsyncWriteTask(cacheKey, result, () -> {
if (!deleteIndexKeys.isEmpty()) {
Expand All @@ -246,6 +248,11 @@ private Reply zremVersion1(KeyMeta keyMeta, byte[] key, byte[] cacheKey, Set<Byt
keyMetaServer.deleteKeyMeta(key);
}
}

if (future != null) {
sync(future);
}

return replyList.get(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ protected final Reply zremrangeVersion1(KeyMeta keyMeta, byte[] key, byte[] cach
zremCmd[i] = index.getRef();
i ++;
}
CompletableFuture<Reply> future1 = redisTemplate.sendCommand(new Command(zremCmd));
CompletableFuture<Reply> future1 = storageRedisTemplate.sendCommand(new Command(zremCmd));
CompletableFuture<Reply> future2 = null;
if (!delCacheKeys.isEmpty()) {
List<byte[]> delCmd = new ArrayList<>(delCacheKeys.size() + 1);
delCmd.add(RedisCommand.DEL.raw());
delCmd.addAll(delCacheKeys);
future2 = redisTemplate.sendCommand(new Command(delCmd.toArray(new byte[0][0])));
future2 = cacheRedisTemplate.sendCommand(new Command(delCmd.toArray(new byte[0][0])));
}

if (result.isKvWriteDelayEnable()) {
Expand Down Expand Up @@ -82,7 +82,7 @@ protected final Reply zremrangeVersion1(KeyMeta keyMeta, byte[] key, byte[] cach
return ErrorReply.INTERNAL_ERROR;
}
cmd[1] = cacheKey;
Reply reply = sync(redisTemplate.sendCommand(new Command(cmd)));
Reply reply = sync(cacheRedisTemplate.sendCommand(new Command(cmd)));
if (reply instanceof ErrorReply) {
return reply;
}
Expand Down Expand Up @@ -115,10 +115,12 @@ protected final Reply zremrangeVersion1(KeyMeta keyMeta, byte[] key, byte[] cach
List<Command> cmds = new ArrayList<>(2);
cmds.add(new Command(zremCmd));
cmds.add(new Command(new byte[][]{RedisCommand.ZCARD.raw(), cacheKey}));

CompletableFuture<Reply> future = null;
if (delCacheKeys.size() > 1) {
cmds.add(new Command(delCacheKeys.toArray(new byte[0][0])));
future = cacheRedisTemplate.sendCommand(new Command(delCacheKeys.toArray(new byte[0][0])));
}
List<CompletableFuture<Reply>> futures = redisTemplate.sendCommand(cmds);
List<CompletableFuture<Reply>> futures = cacheRedisTemplate.sendCommand(cmds);
if (!delStoreKeys.isEmpty()) {
kvClient.batchDelete(delStoreKeys.toArray(new byte[0][0]));
}
Expand All @@ -134,6 +136,9 @@ protected final Reply zremrangeVersion1(KeyMeta keyMeta, byte[] key, byte[] cach
keyMetaServer.deleteKeyMeta(key);
}
}
if (future != null) {
sync(future);
}
return IntegerReply.parse(replies.length);
}
return ErrorReply.INTERNAL_ERROR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private Reply zremrangeByLex(KeyMeta keyMeta, byte[] key, byte[] cacheKey, ZSetL
commands.add(new Command(deleteCmd.toArray(new byte[0][0])));
}

List<CompletableFuture<Reply>> futures = redisTemplate.sendCommand(commands);
List<CompletableFuture<Reply>> futures = cacheRedisTemplate.sendCommand(commands);

if (result.isKvWriteDelayEnable()) {
submitAsyncWriteTask(cacheKey, result, () -> kvClient.batchDelete(deleteStoreKeys.toArray(new byte[0][0])));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected Reply execute(Command command) {
Index index = Index.fromRaw(cmd[2]);
cmd[2] = index.getRef();
KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return sync(redisTemplate.sendCommand(new Command(cmd)));
return sync(storageRedisTemplate.sendCommand(new Command(cmd)));
}

return ErrorReply.INTERNAL_ERROR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected Reply execute(Command command) {
if (encodeVersion == EncodeVersion.version_1) {
Index index = Index.fromRaw(member);
KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return sync(redisTemplate.sendCommand(new Command(new byte[][]{RedisCommand.ZSCORE.raw(), cacheKey, index.getRef()})));
return sync(storageRedisTemplate.sendCommand(new Command(new byte[][]{RedisCommand.ZSCORE.raw(), cacheKey, index.getRef()})));
}

return ErrorReply.INTERNAL_ERROR;
Expand Down
Loading

0 comments on commit 4961999

Please sign in to comment.