Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
jt2594838 committed Oct 25, 2024
1 parent 2ffd6b4 commit cb83826
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
Expand Down Expand Up @@ -132,6 +131,7 @@
import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

Expand All @@ -158,7 +158,6 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1993,6 +1992,7 @@ public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorExcepti
WritingMetrics.getInstance().recordActiveTimePartitionCount(-1);
} finally {
writeUnlock();
writeUnlock();
}
}

Expand Down Expand Up @@ -2264,7 +2264,6 @@ public void deleteByDevice(final MeasurementPath pattern, final DeleteDataNode n

try {
TreeDeviceSchemaCacheManager.getInstance().invalidateLastCache(pattern);
Set<PartialPath> devicePaths = new HashSet<>(pattern.getDevicePathPattern());
// write log to impacted working TsFileProcessors
List<WALFlushListener> walListeners =
logDeletionInWAL(startTime, endTime, searchIndex, pattern);
Expand Down Expand Up @@ -2475,9 +2474,11 @@ public void insertSeparatorToWAL() {

private boolean canSkipDelete(TsFileResource tsFileResource, ModEntry deletion) {
long fileStartTime = tsFileResource.getTimeIndex().getMinStartTime();
long fileEndTime = tsFileResource.getTimeIndex().getMaxEndTime();
long fileEndTime =
tsFileResource.isClosed() ? tsFileResource.getTimeIndex().getMaxEndTime() : Long.MAX_VALUE;

if (!deletion.getTimeRange().contains(fileStartTime, fileEndTime)) {
if (!ModificationUtils.overlap(
deletion.getStartTime(), deletion.getEndTime(), fileStartTime, fileEndTime)) {
return true;
}
ITimeIndex timeIndex = tsFileResource.getTimeIndex();
Expand All @@ -2487,7 +2488,10 @@ private boolean canSkipDelete(TsFileResource tsFileResource, ModEntry deletion)

for (IDeviceID device : tsFileResource.getDevices()) {
long startTime = tsFileResource.getTimeIndex().getStartTime(device);
long endTime = tsFileResource.getTimeIndex().getEndTime(device);
long endTime =
tsFileResource.isClosed()
? tsFileResource.getTimeIndex().getEndTime(device)
: Long.MAX_VALUE;
if (deletion.affects(device, startTime, endTime)) {
return false;
}
Expand All @@ -2509,15 +2513,15 @@ private void deleteDataInUnsealedFiles(
if (tsFileResource.isClosed()) {
sealedTsFiles.add(tsFileResource);
} else {
tsFileResource.getProcessor().getFlushQueryLock().readLock().lock();
tsFileResource.getProcessor().getFlushQueryLock().writeLock().lock();
if (tsFileResource.isClosed()) {
sealedTsFiles.add(tsFileResource);
tsFileResource.getProcessor().getFlushQueryLock().readLock().unlock();
tsFileResource.getProcessor().getFlushQueryLock().writeLock().unlock();
} else {
try {
tsFileResource.getProcessor().deleteDataInMemory(deletion);
} finally {
tsFileResource.getProcessor().getFlushQueryLock().readLock().unlock();
tsFileResource.getProcessor().getFlushQueryLock().writeLock().unlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,20 +815,22 @@ private long[] filterDeletedTimestamp(TVList tvList, List<TimeRange> deletionLis
}

@Override
public void delete(ModEntry modEntry) {
public long delete(ModEntry modEntry) {
List<Pair<IDeviceID, IWritableMemChunkGroup>> targetDeviceList = new ArrayList<>();
for (Entry<IDeviceID, IWritableMemChunkGroup> entry : memTableMap.entrySet()) {
if (modEntry.affects(entry.getKey())) {
targetDeviceList.add(new Pair<>(entry.getKey(), entry.getValue()));
}
}

long pointDeleted = 0;
for (Pair<IDeviceID, IWritableMemChunkGroup> pair : targetDeviceList) {
pair.right.delete(modEntry);
pointDeleted += pair.right.delete(modEntry);
if (pair.right.getMemChunkMap().isEmpty()) {
memTableMap.remove(pair.left);
}
}
return pointDeleted;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,12 @@ void queryForDeviceRegionScan(

boolean isEmpty();

/** Delete data in the MemTable according to the modEntry. */
void delete(ModEntry modEntry);
/**
* Delete data in the MemTable according to the modEntry.
*
* @return
*/
long delete(ModEntry modEntry);

/**
* Make a copy of this MemTable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,8 +1165,11 @@ public void deleteDataInMemory(ModEntry deletion) {
}
try {
if (workMemTable != null) {
logger.info("[Deletion] Deletion with {} in workMemTable", deletion);
workMemTable.delete(deletion);
long pointDeleted = workMemTable.delete(deletion);
logger.info(
"[Deletion] Deletion with {} in workMemTable, {} points deleted",
deletion,
pointDeleted);
}
// Flushing memTables are immutable, only record this deletion in these memTables for read
if (!flushingMemTables.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected ModEntry(ModType modType) {

public int serializedSize() {
// modType + time range
return Byte.BYTES + Long.BYTES * 2;
return Byte.BYTES + Long.BYTES * 2 + Byte.BYTES * 2;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.modification;

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.utils.ModificationUtils;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.TimeRange;
Expand Down Expand Up @@ -80,7 +81,8 @@ public int serializedSize() {

@Override
public boolean affects(IDeviceID deviceID, long startTime, long endTime) {
return predicate.matches(deviceID) && timeRange.contains(startTime, endTime);
return predicate.matches(deviceID)
&& ModificationUtils.overlap(getStartTime(), getEndTime(), startTime, endTime);
}

@Override
Expand Down Expand Up @@ -117,7 +119,8 @@ public int compareTo(ModEntry o) {
}

public TableDeletionEntry clone() {
return new TableDeletionEntry(predicate, new TimeRange(timeRange.getMin(), timeRange.getMax()));
TimeRange timeRangeCopy = new TimeRange(timeRange.getMin(), timeRange.getMax());
return new TableDeletionEntry(predicate, timeRangeCopy);
}

public String getTableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternUtil;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.storageengine.dataregion.modification.v1.Deletion;
import org.apache.iotdb.db.utils.ModificationUtils;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.TimeRange;
Expand Down Expand Up @@ -57,6 +59,7 @@ public TreeDeletionEntry(PartialPath path, TimeRange timeRange) {
this.timeRange = timeRange;
}

@TestOnly
public TreeDeletionEntry(PartialPath path, long endTime) {
this();
this.pathPattern = path;
Expand Down Expand Up @@ -120,19 +123,24 @@ public boolean matches(PartialPath path) {

@Override
public boolean affects(IDeviceID deviceID, long startTime, long endTime) {
try {
return pathPattern.endWithMultiLevelWildcard()
&& pathPattern.getDevicePath().matchFullPath(new PartialPath(deviceID))
&& timeRange.contains(startTime, endTime);
} catch (IllegalPathException e) {
return false;
}
return affects(deviceID)
&& ModificationUtils.overlap(getStartTime(), getEndTime(), startTime, endTime);
}

@Override
public boolean affects(IDeviceID deviceID) {
try {
return pathPattern.matchPrefixPath(new PartialPath(deviceID));
if (pathPattern.endWithMultiLevelWildcard()) {
PartialPath deviceIdPath = new PartialPath(deviceID);
// pattern: root.db1.d1.**, deviceId: root.db1.d1, match
return pathPattern.getDevicePath().matchFullPath(deviceIdPath)
// pattern: root.db1.**, deviceId: root.db1.d1, match
|| pathPattern.matchFullPath(deviceIdPath);
} else {
// pattern: root.db1.d1.s1, deviceId: root.db1.d1, match
// pattern: root.db1.d1, deviceId: root.db1.d1, not match
return pathPattern.getDevicePath().matchFullPath(new PartialPath(deviceID));
}
} catch (IllegalPathException e) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ private ModificationUtils() {
// util class
}

// both ranges are closed
public static boolean overlap(long startA, long endA, long startB, long endB) {
return endB >= startA && startB <= endA;
}

/**
* modifyChunkMetaData iterates the chunkMetaData and applies all available modifications on it to
* generate a ModifiedChunkMetadata. <br>
Expand Down

0 comments on commit cb83826

Please sign in to comment.