From c25a7235f9b07acdd0637d5c7145ee5a5c81f614 Mon Sep 17 00:00:00 2001 From: Pxl Date: Tue, 13 Dec 2022 00:28:36 +0800 Subject: [PATCH] [Pipeline](load) support pipeline broker load (#14940) support pipeline broker load --- be/src/exec/exec_node.cpp | 8 +-- be/src/exec/exec_node.h | 6 +- be/src/pipeline/exec/broker_scan_operator.h | 59 +++++++++++++++++++ be/src/pipeline/exec/operator.h | 1 + be/src/pipeline/pipeline.h | 11 ++-- be/src/pipeline/pipeline_fragment_context.cpp | 37 +++++++----- be/src/pipeline/pipeline_fragment_context.h | 13 ++-- be/src/pipeline/pipeline_task.cpp | 5 +- be/src/pipeline/pipeline_task.h | 1 - be/src/runtime/fragment_mgr.cpp | 8 +-- .../stream_load/stream_load_executor.cpp | 2 - be/src/vec/exec/scan/new_file_scan_node.cpp | 3 - be/src/vec/exec/scan/new_file_scan_node.h | 1 - be/src/vec/exec/vbroker_scan_node.h | 6 +- be/src/vec/exec/vbroker_scanner.cpp | 7 +-- 15 files changed, 108 insertions(+), 60 deletions(-) create mode 100644 be/src/pipeline/exec/broker_scan_operator.h diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 5849b455cb..939954dd01 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -38,7 +38,6 @@ #include "exec/hash_join_node.h" #include "exec/intersect_node.h" #include "exec/merge_node.h" -#include "exec/mysql_scan_node.h" #include "exec/odbc_scan_node.h" #include "exec/olap_scan_node.h" #include "exec/partitioned_aggregation_node.h" @@ -122,7 +121,7 @@ int ExecNode::RowBatchQueue::Cleanup() { // delete batch; // } - lock_guard l(lock_); + std::lock_guard l(lock_); for (std::list::iterator it = cleanup_queue_.begin(); it != cleanup_queue_.end(); ++it) { // num_io_buffers += (*it)->num_io_buffers(); @@ -144,7 +143,6 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl _rows_returned_counter(nullptr), _rows_returned_rate(nullptr), _memory_used_counter(nullptr), - _get_next_span(), _is_closed(false), _ref(0) { if (tnode.__isset.output_tuple_id) { @@ -321,7 +319,7 @@ Status ExecNode::close(RuntimeState* state) { } void ExecNode::add_runtime_exec_option(const std::string& str) { - lock_guard l(_exec_options_lock); + std::lock_guard l(_exec_options_lock); if (_runtime_exec_options.empty()) { _runtime_exec_options = str; @@ -656,7 +654,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN } default: - map::const_iterator i = + std::map::const_iterator i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); const char* str = "unknown node type"; diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 3af9967fc0..82951632e4 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -44,7 +44,6 @@ class RowBatch; class RuntimeState; class TPlan; class TupleRow; -class DataSink; class MemTracker; namespace vectorized { @@ -61,9 +60,6 @@ class OperatorBase; using std::string; using std::stringstream; using std::vector; -using std::map; -using std::lock_guard; -using std::mutex; // Superclass of all executor nodes. // All subclasses need to make sure to check RuntimeState::is_cancelled() @@ -196,7 +192,7 @@ public: // This improve is cautious, we ensure the correctness firstly. void try_do_aggregate_serde_improve(); - typedef bool (*EvalConjunctsFn)(ExprContext* const* ctxs, int num_ctxs, TupleRow* row); + using EvalConjunctsFn = bool (*)(ExprContext* const*, int, TupleRow*); // Evaluate exprs over row. Returns true if all exprs return true. // TODO: This doesn't use the vector signature because I haven't figured // out how to deal with declaring a templated std:vector type in IR diff --git a/be/src/pipeline/exec/broker_scan_operator.h b/be/src/pipeline/exec/broker_scan_operator.h new file mode 100644 index 0000000000..584ad8c472 --- /dev/null +++ b/be/src/pipeline/exec/broker_scan_operator.h @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "operator.h" +#include "vec/exec/vbroker_scan_node.h" + +namespace doris::pipeline { + +class BrokerScanOperatorBuilder : public OperatorBuilder { +public: + BrokerScanOperatorBuilder(int32_t id, ExecNode* node) + : OperatorBuilder(id, "BrokerScanOperator", node) {} + bool is_source() const override { return true; } + OperatorPtr build_operator() override; +}; + +class BrokerScanOperator : public SourceOperator { +public: + BrokerScanOperator(OperatorBuilderBase* operator_builder, ExecNode* scan_node) + : SourceOperator(operator_builder, scan_node) {} + + bool can_read() override { return _node->can_read(); } + + bool is_pending_finish() const override { return !_node->can_finish(); } + + Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(SourceOperator::open(state)); + return _node->open(state); + } + + Status close(RuntimeState* state) override { + RETURN_IF_ERROR(SourceOperator::close(state)); + _node->close(state); + return Status::OK(); + } +}; + +OperatorPtr BrokerScanOperatorBuilder::build_operator() { + return std::make_shared(this, _node); +} + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 68870fc57b..46c79c8470 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -20,6 +20,7 @@ #include #include "common/status.h" +#include "exec/data_sink.h" #include "exec/exec_node.h" #include "runtime/runtime_state.h" #include "vec/core/block.h" diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 0addd4464b..595f16ba63 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -18,11 +18,11 @@ #pragma once #include +#include #include #include "common/status.h" #include "exec/operator.h" -#include "vec/core/block.h" namespace doris::pipeline { @@ -31,16 +31,13 @@ using PipelinePtr = std::shared_ptr; using Pipelines = std::vector; using PipelineId = uint32_t; -class PipelineTask; -class PipelineFragmentContext; - class Pipeline : public std::enable_shared_from_this { friend class PipelineTask; public: Pipeline() = delete; - explicit Pipeline(PipelineId pipeline_id, std::shared_ptr context) - : _complete_dependency(0), _pipeline_id(pipeline_id), _context(std::move(context)) {} + explicit Pipeline(PipelineId pipeline_id, std::weak_ptr context) + : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {} Status prepare(RuntimeState* state); @@ -82,7 +79,7 @@ private: std::vector> _dependencies; PipelineId _pipeline_id; - std::shared_ptr _context; + std::weak_ptr _context; std::unique_ptr _pipeline_profile; }; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index a034b2a9a0..ecb79bb2d0 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -17,6 +17,7 @@ #include "pipeline_fragment_context.h" +#include #include #include "exec/agg_context.h" @@ -49,6 +50,7 @@ #include "gen_cpp/FrontendService.h" #include "gen_cpp/HeartbeatService_types.h" #include "pipeline/exec/assert_num_rows_operator.h" +#include "pipeline/exec/broker_scan_operator.h" #include "pipeline/exec/const_value_operator.h" #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" @@ -81,22 +83,21 @@ using apache::thrift::TException; namespace doris::pipeline { -PipelineFragmentContext::PipelineFragmentContext(const TUniqueId& query_id, - const TUniqueId& instance_id, int backend_num, - std::shared_ptr query_ctx, - ExecEnv* exec_env) +PipelineFragmentContext::PipelineFragmentContext( + const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num, + std::shared_ptr query_ctx, ExecEnv* exec_env, + std::function call_back) : _query_id(query_id), - _fragment_instance_id(instance_id), + _fragment_id(instance_id), _backend_num(backend_num), _exec_env(exec_env), _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR), _closed_pipeline_cnt(0), - _query_ctx(std::move(query_ctx)) { + _query_ctx(std::move(query_ctx)), + _call_back(call_back) { _fragment_watcher.start(); } -PipelineFragmentContext::~PipelineFragmentContext() = default; - void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { if (!_runtime_state->is_cancelled()) { @@ -114,7 +115,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, _query_ctx->set_ready_to_execute(true); // must close stream_mgr to avoid dead lock in Exchange Node - _exec_env->vstream_mgr()->cancel(_fragment_instance_id); + _exec_env->vstream_mgr()->cancel(_fragment_id); // Cancel the result queue manager used by spark doris connector // TODO pipeline incomp // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); @@ -124,7 +125,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, PipelinePtr PipelineFragmentContext::add_pipeline() { // _prepared、_submitted, _canceled should do not add pipeline PipelineId id = _next_pipeline_id++; - auto pipeline = std::make_shared(id, shared_from_this()); + auto pipeline = std::make_shared(id, weak_from_this()); _pipelines.emplace_back(pipeline); return pipeline; } @@ -301,9 +302,17 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur auto node_type = node->type(); switch (node_type) { // for source + case TPlanNodeType::BROKER_SCAN_NODE: { + OperatorBuilderPtr operator_t = std::make_shared( + fragment_context->next_operator_builder_id(), node); + RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); + break; + } case TPlanNodeType::OLAP_SCAN_NODE: case TPlanNodeType::JDBC_SCAN_NODE: - case TPlanNodeType::ODBC_SCAN_NODE: { + case TPlanNodeType::ODBC_SCAN_NODE: + case TPlanNodeType::FILE_SCAN_NODE: + case TPlanNodeType::ES_SCAN_NODE: { OperatorBuilderPtr operator_t = std::make_shared( fragment_context->next_operator_builder_id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); @@ -600,7 +609,7 @@ void PipelineFragmentContext::send_report(bool done) { params.protocol_version = FrontendServiceVersion::V1; params.__set_query_id(_query_id); params.__set_backend_num(_backend_num); - params.__set_fragment_instance_id(_fragment_instance_id); + params.__set_fragment_instance_id(_fragment_id); exec_status.set_t_status(¶ms); params.__set_done(true); @@ -685,8 +694,8 @@ void PipelineFragmentContext::send_report(bool done) { coord->reportExecStatus(res, params); } catch (TTransportException& e) { LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(_query_id) - << ", instance id: " << print_id(_fragment_instance_id) << " to " - << coord_addr << ", err: " << e.what(); + << ", instance id: " << print_id(_fragment_id) << " to " << coord_addr + << ", err: " << e.what(); rpc_status = coord.reopen(); if (!rpc_status.ok()) { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index fd1d50ebff..5302cc83b5 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -17,7 +17,8 @@ #pragma once -#include "pipeline.h" +#include "pipeline/pipeline.h" +#include "pipeline/pipeline_task.h" #include "runtime/runtime_state.h" namespace doris { @@ -37,9 +38,10 @@ class PipelineFragmentContext : public std::enable_shared_from_this query_ctx, - ExecEnv* exec_env); + ExecEnv* exec_env, + std::function call_back); - virtual ~PipelineFragmentContext(); + ~PipelineFragmentContext() { _call_back(_runtime_state.get(), &_exec_status); } PipelinePtr add_pipeline(); @@ -80,13 +82,11 @@ public: private: // Id of this query TUniqueId _query_id; - // Id of this instance - TUniqueId _fragment_instance_id; + TUniqueId _fragment_id; int _backend_num; ExecEnv* _exec_env; - TUniqueId _fragment_id; bool _prepared = false; bool _submitted = false; @@ -129,6 +129,7 @@ private: template Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr); + std::function _call_back; }; } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 6e867d5959..250dfc2bab 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -75,9 +75,8 @@ bool PipelineTask::has_dependency() { if (_pipeline->has_dependency()) { return true; } - // FE do not call execute - if (!_state->get_query_fragments_ctx() - ->is_ready_to_execute()) { // TODO pipeline config::s_ready_to_execute + + if (!query_fragments_context()->is_ready_to_execute()) { return true; } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index a7f0dffec0..34cce073e6 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -167,7 +167,6 @@ private: void _init_profile(); void _init_state(); -private: uint32_t _index; PipelinePtr _pipeline; bool _dependency_finish = false; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1ed44838a4..f1354140ac 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -74,7 +74,6 @@ std::string to_load_error_http_path(const std::string& file_name) { } using apache::thrift::TException; -using apache::thrift::TProcessor; using apache::thrift::transport::TTransportException; class RuntimeProfile; @@ -547,8 +546,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { set_pipe(params.params.fragment_instance_id, pipe); return Status::OK(); } else { - return exec_plan_fragment(params, std::bind(&empty_function, std::placeholders::_1, - std::placeholders::_2)); + return exec_plan_fragment(params, empty_function); } } @@ -750,11 +748,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { fragments_ctx->set_ready_to_execute_only(); } - std::shared_ptr context = std::make_shared( fragments_ctx->query_id, fragment_instance_id, params.backend_num, - fragments_ctx, _exec_env); + fragments_ctx, _exec_env, cb); { SCOPED_RAW_TIMER(&duration_ns); RETURN_IF_ERROR(context->prepare(params)); @@ -767,7 +764,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi _cv.notify_all(); } auto st = context->submit(); - cb(context->get_runtime_state(), &st); if (!st.ok()) { context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "submit context fail"); remove_pipeline_context(context); diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 34336ed427..61244bbd9f 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -50,8 +50,6 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) { ctx->commit_infos = std::move(state->tablet_commit_infos()); if (status->ok()) { - LOG(WARNING) << "MYTEST " << int64_t(state) << " " - << state->num_rows_load_total(); ctx->number_total_rows = state->num_rows_load_total(); ctx->number_loaded_rows = state->num_rows_load_success(); ctx->number_filtered_rows = state->num_rows_load_filtered(); diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index 49319f1e6b..8da34c5b65 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -17,10 +17,7 @@ #include "vec/exec/scan/new_file_scan_node.h" -#include "vec/columns/column_const.h" -#include "vec/exec/scan/new_olap_scanner.h" #include "vec/exec/scan/vfile_scanner.h" -#include "vec/functions/in.h" namespace doris::vectorized { diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h index 53b11e408d..12148da925 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.h +++ b/be/src/vec/exec/scan/new_file_scan_node.h @@ -39,7 +39,6 @@ protected: private: VScanner* _create_scanner(const TFileScanRange& scan_range); -private: std::vector _scan_ranges; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h index 98b4afc677..452415014f 100644 --- a/be/src/vec/exec/vbroker_scan_node.h +++ b/be/src/vec/exec/vbroker_scan_node.h @@ -24,6 +24,7 @@ #include "exec/scan_node.h" #include "gen_cpp/PaloInternalService_types.h" #include "runtime/descriptors.h" + namespace doris { class RuntimeState; @@ -45,7 +46,7 @@ public: Status open(RuntimeState* state) override; // Fill the next row batch by calling next() on the scanner, - virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override { + Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override { return Status::NotSupported("Not Implemented VBrokerScanNode::get_next."); } @@ -57,6 +58,9 @@ public: // No use Status set_scan_ranges(const std::vector& scan_ranges) override; + bool can_read() { return true; } + bool can_finish() const { return _num_running_scanners == 0; } + private: // Write debug string of this into out. void debug_string(int indentation_level, std::stringstream* out) const override; diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp index eaddb6ff10..df9f7e79ce 100644 --- a/be/src/vec/exec/vbroker_scanner.cpp +++ b/be/src/vec/exec/vbroker_scanner.cpp @@ -19,14 +19,9 @@ #include -#include - -#include "exec/exec_node.h" -#include "exec/plain_text_line_reader.h" +#include "exec/line_reader.h" #include "exec/text_converter.h" #include "exec/text_converter.hpp" -#include "exprs/expr_context.h" -#include "util/utf8_check.h" namespace doris::vectorized {