[Refactor](join) split SetHashTableVariants out from HashTableVariants (#29519)
split SetHashTableVariants out from HashTableVariants
This commit is contained in:
@ -56,7 +56,7 @@ template <bool is_intersect>
|
||||
VSetOperationNode<is_intersect>::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
|
||||
const DescriptorTbl& descs)
|
||||
: ExecNode(pool, tnode, descs), _valid_element_in_hash_tbl(0), _build_finished(false) {
|
||||
_hash_table_variants = std::make_unique<HashTableVariants>();
|
||||
_hash_table_variants = std::make_unique<SetHashTableVariants>();
|
||||
}
|
||||
|
||||
template <bool is_intersect>
|
||||
@ -86,7 +86,7 @@ Status VSetOperationNode<is_intersect>::init(const TPlanNode& tnode, RuntimeStat
|
||||
return Status::NotSupported("Not Implemented, Check The Operation Node.");
|
||||
}
|
||||
|
||||
for (auto& texprs : *result_texpr_lists) {
|
||||
for (const auto& texprs : *result_texpr_lists) {
|
||||
VExprContextSPtrs ctxs;
|
||||
RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, ctxs));
|
||||
_child_expr_lists.push_back(ctxs);
|
||||
@ -424,10 +424,10 @@ Status VSetOperationNode<is_intersect>::extract_build_column(Block& block,
|
||||
|
||||
block.get_by_position(result_col_id).column =
|
||||
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
|
||||
auto column = block.get_by_position(result_col_id).column.get();
|
||||
const auto* column = block.get_by_position(result_col_id).column.get();
|
||||
|
||||
if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
|
||||
auto& col_nested = nullable->get_nested_column();
|
||||
if (const auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
|
||||
const auto& col_nested = nullable->get_nested_column();
|
||||
if (_build_not_ignore_null[i]) {
|
||||
raw_ptrs[i] = nullable;
|
||||
} else {
|
||||
@ -452,10 +452,10 @@ Status VSetOperationNode<is_intersect>::extract_probe_column(Block& block, Colum
|
||||
|
||||
block.get_by_position(result_col_id).column =
|
||||
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
|
||||
auto column = block.get_by_position(result_col_id).column.get();
|
||||
const auto* column = block.get_by_position(result_col_id).column.get();
|
||||
|
||||
if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
|
||||
auto& col_nested = nullable->get_nested_column();
|
||||
if (const auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
|
||||
const auto& col_nested = nullable->get_nested_column();
|
||||
if (_build_not_ignore_null[i]) { //same as build column
|
||||
raw_ptrs[i] = nullable;
|
||||
} else {
|
||||
@ -496,8 +496,8 @@ void VSetOperationNode<is_intersect>::debug_string(int indentation_level,
|
||||
std::stringstream* out) const {
|
||||
*out << string(indentation_level * 2, ' ');
|
||||
*out << " _child_expr_lists=[";
|
||||
for (int i = 0; i < _child_expr_lists.size(); ++i) {
|
||||
*out << VExpr::debug_string(_child_expr_lists[i]) << ", ";
|
||||
for (const auto& _child_expr_list : _child_expr_lists) {
|
||||
*out << VExpr::debug_string(_child_expr_list) << ", ";
|
||||
}
|
||||
*out << "] \n";
|
||||
ExecNode::debug_string(indentation_level, out);
|
||||
@ -516,50 +516,45 @@ void VSetOperationNode<is_intersect>::refresh_hash_table() {
|
||||
[&](auto&& arg) {
|
||||
using HashTableCtxType = std::decay_t<decltype(arg)>;
|
||||
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
|
||||
if constexpr (std::is_same_v<typename HashTableCtxType::Mapped,
|
||||
RowRefListWithFlags>) {
|
||||
auto tmp_hash_table =
|
||||
std::make_shared<typename HashTableCtxType::HashMapType>();
|
||||
bool is_need_shrink =
|
||||
arg.hash_table->should_be_shrink(_valid_element_in_hash_tbl);
|
||||
if (is_intersect || is_need_shrink) {
|
||||
tmp_hash_table->init_buf_size(
|
||||
_valid_element_in_hash_tbl / arg.hash_table->get_factor() + 1);
|
||||
}
|
||||
auto tmp_hash_table =
|
||||
std::make_shared<typename HashTableCtxType::HashMapType>();
|
||||
bool is_need_shrink =
|
||||
arg.hash_table->should_be_shrink(_valid_element_in_hash_tbl);
|
||||
if (is_intersect || is_need_shrink) {
|
||||
tmp_hash_table->init_buf_size(
|
||||
_valid_element_in_hash_tbl / arg.hash_table->get_factor() + 1);
|
||||
}
|
||||
|
||||
arg.init_iterator();
|
||||
auto& iter = arg.iterator;
|
||||
auto iter_end = arg.hash_table->end();
|
||||
std::visit(
|
||||
[&](auto is_need_shrink_const) {
|
||||
while (iter != iter_end) {
|
||||
auto& mapped = iter->get_second();
|
||||
auto it = mapped.begin();
|
||||
arg.init_iterator();
|
||||
auto& iter = arg.iterator;
|
||||
auto iter_end = arg.hash_table->end();
|
||||
std::visit(
|
||||
[&](auto is_need_shrink_const) {
|
||||
while (iter != iter_end) {
|
||||
auto& mapped = iter->get_second();
|
||||
auto it = mapped.begin();
|
||||
|
||||
if constexpr (is_intersect) { //intersected
|
||||
if (it->visited) {
|
||||
it->visited = false;
|
||||
if constexpr (is_intersect) { //intersected
|
||||
if (it->visited) {
|
||||
it->visited = false;
|
||||
tmp_hash_table->insert(iter->get_value());
|
||||
}
|
||||
++iter;
|
||||
} else { //except
|
||||
if constexpr (is_need_shrink_const) {
|
||||
if (!it->visited) {
|
||||
tmp_hash_table->insert(iter->get_value());
|
||||
}
|
||||
++iter;
|
||||
} else { //except
|
||||
if constexpr (is_need_shrink_const) {
|
||||
if (!it->visited) {
|
||||
tmp_hash_table->insert(iter->get_value());
|
||||
}
|
||||
}
|
||||
++iter;
|
||||
}
|
||||
++iter;
|
||||
}
|
||||
},
|
||||
make_bool_variant(is_need_shrink));
|
||||
}
|
||||
},
|
||||
make_bool_variant(is_need_shrink));
|
||||
|
||||
arg.reset();
|
||||
if (is_intersect || is_need_shrink) {
|
||||
arg.hash_table = std::move(tmp_hash_table);
|
||||
}
|
||||
} else {
|
||||
LOG(FATAL) << "FATAL: Invalid RowRefList";
|
||||
arg.reset();
|
||||
if (is_intersect || is_need_shrink) {
|
||||
arg.hash_table = std::move(tmp_hash_table);
|
||||
}
|
||||
} else {
|
||||
LOG(FATAL) << "FATAL: uninited hash table";
|
||||
@ -578,22 +573,18 @@ Status VSetOperationNode<is_intersected>::get_data_in_hashtable(HashTableContext
|
||||
auto& iter = hash_table_ctx.iterator;
|
||||
auto block_size = 0;
|
||||
|
||||
if constexpr (std::is_same_v<typename HashTableContext::Mapped, RowRefListWithFlags>) {
|
||||
for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) {
|
||||
auto& value = iter->get_second();
|
||||
auto it = value.begin();
|
||||
if constexpr (is_intersected) {
|
||||
if (it->visited) { //intersected: have done probe, so visited values it's the result
|
||||
add_result_columns(value, block_size);
|
||||
}
|
||||
} else {
|
||||
if (!it->visited) { //except: haven't visited values it's the needed result
|
||||
add_result_columns(value, block_size);
|
||||
}
|
||||
for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) {
|
||||
auto& value = iter->get_second();
|
||||
auto it = value.begin();
|
||||
if constexpr (is_intersected) {
|
||||
if (it->visited) { //intersected: have done probe, so visited values it's the result
|
||||
add_result_columns(value, block_size);
|
||||
}
|
||||
} else {
|
||||
if (!it->visited) { //except: haven't visited values it's the needed result
|
||||
add_result_columns(value, block_size);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Status::InternalError("Invalid RowRefListType!");
|
||||
}
|
||||
|
||||
*eos = iter == hash_table_ctx.hash_table->end();
|
||||
|
||||
Reference in New Issue
Block a user