Skip to content

Commit

Permalink
Pipe: fix calculations of PipeDataNodeRemainingEventAndTimeOperator (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies authored Oct 23, 2024
1 parent 58e3a69 commit cb0d2a5
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
PipeDataNodeResourceManager.wal().pin(walEntryHandler);
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseInsertionEventCount(pipeName + "_" + creationTime);
.increaseTabletEventCount(pipeName + "_" + creationTime);
}
return true;
} catch (final Exception e) {
Expand Down Expand Up @@ -169,7 +169,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
} finally {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseInsertionEventCount(pipeName + "_" + creationTime);
.decreaseTabletEventCount(pipeName + "_" + creationTime);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.increaseInsertionEventCount(pipeName + "_" + creationTime);
.increaseTabletEventCount(pipeName + "_" + creationTime);
}
return true;
}
Expand All @@ -127,7 +127,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
if (Objects.nonNull(pipeName)) {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
.decreaseInsertionEventCount(pipeName + "_" + creationTime);
.decreaseTabletEventCount(pipeName + "_" + creationTime);
}
allocatedMemoryBlock.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ public void register(final IoTDBSchemaRegionExtractor extractor) {
}
}

public void increaseInsertionEventCount(final String pipeID) {
public void increaseTabletEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.increaseInsertionEventCount();
.increaseTabletEventCount();
}

public void decreaseInsertionEventCount(final String pipeID) {
public void decreaseTabletEventCount(final String pipeID) {
remainingEventAndTimeOperatorMap
.computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator())
.decreaseInsertionEventCount();
.decreaseTabletEventCount();
}

public void increaseTsFileEventCount(final String pipeID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
Collections.newSetFromMap(new ConcurrentHashMap<>());

private final AtomicInteger insertionEventCount = new AtomicInteger(0);
private final AtomicInteger tabletEventCount = new AtomicInteger(0);
private final AtomicInteger tsfileEventCount = new AtomicInteger(0);
private final AtomicInteger heartbeatEventCount = new AtomicInteger(0);

Expand All @@ -58,12 +58,12 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {

//////////////////////////// Remaining event & time calculation ////////////////////////////

void increaseInsertionEventCount() {
tsfileEventCount.incrementAndGet();
void increaseTabletEventCount() {
tabletEventCount.incrementAndGet();
}

void decreaseInsertionEventCount() {
tsfileEventCount.decrementAndGet();
void decreaseTabletEventCount() {
tabletEventCount.decrementAndGet();
}

void increaseTsFileEventCount() {
Expand All @@ -84,6 +84,7 @@ void decreaseHeartbeatEventCount() {

long getRemainingEvents() {
return tsfileEventCount.get()
+ tabletEventCount.get()
+ heartbeatEventCount.get()
+ schemaRegionExtractors.stream()
.map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
Expand All @@ -105,7 +106,7 @@ long getRemainingEvents() {
final double invocationValue = collectInvocationHistogram.getMean();
// Do not take heartbeat event into account
final double totalDataRegionWriteEventCount =
tsfileEventCount.get() * Math.max(invocationValue, 1) + insertionEventCount.get();
tsfileEventCount.get() * Math.max(invocationValue, 1) + tabletEventCount.get();

dataRegionCommitMeter.updateAndGet(
meter -> {
Expand Down

0 comments on commit cb0d2a5

Please sign in to comment.