Skip to content

Commit

Permalink
feat(kv): optimize lru cache clear on slot change (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Sep 9, 2024
1 parent 5b164de commit c72a38c
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,43 +67,45 @@ public boolean isHotKey(byte[] key) {
}

public void putAllForRead(int slot, byte[] cacheKey, RedisHash hash) {
localCache.put(slot, new BytesKey(cacheKey), hash);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
localCache.put(slotCacheKey, hash);
}

public void putAllForWrite(int slot, byte[] cacheKey, RedisHash hash) {
localCacheForWrite.put(slot, new BytesKey(cacheKey), hash);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
localCacheForWrite.put(slotCacheKey, hash);
}

public RedisHash getForRead(int slot, byte[] cacheKey) {
BytesKey bytesKey = new BytesKey(cacheKey);
RedisHash hash = localCache.get(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
RedisHash hash = localCache.get(slotCacheKey);
if (hash == null) {
hash = localCacheForWrite.get(slot, bytesKey);
hash = localCacheForWrite.get(slotCacheKey);
if (hash != null) {
localCache.put(slot, bytesKey, hash);
localCacheForWrite.remove(slot, bytesKey);
localCache.put(slotCacheKey, hash);
localCacheForWrite.remove(slotCacheKey);
}
}
return hash;
}

public RedisHash getForWrite(int slot, byte[] cacheKey) {
BytesKey bytesKey = new BytesKey(cacheKey);
RedisHash hash = localCache.get(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
RedisHash hash = localCache.get(slotCacheKey);
if (hash == null) {
hash = localCacheForWrite.get(slot, bytesKey);
hash = localCacheForWrite.get(slotCacheKey);
}
return hash;
}

public Map<BytesKey, byte[]> hset(int slot, byte[] cacheKey, Map<BytesKey, byte[]> fieldMap) {
BytesKey bytesKey = new BytesKey(cacheKey);
RedisHash hash1 = localCacheForWrite.get(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
RedisHash hash1 = localCacheForWrite.get(slotCacheKey);
Map<BytesKey, byte[]> result1 = null;
if (hash1 != null) {
result1 = hash1.hset(fieldMap);
}
RedisHash hash2 = localCache.get(slot, bytesKey);
RedisHash hash2 = localCache.get(slotCacheKey);
Map<BytesKey, byte[]> result2 = null;
if (hash2 != null) {
result2 = hash2.hset(fieldMap);
Expand All @@ -115,13 +117,13 @@ public Map<BytesKey, byte[]> hset(int slot, byte[] cacheKey, Map<BytesKey, byte[
}

public Map<BytesKey, byte[]> hdel(int slot, byte[] cacheKey, Set<BytesKey> fields) {
BytesKey bytesKey = new BytesKey(cacheKey);
RedisHash hash1 = localCacheForWrite.get(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
RedisHash hash1 = localCacheForWrite.get(slotCacheKey);
Map<BytesKey, byte[]> result1 = null;
if (hash1 != null) {
result1 = hash1.hdel(fields);
}
RedisHash hash2 = localCache.get(slot, bytesKey);
RedisHash hash2 = localCache.get(slotCacheKey);
Map<BytesKey, byte[]> result2 = null;
if (hash2 != null) {
result2 = hash2.hdel(fields);
Expand All @@ -133,9 +135,9 @@ public Map<BytesKey, byte[]> hdel(int slot, byte[] cacheKey, Set<BytesKey> field
}

public void del(int slot, byte[] cacheKey) {
BytesKey bytesKey = new BytesKey(cacheKey);
localCache.remove(slot, bytesKey);
localCacheForWrite.remove(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
localCache.remove(slotCacheKey);
localCacheForWrite.remove(slotCacheKey);
}

public void clear() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import com.netease.nim.camellia.redis.proxy.upstream.kv.conf.RedisKvConf;
import com.netease.nim.camellia.redis.proxy.upstream.kv.meta.KeyMeta;
import com.netease.nim.camellia.redis.proxy.util.RedisClusterCRC16Utils;
import com.netease.nim.camellia.tools.utils.BytesKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,34 +56,34 @@ private void rebuild() {
}

public ValueWrapper<KeyMeta> get(int slot, byte[] key) {
BytesKey bytesKey = new BytesKey(key);
KeyMeta keyMeta = localCache.get(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, key);
KeyMeta keyMeta = localCache.get(slotCacheKey);
if (keyMeta != null) {
return () -> keyMeta;
}
Boolean bool = nullCache.get(slot, bytesKey);
Boolean bool = nullCache.get(slotCacheKey);
if (bool != null) {
return () -> null;
}
return null;
}

public void remove(int slot, byte[] key) {
BytesKey bytesKey = new BytesKey(key);
localCache.remove(slot, bytesKey);
nullCache.put(slot, bytesKey, Boolean.TRUE);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, key);
localCache.remove(slotCacheKey);
nullCache.put(slotCacheKey, Boolean.TRUE);
}

public void put(int slot, byte[] key, KeyMeta keyMeta) {
BytesKey bytesKey = new BytesKey(key);
localCache.put(slot, bytesKey, keyMeta);
nullCache.remove(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, key);
localCache.put(slotCacheKey, keyMeta);
nullCache.remove(slotCacheKey);
}

public void setNull(int slot, byte[] key) {
BytesKey bytesKey = new BytesKey(key);
localCache.remove(slot, bytesKey);
nullCache.put(slot, bytesKey, Boolean.TRUE);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, key);
localCache.remove(slotCacheKey);
nullCache.put(slotCacheKey, Boolean.TRUE);
}

public void clear() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,57 +67,59 @@ public boolean isHotKey(byte[] key) {
}

public void putAllForRead(int slot, byte[] cacheKey, RedisSet set) {
localCache.put(slot, new BytesKey(cacheKey), set);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
localCache.put(slotCacheKey, set);
}

public void putAllForWrite(int slot, byte[] cacheKey, RedisSet set) {
localCacheForWrite.put(slot, new BytesKey(cacheKey), set);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
localCacheForWrite.put(slotCacheKey, set);
}

public RedisSet getForRead(int slot, byte[] cacheKey) {
BytesKey bytesKey = new BytesKey(cacheKey);
RedisSet set = localCache.get(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
RedisSet set = localCache.get(slotCacheKey);
if (set == null) {
set = localCacheForWrite.get(slot, bytesKey);
set = localCacheForWrite.get(slotCacheKey);
if (set != null) {
localCache.put(slot, bytesKey, set);
localCacheForWrite.remove(slot, bytesKey);
localCache.put(slotCacheKey, set);
localCacheForWrite.remove(slotCacheKey);
}
}
return set;
}

public RedisSet getForWrite(int slot, byte[] cacheKey) {
BytesKey bytesKey = new BytesKey(cacheKey);
RedisSet set = localCache.get(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
RedisSet set = localCache.get(slotCacheKey);
if (set == null) {
set = localCacheForWrite.get(slot, bytesKey);
set = localCacheForWrite.get(slotCacheKey);
}
return set;
}

public Set<BytesKey> sadd(int slot, byte[] cacheKey, Set<BytesKey> memberSet) {
BytesKey bytesKey = new BytesKey(cacheKey);
RedisSet set = localCache.get(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
RedisSet set = localCache.get(slotCacheKey);
Set<BytesKey> result = null;
if (set != null) {
result = set.sadd(memberSet);
}
set = localCacheForWrite.get(slot, bytesKey);
set = localCacheForWrite.get(slotCacheKey);
if (set != null) {
result = set.sadd(memberSet);
}
return result;
}

public Set<BytesKey> spop(int slot, byte[] cacheKey, int count) {
BytesKey bytesKey = new BytesKey(cacheKey);
RedisSet set = localCache.get(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
RedisSet set = localCache.get(slotCacheKey);
Set<BytesKey> result = null;
if (set != null) {
result = set.spop(count);
}
set = localCacheForWrite.get(slot, bytesKey);
set = localCacheForWrite.get(slotCacheKey);
if (set != null) {
if (result != null) {
set.srem(result);
Expand All @@ -129,23 +131,23 @@ public Set<BytesKey> spop(int slot, byte[] cacheKey, int count) {
}

public Set<BytesKey> srem(int slot, byte[] cacheKey, Collection<BytesKey> members) {
BytesKey bytesKey = new BytesKey(cacheKey);
RedisSet set = localCache.get(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
RedisSet set = localCache.get(slotCacheKey);
Set<BytesKey> result = null;
if (set != null) {
result = set.srem(members);
}
set = localCacheForWrite.get(slot, bytesKey);
set = localCacheForWrite.get(slotCacheKey);
if (set != null) {
result = set.srem(members);
}
return result;
}

public void del(int slot, byte[] cacheKey) {
BytesKey bytesKey = new BytesKey(cacheKey);
localCache.remove(slot, bytesKey);
localCacheForWrite.remove(slot, bytesKey);
SlotCacheKey slotCacheKey = new SlotCacheKey(slot, cacheKey);
localCache.remove(slotCacheKey);
localCacheForWrite.remove(slotCacheKey);
}

public void clear() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.netease.nim.camellia.redis.proxy.upstream.kv.cache;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
* Created by caojiajun on 2024/9/9
*/
public class SlotCacheKey {
private final byte[] key;
private final int slot;
private int hashCode;

public SlotCacheKey(int slot, byte[] key) {
this.slot = slot;
this.key = key;
}

public byte[] getKey() {
return key;
}

public int getSlot() {
return slot;
}

@Override
public boolean equals(Object object) {
if (this == object) return true;
if (object == null || getClass() != object.getClass()) return false;

SlotCacheKey slotCacheKey = (SlotCacheKey) object;

return Arrays.equals(key, slotCacheKey.key);
}

@Override
public int hashCode() {
if (hashCode == 0) {
hashCode = Arrays.hashCode(key);
}
return hashCode;
}

@Override
public String toString() {
if (key == null) return null;
return new String(key, StandardCharsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
public class SlotLRUCache<V> {

private final ConcurrentLinkedHashMap<BytesKey, V>[] array;
private final ConcurrentLinkedHashMap<SlotCacheKey, V>[] array;
private final int segmentSize;
private final boolean is2Power;

Expand All @@ -22,64 +22,64 @@ public SlotLRUCache(int capacity) {
is2Power = MathUtil.is2Power(segmentSize);
array = new ConcurrentLinkedHashMap[segmentSize];
for (int i = 0; i< segmentSize; i++) {
ConcurrentLinkedHashMap<BytesKey, V> subMap = new ConcurrentLinkedHashMap.Builder<BytesKey, V>()
ConcurrentLinkedHashMap<SlotCacheKey, V> subMap = new ConcurrentLinkedHashMap.Builder<SlotCacheKey, V>()
.initialCapacity(capacity / segmentSize)
.maximumWeightedCapacity(capacity / segmentSize)
.build();
array[i] = subMap;
}
}

public V get(int slot, BytesKey cacheKey) {
int index = MathUtil.mod(is2Power, slot, segmentSize);
ConcurrentLinkedHashMap<BytesKey, V> subMap = array[index];
public V get(SlotCacheKey cacheKey) {
int index = MathUtil.mod(is2Power, cacheKey.getSlot(), segmentSize);
ConcurrentLinkedHashMap<SlotCacheKey, V> subMap = array[index];
return subMap.get(cacheKey);
}


public void put(int slot, BytesKey cacheKey, V value) {
int index = MathUtil.mod(is2Power, slot, segmentSize);
ConcurrentLinkedHashMap<BytesKey, V> subMap = array[index];
public void put(SlotCacheKey cacheKey, V value) {
int index = MathUtil.mod(is2Power, cacheKey.getSlot(), segmentSize);
ConcurrentLinkedHashMap<SlotCacheKey, V> subMap = array[index];
subMap.put(cacheKey, value);
}

public void remove(int slot, BytesKey cacheKey) {
int index = MathUtil.mod(is2Power, slot, segmentSize);
ConcurrentLinkedHashMap<BytesKey, V> subMap = array[index];
public void remove(SlotCacheKey cacheKey) {
int index = MathUtil.mod(is2Power, cacheKey.getSlot(), segmentSize);
ConcurrentLinkedHashMap<SlotCacheKey, V> subMap = array[index];
subMap.remove(cacheKey);
}

public void clear() {
for (ConcurrentLinkedHashMap<BytesKey, V> subMap : array) {
for (ConcurrentLinkedHashMap<SlotCacheKey, V> subMap : array) {
subMap.clear();
}
}

public void clear(int slot) {
int index = MathUtil.mod(is2Power, slot, segmentSize);
ConcurrentLinkedHashMap<BytesKey, V> subMap = array[index];
subMap.clear();
ConcurrentLinkedHashMap<SlotCacheKey, V> subMap = array[index];
subMap.entrySet().removeIf(entry -> entry.getKey().getSlot() == slot);
}

public void setCapacity(int capacity) {
for (ConcurrentLinkedHashMap<BytesKey, V> subMap : array) {
for (ConcurrentLinkedHashMap<SlotCacheKey, V> subMap : array) {
subMap.setCapacity(capacity / segmentSize);
}
}

public long size() {
long estimateSize = 0;
for (ConcurrentLinkedHashMap<BytesKey, V> map : array) {
for (ConcurrentLinkedHashMap<SlotCacheKey, V> map : array) {
estimateSize += map.size();
}
return estimateSize;
}

public long estimateSize() {
long estimateSize = 0;
for (ConcurrentLinkedHashMap<BytesKey, V> map : array) {
estimateSize += map.size() * 8L;
for (Map.Entry<BytesKey, V> entry : map.entrySet()) {
for (ConcurrentLinkedHashMap<SlotCacheKey, V> map : array) {
estimateSize += map.size() * 12L;
for (Map.Entry<SlotCacheKey, V> entry : map.entrySet()) {
estimateSize += entry.getKey().getKey().length;
V value = entry.getValue();
if (value instanceof EstimateSizeValue) {
Expand Down
Loading

0 comments on commit c72a38c

Please sign in to comment.