From b1e7343532da6bb0ff5ce0dd6b829ff95b33a89b Mon Sep 17 00:00:00 2001
From: awakeljw <993007281@qq.com>
Date: Wed, 23 Feb 2022 10:28:16 +0800
Subject: [PATCH] [Vectorized] [HashJoin] Opt HashJoin Performance (#8119)

Co-authored-by: lihaopeng
---
 be/src/vec/columns/column_complex.h      |  12 +-
 be/src/vec/columns/column_decimal.h      |  10 +-
 be/src/vec/columns/column_string.cpp     |   6 +-
 be/src/vec/columns/column_vector.cpp     |  17 +-
 be/src/vec/core/block.h                  |  17 +-
 be/src/vec/exec/join/join_op.h           |  12 +-
 be/src/vec/exec/join/vhash_join_node.cpp | 239 +++++++++++++----------
 be/src/vec/exec/join/vhash_join_node.h   |   8 +-
 be/src/vec/exec/vset_operation_node.cpp  |  42 +++-
 be/src/vec/exec/vset_operation_node.h    |  10 +-
 be/src/vec/sink/vdata_stream_sender.cpp  |   8 +-
 11 files changed, 231 insertions(+), 150 deletions(-)

diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h
index 0cdf3a34f4..cd26c7fc23 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -153,9 +153,15 @@ public:
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override {
         const Self& src_vec = assert_cast<const Self&>(src);
-        data.reserve(size() + (indices_end - indices_begin));
-        for (auto x = indices_begin; x != indices_end; ++x) {
-            data.push_back(src_vec.get_element(*x));
+        auto new_size = indices_end - indices_begin;
+
+        for (int i = 0; i < new_size; ++i) {
+            auto offset = *(indices_begin + i);
+            if (offset == -1) {
+                data.emplace_back(T{});
+            } else {
+                data.emplace_back(src_vec.get_element(offset));
+            }
         }
     }
 
diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h
index a1294236a3..0b9c96a072 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -101,9 +101,13 @@ public:
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override {
         const Self& src_vec = assert_cast<const Self&>(src);
-        data.reserve(size() + (indices_end - indices_begin));
-        for (auto x = indices_begin; x != indices_end; ++x) {
-            data.push_back_without_reserve(src_vec.get_element(*x));
+        auto origin_size = size();
+        auto new_size = indices_end - indices_begin;
+        data.resize(origin_size + new_size);
+
+        for (int i = 0; i < new_size; ++i) {
+            auto offset = *(indices_begin + i);
+            data[origin_size + i] = offset == -1 ? T{} : src_vec.get_element(offset);
         }
     }
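The column overrides above (and the two that follow) all adopt one new contract: an index of -1 asks the column to append a default value instead of copying a source row, so the probe side can hand matched and unmatched rows to a single batched call, and a resize-then-fill replaces repeated push_back. A minimal standalone sketch of that contract, using plain std::vector rather than the Doris IColumn API (all names here are illustrative only):

    // Sketch of the assumed insert_indices_from contract: -1 means "append default".
    #include <cassert>
    #include <cstddef>
    #include <vector>

    template <typename T>
    void insert_indices_from(std::vector<T>& dst, const std::vector<T>& src,
                             const int* begin, const int* end) {
        size_t origin_size = dst.size();
        size_t new_size = end - begin;
        dst.resize(origin_size + new_size); // one allocation instead of N push_backs
        for (size_t i = 0; i < new_size; ++i) {
            int offset = begin[i];
            // -1 is the "no match" sentinel: fill with the type's default value
            dst[origin_size + i] = (offset == -1) ? T{} : src[offset];
        }
    }

    int main() {
        std::vector<int> dst {7};
        std::vector<int> src {10, 20, 30};
        const int idx[] = {2, -1, 0};
        insert_indices_from(dst, src, idx, idx + 3);
        assert((dst == std::vector<int> {7, 30, 0, 10}));
    }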
diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp
index 9ebf879270..832bbbe310 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -95,7 +95,11 @@ void ColumnString::insert_range_from(const IColumn& src, size_t start, size_t le
 void ColumnString::insert_indices_from(const IColumn& src, const int* indices_begin,
                                        const int* indices_end) {
     for (auto x = indices_begin; x != indices_end; ++x) {
-        ColumnString::insert_from(src, *x);
+        if (*x == -1) {
+            ColumnString::insert_default();
+        } else {
+            ColumnString::insert_from(src, *x);
+        }
     }
 }
 
diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp
index aed2502b1f..132d359c4e 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -221,9 +221,20 @@ void ColumnVector<T>::insert_range_from(const IColumn& src, size_t start, size_t
 template <typename T>
 void ColumnVector<T>::insert_indices_from(const IColumn& src, const int* indices_begin,
                                           const int* indices_end) {
     const Self& src_vec = assert_cast<const Self&>(src);
-    data.reserve(size() + (indices_end - indices_begin));
-    for (auto x = indices_begin; x != indices_end; ++x) {
-        data.push_back_without_reserve(src_vec.get_element(*x));
+    auto origin_size = size();
+    auto new_size = indices_end - indices_begin;
+    data.resize(origin_size + new_size);
+
+    for (int i = 0; i < new_size; ++i) {
+        int offset = indices_begin[i];
+        if constexpr (std::is_same_v<T, UInt8>) {
+            // UInt8 columns are also used as null maps:
+            // 1. nullable column's null map: offset == -1 means the row is null here, so write 1 (true)
+            // 2. real UInt8 data column: the value written for offset == -1 is never read
+            data[origin_size + i] = (offset == -1) ? T{1} : src_vec.get_element(offset);
+        } else {
+            data[origin_size + i] = (offset == -1) ? T{0} : src_vec.get_element(offset);
+        }
    }
 }
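The ColumnVector change above is the one place where the -1 sentinel cannot simply produce a default: the same gather routine also runs over null-map columns, where a missed probe row must come out as NULL (1). A standalone sketch of the assumed semantics, with plain std::vector and illustrative names:

    // Sketch: gathering a null map through a join-index vector, where -1 = no match = NULL.
    #include <cassert>
    #include <cstdint>
    #include <vector>

    std::vector<uint8_t> gather_null_map(const std::vector<uint8_t>& src_null_map,
                                         const std::vector<int>& indices) {
        std::vector<uint8_t> out(indices.size());
        for (size_t i = 0; i < indices.size(); ++i) {
            // -1 means the probe row had no build match: the joined cell is NULL
            out[i] = (indices[i] == -1) ? 1 : src_null_map[indices[i]];
        }
        return out;
    }

    int main() {
        std::vector<uint8_t> src {0, 1, 0}; // row 1 is already null in the source
        std::vector<int> idx {0, -1, 1};
        auto out = gather_null_map(src, idx);
        assert((out == std::vector<uint8_t> {0, 1, 1}));
    }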
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 023ad92254..621dabf6f1 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -297,13 +298,22 @@ public:
     MutableBlock() = default;
     ~MutableBlock() = default;
 
-    MutableBlock(MutableColumns&& columns, DataTypes&& data_types)
-            : _columns(std::move(columns)), _data_types(std::move(data_types)) {}
+    MutableBlock(DataTypes data_types)
+            : _columns(data_types.size()), _data_types(std::move(data_types)) {
+        for (int i = 0; i < _data_types.size(); ++i) {
+            _columns[i] = _data_types[i]->create_column();
+        }
+    }
+
     MutableBlock(Block* block)
             : _columns(block->mutate_columns()), _data_types(block->get_data_types()) {}
     MutableBlock(Block&& block)
            : _columns(block.mutate_columns()), _data_types(block.get_data_types()) {}
 
+    void operator=(MutableBlock&& m_block) {
+        _columns = std::move(m_block._columns);
+        _data_types = std::move(m_block._data_types);
+    }
+
     size_t rows() const;
     size_t columns() const { return _columns.size(); }
 
@@ -364,9 +374,6 @@ public:
         _columns.clear();
         _data_types.clear();
     }
-
-    // TODO: use add_rows instead of this
-    // add_rows(Block* block,PODArray& group,int group_num);
 };
 
 } // namespace vectorized
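The two MutableBlock additions above (a constructor that builds empty columns from a schema, and move-assignment) exist so the build loop can seal an accumulated chunk and start a fresh accumulator cheaply. A simplified sketch with stand-in types, not the Doris classes:

    // Sketch: schema-driven construction plus reset-by-move-assignment.
    #include <cassert>
    #include <string>
    #include <vector>

    struct Column { std::string type; std::vector<int> data; };

    struct MutableBlockSketch {
        std::vector<Column> columns;
        explicit MutableBlockSketch(const std::vector<std::string>& types) {
            for (const auto& t : types) columns.push_back(Column {t, {}}); // create_column()
        }
        MutableBlockSketch& operator=(MutableBlockSketch&&) = default;
    };

    int main() {
        std::vector<std::string> schema {"Int32", "Int32"};
        MutableBlockSketch acc(schema);
        acc.columns[0].data.push_back(42); // ... merge() incoming blocks ...
        acc = MutableBlockSketch(schema);  // seal the chunk, start a fresh accumulator
        assert(acc.columns.size() == 2 && acc.columns[0].data.empty());
    }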
diff --git a/be/src/vec/exec/join/join_op.h b/be/src/vec/exec/join/join_op.h
index 71fec00d21..caf8e7d3ec 100644
--- a/be/src/vec/exec/join/join_op.h
+++ b/be/src/vec/exec/join/join_op.h
@@ -26,15 +26,15 @@ namespace doris::vectorized {
 struct RowRef {
     using SizeT = uint32_t; /// Do not use size_t cause of memory economy
 
-    const Block* block = nullptr;
     SizeT row_num = 0;
+    uint8_t block_offset;
     // Use in right join to mark row is visited
     // TODO: opt the varaible to use it only need
     bool visited = false;
 
     RowRef() {}
-    RowRef(const Block* block_ptr, size_t row_num_count, bool is_visited = false)
-            : block(block_ptr), row_num(row_num_count), visited(is_visited) {}
+    RowRef(size_t row_num_count, uint8_t block_offset_, bool is_visited = false)
+            : row_num(row_num_count), block_offset(block_offset_), visited(is_visited) {}
 };
 
 /// Single linked list of references to rows. Used for ALL JOINs (non-unique JOINs)
@@ -115,10 +115,10 @@ struct RowRefList : RowRef {
     };
 
     RowRefList() {}
-    RowRefList(const Block* block_, size_t row_num_) : RowRef(block_, row_num_) {}
+    RowRefList(size_t row_num_, uint8_t block_offset_) : RowRef(row_num_, block_offset_) {}
 
     ForwardIterator begin() { return ForwardIterator(this); }
-    ForwardIterator end() { return ForwardIterator::end(); }
+    static ForwardIterator end() { return ForwardIterator::end(); }
 
     /// insert element after current one
     void insert(RowRef&& row_ref, Arena& pool) {
@@ -138,6 +138,4 @@ private:
     uint32_t row_count = 1;
 };
 
-// using MapI32 = doris::vectorized::HashMap>;
-// using I32KeyType = doris::vectorized::ColumnsHashing::HashMethodOneNumber;
 } // namespace doris::vectorized
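The RowRef change above is a size optimization: every hash-table entry previously carried an 8-byte Block pointer, and now carries a 1-byte index into the node's shared vector of build blocks. A standalone sketch of the layout effect (the sizes asserted below assume a typical LP64 ABI with default padding):

    // Sketch: shrinking the per-entry payload from pointer-sized to index-sized.
    #include <cstdint>

    struct Block; // stand-in forward declaration

    struct RowRefOld {
        const Block* block = nullptr; // 8 bytes on LP64
        uint32_t row_num = 0;
        bool visited = false;
    };

    struct RowRefNew {
        uint32_t row_num = 0;
        uint8_t block_offset = 0; // index into a shared std::vector of build blocks
        bool visited = false;
    };

    static_assert(sizeof(RowRefOld) == 16, "pointer plus padding dominate the old layout");
    static_assert(sizeof(RowRefNew) == 8, "half the size, better cache locality per entry");

    int main() {}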
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 863c6a6087..eaeeeeda72 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -42,13 +42,14 @@ using ProfileCounter = RuntimeProfile::Counter;
 template <class HashTableContext, bool ignore_null, bool build_unique>
 struct ProcessHashTableBuild {
     ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs,
-                          HashJoinNode* join_node, int batch_size)
+                          HashJoinNode* join_node, int batch_size, uint8_t offset)
             : _rows(rows),
               _skip_rows(0),
               _acquired_block(acquired_block),
               _build_raw_ptrs(build_raw_ptrs),
               _join_node(join_node),
-              _batch_size(batch_size) {}
+              _batch_size(batch_size),
+              _offset(offset) {}
 
     Status operator()(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map,
                       bool has_runtime_filter) {
@@ -88,14 +89,14 @@ struct ProcessHashTableBuild {
             }
 
             if (emplace_result.is_inserted()) {
-                new (&emplace_result.get_mapped()) Mapped({&_acquired_block, k});
+                new (&emplace_result.get_mapped()) Mapped({k, _offset});
                 if (has_runtime_filter) {
                     inserted_rows.push_back(k);
                 }
             } else {
                 if constexpr (!build_unique) {
                     /// The first element of the list is stored in the value of the hash table, the rest in the pool.
-                    emplace_result.get_mapped().insert({&_acquired_block, k}, _join_node->_arena);
+                    emplace_result.get_mapped().insert({k, _offset}, _join_node->_arena);
                     if (has_runtime_filter) {
                         inserted_rows.push_back(k);
                     }
@@ -105,14 +106,6 @@ struct ProcessHashTableBuild {
             }
         }
 
-        if constexpr (build_unique) {
-            // If all row in build block is skip, just remove it
-            // to reduce mem pressure
-            if (_skip_rows == _rows) {
-                _join_node->_acquire_list.remove_last_element();
-            }
-        }
-
         COUNTER_UPDATE(_join_node->_build_table_expanse_timer,
                        hash_table_ctx.hash_table.get_resize_timer_value());
 
@@ -126,6 +119,7 @@ private:
    ColumnRawPtrs& _build_raw_ptrs;
    HashJoinNode* _join_node;
    int _batch_size;
+   uint8_t _offset;
 };
 
 template <class HashTableContext, class JoinOpType, bool ignore_null>
@@ -168,9 +162,13 @@ struct ProcessHashTableProbe {
               _right_col_len(join_node->_right_col_len),
               _batch_size(batch_size),
               _probe_rows(probe_rows),
+              _build_blocks(join_node->_build_blocks),
               _probe_block(join_node->_probe_block),
               _probe_index(join_node->_probe_index),
               _probe_raw_ptrs(join_node->_probe_columns),
+              _items_counts(join_node->_items_counts),
+              _build_block_offsets(join_node->_build_block_offsets),
+              _build_block_rows(join_node->_build_block_rows),
               _rows_returned_counter(join_node->_rows_returned_counter) {}
 
     // Only process the join with no other join conjunt, because of no other join conjunt
@@ -183,122 +181,129 @@ struct ProcessHashTableProbe {
         using Mapped = typename HashTableContext::Mapped;
 
         KeyGetter key_getter(_probe_raw_ptrs, _join_node->_probe_key_sz, nullptr);
-
-        std::vector<uint32_t> items_counts(_probe_rows);
         auto& mcol = mutable_block.mutable_columns();
         int current_offset = 0;
 
+        _items_counts.resize(_probe_rows);
+        memset(_items_counts.data(), 0, sizeof(uint32_t) * _probe_rows);
+
+        constexpr auto is_right_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
+                                       JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
+                                       JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN;
+
+        constexpr auto is_right_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
+                                                 JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN;
+
+        constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
+                                   JoinOpType::value == TJoinOp::FULL_OUTER_JOIN;
+
         for (; _probe_index < _probe_rows;) {
             if constexpr (ignore_null) {
                 if ((*null_map)[_probe_index]) {
-                    items_counts[_probe_index++] = 0;
+                    _items_counts[_probe_index++] = (uint32_t)0;
                     continue;
                 }
             }
-            int repeat_count = 0;
-
+            int last_offset = current_offset;
             auto find_result =
                     (*null_map)[_probe_index]
                             ? decltype(key_getter.find_key(hash_table_ctx.hash_table,
                                                            _probe_index, _arena)) {nullptr, false}
                            : key_getter.find_key(hash_table_ctx.hash_table, _probe_index, _arena);
-            if constexpr (JoinOpType::value == TJoinOp::INNER_JOIN) {
+
+            if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
+                if (!find_result.is_found()) {
+                    ++current_offset;
+                }
+            } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) {
+                if (find_result.is_found()) {
+                    ++current_offset;
+                }
+            } else {
                 if (find_result.is_found()) {
                     auto& mapped = find_result.get_mapped();
                     // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance.
                     // We should rethink whether to use this iterator mode in the future. Now just opt the one row case
                     if (mapped.get_row_count() == 1) {
-                        ++repeat_count;
-                        for (size_t j = 0; j < _right_col_len; ++j) {
-                            auto& column = *mapped.block->get_by_position(j).column;
-                            mcol[j + _right_col_idx]->insert_from(column, mapped.row_num);
+                        if constexpr (is_right_join) mapped.visited = true;
+
+                        if constexpr (!is_right_semi_anti_join) {
+                            _build_block_offsets[current_offset] = mapped.block_offset;
+                            _build_block_rows[current_offset] = mapped.row_num;
+                            ++current_offset;
                         }
                     } else {
                         // prefetch is more useful while matching to multiple rows
                         if (_probe_index + 2 < _probe_rows)
                             key_getter.prefetch(hash_table_ctx.hash_table, _probe_index + 2,
                                                 _arena);
-                        for (auto it = mapped.begin(); it.ok(); ++it) {
-                            ++repeat_count;
-                            for (size_t j = 0; j < _right_col_len; ++j) {
-                                auto& column = *it->block->get_by_position(j).column;
-                                // TODO: interface insert from cause serious performance problems
-                                // when column is nullable. Try to make more effective way
-                                mcol[j + _right_col_idx]->insert_from(column, it->row_num);
-                            }
-                        }
-                    }
-                }
-            } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
-                if (!find_result.is_found()) {
-                    ++repeat_count;
-                }
-            } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) {
-                if (find_result.is_found()) {
-                    ++repeat_count;
-                }
-            } else {
-                if (_probe_index + 2 < _probe_rows)
-                    key_getter.prefetch(hash_table_ctx.hash_table, _probe_index + 2, _arena);
-                if (find_result.is_found()) {
-                    auto& mapped = find_result.get_mapped();
-                    // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance.
-                    // We should rethink whether to use this iterator mode in the future. Now just opt the one row case
-                    if (mapped.get_row_count() == 1) {
-                        mapped.visited = true;
-                        // right semi/anti join should dispose the data in hash table
-                        // after probe data eof
-                        if constexpr (JoinOpType::value != TJoinOp::RIGHT_ANTI_JOIN &&
-                                      JoinOpType::value != TJoinOp::RIGHT_SEMI_JOIN) {
-                            ++repeat_count;
-                            for (size_t j = 0; j < _right_col_len; ++j) {
-                                auto& column = *mapped.block->get_by_position(j).column;
-                                mcol[j + _right_col_idx]->insert_from(column, mapped.row_num);
-                            }
-                        }
-                    } else {
                         for (auto it = mapped.begin(); it.ok(); ++it) {
-                            // right semi/anti join should dispose the data in hash table
-                            // after probe data eof
-                            if constexpr (JoinOpType::value != TJoinOp::RIGHT_ANTI_JOIN &&
-                                          JoinOpType::value != TJoinOp::RIGHT_SEMI_JOIN) {
-                                ++repeat_count;
-                                for (size_t j = 0; j < _right_col_len; ++j) {
-                                    auto& column = *it->block->get_by_position(j).column;
-                                    // TODO: interface insert from cause serious performance problems
-                                    // when column is nullable. Try to make more effective way
-                                    mcol[j + _right_col_idx]->insert_from(column, it->row_num);
+                            if constexpr (!is_right_semi_anti_join) {
+                                if (current_offset < _batch_size) {
+                                    _build_block_offsets[current_offset] = it->block_offset;
+                                    _build_block_rows[current_offset] = it->row_num;
+                                } else {
+                                    _build_block_offsets.emplace_back(it->block_offset);
+                                    _build_block_rows.emplace_back(it->row_num);
                                 }
+                                ++current_offset;
                             }
-                            it->visited = true;
+                            if constexpr (is_right_join) it->visited = true;
                         }
                     }
-                } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
-                                     JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
-                    // only full outer / left outer need insert the data of right table
-                    ++repeat_count;
-                    for (size_t j = 0; j < _right_col_len; ++j) {
-                        DCHECK(mcol[j + _right_col_idx]->is_nullable());
-                        assert_cast<ColumnNullable*>(mcol[j + _right_col_idx].get())->insert_data(nullptr, 0);
+                } else {
+                    if constexpr (probe_all) {
+                        // only full outer / left outer need insert the data of right table
+                        _build_block_offsets[current_offset] = -1;
+                        _build_block_rows[current_offset] = -1;
+                        ++current_offset;
                    }
                 }
             }
 
-            items_counts[_probe_index++] = repeat_count;
-            current_offset += repeat_count;
+            _items_counts[_probe_index++] = (uint32_t)(current_offset - last_offset);
             if (current_offset >= _batch_size) {
                 break;
             }
         }
-
+
+        // insert all matched build rows
+        if constexpr (!is_right_semi_anti_join) {
+            if (_build_blocks.size() == 1) {
+                for (int i = 0; i < _right_col_len; i++) {
+                    auto& column = *_build_blocks[0].get_by_position(i).column;
+                    mcol[i + _right_col_idx]->insert_indices_from(
+                            column, _build_block_rows.data(),
+                            _build_block_rows.data() + current_offset);
+                }
+            } else {
+                for (int i = 0; i < _right_col_len; i++) {
+                    for (int j = 0; j < current_offset; j++) {
+                        if constexpr (probe_all) {
+                            if (_build_block_offsets[j] == -1) {
+                                DCHECK(mcol[i + _right_col_idx]->is_nullable());
+                                assert_cast<ColumnNullable*>(mcol[i + _right_col_idx].get())
+                                        ->insert_data(nullptr, 0);
+                            } else {
+                                auto& column = *_build_blocks[_build_block_offsets[j]]
+                                                        .get_by_position(i).column;
+                                mcol[i + _right_col_idx]->insert_from(column, _build_block_rows[j]);
+                            }
+                        } else {
+                            auto& column = *_build_blocks[_build_block_offsets[j]]
+                                                    .get_by_position(i).column;
+                            mcol[i + _right_col_idx]->insert_from(column, _build_block_rows[j]);
+                        }
+                    }
+                }
+            }
+        }
+
         for (int i = 0; i < _right_col_idx; ++i) {
             auto& column = _probe_block.get_by_position(i).column;
-            column->replicate(items_counts.data(), current_offset, *mcol[i]);
+            column->replicate(&_items_counts[0], current_offset, *mcol[i]);
         }
 
         output_block->swap(mutable_block.to_block());
         return Status::OK();
     }
-
     // In the presence of other join conjunt, the process of join become more complicated.
     // each matching join column need to be processed by other join conjunt. so the sturct of mutable block
     // and output block may be different
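The rewritten do_process above splits probing into two phases: the matching loop records only (block_offset, row_num) pairs into small index vectors, and a separate pass materializes the right-side output columns, using the batched insert_indices_from in the common single-build-block case. A standalone sketch of that two-phase shape, with std containers and hypothetical names:

    // Sketch: phase 1 collects indices only; phase 2 gathers columns in one pass.
    #include <cassert>
    #include <cstdint>
    #include <vector>

    using BuildBlock = std::vector<std::vector<int>>; // [column][row]

    int main() {
        std::vector<BuildBlock> build_blocks {{{10, 11}, {20, 21}}}; // 1 block, 2 columns
        std::vector<int8_t> offsets;
        std::vector<int> rows;

        // Phase 1: matching logic touches only these small, cache-friendly vectors
        offsets.push_back(0);  rows.push_back(1);  // probe row matched block 0, row 1
        offsets.push_back(-1); rows.push_back(-1); // probe row with no match (outer join)

        // Phase 2: materialize output column by column in a tight loop
        std::vector<std::vector<int>> out(2);
        for (size_t col = 0; col < 2; ++col) {
            for (size_t j = 0; j < rows.size(); ++j) {
                out[col].push_back(offsets[j] == -1
                                           ? 0 // stands in for NULL/default
                                           : build_blocks[offsets[j]][col][rows[j]]);
            }
        }
        assert(out[0] == (std::vector<int> {11, 0}) && out[1] == (std::vector<int> {21, 0}));
    }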
@@ -344,8 +349,9 @@ struct ProcessHashTableProbe {
 
                     for (auto it = mapped.begin(); it.ok(); ++it) {
                         ++current_offset;
+                        const Block& cur_blk = _build_blocks[it->block_offset];
                         for (size_t j = 0; j < _right_col_len; ++j) {
-                            auto& column = *it->block->get_by_position(j).column;
+                            auto& column = *cur_blk.get_by_position(j).column;
                             mcol[j + _right_col_idx]->insert_from(column, it->row_num);
                         }
                         visited_map.emplace_back(&it->visited);
@@ -386,6 +392,7 @@ struct ProcessHashTableProbe {
         for (int i = _probe_index; i < _probe_rows; ++i) {
             offset_data[i] = current_offset;
         }
+
         output_block->swap(mutable_block.to_block());
 
         for (int i = 0; i < _right_col_idx; ++i) {
             auto& column = _probe_block.get_by_position(i).column;
@@ -507,10 +514,10 @@ struct ProcessHashTableProbe {
         auto& iter = hash_table_ctx.iter;
         auto block_size = 0;
 
-        auto insert_from_hash_table = [&](const Block* block, uint32_t row_num) {
+        auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) {
             block_size++;
             for (size_t j = 0; j < _right_col_len; ++j) {
-                auto& column = *block->get_by_position(j).column;
+                auto& column = *_build_blocks[offset].get_by_position(j).column;
                 mcol[j + _right_col_idx]->insert_from(column, row_num);
             }
         };
@@ -520,10 +527,10 @@ struct ProcessHashTableProbe {
             for (auto it = mapped.begin(); it.ok(); ++it) {
                 if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN) {
                     if (it->visited)
-                        insert_from_hash_table(it->block, it->row_num);
+                        insert_from_hash_table(it->block_offset, it->row_num);
                 } else {
                     if (!it->visited)
-                        insert_from_hash_table(it->block, it->row_num);
+                        insert_from_hash_table(it->block_offset, it->row_num);
                 }
             }
         }
@@ -551,11 +558,16 @@ private:
     const int _right_col_len;
     const int _batch_size;
     const size_t _probe_rows;
+    const std::vector<Block>& _build_blocks;
     const Block& _probe_block;
     int& _probe_index;
     ColumnRawPtrs& _probe_raw_ptrs;
     Arena _arena;
 
+    std::vector<uint32_t>& _items_counts;
+    std::vector<int8_t>& _build_block_offsets;
+    std::vector<int>& _build_block_rows;
+
     ProfileCounter* _rows_returned_counter;
 };
@@ -571,8 +583,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
           _match_all_build(_join_op == TJoinOp::RIGHT_OUTER_JOIN ||
                            _join_op == TJoinOp::FULL_OUTER_JOIN),
           _build_unique(_join_op == TJoinOp::LEFT_ANTI_JOIN ||
                         _join_op == TJoinOp::LEFT_SEMI_JOIN),
-          _is_left_semi_anti(_join_op == TJoinOp::LEFT_ANTI_JOIN ||
-                             _join_op == TJoinOp::LEFT_SEMI_JOIN),
           _is_right_semi_anti(_join_op == TJoinOp::RIGHT_ANTI_JOIN ||
                               _join_op == TJoinOp::RIGHT_SEMI_JOIN),
           _is_outer_join(_match_all_build || _match_all_probe) {
@@ -695,6 +705,9 @@ Status HashJoinNode::prepare(RuntimeState* state) {
 
     _right_col_idx = _is_right_semi_anti ? 0 : _left_table_data_types.size();
     _right_col_len = _right_table_data_types.size();
+
+    _build_block_offsets.resize(state->batch_size());
+    _build_block_rows.resize(state->batch_size());
     return Status::OK();
 }
 
@@ -706,7 +719,6 @@ Status HashJoinNode::close(RuntimeState* state) {
     if (_vother_join_conjunct_ptr) (*_vother_join_conjunct_ptr)->close(state);
 
     _mem_tracker->Release(_mem_used);
-
     return ExecNode::close(state);
 }
 
@@ -861,11 +873,15 @@ Status HashJoinNode::open(RuntimeState* state) {
 Status HashJoinNode::_hash_table_build(RuntimeState* state) {
     RETURN_IF_ERROR(child(1)->open(state));
     SCOPED_TIMER(_build_timer);
-    Block block;
+    MutableBlock mutable_block(_right_table_data_types);
 
+    uint8_t index = 0;
+    int64_t last_mem_used = 0;
     bool eos = false;
+
+    Block block;
     while (!eos) {
-        block.clear();
+        block.clear_column_data();
         RETURN_IF_CANCELLED(state);
 
         RETURN_IF_ERROR(child(1)->get_next(state, &block, &eos));
@@ -873,10 +889,27 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) {
         _mem_used += block.allocated_bytes();
         RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while getting next from the child 1.");
 
-        RETURN_IF_ERROR(_process_build_block(state, block));
-        RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
+        if (block.rows() != 0) { mutable_block.merge(block); }
+
+        // make one block for every 4 gigabytes
+        constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
+        if (_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE) {
+            _build_blocks.emplace_back(mutable_block.to_block());
+            // TODO: Rethink whether it would be better to do this process after we
+            // receive all the build blocks.
+            RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index));
+            RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
+
+            mutable_block = MutableBlock(_right_table_data_types);
+            ++index;
+            last_mem_used = _mem_used;
+        }
     }
 
+    _build_blocks.emplace_back(mutable_block.to_block());
+    RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index));
+    RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
+
     return std::visit(
             [&](auto&& arg) -> Status {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
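The build loop above replaces the per-batch AcquireList with a small number of large blocks: child batches are merged into one accumulator, and a new block is sealed and indexed into the hash table under its uint8_t offset whenever memory use grows by roughly BUILD_BLOCK_MAX_SIZE. A standalone sketch of that chunking policy (the tiny 64-byte threshold stands in for 4 GB; all names are illustrative):

    // Sketch: accumulate, seal a chunk past the threshold, always seal the remainder.
    #include <cstdint>
    #include <utility>
    #include <vector>

    int main() {
        constexpr uint64_t kMaxChunk = 64; // stand-in for 4 * 1024UL * 1024UL * 1024UL
        std::vector<std::vector<int>> build_blocks;
        std::vector<int> accumulator;
        uint64_t mem_used = 0, last_mem_used = 0;

        for (int batch = 0; batch < 100; ++batch) { // stands in for child(1)->get_next()
            accumulator.push_back(batch);
            mem_used += sizeof(int);
            if (mem_used - last_mem_used > kMaxChunk) {
                build_blocks.emplace_back(std::move(accumulator)); // seal + hash under offset
                accumulator = {};                                  // fresh accumulator
                last_mem_used = mem_used;
            }
        }
        build_blocks.emplace_back(std::move(accumulator)); // final partial chunk
        return build_blocks.size() > 1 ? 0 : 1;
    }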
@@ -902,10 +935,7 @@ Status HashJoinNode::extract_build_join_column(Block& block, NullMap& null_map,
             RETURN_IF_ERROR(_build_expr_ctxs[i]->execute(&block, &result_col_id));
         }
 
-        // TODO: opt the column is const
-        block.get_by_position(result_col_id).column =
-                block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
-
+        // MutableBlock assumes there is no const column in the build block
         if (_is_null_safe_eq_join[i]) {
             raw_ptrs[i] = block.get_by_position(result_col_id).column.get();
         } else {
@@ -975,7 +1005,7 @@ Status HashJoinNode::extract_probe_join_column(Block& block, NullMap& null_map,
     return Status::OK();
 }
 
-Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
+Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uint8_t offset) {
     SCOPED_TIMER(_build_table_timer);
     size_t rows = block.rows();
     if (rows == 0) {
@@ -983,9 +1013,6 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
     }
     COUNTER_UPDATE(_build_rows_counter, rows);
 
-    auto& acquired_block = _acquire_list.acquire(std::move(block));
-    materialize_block_inplace(acquired_block);
-
     ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
 
     NullMap null_map_val(rows);
@@ -997,7 +1024,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
             [&](auto&& arg) -> Status {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                    return extract_build_join_column(acquired_block, null_map_val, raw_ptrs,
+                    return extract_build_join_column(block, null_map_val, raw_ptrs,
                                                      has_null, *_build_expr_call_timer);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
@@ -1014,7 +1041,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
#define CALL_BUILD_FUNCTION(HAS_NULL, BUILD_UNIQUE)                                           \
    ProcessHashTableBuild<HashTableCtxType, HAS_NULL, BUILD_UNIQUE> hash_table_build_process( \
-            rows, acquired_block, raw_ptrs, this, state->batch_size());                      \
+            rows, block, raw_ptrs, this, state->batch_size(), offset);                       \
    st = hash_table_build_process(arg, &null_map_val, has_runtime_filter);
                     if (std::pair {has_null, _build_unique} == std::pair {true, true}) {
                         CALL_BUILD_FUNCTION(true, true);
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index ba6f394b3f..4fbfc15d39 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -201,8 +201,8 @@ private:
     Arena _arena;
     HashTableVariants _hash_table_variants;
 
-    AcquireList<Block> _acquire_list;
+    std::vector<Block> _build_blocks;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     ColumnUInt8::MutablePtr _null_map_column;
@@ -218,7 +218,6 @@ private:
 
     const bool _match_all_build; // output all rows coming from the build input. Full/Right Join
     bool _build_unique;          // build a hash table without duplicated rows. Left semi/anti Join
-    const bool _is_left_semi_anti;
     const bool _is_right_semi_anti;
     const bool _is_outer_join;
     bool _have_other_join_conjunct = false;
@@ -227,9 +226,12 @@ private:
     int _right_col_idx = 0;
     int _right_col_len = 0;
 
+    std::vector<uint32_t> _items_counts;
+    std::vector<int8_t> _build_block_offsets;
+    std::vector<int> _build_block_rows;
 private:
     Status _hash_table_build(RuntimeState* state);
-    Status _process_build_block(RuntimeState* state, Block& block);
+    Status _process_build_block(RuntimeState* state, Block& block, uint8_t offset);
 
     Status extract_build_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
                                      bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 3e6f73dae7..9031e0abad 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -26,8 +26,9 @@ namespace vectorized {
 template <class HashTableContext>
 struct HashTableBuild {
     HashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs,
-                   VSetOperationNode* operation_node)
+                   VSetOperationNode* operation_node, uint8_t offset)
             : _rows(rows),
+              _offset(offset),
               _acquired_block(acquired_block),
               _build_raw_ptrs(build_raw_ptrs),
               _operation_node(operation_node) {}
@@ -54,7 +55,7 @@ struct HashTableBuild {
             }
 
             if (emplace_result.is_inserted()) { //only inserted once as the same key, others skip
-                new (&emplace_result.get_mapped()) Mapped({&_acquired_block, k});
+                new (&emplace_result.get_mapped()) Mapped({k, _offset});
                 _operation_node->_valid_element_in_hash_tbl++;
             }
         }
@@ -63,6 +64,7 @@ struct HashTableBuild {
 
 private:
     const int _rows;
+    const uint8_t _offset;
     Block& _acquired_block;
     ColumnRawPtrs& _build_raw_ptrs;
     VSetOperationNode* _operation_node;
@@ -138,6 +140,7 @@ Status VSetOperationNode::prepare(RuntimeState* state) {
         _left_table_data_types.push_back(ctx->root()->data_type());
     }
     hash_table_init();
+
     return Status::OK();
 }
 
@@ -225,6 +228,10 @@ void VSetOperationNode::hash_table_init() {
 Status VSetOperationNode::hash_table_build(RuntimeState* state) {
     RETURN_IF_ERROR(child(0)->open(state));
     Block block;
+    MutableBlock mutable_block(_left_table_data_types);
+
+    uint8_t index = 0;
+    int64_t last_mem_used = 0;
     bool eos = false;
     while (!eos) {
         block.clear();
@@ -237,29 +244,44 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) {
         _mem_used += allocated_bytes;
         RETURN_IF_LIMIT_EXCEEDED(state, "Set Operation Node, while getting next from the child 0.");
 
-        RETURN_IF_ERROR(process_build_block(block));
-        RETURN_IF_LIMIT_EXCEEDED(state, "Set Operation Node, while constructing the hash table.");
+        if (block.rows() != 0) { mutable_block.merge(block); }
+
+        // make one block for every 4 gigabytes
+        constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
+        if (_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE) {
+            _build_blocks.emplace_back(mutable_block.to_block());
+            // TODO: Rethink whether it would be better to do this process after we
+            // receive all the build blocks.
+            RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
+            RETURN_IF_LIMIT_EXCEEDED(state, "Set Operation Node, while constructing the hash table.");
+            mutable_block = MutableBlock(_left_table_data_types);
+            ++index;
+            last_mem_used = _mem_used;
+        }
     }
+
+    _build_blocks.emplace_back(mutable_block.to_block());
+    RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
+    RETURN_IF_LIMIT_EXCEEDED(state, "Set Operation Node, while constructing the hash table.");
 
     return Status::OK();
 }
 
-Status VSetOperationNode::process_build_block(Block& block) {
+Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) {
     size_t rows = block.rows();
     if (rows == 0) {
         return Status::OK();
     }
 
-    auto& acquired_block = _acquire_list.acquire(std::move(block));
-    vectorized::materialize_block_inplace(acquired_block);
+    vectorized::materialize_block_inplace(block);
     ColumnRawPtrs raw_ptrs(_child_expr_lists[0].size());
-    RETURN_IF_ERROR(extract_build_column(acquired_block, raw_ptrs));
+    RETURN_IF_ERROR(extract_build_column(block, raw_ptrs));
 
     std::visit(
             [&](auto&& arg) {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                    HashTableBuild<HashTableCtxType> hash_table_build_process(rows, acquired_block,
-                                                                              raw_ptrs, this);
+                    HashTableBuild<HashTableCtxType> hash_table_build_process(rows, block,
+                                                                              raw_ptrs, this, offset);
                     hash_table_build_process(arg);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index f31532ff8f..1f8519c955 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -49,7 +49,7 @@ protected:
     //It's time to abstract out the same methods and provide them directly to others;
     void hash_table_init();
     Status hash_table_build(RuntimeState* state);
-    Status process_build_block(Block& block);
+    Status process_build_block(Block& block, uint8_t offset);
     Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs);
     Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int child_id);
     template
@@ -81,6 +81,7 @@ protected:
     //record insert column id during probe
     std::vector<int> _probe_column_inserted_id;
 
+    std::vector<Block> _build_blocks;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     std::vector<MutableColumnPtr> _mutable_cols;
@@ -150,6 +151,7 @@ struct HashTableProbe {
               _left_table_data_types(operation_node->_left_table_data_types),
               _batch_size(batch_size),
               _probe_rows(probe_rows),
+              _build_blocks(operation_node->_build_blocks),
               _probe_block(operation_node->_probe_block),
               _probe_index(operation_node->_probe_index),
               _num_rows_returned(operation_node->_num_rows_returned),
@@ -183,9 +185,10 @@ struct HashTableProbe {
     }
 
     void add_result_columns(RowRefList& value, int& block_size) {
+        auto it = value.begin();
         for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end(); ++idx) {
-            auto& column = *value.begin()->block->get_by_position(idx->first).column;
-            _mutable_cols[idx->second]->insert_from(column, value.begin()->row_num);
+            auto& column = *_build_blocks[it->block_offset].get_by_position(idx->first).column;
+            _mutable_cols[idx->second]->insert_from(column, it->row_num);
         }
         block_size++;
     }
@@ -230,6 +233,7 @@ private:
     const DataTypes& _left_table_data_types;
     const int _batch_size;
     const size_t _probe_rows;
+    const std::vector<Block>& _build_blocks;
     const Block& _probe_block;
     int& _probe_index;
     int64_t& _num_rows_returned;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 8819eaa236..cd830e5fbf 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -156,9 +156,7 @@ Status VDataStreamSender::Channel::add_row(Block* block, int row) {
     }
 
     if (_mutable_block.get() == nullptr) {
-        auto empty_block = block->clone_empty();
-        _mutable_block.reset(
-                new MutableBlock(empty_block.mutate_columns(), empty_block.get_data_types()));
+        _mutable_block.reset(new MutableBlock(block->clone_empty()));
     }
 
     _mutable_block->add_row(block, row);
@@ -174,9 +172,7 @@ Status VDataStreamSender::Channel::add_rows(Block* block, const std::vector
     }
 
     if (_mutable_block.get() == nullptr) {
-        auto empty_block = block->clone_empty();
-        _mutable_block.reset(
-                new MutableBlock(empty_block.mutate_columns(), empty_block.get_data_types()));
+        _mutable_block.reset(new MutableBlock(block->clone_empty()));
     }
 
     int row_wait_add = rows.size();
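The sender-side change above relies on the Block&& constructor added to MutableBlock in block.h, collapsing the clone-then-extract sequence into a single expression. A simplified sketch of that pattern, with stand-in types rather than the Doris API:

    // Sketch: constructing the mutable accumulator directly from a moved empty clone.
    #include <memory>
    #include <utility>
    #include <vector>

    struct BlockSketch {
        std::vector<int> cols;
        BlockSketch clone_empty() const { return BlockSketch {}; }
    };

    struct MutableBlockSketch {
        std::vector<int> cols;
        explicit MutableBlockSketch(BlockSketch&& b) : cols(std::move(b.cols)) {}
    };

    int main() {
        BlockSketch block {{1, 2, 3}};
        // before: auto empty = block.clone_empty(); then pass columns and types separately
        auto mb = std::make_unique<MutableBlockSketch>(block.clone_empty()); // after
        return mb->cols.empty() ? 0 : 1;
    }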