Skip to content

Commit

Permalink
[To rc/1.3.3] Fix the cache invalidation logic in RouteBalancer
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi authored Oct 23, 2024
1 parent 9a3c8e1 commit 58e3a69
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public enum CnToDnRequestType {
INVALIDATE_PARTITION_CACHE,
INVALIDATE_PERMISSION_CACHE,
INVALIDATE_SCHEMA_CACHE,
INVALIDATE_LAST_CACHE,
CLEAR_CACHE,

// Function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ protected void initActionMapBuilder() {
(req, client, handler) ->
client.invalidateMatchedSchemaCache(
(TInvalidateMatchedSchemaCacheReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnRequestType.INVALIDATE_LAST_CACHE,
(req, client, handler) ->
client.invalidateLastCache((String) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnRequestType.DELETE_DATA_FOR_DELETE_SCHEMA,
(req, client, handler) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public static DataNodeAsyncRequestRPCHandler<?> buildHandler(
case FULL_MERGE:
case FLUSH:
case CLEAR_CACHE:
case INVALIDATE_LAST_CACHE:
case START_REPAIR_DATA:
case STOP_REPAIR_DATA:
case LOAD_CONFIGURATION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
Expand Down Expand Up @@ -255,17 +254,17 @@ private void balanceRegionLeader(

private void invalidateSchemaCacheOfOldLeaders(
Map<TConsensusGroupId, Integer> oldLeaderMap, Set<TConsensusGroupId> successTransferSet) {
DataNodeAsyncRequestContext<TInvalidateCacheReq, TSStatus> invalidateSchemaCacheRequestHandler =
new DataNodeAsyncRequestContext<>(CnToDnRequestType.INVALIDATE_SCHEMA_CACHE);
DataNodeAsyncRequestContext<String, TSStatus> invalidateSchemaCacheRequestHandler =
new DataNodeAsyncRequestContext<>(CnToDnRequestType.INVALIDATE_LAST_CACHE);
AtomicInteger requestIndex = new AtomicInteger(0);
oldLeaderMap.entrySet().stream()
.filter(entry -> TConsensusGroupType.DataRegion == entry.getKey().getType())
.filter(entry -> successTransferSet.contains(entry.getKey()))
.forEach(
entry -> {
// set target
Integer dataNodeId = entry.getValue();
TDataNodeLocation dataNodeLocation =
final Integer dataNodeId = entry.getValue();
final TDataNodeLocation dataNodeLocation =
getNodeManager().getRegisteredDataNode(dataNodeId).getLocation();
if (dataNodeLocation == null) {
LOGGER.warn("DataNodeLocation is null, datanodeId {}", dataNodeId);
Expand All @@ -274,10 +273,9 @@ private void invalidateSchemaCacheOfOldLeaders(
invalidateSchemaCacheRequestHandler.putNodeLocation(
requestIndex.get(), dataNodeLocation);
// set req
TConsensusGroupId consensusGroupId = entry.getKey();
String database = getPartitionManager().getRegionStorageGroup(consensusGroupId);
invalidateSchemaCacheRequestHandler.putRequest(
requestIndex.get(), new TInvalidateCacheReq(true, database));
final TConsensusGroupId consensusGroupId = entry.getKey();
final String database = getPartitionManager().getRegionStorageGroup(consensusGroupId);
invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database);
requestIndex.incrementAndGet();
});
CnToDnInternalServiceAsyncRequestManager.getInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,19 @@ public TSStatus createDataRegion(TCreateDataRegionReq req) {
}

@Override
public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) {
public TSStatus invalidatePartitionCache(final TInvalidateCacheReq req) {
ClusterPartitionFetcher.getInstance().invalidAllCache();
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

@Override
public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
public TSStatus invalidateLastCache(final String database) {
DataNodeSchemaCache.getInstance().invalidateLastCacheInDataRegion(database);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

@Override
public TSStatus invalidateSchemaCache(final TInvalidateCacheReq req) {
DataNodeSchemaCache.getInstance().takeWriteLock();
try {
// req.getFullPath() is a database path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,13 @@ service IDataNodeRPCService {
*/
common.TSStatus invalidatePartitionCache(TInvalidateCacheReq req)

/**
* Config node will invalidate last cache.
*
* @param string:database(without root)
*/
common.TSStatus invalidateLastCache(string database)

/**
* Config node will invalidate Schema Info cache.
*
Expand Down

0 comments on commit 58e3a69

Please sign in to comment.