From 0850c16152f294c5ccd53e689016a332cf1639f3 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 1 Aug 2024 22:49:22 +0800 Subject: [PATCH 01/12] Add PipeLastQueryEvent --- .../common/tablet/PipeLastQueryEvent.java | 146 ++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastQueryEvent.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastQueryEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastQueryEvent.java new file mode 100644 index 000000000000..61a4d6f91875 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastQueryEvent.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tablet; + +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.pattern.PipePattern; +import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; + +import org.apache.tsfile.write.record.Tablet; + +import java.util.List; + +public class PipeLastQueryEvent extends EnrichedEvent { + + private List tablets; + + private PipeTabletMemoryBlock allocatedMemoryBlock; + + private final boolean isAligned; + + protected PipeLastQueryEvent( + final List tablets, + final boolean isAligned, + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final PipePattern pipePattern, + final long startTime, + final long endTime) { + super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); + this.tablets = tablets; + this.isAligned = isAligned; + } + + protected PipeLastQueryEvent( + final List tablets, + final boolean isAligned, + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final PipePattern pipePattern) { + this( + tablets, + isAligned, + pipeName, + creationTime, + pipeTaskMeta, + pipePattern, + Long.MIN_VALUE, + Long.MAX_VALUE); + } + + protected PipeLastQueryEvent( + final List tablets, + final boolean isAligned, + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta) { + this( + tablets, + isAligned, + pipeName, + creationTime, + pipeTaskMeta, + null, + Long.MIN_VALUE, + Long.MAX_VALUE); + } + + @Override + public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { + tablets.forEach(tablet -> PipeDataNodeResourceManager.memory().forceAllocateWithRetry(tablet)); + return true; + } + + @Override + public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { + allocatedMemoryBlock.close(); + tablets = null; + return true; + } + + @Override + public ProgressIndex getProgressIndex() { + return MinimumProgressIndex.INSTANCE; + } + + @Override + public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( + String pipeName, + long creationTime, + PipeTaskMeta pipeTaskMeta, + PipePattern pattern, + long startTime, + long endTime) { + return new PipeLastQueryEvent( + tablets, isAligned, pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); + } + + @Override + public boolean isGeneratedByPipe() { + return false; + } + + @Override + public boolean mayEventTimeOverlappedWithTimeRange() { + return true; + } + + @Override + public boolean mayEventPathsOverlappedWithPattern() { + return true; + } + + /////////////////////////// Object /////////////////////////// + + @Override + public String toString() { + return String.format( + "PipeRawTabletInsertionEvent{tablets=%s, isAligned=%s, allocatedMemoryBlock=%s}", + tablets, isAligned, allocatedMemoryBlock) + + " - " + + super.toString(); + } +} From 868ccc69dff578385cb0ac374e395179fca7f8c1 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Fri, 2 Aug 2024 16:34:30 +0800 Subject: [PATCH 02/12] Modify the code --- .../common/tablet/PipeLastQueryEvent.java | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastQueryEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastQueryEvent.java index 61a4d6f91875..d2af44250732 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastQueryEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastQueryEvent.java @@ -35,13 +35,13 @@ public class PipeLastQueryEvent extends EnrichedEvent { private List tablets; - private PipeTabletMemoryBlock allocatedMemoryBlock; + private final long captureTime; - private final boolean isAligned; + private PipeTabletMemoryBlock allocatedMemoryBlock; protected PipeLastQueryEvent( final List tablets, - final boolean isAligned, + final long captureTime, final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, @@ -50,19 +50,19 @@ protected PipeLastQueryEvent( final long endTime) { super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); this.tablets = tablets; - this.isAligned = isAligned; + this.captureTime = captureTime; } protected PipeLastQueryEvent( final List tablets, - final boolean isAligned, + final long captureTime, final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, final PipePattern pipePattern) { this( tablets, - isAligned, + captureTime, pipeName, creationTime, pipeTaskMeta, @@ -73,13 +73,13 @@ protected PipeLastQueryEvent( protected PipeLastQueryEvent( final List tablets, - final boolean isAligned, + final long captureTime, final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta) { this( tablets, - isAligned, + captureTime, pipeName, creationTime, pipeTaskMeta, @@ -115,7 +115,14 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( long startTime, long endTime) { return new PipeLastQueryEvent( - tablets, isAligned, pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); + tablets, + System.currentTimeMillis(), + pipeName, + creationTime, + pipeTaskMeta, + pipePattern, + startTime, + endTime); } @Override @@ -138,9 +145,17 @@ public boolean mayEventPathsOverlappedWithPattern() { @Override public String toString() { return String.format( - "PipeRawTabletInsertionEvent{tablets=%s, isAligned=%s, allocatedMemoryBlock=%s}", - tablets, isAligned, allocatedMemoryBlock) + "PipeRawTabletInsertionEvent{tablets=%s, captureTime=%s, allocatedMemoryBlock=%s}", + tablets, captureTime, allocatedMemoryBlock) + " - " + super.toString(); } + + public List getTablets() { + return tablets; + } + + public long getCaptureTime() { + return captureTime; + } } From dd1eab64ac6aa3d13ccb4e259268a19dc5acf998 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Mon, 5 Aug 2024 14:50:48 +0800 Subject: [PATCH 03/12] add PipeLastPointTsBlockEvent --- ...ent.java => PipeLastPointTabletEvent.java} | 65 ++---- .../tsblock/PipeLastPointTsBlockEvent.java | 213 ++++++++++++++++++ 2 files changed, 228 insertions(+), 50 deletions(-) rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/{PipeLastQueryEvent.java => PipeLastPointTabletEvent.java} (69%) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastQueryEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java similarity index 69% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastQueryEvent.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java index d2af44250732..1f418c551982 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastQueryEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java @@ -29,18 +29,16 @@ import org.apache.tsfile.write.record.Tablet; -import java.util.List; +public class PipeLastPointTabletEvent extends EnrichedEvent { -public class PipeLastQueryEvent extends EnrichedEvent { - - private List tablets; + private Tablet tablet; private final long captureTime; private PipeTabletMemoryBlock allocatedMemoryBlock; - protected PipeLastQueryEvent( - final List tablets, + public PipeLastPointTabletEvent( + final Tablet tablet, final long captureTime, final String pipeName, final long creationTime, @@ -49,55 +47,20 @@ protected PipeLastQueryEvent( final long startTime, final long endTime) { super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); - this.tablets = tablets; + this.tablet = tablet; this.captureTime = captureTime; } - protected PipeLastQueryEvent( - final List tablets, - final long captureTime, - final String pipeName, - final long creationTime, - final PipeTaskMeta pipeTaskMeta, - final PipePattern pipePattern) { - this( - tablets, - captureTime, - pipeName, - creationTime, - pipeTaskMeta, - pipePattern, - Long.MIN_VALUE, - Long.MAX_VALUE); - } - - protected PipeLastQueryEvent( - final List tablets, - final long captureTime, - final String pipeName, - final long creationTime, - final PipeTaskMeta pipeTaskMeta) { - this( - tablets, - captureTime, - pipeName, - creationTime, - pipeTaskMeta, - null, - Long.MIN_VALUE, - Long.MAX_VALUE); - } - @Override public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { - tablets.forEach(tablet -> PipeDataNodeResourceManager.memory().forceAllocateWithRetry(tablet)); + allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocateWithRetry(tablet); return true; } @Override public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { allocatedMemoryBlock.close(); - tablets = null; + tablet = null; return true; } @@ -114,8 +77,8 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( PipePattern pattern, long startTime, long endTime) { - return new PipeLastQueryEvent( - tablets, + return new PipeLastPointTabletEvent( + tablet, System.currentTimeMillis(), pipeName, creationTime, @@ -145,14 +108,16 @@ public boolean mayEventPathsOverlappedWithPattern() { @Override public String toString() { return String.format( - "PipeRawTabletInsertionEvent{tablets=%s, captureTime=%s, allocatedMemoryBlock=%s}", - tablets, captureTime, allocatedMemoryBlock) + "PipeLastPointTabletEvent{tablets=%s, captureTime=%s, allocatedMemoryBlock=%s}", + tablet, captureTime, allocatedMemoryBlock) + " - " + super.toString(); } - public List getTablets() { - return tablets; + /////////////////////////// Getter /////////////////////////// + + public Tablet getTablets() { + return tablet; } public long getCaptureTime() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java new file mode 100644 index 000000000000..10a62e8d9305 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tsblock; + +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.pattern.PipePattern; +import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.DateUtils; +import org.apache.tsfile.write.UnSupportedDataTypeException; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; + +import java.time.LocalDate; +import java.util.List; + +public class PipeLastPointTsBlockEvent extends EnrichedEvent { + + private final PartialPath partialPath; + + private final List measurementSchemas; + + private final long captureTime; + + private TsBlock tsBlock; + + private PipeMemoryBlock pipeMemoryBlock; + + protected PipeLastPointTsBlockEvent( + final TsBlock tsBlock, + final long captureTime, + final PartialPath partialPath, + final List measurementSchemas, + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final PipePattern pipePattern, + final long startTime, + final long endTime) { + super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); + this.tsBlock = tsBlock; + this.partialPath = partialPath; + this.measurementSchemas = measurementSchemas; + this.captureTime = captureTime; + } + + @Override + public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { + pipeMemoryBlock = + PipeDataNodeResourceManager.memory().forceAllocate(tsBlock.getRetainedSizeInBytes()); + return true; + } + + @Override + public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { + pipeMemoryBlock.close(); + tsBlock = null; + return false; + } + + @Override + public ProgressIndex getProgressIndex() { + return MinimumProgressIndex.INSTANCE; + } + + @Override + public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( + String pipeName, + long creationTime, + PipeTaskMeta pipeTaskMeta, + PipePattern pattern, + long startTime, + long endTime) { + return new PipeLastPointTsBlockEvent( + tsBlock, + captureTime, + partialPath, + measurementSchemas, + pipeName, + creationTime, + pipeTaskMeta, + pipePattern, + startTime, + endTime); + } + + @Override + public boolean isGeneratedByPipe() { + return false; + } + + @Override + public boolean mayEventTimeOverlappedWithTimeRange() { + return true; + } + + @Override + public boolean mayEventPathsOverlappedWithPattern() { + return true; + } + + /////////////////////////// PipeLastPointTabletEvent /////////////////////////// + + public PipeLastPointTabletEvent toPipeLastPointTabletEvent() { + return new PipeLastPointTabletEvent( + convertToTablets(), + captureTime, + pipeName, + creationTime, + pipeTaskMeta, + pipePattern, + startTime, + endTime); + } + + /////////////////////////// convertToTablet /////////////////////////// + + private Tablet convertToTablets() { + Object[] values = new Object[tsBlock.getValueColumnCount()]; + for (int i = 0; i < tsBlock.getValueColumnCount(); ++i) { + Column column = tsBlock.getColumn(i); + TSDataType type = column.getDataType(); + switch (type) { + case BOOLEAN: + values[i] = column.getBooleans(); + break; + case INT32: + values[i] = column.getInts(); + break; + case DATE: + int[] dates = column.getInts(); + LocalDate[] localDates = new LocalDate[dates.length]; + for (int row = 0; row < dates.length; row++) { + localDates[row] = DateUtils.parseIntToLocalDate(dates[row]); + } + values[i] = localDates; + break; + case INT64: + case TIMESTAMP: + values[i] = column.getLongs(); + break; + case FLOAT: + values[i] = column.getFloats(); + break; + case DOUBLE: + values[i] = column.getDoubles(); + break; + case TEXT: + case BLOB: + case STRING: + values[i] = column.getBinaries(); + break; + default: + throw new UnSupportedDataTypeException("UnSupported" + type); + } + } + + final Tablet tablet = + new Tablet( + partialPath.getDevice(), + measurementSchemas, + tsBlock.getTimeColumn().getLongs(), + values, + null, + tsBlock.getValueColumnCount()); + + tablet.initBitMaps(); + return tablet; + } + + /////////////////////////// Object /////////////////////////// + + @Override + public String toString() { + return String.format( + "LastPointTsBlockEvent{tsBlock=%s, partialPath=%s, measurementSchemas=%s, captureTime=%s, pipeMemoryBlock=%s}", + tsBlock, partialPath, measurementSchemas, captureTime, pipeMemoryBlock) + + " - " + + super.toString(); + } + + /////////////////////////// Getter /////////////////////////// + + public long getCaptureTime() { + return captureTime; + } +} From 8368e8ee11c21610d50f817f231e8f6bb8ba9912 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Mon, 5 Aug 2024 19:09:32 +0800 Subject: [PATCH 04/12] Modify code format --- .../common/tablet/PipeLastPointTabletEvent.java | 13 +++---------- .../common/tsblock/PipeLastPointTsBlockEvent.java | 9 +++++---- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java index 1f418c551982..2d4714e45fc5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java @@ -78,14 +78,7 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( long startTime, long endTime) { return new PipeLastPointTabletEvent( - tablet, - System.currentTimeMillis(), - pipeName, - creationTime, - pipeTaskMeta, - pipePattern, - startTime, - endTime); + tablet, captureTime, pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); } @Override @@ -108,7 +101,7 @@ public boolean mayEventPathsOverlappedWithPattern() { @Override public String toString() { return String.format( - "PipeLastPointTabletEvent{tablets=%s, captureTime=%s, allocatedMemoryBlock=%s}", + "PipeLastPointTabletEvent{tablet=%s, captureTime=%s, allocatedMemoryBlock=%s}", tablet, captureTime, allocatedMemoryBlock) + " - " + super.toString(); @@ -116,7 +109,7 @@ public String toString() { /////////////////////////// Getter /////////////////////////// - public Tablet getTablets() { + public Tablet getTablet() { return tablet; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java index 10a62e8d9305..27473dc320c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java @@ -129,7 +129,7 @@ public boolean mayEventPathsOverlappedWithPattern() { public PipeLastPointTabletEvent toPipeLastPointTabletEvent() { return new PipeLastPointTabletEvent( - convertToTablets(), + convertToTablet(), captureTime, pipeName, creationTime, @@ -141,7 +141,7 @@ public PipeLastPointTabletEvent toPipeLastPointTabletEvent() { /////////////////////////// convertToTablet /////////////////////////// - private Tablet convertToTablets() { + private Tablet convertToTablet() { Object[] values = new Object[tsBlock.getValueColumnCount()]; for (int i = 0; i < tsBlock.getValueColumnCount(); ++i) { Column column = tsBlock.getColumn(i); @@ -177,7 +177,8 @@ private Tablet convertToTablets() { values[i] = column.getBinaries(); break; default: - throw new UnSupportedDataTypeException("UnSupported" + type); + throw new UnSupportedDataTypeException( + "TsBlock format to Tablet format does not support type " + type); } } @@ -199,7 +200,7 @@ private Tablet convertToTablets() { @Override public String toString() { return String.format( - "LastPointTsBlockEvent{tsBlock=%s, partialPath=%s, measurementSchemas=%s, captureTime=%s, pipeMemoryBlock=%s}", + "PipeLastPointTsBlockEvent{tsBlock=%s, partialPath=%s, measurementSchemas=%s, captureTime=%s, pipeMemoryBlock=%s}", tsBlock, partialPath, measurementSchemas, captureTime, pipeMemoryBlock) + " - " + super.toString(); From c7bcf4d3e3dd46323ce9cc120969043148ab87bc Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Tue, 6 Aug 2024 15:54:25 +0800 Subject: [PATCH 05/12] add LastPointSamplingProcessor --- .../tablet/PipeLastPointTabletEvent.java | 16 +- .../tsblock/PipeLastPointTsBlockEvent.java | 6 +- .../lastpoint/LastPointFilter.java | 60 +++++ .../lastpoint/LastPointSamplingProcessor.java | 220 ++++++++++++++++++ .../constant/PipeProcessorConstant.java | 5 + 5 files changed, 305 insertions(+), 2 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java index 2d4714e45fc5..0d462cb1d10e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.pattern.PipePattern; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.pipe.processor.downsampling.lastpoint.LastPointFilter; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; @@ -37,9 +39,12 @@ public class PipeLastPointTabletEvent extends EnrichedEvent { private PipeTabletMemoryBlock allocatedMemoryBlock; + private PartialPathLastObjectCache> partialPathToLatestTimeCache; + public PipeLastPointTabletEvent( final Tablet tablet, final long captureTime, + final PartialPathLastObjectCache> partialPathToLatestTimeCache, final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, @@ -49,6 +54,7 @@ public PipeLastPointTabletEvent( super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); this.tablet = tablet; this.captureTime = captureTime; + this.partialPathToLatestTimeCache = partialPathToLatestTimeCache; } @Override @@ -78,7 +84,15 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( long startTime, long endTime) { return new PipeLastPointTabletEvent( - tablet, captureTime, pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); + tablet, + captureTime, + partialPathToLatestTimeCache, + pipeName, + creationTime, + pipeTaskMeta, + pipePattern, + startTime, + endTime); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java index 27473dc320c3..f3906cd11a42 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java @@ -26,6 +26,8 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent; +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.pipe.processor.downsampling.lastpoint.LastPointFilter; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; @@ -127,10 +129,12 @@ public boolean mayEventPathsOverlappedWithPattern() { /////////////////////////// PipeLastPointTabletEvent /////////////////////////// - public PipeLastPointTabletEvent toPipeLastPointTabletEvent() { + public PipeLastPointTabletEvent toPipeLastPointTabletEvent( + PartialPathLastObjectCache> partialPathToLatestTimeCache) { return new PipeLastPointTabletEvent( convertToTablet(), captureTime, + partialPathToLatestTimeCache, pipeName, creationTime, pipeTaskMeta, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java new file mode 100644 index 000000000000..0bc37c11a866 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.lastpoint; + +import java.util.Objects; + +public class LastPointFilter { + + private long lastStoredTimestamp; + + private T lastStoredValue; + + private boolean isConsumed; + + public LastPointFilter(final long firstTimestamp, final T firstValue) { + this.lastStoredTimestamp = firstTimestamp; + this.lastStoredValue = firstValue; + this.isConsumed = false; + } + + public boolean filter(final long timestamp, final T value) { + if (this.lastStoredTimestamp > timestamp) { + return false; + } + + if (this.lastStoredTimestamp == timestamp && Objects.equals(lastStoredValue, value)) { + return false; + } + + reset(timestamp, value); + return true; + } + + private void reset(final long timestamp, final T value) { + this.lastStoredTimestamp = timestamp; + this.lastStoredValue = value; + this.isConsumed = false; + } + + public void consume() { + this.isConsumed = true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java new file mode 100644 index 000000000000..0ac1aeeb9fff --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.lastpoint; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow; +import org.apache.iotdb.db.pipe.event.common.row.PipeRow; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsblock.PipeLastPointTsBlockEvent; +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; +import org.apache.iotdb.pipe.api.PipeProcessor; +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.collector.EventCollector; +import org.apache.iotdb.pipe.api.collector.RowCollector; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; + +import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.read.TimeValuePair; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY; + +public class LastPointSamplingProcessor implements PipeProcessor { + + private final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE = DataNodeSchemaCache.getInstance(); + + private PartialPathLastObjectCache> partialPathToLatestTimeCache; + + protected long memoryLimitInBytes; + + private PartialPathLastObjectCache> initPathLastObjectCache( + long memoryLimitInBytes) { + partialPathToLatestTimeCache = + new PartialPathLastObjectCache>(memoryLimitInBytes) { + @Override + protected long calculateMemoryUsage(LastPointFilter filter) { + return 64; // Long.BYTES * 8 + } + }; + return partialPathToLatestTimeCache; + } + + @Override + public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) + throws Exception { + if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) + && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { + return; + } + + final AtomicReference exception = new AtomicReference<>(); + + Iterable iterable = + tabletInsertionEvent.processRowByRow( + (row, rowCollector) -> { + processRow(row, rowCollector, exception); + }); + + final long captureTime = System.currentTimeMillis(); + iterable.forEach( + event -> { + try { + PipeRawTabletInsertionEvent insertionEvent = (PipeRawTabletInsertionEvent) event; + eventCollector.collect( + new PipeLastPointTabletEvent( + insertionEvent.convertToTablet(), + captureTime, + partialPathToLatestTimeCache, + insertionEvent.getPipeName(), + insertionEvent.getCreationTime(), + insertionEvent.getPipeTaskMeta(), + insertionEvent.getPipePattern(), + insertionEvent.getStartTime(), + insertionEvent.getEndTime())); + } catch (Exception e) { + exception.set(e); + } + }); + + if (exception.get() != null) { + throw exception.get(); + } + } + + protected void processRow( + Row row, RowCollector rowCollector, AtomicReference exception) { + + final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) row); + + boolean hasLastPointMeasurements = false; + for (int i = 0, size = row.size(); i < size; i++) { + if (row.isNull(i)) { + continue; + } + TimeValuePair timeValuePair = null; + PartialPath path = null; + try { + path = + new PartialPath( + row.getDeviceId() + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i)); + } catch (Exception e) { + exception.set(e); + continue; + } + + timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(path); + final boolean timeValuePairIsNotNull = timeValuePair != null; + if (timeValuePairIsNotNull && timeValuePair.getTimestamp() > row.getTime()) { + remarkableRow.markNull(i); + return; + } + + final LastPointFilter filter = + partialPathToLatestTimeCache.getPartialPathLastObject(path.getFullPath()); + if (filter != null) { + if (filter.filter(row.getTime(), row.getObject(i))) { + hasLastPointMeasurements = true; + } else { + remarkableRow.markNull(i); + } + } else { + hasLastPointMeasurements = true; + partialPathToLatestTimeCache.setPartialPathLastObject( + path.getFullPath(), new LastPointFilter<>(row.getTime(), row.getObject(i))); + } + + if (hasLastPointMeasurements) { + try { + rowCollector.collectRow(row); + } catch (Exception e) { + exception.set(e); + } + } + } + } + + @Override + public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) + throws Exception { + try { + for (final TabletInsertionEvent tabletInsertionEvent : + tsFileInsertionEvent.toTabletInsertionEvents()) { + process(tabletInsertionEvent, eventCollector); + } + } finally { + tsFileInsertionEvent.close(); + } + } + + @Override + public void process(Event event, EventCollector eventCollector) throws IOException { + if (!(event instanceof PipeLastPointTsBlockEvent)) { + eventCollector.collect(event); + return; + } + PipeLastPointTsBlockEvent tsBlockEvent = (PipeLastPointTsBlockEvent) event; + + final PipeLastPointTabletEvent pipeLastPointTabletEvent = + tsBlockEvent.toPipeLastPointTabletEvent(partialPathToLatestTimeCache); + + eventCollector.collect(pipeLastPointTabletEvent); + } + + @Override + public void validate(PipeParameterValidator validator) throws Exception { + + memoryLimitInBytes = + validator + .getParameters() + .getLongOrDefault( + PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY, + PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE); + + validator.validate( + memoryLimitInBytes -> (Long) memoryLimitInBytes > 0, + String.format( + "%s must be > 0, but got %s", + PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY, memoryLimitInBytes), + memoryLimitInBytes); + } + + @Override + public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) + throws Exception { + partialPathToLatestTimeCache = initPathLastObjectCache(memoryLimitInBytes); + } + + @Override + public void close() throws Exception { + partialPathToLatestTimeCache.close(); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java index 22bc87b2917b..b4525afc4f70 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java @@ -32,6 +32,11 @@ public class PipeProcessorConstant { "processor.down-sampling.memory-limit-in-bytes"; public static final long PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE = 16 * MB; + public static final String PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY = + "processor.last-point-sampling.memory-limit-in-bytes"; + public static final long PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE = + 16 * MB; + public static final String PROCESSOR_OUTPUT_MAX_DELAY_SECONDS_KEY = "processor.output.max-delay-seconds"; public static final long PROCESSOR_OUTPUT_MAX_DELAY_SECONDS_DEFAULT_VALUE = -1; From e9125956f5f986f1078a81fd948115080b310d59 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Tue, 6 Aug 2024 16:45:23 +0800 Subject: [PATCH 06/12] modify code format --- .../lastpoint/LastPointSamplingProcessor.java | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java index 0ac1aeeb9fff..2b920b58e0fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java @@ -78,7 +78,7 @@ public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector ev final AtomicReference exception = new AtomicReference<>(); - Iterable iterable = + final Iterable iterable = tabletInsertionEvent.processRowByRow( (row, rowCollector) -> { processRow(row, rowCollector, exception); @@ -115,44 +115,41 @@ protected void processRow( final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) row); - boolean hasLastPointMeasurements = false; + boolean hasLatestPointMeasurements = false; for (int i = 0, size = row.size(); i < size; i++) { if (row.isNull(i)) { continue; } TimeValuePair timeValuePair = null; - PartialPath path = null; + String fullPath = row.getDeviceId() + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i); try { - path = - new PartialPath( - row.getDeviceId() + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i)); + // global latest point filter + timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(new PartialPath(fullPath)); } catch (Exception e) { exception.set(e); - continue; } - timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(path); - final boolean timeValuePairIsNotNull = timeValuePair != null; - if (timeValuePairIsNotNull && timeValuePair.getTimestamp() > row.getTime()) { + if (timeValuePair != null && timeValuePair.getTimestamp() > row.getTime()) { remarkableRow.markNull(i); return; } + // local latest point filter final LastPointFilter filter = - partialPathToLatestTimeCache.getPartialPathLastObject(path.getFullPath()); + partialPathToLatestTimeCache.getPartialPathLastObject(fullPath); if (filter != null) { if (filter.filter(row.getTime(), row.getObject(i))) { - hasLastPointMeasurements = true; + hasLatestPointMeasurements = true; } else { remarkableRow.markNull(i); } } else { - hasLastPointMeasurements = true; + hasLatestPointMeasurements = true; partialPathToLatestTimeCache.setPartialPathLastObject( - path.getFullPath(), new LastPointFilter<>(row.getTime(), row.getObject(i))); + fullPath, new LastPointFilter<>(row.getTime(), row.getObject(i))); } - if (hasLastPointMeasurements) { + if (hasLatestPointMeasurements) { try { rowCollector.collectRow(row); } catch (Exception e) { From 9d2d932e73098c28b92aead556d9ad61858188c2 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Tue, 6 Aug 2024 17:33:16 +0800 Subject: [PATCH 07/12] add comment --- .../lastpoint/LastPointFilter.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java index 0bc37c11a866..32cdd2302dc8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java @@ -23,24 +23,26 @@ public class LastPointFilter { - private long lastStoredTimestamp; + /** The last point time and value we compare current point against Timestamp and Value */ + private long lastPointTimestamp; - private T lastStoredValue; + private T lastPointValue; + // Record whether it is consumed by Sink private boolean isConsumed; public LastPointFilter(final long firstTimestamp, final T firstValue) { - this.lastStoredTimestamp = firstTimestamp; - this.lastStoredValue = firstValue; + this.lastPointTimestamp = firstTimestamp; + this.lastPointValue = firstValue; this.isConsumed = false; } public boolean filter(final long timestamp, final T value) { - if (this.lastStoredTimestamp > timestamp) { + if (this.lastPointTimestamp > timestamp) { return false; } - if (this.lastStoredTimestamp == timestamp && Objects.equals(lastStoredValue, value)) { + if (this.lastPointTimestamp == timestamp && Objects.equals(lastPointValue, value)) { return false; } @@ -49,8 +51,8 @@ public boolean filter(final long timestamp, final T value) { } private void reset(final long timestamp, final T value) { - this.lastStoredTimestamp = timestamp; - this.lastStoredValue = value; + this.lastPointTimestamp = timestamp; + this.lastPointValue = value; this.isConsumed = false; } From 1a00ea5bcac1658f75dec0f23c87fb4afdda8ee2 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Tue, 6 Aug 2024 17:39:44 +0800 Subject: [PATCH 08/12] add comment --- .../processor/downsampling/lastpoint/LastPointFilter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java index 32cdd2302dc8..0dc79042a51c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java @@ -23,12 +23,12 @@ public class LastPointFilter { - /** The last point time and value we compare current point against Timestamp and Value */ + /** The last point time and value we compare current point against timestamp and value */ private long lastPointTimestamp; private T lastPointValue; - // Record whether it is consumed by Sink + // Record whether it is consumed by sink private boolean isConsumed; public LastPointFilter(final long firstTimestamp, final T firstValue) { From b7f15c23946ba2ee397defab411a715324733c11 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 8 Aug 2024 15:12:47 +0800 Subject: [PATCH 09/12] add PipeLastPointTsBlockEventTest --- .../tsblock/PipeLastPointTsBlockEvent.java | 28 ++- .../lastpoint/LastPointSamplingProcessor.java | 2 +- .../event/PipeLastPointTsBlockEventTest.java | 214 ++++++++++++++++++ 3 files changed, 235 insertions(+), 9 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java index f3906cd11a42..ccfc48f51ccc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java @@ -34,6 +34,7 @@ import org.apache.tsfile.block.column.Column; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.DateUtils; import org.apache.tsfile.write.UnSupportedDataTypeException; import org.apache.tsfile.write.record.Tablet; @@ -54,7 +55,7 @@ public class PipeLastPointTsBlockEvent extends EnrichedEvent { private PipeMemoryBlock pipeMemoryBlock; - protected PipeLastPointTsBlockEvent( + public PipeLastPointTsBlockEvent( final TsBlock tsBlock, final long captureTime, final PartialPath partialPath, @@ -129,7 +130,7 @@ public boolean mayEventPathsOverlappedWithPattern() { /////////////////////////// PipeLastPointTabletEvent /////////////////////////// - public PipeLastPointTabletEvent toPipeLastPointTabletEvent( + public PipeLastPointTabletEvent convertToPipeLastPointTabletEvent( PartialPathLastObjectCache> partialPathToLatestTimeCache) { return new PipeLastPointTabletEvent( convertToTablet(), @@ -146,10 +147,22 @@ public PipeLastPointTabletEvent toPipeLastPointTabletEvent( /////////////////////////// convertToTablet /////////////////////////// private Tablet convertToTablet() { - Object[] values = new Object[tsBlock.getValueColumnCount()]; - for (int i = 0; i < tsBlock.getValueColumnCount(); ++i) { - Column column = tsBlock.getColumn(i); - TSDataType type = column.getDataType(); + final int columnCount = tsBlock.getValueColumnCount(); + Object[] values = new Object[columnCount]; + BitMap[] bitMaps = new BitMap[columnCount]; + + for (int i = 0; i < columnCount; ++i) { + final Column column = tsBlock.getColumn(i); + final int rowCount = column.getPositionCount(); + final BitMap bitMap = new BitMap(rowCount); + for (int j = 0; j < rowCount; j++) { + if (column.isNull(j)) { + bitMap.mark(j); + } + } + bitMaps[i] = bitMap; + + final TSDataType type = column.getDataType(); switch (type) { case BOOLEAN: values[i] = column.getBooleans(); @@ -192,10 +205,9 @@ private Tablet convertToTablet() { measurementSchemas, tsBlock.getTimeColumn().getLongs(), values, - null, + bitMaps, tsBlock.getValueColumnCount()); - tablet.initBitMaps(); return tablet; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java index 2b920b58e0fe..b9bc2b201a56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java @@ -181,7 +181,7 @@ public void process(Event event, EventCollector eventCollector) throws IOExcepti PipeLastPointTsBlockEvent tsBlockEvent = (PipeLastPointTsBlockEvent) event; final PipeLastPointTabletEvent pipeLastPointTabletEvent = - tsBlockEvent.toPipeLastPointTabletEvent(partialPathToLatestTimeCache); + tsBlockEvent.convertToPipeLastPointTabletEvent(partialPathToLatestTimeCache); eventCollector.collect(pipeLastPointTabletEvent); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java new file mode 100644 index 000000000000..5154f7523497 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent; +import org.apache.iotdb.db.pipe.event.common.tsblock.PipeLastPointTsBlockEvent; +import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.utils.DateUtils; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class PipeLastPointTsBlockEventTest { + + public TsBlock createTsBlock() { + long[] timeArray = {1L, 2L, 3L, 4L, 5L}; + boolean[] booleanValueArray = {true, false, false, false, true}; + boolean[] booleanIsNull = {true, true, false, true, false}; + int[] intValueArray = {10, 20, 30, 40, 50}; + boolean[] intIsNull = {false, true, false, false, true}; + long[] longValueArray = {100L, 200L, 300L, 400, 500L}; + boolean[] longIsNull = {true, false, false, true, true}; + float[] floatValueArray = {1000.0f, 2000.0f, 3000.0f, 4000.0f, 5000.0f}; + boolean[] floatIsNull = {false, false, true, true, false}; + double[] doubleValueArray = {10000.0, 20000.0, 30000.0, 40000.0, 50000.0}; + boolean[] doubleIsNull = {true, false, false, true, false}; + Binary[] binaryValueArray = { + new Binary("19970909", TSFileConfig.STRING_CHARSET), + new Binary("ty", TSFileConfig.STRING_CHARSET), + new Binary("love", TSFileConfig.STRING_CHARSET), + new Binary("zm", TSFileConfig.STRING_CHARSET), + new Binary("19950421", TSFileConfig.STRING_CHARSET) + }; + boolean[] binaryIsNull = {false, false, false, false, false}; + + TsBlockBuilder builder = new TsBlockBuilder(getMeasurementDataTypes()); + for (int i = 0; i < timeArray.length; i++) { + builder.getTimeColumnBuilder().writeLong(timeArray[i]); + if (booleanIsNull[i]) { + builder.getColumnBuilder(0).appendNull(); + } else { + builder.getColumnBuilder(0).writeBoolean(booleanValueArray[i]); + } + if (intIsNull[i]) { + builder.getColumnBuilder(1).appendNull(); + } else { + builder.getColumnBuilder(1).writeInt(intValueArray[i]); + } + if (longIsNull[i]) { + builder.getColumnBuilder(2).appendNull(); + } else { + builder.getColumnBuilder(2).writeLong(longValueArray[i]); + } + if (floatIsNull[i]) { + builder.getColumnBuilder(3).appendNull(); + } else { + builder.getColumnBuilder(3).writeFloat(floatValueArray[i]); + } + if (doubleIsNull[i]) { + builder.getColumnBuilder(4).appendNull(); + } else { + builder.getColumnBuilder(4).writeDouble(doubleValueArray[i]); + } + if (binaryIsNull[i]) { + builder.getColumnBuilder(5).appendNull(); + } else { + builder.getColumnBuilder(5).writeBinary(binaryValueArray[i]); + } + builder.declarePosition(); + } + return builder.build(); + } + + private String[] getMeasurementIdentifiers() { + return new String[] {"s1", "s2", "s3", "s4", "s5", "s6"}; + } + + private List getMeasurementDataTypes() { + return Arrays.asList( + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT); + } + + private List createMeasurementSchemas() { + List measurementSchemas = new ArrayList<>(); + final String[] measurementIds = getMeasurementIdentifiers(); + final List dataTypes = getMeasurementDataTypes(); + + for (int i = 0; i < measurementIds.length; i++) { + measurementSchemas.add(new MeasurementSchema(measurementIds[i], dataTypes.get(i))); + } + + return measurementSchemas; + } + + private PartialPath createPartialPath() throws IllegalPathException { + return new PartialPath(DeviceIDFactory.getInstance().getDeviceID("root.ts")); + } + + private PipeLastPointTsBlockEvent generatePipeLastPointEvent( + TsBlock tsBlock, PartialPath partialPath, List measurementSchemas) + throws IllegalPathException { + PipeLastPointTsBlockEvent pipeLastPointTsBlockEvent = + new PipeLastPointTsBlockEvent( + tsBlock, + System.currentTimeMillis(), + partialPath, + measurementSchemas, + "test", + System.currentTimeMillis(), + null, + null, + 0L, + 1L); + + return pipeLastPointTsBlockEvent; + } + + @Test + public void convertToPipeLastPointTabletEventTest() throws IllegalPathException { + TsBlock tsBlock = createTsBlock(); + PartialPath partialPath = createPartialPath(); + List measurementSchemas = createMeasurementSchemas(); + PipeLastPointTsBlockEvent tsBlockEvent = + generatePipeLastPointEvent(tsBlock, partialPath, measurementSchemas); + + PipeLastPointTabletEvent lastPointTabletEvent = + tsBlockEvent.convertToPipeLastPointTabletEvent(null); + + Tablet tablet = lastPointTabletEvent.getTablet(); + + Object[] values = tablet.values; + int rowNum = tsBlock.getTimeColumn().getPositionCount(); + int columnNum = values.length; + for (int row = 0; row < rowNum; row++) { + Assert.assertEquals(tablet.timestamps[row], tsBlock.getTimeColumn().getLong(row)); + for (int column = 0; column < columnNum; column++) { + Column tsColumn = tsBlock.getColumn(column); + BitMap bitMap = tablet.bitMaps[column]; + Assert.assertEquals(tsColumn.isNull(row), bitMap.isMarked(row)); + if (tsColumn.isNull(row)) { + continue; + } + switch (tsColumn.getDataType()) { + case BOOLEAN: + Assert.assertEquals(tsColumn.getBoolean(row), ((boolean[]) values[column])[row]); + break; + case INT32: + Assert.assertEquals(tsColumn.getInt(row), ((int[]) values[column])[row]); + break; + case DATE: + Assert.assertEquals( + DateUtils.parseIntToLocalDate(tsColumn.getInt(row)), + ((LocalDate[]) values[column])[row]); + break; + case INT64: + case TIMESTAMP: + Assert.assertEquals(tsColumn.getLong(row), ((long[]) values[column])[row]); + break; + case FLOAT: + Assert.assertEquals( + Float.compare(tsColumn.getFloat(row), ((float[]) values[column])[row]), 0); + break; + case DOUBLE: + Assert.assertEquals( + Double.compare(tsColumn.getDouble(row), ((double[]) values[column])[row]), 0); + break; + case TEXT: + case BLOB: + case STRING: + Assert.assertEquals(tsColumn.getBinary(row), ((Binary[]) values[column])[row]); + break; + } + } + } + } +} From fd9a7fadf7655ace66ec4e27e3fe690ad5a9d76b Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Fri, 9 Aug 2024 19:41:39 +0800 Subject: [PATCH 10/12] Modify the code of Event, Processor and Sink --- .../dml/insertion/TabletInsertionEvent.java | 10 +++ .../PipeInsertNodeTabletInsertionEvent.java | 11 +++ .../tablet/PipeLastPointTabletEvent.java | 13 +-- .../tablet/PipeRawTabletInsertionEvent.java | 10 +++ .../tablet/TabletInsertionDataContainer.java | 28 +++++++ .../tsblock/PipeLastPointTsBlockEvent.java | 22 ++++- .../lastpoint/LastPointFilter.java | 65 +++++++++++---- .../lastpoint/LastPointSamplingProcessor.java | 53 ++++++++++-- .../SubscriptionPipeTabletEventBatch.java | 80 +++++++++++++++++++ .../event/PipeLastPointTsBlockEventTest.java | 3 +- .../commons/utils/function/QuadConsumer.java | 26 ++++++ 11 files changed, 288 insertions(+), 33 deletions(-) create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/QuadConsumer.java diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java index 6e1575a464f9..a8eb928cd8c4 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java @@ -45,4 +45,14 @@ public interface TabletInsertionEvent extends Event { * contains the results collected by the {@link RowCollector} */ Iterable processTablet(BiConsumer consumer); + + /** + * The consumer processes the row of data with the largest timestamp and collects the results by + * {@link RowCollector}. + * + * @return {@code Iterable} a list of new {@link TabletInsertionEvent} + * contains the results collected by the {@link RowCollector} + */ + Iterable processMaxTimestampRowByRow( + BiConsumer consumer); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index de3e39ed4127..4befec44d3fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -301,6 +301,17 @@ public Iterable processTablet( .collect(Collectors.toList()); } + @Override + public Iterable processMaxTimestampRowByRow( + BiConsumer consumer) { + return initDataContainers().stream() + .map( + tabletInsertionDataContainer -> + tabletInsertionDataContainer.processMaxTimestampRowByRow(consumer)) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + /////////////////////////// convertToTablet /////////////////////////// public boolean isAligned(final int i) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java index 0d462cb1d10e..0fdf111a40fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java @@ -35,15 +35,12 @@ public class PipeLastPointTabletEvent extends EnrichedEvent { private Tablet tablet; - private final long captureTime; - private PipeTabletMemoryBlock allocatedMemoryBlock; private PartialPathLastObjectCache> partialPathToLatestTimeCache; public PipeLastPointTabletEvent( final Tablet tablet, - final long captureTime, final PartialPathLastObjectCache> partialPathToLatestTimeCache, final String pipeName, final long creationTime, @@ -53,7 +50,6 @@ public PipeLastPointTabletEvent( final long endTime) { super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); this.tablet = tablet; - this.captureTime = captureTime; this.partialPathToLatestTimeCache = partialPathToLatestTimeCache; } @@ -85,7 +81,6 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( long endTime) { return new PipeLastPointTabletEvent( tablet, - captureTime, partialPathToLatestTimeCache, pipeName, creationTime, @@ -115,8 +110,8 @@ public boolean mayEventPathsOverlappedWithPattern() { @Override public String toString() { return String.format( - "PipeLastPointTabletEvent{tablet=%s, captureTime=%s, allocatedMemoryBlock=%s}", - tablet, captureTime, allocatedMemoryBlock) + "PipeLastPointTabletEvent{tablet=%s, allocatedMemoryBlock=%s}", + tablet, allocatedMemoryBlock) + " - " + super.toString(); } @@ -127,7 +122,7 @@ public Tablet getTablet() { return tablet; } - public long getCaptureTime() { - return captureTime; + public PartialPathLastObjectCache> getPartialPathToLatestTimeCache() { + return partialPathToLatestTimeCache; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 65da3828931b..a8fb765bd9d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -232,6 +232,16 @@ public Iterable processTablet( return dataContainer.processTablet(consumer); } + @Override + public Iterable processMaxTimestampRowByRow( + BiConsumer consumer) { + if (dataContainer == null) { + dataContainer = + new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, isAligned, pipePattern); + } + return dataContainer.processMaxTimestampRowByRow(consumer); + } + /////////////////////////// convertToTablet /////////////////////////// public boolean isAligned() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index 454854115544..df2c3c8e3c81 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -624,6 +624,34 @@ public List processRowByRow(final BiConsumer processMaxTimestampRowByRow( + final BiConsumer consumer) { + if (valueColumns.length == 0 || timestampColumn.length == 0) { + return Collections.emptyList(); + } + + final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, sourceEvent); + int maxTimestampRowCount = 0; + for (int i = 1; i < rowCount; i++) { + if (timestampColumn[maxTimestampRowCount] < timestampColumn[i]) { + maxTimestampRowCount = i; + } + consumer.accept( + new PipeRow( + maxTimestampRowCount, + deviceId, + isAligned, + measurementSchemaList, + timestampColumn, + valueColumnTypes, + valueColumns, + nullValueColumnBitmaps, + columnNameStringList), + rowCollector); + } + return rowCollector.convertToTabletInsertionEvents(shouldReport); + } + public List processTablet(final BiConsumer consumer) { final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, sourceEvent); consumer.accept(convertToTablet(), rowCollector); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java index ccfc48f51ccc..13248ff513a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.pattern.PipePattern; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.utils.function.QuadConsumer; import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent; import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; import org.apache.iotdb.db.pipe.processor.downsampling.lastpoint.LastPointFilter; @@ -131,10 +132,15 @@ public boolean mayEventPathsOverlappedWithPattern() { /////////////////////////// PipeLastPointTabletEvent /////////////////////////// public PipeLastPointTabletEvent convertToPipeLastPointTabletEvent( - PartialPathLastObjectCache> partialPathToLatestTimeCache) { + PartialPathLastObjectCache> partialPathToLatestTimeCache, + QuadConsumer> filterConsumer) { + BitMap columnSelectionMap = new BitMap(tsBlock.getValueColumnCount()); + filterConsumer.accept(columnSelectionMap, tsBlock, partialPath.getDevice(), measurementSchemas); + if (columnSelectionMap.isAllMarked()) { + return null; + } return new PipeLastPointTabletEvent( convertToTablet(), - captureTime, partialPathToLatestTimeCache, pipeName, creationTime, @@ -227,4 +233,16 @@ public String toString() { public long getCaptureTime() { return captureTime; } + + public PartialPath getPartialPath() { + return partialPath; + } + + public List getMeasurementSchemas() { + return measurementSchemas; + } + + public TsBlock getTsBlock() { + return tsBlock; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java index 0dc79042a51c..eed3db498694 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java @@ -24,39 +24,72 @@ public class LastPointFilter { /** The last point time and value we compare current point against timestamp and value */ - private long lastPointTimestamp; + private volatile long lastPointTimestamp; - private T lastPointValue; + private volatile T lastPointValue; // Record whether it is consumed by sink - private boolean isConsumed; + private volatile boolean isConsumed; public LastPointFilter(final long firstTimestamp, final T firstValue) { this.lastPointTimestamp = firstTimestamp; this.lastPointValue = firstValue; - this.isConsumed = false; } public boolean filter(final long timestamp, final T value) { - if (this.lastPointTimestamp > timestamp) { - return false; - } + return filter(timestamp, value, false); + } - if (this.lastPointTimestamp == timestamp && Objects.equals(lastPointValue, value)) { - return false; - } + public boolean filterAndMarkAsConsumed(final long timestamp, final T value) { + return filter(timestamp, value, true); + } + + /** + * Filters timestamps and values to determine if the given timestamp is the most recent, and if + * the value has changed since the last recorded timestamp. + * + *

This method checks if the provided timestamp is newer than the last recorded timestamp. If + * the timestamp is the same, it further checks whether the value differs from the last recorded + * value. If the timestamp is not newer and the value has not changed, the method returns false. + * + *

Additionally, if the value is consumed by a sink (as indicated by the isConsumed flag being + * true), it marks the value as consumed in the internal state. + * + * @param timestamp The timestamp to check against the most recent timestamp. + * @param value The value to check for changes against the last recorded value. + * @param isConsumed A flag indicating whether the value has been consumed by a sink. If true, and + * the timestamp and value pass the filter conditions, the value is marked as consumed + * internally. + * @return {@code true} if the timestamp is newer or the value has changed since the last recorded + * timestamp; {@code false} otherwise. + */ + private boolean filter(final long timestamp, final T value, final boolean isConsumed) { + synchronized (this) { + if (this.lastPointTimestamp > timestamp) { + return false; + } - reset(timestamp, value); + // Check for the same timestamp and unchanged value, and whether it was previously consumed. + if (this.lastPointTimestamp == timestamp + && Objects.equals(lastPointValue, value) + && this.isConsumed) { + return false; + } + + // Reset the last recorded timestamp and value with the new ones. + reset(timestamp, value); + + // Mark as consumed if the value is consumed by a sink + if (isConsumed) { + this.isConsumed = true; + } + } return true; } private void reset(final long timestamp, final T value) { this.lastPointTimestamp = timestamp; this.lastPointValue = value; - this.isConsumed = false; - } - - public void consume() { - this.isConsumed = true; + isConsumed = false; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java index b9bc2b201a56..07e95c130ab8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java @@ -39,10 +39,15 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.tsfile.block.column.Column; import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.schema.MeasurementSchema; import java.io.IOException; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE; @@ -79,12 +84,11 @@ public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector ev final AtomicReference exception = new AtomicReference<>(); final Iterable iterable = - tabletInsertionEvent.processRowByRow( + tabletInsertionEvent.processMaxTimestampRowByRow( (row, rowCollector) -> { processRow(row, rowCollector, exception); }); - final long captureTime = System.currentTimeMillis(); iterable.forEach( event -> { try { @@ -92,7 +96,6 @@ public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector ev eventCollector.collect( new PipeLastPointTabletEvent( insertionEvent.convertToTablet(), - captureTime, partialPathToLatestTimeCache, insertionEvent.getPipeName(), insertionEvent.getCreationTime(), @@ -181,9 +184,12 @@ public void process(Event event, EventCollector eventCollector) throws IOExcepti PipeLastPointTsBlockEvent tsBlockEvent = (PipeLastPointTsBlockEvent) event; final PipeLastPointTabletEvent pipeLastPointTabletEvent = - tsBlockEvent.convertToPipeLastPointTabletEvent(partialPathToLatestTimeCache); + tsBlockEvent.convertToPipeLastPointTabletEvent( + partialPathToLatestTimeCache, this::markEligibleColumnsBasedOnFilter); - eventCollector.collect(pipeLastPointTabletEvent); + if (pipeLastPointTabletEvent != null) { + eventCollector.collect(pipeLastPointTabletEvent); + } } @Override @@ -214,4 +220,41 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati public void close() throws Exception { partialPathToLatestTimeCache.close(); } + + /** + * Marks a column in the TsBlock as eligible based on the presence of the latest data point. + * + *

This method evaluates a single row within the TsBlock to determine if the column contains + * the latest data point. If a LastPointFilter for the column is found in the cache, the method + * checks whether the current data point is the latest. If no filter is present in the cache, the + * column is automatically considered eligible. + * + * @param columnSelectionMap The BitMap that marks the eligibility of the column. + * @param tsBlock The TsBlock that contains the columns to be evaluated. + * @param deviceId The device identifier used to build the filter path. + * @param measurementSchemas The list of measurement schemas used to complete the filter path. + */ + private void markEligibleColumnsBasedOnFilter( + BitMap columnSelectionMap, + TsBlock tsBlock, + String deviceId, + List measurementSchemas) { + int columnCount = tsBlock.getValueColumnCount(); + + for (int i = 0; i < columnCount; i++) { + Column column = tsBlock.getColumn(i); + if (column.isNull(0)) { + columnSelectionMap.mark(i); + continue; + } + String fullPath = + deviceId + TsFileConstant.PATH_SEPARATOR + measurementSchemas.get(i).getMeasurementId(); + + final LastPointFilter filter = + partialPathToLatestTimeCache.getPartialPathLastObject(fullPath); + if (filter != null && !filter.filter(tsBlock.getTimeByIndex(0), column.getObject(0))) { + columnSelectionMap.mark(i); + } + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index b0ef2b851162..29d233af8e44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -21,8 +21,11 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.pipe.processor.downsampling.lastpoint.LastPointFilter; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; @@ -33,11 +36,17 @@ import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload; +import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -145,6 +154,19 @@ private void constructBatch(final EnrichedEvent event) { if (firstEventProcessingTime == Long.MIN_VALUE) { firstEventProcessingTime = System.currentTimeMillis(); } + } else if (event instanceof PipeLastPointTabletEvent) { + final PipeLastPointTabletEvent lastPointTabletEvent = (PipeLastPointTabletEvent) event; + + final Tablet tablet = lastPointTabletEvent.getTablet(); + if (Objects.isNull(tablet)) { + return; + } + if (lastPointFilter(tablet, lastPointTabletEvent.getPartialPathToLatestTimeCache())) { + return; + } + tablets.add(tablet); + // tablet size in bytes + totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet); } } @@ -168,6 +190,64 @@ private List convertToTablets(final TabletInsertionEvent tabletInsertion return Collections.emptyList(); } + /////////////////////////////// filter /////////////////////////////// + + private boolean lastPointFilter( + Tablet tablet, PartialPathLastObjectCache> cache) { + String deviceIdentifier = tablet.deviceId; + Object[] dataValues = tablet.values; + int fieldCount = dataValues.length; + BitMap columnFilterMap = new BitMap(fieldCount); + + for (int i = 0; i < fieldCount; i++) { + BitMap fieldBitMap = tablet.bitMaps[fieldCount]; + + if (fieldBitMap.isMarked(0)) { + columnFilterMap.mark(i); + continue; + } + + MeasurementSchema schema = tablet.getSchemas().get(i); + String measurementPath = + deviceIdentifier + TsFileConstant.PATH_SEPARATOR + schema.getMeasurementId(); + + LastPointFilter filter = cache.getPartialPathLastObject(measurementPath); + if (filter != null + && !filter.filterAndMarkAsConsumed( + tablet.timestamps[i], getObject(dataValues[i], schema.getType()))) { + columnFilterMap.mark(i); + fieldBitMap.mark(0); + } + } + + return columnFilterMap.isAllMarked(); + } + + public Object getObject(final Object value, final TSDataType dataType) { + switch (dataType) { + case INT32: + return ((int[]) value)[0]; + case DATE: + return ((LocalDate[]) value)[0]; + case INT64: + case TIMESTAMP: + return ((long[]) value)[0]; + case FLOAT: + return ((float[]) value)[0]; + case DOUBLE: + return ((double[]) value)[0]; + case BOOLEAN: + return ((boolean[]) value)[0]; + case TEXT: + case BLOB: + case STRING: + return ((Binary[]) value)[0]; + default: + throw new UnsupportedOperationException( + String.format("unsupported data type %s", dataType)); + } + } + /////////////////////////////// stringify /////////////////////////////// public String toString() { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java index 5154f7523497..aa92bc6f4ced 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java @@ -162,7 +162,8 @@ public void convertToPipeLastPointTabletEventTest() throws IllegalPathException generatePipeLastPointEvent(tsBlock, partialPath, measurementSchemas); PipeLastPointTabletEvent lastPointTabletEvent = - tsBlockEvent.convertToPipeLastPointTabletEvent(null); + tsBlockEvent.convertToPipeLastPointTabletEvent( + null, (bitMap, block, deviceId, schemas) -> {}); Tablet tablet = lastPointTabletEvent.getTablet(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/QuadConsumer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/QuadConsumer.java new file mode 100644 index 000000000000..e0f1cc3f2707 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/QuadConsumer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.function; + +@FunctionalInterface +public interface QuadConsumer { + + void accept(T t, U u, V v, W w); +} From e64f901724ebfce203e62febd73b9233d476cd18 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Fri, 9 Aug 2024 20:09:32 +0800 Subject: [PATCH 11/12] Modify code format --- .../batch/SubscriptionPipeTabletEventBatch.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index 29d233af8e44..76f6596b56fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -194,13 +194,13 @@ private List convertToTablets(final TabletInsertionEvent tabletInsertion private boolean lastPointFilter( Tablet tablet, PartialPathLastObjectCache> cache) { - String deviceIdentifier = tablet.deviceId; + String deviceId = tablet.deviceId; Object[] dataValues = tablet.values; - int fieldCount = dataValues.length; - BitMap columnFilterMap = new BitMap(fieldCount); + int columnCount = dataValues.length; + BitMap columnFilterMap = new BitMap(columnCount); - for (int i = 0; i < fieldCount; i++) { - BitMap fieldBitMap = tablet.bitMaps[fieldCount]; + for (int i = 0; i < columnCount; i++) { + BitMap fieldBitMap = tablet.bitMaps[columnCount]; if (fieldBitMap.isMarked(0)) { columnFilterMap.mark(i); @@ -208,8 +208,7 @@ private boolean lastPointFilter( } MeasurementSchema schema = tablet.getSchemas().get(i); - String measurementPath = - deviceIdentifier + TsFileConstant.PATH_SEPARATOR + schema.getMeasurementId(); + String measurementPath = deviceId + TsFileConstant.PATH_SEPARATOR + schema.getMeasurementId(); LastPointFilter filter = cache.getPartialPathLastObject(measurementPath); if (filter != null From 2c7409fcb32c590ab8eabe3ff737fcaef4d740c9 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Sun, 11 Aug 2024 15:23:08 +0800 Subject: [PATCH 12/12] Modify code --- .../tablet/TabletInsertionDataContainer.java | 2 +- .../common/tsblock/PipeLastPointTsBlockEvent.java | 15 ++++++++------- .../lastpoint/LastPointSamplingProcessor.java | 4 ++-- .../batch/SubscriptionPipeTabletEventBatch.java | 6 +++--- .../pipe/event/PipeLastPointTsBlockEventTest.java | 12 ++++++------ 5 files changed, 20 insertions(+), 19 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index 7bbeb511ec59..2e777964c477 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -651,7 +651,7 @@ public List processMaxTimestampRowByRow( consumer.accept( new PipeRow( maxTimestampRowCount, - deviceId, + getDeviceStr(), isAligned, measurementSchemaList, timestampColumn, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java index 13248ff513a3..bdd394b8b6e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java @@ -39,7 +39,7 @@ import org.apache.tsfile.utils.DateUtils; import org.apache.tsfile.write.UnSupportedDataTypeException; import org.apache.tsfile.write.record.Tablet; -import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.IMeasurementSchema; import java.time.LocalDate; import java.util.List; @@ -48,7 +48,7 @@ public class PipeLastPointTsBlockEvent extends EnrichedEvent { private final PartialPath partialPath; - private final List measurementSchemas; + private final List measurementSchemas; private final long captureTime; @@ -60,7 +60,7 @@ public PipeLastPointTsBlockEvent( final TsBlock tsBlock, final long captureTime, final PartialPath partialPath, - final List measurementSchemas, + final List measurementSchemas, final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, @@ -133,9 +133,10 @@ public boolean mayEventPathsOverlappedWithPattern() { public PipeLastPointTabletEvent convertToPipeLastPointTabletEvent( PartialPathLastObjectCache> partialPathToLatestTimeCache, - QuadConsumer> filterConsumer) { + QuadConsumer> filterConsumer) { BitMap columnSelectionMap = new BitMap(tsBlock.getValueColumnCount()); - filterConsumer.accept(columnSelectionMap, tsBlock, partialPath.getDevice(), measurementSchemas); + filterConsumer.accept( + columnSelectionMap, tsBlock, partialPath.getDevicePath().toString(), measurementSchemas); if (columnSelectionMap.isAllMarked()) { return null; } @@ -207,7 +208,7 @@ private Tablet convertToTablet() { final Tablet tablet = new Tablet( - partialPath.getDevice(), + partialPath.getDevicePath().getFullPath(), measurementSchemas, tsBlock.getTimeColumn().getLongs(), values, @@ -238,7 +239,7 @@ public PartialPath getPartialPath() { return partialPath; } - public List getMeasurementSchemas() { + public List getMeasurementSchemas() { return measurementSchemas; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java index 07e95c130ab8..71b2093ad0d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java @@ -44,7 +44,7 @@ import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.utils.BitMap; -import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.IMeasurementSchema; import java.io.IOException; import java.util.List; @@ -238,7 +238,7 @@ private void markEligibleColumnsBasedOnFilter( BitMap columnSelectionMap, TsBlock tsBlock, String deviceId, - List measurementSchemas) { + List measurementSchemas) { int columnCount = tsBlock.getValueColumnCount(); for (int i = 0; i < columnCount; i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index 76f6596b56fb..d592617b10af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -41,7 +41,7 @@ import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.record.Tablet; -import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -194,7 +194,7 @@ private List convertToTablets(final TabletInsertionEvent tabletInsertion private boolean lastPointFilter( Tablet tablet, PartialPathLastObjectCache> cache) { - String deviceId = tablet.deviceId; + String deviceId = tablet.getDeviceId(); Object[] dataValues = tablet.values; int columnCount = dataValues.length; BitMap columnFilterMap = new BitMap(columnCount); @@ -207,7 +207,7 @@ private boolean lastPointFilter( continue; } - MeasurementSchema schema = tablet.getSchemas().get(i); + IMeasurementSchema schema = tablet.getSchemas().get(i); String measurementPath = deviceId + TsFileConstant.PATH_SEPARATOR + schema.getMeasurementId(); LastPointFilter filter = cache.getPartialPathLastObject(measurementPath); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java index aa92bc6f4ced..a3d0c2d40ec7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent; import org.apache.iotdb.db.pipe.event.common.tsblock.PipeLastPointTsBlockEvent; -import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; import org.apache.tsfile.block.column.Column; import org.apache.tsfile.common.conf.TSFileConfig; @@ -34,6 +33,7 @@ import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.DateUtils; import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; @@ -118,8 +118,8 @@ private List getMeasurementDataTypes() { TSDataType.TEXT); } - private List createMeasurementSchemas() { - List measurementSchemas = new ArrayList<>(); + private List createMeasurementSchemas() { + List measurementSchemas = new ArrayList<>(); final String[] measurementIds = getMeasurementIdentifiers(); final List dataTypes = getMeasurementDataTypes(); @@ -131,11 +131,11 @@ private List createMeasurementSchemas() { } private PartialPath createPartialPath() throws IllegalPathException { - return new PartialPath(DeviceIDFactory.getInstance().getDeviceID("root.ts")); + return new PartialPath("root.ts"); } private PipeLastPointTsBlockEvent generatePipeLastPointEvent( - TsBlock tsBlock, PartialPath partialPath, List measurementSchemas) + TsBlock tsBlock, PartialPath partialPath, List measurementSchemas) throws IllegalPathException { PipeLastPointTsBlockEvent pipeLastPointTsBlockEvent = new PipeLastPointTsBlockEvent( @@ -157,7 +157,7 @@ private PipeLastPointTsBlockEvent generatePipeLastPointEvent( public void convertToPipeLastPointTabletEventTest() throws IllegalPathException { TsBlock tsBlock = createTsBlock(); PartialPath partialPath = createPartialPath(); - List measurementSchemas = createMeasurementSchemas(); + List measurementSchemas = createMeasurementSchemas(); PipeLastPointTsBlockEvent tsBlockEvent = generatePipeLastPointEvent(tsBlock, partialPath, measurementSchemas);