From 31a5e072e76eb6f5956c1fedc135cc6e95d982d8 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 17 Oct 2023 15:11:46 +0800 Subject: [PATCH] [refactor](pipelineX) Simplify set operation (#25502) --- .../pipeline/exec/set_probe_sink_operator.cpp | 6 +- .../pipeline/exec/set_probe_sink_operator.h | 5 +- be/src/pipeline/exec/set_sink_operator.cpp | 6 +- be/src/pipeline/exec/set_sink_operator.h | 6 +- .../common/hash_table/hash_table_set_build.h | 55 ++----------------- .../common/hash_table/hash_table_set_probe.h | 52 +++--------------- be/src/vec/exec/vset_operation_node.cpp | 2 +- be/src/vec/exec/vset_operation_node.h | 3 + 8 files changed, 30 insertions(+), 105 deletions(-) diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 81c30d45d1..aea9aff0e7 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -131,9 +131,9 @@ Status SetProbeSinkOperatorX::sink(RuntimeState* state, vectorized [&](auto&& arg) -> Status { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - vectorized::HashTableProbeX - process_hashtable_ctx(local_state, probe_rows); - return process_hashtable_ctx.mark_data_in_hashtable(local_state, arg); + vectorized::HashTableProbe + process_hashtable_ctx(&local_state, probe_rows); + return process_hashtable_ctx.mark_data_in_hashtable(arg); } else { LOG(FATAL) << "FATAL: uninited hash table"; } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 2c2f1ce1c6..45176fd009 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -31,7 +31,7 @@ class RuntimeState; namespace vectorized { class Block; template -struct HashTableProbeX; +struct HashTableProbe; } // namespace vectorized namespace pipeline { @@ -81,11 +81,12 @@ public: : Base(parent, state) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + int64_t* valid_element_in_hash_tbl() { return &_shared_state->valid_element_in_hash_tbl; } private: friend class SetProbeSinkOperatorX; template - friend struct vectorized::HashTableProbeX; + friend struct vectorized::HashTableProbe; //record insert column id during probe std::vector _probe_column_inserted_id; diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 604729a470..6725deffa1 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -116,9 +116,9 @@ Status SetSinkOperatorX::_process_build_block( [&](auto&& arg) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - vectorized::HashTableBuildX - hash_table_build_process(rows, raw_ptrs, offset, state); - static_cast(hash_table_build_process(local_state, arg)); + vectorized::HashTableBuild + hash_table_build_process(&local_state, rows, raw_ptrs, offset, state); + static_cast(hash_table_build_process(arg, local_state._arena)); } else { LOG(FATAL) << "FATAL: uninited hash table"; } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 945ec06891..5383b1b3a5 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -29,7 +29,7 @@ class ExecNode; namespace vectorized { template -struct HashTableBuildX; +struct HashTableBuild; } namespace pipeline { @@ -74,10 +74,12 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + int64_t* mem_used() { return &_shared_state->mem_used; }; + private: friend class SetSinkOperatorX; template - friend struct vectorized::HashTableBuildX; + friend struct vectorized::HashTableBuild; RuntimeProfile::Counter* _build_timer; // time to build hash table vectorized::MutableBlock _mutable_block; diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h b/be/src/vec/common/hash_table/hash_table_set_build.h index ff1fec3ab1..e3c1ed27b1 100644 --- a/be/src/vec/common/hash_table/hash_table_set_build.h +++ b/be/src/vec/common/hash_table/hash_table_set_build.h @@ -23,13 +23,13 @@ namespace doris::vectorized { template struct HashTableBuild { - HashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs, - VSetOperationNode* operation_node, uint8_t offset, + template + HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, uint8_t offset, RuntimeState* state) - : _rows(rows), + : _mem_used(parent->mem_used()), + _rows(rows), _offset(offset), _build_raw_ptrs(build_raw_ptrs), - _operation_node(operation_node), _state(state) {} Status operator()(HashTableContext& hash_table_ctx, Arena& arena) { @@ -39,7 +39,7 @@ struct HashTableBuild { Defer defer {[&]() { int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); - _operation_node->_mem_used += bucket_bytes - old_bucket_bytes; + *_mem_used += bucket_bytes - old_bucket_bytes; }}; KeyGetter key_getter(_build_raw_ptrs); @@ -62,54 +62,11 @@ struct HashTableBuild { } private: - const int _rows; - const uint8_t _offset; - ColumnRawPtrs& _build_raw_ptrs; - VSetOperationNode* _operation_node; - RuntimeState* _state; -}; - -template -struct HashTableBuildX { - HashTableBuildX(int rows, ColumnRawPtrs& build_raw_ptrs, uint8_t offset, RuntimeState* state) - : _rows(rows), _offset(offset), _build_raw_ptrs(build_raw_ptrs), _state(state) {} - - Status operator()(pipeline::SetSinkLocalState& local_state, - HashTableContext& hash_table_ctx) { - using KeyGetter = typename HashTableContext::State; - using Mapped = typename HashTableContext::Mapped; - int64_t old_bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); - - Defer defer {[&]() { - int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); - local_state._shared_state->mem_used += bucket_bytes - old_bucket_bytes; - }}; - - KeyGetter key_getter(_build_raw_ptrs); - hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows); - - size_t k = 0; - auto creator = [&](const auto& ctor, auto& key, auto& origin) { - HashTableContext::try_presis_key(key, origin, local_state._arena); - ctor(key, Mapped {k, _offset}); - }; - auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset}; }; - - for (; k < _rows; ++k) { - if (k % CHECK_FRECUENCY == 0) { - RETURN_IF_CANCELLED(_state); - } - hash_table_ctx.lazy_emplace(key_getter, k, creator, creator_for_null_key); - } - return Status::OK(); - } - -private: + int64_t* _mem_used; const int _rows; const uint8_t _offset; ColumnRawPtrs& _build_raw_ptrs; RuntimeState* _state; - std::vector _build_side_hash_values; }; } // namespace doris::vectorized diff --git a/be/src/vec/common/hash_table/hash_table_set_probe.h b/be/src/vec/common/hash_table/hash_table_set_probe.h index 4a79b86a14..eb00cca856 100644 --- a/be/src/vec/common/hash_table/hash_table_set_probe.h +++ b/be/src/vec/common/hash_table/hash_table_set_probe.h @@ -24,10 +24,11 @@ namespace doris::vectorized { template struct HashTableProbe { - HashTableProbe(VSetOperationNode* operation_node, int probe_rows) - : _operation_node(operation_node), + template + HashTableProbe(Parent* parent, int probe_rows) + : _valid_element_in_hash_tbl(parent->valid_element_in_hash_tbl()), _probe_rows(probe_rows), - _probe_raw_ptrs(operation_node->_probe_columns) {} + _probe_raw_ptrs(parent->_probe_columns) {} Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) { using KeyGetter = typename HashTableContext::State; @@ -43,49 +44,9 @@ struct HashTableProbe { if (!(it->visited)) { it->visited = true; if constexpr (is_intersected) { //intersected - _operation_node->_valid_element_in_hash_tbl++; + (*_valid_element_in_hash_tbl)++; } else { - _operation_node->_valid_element_in_hash_tbl--; //except - } - } - } - } - } else { - LOG(FATAL) << "Invalid RowRefListType!"; - } - return Status::OK(); - } - -private: - VSetOperationNode* _operation_node; - const size_t _probe_rows; - ColumnRawPtrs& _probe_raw_ptrs; - std::vector _probe_keys; -}; - -template -struct HashTableProbeX { - HashTableProbeX(pipeline::SetProbeSinkLocalState& local_state, int probe_rows) - : _probe_rows(probe_rows), _probe_raw_ptrs(local_state._probe_columns) {} - - Status mark_data_in_hashtable(pipeline::SetProbeSinkLocalState& local_state, - HashTableContext& hash_table_ctx) { - using KeyGetter = typename HashTableContext::State; - - KeyGetter key_getter(_probe_raw_ptrs); - hash_table_ctx.init_serialized_keys(_probe_raw_ptrs, _probe_rows); - - if constexpr (std::is_same_v) { - for (int probe_index = 0; probe_index < _probe_rows; probe_index++) { - auto find_result = hash_table_ctx.find(key_getter, probe_index); - if (find_result.is_found()) { //if found, marked visited - auto it = find_result.get_mapped().begin(); - if (!(it->visited)) { - it->visited = true; - if constexpr (is_intersected) { //intersected - local_state._shared_state->valid_element_in_hash_tbl++; - } else { - local_state._shared_state->valid_element_in_hash_tbl--; //except + (*_valid_element_in_hash_tbl)--; //except } } } @@ -97,6 +58,7 @@ struct HashTableProbeX { } private: + int64_t* _valid_element_in_hash_tbl; const size_t _probe_rows; ColumnRawPtrs& _probe_raw_ptrs; std::vector _probe_keys; diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 9b15db67b3..d284385b8e 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -320,7 +320,7 @@ Status VSetOperationNode::process_build_block(Block& block, uint8_ using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { HashTableBuild hash_table_build_process( - rows, raw_ptrs, this, offset, state); + this, rows, raw_ptrs, offset, state); st = hash_table_build_process(arg, _arena); } else { LOG(FATAL) << "FATAL: uninited hash table"; diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index dfd9643011..ff016469f4 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -73,6 +73,9 @@ public: bool is_child_finished(int child_id) const; + int64_t* valid_element_in_hash_tbl() { return &_valid_element_in_hash_tbl; } + int64_t* mem_used() { return &_mem_used; }; + private: void _finalize_probe(int child_id); //Todo: In build process of hashtable, It's same as join node.