Skip to content

Commit

Permalink
feat(kv): optimize kv local cache build logic (#278)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Aug 7, 2024
1 parent 2d618c1 commit 2bb5d4e
Show file tree
Hide file tree
Showing 41 changed files with 368 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private void rebuild() {
logger.info("kv hot key calculator cache build, namespace = {}, keyType = {}, capacity = {}", namespace, keyType, capacity);
}

int threshold = RedisKvConf.getInt(namespace, keyType + ".kv.hot.key.threshold", 10);
int threshold = RedisKvConf.getInt(namespace, keyType + ".kv.hot.key.threshold", 2);
int window = RedisKvConf.getInt(namespace, keyType + ".kv.hot.key.time.window", 5000);
if (this.threshold != threshold) {
logger.info("kv hot key calculator cache build, namespace = {}, keyType = {}, threshold = {}", namespace, keyType, threshold);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,32 @@ protected Reply execute(Command command) {
byte[] cacheKey = keyDesign.cacheKey(keyMeta, key);
KeyType keyType = keyMeta.getKeyType();
EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
if (keyType == KeyType.zset && (encodeVersion == EncodeVersion.version_1
|| encodeVersion == EncodeVersion.version_2 || encodeVersion == EncodeVersion.version_3)) {
storeRedisTemplate.sendDel(cacheKey);
//redis
if (keyType == KeyType.zset) {
if (encodeVersion == EncodeVersion.version_1 || encodeVersion == EncodeVersion.version_2) {
cacheRedisTemplate.sendDel(cacheKey);
}
if (encodeVersion == EncodeVersion.version_3) {
storeRedisTemplate.sendDel(cacheKey);
}
}
if (keyType == KeyType.hash && (encodeVersion == EncodeVersion.version_2 || encodeVersion == EncodeVersion.version_3)) {
cacheRedisTemplate.sendDel(cacheKey);
if (keyType == KeyType.hash) {
if (encodeVersion == EncodeVersion.version_2 || encodeVersion == EncodeVersion.version_3) {
cacheRedisTemplate.sendDel(cacheKey);
}
}
if (keyType == KeyType.set && (encodeVersion == EncodeVersion.version_2 || encodeVersion == EncodeVersion.version_3)) {
cacheRedisTemplate.sendDel(cacheKey);
}
if (keyType == KeyType.hash && cacheConfig.isHashLocalCacheEnable()) {
cacheConfig.getHashLRUCache().del(key, cacheKey);
if (keyType == KeyType.set) {
if (encodeVersion == EncodeVersion.version_2 || encodeVersion == EncodeVersion.version_3) {
cacheRedisTemplate.sendDel(cacheKey);
}
}
//local
if (keyType == KeyType.zset && cacheConfig.isZSetLocalCacheEnable()) {
cacheConfig.getZSetLRUCache().del(key, cacheKey);
}
if (keyType == KeyType.hash && cacheConfig.isHashLocalCacheEnable()) {
cacheConfig.getHashLRUCache().del(key, cacheKey);
}
if (keyType == KeyType.set && cacheConfig.isSetLocalCacheEnable()) {
cacheConfig.getSetLRUCache().del(key, cacheKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.netease.nim.camellia.redis.proxy.upstream.kv.buffer.WriteBufferValue;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.RedisHash;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.HashLRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.Commander;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.CommanderConfig;
import com.netease.nim.camellia.redis.proxy.reply.*;
import com.netease.nim.camellia.redis.proxy.upstream.kv.kv.KeyValue;
Expand All @@ -28,7 +27,7 @@
* <p>
* Created by caojiajun on 2024/4/11
*/
public class HDelCommander extends Commander {
public class HDelCommander extends Hash0Commander {

private static final byte[] script = ("local arg1 = redis.call('exists', KEYS[1]);\n" +
"if tonumber(arg1) == 1 then\n" +
Expand Down Expand Up @@ -95,6 +94,7 @@ protected Reply execute(Command command) {

if (cacheConfig.isHashLocalCacheEnable()) {
HashLRUCache hashLRUCache = cacheConfig.getHashLRUCache();

Map<BytesKey, byte[]> deleteMaps = hashLRUCache.hdel(key, cacheKey, fields);
if (deleteMaps != null) {
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
Expand All @@ -108,6 +108,23 @@ protected Reply execute(Command command) {
}
return IntegerReply.REPLY_0;
}
if (deleteMaps == null) {
boolean hotKey = hashLRUCache.isHotKey(key);
if (hotKey) {
RedisHash hash = loadLRUCache(keyMeta, key);
hashLRUCache.putAllForWrite(key, cacheKey, hash);
deleteMaps = hash.hdel(fields);
if (deleteMaps != null && delCount < 0) {
delCount = deleteMaps.size();
}
if (delCount == 0) {
if (encodeVersion == EncodeVersion.version_1 || encodeVersion == EncodeVersion.version_3) {
return IntegerReply.parse(fields.size());
}
return IntegerReply.REPLY_0;
}
}
}
if (deleteMaps != null && result == null) {
RedisHash hash = hashLRUCache.getForWrite(key, cacheKey);
if (hash != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,20 @@ protected Reply execute(Command command) {

if (cacheConfig.isHashLocalCacheEnable()) {
HashLRUCache hashLRUCache = cacheConfig.getHashLRUCache();

RedisHash hash = hashLRUCache.getForRead(key, cacheKey);
if (hash != null) {
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return IntegerReply.parse(hash.hexists(new BytesKey(field)));
}

boolean hotKey = hashLRUCache.isHotKey(key);

if (hotKey) {
hash = loadLRUCache(keyMeta, key);
hashLRUCache.putAllForRead(key, cacheKey, hash);
return IntegerReply.parse(hash.hexists(new BytesKey(field)));
}
}

EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.netease.nim.camellia.redis.proxy.upstream.kv.buffer.WriteBufferValue;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.RedisHash;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.HashLRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.Commander;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.CommanderConfig;
import com.netease.nim.camellia.redis.proxy.upstream.kv.kv.KeyValue;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.EncodeVersion;
Expand All @@ -26,7 +25,7 @@
* <p>
* Created by caojiajun on 2024/4/7
*/
public class HGetCommander extends Commander {
public class HGetCommander extends Hash0Commander {

public HGetCommander(CommanderConfig commanderConfig) {
super(commanderConfig);
Expand Down Expand Up @@ -85,11 +84,19 @@ protected Reply execute(Command command) {

if (cacheConfig.isHashLocalCacheEnable()) {
HashLRUCache hashLRUCache = cacheConfig.getHashLRUCache();

RedisHash hash = hashLRUCache.getForRead(key, cacheKey);
if (hash != null) {
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return new BulkReply(hash.hget(new BytesKey(field)));
}

boolean hotKey = hashLRUCache.isHotKey(key);
if (hotKey) {
hash = loadLRUCache(keyMeta, key);
hashLRUCache.putAllForRead(key, cacheKey, hash);
return new BulkReply(hash.hget(new BytesKey(field)));
}
}

EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
Expand Down Expand Up @@ -141,11 +148,4 @@ protected Reply execute(Command command) {
return new BulkReply(keyValue.getValue());
}

private byte[] hgetallCacheMillis() {
return Utils.stringToBytes(String.valueOf(cacheConfig.hgetallCacheMillis()));
}

private byte[] hgetCacheMillis() {
return Utils.stringToBytes(String.valueOf(cacheConfig.hgetCacheMillis()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.netease.nim.camellia.redis.proxy.reply.MultiBulkReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.kv.buffer.WriteBufferValue;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.HashLRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.RedisHash;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.CommanderConfig;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.EncodeVersion;
Expand Down Expand Up @@ -70,11 +71,18 @@ protected Reply execute(Command command) {
}

if (cacheConfig.isHashLocalCacheEnable()) {
RedisHash hash = cacheConfig.getHashLRUCache().getForRead(key, cacheKey);
HashLRUCache hashLRUCache = cacheConfig.getHashLRUCache();
RedisHash hash = hashLRUCache.getForRead(key, cacheKey);
if (hash != null) {
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return toReply(hash.hgetAll());
}
boolean hotKey = hashLRUCache.isHotKey(key);
if (hotKey) {
hash = loadLRUCache(keyMeta, key);
hashLRUCache.putAllForRead(key, cacheKey, hash);
return toReply(hash.hgetAll());
}
}

EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import com.netease.nim.camellia.redis.proxy.reply.IntegerReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.kv.buffer.WriteBufferValue;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.HashLRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.RedisHash;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.Commander;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.CommanderConfig;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.EncodeVersion;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.KeyMeta;
Expand All @@ -20,7 +20,7 @@
* <p>
* Created by caojiajun on 2024/4/11
*/
public class HLenCommander extends Commander {
public class HLenCommander extends Hash0Commander {

public HLenCommander(CommanderConfig commanderConfig) {
super(commanderConfig);
Expand Down Expand Up @@ -48,7 +48,10 @@ protected Reply execute(Command command) {
if (keyMeta.getKeyType() != KeyType.hash) {
return ErrorReply.WRONG_TYPE;
}
Reply reply = getSizeFromCache(keyMeta, key);

byte[] cacheKey = keyDesign.cacheKey(keyMeta, key);

Reply reply = getSizeFromCache(keyMeta, key, cacheKey);
if (reply != null) {
return reply;
}
Expand All @@ -61,7 +64,6 @@ protected Reply execute(Command command) {
long size = getSizeFromKv(keyMeta, key);
return IntegerReply.parse(size);
} else if (encodeVersion == EncodeVersion.version_3) {
byte[] cacheKey = keyDesign.cacheKey(keyMeta, key);
reply = sync(cacheRedisTemplate.sendCommand(new Command(new byte[][]{RedisCommand.HLEN.raw(), cacheKey})));
if (reply instanceof IntegerReply) {
Long size = ((IntegerReply) reply).getInteger();
Expand All @@ -78,20 +80,29 @@ protected Reply execute(Command command) {
}
}

private Reply getSizeFromCache(KeyMeta keyMeta, byte[] key) {
byte[] cacheKey = keyDesign.cacheKey(keyMeta, key);
private Reply getSizeFromCache(KeyMeta keyMeta, byte[] key, byte[] cacheKey) {
WriteBufferValue<RedisHash> writeBufferValue = hashWriteBuffer.get(cacheKey);
if (writeBufferValue != null) {
RedisHash hash = writeBufferValue.getValue();
KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw());
return IntegerReply.parse(hash.hlen());
}
if (cacheConfig.isHashLocalCacheEnable()) {
RedisHash hash = cacheConfig.getHashLRUCache().getForRead(key, cacheKey);
HashLRUCache hashLRUCache = cacheConfig.getHashLRUCache();
RedisHash hash = hashLRUCache.getForRead(key, cacheKey);
if (hash != null) {
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return IntegerReply.parse(hash.hlen());
}
EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
if (encodeVersion == EncodeVersion.version_1) {
boolean hotKey = hashLRUCache.isHotKey(key);
if (hotKey) {
hash = loadLRUCache(keyMeta, key);
hashLRUCache.putAllForRead(key, cacheKey, hash);
return IntegerReply.parse(hash.hlen());
}
}
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.netease.nim.camellia.redis.proxy.reply.MultiBulkReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.kv.buffer.WriteBufferValue;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.HashLRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.RedisHash;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.Commander;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.CommanderConfig;
import com.netease.nim.camellia.redis.proxy.upstream.kv.kv.KeyValue;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.EncodeVersion;
Expand All @@ -26,7 +26,7 @@
* <p>
* Created by caojiajun on 2024/4/24
*/
public class HMGetCommander extends Commander {
public class HMGetCommander extends Hash0Commander {

private static final byte[] script1 = ("local arg = redis.call('exists', KEYS[1]);\n" +
"if tonumber(arg) == 1 then\n" +
Expand Down Expand Up @@ -86,11 +86,20 @@ protected Reply execute(Command command) {
}

if (cacheConfig.isHashLocalCacheEnable()) {
RedisHash hash = cacheConfig.getHashLRUCache().getForRead(key, cacheKey);
HashLRUCache hashLRUCache = cacheConfig.getHashLRUCache();

RedisHash hash = hashLRUCache.getForRead(key, cacheKey);
if (hash != null) {
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return toReply2(fields, hash.hgetAll());
}

boolean hotKey = hashLRUCache.isHotKey(key);
if (hotKey) {
hash = loadLRUCache(keyMeta, key);
hashLRUCache.putAllForRead(key, cacheKey, hash);
return toReply2(fields, hash.hgetAll());
}
}

EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
Expand Down Expand Up @@ -235,7 +244,4 @@ private Reply toReply2(byte[][] fields, Map<BytesKey, byte[]> map) {
return new MultiBulkReply(replies);
}

private byte[] hgetCacheMillis() {
return Utils.stringToBytes(String.valueOf(cacheConfig.hgetCacheMillis()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,13 @@ protected Reply execute(Command command) {
if (cacheConfig.isHashLocalCacheEnable()) {
HashLRUCache hashLRUCache = cacheConfig.getHashLRUCache();

boolean hotKey = hashLRUCache.isHotKey(key);

Map<BytesKey, byte[]> existsMap = null;
if (first) {
hashLRUCache.putAllForWrite(key, cacheKey, new RedisHash(new HashMap<>(fieldMap)));
} else {
existsMap = hashLRUCache.hset(key, cacheKey, fieldMap);
if (existsMap == null) {
boolean hotKey = hashLRUCache.isHotKey(key);
if (hotKey) {
Map<BytesKey, byte[]> map = hgetallFromKv(keyMeta, key);
hashLRUCache.putAllForWrite(key, cacheKey, new RedisHash(map));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netease.nim.camellia.tools.utils.BytesKey;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
Expand Down Expand Up @@ -123,11 +124,10 @@ protected Reply execute(Command command) {
if (cacheConfig.isHashLocalCacheEnable()) {
HashLRUCache hashLRUCache = cacheConfig.getHashLRUCache();

boolean hotKey = hashLRUCache.isHotKey(key);

boolean loadFromKv = false;
RedisHash hash = hashLRUCache.getForWrite(key, cacheKey);
if (hash == null) {
boolean hotKey = hashLRUCache.isHotKey(key);
if (hotKey) {
Map<BytesKey, byte[]> map = hgetallFromKv(keyMeta, key);
loadFromKv = true;
Expand All @@ -143,7 +143,9 @@ protected Reply execute(Command command) {
}
return IntegerReply.REPLY_0;
} else {
hash.hset(filedKey, value);
Map<BytesKey, byte[]> fieldMap = new HashMap<>();
fieldMap.put(filedKey, value);
hashLRUCache.hset(key, cacheKey, fieldMap);
cacheCheck = cache_hit_not_exists;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ protected Reply execute(Command command) {
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return IntegerReply.parse(hash.hstrlen(new BytesKey(field)));
}
boolean hotKey = hashLRUCache.isHotKey(key);
if (hotKey) {
hash = loadLRUCache(keyMeta, key);
hashLRUCache.putAllForRead(key, cacheKey, hash);
return IntegerReply.parse(hash.hstrlen(new BytesKey(field)));
}
}

EncodeVersion encodeVersion = keyMeta.getEncodeVersion();
Expand Down
Loading

0 comments on commit 2bb5d4e

Please sign in to comment.