diff --git a/be/src/pipeline/exec/empty_set_operator.h b/be/src/pipeline/exec/empty_set_operator.h
index a0a1998971..8cffcd2783 100644
--- a/be/src/pipeline/exec/empty_set_operator.h
+++ b/be/src/pipeline/exec/empty_set_operator.h
@@ -18,13 +18,10 @@
 #pragma once

 #include "operator.h"
+#include "vec/exec/vempty_set_node.h"

 namespace doris {

-namespace vectorized {
-class VEmptySetNode;
-}
-
 namespace pipeline {

 class EmptySetSourceOperatorBuilder final : public OperatorBuilder<vectorized::VEmptySetNode> {
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 2ee2db1c9c..689faefa55 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -26,7 +26,6 @@
 #include "pipeline/pipeline_fragment_context.h"
 #include "service/brpc.h"
 #include "util/proto_util.h"
-#include "util/time.h"
 #include "vec/sink/vdata_stream_sender.h"

 namespace doris::pipeline {
@@ -64,7 +63,6 @@ public:
         }
     }

-public:
     brpc::Controller cntl;
     T result;

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 1eba3cc25b..95e2bdfbcb 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -23,9 +23,10 @@
 #include
 #include

+#include "common/global_types.h"
+#include "common/status.h"
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
-#include "runtime/runtime_state.h"

 namespace doris {
 namespace vectorized {
@@ -74,7 +75,6 @@ private:

     PipelineFragmentContext* _context;

-private:
     Status _send_rpc(InstanceLoId);
     // must hold the _instance_to_package_queue_mutex[id] mutex to operate
     void _construct_request(InstanceLoId id);
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h
new file mode 100644
index 0000000000..b26d167eca
--- /dev/null
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "operator.h"
+#include "vec/sink/vtablet_sink.h"
+
+namespace doris {
+
+namespace pipeline {
+
+class OlapTableSinkOperatorBuilder final
+        : public DataSinkOperatorBuilder<stream_load::VOlapTableSink> {
+public:
+    OlapTableSinkOperatorBuilder(int32_t id, DataSink* sink)
+            : DataSinkOperatorBuilder(id, "OlapTableSinkOperator", sink) {};
+
+    OperatorPtr build_operator() override;
+};
+
+class OlapTableSinkOperator final : public DataSinkOperator<stream_load::VOlapTableSink> {
+public:
+    OlapTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
+            : DataSinkOperator(operator_builder, sink) {};
+
+    bool can_write() override { return true; } // TODO: need use mem_limit
+};
+
+OperatorPtr OlapTableSinkOperatorBuilder::build_operator() {
+    return std::make_shared<OlapTableSinkOperator>(this, _sink);
+}
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
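The two classes above are thin instantiations of the operator.h templates; everything beyond build_operator() is inherited behavior. A rough usage sketch of how a pipeline task would drive this sink — hedged: next_id, state, block, and the surrounding task loop are assumed rather than shown, and the DataSink* must really be the stream_load::VOlapTableSink that _create_sink() passes in:

    // Sketch only, not code from this patch.
    auto builder = std::make_shared<OlapTableSinkOperatorBuilder>(next_id, sink);
    OperatorPtr op = builder->build_operator();  // an OlapTableSinkOperator
    RETURN_IF_ERROR(op->prepare(state));         // DataSinkOperator::prepare -> VOlapTableSink::prepare
    RETURN_IF_ERROR(op->open(state));
    // The task keeps pushing blocks; FINISHED makes DataSinkOperator::sink call
    // VOlapTableSink::send with eos = true, flushing the load channels.
    RETURN_IF_ERROR(op->sink(state, block, SourceState::FINISHED));
    RETURN_IF_ERROR(op->close(state));

Since can_write() is hard-coded to true (see the TODO), the scheduler currently treats this sink as always writable instead of applying back pressure from memory usage.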
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 461848d54a..db395084f6 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -98,7 +98,7 @@ public:
     OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
             : OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {}

-    virtual ~OperatorBuilder() = default;
+    ~OperatorBuilder() override = default;

     const RowDescriptor& row_desc() override { return _node->row_desc(); }

@@ -114,7 +114,7 @@ public:
     DataSinkOperatorBuilder(int32_t id, const std::string& name, DataSink* sink = nullptr)
             : OperatorBuilderBase(id, name), _sink(reinterpret_cast<NodeType*>(sink)) {}

-    virtual ~DataSinkOperatorBuilder() = default;
+    ~DataSinkOperatorBuilder() override = default;

     bool is_sink() const override { return true; }

@@ -232,9 +232,9 @@ public:
     DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
             : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};

-    virtual ~DataSinkOperator() = default;
+    ~DataSinkOperator() override = default;

-    virtual Status prepare(RuntimeState* state) override {
+    Status prepare(RuntimeState* state) override {
         RETURN_IF_ERROR(_sink->prepare(state));
         _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
         _sink->profile()->insert_child_head(_runtime_profile.get(), true);
@@ -243,13 +243,13 @@ public:
         return Status::OK();
     }

-    virtual Status open(RuntimeState* state) override {
+    Status open(RuntimeState* state) override {
         SCOPED_TIMER(_runtime_profile->total_time_counter());
         return _sink->open(state);
     }

-    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
-                        SourceState source_state) override {
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override {
         SCOPED_TIMER(_runtime_profile->total_time_counter());
         if (!UNLIKELY(in_block)) {
             DCHECK(source_state == SourceState::FINISHED)
@@ -259,12 +259,12 @@ public:
         return _sink->send(state, in_block, source_state == SourceState::FINISHED);
     }

-    virtual Status close(RuntimeState* state) override {
+    Status close(RuntimeState* state) override {
         _fresh_exec_timer(_sink);
         return _sink->close(state, Status::OK());
     }

-    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+    Status finalize(RuntimeState* state) override { return Status::OK(); }

 protected:
     void _fresh_exec_timer(NodeType* node) {
@@ -284,9 +284,9 @@ public:
     Operator(OperatorBuilderBase* builder, ExecNode* node)
             : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};

-    virtual ~Operator() = default;
+    ~Operator() override = default;

-    virtual Status prepare(RuntimeState* state) override {
+    Status prepare(RuntimeState* state) override {
         _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
         _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
         _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
@@ -295,19 +295,19 @@ public:
         return Status::OK();
     }

-    virtual Status open(RuntimeState* state) override {
+    Status open(RuntimeState* state) override {
         SCOPED_TIMER(_runtime_profile->total_time_counter());
         RETURN_IF_ERROR(_node->alloc_resource(state));
         return Status::OK();
     }

-    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
-                        SourceState source_state) override {
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override {
         SCOPED_TIMER(_runtime_profile->total_time_counter());
         return _node->sink(state, in_block, source_state == SourceState::FINISHED);
     }

-    virtual Status close(RuntimeState* state) override {
+    Status close(RuntimeState* state) override {
         _fresh_exec_timer(_node);
         if (!_node->decrease_ref()) {
             _node->release_resource(state);
@@ -315,8 +315,8 @@ public:
         return Status::OK();
     }

-    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
-                             SourceState& source_state) override {
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override {
         SCOPED_TIMER(_runtime_profile->total_time_counter());
         bool eos = false;
         RETURN_IF_ERROR(_node->pull(state, block, &eos));
@@ -324,9 +324,9 @@ public:
         return Status::OK();
     }

-    virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
+    Status finalize(RuntimeState* state) override { return Status::OK(); }

-    virtual bool can_read() override { return _node->can_read(); }
+    bool can_read() override { return _node->can_read(); }

 protected:
     void _fresh_exec_timer(NodeType* node) {
@@ -350,8 +350,8 @@ public:

     virtual ~DataStateOperator() = default;

-    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
-                             SourceState& source_state) override {
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override {
         auto& node = Operator<NodeType>::_node;
         auto& child = Operator<NodeType>::_child;

diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
index 70b9fd8e8b..2a6c321db8 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
@@ -44,12 +44,12 @@ Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     Status ret = Status::OK();
     if (in_block && in_block->rows() > 0) {
-        auto bock_from_ctx = _agg_context->get_free_block();
-        RETURN_IF_ERROR(_node->do_pre_agg(in_block, bock_from_ctx.get()));
-        if (bock_from_ctx->rows() == 0) {
-            _agg_context->return_free_block(std::move(bock_from_ctx));
+        auto block_from_ctx = _agg_context->get_free_block();
+        RETURN_IF_ERROR(_node->do_pre_agg(in_block, block_from_ctx.get()));
+        if (block_from_ctx->rows() == 0) {
+            _agg_context->return_free_block(std::move(block_from_ctx));
         } else {
-            _agg_context->push_block(std::move(bock_from_ctx));
+            _agg_context->push_block(std::move(block_from_ctx));
         }
     }
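OperatorBuilder::row_desc() above delegates to the wrapped ExecNode, and its sink-side counterpart presumably delegates to _sink the same way — which is why vtablet_sink.h later in this patch exposes a public row_desc() on VOlapTableSink. A sketch of the presumed counterpart (it sits outside the hunks shown; included only to make the dependency visible):

    // Presumed shape, by analogy with OperatorBuilder::row_desc(); not part of this patch's hunks.
    const RowDescriptor& row_desc() override { return _sink->row_desc(); }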
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 75cd7dbc3e..10f86905be 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -37,6 +37,7 @@
 #include "exec/streaming_aggregation_source_operator.h"
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/HeartbeatService_types.h"
+#include "pipeline/exec/olap_table_sink_operator.h"
 #include "pipeline/exec/table_function_operator.h"
 #include "pipeline_task.h"
 #include "runtime/client_cache.h"
@@ -48,12 +49,10 @@
 #include "vec/exec/scan/new_olap_scan_node.h"
 #include "vec/exec/scan/vscan_node.h"
 #include "vec/exec/vaggregation_node.h"
-#include "vec/exec/vempty_set_node.h"
 #include "vec/exec/vexchange_node.h"
 #include "vec/exec/vrepeat_node.h"
 #include "vec/exec/vsort_node.h"
 #include "vec/runtime/vdata_stream_mgr.h"
-#include "vec/sink/vdata_stream_sender.h"
 #include "vec/sink/vresult_sink.h"

 using apache::thrift::transport::TTransportException;
@@ -389,6 +388,11 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
                 _sink.get());
         break;
     }
+    case TDataSinkType::OLAP_TABLE_SINK: {
+        sink_ = std::make_shared<OlapTableSinkOperatorBuilder>(next_operator_builder_id(),
+                                                               _sink.get());
+        break;
+    }
     default:
         return Status::InternalError("Unsupported sink type in pipeline: {}", thrift_sink.type);
     }
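_create_sink() is the single dispatch point from thrift sink types to pipeline sink builders, so wiring up a further sink type stays a three-line case. A hypothetical example following the same pattern (MemoryScratchSinkOperatorBuilder is illustrative only; no such builder exists in this patch):

    case TDataSinkType::MEMORY_SCRATCH_SINK: {
        sink_ = std::make_shared<MemoryScratchSinkOperatorBuilder>(next_operator_builder_id(),
                                                                   _sink.get());
        break;
    }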
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 30d916a49c..f956c3b557 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -46,7 +46,6 @@
 #include "runtime/stream_load/stream_load_pipe.h"
 #include "runtime/thread_context.h"
 #include "service/backend_options.h"
-#include "util/debug_util.h"
 #include "util/doris_metrics.h"
 #include "util/stopwatch.hpp"
 #include "util/telemetry/telemetry.h"
@@ -428,10 +427,7 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
 }

 FragmentMgr::FragmentMgr(ExecEnv* exec_env)
-        : _exec_env(exec_env),
-          _fragment_map(),
-          _fragments_ctx_map(),
-          _stop_background_threads_latch(1) {
+        : _exec_env(exec_env), _stop_background_threads_latch(1) {
     _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
     INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
     REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { return _fragment_map.size(); });
@@ -549,13 +545,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
         set_pipe(params.params.fragment_instance_id, pipe);
         return Status::OK();
     } else {
-        if (params.query_options.__isset.enable_pipeline_engine &&
-            params.query_options.enable_pipeline_engine) {
-            return exec_pipeline(params);
-        } else {
-            return exec_plan_fragment(params,
-                                      std::bind(&empty_function, std::placeholders::_1));
-        }
+        return exec_plan_fragment(params, std::bind(&empty_function, std::placeholders::_1));
     }
 }

@@ -598,120 +588,6 @@ std::shared_ptr<StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_
     }
 }

-Status FragmentMgr::exec_pipeline(const TExecPlanFragmentParams& params) {
-    auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
-                                                     : telemetry::get_noop_tracer();
-    START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
-    const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _pipeline_map.find(fragment_instance_id);
-        if (iter != _pipeline_map.end()) {
-            // Duplicated
-            return Status::OK();
-        }
-    }
-
-    std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
-    if (params.is_simplified_param) {
-        // Get common components from _fragments_ctx_map
-        std::lock_guard<std::mutex> lock(_lock);
-        auto search = _fragments_ctx_map.find(params.params.query_id);
-        if (search == _fragments_ctx_map.end()) {
-            return Status::InternalError(
-                    "Failed to get query fragments context. Query may be "
-                    "timeout or be cancelled. host: {}",
-                    BackendOptions::get_localhost());
-        }
-        fragments_ctx = search->second;
-        _set_scan_concurrency(params, fragments_ctx.get());
-    } else {
-        // This may be a first fragment request of the query.
-        // Create the query fragments context.
-        fragments_ctx = std::make_shared<QueryFragmentsCtx>(params.fragment_num_on_host, _exec_env);
-        fragments_ctx->query_id = params.params.query_id;
-        RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
-                                              &(fragments_ctx->desc_tbl)));
-        fragments_ctx->coord_addr = params.coord;
-        LOG(INFO) << "query_id: "
-                  << UniqueId(fragments_ctx->query_id.hi, fragments_ctx->query_id.lo)
-                  << " coord_addr " << fragments_ctx->coord_addr;
-        fragments_ctx->query_globals = params.query_globals;
-
-        if (params.__isset.resource_info) {
-            fragments_ctx->user = params.resource_info.user;
-            fragments_ctx->group = params.resource_info.group;
-            fragments_ctx->set_rsc_info = true;
-        }
-
-        fragments_ctx->timeout_second = params.query_options.query_timeout;
-        _set_scan_concurrency(params, fragments_ctx.get());
-
-        bool has_query_mem_tracker =
-                params.query_options.__isset.mem_limit && (params.query_options.mem_limit > 0);
-        int64_t bytes_limit = has_query_mem_tracker ? params.query_options.mem_limit : -1;
-        if (bytes_limit > MemInfo::mem_limit()) {
-            VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
-                        << " exceeds process memory limit of "
-                        << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
-                        << ". Using process memory limit instead";
-            bytes_limit = MemInfo::mem_limit();
-        }
-        if (params.query_options.query_type == TQueryType::SELECT) {
-            fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
-                    MemTrackerLimiter::Type::QUERY,
-                    fmt::format("Query#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
-        } else if (params.query_options.query_type == TQueryType::LOAD) {
-            fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
-                    MemTrackerLimiter::Type::LOAD,
-                    fmt::format("Load#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
-        }
-        if (params.query_options.__isset.is_report_success &&
-            params.query_options.is_report_success) {
-            fragments_ctx->query_mem_tracker->enable_print_log_usage();
-        }
-
-        if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
-            // Run the pipeline task immediately.
-            fragments_ctx->set_ready_to_execute_only();
-        }
-
-        {
-            // Find _fragments_ctx_map again, in case some other request has already
-            // create the query fragments context.
-            std::lock_guard<std::mutex> lock(_lock);
-            auto search = _fragments_ctx_map.find(params.params.query_id);
-            if (search == _fragments_ctx_map.end()) {
-                _fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
-            } else {
-                // Already has a query fragments context, use it
-                fragments_ctx = search->second;
-            }
-        }
-    }
-
-    std::shared_ptr<pipeline::PipelineFragmentContext> context =
-            std::make_shared<pipeline::PipelineFragmentContext>(
-                    fragments_ctx->query_id, fragment_instance_id, params.backend_num,
-                    fragments_ctx, _exec_env);
-    RETURN_IF_ERROR(context->prepare(params));
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        _pipeline_map.insert(std::make_pair(fragment_instance_id, context));
-        _cv.notify_all();
-    }
-
-    auto st = context->submit();
-    if (!st.ok()) {
-        // TODO pipeline: if no task was submitted successfully, let the timeout checker thread
-        // remove this context. It cannot be removed on a failed submit either, because some
-        // pipeline tasks may have been submitted successfully while others failed; removal must
-        // wait until all tasks have finished.
-        context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "submit context fail");
-        remove_pipeline_context(context);
-        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.get_error_msg(),
-                                     BackendOptions::get_localhost());
-    }
-    return Status::OK();
-}
 void FragmentMgr::remove_pipeline_context(
         std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
     std::lock_guard<std::mutex> lock(_lock);
@@ -751,6 +627,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
                     BackendOptions::get_localhost());
         }
         fragments_ctx = search->second;
+        _set_scan_concurrency(params, fragments_ctx.get());
     } else {
         // This may be a first fragment request of the query.
         // Create the query fragments context.
@@ -792,10 +669,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
                     MemTrackerLimiter::Type::LOAD,
                     fmt::format("Load#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
+            fragments_ctx->set_ready_to_execute(false);
         } else { // EXTERNAL
             fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
                     MemTrackerLimiter::Type::LOAD,
                     fmt::format("External#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
+            fragments_ctx->set_ready_to_execute(false);
         }
         if (params.query_options.__isset.is_report_success &&
             params.query_options.is_report_success) {
@@ -846,23 +725,45 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
         _cv.notify_all();
     }

-    auto st = _thread_pool->submit_func(
-            [this, exec_state, cb, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
-                OpentelemetryScope scope {parent_span};
-                _exec_actual(exec_state, cb);
-            });
-    if (!st.ok()) {
+    if (params.query_options.__isset.enable_pipeline_engine &&
+        params.query_options.enable_pipeline_engine) {
+        std::shared_ptr<pipeline::PipelineFragmentContext> context =
+                std::make_shared<pipeline::PipelineFragmentContext>(
+                        fragments_ctx->query_id, fragment_instance_id, params.backend_num,
+                        fragments_ctx, _exec_env);
+        RETURN_IF_ERROR(context->prepare(params));
         {
-            // Remove the exec state added
             std::lock_guard<std::mutex> lock(_lock);
-            _fragment_map.erase(params.params.fragment_instance_id);
+            _pipeline_map.insert(std::make_pair(fragment_instance_id, context));
+            _cv.notify_all();
+        }
+        auto st = context->submit();
+        if (!st.ok()) {
+            context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "submit context fail");
+            remove_pipeline_context(context);
+            return Status::InternalError("Submit pipeline failed. err = {}, BE: {}",
+                                         st.get_error_msg(), BackendOptions::get_localhost());
+        }
+    } else {
+        auto st = _thread_pool->submit_func(
+                [this, exec_state, cb,
+                 parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
+                    OpentelemetryScope scope {parent_span};
+                    _exec_actual(exec_state, cb);
+                });
+        if (!st.ok()) {
+            {
+                // Remove the exec state added
+                std::lock_guard<std::mutex> lock(_lock);
+                _fragment_map.erase(params.params.fragment_instance_id);
+            }
+            exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
+                               "push plan fragment to thread pool failed");
+            return Status::InternalError(strings::Substitute(
+                    "push plan fragment $0 to thread pool failed. err = $1, BE: $2",
+                    print_id(params.params.fragment_instance_id), st.get_error_msg(),
+                    BackendOptions::get_localhost()));
         }
-        exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
-                           "push plan fragment to thread pool failed");
-        return Status::InternalError(
-                strings::Substitute("push plan fragment $0 to thread pool failed. err = $1, BE: $2",
-                                    print_id(params.params.fragment_instance_id),
-                                    st.get_error_msg(), BackendOptions::get_localhost()));
     }

     return Status::OK();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 51a5ea2dd5..aa3217d71c 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -32,7 +32,6 @@
 #include "http/rest_monitor_iface.h"
 #include "runtime_filter_mgr.h"
 #include "util/countdown_latch.h"
-#include "util/hash_util.hpp"
 #include "util/metrics.h"
 #include "util/thread.h"

@@ -65,13 +64,11 @@ public:
     using FinishCallback = std::function<void(PlanFragmentExecutor*)>;

     FragmentMgr(ExecEnv* exec_env);
-    virtual ~FragmentMgr();
+    ~FragmentMgr() override;

     // execute one plan fragment
     Status exec_plan_fragment(const TExecPlanFragmentParams& params);

-    Status exec_pipeline(const TExecPlanFragmentParams& params);
-
     void remove_pipeline_context(
             std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context);

@@ -92,7 +89,7 @@ public:

     void cancel_worker();

-    virtual void debug(std::stringstream& ss);
+    void debug(std::stringstream& ss) override;

     // input: TScanOpenParams fragment_instance_id
     // output: selected_columns
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index 190b625e3d..37001eadfc 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -33,16 +33,16 @@ class VExchangeNode : public ExecNode {
 public:
     friend class doris::pipeline::ExchangeSourceOperator;
     VExchangeNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-    virtual ~VExchangeNode() {}
+    ~VExchangeNode() override = default;

-    virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
-    virtual Status prepare(RuntimeState* state) override;
-    virtual Status alloc_resource(RuntimeState* state) override;
-    virtual Status open(RuntimeState* state) override;
-    virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
-    virtual Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override;
-    virtual void release_resource(RuntimeState* state) override;
-    virtual Status close(RuntimeState* state) override;
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+    Status prepare(RuntimeState* state) override;
+    Status alloc_resource(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+    Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+    Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override;
+    void release_resource(RuntimeState* state) override;
+    Status close(RuntimeState* state) override;

     // Status collect_query_statistics(QueryStatistics* statistics) override;
     void set_num_senders(int num_senders) { _num_senders = num_senders; }
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 151873cbf7..8f7697f409 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -94,6 +94,8 @@ public:

     size_t get_pending_bytes() const;

+    const RowDescriptor& row_desc() { return _input_row_desc; }
+
 private:
     // make input data valid for OLAP table
     // return number of invalid/filtered rows.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 6c15ec9796..429c05c6fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1683,6 +1683,9 @@ public class Config extends ConfigBase {
     @ConfField
     public static boolean enable_vectorized_load = true;

+    @ConfField
+    public static boolean enable_pipeline_load = false;
+
     @ConfField(mutable = false, masterOnly = true)
     public static int backend_rpc_timeout_ms = 60000; // 1 min

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 3a2974e69c..d61e21e98d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -137,6 +137,7 @@ public class LoadLoadingTask extends LoadTask {
         curCoordinator.setQueryType(TQueryType.LOAD);
         curCoordinator.setExecMemoryLimit(execMemLimit);
         curCoordinator.setExecVecEngine(Config.enable_vectorized_load);
+        curCoordinator.setExecPipEngine(Config.enable_pipeline_load);
         /*
          * For broker load job, user only need to set mem limit by 'exec_mem_limit' property.
          * And the variable 'load_mem_limit' does not make any effect.
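The new FE switch defaults to false, so the pipeline load path stays opt-in. Because the @ConfField is not marked mutable, flipping it requires an fe.conf entry plus an FE restart, e.g.:

    enable_pipeline_load = true

Once enabled, every coordinator created for broker load, stream load, and UPDATE (the call sites in this patch) sets enable_pipeline_engine in TQueryOptions, which is exactly the flag FragmentMgr checks on the BE side.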
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java
index 568b9442e7..67d195a406 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.ErrorCode;
@@ -142,6 +143,7 @@ public class UpdateStmtExecutor {
                 updatePlanner.getFragments(), updatePlanner.getScanNodes(), TimeUtils.DEFAULT_TIME_ZONE, false);
         coordinator.setQueryType(TQueryType.LOAD);
         coordinator.setExecVecEngine(VectorizedUtil.isVectorized());
+        coordinator.setExecPipEngine(Config.enable_pipeline_load);
         QeProcessorImpl.INSTANCE.registerQuery(queryId, coordinator);
         analyzer.getContext().getExecutor().setCoord(coordinator);

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index b55bb8f402..cdcf36045e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -250,6 +250,7 @@ public class StreamLoadPlanner {
         // for stream load, we use exec_mem_limit to limit the memory usage of load channel.
         queryOptions.setLoadMemLimit(taskInfo.getMemLimit());
         queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load);
+        queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
         queryOptions.setBeExecVersion(Config.be_exec_version);

         params.setQueryOptions(queryOptions);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7d32530eb6..a6d416f4d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -319,6 +319,7 @@ public class Coordinator {
     private void initQueryOptions(ConnectContext context) {
         this.queryOptions = context.getSessionVariable().toThrift();
         this.queryOptions.setEnableVectorizedEngine(VectorizedUtil.isVectorized());
+        this.queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
         this.queryOptions.setBeExecVersion(Config.be_exec_version);
     }

@@ -342,6 +343,10 @@ public class Coordinator {
         this.queryOptions.setEnableVectorizedEngine(vec);
     }

+    public void setExecPipEngine(boolean vec) {
+        this.queryOptions.setEnablePipelineEngine(vec);
+    }
+
     public Status getExecStatus() {
         return queryStatus;
     }