[fix](pipeline) Fix non-prepared execute of UnionOperator (#30355)

This commit is contained in:
zclllyybb
2024-01-26 22:12:48 +08:00
committed by yiguolei
parent d51cbf00f4
commit d191809372
5 changed files with 24 additions and 10 deletions

View File

@ -78,6 +78,17 @@ public:
// If overridden in subclass, must first call superclass's prepare().
[[nodiscard]] virtual Status prepare(RuntimeState* state);
/*
* For open and alloc_resource:
* Base class ExecNode's `open` only calls `alloc_resource`, which opens some public projections.
* If `open` is overridden, it must call the corresponding `alloc_resource`, since that is an (early) part of opening.
* Alternatively, it can simply call `ExecNode::open`.
* An overriding `alloc_resource` then calls its parent's `alloc_resource` after its own work, to complete the process, including the projections.
* In Pipeline engine:
* PipeContext::prepare -> node::prepare
* Task::open -> StreamingOp::open -> node::alloc_resource. For sink+source splits, this is only opened in the SinkOperator.
* So in pipeline, the things done directly by `open` (like calling the child's) would not be done.
*/
// Performs any preparatory work prior to calling get_next().
// Can be called repeatedly (after calls to close()).
// Caller must not be holding any io buffers. This will cause deadlock.

View File

@ -54,8 +54,10 @@ bool UnionSourceOperator::_has_data() {
// We assume it can read in order to process const exprs, although we don't know whether there are any.
// It can also read when the queue has data.
// The source operator's execution depends on the Node's alloc_resource, which is called in the Sink's open.
// So wait until the SinkOperator has been scheduled to open.
bool UnionSourceOperator::can_read() {
return _has_data() || _data_queue->is_all_finish();
return _node->resource_allocated() && (_has_data() || _data_queue->is_all_finish());
}
Status UnionSourceOperator::pull_data(RuntimeState* state, vectorized::Block* block, bool* eos) {

View File

@ -80,6 +80,7 @@ Status VUnionNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
_materialize_exprs_evaluate_timer =
ADD_TIMER(_runtime_profile, "MaterializeExprsEvaluateTimer");
// Prepare const expr lists.
for (const VExprContextSPtrs& exprs : _const_expr_lists) {
RETURN_IF_ERROR(VExpr::prepare(exprs, state, _row_descriptor));
@ -93,7 +94,7 @@ Status VUnionNode::prepare(RuntimeState* state) {
}
Status VUnionNode::open(RuntimeState* state) {
RETURN_IF_ERROR(alloc_resource(state));
RETURN_IF_ERROR(ExecNode::open(state)); // exactly same with this->alloc_resource()
// Ensures that rows are available for clients to fetch after this open() has
// succeeded.
if (!_children.empty()) {

View File

@ -18,8 +18,8 @@
#pragma once
#include <glog/logging.h>
#include <stddef.h>
#include <cstddef>
#include <iosfwd>
#include <vector>
@ -64,6 +64,8 @@ public:
/// GetNext() for the constant expression case.
Status get_next_const(RuntimeState* state, Block* block);
bool resource_allocated() const { return _resource_allocated; }
private:
/// Const exprs materialized by this node. These exprs don't refer to any children.
/// Only materialized by the first fragment instance to avoid duplication.

View File

@ -20,13 +20,10 @@
#include <fmt/format.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <stddef.h>
#include <algorithm>
#include <exception>
#include <cstddef>
#include <memory>
#include <ostream>
#include <vector>
#include "common/exception.h"
#include "common/status.h"
@ -87,8 +84,8 @@ const DataTypePtr& VCastExpr::get_target_type() const {
doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
DCHECK(_prepare_finished);
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->open(state, context, scope));
for (auto& i : _children) {
RETURN_IF_ERROR(i->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
@ -105,7 +102,8 @@ void VCastExpr::close(VExprContext* context, FunctionContext::FunctionStateScope
doris::Status VCastExpr::execute(VExprContext* context, doris::vectorized::Block* block,
int* result_column_id) {
DCHECK(_open_finished || _getting_const_col);
DCHECK(_open_finished || _getting_const_col)
<< _open_finished << _getting_const_col << _expr_name;
// for each child call execute
int column_id = 0;
RETURN_IF_ERROR(_children[0]->execute(context, block, &column_id));