[improvement](join) optimize shared hash table for broadcast join (#14371)
This PR makes hash table sharing for broadcast join more robust: add a session variable to enable/disable the feature; do not block the hash join node's close function; use shared pointers to share the hash table and runtime filters among broadcast join nodes; and let a hash join node that does not need to build the hash table close its right child without reading any data (the child then closes the corresponding sender).
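For orientation, the sharing protocol this patch introduces works as follows: the first fragment instance that reaches a broadcast hash join node becomes the builder, every instance fetches the same per-node shared context from the controller, the builder fills that context (hash table, build blocks, runtime filters, status) and signals it, and the remaining instances wait for the signal and then adopt the shared state instead of building their own table. The sketch below condenses that controller into a minimal, self-contained form; the payload types, the int64_t instance id, and the body of wait_for_signal are simplifications and assumptions for illustration, not the exact Doris code.

#include <condition_variable>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>

// Simplified stand-in for the per-node shared state; the real context in this
// patch also carries the arena, build blocks, runtime filters and a Status.
struct SharedHashTableContext {
    bool signaled = false;                      // set by the builder when done
    std::shared_ptr<void> hash_table_variants;  // placeholder payload
};
using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;

class SharedHashTableController {
public:
    // The first instance to ask about a node id becomes the builder.
    bool should_build_hash_table(int64_t fragment_instance_id, int node_id) {
        std::lock_guard<std::mutex> lock(_mutex);
        return _builder_fragment_ids.insert({node_id, fragment_instance_id}).second;
    }

    // Builder and non-builders all share one context per join node id.
    SharedHashTableContextPtr get_context(int node_id) {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = _shared_contexts.find(node_id);
        if (it == _shared_contexts.end()) {
            it = _shared_contexts.emplace(node_id, std::make_shared<SharedHashTableContext>()).first;
        }
        return it->second;
    }

    // Builder publishes the finished (or failed) build and wakes all waiters.
    void signal(int node_id) {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = _shared_contexts.find(node_id);
        if (it != _shared_contexts.end()) {
            it->second->signaled = true;
            _shared_contexts.erase(it);  // waiters keep their own shared_ptr
        }
        _cv.notify_all();
    }

    // Assumed body: non-builders block until their context is signaled.
    void wait_for_signal(const SharedHashTableContextPtr& context) {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [&]() { return context->signaled; });
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::map<int, int64_t> _builder_fragment_ids;               // node id -> builder instance
    std::map<int, SharedHashTableContextPtr> _shared_contexts;  // node id -> shared state
};

Because signal() erases the map entry while each waiter still holds its own shared_ptr to the context, the shared table outlives the builder's bookkeeping; this is also why the patch moves the controller, arena, hash table variants and build blocks to shared_ptr ownership.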
@@ -92,7 +92,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
_runtime_filters.resize(_runtime_filter_descs.size());

for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options()));
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
_runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
@@ -88,7 +88,7 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
for (int i = 0; i < filter_size; ++i) {
IRuntimeFilter* runtime_filter = nullptr;
const auto& filter_desc = _runtime_filter_descs[i];
RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id()));
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
&runtime_filter));
@@ -42,6 +42,7 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vliteral.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
#include "vec/runtime/shared_hash_table_controller.h"

namespace doris {
// PrimitiveType->TExprNodeType
@@ -431,23 +432,25 @@ public:
_max_in_num = params->max_in_num;
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
_hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec()));
_context.hybrid_set.reset(
create_set(_column_return_type, _state->enable_vectorized_exec()));
break;
}
case RuntimeFilterType::MINMAX_FILTER: {
_minmax_func.reset(create_minmax_filter(_column_return_type));
_context.minmax_func.reset(create_minmax_filter(_column_return_type));
break;
}
case RuntimeFilterType::BLOOM_FILTER: {
_is_bloomfilter = true;
_bloomfilter_func.reset(create_bloom_filter(_column_return_type));
_bloomfilter_func->set_length(params->bloom_filter_size);
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type));
_context.bloom_filter_func->set_length(params->bloom_filter_size);
return Status::OK();
}
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
_hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec()));
_bloomfilter_func.reset(create_bloom_filter(_column_return_type));
_bloomfilter_func->set_length(params->bloom_filter_size);
_context.hybrid_set.reset(
create_set(_column_return_type, _state->enable_vectorized_exec()));
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type));
_context.bloom_filter_func->set_length(params->bloom_filter_size);
return Status::OK();
}
default:
@ -461,14 +464,15 @@ public:
|
||||
<< "Can not change to bloom filter because of runtime filter type is "
|
||||
<< to_string(_filter_type);
|
||||
_is_bloomfilter = true;
|
||||
insert_to_bloom_filter(_bloomfilter_func.get());
|
||||
insert_to_bloom_filter(_context.bloom_filter_func.get());
|
||||
// release in filter
|
||||
_hybrid_set.reset(create_set(_column_return_type, _state->enable_vectorized_exec()));
|
||||
_context.hybrid_set.reset(
|
||||
create_set(_column_return_type, _state->enable_vectorized_exec()));
|
||||
}
|
||||
|
||||
void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const {
|
||||
if (_hybrid_set->size() > 0) {
|
||||
auto it = _hybrid_set->begin();
|
||||
if (_context.hybrid_set->size() > 0) {
|
||||
auto it = _context.hybrid_set->begin();
|
||||
|
||||
if (_use_batch) {
|
||||
while (it->has_next()) {
|
||||
@ -484,7 +488,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
BloomFilterFuncBase* get_bloomfilter() const { return _bloomfilter_func.get(); }
|
||||
BloomFilterFuncBase* get_bloomfilter() const { return _context.bloom_filter_func.get(); }
|
||||
|
||||
void insert(const void* data) {
|
||||
switch (_filter_type) {
|
||||
@ -492,22 +496,22 @@ public:
|
||||
if (_is_ignored_in_filter) {
|
||||
break;
|
||||
}
|
||||
_hybrid_set->insert(data);
|
||||
_context.hybrid_set->insert(data);
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::MINMAX_FILTER: {
|
||||
_minmax_func->insert(data);
|
||||
_context.minmax_func->insert(data);
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::BLOOM_FILTER: {
|
||||
_bloomfilter_func->insert(data);
|
||||
_context.bloom_filter_func->insert(data);
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
|
||||
if (_is_bloomfilter) {
|
||||
_bloomfilter_func->insert(data);
|
||||
_context.bloom_filter_func->insert(data);
|
||||
} else {
|
||||
_hybrid_set->insert(data);
|
||||
_context.hybrid_set->insert(data);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -523,22 +527,22 @@ public:
|
||||
if (_is_ignored_in_filter) {
|
||||
break;
|
||||
}
|
||||
_hybrid_set->insert_fixed_len(data, offsets, number);
|
||||
_context.hybrid_set->insert_fixed_len(data, offsets, number);
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::MINMAX_FILTER: {
|
||||
_minmax_func->insert_fixed_len(data, offsets, number);
|
||||
_context.minmax_func->insert_fixed_len(data, offsets, number);
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::BLOOM_FILTER: {
|
||||
_bloomfilter_func->insert_fixed_len(data, offsets, number);
|
||||
_context.bloom_filter_func->insert_fixed_len(data, offsets, number);
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
|
||||
if (_is_bloomfilter) {
|
||||
_bloomfilter_func->insert_fixed_len(data, offsets, number);
|
||||
_context.bloom_filter_func->insert_fixed_len(data, offsets, number);
|
||||
} else {
|
||||
_hybrid_set->insert_fixed_len(data, offsets, number);
|
||||
_context.hybrid_set->insert_fixed_len(data, offsets, number);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -618,18 +622,18 @@ public:
|
||||
_is_ignored_in_filter = true;
|
||||
_ignored_in_filter_msg = wrapper->_ignored_in_filter_msg;
|
||||
// release in filter
|
||||
_hybrid_set.reset(
|
||||
_context.hybrid_set.reset(
|
||||
create_set(_column_return_type, _state->enable_vectorized_exec()));
|
||||
break;
|
||||
}
|
||||
// try insert set
|
||||
_hybrid_set->insert(wrapper->_hybrid_set.get());
|
||||
if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) {
|
||||
_context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
|
||||
if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) {
|
||||
#ifdef VLOG_DEBUG_IS_ON
|
||||
std::stringstream msg;
|
||||
msg << "fragment instance " << _fragment_instance_id.to_string()
|
||||
<< " ignore merge runtime filter(in filter id " << _filter_id
|
||||
<< ") because: in_num(" << _hybrid_set->size() << ") >= max_in_num("
|
||||
<< ") because: in_num(" << _context.hybrid_set->size() << ") >= max_in_num("
|
||||
<< _max_in_num << ")";
|
||||
_ignored_in_filter_msg = _pool->add(new std::string(msg.str()));
|
||||
#else
|
||||
@ -638,17 +642,17 @@ public:
|
||||
_is_ignored_in_filter = true;
|
||||
|
||||
// release in filter
|
||||
_hybrid_set.reset(
|
||||
_context.hybrid_set.reset(
|
||||
create_set(_column_return_type, _state->enable_vectorized_exec()));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::MINMAX_FILTER: {
|
||||
_minmax_func->merge(wrapper->_minmax_func.get(), _pool);
|
||||
_context.minmax_func->merge(wrapper->_context.minmax_func.get(), _pool);
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::BLOOM_FILTER: {
|
||||
_bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
|
||||
_context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get());
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
|
||||
@ -661,11 +665,11 @@ public:
|
||||
<< " can not ignore merge runtime filter(in filter id "
|
||||
<< wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: "
|
||||
<< *(wrapper->get_ignored_in_filter_msg());
|
||||
_hybrid_set->insert(wrapper->_hybrid_set.get());
|
||||
if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) {
|
||||
_context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
|
||||
if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) {
|
||||
VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string()
|
||||
<< " change runtime filter to bloom filter(id=" << _filter_id
|
||||
<< ") because: in_num(" << _hybrid_set->size()
|
||||
<< ") because: in_num(" << _context.hybrid_set->size()
|
||||
<< ") >= max_in_num(" << _max_in_num << ")";
|
||||
change_to_bloom_filter();
|
||||
}
|
||||
@ -675,7 +679,7 @@ public:
|
||||
<< " change runtime filter to bloom filter(id=" << _filter_id
|
||||
<< ") because: already exist a bloom filter";
|
||||
change_to_bloom_filter();
|
||||
_bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
|
||||
_context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get());
|
||||
}
|
||||
} else {
|
||||
if (wrapper->_filter_type ==
|
||||
@ -685,10 +689,10 @@ public:
|
||||
<< " can not ignore merge runtime filter(in filter id "
|
||||
<< wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: "
|
||||
<< *(wrapper->get_ignored_in_filter_msg());
|
||||
wrapper->insert_to_bloom_filter(_bloomfilter_func.get());
|
||||
wrapper->insert_to_bloom_filter(_context.bloom_filter_func.get());
|
||||
// bloom filter merge bloom filter
|
||||
} else {
|
||||
_bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
|
||||
_context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get());
|
||||
}
|
||||
}
|
||||
break;
|
||||
@ -709,7 +713,7 @@ public:
|
||||
_ignored_in_filter_msg = _pool->add(new std::string(in_filter->ignored_msg()));
|
||||
return Status::OK();
|
||||
}
|
||||
_hybrid_set.reset(create_set(type, _state->enable_vectorized_exec()));
|
||||
_context.hybrid_set.reset(create_set(type, _state->enable_vectorized_exec()));
|
||||
switch (type) {
|
||||
case TYPE_BOOLEAN: {
|
||||
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column,
|
||||
@ -880,40 +884,40 @@ public:
|
||||
_is_bloomfilter = true;
|
||||
// we won't use this class to insert or find any data
|
||||
// so any type is ok
|
||||
_bloomfilter_func.reset(create_bloom_filter(PrimitiveType::TYPE_INT));
|
||||
return _bloomfilter_func->assign(data, bloom_filter->filter_length());
|
||||
_context.bloom_filter_func.reset(create_bloom_filter(PrimitiveType::TYPE_INT));
|
||||
return _context.bloom_filter_func->assign(data, bloom_filter->filter_length());
|
||||
}
|
||||
|
||||
// used by shuffle runtime filter
|
||||
// assign this filter by protobuf
|
||||
Status assign(const PMinMaxFilter* minmax_filter) {
|
||||
PrimitiveType type = to_primitive_type(minmax_filter->column_type());
|
||||
_minmax_func.reset(create_minmax_filter(type));
|
||||
_context.minmax_func.reset(create_minmax_filter(type));
|
||||
switch (type) {
|
||||
case TYPE_BOOLEAN: {
|
||||
bool min_val = minmax_filter->min_val().boolval();
|
||||
bool max_val = minmax_filter->max_val().boolval();
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_TINYINT: {
|
||||
int8_t min_val = static_cast<int8_t>(minmax_filter->min_val().intval());
|
||||
int8_t max_val = static_cast<int8_t>(minmax_filter->max_val().intval());
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_SMALLINT: {
|
||||
int16_t min_val = static_cast<int16_t>(minmax_filter->min_val().intval());
|
||||
int16_t max_val = static_cast<int16_t>(minmax_filter->max_val().intval());
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_INT: {
|
||||
int32_t min_val = minmax_filter->min_val().intval();
|
||||
int32_t max_val = minmax_filter->max_val().intval();
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_BIGINT: {
|
||||
int64_t min_val = minmax_filter->min_val().longval();
|
||||
int64_t max_val = minmax_filter->max_val().longval();
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_LARGEINT: {
|
||||
auto min_string_val = minmax_filter->min_val().stringval();
|
||||
@ -925,27 +929,27 @@ public:
|
||||
int128_t max_val = StringParser::string_to_int<int128_t>(
|
||||
max_string_val.c_str(), max_string_val.length(), &result);
|
||||
DCHECK(result == StringParser::PARSE_SUCCESS);
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_FLOAT: {
|
||||
float min_val = static_cast<float>(minmax_filter->min_val().doubleval());
|
||||
float max_val = static_cast<float>(minmax_filter->max_val().doubleval());
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_DOUBLE: {
|
||||
double min_val = static_cast<double>(minmax_filter->min_val().doubleval());
|
||||
double max_val = static_cast<double>(minmax_filter->max_val().doubleval());
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_DATEV2: {
|
||||
int32_t min_val = minmax_filter->min_val().intval();
|
||||
int32_t max_val = minmax_filter->max_val().intval();
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_DATETIMEV2: {
|
||||
int64_t min_val = minmax_filter->min_val().longval();
|
||||
int64_t max_val = minmax_filter->max_val().longval();
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_DATETIME:
|
||||
case TYPE_DATE: {
|
||||
@ -955,24 +959,24 @@ public:
|
||||
DateTimeValue max_val;
|
||||
min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length());
|
||||
max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length());
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_DECIMALV2: {
|
||||
auto& min_val_ref = minmax_filter->min_val().stringval();
|
||||
auto& max_val_ref = minmax_filter->max_val().stringval();
|
||||
DecimalV2Value min_val(min_val_ref);
|
||||
DecimalV2Value max_val(max_val_ref);
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_DECIMAL32: {
|
||||
int32_t min_val = minmax_filter->min_val().intval();
|
||||
int32_t max_val = minmax_filter->max_val().intval();
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_DECIMAL64: {
|
||||
int64_t min_val = minmax_filter->min_val().longval();
|
||||
int64_t max_val = minmax_filter->max_val().longval();
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_DECIMAL128I: {
|
||||
auto min_string_val = minmax_filter->min_val().stringval();
|
||||
@ -984,7 +988,7 @@ public:
|
||||
int128_t max_val = StringParser::string_to_int<int128_t>(
|
||||
max_string_val.c_str(), max_string_val.length(), &result);
|
||||
DCHECK(result == StringParser::PARSE_SUCCESS);
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
case TYPE_VARCHAR:
|
||||
case TYPE_CHAR:
|
||||
@ -995,7 +999,7 @@ public:
|
||||
auto max_val_ptr = _pool->add(new std::string(max_val_ref));
|
||||
StringValue min_val(const_cast<char*>(min_val_ptr->c_str()), min_val_ptr->length());
|
||||
StringValue max_val(const_cast<char*>(max_val_ptr->c_str()), max_val_ptr->length());
|
||||
return _minmax_func->assign(&min_val, &max_val);
|
||||
return _context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
default:
|
||||
DCHECK(false) << "unknown type";
|
||||
@ -1005,17 +1009,17 @@ public:
|
||||
}
|
||||
|
||||
Status get_in_filter_iterator(HybridSetBase::IteratorBase** it) {
|
||||
*it = _hybrid_set->begin();
|
||||
*it = _context.hybrid_set->begin();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status get_bloom_filter_desc(char** data, int* filter_length) {
|
||||
return _bloomfilter_func->get_data(data, filter_length);
|
||||
return _context.bloom_filter_func->get_data(data, filter_length);
|
||||
}
|
||||
|
||||
Status get_minmax_filter_desc(void** min_data, void** max_data) {
|
||||
*min_data = _minmax_func->get_min();
|
||||
*max_data = _minmax_func->get_max();
|
||||
*min_data = _context.minmax_func->get_min();
|
||||
*max_data = _context.minmax_func->get_max();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -1027,13 +1031,13 @@ public:
|
||||
case TYPE_VARCHAR:
|
||||
case TYPE_CHAR:
|
||||
case TYPE_STRING: {
|
||||
StringValue* min_value = static_cast<StringValue*>(_minmax_func->get_min());
|
||||
StringValue* max_value = static_cast<StringValue*>(_minmax_func->get_max());
|
||||
StringValue* min_value = static_cast<StringValue*>(_context.minmax_func->get_min());
|
||||
StringValue* max_value = static_cast<StringValue*>(_context.minmax_func->get_max());
|
||||
auto min_val_ptr = _pool->add(new std::string(min_value->ptr));
|
||||
auto max_val_ptr = _pool->add(new std::string(max_value->ptr));
|
||||
StringValue min_val(const_cast<char*>(min_val_ptr->c_str()), min_val_ptr->length());
|
||||
StringValue max_val(const_cast<char*>(max_val_ptr->c_str()), max_val_ptr->length());
|
||||
_minmax_func->assign(&min_val, &max_val);
|
||||
_context.minmax_func->assign(&min_val, &max_val);
|
||||
}
|
||||
default:
|
||||
break;
|
||||
@ -1052,11 +1056,11 @@ public:
|
||||
PColumnValue&, ObjectPool*)) {
|
||||
for (int i = 0; i < filter->values_size(); ++i) {
|
||||
PColumnValue column = filter->values(i);
|
||||
assign_func(_hybrid_set, column, _pool);
|
||||
assign_func(_context.hybrid_set, column, _pool);
|
||||
}
|
||||
}
|
||||
|
||||
size_t get_in_filter_size() const { return _hybrid_set->size(); }
|
||||
size_t get_in_filter_size() const { return _context.hybrid_set->size(); }
|
||||
|
||||
friend class IRuntimeFilter;
|
||||
|
||||
@ -1068,9 +1072,8 @@ private:
|
||||
PrimitiveType _column_return_type; // column type
|
||||
RuntimeFilterType _filter_type;
|
||||
int32_t _max_in_num = -1;
|
||||
std::shared_ptr<MinMaxFuncBase> _minmax_func;
|
||||
std::shared_ptr<HybridSetBase> _hybrid_set;
|
||||
std::shared_ptr<BloomFilterFuncBase> _bloomfilter_func;
|
||||
|
||||
vectorized::SharedRuntimeFilterContext _context;
|
||||
bool _is_bloomfilter = false;
|
||||
bool _is_ignored_in_filter = false;
|
||||
std::string* _ignored_in_filter_msg = nullptr;
|
||||
@ -1090,12 +1093,12 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt
|
||||
return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id);
|
||||
}
|
||||
|
||||
Status IRuntimeFilter::apply_from_other(IRuntimeFilter* other) {
|
||||
_wrapper->_hybrid_set = other->_wrapper->_hybrid_set;
|
||||
_wrapper->_bloomfilter_func = other->_wrapper->_bloomfilter_func;
|
||||
_wrapper->_minmax_func = other->_wrapper->_minmax_func;
|
||||
_wrapper->_filter_type = other->_wrapper->_filter_type;
|
||||
_runtime_filter_type = other->_runtime_filter_type;
|
||||
void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) {
|
||||
context = _wrapper->_context;
|
||||
}
|
||||
|
||||
Status IRuntimeFilter::copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context) {
|
||||
_wrapper->_context = context;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -1736,7 +1739,7 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta
|
||||
node.__isset.vector_opcode = true;
|
||||
node.__set_vector_opcode(to_in_opcode(_column_return_type));
|
||||
auto in_pred = _pool->add(new InPredicate(node));
|
||||
RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.get()));
|
||||
RETURN_IF_ERROR(in_pred->prepare(state, _context.hybrid_set.get()));
|
||||
in_pred->add_child(Expr::copy(_pool, prob_expr->root()));
|
||||
ExprContext* ctx = _pool->add(new ExprContext(in_pred));
|
||||
container->push_back(ctx);
|
||||
@ -1748,7 +1751,8 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta
|
||||
Expr* max_literal = nullptr;
|
||||
auto max_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::LE);
|
||||
RETURN_IF_ERROR(create_literal<false>(_pool, prob_expr->root()->type(),
|
||||
_minmax_func->get_max(), (void**)&max_literal));
|
||||
_context.minmax_func->get_max(),
|
||||
(void**)&max_literal));
|
||||
max_pred->add_child(Expr::copy(_pool, prob_expr->root()));
|
||||
max_pred->add_child(max_literal);
|
||||
container->push_back(_pool->add(new ExprContext(max_pred)));
|
||||
@ -1756,7 +1760,8 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta
|
||||
Expr* min_literal = nullptr;
|
||||
auto min_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::GE);
|
||||
RETURN_IF_ERROR(create_literal<false>(_pool, prob_expr->root()->type(),
|
||||
_minmax_func->get_min(), (void**)&min_literal));
|
||||
_context.minmax_func->get_min(),
|
||||
(void**)&min_literal));
|
||||
min_pred->add_child(Expr::copy(_pool, prob_expr->root()));
|
||||
min_pred->add_child(min_literal);
|
||||
container->push_back(_pool->add(new ExprContext(min_pred)));
|
||||
@ -1772,7 +1777,7 @@ Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* sta
|
||||
node.__isset.vector_opcode = true;
|
||||
node.__set_vector_opcode(to_in_opcode(_column_return_type));
|
||||
auto bloom_pred = _pool->add(new BloomFilterPredicate(node));
|
||||
RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func));
|
||||
RETURN_IF_ERROR(bloom_pred->prepare(state, _context.bloom_filter_func));
|
||||
bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
|
||||
ExprContext* ctx = _pool->add(new ExprContext(bloom_pred));
|
||||
container->push_back(ctx);
|
||||
@ -1811,7 +1816,7 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
|
||||
node.__set_is_nullable(false);
|
||||
|
||||
auto in_pred = _pool->add(new vectorized::VDirectInPredicate(node));
|
||||
in_pred->set_filter(_hybrid_set);
|
||||
in_pred->set_filter(_context.hybrid_set);
|
||||
auto cloned_vexpr = vprob_expr->root()->clone(_pool);
|
||||
in_pred->add_child(cloned_vexpr);
|
||||
auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node, in_pred));
|
||||
@ -1827,7 +1832,8 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
|
||||
&max_pred, &max_pred_node));
|
||||
doris::vectorized::VExpr* max_literal = nullptr;
|
||||
RETURN_IF_ERROR(create_literal<true>(_pool, vprob_expr->root()->type(),
|
||||
_minmax_func->get_max(), (void**)&max_literal));
|
||||
_context.minmax_func->get_max(),
|
||||
(void**)&max_literal));
|
||||
auto cloned_vexpr = vprob_expr->root()->clone(_pool);
|
||||
max_pred->add_child(cloned_vexpr);
|
||||
max_pred->add_child(max_literal);
|
||||
@ -1841,7 +1847,8 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
|
||||
&min_pred, &min_pred_node));
|
||||
doris::vectorized::VExpr* min_literal = nullptr;
|
||||
RETURN_IF_ERROR(create_literal<true>(_pool, vprob_expr->root()->type(),
|
||||
_minmax_func->get_min(), (void**)&min_literal));
|
||||
_context.minmax_func->get_min(),
|
||||
(void**)&min_literal));
|
||||
cloned_vexpr = vprob_expr->root()->clone(_pool);
|
||||
min_pred->add_child(cloned_vexpr);
|
||||
min_pred->add_child(min_literal);
|
||||
@ -1861,7 +1868,7 @@ Status RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
|
||||
node.__set_vector_opcode(to_in_opcode(_column_return_type));
|
||||
node.__set_is_nullable(false);
|
||||
auto bloom_pred = _pool->add(new vectorized::VBloomPredicate(node));
|
||||
bloom_pred->set_filter(_bloomfilter_func);
|
||||
bloom_pred->set_filter(_context.bloom_filter_func);
|
||||
auto cloned_vexpr = vprob_expr->root()->clone(_pool);
|
||||
bloom_pred->add_child(cloned_vexpr);
|
||||
auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node, bloom_pred));
|
||||
|
||||
@@ -47,6 +47,7 @@ class BloomFilterFuncBase;
namespace vectorized {
class VExpr;
class VExprContext;
struct SharedRuntimeFilterContext;
} // namespace vectorized

enum class RuntimeFilterType {
@@ -141,7 +142,8 @@ public:
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, IRuntimeFilter** res);

Status apply_from_other(IRuntimeFilter* other);
void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context);
Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context);

// insert data to build filter
// only used for producer
@ -23,6 +23,7 @@
|
||||
#include "vec/columns/column_nullable.h"
|
||||
#include "vec/columns/columns_number.h"
|
||||
#include "vec/common/assert_cast.h"
|
||||
#include "vec/runtime/shared_hash_table_controller.h"
|
||||
|
||||
namespace doris {
|
||||
// this class used in a hash join node
|
||||
@ -229,19 +230,24 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
Status apply_from_other(RuntimeFilterSlotsBase<ExprCtxType>* other) {
|
||||
void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) {
|
||||
for (auto& it : _runtime_filters) {
|
||||
for (auto& filter : it.second) {
|
||||
auto& target = context->runtime_filters[filter->filter_id()];
|
||||
filter->copy_to_shared_context(target);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Status copy_from_shared_context(vectorized::SharedHashTableContextPtr& context) {
|
||||
for (auto& it : _runtime_filters) {
|
||||
auto& other_filters = other->_runtime_filters[it.first];
|
||||
for (auto& filter : it.second) {
|
||||
auto filter_id = filter->filter_id();
|
||||
auto ret = std::find_if(other_filters.begin(), other_filters.end(),
|
||||
[&](IRuntimeFilter* other_filter) {
|
||||
return other_filter->filter_id() == filter_id;
|
||||
});
|
||||
if (ret == other_filters.end()) {
|
||||
auto ret = context->runtime_filters.find(filter_id);
|
||||
if (ret == context->runtime_filters.end()) {
|
||||
return Status::Aborted("invalid runtime filter id: {}", filter_id);
|
||||
}
|
||||
filter->apply_from_other(*ret);
|
||||
filter->copy_from_shared_context(ret->second);
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
@@ -240,13 +240,7 @@ Status FragmentExecState::execute() {
}
#ifndef BE_TEST
if (_executor.runtime_state()->is_cancelled()) {
Status status = Status::Cancelled("cancelled before execution");
_executor.runtime_state()
->get_query_fragments_ctx()
->get_shared_hash_table_controller()
->release_ref_count_if_need(_executor.runtime_state()->fragment_instance_id(),
status);
return status;
return Status::Cancelled("cancelled before execution");
}
#endif
int64_t duration_ns = 0;
@@ -254,19 +248,11 @@ Status FragmentExecState::execute() {
SCOPED_RAW_TIMER(&duration_ns);
CgroupsMgr::apply_system_cgroup();
opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment");
Status status = _executor.open();
WARN_IF_ERROR(status,
WARN_IF_ERROR(_executor.open(),
strings::Substitute("Got error while opening fragment $0, query id: $1",
print_id(_fragment_instance_id), print_id(_query_id)));

_executor.close();
if (!status.ok()) {
_executor.runtime_state()
->get_query_fragments_ctx()
->get_shared_hash_table_controller()
->release_ref_count_if_need(_executor.runtime_state()->fragment_instance_id(),
status);
}
}
DorisMetrics::instance()->fragment_requests_total->increment(1);
DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000);
@ -712,9 +698,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
|
||||
}
|
||||
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
|
||||
|
||||
_setup_shared_hashtable_for_broadcast_join(params, exec_state->executor()->runtime_state(),
|
||||
fragments_ctx.get());
|
||||
|
||||
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
|
||||
_runtimefilter_controller.add_entity(params, &handler, exec_state->executor()->runtime_state());
|
||||
exec_state->set_merge_controller_handler(handler);
|
||||
@ -784,26 +767,6 @@ void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params,
|
||||
#endif
|
||||
}
|
||||
|
||||
void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
|
||||
RuntimeState* state,
|
||||
QueryFragmentsCtx* fragments_ctx) {
|
||||
if (!params.__isset.fragment || !params.fragment.__isset.plan ||
|
||||
params.fragment.plan.nodes.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto& node : params.fragment.plan.nodes) {
|
||||
if (node.node_type != TPlanNodeType::HASH_JOIN_NODE ||
|
||||
!node.hash_join_node.__isset.is_broadcast_join ||
|
||||
!node.hash_join_node.is_broadcast_join) {
|
||||
continue;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(_lock_for_shared_hash_table);
|
||||
fragments_ctx->get_shared_hash_table_controller()->acquire_ref_count(state, node.node_id);
|
||||
}
|
||||
}
|
||||
|
||||
bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) {
|
||||
return type == TPlanNodeType::OLAP_SCAN_NODE || type == TPlanNodeType::MYSQL_SCAN_NODE ||
|
||||
type == TPlanNodeType::SCHEMA_SCAN_NODE || type == TPlanNodeType::META_SCAN_NODE ||
|
||||
|
||||
@ -107,10 +107,6 @@ private:
|
||||
|
||||
bool _is_scan_node(const TPlanNodeType::type& type);
|
||||
|
||||
void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
|
||||
RuntimeState* state,
|
||||
QueryFragmentsCtx* fragments_ctx);
|
||||
|
||||
// This is input params
|
||||
ExecEnv* _exec_env;
|
||||
|
||||
|
||||
@@ -100,8 +100,8 @@ public:
return _ready_to_execute.load() && !_is_cancelled.load();
}

vectorized::SharedHashTableController* get_shared_hash_table_controller() {
return _shared_hash_table_controller.get();
std::shared_ptr<vectorized::SharedHashTableController> get_shared_hash_table_controller() {
return _shared_hash_table_controller;
}

public:
@@ -144,7 +144,7 @@ private:
std::atomic<bool> _ready_to_execute {false};
std::atomic<bool> _is_cancelled {false};

std::unique_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
};

} // namespace doris
@@ -78,8 +78,9 @@ Status RuntimeFilterMgr::get_producer_filter(const int filter_id,
return get_filter_by_role(filter_id, RuntimeFilterRole::PRODUCER, producer_filter);
}

Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc,
const TQueryOptions& options, int node_id) {
Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role,
const TRuntimeFilterDesc& desc,
const TQueryOptions& options, int node_id) {
DCHECK((role == RuntimeFilterRole::CONSUMER && node_id >= 0) ||
role != RuntimeFilterRole::CONSUMER);
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
@@ -69,8 +69,8 @@ public:

Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter);
// regist filter
Status regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc,
const TQueryOptions& options, int node_id = -1);
Status register_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc,
const TQueryOptions& options, int node_id = -1);

// update filter by remote
Status update_filter(const PPublishFilterRequest* request,
@@ -397,6 +397,11 @@ public:

bool enable_profile() const { return _query_options.is_report_success; }

bool enable_share_hash_table_for_broadcast_join() const {
return _query_options.__isset.enable_share_hash_table_for_broadcast_join &&
_query_options.enable_share_hash_table_for_broadcast_join;
}

private:
// Use a custom block manager for the query for testing purposes.
void set_block_mgr2(const std::shared_ptr<BufferedBlockMgr2>& block_mgr) {
@ -28,7 +28,7 @@ template <int JoinOpType>
|
||||
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinNode* join_node, int batch_size)
|
||||
: _join_node(join_node),
|
||||
_batch_size(batch_size),
|
||||
_build_blocks(join_node->_build_blocks),
|
||||
_build_blocks(*join_node->_build_blocks),
|
||||
_tuple_is_null_left_flags(join_node->_is_outer_join
|
||||
? &(reinterpret_cast<ColumnUInt8&>(
|
||||
*join_node->_tuple_is_null_left_flag_column)
|
||||
@ -207,15 +207,14 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
|
||||
}
|
||||
int last_offset = current_offset;
|
||||
auto find_result =
|
||||
!need_null_map_for_probe ? key_getter.find_key(*hash_table_ctx.hash_table_ptr,
|
||||
probe_index, _arena)
|
||||
!need_null_map_for_probe
|
||||
? key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena)
|
||||
: (*null_map)[probe_index]
|
||||
? decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr,
|
||||
probe_index, _arena)) {nullptr, false}
|
||||
: key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index,
|
||||
_arena);
|
||||
? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
|
||||
_arena)) {nullptr, false}
|
||||
: key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena);
|
||||
if (probe_index + PREFETCH_STEP < probe_rows)
|
||||
key_getter.template prefetch<true>(*hash_table_ctx.hash_table_ptr,
|
||||
key_getter.template prefetch<true>(hash_table_ctx.hash_table,
|
||||
probe_index + PREFETCH_STEP, _arena);
|
||||
|
||||
if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
|
||||
@ -372,15 +371,14 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
|
||||
|
||||
auto last_offset = current_offset;
|
||||
auto find_result =
|
||||
!need_null_map_for_probe ? key_getter.find_key(*hash_table_ctx.hash_table_ptr,
|
||||
probe_index, _arena)
|
||||
!need_null_map_for_probe
|
||||
? key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena)
|
||||
: (*null_map)[probe_index]
|
||||
? decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr,
|
||||
probe_index, _arena)) {nullptr, false}
|
||||
: key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index,
|
||||
_arena);
|
||||
? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
|
||||
_arena)) {nullptr, false}
|
||||
: key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena);
|
||||
if (probe_index + PREFETCH_STEP < probe_rows)
|
||||
key_getter.template prefetch<true>(*hash_table_ctx.hash_table_ptr,
|
||||
key_getter.template prefetch<true>(hash_table_ctx.hash_table,
|
||||
probe_index + PREFETCH_STEP, _arena);
|
||||
if (find_result.is_found()) {
|
||||
auto& mapped = find_result.get_mapped();
|
||||
@ -627,7 +625,7 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
|
||||
}
|
||||
};
|
||||
|
||||
for (; iter != hash_table_ctx.hash_table_ptr->end() && block_size < _batch_size; ++iter) {
|
||||
for (; iter != hash_table_ctx.hash_table.end() && block_size < _batch_size; ++iter) {
|
||||
auto& mapped = iter->get_second();
|
||||
if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
|
||||
if (mapped.visited) {
|
||||
@ -674,7 +672,7 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
|
||||
}
|
||||
_tuple_is_null_left_flags->resize_fill(block_size, 1);
|
||||
}
|
||||
*eos = iter == hash_table_ctx.hash_table_ptr->end();
|
||||
*eos = iter == hash_table_ctx.hash_table.end();
|
||||
output_block->swap(
|
||||
mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0));
|
||||
return Status::OK();
|
||||
|
||||
@ -221,7 +221,7 @@ struct ProcessRuntimeFilterBuild {
|
||||
_join_node->_runtime_filter_descs));
|
||||
|
||||
RETURN_IF_ERROR(_join_node->_runtime_filter_slots->init(
|
||||
state, hash_table_ctx.hash_table_ptr->get_size()));
|
||||
state, hash_table_ctx.hash_table.get_size()));
|
||||
|
||||
if (!_join_node->_runtime_filter_slots->empty() && !_join_node->_inserted_rows.empty()) {
|
||||
{
|
||||
@ -250,14 +250,15 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
|
||||
? tnode.hash_join_node.hash_output_slot_ids
|
||||
: std::vector<SlotId> {}) {
|
||||
_runtime_filter_descs = tnode.runtime_filters;
|
||||
_arena = std::make_unique<Arena>();
|
||||
_hash_table_variants = std::make_unique<HashTableVariants>();
|
||||
_arena = std::make_shared<Arena>();
|
||||
_hash_table_variants = std::make_shared<HashTableVariants>();
|
||||
_process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
|
||||
_build_blocks.reset(new std::vector<Block>());
|
||||
|
||||
// avoid vector expand change block address.
|
||||
// one block can store 4g data, _build_blocks can store 128*4g data.
|
||||
// if probe data bigger than 512g, runtime filter maybe will core dump when insert data.
|
||||
_build_blocks.reserve(_MAX_BUILD_BLOCK_COUNT);
|
||||
_build_blocks->reserve(_MAX_BUILD_BLOCK_COUNT);
|
||||
}
|
||||
|
||||
Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
@ -313,7 +314,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
|
||||
_runtime_filters.resize(_runtime_filter_descs.size());
|
||||
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
|
||||
RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
|
||||
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
|
||||
RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options()));
|
||||
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
|
||||
_runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
|
||||
@ -349,18 +350,18 @@ Status HashJoinNode::prepare(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
|
||||
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
||||
// Build phase
|
||||
auto build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true);
|
||||
runtime_profile()->add_child(build_phase_profile, false, nullptr);
|
||||
_build_timer = ADD_TIMER(build_phase_profile, "BuildTime");
|
||||
_build_table_timer = ADD_TIMER(build_phase_profile, "BuildTableTime");
|
||||
_build_side_merge_block_timer = ADD_TIMER(build_phase_profile, "BuildSideMergeBlockTime");
|
||||
_build_table_insert_timer = ADD_TIMER(build_phase_profile, "BuildTableInsertTime");
|
||||
_build_expr_call_timer = ADD_TIMER(build_phase_profile, "BuildExprCallTime");
|
||||
_build_table_expanse_timer = ADD_TIMER(build_phase_profile, "BuildTableExpanseTime");
|
||||
_build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true);
|
||||
runtime_profile()->add_child(_build_phase_profile, false, nullptr);
|
||||
_build_timer = ADD_TIMER(_build_phase_profile, "BuildTime");
|
||||
_build_table_timer = ADD_TIMER(_build_phase_profile, "BuildTableTime");
|
||||
_build_side_merge_block_timer = ADD_TIMER(_build_phase_profile, "BuildSideMergeBlockTime");
|
||||
_build_table_insert_timer = ADD_TIMER(_build_phase_profile, "BuildTableInsertTime");
|
||||
_build_expr_call_timer = ADD_TIMER(_build_phase_profile, "BuildExprCallTime");
|
||||
_build_table_expanse_timer = ADD_TIMER(_build_phase_profile, "BuildTableExpanseTime");
|
||||
_build_table_convert_timer =
|
||||
ADD_TIMER(build_phase_profile, "BuildTableConvertToPartitionedTime");
|
||||
_build_rows_counter = ADD_COUNTER(build_phase_profile, "BuildRows", TUnit::UNIT);
|
||||
_build_side_compute_hash_timer = ADD_TIMER(build_phase_profile, "BuildSideHashComputingTime");
|
||||
ADD_TIMER(_build_phase_profile, "BuildTableConvertToPartitionedTime");
|
||||
_build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows", TUnit::UNIT);
|
||||
_build_side_compute_hash_timer = ADD_TIMER(_build_phase_profile, "BuildSideHashComputingTime");
|
||||
|
||||
// Probe phase
|
||||
auto probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true);
|
||||
@ -379,8 +380,19 @@ Status HashJoinNode::prepare(RuntimeState* state) {
|
||||
_build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
|
||||
_build_buckets_fill_counter = ADD_COUNTER(runtime_profile(), "FilledBuckets", TUnit::UNIT);
|
||||
|
||||
_should_build_hash_table = true;
|
||||
if (_is_broadcast_join) {
|
||||
runtime_profile()->add_info_string("BroadcastJoin", "true");
|
||||
if (state->enable_share_hash_table_for_broadcast_join()) {
|
||||
runtime_profile()->add_info_string("ShareHashTableEnabled", "true");
|
||||
_shared_hashtable_controller =
|
||||
state->get_query_fragments_ctx()->get_shared_hash_table_controller();
|
||||
_shared_hash_table_context = _shared_hashtable_controller->get_context(id());
|
||||
_should_build_hash_table = _shared_hashtable_controller->should_build_hash_table(
|
||||
state->fragment_instance_id(), id());
|
||||
} else {
|
||||
runtime_profile()->add_info_string("ShareHashTableEnabled", "false");
|
||||
}
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(VExpr::prepare(_build_expr_ctxs, state, child(1)->row_desc()));
|
||||
@ -398,7 +410,6 @@ Status HashJoinNode::prepare(RuntimeState* state) {
|
||||
|
||||
// Hash Table Init
|
||||
_hash_table_init(state);
|
||||
_process_hashtable_ctx_variants_init(state);
|
||||
_construct_mutable_join_block();
|
||||
|
||||
return Status::OK();
|
||||
@ -417,11 +428,6 @@ Status HashJoinNode::close(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
if (_shared_hashtable_controller) {
|
||||
_shared_hashtable_controller->release_ref_count(state, id());
|
||||
_shared_hashtable_controller->wait_for_closable(state, id());
|
||||
}
|
||||
|
||||
START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::close");
|
||||
VExpr::close(_build_expr_ctxs, state);
|
||||
VExpr::close(_probe_expr_ctxs, state);
|
||||
@ -610,6 +616,7 @@ Status HashJoinNode::open(RuntimeState* state) {
|
||||
|
||||
Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(child(1)->open(state));
|
||||
|
||||
SCOPED_TIMER(_build_timer);
|
||||
MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors());
|
||||
|
||||
@ -620,111 +627,128 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
|
||||
// make one block for each 4 gigabytes
|
||||
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
|
||||
|
||||
auto should_build_hash_table = true;
|
||||
if (_is_broadcast_join) {
|
||||
_shared_hashtable_controller =
|
||||
state->get_query_fragments_ctx()->get_shared_hash_table_controller();
|
||||
should_build_hash_table =
|
||||
_shared_hashtable_controller->should_build_hash_table(state, id());
|
||||
}
|
||||
if (_should_build_hash_table) {
|
||||
Block block;
|
||||
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
|
||||
// data from data.
|
||||
while (!eos && !_short_circuit_for_null_in_probe_side) {
|
||||
block.clear_column_data();
|
||||
RETURN_IF_CANCELLED(state);
|
||||
|
||||
Block block;
|
||||
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
|
||||
// data from data.
|
||||
while (!eos && !_short_circuit_for_null_in_probe_side) {
|
||||
block.clear_column_data();
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block, &eos),
|
||||
child(1)->get_next_span(), eos);
|
||||
|
||||
RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block, &eos),
|
||||
child(1)->get_next_span(), eos);
|
||||
if (!should_build_hash_table) {
|
||||
continue;
|
||||
_mem_used += block.allocated_bytes();
|
||||
|
||||
if (block.rows() != 0) {
|
||||
SCOPED_TIMER(_build_side_merge_block_timer);
|
||||
RETURN_IF_CATCH_BAD_ALLOC(mutable_block.merge(block));
|
||||
}
|
||||
|
||||
if (UNLIKELY(_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) {
|
||||
if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) {
|
||||
return Status::NotSupported(
|
||||
strings::Substitute("data size of right table in hash join > $0",
|
||||
BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT));
|
||||
}
|
||||
_build_blocks->emplace_back(mutable_block.to_block());
|
||||
// TODO:: Rethink may we should do the process after we receive all build blocks ?
|
||||
// which is better.
|
||||
RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[index], index));
|
||||
|
||||
mutable_block = MutableBlock();
|
||||
++index;
|
||||
last_mem_used = _mem_used;
|
||||
}
|
||||
}
|
||||
|
||||
_mem_used += block.allocated_bytes();
|
||||
|
||||
if (block.rows() != 0) {
|
||||
SCOPED_TIMER(_build_side_merge_block_timer);
|
||||
RETURN_IF_CATCH_BAD_ALLOC(mutable_block.merge(block));
|
||||
}
|
||||
|
||||
if (UNLIKELY(_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) {
|
||||
if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) {
|
||||
if (!mutable_block.empty() && !_short_circuit_for_null_in_probe_side) {
|
||||
if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) {
|
||||
return Status::NotSupported(
|
||||
strings::Substitute("data size of right table in hash join > $0",
|
||||
BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT));
|
||||
}
|
||||
_build_blocks.emplace_back(mutable_block.to_block());
|
||||
// TODO:: Rethink may we should do the process after we receive all build blocks ?
|
||||
// which is better.
|
||||
RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index));
|
||||
|
||||
mutable_block = MutableBlock();
|
||||
++index;
|
||||
last_mem_used = _mem_used;
|
||||
_build_blocks->emplace_back(mutable_block.to_block());
|
||||
RETURN_IF_ERROR(_process_build_block(state, (*_build_blocks)[index], index));
|
||||
}
|
||||
}
|
||||
|
||||
if (should_build_hash_table && !mutable_block.empty() &&
|
||||
!_short_circuit_for_null_in_probe_side) {
|
||||
if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) {
|
||||
return Status::NotSupported(
|
||||
strings::Substitute("data size of right table in hash join > $0",
|
||||
BUILD_BLOCK_MAX_SIZE * _MAX_BUILD_BLOCK_COUNT));
|
||||
}
|
||||
_build_blocks.emplace_back(mutable_block.to_block());
|
||||
RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index], index));
|
||||
}
|
||||
child(1)->close(state);
|
||||
|
||||
return std::visit(
|
||||
Overload {[&](std::monostate& arg) -> Status {
|
||||
LOG(FATAL) << "FATAL: uninited hash table";
|
||||
__builtin_unreachable();
|
||||
},
|
||||
[&](auto&& arg) -> Status {
|
||||
using HashTableCtxType = std::decay_t<decltype(arg)>;
|
||||
using HashTableType = typename HashTableCtxType::HashTable;
|
||||
if (!should_build_hash_table) {
|
||||
auto& ret = _shared_hashtable_controller->wait_for_hash_table(id());
|
||||
if (!ret.status.ok()) {
|
||||
return ret.status;
|
||||
}
|
||||
_short_circuit_for_null_in_probe_side =
|
||||
_shared_hashtable_controller
|
||||
->short_circuit_for_null_in_probe_side();
|
||||
arg.hash_table_ptr =
|
||||
reinterpret_cast<HashTableType*>(ret.hash_table_ptr);
|
||||
_build_blocks = *ret.blocks;
|
||||
_runtime_filter_slots = _pool->add(new VRuntimeFilterSlots(
|
||||
_probe_expr_ctxs, _build_expr_ctxs, _runtime_filter_descs));
|
||||
RETURN_IF_ERROR(_runtime_filter_slots->init(
|
||||
state, arg.hash_table_ptr->get_size()));
|
||||
RETURN_IF_ERROR(_runtime_filter_slots->apply_from_other(
|
||||
ret.runtime_filter_slots));
|
||||
{
|
||||
SCOPED_TIMER(_push_down_timer);
|
||||
_runtime_filter_slots->publish();
|
||||
}
|
||||
return Status::OK();
|
||||
} else {
|
||||
arg.hash_table_ptr = &arg.hash_table;
|
||||
ProcessRuntimeFilterBuild<HashTableCtxType>
|
||||
runtime_filter_build_process(this);
|
||||
auto ret = runtime_filter_build_process(state, arg);
|
||||
if (_shared_hashtable_controller) {
|
||||
_shared_hashtable_controller
|
||||
->set_short_circuit_for_null_in_probe_side(
|
||||
_short_circuit_for_null_in_probe_side);
|
||||
SharedHashTableEntry entry(ret, arg.hash_table_ptr,
|
||||
&_build_blocks, _runtime_filter_slots);
|
||||
_shared_hashtable_controller->put_hash_table(std::move(entry),
|
||||
id());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}},
|
||||
*_hash_table_variants);
|
||||
if (_should_build_hash_table) {
|
||||
auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
|
||||
LOG(FATAL) << "FATAL: uninited hash table";
|
||||
__builtin_unreachable();
|
||||
},
|
||||
[&](auto&& arg) -> Status {
|
||||
using HashTableCtxType = std::decay_t<decltype(arg)>;
|
||||
ProcessRuntimeFilterBuild<HashTableCtxType>
|
||||
runtime_filter_build_process(this);
|
||||
return runtime_filter_build_process(state, arg);
|
||||
}},
|
||||
*_hash_table_variants);
|
||||
if (!ret.ok()) {
|
||||
if (_shared_hashtable_controller) {
|
||||
_shared_hash_table_context->status = ret;
|
||||
_shared_hashtable_controller->signal(id());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
if (_shared_hashtable_controller) {
|
||||
_shared_hash_table_context->status = Status::OK();
|
||||
// arena will be shared with other instances.
|
||||
_shared_hash_table_context->arena = _arena;
|
||||
_shared_hash_table_context->blocks = _build_blocks;
|
||||
_shared_hash_table_context->hash_table_variants = _hash_table_variants;
|
||||
_shared_hash_table_context->short_circuit_for_null_in_probe_side =
|
||||
_short_circuit_for_null_in_probe_side;
|
||||
if (_runtime_filter_slots) {
|
||||
_runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
|
||||
}
|
||||
_shared_hashtable_controller->signal(id());
|
||||
}
|
||||
} else {
|
||||
DCHECK(_shared_hashtable_controller != nullptr);
|
||||
DCHECK(_shared_hash_table_context != nullptr);
|
||||
auto wait_timer = ADD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime");
|
||||
SCOPED_TIMER(wait_timer);
|
||||
RETURN_IF_ERROR(
|
||||
_shared_hashtable_controller->wait_for_signal(state, _shared_hash_table_context));
|
||||
|
||||
_build_phase_profile->add_info_string(
|
||||
"SharedHashTableFrom",
|
||||
print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id())));
|
||||
_short_circuit_for_null_in_probe_side =
|
||||
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
|
||||
_hash_table_variants = std::static_pointer_cast<HashTableVariants>(
|
||||
_shared_hash_table_context->hash_table_variants);
|
||||
_build_blocks = _shared_hash_table_context->blocks;
|
||||
|
||||
if (!_shared_hash_table_context->runtime_filters.empty()) {
|
||||
auto ret = std::visit(
|
||||
Overload {[&](std::monostate&) -> Status {
|
||||
LOG(FATAL) << "FATAL: uninited hash table";
|
||||
__builtin_unreachable();
|
||||
},
|
||||
[&](auto&& arg) -> Status {
|
||||
if (_runtime_filter_descs.empty()) {
|
||||
return Status::OK();
|
||||
}
|
||||
_runtime_filter_slots = _pool->add(new VRuntimeFilterSlots(
|
||||
_probe_expr_ctxs, _build_expr_ctxs,
|
||||
_runtime_filter_descs));
|
||||
|
||||
RETURN_IF_ERROR(_runtime_filter_slots->init(
|
||||
state, arg.hash_table.get_size()));
|
||||
return _runtime_filter_slots->copy_from_shared_context(
|
||||
_shared_hash_table_context);
|
||||
}},
|
||||
*_hash_table_variants);
|
||||
RETURN_IF_ERROR(ret);
|
||||
}
|
||||
}
|
||||
|
||||
_process_hashtable_ctx_variants_init(state);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
template <bool BuildSide>
|
||||
@ -993,7 +1017,7 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) {
|
||||
__builtin_unreachable();
|
||||
},
|
||||
[&](auto&& arg) {
|
||||
arg.hash_table_ptr->set_partitioned_threshold(
|
||||
arg.hash_table.set_partitioned_threshold(
|
||||
state->partitioned_hash_join_rows_threshold());
|
||||
}},
|
||||
*_hash_table_variants);
|
||||
@ -1054,6 +1078,12 @@ void HashJoinNode::_reset_tuple_is_null_column() {
|
||||
}
|
||||
}
|
||||
|
||||
HashJoinNode::~HashJoinNode() {
|
||||
if (_shared_hashtable_controller && _should_build_hash_table) {
|
||||
_shared_hashtable_controller->signal(id());
|
||||
}
|
||||
}
|
||||
|
||||
void HashJoinNode::_release_mem() {
|
||||
_arena = nullptr;
|
||||
_hash_table_variants = nullptr;
|
||||
@ -1061,10 +1091,8 @@ void HashJoinNode::_release_mem() {
|
||||
_null_map_column = nullptr;
|
||||
_tuple_is_null_left_flag_column = nullptr;
|
||||
_tuple_is_null_right_flag_column = nullptr;
|
||||
_shared_hash_table_context = nullptr;
|
||||
_probe_block.clear();
|
||||
|
||||
std::vector<Block> tmp_build_blocks;
|
||||
_build_blocks.swap(tmp_build_blocks);
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -24,7 +24,9 @@
#include "join_op.h"
#include "process_hash_table_probe.h"
#include "vec/common/columns_hashing.h"
#include "vec/common/hash_table/hash_map.h"
#include "vec/common/hash_table/partitioned_hash_map.h"
#include "vec/runtime/shared_hash_table_controller.h"
#include "vjoin_node_base.h"

namespace doris {
@ -44,7 +46,6 @@ struct SerializedHashTableContext {
    using Iter = typename HashTable::iterator;

    HashTable hash_table;
    HashTable* hash_table_ptr = &hash_table;
    Iter iter;
    bool inited = false;

@ -76,7 +77,6 @@ struct PrimaryTypeHashTableContext {
    using Iter = typename HashTable::iterator;

    HashTable hash_table;
    HashTable* hash_table_ptr = &hash_table;
    Iter iter;
    bool inited = false;

@ -111,7 +111,6 @@ struct FixedKeyHashTableContext {
    using Iter = typename HashTable::iterator;

    HashTable hash_table;
    HashTable* hash_table_ptr = &hash_table;
    Iter iter;
    bool inited = false;

@ -185,6 +184,7 @@ public:
    static constexpr int PREFETCH_STEP = 64;

    HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
    ~HashJoinNode();

    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
    Status prepare(RuntimeState* state) override;
@ -235,14 +235,18 @@ private:

    RuntimeProfile::Counter* _join_filter_timer;

    RuntimeProfile* _build_phase_profile;

    int64_t _mem_used;

    std::unique_ptr<Arena> _arena;
    std::unique_ptr<HashTableVariants> _hash_table_variants;
    std::shared_ptr<Arena> _arena;

    // maybe share hash table with other fragment instances
    std::shared_ptr<HashTableVariants> _hash_table_variants;

    std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants;

    std::vector<Block> _build_blocks;
    std::shared_ptr<std::vector<Block>> _build_blocks;
    Block _probe_block;
    ColumnRawPtrs _probe_columns;
    ColumnUInt8::MutablePtr _null_map_column;
@ -259,8 +263,9 @@ private:
    Sizes _build_key_sz;

    bool _is_broadcast_join = false;
    SharedHashTableController* _shared_hashtable_controller = nullptr;
    VRuntimeFilterSlots* _runtime_filter_slots;
    bool _should_build_hash_table = true;
    std::shared_ptr<SharedHashTableController> _shared_hashtable_controller = nullptr;
    VRuntimeFilterSlots* _runtime_filter_slots = nullptr;

    std::vector<SlotId> _hash_output_slot_ids;
    std::vector<bool> _left_output_slot_flags;
@ -269,6 +274,8 @@ private:
    MutableColumnPtr _tuple_is_null_left_flag_column;
    MutableColumnPtr _tuple_is_null_right_flag_column;

    SharedHashTableContextPtr _shared_hash_table_context = nullptr;

private:
    Status _materialize_build_side(RuntimeState* state) override;

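The header changes switch the build-side state (`_arena`, `_hash_table_variants`, `_build_blocks`) to `std::shared_ptr` and add a `SharedHashTableContextPtr`, so several broadcast-join instances can point at one build result. Since the context (see `shared_hash_table_controller.h` below) stores the variants as a type-erased `std::shared_ptr<void>`, a non-builder instance can recover a typed pointer with `std::static_pointer_cast` while still sharing ownership. A small self-contained sketch of that mechanism, using a stand-in `HashTableVariants` alias rather than the real Doris type:

    #include <memory>
    #include <variant>

    using HashTableVariants = std::variant<int, double>;  // stand-in for the real variant

    int main() {
        // Builder side: publish the typed variants through a type-erased slot,
        // e.g. SharedHashTableContext::hash_table_variants.
        auto built = std::make_shared<HashTableVariants>(42);
        std::shared_ptr<void> shared_slot = built;

        // Non-builder side: recover the typed pointer and share ownership with the builder.
        auto adopted = std::static_pointer_cast<HashTableVariants>(shared_slot);
        return std::get<int>(*adopted) == 42 ? 0 : 1;
    }
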
@ -189,7 +189,7 @@ Status VScanNode::_register_runtime_filter() {
    for (int i = 0; i < filter_size; ++i) {
        IRuntimeFilter* runtime_filter = nullptr;
        const auto& filter_desc = _runtime_filter_descs[i];
        RETURN_IF_ERROR(_state->runtime_filter_mgr()->regist_filter(
        RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
                RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id()));
        RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
                                                                         &runtime_filter));

@ -22,111 +22,57 @@
namespace doris {
namespace vectorized {

bool SharedHashTableController::should_build_hash_table(RuntimeState* state, int my_node_id) {
bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id,
                                                         int my_node_id) {
    std::lock_guard<std::mutex> lock(_mutex);
    auto it = _builder_fragment_ids.find(my_node_id);
    if (it == _builder_fragment_ids.cend()) {
        _builder_fragment_ids[my_node_id] = state->fragment_instance_id();
        _builder_fragment_ids.insert({my_node_id, fragment_instance_id});
        return true;
    }
    return false;
}

bool SharedHashTableController::supposed_to_build_hash_table(RuntimeState* state, int my_node_id) {
SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id) {
    std::lock_guard<std::mutex> lock(_mutex);
    auto it = _builder_fragment_ids.find(my_node_id);
    if (it != _builder_fragment_ids.cend()) {
        return _builder_fragment_ids[my_node_id] == state->fragment_instance_id();
    auto it = _shared_contexts.find(my_node_id);
    if (it == _shared_contexts.cend()) {
        _shared_contexts.insert({my_node_id, std::make_shared<SharedHashTableContext>()});
    }
    return false;
    return _shared_contexts[my_node_id];
}

void SharedHashTableController::put_hash_table(SharedHashTableEntry&& entry, int my_node_id) {
void SharedHashTableController::signal(int my_node_id) {
    std::lock_guard<std::mutex> lock(_mutex);
    DCHECK(_hash_table_entries.find(my_node_id) == _hash_table_entries.cend());
    _hash_table_entries.insert({my_node_id, std::move(entry)});
    auto it = _shared_contexts.find(my_node_id);
    if (it != _shared_contexts.cend()) {
        it->second->signaled = true;
        _shared_contexts.erase(it);
    }
    _cv.notify_all();
}

SharedHashTableEntry& SharedHashTableController::wait_for_hash_table(int my_node_id) {
    std::unique_lock<std::mutex> lock(_mutex);
    auto it = _hash_table_entries.find(my_node_id);
    if (it == _hash_table_entries.cend()) {
        _cv.wait(lock, [this, &it, my_node_id]() {
            it = _hash_table_entries.find(my_node_id);
            return it != _hash_table_entries.cend();
        });
TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int my_node_id) {
    std::lock_guard<std::mutex> lock(_mutex);
    auto it = _builder_fragment_ids.find(my_node_id);
    if (it == _builder_fragment_ids.cend()) {
        return TUniqueId {};
    }
    return it->second;
}

void SharedHashTableController::acquire_ref_count(RuntimeState* state, int my_node_id) {
Status SharedHashTableController::wait_for_signal(RuntimeState* state,
                                                  const SharedHashTableContextPtr& context) {
    std::unique_lock<std::mutex> lock(_mutex);
    _ref_fragments[my_node_id].emplace_back(state->fragment_instance_id());
}

Status SharedHashTableController::release_ref_count(RuntimeState* state, int my_node_id) {
    std::unique_lock<std::mutex> lock(_mutex);
    auto id = state->fragment_instance_id();
    auto it = std::find(_ref_fragments[my_node_id].begin(), _ref_fragments[my_node_id].end(), id);
    CHECK(it != _ref_fragments[my_node_id].end());
    _ref_fragments[my_node_id].erase(it);
    _put_an_empty_entry_if_need(Status::Cancelled("hash table not build"), id, my_node_id);
    _cv.notify_all();
    return Status::OK();
}

void SharedHashTableController::_put_an_empty_entry_if_need(Status status, TUniqueId fragment_id,
                                                            int node_id) {
    auto builder_it = _builder_fragment_ids.find(node_id);
    if (builder_it != _builder_fragment_ids.end()) {
        if (builder_it->second == fragment_id) {
            if (_hash_table_entries.find(builder_it->first) == _hash_table_entries.cend()) {
                // "here put an empty SharedHashTableEntry to avoid deadlocking"
                _hash_table_entries.insert(
                        {builder_it->first, SharedHashTableEntry::empty_entry_with_status(status)});
            }
        }
    // The builder may have signaled before the other instances started waiting,
    // so check the value of `signaled` here.
    while (!context->signaled) {
        _cv.wait_for(lock, std::chrono::milliseconds(400), [&]() { return context->signaled; });
        // Return if this instance has been cancelled (e.g. query timeout).
        RETURN_IF_CANCELLED(state);
    }
}

Status SharedHashTableController::release_ref_count_if_need(TUniqueId fragment_id, Status status) {
    std::unique_lock<std::mutex> lock(_mutex);
    bool need_to_notify = false;
    for (auto& ref : _ref_fragments) {
        auto it = std::find(ref.second.begin(), ref.second.end(), fragment_id);
        if (it == ref.second.end()) {
            continue;
        }
        ref.second.erase(it);
        need_to_notify = true;
        LOG(INFO) << "release_ref_count in node: " << ref.first
                  << " for fragment id: " << fragment_id;
    }

    for (auto& builder : _builder_fragment_ids) {
        if (builder.second == fragment_id) {
            if (_hash_table_entries.find(builder.first) == _hash_table_entries.cend()) {
                _hash_table_entries.insert(
                        {builder.first, SharedHashTableEntry::empty_entry_with_status(status)});
            }
        }
    }

    if (need_to_notify) {
        _cv.notify_all();
    }
    return Status::OK();
}

Status SharedHashTableController::wait_for_closable(RuntimeState* state, int my_node_id) {
    std::unique_lock<std::mutex> lock(_mutex);
    if (!_ref_fragments[my_node_id].empty()) {
        _cv.wait(lock, [&]() { return _ref_fragments[my_node_id].empty(); });
    }
    RETURN_IF_CANCELLED(state);
    return Status::OK();
    return context->status;
}

} // namespace vectorized
} // namespace doris
} // namespace doris

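Overall, the rewritten controller drops the old entry/ref-count protocol in favor of a per-node shared context: the first instance to register a node id becomes the builder, and the others wait on the context until the builder calls `signal()`, rechecking cancellation instead of blocking indefinitely. A rough sketch of the call order this API implies; `build_or_wait` and its body are illustrative only (not code from this PR) and would compile only inside the Doris BE tree:

    #include "common/status.h"
    #include "runtime/runtime_state.h"
    #include "vec/runtime/shared_hash_table_controller.h"

    using doris::Status;
    using doris::vectorized::SharedHashTableContextPtr;
    using doris::vectorized::SharedHashTableController;

    Status build_or_wait(doris::RuntimeState* state, SharedHashTableController* controller,
                         int node_id) {
        SharedHashTableContextPtr context = controller->get_context(node_id);
        if (controller->should_build_hash_table(state->fragment_instance_id(), node_id)) {
            // ... build the hash table, then fill context->hash_table_variants,
            // context->blocks and context->runtime_filters ...
            context->status = Status::OK();
            controller->signal(node_id);  // wake every waiting instance, even on failure paths
            return Status::OK();
        }
        // Non-builders block until the builder signals, rechecking cancellation as they wait.
        RETURN_IF_ERROR(controller->wait_for_signal(state, context));
        // ... adopt the shared variants, blocks and runtime filters from `context` ...
        return Status::OK();
    }
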
@ -29,70 +29,50 @@ namespace doris {

class RuntimeState;
class TUniqueId;

template <typename ExprCtxType>
class RuntimeFilterSlotsBase;
class MinMaxFuncBase;
class HybridSetBase;
class BloomFilterFuncBase;

namespace vectorized {

class VExprContext;

struct SharedHashTableEntry {
    SharedHashTableEntry(Status status_, void* hash_table_ptr_, std::vector<Block>* blocks_,
                         RuntimeFilterSlotsBase<VExprContext>* runtime_filter_slots_)
            : status(status_),
              hash_table_ptr(hash_table_ptr_),
              blocks(blocks_),
              runtime_filter_slots(runtime_filter_slots_) {}
    SharedHashTableEntry(SharedHashTableEntry&& entry)
            : status(entry.status),
              hash_table_ptr(entry.hash_table_ptr),
              blocks(entry.blocks),
              runtime_filter_slots(entry.runtime_filter_slots) {}
struct SharedRuntimeFilterContext {
    std::shared_ptr<MinMaxFuncBase> minmax_func;
    std::shared_ptr<HybridSetBase> hybrid_set;
    std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
};

    static SharedHashTableEntry empty_entry_with_status(const Status& status) {
        return SharedHashTableEntry(status, nullptr, nullptr, nullptr);
    }
struct SharedHashTableContext {
    SharedHashTableContext()
            : hash_table_variants(nullptr),
              signaled(false),
              short_circuit_for_null_in_probe_side(false) {}

    Status status;
    void* hash_table_ptr;
    std::vector<Block>* blocks;
    RuntimeFilterSlotsBase<VExprContext>* runtime_filter_slots;
    std::shared_ptr<Arena> arena;
    std::shared_ptr<void> hash_table_variants;
    std::shared_ptr<std::vector<Block>> blocks;
    std::map<int, SharedRuntimeFilterContext> runtime_filters;
    bool signaled;
    bool short_circuit_for_null_in_probe_side;
};

using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;

class SharedHashTableController {
public:
    bool should_build_hash_table(RuntimeState* state, int my_node_id);
    bool supposed_to_build_hash_table(RuntimeState* state, int my_node_id);
    void acquire_ref_count(RuntimeState* state, int my_node_id);
    SharedHashTableEntry& wait_for_hash_table(int my_node_id);
    Status release_ref_count(RuntimeState* state, int my_node_id);
    Status release_ref_count_if_need(TUniqueId fragment_id, Status status);
    void put_hash_table(SharedHashTableEntry&& entry, int my_node_id);
    Status wait_for_closable(RuntimeState* state, int my_node_id);

    // Single-thread operation
    void set_short_circuit_for_null_in_probe_side(bool short_circuit_for_null_in_probe_side) {
        _short_circuit_for_null_in_probe_side = short_circuit_for_null_in_probe_side;
    }

    bool short_circuit_for_null_in_probe_side() const {
        return _short_circuit_for_null_in_probe_side;
    }

private:
    // If the fragment instance was supposed to build hash table, but it didn't build.
    // To avoid deadlocking other fragment instances,
    // here need to put an empty SharedHashTableEntry with canceled status.
    void _put_an_empty_entry_if_need(Status status, TUniqueId fragment_id, int node_id);
    TUniqueId get_builder_fragment_instance_id(int my_node_id);
    SharedHashTableContextPtr get_context(int my_node_id);
    void signal(int my_node_id);
    Status wait_for_signal(RuntimeState* state, const SharedHashTableContextPtr& context);
    bool should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id);

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::map<int /*node id*/, TUniqueId /*fragment id*/> _builder_fragment_ids;
    std::map<int /*node id*/, SharedHashTableEntry> _hash_table_entries;
    std::map<int /*node id*/, std::vector<TUniqueId>> _ref_fragments;
    bool _short_circuit_for_null_in_probe_side;
    std::map<int /*node id*/, TUniqueId /*fragment instance id*/> _builder_fragment_ids;
    std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts;
};

} // namespace vectorized

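The `runtime_filters` map of `SharedRuntimeFilterContext` is what lets the builder hand its IN / min-max / bloom filter payloads to the other instances as `shared_ptr`s instead of every instance rebuilding them. Two illustrative helpers (not functions from this PR; only the types come from the header above) showing how such a map might be filled and read:

    #include "vec/runtime/shared_hash_table_controller.h"

    using doris::vectorized::SharedHashTableContextPtr;
    using doris::vectorized::SharedRuntimeFilterContext;

    // Builder: store the shared filter payloads keyed by runtime filter id.
    void publish_filter(const SharedHashTableContextPtr& context, int filter_id,
                        const SharedRuntimeFilterContext& built) {
        context->runtime_filters[filter_id] = built;
    }

    // Non-builder: look up the builder's payloads instead of rebuilding them.
    const SharedRuntimeFilterContext* find_filter(const SharedHashTableContextPtr& context,
                                                  int filter_id) {
        auto it = context->runtime_filters.find(filter_id);
        return it == context->runtime_filters.end() ? nullptr : &it->second;
    }
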
@ -1,95 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "shared_hashtable_controller.h"

#include <runtime/runtime_state.h>

namespace doris {
namespace vectorized {

bool SharedHashTableController::should_build_hash_table(RuntimeState* state, int my_node_id) {
    std::lock_guard<std::mutex> lock(_mutex);
    auto it = _builder_fragment_ids.find(my_node_id);
    if (it == _builder_fragment_ids.cend()) {
        _builder_fragment_ids[my_node_id] = state->fragment_instance_id();
        return true;
    }
    return false;
}

void SharedHashTableController::put_hash_table(SharedHashTableEntry&& entry, int my_node_id) {
    std::lock_guard<std::mutex> lock(_mutex);
    DCHECK(_hash_table_entries.find(my_node_id) == _hash_table_entries.cend());
    _hash_table_entries.insert({my_node_id, std::move(entry)});
    _cv.notify_all();
}

SharedHashTableEntry& SharedHashTableController::wait_for_hash_table(int my_node_id) {
    std::unique_lock<std::mutex> lock(_mutex);
    auto it = _hash_table_entries.find(my_node_id);
    if (it == _hash_table_entries.cend()) {
        _cv.wait(lock, [this, &it, my_node_id]() {
            it = _hash_table_entries.find(my_node_id);
            return it != _hash_table_entries.cend();
        });
    }
    return it->second;
}

void SharedHashTableController::acquire_ref_count(RuntimeState* state, int my_node_id) {
    std::unique_lock<std::mutex> lock(_mutex);
    _ref_fragments[my_node_id].emplace_back(state->fragment_instance_id());
}

Status SharedHashTableController::release_ref_count(RuntimeState* state, int my_node_id) {
    std::unique_lock<std::mutex> lock(_mutex);
    RETURN_IF_CANCELLED(state);
    auto id = state->fragment_instance_id();
    auto it = std::find(_ref_fragments[my_node_id].begin(), _ref_fragments[my_node_id].end(), id);
    CHECK(it != _ref_fragments[my_node_id].end());
    _ref_fragments[my_node_id].erase(it);
    _cv.notify_all();
    return Status::OK();
}

Status SharedHashTableController::release_ref_count_if_need(TUniqueId fragment_id) {
    std::unique_lock<std::mutex> lock(_mutex);
    bool need_to_notify = false;
    for (auto& ref : _ref_fragments) {
        auto it = std::find(ref.second.begin(), ref.second.end(), fragment_id);
        if (it == ref.second.end()) continue;
        ref.second.erase(it);
        need_to_notify = true;
        LOG(INFO) << "release_ref_count in node: " << ref.first
                  << " for fragment id: " << fragment_id;
    }
    if (need_to_notify) _cv.notify_all();
    return Status::OK();
}

Status SharedHashTableController::wait_for_closable(RuntimeState* state, int my_node_id) {
    std::unique_lock<std::mutex> lock(_mutex);
    RETURN_IF_CANCELLED(state);
    if (!_ref_fragments[my_node_id].empty()) {
        _cv.wait(lock, [&]() { return _ref_fragments[my_node_id].empty(); });
    }
    return Status::OK();
}

} // namespace vectorized
} // namespace doris
@ -1,75 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <vector>

#include "vec/core/block.h"

namespace doris {

class RuntimeState;
class TUniqueId;

namespace vectorized {

class VExprContext;

struct SharedHashTableEntry {
    SharedHashTableEntry(void* hash_table_ptr_, std::vector<Block>& blocks_,
                         std::unordered_map<const Block*, std::vector<int>>& inserted_rows_,
                         const std::vector<VExprContext*>& exprs)
            : hash_table_ptr(hash_table_ptr_),
              blocks(blocks_),
              inserted_rows(inserted_rows_),
              build_exprs(exprs) {}
    SharedHashTableEntry(SharedHashTableEntry&& entry)
            : hash_table_ptr(entry.hash_table_ptr),
              blocks(entry.blocks),
              inserted_rows(entry.inserted_rows),
              build_exprs(entry.build_exprs) {}
    void* hash_table_ptr;
    std::vector<Block>& blocks;
    std::unordered_map<const Block*, std::vector<int>>& inserted_rows;
    std::vector<VExprContext*> build_exprs;
};

class SharedHashTableController {
public:
    bool should_build_hash_table(RuntimeState* state, int my_node_id);
    void acquire_ref_count(RuntimeState* state, int my_node_id);
    SharedHashTableEntry& wait_for_hash_table(int my_node_id);
    Status release_ref_count(RuntimeState* state, int my_node_id);
    Status release_ref_count_if_need(TUniqueId fragment_id);
    void put_hash_table(SharedHashTableEntry&& entry, int my_node_id);
    Status wait_for_closable(RuntimeState* state, int my_node_id);

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::map<int /*node id*/, TUniqueId /*fragment id*/> _builder_fragment_ids;
    std::map<int /*node id*/, SharedHashTableEntry> _hash_table_entries;
    std::map<int /*node id*/, std::vector<TUniqueId>> _ref_fragments;
};

} // namespace vectorized
} // namespace doris
@ -231,6 +231,9 @@ public class SessionVariable implements Serializable, Writable {

    public static final String PARTITIONED_HASH_JOIN_ROWS_THRESHOLD = "partitioned_hash_join_rows_threshold";

    public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN
            = "enable_share_hash_table_for_broadcast_join";

    // session origin value
    public Map<Field, String> sessionOriginValue = new HashMap<Field, String>();
    // check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@ -607,6 +610,9 @@ public class SessionVariable implements Serializable, Writable {
    @VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD)
    public int partitionedHashJoinRowsThreshold = 0;

    @VariableMgr.VarAttr(name = ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN)
    public boolean enableShareHashTableForBroadcastJoin = true;

    // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables,
    // not the default value set in the code.
    public void initFuzzyModeVariables() {
@ -1227,6 +1233,7 @@ public class SessionVariable implements Serializable, Writable {
        tResult.setBeExecVersion(Config.be_exec_version);
        tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
        tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
        tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin);

        tResult.setBatchSize(batchSize);
        tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);

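On the FE side the new switch defaults to on and can be toggled per session (for example `SET enable_share_hash_table_for_broadcast_join = false;`); the `tResult.setEnableShareHashTableForBroadcastJoin(...)` call above copies it into `TQueryOptions`, so every BE involved in the query sees the same setting.
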
@ -181,6 +181,8 @@ struct TQueryOptions {
  52: optional i32 be_exec_version = 0

  53: optional i32 partitioned_hash_join_rows_threshold = 0

  54: optional bool enable_share_hash_table_for_broadcast_join
}

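On the BE, the new `TQueryOptions` field is what a plan node can consult before taking the shared-hash-table path at all. A hedged sketch of such a check; the helper name and the `is_broadcast_join` parameter are assumptions, and the PR's actual condition in `HashJoinNode` may differ:

    #include "gen_cpp/PaloInternalService_types.h"  // generated TQueryOptions

    // Sketch only: share the hash table only when the session option is set
    // and the join is actually a broadcast join.
    bool should_try_share_hash_table(const doris::TQueryOptions& options, bool is_broadcast_join) {
        bool enabled = options.__isset.enable_share_hash_table_for_broadcast_join &&
                       options.enable_share_hash_table_for_broadcast_join;
        return enabled && is_broadcast_join;
    }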