diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index da0fc10ea7..471801abca 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -92,7 +92,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _runtime_filters.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( + RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter( RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options())); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( _runtime_filter_descs[i].filter_id, &_runtime_filters[i])); diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 60a8c3b2bc..5bf924d1c1 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -88,7 +88,7 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { for (int i = 0; i < filter_size; ++i) { IRuntimeFilter* runtime_filter = nullptr; const auto& filter_desc = _runtime_filter_descs[i]; - RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( + RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter( RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id())); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter)); diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index f92869b5f4..c9b3a3e2a1 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -42,6 +42,7 @@ #include "vec/exprs/vexpr.h" #include "vec/exprs/vliteral.h" #include "vec/exprs/vruntimefilter_wrapper.h" +#include "vec/runtime/shared_hash_table_controller.h" namespace doris { // PrimitiveType->TExprNodeType @@ -431,23 +432,25 @@ public: _max_in_num = params->max_in_num; switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - _hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec())); + _context.hybrid_set.reset( + create_set(_column_return_type, _state->enable_vectorized_exec())); break; } case RuntimeFilterType::MINMAX_FILTER: { - _minmax_func.reset(create_minmax_filter(_column_return_type)); + _context.minmax_func.reset(create_minmax_filter(_column_return_type)); break; } case RuntimeFilterType::BLOOM_FILTER: { _is_bloomfilter = true; - _bloomfilter_func.reset(create_bloom_filter(_column_return_type)); - _bloomfilter_func->set_length(params->bloom_filter_size); + _context.bloom_filter_func.reset(create_bloom_filter(_column_return_type)); + _context.bloom_filter_func->set_length(params->bloom_filter_size); return Status::OK(); } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { - _hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec())); - _bloomfilter_func.reset(create_bloom_filter(_column_return_type)); - _bloomfilter_func->set_length(params->bloom_filter_size); + _context.hybrid_set.reset( + create_set(_column_return_type, _state->enable_vectorized_exec())); + _context.bloom_filter_func.reset(create_bloom_filter(_column_return_type)); + _context.bloom_filter_func->set_length(params->bloom_filter_size); return Status::OK(); } default: @@ -461,14 +464,15 @@ public: << "Can not change to bloom filter because of runtime filter type is " << to_string(_filter_type); _is_bloomfilter = true; - insert_to_bloom_filter(_bloomfilter_func.get()); + insert_to_bloom_filter(_context.bloom_filter_func.get()); // release in filter - _hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec())); + _context.hybrid_set.reset( + create_set(_column_return_type, _state->enable_vectorized_exec())); } void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const { - if (_hybrid_set->size() > 0) { - auto it = _hybrid_set->begin(); + if (_context.hybrid_set->size() > 0) { + auto it = _context.hybrid_set->begin(); if (_use_batch) { while (it->has_next()) { @@ -484,7 +488,7 @@ public: } } - BloomFilterFuncBase* get_bloomfilter() const { return _bloomfilter_func.get(); } + BloomFilterFuncBase* get_bloomfilter() const { return _context.bloom_filter_func.get(); } void insert(const void* data) { switch (_filter_type) { @@ -492,22 +496,22 @@ public: if (_is_ignored_in_filter) { break; } - _hybrid_set->insert(data); + _context.hybrid_set->insert(data); break; } case RuntimeFilterType::MINMAX_FILTER: { - _minmax_func->insert(data); + _context.minmax_func->insert(data); break; } case RuntimeFilterType::BLOOM_FILTER: { - _bloomfilter_func->insert(data); + _context.bloom_filter_func->insert(data); break; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { if (_is_bloomfilter) { - _bloomfilter_func->insert(data); + _context.bloom_filter_func->insert(data); } else { - _hybrid_set->insert(data); + _context.hybrid_set->insert(data); } break; } @@ -523,22 +527,22 @@ public: if (_is_ignored_in_filter) { break; } - _hybrid_set->insert_fixed_len(data, offsets, number); + _context.hybrid_set->insert_fixed_len(data, offsets, number); break; } case RuntimeFilterType::MINMAX_FILTER: { - _minmax_func->insert_fixed_len(data, offsets, number); + _context.minmax_func->insert_fixed_len(data, offsets, number); break; } case RuntimeFilterType::BLOOM_FILTER: { - _bloomfilter_func->insert_fixed_len(data, offsets, number); + _context.bloom_filter_func->insert_fixed_len(data, offsets, number); break; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { if (_is_bloomfilter) { - _bloomfilter_func->insert_fixed_len(data, offsets, number); + _context.bloom_filter_func->insert_fixed_len(data, offsets, number); } else { - _hybrid_set->insert_fixed_len(data, offsets, number); + _context.hybrid_set->insert_fixed_len(data, offsets, number); } break; } @@ -618,18 +622,18 @@ public: _is_ignored_in_filter = true; _ignored_in_filter_msg = wrapper->_ignored_in_filter_msg; // release in filter - _hybrid_set.reset( + _context.hybrid_set.reset( create_set(_column_return_type, _state->enable_vectorized_exec())); break; } // try insert set - _hybrid_set->insert(wrapper->_hybrid_set.get()); - if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) { + _context.hybrid_set->insert(wrapper->_context.hybrid_set.get()); + if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { #ifdef VLOG_DEBUG_IS_ON std::stringstream msg; msg << "fragment instance " << _fragment_instance_id.to_string() << " ignore merge runtime filter(in filter id " << _filter_id - << ") because: in_num(" << _hybrid_set->size() << ") >= max_in_num(" + << ") because: in_num(" << _context.hybrid_set->size() << ") >= max_in_num(" << _max_in_num << ")"; _ignored_in_filter_msg = _pool->add(new std::string(msg.str())); #else @@ -638,17 +642,17 @@ public: _is_ignored_in_filter = true; // release in filter - _hybrid_set.reset( + _context.hybrid_set.reset( create_set(_column_return_type, _state->enable_vectorized_exec())); } break; } case RuntimeFilterType::MINMAX_FILTER: { - _minmax_func->merge(wrapper->_minmax_func.get(), _pool); + _context.minmax_func->merge(wrapper->_context.minmax_func.get(), _pool); break; } case RuntimeFilterType::BLOOM_FILTER: { - _bloomfilter_func->merge(wrapper->_bloomfilter_func.get()); + _context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get()); break; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { @@ -661,11 +665,11 @@ public: << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " << *(wrapper->get_ignored_in_filter_msg()); - _hybrid_set->insert(wrapper->_hybrid_set.get()); - if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) { + _context.hybrid_set->insert(wrapper->_context.hybrid_set.get()); + if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string() << " change runtime filter to bloom filter(id=" << _filter_id - << ") because: in_num(" << _hybrid_set->size() + << ") because: in_num(" << _context.hybrid_set->size() << ") >= max_in_num(" << _max_in_num << ")"; change_to_bloom_filter(); } @@ -675,7 +679,7 @@ public: << " change runtime filter to bloom filter(id=" << _filter_id << ") because: already exist a bloom filter"; change_to_bloom_filter(); - _bloomfilter_func->merge(wrapper->_bloomfilter_func.get()); + _context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get()); } } else { if (wrapper->_filter_type == @@ -685,10 +689,10 @@ public: << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " << *(wrapper->get_ignored_in_filter_msg()); - wrapper->insert_to_bloom_filter(_bloomfilter_func.get()); + wrapper->insert_to_bloom_filter(_context.bloom_filter_func.get()); // bloom filter merge bloom filter } else { - _bloomfilter_func->merge(wrapper->_bloomfilter_func.get()); + _context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get()); } } break; @@ -709,7 +713,7 @@ public: _ignored_in_filter_msg = _pool->add(new std::string(in_filter->ignored_msg())); return Status::OK(); } - _hybrid_set.reset(create_set(type, _state->enable_vectorized_exec())); + _context.hybrid_set.reset(create_set(type, _state->enable_vectorized_exec())); switch (type) { case TYPE_BOOLEAN: { batch_assign(in_filter, [](std::shared_ptr& set, PColumnValue& column, @@ -880,40 +884,40 @@ public: _is_bloomfilter = true; // we won't use this class to insert or find any data // so any type is ok - _bloomfilter_func.reset(create_bloom_filter(PrimitiveType::TYPE_INT)); - return _bloomfilter_func->assign(data, bloom_filter->filter_length()); + _context.bloom_filter_func.reset(create_bloom_filter(PrimitiveType::TYPE_INT)); + return _context.bloom_filter_func->assign(data, bloom_filter->filter_length()); } // used by shuffle runtime filter // assign this filter by protobuf Status assign(const PMinMaxFilter* minmax_filter) { PrimitiveType type = to_primitive_type(minmax_filter->column_type()); - _minmax_func.reset(create_minmax_filter(type)); + _context.minmax_func.reset(create_minmax_filter(type)); switch (type) { case TYPE_BOOLEAN: { bool min_val = minmax_filter->min_val().boolval(); bool max_val = minmax_filter->max_val().boolval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_TINYINT: { int8_t min_val = static_cast(minmax_filter->min_val().intval()); int8_t max_val = static_cast(minmax_filter->max_val().intval()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_SMALLINT: { int16_t min_val = static_cast(minmax_filter->min_val().intval()); int16_t max_val = static_cast(minmax_filter->max_val().intval()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_INT: { int32_t min_val = minmax_filter->min_val().intval(); int32_t max_val = minmax_filter->max_val().intval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_BIGINT: { int64_t min_val = minmax_filter->min_val().longval(); int64_t max_val = minmax_filter->max_val().longval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_LARGEINT: { auto min_string_val = minmax_filter->min_val().stringval(); @@ -925,27 +929,27 @@ public: int128_t max_val = StringParser::string_to_int( max_string_val.c_str(), max_string_val.length(), &result); DCHECK(result == StringParser::PARSE_SUCCESS); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_FLOAT: { float min_val = static_cast(minmax_filter->min_val().doubleval()); float max_val = static_cast(minmax_filter->max_val().doubleval()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DOUBLE: { double min_val = static_cast(minmax_filter->min_val().doubleval()); double max_val = static_cast(minmax_filter->max_val().doubleval()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DATEV2: { int32_t min_val = minmax_filter->min_val().intval(); int32_t max_val = minmax_filter->max_val().intval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DATETIMEV2: { int64_t min_val = minmax_filter->min_val().longval(); int64_t max_val = minmax_filter->max_val().longval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DATETIME: case TYPE_DATE: { @@ -955,24 +959,24 @@ public: DateTimeValue max_val; min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length()); max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMALV2: { auto& min_val_ref = minmax_filter->min_val().stringval(); auto& max_val_ref = minmax_filter->max_val().stringval(); DecimalV2Value min_val(min_val_ref); DecimalV2Value max_val(max_val_ref); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMAL32: { int32_t min_val = minmax_filter->min_val().intval(); int32_t max_val = minmax_filter->max_val().intval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMAL64: { int64_t min_val = minmax_filter->min_val().longval(); int64_t max_val = minmax_filter->max_val().longval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMAL128I: { auto min_string_val = minmax_filter->min_val().stringval(); @@ -984,7 +988,7 @@ public: int128_t max_val = StringParser::string_to_int( max_string_val.c_str(), max_string_val.length(), &result); DCHECK(result == StringParser::PARSE_SUCCESS); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_VARCHAR: case TYPE_CHAR: @@ -995,7 +999,7 @@ public: auto max_val_ptr = _pool->add(new std::string(max_val_ref)); StringValue min_val(const_cast(min_val_ptr->c_str()), min_val_ptr->length()); StringValue max_val(const_cast(max_val_ptr->c_str()), max_val_ptr->length()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } default: DCHECK(false) << "unknown type"; @@ -1005,17 +1009,17 @@ public: } Status get_in_filter_iterator(HybridSetBase::IteratorBase** it) { - *it = _hybrid_set->begin(); + *it = _context.hybrid_set->begin(); return Status::OK(); } Status get_bloom_filter_desc(char** data, int* filter_length) { - return _bloomfilter_func->get_data(data, filter_length); + return _context.bloom_filter_func->get_data(data, filter_length); } Status get_minmax_filter_desc(void** min_data, void** max_data) { - *min_data = _minmax_func->get_min(); - *max_data = _minmax_func->get_max(); + *min_data = _context.minmax_func->get_min(); + *max_data = _context.minmax_func->get_max(); return Status::OK(); } @@ -1027,13 +1031,13 @@ public: case TYPE_VARCHAR: case TYPE_CHAR: case TYPE_STRING: { - StringValue* min_value = static_cast(_minmax_func->get_min()); - StringValue* max_value = static_cast(_minmax_func->get_max()); + StringValue* min_value = static_cast(_context.minmax_func->get_min()); + StringValue* max_value = static_cast(_context.minmax_func->get_max()); auto min_val_ptr = _pool->add(new std::string(min_value->ptr)); auto max_val_ptr = _pool->add(new std::string(max_value->ptr)); StringValue min_val(const_cast(min_val_ptr->c_str()), min_val_ptr->length()); StringValue max_val(const_cast(max_val_ptr->c_str()), max_val_ptr->length()); - _minmax_func->assign(&min_val, &max_val); + _context.minmax_func->assign(&min_val, &max_val); } default: break; @@ -1052,11 +1056,11 @@ public: PColumnValue&, ObjectPool*)) { for (int i = 0; i < filter->values_size(); ++i) { PColumnValue column = filter->values(i); - assign_func(_hybrid_set, column, _pool); + assign_func(_context.hybrid_set, column, _pool); } } - size_t get_in_filter_size() const { return _hybrid_set->size(); } + size_t get_in_filter_size() const { return _context.hybrid_set->size(); } friend class IRuntimeFilter; @@ -1068,9 +1072,8 @@ private: PrimitiveType _column_return_type; // column type RuntimeFilterType _filter_type; int32_t _max_in_num = -1; - std::shared_ptr _minmax_func; - std::shared_ptr _hybrid_set; - std::shared_ptr _bloomfilter_func; + + vectorized::SharedRuntimeFilterContext _context; bool _is_bloomfilter = false; bool _is_ignored_in_filter = false; std::string* _ignored_in_filter_msg = nullptr; @@ -1090,12 +1093,12 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id); } -Status IRuntimeFilter::apply_from_other(IRuntimeFilter* other) { - _wrapper->_hybrid_set = other->_wrapper->_hybrid_set; - _wrapper->_bloomfilter_func = other->_wrapper->_bloomfilter_func; - _wrapper->_minmax_func = other->_wrapper->_minmax_func; - _wrapper->_filter_type = other->_wrapper->_filter_type; - _runtime_filter_type = other->_runtime_filter_type; +void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) { + context = _wrapper->_context; +} + +Status IRuntimeFilter::copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context) { + _wrapper->_context = context; return Status::OK(); } @@ -1736,7 +1739,7 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta node.__isset.vector_opcode = true; node.__set_vector_opcode(to_in_opcode(_column_return_type)); auto in_pred = _pool->add(new InPredicate(node)); - RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.get())); + RETURN_IF_ERROR(in_pred->prepare(state, _context.hybrid_set.get())); in_pred->add_child(Expr::copy(_pool, prob_expr->root())); ExprContext* ctx = _pool->add(new ExprContext(in_pred)); container->push_back(ctx); @@ -1748,7 +1751,8 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta Expr* max_literal = nullptr; auto max_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::LE); RETURN_IF_ERROR(create_literal(_pool, prob_expr->root()->type(), - _minmax_func->get_max(), (void**)&max_literal)); + _context.minmax_func->get_max(), + (void**)&max_literal)); max_pred->add_child(Expr::copy(_pool, prob_expr->root())); max_pred->add_child(max_literal); container->push_back(_pool->add(new ExprContext(max_pred))); @@ -1756,7 +1760,8 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta Expr* min_literal = nullptr; auto min_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::GE); RETURN_IF_ERROR(create_literal(_pool, prob_expr->root()->type(), - _minmax_func->get_min(), (void**)&min_literal)); + _context.minmax_func->get_min(), + (void**)&min_literal)); min_pred->add_child(Expr::copy(_pool, prob_expr->root())); min_pred->add_child(min_literal); container->push_back(_pool->add(new ExprContext(min_pred))); @@ -1772,7 +1777,7 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta node.__isset.vector_opcode = true; node.__set_vector_opcode(to_in_opcode(_column_return_type)); auto bloom_pred = _pool->add(new BloomFilterPredicate(node)); - RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func)); + RETURN_IF_ERROR(bloom_pred->prepare(state, _context.bloom_filter_func)); bloom_pred->add_child(Expr::copy(_pool, prob_expr->root())); ExprContext* ctx = _pool->add(new ExprContext(bloom_pred)); container->push_back(ctx); @@ -1811,7 +1816,7 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vectoradd(new vectorized::VDirectInPredicate(node)); - in_pred->set_filter(_hybrid_set); + in_pred->set_filter(_context.hybrid_set); auto cloned_vexpr = vprob_expr->root()->clone(_pool); in_pred->add_child(cloned_vexpr); auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node, in_pred)); @@ -1827,7 +1832,8 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector(_pool, vprob_expr->root()->type(), - _minmax_func->get_max(), (void**)&max_literal)); + _context.minmax_func->get_max(), + (void**)&max_literal)); auto cloned_vexpr = vprob_expr->root()->clone(_pool); max_pred->add_child(cloned_vexpr); max_pred->add_child(max_literal); @@ -1841,7 +1847,8 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector(_pool, vprob_expr->root()->type(), - _minmax_func->get_min(), (void**)&min_literal)); + _context.minmax_func->get_min(), + (void**)&min_literal)); cloned_vexpr = vprob_expr->root()->clone(_pool); min_pred->add_child(cloned_vexpr); min_pred->add_child(min_literal); @@ -1861,7 +1868,7 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vectoradd(new vectorized::VBloomPredicate(node)); - bloom_pred->set_filter(_bloomfilter_func); + bloom_pred->set_filter(_context.bloom_filter_func); auto cloned_vexpr = vprob_expr->root()->clone(_pool); bloom_pred->add_child(cloned_vexpr); auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node, bloom_pred)); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index e84da75a5d..89c82abfef 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -47,6 +47,7 @@ class BloomFilterFuncBase; namespace vectorized { class VExpr; class VExprContext; +struct SharedRuntimeFilterContext; } // namespace vectorized enum class RuntimeFilterType { @@ -141,7 +142,8 @@ public: const TQueryOptions* query_options, const RuntimeFilterRole role, int node_id, IRuntimeFilter** res); - Status apply_from_other(IRuntimeFilter* other); + void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context); + Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context); // insert data to build filter // only used for producer diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index b13fa69625..c6fa1d2381 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -23,6 +23,7 @@ #include "vec/columns/column_nullable.h" #include "vec/columns/columns_number.h" #include "vec/common/assert_cast.h" +#include "vec/runtime/shared_hash_table_controller.h" namespace doris { // this class used in a hash join node @@ -229,19 +230,24 @@ public: } } - Status apply_from_other(RuntimeFilterSlotsBase* other) { + void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) { + for (auto& it : _runtime_filters) { + for (auto& filter : it.second) { + auto& target = context->runtime_filters[filter->filter_id()]; + filter->copy_to_shared_context(target); + } + } + } + + Status copy_from_shared_context(vectorized::SharedHashTableContextPtr& context) { for (auto& it : _runtime_filters) { - auto& other_filters = other->_runtime_filters[it.first]; for (auto& filter : it.second) { auto filter_id = filter->filter_id(); - auto ret = std::find_if(other_filters.begin(), other_filters.end(), - [&](IRuntimeFilter* other_filter) { - return other_filter->filter_id() == filter_id; - }); - if (ret == other_filters.end()) { + auto ret = context->runtime_filters.find(filter_id); + if (ret == context->runtime_filters.end()) { return Status::Aborted("invalid runtime filter id: {}", filter_id); } - filter->apply_from_other(*ret); + filter->copy_from_shared_context(ret->second); } } return Status::OK(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 29cdd17320..09ccb4ef23 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -240,13 +240,7 @@ Status FragmentExecState::execute() { } #ifndef BE_TEST if (_executor.runtime_state()->is_cancelled()) { - Status status = Status::Cancelled("cancelled before execution"); - _executor.runtime_state() - ->get_query_fragments_ctx() - ->get_shared_hash_table_controller() - ->release_ref_count_if_need(_executor.runtime_state()->fragment_instance_id(), - status); - return status; + return Status::Cancelled("cancelled before execution"); } #endif int64_t duration_ns = 0; @@ -254,19 +248,11 @@ Status FragmentExecState::execute() { SCOPED_RAW_TIMER(&duration_ns); CgroupsMgr::apply_system_cgroup(); opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment"); - Status status = _executor.open(); - WARN_IF_ERROR(status, + WARN_IF_ERROR(_executor.open(), strings::Substitute("Got error while opening fragment $0, query id: $1", print_id(_fragment_instance_id), print_id(_query_id))); _executor.close(); - if (!status.ok()) { - _executor.runtime_state() - ->get_query_fragments_ctx() - ->get_shared_hash_table_controller() - ->release_ref_count_if_need(_executor.runtime_state()->fragment_instance_id(), - status); - } } DorisMetrics::instance()->fragment_requests_total->increment(1); DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000); @@ -712,9 +698,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi } g_fragmentmgr_prepare_latency << (duration_ns / 1000); - _setup_shared_hashtable_for_broadcast_join(params, exec_state->executor()->runtime_state(), - fragments_ctx.get()); - std::shared_ptr handler; _runtimefilter_controller.add_entity(params, &handler, exec_state->executor()->runtime_state()); exec_state->set_merge_controller_handler(handler); @@ -784,26 +767,6 @@ void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params, #endif } -void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params, - RuntimeState* state, - QueryFragmentsCtx* fragments_ctx) { - if (!params.__isset.fragment || !params.fragment.__isset.plan || - params.fragment.plan.nodes.empty()) { - return; - } - - for (auto& node : params.fragment.plan.nodes) { - if (node.node_type != TPlanNodeType::HASH_JOIN_NODE || - !node.hash_join_node.__isset.is_broadcast_join || - !node.hash_join_node.is_broadcast_join) { - continue; - } - - std::lock_guard lock(_lock_for_shared_hash_table); - fragments_ctx->get_shared_hash_table_controller()->acquire_ref_count(state, node.node_id); - } -} - bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) { return type == TPlanNodeType::OLAP_SCAN_NODE || type == TPlanNodeType::MYSQL_SCAN_NODE || type == TPlanNodeType::SCHEMA_SCAN_NODE || type == TPlanNodeType::META_SCAN_NODE || diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 7c6a079e32..2246a42ac8 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -107,10 +107,6 @@ private: bool _is_scan_node(const TPlanNodeType::type& type); - void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params, - RuntimeState* state, - QueryFragmentsCtx* fragments_ctx); - // This is input params ExecEnv* _exec_env; diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index 1f7c053db1..4bfff8c4da 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -100,8 +100,8 @@ public: return _ready_to_execute.load() && !_is_cancelled.load(); } - vectorized::SharedHashTableController* get_shared_hash_table_controller() { - return _shared_hash_table_controller.get(); + std::shared_ptr get_shared_hash_table_controller() { + return _shared_hash_table_controller; } public: @@ -144,7 +144,7 @@ private: std::atomic _ready_to_execute {false}; std::atomic _is_cancelled {false}; - std::unique_ptr _shared_hash_table_controller; + std::shared_ptr _shared_hash_table_controller; }; } // namespace doris diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 7bc894e1da..ce851c119a 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -78,8 +78,9 @@ Status RuntimeFilterMgr::get_producer_filter(const int filter_id, return get_filter_by_role(filter_id, RuntimeFilterRole::PRODUCER, producer_filter); } -Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc, - const TQueryOptions& options, int node_id) { +Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role, + const TRuntimeFilterDesc& desc, + const TQueryOptions& options, int node_id) { DCHECK((role == RuntimeFilterRole::CONSUMER && node_id >= 0) || role != RuntimeFilterRole::CONSUMER); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index db392bcc54..002f85c20b 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -69,8 +69,8 @@ public: Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter); // regist filter - Status regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc, - const TQueryOptions& options, int node_id = -1); + Status register_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc, + const TQueryOptions& options, int node_id = -1); // update filter by remote Status update_filter(const PPublishFilterRequest* request, diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index cc9d8a4831..7b8ae4d89d 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -397,6 +397,11 @@ public: bool enable_profile() const { return _query_options.is_report_success; } + bool enable_share_hash_table_for_broadcast_join() const { + return _query_options.__isset.enable_share_hash_table_for_broadcast_join && + _query_options.enable_share_hash_table_for_broadcast_join; + } + private: // Use a custom block manager for the query for testing purposes. void set_block_mgr2(const std::shared_ptr& block_mgr) { diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 7da2100784..9367c299c3 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -28,7 +28,7 @@ template ProcessHashTableProbe::ProcessHashTableProbe(HashJoinNode* join_node, int batch_size) : _join_node(join_node), _batch_size(batch_size), - _build_blocks(join_node->_build_blocks), + _build_blocks(*join_node->_build_blocks), _tuple_is_null_left_flags(join_node->_is_outer_join ? &(reinterpret_cast( *join_node->_tuple_is_null_left_flag_column) @@ -207,15 +207,14 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c } int last_offset = current_offset; auto find_result = - !need_null_map_for_probe ? key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena) + !need_null_map_for_probe + ? key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena) : (*null_map)[probe_index] - ? decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena)) {nullptr, false} - : key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index, - _arena); + ? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index, + _arena)) {nullptr, false} + : key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena); if (probe_index + PREFETCH_STEP < probe_rows) - key_getter.template prefetch(*hash_table_ctx.hash_table_ptr, + key_getter.template prefetch(hash_table_ctx.hash_table, probe_index + PREFETCH_STEP, _arena); if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || @@ -372,15 +371,14 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( auto last_offset = current_offset; auto find_result = - !need_null_map_for_probe ? key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena) + !need_null_map_for_probe + ? key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena) : (*null_map)[probe_index] - ? decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena)) {nullptr, false} - : key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index, - _arena); + ? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index, + _arena)) {nullptr, false} + : key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena); if (probe_index + PREFETCH_STEP < probe_rows) - key_getter.template prefetch(*hash_table_ctx.hash_table_ptr, + key_getter.template prefetch(hash_table_ctx.hash_table, probe_index + PREFETCH_STEP, _arena); if (find_result.is_found()) { auto& mapped = find_result.get_mapped(); @@ -627,7 +625,7 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp } }; - for (; iter != hash_table_ctx.hash_table_ptr->end() && block_size < _batch_size; ++iter) { + for (; iter != hash_table_ctx.hash_table.end() && block_size < _batch_size; ++iter) { auto& mapped = iter->get_second(); if constexpr (std::is_same_v) { if (mapped.visited) { @@ -674,7 +672,7 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp } _tuple_is_null_left_flags->resize_fill(block_size, 1); } - *eos = iter == hash_table_ctx.hash_table_ptr->end(); + *eos = iter == hash_table_ctx.hash_table.end(); output_block->swap( mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0)); return Status::OK(); diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index da144378b1..3778b3688f 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -221,7 +221,7 @@ struct ProcessRuntimeFilterBuild { _join_node->_runtime_filter_descs)); RETURN_IF_ERROR(_join_node->_runtime_filter_slots->init( - state, hash_table_ctx.hash_table_ptr->get_size())); + state, hash_table_ctx.hash_table.get_size())); if (!_join_node->_runtime_filter_slots->empty() && !_join_node->_inserted_rows.empty()) { { @@ -250,14 +250,15 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr ? tnode.hash_join_node.hash_output_slot_ids : std::vector {}) { _runtime_filter_descs = tnode.runtime_filters; - _arena = std::make_unique(); - _hash_table_variants = std::make_unique(); + _arena = std::make_shared(); + _hash_table_variants = std::make_shared(); _process_hashtable_ctx_variants = std::make_unique(); + _build_blocks.reset(new std::vector()); // avoid vector expand change block address. // one block can store 4g data, _build_blocks can store 128*4g data. // if probe data bigger than 512g, runtime filter maybe will core dump when insert data. - _build_blocks.reserve(_MAX_BUILD_BLOCK_COUNT); + _build_blocks->reserve(_MAX_BUILD_BLOCK_COUNT); } Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { @@ -313,7 +314,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _runtime_filters.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( + RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter( RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options())); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( _runtime_filter_descs[i].filter_id, &_runtime_filters[i])); @@ -349,18 +350,18 @@ Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // Build phase - auto build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true); - runtime_profile()->add_child(build_phase_profile, false, nullptr); - _build_timer = ADD_TIMER(build_phase_profile, "BuildTime"); - _build_table_timer = ADD_TIMER(build_phase_profile, "BuildTableTime"); - _build_side_merge_block_timer = ADD_TIMER(build_phase_profile, "BuildSideMergeBlockTime"); - _build_table_insert_timer = ADD_TIMER(build_phase_profile, "BuildTableInsertTime"); - _build_expr_call_timer = ADD_TIMER(build_phase_profile, "BuildExprCallTime"); - _build_table_expanse_timer = ADD_TIMER(build_phase_profile, "BuildTableExpanseTime"); + _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true); + runtime_profile()->add_child(_build_phase_profile, false, nullptr); + _build_timer = ADD_TIMER(_build_phase_profile, "BuildTime"); + _build_table_timer = ADD_TIMER(_build_phase_profile, "BuildTableTime"); + _build_side_merge_block_timer = ADD_TIMER(_build_phase_profile, "BuildSideMergeBlockTime"); + _build_table_insert_timer = ADD_TIMER(_build_phase_profile, "BuildTableInsertTime"); + _build_expr_call_timer = ADD_TIMER(_build_phase_profile, "BuildExprCallTime"); + _build_table_expanse_timer = ADD_TIMER(_build_phase_profile, "BuildTableExpanseTime"); _build_table_convert_timer = - ADD_TIMER(build_phase_profile, "BuildTableConvertToPartitionedTime"); - _build_rows_counter = ADD_COUNTER(build_phase_profile, "BuildRows", TUnit::UNIT); - _build_side_compute_hash_timer = ADD_TIMER(build_phase_profile, "BuildSideHashComputingTime"); + ADD_TIMER(_build_phase_profile, "BuildTableConvertToPartitionedTime"); + _build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows", TUnit::UNIT); + _build_side_compute_hash_timer = ADD_TIMER(_build_phase_profile, "BuildSideHashComputingTime"); // Probe phase auto probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true); @@ -379,8 +380,19 @@ Status HashJoinNode::prepare(RuntimeState* state) { _build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT); _build_buckets_fill_counter = ADD_COUNTER(runtime_profile(), "FilledBuckets", TUnit::UNIT); + _should_build_hash_table = true; if (_is_broadcast_join) { runtime_profile()->add_info_string("BroadcastJoin", "true"); + if (state->enable_share_hash_table_for_broadcast_join()) { + runtime_profile()->add_info_string("ShareHashTableEnabled", "true"); + _shared_hashtable_controller = + state->get_query_fragments_ctx()->get_shared_hash_table_controller(); + _shared_hash_table_context = _shared_hashtable_controller->get_context(id()); + _should_build_hash_table = _shared_hashtable_controller->should_build_hash_table( + state->fragment_instance_id(), id()); + } else { + runtime_profile()->add_info_string("ShareHashTableEnabled", "false"); + } } RETURN_IF_ERROR(VExpr::prepare(_build_expr_ctxs, state, child(1)->row_desc())); @@ -398,7 +410,6 @@ Status HashJoinNode::prepare(RuntimeState* state) { // Hash Table Init _hash_table_init(state); - _process_hashtable_ctx_variants_init(state); _construct_mutable_join_block(); return Status::OK(); @@ -417,11 +428,6 @@ Status HashJoinNode::close(RuntimeState* state) { return Status::OK(); } - if (_shared_hashtable_controller) { - _shared_hashtable_controller->release_ref_count(state, id()); - _shared_hashtable_controller->wait_for_closable(state, id()); - } - START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::close"); VExpr::close(_build_expr_ctxs, state); VExpr::close(_probe_expr_ctxs, state); @@ -610,6 +616,7 @@ Status HashJoinNode::open(RuntimeState* state) { Status HashJoinNode::_materialize_build_side(RuntimeState* state) { RETURN_IF_ERROR(child(1)->open(state)); + SCOPED_TIMER(_build_timer); MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors()); @@ -620,111 +627,128 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { // make one block for each 4 gigabytes constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; - auto should_build_hash_table = true; - if (_is_broadcast_join) { - _shared_hashtable_controller = - state->get_query_fragments_ctx()->get_shared_hash_table_controller(); - should_build_hash_table = - _shared_hashtable_controller->should_build_hash_table(state, id()); - } + if (_should_build_hash_table) { + Block block; + // If eos or have already met a null value using short-circuit strategy, we do not need to pull + // data from data. + while (!eos && !_short_circuit_for_null_in_probe_side) { + block.clear_column_data(); + RETURN_IF_CANCELLED(state); - Block block; - // If eos or have already met a null value using short-circuit strategy, we do not need to pull - // data from data. - while (!eos && !_short_circuit_for_null_in_probe_side) { - block.clear_column_data(); - RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block, &eos), + child(1)->get_next_span(), eos); - RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block, &eos), - child(1)->get_next_span(), eos); - if (!should_build_hash_table) { - continue; + _mem_used += block.allocated_bytes(); + + if (block.rows() != 0) { + SCOPED_TIMER(_build_side_merge_block_timer); + RETURN_IF_CATCH_BAD_ALLOC(mutable_block.merge(block)); + } + + if (UNLIKELY(_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) { + if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) { + return Status::NotSupported( + strings::Substitute("data size of right table in hash join > $0", + BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT)); + } + _build_blocks->emplace_back(mutable_block.to_block()); + // TODO:: Rethink may we should do the process after we receive all build blocks ? + // which is better. + RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[index], index)); + + mutable_block = MutableBlock(); + ++index; + last_mem_used = _mem_used; + } } - _mem_used += block.allocated_bytes(); - - if (block.rows() != 0) { - SCOPED_TIMER(_build_side_merge_block_timer); - RETURN_IF_CATCH_BAD_ALLOC(mutable_block.merge(block)); - } - - if (UNLIKELY(_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) { - if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) { + if (!mutable_block.empty() && !_short_circuit_for_null_in_probe_side) { + if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) { return Status::NotSupported( strings::Substitute("data size of right table in hash join > $0", BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT)); } - _build_blocks.emplace_back(mutable_block.to_block()); - // TODO:: Rethink may we should do the process after we receive all build blocks ? - // which is better. - RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index)); - - mutable_block = MutableBlock(); - ++index; - last_mem_used = _mem_used; + _build_blocks->emplace_back(mutable_block.to_block()); + RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[index], index)); } } - - if (should_build_hash_table && !mutable_block.empty() && - !_short_circuit_for_null_in_probe_side) { - if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) { - return Status::NotSupported( - strings::Substitute("data size of right table in hash join > $0", - BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT)); - } - _build_blocks.emplace_back(mutable_block.to_block()); - RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index)); - } child(1)->close(state); - return std::visit( - Overload {[&](std::monostate& arg) -> Status { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - }, - [&](auto&& arg) -> Status { - using HashTableCtxType = std::decay_t; - using HashTableType = typename HashTableCtxType::HashTable; - if (!should_build_hash_table) { - auto& ret = _shared_hashtable_controller->wait_for_hash_table(id()); - if (!ret.status.ok()) { - return ret.status; - } - _short_circuit_for_null_in_probe_side = - _shared_hashtable_controller - ->short_circuit_for_null_in_probe_side(); - arg.hash_table_ptr = - reinterpret_cast(ret.hash_table_ptr); - _build_blocks = *ret.blocks; - _runtime_filter_slots = _pool->add(new VRuntimeFilterSlots( - _probe_expr_ctxs, _build_expr_ctxs, _runtime_filter_descs)); - RETURN_IF_ERROR(_runtime_filter_slots->init( - state, arg.hash_table_ptr->get_size())); - RETURN_IF_ERROR(_runtime_filter_slots->apply_from_other( - ret.runtime_filter_slots)); - { - SCOPED_TIMER(_push_down_timer); - _runtime_filter_slots->publish(); - } - return Status::OK(); - } else { - arg.hash_table_ptr = &arg.hash_table; - ProcessRuntimeFilterBuild - runtime_filter_build_process(this); - auto ret = runtime_filter_build_process(state, arg); - if (_shared_hashtable_controller) { - _shared_hashtable_controller - ->set_short_circuit_for_null_in_probe_side( - _short_circuit_for_null_in_probe_side); - SharedHashTableEntry entry(ret, arg.hash_table_ptr, - &_build_blocks, _runtime_filter_slots); - _shared_hashtable_controller->put_hash_table(std::move(entry), - id()); - } - return ret; - } - }}, - *_hash_table_variants); + if (_should_build_hash_table) { + auto ret = std::visit(Overload {[&](std::monostate&) -> Status { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + }, + [&](auto&& arg) -> Status { + using HashTableCtxType = std::decay_t; + ProcessRuntimeFilterBuild + runtime_filter_build_process(this); + return runtime_filter_build_process(state, arg); + }}, + *_hash_table_variants); + if (!ret.ok()) { + if (_shared_hashtable_controller) { + _shared_hash_table_context->status = ret; + _shared_hashtable_controller->signal(id()); + } + return ret; + } + if (_shared_hashtable_controller) { + _shared_hash_table_context->status = Status::OK(); + // arena will be shared with other instances. + _shared_hash_table_context->arena = _arena; + _shared_hash_table_context->blocks = _build_blocks; + _shared_hash_table_context->hash_table_variants = _hash_table_variants; + _shared_hash_table_context->short_circuit_for_null_in_probe_side = + _short_circuit_for_null_in_probe_side; + if (_runtime_filter_slots) { + _runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context); + } + _shared_hashtable_controller->signal(id()); + } + } else { + DCHECK(_shared_hashtable_controller != nullptr); + DCHECK(_shared_hash_table_context != nullptr); + auto wait_timer = ADD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime"); + SCOPED_TIMER(wait_timer); + RETURN_IF_ERROR( + _shared_hashtable_controller->wait_for_signal(state, _shared_hash_table_context)); + + _build_phase_profile->add_info_string( + "SharedHashTableFrom", + print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id()))); + _short_circuit_for_null_in_probe_side = + _shared_hash_table_context->short_circuit_for_null_in_probe_side; + _hash_table_variants = std::static_pointer_cast( + _shared_hash_table_context->hash_table_variants); + _build_blocks = _shared_hash_table_context->blocks; + + if (!_shared_hash_table_context->runtime_filters.empty()) { + auto ret = std::visit( + Overload {[&](std::monostate&) -> Status { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + }, + [&](auto&& arg) -> Status { + if (_runtime_filter_descs.empty()) { + return Status::OK(); + } + _runtime_filter_slots = _pool->add(new VRuntimeFilterSlots( + _probe_expr_ctxs, _build_expr_ctxs, + _runtime_filter_descs)); + + RETURN_IF_ERROR(_runtime_filter_slots->init( + state, arg.hash_table.get_size())); + return _runtime_filter_slots->copy_from_shared_context( + _shared_hash_table_context); + }}, + *_hash_table_variants); + RETURN_IF_ERROR(ret); + } + } + + _process_hashtable_ctx_variants_init(state); + return Status::OK(); } template @@ -993,7 +1017,7 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) { __builtin_unreachable(); }, [&](auto&& arg) { - arg.hash_table_ptr->set_partitioned_threshold( + arg.hash_table.set_partitioned_threshold( state->partitioned_hash_join_rows_threshold()); }}, *_hash_table_variants); @@ -1054,6 +1078,12 @@ void HashJoinNode::_reset_tuple_is_null_column() { } } +HashJoinNode::~HashJoinNode() { + if (_shared_hashtable_controller && _should_build_hash_table) { + _shared_hashtable_controller->signal(id()); + } +} + void HashJoinNode::_release_mem() { _arena = nullptr; _hash_table_variants = nullptr; @@ -1061,10 +1091,8 @@ void HashJoinNode::_release_mem() { _null_map_column = nullptr; _tuple_is_null_left_flag_column = nullptr; _tuple_is_null_right_flag_column = nullptr; + _shared_hash_table_context = nullptr; _probe_block.clear(); - - std::vector tmp_build_blocks; - _build_blocks.swap(tmp_build_blocks); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 72d8317468..90c5e7e3cd 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -24,7 +24,9 @@ #include "join_op.h" #include "process_hash_table_probe.h" #include "vec/common/columns_hashing.h" +#include "vec/common/hash_table/hash_map.h" #include "vec/common/hash_table/partitioned_hash_map.h" +#include "vec/runtime/shared_hash_table_controller.h" #include "vjoin_node_base.h" namespace doris { @@ -44,7 +46,6 @@ struct SerializedHashTableContext { using Iter = typename HashTable::iterator; HashTable hash_table; - HashTable* hash_table_ptr = &hash_table; Iter iter; bool inited = false; @@ -76,7 +77,6 @@ struct PrimaryTypeHashTableContext { using Iter = typename HashTable::iterator; HashTable hash_table; - HashTable* hash_table_ptr = &hash_table; Iter iter; bool inited = false; @@ -111,7 +111,6 @@ struct FixedKeyHashTableContext { using Iter = typename HashTable::iterator; HashTable hash_table; - HashTable* hash_table_ptr = &hash_table; Iter iter; bool inited = false; @@ -185,6 +184,7 @@ public: static constexpr int PREFETCH_STEP = 64; HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~HashJoinNode(); Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; Status prepare(RuntimeState* state) override; @@ -235,14 +235,18 @@ private: RuntimeProfile::Counter* _join_filter_timer; + RuntimeProfile* _build_phase_profile; + int64_t _mem_used; - std::unique_ptr _arena; - std::unique_ptr _hash_table_variants; + std::shared_ptr _arena; + + // maybe share hash table with other fragment instances + std::shared_ptr _hash_table_variants; std::unique_ptr _process_hashtable_ctx_variants; - std::vector _build_blocks; + std::shared_ptr> _build_blocks; Block _probe_block; ColumnRawPtrs _probe_columns; ColumnUInt8::MutablePtr _null_map_column; @@ -259,8 +263,9 @@ private: Sizes _build_key_sz; bool _is_broadcast_join = false; - SharedHashTableController* _shared_hashtable_controller = nullptr; - VRuntimeFilterSlots* _runtime_filter_slots; + bool _should_build_hash_table = true; + std::shared_ptr _shared_hashtable_controller = nullptr; + VRuntimeFilterSlots* _runtime_filter_slots = nullptr; std::vector _hash_output_slot_ids; std::vector _left_output_slot_flags; @@ -269,6 +274,8 @@ private: MutableColumnPtr _tuple_is_null_left_flag_column; MutableColumnPtr _tuple_is_null_right_flag_column; + SharedHashTableContextPtr _shared_hash_table_context = nullptr; + private: Status _materialize_build_side(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 35c6186ad7..33c6fab3fe 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -189,7 +189,7 @@ Status VScanNode::_register_runtime_filter() { for (int i = 0; i < filter_size; ++i) { IRuntimeFilter* runtime_filter = nullptr; const auto& filter_desc = _runtime_filter_descs[i]; - RETURN_IF_ERROR(_state->runtime_filter_mgr()->regist_filter( + RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter( RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id())); RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter)); diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp index 7d92dabedf..e9e125a168 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ b/be/src/vec/runtime/shared_hash_table_controller.cpp @@ -22,111 +22,57 @@ namespace doris { namespace vectorized { -bool SharedHashTableController::should_build_hash_table(RuntimeState* state, int my_node_id) { +bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id, + int my_node_id) { std::lock_guard lock(_mutex); auto it = _builder_fragment_ids.find(my_node_id); if (it == _builder_fragment_ids.cend()) { - _builder_fragment_ids[my_node_id] = state->fragment_instance_id(); + _builder_fragment_ids.insert({my_node_id, fragment_instance_id}); return true; } return false; } -bool SharedHashTableController::supposed_to_build_hash_table(RuntimeState* state, int my_node_id) { +SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id) { std::lock_guard lock(_mutex); - auto it = _builder_fragment_ids.find(my_node_id); - if (it != _builder_fragment_ids.cend()) { - return _builder_fragment_ids[my_node_id] == state->fragment_instance_id(); + auto it = _shared_contexts.find(my_node_id); + if (it == _shared_contexts.cend()) { + _shared_contexts.insert({my_node_id, std::make_shared()}); } - return false; + return _shared_contexts[my_node_id]; } -void SharedHashTableController::put_hash_table(SharedHashTableEntry&& entry, int my_node_id) { +void SharedHashTableController::signal(int my_node_id) { std::lock_guard lock(_mutex); - DCHECK(_hash_table_entries.find(my_node_id) == _hash_table_entries.cend()); - _hash_table_entries.insert({my_node_id, std::move(entry)}); + auto it = _shared_contexts.find(my_node_id); + if (it != _shared_contexts.cend()) { + it->second->signaled = true; + _shared_contexts.erase(it); + } _cv.notify_all(); } -SharedHashTableEntry& SharedHashTableController::wait_for_hash_table(int my_node_id) { - std::unique_lock lock(_mutex); - auto it = _hash_table_entries.find(my_node_id); - if (it == _hash_table_entries.cend()) { - _cv.wait(lock, [this, &it, my_node_id]() { - it = _hash_table_entries.find(my_node_id); - return it != _hash_table_entries.cend(); - }); +TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int my_node_id) { + std::lock_guard lock(_mutex); + auto it = _builder_fragment_ids.find(my_node_id); + if (it == _builder_fragment_ids.cend()) { + return TUniqueId {}; } return it->second; } -void SharedHashTableController::acquire_ref_count(RuntimeState* state, int my_node_id) { +Status SharedHashTableController::wait_for_signal(RuntimeState* state, + const SharedHashTableContextPtr& context) { std::unique_lock lock(_mutex); - _ref_fragments[my_node_id].emplace_back(state->fragment_instance_id()); -} - -Status SharedHashTableController::release_ref_count(RuntimeState* state, int my_node_id) { - std::unique_lock lock(_mutex); - auto id = state->fragment_instance_id(); - auto it = std::find(_ref_fragments[my_node_id].begin(), _ref_fragments[my_node_id].end(), id); - CHECK(it != _ref_fragments[my_node_id].end()); - _ref_fragments[my_node_id].erase(it); - _put_an_empty_entry_if_need(Status::Cancelled("hash table not build"), id, my_node_id); - _cv.notify_all(); - return Status::OK(); -} - -void SharedHashTableController::_put_an_empty_entry_if_need(Status status, TUniqueId fragment_id, - int node_id) { - auto builder_it = _builder_fragment_ids.find(node_id); - if (builder_it != _builder_fragment_ids.end()) { - if (builder_it->second == fragment_id) { - if (_hash_table_entries.find(builder_it->first) == _hash_table_entries.cend()) { - // "here put an empty SharedHashTableEntry to avoid deadlocking" - _hash_table_entries.insert( - {builder_it->first, SharedHashTableEntry::empty_entry_with_status(status)}); - } - } + // maybe builder signaled before other instances waiting, + // so here need to check value of `signaled` + while (!context->signaled) { + _cv.wait_for(lock, std::chrono::milliseconds(400), [&]() { return context->signaled; }); + // return if the instances is cancelled(eg. query timeout) + RETURN_IF_CANCELLED(state); } -} - -Status SharedHashTableController::release_ref_count_if_need(TUniqueId fragment_id, Status status) { - std::unique_lock lock(_mutex); - bool need_to_notify = false; - for (auto& ref : _ref_fragments) { - auto it = std::find(ref.second.begin(), ref.second.end(), fragment_id); - if (it == ref.second.end()) { - continue; - } - ref.second.erase(it); - need_to_notify = true; - LOG(INFO) << "release_ref_count in node: " << ref.first - << " for fragment id: " << fragment_id; - } - - for (auto& builder : _builder_fragment_ids) { - if (builder.second == fragment_id) { - if (_hash_table_entries.find(builder.first) == _hash_table_entries.cend()) { - _hash_table_entries.insert( - {builder.first, SharedHashTableEntry::empty_entry_with_status(status)}); - } - } - } - - if (need_to_notify) { - _cv.notify_all(); - } - return Status::OK(); -} - -Status SharedHashTableController::wait_for_closable(RuntimeState* state, int my_node_id) { - std::unique_lock lock(_mutex); - if (!_ref_fragments[my_node_id].empty()) { - _cv.wait(lock, [&]() { return _ref_fragments[my_node_id].empty(); }); - } - RETURN_IF_CANCELLED(state); - return Status::OK(); + return context->status; } } // namespace vectorized -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index 9836c3ec3f..d7d50570db 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -29,70 +29,50 @@ namespace doris { class RuntimeState; class TUniqueId; - -template -class RuntimeFilterSlotsBase; +class MinMaxFuncBase; +class HybridSetBase; +class BloomFilterFuncBase; namespace vectorized { class VExprContext; -struct SharedHashTableEntry { - SharedHashTableEntry(Status status_, void* hash_table_ptr_, std::vector* blocks_, - RuntimeFilterSlotsBase* runtime_filter_slots_) - : status(status_), - hash_table_ptr(hash_table_ptr_), - blocks(blocks_), - runtime_filter_slots(runtime_filter_slots_) {} - SharedHashTableEntry(SharedHashTableEntry&& entry) - : status(entry.status), - hash_table_ptr(entry.hash_table_ptr), - blocks(entry.blocks), - runtime_filter_slots(entry.runtime_filter_slots) {} +struct SharedRuntimeFilterContext { + std::shared_ptr minmax_func; + std::shared_ptr hybrid_set; + std::shared_ptr bloom_filter_func; +}; - static SharedHashTableEntry empty_entry_with_status(const Status& status) { - return SharedHashTableEntry(status, nullptr, nullptr, nullptr); - } +struct SharedHashTableContext { + SharedHashTableContext() + : hash_table_variants(nullptr), + signaled(false), + short_circuit_for_null_in_probe_side(false) {} Status status; - void* hash_table_ptr; - std::vector* blocks; - RuntimeFilterSlotsBase* runtime_filter_slots; + std::shared_ptr arena; + std::shared_ptr hash_table_variants; + std::shared_ptr> blocks; + std::map runtime_filters; + bool signaled; + bool short_circuit_for_null_in_probe_side; }; +using SharedHashTableContextPtr = std::shared_ptr; + class SharedHashTableController { public: - bool should_build_hash_table(RuntimeState* state, int my_node_id); - bool supposed_to_build_hash_table(RuntimeState* state, int my_node_id); - void acquire_ref_count(RuntimeState* state, int my_node_id); - SharedHashTableEntry& wait_for_hash_table(int my_node_id); - Status release_ref_count(RuntimeState* state, int my_node_id); - Status release_ref_count_if_need(TUniqueId fragment_id, Status status); - void put_hash_table(SharedHashTableEntry&& entry, int my_node_id); - Status wait_for_closable(RuntimeState* state, int my_node_id); - - // Single-thread operation - void set_short_circuit_for_null_in_probe_side(bool short_circuit_for_null_in_probe_side) { - _short_circuit_for_null_in_probe_side = short_circuit_for_null_in_probe_side; - } - - bool short_circuit_for_null_in_probe_side() const { - return _short_circuit_for_null_in_probe_side; - } - -private: - // If the fragment instance was supposed to build hash table, but it didn't build. - // To avoid deadlocking other fragment instances, - // here need to put an empty SharedHashTableEntry with canceled status. - void _put_an_empty_entry_if_need(Status status, TUniqueId fragment_id, int node_id); + TUniqueId get_builder_fragment_instance_id(int my_node_id); + SharedHashTableContextPtr get_context(int my_node_id); + void signal(int my_node_id); + Status wait_for_signal(RuntimeState* state, const SharedHashTableContextPtr& context); + bool should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id); private: std::mutex _mutex; std::condition_variable _cv; - std::map _builder_fragment_ids; - std::map _hash_table_entries; - std::map> _ref_fragments; - bool _short_circuit_for_null_in_probe_side; + std::map _builder_fragment_ids; + std::map _shared_contexts; }; } // namespace vectorized diff --git a/be/src/vec/runtime/shared_hashtable_controller.cpp b/be/src/vec/runtime/shared_hashtable_controller.cpp deleted file mode 100644 index a761da8c2e..0000000000 --- a/be/src/vec/runtime/shared_hashtable_controller.cpp +++ /dev/null @@ -1,95 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "shared_hashtable_controller.h" - -#include - -namespace doris { -namespace vectorized { - -bool SharedHashTableController::should_build_hash_table(RuntimeState* state, int my_node_id) { - std::lock_guard lock(_mutex); - auto it = _builder_fragment_ids.find(my_node_id); - if (it == _builder_fragment_ids.cend()) { - _builder_fragment_ids[my_node_id] = state->fragment_instance_id(); - return true; - } - return false; -} - -void SharedHashTableController::put_hash_table(SharedHashTableEntry&& entry, int my_node_id) { - std::lock_guard lock(_mutex); - DCHECK(_hash_table_entries.find(my_node_id) == _hash_table_entries.cend()); - _hash_table_entries.insert({my_node_id, std::move(entry)}); - _cv.notify_all(); -} - -SharedHashTableEntry& SharedHashTableController::wait_for_hash_table(int my_node_id) { - std::unique_lock lock(_mutex); - auto it = _hash_table_entries.find(my_node_id); - if (it == _hash_table_entries.cend()) { - _cv.wait(lock, [this, &it, my_node_id]() { - it = _hash_table_entries.find(my_node_id); - return it != _hash_table_entries.cend(); - }); - } - return it->second; -} - -void SharedHashTableController::acquire_ref_count(RuntimeState* state, int my_node_id) { - std::unique_lock lock(_mutex); - _ref_fragments[my_node_id].emplace_back(state->fragment_instance_id()); -} - -Status SharedHashTableController::release_ref_count(RuntimeState* state, int my_node_id) { - std::unique_lock lock(_mutex); - RETURN_IF_CANCELLED(state); - auto id = state->fragment_instance_id(); - auto it = std::find(_ref_fragments[my_node_id].begin(), _ref_fragments[my_node_id].end(), id); - CHECK(it != _ref_fragments[my_node_id].end()); - _ref_fragments[my_node_id].erase(it); - _cv.notify_all(); - return Status::OK(); -} - -Status SharedHashTableController::release_ref_count_if_need(TUniqueId fragment_id) { - std::unique_lock lock(_mutex); - bool need_to_notify = false; - for (auto& ref : _ref_fragments) { - auto it = std::find(ref.second.begin(), ref.second.end(), fragment_id); - if (it == ref.second.end()) continue; - ref.second.erase(it); - need_to_notify = true; - LOG(INFO) << "release_ref_count in node: " << ref.first - << " for fragment id: " << fragment_id; - } - if (need_to_notify) _cv.notify_all(); - return Status::OK(); -} - -Status SharedHashTableController::wait_for_closable(RuntimeState* state, int my_node_id) { - std::unique_lock lock(_mutex); - RETURN_IF_CANCELLED(state); - if (!_ref_fragments[my_node_id].empty()) { - _cv.wait(lock, [&]() { return _ref_fragments[my_node_id].empty(); }); - } - return Status::OK(); -} - -} // namespace vectorized -} // namespace doris \ No newline at end of file diff --git a/be/src/vec/runtime/shared_hashtable_controller.h b/be/src/vec/runtime/shared_hashtable_controller.h deleted file mode 100644 index 842dc89e90..0000000000 --- a/be/src/vec/runtime/shared_hashtable_controller.h +++ /dev/null @@ -1,75 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include -#include - -#include "vec/core/block.h" - -namespace doris { - -class RuntimeState; -class TUniqueId; - -namespace vectorized { - -class VExprContext; - -struct SharedHashTableEntry { - SharedHashTableEntry(void* hash_table_ptr_, std::vector& blocks_, - std::unordered_map>& inserted_rows_, - const std::vector& exprs) - : hash_table_ptr(hash_table_ptr_), - blocks(blocks_), - inserted_rows(inserted_rows_), - build_exprs(exprs) {} - SharedHashTableEntry(SharedHashTableEntry&& entry) - : hash_table_ptr(entry.hash_table_ptr), - blocks(entry.blocks), - inserted_rows(entry.inserted_rows), - build_exprs(entry.build_exprs) {} - void* hash_table_ptr; - std::vector& blocks; - std::unordered_map>& inserted_rows; - std::vector build_exprs; -}; - -class SharedHashTableController { -public: - bool should_build_hash_table(RuntimeState* state, int my_node_id); - void acquire_ref_count(RuntimeState* state, int my_node_id); - SharedHashTableEntry& wait_for_hash_table(int my_node_id); - Status release_ref_count(RuntimeState* state, int my_node_id); - Status release_ref_count_if_need(TUniqueId fragment_id); - void put_hash_table(SharedHashTableEntry&& entry, int my_node_id); - Status wait_for_closable(RuntimeState* state, int my_node_id); - -private: - std::mutex _mutex; - std::condition_variable _cv; - std::map _builder_fragment_ids; - std::map _hash_table_entries; - std::map> _ref_fragments; -}; - -} // namespace vectorized -} // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index c8622de800..3bcf3ed0fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -231,6 +231,9 @@ public class SessionVariable implements Serializable, Writable { public static final String PARTITIONED_HASH_JOIN_ROWS_THRESHOLD = "partitioned_hash_join_rows_threshold"; + public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN + = "enable_share_hash_table_for_broadcast_join"; + // session origin value public Map sessionOriginValue = new HashMap(); // check stmt is or not [select /*+ SET_VAR(...)*/ ...] @@ -607,6 +610,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD) public int partitionedHashJoinRowsThreshold = 0; + @VariableMgr.VarAttr(name = ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN) + public boolean enableShareHashTableForBroadcastJoin = true; + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. public void initFuzzyModeVariables() { @@ -1227,6 +1233,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setBeExecVersion(Config.be_exec_version); tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary); tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery); + tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin); tResult.setBatchSize(batchSize); tResult.setDisableStreamPreaggregations(disableStreamPreaggregations); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index b725ea9191..47a9144269 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -181,6 +181,8 @@ struct TQueryOptions { 52: optional i32 be_exec_version = 0 53: optional i32 partitioned_hash_join_rows_threshold = 0 + + 54: optional bool enable_share_hash_table_for_broadcast_join }