From 58e3a699f043692f7d04798fede26a0f2e1951f5 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 23 Oct 2024 19:10:05 +0800 Subject: [PATCH] [To rc/1.3.3] Fix the cache invalidation logic in RouteBalancer --- .../confignode/client/CnToDnRequestType.java | 1 + ...CnToDnInternalServiceAsyncRequestManager.java | 4 ++++ .../rpc/DataNodeAsyncRequestRPCHandler.java | 1 + .../manager/load/balancer/RouteBalancer.java | 16 +++++++--------- .../impl/DataNodeInternalRPCServiceImpl.java | 10 ++++++++-- .../src/main/thrift/datanode.thrift | 7 +++++++ 6 files changed, 28 insertions(+), 11 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/CnToDnRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/CnToDnRequestType.java index bc7307282573..4b7ebd57dbf7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/CnToDnRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/CnToDnRequestType.java @@ -56,6 +56,7 @@ public enum CnToDnRequestType { INVALIDATE_PARTITION_CACHE, INVALIDATE_PERMISSION_CACHE, INVALIDATE_SCHEMA_CACHE, + INVALIDATE_LAST_CACHE, CLEAR_CACHE, // Function diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java index 01240b0dab80..6a47a7ca4333 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java @@ -270,6 +270,10 @@ protected void initActionMapBuilder() { (req, client, handler) -> client.invalidateMatchedSchemaCache( (TInvalidateMatchedSchemaCacheReq) req, (DataNodeTSStatusRPCHandler) handler)); + actionMapBuilder.put( + CnToDnRequestType.INVALIDATE_LAST_CACHE, + (req, client, handler) -> + client.invalidateLastCache((String) req, (DataNodeTSStatusRPCHandler) handler)); actionMapBuilder.put( CnToDnRequestType.DELETE_DATA_FOR_DELETE_SCHEMA, (req, client, handler) -> diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java index 0cd2013a705d..19be87ef0681 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java @@ -195,6 +195,7 @@ public static DataNodeAsyncRequestRPCHandler buildHandler( case FULL_MERGE: case FLUSH: case CLEAR_CACHE: + case INVALIDATE_LAST_CACHE: case START_REPAIR_DATA: case STOP_REPAIR_DATA: case LOAD_CONFIGURATION: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 8cf4275906ac..ad52faa62497 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -47,7 +47,6 @@ import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.consensus.ConsensusFactory; -import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq; import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; @@ -255,8 +254,8 @@ private void balanceRegionLeader( private void invalidateSchemaCacheOfOldLeaders( Map oldLeaderMap, Set successTransferSet) { - DataNodeAsyncRequestContext invalidateSchemaCacheRequestHandler = - new DataNodeAsyncRequestContext<>(CnToDnRequestType.INVALIDATE_SCHEMA_CACHE); + DataNodeAsyncRequestContext invalidateSchemaCacheRequestHandler = + new DataNodeAsyncRequestContext<>(CnToDnRequestType.INVALIDATE_LAST_CACHE); AtomicInteger requestIndex = new AtomicInteger(0); oldLeaderMap.entrySet().stream() .filter(entry -> TConsensusGroupType.DataRegion == entry.getKey().getType()) @@ -264,8 +263,8 @@ private void invalidateSchemaCacheOfOldLeaders( .forEach( entry -> { // set target - Integer dataNodeId = entry.getValue(); - TDataNodeLocation dataNodeLocation = + final Integer dataNodeId = entry.getValue(); + final TDataNodeLocation dataNodeLocation = getNodeManager().getRegisteredDataNode(dataNodeId).getLocation(); if (dataNodeLocation == null) { LOGGER.warn("DataNodeLocation is null, datanodeId {}", dataNodeId); @@ -274,10 +273,9 @@ private void invalidateSchemaCacheOfOldLeaders( invalidateSchemaCacheRequestHandler.putNodeLocation( requestIndex.get(), dataNodeLocation); // set req - TConsensusGroupId consensusGroupId = entry.getKey(); - String database = getPartitionManager().getRegionStorageGroup(consensusGroupId); - invalidateSchemaCacheRequestHandler.putRequest( - requestIndex.get(), new TInvalidateCacheReq(true, database)); + final TConsensusGroupId consensusGroupId = entry.getKey(); + final String database = getPartitionManager().getRegionStorageGroup(consensusGroupId); + invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database); requestIndex.incrementAndGet(); }); CnToDnInternalServiceAsyncRequestManager.getInstance() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 1aea93a3a02f..40ac005458e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -497,13 +497,19 @@ public TSStatus createDataRegion(TCreateDataRegionReq req) { } @Override - public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) { + public TSStatus invalidatePartitionCache(final TInvalidateCacheReq req) { ClusterPartitionFetcher.getInstance().invalidAllCache(); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } @Override - public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) { + public TSStatus invalidateLastCache(final String database) { + DataNodeSchemaCache.getInstance().invalidateLastCacheInDataRegion(database); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + + @Override + public TSStatus invalidateSchemaCache(final TInvalidateCacheReq req) { DataNodeSchemaCache.getInstance().takeWriteLock(); try { // req.getFullPath() is a database path diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 8c6c88480dd5..d8b9461cb4f5 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -702,6 +702,13 @@ service IDataNodeRPCService { */ common.TSStatus invalidatePartitionCache(TInvalidateCacheReq req) + /** + * Config node will invalidate last cache. + * + * @param string:database(without root) + */ + common.TSStatus invalidateLastCache(string database) + /** * Config node will invalidate Schema Info cache. *