Skip to content

Commit

Permalink
Modify code
Browse files Browse the repository at this point in the history
  • Loading branch information
luoluoyuyu committed Aug 11, 2024
1 parent 4f9604b commit 2c7409f
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ public List<TabletInsertionEvent> processMaxTimestampRowByRow(
consumer.accept(
new PipeRow(
maxTimestampRowCount,
deviceId,
getDeviceStr(),
isAligned,
measurementSchemaList,
timestampColumn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import java.time.LocalDate;
import java.util.List;
Expand All @@ -48,7 +48,7 @@ public class PipeLastPointTsBlockEvent extends EnrichedEvent {

private final PartialPath partialPath;

private final List<MeasurementSchema> measurementSchemas;
private final List<IMeasurementSchema> measurementSchemas;

private final long captureTime;

Expand All @@ -60,7 +60,7 @@ public PipeLastPointTsBlockEvent(
final TsBlock tsBlock,
final long captureTime,
final PartialPath partialPath,
final List<MeasurementSchema> measurementSchemas,
final List<IMeasurementSchema> measurementSchemas,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
Expand Down Expand Up @@ -133,9 +133,10 @@ public boolean mayEventPathsOverlappedWithPattern() {

public PipeLastPointTabletEvent convertToPipeLastPointTabletEvent(
PartialPathLastObjectCache<LastPointFilter<?>> partialPathToLatestTimeCache,
QuadConsumer<BitMap, TsBlock, String, List<MeasurementSchema>> filterConsumer) {
QuadConsumer<BitMap, TsBlock, String, List<IMeasurementSchema>> filterConsumer) {
BitMap columnSelectionMap = new BitMap(tsBlock.getValueColumnCount());
filterConsumer.accept(columnSelectionMap, tsBlock, partialPath.getDevice(), measurementSchemas);
filterConsumer.accept(
columnSelectionMap, tsBlock, partialPath.getDevicePath().toString(), measurementSchemas);
if (columnSelectionMap.isAllMarked()) {
return null;
}
Expand Down Expand Up @@ -207,7 +208,7 @@ private Tablet convertToTablet() {

final Tablet tablet =
new Tablet(
partialPath.getDevice(),
partialPath.getDevicePath().getFullPath(),
measurementSchemas,
tsBlock.getTimeColumn().getLongs(),
values,
Expand Down Expand Up @@ -238,7 +239,7 @@ public PartialPath getPartialPath() {
return partialPath;
}

public List<MeasurementSchema> getMeasurementSchemas() {
public List<IMeasurementSchema> getMeasurementSchemas() {
return measurementSchemas;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -238,7 +238,7 @@ private void markEligibleColumnsBasedOnFilter(
BitMap columnSelectionMap,
TsBlock tsBlock,
String deviceId,
List<MeasurementSchema> measurementSchemas) {
List<IMeasurementSchema> measurementSchemas) {
int columnCount = tsBlock.getValueColumnCount();

for (int i = 0; i < columnCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -194,7 +194,7 @@ private List<Tablet> convertToTablets(final TabletInsertionEvent tabletInsertion

private boolean lastPointFilter(
Tablet tablet, PartialPathLastObjectCache<LastPointFilter<?>> cache) {
String deviceId = tablet.deviceId;
String deviceId = tablet.getDeviceId();
Object[] dataValues = tablet.values;
int columnCount = dataValues.length;
BitMap columnFilterMap = new BitMap(columnCount);
Expand All @@ -207,7 +207,7 @@ private boolean lastPointFilter(
continue;
}

MeasurementSchema schema = tablet.getSchemas().get(i);
IMeasurementSchema schema = tablet.getSchemas().get(i);
String measurementPath = deviceId + TsFileConstant.PATH_SEPARATOR + schema.getMeasurementId();

LastPointFilter filter = cache.getPartialPathLastObject(measurementPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeLastPointTabletEvent;
import org.apache.iotdb.db.pipe.event.common.tsblock.PipeLastPointTsBlockEvent;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;

import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.common.conf.TSFileConfig;
Expand All @@ -34,6 +33,7 @@
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -118,8 +118,8 @@ private List<TSDataType> getMeasurementDataTypes() {
TSDataType.TEXT);
}

private List<MeasurementSchema> createMeasurementSchemas() {
List<MeasurementSchema> measurementSchemas = new ArrayList<>();
private List<IMeasurementSchema> createMeasurementSchemas() {
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
final String[] measurementIds = getMeasurementIdentifiers();
final List<TSDataType> dataTypes = getMeasurementDataTypes();

Expand All @@ -131,11 +131,11 @@ private List<MeasurementSchema> createMeasurementSchemas() {
}

private PartialPath createPartialPath() throws IllegalPathException {
return new PartialPath(DeviceIDFactory.getInstance().getDeviceID("root.ts"));
return new PartialPath("root.ts");
}

private PipeLastPointTsBlockEvent generatePipeLastPointEvent(
TsBlock tsBlock, PartialPath partialPath, List<MeasurementSchema> measurementSchemas)
TsBlock tsBlock, PartialPath partialPath, List<IMeasurementSchema> measurementSchemas)
throws IllegalPathException {
PipeLastPointTsBlockEvent pipeLastPointTsBlockEvent =
new PipeLastPointTsBlockEvent(
Expand All @@ -157,7 +157,7 @@ private PipeLastPointTsBlockEvent generatePipeLastPointEvent(
public void convertToPipeLastPointTabletEventTest() throws IllegalPathException {
TsBlock tsBlock = createTsBlock();
PartialPath partialPath = createPartialPath();
List<MeasurementSchema> measurementSchemas = createMeasurementSchemas();
List<IMeasurementSchema> measurementSchemas = createMeasurementSchemas();
PipeLastPointTsBlockEvent tsBlockEvent =
generatePipeLastPointEvent(tsBlock, partialPath, measurementSchemas);

Expand Down

0 comments on commit 2c7409f

Please sign in to comment.