From 51bbf177864f25725043dd7d33d815763cb7885c Mon Sep 17 00:00:00 2001 From: HappenLee Date: Tue, 13 Jun 2023 09:06:51 +0800 Subject: [PATCH] [Refactor](Profile) Add and refactor the join profile (#20693) --- be/src/exec/exec_node.cpp | 1 - .../exec/join/process_hash_table_probe_impl.h | 2 ++ be/src/vec/exec/join/vhash_join_node.cpp | 12 ++------ be/src/vec/exec/join/vhash_join_node.h | 3 +- be/src/vec/exec/join/vjoin_node_base.cpp | 21 ++++++++++++++ be/src/vec/exec/join/vjoin_node_base.h | 8 ++++- .../vec/exec/join/vnested_loop_join_node.cpp | 29 ++++++++++--------- 7 files changed, 48 insertions(+), 28 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 9d731193c0..2707e47a76 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -145,7 +145,6 @@ Status ExecNode::prepare(RuntimeState* state) { for (int i = 0; i < _children.size(); ++i) { RETURN_IF_ERROR(_children[i]->prepare(state)); } - return Status::OK(); } diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 5ef335ea52..adaefbc9f8 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -502,6 +502,7 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( } int multi_matched_output_row_count = 0; if (current_offset < _batch_size) { + SCOPED_TIMER(_search_hashtable_timer); while (probe_index < probe_rows) { // ignore null rows if constexpr (ignore_null && need_null_map_for_probe) { @@ -677,6 +678,7 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( // dispose the other join conjunct exec auto row_count = output_block->rows(); if (row_count) { + SCOPED_TIMER(_join_node->_process_other_join_conjunct_timer); int orig_columns = output_block->columns(); IColumn::Filter other_conjunct_filter(row_count, 1); bool can_be_filter_all; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 2dfb93a98d..822b99db58 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -449,11 +449,6 @@ Status HashJoinNode::prepare(RuntimeState* state) { "ProbeKeyArena", TUnit::BYTES, "MemoryUsage"); // Build phase - _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true); - runtime_profile()->add_child(_build_phase_profile, false, nullptr); - _build_get_next_timer = ADD_TIMER(_build_phase_profile, "BuildGetNextTime"); - _build_timer = ADD_TIMER(_build_phase_profile, "BuildTime"); - auto record_profile = _should_build_hash_table ? _build_phase_profile : faker_runtime_profile(); _build_table_timer = ADD_TIMER(record_profile, "BuildTableTime"); _build_side_merge_block_timer = ADD_TIMER(record_profile, "BuildSideMergeBlockTime"); @@ -461,25 +456,22 @@ Status HashJoinNode::prepare(RuntimeState* state) { _build_expr_call_timer = ADD_TIMER(record_profile, "BuildExprCallTime"); _build_table_expanse_timer = ADD_TIMER(record_profile, "BuildTableExpanseTime"); _build_table_convert_timer = ADD_TIMER(record_profile, "BuildTableConvertToPartitionedTime"); - _build_rows_counter = ADD_COUNTER(record_profile, "BuildRows", TUnit::UNIT); _build_side_compute_hash_timer = ADD_TIMER(record_profile, "BuildSideHashComputingTime"); _build_runtime_filter_timer = ADD_TIMER(record_profile, "BuildRuntimeFilterTime"); _push_down_timer = ADD_TIMER(record_profile, "PublishRuntimeFilterTime"); _push_compute_timer = ADD_TIMER(record_profile, "PushDownComputeTime"); // Probe phase - auto probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true); - _probe_timer = ADD_TIMER(probe_phase_profile, "ProbeTime"); + auto probe_phase_profile = _probe_phase_profile; _probe_next_timer = ADD_TIMER(probe_phase_profile, "ProbeFindNextTime"); _probe_expr_call_timer = ADD_TIMER(probe_phase_profile, "ProbeExprCallTime"); - _probe_rows_counter = ADD_COUNTER(probe_phase_profile, "ProbeRows", TUnit::UNIT); _search_hashtable_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenSearchHashTableTime"); _build_side_output_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenBuildSideOutputTime"); _probe_side_output_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenProbeSideOutputTime"); - _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer"); _open_timer = ADD_TIMER(runtime_profile(), "OpenTime"); _allocate_resource_timer = ADD_TIMER(runtime_profile(), "AllocateResourceTime"); + _process_other_join_conjunct_timer = ADD_TIMER(runtime_profile(), "OtherJoinConjunctTime"); _build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT); _build_buckets_fill_counter = ADD_COUNTER(runtime_profile(), "FilledBuckets", TUnit::UNIT); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 286a0783a6..36398ba4ad 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -303,6 +303,7 @@ private: RuntimeProfile::Counter* _open_timer; RuntimeProfile::Counter* _allocate_resource_timer; + RuntimeProfile::Counter* _process_other_join_conjunct_timer; RuntimeProfile::Counter* _memory_usage_counter; RuntimeProfile::Counter* _build_blocks_memory_usage; @@ -310,8 +311,6 @@ private: RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage; RuntimeProfile::HighWaterMarkCounter* _probe_arena_memory_usage; - RuntimeProfile* _build_phase_profile; - std::shared_ptr _arena; // maybe share hash table with other fragment instances diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 3bc1d93c8b..cc50299db5 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -105,6 +105,26 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des } } +Status VJoinNodeBase::prepare(RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::prepare(state)); + _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true); + runtime_profile()->add_child(_build_phase_profile, false, nullptr); + _build_get_next_timer = ADD_TIMER(_build_phase_profile, "BuildGetNextTime"); + _build_timer = ADD_TIMER(_build_phase_profile, "BuildTime"); + _build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows", TUnit::UNIT); + + _probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true); + _probe_timer = ADD_TIMER(_probe_phase_profile, "ProbeTime"); + _probe_rows_counter = ADD_COUNTER(_probe_phase_profile, "ProbeRows", TUnit::UNIT); + + _build_output_block_timer = ADD_TIMER(runtime_profile(), "BuildOutPutBlockTimer"); + _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer"); + _push_down_timer = ADD_TIMER(runtime_profile(), "PublishRuntimeFilterTime"); + _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime"); + + return Status::OK(); +} + Status VJoinNodeBase::close(RuntimeState* state) { return ExecNode::close(state); } @@ -131,6 +151,7 @@ void VJoinNodeBase::_construct_mutable_join_block() { } Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block) { + SCOPED_TIMER(_build_output_block_timer); auto is_mem_reuse = output_block->mem_reuse(); MutableBlock mutable_block = is_mem_reuse diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index ce644c159e..f29897719d 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -57,6 +57,8 @@ class VJoinNodeBase : public ExecNode { public: VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + virtual Status prepare(RuntimeState* state) override; + virtual Status close(RuntimeState* state) override; virtual Status open(RuntimeState* state) override; @@ -130,14 +132,18 @@ protected: MutableColumnPtr _tuple_is_null_left_flag_column; MutableColumnPtr _tuple_is_null_right_flag_column; + RuntimeProfile* _build_phase_profile; RuntimeProfile::Counter* _build_timer; RuntimeProfile::Counter* _build_get_next_timer; - RuntimeProfile::Counter* _probe_timer; RuntimeProfile::Counter* _build_rows_counter; + + RuntimeProfile* _probe_phase_profile; + RuntimeProfile::Counter* _probe_timer; RuntimeProfile::Counter* _probe_rows_counter; RuntimeProfile::Counter* _push_down_timer; RuntimeProfile::Counter* _push_compute_timer; RuntimeProfile::Counter* _join_filter_timer; + RuntimeProfile::Counter* _build_output_block_timer; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 17cd2ce22a..57ad81f120 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -135,13 +135,8 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); - _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); - _build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); _probe_rows_counter = ADD_COUNTER(runtime_profile(), "ProbeRows", TUnit::UNIT); _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); - _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime"); - _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime"); - _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer"); // pre-compute the tuple index of build tuples in the output row int num_build_tuples = child(1)->row_desc().tuple_descriptors().size(); @@ -174,21 +169,27 @@ Status VNestedLoopJoinNode::close(RuntimeState* state) { return VJoinNodeBase::close(state); } +// TODO: This method should be implemented by the parent class Status VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) { // Do a full scan of child(1) and store all build row batches. - RETURN_IF_ERROR(child(1)->open(state)); + { + SCOPED_TIMER(_build_get_next_timer); + RETURN_IF_ERROR(child(1)->open(state)); + } bool eos = false; + Block block; while (true) { RETURN_IF_CANCELLED(state); - - Block block; - RETURN_IF_ERROR(child(1)->get_next_after_projects( - state, &block, &eos, - std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & - ExecNode::get_next, - _children[1], std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3))); + { + SCOPED_TIMER(_build_get_next_timer); + RETURN_IF_ERROR(child(1)->get_next_after_projects( + state, &block, &eos, + std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & + ExecNode::get_next, + _children[1], std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3))); + } sink(state, &block, eos);