Skip to content

Commit

Permalink
chore(kv): optimize set commands monitor (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Aug 9, 2024
1 parent 3139894 commit 2bc3720
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,22 @@ protected Reply execute(Command command) {
RedisSet set = null;
Set<BytesKey> existsMemberSet = null;

KvCacheMonitor.Type type = null;

if (first) {
set = new RedisSet(new HashSet<>(memberSet));
setWriteBuffer.put(cacheKey, set);
//
type = KvCacheMonitor.Type.write_buffer;
KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw());
} else {
WriteBufferValue<RedisSet> bufferValue = setWriteBuffer.get(cacheKey);
if (bufferValue != null) {
set = bufferValue.getValue();
existsMemberSet = set.sadd(memberSet);
result = setWriteBuffer.put(cacheKey, set);
//
type = KvCacheMonitor.Type.write_buffer;
KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw());
}
}
Expand All @@ -124,6 +130,7 @@ protected Reply execute(Command command) {
existsSet = set.sadd(memberSet);
}
} else {
type = KvCacheMonitor.Type.local_cache;
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
}
if (existsMemberSet == null && existsSet != null) {
Expand Down Expand Up @@ -157,31 +164,37 @@ protected Reply execute(Command command) {
EncodeVersion encodeVersion = keyMeta.getEncodeVersion();

if (encodeVersion == EncodeVersion.version_0) {
return saddVersion0(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result);
return saddVersion0(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result, type);
}

if (encodeVersion == EncodeVersion.version_1) {
return saddVersion1(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result);
return saddVersion1(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result, type);
}

if (encodeVersion == EncodeVersion.version_2) {
if (!first) {
int ret = checkAndUpdateCache(cacheKey, memberSet, memberSize);
if (existsMemberSize < 0 && ret >= 0) {
existsMemberSize = ret;
//
type = KvCacheMonitor.Type.redis_cache;
KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw());
}
}
return saddVersion0(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result);
return saddVersion0(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result, type);
}

if (encodeVersion == EncodeVersion.version_3) {
if (!first) {
int ret = checkAndUpdateCache(cacheKey, memberSet, memberSize);
if (existsMemberSize < 0 && ret >= 0) {
//
type = KvCacheMonitor.Type.redis_cache;
KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw());
existsMemberSize = ret;
}
}
return saddVersion1(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result);
return saddVersion1(keyMeta, key, cacheKey, first, memberSize, existsMemberSize, memberSet, result, type);
}

return ErrorReply.INTERNAL_ERROR;
Expand Down Expand Up @@ -217,7 +230,10 @@ private int checkAndUpdateCache(byte[] cacheKey, Set<BytesKey> memberSet, int me
}

private Reply saddVersion0(KeyMeta keyMeta, byte[] key, byte[] cacheKey, boolean first, int memberSize,
int existsMemberSize, Set<BytesKey> memberSet, Result result) {
int existsMemberSize, Set<BytesKey> memberSet, Result result, KvCacheMonitor.Type type) {
if (type == null) {
KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw());
}
if (first) {
writeMembers(keyMeta, key, cacheKey, memberSet, result);
return IntegerReply.parse(memberSize);
Expand Down Expand Up @@ -257,7 +273,10 @@ private Reply saddVersion0(KeyMeta keyMeta, byte[] key, byte[] cacheKey, boolean
}

private Reply saddVersion1(KeyMeta keyMeta, byte[] key, byte[] cacheKey, boolean first, int memberSize,
int existsMemberSize, Set<BytesKey> memberSet, Result result) {
int existsMemberSize, Set<BytesKey> memberSet, Result result, KvCacheMonitor.Type type) {
if (type == null) {
KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw());
}
if (first) {
writeMembers(keyMeta, key, cacheKey, memberSet, result);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ protected Reply execute(Command command) {
WriteBufferValue<RedisSet> bufferValue = setWriteBuffer.get(cacheKey);
if (bufferValue != null) {
RedisSet set = bufferValue.getValue();
//
KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw());
return IntegerReply.parse(set.scard());
}
Expand All @@ -72,6 +73,8 @@ protected Reply execute(Command command) {
if (hotKey) {
set = loadLRUCache(keyMeta, key);
setLRUCache.putAllForRead(key, cacheKey, set);
//
KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw());
return IntegerReply.parse(set.scard());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ protected Reply execute(Command command) {
if (hotKey) {
set = loadLRUCache(keyMeta, key);
setLRUCache.putAllForRead(key, cacheKey, set);
//
KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw());
boolean sismeber = set.sismeber(new BytesKey(member));
return IntegerReply.parse(sismeber ? 1 : 0);
}
Expand All @@ -98,13 +100,16 @@ protected Reply execute(Command command) {
byte[] raw = ((BulkReply) replies[0]).getRaw();
if (Utils.bytesToString(raw).equalsIgnoreCase("1")) {
if (replies[1] instanceof IntegerReply) {
KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw());
return replies[1];
}
}
}
}
}

KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw());

byte[] subKey = keyDesign.setMemberSubKey(keyMeta, key, member);
KeyValue keyValue = kvClient.get(subKey);
if (keyValue == null || keyValue.getKey() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ protected Reply execute(Command command) {
if (hotKey) {
set = loadLRUCache(keyMeta, key);
setLRUCache.putAllForRead(key, cacheKey, set);
//
KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw());
Map<BytesKey, Boolean> smismember = set.smismember(members);
return toReply(smismember, members);
}
Expand Down Expand Up @@ -120,6 +122,7 @@ protected Reply execute(Command command) {
}

KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw());

Map<BytesKey, Boolean> smismember = smismemberFromKv(keyMeta, key, members);
return toReply(smismember, members);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ protected Reply execute(Command command) {
}

KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw());

Set<BytesKey> set = smembersFromKv(keyMeta, key);
if (cacheConfig.isSetLocalCacheEnable()) {
cacheConfig.getSetLRUCache().putAllForRead(key, cacheKey, new RedisSet(set));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,13 @@ protected Reply execute(Command command) {

Set<BytesKey> spop = null;
Result result = null;
KvCacheMonitor.Type type = null;

WriteBufferValue<RedisSet> bufferValue = setWriteBuffer.get(cacheKey);
if (bufferValue != null) {
RedisSet set = bufferValue.getValue();
//
type = KvCacheMonitor.Type.write_buffer;
KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw());
spop = set.spop(count);
result = setWriteBuffer.put(cacheKey, set);
Expand All @@ -88,6 +91,10 @@ protected Reply execute(Command command) {

if (spop == null) {
spop = setLRUCache.spop(key, cacheKey, count);
if (spop != null) {
type = KvCacheMonitor.Type.local_cache;
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
}
} else {
setLRUCache.srem(key, cacheKey, spop);
}
Expand Down Expand Up @@ -118,6 +125,12 @@ protected Reply execute(Command command) {
if (spop == null) {
if (encodeVersion == EncodeVersion.version_2 || encodeVersion == EncodeVersion.version_3) {
spop = spopFromCache(cacheKey, count);
if (spop != null) {
if (type != null) {
type = KvCacheMonitor.Type.redis_cache;
KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw());
}
}
}
} else {
if (encodeVersion == EncodeVersion.version_2 || encodeVersion == EncodeVersion.version_3) {
Expand All @@ -128,6 +141,10 @@ protected Reply execute(Command command) {
}
}

if (type == null) {
KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw());
}

if (spop == null) {
spop = srandmemberFromKv(keyMeta, key, count);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ protected Reply execute(Command command) {
if (hotKey) {
set = loadLRUCache(keyMeta, key);
setLRUCache.putAllForRead(key, cacheKey, set);

//
KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw());
Set<BytesKey> srandmember = set.srandmember(count);
return toReply(srandmember, batch);
}
Expand Down Expand Up @@ -135,6 +136,7 @@ protected Reply execute(Command command) {
}

KvCacheMonitor.kvStore(cacheConfig.getNamespace(), redisCommand().strRaw());

Set<BytesKey> set = srandmemberFromKv(keyMeta, key, count);

return toReply(set, batch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ protected Reply execute(Command command) {

Set<BytesKey> removedMembers = null;
Result result = null;
KvCacheMonitor.Type type = null;

WriteBufferValue<RedisSet> bufferValue = setWriteBuffer.get(cacheKey);
if (bufferValue != null) {
RedisSet set = bufferValue.getValue();
type = KvCacheMonitor.Type.write_buffer;
KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw());
removedMembers = set.srem(members);
result = setWriteBuffer.put(cacheKey, set);
Expand All @@ -76,6 +78,10 @@ protected Reply execute(Command command) {

if (removedMembers == null) {
removedMembers = setLRUCache.srem(key, cacheKey, members);
if (removedMembers != null) {
type = KvCacheMonitor.Type.local_cache;
KvCacheMonitor.localCache(cacheConfig.getNamespace(), redisCommand().strRaw());
}
} else {
setLRUCache.srem(key, cacheKey, members);
}
Expand Down Expand Up @@ -118,6 +124,10 @@ protected Reply execute(Command command) {
return pair.getFirst();
}
if (pair.getSecond() != null && pair.getSecond() >= 0) {
if (type == null) {
type = KvCacheMonitor.Type.redis_cache;
KvCacheMonitor.redisCache(cacheConfig.getNamespace(), redisCommand().strRaw());
}
removeSize = pair.getSecond();
}
}
Expand Down

0 comments on commit 2bc3720

Please sign in to comment.