diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java index 6e1575a464f9..a8eb928cd8c4 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java @@ -45,4 +45,14 @@ public interface TabletInsertionEvent extends Event { * contains the results collected by the {@link RowCollector} */ Iterable processTablet(BiConsumer consumer); + + /** + * The consumer processes the row of data with the largest timestamp and collects the results by + * {@link RowCollector}. + * + * @return {@code Iterable} a list of new {@link TabletInsertionEvent} + * contains the results collected by the {@link RowCollector} + */ + Iterable processMaxTimestampRowByRow( + BiConsumer consumer); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 45f8b70a8259..b9c749d93e22 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -301,6 +301,17 @@ public Iterable processTablet( .collect(Collectors.toList()); } + @Override + public Iterable processMaxTimestampRowByRow( + BiConsumer consumer) { + return initDataContainers().stream() + .map( + tabletInsertionDataContainer -> + tabletInsertionDataContainer.processMaxTimestampRowByRow(consumer)) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + /////////////////////////// convertToTablet /////////////////////////// public boolean isAligned(final int i) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java new file mode 100644 index 000000000000..0fdf111a40fb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeLastPointTabletEvent.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tablet; + +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.pattern.PipePattern; +import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.pipe.processor.downsampling.lastpoint.LastPointFilter; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; + +import org.apache.tsfile.write.record.Tablet; + +public class PipeLastPointTabletEvent extends EnrichedEvent { + + private Tablet tablet; + + private PipeTabletMemoryBlock allocatedMemoryBlock; + + private PartialPathLastObjectCache> partialPathToLatestTimeCache; + + public PipeLastPointTabletEvent( + final Tablet tablet, + final PartialPathLastObjectCache> partialPathToLatestTimeCache, + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final PipePattern pipePattern, + final long startTime, + final long endTime) { + super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); + this.tablet = tablet; + this.partialPathToLatestTimeCache = partialPathToLatestTimeCache; + } + + @Override + public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { + allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocateWithRetry(tablet); + return true; + } + + @Override + public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { + allocatedMemoryBlock.close(); + tablet = null; + return true; + } + + @Override + public ProgressIndex getProgressIndex() { + return MinimumProgressIndex.INSTANCE; + } + + @Override + public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( + String pipeName, + long creationTime, + PipeTaskMeta pipeTaskMeta, + PipePattern pattern, + long startTime, + long endTime) { + return new PipeLastPointTabletEvent( + tablet, + partialPathToLatestTimeCache, + pipeName, + creationTime, + pipeTaskMeta, + pipePattern, + startTime, + endTime); + } + + @Override + public boolean isGeneratedByPipe() { + return false; + } + + @Override + public boolean mayEventTimeOverlappedWithTimeRange() { + return true; + } + + @Override + public boolean mayEventPathsOverlappedWithPattern() { + return true; + } + + /////////////////////////// Object /////////////////////////// + + @Override + public String toString() { + return String.format( + "PipeLastPointTabletEvent{tablet=%s, allocatedMemoryBlock=%s}", + tablet, allocatedMemoryBlock) + + " - " + + super.toString(); + } + + /////////////////////////// Getter /////////////////////////// + + public Tablet getTablet() { + return tablet; + } + + public PartialPathLastObjectCache> getPartialPathToLatestTimeCache() { + return partialPathToLatestTimeCache; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 285121ae63ca..4706673c97e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -234,6 +234,16 @@ public Iterable processTablet( return dataContainer.processTablet(consumer); } + @Override + public Iterable processMaxTimestampRowByRow( + BiConsumer consumer) { + if (dataContainer == null) { + dataContainer = + new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, isAligned, pipePattern); + } + return dataContainer.processMaxTimestampRowByRow(consumer); + } + /////////////////////////// convertToTablet /////////////////////////// public boolean isAligned() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index c02b71af2d54..2e777964c477 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -636,6 +636,34 @@ public List processRowByRow(final BiConsumer processMaxTimestampRowByRow( + final BiConsumer consumer) { + if (valueColumns.length == 0 || timestampColumn.length == 0) { + return Collections.emptyList(); + } + + final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, sourceEvent); + int maxTimestampRowCount = 0; + for (int i = 1; i < rowCount; i++) { + if (timestampColumn[maxTimestampRowCount] < timestampColumn[i]) { + maxTimestampRowCount = i; + } + consumer.accept( + new PipeRow( + maxTimestampRowCount, + getDeviceStr(), + isAligned, + measurementSchemaList, + timestampColumn, + valueColumnTypes, + valueColumns, + nullValueColumnBitmaps, + columnNameStringList), + rowCollector); + } + return rowCollector.convertToTabletInsertionEvents(shouldReport); + } + public List processTablet(final BiConsumer consumer) { final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, sourceEvent); consumer.accept(convertToTablet(), rowCollector); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java new file mode 100644 index 000000000000..bdd394b8b6e7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsblock/PipeLastPointTsBlockEvent.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tsblock; + +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.pattern.PipePattern; +import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.utils.function.QuadConsumer; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent; +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.pipe.processor.downsampling.lastpoint.LastPointFilter; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.utils.DateUtils; +import org.apache.tsfile.write.UnSupportedDataTypeException; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.time.LocalDate; +import java.util.List; + +public class PipeLastPointTsBlockEvent extends EnrichedEvent { + + private final PartialPath partialPath; + + private final List measurementSchemas; + + private final long captureTime; + + private TsBlock tsBlock; + + private PipeMemoryBlock pipeMemoryBlock; + + public PipeLastPointTsBlockEvent( + final TsBlock tsBlock, + final long captureTime, + final PartialPath partialPath, + final List measurementSchemas, + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final PipePattern pipePattern, + final long startTime, + final long endTime) { + super(pipeName, creationTime, pipeTaskMeta, pipePattern, startTime, endTime); + this.tsBlock = tsBlock; + this.partialPath = partialPath; + this.measurementSchemas = measurementSchemas; + this.captureTime = captureTime; + } + + @Override + public boolean internallyIncreaseResourceReferenceCount(String holderMessage) { + pipeMemoryBlock = + PipeDataNodeResourceManager.memory().forceAllocate(tsBlock.getRetainedSizeInBytes()); + return true; + } + + @Override + public boolean internallyDecreaseResourceReferenceCount(String holderMessage) { + pipeMemoryBlock.close(); + tsBlock = null; + return false; + } + + @Override + public ProgressIndex getProgressIndex() { + return MinimumProgressIndex.INSTANCE; + } + + @Override + public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( + String pipeName, + long creationTime, + PipeTaskMeta pipeTaskMeta, + PipePattern pattern, + long startTime, + long endTime) { + return new PipeLastPointTsBlockEvent( + tsBlock, + captureTime, + partialPath, + measurementSchemas, + pipeName, + creationTime, + pipeTaskMeta, + pipePattern, + startTime, + endTime); + } + + @Override + public boolean isGeneratedByPipe() { + return false; + } + + @Override + public boolean mayEventTimeOverlappedWithTimeRange() { + return true; + } + + @Override + public boolean mayEventPathsOverlappedWithPattern() { + return true; + } + + /////////////////////////// PipeLastPointTabletEvent /////////////////////////// + + public PipeLastPointTabletEvent convertToPipeLastPointTabletEvent( + PartialPathLastObjectCache> partialPathToLatestTimeCache, + QuadConsumer> filterConsumer) { + BitMap columnSelectionMap = new BitMap(tsBlock.getValueColumnCount()); + filterConsumer.accept( + columnSelectionMap, tsBlock, partialPath.getDevicePath().toString(), measurementSchemas); + if (columnSelectionMap.isAllMarked()) { + return null; + } + return new PipeLastPointTabletEvent( + convertToTablet(), + partialPathToLatestTimeCache, + pipeName, + creationTime, + pipeTaskMeta, + pipePattern, + startTime, + endTime); + } + + /////////////////////////// convertToTablet /////////////////////////// + + private Tablet convertToTablet() { + final int columnCount = tsBlock.getValueColumnCount(); + Object[] values = new Object[columnCount]; + BitMap[] bitMaps = new BitMap[columnCount]; + + for (int i = 0; i < columnCount; ++i) { + final Column column = tsBlock.getColumn(i); + final int rowCount = column.getPositionCount(); + final BitMap bitMap = new BitMap(rowCount); + for (int j = 0; j < rowCount; j++) { + if (column.isNull(j)) { + bitMap.mark(j); + } + } + bitMaps[i] = bitMap; + + final TSDataType type = column.getDataType(); + switch (type) { + case BOOLEAN: + values[i] = column.getBooleans(); + break; + case INT32: + values[i] = column.getInts(); + break; + case DATE: + int[] dates = column.getInts(); + LocalDate[] localDates = new LocalDate[dates.length]; + for (int row = 0; row < dates.length; row++) { + localDates[row] = DateUtils.parseIntToLocalDate(dates[row]); + } + values[i] = localDates; + break; + case INT64: + case TIMESTAMP: + values[i] = column.getLongs(); + break; + case FLOAT: + values[i] = column.getFloats(); + break; + case DOUBLE: + values[i] = column.getDoubles(); + break; + case TEXT: + case BLOB: + case STRING: + values[i] = column.getBinaries(); + break; + default: + throw new UnSupportedDataTypeException( + "TsBlock format to Tablet format does not support type " + type); + } + } + + final Tablet tablet = + new Tablet( + partialPath.getDevicePath().getFullPath(), + measurementSchemas, + tsBlock.getTimeColumn().getLongs(), + values, + bitMaps, + tsBlock.getValueColumnCount()); + + return tablet; + } + + /////////////////////////// Object /////////////////////////// + + @Override + public String toString() { + return String.format( + "PipeLastPointTsBlockEvent{tsBlock=%s, partialPath=%s, measurementSchemas=%s, captureTime=%s, pipeMemoryBlock=%s}", + tsBlock, partialPath, measurementSchemas, captureTime, pipeMemoryBlock) + + " - " + + super.toString(); + } + + /////////////////////////// Getter /////////////////////////// + + public long getCaptureTime() { + return captureTime; + } + + public PartialPath getPartialPath() { + return partialPath; + } + + public List getMeasurementSchemas() { + return measurementSchemas; + } + + public TsBlock getTsBlock() { + return tsBlock; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java new file mode 100644 index 000000000000..eed3db498694 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointFilter.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.lastpoint; + +import java.util.Objects; + +public class LastPointFilter { + + /** The last point time and value we compare current point against timestamp and value */ + private volatile long lastPointTimestamp; + + private volatile T lastPointValue; + + // Record whether it is consumed by sink + private volatile boolean isConsumed; + + public LastPointFilter(final long firstTimestamp, final T firstValue) { + this.lastPointTimestamp = firstTimestamp; + this.lastPointValue = firstValue; + } + + public boolean filter(final long timestamp, final T value) { + return filter(timestamp, value, false); + } + + public boolean filterAndMarkAsConsumed(final long timestamp, final T value) { + return filter(timestamp, value, true); + } + + /** + * Filters timestamps and values to determine if the given timestamp is the most recent, and if + * the value has changed since the last recorded timestamp. + * + *

This method checks if the provided timestamp is newer than the last recorded timestamp. If + * the timestamp is the same, it further checks whether the value differs from the last recorded + * value. If the timestamp is not newer and the value has not changed, the method returns false. + * + *

Additionally, if the value is consumed by a sink (as indicated by the isConsumed flag being + * true), it marks the value as consumed in the internal state. + * + * @param timestamp The timestamp to check against the most recent timestamp. + * @param value The value to check for changes against the last recorded value. + * @param isConsumed A flag indicating whether the value has been consumed by a sink. If true, and + * the timestamp and value pass the filter conditions, the value is marked as consumed + * internally. + * @return {@code true} if the timestamp is newer or the value has changed since the last recorded + * timestamp; {@code false} otherwise. + */ + private boolean filter(final long timestamp, final T value, final boolean isConsumed) { + synchronized (this) { + if (this.lastPointTimestamp > timestamp) { + return false; + } + + // Check for the same timestamp and unchanged value, and whether it was previously consumed. + if (this.lastPointTimestamp == timestamp + && Objects.equals(lastPointValue, value) + && this.isConsumed) { + return false; + } + + // Reset the last recorded timestamp and value with the new ones. + reset(timestamp, value); + + // Mark as consumed if the value is consumed by a sink + if (isConsumed) { + this.isConsumed = true; + } + } + return true; + } + + private void reset(final long timestamp, final T value) { + this.lastPointTimestamp = timestamp; + this.lastPointValue = value; + isConsumed = false; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java new file mode 100644 index 000000000000..71b2093ad0d5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/lastpoint/LastPointSamplingProcessor.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.lastpoint; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow; +import org.apache.iotdb.db.pipe.event.common.row.PipeRow; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsblock.PipeLastPointTsBlockEvent; +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; +import org.apache.iotdb.pipe.api.PipeProcessor; +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.collector.EventCollector; +import org.apache.iotdb.pipe.api.collector.RowCollector; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY; + +public class LastPointSamplingProcessor implements PipeProcessor { + + private final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE = DataNodeSchemaCache.getInstance(); + + private PartialPathLastObjectCache> partialPathToLatestTimeCache; + + protected long memoryLimitInBytes; + + private PartialPathLastObjectCache> initPathLastObjectCache( + long memoryLimitInBytes) { + partialPathToLatestTimeCache = + new PartialPathLastObjectCache>(memoryLimitInBytes) { + @Override + protected long calculateMemoryUsage(LastPointFilter filter) { + return 64; // Long.BYTES * 8 + } + }; + return partialPathToLatestTimeCache; + } + + @Override + public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) + throws Exception { + if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) + && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { + return; + } + + final AtomicReference exception = new AtomicReference<>(); + + final Iterable iterable = + tabletInsertionEvent.processMaxTimestampRowByRow( + (row, rowCollector) -> { + processRow(row, rowCollector, exception); + }); + + iterable.forEach( + event -> { + try { + PipeRawTabletInsertionEvent insertionEvent = (PipeRawTabletInsertionEvent) event; + eventCollector.collect( + new PipeLastPointTabletEvent( + insertionEvent.convertToTablet(), + partialPathToLatestTimeCache, + insertionEvent.getPipeName(), + insertionEvent.getCreationTime(), + insertionEvent.getPipeTaskMeta(), + insertionEvent.getPipePattern(), + insertionEvent.getStartTime(), + insertionEvent.getEndTime())); + } catch (Exception e) { + exception.set(e); + } + }); + + if (exception.get() != null) { + throw exception.get(); + } + } + + protected void processRow( + Row row, RowCollector rowCollector, AtomicReference exception) { + + final PipeRemarkableRow remarkableRow = new PipeRemarkableRow((PipeRow) row); + + boolean hasLatestPointMeasurements = false; + for (int i = 0, size = row.size(); i < size; i++) { + if (row.isNull(i)) { + continue; + } + TimeValuePair timeValuePair = null; + String fullPath = row.getDeviceId() + TsFileConstant.PATH_SEPARATOR + row.getColumnName(i); + try { + // global latest point filter + timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(new PartialPath(fullPath)); + } catch (Exception e) { + exception.set(e); + } + + if (timeValuePair != null && timeValuePair.getTimestamp() > row.getTime()) { + remarkableRow.markNull(i); + return; + } + + // local latest point filter + final LastPointFilter filter = + partialPathToLatestTimeCache.getPartialPathLastObject(fullPath); + if (filter != null) { + if (filter.filter(row.getTime(), row.getObject(i))) { + hasLatestPointMeasurements = true; + } else { + remarkableRow.markNull(i); + } + } else { + hasLatestPointMeasurements = true; + partialPathToLatestTimeCache.setPartialPathLastObject( + fullPath, new LastPointFilter<>(row.getTime(), row.getObject(i))); + } + + if (hasLatestPointMeasurements) { + try { + rowCollector.collectRow(row); + } catch (Exception e) { + exception.set(e); + } + } + } + } + + @Override + public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) + throws Exception { + try { + for (final TabletInsertionEvent tabletInsertionEvent : + tsFileInsertionEvent.toTabletInsertionEvents()) { + process(tabletInsertionEvent, eventCollector); + } + } finally { + tsFileInsertionEvent.close(); + } + } + + @Override + public void process(Event event, EventCollector eventCollector) throws IOException { + if (!(event instanceof PipeLastPointTsBlockEvent)) { + eventCollector.collect(event); + return; + } + PipeLastPointTsBlockEvent tsBlockEvent = (PipeLastPointTsBlockEvent) event; + + final PipeLastPointTabletEvent pipeLastPointTabletEvent = + tsBlockEvent.convertToPipeLastPointTabletEvent( + partialPathToLatestTimeCache, this::markEligibleColumnsBasedOnFilter); + + if (pipeLastPointTabletEvent != null) { + eventCollector.collect(pipeLastPointTabletEvent); + } + } + + @Override + public void validate(PipeParameterValidator validator) throws Exception { + + memoryLimitInBytes = + validator + .getParameters() + .getLongOrDefault( + PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY, + PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE); + + validator.validate( + memoryLimitInBytes -> (Long) memoryLimitInBytes > 0, + String.format( + "%s must be > 0, but got %s", + PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY, memoryLimitInBytes), + memoryLimitInBytes); + } + + @Override + public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) + throws Exception { + partialPathToLatestTimeCache = initPathLastObjectCache(memoryLimitInBytes); + } + + @Override + public void close() throws Exception { + partialPathToLatestTimeCache.close(); + } + + /** + * Marks a column in the TsBlock as eligible based on the presence of the latest data point. + * + *

This method evaluates a single row within the TsBlock to determine if the column contains + * the latest data point. If a LastPointFilter for the column is found in the cache, the method + * checks whether the current data point is the latest. If no filter is present in the cache, the + * column is automatically considered eligible. + * + * @param columnSelectionMap The BitMap that marks the eligibility of the column. + * @param tsBlock The TsBlock that contains the columns to be evaluated. + * @param deviceId The device identifier used to build the filter path. + * @param measurementSchemas The list of measurement schemas used to complete the filter path. + */ + private void markEligibleColumnsBasedOnFilter( + BitMap columnSelectionMap, + TsBlock tsBlock, + String deviceId, + List measurementSchemas) { + int columnCount = tsBlock.getValueColumnCount(); + + for (int i = 0; i < columnCount; i++) { + Column column = tsBlock.getColumn(i); + if (column.isNull(0)) { + columnSelectionMap.mark(i); + continue; + } + String fullPath = + deviceId + TsFileConstant.PATH_SEPARATOR + measurementSchemas.get(i).getMeasurementId(); + + final LastPointFilter filter = + partialPathToLatestTimeCache.getPartialPathLastObject(fullPath); + if (filter != null && !filter.filter(tsBlock.getTimeByIndex(0), column.getObject(0))) { + columnSelectionMap.mark(i); + } + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index b0ef2b851162..d592617b10af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -21,8 +21,11 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.db.pipe.processor.downsampling.lastpoint.LastPointFilter; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; @@ -33,11 +36,17 @@ import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload; +import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -145,6 +154,19 @@ private void constructBatch(final EnrichedEvent event) { if (firstEventProcessingTime == Long.MIN_VALUE) { firstEventProcessingTime = System.currentTimeMillis(); } + } else if (event instanceof PipeLastPointTabletEvent) { + final PipeLastPointTabletEvent lastPointTabletEvent = (PipeLastPointTabletEvent) event; + + final Tablet tablet = lastPointTabletEvent.getTablet(); + if (Objects.isNull(tablet)) { + return; + } + if (lastPointFilter(tablet, lastPointTabletEvent.getPartialPathToLatestTimeCache())) { + return; + } + tablets.add(tablet); + // tablet size in bytes + totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet); } } @@ -168,6 +190,63 @@ private List convertToTablets(final TabletInsertionEvent tabletInsertion return Collections.emptyList(); } + /////////////////////////////// filter /////////////////////////////// + + private boolean lastPointFilter( + Tablet tablet, PartialPathLastObjectCache> cache) { + String deviceId = tablet.getDeviceId(); + Object[] dataValues = tablet.values; + int columnCount = dataValues.length; + BitMap columnFilterMap = new BitMap(columnCount); + + for (int i = 0; i < columnCount; i++) { + BitMap fieldBitMap = tablet.bitMaps[columnCount]; + + if (fieldBitMap.isMarked(0)) { + columnFilterMap.mark(i); + continue; + } + + IMeasurementSchema schema = tablet.getSchemas().get(i); + String measurementPath = deviceId + TsFileConstant.PATH_SEPARATOR + schema.getMeasurementId(); + + LastPointFilter filter = cache.getPartialPathLastObject(measurementPath); + if (filter != null + && !filter.filterAndMarkAsConsumed( + tablet.timestamps[i], getObject(dataValues[i], schema.getType()))) { + columnFilterMap.mark(i); + fieldBitMap.mark(0); + } + } + + return columnFilterMap.isAllMarked(); + } + + public Object getObject(final Object value, final TSDataType dataType) { + switch (dataType) { + case INT32: + return ((int[]) value)[0]; + case DATE: + return ((LocalDate[]) value)[0]; + case INT64: + case TIMESTAMP: + return ((long[]) value)[0]; + case FLOAT: + return ((float[]) value)[0]; + case DOUBLE: + return ((double[]) value)[0]; + case BOOLEAN: + return ((boolean[]) value)[0]; + case TEXT: + case BLOB: + case STRING: + return ((Binary[]) value)[0]; + default: + throw new UnsupportedOperationException( + String.format("unsupported data type %s", dataType)); + } + } + /////////////////////////////// stringify /////////////////////////////// public String toString() { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java new file mode 100644 index 000000000000..a3d0c2d40ec7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeLastPointTsBlockEventTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent; +import org.apache.iotdb.db.pipe.event.common.tsblock.PipeLastPointTsBlockEvent; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.utils.DateUtils; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class PipeLastPointTsBlockEventTest { + + public TsBlock createTsBlock() { + long[] timeArray = {1L, 2L, 3L, 4L, 5L}; + boolean[] booleanValueArray = {true, false, false, false, true}; + boolean[] booleanIsNull = {true, true, false, true, false}; + int[] intValueArray = {10, 20, 30, 40, 50}; + boolean[] intIsNull = {false, true, false, false, true}; + long[] longValueArray = {100L, 200L, 300L, 400, 500L}; + boolean[] longIsNull = {true, false, false, true, true}; + float[] floatValueArray = {1000.0f, 2000.0f, 3000.0f, 4000.0f, 5000.0f}; + boolean[] floatIsNull = {false, false, true, true, false}; + double[] doubleValueArray = {10000.0, 20000.0, 30000.0, 40000.0, 50000.0}; + boolean[] doubleIsNull = {true, false, false, true, false}; + Binary[] binaryValueArray = { + new Binary("19970909", TSFileConfig.STRING_CHARSET), + new Binary("ty", TSFileConfig.STRING_CHARSET), + new Binary("love", TSFileConfig.STRING_CHARSET), + new Binary("zm", TSFileConfig.STRING_CHARSET), + new Binary("19950421", TSFileConfig.STRING_CHARSET) + }; + boolean[] binaryIsNull = {false, false, false, false, false}; + + TsBlockBuilder builder = new TsBlockBuilder(getMeasurementDataTypes()); + for (int i = 0; i < timeArray.length; i++) { + builder.getTimeColumnBuilder().writeLong(timeArray[i]); + if (booleanIsNull[i]) { + builder.getColumnBuilder(0).appendNull(); + } else { + builder.getColumnBuilder(0).writeBoolean(booleanValueArray[i]); + } + if (intIsNull[i]) { + builder.getColumnBuilder(1).appendNull(); + } else { + builder.getColumnBuilder(1).writeInt(intValueArray[i]); + } + if (longIsNull[i]) { + builder.getColumnBuilder(2).appendNull(); + } else { + builder.getColumnBuilder(2).writeLong(longValueArray[i]); + } + if (floatIsNull[i]) { + builder.getColumnBuilder(3).appendNull(); + } else { + builder.getColumnBuilder(3).writeFloat(floatValueArray[i]); + } + if (doubleIsNull[i]) { + builder.getColumnBuilder(4).appendNull(); + } else { + builder.getColumnBuilder(4).writeDouble(doubleValueArray[i]); + } + if (binaryIsNull[i]) { + builder.getColumnBuilder(5).appendNull(); + } else { + builder.getColumnBuilder(5).writeBinary(binaryValueArray[i]); + } + builder.declarePosition(); + } + return builder.build(); + } + + private String[] getMeasurementIdentifiers() { + return new String[] {"s1", "s2", "s3", "s4", "s5", "s6"}; + } + + private List getMeasurementDataTypes() { + return Arrays.asList( + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT); + } + + private List createMeasurementSchemas() { + List measurementSchemas = new ArrayList<>(); + final String[] measurementIds = getMeasurementIdentifiers(); + final List dataTypes = getMeasurementDataTypes(); + + for (int i = 0; i < measurementIds.length; i++) { + measurementSchemas.add(new MeasurementSchema(measurementIds[i], dataTypes.get(i))); + } + + return measurementSchemas; + } + + private PartialPath createPartialPath() throws IllegalPathException { + return new PartialPath("root.ts"); + } + + private PipeLastPointTsBlockEvent generatePipeLastPointEvent( + TsBlock tsBlock, PartialPath partialPath, List measurementSchemas) + throws IllegalPathException { + PipeLastPointTsBlockEvent pipeLastPointTsBlockEvent = + new PipeLastPointTsBlockEvent( + tsBlock, + System.currentTimeMillis(), + partialPath, + measurementSchemas, + "test", + System.currentTimeMillis(), + null, + null, + 0L, + 1L); + + return pipeLastPointTsBlockEvent; + } + + @Test + public void convertToPipeLastPointTabletEventTest() throws IllegalPathException { + TsBlock tsBlock = createTsBlock(); + PartialPath partialPath = createPartialPath(); + List measurementSchemas = createMeasurementSchemas(); + PipeLastPointTsBlockEvent tsBlockEvent = + generatePipeLastPointEvent(tsBlock, partialPath, measurementSchemas); + + PipeLastPointTabletEvent lastPointTabletEvent = + tsBlockEvent.convertToPipeLastPointTabletEvent( + null, (bitMap, block, deviceId, schemas) -> {}); + + Tablet tablet = lastPointTabletEvent.getTablet(); + + Object[] values = tablet.values; + int rowNum = tsBlock.getTimeColumn().getPositionCount(); + int columnNum = values.length; + for (int row = 0; row < rowNum; row++) { + Assert.assertEquals(tablet.timestamps[row], tsBlock.getTimeColumn().getLong(row)); + for (int column = 0; column < columnNum; column++) { + Column tsColumn = tsBlock.getColumn(column); + BitMap bitMap = tablet.bitMaps[column]; + Assert.assertEquals(tsColumn.isNull(row), bitMap.isMarked(row)); + if (tsColumn.isNull(row)) { + continue; + } + switch (tsColumn.getDataType()) { + case BOOLEAN: + Assert.assertEquals(tsColumn.getBoolean(row), ((boolean[]) values[column])[row]); + break; + case INT32: + Assert.assertEquals(tsColumn.getInt(row), ((int[]) values[column])[row]); + break; + case DATE: + Assert.assertEquals( + DateUtils.parseIntToLocalDate(tsColumn.getInt(row)), + ((LocalDate[]) values[column])[row]); + break; + case INT64: + case TIMESTAMP: + Assert.assertEquals(tsColumn.getLong(row), ((long[]) values[column])[row]); + break; + case FLOAT: + Assert.assertEquals( + Float.compare(tsColumn.getFloat(row), ((float[]) values[column])[row]), 0); + break; + case DOUBLE: + Assert.assertEquals( + Double.compare(tsColumn.getDouble(row), ((double[]) values[column])[row]), 0); + break; + case TEXT: + case BLOB: + case STRING: + Assert.assertEquals(tsColumn.getBinary(row), ((Binary[]) values[column])[row]); + break; + } + } + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java index 22bc87b2917b..b4525afc4f70 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java @@ -32,6 +32,11 @@ public class PipeProcessorConstant { "processor.down-sampling.memory-limit-in-bytes"; public static final long PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE = 16 * MB; + public static final String PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY = + "processor.last-point-sampling.memory-limit-in-bytes"; + public static final long PROCESSOR_LAST_POINT_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE = + 16 * MB; + public static final String PROCESSOR_OUTPUT_MAX_DELAY_SECONDS_KEY = "processor.output.max-delay-seconds"; public static final long PROCESSOR_OUTPUT_MAX_DELAY_SECONDS_DEFAULT_VALUE = -1; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/QuadConsumer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/QuadConsumer.java new file mode 100644 index 000000000000..e0f1cc3f2707 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/QuadConsumer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.function; + +@FunctionalInterface +public interface QuadConsumer { + + void accept(T t, U u, V v, W w); +}