From 58302c9482331d419c8b16d1f98bed404976f23f Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 24 Oct 2024 18:58:16 +0800 Subject: [PATCH 1/3] fix cached time chunk flip twice --- .../utils/executor/batch/utils/BatchCompactionPlan.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java index 3b30e798227b..53e25da2b60c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java @@ -51,7 +51,7 @@ public Chunk getTimeChunkFromCache(TsFileSequenceReader reader, ChunkMetadata ch if (chunk == null) { chunk = reader.readMemChunk(chunkMetadata); } - chunk.getData().flip(); + chunk.getData().reset(); return chunk; } @@ -59,6 +59,7 @@ public void addTimeChunkToCache(String file, long offset, Chunk chunk) { if (cachedTimeChunkSize >= MAX_CACHED_TIME_CHUNKS_SIZE) { return; } + chunk.getData().mark(); cachedTimeChunks.put( new Pair<>(file, offset), new Chunk( From 192787faa8ec640da55327279cb8c35fdf5a4b4b Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 24 Oct 2024 19:21:56 +0800 Subject: [PATCH 2/3] add ut --- .../batch/utils/BatchCompactionPlan.java | 16 +++++++- .../AbstractCompactionEstimator.java | 2 +- ...dAlignedSeriesFastInnerCompactionTest.java | 37 +++++++++++++++++++ 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java index 53e25da2b60c..847f628ef032 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus; import org.apache.tsfile.file.metadata.ChunkMetadata; @@ -35,7 +36,7 @@ import java.util.Map; public class BatchCompactionPlan { - public static final long MAX_CACHED_TIME_CHUNKS_SIZE = 2 * 1024 * 1024; + public static long maxCachedTimeChunksSize = 2 * 1024 * 1024; private final List compactChunkPlans = new ArrayList<>(); private final Map> alignedPageModifiedStatusCache = new HashMap<>(); @@ -50,13 +51,14 @@ public Chunk getTimeChunkFromCache(TsFileSequenceReader reader, ChunkMetadata ch Chunk chunk = cachedTimeChunks.get(key); if (chunk == null) { chunk = reader.readMemChunk(chunkMetadata); + chunk.getData().mark(); } chunk.getData().reset(); return chunk; } public void addTimeChunkToCache(String file, long offset, Chunk chunk) { - if (cachedTimeChunkSize >= MAX_CACHED_TIME_CHUNKS_SIZE) { + if (cachedTimeChunkSize >= maxCachedTimeChunksSize) { return; } chunk.getData().mark(); @@ -98,6 +100,16 @@ public boolean isEmpty() { return compactChunkPlans.isEmpty(); } + @TestOnly + public static void setMaxCachedTimeChunksSize(long size) { + maxCachedTimeChunksSize = size; + } + + @TestOnly + public static long getMaxCachedTimeChunksSize() { + return maxCachedTimeChunksSize; + } + @Override public String toString() { return compactChunkPlans.toString(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java index 43dd0959303f..4ec74c57d63e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java @@ -68,7 +68,7 @@ public abstract class AbstractCompactionEstimator { ((double) SystemInfo.getInstance().getMemorySizeForCompaction() / IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()) - + BatchCompactionPlan.MAX_CACHED_TIME_CHUNKS_SIZE; + + BatchCompactionPlan.maxCachedTimeChunksSize; protected abstract long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java index c4f95addd282..1b4d693d6b97 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.BatchCompactionPlan; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils; import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -319,6 +320,42 @@ public void testCompactionByDeserialize() throws Exception { validate(targetResource); } + @Test + public void testCompactionByDeserializeWithLargeTimeChunk() throws Exception { + long defaultMaxCachedTimeChunkSize = BatchCompactionPlan.getMaxCachedTimeChunksSize(); + try { + BatchCompactionPlan.setMaxCachedTimeChunksSize(1); + TsFileResource unseqResource1 = + generateSingleAlignedSeriesFile( + "d0", + Arrays.asList("s0", "s1", "s2"), + new TimeRange[][] { + new TimeRange[] {new TimeRange(100, 200), new TimeRange(500, 600)} + }, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(false, false, false), + false); + unseqResources.add(unseqResource1); + + TsFileResource unseqResource2 = + generateSingleAlignedSeriesFile( + "d0", + Arrays.asList("s0", "s1", "s2"), + new TimeRange[] {new TimeRange(150, 550)}, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(false, false, false), + false); + unseqResources.add(unseqResource2); + + TsFileResource targetResource = performCompaction(); + validate(targetResource); + } finally { + BatchCompactionPlan.setMaxCachedTimeChunksSize(defaultMaxCachedTimeChunkSize); + } + } + @Test public void testCompactionByDeserializeWithEmptyColumn() throws Exception { TsFileResource unseqResource1 = From c2992500bc538e42ee567cbdae1e7b6c9b86f316 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Fri, 25 Oct 2024 09:56:57 +0800 Subject: [PATCH 3/3] rewind --- .../utils/executor/batch/utils/BatchCompactionPlan.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java index 847f628ef032..1ba5ff680ec0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java @@ -51,9 +51,8 @@ public Chunk getTimeChunkFromCache(TsFileSequenceReader reader, ChunkMetadata ch Chunk chunk = cachedTimeChunks.get(key); if (chunk == null) { chunk = reader.readMemChunk(chunkMetadata); - chunk.getData().mark(); } - chunk.getData().reset(); + chunk.getData().rewind(); return chunk; } @@ -61,7 +60,6 @@ public void addTimeChunkToCache(String file, long offset, Chunk chunk) { if (cachedTimeChunkSize >= maxCachedTimeChunksSize) { return; } - chunk.getData().mark(); cachedTimeChunks.put( new Pair<>(file, offset), new Chunk(