[Chore](runtime filter) change runtime filter dcheck to error status or exception (#21475)
change runtime filter dcheck to error status or exception
This commit is contained in:
@ -17,6 +17,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "common/exception.h"
|
||||
#include "common/status.h"
|
||||
#include "exprs/runtime_filter.h"
|
||||
#include "runtime/runtime_filter_mgr.h"
|
||||
@ -28,20 +29,24 @@
|
||||
#include "vec/runtime/shared_hash_table_controller.h"
|
||||
|
||||
namespace doris {
|
||||
// this class used in a hash join node
|
||||
// Provide a unified interface for other classes
|
||||
template <typename ExprCtxType>
|
||||
class RuntimeFilterSlotsBase {
|
||||
// this class used in hash join node
|
||||
class VRuntimeFilterSlots {
|
||||
public:
|
||||
RuntimeFilterSlotsBase(const std::vector<std::shared_ptr<ExprCtxType>>& prob_expr_ctxs,
|
||||
const std::vector<std::shared_ptr<ExprCtxType>>& build_expr_ctxs,
|
||||
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
|
||||
VRuntimeFilterSlots(
|
||||
const std::vector<std::shared_ptr<vectorized::VExprContext>>& prob_expr_ctxs,
|
||||
const std::vector<std::shared_ptr<vectorized::VExprContext>>& build_expr_ctxs,
|
||||
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
|
||||
: _probe_expr_context(prob_expr_ctxs),
|
||||
_build_expr_context(build_expr_ctxs),
|
||||
_runtime_filter_descs(runtime_filter_descs) {}
|
||||
|
||||
Status init(RuntimeState* state, int64_t hash_table_size, size_t build_bf_cardinality) {
|
||||
DCHECK(_probe_expr_context.size() == _build_expr_context.size());
|
||||
if (_probe_expr_context.size() != _build_expr_context.size()) {
|
||||
return Status::InternalError(
|
||||
"_probe_expr_context.size() != _build_expr_context.size(), "
|
||||
"_probe_expr_context.size()={}, _build_expr_context.size()={}",
|
||||
_probe_expr_context.size(), _build_expr_context.size());
|
||||
}
|
||||
|
||||
// runtime filter effect strategy
|
||||
// 1. we will ignore IN filter when hash_table_size is too big
|
||||
@ -53,7 +58,10 @@ public:
|
||||
auto ignore_local_filter = [state](int filter_id) {
|
||||
std::vector<IRuntimeFilter*> filters;
|
||||
state->runtime_filter_mgr()->get_consume_filters(filter_id, filters);
|
||||
DCHECK(!filters.empty());
|
||||
if (filters.empty()) {
|
||||
throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, filter_id={}",
|
||||
filter_id);
|
||||
}
|
||||
for (auto filter : filters) {
|
||||
filter->set_ignored();
|
||||
filter->signal();
|
||||
@ -92,9 +100,13 @@ public:
|
||||
IRuntimeFilter* runtime_filter = nullptr;
|
||||
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
|
||||
&runtime_filter));
|
||||
DCHECK(runtime_filter != nullptr);
|
||||
DCHECK(runtime_filter->expr_order() >= 0);
|
||||
DCHECK(runtime_filter->expr_order() < _probe_expr_context.size());
|
||||
if (runtime_filter->expr_order() < 0 ||
|
||||
runtime_filter->expr_order() >= _probe_expr_context.size()) {
|
||||
return Status::InternalError(
|
||||
"runtime_filter meet invalid expr_order, expr_order={}, "
|
||||
"probe_expr_context.size={}",
|
||||
runtime_filter->expr_order(), _probe_expr_context.size());
|
||||
}
|
||||
|
||||
// do not create 'in filter' when hash_table size over limit
|
||||
auto max_in_num = state->runtime_filter_max_in_num();
|
||||
@ -250,12 +262,11 @@ public:
|
||||
bool empty() { return !_runtime_filters.size(); }
|
||||
|
||||
private:
|
||||
const std::vector<std::shared_ptr<ExprCtxType>>& _probe_expr_context;
|
||||
const std::vector<std::shared_ptr<ExprCtxType>>& _build_expr_context;
|
||||
const std::vector<std::shared_ptr<vectorized::VExprContext>>& _probe_expr_context;
|
||||
const std::vector<std::shared_ptr<vectorized::VExprContext>>& _build_expr_context;
|
||||
const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
|
||||
// prob_contition index -> [IRuntimeFilter]
|
||||
std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
|
||||
};
|
||||
|
||||
using VRuntimeFilterSlots = RuntimeFilterSlotsBase<vectorized::VExprContext>;
|
||||
} // namespace doris
|
||||
|
||||
@ -32,23 +32,26 @@
|
||||
|
||||
namespace doris {
|
||||
// this class used in cross join node
|
||||
template <typename ExprCtxType = vectorized::VExprContext>
|
||||
class RuntimeFilterSlotsCross {
|
||||
class VRuntimeFilterSlotsCross {
|
||||
public:
|
||||
RuntimeFilterSlotsCross(const std::vector<TRuntimeFilterDesc>& runtime_filter_descs,
|
||||
const vectorized::VExprContextSPtrs& src_expr_ctxs)
|
||||
VRuntimeFilterSlotsCross(const std::vector<TRuntimeFilterDesc>& runtime_filter_descs,
|
||||
const vectorized::VExprContextSPtrs& src_expr_ctxs)
|
||||
: _runtime_filter_descs(runtime_filter_descs), filter_src_expr_ctxs(src_expr_ctxs) {}
|
||||
|
||||
~RuntimeFilterSlotsCross() = default;
|
||||
~VRuntimeFilterSlotsCross() = default;
|
||||
|
||||
Status init(RuntimeState* state) {
|
||||
for (auto& filter_desc : _runtime_filter_descs) {
|
||||
IRuntimeFilter* runtime_filter = nullptr;
|
||||
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
|
||||
&runtime_filter));
|
||||
DCHECK(runtime_filter != nullptr);
|
||||
if (runtime_filter == nullptr) {
|
||||
return Status::InternalError("runtime filter is nullptr");
|
||||
}
|
||||
// cross join has not remote filter
|
||||
DCHECK(!runtime_filter->has_remote_target());
|
||||
if (runtime_filter->has_remote_target()) {
|
||||
return Status::InternalError("cross join runtime filter has remote target");
|
||||
}
|
||||
_runtime_filters.push_back(runtime_filter);
|
||||
}
|
||||
return Status::OK();
|
||||
@ -104,5 +107,4 @@ private:
|
||||
std::vector<IRuntimeFilter*> _runtime_filters;
|
||||
};
|
||||
|
||||
using VRuntimeFilterSlotsCross = RuntimeFilterSlotsCross<vectorized::VExprContext>;
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user