[refactor](conjuncts) simplify conjuncts in exec node (#19254)
Co-authored-by: yiguolei <yiguolei@gmail.com> Currently, exec node save exprcontext**, but the object is in object pool, the code is very unclear. we could just use exprcontext*.
This commit is contained in:
@ -143,11 +143,10 @@ Status BaseScanner::init_expr_ctxes() {
|
||||
if (!_pre_filter_texprs.empty()) {
|
||||
// for vectorized, preceding filter exprs should be compounded to one passed from fe.
|
||||
DCHECK(_pre_filter_texprs.size() == 1);
|
||||
_vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
|
||||
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
|
||||
_state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get()));
|
||||
RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc));
|
||||
RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
|
||||
_state->obj_pool(), _pre_filter_texprs[0], &_vpre_filter_ctx_ptr));
|
||||
RETURN_IF_ERROR(_vpre_filter_ctx_ptr->prepare(_state, *_row_desc));
|
||||
RETURN_IF_ERROR(_vpre_filter_ctx_ptr->open(_state));
|
||||
}
|
||||
|
||||
// Construct dest slots information
|
||||
@ -365,7 +364,7 @@ Status BaseScanner::_fill_dest_block(vectorized::Block* dest_block, bool* eof) {
|
||||
|
||||
void BaseScanner::close() {
|
||||
if (_vpre_filter_ctx_ptr) {
|
||||
(*_vpre_filter_ctx_ptr)->close(_state);
|
||||
_vpre_filter_ctx_ptr->close(_state);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -131,7 +131,7 @@ protected:
|
||||
|
||||
// for vectorized load
|
||||
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
|
||||
std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
|
||||
vectorized::VExprContext* _vpre_filter_ctx_ptr = nullptr;
|
||||
vectorized::Block _src_block;
|
||||
bool _src_block_mem_reuse = false;
|
||||
int _num_of_columns_from_file;
|
||||
|
||||
@ -104,9 +104,8 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
init_runtime_profile(get_name());
|
||||
|
||||
if (tnode.__isset.vconjunct) {
|
||||
_vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
|
||||
RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(_pool, tnode.vconjunct,
|
||||
_vconjunct_ctx_ptr.get()));
|
||||
&_vconjunct_ctx_ptr));
|
||||
}
|
||||
|
||||
// create the projections expr
|
||||
@ -131,8 +130,8 @@ Status ExecNode::prepare(RuntimeState* state) {
|
||||
_mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(),
|
||||
_runtime_profile.get(), nullptr, "PeakMemoryUsage");
|
||||
|
||||
if (_vconjunct_ctx_ptr) {
|
||||
RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, intermediate_row_desc()));
|
||||
if (_vconjunct_ctx_ptr != nullptr) {
|
||||
RETURN_IF_ERROR(_vconjunct_ctx_ptr->prepare(state, intermediate_row_desc()));
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc()));
|
||||
@ -145,8 +144,8 @@ Status ExecNode::prepare(RuntimeState* state) {
|
||||
}
|
||||
|
||||
Status ExecNode::alloc_resource(doris::RuntimeState* state) {
|
||||
if (_vconjunct_ctx_ptr) {
|
||||
RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state));
|
||||
if (_vconjunct_ctx_ptr != nullptr) {
|
||||
RETURN_IF_ERROR(_vconjunct_ctx_ptr->open(state));
|
||||
}
|
||||
RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
|
||||
return Status::OK();
|
||||
@ -178,8 +177,8 @@ void ExecNode::release_resource(doris::RuntimeState* state) {
|
||||
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
|
||||
}
|
||||
|
||||
if (_vconjunct_ctx_ptr) {
|
||||
(*_vconjunct_ctx_ptr)->close(state);
|
||||
if (_vconjunct_ctx_ptr != nullptr) {
|
||||
_vconjunct_ctx_ptr->close(state);
|
||||
}
|
||||
vectorized::VExpr::close(_projections, state);
|
||||
|
||||
|
||||
@ -258,7 +258,7 @@ protected:
|
||||
ObjectPool* _pool;
|
||||
std::vector<TupleId> _tuple_ids;
|
||||
|
||||
std::unique_ptr<doris::vectorized::VExprContext*> _vconjunct_ctx_ptr;
|
||||
doris::vectorized::VExprContext* _vconjunct_ctx_ptr = nullptr;
|
||||
|
||||
std::vector<ExecNode*> _children;
|
||||
RowDescriptor _row_descriptor;
|
||||
|
||||
@ -65,16 +65,15 @@ void ScanNode::_peel_pushed_vconjunct(RuntimeState* state,
|
||||
}
|
||||
|
||||
int leaf_index = 0;
|
||||
vectorized::VExpr* conjunct_expr_root = (*_vconjunct_ctx_ptr)->root();
|
||||
vectorized::VExpr* conjunct_expr_root = _vconjunct_ctx_ptr->root();
|
||||
|
||||
if (conjunct_expr_root != nullptr) {
|
||||
vectorized::VExpr* new_conjunct_expr_root = vectorized::VectorizedUtils::dfs_peel_conjunct(
|
||||
state, *_vconjunct_ctx_ptr, conjunct_expr_root, leaf_index, checker);
|
||||
state, _vconjunct_ctx_ptr, conjunct_expr_root, leaf_index, checker);
|
||||
if (new_conjunct_expr_root == nullptr) {
|
||||
(*_vconjunct_ctx_ptr)->close(state);
|
||||
_vconjunct_ctx_ptr.reset(nullptr);
|
||||
_vconjunct_ctx_ptr->close(state);
|
||||
} else {
|
||||
(*_vconjunct_ctx_ptr)->set_root(new_conjunct_expr_root);
|
||||
_vconjunct_ctx_ptr->set_root(new_conjunct_expr_root);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -167,7 +167,7 @@ Status NewEsScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
|
||||
_state, this, _limit_per_scanner, _tuple_id, properties, _docvalue_context,
|
||||
doc_value_mode, _state->runtime_profile());
|
||||
|
||||
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
|
||||
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr));
|
||||
scanners->push_back(scanner);
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
@ -58,7 +58,7 @@ NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t l
|
||||
_docvalue_context(docvalue_context),
|
||||
_doc_value_mode(doc_value_mode) {}
|
||||
|
||||
Status NewEsScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) {
|
||||
Status NewEsScanner::prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr) {
|
||||
VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
|
||||
RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
|
||||
|
||||
|
||||
@ -60,7 +60,7 @@ public:
|
||||
Status close(RuntimeState* state) override;
|
||||
|
||||
public:
|
||||
Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr);
|
||||
Status prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr);
|
||||
|
||||
protected:
|
||||
Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
|
||||
|
||||
@ -111,7 +111,7 @@ Status NewFileScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
|
||||
VFileScanner::create_unique(_state, this, _limit_per_scanner,
|
||||
scan_range.scan_range.ext_scan_range.file_scan_range,
|
||||
runtime_profile(), _kv_cache.get());
|
||||
RETURN_IF_ERROR(scanner->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range,
|
||||
RETURN_IF_ERROR(scanner->prepare(_vconjunct_ctx_ptr, &_colname_to_value_range,
|
||||
&_colname_to_slot_id));
|
||||
scanners->push_back(std::move(scanner));
|
||||
}
|
||||
|
||||
@ -68,7 +68,7 @@ Status NewJdbcScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
|
||||
std::unique_ptr<NewJdbcScanner> scanner =
|
||||
NewJdbcScanner::create_unique(_state, this, _limit_per_scanner, _tuple_id,
|
||||
_query_string, _table_type, _state->runtime_profile());
|
||||
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
|
||||
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr));
|
||||
scanners->push_back(std::move(scanner));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -54,11 +54,11 @@ NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int
|
||||
_connector_close_timer = ADD_TIMER(get_parent()->_scanner_profile, "ConnectorCloseTime");
|
||||
}
|
||||
|
||||
Status NewJdbcScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) {
|
||||
Status NewJdbcScanner::prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr) {
|
||||
VLOG_CRITICAL << "NewJdbcScanner::Prepare";
|
||||
if (vconjunct_ctx_ptr != nullptr) {
|
||||
// Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
|
||||
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(state, &_vconjunct_ctx));
|
||||
RETURN_IF_ERROR(vconjunct_ctx_ptr->clone(state, &_vconjunct_ctx));
|
||||
}
|
||||
|
||||
if (_is_init) {
|
||||
|
||||
@ -52,7 +52,7 @@ public:
|
||||
Status open(RuntimeState* state) override;
|
||||
Status close(RuntimeState* state) override;
|
||||
|
||||
Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr);
|
||||
Status prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr);
|
||||
|
||||
protected:
|
||||
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
|
||||
|
||||
@ -67,7 +67,7 @@ Status NewOdbcScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
|
||||
}
|
||||
std::shared_ptr<NewOdbcScanner> scanner = NewOdbcScanner::create_shared(
|
||||
_state, this, _limit_per_scanner, _odbc_scan_node, _state->runtime_profile());
|
||||
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
|
||||
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr));
|
||||
scanners->push_back(scanner);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -61,11 +61,11 @@ NewOdbcScanner::NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int
|
||||
_tuple_id(odbc_scan_node.tuple_id),
|
||||
_tuple_desc(nullptr) {}
|
||||
|
||||
Status NewOdbcScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) {
|
||||
Status NewOdbcScanner::prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr) {
|
||||
VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
|
||||
if (vconjunct_ctx_ptr != nullptr) {
|
||||
// Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
|
||||
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(state, &_vconjunct_ctx));
|
||||
RETURN_IF_ERROR(vconjunct_ctx_ptr->clone(state, &_vconjunct_ctx));
|
||||
}
|
||||
|
||||
if (_is_init) {
|
||||
|
||||
@ -56,7 +56,7 @@ public:
|
||||
Status close(RuntimeState* state) override;
|
||||
|
||||
public:
|
||||
Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr);
|
||||
Status prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr);
|
||||
|
||||
protected:
|
||||
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
|
||||
|
||||
@ -431,9 +431,9 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
|
||||
SCOPED_TIMER(_scanner_init_timer);
|
||||
auto span = opentelemetry::trace::Tracer::GetCurrentSpan();
|
||||
|
||||
if (_vconjunct_ctx_ptr && (*_vconjunct_ctx_ptr)->root()) {
|
||||
if (_vconjunct_ctx_ptr && _vconjunct_ctx_ptr->root()) {
|
||||
_runtime_profile->add_info_string("RemainedDownPredicates",
|
||||
(*_vconjunct_ctx_ptr)->root()->debug_string());
|
||||
_vconjunct_ctx_ptr->root()->debug_string());
|
||||
}
|
||||
|
||||
if (!_olap_scan_node.output_column_unique_ids.empty()) {
|
||||
|
||||
@ -100,11 +100,11 @@ static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
|
||||
Status NewOlapScanner::init() {
|
||||
_is_init = true;
|
||||
auto parent = static_cast<NewOlapScanNode*>(_parent);
|
||||
RETURN_IF_ERROR(VScanner::prepare(_state, parent->_vconjunct_ctx_ptr.get()));
|
||||
RETURN_IF_ERROR(VScanner::prepare(_state, parent->_vconjunct_ctx_ptr));
|
||||
if (parent->_common_vexpr_ctxs_pushdown != nullptr) {
|
||||
// Copy common_vexpr_ctxs_pushdown from scan node to this scanner's _common_vexpr_ctxs_pushdown, just necessary.
|
||||
RETURN_IF_ERROR((*parent->_common_vexpr_ctxs_pushdown)
|
||||
->clone(_state, &_common_vexpr_ctxs_pushdown));
|
||||
RETURN_IF_ERROR(
|
||||
parent->_common_vexpr_ctxs_pushdown->clone(_state, &_common_vexpr_ctxs_pushdown));
|
||||
}
|
||||
|
||||
// set limit to reduce end of rowset and segment mem use
|
||||
|
||||
@ -97,7 +97,7 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
|
||||
}
|
||||
|
||||
Status VFileScanner::prepare(
|
||||
VExprContext** vconjunct_ctx_ptr,
|
||||
VExprContext* vconjunct_ctx_ptr,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, int>* colname_to_slot_id) {
|
||||
RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
|
||||
@ -127,11 +127,10 @@ Status VFileScanner::prepare(
|
||||
std::vector<bool>({false})));
|
||||
// prepare pre filters
|
||||
if (_params.__isset.pre_filter_exprs) {
|
||||
_pre_conjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
|
||||
RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(
|
||||
_state->obj_pool(), _params.pre_filter_exprs, _pre_conjunct_ctx_ptr.get()));
|
||||
RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->prepare(_state, *_src_row_desc));
|
||||
RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->open(_state));
|
||||
_state->obj_pool(), _params.pre_filter_exprs, &_pre_conjunct_ctx_ptr));
|
||||
RETURN_IF_ERROR(_pre_conjunct_ctx_ptr->prepare(_state, *_src_row_desc));
|
||||
RETURN_IF_ERROR(_pre_conjunct_ctx_ptr->open(_state));
|
||||
}
|
||||
}
|
||||
|
||||
@ -851,7 +850,7 @@ Status VFileScanner::close(RuntimeState* state) {
|
||||
}
|
||||
|
||||
if (_pre_conjunct_ctx_ptr) {
|
||||
(*_pre_conjunct_ctx_ptr)->close(state);
|
||||
_pre_conjunct_ctx_ptr->close(state);
|
||||
}
|
||||
|
||||
if (_push_down_expr) {
|
||||
|
||||
@ -71,7 +71,7 @@ public:
|
||||
|
||||
Status close(RuntimeState* state) override;
|
||||
|
||||
Status prepare(VExprContext** vconjunct_ctx_ptr,
|
||||
Status prepare(VExprContext* vconjunct_ctx_ptr,
|
||||
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
|
||||
const std::unordered_map<std::string, int>* colname_to_slot_id);
|
||||
|
||||
@ -128,7 +128,7 @@ protected:
|
||||
std::unordered_set<std::string> _missing_cols;
|
||||
|
||||
// For load task
|
||||
std::unique_ptr<doris::vectorized::VExprContext*> _pre_conjunct_ctx_ptr;
|
||||
doris::vectorized::VExprContext* _pre_conjunct_ctx_ptr = nullptr;
|
||||
std::unique_ptr<RowDescriptor> _src_row_desc;
|
||||
// row desc for default exprs
|
||||
std::unique_ptr<RowDescriptor> _default_val_row_desc;
|
||||
|
||||
@ -65,7 +65,7 @@ Status VMetaScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
|
||||
for (auto& scan_range : _scan_ranges) {
|
||||
std::shared_ptr<VMetaScanner> scanner = VMetaScanner::create_shared(
|
||||
_state, this, _tuple_id, scan_range, _limit_per_scanner, runtime_profile());
|
||||
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
|
||||
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr));
|
||||
scanners->push_back(scanner);
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
@ -70,7 +70,7 @@ Status VMetaScanner::open(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VMetaScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) {
|
||||
Status VMetaScanner::prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr) {
|
||||
VLOG_CRITICAL << "VMetaScanner::prepare";
|
||||
RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
|
||||
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
|
||||
|
||||
@ -55,7 +55,7 @@ public:
|
||||
|
||||
Status open(RuntimeState* state) override;
|
||||
Status close(RuntimeState* state) override;
|
||||
Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr);
|
||||
Status prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr);
|
||||
|
||||
protected:
|
||||
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
|
||||
|
||||
@ -391,8 +391,8 @@ Status VScanNode::_append_rf_into_conjuncts(std::vector<VExpr*>& vexprs) {
|
||||
}
|
||||
|
||||
VExpr* last_expr = nullptr;
|
||||
if (_vconjunct_ctx_ptr) {
|
||||
last_expr = (*_vconjunct_ctx_ptr)->root();
|
||||
if (_vconjunct_ctx_ptr != nullptr) {
|
||||
last_expr = _vconjunct_ctx_ptr->root();
|
||||
} else {
|
||||
DCHECK(_rf_vexpr_set.find(vexprs[0]) == _rf_vexpr_set.end());
|
||||
last_expr = vexprs[0];
|
||||
@ -430,15 +430,14 @@ Status VScanNode::_append_rf_into_conjuncts(std::vector<VExpr*>& vexprs) {
|
||||
}
|
||||
auto new_vconjunct_ctx_ptr = _pool->add(VExprContext::create_unique(last_expr).release());
|
||||
if (_vconjunct_ctx_ptr) {
|
||||
(*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr);
|
||||
_vconjunct_ctx_ptr->clone_fn_contexts(new_vconjunct_ctx_ptr);
|
||||
}
|
||||
RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(_state, _row_descriptor));
|
||||
RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(_state));
|
||||
if (_vconjunct_ctx_ptr) {
|
||||
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
|
||||
_stale_vexpr_ctxs.push_back(_vconjunct_ctx_ptr);
|
||||
}
|
||||
_vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
|
||||
*_vconjunct_ctx_ptr = new_vconjunct_ctx_ptr;
|
||||
_vconjunct_ctx_ptr = new_vconjunct_ctx_ptr;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -468,10 +467,10 @@ void VScanNode::release_resource(RuntimeState* state) {
|
||||
}
|
||||
|
||||
for (auto& ctx : _stale_vexpr_ctxs) {
|
||||
(*ctx)->close(state);
|
||||
ctx->close(state);
|
||||
}
|
||||
if (_common_vexpr_ctxs_pushdown) {
|
||||
(*_common_vexpr_ctxs_pushdown)->close(state);
|
||||
_common_vexpr_ctxs_pushdown->close(state);
|
||||
}
|
||||
|
||||
ExecNode::release_resource(state);
|
||||
@ -538,18 +537,18 @@ Status VScanNode::_normalize_conjuncts() {
|
||||
}
|
||||
}
|
||||
if (_vconjunct_ctx_ptr) {
|
||||
if ((*_vconjunct_ctx_ptr)->root()) {
|
||||
if (_vconjunct_ctx_ptr->root()) {
|
||||
VExpr* new_root;
|
||||
RETURN_IF_ERROR(_normalize_predicate((*_vconjunct_ctx_ptr)->root(), &new_root));
|
||||
RETURN_IF_ERROR(_normalize_predicate(_vconjunct_ctx_ptr->root(), &new_root));
|
||||
if (new_root) {
|
||||
(*_vconjunct_ctx_ptr)->set_root(new_root);
|
||||
_vconjunct_ctx_ptr->set_root(new_root);
|
||||
if (_should_push_down_common_expr()) {
|
||||
_common_vexpr_ctxs_pushdown = std::move(_vconjunct_ctx_ptr);
|
||||
_vconjunct_ctx_ptr.reset(nullptr);
|
||||
_common_vexpr_ctxs_pushdown = _vconjunct_ctx_ptr;
|
||||
_vconjunct_ctx_ptr = nullptr;
|
||||
}
|
||||
} else { // All conjucts are pushed down as predicate column
|
||||
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
|
||||
_vconjunct_ctx_ptr.reset(nullptr);
|
||||
_stale_vexpr_ctxs.push_back(_vconjunct_ctx_ptr);
|
||||
_vconjunct_ctx_ptr = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -604,7 +603,7 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output
|
||||
SlotDescriptor* slot = nullptr;
|
||||
ColumnValueRangeType* range = nullptr;
|
||||
PushDownType pdt = PushDownType::UNACCEPTABLE;
|
||||
RETURN_IF_ERROR(_eval_const_conjuncts(cur_expr, *_vconjunct_ctx_ptr, &pdt));
|
||||
RETURN_IF_ERROR(_eval_const_conjuncts(cur_expr, _vconjunct_ctx_ptr, &pdt));
|
||||
if (pdt == PushDownType::ACCEPTABLE) {
|
||||
*output_expr = nullptr;
|
||||
return Status::OK();
|
||||
@ -618,28 +617,23 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output
|
||||
is_runtimer_filter_predicate);
|
||||
}};
|
||||
RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate(
|
||||
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
|
||||
&pdt));
|
||||
cur_expr, _vconjunct_ctx_ptr, slot, value_range, &pdt));
|
||||
RETURN_IF_PUSH_DOWN(_normalize_not_in_and_not_eq_predicate(
|
||||
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
|
||||
&pdt));
|
||||
cur_expr, _vconjunct_ctx_ptr, slot, value_range, &pdt));
|
||||
RETURN_IF_PUSH_DOWN(_normalize_is_null_predicate(
|
||||
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
|
||||
&pdt));
|
||||
cur_expr, _vconjunct_ctx_ptr, slot, value_range, &pdt));
|
||||
RETURN_IF_PUSH_DOWN(_normalize_noneq_binary_predicate(
|
||||
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
|
||||
&pdt));
|
||||
cur_expr, _vconjunct_ctx_ptr, slot, value_range, &pdt));
|
||||
RETURN_IF_PUSH_DOWN(_normalize_match_predicate(
|
||||
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
|
||||
&pdt));
|
||||
cur_expr, _vconjunct_ctx_ptr, slot, value_range, &pdt));
|
||||
if (_is_key_column(slot->col_name())) {
|
||||
RETURN_IF_PUSH_DOWN(_normalize_bitmap_filter(
|
||||
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &pdt));
|
||||
cur_expr, _vconjunct_ctx_ptr, slot, &pdt));
|
||||
RETURN_IF_PUSH_DOWN(_normalize_bloom_filter(
|
||||
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &pdt));
|
||||
cur_expr, _vconjunct_ctx_ptr, slot, &pdt));
|
||||
if (_state->enable_function_pushdown()) {
|
||||
RETURN_IF_PUSH_DOWN(_normalize_function_filters(
|
||||
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &pdt));
|
||||
cur_expr, _vconjunct_ctx_ptr, slot, &pdt));
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -648,7 +642,7 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output
|
||||
|
||||
if (pdt == PushDownType::UNACCEPTABLE &&
|
||||
TExprNodeType::COMPOUND_PRED == cur_expr->node_type()) {
|
||||
_normalize_compound_predicate(cur_expr, *(_vconjunct_ctx_ptr.get()), &pdt,
|
||||
_normalize_compound_predicate(cur_expr, _vconjunct_ctx_ptr, &pdt,
|
||||
is_runtimer_filter_predicate, in_predicate_checker,
|
||||
eq_predicate_checker);
|
||||
*output_expr = conjunct_expr_root; // remaining in conjunct tree
|
||||
@ -676,18 +670,18 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output
|
||||
} else {
|
||||
if (left_child == nullptr) {
|
||||
conjunct_expr_root->children()[0]->close(
|
||||
_state, *_vconjunct_ctx_ptr,
|
||||
(*_vconjunct_ctx_ptr)->get_function_state_scope());
|
||||
_state, _vconjunct_ctx_ptr,
|
||||
_vconjunct_ctx_ptr->get_function_state_scope());
|
||||
}
|
||||
if (right_child == nullptr) {
|
||||
conjunct_expr_root->children()[1]->close(
|
||||
_state, *_vconjunct_ctx_ptr,
|
||||
(*_vconjunct_ctx_ptr)->get_function_state_scope());
|
||||
_state, _vconjunct_ctx_ptr,
|
||||
_vconjunct_ctx_ptr->get_function_state_scope());
|
||||
}
|
||||
// here only close the and expr self, do not close the child
|
||||
conjunct_expr_root->set_children({});
|
||||
conjunct_expr_root->close(_state, *_vconjunct_ctx_ptr,
|
||||
(*_vconjunct_ctx_ptr)->get_function_state_scope());
|
||||
conjunct_expr_root->close(_state, _vconjunct_ctx_ptr,
|
||||
_vconjunct_ctx_ptr->get_function_state_scope());
|
||||
}
|
||||
|
||||
// here do not close VExpr* now
|
||||
@ -1358,7 +1352,7 @@ Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
|
||||
Status VScanNode::clone_vconjunct_ctx(VExprContext** _vconjunct_ctx) {
|
||||
if (_vconjunct_ctx_ptr) {
|
||||
std::unique_lock l(_rf_locks);
|
||||
return (*_vconjunct_ctx_ptr)->clone(_state, _vconjunct_ctx);
|
||||
return _vconjunct_ctx_ptr->clone(_state, _vconjunct_ctx);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -269,7 +269,6 @@ protected:
|
||||
// Set to true if the runtime filter is ready.
|
||||
std::vector<bool> _runtime_filter_ready_flag;
|
||||
doris::Mutex _rf_locks;
|
||||
std::map<int, RuntimeFilterContext*> _conjunct_id_to_runtime_filter_ctxs;
|
||||
phmap::flat_hash_set<VExpr*> _rf_vexpr_set;
|
||||
// True means all runtime filters are applied to scanners
|
||||
bool _is_all_rf_applied = true;
|
||||
@ -322,8 +321,8 @@ protected:
|
||||
|
||||
// Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector
|
||||
// so that it will be destroyed uniformly at the end of the query.
|
||||
std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
|
||||
std::unique_ptr<VExprContext*> _common_vexpr_ctxs_pushdown = nullptr;
|
||||
std::vector<VExprContext*> _stale_vexpr_ctxs;
|
||||
VExprContext* _common_vexpr_ctxs_pushdown = nullptr;
|
||||
|
||||
// If sort info is set, push limit to each scanner;
|
||||
int64_t _limit_per_scanner = -1;
|
||||
|
||||
@ -40,10 +40,10 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, Runtim
|
||||
_is_load = (_input_tuple_desc != nullptr);
|
||||
}
|
||||
|
||||
Status VScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) {
|
||||
Status VScanner::prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr) {
|
||||
if (vconjunct_ctx_ptr != nullptr) {
|
||||
// Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
|
||||
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
|
||||
RETURN_IF_ERROR(vconjunct_ctx_ptr->clone(_state, &_vconjunct_ctx));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -77,7 +77,7 @@ protected:
|
||||
Status _filter_output_block(Block* block);
|
||||
|
||||
// Not virtual, all child will call this method explictly
|
||||
Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr);
|
||||
Status prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr);
|
||||
|
||||
public:
|
||||
VScanNode* get_parent() { return _parent; }
|
||||
|
||||
@ -473,9 +473,9 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
|
||||
std::bind<void>(&AggregationNode::_update_memusage_with_serialized_key, this);
|
||||
_executor.close = std::bind<void>(&AggregationNode::_close_with_serialized_key, this);
|
||||
|
||||
_should_limit_output = _limit != -1 && // has limit
|
||||
!_vconjunct_ctx_ptr && // no having conjunct
|
||||
_needs_finalize; // agg's finalize step
|
||||
_should_limit_output = _limit != -1 && // has limit
|
||||
_vconjunct_ctx_ptr == nullptr && // no having conjunct
|
||||
_needs_finalize; // agg's finalize step
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
|
||||
@ -88,7 +88,7 @@ void VExprContext::close(doris::RuntimeState* state) {
|
||||
}
|
||||
|
||||
doris::Status VExprContext::clone(RuntimeState* state, VExprContext** new_ctx) {
|
||||
DCHECK(_prepared);
|
||||
DCHECK(_prepared) << "expr context not prepared";
|
||||
DCHECK(_opened);
|
||||
DCHECK(*new_ctx == nullptr);
|
||||
|
||||
@ -127,17 +127,6 @@ Status VExprContext::filter_block(VExprContext* vexpr_ctx, Block* block, int col
|
||||
return Block::filter_block(block, result_column_id, column_to_keep);
|
||||
}
|
||||
|
||||
Status VExprContext::filter_block(const std::unique_ptr<VExprContext*>& vexpr_ctx_ptr, Block* block,
|
||||
int column_to_keep) {
|
||||
if (vexpr_ctx_ptr == nullptr || block->rows() == 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
DCHECK((*vexpr_ctx_ptr) != nullptr);
|
||||
int result_column_id = -1;
|
||||
RETURN_IF_ERROR((*vexpr_ctx_ptr)->execute(block, &result_column_id));
|
||||
return Block::filter_block(block, result_column_id, column_to_keep);
|
||||
}
|
||||
|
||||
Block VExprContext::get_output_block_after_execute_exprs(
|
||||
const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs, const Block& input_block,
|
||||
Status& status) {
|
||||
|
||||
@ -68,8 +68,6 @@ public:
|
||||
|
||||
[[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block,
|
||||
int column_to_keep);
|
||||
[[nodiscard]] static Status filter_block(const std::unique_ptr<VExprContext*>& vexpr_ctx_ptr,
|
||||
Block* block, int column_to_keep);
|
||||
|
||||
static Block get_output_block_after_execute_exprs(const std::vector<vectorized::VExprContext*>&,
|
||||
const Block&, Status&);
|
||||
|
||||
Reference in New Issue
Block a user