From 2cffad6be3db286761e7c31e2fcef8e6f56c58d0 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Mon, 19 Jun 2023 14:37:09 +0800 Subject: [PATCH] [profile](pipeline) populate exec node's total time counter using pipeline task's running time counters --- be/src/exec/exec_node.cpp | 5 +- be/src/exec/exec_node.h | 37 +- .../pipeline/exec/exchange_source_operator.h | 11 + be/src/pipeline/exec/hashjoin_build_sink.h | 2 + .../exec/nested_loop_join_build_operator.h | 2 + be/src/pipeline/exec/operator.h | 38 +- be/src/pipeline/exec/scan_operator.h | 10 + be/src/pipeline/exec/set_sink_operator.h | 2 + be/src/pipeline/pipeline_task.cpp | 19 + be/src/pipeline/pipeline_task.h | 2 + be/src/util/runtime_profile.cpp | 10 +- be/src/util/runtime_profile.h | 7 + be/src/vec/exec/join/vhash_join_node.cpp | 54 +- be/src/vec/exec/join/vhash_join_node.h | 6 +- be/src/vec/exec/join/vjoin_node_base.cpp | 22 +- be/src/vec/exec/join/vjoin_node_base.h | 4 +- .../vec/exec/join/vnested_loop_join_node.cpp | 14 +- be/src/vec/exec/join/vnested_loop_join_node.h | 6 +- be/src/vec/exec/scan/vscan_node.h | 2 + be/src/vec/exec/vaggregation_node.cpp | 10 +- be/src/vec/exec/vaggregation_node.h | 4 +- be/src/vec/exec/vanalytic_eval_node.cpp | 10 +- be/src/vec/exec/vanalytic_eval_node.h | 4 +- be/src/vec/exec/vassert_num_rows_node.cpp | 5 +- be/src/vec/exec/vassert_num_rows_node.h | 2 +- be/src/vec/exec/vexchange_node.cpp | 5 + be/src/vec/exec/vexchange_node.h | 1 + be/src/vec/exec/vpartition_sort_node.cpp | 10 +- be/src/vec/exec/vpartition_sort_node.h | 4 +- be/src/vec/exec/vrepeat_node.cpp | 9 +- be/src/vec/exec/vrepeat_node.h | 4 +- be/src/vec/exec/vselect_node.cpp | 4 +- be/src/vec/exec/vselect_node.h | 2 +- be/src/vec/exec/vset_operation_node.cpp | 9 +- be/src/vec/exec/vset_operation_node.h | 4 +- be/src/vec/exec/vsort_node.cpp | 9 +- be/src/vec/exec/vsort_node.h | 4 +- be/src/vec/exec/vtable_function_node.cpp | 4 +- be/src/vec/exec/vtable_function_node.h | 4 +- be/src/vec/runtime/vdata_stream_recvr.h | 2 + be/src/vec/sink/vmysql_result_writer.cpp | 493 ------------------ be/src/vec/sink/vmysql_result_writer.h | 8 - .../doris/common/util/RuntimeProfile.java | 38 +- 43 files changed, 258 insertions(+), 644 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index c6b7826deb4ffe8..c186c45881e72f8 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -526,6 +526,7 @@ std::string ExecNode::get_name() { } Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_TIMER(_projection_timer); using namespace vectorized; MutableBlock mutable_block = @@ -570,8 +571,4 @@ Status ExecNode::get_next_after_projects( return func(state, block, eos); } -Status ExecNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) { - return Status::NotSupported("{} not implements sink", get_name()); -} - } // namespace doris diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 55cdea53b645757..864d11cfe2a28f1 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -122,14 +122,16 @@ class ExecNode { // Emit data, both need impl with method: sink // Eg: Aggregation, Sort, Scan - [[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block* output_block, - bool* eos) { - return get_next(state, output_block, eos); + [[nodiscard]] Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) { + 
SCOPED_TIMER(_runtime_profile->total_time_counter()); + return do_pull(state, output_block, eos); } - [[nodiscard]] virtual Status push(RuntimeState* state, vectorized::Block* input_block, - bool eos) { - return Status::OK(); + // Add do_pull, do_push and do_sink methods to ExecNode. Since sink, push and pull are + // reused by many other methods, the total time counter could not be added correctly otherwise. + [[nodiscard]] Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return do_push(state, input_block, eos); } bool can_read() const { return _can_read; } @@ -137,8 +139,10 @@ class ExecNode { // Sink Data to ExecNode to do some stock work, both need impl with method: get_result // `eos` means source is exhausted, exec node should do some finalize work // Eg: Aggregation, Sort - [[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* input_block, - bool eos); + [[nodiscard]] Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return do_sink(state, input_block, eos); + } // Resets the stream of row batches to be retrieved by subsequent GetNext() calls. // Clears all internal state, returning this node to the state it was in after calling // Prepare(). @@ -249,6 +253,23 @@ class ExecNode { /// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc Status do_projections(vectorized::Block* origin_block, vectorized::Block* output_block); + [[nodiscard]] virtual Status do_sink(RuntimeState* state, vectorized::Block* input_block, + bool eos) { + throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, + "do_sink method is not implemented by this class"); + } + + [[nodiscard]] virtual Status do_push(RuntimeState* state, vectorized::Block* input_block, + bool eos) { + throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, + "do_push method is not implemented by this class"); + } + + [[nodiscard]] virtual Status do_pull(RuntimeState* state, vectorized::Block* output_block, + bool* eos) { + return get_next(state, output_block, eos); + } + int _id; // unique w/in single plan tree TPlanNodeType::type _type; ObjectPool* _pool; diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index f80685f396d9f18..c2f9f9a261b872a 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -42,6 +42,17 @@ class ExchangeSourceOperator final : public SourceOperator::_node->runtime_profile() + ->total_time_counter() + ->update(pipeline_task_timer.wait_source_time + + pipeline_task_timer.wait_dependency_time); + + StreamingOperator::_node->update_wait_source_time( + pipeline_task_timer.wait_source_time); + } }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index c1e67512ddd7a61..e6b7b4cef4431fb 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -40,6 +40,8 @@ class HashJoinBuildSink final : public StreamingOperatorcan_sink_write(); } bool is_pending_finish() const override { return !_node->ready_for_finish(); } + // The build side is not on the critical path, so it should not update the total time counter + void update_profile(PipelineTaskTimer& pipeline_task_timer) override {} }; } // namespace pipeline diff --git
a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 33918f0a0c8952d..74e20eff454bf47 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -40,6 +40,8 @@ class NestLoopJoinBuildOperator final : public StreamingOperator(this, _node); \ - } \ - \ - NAME::NAME(OperatorBuilderBase* operator_builder, ExecNode* node) \ +#define OPERATOR_CODE_GENERATOR(NAME, SUBCLASS) \ + NAME##Builder::NAME##Builder(int32_t id, ExecNode* exec_node) \ + : OperatorBuilder(id, #NAME, exec_node) {} \ + \ + OperatorPtr NAME##Builder::build_operator() { return std::make_shared(this, _node); } \ + \ + NAME::NAME(OperatorBuilderBase* operator_builder, ExecNode* node) \ : SUBCLASS(operator_builder, node) {}; namespace doris::pipeline { @@ -221,6 +219,8 @@ class OperatorBase { return Status::NotSupported(error_msg.str()); } + virtual void update_profile(PipelineTaskTimer& pipeline_task_timer) {} + /** * pending_finish means we have called `close` and there are still some work to do before finishing. * Now it is a pull-based pipeline and operators pull data from its child by this method. @@ -303,6 +303,11 @@ class DataSinkOperator : public OperatorBase { [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { return _sink->profile(); } + // Should add wait sink time to all sink node + void update_profile(PipelineTaskTimer& pipeline_task_timer) override { + _sink->profile()->total_time_counter()->update(pipeline_task_timer.wait_sink_time); + } + protected: NodeType* _sink; }; @@ -370,6 +375,11 @@ class StreamingOperator : public OperatorBase { return _node->runtime_profile(); } + void update_profile(PipelineTaskTimer& pipeline_task_timer) override { + _node->runtime_profile()->total_time_counter()->update( + pipeline_task_timer.wait_source_time + pipeline_task_timer.wait_dependency_time); + } + protected: NodeType* _node; bool _use_projection; @@ -397,6 +407,9 @@ class SourceOperator : public StreamingOperator { source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; return Status::OK(); } + + // In most scenarios, source operator does not do anything. 
+ void update_profile(PipelineTaskTimer& pipeline_task_timer) override {} }; /** @@ -454,6 +467,13 @@ class StatefulOperator : public StreamingOperator { return Status::OK(); } + void update_profile(PipelineTaskTimer& pipeline_task_timer) override { + StreamingOperator::_node->runtime_profile() + ->total_time_counter() + ->update(pipeline_task_timer.wait_source_time + + pipeline_task_timer.wait_dependency_time); + } + protected: std::unique_ptr _child_block; SourceState _child_source_state; diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 850a1ab020be198..25f963588d13c39 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -51,6 +51,16 @@ class ScanOperator : public SourceOperator { std::string debug_string() const override; Status try_close(RuntimeState* state) override; + + // Should add wait source time to scan node + void update_profile(PipelineTaskTimer& pipeline_task_timer) override { + StreamingOperator::_node->runtime_profile() + ->total_time_counter() + ->update(pipeline_task_timer.wait_source_time + + pipeline_task_timer.wait_dependency_time); + StreamingOperator::_node->update_wait_source_time( + pipeline_task_timer.wait_source_time); + } }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index f622bef19ab4985..73fead18fc1f9b2 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -49,6 +49,8 @@ class SetSinkOperator : public StreamingOperator* _set_node; }; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 0e2041c488f03aa..cb31179c4b9eeb4 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -72,6 +72,7 @@ void PipelineTask::_fresh_profile_counter() { COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time); COUNTER_SET(_dst_pending_finish_over_timer, _dst_pending_finish_over_time); COUNTER_SET(_pip_task_total_timer, (int64_t)_pipeline_task_watcher.elapsed_time()); + COUNTER_SET(_wait_dependency_timer, (int64_t)_wait_dependency_watcher.elapsed_time()); } void PipelineTask::_init_profile() { @@ -98,6 +99,7 @@ void PipelineTask::_init_profile() { _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime"); _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime"); _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime"); + _wait_dependency_timer = ADD_TIMER(_task_profile, "WaitDependencyTime"); _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT); _block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT); _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT); @@ -279,6 +281,16 @@ Status PipelineTask::finalize() { _task_queue->update_statistics(this, _finalize_timer->value()); } }}; + + // propagate wait_source_time, wait_sink_time, wait_dependency_time to all nodes + PipelineTaskTimer pipeline_task_timer; + pipeline_task_timer.wait_dependency_time = (int64_t)_wait_dependency_watcher.elapsed_time(); + pipeline_task_timer.wait_source_time = (int64_t)_wait_source_watcher.elapsed_time(); + pipeline_task_timer.wait_sink_time = (int64_t)_wait_sink_watcher.elapsed_time(); + for (auto& optor : _operators) { + optor->update_profile(pipeline_task_timer); + } + _sink->update_profile(pipeline_task_timer); SCOPED_TIMER(_finalize_timer); return 
_sink->finalize(_state); } @@ -342,6 +354,10 @@ void PipelineTask::set_state(PipelineTaskState state) { if (state == PipelineTaskState::RUNNABLE) { _wait_bf_watcher.stop(); } + } else if (_cur_state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) { + if (state == PipelineTaskState::RUNNABLE) { + _wait_dependency_watcher.stop(); + } } else if (_cur_state == PipelineTaskState::RUNNABLE) { COUNTER_UPDATE(_block_counts, 1); if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) { @@ -352,6 +368,9 @@ void PipelineTask::set_state(PipelineTaskState state) { COUNTER_UPDATE(_block_by_sink_counts, 1); } else if (state == PipelineTaskState::BLOCKED_FOR_RF) { _wait_bf_watcher.start(); + } else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) { + _wait_dependency_watcher.start(); + // block by dependency should be 1, not need log it } } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 5cba2ef96ef6314..1d3c38234cc63da 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -311,6 +311,8 @@ class PipelineTask { // TODO we should calculate the time between when really runnable and runnable MonotonicStopWatch _wait_schedule_watcher; RuntimeProfile::Counter* _wait_schedule_timer; + MonotonicStopWatch _wait_dependency_watcher; + RuntimeProfile::Counter* _wait_dependency_timer; RuntimeProfile::Counter* _yield_counts; RuntimeProfile::Counter* _core_change_times; diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index f385256ab210a29..4fa59aeb18b766a 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -489,11 +489,11 @@ void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) co stream.flags(std::ios::fixed); stream << prefix << _name << ":"; - if (total_time->second->value() != 0) { - stream << "(Active: " - << PrettyPrinter::print(total_time->second->value(), total_time->second->type()) - << ", non-child: " << std::setprecision(2) << _local_time_percent << "%)"; - } + //if (total_time->second->value() != 0) { + stream << "(Active: " + << PrettyPrinter::print(total_time->second->value(), total_time->second->type()) + << ", non-child: " << std::setprecision(2) << _local_time_percent << "%)"; + //} stream << std::endl; diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index 96d14d65404e524..496f6886f91e50e 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -74,6 +74,13 @@ class TRuntimeProfileTree; class ObjectPool; +// This class is used by pipeline task to populate the ExecNode's runtime profile +struct PipelineTaskTimer { + int64_t wait_source_time; + int64_t wait_sink_time; + int64_t wait_dependency_time; +}; + // Runtime profile is a group of profiling counters. It supports adding named counters // and being able to serialize and deserialize them. // The profiles support a tree structure to form a hierarchy of counters. diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index c048c4cc2f60618..46c1fc44f4eadaa 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -450,29 +450,32 @@ Status HashJoinNode::prepare(RuntimeState* state) { "ProbeKeyArena", TUnit::BYTES, "MemoryUsage"); // Build phase - auto record_profile = _should_build_hash_table ? 
_build_phase_profile : faker_runtime_profile(); - _build_table_timer = ADD_CHILD_TIMER(_build_phase_profile, "BuildTableTime", "BuildTime"); + _build_table_timer = ADD_CHILD_TIMER(runtime_profile(), "BuildTableTime", "BuildTime"); _build_side_merge_block_timer = - ADD_CHILD_TIMER(_build_phase_profile, "BuildSideMergeBlockTime", "BuildTime"); - _build_table_insert_timer = ADD_TIMER(record_profile, "BuildTableInsertTime"); - _build_expr_call_timer = ADD_TIMER(record_profile, "BuildExprCallTime"); - _build_table_expanse_timer = ADD_TIMER(record_profile, "BuildTableExpanseTime"); - _build_table_convert_timer = ADD_TIMER(record_profile, "BuildTableConvertToPartitionedTime"); - _build_side_compute_hash_timer = ADD_TIMER(record_profile, "BuildSideHashComputingTime"); - _build_runtime_filter_timer = ADD_TIMER(record_profile, "BuildRuntimeFilterTime"); + ADD_CHILD_TIMER(runtime_profile(), "BuildSideMergeBlockTime", "BuildTime"); + _build_table_insert_timer = + ADD_CHILD_TIMER(runtime_profile(), "BuildTableInsertTime", "BuildPhase"); + _build_expr_call_timer = ADD_CHILD_TIMER(runtime_profile(), "BuildExprCallTime", "BuildPhase"); + _build_table_expanse_timer = + ADD_CHILD_TIMER(runtime_profile(), "BuildTableExpanseTime", "BuildPhase"); + _build_table_convert_timer = + ADD_CHILD_TIMER(runtime_profile(), "BuildTableConvertToPartitionedTime", "BuildPhase"); + _build_side_compute_hash_timer = + ADD_CHILD_TIMER(runtime_profile(), "BuildSideHashComputingTime", "BuildPhase"); + _build_runtime_filter_timer = + ADD_CHILD_TIMER(runtime_profile(), "BuildRuntimeFilterTime", "BuildPhase"); // Probe phase - auto probe_phase_profile = _probe_phase_profile; - _probe_next_timer = ADD_TIMER(probe_phase_profile, "ProbeFindNextTime"); - _probe_expr_call_timer = ADD_TIMER(probe_phase_profile, "ProbeExprCallTime"); + _probe_next_timer = ADD_CHILD_TIMER(runtime_profile(), "ProbeFindNextTime", "ProbePhase"); + _probe_expr_call_timer = ADD_CHILD_TIMER(runtime_profile(), "ProbeExprCallTime", "ProbePhase"); _search_hashtable_timer = - ADD_CHILD_TIMER(probe_phase_profile, "ProbeWhenSearchHashTableTime", "ProbeTime"); + ADD_CHILD_TIMER(runtime_profile(), "ProbeWhenSearchHashTableTime", "ProbeTime"); _build_side_output_timer = - ADD_CHILD_TIMER(probe_phase_profile, "ProbeWhenBuildSideOutputTime", "ProbeTime"); + ADD_CHILD_TIMER(runtime_profile(), "ProbeWhenBuildSideOutputTime", "ProbeTime"); _probe_side_output_timer = - ADD_CHILD_TIMER(probe_phase_profile, "ProbeWhenProbeSideOutputTime", "ProbeTime"); + ADD_CHILD_TIMER(runtime_profile(), "ProbeWhenProbeSideOutputTime", "ProbeTime"); _probe_process_hashtable_timer = - ADD_CHILD_TIMER(probe_phase_profile, "ProbeWhenProcessHashTableTime", "ProbeTime"); + ADD_CHILD_TIMER(runtime_profile(), "ProbeWhenProcessHashTableTime", "ProbeTime"); _open_timer = ADD_TIMER(runtime_profile(), "OpenTime"); _allocate_resource_timer = ADD_TIMER(runtime_profile(), "AllocateResourceTime"); _process_other_join_conjunct_timer = ADD_TIMER(runtime_profile(), "OtherJoinConjunctTime"); @@ -539,7 +542,8 @@ void HashJoinNode::prepare_for_next() { _prepare_probe_block(); } -Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { +Status HashJoinNode::do_pull(doris::RuntimeState* state, vectorized::Block* output_block, + bool* eos) { SCOPED_TIMER(_probe_timer); if (_short_circuit_for_probe) { // If we use a short-circuit strategy, should return empty block directly. 
@@ -640,7 +644,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ return Status::OK(); } -Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_block, bool eos) { +Status HashJoinNode::do_push(RuntimeState* /*state*/, vectorized::Block* input_block, bool eos) { _probe_eos = eos; if (input_block->rows() > 0) { COUNTER_UPDATE(_probe_rows_counter, input_block->rows()); @@ -709,10 +713,10 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - RETURN_IF_ERROR(push(state, &_probe_block, _probe_eos)); + RETURN_IF_ERROR(do_push(state, &_probe_block, _probe_eos)); } - return pull(state, output_block, eos); + return do_pull(state, output_block, eos); } void HashJoinNode::_add_tuple_is_null_column(Block* block) { @@ -812,17 +816,17 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { _children[1], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); } - RETURN_IF_ERROR(sink(state, &block, eos)); + RETURN_IF_ERROR(do_sink(state, &block, eos)); } RETURN_IF_ERROR(child(1)->close(state)); } else { RETURN_IF_ERROR(child(1)->close(state)); - RETURN_IF_ERROR(sink(state, nullptr, true)); + RETURN_IF_ERROR(do_sink(state, nullptr, true)); } return Status::OK(); } -Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { +Status HashJoinNode::do_sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { SCOPED_TIMER(_build_timer); // make one block for each 4 gigabytes @@ -911,12 +915,12 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc DCHECK(_shared_hashtable_controller != nullptr); DCHECK(_shared_hash_table_context != nullptr); auto wait_timer = - ADD_CHILD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime", "BuildTime"); + ADD_CHILD_TIMER(runtime_profile(), "WaitForSharedHashTableTime", "BuildTime"); SCOPED_TIMER(wait_timer); RETURN_IF_ERROR( _shared_hashtable_controller->wait_for_signal(state, _shared_hash_table_context)); - _build_phase_profile->add_info_string( + runtime_profile()->add_info_string( "SharedHashTableFrom", print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id()))); _short_circuit_for_null_in_probe_side = diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 096a9148cc955c3..88c2cd75036ddef 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -237,10 +237,10 @@ class HashJoinNode final : public VJoinNodeBase { Status alloc_resource(RuntimeState* state) override; void release_resource(RuntimeState* state) override; - Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override; + Status do_sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override; bool need_more_input_data() const; - Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; - Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; + Status do_pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; + Status do_push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; void prepare_for_next() override; void debug_string(int indentation_level, std::stringstream* out) const override; diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp 
b/be/src/vec/exec/join/vjoin_node_base.cpp index a4e1493d58b80bf..53d58b499ee699f 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -108,17 +108,19 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des Status VJoinNodeBase::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); - _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true); - _build_get_next_timer = ADD_TIMER(_build_phase_profile, "BuildGetNextTime"); - _build_timer = ADD_TIMER(_build_phase_profile, "BuildTime"); - _build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows", TUnit::UNIT); - _probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true); - _probe_timer = ADD_TIMER(_probe_phase_profile, "ProbeTime"); - _join_filter_timer = ADD_CHILD_TIMER(_probe_phase_profile, "JoinFilterTimer", "ProbeTime"); - _build_output_block_timer = - ADD_CHILD_TIMER(_probe_phase_profile, "BuildOutputBlock", "ProbeTime"); - _probe_rows_counter = ADD_COUNTER(_probe_phase_profile, "ProbeRows", TUnit::UNIT); + _build_phase_profile = ADD_LABEL_COUNTER(runtime_profile(), "BuildPhase"); + _build_get_next_timer = ADD_CHILD_TIMER(runtime_profile(), "BuildGetNextTime", "BuildPhase"); + _build_timer = ADD_CHILD_TIMER(runtime_profile(), "BuildTime", "BuildPhase"); + _build_rows_counter = + ADD_CHILD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT, "BuildPhase"); + + _probe_phase_profile = ADD_LABEL_COUNTER(runtime_profile(), "ProbePhase"); + _probe_timer = ADD_CHILD_TIMER(runtime_profile(), "ProbeTime", "ProbePhase"); + _join_filter_timer = ADD_CHILD_TIMER(runtime_profile(), "JoinFilterTimer", "ProbeTime"); + _build_output_block_timer = ADD_CHILD_TIMER(runtime_profile(), "BuildOutputBlock", "ProbeTime"); + _probe_rows_counter = + ADD_CHILD_COUNTER(runtime_profile(), "ProbeRows", TUnit::UNIT, "ProbePhase"); _push_down_timer = ADD_TIMER(runtime_profile(), "PublishRuntimeFilterTime"); _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime"); diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 120e77785ed7959..e400cfc2c816f33 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -132,12 +132,12 @@ class VJoinNodeBase : public ExecNode { MutableColumnPtr _tuple_is_null_left_flag_column; MutableColumnPtr _tuple_is_null_right_flag_column; - RuntimeProfile* _build_phase_profile; + RuntimeProfile::Counter* _build_phase_profile; RuntimeProfile::Counter* _build_timer; RuntimeProfile::Counter* _build_get_next_timer; RuntimeProfile::Counter* _build_rows_counter; - RuntimeProfile* _probe_phase_profile; + RuntimeProfile::Counter* _probe_phase_profile; RuntimeProfile::Counter* _probe_timer; RuntimeProfile::Counter* _probe_rows_counter; RuntimeProfile::Counter* _push_down_timer; diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index a3b6c30c9fb4dd6..d6d2c050ab183c1 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -187,7 +187,7 @@ Status VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) { std::placeholders::_3))); } - sink(state, &block, eos); + do_sink(state, &block, eos); if (eos) { break; @@ -197,7 +197,8 @@ Status VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) { return Status::OK(); } -Status VNestedLoopJoinNode::sink(doris::RuntimeState* 
state, vectorized::Block* block, bool eos) { +Status VNestedLoopJoinNode::do_sink(doris::RuntimeState* state, vectorized::Block* block, + bool eos) { SCOPED_TIMER(_build_timer); auto rows = block->rows(); auto mem_usage = block->allocated_bytes(); @@ -226,7 +227,8 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState* state, vectorized::Block* return Status::OK(); } -Status VNestedLoopJoinNode::push(doris::RuntimeState* state, vectorized::Block* block, bool eos) { +Status VNestedLoopJoinNode::do_push(doris::RuntimeState* state, vectorized::Block* block, + bool eos) { COUNTER_UPDATE(_probe_rows_counter, block->rows()); _cur_probe_row_visited_flags.resize(block->rows()); std::fill(_cur_probe_row_visited_flags.begin(), _cur_probe_row_visited_flags.end(), 0); @@ -270,10 +272,10 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo while (need_more_input_data()) { RETURN_IF_ERROR(_fresh_left_block(state)); - push(state, &_left_block, _left_side_eos); + do_push(state, &_left_block, _left_side_eos); } - return pull(state, block, eos); + return do_pull(state, block, eos); } void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_block) const { @@ -660,7 +662,7 @@ void VNestedLoopJoinNode::_release_mem() { _tuple_is_null_right_flag_column = nullptr; } -Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block* block, bool* eos) { +Status VNestedLoopJoinNode::do_pull(RuntimeState* state, vectorized::Block* block, bool* eos) { if (_is_output_left_side_only) { RETURN_IF_ERROR(_build_output_block(&_left_block, block)); *eos = _left_side_eos; diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index 03676629bcc4eb6..5202f3b9ed67512 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -62,11 +62,11 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { void release_resource(doris::RuntimeState* state) override; - Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override; + Status do_sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override; - Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; + Status do_push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; - Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; + Status do_pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; bool need_more_input_data() const; diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 83e8909280d90dc..8703f49eefe459d 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -155,6 +155,8 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { void release_resource(RuntimeState* state) override; Status try_close(RuntimeState* state); + // for scan node, the wait source time is get next time waiting in non pipeline mode + void update_wait_source_time(int64_t delta) { _get_next_timer->update(delta); } bool should_run_serial() const { return _should_run_serial || _state->enable_scan_node_run_serial(); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index d7ebfe0a517c13d..d9f8328e1f5c8b6 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -528,7 +528,7 @@ Status AggregationNode::open(RuntimeState* state) { 
ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - RETURN_IF_ERROR(sink(state, &block, eos)); + RETURN_IF_ERROR(do_sink(state, &block, eos)); } _children[0]->close(state); @@ -565,16 +565,16 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { if (_preagg_block.rows() != 0) { RETURN_IF_ERROR(do_pre_agg(&_preagg_block, block)); } else { - RETURN_IF_ERROR(pull(state, block, eos)); + RETURN_IF_ERROR(do_pull(state, block, eos)); } } } else { - RETURN_IF_ERROR(pull(state, block, eos)); + RETURN_IF_ERROR(do_pull(state, block, eos)); } return Status::OK(); } -Status AggregationNode::pull(doris::RuntimeState* state, vectorized::Block* block, bool* eos) { +Status AggregationNode::do_pull(doris::RuntimeState* state, vectorized::Block* block, bool* eos) { RETURN_IF_ERROR(_executor.get_result(state, block, eos)); _make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg @@ -584,7 +584,7 @@ Status AggregationNode::pull(doris::RuntimeState* state, vectorized::Block* bloc return Status::OK(); } -Status AggregationNode::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { +Status AggregationNode::do_sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { if (in_block->rows() > 0) { RETURN_IF_ERROR(_executor.execute(in_block)); RETURN_IF_ERROR(_try_spill_disk()); diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index e31240cdbcb049c..205544ec39e8a28 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -832,8 +832,8 @@ class AggregationNode final : public ::doris::ExecNode { Status get_next(RuntimeState* state, Block* block, bool* eos) override; Status close(RuntimeState* state) override; void release_resource(RuntimeState* state) override; - Status pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) override; - Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override; + Status do_pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) override; + Status do_sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override; Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block); bool is_streaming_preagg() const { return _is_streaming_preagg; } diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index 09c64e752d22c39..0d6a26184f2455e 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -262,8 +262,8 @@ Status VAnalyticEvalNode::alloc_resource(RuntimeState* state) { return Status::OK(); } -Status VAnalyticEvalNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* output_block, - bool* eos) { +Status VAnalyticEvalNode::do_pull(doris::RuntimeState* /*state*/, vectorized::Block* output_block, + bool* eos) { if (_input_eos && (_output_block_index == _input_blocks.size() || _input_total_rows == 0)) { *eos = true; return Status::OK(); @@ -534,12 +534,12 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) { std::placeholders::_3))); } while (!_input_eos && block.rows() == 0); - RETURN_IF_ERROR(sink(state, &block, _input_eos)); + RETURN_IF_ERROR(do_sink(state, &block, _input_eos)); return Status::OK(); } -Status VAnalyticEvalNode::sink(doris::RuntimeState* /*state*/, vectorized::Block* input_block, - bool eos) { +Status 
VAnalyticEvalNode::do_sink(doris::RuntimeState* /*state*/, vectorized::Block* input_block, + bool eos) { _input_eos = eos; if (_input_eos && input_block->rows() == 0) { _need_more_input = false; diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h index d232c28ff9c8d6d..3284418d7d952c6 100644 --- a/be/src/vec/exec/vanalytic_eval_node.h +++ b/be/src/vec/exec/vanalytic_eval_node.h @@ -79,8 +79,8 @@ class VAnalyticEvalNode : public ExecNode { Status close(RuntimeState* state) override; Status alloc_resource(RuntimeState* state) override; void release_resource(RuntimeState* state) override; - Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override; - Status pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) override; + Status do_sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override; + Status do_pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) override; bool can_read(); bool can_write(); diff --git a/be/src/vec/exec/vassert_num_rows_node.cpp b/be/src/vec/exec/vassert_num_rows_node.cpp index 84421180dd290e3..e188b2a808f0b1f 100644 --- a/be/src/vec/exec/vassert_num_rows_node.cpp +++ b/be/src/vec/exec/vassert_num_rows_node.cpp @@ -60,7 +60,8 @@ Status VAssertNumRowsNode::open(RuntimeState* state) { return Status::OK(); } -Status VAssertNumRowsNode::pull(doris::RuntimeState* state, vectorized::Block* block, bool* eos) { +Status VAssertNumRowsNode::do_pull(doris::RuntimeState* state, vectorized::Block* block, + bool* eos) { _num_rows_returned += block->rows(); bool assert_res = false; switch (_assertion) { @@ -115,7 +116,7 @@ Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - return pull(state, block, eos); + return do_pull(state, block, eos); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/vassert_num_rows_node.h b/be/src/vec/exec/vassert_num_rows_node.h index 90b74b0acb05f7e..7f2a7bc28765ffe 100644 --- a/be/src/vec/exec/vassert_num_rows_node.h +++ b/be/src/vec/exec/vassert_num_rows_node.h @@ -39,7 +39,7 @@ class VAssertNumRowsNode : public ExecNode { Status open(RuntimeState* state) override; Status get_next(RuntimeState* state, Block* block, bool* eos) override; - Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; + Status do_pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; private: int64_t _desired_num_rows; diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index 3d9a50ded23ba24..7a342948c431829 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -88,6 +88,11 @@ Status VExchangeNode::alloc_resource(RuntimeState* state) { return Status::OK(); } +// for exchange node, the wait source time is data arrival wait time in non pipeline mode +void VExchangeNode::update_wait_source_time(int64_t delta) { + _stream_recvr->update_wait_source_time(delta); +} + Status VExchangeNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::open(state)); diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h index c4f083dda480221..0d000a7ba1e1693 100644 --- a/be/src/vec/exec/vexchange_node.h +++ b/be/src/vec/exec/vexchange_node.h @@ -55,6 +55,7 @@ class VExchangeNode : public ExecNode { Status open(RuntimeState* 
state) override; Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override; void release_resource(RuntimeState* state) override; + void update_wait_source_time(int64_t delta); Status collect_query_statistics(QueryStatistics* statistics) override; Status close(RuntimeState* state) override; diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index c31c8af6a99d7ed..f6bca2dc4d5ba32 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -169,7 +169,7 @@ void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_colum _partitioned_data->_partition_method_variant); } -Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) { +Status VPartitionSortNode::do_sink(RuntimeState* state, vectorized::Block* input_block, bool eos) { auto current_rows = input_block->rows(); if (current_rows > 0) { child_input_rows = child_input_rows + current_rows; @@ -243,7 +243,7 @@ Status VPartitionSortNode::open(RuntimeState* state) { ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - RETURN_IF_ERROR(sink(state, input_block.get(), eos)); + RETURN_IF_ERROR(do_sink(state, input_block.get(), eos)); } while (!eos); child(0)->close(state); @@ -266,8 +266,8 @@ bool VPartitionSortNode::can_read() { return !_blocks_buffer.empty() || _can_read; } -Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, - bool* eos) { +Status VPartitionSortNode::do_pull(doris::RuntimeState* state, vectorized::Block* output_block, + bool* eos) { RETURN_IF_CANCELLED(state); output_block->clear_column_data(); { @@ -300,7 +300,7 @@ Status VPartitionSortNode::get_next(RuntimeState* state, Block* output_block, bo VLOG_CRITICAL << "VPartitionSortNode::get_next"; SCOPED_TIMER(_runtime_profile->total_time_counter()); - return pull(state, output_block, eos); + return do_pull(state, output_block, eos); } Status VPartitionSortNode::get_sorted_block(RuntimeState* state, Block* output_block, diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h index 65384dc1c5ca276..f08634038b79f04 100644 --- a/be/src/vec/exec/vpartition_sort_node.h +++ b/be/src/vec/exec/vpartition_sort_node.h @@ -448,8 +448,8 @@ class VPartitionSortNode : public ExecNode { Status get_next(RuntimeState* state, Block* block, bool* eos) override; Status close(RuntimeState* state) override; - Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; - Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override; + Status do_pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; + Status do_sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override; void debug_profile(); bool can_read(); diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index 58c993c8b3213a6..7e7eccead314bb8 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -171,7 +171,8 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl return Status::OK(); } -Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { +Status VRepeatNode::do_pull(doris::RuntimeState* state, vectorized::Block* output_block, + bool* eos) { RETURN_IF_CANCELLED(state); DCHECK(_repeat_id_idx >= 0); for (const std::vector& v : 
_grouping_list) { @@ -199,7 +200,7 @@ Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_b return Status::OK(); } -Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block, bool eos) { +Status VRepeatNode::do_push(RuntimeState* state, vectorized::Block* input_block, bool eos) { _child_eos = eos; DCHECK(!_intermediate_block || _intermediate_block->rows() == 0); DCHECK(!_expr_ctxs.empty()); @@ -247,10 +248,10 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - push(state, &_child_block, _child_eos); + do_push(state, &_child_block, _child_eos); } - return pull(state, block, eos); + return do_pull(state, block, eos); } Status VRepeatNode::close(RuntimeState* state) { diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h index 837b4c8aca14e98..154689217a0ea0c 100644 --- a/be/src/vec/exec/vrepeat_node.h +++ b/be/src/vec/exec/vrepeat_node.h @@ -54,8 +54,8 @@ class VRepeatNode : public ExecNode { Status get_next(RuntimeState* state, Block* block, bool* eos) override; Status close(RuntimeState* state) override; - Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; - Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; + Status do_pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; + Status do_push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; bool need_more_input_data() const; Block* get_child_block() { return &_child_block; } diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp index ee1628cd19fb18f..b51e5bea8a24090 100644 --- a/be/src/vec/exec/vselect_node.cpp +++ b/be/src/vec/exec/vselect_node.cpp @@ -70,10 +70,10 @@ Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool } } while (block->rows() == 0); - return pull(state, block, eos); + return do_pull(state, block, eos); } -Status VSelectNode::pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) { +Status VSelectNode::do_pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); reached_limit(output_block, eos); diff --git a/be/src/vec/exec/vselect_node.h b/be/src/vec/exec/vselect_node.h index d6783d723729486..3f392cd6e997ab1 100644 --- a/be/src/vec/exec/vselect_node.h +++ b/be/src/vec/exec/vselect_node.h @@ -36,7 +36,7 @@ class VSelectNode final : public ExecNode { Status open(RuntimeState* state) override; Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; Status close(RuntimeState* state) override; - Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; + Status do_pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; private: // true if last get_next() call on child signalled eos diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 17db664c8a2b5d6..64bf12dd17eb9e9 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -254,7 +254,7 @@ template Status VSetOperationNode::get_next(RuntimeState* state, Block* output_block, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - return pull(state, output_block, eos); + return do_pull(state, 
output_block, eos); } template @@ -378,7 +378,7 @@ void VSetOperationNode::hash_table_init() { } template -Status VSetOperationNode::sink(RuntimeState* state, Block* block, bool eos) { +Status VSetOperationNode::do_sink(RuntimeState* state, Block* block, bool eos) { constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; if (block->rows() != 0) { @@ -413,7 +413,8 @@ Status VSetOperationNode::sink(RuntimeState* state, Block* block, } template -Status VSetOperationNode::pull(RuntimeState* state, Block* output_block, bool* eos) { +Status VSetOperationNode::do_pull(RuntimeState* state, Block* output_block, + bool* eos) { SCOPED_TIMER(_pull_timer); create_mutable_cols(output_block); auto st = std::visit( @@ -452,7 +453,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { if (eos) { child(0)->close(state); } - sink(state, &block, eos); + do_sink(state, &block, eos); } return Status::OK(); diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index 39a74f899ed1ff1..2d522aafc7c3682 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -66,8 +66,8 @@ class VSetOperationNode final : public ExecNode { Status alloc_resource(RuntimeState* state) override; void release_resource(RuntimeState* state) override; - Status sink(RuntimeState* state, Block* block, bool eos) override; - Status pull(RuntimeState* state, Block* output_block, bool* eos) override; + Status do_sink(RuntimeState* state, Block* block, bool eos) override; + Status do_pull(RuntimeState* state, Block* output_block, bool* eos) override; Status sink_probe(RuntimeState* state, int child_id, Block* block, bool eos); Status finalize_probe(RuntimeState* state, int child_id); diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 45ef327dd37dd7a..f898013db9bc0cf 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -135,7 +135,7 @@ Status VSortNode::alloc_resource(doris::RuntimeState* state) { return Status::OK(); } -Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) { +Status VSortNode::do_sink(RuntimeState* state, vectorized::Block* input_block, bool eos) { if (input_block->rows() > 0) { RETURN_IF_ERROR(_sorter->append_block(input_block)); RETURN_IF_CANCELLED(state); @@ -187,7 +187,7 @@ Status VSortNode::open(RuntimeState* state) { } { SCOPED_TIMER(_sink_timer); - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink(state, upstream_block.get(), eos)); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(do_sink(state, upstream_block.get(), eos)); } } while (!eos); @@ -200,8 +200,7 @@ Status VSortNode::open(RuntimeState* state) { return Status::OK(); } -Status VSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { - SCOPED_TIMER(_get_next_timer); +Status VSortNode::do_pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sorter->get_next(state, output_block, eos)); reached_limit(output_block, eos); if (*eos) { @@ -213,7 +212,7 @@ Status VSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_blo Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - return pull(state, block, eos); + return do_pull(state, block, eos); } Status VSortNode::reset(RuntimeState* state) { diff --git a/be/src/vec/exec/vsort_node.h b/be/src/vec/exec/vsort_node.h index 37c781bde285c20..c6f348c4c731919 100644 
--- a/be/src/vec/exec/vsort_node.h +++ b/be/src/vec/exec/vsort_node.h @@ -70,9 +70,9 @@ class VSortNode final : public doris::ExecNode { void release_resource(RuntimeState* state) override; - Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; + Status do_pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; - Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override; + Status do_sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override; protected: void debug_string(int indentation_level, std::stringstream* out) const override; diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index 81fdef438900ddc..f4cc78428b0b9d9 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -148,10 +148,10 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - RETURN_IF_ERROR(push(state, &_child_block, _child_eos)); + RETURN_IF_ERROR(do_push(state, &_child_block, _child_eos)); } - return pull(state, block, eos); + return do_pull(state, block, eos); } Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* output_block, diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h index 040ca3f7afef160..a56b7a1d121ee57 100644 --- a/be/src/vec/exec/vtable_function_node.h +++ b/be/src/vec/exec/vtable_function_node.h @@ -69,7 +69,7 @@ class VTableFunctionNode final : public ExecNode { ExecNode::release_resource(state); } - Status push(RuntimeState*, Block* input_block, bool eos) override { + Status do_push(RuntimeState*, Block* input_block, bool eos) override { _child_eos = eos; if (input_block->rows() == 0) { return Status::OK(); @@ -82,7 +82,7 @@ class VTableFunctionNode final : public ExecNode { return Status::OK(); } - Status pull(RuntimeState* state, Block* output_block, bool* eos) override { + Status do_pull(RuntimeState* state, Block* output_block, bool* eos) override { RETURN_IF_ERROR(_get_expanded_block(state, output_block, eos)); reached_limit(output_block, eos); return Status::OK(); diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 3e85a649c527864..aabcc46bbd9ddf3 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -83,6 +83,8 @@ class VDataStreamRecvr { bool sender_queue_empty(int sender_id); bool ready_to_read(); + // for exchange node, the wait source time is data arrival wait time in non pipeline mode + void update_wait_source_time(int64_t delta) { _data_arrival_timer->update(delta); } Status get_next(Block* block, bool* eos); diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index 79bc099246e3c69..ee4587879943d55 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -103,499 +103,6 @@ void VMysqlResultWriter::_init_profile() { _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES); } -// (TODO Amory: do not need this function) -template -template -Status VMysqlResultWriter::_add_one_column( - const ColumnPtr& column_ptr, std::unique_ptr& result, - std::vector>& rows_buffer, bool arg_const, int scale, - const DataTypes& sub_types) { - SCOPED_TIMER(_convert_tuple_timer); - - //if arg_const is true, the column_ptr is 
already expanded to one row - const auto row_size = rows_buffer.size(); - - doris::vectorized::ColumnPtr column; - if constexpr (is_nullable) { - column = assert_cast(*column_ptr).get_nested_column_ptr(); - } else { - column = column_ptr; - } - - int buf_ret = 0; - - if constexpr (type == TYPE_OBJECT || type == TYPE_QUANTILE_STATE || type == TYPE_VARCHAR || - type == TYPE_JSONB) { - for (int i = 0; i < row_size; ++i) { - if (0 != buf_ret) { - return Status::InternalError("pack mysql buffer failed."); - } - - const auto col_index = index_check_const(i, arg_const); - - if constexpr (is_nullable) { - if (column_ptr->is_null_at(col_index)) { - buf_ret = rows_buffer[i].push_null(); - continue; - } - } - - if constexpr (type == TYPE_OBJECT) { - if (column->is_bitmap() && output_object_data()) { - const vectorized::ColumnComplexType* pColumnComplexType = - assert_cast*>( - column.get()); - BitmapValue bitmapValue = pColumnComplexType->get_element(col_index); - size_t size = bitmapValue.getSizeInBytes(); - std::unique_ptr buf = std::make_unique(size); - bitmapValue.write_to(buf.get()); - buf_ret = rows_buffer[i].push_string(buf.get(), size); - } else if (column->is_hll() && output_object_data()) { - const vectorized::ColumnComplexType* pColumnComplexType = - assert_cast*>( - column.get()); - HyperLogLog hyperLogLog = pColumnComplexType->get_element(col_index); - size_t size = hyperLogLog.max_serialized_size(); - std::unique_ptr buf = std::make_unique(size); - hyperLogLog.serialize((uint8*)buf.get()); - buf_ret = rows_buffer[i].push_string(buf.get(), size); - - } else if (column->is_quantile_state() && output_object_data()) { - const vectorized::ColumnComplexType* pColumnComplexType = - assert_cast*>( - column.get()); - QuantileStateDouble quantileValue = pColumnComplexType->get_element(col_index); - size_t size = quantileValue.get_serialized_size(); - std::unique_ptr buf = std::make_unique(size); - quantileValue.serialize((uint8_t*)buf.get()); - buf_ret = rows_buffer[i].push_string(buf.get(), size); - } else { - buf_ret = rows_buffer[i].push_null(); - } - } - if constexpr (type == TYPE_VARCHAR) { - const auto string_val = column->get_data_at(col_index); - - if (string_val.data == nullptr) { - if (string_val.size == 0) { - // 0x01 is a magic num, not useful actually, just for present "" - char* tmp_val = reinterpret_cast(0x01); - buf_ret = rows_buffer[i].push_string(tmp_val, string_val.size); - } else { - buf_ret = rows_buffer[i].push_null(); - } - } else { - buf_ret = rows_buffer[i].push_string(string_val.data, string_val.size); - } - } - if constexpr (type == TYPE_JSONB) { - const auto jsonb_val = column->get_data_at(col_index); - // jsonb size == 0 is NULL - if (jsonb_val.data == nullptr || jsonb_val.size == 0) { - buf_ret = rows_buffer[i].push_null(); - } else { - std::string json_str = - JsonbToJson::jsonb_to_json_string(jsonb_val.data, jsonb_val.size); - buf_ret = rows_buffer[i].push_string(json_str.c_str(), json_str.size()); - } - } - } - } else if constexpr (type == TYPE_ARRAY) { - DCHECK_EQ(sub_types.size(), 1); - auto& column_array = assert_cast(*column); - auto& offsets = column_array.get_offsets(); - for (ssize_t i = 0; i < row_size; ++i) { - if (0 != buf_ret) { - return Status::InternalError("pack mysql buffer failed."); - } - - const auto col_index = index_check_const(i, arg_const); - - if constexpr (is_nullable) { - if (column_ptr->is_null_at(col_index)) { - buf_ret = rows_buffer[i].push_null(); - continue; - } - } - - rows_buffer[i].open_dynamic_mode(); - buf_ret = 
rows_buffer[i].push_string("[", 1);
-            bool begin = true;
-            for (auto j = offsets[col_index - 1]; j < offsets[col_index]; ++j) {
-                if (!begin) {
-                    buf_ret = rows_buffer[i].push_string(", ", 2);
-                }
-                const auto& data = column_array.get_data_ptr();
-                if (data->is_null_at(j)) {
-                    buf_ret = rows_buffer[i].push_string("NULL", strlen("NULL"));
-                } else {
-                    if (WhichDataType(remove_nullable(sub_types[0])).is_string()) {
-                        buf_ret = rows_buffer[i].push_string("'", 1);
-                        buf_ret = _add_one_cell(data, j, sub_types[0], rows_buffer[i], scale);
-                        buf_ret = rows_buffer[i].push_string("'", 1);
-                    } else {
-                        buf_ret = _add_one_cell(data, j, sub_types[0], rows_buffer[i], scale);
-                    }
-                }
-                begin = false;
-            }
-            buf_ret = rows_buffer[i].push_string("]", 1);
-            rows_buffer[i].close_dynamic_mode();
-        }
-    } else if constexpr (type == TYPE_MAP) {
-        DCHECK_GE(sub_types.size(), 1);
-        auto& map_type = assert_cast(*sub_types[0]);
-        for (ssize_t i = 0; i < row_size; ++i) {
-            if (0 != buf_ret) {
-                return Status::InternalError("pack mysql buffer failed.");
-            }
-
-            const auto col_index = index_check_const(i, arg_const);
-
-            if constexpr (is_nullable) {
-                if (column_ptr->is_null_at(col_index)) {
-                    buf_ret = rows_buffer[i].push_null();
-                    continue;
-                }
-            }
-            rows_buffer[i].open_dynamic_mode();
-            std::string cell_str = map_type.to_string(*column, col_index);
-            buf_ret = rows_buffer[i].push_string(cell_str.c_str(), strlen(cell_str.c_str()));
-
-            rows_buffer[i].close_dynamic_mode();
-        }
-    } else if constexpr (type == TYPE_STRUCT) {
-        DCHECK_GE(sub_types.size(), 1);
-        auto& column_struct = assert_cast(*column);
-        for (ssize_t i = 0; i < row_size; ++i) {
-            if (0 != buf_ret) {
-                return Status::InternalError("pack mysql buffer failed.");
-            }
-
-            const auto col_index = index_check_const(i, arg_const);
-
-            if constexpr (is_nullable) {
-                if (column_ptr->is_null_at(col_index)) {
-                    buf_ret = rows_buffer[i].push_null();
-                    continue;
-                }
-            }
-
-            rows_buffer[i].open_dynamic_mode();
-            buf_ret = rows_buffer[i].push_string("{", 1);
-            bool begin = true;
-            for (size_t j = 0; j < sub_types.size(); ++j) {
-                if (!begin) {
-                    buf_ret = rows_buffer[i].push_string(", ", 2);
-                }
-                const auto& data = column_struct.get_column_ptr(j);
-                if (data->is_null_at(col_index)) {
-                    buf_ret = rows_buffer[i].push_string("NULL", strlen("NULL"));
-                } else {
-                    if (WhichDataType(remove_nullable(sub_types[j])).is_string()) {
-                        buf_ret = rows_buffer[i].push_string("'", 1);
-                        buf_ret = _add_one_cell(data, col_index, sub_types[j], rows_buffer[i]);
-                        buf_ret = rows_buffer[i].push_string("'", 1);
-                    } else {
-                        buf_ret = _add_one_cell(data, col_index, sub_types[j], rows_buffer[i]);
-                    }
-                }
-                begin = false;
-            }
-            buf_ret = rows_buffer[i].push_string("}", 1);
-            rows_buffer[i].close_dynamic_mode();
-        }
-    } else if constexpr (type == TYPE_DECIMAL32 || type == TYPE_DECIMAL64 ||
-                         type == TYPE_DECIMAL128I) {
-        DCHECK_EQ(sub_types.size(), 1);
-        for (int i = 0; i < row_size; ++i) {
-            if (0 != buf_ret) {
-                return Status::InternalError("pack mysql buffer failed.");
-            }
-
-            const auto col_index = index_check_const(i, arg_const);
-
-            if constexpr (is_nullable) {
-                if (column_ptr->is_null_at(col_index)) {
-                    buf_ret = rows_buffer[i].push_null();
-                    continue;
-                }
-            }
-            std::string decimal_str = sub_types[0]->to_string(*column, col_index);
-            buf_ret = rows_buffer[i].push_string(decimal_str.c_str(), decimal_str.length());
-        }
-    } else {
-        using ColumnType = typename PrimitiveTypeTraits::ColumnType;
-        auto& data = assert_cast(*column).get_data();
-
-        for (int i = 0; i < row_size; ++i) {
-            if (0 != buf_ret) {
-                return Status::InternalError("pack mysql buffer failed.");
-            }
-
-            const auto col_index = index_check_const(i, arg_const);
-
-            if constexpr (is_nullable) {
-                if (column_ptr->is_null_at(col_index)) {
-                    buf_ret = rows_buffer[i].push_null();
-                    continue;
-                }
-            }
-
-            if constexpr (type == TYPE_BOOLEAN) {
-                //todo here need to using uint after MysqlRowBuffer support it
-                buf_ret = rows_buffer[i].push_tinyint(data[col_index]);
-            }
-            if constexpr (type == TYPE_TINYINT) {
-                buf_ret = rows_buffer[i].push_tinyint(data[col_index]);
-            }
-            if constexpr (type == TYPE_SMALLINT) {
-                buf_ret = rows_buffer[i].push_smallint(data[col_index]);
-            }
-            if constexpr (type == TYPE_INT) {
-                buf_ret = rows_buffer[i].push_int(data[col_index]);
-            }
-            if constexpr (type == TYPE_BIGINT) {
-                buf_ret = rows_buffer[i].push_bigint(data[col_index]);
-            }
-            if constexpr (type == TYPE_LARGEINT) {
-                auto v = LargeIntValue::to_string(data[col_index]);
-                buf_ret = rows_buffer[i].push_string(v.c_str(), v.size());
-            }
-            if constexpr (type == TYPE_FLOAT) {
-                buf_ret = rows_buffer[i].push_float(data[col_index]);
-            }
-            if constexpr (type == TYPE_DOUBLE) {
-                buf_ret = rows_buffer[i].push_double(data[col_index]);
-            }
-            if constexpr (type == TYPE_TIME) {
-                buf_ret = rows_buffer[i].push_time(data[col_index]);
-            }
-            if constexpr (type == TYPE_TIMEV2) {
-                buf_ret = rows_buffer[i].push_timev2(data[col_index]);
-            }
-            if constexpr (type == TYPE_DATETIME) {
-                auto time_num = data[col_index];
-                VecDateTimeValue time_val = binary_cast(time_num);
-                buf_ret = rows_buffer[i].push_vec_datetime(time_val);
-            }
-            if constexpr (type == TYPE_DATEV2) {
-                auto time_num = data[col_index];
-                DateV2Value date_val =
-                        binary_cast>(time_num);
-                buf_ret = rows_buffer[i].push_vec_datetime(date_val);
-            }
-            if constexpr (type == TYPE_DATETIMEV2) {
-                auto time_num = data[col_index];
-                char buf[64];
-                DateV2Value date_val =
-                        binary_cast>(time_num);
-                char* pos = date_val.to_string(buf, scale);
-                buf_ret = rows_buffer[i].push_string(buf, pos - buf - 1);
-            }
-            if constexpr (type == TYPE_DECIMALV2) {
-                DecimalV2Value decimal_val(data[col_index]);
-                auto decimal_str = decimal_val.to_string(scale);
-                buf_ret = rows_buffer[i].push_string(decimal_str.c_str(), decimal_str.length());
-            }
-        }
-    }
-    if (0 != buf_ret) {
-        return Status::InternalError("pack mysql buffer failed.");
-    }
-
-    return Status::OK();
-}
-
-template
-int VMysqlResultWriter::_add_one_cell(const ColumnPtr& column_ptr, size_t row_idx,
-                                      const DataTypePtr& type,
-                                      MysqlRowBuffer& buffer,
-                                      int scale) {
-    WhichDataType which(type->get_type_id());
-    if (which.is_nullable() && column_ptr->is_null_at(row_idx)) {
-        return buffer.push_null();
-    }
-
-    ColumnPtr column;
-    if (which.is_nullable()) {
-        column = assert_cast(*column_ptr).get_nested_column_ptr();
-        which = WhichDataType(assert_cast(*type).get_nested_type());
-    } else {
-        column = column_ptr;
-    }
-
-    if (which.is_uint8()) {
-        auto& data = assert_cast(*column).get_data();
-        return buffer.push_tinyint(data[row_idx]);
-    } else if (which.is_int8()) {
-        auto& data = assert_cast(*column).get_data();
-        return buffer.push_tinyint(data[row_idx]);
-    } else if (which.is_int16()) {
-        auto& data = assert_cast(*column).get_data();
-        return buffer.push_smallint(data[row_idx]);
-    } else if (which.is_int32()) {
-        auto& data = assert_cast(*column).get_data();
-        return buffer.push_int(data[row_idx]);
-    } else if (which.is_int64()) {
-        auto& data = assert_cast(*column).get_data();
-        return buffer.push_bigint(data[row_idx]);
-    } else if (which.is_int128()) {
-        auto& data = assert_cast(*column).get_data();
-        auto v = LargeIntValue::to_string(data[row_idx]);
-        return buffer.push_string(v.c_str(), v.size());
-    } else if (which.is_float32()) {
-        auto& data = assert_cast(*column).get_data();
-        return buffer.push_float(data[row_idx]);
-    } else if (which.is_float64()) {
-        auto& data = assert_cast(*column).get_data();
-        return buffer.push_double(data[row_idx]);
-    } else if (which.is_string()) {
-        int buf_ret = 0;
-        const auto string_val = column->get_data_at(row_idx);
-        if (string_val.data == nullptr) {
-            if (string_val.size == 0) {
-                // 0x01 is a magic num, not useful actually, just for present ""
-                char* tmp_val = reinterpret_cast(0x01);
-                buf_ret = buffer.push_string(tmp_val, string_val.size);
-            } else {
-                buf_ret = buffer.push_null();
-            }
-        } else {
-            buf_ret = buffer.push_string(string_val.data, string_val.size);
-        }
-        return buf_ret;
-    } else if (which.is_date_or_datetime()) {
-        auto& column_vector = assert_cast&>(*column);
-        auto value = column_vector[row_idx].get();
-        VecDateTimeValue datetime = binary_cast(value);
-        if (which.is_date()) {
-            datetime.cast_to_date();
-        }
-        char buf[64];
-        char* pos = datetime.to_string(buf);
-        return buffer.push_string(buf, pos - buf - 1);
-    } else if (which.is_date_v2()) {
-        auto& column_vector = assert_cast&>(*column);
-        auto value = column_vector[row_idx].get();
-        DateV2Value datev2 =
-                binary_cast>(value);
-        char buf[64];
-        char* pos = datev2.to_string(buf);
-        return buffer.push_string(buf, pos - buf - 1);
-    } else if (which.is_date_time_v2()) {
-        auto& column_vector = assert_cast&>(*column);
-        auto value = column_vector[row_idx].get();
-        DateV2Value datetimev2 =
-                binary_cast>(value);
-        char buf[64];
-        char* pos = datetimev2.to_string(buf, scale);
-        return buffer.push_string(buf, pos - buf - 1);
-    } else if (which.is_decimal32()) {
-        DataTypePtr nested_type = type;
-        if (type->is_nullable()) {
-            nested_type = assert_cast(*type).get_nested_type();
-        }
-        auto decimal_str = assert_cast*>(nested_type.get())
-                                   ->to_string(*column, row_idx);
-        return buffer.push_string(decimal_str.c_str(), decimal_str.length());
-    } else if (which.is_decimal64()) {
-        DataTypePtr nested_type = type;
-        if (type->is_nullable()) {
-            nested_type = assert_cast(*type).get_nested_type();
-        }
-        auto decimal_str = assert_cast*>(nested_type.get())
-                                   ->to_string(*column, row_idx);
-        return buffer.push_string(decimal_str.c_str(), decimal_str.length());
-    } else if (which.is_decimal128()) {
-        auto& column_data =
-                static_cast&>(*column).get_data();
-        DecimalV2Value decimal_val(column_data[row_idx]);
-        auto decimal_str = decimal_val.to_string();
-        return buffer.push_string(decimal_str.c_str(), decimal_str.length());
-    } else if (which.is_decimal128i()) {
-        DataTypePtr nested_type = type;
-        if (type->is_nullable()) {
-            nested_type = assert_cast(*type).get_nested_type();
-        }
-        auto decimal_str = assert_cast*>(nested_type.get())
-                                   ->to_string(*column, row_idx);
-        return buffer.push_string(decimal_str.c_str(), decimal_str.length());
-        // TODO(xy): support nested struct
-    } else if (which.is_array()) {
-        auto& column_array = assert_cast(*column);
-        auto& offsets = column_array.get_offsets();
-        DataTypePtr sub_type;
-        if (type->is_nullable()) {
-            auto& nested_type = assert_cast(*type).get_nested_type();
-            sub_type = assert_cast(*nested_type).get_nested_type();
-        } else {
-            sub_type = assert_cast(*type).get_nested_type();
-        }
-
-        int start = offsets[row_idx - 1];
-        int length = offsets[row_idx] - start;
-        const auto& data = column_array.get_data_ptr();
-
-        int buf_ret = buffer.push_string("[", strlen("["));
-        bool begin = true;
-        for (int i = 0; i < length; ++i) {
-            int position = start + i;
-            if (begin) {
-                begin = false;
-            } else {
-                buf_ret = buffer.push_string(", ", strlen(", "));
-            }
-            if (data->is_null_at(position)) {
-                buf_ret = buffer.push_string("NULL", strlen("NULL"));
-            } else {
-                buf_ret = _add_one_cell(data, position, sub_type, buffer);
-            }
-        }
-        buf_ret = buffer.push_string("]", strlen("]"));
-        return buf_ret;
-    } else if (which.is_struct()) {
-        auto& column_struct = assert_cast(*column);
-
-        DataTypePtr nested_type = type;
-        if (type->is_nullable()) {
-            nested_type = assert_cast(*type).get_nested_type();
-        }
-
-        size_t tuple_size = column_struct.tuple_size();
-
-        int buf_ret = buffer.push_string("{", strlen("{"));
-        bool begin = true;
-        for (int i = 0; i < tuple_size; ++i) {
-            const auto& data = column_struct.get_column_ptr(i);
-            const auto& sub_type = assert_cast(*nested_type).get_element(i);
-
-            if (begin) {
-                begin = false;
-            } else {
-                buf_ret = buffer.push_string(", ", strlen(", "));
-            }
-
-            if (data->is_null_at(row_idx)) {
-                buf_ret = buffer.push_string("NULL", strlen("NULL"));
-            } else {
-                if (WhichDataType(remove_nullable(sub_type)).is_string()) {
-                    buf_ret = buffer.push_string("'", 1);
-                    buf_ret = _add_one_cell(data, row_idx, sub_type, buffer, scale);
-                    buf_ret = buffer.push_string("'", 1);
-                } else {
-                    buf_ret = _add_one_cell(data, row_idx, sub_type, buffer, scale);
-                }
-            }
-        }
-        buf_ret = buffer.push_string("}", strlen("}"));
-        return buf_ret;
-    } else {
-        LOG(WARNING) << "sub TypeIndex(" << (int)which.idx << "not supported yet";
-        return -1;
-    }
-}
-
 template
 Status VMysqlResultWriter::append_block(Block& input_block) {
     SCOPED_TIMER(_append_row_batch_timer);
diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h
index b1954f2b01904d7..11c0cadb1aa0d5e 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -58,14 +58,6 @@ class VMysqlResultWriter final : public ResultWriter {
 private:
     void _init_profile();
 
-    template
-    Status _add_one_column(const ColumnPtr& column_ptr, std::unique_ptr& result,
-                           std::vector>& rows_buffer,
-                           bool arg_const, int scale = -1,
-                           const DataTypes& sub_types = DataTypes());
-    int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const DataTypePtr& type,
-                      MysqlRowBuffer& buffer, int scale = -1);
-
     BufferControlBlock* _sinker;
 
     const VExprContextSPtrs& _output_vexpr_ctxs;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index fea16c2f75b83a9..04a7a82d5db05d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -159,7 +159,8 @@ public void update(final TRuntimeProfileTree thriftProfile) {
     // preorder traversal, idx should be modified in the traversal process
    private void update(List nodes, Reference idx) {
        TRuntimeProfileNode node = nodes.get(idx.getRef());
-        // Make sure to update the latest LoadChannel profile according to the timestamp.
+        // Make sure to update the latest LoadChannel profile according to the
+        // timestamp.
         if (node.timestamp != -1 && node.timestamp < timestamp) {
             return;
         }
@@ -182,8 +183,7 @@ private void update(List nodes, Reference idx) {
 
         if (node.child_counters_map != null) {
             // update childCounters
-            for (Map.Entry> entry :
-                    node.child_counters_map.entrySet()) {
+            for (Map.Entry> entry : node.child_counters_map.entrySet()) {
                 String parentCounterName = entry.getKey();
 
                 counterLock.writeLock().lock();
@@ -245,24 +245,24 @@ private void update(List nodes, Reference idx) {
     }
 
     // Print the profile:
-    //  1. Profile Name
-    //  2. Info Strings
-    //  3. Counters
-    //  4. Children
+    // 1. Profile Name
+    // 2. Info Strings
+    // 3. Counters
+    // 4. Children
     public void prettyPrint(StringBuilder builder, String prefix) {
         Counter counter = this.counterMap.get("TotalTime");
         Preconditions.checkState(counter != null);
         // 1. profile name
         builder.append(prefix).append(name).append(":");
         // total time
-        if (counter.getValue() != 0) {
-            try (Formatter fmt = new Formatter()) {
-                builder.append("(Active: ")
-                        .append(this.printCounter(counter.getValue(), counter.getType()))
-                        .append(", % non-child: ").append(fmt.format("%.2f", localTimePercent))
-                        .append("%)");
-            }
+        // if (counter.getValue() != 0) {
+        try (Formatter fmt = new Formatter()) {
+            builder.append("(Active: ")
+                    .append(this.printCounter(counter.getValue(), counter.getType()))
+                    .append(", % non-child: ").append(fmt.format("%.2f", localTimePercent))
+                    .append("%)");
         }
+        // }
         builder.append("\n");
 
         // 2. info String
@@ -440,7 +440,8 @@ public void addFirstChild(RuntimeProfile child) {
         }
     }
 
-    // Because the profile of summary and child fragment is not a real parent-child relationship
+    // Because the profile of summary and child fragment is not a real parent-child
+    // relationship
     // Each child profile needs to calculate the time proportion consumed by itself
     public void computeTimeInChildProfile() {
         childMap.values().forEach(RuntimeProfile::computeTimeInProfile);
@@ -485,8 +486,10 @@ public void sortChildren() {
             this.childList.sort((profile1, profile2) -> Long.compare(profile2.first.getCounterTotalTime().getValue(),
                     profile1.first.getCounterTotalTime().getValue()));
         } catch (IllegalArgumentException e) {
-            // This exception may be thrown if the counter total time of the child is updated in the update method
-            // during the sorting process. This sorting only affects the profile instance display order, so this
+            // This exception may be thrown if the counter total time of the child is
+            // updated in the update method
+            // during the sorting process. This sorting only affects the profile instance
+            // display order, so this
             // exception is temporarily ignored here.
             if (LOG.isDebugEnabled()) {
                 LOG.debug("sort child list error: ", e);
@@ -523,4 +526,3 @@ public Map getInfoStrings() {
         return infoStrings;
     }
 }
-