Skip to content

Commit

Permalink
feat(kv): optimize spop/srem/hdel command (#286)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Aug 13, 2024
1 parent 8eb3654 commit 2b466e6
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public int scard() {
return set.size();
}

public boolean isEmpty() {
return set.isEmpty();
}

public boolean sismeber(BytesKey bytesKey) {
return set.contains(bytesKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.HashLRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.CommanderConfig;
import com.netease.nim.camellia.redis.proxy.reply.*;
import com.netease.nim.camellia.redis.proxy.upstream.kv.domain.DeleteType;
import com.netease.nim.camellia.redis.proxy.upstream.kv.kv.KeyValue;
import com.netease.nim.camellia.redis.proxy.upstream.kv.kv.Sort;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.EncodeVersion;
Expand Down Expand Up @@ -69,7 +70,8 @@ protected Reply execute(Command command) {
int delCount = -1;

Result result = null;
boolean deleteAll = false;
DeleteType deleteType = DeleteType.unknown;

WriteBufferValue<RedisHash> writeBufferValue = hashWriteBuffer.get(cacheKey);
if (writeBufferValue != null) {
type = KvCacheMonitor.Type.write_buffer;
Expand All @@ -84,7 +86,11 @@ protected Reply execute(Command command) {
return IntegerReply.REPLY_0;
}
result = hashWriteBuffer.put(cacheKey, hash);
deleteAll = hash.isEmpty();
if (hash.isEmpty()) {
deleteType = DeleteType.delete_all;
} else {
deleteType = DeleteType.delete_someone;
}
}

if (cacheConfig.isHashLocalCacheEnable()) {
Expand Down Expand Up @@ -130,7 +136,9 @@ protected Reply execute(Command command) {
if (hash != null) {
result = hashWriteBuffer.put(cacheKey, hash.duplicate());
if (hash.isEmpty()) {
deleteAll = true;
deleteType = DeleteType.delete_all;
} else {
deleteType = DeleteType.delete_someone;
}
}
}
Expand All @@ -151,28 +159,41 @@ protected Reply execute(Command command) {
subKeys[i] = keyDesign.hashFieldSubKey(keyMeta, key, field.getKey());
i++;
}
if (encodeVersion == EncodeVersion.version_0) {
if (delCount < 0) {

if (delCount < 0) {
if (encodeVersion == EncodeVersion.version_0) {
boolean[] exists = kvClient.exists(subKeys);
delCount = Utils.count(exists);
}
if (delCount > 0) {
int size = BytesUtils.toInt(keyMeta.getExtra()) - delCount;
if (size <= 0 || deleteAll) {
keyMetaServer.deleteKeyMeta(key);
} else {
byte[] extra = BytesUtils.toBytes(size);
keyMeta = new KeyMeta(keyMeta.getEncodeVersion(), keyMeta.getKeyType(), keyMeta.getKeyVersion(), keyMeta.getExpireTime(), extra);
keyMetaServer.createOrUpdateKeyMeta(key, keyMeta);
}

if (encodeVersion == EncodeVersion.version_0) {
batchDeleteSubKeys(key, keyMeta, cacheKey, result, subKeys, false);
} else {
boolean checkHLen = deleteType == DeleteType.unknown;
batchDeleteSubKeys(key, keyMeta, cacheKey, result, subKeys, checkHLen);
}

if (deleteType == DeleteType.delete_all) {
keyMetaServer.deleteKeyMeta(key);
} else {
if (encodeVersion == EncodeVersion.version_0) {
if (delCount > 0) {
int size = BytesUtils.toInt(keyMeta.getExtra()) - delCount;
if (size <= 0) {
keyMetaServer.deleteKeyMeta(key);
} else {
byte[] extra = BytesUtils.toBytes(size);
keyMeta = new KeyMeta(keyMeta.getEncodeVersion(), keyMeta.getKeyType(), keyMeta.getKeyVersion(), keyMeta.getExpireTime(), extra);
keyMetaServer.createOrUpdateKeyMeta(key, keyMeta);
}
}
}
batchDeleteSubKeys(key, keyMeta, cacheKey, result, subKeys, false);
}

if (encodeVersion == EncodeVersion.version_0) {
return IntegerReply.parse(delCount);
} else {
if (deleteAll) {
keyMetaServer.deleteKeyMeta(key);
}
batchDeleteSubKeys(key, keyMeta, cacheKey, result, subKeys, !deleteAll);
return IntegerReply.parse(fieldSize);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.RedisSet;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.SetLRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.CommanderConfig;
import com.netease.nim.camellia.redis.proxy.upstream.kv.domain.DeleteType;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.EncodeVersion;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.KeyMeta;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.KeyType;
Expand Down Expand Up @@ -68,6 +69,7 @@ protected Reply execute(Command command) {
Set<BytesKey> spop = null;
Result result = null;
KvCacheMonitor.Type type = null;
DeleteType deleteType = DeleteType.unknown;

WriteBufferValue<RedisSet> bufferValue = setWriteBuffer.get(cacheKey);
if (bufferValue != null) {
Expand All @@ -77,6 +79,11 @@ protected Reply execute(Command command) {
KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw());
spop = set.spop(count);
result = setWriteBuffer.put(cacheKey, set);
if (set.isEmpty()) {
deleteType = DeleteType.delete_all;
} else {
deleteType = DeleteType.delete_someone;
}
}
if (cacheConfig.isSetLocalCacheEnable()) {
SetLRUCache setLRUCache = cacheConfig.getSetLRUCache();
Expand Down Expand Up @@ -108,6 +115,11 @@ protected Reply execute(Command command) {
RedisSet set = setLRUCache.getForWrite(key, cacheKey);
if (set != null) {
result = setWriteBuffer.put(cacheKey, new RedisSet(new HashSet<>(set.smembers())));
if (set.isEmpty()) {
deleteType = DeleteType.delete_all;
} else {
deleteType = DeleteType.delete_someone;
}
}
}
}
Expand All @@ -126,10 +138,19 @@ protected Reply execute(Command command) {
spop = srandmemberFromKv(keyMeta, key, count);
}

removeMembers(keyMeta, key, cacheKey, spop, result);

if (encodeVersion == EncodeVersion.version_0) {
updateKeyMeta(keyMeta, key, spop.size() * -1);
removeMembers(keyMeta, key, cacheKey, spop, result, false);
} else {
boolean checkSCard = deleteType == DeleteType.unknown;
removeMembers(keyMeta, key, cacheKey, spop, result, checkSCard);
}

if (deleteType == DeleteType.delete_all) {
keyMetaServer.deleteKeyMeta(key);
} else {
if (encodeVersion == EncodeVersion.version_0) {
updateKeyMeta(keyMeta, key, spop.size() * -1);
}
}

return toReply(spop, batch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.RedisSet;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.SetLRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.CommanderConfig;
import com.netease.nim.camellia.redis.proxy.upstream.kv.domain.DeleteType;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.EncodeVersion;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.KeyMeta;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.KeyType;
Expand Down Expand Up @@ -62,6 +63,7 @@ protected Reply execute(Command command) {
Set<BytesKey> removedMembers = null;
Result result = null;
KvCacheMonitor.Type type = null;
DeleteType deleteType = DeleteType.unknown;

WriteBufferValue<RedisSet> bufferValue = setWriteBuffer.get(cacheKey);
if (bufferValue != null) {
Expand All @@ -70,6 +72,11 @@ protected Reply execute(Command command) {
KvCacheMonitor.writeBuffer(cacheConfig.getNamespace(), redisCommand().strRaw());
removedMembers = set.srem(members);
result = setWriteBuffer.put(cacheKey, set);
if (set.isEmpty()) {
deleteType = DeleteType.delete_all;
} else {
deleteType = DeleteType.delete_someone;
}
}

if (cacheConfig.isSetLocalCacheEnable()) {
Expand Down Expand Up @@ -102,6 +109,11 @@ protected Reply execute(Command command) {
RedisSet set = setLRUCache.getForWrite(key, cacheKey);
if (set != null) {
result = setWriteBuffer.put(cacheKey, new RedisSet(new HashSet<>(set.smembers())));
if (set.isEmpty()) {
deleteType = DeleteType.delete_all;
} else {
deleteType = DeleteType.delete_someone;
}
}
}
}
Expand Down Expand Up @@ -137,15 +149,26 @@ protected Reply execute(Command command) {
}
}

removeMembers(keyMeta, key, cacheKey, members, result);

if (encodeVersion == EncodeVersion.version_0) {
if (removeSize > 0) {
removeMembers(keyMeta, key, cacheKey, members, result, false);
} else {
boolean checkSCard = deleteType == DeleteType.unknown;
removeMembers(keyMeta, key, cacheKey, members, result, checkSCard);
}

if (deleteType == DeleteType.delete_all) {
keyMetaServer.deleteKeyMeta(key);
} else {
if (encodeVersion == EncodeVersion.version_0) {
updateKeyMeta(keyMeta, key, removeSize * -1);
}
}

if (encodeVersion == EncodeVersion.version_0) {
return IntegerReply.parse(removeSize);
} else {
return IntegerReply.parse(size);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected final void writeMembers(KeyMeta keyMeta, byte[] key, byte[] cacheKey,
}
}

protected final void removeMembers(KeyMeta keyMeta, byte[] key, byte[] cacheKey, Collection<BytesKey> members, Result result) {
protected final void removeMembers(KeyMeta keyMeta, byte[] key, byte[] cacheKey, Collection<BytesKey> members, Result result, boolean checkSCard) {
byte[][] subKeys = new byte[members.size()][];
int i = 0;
for (BytesKey bytesKey : members) {
Expand All @@ -104,12 +104,27 @@ protected final void removeMembers(KeyMeta keyMeta, byte[] key, byte[] cacheKey,
if (result.isKvWriteDelayEnable()) {
submitAsyncWriteTask(cacheKey, result, () -> {
kvClient.batchDelete(subKeys);
if (checkSCard) {
if (getSizeFromKv(keyMeta, key) == 0) {
keyMetaServer.deleteKeyMeta(key);
}
}
});
} else {
kvClient.batchDelete(subKeys);
if (checkSCard) {
if (getSizeFromKv(keyMeta, key) == 0) {
keyMetaServer.deleteKeyMeta(key);
}
}
}
}

private long getSizeFromKv(KeyMeta keyMeta, byte[] key) {
byte[] startKey = keyDesign.setMemberSubKey(keyMeta, key, new byte[0]);
return kvClient.countByPrefix(startKey, startKey, false);
}

protected final Map<BytesKey, Boolean> smismemberFromKv(KeyMeta keyMeta, byte[] key, Collection<BytesKey> members) {
byte[][] subKeys = new byte[members.size()][];
int i = 0;
Expand Down Expand Up @@ -146,8 +161,4 @@ protected final void updateKeyMeta(KeyMeta keyMeta, byte[] key, int add) {
}
}
}

protected final byte[] smembersCacheMillis() {
return Utils.stringToBytes(String.valueOf(cacheConfig.smembersCacheMillis()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.netease.nim.camellia.redis.proxy.upstream.kv.domain;

/**
* Created by caojiajun on 2024/8/13
*/
public enum DeleteType {
delete_all,
delete_someone,
unknown,
;
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
package com.netease.nim.camellia.redis.proxy.upstream.kv.meta;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.netease.nim.camellia.redis.proxy.monitor.KvCacheMonitor;
import com.netease.nim.camellia.redis.proxy.monitor.KvGcMonitor;
import com.netease.nim.camellia.redis.proxy.reply.*;
import com.netease.nim.camellia.redis.proxy.upstream.kv.buffer.WriteBufferValue;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.KeyMetaLRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.kv.buffer.WriteBuffer;
import com.netease.nim.camellia.redis.proxy.upstream.kv.buffer.Result;
import com.netease.nim.camellia.redis.proxy.upstream.kv.cache.ValueWrapper;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.KvExecutors;
import com.netease.nim.camellia.redis.proxy.upstream.kv.command.RedisTemplate;
import com.netease.nim.camellia.redis.proxy.upstream.kv.domain.CacheConfig;
import com.netease.nim.camellia.redis.proxy.upstream.kv.domain.KeyDesign;
import com.netease.nim.camellia.redis.proxy.upstream.kv.exception.KvException;
import com.netease.nim.camellia.redis.proxy.upstream.kv.gc.KvGcExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.kv.kv.KVClient;
import com.netease.nim.camellia.redis.proxy.upstream.kv.kv.KeyValue;
import com.netease.nim.camellia.redis.proxy.util.MpscSlotHashExecutor;
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;
import com.netease.nim.camellia.tools.utils.BytesKey;

import java.util.concurrent.CompletableFuture;

/**
* Created by caojiajun on 2024/4/9
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,30 @@ public static void testHash(CamelliaRedisTemplate template) {
assertEquals(hsetnx2, 1L);
}

template.del(key);
{
Long hset1 = template.hset(key, "f1", "v1");
assertEquals(hset1, 1L);

Long hset2 = template.hset(key, "f2", "v2");
assertEquals(hset2, 1L);

Long hset3 = template.hset(key, "f3", "v3");
assertEquals(hset3, 1L);

Map<String, String> map = template.hgetAll(key);
assertEquals(map.size(), 3);

template.hdel(key, "f1", "f2");
template.hdel(key, "f1", "f3");

String type = template.type(key);
assertEquals(type, "none");

String v = template.setex(key, 10, "v");
assertEquals(v, "OK");
}

template.del(key);
} catch (Exception e) {
System.out.println("error");
Expand Down
Loading

0 comments on commit 2b466e6

Please sign in to comment.