[Improvement](runtime filter) Improve merge phase (#18828)
This commit is contained in:
@ -204,6 +204,8 @@ public:
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Size reported by `_bloom_filter`, or 0 while `_bloom_filter` is empty/null.
size_t get_size() const {
    if (!_bloom_filter) {
        return 0;
    }
    return _bloom_filter->size();
}
|
||||
|
||||
void light_copy(BloomFilterFuncBase* bloomfilter_func) {
|
||||
auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
|
||||
_bloom_filter_alloced = other_func->_bloom_filter_alloced;
|
||||
|
||||
@ -348,28 +348,52 @@ public:
|
||||
RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool,
|
||||
const RuntimeFilterParams* params)
|
||||
: _state(state),
|
||||
_be_exec_version(_state->be_exec_version()),
|
||||
_pool(pool),
|
||||
_column_return_type(params->column_return_type),
|
||||
_filter_type(params->filter_type),
|
||||
_fragment_instance_id(params->fragment_instance_id),
|
||||
_filter_id(params->filter_id),
|
||||
_use_batch(IRuntimeFilter::enable_use_batch(_state->be_exec_version(),
|
||||
_column_return_type)),
|
||||
_use_new_hash(_state->be_exec_version() >= 2) {}
|
||||
_use_batch(
|
||||
IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)),
|
||||
_use_new_hash(_be_exec_version >= 2) {}
|
||||
// for a 'tmp' runtime predicate wrapper
|
||||
// it may only be used to call assign() or be passed as the argument to merge()
|
||||
RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool, PrimitiveType column_type,
|
||||
RuntimeFilterType type, UniqueId fragment_instance_id,
|
||||
uint32_t filter_id)
|
||||
RuntimeFilterType type, uint32_t filter_id)
|
||||
: _state(state),
|
||||
_be_exec_version(_state->be_exec_version()),
|
||||
_pool(pool),
|
||||
_column_return_type(column_type),
|
||||
_filter_type(type),
|
||||
_fragment_instance_id(fragment_instance_id),
|
||||
_filter_id(filter_id),
|
||||
_use_batch(IRuntimeFilter::enable_use_batch(_state->be_exec_version(),
|
||||
_column_return_type)),
|
||||
_use_new_hash(_state->be_exec_version() >= 2) {}
|
||||
_use_batch(
|
||||
IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)),
|
||||
_use_new_hash(_be_exec_version >= 2) {}
|
||||
|
||||
// Builds a wrapper that is bound to a QueryContext instead of a RuntimeState.
//
// `query_ctx` supplies the BE exec version; `params` supplies the column return type, the
// filter type and the filter id. `pool` is stored for later allocations.
//
// `_state` is set to nullptr explicitly: this overload receives no RuntimeState and the member
// has no default initializer, so omitting it would leave an indeterminate pointer behind.
RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool,
                        const RuntimeFilterParams* params)
        : _state(nullptr),
          _query_ctx(query_ctx),
          // Read from the argument so the value does not depend on member declaration order.
          _be_exec_version(query_ctx->be_exec_version()),
          _pool(pool),
          _column_return_type(params->column_return_type),
          _filter_type(params->filter_type),
          _filter_id(params->filter_id),
          _use_batch(
                  IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)),
          _use_new_hash(_be_exec_version >= 2) {}
|
||||
// for a 'tmp' runtime predicate wrapper
|
||||
// it may only be used to call assign() or be passed as the argument to merge()
|
||||
// Builds a wrapper bound to a QueryContext from explicit field values rather than from a
// RuntimeFilterParams object.
//
// `query_ctx` supplies the BE exec version; `column_type`, `type` and `filter_id` are stored
// as the column return type, filter type and filter id. `pool` is stored for later allocations.
//
// `_state` is set to nullptr explicitly: this overload receives no RuntimeState and the member
// has no default initializer, so omitting it would leave an indeterminate pointer behind.
RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool, PrimitiveType column_type,
                        RuntimeFilterType type, uint32_t filter_id)
        : _state(nullptr),
          _query_ctx(query_ctx),
          // Read from the argument so the value does not depend on member declaration order.
          _be_exec_version(query_ctx->be_exec_version()),
          _pool(pool),
          _column_return_type(column_type),
          _filter_type(type),
          _filter_id(filter_id),
          _use_batch(
                  IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)),
          _use_new_hash(_be_exec_version >= 2) {}
|
||||
// init runtime filter wrapper
|
||||
// alloc memory to init runtime filter function
|
||||
Status init(const RuntimeFilterParams* params) {
|
||||
@ -542,8 +566,7 @@ public:
|
||||
void insert_batch(const vectorized::ColumnPtr column, const std::vector<int>& rows) {
|
||||
if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) {
|
||||
bitmap_filter_insert_batch(column, rows);
|
||||
} else if (IRuntimeFilter::enable_use_batch(_state->be_exec_version(),
|
||||
_column_return_type)) {
|
||||
} else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)) {
|
||||
insert_fixed_len(column->get_raw_data().data, rows.data(), rows.size());
|
||||
} else {
|
||||
for (int index : rows) {
|
||||
@ -571,7 +594,14 @@ public:
|
||||
return real_filter_type;
|
||||
}
|
||||
|
||||
Status get_push_vexprs(std::vector<vectorized::VExpr*>* container, RuntimeState* state,
|
||||
size_t get_bloom_filter_size() {
|
||||
if (_is_bloomfilter) {
|
||||
return _context.bloom_filter_func->get_size();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
Status get_push_vexprs(std::vector<vectorized::VExpr*>* container,
|
||||
vectorized::VExprContext* prob_expr);
|
||||
|
||||
Status merge(const RuntimePredicateWrapper* wrapper) {
|
||||
@ -583,7 +613,6 @@ public:
|
||||
_filter_type != wrapper->_filter_type;
|
||||
|
||||
CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
|
||||
<< "fragment instance " << _fragment_instance_id.to_string()
|
||||
<< " can not merge runtime filter(id=" << _filter_id
|
||||
<< "), current is filter type is " << to_string(_filter_type)
|
||||
<< ", other filter type is " << to_string(wrapper->_filter_type);
|
||||
@ -593,8 +622,7 @@ public:
|
||||
if (_is_ignored_in_filter) {
|
||||
break;
|
||||
} else if (wrapper->_is_ignored_in_filter) {
|
||||
VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string()
|
||||
<< " ignore merge runtime filter(in filter id " << _filter_id
|
||||
VLOG_DEBUG << " ignore merge runtime filter(in filter id " << _filter_id
|
||||
<< ") because: " << *(wrapper->get_ignored_in_filter_msg());
|
||||
|
||||
_is_ignored_in_filter = true;
|
||||
@ -608,8 +636,7 @@ public:
|
||||
if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) {
|
||||
#ifdef VLOG_DEBUG_IS_ON
|
||||
std::stringstream msg;
|
||||
msg << "fragment instance " << _fragment_instance_id.to_string()
|
||||
<< " ignore merge runtime filter(in filter id " << _filter_id
|
||||
msg << " ignore merge runtime filter(in filter id " << _filter_id
|
||||
<< ") because: in_num(" << _context.hybrid_set->size() << ") >= max_in_num("
|
||||
<< _max_in_num << ")";
|
||||
_ignored_in_filter_msg = _pool->add(new std::string(msg.str()));
|
||||
@ -637,22 +664,19 @@ public:
|
||||
if (real_filter_type == RuntimeFilterType::IN_FILTER) {
|
||||
if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in
|
||||
CHECK(!wrapper->_is_ignored_in_filter)
|
||||
<< "fragment instance " << _fragment_instance_id.to_string()
|
||||
<< " can not ignore merge runtime filter(in filter id "
|
||||
<< wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: "
|
||||
<< *(wrapper->get_ignored_in_filter_msg());
|
||||
_context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
|
||||
if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) {
|
||||
VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string()
|
||||
<< " change runtime filter to bloom filter(id=" << _filter_id
|
||||
VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id
|
||||
<< ") because: in_num(" << _context.hybrid_set->size()
|
||||
<< ") >= max_in_num(" << _max_in_num << ")";
|
||||
change_to_bloom_filter();
|
||||
}
|
||||
// in merge bloom filter
|
||||
} else {
|
||||
VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string()
|
||||
<< " change runtime filter to bloom filter(id=" << _filter_id
|
||||
VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id
|
||||
<< ") because: already exist a bloom filter";
|
||||
change_to_bloom_filter();
|
||||
_context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get());
|
||||
@ -661,7 +685,6 @@ public:
|
||||
if (wrapper->_filter_type ==
|
||||
RuntimeFilterType::IN_FILTER) { // bloom filter merge in
|
||||
CHECK(!wrapper->_is_ignored_in_filter)
|
||||
<< "fragment instance " << _fragment_instance_id.to_string()
|
||||
<< " can not ignore merge runtime filter(in filter id "
|
||||
<< wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: "
|
||||
<< *(wrapper->get_ignored_in_filter_msg());
|
||||
@ -1036,6 +1059,8 @@ public:
|
||||
|
||||
private:
|
||||
RuntimeState* _state;
|
||||
QueryContext* _query_ctx;
|
||||
int _be_exec_version;
|
||||
ObjectPool* _pool;
|
||||
|
||||
// When a runtime filter received from remote and it is a bloom filter, _column_return_type will be invalid.
|
||||
@ -1047,7 +1072,6 @@ private:
|
||||
bool _is_bloomfilter = false;
|
||||
bool _is_ignored_in_filter = false;
|
||||
std::string* _ignored_in_filter_msg = nullptr;
|
||||
UniqueId _fragment_instance_id;
|
||||
uint32_t _filter_id;
|
||||
|
||||
// When _column_return_type is invalid, _use_batch will be always false.
|
||||
@ -1063,9 +1087,16 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt
|
||||
int node_id, IRuntimeFilter** res, bool build_bf_exactly) {
|
||||
*res = pool->add(new IRuntimeFilter(state, pool));
|
||||
(*res)->set_role(role);
|
||||
UniqueId fragment_instance_id(state->fragment_instance_id());
|
||||
return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id,
|
||||
build_bf_exactly);
|
||||
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
|
||||
}
|
||||
|
||||
// Creates an IRuntimeFilter bound to `query_ctx`, registers it with `pool`, assigns `role`
// and initializes it from `desc` / `query_options`.
//
// `*res` is assigned before initialization runs, so it points at the new filter even when the
// returned Status reports an initialization error.
Status IRuntimeFilter::create(QueryContext* query_ctx, ObjectPool* pool,
                              const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
                              const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
                              bool build_bf_exactly) {
    IRuntimeFilter* filter = pool->add(new IRuntimeFilter(query_ctx, pool));
    *res = filter;
    filter->set_role(role);
    return filter->init_with_desc(desc, query_options, node_id, build_bf_exactly);
}
|
||||
|
||||
void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) {
|
||||
@ -1100,6 +1131,7 @@ Status IRuntimeFilter::publish() {
|
||||
if (_has_local_target) {
|
||||
IRuntimeFilter* consumer_filter = nullptr;
|
||||
// TODO: log if err
|
||||
DCHECK(_state != nullptr);
|
||||
RETURN_IF_ERROR(
|
||||
_state->runtime_filter_mgr()->get_consume_filter(_filter_id, &consumer_filter));
|
||||
// push down
|
||||
@ -1109,8 +1141,9 @@ Status IRuntimeFilter::publish() {
|
||||
return Status::OK();
|
||||
} else {
|
||||
TNetworkAddress addr;
|
||||
DCHECK(_state != nullptr);
|
||||
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_merge_addr(&addr));
|
||||
return push_to_remote(_state, &addr);
|
||||
return push_to_remote(_state, &addr, _opt_remote_rf);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1124,7 +1157,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_
|
||||
if (!_is_ignored) {
|
||||
_set_push_down();
|
||||
_profile->add_info_string("Info", _format_status());
|
||||
return _wrapper->get_push_vexprs(push_vexprs, _state, _vprobe_ctx);
|
||||
return _wrapper->get_push_vexprs(push_vexprs, _vprobe_ctx);
|
||||
} else {
|
||||
_profile->add_info_string("Info", _format_status());
|
||||
return Status::OK();
|
||||
@ -1132,32 +1165,35 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_
|
||||
}
|
||||
|
||||
Status IRuntimeFilter::get_prepared_vexprs(std::vector<vectorized::VExpr*>* vexprs,
|
||||
const RowDescriptor& desc) {
|
||||
const RowDescriptor& desc, RuntimeState* state) {
|
||||
_profile->add_info_string("Info", _format_status());
|
||||
if (_is_ignored) {
|
||||
return Status::OK();
|
||||
}
|
||||
DCHECK((!_state->enable_pipeline_exec() && _rf_state == RuntimeFilterState::READY) ||
|
||||
(_state->enable_pipeline_exec() &&
|
||||
DCHECK((!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) ||
|
||||
(_enable_pipeline_exec &&
|
||||
_rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY));
|
||||
DCHECK(is_consumer());
|
||||
std::lock_guard guard(_inner_mutex);
|
||||
|
||||
if (_push_down_vexprs.empty()) {
|
||||
RETURN_IF_ERROR(_wrapper->get_push_vexprs(&_push_down_vexprs, _state, _vprobe_ctx));
|
||||
RETURN_IF_ERROR(_wrapper->get_push_vexprs(&_push_down_vexprs, _vprobe_ctx));
|
||||
}
|
||||
// push expr
|
||||
vexprs->insert(vexprs->end(), _push_down_vexprs.begin(), _push_down_vexprs.end());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool IRuntimeFilter::await() {
|
||||
DCHECK(is_consumer());
|
||||
auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000
|
||||
: _state->execution_timeout() * 1000;
|
||||
auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms()
|
||||
: _state->runtime_filter_wait_time_ms();
|
||||
// bitmap filter is precise filter and only filter once, so it must be applied.
|
||||
int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
|
||||
? _state->execution_timeout() * 1000
|
||||
: _state->runtime_filter_wait_time_ms();
|
||||
if (_state->enable_pipeline_exec()) {
|
||||
? execution_timeout
|
||||
: runtime_filter_wait_time_ms;
|
||||
if (_enable_pipeline_exec) {
|
||||
auto expected = _rf_state_atomic.load(std::memory_order_acquire);
|
||||
if (expected == RuntimeFilterState::NOT_READY) {
|
||||
if (!_rf_state_atomic.compare_exchange_strong(
|
||||
@ -1203,12 +1239,16 @@ bool IRuntimeFilter::await() {
|
||||
bool IRuntimeFilter::is_ready_or_timeout() {
|
||||
DCHECK(is_consumer());
|
||||
auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
|
||||
auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000
|
||||
: _state->execution_timeout() * 1000;
|
||||
auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms()
|
||||
: _state->runtime_filter_wait_time_ms();
|
||||
// bitmap filter is precise filter and only filter once, so it must be applied.
|
||||
int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
|
||||
? _state->execution_timeout() * 1000
|
||||
: _state->runtime_filter_wait_time_ms();
|
||||
? execution_timeout
|
||||
: runtime_filter_wait_time_ms;
|
||||
int64_t ms_since_registration = MonotonicMillis() - registration_time_;
|
||||
if (!_state->enable_pipeline_exec()) {
|
||||
if (!_enable_pipeline_exec) {
|
||||
_rf_state = RuntimeFilterState::TIME_OUT;
|
||||
return true;
|
||||
} else if (is_ready()) {
|
||||
@ -1245,7 +1285,7 @@ bool IRuntimeFilter::is_ready_or_timeout() {
|
||||
|
||||
void IRuntimeFilter::signal() {
|
||||
DCHECK(is_consumer());
|
||||
if (_state->enable_pipeline_exec()) {
|
||||
if (_enable_pipeline_exec) {
|
||||
_rf_state_atomic.store(RuntimeFilterState::READY);
|
||||
} else {
|
||||
std::unique_lock lock(_inner_mutex);
|
||||
@ -1261,6 +1301,10 @@ void IRuntimeFilter::signal() {
|
||||
_profile->add_info_string("BitmapSize", std::to_string(bitmap_filter->size()));
|
||||
_profile->add_info_string("IsNotIn", bitmap_filter->is_not_in() ? "true" : "false");
|
||||
}
|
||||
if (_wrapper->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
|
||||
_profile->add_info_string("BloomFilterSize",
|
||||
std::to_string(_wrapper->get_bloom_filter_size()));
|
||||
}
|
||||
}
|
||||
|
||||
BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
|
||||
@ -1268,8 +1312,7 @@ BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
|
||||
}
|
||||
|
||||
Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
|
||||
UniqueId fragment_instance_id, int node_id,
|
||||
bool build_bf_exactly) {
|
||||
int node_id, bool build_bf_exactly) {
|
||||
// if node_id == -1 , it shouldn't be a consumer
|
||||
DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
|
||||
|
||||
@ -1292,15 +1335,19 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
|
||||
_has_remote_target = desc->has_remote_targets;
|
||||
_expr_order = desc->expr_order;
|
||||
_filter_id = desc->filter_id;
|
||||
_opt_remote_rf = desc->__isset.opt_remote_rf && desc->opt_remote_rf;
|
||||
vectorized::VExprContext* build_ctx = nullptr;
|
||||
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool, desc->src_expr, &build_ctx));
|
||||
|
||||
RuntimeFilterParams params;
|
||||
params.fragment_instance_id = fragment_instance_id;
|
||||
params.filter_id = _filter_id;
|
||||
params.filter_type = _runtime_filter_type;
|
||||
params.column_return_type = build_ctx->root()->type().type;
|
||||
params.max_in_num = options->runtime_filter_max_in_num;
|
||||
// We build runtime filter by exact distinct count iff three conditions are met:
|
||||
// 1. Only 1 join key
|
||||
// 2. Do not have remote target (e.g. do not need to merge)
|
||||
// 3. Bloom filter
|
||||
params.build_bf_exactly = build_bf_exactly && !_has_remote_target &&
|
||||
_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER;
|
||||
if (desc->__isset.bloom_filter_size_bytes) {
|
||||
@ -1334,7 +1381,11 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
|
||||
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool, iter->second, &_vprobe_ctx));
|
||||
}
|
||||
|
||||
_wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, ¶ms));
|
||||
if (_state) {
|
||||
_wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, ¶ms));
|
||||
} else {
|
||||
_wrapper = _pool->add(new RuntimePredicateWrapper(_query_ctx, _pool, ¶ms));
|
||||
}
|
||||
return _wrapper->init(¶ms);
|
||||
}
|
||||
|
||||
@ -1346,6 +1397,10 @@ Status IRuntimeFilter::serialize(PPublishFilterRequest* request, void** data, in
|
||||
return serialize_impl(request, data, len);
|
||||
}
|
||||
|
||||
Status IRuntimeFilter::serialize(PPublishFilterRequestV2* request, void** data, int* len) {
|
||||
return serialize_impl(request, data, len);
|
||||
}
|
||||
|
||||
Status IRuntimeFilter::create_wrapper(RuntimeState* state, const MergeRuntimeFilterParams* param,
|
||||
ObjectPool* pool,
|
||||
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
|
||||
@ -1358,6 +1413,35 @@ Status IRuntimeFilter::create_wrapper(RuntimeState* state, const UpdateRuntimeFi
|
||||
return _create_wrapper(state, param, pool, wrapper);
|
||||
}
|
||||
|
||||
// Builds a temporary RuntimePredicateWrapper from a V2 publish request and fills it from the
// payload that matches the request's filter type.
//
// @param query_ctx  query context handed to the new wrapper
// @param param      carries the protobuf request and, for bloom filters, the attached data stream
// @param pool       object pool handed to the new wrapper
// @param wrapper    out: reset to the newly created wrapper (also on error paths)
// @return the Status of the matching assign() call, or InvalidArgument for an unknown type
Status IRuntimeFilter::create_wrapper(QueryContext* query_ctx,
                                      const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool,
                                      std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
    int filter_type = param->request->filter_type();
    // The column type is taken from the IN-filter payload; when the request has none it stays
    // INVALID_TYPE.
    PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
    if (param->request->has_in_filter()) {
        column_type = to_primitive_type(param->request->in_filter().column_type());
    }
    wrapper->reset(new RuntimePredicateWrapper(query_ctx, pool, column_type, get_type(filter_type),
                                               param->request->filter_id()));

    switch (filter_type) {
    case PFilterType::IN_FILTER: {
        DCHECK(param->request->has_in_filter());
        return (*wrapper)->assign(&param->request->in_filter());
    }
    case PFilterType::BLOOM_FILTER: {
        DCHECK(param->request->has_bloom_filter());
        return (*wrapper)->assign(&param->request->bloom_filter(), param->data);
    }
    case PFilterType::MINMAX_FILTER: {
        DCHECK(param->request->has_minmax_filter());
        return (*wrapper)->assign(&param->request->minmax_filter());
    }
    default:
        return Status::InvalidArgument("unknown filter type");
    }
}
|
||||
|
||||
void IRuntimeFilter::change_to_bloom_filter() {
|
||||
auto origin_type = _wrapper->get_real_type();
|
||||
_wrapper->change_to_bloom_filter();
|
||||
@ -1379,7 +1463,6 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, Obje
|
||||
column_type = to_primitive_type(param->request->in_filter().column_type());
|
||||
}
|
||||
wrapper->reset(new RuntimePredicateWrapper(state, pool, column_type, get_type(filter_type),
|
||||
UniqueId(param->request->fragment_id()),
|
||||
param->request->filter_id()));
|
||||
|
||||
switch (filter_type) {
|
||||
@ -1401,11 +1484,22 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, Obje
|
||||
}
|
||||
|
||||
void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
|
||||
DCHECK(parent_profile != nullptr);
|
||||
_profile.reset(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
|
||||
::doris::to_string(_runtime_filter_type))));
|
||||
if (_profile_init) {
|
||||
return;
|
||||
}
|
||||
{
|
||||
std::lock_guard guard(_inner_mutex);
|
||||
if (_profile_init) {
|
||||
return;
|
||||
}
|
||||
DCHECK(parent_profile != nullptr);
|
||||
_profile.reset(
|
||||
new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
|
||||
::doris::to_string(_runtime_filter_type))));
|
||||
_profile_init = true;
|
||||
}
|
||||
parent_profile->add_child(_profile.get(), true, nullptr);
|
||||
if (!_state->enable_pipeline_exec()) {
|
||||
if (!_enable_pipeline_exec) {
|
||||
_await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
|
||||
}
|
||||
_profile->add_info_string("Info", _format_status());
|
||||
@ -1421,10 +1515,6 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() {
|
||||
}
|
||||
}
|
||||
|
||||
void IRuntimeFilter::set_push_down_profile() {
|
||||
_profile->add_info_string("HasPushDownToEngine", "true");
|
||||
}
|
||||
|
||||
void IRuntimeFilter::ready_for_publish() {
|
||||
_wrapper->ready_for_publish();
|
||||
}
|
||||
@ -1446,10 +1536,6 @@ Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
|
||||
return status;
|
||||
}
|
||||
|
||||
const RuntimePredicateWrapper* IRuntimeFilter::get_wrapper() {
|
||||
return _wrapper;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void batch_copy(PInFilter* filter, HybridSetBase::IteratorBase* it,
|
||||
void (*set_func)(PColumnValue*, const T*)) {
|
||||
@ -1752,21 +1838,46 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Applies a V2 publish request to this filter: records the "ignored" message if the request
// carries one, merges the request's payload into `_wrapper`, signals the filter, and writes
// merge/update timings to the profile.
//
// @param param  request, optional attached data stream and an object pool
// @return the first error from create_wrapper() or merge(); otherwise OK
Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param) {
    int64_t start_update = MonotonicMillis();
    if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) {
        set_ignored();
        // Bind by reference: only one field is read, so copying the whole message is wasted work.
        const PInFilter& in_filter = param->request->in_filter();
        auto msg = param->pool->add(new std::string(in_filter.ignored_msg()));
        set_ignored_msg(*msg);
    }

    std::unique_ptr<RuntimePredicateWrapper> tmp_wrapper;
    RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_query_ctx, param, _pool, &tmp_wrapper));
    // merge() may change the real filter type; remember the old one so the profile can be
    // refreshed only when it actually changed.
    auto origin_type = _wrapper->get_real_type();
    RETURN_IF_ERROR(_wrapper->merge(tmp_wrapper.get()));
    if (origin_type != _wrapper->get_real_type()) {
        update_runtime_filter_type_to_profile();
    }
    this->signal();

    _profile->add_info_string("MergeTime", std::to_string(param->request->merge_time()) + " ms");
    _profile->add_info_string("UpdateTime",
                              std::to_string(MonotonicMillis() - start_update) + " ms");
    return Status::OK();
}
|
||||
|
||||
Status IRuntimeFilter::consumer_close() {
|
||||
DCHECK(is_consumer());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status RuntimePredicateWrapper::get_push_vexprs(std::vector<vectorized::VExpr*>* container,
|
||||
RuntimeState* state,
|
||||
vectorized::VExprContext* vprob_expr) {
|
||||
DCHECK(state != nullptr);
|
||||
DCHECK(container != nullptr);
|
||||
DCHECK(_pool != nullptr);
|
||||
DCHECK(vprob_expr->root()->type().type == _column_return_type ||
|
||||
(is_string_type(vprob_expr->root()->type().type) &&
|
||||
is_string_type(_column_return_type)) ||
|
||||
_filter_type == RuntimeFilterType::BITMAP_FILTER);
|
||||
_filter_type == RuntimeFilterType::BITMAP_FILTER)
|
||||
<< " vprob_expr->root()->type().type: " << vprob_expr->root()->type().type
|
||||
<< " _column_return_type: " << _column_return_type
|
||||
<< " _filter_type: " << ::doris::to_string(_filter_type);
|
||||
|
||||
auto real_filter_type = get_real_type();
|
||||
switch (real_filter_type) {
|
||||
|
||||
@ -32,6 +32,7 @@
|
||||
#include "runtime/define_primitive_type.h"
|
||||
#include "runtime/large_int_value.h"
|
||||
#include "runtime/primitive_type.h"
|
||||
#include "runtime/query_context.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/types.h"
|
||||
#include "util/lock.h"
|
||||
@ -51,6 +52,7 @@ namespace doris {
|
||||
class ObjectPool;
|
||||
class RuntimePredicateWrapper;
|
||||
class PPublishFilterRequest;
|
||||
class PPublishFilterRequestV2;
|
||||
class PMergeFilterRequest;
|
||||
class TRuntimeFilterDesc;
|
||||
class RowDescriptor;
|
||||
@ -106,7 +108,6 @@ struct RuntimeFilterParams {
|
||||
bloom_filter_size(-1),
|
||||
max_in_num(0),
|
||||
filter_id(0),
|
||||
fragment_instance_id(0, 0),
|
||||
bitmap_filter_not_in(false) {}
|
||||
|
||||
RuntimeFilterType filter_type;
|
||||
@ -115,7 +116,6 @@ struct RuntimeFilterParams {
|
||||
int64_t bloom_filter_size;
|
||||
int32_t max_in_num;
|
||||
int32_t filter_id;
|
||||
UniqueId fragment_instance_id;
|
||||
bool bitmap_filter_not_in;
|
||||
bool build_bf_exactly;
|
||||
};
|
||||
@ -129,6 +129,16 @@ struct UpdateRuntimeFilterParams {
|
||||
ObjectPool* pool;
|
||||
};
|
||||
|
||||
struct UpdateRuntimeFilterParamsV2 {
|
||||
UpdateRuntimeFilterParamsV2(const PPublishFilterRequestV2* req,
|
||||
butil::IOBufAsZeroCopyInputStream* data_stream,
|
||||
ObjectPool* obj_pool)
|
||||
: request(req), data(data_stream), pool(obj_pool) {}
|
||||
const PPublishFilterRequestV2* request;
|
||||
butil::IOBufAsZeroCopyInputStream* data;
|
||||
ObjectPool* pool;
|
||||
};
|
||||
|
||||
struct MergeRuntimeFilterParams {
|
||||
MergeRuntimeFilterParams(const PMergeFilterRequest* req,
|
||||
butil::IOBufAsZeroCopyInputStream* data_stream)
|
||||
@ -164,7 +174,25 @@ public:
|
||||
_expr_order(-1),
|
||||
_always_true(false),
|
||||
_is_ignored(false),
|
||||
registration_time_(MonotonicMillis()) {}
|
||||
registration_time_(MonotonicMillis()),
|
||||
_enable_pipeline_exec(_state->enable_pipeline_exec()) {}
|
||||
|
||||
// Constructs a filter bound to a QueryContext (no RuntimeState). The filter starts as an
// UNKNOWN_FILTER producer in the NOT_READY state; init_with_desc() fills in the real
// configuration afterwards.
//
// `_wrapper` and `_opt_remote_rf` are only assigned in init_with_desc(); they are given
// defined values here so they are never read while indeterminate.
IRuntimeFilter(QueryContext* query_ctx, ObjectPool* pool)
        : _query_ctx(query_ctx),
          _pool(pool),
          _wrapper(nullptr),
          _runtime_filter_type(RuntimeFilterType::UNKNOWN_FILTER),
          _filter_id(-1),
          _is_broadcast_join(true),
          _has_remote_target(false),
          _has_local_target(false),
          _rf_state(RuntimeFilterState::NOT_READY),
          _rf_state_atomic(RuntimeFilterState::NOT_READY),
          _role(RuntimeFilterRole::PRODUCER),
          _expr_order(-1),
          _always_true(false),
          _is_ignored(false),
          registration_time_(MonotonicMillis()),
          // Cached once so later checks do not need to dereference a state/context pointer.
          _enable_pipeline_exec(query_ctx->enable_pipeline_exec()),
          _opt_remote_rf(false) {}
|
||||
|
||||
~IRuntimeFilter() = default;
|
||||
|
||||
@ -172,6 +200,10 @@ public:
|
||||
const TQueryOptions* query_options, const RuntimeFilterRole role,
|
||||
int node_id, IRuntimeFilter** res, bool build_bf_exactly = false);
|
||||
|
||||
static Status create(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc,
|
||||
const TQueryOptions* query_options, const RuntimeFilterRole role,
|
||||
int node_id, IRuntimeFilter** res, bool build_bf_exactly = false);
|
||||
|
||||
void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context);
|
||||
Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context);
|
||||
|
||||
@ -192,20 +224,19 @@ public:
|
||||
Status get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_vexprs);
|
||||
|
||||
Status get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>* push_vexprs,
|
||||
const RowDescriptor& desc);
|
||||
const RowDescriptor& desc, RuntimeState* state);
|
||||
|
||||
bool is_broadcast_join() const { return _is_broadcast_join; }
|
||||
|
||||
bool has_remote_target() const { return _has_remote_target; }
|
||||
|
||||
bool is_ready() const {
|
||||
return (!_state->enable_pipeline_exec() && _rf_state == RuntimeFilterState::READY) ||
|
||||
(_state->enable_pipeline_exec() &&
|
||||
return (!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) ||
|
||||
(_enable_pipeline_exec &&
|
||||
_rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY);
|
||||
}
|
||||
RuntimeFilterState current_state() const {
|
||||
return _state->enable_pipeline_exec() ? _rf_state_atomic.load(std::memory_order_acquire)
|
||||
: _rf_state;
|
||||
return _enable_pipeline_exec ? _rf_state_atomic.load(std::memory_order_acquire) : _rf_state;
|
||||
}
|
||||
bool is_ready_or_timeout();
|
||||
|
||||
@ -226,33 +257,34 @@ public:
|
||||
|
||||
// init filter with desc
|
||||
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
|
||||
UniqueId fragment_id, int node_id = -1, bool build_bf_exactly = false);
|
||||
int node_id = -1, bool build_bf_exactly = false);
|
||||
|
||||
BloomFilterFuncBase* get_bloomfilter() const;
|
||||
|
||||
// serialize _wrapper to protobuf
|
||||
Status serialize(PMergeFilterRequest* request, void** data, int* len);
|
||||
Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr);
|
||||
Status serialize(PPublishFilterRequestV2* request, void** data = nullptr, int* len = nullptr);
|
||||
|
||||
Status merge_from(const RuntimePredicateWrapper* wrapper);
|
||||
|
||||
// for ut
|
||||
const RuntimePredicateWrapper* get_wrapper();
|
||||
static Status create_wrapper(RuntimeState* state, const MergeRuntimeFilterParams* param,
|
||||
ObjectPool* pool,
|
||||
std::unique_ptr<RuntimePredicateWrapper>* wrapper);
|
||||
static Status create_wrapper(RuntimeState* state, const UpdateRuntimeFilterParams* param,
|
||||
ObjectPool* pool,
|
||||
std::unique_ptr<RuntimePredicateWrapper>* wrapper);
|
||||
static Status create_wrapper(QueryContext* query_ctx, const UpdateRuntimeFilterParamsV2* param,
|
||||
ObjectPool* pool,
|
||||
std::unique_ptr<RuntimePredicateWrapper>* wrapper);
|
||||
void change_to_bloom_filter();
|
||||
Status init_bloom_filter(const size_t build_bf_cardinality);
|
||||
Status update_filter(const UpdateRuntimeFilterParams* param);
|
||||
Status update_filter(const UpdateRuntimeFilterParamsV2* param);
|
||||
|
||||
void set_ignored() { _is_ignored = true; }
|
||||
|
||||
// for ut
|
||||
bool is_ignored() const { return _is_ignored; }
|
||||
|
||||
// Stores a copy of `msg` as the ignored-filter message. Taken by const reference because the
// argument is only read; existing callers that pass a non-const lvalue still bind to it.
void set_ignored_msg(const std::string& msg) { _ignored_msg = msg; }
|
||||
|
||||
// for ut
|
||||
@ -262,21 +294,17 @@ public:
|
||||
Status consumer_close();
|
||||
|
||||
// async push runtimefilter to remote node
|
||||
Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr);
|
||||
Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr, bool opt_remote_rf);
|
||||
Status join_rpc();
|
||||
|
||||
void init_profile(RuntimeProfile* parent_profile);
|
||||
|
||||
void update_runtime_filter_type_to_profile();
|
||||
|
||||
void set_push_down_profile();
|
||||
|
||||
void ready_for_publish();
|
||||
|
||||
std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const;
|
||||
|
||||
static bool enable_use_batch(int be_exec_version, PrimitiveType type) {
|
||||
return be_exec_version > 0 && (is_int_or_bool(type) || is_float_or_double(type));
|
||||
static bool enable_use_batch(bool use_batch, PrimitiveType type) {
|
||||
return use_batch && (is_int_or_bool(type) || is_float_or_double(type));
|
||||
}
|
||||
|
||||
int filter_id() const { return _filter_id; }
|
||||
@ -304,7 +332,7 @@ protected:
|
||||
}
|
||||
|
||||
std::string _get_explain_state_string() {
|
||||
if (_state->enable_pipeline_exec()) {
|
||||
if (_enable_pipeline_exec) {
|
||||
return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY
|
||||
? "READY"
|
||||
: _rf_state_atomic.load(std::memory_order_acquire) ==
|
||||
@ -318,11 +346,12 @@ protected:
|
||||
}
|
||||
}
|
||||
|
||||
RuntimeState* _state;
|
||||
RuntimeState* _state = nullptr;
|
||||
QueryContext* _query_ctx = nullptr;
|
||||
ObjectPool* _pool;
|
||||
// _wrapper is a runtime filter function wrapper
|
||||
// _wrapper should alloc from _pool
|
||||
RuntimePredicateWrapper* _wrapper = nullptr;
|
||||
RuntimePredicateWrapper* _wrapper;
|
||||
// runtime filter type
|
||||
RuntimeFilterType _runtime_filter_type;
|
||||
// runtime filter id
|
||||
@ -350,7 +379,7 @@ protected:
|
||||
// this filter won't filter any data
|
||||
bool _always_true;
|
||||
|
||||
doris::vectorized::VExprContext* _vprobe_ctx;
|
||||
doris::vectorized::VExprContext* _vprobe_ctx = nullptr;
|
||||
|
||||
// Indicate whether runtime filter expr has been ignored
|
||||
bool _is_ignored;
|
||||
@ -370,6 +399,12 @@ protected:
|
||||
|
||||
/// Time in ms (from MonotonicMillis()), that the filter was registered.
|
||||
const int64_t registration_time_;
|
||||
|
||||
const bool _enable_pipeline_exec;
|
||||
|
||||
bool _profile_init = false;
|
||||
|
||||
bool _opt_remote_rf;
|
||||
};
|
||||
|
||||
// avoid exposing RuntimePredicateWrapper
|
||||
|
||||
@ -45,7 +45,8 @@ struct IRuntimeFilter::rpc_context {
|
||||
brpc::CallId cid;
|
||||
};
|
||||
|
||||
Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr) {
|
||||
Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr,
|
||||
bool opt_remote_rf) {
|
||||
DCHECK(is_producer());
|
||||
DCHECK(_rpc_context == nullptr);
|
||||
std::shared_ptr<PBackendService_Stub> stub(
|
||||
@ -69,6 +70,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
|
||||
pfragment_instance_id->set_lo(state->fragment_instance_id().lo);
|
||||
|
||||
_rpc_context->request.set_filter_id(_filter_id);
|
||||
_rpc_context->request.set_opt_remote_rf(opt_remote_rf);
|
||||
_rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
|
||||
_rpc_context->cntl.set_timeout_ms(1000);
|
||||
_rpc_context->cid = _rpc_context->cntl.call_id();
|
||||
|
||||
@ -89,8 +89,7 @@ private:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else if (IRuntimeFilter::enable_use_batch(_be_exec_version, T)) {
|
||||
} else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, T)) {
|
||||
const auto& data =
|
||||
reinterpret_cast<
|
||||
const vectorized::PredicateColumnType<PredicateEvaluateType<T>>*>(
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
#include "runtime/fragment_mgr.h"
|
||||
|
||||
#include <bvar/latency_recorder.h>
|
||||
#include <exprs/runtime_filter.h>
|
||||
#include <fmt/format.h>
|
||||
#include <gen_cpp/DorisExternalService_types.h>
|
||||
#include <gen_cpp/FrontendService.h>
|
||||
@ -640,7 +641,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
|
||||
} else {
|
||||
// This may be a first fragment request of the query.
|
||||
// Create the query fragments context.
|
||||
query_ctx = QueryContext::create_shared(params.fragment_num_on_host, _exec_env);
|
||||
query_ctx = QueryContext::create_shared(params.fragment_num_on_host, _exec_env,
|
||||
params.query_options);
|
||||
query_ctx->query_id = query_id;
|
||||
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
|
||||
&(query_ctx->desc_tbl)));
|
||||
@ -1155,10 +1157,65 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
|
||||
return runtime_filter_mgr->update_filter(request, attach_data);
|
||||
}
|
||||
|
||||
/// Applies a published (merged) runtime filter described by a V2 publish request.
///
/// Only the first entry of `request->fragment_instance_ids()` is used to locate the
/// fragment (pipeline context or fragment exec state, depending on `is_pipeline`).
/// The consumer filter is then looked up by `filter_id` in the runtime filter manager
/// of that fragment's query context and updated from the request and its attachment.
/// NOTE(review): presumably one update is enough because the consumer filter is shared
/// by all instances of the query on this backend -- confirm against register_filter.
///
/// @param request      publish request carrying the filter id and target instance ids
/// @param attach_data  stream over the RPC attachment holding the serialized filter
/// @return InvalidArgument if the first fragment instance is unknown, any error from
///         get_consume_filter / update_filter, otherwise OK (also OK when the request
///         carries no fragment instance ids).
Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
                                   butil::IOBufAsZeroCopyInputStream* attach_data) {
    bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();

    const auto& fragment_instance_ids = request->fragment_instance_ids();
    if (fragment_instance_ids.size() > 0) {
        UniqueId fragment_instance_id = fragment_instance_ids[0];
        TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();

        // Keep the owning context alive while the filter is being updated.
        std::shared_ptr<FragmentExecState> fragment_state;
        std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;

        RuntimeFilterMgr* runtime_filter_mgr = nullptr;
        ObjectPool* pool = nullptr;
        if (is_pipeline) {
            std::unique_lock<std::mutex> lock(_lock);
            auto iter = _pipeline_map.find(tfragment_instance_id);
            if (iter == _pipeline_map.end()) {
                VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
                return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
            }
            pip_context = iter->second;

            DCHECK(pip_context != nullptr);
            runtime_filter_mgr =
                    pip_context->get_runtime_state()->get_query_ctx()->runtime_filter_mgr();
            pool = &pip_context->get_query_context()->obj_pool;
        } else {
            std::unique_lock<std::mutex> lock(_lock);
            auto iter = _fragment_map.find(tfragment_instance_id);
            if (iter == _fragment_map.end()) {
                VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
                return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
            }
            fragment_state = iter->second;

            DCHECK(fragment_state != nullptr);
            runtime_filter_mgr = fragment_state->executor()
                                         ->runtime_state()
                                         ->get_query_ctx()
                                         ->runtime_filter_mgr();
            pool = &fragment_state->get_query_ctx()->obj_pool;
        }

        UpdateRuntimeFilterParamsV2 params(request, attach_data, pool);
        int filter_id = request->filter_id();
        IRuntimeFilter* real_filter = nullptr;
        RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filter(filter_id, &real_filter));
        RETURN_IF_ERROR(real_filter->update_filter(&params));
    }

    return Status::OK();
}
|
||||
|
||||
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
|
||||
butil::IOBufAsZeroCopyInputStream* attach_data) {
|
||||
UniqueId queryid = request->query_id();
|
||||
bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
|
||||
bool opt_remote_rf = request->has_opt_remote_rf() && request->opt_remote_rf();
|
||||
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
|
||||
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
|
||||
|
||||
@ -1189,7 +1246,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
|
||||
// when filter_controller->merge is still in progress
|
||||
fragment_state = iter->second;
|
||||
}
|
||||
RETURN_IF_ERROR(filter_controller->merge(request, attach_data));
|
||||
RETURN_IF_ERROR(filter_controller->merge(request, attach_data, opt_remote_rf));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -129,6 +129,9 @@ public:
|
||||
Status apply_filter(const PPublishFilterRequest* request,
|
||||
butil::IOBufAsZeroCopyInputStream* attach_data);
|
||||
|
||||
Status apply_filterv2(const PPublishFilterRequestV2* request,
|
||||
butil::IOBufAsZeroCopyInputStream* attach_data);
|
||||
|
||||
Status merge_filter(const PMergeFilterRequest* request,
|
||||
butil::IOBufAsZeroCopyInputStream* attach_data);
|
||||
|
||||
|
||||
@ -30,6 +30,7 @@
|
||||
#include "runtime/datetime_value.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/memory/mem_tracker_limiter.h"
|
||||
#include "runtime/runtime_filter_mgr.h"
|
||||
#include "runtime/runtime_predicate.h"
|
||||
#include "task_group/task_group.h"
|
||||
#include "util/pretty_printer.h"
|
||||
@ -48,8 +49,12 @@ class QueryContext {
|
||||
ENABLE_FACTORY_CREATOR(QueryContext);
|
||||
|
||||
public:
|
||||
QueryContext(int total_fragment_num, ExecEnv* exec_env)
|
||||
: fragment_num(total_fragment_num), timeout_second(-1), _exec_env(exec_env) {
|
||||
QueryContext(int total_fragment_num, ExecEnv* exec_env, const TQueryOptions& query_options)
|
||||
: fragment_num(total_fragment_num),
|
||||
timeout_second(-1),
|
||||
_exec_env(exec_env),
|
||||
_runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)),
|
||||
_query_options(query_options) {
|
||||
_start_time = vectorized::VecDateTimeValue::local_time();
|
||||
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
|
||||
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
|
||||
@ -139,6 +144,29 @@ public:
|
||||
|
||||
taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); }
|
||||
|
||||
int execution_timeout() const {
|
||||
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
|
||||
: _query_options.query_timeout;
|
||||
}
|
||||
|
||||
// Returns the runtime_filter_wait_time_ms value stored in the query options.
int32_t runtime_filter_wait_time_ms() const {
|
||||
return _query_options.runtime_filter_wait_time_ms;
|
||||
}
|
||||
|
||||
bool enable_pipeline_exec() const {
|
||||
return _query_options.__isset.enable_pipeline_engine &&
|
||||
_query_options.enable_pipeline_engine;
|
||||
}
|
||||
|
||||
int be_exec_version() const {
|
||||
if (!_query_options.__isset.be_exec_version) {
|
||||
return 0;
|
||||
}
|
||||
return _query_options.be_exec_version;
|
||||
}
|
||||
|
||||
// Returns a non-owning pointer to the RuntimeFilterMgr held by _runtime_filter_mgr.
RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
|
||||
|
||||
public:
|
||||
TUniqueId query_id;
|
||||
DescriptorTbl* desc_tbl;
|
||||
@ -186,6 +214,8 @@ private:
|
||||
vectorized::RuntimePredicate _runtime_predicate;
|
||||
|
||||
taskgroup::TaskGroupPtr _task_group;
|
||||
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
|
||||
const TQueryOptions _query_options;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -35,6 +35,7 @@
|
||||
#include "exprs/runtime_filter.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/memory/mem_tracker.h"
|
||||
#include "runtime/query_context.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/thread_context.h"
|
||||
#include "util/brpc_client_cache.h"
|
||||
@ -51,10 +52,12 @@ struct async_rpc_context {
|
||||
|
||||
// Builds a manager bound to a RuntimeState.
// _query_ctx is explicitly nulled: register_filter checks it with
// DCHECK(_query_ctx != nullptr), which is only meaningful if the unused pointer is
// null rather than indeterminate.
RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state)
        : _state(state), _query_ctx(nullptr) {}
|
||||
|
||||
// Builds a manager bound to a QueryContext.
// _state is explicitly nulled: register_filter checks it with
// DCHECK(_state != nullptr), which is only meaningful if the unused pointer is
// null rather than indeterminate.
RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx)
        : _state(nullptr), _query_ctx(query_ctx) {}
|
||||
|
||||
// Nothing to release by hand; members clean up after themselves.
RuntimeFilterMgr::~RuntimeFilterMgr() = default;
|
||||
|
||||
Status RuntimeFilterMgr::init() {
|
||||
DCHECK(_state->query_mem_tracker() != nullptr);
|
||||
_tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
|
||||
ExecEnv::GetInstance()->experimental_mem_tracker());
|
||||
return Status::OK();
|
||||
@ -107,20 +110,41 @@ Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role,
|
||||
VLOG_NOTICE << "regist filter...:" << key << ",role:" << (int)role;
|
||||
|
||||
auto iter = filter_map->find(key);
|
||||
if (iter != filter_map->end()) {
|
||||
return Status::InvalidArgument("filter has registed");
|
||||
}
|
||||
|
||||
RuntimeFilterMgrVal filter_mgr_val;
|
||||
filter_mgr_val.role = role;
|
||||
|
||||
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, role, node_id,
|
||||
&filter_mgr_val.filter, build_bf_exactly));
|
||||
|
||||
filter_map->emplace(key, filter_mgr_val);
|
||||
if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && role == RuntimeFilterRole::CONSUMER &&
|
||||
desc.has_remote_targets && desc.type == TRuntimeFilterType::BLOOM) {
|
||||
// If this runtime filter has remote targets (i.e. it needs merging), the runtime filter is reused across all instances
|
||||
DCHECK(_query_ctx != nullptr);
|
||||
if (iter != filter_map->end()) {
|
||||
return Status::OK();
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
iter = filter_map->find(key);
|
||||
if (iter != filter_map->end()) {
|
||||
return Status::OK();
|
||||
}
|
||||
RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc,
|
||||
&options, role, node_id, &filter_mgr_val.filter,
|
||||
build_bf_exactly));
|
||||
filter_map->emplace(key, filter_mgr_val);
|
||||
}
|
||||
} else {
|
||||
DCHECK(_state != nullptr);
|
||||
if (iter != filter_map->end()) {
|
||||
return Status::InvalidArgument("filter has registed");
|
||||
}
|
||||
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, role, node_id,
|
||||
&filter_mgr_val.filter, build_bf_exactly));
|
||||
filter_map->emplace(key, filter_mgr_val);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request,
|
||||
butil::IOBufAsZeroCopyInputStream* data) {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
|
||||
@ -158,12 +182,34 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
|
||||
cntVal->runtime_filter_desc = *runtime_filter_desc;
|
||||
cntVal->target_info = *target_info;
|
||||
cntVal->pool.reset(new ObjectPool());
|
||||
cntVal->filter = cntVal->pool->add(new IRuntimeFilter(_state, cntVal->pool.get()));
|
||||
cntVal->filter =
|
||||
cntVal->pool->add(new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool));
|
||||
|
||||
std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
|
||||
// LOG(INFO) << "entity filter id:" << filter_id;
|
||||
cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options,
|
||||
_fragment_instance_id, -1, false);
|
||||
cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, -1, false);
|
||||
_filter_map.emplace(filter_id, cntVal);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Registers one merge entry for a runtime filter that uses V2 target params.
// The descriptor and target list are copied into the entry because the caller's
// objects will be released afterwards. The entry is stored in _filter_map under
// the decimal string of the filter id. Always returns OK.
Status RuntimeFilterMergeControllerEntity::_init_with_desc(
        const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options,
        const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info,
        const int producer_size) {
    std::lock_guard<std::mutex> map_guard(_filter_map_mutex);

    auto entry = std::make_shared<RuntimeFilterCntlVal>();
    entry->producer_size = producer_size;
    entry->runtime_filter_desc = *runtime_filter_desc;
    entry->targetv2_info = *targetv2_info;

    // The filter object lives in the entry's own pool, while the filter itself is
    // handed the query context's object pool.
    entry->pool = std::make_shared<ObjectPool>();
    auto* merged_filter = new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool);
    entry->filter = entry->pool->add(merged_filter);

    entry->filter->init_with_desc(&entry->runtime_filter_desc, query_options);

    _filter_map.emplace(std::to_string(runtime_filter_desc->filter_id), entry);
    return Status::OK();
}
|
||||
@ -179,26 +225,60 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag
|
||||
for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
|
||||
int filter_id = filterid_to_desc.first;
|
||||
const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
|
||||
if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
|
||||
if (target_iter == runtime_filter_params.rid_to_target_param.end() &&
|
||||
!runtime_filter_params.__isset.rid_to_target_paramv2) {
|
||||
return Status::InternalError("runtime filter params meet error");
|
||||
} else if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
|
||||
const auto& targetv2_iter = runtime_filter_params.rid_to_target_paramv2.find(filter_id);
|
||||
if (targetv2_iter == runtime_filter_params.rid_to_target_paramv2.end()) {
|
||||
return Status::InternalError("runtime filter params meet error");
|
||||
}
|
||||
const auto& build_iter =
|
||||
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
|
||||
if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
|
||||
return Status::InternalError("runtime filter params meet error");
|
||||
}
|
||||
_init_with_desc(&filterid_to_desc.second, &query_options, &targetv2_iter->second,
|
||||
build_iter->second);
|
||||
} else {
|
||||
const auto& build_iter =
|
||||
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
|
||||
if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
|
||||
return Status::InternalError("runtime filter params meet error");
|
||||
}
|
||||
_init_with_desc(&filterid_to_desc.second, &query_options, &target_iter->second,
|
||||
build_iter->second);
|
||||
}
|
||||
const auto& build_iter = runtime_filter_params.runtime_filter_builder_num.find(filter_id);
|
||||
if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
|
||||
return Status::InternalError("runtime filter params meet error");
|
||||
}
|
||||
if (runtime_filter_params.__isset.rid_to_runtime_filter) {
|
||||
for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
|
||||
int filter_id = filterid_to_desc.first;
|
||||
const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
|
||||
if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
|
||||
return Status::InternalError("runtime filter params meet error");
|
||||
}
|
||||
const auto& build_iter =
|
||||
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
|
||||
if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
|
||||
return Status::InternalError("runtime filter params meet error");
|
||||
}
|
||||
_init_with_desc(&filterid_to_desc.second, &query_options, &target_iter->second,
|
||||
build_iter->second);
|
||||
}
|
||||
_init_with_desc(&filterid_to_desc.second, &query_options, &target_iter->second,
|
||||
build_iter->second);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// merge data
|
||||
Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request,
|
||||
butil::IOBufAsZeroCopyInputStream* attach_data) {
|
||||
butil::IOBufAsZeroCopyInputStream* attach_data,
|
||||
bool opt_remote_rf) {
|
||||
_opt_remote_rf = _opt_remote_rf && opt_remote_rf;
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
|
||||
std::shared_ptr<RuntimeFilterCntlVal> cntVal;
|
||||
int merged_size = 0;
|
||||
{
|
||||
int64_t start_merge = MonotonicMillis();
|
||||
std::lock_guard<std::mutex> guard(_filter_map_mutex);
|
||||
auto iter = _filter_map.find(std::to_string(request->filter_id()));
|
||||
VLOG_ROW << "recv filter id:" << request->filter_id() << " " << request->ShortDebugString();
|
||||
@ -220,70 +300,143 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
|
||||
// TODO: avoid logging while holding the lock
|
||||
VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size;
|
||||
DCHECK_LE(merged_size, cntVal->producer_size);
|
||||
_merge_timer += (MonotonicMillis() - start_merge);
|
||||
if (merged_size < cntVal->producer_size) {
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
|
||||
if (merged_size == cntVal->producer_size) {
|
||||
// prepare rpc context
|
||||
using PPublishFilterRpcContext =
|
||||
async_rpc_context<PPublishFilterRequest, PPublishFilterResponse>;
|
||||
std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
|
||||
rpc_contexts.reserve(cntVal->target_info.size());
|
||||
if (opt_remote_rf) {
|
||||
DCHECK_GT(cntVal->targetv2_info.size(), 0);
|
||||
DCHECK(cntVal->filter->is_bloomfilter());
|
||||
// Optimize merging phase iff:
|
||||
// 1. All BEs have been upgraded (i.e. _opt_remote_rf is true)
|
||||
// 2. FE has been upgraded (i.e. cntVal->targetv2_info.size() > 0)
|
||||
// 3. This filter is bloom filter (only bloom filter should be used for merging)
|
||||
using PPublishFilterRpcContext =
|
||||
async_rpc_context<PPublishFilterRequestV2, PPublishFilterResponse>;
|
||||
std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
|
||||
rpc_contexts.reserve(cntVal->targetv2_info.size());
|
||||
|
||||
butil::IOBuf request_attachment;
|
||||
butil::IOBuf request_attachment;
|
||||
|
||||
PPublishFilterRequest apply_request;
|
||||
// serialize filter
|
||||
void* data = nullptr;
|
||||
int len = 0;
|
||||
bool has_attachment = false;
|
||||
RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, &len));
|
||||
if (data != nullptr && len > 0) {
|
||||
request_attachment.append(data, len);
|
||||
has_attachment = true;
|
||||
}
|
||||
|
||||
std::vector<TRuntimeFilterTargetParams>& targets = cntVal->target_info;
|
||||
for (size_t i = 0; i < targets.size(); i++) {
|
||||
rpc_contexts.emplace_back(new PPublishFilterRpcContext);
|
||||
size_t cur = rpc_contexts.size() - 1;
|
||||
rpc_contexts[cur]->request = apply_request;
|
||||
rpc_contexts[cur]->request.set_filter_id(request->filter_id());
|
||||
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
|
||||
request->is_pipeline());
|
||||
*rpc_contexts[cur]->request.mutable_query_id() = request->query_id();
|
||||
if (has_attachment) {
|
||||
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
|
||||
PPublishFilterRequestV2 apply_request;
|
||||
// serialize filter
|
||||
void* data = nullptr;
|
||||
int len = 0;
|
||||
bool has_attachment = false;
|
||||
RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, &len));
|
||||
if (data != nullptr && len > 0) {
|
||||
request_attachment.append(data, len);
|
||||
has_attachment = true;
|
||||
}
|
||||
rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id();
|
||||
|
||||
// set fragment-id
|
||||
auto request_fragment_id = rpc_contexts[cur]->request.mutable_fragment_id();
|
||||
request_fragment_id->set_hi(targets[cur].target_fragment_instance_id.hi);
|
||||
request_fragment_id->set_lo(targets[cur].target_fragment_instance_id.lo);
|
||||
std::vector<TRuntimeFilterTargetParamsV2>& targets = cntVal->targetv2_info;
|
||||
for (size_t i = 0; i < targets.size(); i++) {
|
||||
rpc_contexts.emplace_back(new PPublishFilterRpcContext);
|
||||
size_t cur = rpc_contexts.size() - 1;
|
||||
rpc_contexts[cur]->request = apply_request;
|
||||
rpc_contexts[cur]->request.set_filter_id(request->filter_id());
|
||||
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
|
||||
request->is_pipeline());
|
||||
rpc_contexts[cur]->request.set_merge_time(_merge_timer);
|
||||
*rpc_contexts[cur]->request.mutable_query_id() = request->query_id();
|
||||
if (has_attachment) {
|
||||
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
|
||||
}
|
||||
rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id();
|
||||
|
||||
std::shared_ptr<PBackendService_Stub> stub(
|
||||
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
|
||||
targets[i].target_fragment_instance_addr));
|
||||
VLOG_NOTICE << "send filter " << rpc_contexts[cur]->request.filter_id()
|
||||
<< " to:" << targets[i].target_fragment_instance_addr.hostname << ":"
|
||||
<< targets[i].target_fragment_instance_addr.port
|
||||
<< rpc_contexts[cur]->request.ShortDebugString();
|
||||
if (stub == nullptr) {
|
||||
rpc_contexts.pop_back();
|
||||
continue;
|
||||
// set fragment-id
|
||||
for (size_t fid = 0; fid < targets[cur].target_fragment_instance_ids.size();
|
||||
fid++) {
|
||||
PUniqueId* cur_id = rpc_contexts[cur]->request.add_fragment_instance_ids();
|
||||
cur_id->set_hi(targets[cur].target_fragment_instance_ids[fid].hi);
|
||||
cur_id->set_lo(targets[cur].target_fragment_instance_ids[fid].lo);
|
||||
}
|
||||
|
||||
std::shared_ptr<PBackendService_Stub> stub(
|
||||
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
|
||||
targets[i].target_fragment_instance_addr));
|
||||
VLOG_NOTICE << "send filter " << rpc_contexts[cur]->request.filter_id()
|
||||
<< " to:" << targets[i].target_fragment_instance_addr.hostname << ":"
|
||||
<< targets[i].target_fragment_instance_addr.port
|
||||
<< rpc_contexts[cur]->request.ShortDebugString();
|
||||
if (stub == nullptr) {
|
||||
rpc_contexts.pop_back();
|
||||
continue;
|
||||
}
|
||||
stub->apply_filterv2(&rpc_contexts[cur]->cntl, &rpc_contexts[cur]->request,
|
||||
&rpc_contexts[cur]->response, brpc::DoNothing());
|
||||
}
|
||||
stub->apply_filter(&rpc_contexts[cur]->cntl, &rpc_contexts[cur]->request,
|
||||
&rpc_contexts[cur]->response, brpc::DoNothing());
|
||||
}
|
||||
for (auto& rpc_context : rpc_contexts) {
|
||||
brpc::Join(rpc_context->cid);
|
||||
if (rpc_context->cntl.Failed()) {
|
||||
LOG(WARNING) << "runtimefilter rpc err:" << rpc_context->cntl.ErrorText();
|
||||
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
|
||||
rpc_context->cntl.remote_side());
|
||||
for (auto& rpc_context : rpc_contexts) {
|
||||
brpc::Join(rpc_context->cid);
|
||||
if (rpc_context->cntl.Failed()) {
|
||||
LOG(WARNING) << "runtimefilter rpc err:" << rpc_context->cntl.ErrorText();
|
||||
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
|
||||
rpc_context->cntl.remote_side());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// prepare rpc context
|
||||
using PPublishFilterRpcContext =
|
||||
async_rpc_context<PPublishFilterRequest, PPublishFilterResponse>;
|
||||
std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
|
||||
rpc_contexts.reserve(cntVal->target_info.size());
|
||||
|
||||
butil::IOBuf request_attachment;
|
||||
|
||||
PPublishFilterRequest apply_request;
|
||||
// serialize filter
|
||||
void* data = nullptr;
|
||||
int len = 0;
|
||||
bool has_attachment = false;
|
||||
RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, &len));
|
||||
if (data != nullptr && len > 0) {
|
||||
request_attachment.append(data, len);
|
||||
has_attachment = true;
|
||||
}
|
||||
|
||||
std::vector<TRuntimeFilterTargetParams>& targets = cntVal->target_info;
|
||||
for (size_t i = 0; i < targets.size(); i++) {
|
||||
rpc_contexts.emplace_back(new PPublishFilterRpcContext);
|
||||
size_t cur = rpc_contexts.size() - 1;
|
||||
rpc_contexts[cur]->request = apply_request;
|
||||
rpc_contexts[cur]->request.set_filter_id(request->filter_id());
|
||||
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
|
||||
request->is_pipeline());
|
||||
*rpc_contexts[cur]->request.mutable_query_id() = request->query_id();
|
||||
if (has_attachment) {
|
||||
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
|
||||
}
|
||||
rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id();
|
||||
|
||||
// set fragment-id
|
||||
auto request_fragment_id = rpc_contexts[cur]->request.mutable_fragment_id();
|
||||
request_fragment_id->set_hi(targets[cur].target_fragment_instance_id.hi);
|
||||
request_fragment_id->set_lo(targets[cur].target_fragment_instance_id.lo);
|
||||
|
||||
std::shared_ptr<PBackendService_Stub> stub(
|
||||
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
|
||||
targets[i].target_fragment_instance_addr));
|
||||
VLOG_NOTICE << "send filter " << rpc_contexts[cur]->request.filter_id()
|
||||
<< " to:" << targets[i].target_fragment_instance_addr.hostname << ":"
|
||||
<< targets[i].target_fragment_instance_addr.port
|
||||
<< rpc_contexts[cur]->request.ShortDebugString();
|
||||
if (stub == nullptr) {
|
||||
rpc_contexts.pop_back();
|
||||
continue;
|
||||
}
|
||||
stub->apply_filter(&rpc_contexts[cur]->cntl, &rpc_contexts[cur]->request,
|
||||
&rpc_contexts[cur]->response, brpc::DoNothing());
|
||||
}
|
||||
for (auto& rpc_context : rpc_contexts) {
|
||||
brpc::Join(rpc_context->cid);
|
||||
if (rpc_context->cntl.Failed()) {
|
||||
LOG(WARNING) << "runtimefilter rpc err:" << rpc_context->cntl.ErrorText();
|
||||
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
|
||||
rpc_context->cntl.remote_side());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -41,11 +41,14 @@ class IOBufAsZeroCopyInputStream;
|
||||
|
||||
namespace doris {
|
||||
class PPublishFilterRequest;
|
||||
class PPublishFilterRequestV2;
|
||||
class PMergeFilterRequest;
|
||||
class IRuntimeFilter;
|
||||
class MemTracker;
|
||||
class RuntimeState;
|
||||
enum class RuntimeFilterRole;
|
||||
class RuntimePredicateWrapper;
|
||||
class QueryContext;
|
||||
|
||||
/// producer:
|
||||
/// Filter filter;
|
||||
@ -63,6 +66,8 @@ class RuntimeFilterMgr {
|
||||
public:
|
||||
RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state);
|
||||
|
||||
RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx);
|
||||
|
||||
~RuntimeFilterMgr();
|
||||
|
||||
Status init();
|
||||
@ -100,12 +105,14 @@ private:
|
||||
std::map<int32_t, RuntimeFilterMgrVal> _producer_map;
|
||||
|
||||
RuntimeState* _state;
|
||||
QueryContext* _query_ctx;
|
||||
std::unique_ptr<MemTracker> _tracker;
|
||||
ObjectPool _pool;
|
||||
|
||||
TNetworkAddress _merge_addr;
|
||||
|
||||
bool _has_merge_addr;
|
||||
std::mutex _lock;
|
||||
};
|
||||
|
||||
// controller -> <query-id, entity>
|
||||
@ -123,8 +130,8 @@ public:
|
||||
const TQueryOptions& query_options);
|
||||
|
||||
// handle merge rpc
|
||||
Status merge(const PMergeFilterRequest* request,
|
||||
butil::IOBufAsZeroCopyInputStream* attach_data);
|
||||
Status merge(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data,
|
||||
bool opt_remote_rf);
|
||||
|
||||
UniqueId query_id() const { return _query_id; }
|
||||
|
||||
@ -135,6 +142,7 @@ public:
|
||||
int producer_size;
|
||||
TRuntimeFilterDesc runtime_filter_desc;
|
||||
std::vector<doris::TRuntimeFilterTargetParams> target_info;
|
||||
std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
|
||||
IRuntimeFilter* filter;
|
||||
std::unordered_set<std::string> arrive_id; // fragment_instance_id ?
|
||||
std::shared_ptr<ObjectPool> pool;
|
||||
@ -149,6 +157,11 @@ private:
|
||||
const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
|
||||
const int producer_size);
|
||||
|
||||
Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
|
||||
const TQueryOptions* query_options,
|
||||
const std::vector<doris::TRuntimeFilterTargetParamsV2>* target_info,
|
||||
const int producer_size);
|
||||
|
||||
UniqueId _query_id;
|
||||
UniqueId _fragment_instance_id;
|
||||
// protect _filter_map
|
||||
@ -158,6 +171,8 @@ private:
|
||||
// filter-id -> val
|
||||
std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
|
||||
RuntimeState* _state;
|
||||
bool _opt_remote_rf = true;
|
||||
int64_t _merge_timer = 0;
|
||||
};
|
||||
|
||||
// RuntimeFilterMergeController has a map query-id -> entity
|
||||
|
||||
@ -829,6 +829,30 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr
|
||||
}
|
||||
}
|
||||
|
||||
// RPC entry point for the V2 publish-filter request.
// The work is offered to the light work pool; the task forwards the request and its
// attachment to FragmentMgr::apply_filterv2 and writes the resulting status into the
// response. If the pool rejects the task, the response is marked CANCELLED.
// In both paths `done` is run through a ClosureGuard.
void PInternalServiceImpl::apply_filterv2(::google::protobuf::RpcController* controller,
                                          const ::doris::PPublishFilterRequestV2* request,
                                          ::doris::PPublishFilterResponse* response,
                                          ::google::protobuf::Closure* done) {
    bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() {
        // Declared first so it is destroyed last: `done` runs after the stream below
        // has stopped referring to the controller's attachment.
        brpc::ClosureGuard closure_guard(done);
        const auto& attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
        VLOG_NOTICE << "rpc apply_filterv2 recv";
        Status st = _exec_env->fragment_mgr()->apply_filterv2(request, &zero_copy_input_stream);
        if (!st.ok()) {
            LOG(WARNING) << "apply filter meet error: " << st.to_string();
        }
        st.to_protobuf(response->mutable_status());
    });
    if (!ret) {
        LOG(WARNING) << "fail to offer request to the work pool";
        brpc::ClosureGuard closure_guard(done);
        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
    }
}
|
||||
|
||||
void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller,
|
||||
const PSendDataRequest* request, PSendDataResult* response,
|
||||
google::protobuf::Closure* done) {
|
||||
|
||||
@ -123,6 +123,10 @@ public:
|
||||
const ::doris::PPublishFilterRequest* request,
|
||||
::doris::PPublishFilterResponse* response,
|
||||
::google::protobuf::Closure* done) override;
|
||||
void apply_filterv2(::google::protobuf::RpcController* controller,
|
||||
const ::doris::PPublishFilterRequestV2* request,
|
||||
::doris::PPublishFilterResponse* response,
|
||||
::google::protobuf::Closure* done) override;
|
||||
void transmit_block(::google::protobuf::RpcController* controller,
|
||||
const ::doris::PTransmitDataParams* request,
|
||||
::doris::PTransmitDataResult* response,
|
||||
|
||||
@ -428,6 +428,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
|
||||
ADD_TIMER(_build_phase_profile, "BuildTableConvertToPartitionedTime");
|
||||
_build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows", TUnit::UNIT);
|
||||
_build_side_compute_hash_timer = ADD_TIMER(_build_phase_profile, "BuildSideHashComputingTime");
|
||||
_build_runtime_filter_timer = ADD_TIMER(_build_phase_profile, "BuildRuntimeFilterTime");
|
||||
|
||||
// Probe phase
|
||||
auto probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true);
|
||||
@ -441,7 +442,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
|
||||
|
||||
_join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer");
|
||||
|
||||
_push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime");
|
||||
_push_down_timer = ADD_TIMER(runtime_profile(), "PublishRuntimeFilterTime");
|
||||
_push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
|
||||
_build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
|
||||
_build_buckets_fill_counter = ADD_COUNTER(runtime_profile(), "FilledBuckets", TUnit::UNIT);
|
||||
|
||||
@ -288,6 +288,7 @@ private:
|
||||
RuntimeProfile::Counter* _probe_side_output_timer;
|
||||
RuntimeProfile::Counter* _build_side_compute_hash_timer;
|
||||
RuntimeProfile::Counter* _build_side_merge_block_timer;
|
||||
RuntimeProfile::Counter* _build_runtime_filter_timer;
|
||||
|
||||
RuntimeProfile::Counter* _build_blocks_memory_usage;
|
||||
RuntimeProfile::Counter* _hash_table_memory_usage;
|
||||
|
||||
@ -316,10 +316,23 @@ Status VScanNode::_register_runtime_filter() {
|
||||
for (int i = 0; i < filter_size; ++i) {
|
||||
IRuntimeFilter* runtime_filter = nullptr;
|
||||
const auto& filter_desc = _runtime_filter_descs[i];
|
||||
RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
|
||||
RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id()));
|
||||
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
|
||||
&runtime_filter));
|
||||
if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
|
||||
DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && filter_desc.has_remote_targets);
|
||||
// Optimize merging phase iff:
|
||||
// 1. All BEs and FEs have been upgraded (e.g. opt_remote_rf)
|
||||
// 2. This filter is bloom filter (only bloom filter should be used for merging)
|
||||
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_filter(
|
||||
RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(),
|
||||
false));
|
||||
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
|
||||
filter_desc.filter_id, &runtime_filter));
|
||||
} else {
|
||||
RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
|
||||
RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(),
|
||||
false));
|
||||
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
|
||||
&runtime_filter));
|
||||
}
|
||||
_runtime_filter_ctxs.emplace_back(runtime_filter);
|
||||
_runtime_filter_ready_flag.emplace_back(false);
|
||||
}
|
||||
@ -345,13 +358,6 @@ Status VScanNode::_acquire_runtime_filter(bool wait) {
|
||||
std::vector<VExpr*> vexprs;
|
||||
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
|
||||
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
|
||||
// If all targets are local, scan node will use hash node's runtime filter, and we don't
|
||||
// need to allocate memory again
|
||||
if (runtime_filter->has_remote_target()) {
|
||||
if (auto bf = runtime_filter->get_bloomfilter()) {
|
||||
RETURN_IF_ERROR(bf->init_with_fixed_length());
|
||||
}
|
||||
}
|
||||
bool ready = runtime_filter->is_ready();
|
||||
if (!ready && wait) {
|
||||
ready = runtime_filter->await();
|
||||
@ -1329,7 +1335,8 @@ Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
|
||||
++current_arrived_rf_num;
|
||||
continue;
|
||||
} else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
|
||||
_runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor);
|
||||
_runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor,
|
||||
_state);
|
||||
++current_arrived_rf_num;
|
||||
_runtime_filter_ctxs[i].apply_mark = true;
|
||||
}
|
||||
|
||||
@ -190,8 +190,12 @@ public final class RuntimeFilter {
|
||||
tFilter.setIsBroadcastJoin(isBroadcastJoin);
|
||||
tFilter.setHasLocalTargets(hasLocalTargets);
|
||||
tFilter.setHasRemoteTargets(hasRemoteTargets);
|
||||
boolean optRemoteRf = true;
|
||||
for (RuntimeFilterTarget target : targets) {
|
||||
tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), target.expr.treeToThrift());
|
||||
// TODO: now only support SlotRef
|
||||
optRemoteRf = optRemoteRf && hasRemoteTargets && runtimeFilterType == TRuntimeFilterType.BLOOM
|
||||
&& target.expr instanceof SlotRef;
|
||||
}
|
||||
tFilter.setType(runtimeFilterType);
|
||||
tFilter.setBloomFilterSizeBytes(filterSizeBytes);
|
||||
@ -199,6 +203,7 @@ public final class RuntimeFilter {
|
||||
tFilter.setBitmapTargetExpr(targets.get(0).expr.treeToThrift());
|
||||
tFilter.setBitmapFilterNotIn(bitmapFilterNotIn);
|
||||
}
|
||||
tFilter.setOptRemoteRf(optRemoteRf);
|
||||
return tFilter;
|
||||
}
|
||||
|
||||
|
||||
@ -96,7 +96,7 @@ import org.apache.doris.thrift.TQueryType;
|
||||
import org.apache.doris.thrift.TReportExecStatusParams;
|
||||
import org.apache.doris.thrift.TResourceLimit;
|
||||
import org.apache.doris.thrift.TRuntimeFilterParams;
|
||||
import org.apache.doris.thrift.TRuntimeFilterTargetParams;
|
||||
import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2;
|
||||
import org.apache.doris.thrift.TScanRangeLocation;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
import org.apache.doris.thrift.TScanRangeParams;
|
||||
@ -3122,12 +3122,24 @@ public class Coordinator {
|
||||
if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
|
||||
for (Map.Entry<RuntimeFilterId, List<FRuntimeFilterTargetParam>> entry
|
||||
: ridToTargetParam.entrySet()) {
|
||||
List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList();
|
||||
Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> targetParams = new HashMap<>();
|
||||
for (FRuntimeFilterTargetParam targetParam : entry.getValue()) {
|
||||
targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
|
||||
targetParam.targetFragmentInstanceAddr));
|
||||
if (targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) {
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
} else {
|
||||
targetParams.put(targetParam.targetFragmentInstanceAddr,
|
||||
new TRuntimeFilterTargetParamsV2());
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr
|
||||
= targetParam.targetFragmentInstanceAddr;
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
= new ArrayList<>();
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
}
|
||||
}
|
||||
params.params.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(), targetParams);
|
||||
params.params.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(),
|
||||
new ArrayList<TRuntimeFilterTargetParamsV2>(targetParams.values()));
|
||||
}
|
||||
for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
|
||||
params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(
|
||||
@ -3197,12 +3209,25 @@ public class Coordinator {
|
||||
if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
|
||||
for (Map.Entry<RuntimeFilterId, List<FRuntimeFilterTargetParam>> entry
|
||||
: ridToTargetParam.entrySet()) {
|
||||
List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList();
|
||||
Map<TNetworkAddress, TRuntimeFilterTargetParamsV2> targetParams = new HashMap<>();
|
||||
for (FRuntimeFilterTargetParam targetParam : entry.getValue()) {
|
||||
targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
|
||||
targetParam.targetFragmentInstanceAddr));
|
||||
if (targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) {
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
} else {
|
||||
targetParams.put(targetParam.targetFragmentInstanceAddr,
|
||||
new TRuntimeFilterTargetParamsV2());
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr
|
||||
= targetParam.targetFragmentInstanceAddr;
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
= new ArrayList<>();
|
||||
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
|
||||
.add(targetParam.targetFragmentInstanceId);
|
||||
}
|
||||
}
|
||||
localParams.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(), targetParams);
|
||||
|
||||
localParams.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(),
|
||||
new ArrayList<TRuntimeFilterTargetParamsV2>(targetParams.values()));
|
||||
}
|
||||
for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
|
||||
localParams.runtime_filter_params.putToRuntimeFilterBuilderNum(
|
||||
|
||||
@ -481,6 +481,7 @@ message PMergeFilterRequest {
|
||||
optional PBloomFilter bloom_filter = 6;
|
||||
optional PInFilter in_filter = 7;
|
||||
optional bool is_pipeline = 8;
|
||||
optional bool opt_remote_rf = 9;
|
||||
};
|
||||
|
||||
message PMergeFilterResponse {
|
||||
@ -498,6 +499,18 @@ message PPublishFilterRequest {
|
||||
optional bool is_pipeline = 8;
|
||||
};
|
||||
|
||||
// Request message of the `apply_filterv2` RPC declared in service PBackendService.
// It names a runtime filter by `filter_id` within the query `query_id` and carries a
// list of fragment instance ids (`fragment_instance_ids`) in a single request.
// The three payload fields (`minmax_filter`, `bloom_filter`, `in_filter`) are all
// optional; presumably only the one matching `filter_type` is set -- TODO confirm
// against the sender.
// NOTE(review): the unit and origin of `merge_time` are not visible here -- confirm
// with the code that fills it in.
message PPublishFilterRequestV2 {
|
||||
required int32 filter_id = 1;
|
||||
required PUniqueId query_id = 2;
|
||||
repeated PUniqueId fragment_instance_ids = 3;
|
||||
required PFilterType filter_type = 4;
|
||||
optional PMinMaxFilter minmax_filter = 5;
|
||||
optional PBloomFilter bloom_filter = 6;
|
||||
optional PInFilter in_filter = 7;
|
||||
optional bool is_pipeline = 8;
|
||||
optional int64 merge_time = 9;
|
||||
};
|
||||
|
||||
// Response message shared by the `apply_filter` and `apply_filterv2` RPCs of
// service PBackendService; it carries only the resulting status.
message PPublishFilterResponse {
|
||||
required PStatus status = 1;
|
||||
};
|
||||
@ -639,6 +652,7 @@ service PBackendService {
|
||||
rpc rollback(PRollbackRequest) returns (PRollbackResult);
|
||||
rpc merge_filter(PMergeFilterRequest) returns (PMergeFilterResponse);
|
||||
rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
|
||||
rpc apply_filterv2(PPublishFilterRequestV2) returns (PPublishFilterResponse);
|
||||
rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
|
||||
rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult);
|
||||
rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult);
|
||||
|
||||
@ -237,6 +237,12 @@ struct TRuntimeFilterTargetParams {
|
||||
2: required Types.TNetworkAddress target_fragment_instance_addr
|
||||
}
|
||||
|
||||
// Runtime filter targets grouped by address: the ids of the target fragment instances
// together with the one network address they share. Used as the list element type of
// TRuntimeFilterParams.rid_to_target_paramv2.
struct TRuntimeFilterTargetParamsV2 {
|
||||
1: required list<Types.TUniqueId> target_fragment_instance_ids
|
||||
// The address of the instance where the fragment is expected to run
|
||||
2: required Types.TNetworkAddress target_fragment_instance_addr
|
||||
}
|
||||
|
||||
struct TRuntimeFilterParams {
|
||||
// Runtime filter merge instance address
|
||||
1: optional Types.TNetworkAddress runtime_filter_merge_addr
|
||||
@ -250,6 +256,8 @@ struct TRuntimeFilterParams {
|
||||
|
||||
// Number of Runtime filter producers
|
||||
4: optional map<i32, i32> runtime_filter_builder_num
|
||||
|
||||
5: optional map<i32, list<TRuntimeFilterTargetParamsV2>> rid_to_target_paramv2
|
||||
}
|
||||
|
||||
// Parameters for a single execution instance of a particular TPlanFragment
|
||||
|
||||
@ -1001,6 +1001,8 @@ struct TRuntimeFilterDesc {
|
||||
|
||||
// for bitmap filter
|
||||
11: optional bool bitmap_filter_not_in
|
||||
|
||||
12: optional bool opt_remote_rf;
|
||||
}
|
||||
|
||||
struct TDataGenScanNode {
|
||||
|
||||
Reference in New Issue
Block a user