[pipelineX](refactor) Use class template to simplify join (#25369)

This commit is contained in:
Gabriel
2023-10-13 16:51:55 +08:00
committed by GitHub
parent f4e2eb6564
commit 37dbda6209
14 changed files with 442 additions and 533 deletions

View File

@ -52,11 +52,15 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
_shared_state->probe_key_sz = p._build_key_sz;
_shared_state->build_blocks.reset(new std::vector<vectorized::Block>());
// avoid vector expand change block address.
// one block can store 4g data, _build_blocks can store 128*4g data.
// if probe data bigger than 512g, runtime filter maybe will core dump when insert data.
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
if (p._is_broadcast_join && state->enable_share_hash_table_for_broadcast_join()) {
_shared_state->build_blocks = p._shared_hash_table_context->blocks;
} else {
_shared_state->build_blocks.reset(new std::vector<vectorized::Block>());
// avoid vector expand change block address.
// one block can store 4g data, _build_blocks can store 128*4g data.
// if probe data bigger than 512g, runtime filter maybe will core dump when insert data.
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
}
_shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join;
_shared_state->store_null_in_hash_table = p._store_null_in_hash_table;
_build_expr_ctxs.resize(p._build_expr_ctxs.size());
@ -79,6 +83,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
if (!_should_build_hash_table) {
_shared_hash_table_dependency->block_writing();
p._shared_hashtable_controller->append_dependency(p.id(), _shared_hash_table_dependency);
} else if (p._is_broadcast_join) {
// avoid vector expand change block address.
// one block can store 4g data, _build_blocks can store 128*4g data.
// if probe data bigger than 512g, runtime filter maybe will core dump when insert data.
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
}
_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
@ -135,6 +144,18 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}
vectorized::Sizes& HashJoinBuildSinkLocalState::build_key_sz() {
return _parent->cast<HashJoinBuildSinkOperatorX>()._build_key_sz;
}
bool HashJoinBuildSinkLocalState::build_unique() const {
return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
}
std::vector<TRuntimeFilterDesc>& HashJoinBuildSinkLocalState::runtime_filter_descs() const {
return _parent->cast<HashJoinBuildSinkOperatorX>()._runtime_filter_descs;
}
void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->short_circuit_for_probe =
@ -201,9 +222,9 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
[&](auto&& arg, auto has_null_value,
auto short_circuit_for_null_in_build_side) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
vectorized::HashJoinBuildContext context(this);
vectorized::ProcessHashTableBuild<HashTableCtxType>
hash_table_build_process(rows, block, raw_ptrs, &context,
vectorized::ProcessHashTableBuild<HashTableCtxType,
HashJoinBuildSinkLocalState>
hash_table_build_process(rows, block, raw_ptrs, this,
state->batch_size(), offset, state);
return hash_table_build_process
.template run<has_null_value, short_circuit_for_null_in_build_side>(
@ -246,11 +267,6 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN,
vectorized::RowRefListWithFlag, vectorized::RowRefList>>;
_shared_state->probe_row_match_iter
.emplace<vectorized::ForwardIterator<RowRefListType>>();
_shared_state->outer_join_pull_visited_iter
.emplace<vectorized::ForwardIterator<RowRefListType>>();
if (_build_expr_ctxs.size() == 1 && !p._store_null_in_hash_table[0]) {
// Single column optimization
switch (_build_expr_ctxs[0]->root()->result_type()) {
@ -519,18 +535,16 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
state, (*local_state._shared_state->build_blocks)[local_state._build_block_idx],
local_state._build_block_idx));
}
auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
},
[&](auto&& arg) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
vectorized::RuntimeFilterContext context(&local_state);
vectorized::ProcessRuntimeFilterBuild<HashTableCtxType>
runtime_filter_build_process(&context);
return runtime_filter_build_process(state, arg);
}},
*local_state._shared_state->hash_table_variants);
auto ret = std::visit(
Overload {[&](std::monostate&) -> Status {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
},
[&](auto&& arg) -> Status {
vectorized::ProcessRuntimeFilterBuild runtime_filter_build_process;
return runtime_filter_build_process(state, arg, &local_state);
}},
*local_state._shared_state->hash_table_variants);
if (!ret.ok()) {
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = ret;
@ -542,7 +556,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
_shared_hash_table_context->status = Status::OK();
// arena will be shared with other instances.
_shared_hash_table_context->arena = local_state._shared_state->arena;
_shared_hash_table_context->blocks = local_state._shared_state->build_blocks;
_shared_hash_table_context->hash_table_variants =
local_state._shared_state->hash_table_variants;
_shared_hash_table_context->short_circuit_for_null_in_probe_side =
@ -578,8 +591,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
*std::static_pointer_cast<vectorized::HashTableVariants>(
_shared_hash_table_context->hash_table_variants));
local_state._shared_state->build_blocks = _shared_hash_table_context->blocks;
if (!_shared_hash_table_context->runtime_filters.empty()) {
auto ret = std::visit(
Overload {

View File

@ -70,12 +70,25 @@ public:
void init_short_circuit_for_probe();
HashJoinBuildSinkOperatorX* join_build() { return (HashJoinBuildSinkOperatorX*)_parent; }
vectorized::Sizes& build_key_sz();
bool build_unique() const;
std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const;
std::shared_ptr<vectorized::Arena> arena() { return _shared_state->arena; }
void add_hash_buckets_info(const std::string& info) const {
_profile->add_info_string("HashTableBuckets", info);
}
void add_hash_buckets_filled_info(const std::string& info) const {
_profile->add_info_string("HashTableFilledBuckets", info);
}
protected:
void _hash_table_init(RuntimeState* state);
void _set_build_ignore_flag(vectorized::Block& block, const std::vector<int>& res_col_ids);
friend class HashJoinBuildSinkOperatorX;
friend struct vectorized::HashJoinBuildContext;
friend struct vectorized::RuntimeFilterContext;
template <class HashTableContext, typename Parent>
friend struct vectorized::ProcessHashTableBuild;
friend struct vectorized::ProcessRuntimeFilterBuild;
// build expr
vectorized::VExprContextSPtrs _build_expr_ctxs;
@ -139,16 +152,13 @@ public:
}
bool should_dry_run(RuntimeState* state) override {
auto tmp = _is_broadcast_join && !state->get_sink_local_state(id())
->cast<HashJoinBuildSinkLocalState>()
._should_build_hash_table;
return tmp;
return _is_broadcast_join && !state->get_sink_local_state(id())
->cast<HashJoinBuildSinkLocalState>()
._should_build_hash_table;
}
private:
friend class HashJoinBuildSinkLocalState;
friend struct vectorized::HashJoinBuildContext;
friend struct vectorized::RuntimeFilterContext;
// build expr
vectorized::VExprContextSPtrs _build_expr_ctxs;
@ -165,9 +175,6 @@ private:
vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
std::atomic_bool _probe_open_finish = false;
bool _probe_ignore_null = false;
};
} // namespace pipeline

View File

@ -59,12 +59,68 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info)
return Status::OK();
}
Status HashJoinProbeLocalState::open(RuntimeState* state) {
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(JoinProbeLocalState::open(state));
_process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
auto& p = _parent->cast<HashJoinProbeOperatorX>();
std::visit(
[&](auto&& join_op_variants, auto have_other_join_conjunct) {
using JoinOpType = std::decay_t<decltype(join_op_variants)>;
using RowRefListType = std::conditional_t<
have_other_join_conjunct, vectorized::RowRefListWithFlags,
std::conditional_t<JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN,
vectorized::RowRefListWithFlag, vectorized::RowRefList>>;
_probe_row_match_iter.emplace<vectorized::ForwardIterator<RowRefListType>>();
_outer_join_pull_visited_iter
.emplace<vectorized::ForwardIterator<RowRefListType>>();
_process_hashtable_ctx_variants->emplace<vectorized::ProcessHashTableProbe<
JoinOpType::value, HashJoinProbeLocalState>>(this, state->batch_size());
},
_shared_state->join_op_variants,
vectorized::make_bool_variant(p._have_other_join_conjunct));
return Status::OK();
}
void HashJoinProbeLocalState::prepare_for_next() {
_probe_index = 0;
_ready_probe = false;
_prepare_probe_block();
}
bool HashJoinProbeLocalState::have_other_join_conjunct() const {
return _parent->cast<HashJoinProbeOperatorX>()._have_other_join_conjunct;
}
bool HashJoinProbeLocalState::is_right_semi_anti() const {
return _parent->cast<HashJoinProbeOperatorX>()._is_right_semi_anti;
}
bool HashJoinProbeLocalState::is_outer_join() const {
return _parent->cast<HashJoinProbeOperatorX>()._is_outer_join;
}
std::vector<bool>* HashJoinProbeLocalState::left_output_slot_flags() {
return &_parent->cast<HashJoinProbeOperatorX>()._left_output_slot_flags;
}
std::vector<bool>* HashJoinProbeLocalState::right_output_slot_flags() {
return &_parent->cast<HashJoinProbeOperatorX>()._right_output_slot_flags;
}
vectorized::DataTypes HashJoinProbeLocalState::right_table_data_types() {
return _parent->cast<HashJoinProbeOperatorX>()._right_table_data_types;
}
vectorized::DataTypes HashJoinProbeLocalState::left_table_data_types() {
return _parent->cast<HashJoinProbeOperatorX>()._left_table_data_types;
}
Status HashJoinProbeLocalState::close(RuntimeState* state) {
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_close_timer);
@ -111,16 +167,7 @@ void HashJoinProbeLocalState::init_for_probe(RuntimeState* state) {
if (_probe_inited) {
return;
}
_process_hashtable_ctx_variants = std::make_unique<vectorized::HashTableCtxVariants>();
std::visit(
[&](auto&& join_op_variants) {
using JoinOpType = std::decay_t<decltype(join_op_variants)>;
_probe_context.reset(new vectorized::HashJoinProbeContext(this));
_process_hashtable_ctx_variants
->emplace<vectorized::ProcessHashTableProbe<JoinOpType::value>>(
_probe_context.get(), state->batch_size());
},
_shared_state->join_op_variants);
_probe_inited = true;
}

View File

@ -45,6 +45,22 @@ public:
Status open(RuntimeState*) override { return Status::OK(); }
};
class HashJoinProbeLocalState;
using HashTableCtxVariants = std::variant<
std::monostate,
vectorized::ProcessHashTableProbe<TJoinOp::INNER_JOIN, HashJoinProbeLocalState>,
vectorized::ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN, HashJoinProbeLocalState>,
vectorized::ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN, HashJoinProbeLocalState>,
vectorized::ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN, HashJoinProbeLocalState>,
vectorized::ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN, HashJoinProbeLocalState>,
vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN, HashJoinProbeLocalState>,
vectorized::ProcessHashTableProbe<TJoinOp::CROSS_JOIN, HashJoinProbeLocalState>,
vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN, HashJoinProbeLocalState>,
vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN, HashJoinProbeLocalState>,
vectorized::ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN,
HashJoinProbeLocalState>>;
class HashJoinProbeOperatorX;
class HashJoinProbeLocalState final
: public JoinProbeLocalState<HashJoinDependency, HashJoinProbeLocalState> {
@ -55,6 +71,7 @@ public:
~HashJoinProbeLocalState() override = default;
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
void prepare_for_next();
@ -64,13 +81,25 @@ public:
SourceState& source_state, vectorized::Block* temp_block,
bool check_rows_count = true);
HashJoinProbeOperatorX* join_probe() { return (HashJoinProbeOperatorX*)_parent; }
bool have_other_join_conjunct() const;
bool is_right_semi_anti() const;
bool is_outer_join() const;
std::vector<bool>* left_output_slot_flags();
std::vector<bool>* right_output_slot_flags();
vectorized::DataTypes right_table_data_types();
vectorized::DataTypes left_table_data_types();
bool* has_null_in_build_side() { return &_shared_state->_has_null_in_build_side; }
std::shared_ptr<std::vector<vectorized::Block>> build_blocks() const {
return _shared_state->build_blocks;
}
vectorized::Sizes probe_key_sz() const { return _shared_state->probe_key_sz; }
private:
void _prepare_probe_block();
bool _need_probe_null_map(vectorized::Block& block, const std::vector<int>& res_col_ids);
friend class HashJoinProbeOperatorX;
friend struct vectorized::HashJoinProbeContext;
template <int JoinOpType, typename Parent>
friend struct vectorized::ProcessHashTableProbe;
int _probe_index = -1;
bool _ready_probe = false;
@ -88,12 +117,15 @@ private:
bool _need_null_map_for_probe = false;
bool _has_set_need_null_map_for_probe = false;
std::unique_ptr<vectorized::HashJoinProbeContext> _probe_context;
vectorized::ColumnUInt8::MutablePtr _null_map_column;
// for cases when a probe row matches more than batch size build rows.
bool _is_any_probe_match_row_output = false;
std::unique_ptr<vectorized::HashTableCtxVariants> _process_hashtable_ctx_variants =
std::make_unique<vectorized::HashTableCtxVariants>();
std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants =
std::make_unique<HashTableCtxVariants>();
// for full/right outer join
vectorized::HashTableIteratorVariants _outer_join_pull_visited_iter;
vectorized::HashTableIteratorVariants _probe_row_match_iter;
RuntimeProfile::Counter* _probe_expr_call_timer;
RuntimeProfile::Counter* _probe_next_timer;
@ -125,7 +157,6 @@ private:
RuntimeProfile::Counter& expr_call_timer,
std::vector<int>& res_col_ids) const;
friend class HashJoinProbeLocalState;
friend struct vectorized::HashJoinProbeContext;
// other expr
vectorized::VExprContextSPtrs _other_join_conjuncts;

View File

@ -152,7 +152,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
COUNTER_SET(local_state._hash_table_size_counter, int64_t(local_state._num_partition));
//so all data from child have sink completed
local_state._dependency->set_ready_for_read();
local_state._dependency->set_eos();
}
return Status::OK();

View File

@ -52,8 +52,8 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
local_state._shared_state->blocks_buffer.front().swap(*output_block);
local_state._shared_state->blocks_buffer.pop();
//if buffer have no data, block reading and wait for signal again
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block,
output_block->columns()));
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
local_state._conjuncts, output_block, output_block->columns()));
if (local_state._shared_state->blocks_buffer.empty()) {
local_state._dependency->block_reading();
}
@ -67,13 +67,12 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
// if we move the _blocks_buffer output at last(behind 286 line),
// it's maybe eos but not output all data: when _blocks_buffer.empty() and _can_read = false (this: _sort_idx && _partition_sorts.size() are 0)
RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state));
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block,
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block,
output_block->columns()));
{
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
if (local_state._shared_state->blocks_buffer.empty() &&
local_state._shared_state->sort_idx >=
local_state._shared_state->partition_sorts.size()) {
local_state._sort_idx >= local_state._shared_state->partition_sorts.size()) {
source_state = SourceState::FINISHED;
}
}
@ -91,20 +90,18 @@ Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state,
SCOPED_TIMER(local_state._get_sorted_timer);
//sorter output data one by one
bool current_eos = false;
if (local_state._shared_state->sort_idx < local_state._shared_state->partition_sorts.size()) {
RETURN_IF_ERROR(
local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx]
->get_next(state, output_block, &current_eos));
if (local_state._sort_idx < local_state._shared_state->partition_sorts.size()) {
RETURN_IF_ERROR(local_state._shared_state->partition_sorts[local_state._sort_idx]->get_next(
state, output_block, &current_eos));
}
if (current_eos) {
//current sort have eos, so get next idx
local_state._shared_state->previous_row->reset();
auto rows = local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx]
auto rows = local_state._shared_state->partition_sorts[local_state._sort_idx]
->get_output_rows();
local_state._num_rows_returned += rows;
local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx].reset(
nullptr);
local_state._shared_state->sort_idx++;
local_state._shared_state->partition_sorts[local_state._sort_idx].reset(nullptr);
local_state._sort_idx++;
}
return Status::OK();

View File

@ -68,6 +68,7 @@ private:
RuntimeProfile::Counter* _get_sorted_timer;
RuntimeProfile::Counter* _get_next_timer;
int64_t _num_rows_returned;
int _sort_idx = 0;
};
class PartitionSortSourceOperatorX final : public OperatorX<PartitionSortSourceLocalState> {

View File

@ -565,14 +565,10 @@ struct HashJoinSharedState : public JoinSharedState {
// maybe share hash table with other fragment instances
std::shared_ptr<vectorized::HashTableVariants> hash_table_variants =
std::make_shared<vectorized::HashTableVariants>();
// for full/right outer join
vectorized::HashTableIteratorVariants outer_join_pull_visited_iter;
vectorized::HashTableIteratorVariants probe_row_match_iter;
vectorized::Sizes probe_key_sz;
const std::vector<TupleDescriptor*> build_side_child_desc;
size_t build_exprs_size = 0;
std::shared_ptr<std::vector<vectorized::Block>> build_blocks =
std::make_shared<std::vector<vectorized::Block>>();
std::shared_ptr<std::vector<vectorized::Block>> build_blocks = nullptr;
bool probe_ignore_null = false;
};
@ -626,20 +622,31 @@ public:
std::mutex buffer_mutex;
std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
std::unique_ptr<vectorized::SortCursorCmp> previous_row = nullptr;
int sort_idx = 0;
};
class PartitionSortDependency final : public WriteDependency {
public:
using SharedState = PartitionSortNodeSharedState;
PartitionSortDependency(int id) : WriteDependency(id, "PartitionSortDependency") {}
PartitionSortDependency(int id) : WriteDependency(id, "PartitionSortDependency"), _eos(false) {}
~PartitionSortDependency() override = default;
void* shared_state() override { return (void*)&_partition_sort_state; };
void set_ready_for_write() override {}
void block_writing() override {}
[[nodiscard]] Dependency* read_blocked_by() override {
if (config::enable_fuzzy_mode && !(_ready_for_read || _eos) &&
_read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
<< id();
}
return _ready_for_read || _eos ? nullptr : this;
}
void set_eos() { _eos = true; }
private:
PartitionSortNodeSharedState _partition_sort_state;
std::atomic<bool> _eos;
};
class AsyncWriterDependency final : public WriteDependency {

View File

@ -37,9 +37,9 @@ using MutableColumns = std::vector<MutableColumnPtr>;
using NullMap = ColumnUInt8::Container;
using ConstNullMapPtr = const NullMap*;
template <int JoinOpType>
template <int JoinOpType, typename Parent>
struct ProcessHashTableProbe {
ProcessHashTableProbe(HashJoinProbeContext* join_context, int batch_size);
ProcessHashTableProbe(Parent* parent, int batch_size);
~ProcessHashTableProbe() = default;
// output build side result column
@ -78,8 +78,9 @@ struct ProcessHashTableProbe {
void _emplace_element(int8_t block_offset, int32_t block_row, int& current_offset);
template <typename HashTableType>
HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows,
bool with_other_join_conjuncts, const uint8_t* null_map);
typename HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows,
bool with_other_join_conjuncts,
const uint8_t* null_map);
template <typename Mapped, bool with_other_join_conjuncts>
ForwardIterator<Mapped>& _probe_row_match(int& current_offset, int& probe_index,
@ -91,9 +92,9 @@ struct ProcessHashTableProbe {
Status process_data_in_hashtable(HashTableType& hash_table_ctx, MutableBlock& mutable_block,
Block* output_block, bool* eos);
vectorized::HashJoinProbeContext* _join_context;
Parent* _parent;
const int _batch_size;
const std::vector<Block>& _build_blocks;
std::shared_ptr<std::vector<Block>> _build_blocks;
std::unique_ptr<Arena> _arena;
std::vector<StringRef> _probe_keys;
@ -118,6 +119,13 @@ struct ProcessHashTableProbe {
int _right_col_len;
int _row_count_from_last_probe;
bool _have_other_join_conjunct;
bool _is_right_semi_anti;
Sizes _probe_key_sz;
std::vector<bool>* _left_output_slot_flags;
std::vector<bool>* _right_output_slot_flags;
bool* _has_null_in_build_side;
RuntimeProfile::Counter* _rows_returned_counter;
RuntimeProfile::Counter* _search_hashtable_timer;
RuntimeProfile::Counter* _build_side_output_timer;

View File

@ -18,6 +18,7 @@
#pragma once
#include "common/status.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "process_hash_table_probe.h"
#include "runtime/thread_context.h" // IWYU pragma: keep
#include "util/simd/bits.h"
@ -27,32 +28,35 @@
namespace doris::vectorized {
template <int JoinOpType>
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeContext* join_context,
int batch_size)
: _join_context(join_context),
template <int JoinOpType, typename Parent>
ProcessHashTableProbe<JoinOpType, Parent>::ProcessHashTableProbe(Parent* parent, int batch_size)
: _parent(parent),
_batch_size(batch_size),
_build_blocks(*join_context->_build_blocks),
_tuple_is_null_left_flags(
join_context->_is_outer_join
? &(reinterpret_cast<ColumnUInt8&>(
**join_context->_tuple_is_null_left_flag_column)
.get_data())
: nullptr),
_tuple_is_null_right_flags(
join_context->_is_outer_join
? &(reinterpret_cast<ColumnUInt8&>(
**join_context->_tuple_is_null_right_flag_column)
.get_data())
: nullptr),
_rows_returned_counter(join_context->_rows_returned_counter),
_search_hashtable_timer(join_context->_search_hashtable_timer),
_build_side_output_timer(join_context->_build_side_output_timer),
_probe_side_output_timer(join_context->_probe_side_output_timer),
_probe_process_hashtable_timer(join_context->_probe_process_hashtable_timer) {}
_build_blocks(parent->build_blocks()),
_tuple_is_null_left_flags(parent->is_outer_join()
? &(reinterpret_cast<ColumnUInt8&>(
*parent->_tuple_is_null_left_flag_column)
.get_data())
: nullptr),
_tuple_is_null_right_flags(parent->is_outer_join()
? &(reinterpret_cast<ColumnUInt8&>(
*parent->_tuple_is_null_right_flag_column)
.get_data())
: nullptr),
_have_other_join_conjunct(parent->have_other_join_conjunct()),
_is_right_semi_anti(parent->is_right_semi_anti()),
_probe_key_sz(parent->probe_key_sz()),
_left_output_slot_flags(parent->left_output_slot_flags()),
_right_output_slot_flags(parent->right_output_slot_flags()),
_has_null_in_build_side(parent->has_null_in_build_side()),
_rows_returned_counter(parent->_rows_returned_counter),
_search_hashtable_timer(parent->_search_hashtable_timer),
_build_side_output_timer(parent->_build_side_output_timer),
_probe_side_output_timer(parent->_probe_side_output_timer),
_probe_process_hashtable_timer(parent->_probe_process_hashtable_timer) {}
template <int JoinOpType>
void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
template <int JoinOpType, typename Parent>
void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column(
MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size,
bool have_other_join_conjunct) {
SCOPED_TIMER(_build_side_output_timer);
@ -66,9 +70,9 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN;
if (!is_semi_anti_join || have_other_join_conjunct) {
if (_build_blocks.size() == 1) {
if (_build_blocks->size() == 1) {
for (int i = 0; i < _right_col_len; i++) {
auto& column = *_build_blocks[0].get_by_position(i).column;
auto& column = *(*_build_blocks)[0].get_by_position(i).column;
if (output_slot_flags[i]) {
mcol[i + _right_col_idx]->insert_indices_from(column, _build_block_rows.data(),
_build_block_rows.data() + size);
@ -86,7 +90,7 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
assert_cast<ColumnNullable*>(mcol[i + _right_col_idx].get())
->insert_default();
} else {
auto& column = *_build_blocks[_build_block_offsets[j]]
auto& column = *(*_build_blocks)[_build_block_offsets[j]]
.get_by_position(i)
.column;
mcol[i + _right_col_idx]->insert_from(column, _build_block_rows[j]);
@ -101,7 +105,7 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
// just insert default value
mcol[i + _right_col_idx]->insert_default();
} else {
auto& column = *_build_blocks[_build_block_offsets[j]]
auto& column = *(*_build_blocks)[_build_block_offsets[j]]
.get_by_position(i)
.column;
mcol[i + _right_col_idx]->insert_from(column, _build_block_rows[j]);
@ -125,13 +129,13 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
}
}
template <int JoinOpType>
void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
template <int JoinOpType, typename Parent>
void ProcessHashTableProbe<JoinOpType, Parent>::probe_side_output_column(
MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size,
int last_probe_index, size_t probe_size, bool all_match_one,
bool have_other_join_conjunct) {
SCOPED_TIMER(_probe_side_output_timer);
auto& probe_block = *_join_context->_probe_block;
auto& probe_block = _parent->_probe_block;
for (int i = 0; i < output_slot_flags.size(); ++i) {
if (output_slot_flags[i]) {
auto& column = probe_block.get_by_position(i).column;
@ -152,15 +156,15 @@ void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
}
}
template <int JoinOpType>
template <int JoinOpType, typename Parent>
template <typename HashTableType>
HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_side(
typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_probe_side(
HashTableType& hash_table_ctx, size_t probe_rows, bool with_other_join_conjuncts,
const uint8_t* null_map) {
_right_col_idx = _join_context->_is_right_semi_anti && !with_other_join_conjuncts
_right_col_idx = _is_right_semi_anti && !with_other_join_conjuncts
? 0
: _join_context->_left_table_data_types->size();
_right_col_len = _join_context->_right_table_data_types->size();
: _parent->left_table_data_types().size();
_right_col_len = _parent->right_table_data_types().size();
_row_count_from_last_probe = 0;
_build_block_rows.clear();
@ -177,24 +181,20 @@ HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_side(
_build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
_build_block_offsets.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
if (!*_join_context->_ready_probe) {
*_join_context->_ready_probe = true;
if (!_parent->_ready_probe) {
_parent->_ready_probe = true;
hash_table_ctx.reset();
hash_table_ctx.init_serialized_keys(*_join_context->_probe_columns,
_join_context->_probe_key_sz, probe_rows, null_map);
hash_table_ctx.init_serialized_keys(_parent->_probe_columns, _probe_key_sz, probe_rows,
null_map);
}
return typename HashTableType::State(*_join_context->_probe_columns,
_join_context->_probe_key_sz);
return typename HashTableType::State(_parent->_probe_columns, _probe_key_sz);
}
template <int JoinOpType>
template <int JoinOpType, typename Parent>
template <typename Mapped, bool with_other_join_conjuncts>
ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType>::_probe_row_match(int& current_offset,
int& probe_index,
size_t& probe_size,
bool& all_match_one) {
auto& probe_row_match_iter =
std::get<ForwardIterator<Mapped>>(*_join_context->_probe_row_match_iter);
ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, Parent>::_probe_row_match(
int& current_offset, int& probe_index, size_t& probe_size, bool& all_match_one) {
auto& probe_row_match_iter = std::get<ForwardIterator<Mapped>>(_parent->_probe_row_match_iter);
if (!probe_row_match_iter.ok()) {
return probe_row_match_iter;
}
@ -219,22 +219,24 @@ ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType>::_probe_row_match(int
return probe_row_match_iter;
}
template <int JoinOpType>
void ProcessHashTableProbe<JoinOpType>::_emplace_element(int8_t block_offset, int32_t block_row,
int& current_offset) {
template <int JoinOpType, typename Parent>
void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int8_t block_offset,
int32_t block_row,
int& current_offset) {
_build_block_offsets.emplace_back(block_offset);
_build_block_rows.emplace_back(block_row);
current_offset++;
}
template <int JoinOpType>
template <int JoinOpType, typename Parent>
template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType,
bool with_other_conjuncts, bool is_mark_join>
Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_ctx,
ConstNullMapPtr null_map,
MutableBlock& mutable_block,
Block* output_block, size_t probe_rows) {
auto& probe_index = *_join_context->_probe_index;
Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash_table_ctx,
ConstNullMapPtr null_map,
MutableBlock& mutable_block,
Block* output_block,
size_t probe_rows) {
auto& probe_index = _parent->_probe_index;
using KeyGetter = typename HashTableType::State;
using Mapped = typename HashTableType::Mapped;
@ -318,9 +320,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
(JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ find_result.is_found();
if constexpr (is_mark_join) {
++current_offset;
bool null_result =
(need_null_map_for_probe && (*null_map)[probe_index]) ||
(!need_go_ahead && _join_context->_has_null_value_in_build_side);
bool null_result = (need_null_map_for_probe && (*null_map)[probe_index]) ||
(!need_go_ahead && *_has_null_in_build_side);
if (null_result) {
mark_column->insert_null();
} else {
@ -401,14 +402,13 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
probe_size = probe_index - last_probe_index + probe_row_match_iter.ok();
}
build_side_output_column(mcol, *_join_context->_right_output_slot_flags, current_offset,
with_other_conjuncts);
build_side_output_column(mcol, *_right_output_slot_flags, current_offset, with_other_conjuncts);
if constexpr (with_other_conjuncts || (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN &&
JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) {
RETURN_IF_CATCH_EXCEPTION(probe_side_output_column(
mcol, *_join_context->_left_output_slot_flags, current_offset, last_probe_index,
probe_size, all_match_one, with_other_conjuncts));
mcol, *_left_output_slot_flags, current_offset, last_probe_index, probe_size,
all_match_one, with_other_conjuncts));
}
output_block->swap(mutable_block.to_block());
@ -421,8 +421,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
return Status::OK();
}
template <int JoinOpType>
Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
template <int JoinOpType, typename Parent>
Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
Block* output_block, bool is_mark_join, int multi_matched_output_row_count,
bool is_the_last_sub_block) {
// dispose the other join conjunct exec
@ -431,14 +431,14 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
return Status::OK();
}
SCOPED_TIMER(_join_context->_process_other_join_conjunct_timer);
SCOPED_TIMER(_parent->_process_other_join_conjunct_timer);
int orig_columns = output_block->columns();
IColumn::Filter other_conjunct_filter(row_count, 1);
{
bool can_be_filter_all = false;
RETURN_IF_ERROR(VExprContext::execute_conjuncts(
*_join_context->_other_join_conjuncts, nullptr, output_block,
&other_conjunct_filter, &can_be_filter_all));
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_other_join_conjuncts, nullptr,
output_block, &other_conjunct_filter,
&can_be_filter_all));
}
auto filter_column = ColumnUInt8::create();
@ -465,7 +465,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
null_map_data, filter_map, output_block);
// This is the last sub block of splitted block, and no equal-conjuncts-matched tuple
// is output in all sub blocks, need to output a tuple for this probe row
if (is_the_last_sub_block && !*_join_context->_is_any_probe_match_row_output) {
if (is_the_last_sub_block && !_parent->_is_any_probe_match_row_output) {
filter_map[0] = true;
null_map_data[0] = true;
}
@ -514,7 +514,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
// It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row
if (multi_matched_output_row_count > 0) {
*_join_context->_is_any_probe_match_row_output = false;
_parent->_is_any_probe_match_row_output = false;
_process_splited_equal_matched_tuples(row_count - multi_matched_output_row_count,
multi_matched_output_row_count, filter_column_ptr,
null_map_data, filter_map, output_block);
@ -534,7 +534,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
size_t start_row_idx = 1;
// We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks
if (_row_count_from_last_probe > 0) {
if (*_join_context->_is_any_probe_match_row_output) {
if (_parent->_is_any_probe_match_row_output) {
// if any matched tuple for this probe row is output,
// ignore all the following tuples for this probe row.
for (int row_idx = 0; row_idx < _row_count_from_last_probe; ++row_idx) {
@ -563,14 +563,13 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
if (multi_matched_output_row_count > 0) {
// If a matched row is output, all the equal-matched tuples in
// the following sub blocks should be ignored
*_join_context->_is_any_probe_match_row_output = filter_map[row_count - 1];
} else if (_row_count_from_last_probe > 0 &&
!*_join_context->_is_any_probe_match_row_output) {
_parent->_is_any_probe_match_row_output = filter_map[row_count - 1];
} else if (_row_count_from_last_probe > 0 && !_parent->_is_any_probe_match_row_output) {
// We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks,
// and no matched tuple has been output in all previous run.
// If a tuple is output in this run, all the following mathced tuples should be ignored
if (filter_map[_row_count_from_last_probe - 1]) {
*_join_context->_is_any_probe_match_row_output = true;
_parent->_is_any_probe_match_row_output = true;
}
}
@ -609,7 +608,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
size_t start_row_idx = 1;
// We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks
if (_row_count_from_last_probe > 0 && *_join_context->_is_any_probe_match_row_output) {
if (_row_count_from_last_probe > 0 && _parent->_is_any_probe_match_row_output) {
// if any matched tuple for this probe row is output,
// ignore all the following tuples for this probe row.
for (int row_idx = 0; row_idx < _row_count_from_last_probe; ++row_idx) {
@ -658,15 +657,15 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
int end_row_idx = 0;
if (_row_count_from_last_probe > 0) {
end_row_idx = row_count - multi_matched_output_row_count;
if (!*_join_context->_is_any_probe_match_row_output) {
if (!_parent->_is_any_probe_match_row_output) {
// We are handling euqual-conjuncts matched tuples that are splitted into multiple blocks,
// and no matched tuple has been output in all previous run.
// If a tuple is output in this run, all the following mathced tuples should be ignored
if (filter_map[_row_count_from_last_probe - 1]) {
*_join_context->_is_any_probe_match_row_output = true;
_parent->_is_any_probe_match_row_output = true;
filter_map[_row_count_from_last_probe - 1] = false;
}
if (is_the_last_sub_block && !*_join_context->_is_any_probe_match_row_output) {
if (is_the_last_sub_block && !_parent->_is_any_probe_match_row_output) {
// This is the last sub block of splitted block, and no equal-conjuncts-matched tuple
// is output in all sub blocks, output a tuple for this probe row
filter_map[0] = true;
@ -676,7 +675,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
// It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row
// If a matched row is output, all the equal-matched tuples in
// the following sub blocks should be ignored
*_join_context->_is_any_probe_match_row_output = filter_map[row_count - 1];
_parent->_is_any_probe_match_row_output = filter_map[row_count - 1];
filter_map[row_count - 1] = false;
}
} else if (multi_matched_output_row_count > 0) {
@ -684,7 +683,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
// It contains the first sub block of splited equal-conjuncts-matched tuples of the current probe row
// If a matched row is output, all the equal-matched tuples in
// the following sub blocks should be ignored
*_join_context->_is_any_probe_match_row_output = filter_map[row_count - 1];
_parent->_is_any_probe_match_row_output = filter_map[row_count - 1];
filter_map[row_count - 1] = false;
} else {
end_row_idx = row_count;
@ -744,8 +743,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
// and when processing the last sub block, check whether there are any
// equal-conjuncts-matched tuple is output in all sub blocks,
// if not, just pick a tuple and output.
template <int JoinOpType>
void ProcessHashTableProbe<JoinOpType>::_process_splited_equal_matched_tuples(
template <int JoinOpType, typename Parent>
void ProcessHashTableProbe<JoinOpType, Parent>::_process_splited_equal_matched_tuples(
int start_row_idx, int row_count, const UInt8* __restrict other_hit_column,
UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* output_block) {
int end_row_idx = start_row_idx + row_count;
@ -770,16 +769,15 @@ void ProcessHashTableProbe<JoinOpType>::_process_splited_equal_matched_tuples(
*_visited_map[i] |= other_hit;
}
}
*_join_context->_is_any_probe_match_row_output |=
_parent->_is_any_probe_match_row_output |=
simd::contain_byte(filter_map + start_row_idx, row_count, 1);
}
template <int JoinOpType>
template <int JoinOpType, typename Parent>
template <typename HashTableType>
Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableType& hash_table_ctx,
MutableBlock& mutable_block,
Block* output_block,
bool* eos) {
Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable(
HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block* output_block,
bool* eos) {
using Mapped = typename HashTableType::Mapped;
SCOPED_TIMER(_probe_process_hashtable_timer);
if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> ||
@ -787,16 +785,15 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
hash_table_ctx.init_iterator();
auto& mcol = mutable_block.mutable_columns();
bool right_semi_anti_without_other =
_join_context->_is_right_semi_anti && !_join_context->_have_other_join_conjunct;
bool right_semi_anti_without_other = _is_right_semi_anti && !_have_other_join_conjunct;
int right_col_idx =
right_semi_anti_without_other ? 0 : _join_context->_left_table_data_types->size();
int right_col_len = _join_context->_right_table_data_types->size();
right_semi_anti_without_other ? 0 : _parent->left_table_data_types().size();
int right_col_len = _parent->right_table_data_types().size();
auto& iter = hash_table_ctx.iterator;
auto block_size = 0;
auto& visited_iter =
std::get<ForwardIterator<Mapped>>(*_join_context->_outer_join_pull_visited_iter);
std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter);
_build_blocks_locs.resize(_batch_size);
auto register_build_loc = [&](int8_t offset, int32_t row_nums) {
_build_blocks_locs[block_size++] = std::pair<int8_t, int>(offset, row_nums);
@ -874,13 +871,13 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
auto insert_build_rows = [&](int8_t offset) {
for (size_t j = 0; j < right_col_len; ++j) {
auto& column = *_build_blocks[offset].get_by_position(j).column;
auto& column = *(*_build_blocks)[offset].get_by_position(j).column;
mcol[j + right_col_idx]->insert_indices_from(
column, _build_block_rows.data(),
_build_block_rows.data() + _build_block_rows.size());
}
};
if (_build_blocks.size() > 1) {
if (_build_blocks->size() > 1) {
std::sort(_build_blocks_locs.begin(), _build_blocks_locs.end(),
[](const auto a, const auto b) { return a.first > b.first; });
auto start = 0, end = 0;
@ -897,7 +894,7 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
start = end;
insert_build_rows(offset);
}
} else if (_build_blocks.size() == 1) {
} else if (_build_blocks->size() == 1) {
const auto size = _build_blocks_locs.size();
_build_block_rows.resize(_build_blocks_locs.size());
for (int i = 0; i < size; i++) {
@ -907,7 +904,7 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
// just resize the left table column in case with other conjunct to make block size is not zero
if (_join_context->_is_right_semi_anti && _join_context->_have_other_join_conjunct) {
if (_is_right_semi_anti && _have_other_join_conjunct) {
auto target_size = mcol[right_col_idx]->size();
for (int i = 0; i < right_col_idx; ++i) {
mcol[i]->resize(target_size);
@ -933,13 +930,11 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
}
template <int JoinOpType>
template <int JoinOpType, typename Parent>
template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType>
Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
ConstNullMapPtr null_map,
MutableBlock& mutable_block, Block* output_block,
size_t probe_rows, bool is_mark_join,
bool have_other_join_conjunct) {
Status ProcessHashTableProbe<JoinOpType, Parent>::process(
HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block,
Block* output_block, size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct) {
Status res;
if constexpr (!std::is_same_v<typename HashTableType::Mapped, RowRefListWithFlags>) {
if (have_other_join_conjunct) {
@ -973,74 +968,78 @@ struct ExtractType<T(U)> {
using Type = U;
};
#define INSTANTIATION(JoinOpType, T) \
template Status \
ProcessHashTableProbe<JoinOpType>::process<false, false, ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \
MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \
bool is_mark_join, bool have_other_join_conjunct); \
template Status \
ProcessHashTableProbe<JoinOpType>::process<false, true, ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \
MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \
bool is_mark_join, bool have_other_join_conjunct); \
template Status \
ProcessHashTableProbe<JoinOpType>::process<true, false, ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \
MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \
bool is_mark_join, bool have_other_join_conjunct); \
template Status \
ProcessHashTableProbe<JoinOpType>::process<true, true, ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \
MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \
bool is_mark_join, bool have_other_join_conjunct); \
\
template Status \
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, MutableBlock & mutable_block, \
Block * output_block, bool* eos)
#define INSTANTIATION(JoinOpType, Parent, T) \
template Status \
ProcessHashTableProbe<JoinOpType, Parent>::process<false, false, ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \
MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \
bool is_mark_join, bool have_other_join_conjunct); \
template Status \
ProcessHashTableProbe<JoinOpType, Parent>::process<false, true, ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \
MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \
bool is_mark_join, bool have_other_join_conjunct); \
template Status \
ProcessHashTableProbe<JoinOpType, Parent>::process<true, false, ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \
MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \
bool is_mark_join, bool have_other_join_conjunct); \
template Status \
ProcessHashTableProbe<JoinOpType, Parent>::process<true, true, ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \
MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \
bool is_mark_join, bool have_other_join_conjunct); \
\
template Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable< \
ExtractType<void(T)>::Type>(ExtractType<void(T)>::Type & hash_table_ctx, \
MutableBlock & mutable_block, Block * output_block, \
bool* eos)
#define INSTANTIATION_FOR(JoinOpType) \
template struct ProcessHashTableProbe<JoinOpType>; \
\
INSTANTIATION(JoinOpType, (SerializedHashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true, RowRefList>)); \
INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false, RowRefList>)); \
INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true, RowRefList>)); \
INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false, RowRefList>)); \
INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true, RowRefList>)); \
INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false, RowRefList>)); \
INSTANTIATION(JoinOpType, (SerializedHashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, (SerializedHashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true, RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false, RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true, RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false, RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true, RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false, RowRefListWithFlags>))
#define INSTANTIATION_FOR1(JoinOpType, Parent) \
template struct ProcessHashTableProbe<JoinOpType, Parent>; \
\
INSTANTIATION(JoinOpType, Parent, (SerializedHashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I8HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I16HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I32HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I64HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I128HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I256HashTableContext<RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<true, RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<false, RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<true, RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false, RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true, RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false, RowRefList>)); \
INSTANTIATION(JoinOpType, Parent, (SerializedHashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I8HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I16HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I32HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I64HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I128HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I256HashTableContext<RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<true, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<false, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<true, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false, RowRefListWithFlag>)); \
INSTANTIATION(JoinOpType, Parent, (SerializedHashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I8HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I16HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I32HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I64HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I128HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I256HashTableContext<RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<true, RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<false, RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<true, RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false, RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true, RowRefListWithFlags>)); \
INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false, RowRefListWithFlags>))
#define INSTANTIATION_FOR(JoinOpType) \
INSTANTIATION_FOR1(JoinOpType, HashJoinNode); \
INSTANTIATION_FOR1(JoinOpType, pipeline::HashJoinProbeLocalState)
} // namespace doris::vectorized

View File

@ -84,120 +84,6 @@ template Status HashJoinNode::_extract_join_column<false>(
std::vector<IColumn const*, std::allocator<IColumn const*>>&,
std::vector<int, std::allocator<int>> const&);
RuntimeFilterContext::RuntimeFilterContext(HashJoinNode* join_node)
: _runtime_filter_descs(join_node->_runtime_filter_descs),
_runtime_filter_slots(join_node->_runtime_filter_slots),
_build_expr_ctxs(join_node->_build_expr_ctxs),
_build_rf_cardinality(join_node->_build_rf_cardinality),
_inserted_rows(join_node->_inserted_rows),
_push_down_timer(join_node->_push_down_timer),
_push_compute_timer(join_node->_push_compute_timer) {}
RuntimeFilterContext::RuntimeFilterContext(pipeline::HashJoinBuildSinkLocalState* local_state)
: _runtime_filter_descs(local_state->join_build()->_runtime_filter_descs),
_runtime_filter_slots(local_state->_runtime_filter_slots),
_build_expr_ctxs(local_state->_build_expr_ctxs),
_build_rf_cardinality(local_state->_build_rf_cardinality),
_inserted_rows(local_state->_inserted_rows),
_push_down_timer(local_state->_push_down_timer),
_push_compute_timer(local_state->_push_compute_timer) {}
HashJoinProbeContext::HashJoinProbeContext(HashJoinNode* join_node)
: _have_other_join_conjunct(join_node->_have_other_join_conjunct),
_is_right_semi_anti(join_node->_is_right_semi_anti),
_is_outer_join(join_node->_is_outer_join),
_tuple_is_null_left_flag_column(&join_node->_tuple_is_null_left_flag_column),
_tuple_is_null_right_flag_column(&join_node->_tuple_is_null_right_flag_column),
_other_join_conjuncts(&join_node->_other_join_conjuncts),
_right_table_data_types(&join_node->_right_table_data_types),
_left_table_data_types(&join_node->_left_table_data_types),
_search_hashtable_timer(join_node->_search_hashtable_timer),
_build_side_output_timer(join_node->_build_side_output_timer),
_probe_side_output_timer(join_node->_probe_side_output_timer),
_probe_process_hashtable_timer(join_node->_probe_process_hashtable_timer),
_process_other_join_conjunct_timer(join_node->_process_other_join_conjunct_timer),
_rows_returned_counter(join_node->_rows_returned_counter),
_probe_arena_memory_usage(join_node->_probe_arena_memory_usage),
_arena(join_node->_arena),
_outer_join_pull_visited_iter(&join_node->_outer_join_pull_visited_iter),
_probe_row_match_iter(&join_node->_probe_row_match_iter),
_build_blocks(join_node->_build_blocks),
_probe_block(&join_node->_probe_block),
_probe_columns(&join_node->_probe_columns),
_probe_index(&join_node->_probe_index),
_ready_probe(&join_node->_ready_probe),
_probe_key_sz(join_node->_probe_key_sz),
_left_output_slot_flags(&join_node->_left_output_slot_flags),
_right_output_slot_flags(&join_node->_right_output_slot_flags),
_is_any_probe_match_row_output(&join_node->_is_any_probe_match_row_output),
_has_null_value_in_build_side(join_node->_has_null_in_build_side) {}
HashJoinProbeContext::HashJoinProbeContext(pipeline::HashJoinProbeLocalState* local_state)
: _have_other_join_conjunct(local_state->join_probe()->_have_other_join_conjunct),
_is_right_semi_anti(local_state->join_probe()->_is_right_semi_anti),
_is_outer_join(local_state->join_probe()->_is_outer_join),
_tuple_is_null_left_flag_column(&local_state->_tuple_is_null_left_flag_column),
_tuple_is_null_right_flag_column(&local_state->_tuple_is_null_right_flag_column),
_other_join_conjuncts(&local_state->_other_join_conjuncts),
_right_table_data_types(&local_state->join_probe()->_right_table_data_types),
_left_table_data_types(&local_state->join_probe()->_left_table_data_types),
_search_hashtable_timer(local_state->_search_hashtable_timer),
_build_side_output_timer(local_state->_build_side_output_timer),
_probe_side_output_timer(local_state->_probe_side_output_timer),
_probe_process_hashtable_timer(local_state->_probe_process_hashtable_timer),
_process_other_join_conjunct_timer(local_state->_process_other_join_conjunct_timer),
_rows_returned_counter(local_state->_rows_returned_counter),
_probe_arena_memory_usage(local_state->_probe_arena_memory_usage),
_arena(local_state->_shared_state->arena),
_outer_join_pull_visited_iter(&local_state->_shared_state->outer_join_pull_visited_iter),
_probe_row_match_iter(&local_state->_shared_state->probe_row_match_iter),
_build_blocks(local_state->_shared_state->build_blocks),
_probe_block(&local_state->_probe_block),
_probe_columns(&local_state->_probe_columns),
_probe_index(&local_state->_probe_index),
_ready_probe(&local_state->_ready_probe),
_probe_key_sz(local_state->_shared_state->probe_key_sz),
_left_output_slot_flags(&local_state->join_probe()->_left_output_slot_flags),
_right_output_slot_flags(&local_state->join_probe()->_right_output_slot_flags),
_is_any_probe_match_row_output(&local_state->_is_any_probe_match_row_output),
_has_null_value_in_build_side(local_state->_shared_state->_has_null_in_build_side) {}
HashJoinBuildContext::HashJoinBuildContext(HashJoinNode* join_node)
: _hash_table_memory_usage(join_node->_hash_table_memory_usage),
_build_buckets_counter(join_node->_build_buckets_counter),
_build_collisions_counter(join_node->_build_collisions_counter),
_build_buckets_fill_counter(join_node->_build_buckets_fill_counter),
_build_table_insert_timer(join_node->_build_table_insert_timer),
_build_table_expanse_timer(join_node->_build_table_expanse_timer),
_build_table_convert_timer(join_node->_build_table_convert_timer),
_build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer),
_build_arena_memory_usage(join_node->_build_arena_memory_usage),
_profile(join_node->runtime_profile()),
_build_key_sz(join_node->_build_key_sz),
_build_unique(join_node->_build_unique),
_runtime_filter_descs(join_node->_runtime_filter_descs),
_inserted_rows(join_node->_inserted_rows),
_arena(join_node->_arena),
_build_rf_cardinality(join_node->_build_rf_cardinality) {}
HashJoinBuildContext::HashJoinBuildContext(pipeline::HashJoinBuildSinkLocalState* local_state)
: _hash_table_memory_usage(local_state->_hash_table_memory_usage),
_build_buckets_counter(local_state->_build_buckets_counter),
_build_collisions_counter(local_state->_build_collisions_counter),
_build_buckets_fill_counter(local_state->_build_buckets_fill_counter),
_build_table_insert_timer(local_state->_build_table_insert_timer),
_build_table_expanse_timer(local_state->_build_table_expanse_timer),
_build_table_convert_timer(local_state->_build_table_convert_timer),
_build_side_compute_hash_timer(local_state->_build_side_compute_hash_timer),
_build_arena_memory_usage(local_state->_build_arena_memory_usage),
_profile(local_state->profile()),
_build_key_sz(local_state->join_build()->_build_key_sz),
_build_unique(local_state->join_build()->_build_unique),
_runtime_filter_descs(local_state->join_build()->_runtime_filter_descs),
_inserted_rows(local_state->_inserted_rows),
_arena(local_state->_shared_state->arena),
_build_rf_cardinality(local_state->_build_rf_cardinality) {}
HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: VJoinNodeBase(pool, tnode, descs),
_is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
@ -896,11 +782,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
__builtin_unreachable();
},
[&](auto&& arg) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
RuntimeFilterContext context(this);
ProcessRuntimeFilterBuild<HashTableCtxType>
runtime_filter_build_process(&context);
return runtime_filter_build_process(state, arg);
ProcessRuntimeFilterBuild runtime_filter_build_process;
return runtime_filter_build_process(state, arg, this);
}},
*_hash_table_variants);
if (!ret.ok()) {
@ -1119,10 +1002,9 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
[&](auto&& arg, auto has_null_value,
auto short_circuit_for_null_in_build_side) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
HashJoinBuildContext context(this);
ProcessHashTableBuild<HashTableCtxType> hash_table_build_process(
rows, block, raw_ptrs, &context, state->batch_size(), offset,
state);
ProcessHashTableBuild<HashTableCtxType, HashJoinNode>
hash_table_build_process(rows, block, raw_ptrs, this,
state->batch_size(), offset, state);
return hash_table_build_process
.template run<has_null_value, short_circuit_for_null_in_build_side>(
arg,
@ -1277,9 +1159,9 @@ void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
std::visit(
[&](auto&& join_op_variants) {
using JoinOpType = std::decay_t<decltype(join_op_variants)>;
_probe_context.reset(new HashJoinProbeContext(this));
_process_hashtable_ctx_variants->emplace<ProcessHashTableProbe<JoinOpType::value>>(
_probe_context.get(), state->batch_size());
_process_hashtable_ctx_variants
->emplace<ProcessHashTableProbe<JoinOpType::value, HashJoinNode>>(
this, state->batch_size());
},
_join_op_variants);
}

View File

@ -69,99 +69,52 @@ constexpr size_t CHECK_FRECUENCY = 65536;
struct UInt128;
struct UInt256;
template <int JoinOpType>
template <int JoinOpType, typename Parent>
struct ProcessHashTableProbe;
class HashJoinNode;
struct RuntimeFilterContext {
RuntimeFilterContext(HashJoinNode* join_node);
RuntimeFilterContext(pipeline::HashJoinBuildSinkLocalState* local_state);
std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
std::shared_ptr<VRuntimeFilterSlots>& _runtime_filter_slots;
VExprContextSPtrs& _build_expr_ctxs;
size_t& _build_rf_cardinality;
std::unordered_map<const Block*, std::vector<int>>& _inserted_rows;
RuntimeProfile::Counter* _push_down_timer;
RuntimeProfile::Counter* _push_compute_timer;
};
template <class HashTableContext>
struct ProcessRuntimeFilterBuild {
ProcessRuntimeFilterBuild(RuntimeFilterContext* context) : _context(context) {}
Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx) {
if (_context->_runtime_filter_descs.empty()) {
template <class HashTableContext, typename Parent>
Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx, Parent* parent) {
if (parent->runtime_filter_descs().empty()) {
return Status::OK();
}
_context->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
_context->_build_expr_ctxs, _context->_runtime_filter_descs);
parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
parent->_build_expr_ctxs, parent->runtime_filter_descs());
RETURN_IF_ERROR(_context->_runtime_filter_slots->init(
state, hash_table_ctx.hash_table->size(), _context->_build_rf_cardinality));
RETURN_IF_ERROR(parent->_runtime_filter_slots->init(
state, hash_table_ctx.hash_table->size(), parent->_build_rf_cardinality));
if (!_context->_runtime_filter_slots->empty() && !_context->_inserted_rows.empty()) {
if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_rows.empty()) {
{
SCOPED_TIMER(_context->_push_compute_timer);
_context->_runtime_filter_slots->insert(_context->_inserted_rows);
SCOPED_TIMER(parent->_push_compute_timer);
parent->_runtime_filter_slots->insert(parent->_inserted_rows);
}
}
{
SCOPED_TIMER(_context->_push_down_timer);
RETURN_IF_ERROR(_context->_runtime_filter_slots->publish());
SCOPED_TIMER(parent->_push_down_timer);
RETURN_IF_ERROR(parent->_runtime_filter_slots->publish());
}
return Status::OK();
}
private:
RuntimeFilterContext* _context;
};
struct HashJoinBuildContext {
HashJoinBuildContext(HashJoinNode* join_node);
HashJoinBuildContext(pipeline::HashJoinBuildSinkLocalState* local_state);
void add_hash_buckets_info(const std::string& info) const {
_profile->add_info_string("HashTableBuckets", info);
}
void add_hash_buckets_filled_info(const std::string& info) const {
_profile->add_info_string("HashTableFilledBuckets", info);
}
RuntimeProfile::Counter* _hash_table_memory_usage;
RuntimeProfile::Counter* _build_buckets_counter;
RuntimeProfile::Counter* _build_collisions_counter;
RuntimeProfile::Counter* _build_buckets_fill_counter;
RuntimeProfile::Counter* _build_table_insert_timer;
RuntimeProfile::Counter* _build_table_expanse_timer;
RuntimeProfile::Counter* _build_table_convert_timer;
RuntimeProfile::Counter* _build_side_compute_hash_timer;
RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage;
RuntimeProfile* _profile;
Sizes& _build_key_sz;
bool& _build_unique;
std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
std::unordered_map<const Block*, std::vector<int>>& _inserted_rows;
std::shared_ptr<Arena>& _arena;
size_t& _build_rf_cardinality;
};
using ProfileCounter = RuntimeProfile::Counter;
template <class HashTableContext>
template <class HashTableContext, typename Parent>
struct ProcessHashTableBuild {
ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs,
HashJoinBuildContext* join_context, int batch_size, uint8_t offset,
RuntimeState* state)
Parent* parent, int batch_size, uint8_t offset, RuntimeState* state)
: _rows(rows),
_skip_rows(0),
_acquired_block(acquired_block),
_build_raw_ptrs(build_raw_ptrs),
_join_context(join_context),
_parent(parent),
_batch_size(batch_size),
_offset(offset),
_state(state),
_build_side_compute_hash_timer(join_context->_build_side_compute_hash_timer) {}
_build_side_compute_hash_timer(parent->_build_side_compute_hash_timer) {}
template <bool ignore_null, bool short_circuit_for_null>
Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) {
@ -172,30 +125,30 @@ struct ProcessHashTableBuild {
int64_t bucket_size = hash_table_ctx.hash_table->get_buffer_size_in_cells();
int64_t filled_bucket_size = hash_table_ctx.hash_table->size();
int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes();
COUNTER_SET(_join_context->_hash_table_memory_usage, bucket_bytes);
COUNTER_SET(_join_context->_build_buckets_counter, bucket_size);
COUNTER_SET(_join_context->_build_collisions_counter,
COUNTER_SET(_parent->_hash_table_memory_usage, bucket_bytes);
COUNTER_SET(_parent->_build_buckets_counter, bucket_size);
COUNTER_SET(_parent->_build_collisions_counter,
hash_table_ctx.hash_table->get_collisions());
COUNTER_SET(_join_context->_build_buckets_fill_counter, filled_bucket_size);
COUNTER_SET(_parent->_build_buckets_fill_counter, filled_bucket_size);
auto hash_table_buckets = hash_table_ctx.hash_table->get_buffer_sizes_in_cells();
std::string hash_table_buckets_info;
for (auto bucket_count : hash_table_buckets) {
hash_table_buckets_info += std::to_string(bucket_count) + ", ";
}
_join_context->add_hash_buckets_info(hash_table_buckets_info);
_parent->add_hash_buckets_info(hash_table_buckets_info);
auto hash_table_sizes = hash_table_ctx.hash_table->sizes();
hash_table_buckets_info.clear();
for (auto table_size : hash_table_sizes) {
hash_table_buckets_info += std::to_string(table_size) + ", ";
}
_join_context->add_hash_buckets_filled_info(hash_table_buckets_info);
_parent->add_hash_buckets_filled_info(hash_table_buckets_info);
}};
KeyGetter key_getter(_build_raw_ptrs, _join_context->_build_key_sz);
KeyGetter key_getter(_build_raw_ptrs, _parent->build_key_sz());
SCOPED_TIMER(_join_context->_build_table_insert_timer);
SCOPED_TIMER(_parent->_build_table_insert_timer);
hash_table_ctx.hash_table->reset_resize_timer();
// only not build_unique, we need expanse hash table before insert data
@ -204,21 +157,21 @@ struct ProcessHashTableBuild {
// 2. There are many duplicate keys, and the hash table filled bucket is far less than
// the hash table build bucket, which may waste a lot of memory.
// TODO, use the NDV expansion of the key column in the optimizer statistics
if (!_join_context->_build_unique) {
if (!_parent->build_unique()) {
RETURN_IF_CATCH_EXCEPTION(hash_table_ctx.hash_table->expanse_for_add_elem(
std::min<int>(_rows, config::hash_table_pre_expanse_max_rows)));
}
vector<int>& inserted_rows = _join_context->_inserted_rows[&_acquired_block];
bool has_runtime_filter = !_join_context->_runtime_filter_descs.empty();
vector<int>& inserted_rows = _parent->_inserted_rows[&_acquired_block];
bool has_runtime_filter = !_parent->runtime_filter_descs().empty();
if (has_runtime_filter) {
inserted_rows.reserve(_batch_size);
}
hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _join_context->_build_key_sz, _rows,
hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _parent->build_key_sz(), _rows,
null_map ? null_map->data() : nullptr);
auto& arena = *(_join_context->_arena);
auto& arena = *_parent->arena();
auto old_build_arena_memory = arena.size();
size_t k = 0;
@ -229,7 +182,7 @@ struct ProcessHashTableBuild {
ctor(key, Mapped {k, _offset});
};
bool build_unique = _join_context->_build_unique;
bool build_unique = _parent->build_unique();
#define EMPLACE_IMPL(stmt) \
for (; k < _rows; ++k) { \
if (k % CHECK_FRECUENCY == 0) { \
@ -258,21 +211,21 @@ struct ProcessHashTableBuild {
} else if (has_runtime_filter && !build_unique) {
EMPLACE_IMPL(
if (inserted) { inserted_rows.push_back(k); } else {
mapped.insert({k, _offset}, *(_join_context->_arena));
mapped.insert({k, _offset}, *_parent->arena());
inserted_rows.push_back(k);
});
} else if (!has_runtime_filter && build_unique) {
EMPLACE_IMPL(if (!inserted) { _skip_rows++; });
} else {
EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset}, *(_join_context->_arena)); });
EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset}, *_parent->arena()); });
}
_join_context->_build_rf_cardinality += inserted_rows.size();
_parent->_build_rf_cardinality += inserted_rows.size();
_join_context->_build_arena_memory_usage->add(arena.size() - old_build_arena_memory);
_parent->_build_arena_memory_usage->add(arena.size() - old_build_arena_memory);
COUNTER_UPDATE(_join_context->_build_table_expanse_timer,
COUNTER_UPDATE(_parent->_build_table_expanse_timer,
hash_table_ctx.hash_table->get_resize_timer_value());
COUNTER_UPDATE(_join_context->_build_table_convert_timer,
COUNTER_UPDATE(_parent->_build_table_convert_timer,
hash_table_ctx.hash_table->get_convert_timer_value());
return Status::OK();
@ -283,7 +236,7 @@ private:
int _skip_rows;
Block& _acquired_block;
ColumnRawPtrs& _build_raw_ptrs;
HashJoinBuildContext* _join_context;
Parent* _parent;
int _batch_size;
uint8_t _offset;
RuntimeState* _state;
@ -348,16 +301,16 @@ using HashTableVariants = std::variant<
class VExprContext;
using HashTableCtxVariants =
std::variant<std::monostate, ProcessHashTableProbe<TJoinOp::INNER_JOIN>,
ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN>,
ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN>,
ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN>,
ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN>,
ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN>,
ProcessHashTableProbe<TJoinOp::CROSS_JOIN>,
ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN>,
ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>,
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>>;
std::variant<std::monostate, ProcessHashTableProbe<TJoinOp::INNER_JOIN, HashJoinNode>,
ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN, HashJoinNode>,
ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN, HashJoinNode>,
ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN, HashJoinNode>,
ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN, HashJoinNode>,
ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN, HashJoinNode>,
ProcessHashTableProbe<TJoinOp::CROSS_JOIN, HashJoinNode>,
ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN, HashJoinNode>,
ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN, HashJoinNode>,
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN, HashJoinNode>>;
using HashTableIteratorVariants =
std::variant<std::monostate, ForwardIterator<RowRefList>,
@ -365,52 +318,6 @@ using HashTableIteratorVariants =
static constexpr auto HASH_JOIN_MAX_BUILD_BLOCK_COUNT = 128;
struct HashJoinProbeContext {
HashJoinProbeContext(HashJoinNode* join_node);
HashJoinProbeContext(pipeline::HashJoinProbeLocalState* local_state);
bool _have_other_join_conjunct;
const bool _is_right_semi_anti;
const bool _is_outer_join;
MutableColumnPtr* _tuple_is_null_left_flag_column;
MutableColumnPtr* _tuple_is_null_right_flag_column;
// other expr
VExprContextSPtrs* _other_join_conjuncts;
DataTypes* _right_table_data_types;
DataTypes* _left_table_data_types;
RuntimeProfile::Counter* _search_hashtable_timer;
RuntimeProfile::Counter* _build_side_output_timer;
RuntimeProfile::Counter* _probe_side_output_timer;
RuntimeProfile::Counter* _probe_process_hashtable_timer;
RuntimeProfile::Counter* _process_other_join_conjunct_timer;
RuntimeProfile::Counter* _rows_returned_counter;
RuntimeProfile::HighWaterMarkCounter* _probe_arena_memory_usage;
std::shared_ptr<Arena> _arena;
// for full/right outer join
HashTableIteratorVariants* _outer_join_pull_visited_iter;
HashTableIteratorVariants* _probe_row_match_iter;
std::shared_ptr<std::vector<Block>> _build_blocks;
Block* _probe_block;
ColumnRawPtrs* _probe_columns;
int* _probe_index;
bool* _ready_probe;
Sizes _probe_key_sz;
std::vector<bool>* _left_output_slot_flags;
std::vector<bool>* _right_output_slot_flags;
// for cases when a probe row matches more than batch size build rows.
bool* _is_any_probe_match_row_output;
bool _has_null_value_in_build_side {};
};
class HashJoinNode final : public VJoinNodeBase {
public:
HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
@ -450,13 +357,27 @@ public:
return _runtime_filter_slots->ready_finish_publish();
}
bool have_other_join_conjunct() const { return _have_other_join_conjunct; }
bool is_right_semi_anti() const { return _is_right_semi_anti; }
bool is_outer_join() const { return _is_outer_join; }
std::shared_ptr<std::vector<Block>> build_blocks() const { return _build_blocks; }
Sizes probe_key_sz() const { return _probe_key_sz; }
std::vector<bool>* left_output_slot_flags() { return &_left_output_slot_flags; }
std::vector<bool>* right_output_slot_flags() { return &_right_output_slot_flags; }
bool* has_null_in_build_side() { return &_has_null_in_build_side; }
DataTypes right_table_data_types() { return _right_table_data_types; }
DataTypes left_table_data_types() { return _left_table_data_types; }
vectorized::Sizes& build_key_sz() { return _build_key_sz; }
bool build_unique() const { return _build_unique; }
std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return _runtime_filter_descs; }
std::shared_ptr<vectorized::Arena> arena() { return _arena; }
protected:
void _probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) override;
private:
friend struct HashJoinProbeContext;
friend struct HashJoinBuildContext;
friend struct RuntimeFilterContext;
template <int JoinOpType, typename Parent>
friend struct ProcessHashTableProbe;
void _init_short_circuit_for_probe() override {
_short_circuit_for_probe =
@ -605,13 +526,12 @@ private:
bool* eos, Block* temp_block,
bool check_rows_count = true);
template <class HashTableContext>
template <class HashTableContext, typename Parent>
friend struct ProcessHashTableBuild;
template <int JoinOpType>
template <int JoinOpType, typename Parent>
friend struct ProcessHashTableProbe;
template <class HashTableContext>
friend struct ProcessRuntimeFilterBuild;
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
@ -620,8 +540,6 @@ private:
std::vector<IRuntimeFilter*> _runtime_filters;
size_t _build_rf_cardinality = 0;
std::atomic_bool _probe_open_finish = false;
std::unique_ptr<HashJoinProbeContext> _probe_context;
};
} // namespace vectorized
} // namespace doris

View File

@ -105,7 +105,7 @@ protected:
TJoinOp::type _join_op;
JoinOpVariants _join_op_variants;
bool _have_other_join_conjunct;
const bool _have_other_join_conjunct;
const bool _match_all_probe; // output all rows coming from the probe input. Full/Left Join
const bool _match_all_build; // output all rows coming from the build input. Full/Right Join
bool _build_unique; // build a hash table without duplicated rows. Left semi/anti Join

View File

@ -54,6 +54,7 @@ struct SharedRuntimeFilterContext {
struct SharedHashTableContext {
SharedHashTableContext()
: hash_table_variants(nullptr),
blocks(new std::vector<vectorized::Block>()),
signaled(false),
short_circuit_for_null_in_probe_side(false) {}