[Improvement](runtime-filter) build runtime filter before build hash table on join build probe (#29727)

Build the runtime filter before building the hash table during the hash join build phase.
This commit is contained in:
Pxl
2024-01-11 10:45:56 +08:00
committed by yiguolei
parent 58f8994f5d
commit 33b8311d5f
5 changed files with 40 additions and 82 deletions

View File

@ -57,7 +57,7 @@ public:
throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, filter_id={}",
filter_id);
}
for (auto filter : filters) {
for (auto* filter : filters) {
filter->set_ignored("");
filter->signal();
}
@ -166,7 +166,7 @@ public:
return Status::OK();
}
void insert(const std::unordered_set<const vectorized::Block*>& datas) {
void insert(const vectorized::Block* block) {
for (int i = 0; i < _build_expr_context.size(); ++i) {
auto iter = _runtime_filters.find(i);
if (iter == _runtime_filters.end()) {
@ -174,18 +174,16 @@ public:
}
int result_column_id = _build_expr_context[i]->get_last_result_column_id();
for (const auto* it : datas) {
auto column = it->get_by_position(result_column_id).column;
for (auto* filter : iter->second) {
filter->insert_batch(column, 1);
}
const auto& column = block->get_by_position(result_column_id).column;
for (auto* filter : iter->second) {
filter->insert_batch(column, 1);
}
}
}
bool ready_finish_publish() {
for (auto& pair : _runtime_filters) {
for (auto filter : pair.second) {
for (auto* filter : pair.second) {
if (!filter->is_finish_rpc()) {
return false;
}
@ -196,7 +194,7 @@ public:
void finish_publish() {
for (auto& pair : _runtime_filters) {
for (auto filter : pair.second) {
for (auto* filter : pair.second) {
static_cast<void>(filter->join_rpc());
}
}

View File

@ -484,29 +484,13 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._build_side_mutable_block.to_block());
COUNTER_UPDATE(local_state._build_blocks_memory_usage,
(*local_state._shared_state->build_block).bytes());
RETURN_IF_ERROR(
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
const bool use_global_rf =
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
auto ret = std::visit(
Overload {[&](std::monostate&) -> Status {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
},
[&](auto&& arg) -> Status {
vectorized::ProcessRuntimeFilterBuild runtime_filter_build_process;
return runtime_filter_build_process(state, arg, &local_state,
use_global_rf);
}},
*local_state._shared_state->hash_table_variants);
if (!ret.ok()) {
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = ret;
_shared_hashtable_controller->signal(node_id());
}
return ret;
}
RETURN_IF_ERROR(vectorized::process_runtime_filter_build(
state, local_state._shared_state->build_block.get(), &local_state, use_global_rf));
RETURN_IF_ERROR(
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
// arena will be shared with other instances.

View File

@ -94,7 +94,10 @@ protected:
friend class HashJoinBuildSinkOperatorX;
template <class HashTableContext, typename Parent>
friend struct vectorized::ProcessHashTableBuild;
friend struct vectorized::ProcessRuntimeFilterBuild;
template <typename Parent>
friend Status vectorized::process_runtime_filter_build(RuntimeState* state,
vectorized::Block* block, Parent* parent,
bool is_global);
// build expr
vectorized::VExprContextSPtrs _build_expr_ctxs;
@ -107,7 +110,6 @@ protected:
std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots;
bool _has_set_need_null_map_for_build = false;
bool _build_side_ignore_null = false;
std::unordered_set<const vectorized::Block*> _inserted_blocks;
std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
std::vector<int> _build_col_ids;

View File

@ -662,7 +662,7 @@ Status HashJoinNode::alloc_resource(doris::RuntimeState* state) {
SCOPED_TIMER(_allocate_resource_timer);
RETURN_IF_ERROR(VJoinNodeBase::alloc_resource(state));
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
if (auto bf = _runtime_filters[i]->get_bloomfilter()) {
if (auto* bf = _runtime_filters[i]->get_bloomfilter()) {
RETURN_IF_ERROR(bf->init_with_fixed_length());
}
}
@ -751,23 +751,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
DCHECK(!_build_side_mutable_block.empty());
_build_block = std::make_shared<Block>(_build_side_mutable_block.to_block());
COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes());
RETURN_IF_ERROR(process_runtime_filter_build(state, _build_block.get(), this));
RETURN_IF_ERROR(_process_build_block(state, *_build_block));
auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
},
[&](auto&& arg) -> Status {
ProcessRuntimeFilterBuild runtime_filter_build_process;
return runtime_filter_build_process(state, arg, this);
}},
*_hash_table_variants);
if (!ret.ok()) {
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = ret;
_shared_hashtable_controller->signal(id());
}
return ret;
}
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
// arena will be shared with other instances.
@ -949,9 +934,6 @@ void HashJoinNode::_set_build_ignore_flag(Block& block, const std::vector<int>&
Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
SCOPED_TIMER(_build_table_timer);
size_t rows = block.rows();
if (UNLIKELY(rows == 0)) {
return Status::OK();
}
COUNTER_UPDATE(_build_rows_counter, rows);
ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());

View File

@ -74,33 +74,28 @@ template <int JoinOpType, typename Parent>
struct ProcessHashTableProbe;
class HashJoinNode;
struct ProcessRuntimeFilterBuild {
template <class HashTableContext, typename Parent>
Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx, Parent* parent,
bool is_global = false) {
if (parent->runtime_filter_descs().empty()) {
return Status::OK();
}
parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
parent->_build_expr_ctxs, parent->runtime_filter_descs(), is_global);
RETURN_IF_ERROR(
parent->_runtime_filter_slots->init(state, hash_table_ctx.hash_table->size()));
if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_blocks.empty()) {
{
SCOPED_TIMER(parent->_runtime_filter_compute_timer);
parent->_runtime_filter_slots->insert(parent->_inserted_blocks);
}
}
{
SCOPED_TIMER(parent->_publish_runtime_filter_timer);
RETURN_IF_ERROR(parent->_runtime_filter_slots->publish());
}
// Builds and publishes the runtime filters of a hash join from its build-side block.
//
// Creates a fresh VRuntimeFilterSlots on `parent`, initializes it with the row
// count of `block`, hands the block to the slots' insert(), and publishes them.
//
// Parameters:
//   state     - runtime state forwarded to VRuntimeFilterSlots::init().
//   block     - build-side block; its rows() value sizes the filters and the
//               block itself is passed to insert().
//   parent    - join node or sink local state; must expose runtime_filter_descs(),
//               _build_expr_ctxs, _runtime_filter_slots and the two timers used below.
//   is_global - forwarded unchanged to the VRuntimeFilterSlots constructor.
//
// Returns Status::OK() immediately when the parent has no runtime filter
// descriptors; otherwise propagates the first error from init() or publish().
template <typename Parent>
Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent* parent,
bool is_global = false) {
// Nothing to build when the join declares no runtime filters.
if (parent->runtime_filter_descs().empty()) {
return Status::OK();
}
};
parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
parent->_build_expr_ctxs, parent->runtime_filter_descs(), is_global);
RETURN_IF_ERROR(parent->_runtime_filter_slots->init(state, block->rows()));
// NOTE(review): insertion is skipped unless the block has more than one row.
// VRuntimeFilterSlots::insert() passes 1 as the second argument of insert_batch(),
// so row 0 is presumably a placeholder row and not real data -- confirm.
if (!parent->_runtime_filter_slots->empty() && block->rows() > 1) {
SCOPED_TIMER(parent->_runtime_filter_compute_timer);
parent->_runtime_filter_slots->insert(block);
}
// Publishing happens even when nothing was inserted above.
{
SCOPED_TIMER(parent->_publish_runtime_filter_timer);
RETURN_IF_ERROR(parent->_runtime_filter_slots->publish());
}
return Status::OK();
}
using ProfileCounter = RuntimeProfile::Counter;
@ -129,10 +124,6 @@ struct ProcessHashTableBuild {
}
}
if (!_parent->runtime_filter_descs().empty()) {
_parent->_inserted_blocks.insert(&_acquired_block);
}
SCOPED_TIMER(_parent->_build_table_insert_timer);
hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, _batch_size,
*has_null_key);
@ -414,10 +405,11 @@ private:
template <int JoinOpType, typename Parent>
friend struct ProcessHashTableProbe;
friend struct ProcessRuntimeFilterBuild;
template <typename Parent>
friend Status process_runtime_filter_build(RuntimeState* state, vectorized::Block* block,
Parent* parent, bool is_global);
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
std::unordered_set<const Block*> _inserted_blocks;
std::vector<IRuntimeFilter*> _runtime_filters;
std::atomic_bool _probe_open_finish = false;