From 2bc3720452d5b8278ff3e68c01b5bcc08eee463c Mon Sep 17 00:00:00 2001 From: caojiajun Date: Fri, 9 Aug 2024 13:24:43 +0800 Subject: [PATCH] chore(kv): optimize set commands monitor (#271) --- .../kv/command/set/SAddCommander.java | 31 +++++++++++++++---- .../kv/command/set/SCardCommander.java | 3 ++ .../kv/command/set/SIsMemberCommander.java | 5 +++ .../kv/command/set/SMIsMemberCommander.java | 3 ++ .../kv/command/set/SMembersCommander.java | 1 + .../kv/command/set/SPopCommander.java | 17 ++++++++++ .../kv/command/set/SRandMemberCommander.java | 4 ++- .../kv/command/set/SRemCommander.java | 10 ++++++ 8 files changed, 67 insertions(+), 7 deletions(-) diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SAddCommander.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SAddCommander.java index 92981ff60..1404a8174 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SAddCommander.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SAddCommander.java @@ -92,9 +92,13 @@ protected Reply execute(Command command) { RedisSet set = null; Set existsMemberSet = null; + KvCacheMonitor.Type type = null; + if (first) { set = new RedisSet(new HashSet<>(memberSet)); setWriteBuffer.put(cacheKey, set); + // + type = KvCacheMonitor.Type.write_buffer; KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw()); } else { WriteBufferValue bufferValue = setWriteBuffer.get(cacheKey); @@ -102,6 +106,8 @@ protected Reply execute(Command command) { set = bufferValue.getValue(); existsMemberSet = set.sadd(memberSet); result = setWriteBuffer.put(cacheKey, set); + // + type = KvCacheMonitor.Type.write_buffer; KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw()); } } @@ -124,6 +130,7 @@ protected Reply execute(Command command) { existsSet = set.sadd(memberSet); } } else { + type = KvCacheMonitor.Type.local_cache; KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw()); } if (existsMemberSet == null && existsSet != null) { @@ -157,11 +164,11 @@ protected Reply execute(Command command) { EncodeVersion encodeVersion = keyMeta.getEncodeVersion(); if (encodeVersion == EncodeVersion.version_0) { - return saddVersion0(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result); + return saddVersion0(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result, type); } if (encodeVersion == EncodeVersion.version_1) { - return saddVersion1(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result); + return saddVersion1(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result, type); } if (encodeVersion == EncodeVersion.version_2) { @@ -169,19 +176,25 @@ protected Reply execute(Command command) { int ret = checkAndUpdateCache(cacheKey, memberSet, memberSize); if (existsMemberSize < 0 && ret >= 0) { existsMemberSize = ret; + // + type = KvCacheMonitor.Type.redis_cache; + KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw()); } } - return saddVersion0(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result); + return saddVersion0(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result, type); } if (encodeVersion == EncodeVersion.version_3) { if (!first) { int ret = checkAndUpdateCache(cacheKey, memberSet, memberSize); if (existsMemberSize < 0 && ret >= 0) { + // + type = KvCacheMonitor.Type.redis_cache; + KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw()); existsMemberSize = ret; } } - return saddVersion1(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result); + return saddVersion1(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result, type); } return ErrorReply.INTERNAL_ERROR; @@ -217,7 +230,10 @@ private int checkAndUpdateCache(byte[] cacheKey, Set memberSet, int me } private Reply saddVersion0(KeyMeta keyMeta, byte[] key, byte[] cacheKey, boolean first, int memberSize, - int existsMemberSize, Set memberSet, Result result) { + int existsMemberSize, Set memberSet, Result result, KvCacheMonitor.Type type) { + if (type == null) { + KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw()); + } if (first) { writeMembers(keyMeta, key, cacheKey, memberSet, result); return IntegerReply.parse(memberSize); @@ -257,7 +273,10 @@ private Reply saddVersion0(KeyMeta keyMeta, byte[] key, byte[] cacheKey, boolean } private Reply saddVersion1(KeyMeta keyMeta, byte[] key, byte[] cacheKey, boolean first, int memberSize, - int existsMemberSize, Set memberSet, Result result) { + int existsMemberSize, Set memberSet, Result result, KvCacheMonitor.Type type) { + if (type == null) { + KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw()); + } if (first) { writeMembers(keyMeta, key, cacheKey, memberSet, result); } else { diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SCardCommander.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SCardCommander.java index 923c95f91..666a1c266 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SCardCommander.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SCardCommander.java @@ -54,6 +54,7 @@ protected Reply execute(Command command) { WriteBufferValue bufferValue = setWriteBuffer.get(cacheKey); if (bufferValue != null) { RedisSet set = bufferValue.getValue(); + // KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw()); return IntegerReply.parse(set.scard()); } @@ -72,6 +73,8 @@ protected Reply execute(Command command) { if (hotKey) { set = loadLRUCache(keyMeta, key); setLRUCache.putAllForRead(key, cacheKey, set); + // + KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw()); return IntegerReply.parse(set.scard()); } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SIsMemberCommander.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SIsMemberCommander.java index 9234f99d1..58b8cd6a1 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SIsMemberCommander.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SIsMemberCommander.java @@ -84,6 +84,8 @@ protected Reply execute(Command command) { if (hotKey) { set = loadLRUCache(keyMeta, key); setLRUCache.putAllForRead(key, cacheKey, set); + // + KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw()); boolean sismeber = set.sismeber(new BytesKey(member)); return IntegerReply.parse(sismeber ? 1 : 0); } @@ -98,6 +100,7 @@ protected Reply execute(Command command) { byte[] raw = ((BulkReply) replies[0]).getRaw(); if (Utils.bytesToString(raw).equalsIgnoreCase("1")) { if (replies[1] instanceof IntegerReply) { + KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw()); return replies[1]; } } @@ -105,6 +108,8 @@ protected Reply execute(Command command) { } } + KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw()); + byte[] subKey = keyDesign.setMemberSubKey(keyMeta, key, member); KeyValue keyValue = kvClient.get(subKey); if (keyValue == null || keyValue.getKey() == null) { diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SMIsMemberCommander.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SMIsMemberCommander.java index f93564bb9..93b0973bd 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SMIsMemberCommander.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SMIsMemberCommander.java @@ -89,6 +89,8 @@ protected Reply execute(Command command) { if (hotKey) { set = loadLRUCache(keyMeta, key); setLRUCache.putAllForRead(key, cacheKey, set); + // + KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw()); Map smismember = set.smismember(members); return toReply(smismember, members); } @@ -120,6 +122,7 @@ protected Reply execute(Command command) { } KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw()); + Map smismember = smismemberFromKv(keyMeta, key, members); return toReply(smismember, members); } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SMembersCommander.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SMembersCommander.java index 0edc0dc84..062dd437b 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SMembersCommander.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SMembersCommander.java @@ -100,6 +100,7 @@ protected Reply execute(Command command) { } KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw()); + Set set = smembersFromKv(keyMeta, key); if (cacheConfig.isSetLocalCacheEnable()) { cacheConfig.getSetLRUCache().putAllForRead(key, cacheKey, new RedisSet(set)); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SPopCommander.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SPopCommander.java index 40a08dd59..51e480c9d 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SPopCommander.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SPopCommander.java @@ -75,10 +75,13 @@ protected Reply execute(Command command) { Set spop = null; Result result = null; + KvCacheMonitor.Type type = null; WriteBufferValue bufferValue = setWriteBuffer.get(cacheKey); if (bufferValue != null) { RedisSet set = bufferValue.getValue(); + // + type = KvCacheMonitor.Type.write_buffer; KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw()); spop = set.spop(count); result = setWriteBuffer.put(cacheKey, set); @@ -88,6 +91,10 @@ protected Reply execute(Command command) { if (spop == null) { spop = setLRUCache.spop(key, cacheKey, count); + if (spop != null) { + type = KvCacheMonitor.Type.local_cache; + KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw()); + } } else { setLRUCache.srem(key, cacheKey, spop); } @@ -118,6 +125,12 @@ protected Reply execute(Command command) { if (spop == null) { if (encodeVersion == EncodeVersion.version_2 || encodeVersion == EncodeVersion.version_3) { spop = spopFromCache(cacheKey, count); + if (spop != null) { + if (type != null) { + type = KvCacheMonitor.Type.redis_cache; + KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw()); + } + } } } else { if (encodeVersion == EncodeVersion.version_2 || encodeVersion == EncodeVersion.version_3) { @@ -128,6 +141,10 @@ protected Reply execute(Command command) { } } + if (type == null) { + KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw()); + } + if (spop == null) { spop = srandmemberFromKv(keyMeta, key, count); } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SRandMemberCommander.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SRandMemberCommander.java index 274f6720c..4621640ae 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SRandMemberCommander.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SRandMemberCommander.java @@ -102,7 +102,8 @@ protected Reply execute(Command command) { if (hotKey) { set = loadLRUCache(keyMeta, key); setLRUCache.putAllForRead(key, cacheKey, set); - + // + KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw()); Set srandmember = set.srandmember(count); return toReply(srandmember, batch); } @@ -135,6 +136,7 @@ protected Reply execute(Command command) { } KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw()); + Set set = srandmemberFromKv(keyMeta, key, count); return toReply(set, batch); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SRemCommander.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SRemCommander.java index 8577ff070..5aa0ee4ae 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SRemCommander.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/kv/command/set/SRemCommander.java @@ -62,10 +62,12 @@ protected Reply execute(Command command) { Set removedMembers = null; Result result = null; + KvCacheMonitor.Type type = null; WriteBufferValue bufferValue = setWriteBuffer.get(cacheKey); if (bufferValue != null) { RedisSet set = bufferValue.getValue(); + type = KvCacheMonitor.Type.write_buffer; KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw()); removedMembers = set.srem(members); result = setWriteBuffer.put(cacheKey, set); @@ -76,6 +78,10 @@ protected Reply execute(Command command) { if (removedMembers == null) { removedMembers = setLRUCache.srem(key, cacheKey, members); + if (removedMembers != null) { + type = KvCacheMonitor.Type.local_cache; + KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw()); + } } else { setLRUCache.srem(key, cacheKey, members); } @@ -118,6 +124,10 @@ protected Reply execute(Command command) { return pair.getFirst(); } if (pair.getSecond() != null && pair.getSecond() >= 0) { + if (type == null) { + type = KvCacheMonitor.Type.redis_cache; + KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw()); + } removeSize = pair.getSecond(); } }