[vectorized](pipeline) support assert num rows operator (#14923)

This commit is contained in:
zhangstar333
2022-12-09 09:39:29 +08:00
committed by GitHub
parent b311ebef6c
commit 20f2abb3d4
4 changed files with 70 additions and 10 deletions

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "operator.h"
#include "vec/exec/vassert_num_rows_node.h"
namespace doris {
namespace pipeline {
class AssertNumRowsOperatorBuilder final : public OperatorBuilder<vectorized::VAssertNumRowsNode> {
public:
AssertNumRowsOperatorBuilder(int32_t id, ExecNode* node)
: OperatorBuilder(id, "AssertNumRowsOperatorBuilder", node) {};
OperatorPtr build_operator() override;
};
class AssertNumRowsOperator final : public Operator<AssertNumRowsOperatorBuilder> {
public:
AssertNumRowsOperator(OperatorBuilderBase* operator_builder, ExecNode* node)
: Operator(operator_builder, node) {};
};
OperatorPtr AssertNumRowsOperatorBuilder::build_operator() {
return std::make_shared<AssertNumRowsOperator>(this, _node);
}
} // namespace pipeline
} // namespace doris

View File

@ -39,7 +39,9 @@
#include "exec/streaming_aggregation_source_operator.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "pipeline/exec/assert_num_rows_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/table_function_operator.h"
#include "pipeline_task.h"
#include "runtime/client_cache.h"
@ -351,6 +353,13 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(cur_pipe->add_operator(builder));
break;
}
case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr builder =
std::make_shared<AssertNumRowsOperatorBuilder>(next_operator_builder_id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(builder));
break;
}
case TPlanNodeType::TABLE_FUNCTION_NODE: {
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr builder =

View File

@ -18,8 +18,6 @@
#include "vec/exec/vassert_num_rows_node.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
@ -47,12 +45,7 @@ Status VAssertNumRowsNode::open(RuntimeState* state) {
return Status::OK();
}
Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VAssertNumRowsNode::get_next");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, block, eos),
child(0)->get_next_span(), *eos);
Status VAssertNumRowsNode::pull(doris::RuntimeState* state, vectorized::Block* block, bool* eos) {
_num_rows_returned += block->rows();
bool assert_res = false;
switch (_assertion) {
@ -98,4 +91,14 @@ Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos
return Status::OK();
}
Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VAssertNumRowsNode::get_next");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, block, eos),
child(0)->get_next_span(), *eos);
return pull(state, block, eos);
}
} // namespace doris::vectorized

View File

@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "common/status.h"
#include "exec/exec_node.h"
#include "gen_cpp/PlanNodes_types.h"
@ -30,8 +31,9 @@ public:
return Status::NotSupported("Not Implemented VAnalyticEvalNode::get_next.");
}
virtual Status open(RuntimeState* state) override;
virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
Status open(RuntimeState* state) override;
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
private:
int64_t _desired_num_rows;