Skip to content

Commit

Permalink
Pipe: Fix the problem that the table model chunkData is not registere…
Browse files Browse the repository at this point in the history
…d with TableSchema
  • Loading branch information
luoluoyuyu committed Oct 24, 2024
1 parent ecacaa1 commit 95d0494
Showing 1 changed file with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
Expand Down Expand Up @@ -71,6 +73,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static org.apache.iotdb.db.utils.constant.SqlConstant.ROOT;
import static org.apache.iotdb.db.utils.constant.SqlConstant.ROOT_DOT;

/**
* {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link
* LoadCommand}. This class turn the content of a piece of loading TsFile into a new TsFile. When
Expand Down Expand Up @@ -408,6 +413,23 @@ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws
dataPartition2Writer.put(partitionInfo, writer);
}
TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);

// Table model needs to register TableSchema
final String tableName =
chunkData.getDevice() != null ? chunkData.getDevice().getTableName() : null;
if (tableName != null && !(tableName.startsWith(ROOT_DOT) || tableName.equals(ROOT))) {
writer
.getSchema()
.getTableSchemaMap()
.computeIfAbsent(
tableName,
t ->
TableSchema.of(
DataNodeTableCache.getInstance()
.getTable(partitionInfo.getDataRegion().getDatabaseName(), t))
.toTsFileTableSchemaNoAttribute());
}

if (!Objects.equals(chunkData.getDevice(), dataPartition2LastDevice.get(partitionInfo))) {
if (dataPartition2LastDevice.containsKey(partitionInfo)) {
writer.endChunkGroup();
Expand Down

0 comments on commit 95d0494

Please sign in to comment.