[Refactor](join) refactor of hash join (#27557)

Improve the performance under the tpch data set by reconstructing the join related code and the use of hash table

Co-authored-by: HappenLee <happenlee@hotmail.com>
Co-authored-by: BiteTheDDDDt <pxl290@qq.com>
This commit is contained in:
Pxl
2023-11-28 19:46:00 +08:00
committed by GitHub
parent 1b509ab13c
commit d969047b50
67 changed files with 1579 additions and 1937 deletions

View File

@ -58,7 +58,6 @@ VSetOperationNode<is_intersect>::VSetOperationNode(ObjectPool* pool, const TPlan
: ExecNode(pool, tnode, descs),
_valid_element_in_hash_tbl(0),
_mem_used(0),
_build_block_index(0),
_build_finished(false) {
_hash_table_variants = std::make_unique<HashTableVariants>();
}
@ -219,7 +218,7 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
}
return;
}
if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32, RowRefListWithFlags>(
if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32, RowRefListWithFlags>(
*_hash_table_variants, _child_expr_lists[0])) {
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListWithFlags>>();
}
@ -228,36 +227,46 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::sink(RuntimeState* state, Block* block, bool eos) {
SCOPED_TIMER(_exec_timer);
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
if (block->rows() != 0) {
_mem_used += block->allocated_bytes();
RETURN_IF_ERROR(_mutable_block.merge(*block));
}
if (eos || _mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
_build_blocks.emplace_back(_mutable_block.to_block());
RETURN_IF_ERROR(
process_build_block(_build_blocks[_build_block_index], _build_block_index, state));
_mutable_block.clear();
++_build_block_index;
if (eos) {
if constexpr (is_intersect) {
_valid_element_in_hash_tbl = 0;
} else {
std::visit(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
_valid_element_in_hash_tbl = arg.hash_table->size();
}
},
*_hash_table_variants);
}
_build_finished = true;
_can_read = _children.size() == 1;
if (block->rows() != 0) {
if (_build_block.empty()) {
RETURN_IF_ERROR(_mutable_block.merge(*(block->create_same_struct_block(0, false))));
}
RETURN_IF_ERROR(_mutable_block.merge(*block));
if (_mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
return Status::NotSupported(
"Hash join do not support build table rows"
" over:" +
std::to_string(std::numeric_limits<uint32_t>::max()));
}
}
if (eos) {
if (!_mutable_block.empty()) {
_build_block = _mutable_block.to_block();
}
RETURN_IF_ERROR(process_build_block(_build_block, state));
_mutable_block.clear();
if constexpr (is_intersect) {
_valid_element_in_hash_tbl = 0;
} else {
std::visit(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
_valid_element_in_hash_tbl = arg.hash_table->size();
}
},
*_hash_table_variants);
}
_build_finished = true;
_can_read = _children.size() == 1;
}
return Status::OK();
}
@ -310,8 +319,7 @@ Status VSetOperationNode<is_intersect>::hash_table_build(RuntimeState* state) {
}
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_t offset,
RuntimeState* state) {
Status VSetOperationNode<is_intersect>::process_build_block(Block& block, RuntimeState* state) {
size_t rows = block.rows();
if (rows == 0) {
return Status::OK();
@ -326,7 +334,7 @@ Status VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
HashTableBuild<HashTableCtxType, is_intersect> hash_table_build_process(
this, rows, raw_ptrs, offset, state);
this, rows, raw_ptrs, state);
st = hash_table_build_process(arg, _arena);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
@ -342,8 +350,8 @@ void VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& va
int& block_size) {
auto it = value.begin();
for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end(); ++idx) {
auto& column = *_build_blocks[it->block_offset].get_by_position(idx->first).column;
if (_mutable_cols[idx->second]->is_nullable() xor column.is_nullable()) {
const auto& column = *_build_block.get_by_position(idx->first).column;
if (_mutable_cols[idx->second]->is_nullable() ^ column.is_nullable()) {
DCHECK(_mutable_cols[idx->second]->is_nullable());
((ColumnNullable*)(_mutable_cols[idx->second].get()))
->insert_from_not_nullable(column, it->row_num);
@ -512,10 +520,6 @@ void VSetOperationNode<is_intersect>::debug_string(int indentation_level,
template <bool is_intersect>
void VSetOperationNode<is_intersect>::release_mem() {
_hash_table_variants = nullptr;
std::vector<Block> tmp_build_blocks;
_build_blocks.swap(tmp_build_blocks);
_probe_block.clear();
}