[Pipeline](sink) support olap table sink operator (#14872)

* support olap table sink operator

* update config
This commit is contained in:
Pxl
2022-12-07 15:29:56 +08:00
committed by GitHub
parent cdbbf1e4ee
commit 48a9166aa4
16 changed files with 150 additions and 190 deletions

View File

@ -18,13 +18,10 @@
#pragma once
#include "operator.h"
#include "vec/exec/vempty_set_node.h"
namespace doris {
namespace vectorized {
class VEmptySetNode;
}
namespace pipeline {
class EmptySetSourceOperatorBuilder final : public OperatorBuilder<vectorized::VEmptySetNode> {

View File

@ -26,7 +26,6 @@
#include "pipeline/pipeline_fragment_context.h"
#include "service/brpc.h"
#include "util/proto_util.h"
#include "util/time.h"
#include "vec/sink/vdata_stream_sender.h"
namespace doris::pipeline {
@ -64,7 +63,6 @@ public:
}
}
public:
brpc::Controller cntl;
T result;

View File

@ -23,9 +23,10 @@
#include <queue>
#include <shared_mutex>
#include "common/global_types.h"
#include "common/status.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "runtime/runtime_state.h"
namespace doris {
namespace vectorized {
@ -74,7 +75,6 @@ private:
PipelineFragmentContext* _context;
private:
Status _send_rpc(InstanceLoId);
// must hold the _instance_to_package_queue_mutex[id] mutex to opera
void _construct_request(InstanceLoId id);

View File

@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "operator.h"
#include "vec/sink/vtablet_sink.h"
namespace doris {
namespace pipeline {
class OlapTableSinkOperatorBuilder final
: public DataSinkOperatorBuilder<stream_load::VOlapTableSink> {
public:
OlapTableSinkOperatorBuilder(int32_t id, DataSink* sink)
: DataSinkOperatorBuilder(id, "OlapTableSinkOperator", sink) {};
OperatorPtr build_operator() override;
};
class OlapTableSinkOperator final : public DataSinkOperator<OlapTableSinkOperatorBuilder> {
public:
OlapTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
: DataSinkOperator(operator_builder, sink) {};
bool can_write() override { return true; } // TODO: need use mem_limit
};
OperatorPtr OlapTableSinkOperatorBuilder::build_operator() {
return std::make_shared<OlapTableSinkOperator>(this, _sink);
}
} // namespace pipeline
} // namespace doris

View File

@ -98,7 +98,7 @@ public:
OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr)
: OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {}
virtual ~OperatorBuilder() = default;
~OperatorBuilder() override = default;
const RowDescriptor& row_desc() override { return _node->row_desc(); }
@ -114,7 +114,7 @@ public:
DataSinkOperatorBuilder(int32_t id, const std::string& name, DataSink* sink = nullptr)
: OperatorBuilderBase(id, name), _sink(reinterpret_cast<SinkType*>(sink)) {}
virtual ~DataSinkOperatorBuilder() = default;
~DataSinkOperatorBuilder() override = default;
bool is_sink() const override { return true; }
@ -232,9 +232,9 @@ public:
DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
: OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {};
virtual ~DataSinkOperator() = default;
~DataSinkOperator() override = default;
virtual Status prepare(RuntimeState* state) override {
Status prepare(RuntimeState* state) override {
RETURN_IF_ERROR(_sink->prepare(state));
_runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
_sink->profile()->insert_child_head(_runtime_profile.get(), true);
@ -243,13 +243,13 @@ public:
return Status::OK();
}
virtual Status open(RuntimeState* state) override {
Status open(RuntimeState* state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
return _sink->open(state);
}
virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
if (!UNLIKELY(in_block)) {
DCHECK(source_state == SourceState::FINISHED)
@ -259,12 +259,12 @@ public:
return _sink->send(state, in_block, source_state == SourceState::FINISHED);
}
virtual Status close(RuntimeState* state) override {
Status close(RuntimeState* state) override {
_fresh_exec_timer(_sink);
return _sink->close(state, Status::OK());
}
virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
Status finalize(RuntimeState* state) override { return Status::OK(); }
protected:
void _fresh_exec_timer(NodeType* node) {
@ -284,9 +284,9 @@ public:
Operator(OperatorBuilderBase* builder, ExecNode* node)
: OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {};
virtual ~Operator() = default;
~Operator() override = default;
virtual Status prepare(RuntimeState* state) override {
Status prepare(RuntimeState* state) override {
_runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name()));
_node->runtime_profile()->insert_child_head(_runtime_profile.get(), true);
_mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(),
@ -295,19 +295,19 @@ public:
return Status::OK();
}
virtual Status open(RuntimeState* state) override {
Status open(RuntimeState* state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(_node->alloc_resource(state));
return Status::OK();
}
virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
return _node->sink(state, in_block, source_state == SourceState::FINISHED);
}
virtual Status close(RuntimeState* state) override {
Status close(RuntimeState* state) override {
_fresh_exec_timer(_node);
if (!_node->decrease_ref()) {
_node->release_resource(state);
@ -315,8 +315,8 @@ public:
return Status::OK();
}
virtual Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override {
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override {
SCOPED_TIMER(_runtime_profile->total_time_counter());
bool eos = false;
RETURN_IF_ERROR(_node->pull(state, block, &eos));
@ -324,9 +324,9 @@ public:
return Status::OK();
}
virtual Status finalize(RuntimeState* state) override { return Status::OK(); }
Status finalize(RuntimeState* state) override { return Status::OK(); }
virtual bool can_read() override { return _node->can_read(); }
bool can_read() override { return _node->can_read(); }
protected:
void _fresh_exec_timer(NodeType* node) {
@ -350,8 +350,8 @@ public:
virtual ~DataStateOperator() = default;
virtual Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override {
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override {
auto& node = Operator<OperatorBuilderType>::_node;
auto& child = Operator<OperatorBuilderType>::_child;

View File

@ -44,12 +44,12 @@ Status StreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in
SCOPED_TIMER(_runtime_profile->total_time_counter());
Status ret = Status::OK();
if (in_block && in_block->rows() > 0) {
auto bock_from_ctx = _agg_context->get_free_block();
RETURN_IF_ERROR(_node->do_pre_agg(in_block, bock_from_ctx.get()));
if (bock_from_ctx->rows() == 0) {
_agg_context->return_free_block(std::move(bock_from_ctx));
auto block_from_ctx = _agg_context->get_free_block();
RETURN_IF_ERROR(_node->do_pre_agg(in_block, block_from_ctx.get()));
if (block_from_ctx->rows() == 0) {
_agg_context->return_free_block(std::move(block_from_ctx));
} else {
_agg_context->push_block(std::move(bock_from_ctx));
_agg_context->push_block(std::move(block_from_ctx));
}
}

View File

@ -37,6 +37,7 @@
#include "exec/streaming_aggregation_source_operator.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/table_function_operator.h"
#include "pipeline_task.h"
#include "runtime/client_cache.h"
@ -48,12 +49,10 @@
#include "vec/exec/scan/new_olap_scan_node.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exec/vaggregation_node.h"
#include "vec/exec/vempty_set_node.h"
#include "vec/exec/vexchange_node.h"
#include "vec/exec/vrepeat_node.h"
#include "vec/exec/vsort_node.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vresult_sink.h"
using apache::thrift::transport::TTransportException;
@ -389,6 +388,11 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
_sink.get());
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
sink_ = std::make_shared<OlapTableSinkOperatorBuilder>(next_operator_builder_id(),
_sink.get());
break;
}
default:
return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
}

View File

@ -46,7 +46,6 @@
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
#include "util/telemetry/telemetry.h"
@ -428,10 +427,7 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
}
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
: _exec_env(exec_env),
_fragment_map(),
_fragments_ctx_map(),
_stop_background_threads_latch(1) {
: _exec_env(exec_env), _stop_background_threads_latch(1) {
_entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { return _fragment_map.size(); });
@ -549,13 +545,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
set_pipe(params.params.fragment_instance_id, pipe);
return Status::OK();
} else {
if (params.query_options.__isset.enable_pipeline_engine &&
params.query_options.enable_pipeline_engine) {
return exec_pipeline(params);
} else {
return exec_plan_fragment(params,
std::bind<void>(&empty_function, std::placeholders::_1));
}
return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
}
}
@ -598,120 +588,6 @@ std::shared_ptr<StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_
}
}
Status FragmentMgr::exec_pipeline(const TExecPlanFragmentParams& params) {
auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
: telemetry::get_noop_tracer();
START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _pipeline_map.find(fragment_instance_id);
if (iter != _pipeline_map.end()) {
// Duplicated
return Status::OK();
}
}
std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
if (params.is_simplified_param) {
// Get common components from _fragments_ctx_map
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.params.query_id);
if (search == _fragments_ctx_map.end()) {
return Status::InternalError(
"Failed to get query fragments context. Query may be "
"timeout or be cancelled. host: {}",
BackendOptions::get_localhost());
}
fragments_ctx = search->second;
_set_scan_concurrency(params, fragments_ctx.get());
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
fragments_ctx = std::make_shared<QueryFragmentsCtx>(params.fragment_num_on_host, _exec_env);
fragments_ctx->query_id = params.params.query_id;
RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
&(fragments_ctx->desc_tbl)));
fragments_ctx->coord_addr = params.coord;
LOG(INFO) << "query_id: "
<< UniqueId(fragments_ctx->query_id.hi, fragments_ctx->query_id.lo)
<< " coord_addr " << fragments_ctx->coord_addr;
fragments_ctx->query_globals = params.query_globals;
if (params.__isset.resource_info) {
fragments_ctx->user = params.resource_info.user;
fragments_ctx->group = params.resource_info.group;
fragments_ctx->set_rsc_info = true;
}
fragments_ctx->timeout_second = params.query_options.query_timeout;
_set_scan_concurrency(params, fragments_ctx.get());
bool has_query_mem_tracker =
params.query_options.__isset.mem_limit && (params.query_options.mem_limit > 0);
int64_t bytes_limit = has_query_mem_tracker ? params.query_options.mem_limit : -1;
if (bytes_limit > MemInfo::mem_limit()) {
VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
<< " exceeds process memory limit of "
<< PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
<< ". Using process memory limit instead";
bytes_limit = MemInfo::mem_limit();
}
if (params.query_options.query_type == TQueryType::SELECT) {
fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::QUERY,
fmt::format("Query#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
} else if (params.query_options.query_type == TQueryType::LOAD) {
fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
fmt::format("Load#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
}
if (params.query_options.__isset.is_report_success &&
params.query_options.is_report_success) {
fragments_ctx->query_mem_tracker->enable_print_log_usage();
}
if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
// 马上运行pipeline task
fragments_ctx->set_ready_to_execute_only();
}
{
// Find _fragments_ctx_map again, in case some other request has already
// create the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
auto search = _fragments_ctx_map.find(params.params.query_id);
if (search == _fragments_ctx_map.end()) {
_fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
} else {
// Already has a query fragmentscontext, use it
fragments_ctx = search->second;
}
}
}
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
fragments_ctx->query_id, fragment_instance_id, params.backend_num,
fragments_ctx, _exec_env);
RETURN_IF_ERROR(context->prepare(params));
{
std::lock_guard<std::mutex> lock(_lock);
_pipeline_map.insert(std::make_pair(fragment_instance_id, context));
_cv.notify_all();
}
auto st = context->submit();
if (!st.ok()) {
// TODO pipeline 如果一个task都没有提交成功,则要让timeout checker线程去移除
// 提交失败也不能移出,可能有些pipeline task提交成功,有些失败,要等所有task都结束才能移除。
context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "submit context fail");
remove_pipeline_context(context);
return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.get_error_msg(),
BackendOptions::get_localhost());
}
return Status::OK();
}
void FragmentMgr::remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
std::lock_guard<std::mutex> lock(_lock);
@ -751,6 +627,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
BackendOptions::get_localhost());
}
fragments_ctx = search->second;
_set_scan_concurrency(params, fragments_ctx.get());
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
@ -792,10 +669,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
fmt::format("Load#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
fragments_ctx->set_ready_to_execute(false);
} else { // EXTERNAL
fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
fmt::format("External#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
fragments_ctx->set_ready_to_execute(false);
}
if (params.query_options.__isset.is_report_success &&
params.query_options.is_report_success) {
@ -846,23 +725,45 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
_cv.notify_all();
}
auto st = _thread_pool->submit_func(
[this, exec_state, cb, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
OpentelemetryScope scope {parent_span};
_exec_actual(exec_state, cb);
});
if (!st.ok()) {
if (params.query_options.__isset.enable_pipeline_engine &&
params.query_options.enable_pipeline_engine) {
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
fragments_ctx->query_id, fragment_instance_id, params.backend_num,
fragments_ctx, _exec_env);
RETURN_IF_ERROR(context->prepare(params));
{
// Remove the exec state added
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(params.params.fragment_instance_id);
_pipeline_map.insert(std::make_pair(fragment_instance_id, context));
_cv.notify_all();
}
auto st = context->submit();
if (!st.ok()) {
context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "submit context fail");
remove_pipeline_context(context);
return Status::InternalError("Submit pipeline failed. err = {}, BE: {}",
st.get_error_msg(), BackendOptions::get_localhost());
}
} else {
auto st = _thread_pool->submit_func(
[this, exec_state, cb,
parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
OpentelemetryScope scope {parent_span};
_exec_actual(exec_state, cb);
});
if (!st.ok()) {
{
// Remove the exec state added
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(params.params.fragment_instance_id);
}
exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
"push plan fragment to thread pool failed");
return Status::InternalError(strings::Substitute(
"push plan fragment $0 to thread pool failed. err = $1, BE: $2",
print_id(params.params.fragment_instance_id), st.get_error_msg(),
BackendOptions::get_localhost()));
}
exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
"push plan fragment to thread pool failed");
return Status::InternalError(
strings::Substitute("push plan fragment $0 to thread pool failed. err = $1, BE: $2",
print_id(params.params.fragment_instance_id),
st.get_error_msg(), BackendOptions::get_localhost()));
}
return Status::OK();

View File

@ -32,7 +32,6 @@
#include "http/rest_monitor_iface.h"
#include "runtime_filter_mgr.h"
#include "util/countdown_latch.h"
#include "util/hash_util.hpp"
#include "util/metrics.h"
#include "util/thread.h"
@ -65,13 +64,11 @@ public:
using FinishCallback = std::function<void(PlanFragmentExecutor*)>;
FragmentMgr(ExecEnv* exec_env);
virtual ~FragmentMgr();
~FragmentMgr() override;
// execute one plan fragment
Status exec_plan_fragment(const TExecPlanFragmentParams& params);
Status exec_pipeline(const TExecPlanFragmentParams& params);
void remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context);
@ -92,7 +89,7 @@ public:
void cancel_worker();
virtual void debug(std::stringstream& ss);
void debug(std::stringstream& ss) override;
// input: TScanOpenParams fragment_instance_id
// output: selected_columns

View File

@ -33,16 +33,16 @@ class VExchangeNode : public ExecNode {
public:
friend class doris::pipeline::ExchangeSourceOperator;
VExchangeNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
virtual ~VExchangeNode() {}
~VExchangeNode() override = default;
virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
virtual Status prepare(RuntimeState* state) override;
virtual Status alloc_resource(RuntimeState* state) override;
virtual Status open(RuntimeState* state) override;
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
virtual Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override;
virtual void release_resource(RuntimeState* state) override;
virtual Status close(RuntimeState* state) override;
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
Status prepare(RuntimeState* state) override;
Status alloc_resource(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override;
void release_resource(RuntimeState* state) override;
Status close(RuntimeState* state) override;
// Status collect_query_statistics(QueryStatistics* statistics) override;
void set_num_senders(int num_senders) { _num_senders = num_senders; }

View File

@ -94,6 +94,8 @@ public:
size_t get_pending_bytes() const;
const RowDescriptor& row_desc() { return _input_row_desc; }
private:
// make input data valid for OLAP table
// return number of invalid/filtered rows.

View File

@ -1683,6 +1683,9 @@ public class Config extends ConfigBase {
@ConfField
public static boolean enable_vectorized_load = true;
@ConfField
public static boolean enable_pipeline_load = false;
@ConfField(mutable = false, masterOnly = true)
public static int backend_rpc_timeout_ms = 60000; // 1 min

View File

@ -137,6 +137,7 @@ public class LoadLoadingTask extends LoadTask {
curCoordinator.setQueryType(TQueryType.LOAD);
curCoordinator.setExecMemoryLimit(execMemLimit);
curCoordinator.setExecVecEngine(Config.enable_vectorized_load);
curCoordinator.setExecPipEngine(Config.enable_pipeline_load);
/*
* For broker load job, user only need to set mem limit by 'exec_mem_limit' property.
* And the variable 'load_mem_limit' does not make any effect.

View File

@ -24,6 +24,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.ErrorCode;
@ -142,6 +143,7 @@ public class UpdateStmtExecutor {
updatePlanner.getFragments(), updatePlanner.getScanNodes(), TimeUtils.DEFAULT_TIME_ZONE, false);
coordinator.setQueryType(TQueryType.LOAD);
coordinator.setExecVecEngine(VectorizedUtil.isVectorized());
coordinator.setExecPipEngine(Config.enable_pipeline_load);
QeProcessorImpl.INSTANCE.registerQuery(queryId, coordinator);
analyzer.getContext().getExecutor().setCoord(coordinator);

View File

@ -250,6 +250,7 @@ public class StreamLoadPlanner {
// for stream load, we use exec_mem_limit to limit the memory usage of load channel.
queryOptions.setLoadMemLimit(taskInfo.getMemLimit());
queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load);
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
params.setQueryOptions(queryOptions);

View File

@ -319,6 +319,7 @@ public class Coordinator {
private void initQueryOptions(ConnectContext context) {
this.queryOptions = context.getSessionVariable().toThrift();
this.queryOptions.setEnableVectorizedEngine(VectorizedUtil.isVectorized());
this.queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
this.queryOptions.setBeExecVersion(Config.be_exec_version);
}
@ -342,6 +343,10 @@ public class Coordinator {
this.queryOptions.setEnableVectorizedEngine(vec);
}
public void setExecPipEngine(boolean vec) {
this.queryOptions.setEnablePipelineEngine(vec);
}
public Status getExecStatus() {
return queryStatus;
}