From cb0d2a51650e1e32cb878962a428a347d90dc1e5 Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Wed, 23 Oct 2024 19:50:20 +0800 Subject: [PATCH] Pipe: fix calculations of PipeDataNodeRemainingEventAndTimeOperator (#13876) (#13889) --- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 4 ++-- .../common/tablet/PipeRawTabletInsertionEvent.java | 4 ++-- .../PipeDataNodeRemainingEventAndTimeMetrics.java | 8 ++++---- .../PipeDataNodeRemainingEventAndTimeOperator.java | 13 +++++++------ 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index bd01f26b6556..5932c6b544b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -136,7 +136,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa PipeDataNodeResourceManager.wal().pin(walEntryHandler); if (Objects.nonNull(pipeName)) { PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .increaseInsertionEventCount(pipeName + "_" + creationTime); + .increaseTabletEventCount(pipeName + "_" + creationTime); } return true; } catch (final Exception e) { @@ -169,7 +169,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa } finally { if (Objects.nonNull(pipeName)) { PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .decreaseInsertionEventCount(pipeName + "_" + creationTime); + .decreaseTabletEventCount(pipeName + "_" + creationTime); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index a5795ede49c7..b89c11b1f39b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -118,7 +118,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet)); if (Objects.nonNull(pipeName)) { PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .increaseInsertionEventCount(pipeName + "_" + creationTime); + .increaseTabletEventCount(pipeName + "_" + creationTime); } return true; } @@ -127,7 +127,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { if (Objects.nonNull(pipeName)) { PipeDataNodeRemainingEventAndTimeMetrics.getInstance() - .decreaseInsertionEventCount(pipeName + "_" + creationTime); + .decreaseTabletEventCount(pipeName + "_" + creationTime); } allocatedMemoryBlock.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java index a8765b3b61dd..25c2ede24076 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java @@ -129,16 +129,16 @@ public void register(final IoTDBSchemaRegionExtractor extractor) { } } - public void increaseInsertionEventCount(final String pipeID) { + public void increaseTabletEventCount(final String pipeID) { remainingEventAndTimeOperatorMap .computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator()) - .increaseInsertionEventCount(); + .increaseTabletEventCount(); } - public void decreaseInsertionEventCount(final String pipeID) { + public void decreaseTabletEventCount(final String pipeID) { remainingEventAndTimeOperatorMap .computeIfAbsent(pipeID, k -> new PipeDataNodeRemainingEventAndTimeOperator()) - .decreaseInsertionEventCount(); + .decreaseTabletEventCount(); } public void increaseTsFileEventCount(final String pipeID) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java index c00722151462..bee0e6975b44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java @@ -44,7 +44,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { private final Set schemaRegionExtractors = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final AtomicInteger insertionEventCount = new AtomicInteger(0); + private final AtomicInteger tabletEventCount = new AtomicInteger(0); private final AtomicInteger tsfileEventCount = new AtomicInteger(0); private final AtomicInteger heartbeatEventCount = new AtomicInteger(0); @@ -58,12 +58,12 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { //////////////////////////// Remaining event & time calculation //////////////////////////// - void increaseInsertionEventCount() { - tsfileEventCount.incrementAndGet(); + void increaseTabletEventCount() { + tabletEventCount.incrementAndGet(); } - void decreaseInsertionEventCount() { - tsfileEventCount.decrementAndGet(); + void decreaseTabletEventCount() { + tabletEventCount.decrementAndGet(); } void increaseTsFileEventCount() { @@ -84,6 +84,7 @@ void decreaseHeartbeatEventCount() { long getRemainingEvents() { return tsfileEventCount.get() + + tabletEventCount.get() + heartbeatEventCount.get() + schemaRegionExtractors.stream() .map(IoTDBSchemaRegionExtractor::getUnTransferredEventCount) @@ -105,7 +106,7 @@ long getRemainingEvents() { final double invocationValue = collectInvocationHistogram.getMean(); // Do not take heartbeat event into account final double totalDataRegionWriteEventCount = - tsfileEventCount.get() * Math.max(invocationValue, 1) + insertionEventCount.get(); + tsfileEventCount.get() * Math.max(invocationValue, 1) + tabletEventCount.get(); dataRegionCommitMeter.updateAndGet( meter -> {