From 9103ded1dd8b482ffc8a9f54b9e90fa82402b3a9 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Thu, 24 Nov 2022 21:06:44 +0800 Subject: [PATCH] [improvement](join)optimize sharing hash table for broadcast join (#14371) This PR is to make sharing hash table for broadcast more robust: Add a session variable to enable/disable this function. Do not block the hash join node's close function. Use shared pointer to share hash table and runtime filter in broadcast join nodes. The Hash join node that doesn't need to build the hash table will close the right child without reading any data(the child will close the corresponding sender). --- be/src/exec/hash_join_node.cpp | 2 +- be/src/exec/olap_scan_node.cpp | 2 +- be/src/exprs/runtime_filter.cpp | 169 +++++------ be/src/exprs/runtime_filter.h | 4 +- be/src/exprs/runtime_filter_slots.h | 22 +- be/src/runtime/fragment_mgr.cpp | 41 +-- be/src/runtime/fragment_mgr.h | 4 - be/src/runtime/query_fragments_ctx.h | 6 +- be/src/runtime/runtime_filter_mgr.cpp | 5 +- be/src/runtime/runtime_filter_mgr.h | 4 +- be/src/runtime/runtime_state.h | 5 + .../exec/join/process_hash_table_probe_impl.h | 32 +-- be/src/vec/exec/join/vhash_join_node.cpp | 266 ++++++++++-------- be/src/vec/exec/join/vhash_join_node.h | 23 +- be/src/vec/exec/scan/vscan_node.cpp | 2 +- .../runtime/shared_hash_table_controller.cpp | 112 ++------ .../runtime/shared_hash_table_controller.h | 76 ++--- .../runtime/shared_hashtable_controller.cpp | 95 ------- .../vec/runtime/shared_hashtable_controller.h | 75 ----- .../org/apache/doris/qe/SessionVariable.java | 7 + gensrc/thrift/PaloInternalService.thrift | 2 + 21 files changed, 366 insertions(+), 588 deletions(-) delete mode 100644 be/src/vec/runtime/shared_hashtable_controller.cpp delete mode 100644 be/src/vec/runtime/shared_hashtable_controller.h diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index da0fc10ea7..471801abca 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -92,7 +92,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _runtime_filters.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( + RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter( RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options())); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( _runtime_filter_descs[i].filter_id, &_runtime_filters[i])); diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 60a8c3b2bc..5bf924d1c1 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -88,7 +88,7 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { for (int i = 0; i < filter_size; ++i) { IRuntimeFilter* runtime_filter = nullptr; const auto& filter_desc = _runtime_filter_descs[i]; - RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( + RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter( RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id())); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter)); diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index f92869b5f4..c9b3a3e2a1 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -42,6 +42,7 @@ #include "vec/exprs/vexpr.h" #include "vec/exprs/vliteral.h" #include "vec/exprs/vruntimefilter_wrapper.h" +#include "vec/runtime/shared_hash_table_controller.h" namespace doris { // PrimitiveType->TExprNodeType @@ -431,23 +432,25 @@ public: _max_in_num = params->max_in_num; switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - _hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec())); + _context.hybrid_set.reset( + create_set(_column_return_type, _state->enable_vectorized_exec())); break; } case RuntimeFilterType::MINMAX_FILTER: { - _minmax_func.reset(create_minmax_filter(_column_return_type)); + _context.minmax_func.reset(create_minmax_filter(_column_return_type)); break; } case RuntimeFilterType::BLOOM_FILTER: { _is_bloomfilter = true; - _bloomfilter_func.reset(create_bloom_filter(_column_return_type)); - _bloomfilter_func->set_length(params->bloom_filter_size); + _context.bloom_filter_func.reset(create_bloom_filter(_column_return_type)); + _context.bloom_filter_func->set_length(params->bloom_filter_size); return Status::OK(); } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { - _hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec())); - _bloomfilter_func.reset(create_bloom_filter(_column_return_type)); - _bloomfilter_func->set_length(params->bloom_filter_size); + _context.hybrid_set.reset( + create_set(_column_return_type, _state->enable_vectorized_exec())); + _context.bloom_filter_func.reset(create_bloom_filter(_column_return_type)); + _context.bloom_filter_func->set_length(params->bloom_filter_size); return Status::OK(); } default: @@ -461,14 +464,15 @@ public: << "Can not change to bloom filter because of runtime filter type is " << to_string(_filter_type); _is_bloomfilter = true; - insert_to_bloom_filter(_bloomfilter_func.get()); + insert_to_bloom_filter(_context.bloom_filter_func.get()); // release in filter - _hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec())); + _context.hybrid_set.reset( + create_set(_column_return_type, _state->enable_vectorized_exec())); } void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const { - if (_hybrid_set->size() > 0) { - auto it = _hybrid_set->begin(); + if (_context.hybrid_set->size() > 0) { + auto it = _context.hybrid_set->begin(); if (_use_batch) { while (it->has_next()) { @@ -484,7 +488,7 @@ public: } } - BloomFilterFuncBase* get_bloomfilter() const { return _bloomfilter_func.get(); } + BloomFilterFuncBase* get_bloomfilter() const { return _context.bloom_filter_func.get(); } void insert(const void* data) { switch (_filter_type) { @@ -492,22 +496,22 @@ public: if (_is_ignored_in_filter) { break; } - _hybrid_set->insert(data); + _context.hybrid_set->insert(data); break; } case RuntimeFilterType::MINMAX_FILTER: { - _minmax_func->insert(data); + _context.minmax_func->insert(data); break; } case RuntimeFilterType::BLOOM_FILTER: { - _bloomfilter_func->insert(data); + _context.bloom_filter_func->insert(data); break; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { if (_is_bloomfilter) { - _bloomfilter_func->insert(data); + _context.bloom_filter_func->insert(data); } else { - _hybrid_set->insert(data); + _context.hybrid_set->insert(data); } break; } @@ -523,22 +527,22 @@ public: if (_is_ignored_in_filter) { break; } - _hybrid_set->insert_fixed_len(data, offsets, number); + _context.hybrid_set->insert_fixed_len(data, offsets, number); break; } case RuntimeFilterType::MINMAX_FILTER: { - _minmax_func->insert_fixed_len(data, offsets, number); + _context.minmax_func->insert_fixed_len(data, offsets, number); break; } case RuntimeFilterType::BLOOM_FILTER: { - _bloomfilter_func->insert_fixed_len(data, offsets, number); + _context.bloom_filter_func->insert_fixed_len(data, offsets, number); break; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { if (_is_bloomfilter) { - _bloomfilter_func->insert_fixed_len(data, offsets, number); + _context.bloom_filter_func->insert_fixed_len(data, offsets, number); } else { - _hybrid_set->insert_fixed_len(data, offsets, number); + _context.hybrid_set->insert_fixed_len(data, offsets, number); } break; } @@ -618,18 +622,18 @@ public: _is_ignored_in_filter = true; _ignored_in_filter_msg = wrapper->_ignored_in_filter_msg; // release in filter - _hybrid_set.reset( + _context.hybrid_set.reset( create_set(_column_return_type, _state->enable_vectorized_exec())); break; } // try insert set - _hybrid_set->insert(wrapper->_hybrid_set.get()); - if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) { + _context.hybrid_set->insert(wrapper->_context.hybrid_set.get()); + if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { #ifdef VLOG_DEBUG_IS_ON std::stringstream msg; msg << "fragment instance " << _fragment_instance_id.to_string() << " ignore merge runtime filter(in filter id " << _filter_id - << ") because: in_num(" << _hybrid_set->size() << ") >= max_in_num(" + << ") because: in_num(" << _context.hybrid_set->size() << ") >= max_in_num(" << _max_in_num << ")"; _ignored_in_filter_msg = _pool->add(new std::string(msg.str())); #else @@ -638,17 +642,17 @@ public: _is_ignored_in_filter = true; // release in filter - _hybrid_set.reset( + _context.hybrid_set.reset( create_set(_column_return_type, _state->enable_vectorized_exec())); } break; } case RuntimeFilterType::MINMAX_FILTER: { - _minmax_func->merge(wrapper->_minmax_func.get(), _pool); + _context.minmax_func->merge(wrapper->_context.minmax_func.get(), _pool); break; } case RuntimeFilterType::BLOOM_FILTER: { - _bloomfilter_func->merge(wrapper->_bloomfilter_func.get()); + _context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get()); break; } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { @@ -661,11 +665,11 @@ public: << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " << *(wrapper->get_ignored_in_filter_msg()); - _hybrid_set->insert(wrapper->_hybrid_set.get()); - if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) { + _context.hybrid_set->insert(wrapper->_context.hybrid_set.get()); + if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string() << " change runtime filter to bloom filter(id=" << _filter_id - << ") because: in_num(" << _hybrid_set->size() + << ") because: in_num(" << _context.hybrid_set->size() << ") >= max_in_num(" << _max_in_num << ")"; change_to_bloom_filter(); } @@ -675,7 +679,7 @@ public: << " change runtime filter to bloom filter(id=" << _filter_id << ") because: already exist a bloom filter"; change_to_bloom_filter(); - _bloomfilter_func->merge(wrapper->_bloomfilter_func.get()); + _context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get()); } } else { if (wrapper->_filter_type == @@ -685,10 +689,10 @@ public: << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " << *(wrapper->get_ignored_in_filter_msg()); - wrapper->insert_to_bloom_filter(_bloomfilter_func.get()); + wrapper->insert_to_bloom_filter(_context.bloom_filter_func.get()); // bloom filter merge bloom filter } else { - _bloomfilter_func->merge(wrapper->_bloomfilter_func.get()); + _context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get()); } } break; @@ -709,7 +713,7 @@ public: _ignored_in_filter_msg = _pool->add(new std::string(in_filter->ignored_msg())); return Status::OK(); } - _hybrid_set.reset(create_set(type, _state->enable_vectorized_exec())); + _context.hybrid_set.reset(create_set(type, _state->enable_vectorized_exec())); switch (type) { case TYPE_BOOLEAN: { batch_assign(in_filter, [](std::shared_ptr& set, PColumnValue& column, @@ -880,40 +884,40 @@ public: _is_bloomfilter = true; // we won't use this class to insert or find any data // so any type is ok - _bloomfilter_func.reset(create_bloom_filter(PrimitiveType::TYPE_INT)); - return _bloomfilter_func->assign(data, bloom_filter->filter_length()); + _context.bloom_filter_func.reset(create_bloom_filter(PrimitiveType::TYPE_INT)); + return _context.bloom_filter_func->assign(data, bloom_filter->filter_length()); } // used by shuffle runtime filter // assign this filter by protobuf Status assign(const PMinMaxFilter* minmax_filter) { PrimitiveType type = to_primitive_type(minmax_filter->column_type()); - _minmax_func.reset(create_minmax_filter(type)); + _context.minmax_func.reset(create_minmax_filter(type)); switch (type) { case TYPE_BOOLEAN: { bool min_val = minmax_filter->min_val().boolval(); bool max_val = minmax_filter->max_val().boolval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_TINYINT: { int8_t min_val = static_cast(minmax_filter->min_val().intval()); int8_t max_val = static_cast(minmax_filter->max_val().intval()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_SMALLINT: { int16_t min_val = static_cast(minmax_filter->min_val().intval()); int16_t max_val = static_cast(minmax_filter->max_val().intval()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_INT: { int32_t min_val = minmax_filter->min_val().intval(); int32_t max_val = minmax_filter->max_val().intval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_BIGINT: { int64_t min_val = minmax_filter->min_val().longval(); int64_t max_val = minmax_filter->max_val().longval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_LARGEINT: { auto min_string_val = minmax_filter->min_val().stringval(); @@ -925,27 +929,27 @@ public: int128_t max_val = StringParser::string_to_int( max_string_val.c_str(), max_string_val.length(), &result); DCHECK(result == StringParser::PARSE_SUCCESS); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_FLOAT: { float min_val = static_cast(minmax_filter->min_val().doubleval()); float max_val = static_cast(minmax_filter->max_val().doubleval()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DOUBLE: { double min_val = static_cast(minmax_filter->min_val().doubleval()); double max_val = static_cast(minmax_filter->max_val().doubleval()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DATEV2: { int32_t min_val = minmax_filter->min_val().intval(); int32_t max_val = minmax_filter->max_val().intval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DATETIMEV2: { int64_t min_val = minmax_filter->min_val().longval(); int64_t max_val = minmax_filter->max_val().longval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DATETIME: case TYPE_DATE: { @@ -955,24 +959,24 @@ public: DateTimeValue max_val; min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length()); max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMALV2: { auto& min_val_ref = minmax_filter->min_val().stringval(); auto& max_val_ref = minmax_filter->max_val().stringval(); DecimalV2Value min_val(min_val_ref); DecimalV2Value max_val(max_val_ref); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMAL32: { int32_t min_val = minmax_filter->min_val().intval(); int32_t max_val = minmax_filter->max_val().intval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMAL64: { int64_t min_val = minmax_filter->min_val().longval(); int64_t max_val = minmax_filter->max_val().longval(); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_DECIMAL128I: { auto min_string_val = minmax_filter->min_val().stringval(); @@ -984,7 +988,7 @@ public: int128_t max_val = StringParser::string_to_int( max_string_val.c_str(), max_string_val.length(), &result); DCHECK(result == StringParser::PARSE_SUCCESS); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } case TYPE_VARCHAR: case TYPE_CHAR: @@ -995,7 +999,7 @@ public: auto max_val_ptr = _pool->add(new std::string(max_val_ref)); StringValue min_val(const_cast(min_val_ptr->c_str()), min_val_ptr->length()); StringValue max_val(const_cast(max_val_ptr->c_str()), max_val_ptr->length()); - return _minmax_func->assign(&min_val, &max_val); + return _context.minmax_func->assign(&min_val, &max_val); } default: DCHECK(false) << "unknown type"; @@ -1005,17 +1009,17 @@ public: } Status get_in_filter_iterator(HybridSetBase::IteratorBase** it) { - *it = _hybrid_set->begin(); + *it = _context.hybrid_set->begin(); return Status::OK(); } Status get_bloom_filter_desc(char** data, int* filter_length) { - return _bloomfilter_func->get_data(data, filter_length); + return _context.bloom_filter_func->get_data(data, filter_length); } Status get_minmax_filter_desc(void** min_data, void** max_data) { - *min_data = _minmax_func->get_min(); - *max_data = _minmax_func->get_max(); + *min_data = _context.minmax_func->get_min(); + *max_data = _context.minmax_func->get_max(); return Status::OK(); } @@ -1027,13 +1031,13 @@ public: case TYPE_VARCHAR: case TYPE_CHAR: case TYPE_STRING: { - StringValue* min_value = static_cast(_minmax_func->get_min()); - StringValue* max_value = static_cast(_minmax_func->get_max()); + StringValue* min_value = static_cast(_context.minmax_func->get_min()); + StringValue* max_value = static_cast(_context.minmax_func->get_max()); auto min_val_ptr = _pool->add(new std::string(min_value->ptr)); auto max_val_ptr = _pool->add(new std::string(max_value->ptr)); StringValue min_val(const_cast(min_val_ptr->c_str()), min_val_ptr->length()); StringValue max_val(const_cast(max_val_ptr->c_str()), max_val_ptr->length()); - _minmax_func->assign(&min_val, &max_val); + _context.minmax_func->assign(&min_val, &max_val); } default: break; @@ -1052,11 +1056,11 @@ public: PColumnValue&, ObjectPool*)) { for (int i = 0; i < filter->values_size(); ++i) { PColumnValue column = filter->values(i); - assign_func(_hybrid_set, column, _pool); + assign_func(_context.hybrid_set, column, _pool); } } - size_t get_in_filter_size() const { return _hybrid_set->size(); } + size_t get_in_filter_size() const { return _context.hybrid_set->size(); } friend class IRuntimeFilter; @@ -1068,9 +1072,8 @@ private: PrimitiveType _column_return_type; // column type RuntimeFilterType _filter_type; int32_t _max_in_num = -1; - std::shared_ptr _minmax_func; - std::shared_ptr _hybrid_set; - std::shared_ptr _bloomfilter_func; + + vectorized::SharedRuntimeFilterContext _context; bool _is_bloomfilter = false; bool _is_ignored_in_filter = false; std::string* _ignored_in_filter_msg = nullptr; @@ -1090,12 +1093,12 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id); } -Status IRuntimeFilter::apply_from_other(IRuntimeFilter* other) { - _wrapper->_hybrid_set = other->_wrapper->_hybrid_set; - _wrapper->_bloomfilter_func = other->_wrapper->_bloomfilter_func; - _wrapper->_minmax_func = other->_wrapper->_minmax_func; - _wrapper->_filter_type = other->_wrapper->_filter_type; - _runtime_filter_type = other->_runtime_filter_type; +void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) { + context = _wrapper->_context; +} + +Status IRuntimeFilter::copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context) { + _wrapper->_context = context; return Status::OK(); } @@ -1736,7 +1739,7 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta node.__isset.vector_opcode = true; node.__set_vector_opcode(to_in_opcode(_column_return_type)); auto in_pred = _pool->add(new InPredicate(node)); - RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.get())); + RETURN_IF_ERROR(in_pred->prepare(state, _context.hybrid_set.get())); in_pred->add_child(Expr::copy(_pool, prob_expr->root())); ExprContext* ctx = _pool->add(new ExprContext(in_pred)); container->push_back(ctx); @@ -1748,7 +1751,8 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta Expr* max_literal = nullptr; auto max_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::LE); RETURN_IF_ERROR(create_literal(_pool, prob_expr->root()->type(), - _minmax_func->get_max(), (void**)&max_literal)); + _context.minmax_func->get_max(), + (void**)&max_literal)); max_pred->add_child(Expr::copy(_pool, prob_expr->root())); max_pred->add_child(max_literal); container->push_back(_pool->add(new ExprContext(max_pred))); @@ -1756,7 +1760,8 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta Expr* min_literal = nullptr; auto min_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::GE); RETURN_IF_ERROR(create_literal(_pool, prob_expr->root()->type(), - _minmax_func->get_min(), (void**)&min_literal)); + _context.minmax_func->get_min(), + (void**)&min_literal)); min_pred->add_child(Expr::copy(_pool, prob_expr->root())); min_pred->add_child(min_literal); container->push_back(_pool->add(new ExprContext(min_pred))); @@ -1772,7 +1777,7 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta node.__isset.vector_opcode = true; node.__set_vector_opcode(to_in_opcode(_column_return_type)); auto bloom_pred = _pool->add(new BloomFilterPredicate(node)); - RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func)); + RETURN_IF_ERROR(bloom_pred->prepare(state, _context.bloom_filter_func)); bloom_pred->add_child(Expr::copy(_pool, prob_expr->root())); ExprContext* ctx = _pool->add(new ExprContext(bloom_pred)); container->push_back(ctx); @@ -1811,7 +1816,7 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vectoradd(new vectorized::VDirectInPredicate(node)); - in_pred->set_filter(_hybrid_set); + in_pred->set_filter(_context.hybrid_set); auto cloned_vexpr = vprob_expr->root()->clone(_pool); in_pred->add_child(cloned_vexpr); auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node, in_pred)); @@ -1827,7 +1832,8 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector(_pool, vprob_expr->root()->type(), - _minmax_func->get_max(), (void**)&max_literal)); + _context.minmax_func->get_max(), + (void**)&max_literal)); auto cloned_vexpr = vprob_expr->root()->clone(_pool); max_pred->add_child(cloned_vexpr); max_pred->add_child(max_literal); @@ -1841,7 +1847,8 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector(_pool, vprob_expr->root()->type(), - _minmax_func->get_min(), (void**)&min_literal)); + _context.minmax_func->get_min(), + (void**)&min_literal)); cloned_vexpr = vprob_expr->root()->clone(_pool); min_pred->add_child(cloned_vexpr); min_pred->add_child(min_literal); @@ -1861,7 +1868,7 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vectoradd(new vectorized::VBloomPredicate(node)); - bloom_pred->set_filter(_bloomfilter_func); + bloom_pred->set_filter(_context.bloom_filter_func); auto cloned_vexpr = vprob_expr->root()->clone(_pool); bloom_pred->add_child(cloned_vexpr); auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node, bloom_pred)); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index e84da75a5d..89c82abfef 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -47,6 +47,7 @@ class BloomFilterFuncBase; namespace vectorized { class VExpr; class VExprContext; +struct SharedRuntimeFilterContext; } // namespace vectorized enum class RuntimeFilterType { @@ -141,7 +142,8 @@ public: const TQueryOptions* query_options, const RuntimeFilterRole role, int node_id, IRuntimeFilter** res); - Status apply_from_other(IRuntimeFilter* other); + void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context); + Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context); // insert data to build filter // only used for producer diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index b13fa69625..c6fa1d2381 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -23,6 +23,7 @@ #include "vec/columns/column_nullable.h" #include "vec/columns/columns_number.h" #include "vec/common/assert_cast.h" +#include "vec/runtime/shared_hash_table_controller.h" namespace doris { // this class used in a hash join node @@ -229,19 +230,24 @@ public: } } - Status apply_from_other(RuntimeFilterSlotsBase* other) { + void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) { + for (auto& it : _runtime_filters) { + for (auto& filter : it.second) { + auto& target = context->runtime_filters[filter->filter_id()]; + filter->copy_to_shared_context(target); + } + } + } + + Status copy_from_shared_context(vectorized::SharedHashTableContextPtr& context) { for (auto& it : _runtime_filters) { - auto& other_filters = other->_runtime_filters[it.first]; for (auto& filter : it.second) { auto filter_id = filter->filter_id(); - auto ret = std::find_if(other_filters.begin(), other_filters.end(), - [&](IRuntimeFilter* other_filter) { - return other_filter->filter_id() == filter_id; - }); - if (ret == other_filters.end()) { + auto ret = context->runtime_filters.find(filter_id); + if (ret == context->runtime_filters.end()) { return Status::Aborted("invalid runtime filter id: {}", filter_id); } - filter->apply_from_other(*ret); + filter->copy_from_shared_context(ret->second); } } return Status::OK(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 29cdd17320..09ccb4ef23 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -240,13 +240,7 @@ Status FragmentExecState::execute() { } #ifndef BE_TEST if (_executor.runtime_state()->is_cancelled()) { - Status status = Status::Cancelled("cancelled before execution"); - _executor.runtime_state() - ->get_query_fragments_ctx() - ->get_shared_hash_table_controller() - ->release_ref_count_if_need(_executor.runtime_state()->fragment_instance_id(), - status); - return status; + return Status::Cancelled("cancelled before execution"); } #endif int64_t duration_ns = 0; @@ -254,19 +248,11 @@ Status FragmentExecState::execute() { SCOPED_RAW_TIMER(&duration_ns); CgroupsMgr::apply_system_cgroup(); opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment"); - Status status = _executor.open(); - WARN_IF_ERROR(status, + WARN_IF_ERROR(_executor.open(), strings::Substitute("Got error while opening fragment $0, query id: $1", print_id(_fragment_instance_id), print_id(_query_id))); _executor.close(); - if (!status.ok()) { - _executor.runtime_state() - ->get_query_fragments_ctx() - ->get_shared_hash_table_controller() - ->release_ref_count_if_need(_executor.runtime_state()->fragment_instance_id(), - status); - } } DorisMetrics::instance()->fragment_requests_total->increment(1); DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000); @@ -712,9 +698,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi } g_fragmentmgr_prepare_latency << (duration_ns / 1000); - _setup_shared_hashtable_for_broadcast_join(params, exec_state->executor()->runtime_state(), - fragments_ctx.get()); - std::shared_ptr handler; _runtimefilter_controller.add_entity(params, &handler, exec_state->executor()->runtime_state()); exec_state->set_merge_controller_handler(handler); @@ -784,26 +767,6 @@ void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params, #endif } -void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params, - RuntimeState* state, - QueryFragmentsCtx* fragments_ctx) { - if (!params.__isset.fragment || !params.fragment.__isset.plan || - params.fragment.plan.nodes.empty()) { - return; - } - - for (auto& node : params.fragment.plan.nodes) { - if (node.node_type != TPlanNodeType::HASH_JOIN_NODE || - !node.hash_join_node.__isset.is_broadcast_join || - !node.hash_join_node.is_broadcast_join) { - continue; - } - - std::lock_guard lock(_lock_for_shared_hash_table); - fragments_ctx->get_shared_hash_table_controller()->acquire_ref_count(state, node.node_id); - } -} - bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) { return type == TPlanNodeType::OLAP_SCAN_NODE || type == TPlanNodeType::MYSQL_SCAN_NODE || type == TPlanNodeType::SCHEMA_SCAN_NODE || type == TPlanNodeType::META_SCAN_NODE || diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 7c6a079e32..2246a42ac8 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -107,10 +107,6 @@ private: bool _is_scan_node(const TPlanNodeType::type& type); - void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params, - RuntimeState* state, - QueryFragmentsCtx* fragments_ctx); - // This is input params ExecEnv* _exec_env; diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index 1f7c053db1..4bfff8c4da 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -100,8 +100,8 @@ public: return _ready_to_execute.load() && !_is_cancelled.load(); } - vectorized::SharedHashTableController* get_shared_hash_table_controller() { - return _shared_hash_table_controller.get(); + std::shared_ptr get_shared_hash_table_controller() { + return _shared_hash_table_controller; } public: @@ -144,7 +144,7 @@ private: std::atomic _ready_to_execute {false}; std::atomic _is_cancelled {false}; - std::unique_ptr _shared_hash_table_controller; + std::shared_ptr _shared_hash_table_controller; }; } // namespace doris diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 7bc894e1da..ce851c119a 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -78,8 +78,9 @@ Status RuntimeFilterMgr::get_producer_filter(const int filter_id, return get_filter_by_role(filter_id, RuntimeFilterRole::PRODUCER, producer_filter); } -Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc, - const TQueryOptions& options, int node_id) { +Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role, + const TRuntimeFilterDesc& desc, + const TQueryOptions& options, int node_id) { DCHECK((role == RuntimeFilterRole::CONSUMER && node_id >= 0) || role != RuntimeFilterRole::CONSUMER); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index db392bcc54..002f85c20b 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -69,8 +69,8 @@ public: Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter); // regist filter - Status regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc, - const TQueryOptions& options, int node_id = -1); + Status register_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc, + const TQueryOptions& options, int node_id = -1); // update filter by remote Status update_filter(const PPublishFilterRequest* request, diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index cc9d8a4831..7b8ae4d89d 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -397,6 +397,11 @@ public: bool enable_profile() const { return _query_options.is_report_success; } + bool enable_share_hash_table_for_broadcast_join() const { + return _query_options.__isset.enable_share_hash_table_for_broadcast_join && + _query_options.enable_share_hash_table_for_broadcast_join; + } + private: // Use a custom block manager for the query for testing purposes. void set_block_mgr2(const std::shared_ptr& block_mgr) { diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 7da2100784..9367c299c3 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -28,7 +28,7 @@ template ProcessHashTableProbe::ProcessHashTableProbe(HashJoinNode* join_node, int batch_size) : _join_node(join_node), _batch_size(batch_size), - _build_blocks(join_node->_build_blocks), + _build_blocks(*join_node->_build_blocks), _tuple_is_null_left_flags(join_node->_is_outer_join ? &(reinterpret_cast( *join_node->_tuple_is_null_left_flag_column) @@ -207,15 +207,14 @@ Status ProcessHashTableProbe::do_process(HashTableType& hash_table_c } int last_offset = current_offset; auto find_result = - !need_null_map_for_probe ? key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena) + !need_null_map_for_probe + ? key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena) : (*null_map)[probe_index] - ? decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena)) {nullptr, false} - : key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index, - _arena); + ? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index, + _arena)) {nullptr, false} + : key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena); if (probe_index + PREFETCH_STEP < probe_rows) - key_getter.template prefetch(*hash_table_ctx.hash_table_ptr, + key_getter.template prefetch(hash_table_ctx.hash_table, probe_index + PREFETCH_STEP, _arena); if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || @@ -372,15 +371,14 @@ Status ProcessHashTableProbe::do_process_with_other_join_conjuncts( auto last_offset = current_offset; auto find_result = - !need_null_map_for_probe ? key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena) + !need_null_map_for_probe + ? key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena) : (*null_map)[probe_index] - ? decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena)) {nullptr, false} - : key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index, - _arena); + ? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index, + _arena)) {nullptr, false} + : key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena); if (probe_index + PREFETCH_STEP < probe_rows) - key_getter.template prefetch(*hash_table_ctx.hash_table_ptr, + key_getter.template prefetch(hash_table_ctx.hash_table, probe_index + PREFETCH_STEP, _arena); if (find_result.is_found()) { auto& mapped = find_result.get_mapped(); @@ -627,7 +625,7 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp } }; - for (; iter != hash_table_ctx.hash_table_ptr->end() && block_size < _batch_size; ++iter) { + for (; iter != hash_table_ctx.hash_table.end() && block_size < _batch_size; ++iter) { auto& mapped = iter->get_second(); if constexpr (std::is_same_v) { if (mapped.visited) { @@ -674,7 +672,7 @@ Status ProcessHashTableProbe::process_data_in_hashtable(HashTableTyp } _tuple_is_null_left_flags->resize_fill(block_size, 1); } - *eos = iter == hash_table_ctx.hash_table_ptr->end(); + *eos = iter == hash_table_ctx.hash_table.end(); output_block->swap( mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0)); return Status::OK(); diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index da144378b1..3778b3688f 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -221,7 +221,7 @@ struct ProcessRuntimeFilterBuild { _join_node->_runtime_filter_descs)); RETURN_IF_ERROR(_join_node->_runtime_filter_slots->init( - state, hash_table_ctx.hash_table_ptr->get_size())); + state, hash_table_ctx.hash_table.get_size())); if (!_join_node->_runtime_filter_slots->empty() && !_join_node->_inserted_rows.empty()) { { @@ -250,14 +250,15 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr ? tnode.hash_join_node.hash_output_slot_ids : std::vector {}) { _runtime_filter_descs = tnode.runtime_filters; - _arena = std::make_unique(); - _hash_table_variants = std::make_unique(); + _arena = std::make_shared(); + _hash_table_variants = std::make_shared(); _process_hashtable_ctx_variants = std::make_unique(); + _build_blocks.reset(new std::vector()); // avoid vector expand change block address. // one block can store 4g data, _build_blocks can store 128*4g data. // if probe data bigger than 512g, runtime filter maybe will core dump when insert data. - _build_blocks.reserve(_MAX_BUILD_BLOCK_COUNT); + _build_blocks->reserve(_MAX_BUILD_BLOCK_COUNT); } Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { @@ -313,7 +314,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _runtime_filters.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( + RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter( RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options())); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( _runtime_filter_descs[i].filter_id, &_runtime_filters[i])); @@ -349,18 +350,18 @@ Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); // Build phase - auto build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true); - runtime_profile()->add_child(build_phase_profile, false, nullptr); - _build_timer = ADD_TIMER(build_phase_profile, "BuildTime"); - _build_table_timer = ADD_TIMER(build_phase_profile, "BuildTableTime"); - _build_side_merge_block_timer = ADD_TIMER(build_phase_profile, "BuildSideMergeBlockTime"); - _build_table_insert_timer = ADD_TIMER(build_phase_profile, "BuildTableInsertTime"); - _build_expr_call_timer = ADD_TIMER(build_phase_profile, "BuildExprCallTime"); - _build_table_expanse_timer = ADD_TIMER(build_phase_profile, "BuildTableExpanseTime"); + _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true); + runtime_profile()->add_child(_build_phase_profile, false, nullptr); + _build_timer = ADD_TIMER(_build_phase_profile, "BuildTime"); + _build_table_timer = ADD_TIMER(_build_phase_profile, "BuildTableTime"); + _build_side_merge_block_timer = ADD_TIMER(_build_phase_profile, "BuildSideMergeBlockTime"); + _build_table_insert_timer = ADD_TIMER(_build_phase_profile, "BuildTableInsertTime"); + _build_expr_call_timer = ADD_TIMER(_build_phase_profile, "BuildExprCallTime"); + _build_table_expanse_timer = ADD_TIMER(_build_phase_profile, "BuildTableExpanseTime"); _build_table_convert_timer = - ADD_TIMER(build_phase_profile, "BuildTableConvertToPartitionedTime"); - _build_rows_counter = ADD_COUNTER(build_phase_profile, "BuildRows", TUnit::UNIT); - _build_side_compute_hash_timer = ADD_TIMER(build_phase_profile, "BuildSideHashComputingTime"); + ADD_TIMER(_build_phase_profile, "BuildTableConvertToPartitionedTime"); + _build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows", TUnit::UNIT); + _build_side_compute_hash_timer = ADD_TIMER(_build_phase_profile, "BuildSideHashComputingTime"); // Probe phase auto probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true); @@ -379,8 +380,19 @@ Status HashJoinNode::prepare(RuntimeState* state) { _build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT); _build_buckets_fill_counter = ADD_COUNTER(runtime_profile(), "FilledBuckets", TUnit::UNIT); + _should_build_hash_table = true; if (_is_broadcast_join) { runtime_profile()->add_info_string("BroadcastJoin", "true"); + if (state->enable_share_hash_table_for_broadcast_join()) { + runtime_profile()->add_info_string("ShareHashTableEnabled", "true"); + _shared_hashtable_controller = + state->get_query_fragments_ctx()->get_shared_hash_table_controller(); + _shared_hash_table_context = _shared_hashtable_controller->get_context(id()); + _should_build_hash_table = _shared_hashtable_controller->should_build_hash_table( + state->fragment_instance_id(), id()); + } else { + runtime_profile()->add_info_string("ShareHashTableEnabled", "false"); + } } RETURN_IF_ERROR(VExpr::prepare(_build_expr_ctxs, state, child(1)->row_desc())); @@ -398,7 +410,6 @@ Status HashJoinNode::prepare(RuntimeState* state) { // Hash Table Init _hash_table_init(state); - _process_hashtable_ctx_variants_init(state); _construct_mutable_join_block(); return Status::OK(); @@ -417,11 +428,6 @@ Status HashJoinNode::close(RuntimeState* state) { return Status::OK(); } - if (_shared_hashtable_controller) { - _shared_hashtable_controller->release_ref_count(state, id()); - _shared_hashtable_controller->wait_for_closable(state, id()); - } - START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::close"); VExpr::close(_build_expr_ctxs, state); VExpr::close(_probe_expr_ctxs, state); @@ -610,6 +616,7 @@ Status HashJoinNode::open(RuntimeState* state) { Status HashJoinNode::_materialize_build_side(RuntimeState* state) { RETURN_IF_ERROR(child(1)->open(state)); + SCOPED_TIMER(_build_timer); MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors()); @@ -620,111 +627,128 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { // make one block for each 4 gigabytes constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; - auto should_build_hash_table = true; - if (_is_broadcast_join) { - _shared_hashtable_controller = - state->get_query_fragments_ctx()->get_shared_hash_table_controller(); - should_build_hash_table = - _shared_hashtable_controller->should_build_hash_table(state, id()); - } + if (_should_build_hash_table) { + Block block; + // If eos or have already met a null value using short-circuit strategy, we do not need to pull + // data from data. + while (!eos && !_short_circuit_for_null_in_probe_side) { + block.clear_column_data(); + RETURN_IF_CANCELLED(state); - Block block; - // If eos or have already met a null value using short-circuit strategy, we do not need to pull - // data from data. - while (!eos && !_short_circuit_for_null_in_probe_side) { - block.clear_column_data(); - RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block, &eos), + child(1)->get_next_span(), eos); - RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block, &eos), - child(1)->get_next_span(), eos); - if (!should_build_hash_table) { - continue; + _mem_used += block.allocated_bytes(); + + if (block.rows() != 0) { + SCOPED_TIMER(_build_side_merge_block_timer); + RETURN_IF_CATCH_BAD_ALLOC(mutable_block.merge(block)); + } + + if (UNLIKELY(_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) { + if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) { + return Status::NotSupported( + strings::Substitute("data size of right table in hash join > $0", + BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT)); + } + _build_blocks->emplace_back(mutable_block.to_block()); + // TODO:: Rethink may we should do the process after we receive all build blocks ? + // which is better. + RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[index], index)); + + mutable_block = MutableBlock(); + ++index; + last_mem_used = _mem_used; + } } - _mem_used += block.allocated_bytes(); - - if (block.rows() != 0) { - SCOPED_TIMER(_build_side_merge_block_timer); - RETURN_IF_CATCH_BAD_ALLOC(mutable_block.merge(block)); - } - - if (UNLIKELY(_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) { - if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) { + if (!mutable_block.empty() && !_short_circuit_for_null_in_probe_side) { + if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) { return Status::NotSupported( strings::Substitute("data size of right table in hash join > $0", BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT)); } - _build_blocks.emplace_back(mutable_block.to_block()); - // TODO:: Rethink may we should do the process after we receive all build blocks ? - // which is better. - RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index)); - - mutable_block = MutableBlock(); - ++index; - last_mem_used = _mem_used; + _build_blocks->emplace_back(mutable_block.to_block()); + RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[index], index)); } } - - if (should_build_hash_table && !mutable_block.empty() && - !_short_circuit_for_null_in_probe_side) { - if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) { - return Status::NotSupported( - strings::Substitute("data size of right table in hash join > $0", - BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT)); - } - _build_blocks.emplace_back(mutable_block.to_block()); - RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index)); - } child(1)->close(state); - return std::visit( - Overload {[&](std::monostate& arg) -> Status { - LOG(FATAL) << "FATAL: uninited hash table"; - __builtin_unreachable(); - }, - [&](auto&& arg) -> Status { - using HashTableCtxType = std::decay_t; - using HashTableType = typename HashTableCtxType::HashTable; - if (!should_build_hash_table) { - auto& ret = _shared_hashtable_controller->wait_for_hash_table(id()); - if (!ret.status.ok()) { - return ret.status; - } - _short_circuit_for_null_in_probe_side = - _shared_hashtable_controller - ->short_circuit_for_null_in_probe_side(); - arg.hash_table_ptr = - reinterpret_cast(ret.hash_table_ptr); - _build_blocks = *ret.blocks; - _runtime_filter_slots = _pool->add(new VRuntimeFilterSlots( - _probe_expr_ctxs, _build_expr_ctxs, _runtime_filter_descs)); - RETURN_IF_ERROR(_runtime_filter_slots->init( - state, arg.hash_table_ptr->get_size())); - RETURN_IF_ERROR(_runtime_filter_slots->apply_from_other( - ret.runtime_filter_slots)); - { - SCOPED_TIMER(_push_down_timer); - _runtime_filter_slots->publish(); - } - return Status::OK(); - } else { - arg.hash_table_ptr = &arg.hash_table; - ProcessRuntimeFilterBuild - runtime_filter_build_process(this); - auto ret = runtime_filter_build_process(state, arg); - if (_shared_hashtable_controller) { - _shared_hashtable_controller - ->set_short_circuit_for_null_in_probe_side( - _short_circuit_for_null_in_probe_side); - SharedHashTableEntry entry(ret, arg.hash_table_ptr, - &_build_blocks, _runtime_filter_slots); - _shared_hashtable_controller->put_hash_table(std::move(entry), - id()); - } - return ret; - } - }}, - *_hash_table_variants); + if (_should_build_hash_table) { + auto ret = std::visit(Overload {[&](std::monostate&) -> Status { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + }, + [&](auto&& arg) -> Status { + using HashTableCtxType = std::decay_t; + ProcessRuntimeFilterBuild + runtime_filter_build_process(this); + return runtime_filter_build_process(state, arg); + }}, + *_hash_table_variants); + if (!ret.ok()) { + if (_shared_hashtable_controller) { + _shared_hash_table_context->status = ret; + _shared_hashtable_controller->signal(id()); + } + return ret; + } + if (_shared_hashtable_controller) { + _shared_hash_table_context->status = Status::OK(); + // arena will be shared with other instances. + _shared_hash_table_context->arena = _arena; + _shared_hash_table_context->blocks = _build_blocks; + _shared_hash_table_context->hash_table_variants = _hash_table_variants; + _shared_hash_table_context->short_circuit_for_null_in_probe_side = + _short_circuit_for_null_in_probe_side; + if (_runtime_filter_slots) { + _runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context); + } + _shared_hashtable_controller->signal(id()); + } + } else { + DCHECK(_shared_hashtable_controller != nullptr); + DCHECK(_shared_hash_table_context != nullptr); + auto wait_timer = ADD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime"); + SCOPED_TIMER(wait_timer); + RETURN_IF_ERROR( + _shared_hashtable_controller->wait_for_signal(state, _shared_hash_table_context)); + + _build_phase_profile->add_info_string( + "SharedHashTableFrom", + print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id()))); + _short_circuit_for_null_in_probe_side = + _shared_hash_table_context->short_circuit_for_null_in_probe_side; + _hash_table_variants = std::static_pointer_cast( + _shared_hash_table_context->hash_table_variants); + _build_blocks = _shared_hash_table_context->blocks; + + if (!_shared_hash_table_context->runtime_filters.empty()) { + auto ret = std::visit( + Overload {[&](std::monostate&) -> Status { + LOG(FATAL) << "FATAL: uninited hash table"; + __builtin_unreachable(); + }, + [&](auto&& arg) -> Status { + if (_runtime_filter_descs.empty()) { + return Status::OK(); + } + _runtime_filter_slots = _pool->add(new VRuntimeFilterSlots( + _probe_expr_ctxs, _build_expr_ctxs, + _runtime_filter_descs)); + + RETURN_IF_ERROR(_runtime_filter_slots->init( + state, arg.hash_table.get_size())); + return _runtime_filter_slots->copy_from_shared_context( + _shared_hash_table_context); + }}, + *_hash_table_variants); + RETURN_IF_ERROR(ret); + } + } + + _process_hashtable_ctx_variants_init(state); + return Status::OK(); } template @@ -993,7 +1017,7 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) { __builtin_unreachable(); }, [&](auto&& arg) { - arg.hash_table_ptr->set_partitioned_threshold( + arg.hash_table.set_partitioned_threshold( state->partitioned_hash_join_rows_threshold()); }}, *_hash_table_variants); @@ -1054,6 +1078,12 @@ void HashJoinNode::_reset_tuple_is_null_column() { } } +HashJoinNode::~HashJoinNode() { + if (_shared_hashtable_controller && _should_build_hash_table) { + _shared_hashtable_controller->signal(id()); + } +} + void HashJoinNode::_release_mem() { _arena = nullptr; _hash_table_variants = nullptr; @@ -1061,10 +1091,8 @@ void HashJoinNode::_release_mem() { _null_map_column = nullptr; _tuple_is_null_left_flag_column = nullptr; _tuple_is_null_right_flag_column = nullptr; + _shared_hash_table_context = nullptr; _probe_block.clear(); - - std::vector tmp_build_blocks; - _build_blocks.swap(tmp_build_blocks); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 72d8317468..90c5e7e3cd 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -24,7 +24,9 @@ #include "join_op.h" #include "process_hash_table_probe.h" #include "vec/common/columns_hashing.h" +#include "vec/common/hash_table/hash_map.h" #include "vec/common/hash_table/partitioned_hash_map.h" +#include "vec/runtime/shared_hash_table_controller.h" #include "vjoin_node_base.h" namespace doris { @@ -44,7 +46,6 @@ struct SerializedHashTableContext { using Iter = typename HashTable::iterator; HashTable hash_table; - HashTable* hash_table_ptr = &hash_table; Iter iter; bool inited = false; @@ -76,7 +77,6 @@ struct PrimaryTypeHashTableContext { using Iter = typename HashTable::iterator; HashTable hash_table; - HashTable* hash_table_ptr = &hash_table; Iter iter; bool inited = false; @@ -111,7 +111,6 @@ struct FixedKeyHashTableContext { using Iter = typename HashTable::iterator; HashTable hash_table; - HashTable* hash_table_ptr = &hash_table; Iter iter; bool inited = false; @@ -185,6 +184,7 @@ public: static constexpr int PREFETCH_STEP = 64; HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~HashJoinNode(); Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; Status prepare(RuntimeState* state) override; @@ -235,14 +235,18 @@ private: RuntimeProfile::Counter* _join_filter_timer; + RuntimeProfile* _build_phase_profile; + int64_t _mem_used; - std::unique_ptr _arena; - std::unique_ptr _hash_table_variants; + std::shared_ptr _arena; + + // maybe share hash table with other fragment instances + std::shared_ptr _hash_table_variants; std::unique_ptr _process_hashtable_ctx_variants; - std::vector _build_blocks; + std::shared_ptr> _build_blocks; Block _probe_block; ColumnRawPtrs _probe_columns; ColumnUInt8::MutablePtr _null_map_column; @@ -259,8 +263,9 @@ private: Sizes _build_key_sz; bool _is_broadcast_join = false; - SharedHashTableController* _shared_hashtable_controller = nullptr; - VRuntimeFilterSlots* _runtime_filter_slots; + bool _should_build_hash_table = true; + std::shared_ptr _shared_hashtable_controller = nullptr; + VRuntimeFilterSlots* _runtime_filter_slots = nullptr; std::vector _hash_output_slot_ids; std::vector _left_output_slot_flags; @@ -269,6 +274,8 @@ private: MutableColumnPtr _tuple_is_null_left_flag_column; MutableColumnPtr _tuple_is_null_right_flag_column; + SharedHashTableContextPtr _shared_hash_table_context = nullptr; + private: Status _materialize_build_side(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 35c6186ad7..33c6fab3fe 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -189,7 +189,7 @@ Status VScanNode::_register_runtime_filter() { for (int i = 0; i < filter_size; ++i) { IRuntimeFilter* runtime_filter = nullptr; const auto& filter_desc = _runtime_filter_descs[i]; - RETURN_IF_ERROR(_state->runtime_filter_mgr()->regist_filter( + RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter( RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id())); RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter)); diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp index 7d92dabedf..e9e125a168 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ b/be/src/vec/runtime/shared_hash_table_controller.cpp @@ -22,111 +22,57 @@ namespace doris { namespace vectorized { -bool SharedHashTableController::should_build_hash_table(RuntimeState* state, int my_node_id) { +bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id, + int my_node_id) { std::lock_guard lock(_mutex); auto it = _builder_fragment_ids.find(my_node_id); if (it == _builder_fragment_ids.cend()) { - _builder_fragment_ids[my_node_id] = state->fragment_instance_id(); + _builder_fragment_ids.insert({my_node_id, fragment_instance_id}); return true; } return false; } -bool SharedHashTableController::supposed_to_build_hash_table(RuntimeState* state, int my_node_id) { +SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id) { std::lock_guard lock(_mutex); - auto it = _builder_fragment_ids.find(my_node_id); - if (it != _builder_fragment_ids.cend()) { - return _builder_fragment_ids[my_node_id] == state->fragment_instance_id(); + auto it = _shared_contexts.find(my_node_id); + if (it == _shared_contexts.cend()) { + _shared_contexts.insert({my_node_id, std::make_shared()}); } - return false; + return _shared_contexts[my_node_id]; } -void SharedHashTableController::put_hash_table(SharedHashTableEntry&& entry, int my_node_id) { +void SharedHashTableController::signal(int my_node_id) { std::lock_guard lock(_mutex); - DCHECK(_hash_table_entries.find(my_node_id) == _hash_table_entries.cend()); - _hash_table_entries.insert({my_node_id, std::move(entry)}); + auto it = _shared_contexts.find(my_node_id); + if (it != _shared_contexts.cend()) { + it->second->signaled = true; + _shared_contexts.erase(it); + } _cv.notify_all(); } -SharedHashTableEntry& SharedHashTableController::wait_for_hash_table(int my_node_id) { - std::unique_lock lock(_mutex); - auto it = _hash_table_entries.find(my_node_id); - if (it == _hash_table_entries.cend()) { - _cv.wait(lock, [this, &it, my_node_id]() { - it = _hash_table_entries.find(my_node_id); - return it != _hash_table_entries.cend(); - }); +TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int my_node_id) { + std::lock_guard lock(_mutex); + auto it = _builder_fragment_ids.find(my_node_id); + if (it == _builder_fragment_ids.cend()) { + return TUniqueId {}; } return it->second; } -void SharedHashTableController::acquire_ref_count(RuntimeState* state, int my_node_id) { +Status SharedHashTableController::wait_for_signal(RuntimeState* state, + const SharedHashTableContextPtr& context) { std::unique_lock lock(_mutex); - _ref_fragments[my_node_id].emplace_back(state->fragment_instance_id()); -} - -Status SharedHashTableController::release_ref_count(RuntimeState* state, int my_node_id) { - std::unique_lock lock(_mutex); - auto id = state->fragment_instance_id(); - auto it = std::find(_ref_fragments[my_node_id].begin(), _ref_fragments[my_node_id].end(), id); - CHECK(it != _ref_fragments[my_node_id].end()); - _ref_fragments[my_node_id].erase(it); - _put_an_empty_entry_if_need(Status::Cancelled("hash table not build"), id, my_node_id); - _cv.notify_all(); - return Status::OK(); -} - -void SharedHashTableController::_put_an_empty_entry_if_need(Status status, TUniqueId fragment_id, - int node_id) { - auto builder_it = _builder_fragment_ids.find(node_id); - if (builder_it != _builder_fragment_ids.end()) { - if (builder_it->second == fragment_id) { - if (_hash_table_entries.find(builder_it->first) == _hash_table_entries.cend()) { - // "here put an empty SharedHashTableEntry to avoid deadlocking" - _hash_table_entries.insert( - {builder_it->first, SharedHashTableEntry::empty_entry_with_status(status)}); - } - } + // maybe builder signaled before other instances waiting, + // so here need to check value of `signaled` + while (!context->signaled) { + _cv.wait_for(lock, std::chrono::milliseconds(400), [&]() { return context->signaled; }); + // return if the instances is cancelled(eg. query timeout) + RETURN_IF_CANCELLED(state); } -} - -Status SharedHashTableController::release_ref_count_if_need(TUniqueId fragment_id, Status status) { - std::unique_lock lock(_mutex); - bool need_to_notify = false; - for (auto& ref : _ref_fragments) { - auto it = std::find(ref.second.begin(), ref.second.end(), fragment_id); - if (it == ref.second.end()) { - continue; - } - ref.second.erase(it); - need_to_notify = true; - LOG(INFO) << "release_ref_count in node: " << ref.first - << " for fragment id: " << fragment_id; - } - - for (auto& builder : _builder_fragment_ids) { - if (builder.second == fragment_id) { - if (_hash_table_entries.find(builder.first) == _hash_table_entries.cend()) { - _hash_table_entries.insert( - {builder.first, SharedHashTableEntry::empty_entry_with_status(status)}); - } - } - } - - if (need_to_notify) { - _cv.notify_all(); - } - return Status::OK(); -} - -Status SharedHashTableController::wait_for_closable(RuntimeState* state, int my_node_id) { - std::unique_lock lock(_mutex); - if (!_ref_fragments[my_node_id].empty()) { - _cv.wait(lock, [&]() { return _ref_fragments[my_node_id].empty(); }); - } - RETURN_IF_CANCELLED(state); - return Status::OK(); + return context->status; } } // namespace vectorized -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index 9836c3ec3f..d7d50570db 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -29,70 +29,50 @@ namespace doris { class RuntimeState; class TUniqueId; - -template -class RuntimeFilterSlotsBase; +class MinMaxFuncBase; +class HybridSetBase; +class BloomFilterFuncBase; namespace vectorized { class VExprContext; -struct SharedHashTableEntry { - SharedHashTableEntry(Status status_, void* hash_table_ptr_, std::vector* blocks_, - RuntimeFilterSlotsBase* runtime_filter_slots_) - : status(status_), - hash_table_ptr(hash_table_ptr_), - blocks(blocks_), - runtime_filter_slots(runtime_filter_slots_) {} - SharedHashTableEntry(SharedHashTableEntry&& entry) - : status(entry.status), - hash_table_ptr(entry.hash_table_ptr), - blocks(entry.blocks), - runtime_filter_slots(entry.runtime_filter_slots) {} +struct SharedRuntimeFilterContext { + std::shared_ptr minmax_func; + std::shared_ptr hybrid_set; + std::shared_ptr bloom_filter_func; +}; - static SharedHashTableEntry empty_entry_with_status(const Status& status) { - return SharedHashTableEntry(status, nullptr, nullptr, nullptr); - } +struct SharedHashTableContext { + SharedHashTableContext() + : hash_table_variants(nullptr), + signaled(false), + short_circuit_for_null_in_probe_side(false) {} Status status; - void* hash_table_ptr; - std::vector* blocks; - RuntimeFilterSlotsBase* runtime_filter_slots; + std::shared_ptr arena; + std::shared_ptr hash_table_variants; + std::shared_ptr> blocks; + std::map runtime_filters; + bool signaled; + bool short_circuit_for_null_in_probe_side; }; +using SharedHashTableContextPtr = std::shared_ptr; + class SharedHashTableController { public: - bool should_build_hash_table(RuntimeState* state, int my_node_id); - bool supposed_to_build_hash_table(RuntimeState* state, int my_node_id); - void acquire_ref_count(RuntimeState* state, int my_node_id); - SharedHashTableEntry& wait_for_hash_table(int my_node_id); - Status release_ref_count(RuntimeState* state, int my_node_id); - Status release_ref_count_if_need(TUniqueId fragment_id, Status status); - void put_hash_table(SharedHashTableEntry&& entry, int my_node_id); - Status wait_for_closable(RuntimeState* state, int my_node_id); - - // Single-thread operation - void set_short_circuit_for_null_in_probe_side(bool short_circuit_for_null_in_probe_side) { - _short_circuit_for_null_in_probe_side = short_circuit_for_null_in_probe_side; - } - - bool short_circuit_for_null_in_probe_side() const { - return _short_circuit_for_null_in_probe_side; - } - -private: - // If the fragment instance was supposed to build hash table, but it didn't build. - // To avoid deadlocking other fragment instances, - // here need to put an empty SharedHashTableEntry with canceled status. - void _put_an_empty_entry_if_need(Status status, TUniqueId fragment_id, int node_id); + TUniqueId get_builder_fragment_instance_id(int my_node_id); + SharedHashTableContextPtr get_context(int my_node_id); + void signal(int my_node_id); + Status wait_for_signal(RuntimeState* state, const SharedHashTableContextPtr& context); + bool should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id); private: std::mutex _mutex; std::condition_variable _cv; - std::map _builder_fragment_ids; - std::map _hash_table_entries; - std::map> _ref_fragments; - bool _short_circuit_for_null_in_probe_side; + std::map _builder_fragment_ids; + std::map _shared_contexts; }; } // namespace vectorized diff --git a/be/src/vec/runtime/shared_hashtable_controller.cpp b/be/src/vec/runtime/shared_hashtable_controller.cpp deleted file mode 100644 index a761da8c2e..0000000000 --- a/be/src/vec/runtime/shared_hashtable_controller.cpp +++ /dev/null @@ -1,95 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "shared_hashtable_controller.h" - -#include - -namespace doris { -namespace vectorized { - -bool SharedHashTableController::should_build_hash_table(RuntimeState* state, int my_node_id) { - std::lock_guard lock(_mutex); - auto it = _builder_fragment_ids.find(my_node_id); - if (it == _builder_fragment_ids.cend()) { - _builder_fragment_ids[my_node_id] = state->fragment_instance_id(); - return true; - } - return false; -} - -void SharedHashTableController::put_hash_table(SharedHashTableEntry&& entry, int my_node_id) { - std::lock_guard lock(_mutex); - DCHECK(_hash_table_entries.find(my_node_id) == _hash_table_entries.cend()); - _hash_table_entries.insert({my_node_id, std::move(entry)}); - _cv.notify_all(); -} - -SharedHashTableEntry& SharedHashTableController::wait_for_hash_table(int my_node_id) { - std::unique_lock lock(_mutex); - auto it = _hash_table_entries.find(my_node_id); - if (it == _hash_table_entries.cend()) { - _cv.wait(lock, [this, &it, my_node_id]() { - it = _hash_table_entries.find(my_node_id); - return it != _hash_table_entries.cend(); - }); - } - return it->second; -} - -void SharedHashTableController::acquire_ref_count(RuntimeState* state, int my_node_id) { - std::unique_lock lock(_mutex); - _ref_fragments[my_node_id].emplace_back(state->fragment_instance_id()); -} - -Status SharedHashTableController::release_ref_count(RuntimeState* state, int my_node_id) { - std::unique_lock lock(_mutex); - RETURN_IF_CANCELLED(state); - auto id = state->fragment_instance_id(); - auto it = std::find(_ref_fragments[my_node_id].begin(), _ref_fragments[my_node_id].end(), id); - CHECK(it != _ref_fragments[my_node_id].end()); - _ref_fragments[my_node_id].erase(it); - _cv.notify_all(); - return Status::OK(); -} - -Status SharedHashTableController::release_ref_count_if_need(TUniqueId fragment_id) { - std::unique_lock lock(_mutex); - bool need_to_notify = false; - for (auto& ref : _ref_fragments) { - auto it = std::find(ref.second.begin(), ref.second.end(), fragment_id); - if (it == ref.second.end()) continue; - ref.second.erase(it); - need_to_notify = true; - LOG(INFO) << "release_ref_count in node: " << ref.first - << " for fragment id: " << fragment_id; - } - if (need_to_notify) _cv.notify_all(); - return Status::OK(); -} - -Status SharedHashTableController::wait_for_closable(RuntimeState* state, int my_node_id) { - std::unique_lock lock(_mutex); - RETURN_IF_CANCELLED(state); - if (!_ref_fragments[my_node_id].empty()) { - _cv.wait(lock, [&]() { return _ref_fragments[my_node_id].empty(); }); - } - return Status::OK(); -} - -} // namespace vectorized -} // namespace doris \ No newline at end of file diff --git a/be/src/vec/runtime/shared_hashtable_controller.h b/be/src/vec/runtime/shared_hashtable_controller.h deleted file mode 100644 index 842dc89e90..0000000000 --- a/be/src/vec/runtime/shared_hashtable_controller.h +++ /dev/null @@ -1,75 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include -#include - -#include "vec/core/block.h" - -namespace doris { - -class RuntimeState; -class TUniqueId; - -namespace vectorized { - -class VExprContext; - -struct SharedHashTableEntry { - SharedHashTableEntry(void* hash_table_ptr_, std::vector& blocks_, - std::unordered_map>& inserted_rows_, - const std::vector& exprs) - : hash_table_ptr(hash_table_ptr_), - blocks(blocks_), - inserted_rows(inserted_rows_), - build_exprs(exprs) {} - SharedHashTableEntry(SharedHashTableEntry&& entry) - : hash_table_ptr(entry.hash_table_ptr), - blocks(entry.blocks), - inserted_rows(entry.inserted_rows), - build_exprs(entry.build_exprs) {} - void* hash_table_ptr; - std::vector& blocks; - std::unordered_map>& inserted_rows; - std::vector build_exprs; -}; - -class SharedHashTableController { -public: - bool should_build_hash_table(RuntimeState* state, int my_node_id); - void acquire_ref_count(RuntimeState* state, int my_node_id); - SharedHashTableEntry& wait_for_hash_table(int my_node_id); - Status release_ref_count(RuntimeState* state, int my_node_id); - Status release_ref_count_if_need(TUniqueId fragment_id); - void put_hash_table(SharedHashTableEntry&& entry, int my_node_id); - Status wait_for_closable(RuntimeState* state, int my_node_id); - -private: - std::mutex _mutex; - std::condition_variable _cv; - std::map _builder_fragment_ids; - std::map _hash_table_entries; - std::map> _ref_fragments; -}; - -} // namespace vectorized -} // namespace doris \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index c8622de800..3bcf3ed0fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -231,6 +231,9 @@ public class SessionVariable implements Serializable, Writable { public static final String PARTITIONED_HASH_JOIN_ROWS_THRESHOLD = "partitioned_hash_join_rows_threshold"; + public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN + = "enable_share_hash_table_for_broadcast_join"; + // session origin value public Map sessionOriginValue = new HashMap(); // check stmt is or not [select /*+ SET_VAR(...)*/ ...] @@ -607,6 +610,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD) public int partitionedHashJoinRowsThreshold = 0; + @VariableMgr.VarAttr(name = ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN) + public boolean enableShareHashTableForBroadcastJoin = true; + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. public void initFuzzyModeVariables() { @@ -1227,6 +1233,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setBeExecVersion(Config.be_exec_version); tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary); tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery); + tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin); tResult.setBatchSize(batchSize); tResult.setDisableStreamPreaggregations(disableStreamPreaggregations); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index b725ea9191..47a9144269 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -181,6 +181,8 @@ struct TQueryOptions { 52: optional i32 be_exec_version = 0 53: optional i32 partitioned_hash_join_rows_threshold = 0 + + 54: optional bool enable_share_hash_table_for_broadcast_join }