Skip to content

Commit

Permalink
[profile](pipeline) populate exec node's total time counter using pip…
Browse files Browse the repository at this point in the history
…eline task's running time counters
  • Loading branch information
Doris-Extras committed Jul 27, 2023
1 parent fb41265 commit 2cffad6
Show file tree
Hide file tree
Showing 43 changed files with 258 additions and 644 deletions.
5 changes: 1 addition & 4 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ std::string ExecNode::get_name() {
}

Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_TIMER(_projection_timer);
using namespace vectorized;
MutableBlock mutable_block =
Expand Down Expand Up @@ -570,8 +571,4 @@ Status ExecNode::get_next_after_projects(
return func(state, block, eos);
}

// Base implementation of sink: it does not consume the input block and instead returns
// Status::NotSupported with a message that includes this node's name (from get_name()).
Status ExecNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) {
return Status::NotSupported("{} not implements sink", get_name());
}

} // namespace doris
37 changes: 29 additions & 8 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,27 @@ class ExecNode {

// Emit data, both need impl with method: sink
// Eg: Aggregation, Sort, Scan
[[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block* output_block,
bool* eos) {
return get_next(state, output_block, eos);
// Non-virtual entry point for pulling an output block from this node. It applies
// SCOPED_TIMER to the node's total time counter and then delegates to the virtual
// do_pull() hook, returning whatever Status the hook returns.
[[nodiscard]] Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) {
// Declared before the call so that the timer's scope covers do_pull().
SCOPED_TIMER(_runtime_profile->total_time_counter());
return do_pull(state, output_block, eos);
}

[[nodiscard]] virtual Status push(RuntimeState* state, vectorized::Block* input_block,
bool eos) {
return Status::OK();
// pull/push/sink are non-virtual wrappers around the virtual do_pull/do_push/do_sink hooks.
// The public methods are reused by many other methods, so the total time counter could not
// be updated correctly if the timing were left to each implementation; the wrappers apply
// the timer in one place instead.
//
// Non-virtual entry point for pushing an input block into this node. It applies
// SCOPED_TIMER to the node's total time counter and then delegates to do_push().
[[nodiscard]] Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) {
// Declared before the call so that the timer's scope covers do_push().
SCOPED_TIMER(_runtime_profile->total_time_counter());
return do_push(state, input_block, eos);
}

bool can_read() const { return _can_read; }

// Sink Data to ExecNode to do some stock work, both need impl with method: get_result
// `eos` means source is exhausted, exec node should do some finalize work
// Eg: Aggregation, Sort
[[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* input_block,
bool eos);
// Non-virtual entry point for sinking an input block into this node. It applies
// SCOPED_TIMER to the node's total time counter and then delegates to the virtual
// do_sink() hook, returning whatever Status the hook returns.
[[nodiscard]] Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) {
// Declared before the call so that the timer's scope covers do_sink().
SCOPED_TIMER(_runtime_profile->total_time_counter());
return do_sink(state, input_block, eos);
}

// Resets the stream of row batches to be retrieved by subsequent GetNext() calls.
// Clears all internal state, returning this node to the state it was in after calling
Expand Down Expand Up @@ -249,6 +253,23 @@ class ExecNode {
/// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc
Status do_projections(vectorized::Block* origin_block, vectorized::Block* output_block);

// Virtual hook invoked by sink(). The base implementation never returns a Status: it throws
// doris::Exception with ErrorCode::NOT_IMPLEMENTED_ERROR.
// NOTE(review): a node that does not override this reports the failure as an exception
// rather than as an error Status — confirm that callers of sink() are prepared to catch it.
[[nodiscard]] virtual Status do_sink(RuntimeState* state, vectorized::Block* input_block,
bool eos) {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"do_sink method is not implemented by this class");
}

// Virtual hook invoked by push(). The base implementation never returns a Status: it throws
// doris::Exception with ErrorCode::NOT_IMPLEMENTED_ERROR.
// NOTE(review): a node that does not override this reports the failure as an exception
// rather than as an error Status — confirm that callers of push() are prepared to catch it.
[[nodiscard]] virtual Status do_push(RuntimeState* state, vectorized::Block* input_block,
bool eos) {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"do_push method is not implemented by this class");
}

// Virtual hook invoked by pull(). The base implementation forwards to get_next() with the
// same arguments and returns its Status.
[[nodiscard]] virtual Status do_pull(RuntimeState* state, vectorized::Block* output_block,
bool* eos) {
return get_next(state, output_block, eos);
}

int _id; // unique w/in single plan tree
TPlanNodeType::type _type;
ObjectPool* _pool;
Expand Down
11 changes: 11 additions & 0 deletions be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ class ExchangeSourceOperator final : public SourceOperator<ExchangeSourceOperato
ExchangeSourceOperator(OperatorBuilderBase*, ExecNode*);
bool can_read() override;
bool is_pending_finish() const override;

// The exchange node is charged with the time the pipeline task spent waiting: the sum of
// wait_source_time and wait_dependency_time is added to the node's total time counter, and
// wait_source_time alone is additionally passed to the node via update_wait_source_time().
void update_profile(PipelineTaskTimer& pipeline_task_timer) override {
    auto* exchange_node = StreamingOperator<ExchangeSourceOperatorBuilder>::_node;
    const auto blocked_time =
            pipeline_task_timer.wait_source_time + pipeline_task_timer.wait_dependency_time;

    exchange_node->runtime_profile()->total_time_counter()->update(blocked_time);
    exchange_node->update_wait_source_time(pipeline_task_timer.wait_source_time);
}
};

} // namespace doris::pipeline
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class HashJoinBuildSink final : public StreamingOperator<HashJoinBuildSinkBuilde
HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node);
bool can_write() override { return _node->can_sink_write(); }
bool is_pending_finish() const override { return !_node->ready_for_finish(); }
// The build side is not on the critical path, so it should not update the total time
// counter; this override intentionally does nothing.
void update_profile(PipelineTaskTimer& pipeline_task_timer) override {}
};

} // namespace pipeline
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/nested_loop_join_build_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class NestLoopJoinBuildOperator final : public StreamingOperator<NestLoopJoinBui
public:
NestLoopJoinBuildOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
bool can_write() override { return true; }
// The build side is not on the critical path, so it should not update the total time
// counter; this override intentionally does nothing.
void update_profile(PipelineTaskTimer& pipeline_task_timer) override {}
};

} // namespace pipeline
Expand Down
38 changes: 29 additions & 9 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,13 @@ class RuntimeState;
class TDataSink;
} // namespace doris

#define OPERATOR_CODE_GENERATOR(NAME, SUBCLASS) \
NAME##Builder::NAME##Builder(int32_t id, ExecNode* exec_node) \
: OperatorBuilder(id, #NAME, exec_node) {} \
\
OperatorPtr NAME##Builder::build_operator() { \
return std::make_shared<NAME>(this, _node); \
} \
\
NAME::NAME(OperatorBuilderBase* operator_builder, ExecNode* node) \
#define OPERATOR_CODE_GENERATOR(NAME, SUBCLASS) \
NAME##Builder::NAME##Builder(int32_t id, ExecNode* exec_node) \
: OperatorBuilder(id, #NAME, exec_node) {} \
\
OperatorPtr NAME##Builder::build_operator() { return std::make_shared<NAME>(this, _node); } \
\
NAME::NAME(OperatorBuilderBase* operator_builder, ExecNode* node) \
: SUBCLASS(operator_builder, node) {};

namespace doris::pipeline {
Expand Down Expand Up @@ -221,6 +219,8 @@ class OperatorBase {
return Status::NotSupported(error_msg.str());
}

// Hook that receives the pipeline task's wait timers (see PipelineTaskTimer). The default
// implementation does nothing; operators override it to add those times to their profile.
virtual void update_profile(PipelineTaskTimer& pipeline_task_timer) {}

/**
* pending_finish means we have called `close` and there are still some work to do before finishing.
* Now it is a pull-based pipeline and operators pull data from its child by this method.
Expand Down Expand Up @@ -303,6 +303,11 @@ class DataSinkOperator : public OperatorBase {

[[nodiscard]] RuntimeProfile* get_runtime_profile() const override { return _sink->profile(); }

// Every sink operator adds the pipeline task's wait_sink_time to the total time counter of
// its sink's profile.
void update_profile(PipelineTaskTimer& pipeline_task_timer) override {
    auto* sink_total_time = _sink->profile()->total_time_counter();
    sink_total_time->update(pipeline_task_timer.wait_sink_time);
}

protected:
NodeType* _sink;
};
Expand Down Expand Up @@ -370,6 +375,11 @@ class StreamingOperator : public OperatorBase {
return _node->runtime_profile();
}

// Adds the pipeline task's wait_source_time plus wait_dependency_time to the total time
// counter of this operator's node.
void update_profile(PipelineTaskTimer& pipeline_task_timer) override {
    const auto blocked_time =
            pipeline_task_timer.wait_source_time + pipeline_task_timer.wait_dependency_time;
    auto* node_total_time = _node->runtime_profile()->total_time_counter();
    node_total_time->update(blocked_time);
}

protected:
NodeType* _node;
bool _use_projection;
Expand Down Expand Up @@ -397,6 +407,9 @@ class SourceOperator : public StreamingOperator<OperatorBuilderType> {
source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
return Status::OK();
}

// In most scenarios a source operator has nothing to add to the profile, so this override
// is intentionally empty. Source operators that need to record wait times override it again.
void update_profile(PipelineTaskTimer& pipeline_task_timer) override {}
};

/**
Expand Down Expand Up @@ -454,6 +467,13 @@ class StatefulOperator : public StreamingOperator<OperatorBuilderType> {
return Status::OK();
}

// Adds the pipeline task's wait_source_time plus wait_dependency_time to the total time
// counter of this operator's node.
void update_profile(PipelineTaskTimer& pipeline_task_timer) override {
    // _node lives in the dependent base class, hence the explicit qualification.
    auto* node_total_time =
            StreamingOperator<OperatorBuilderType>::_node->runtime_profile()->total_time_counter();
    const auto blocked_time =
            pipeline_task_timer.wait_source_time + pipeline_task_timer.wait_dependency_time;
    node_total_time->update(blocked_time);
}

protected:
std::unique_ptr<vectorized::Block> _child_block;
SourceState _child_source_state;
Expand Down
10 changes: 10 additions & 0 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ class ScanOperator : public SourceOperator<ScanOperatorBuilder> {
std::string debug_string() const override;

Status try_close(RuntimeState* state) override;

// The scan node is charged with the time the pipeline task spent waiting: the sum of
// wait_source_time and wait_dependency_time is added to the node's total time counter, and
// wait_source_time alone is additionally passed to the node via update_wait_source_time().
void update_profile(PipelineTaskTimer& pipeline_task_timer) override {
    auto* scan_node = StreamingOperator<ScanOperatorBuilder>::_node;
    const auto blocked_time =
            pipeline_task_timer.wait_source_time + pipeline_task_timer.wait_dependency_time;

    scan_node->runtime_profile()->total_time_counter()->update(blocked_time);
    scan_node->update_wait_source_time(pipeline_task_timer.wait_source_time);
}
};

} // namespace doris::pipeline
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/set_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class SetSinkOperator : public StreamingOperator<SetSinkOperatorBuilder<is_inter

bool can_write() override { return true; }

// Intentionally empty: the set sink operator does not add any pipeline task wait time to
// its node's profile.
// NOTE(review): presumably for the same reason as the join build sinks (not on the critical
// path) — confirm.
void update_profile(PipelineTaskTimer& pipeline_task_timer) override {}

private:
vectorized::VSetOperationNode<is_intersect>* _set_node;
};
Expand Down
19 changes: 19 additions & 0 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ void PipelineTask::_fresh_profile_counter() {
COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
COUNTER_SET(_dst_pending_finish_over_timer, _dst_pending_finish_over_time);
COUNTER_SET(_pip_task_total_timer, (int64_t)_pipeline_task_watcher.elapsed_time());
COUNTER_SET(_wait_dependency_timer, (int64_t)_wait_dependency_watcher.elapsed_time());
}

void PipelineTask::_init_profile() {
Expand All @@ -98,6 +99,7 @@ void PipelineTask::_init_profile() {
_wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
_wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
_wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
_wait_dependency_timer = ADD_TIMER(_task_profile, "WaitDependencyTime");
_block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
_block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT);
_block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
Expand Down Expand Up @@ -279,6 +281,16 @@ Status PipelineTask::finalize() {
_task_queue->update_statistics(this, _finalize_timer->value());
}
}};

// propagate wait_source_time, wait_sink_time, wait_dependency_time to all nodes
PipelineTaskTimer pipeline_task_timer;
pipeline_task_timer.wait_dependency_time = (int64_t)_wait_dependency_watcher.elapsed_time();
pipeline_task_timer.wait_source_time = (int64_t)_wait_source_watcher.elapsed_time();
pipeline_task_timer.wait_sink_time = (int64_t)_wait_sink_watcher.elapsed_time();
for (auto& optor : _operators) {
optor->update_profile(pipeline_task_timer);
}
_sink->update_profile(pipeline_task_timer);
SCOPED_TIMER(_finalize_timer);
return _sink->finalize(_state);
}
Expand Down Expand Up @@ -342,6 +354,10 @@ void PipelineTask::set_state(PipelineTaskState state) {
if (state == PipelineTaskState::RUNNABLE) {
_wait_bf_watcher.stop();
}
} else if (_cur_state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
if (state == PipelineTaskState::RUNNABLE) {
_wait_dependency_watcher.stop();
}
} else if (_cur_state == PipelineTaskState::RUNNABLE) {
COUNTER_UPDATE(_block_counts, 1);
if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
Expand All @@ -352,6 +368,9 @@ void PipelineTask::set_state(PipelineTaskState state) {
COUNTER_UPDATE(_block_by_sink_counts, 1);
} else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
_wait_bf_watcher.start();
} else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
_wait_dependency_watcher.start();
// the blocked-by-dependency count should always be 1, so there is no need to record it
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ class PipelineTask {
// TODO we should calculate the time between when really runnable and runnable
MonotonicStopWatch _wait_schedule_watcher;
RuntimeProfile::Counter* _wait_schedule_timer;
MonotonicStopWatch _wait_dependency_watcher;
RuntimeProfile::Counter* _wait_dependency_timer;
RuntimeProfile::Counter* _yield_counts;
RuntimeProfile::Counter* _core_change_times;

Expand Down
10 changes: 5 additions & 5 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,11 +489,11 @@ void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) co
stream.flags(std::ios::fixed);
stream << prefix << _name << ":";

if (total_time->second->value() != 0) {
stream << "(Active: "
<< PrettyPrinter::print(total_time->second->value(), total_time->second->type())
<< ", non-child: " << std::setprecision(2) << _local_time_percent << "%)";
}
//if (total_time->second->value() != 0) {
stream << "(Active: "
<< PrettyPrinter::print(total_time->second->value(), total_time->second->type())
<< ", non-child: " << std::setprecision(2) << _local_time_percent << "%)";
//}

stream << std::endl;

Expand Down
7 changes: 7 additions & 0 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ class TRuntimeProfileTree;

class ObjectPool;

// Plain aggregate used by a pipeline task to hand its accumulated wait times to operators
// (via update_profile) so they can populate the ExecNode's runtime profile.
//
// Every field defaults to 0 so that a default-constructed timer never carries indeterminate
// values into a profile counter. The struct remains an aggregate, so brace initialization
// in declaration order keeps working.
struct PipelineTaskTimer {
    // Time the task spent waiting on its source.
    int64_t wait_source_time = 0;
    // Time the task spent waiting on its sink.
    int64_t wait_sink_time = 0;
    // Time the task spent blocked on a dependency.
    int64_t wait_dependency_time = 0;
};

// Runtime profile is a group of profiling counters. It supports adding named counters
// and being able to serialize and deserialize them.
// The profiles support a tree structure to form a hierarchy of counters.
Expand Down
Loading

0 comments on commit 2cffad6

Please sign in to comment.