From 9b2323b7fd6a5ef8e698367cfe6c511baa4b2d19 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Tue, 15 Aug 2023 17:32:53 +0800 Subject: [PATCH] [Pipeline](exec) support async writer in pipeline query engine (#22901) --- be/src/exec/data_sink.h | 5 + be/src/pipeline/exec/operator.h | 13 ++- be/src/pipeline/exec/table_sink_operator.h | 2 +- be/src/runtime/fragment_mgr.h | 2 + be/src/vec/sink/async_result_writer.cpp | 108 ++++++++++++++++++ be/src/vec/sink/async_result_writer.h | 84 ++++++++++++++ be/src/vec/sink/vmysql_table_sink.cpp | 20 ++-- be/src/vec/sink/vmysql_table_sink.h | 7 +- be/src/vec/sink/vmysql_table_writer.cpp | 40 ++++--- be/src/vec/sink/vmysql_table_writer.h | 20 ++-- be/src/vec/sink/vtable_sink.h | 2 + .../java/org/apache/doris/qe/Coordinator.java | 2 +- 12 files changed, 258 insertions(+), 47 deletions(-) create mode 100644 be/src/vec/sink/async_result_writer.cpp create mode 100644 be/src/vec/sink/async_result_writer.h diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index fd59cd1d27..26fb535bf2 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -67,6 +67,11 @@ public: return Status::NotSupported("Not support send block"); } + // Send a Block into this sink. Meant as the non-blocking entry point, used only by the pipeline exec engine + virtual Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) { + return send(state, block, eos); + } + [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) { return Status::OK(); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 55335c093a..8a7dc565a6 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -273,12 +273,15 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { - auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED); - // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished 
- if (st.template is()) { - return Status::OK(); + if (in_block->rows() > 0 || source_state == SourceState::FINISHED) { + auto st = _sink->sink(state, in_block, source_state == SourceState::FINISHED); + // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished + if (st.template is()) { + return Status::OK(); + } + return st; } - return st; + return Status::OK(); } Status try_close(RuntimeState* state) override { diff --git a/be/src/pipeline/exec/table_sink_operator.h b/be/src/pipeline/exec/table_sink_operator.h index cbad6d6472..054a511139 100644 --- a/be/src/pipeline/exec/table_sink_operator.h +++ b/be/src/pipeline/exec/table_sink_operator.h @@ -38,7 +38,7 @@ public: TableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) : DataSinkOperator(operator_builder, sink) {} - bool can_write() override { return true; } + bool can_write() override { return _sink->can_write(); } }; OperatorPtr TableSinkOperatorBuilder::build_operator() { diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 8ca58ccffa..9820cf9045 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -139,6 +139,8 @@ public: void coordinator_callback(const ReportStatusRequest& req); + ThreadPool* get_thread_pool() { return _thread_pool.get(); } + private: void _exec_actual(std::shared_ptr exec_state, const FinishCallback& cb); diff --git a/be/src/vec/sink/async_result_writer.cpp b/be/src/vec/sink/async_result_writer.cpp new file mode 100644 index 0000000000..81c8bdbfdd --- /dev/null +++ b/be/src/vec/sink/async_result_writer.cpp @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "async_result_writer.h"
+
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+
+namespace doris {
+class ObjectPool;
+class RowDescriptor;
+class TExpr;
+
+namespace vectorized {
+// Producer side: hand `block` to the writer thread; fails fast once the writer has failed.
+Status AsyncResultWriter::sink(RuntimeState* state, Block* block, bool eos) {
+    auto rows = block->rows();
+    auto status = Status::OK();
+    std::unique_ptr<Block> add_block;
+    if (rows) {
+        add_block = block->create_same_struct_block(0);
+    }
+
+    std::lock_guard l(_m);
+    // if io task failed, just return error status to
+    // end the query
+    if (!_writer_status.ok()) {
+        return _writer_status;
+    }
+    // record eos, then grow the tail block (back(); end() is past-the-last) or enqueue a new one
+    _eos = eos;
+    if (rows) {
+        if (!_data_queue.empty() && (_data_queue.back()->rows() + rows) <= state->batch_size()) {
+            RETURN_IF_ERROR(
+                    MutableBlock::build_mutable_block(_data_queue.back().get()).merge(*block));
+        } else {
+            RETURN_IF_ERROR(MutableBlock::build_mutable_block(add_block.get()).merge(*block));
+            _data_queue.emplace_back(std::move(add_block));
+        }
+    } else if (_eos && _data_queue.empty()) {
+        status = Status::EndOfFile("Run out of sink data");
+    }
+    // wake process_block(), which waits on _cv for data or eos
+    _cv.notify_one();
+    return status;
+}
+// Pop and return the oldest queued block; the queue must not be empty (DCHECK).
+std::unique_ptr<Block> AsyncResultWriter::get_block_from_queue() {
+    std::lock_guard l(_m);
+    DCHECK(!_data_queue.empty());
+    auto block = std::move(_data_queue.front());
+    _data_queue.pop_front();
+    return block;
+}
+
+void
AsyncResultWriter::start_writer() {
+    ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
+            [this]() { this->process_block(); }); // captures this: the writer must outlive the task
+}
+// Writer-thread body: opens the writer on first use, then drains _data_queue until eos or an error.
+void AsyncResultWriter::process_block() {
+    if (!_is_open) {
+        _writer_status = open();
+        _is_open = true;
+    }
+
+    if (_writer_status.ok()) {
+        while (true) {
+            {
+                std::unique_lock l(_m);
+                while (!_eos && _data_queue.empty()) {
+                    _cv.wait(l);
+                }
+                // _eos/_data_queue are written by sink() under _m, so test them before unlocking
+                if (_eos && _data_queue.empty()) {
+                    break;
+                }
+            }
+
+            auto status = write(get_block_from_queue());
+            std::unique_lock l(_m);
+            _writer_status = status;
+            if (!status.ok()) {
+                break;
+            }
+        }
+    }
+    _writer_thread_closed = true;
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/async_result_writer.h b/be/src/vec/sink/async_result_writer.h
new file mode 100644
index 0000000000..7f471a6975
--- /dev/null
+++ b/be/src/vec/sink/async_result_writer.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <condition_variable>
+#include <deque>
+
+#include "runtime/result_writer.h"
+
+namespace doris {
+class ObjectPool;
+class RowDescriptor;
+class RuntimeState;
+class TDataSink;
+class TExpr;
+
+namespace vectorized {
+class Block;
+/*
+ * In the pipeline execution engine, the sink side usually performs a large number of IO operations,
+ * which block the limited execution threads of the engine and cause a sharp performance
+ * degradation when there are import tasks.
+ *
+ * So every ResultWriter in a Sink should use AsyncResultWriter to do the real IO work in a thread
+ * pool, keeping the pipeline execution engine responsive.
+ *
+ * A subclass of AsyncResultWriter needs to implement two virtual functions:
+ *      * Status open()         the first-time IO work, e.g. create a file / connect to a network peer
+ *      * Status append_block() the real IO work for one block
+ */
+class AsyncResultWriter : public ResultWriter {
+public:
+    Status close() override { return Status::OK(); }
+
+    Status init(RuntimeState* state) override { return Status::OK(); }
+    // Hook for first-time IO setup; process_block() invokes it while _is_open is still false.
+    virtual Status open() { return Status::OK(); }
+    // Forward one dequeued block to append_block().
+    Status write(std::unique_ptr<Block> block) { return append_block(*block); }
+    // True while the queue has room, or once the writer has failed or eos has been recorded.
+    bool can_write() {
+        std::lock_guard l(_m);
+        return _data_queue.size() < QUEUE_SIZE || !_writer_status.ok() || _eos;
+    }
+    // True until process_block() sets _writer_thread_closed at its end.
+    [[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }
+    // Body of the IO task: open once, then write queued blocks until eos or an error.
+    void process_block();
+
+    // sink the block data to the data queue
+    Status sink(RuntimeState* state, Block* block, bool eos);
+    // Pop the oldest queued block; the queue must not be empty.
+    std::unique_ptr<Block> get_block_from_queue();
+
+    // Add the IO thread task process_block() to thread pool to dispose the IO
+    void start_writer();
+
+private:
+    static constexpr auto QUEUE_SIZE = 3; // can_write() compares the queue length against this
+    bool _is_open = false;
+    std::mutex _m; // taken by sink(), can_write(), get_block_from_queue() and process_block()
+    std::condition_variable _cv;
+    std::deque<std::unique_ptr<Block>> _data_queue;
+    Status _writer_status = Status::OK();
+    bool _eos = false;
+    // NOTE(review): written by process_block() and read by is_pending_finish() without holding _m
+    // on either side -- consider std::atomic<bool>; confirm against the threading model.
+    bool _writer_thread_closed = false;
+};
+
+} // namespace vectorized
+} // namespace
doris diff --git a/be/src/vec/sink/vmysql_table_sink.cpp b/be/src/vec/sink/vmysql_table_sink.cpp index ee1c015c54..0199299295 100644 --- a/be/src/vec/sink/vmysql_table_sink.cpp +++ b/be/src/vec/sink/vmysql_table_sink.cpp @@ -40,28 +40,24 @@ VMysqlTableSink::VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc Status VMysqlTableSink::init(const TDataSink& t_sink) { RETURN_IF_ERROR(VTableSink::init(t_sink)); - const TMysqlTableSink& t_mysql_sink = t_sink.mysql_table_sink; - _conn_info.host = t_mysql_sink.host; - _conn_info.port = t_mysql_sink.port; - _conn_info.user = t_mysql_sink.user; - _conn_info.passwd = t_mysql_sink.passwd; - _conn_info.db = t_mysql_sink.db; - _table_name = t_mysql_sink.table; - _conn_info.charset = t_mysql_sink.charset; + // create writer + _writer.reset(new VMysqlTableWriter(t_sink, _output_vexpr_ctxs)); return Status::OK(); } Status VMysqlTableSink::open(RuntimeState* state) { // Prepare the exprs to run. RETURN_IF_ERROR(VTableSink::open(state)); - // create writer - _writer.reset(new VMysqlTableWriter(_output_vexpr_ctxs)); - RETURN_IF_ERROR(_writer->open(_conn_info, _table_name)); + if (state->enable_pipeline_exec()) { + _writer->start_writer(); + } else { + RETURN_IF_ERROR(_writer->open()); + } return Status::OK(); } Status VMysqlTableSink::send(RuntimeState* state, Block* block, bool eos) { - return _writer->append(block); + return _writer->append_block(*block); } Status VMysqlTableSink::close(RuntimeState* state, Status exec_status) { diff --git a/be/src/vec/sink/vmysql_table_sink.h b/be/src/vec/sink/vmysql_table_sink.h index 00ce4346fe..1fe60e2d84 100644 --- a/be/src/vec/sink/vmysql_table_sink.h +++ b/be/src/vec/sink/vmysql_table_sink.h @@ -44,10 +44,15 @@ public: Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override; + Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) override { + return _writer->sink(state, block, eos); + } + Status close(RuntimeState* state, Status 
exec_status) override; + bool is_close_done() override { return !_writer->is_pending_finish(); } + private: - MysqlConnInfo _conn_info; std::unique_ptr _writer; }; } // namespace vectorized diff --git a/be/src/vec/sink/vmysql_table_writer.cpp b/be/src/vec/sink/vmysql_table_writer.cpp index af1f920e4a..98682e9e0c 100644 --- a/be/src/vec/sink/vmysql_table_writer.cpp +++ b/be/src/vec/sink/vmysql_table_writer.cpp @@ -17,6 +17,7 @@ #include "vec/sink/vmysql_table_writer.h" +#include #include #include #include @@ -55,12 +56,22 @@ std::string MysqlConnInfo::debug_string() const { std::stringstream ss; ss << "(host=" << host << ",port=" << port << ",user=" << user << ",db=" << db - << ",passwd=" << passwd << ",charset=" << charset << ")"; + << ",table=" << table_name << ",passwd=" << passwd << ",charset=" << charset << ")"; return ss.str(); } -VMysqlTableWriter::VMysqlTableWriter(const VExprContextSPtrs& output_expr_ctxs) - : _vec_output_expr_ctxs(output_expr_ctxs) {} +VMysqlTableWriter::VMysqlTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_expr_ctxs) + : _vec_output_expr_ctxs(output_expr_ctxs) { + const auto& t_mysql_sink = t_sink.mysql_table_sink; + _conn_info.host = t_mysql_sink.host; + _conn_info.port = t_mysql_sink.port; + _conn_info.user = t_mysql_sink.user; + _conn_info.passwd = t_mysql_sink.passwd; + _conn_info.db = t_mysql_sink.db; + _conn_info.table_name = t_mysql_sink.table; + _conn_info.charset = t_mysql_sink.charset; +} VMysqlTableWriter::~VMysqlTableWriter() { if (_mysql_conn) { @@ -68,16 +79,17 @@ VMysqlTableWriter::~VMysqlTableWriter() { } } -Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string& tbl) { +Status VMysqlTableWriter::open() { _mysql_conn = mysql_init(nullptr); if (_mysql_conn == nullptr) { return Status::InternalError("Call mysql_init failed."); } - MYSQL* res = mysql_real_connect(_mysql_conn, conn_info.host.c_str(), conn_info.user.c_str(), - conn_info.passwd.c_str(), conn_info.db.c_str(), 
conn_info.port, - nullptr, // unix socket - 0); // flags + MYSQL* res = + mysql_real_connect(_mysql_conn, _conn_info.host.c_str(), _conn_info.user.c_str(), + _conn_info.passwd.c_str(), _conn_info.db.c_str(), _conn_info.port, + nullptr, // unix socket + 0); // flags if (res == nullptr) { fmt::memory_buffer err_ss; fmt::format_to(err_ss, "mysql_real_connect failed because : {}.", mysql_error(_mysql_conn)); @@ -85,26 +97,24 @@ Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string } // set character - if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) { + if (mysql_set_character_set(_mysql_conn, _conn_info.charset.c_str())) { fmt::memory_buffer err_ss; fmt::format_to(err_ss, "mysql_set_character_set failed because : {}.", mysql_error(_mysql_conn)); return Status::InternalError(fmt::to_string(err_ss.data())); } - _mysql_tbl = tbl; - return Status::OK(); } -Status VMysqlTableWriter::append(vectorized::Block* block) { +Status VMysqlTableWriter::append_block(vectorized::Block& block) { Status status = Status::OK(); - if (block == nullptr || block->rows() == 0) { + if (block.rows() == 0) { return status; } Block output_block; RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( - _vec_output_expr_ctxs, *block, &output_block)); + _vec_output_expr_ctxs, block, &output_block)); auto num_rows = output_block.rows(); materialize_block_inplace(output_block); for (int i = 0; i < num_rows; ++i) { @@ -115,7 +125,7 @@ Status VMysqlTableWriter::append(vectorized::Block* block) { Status VMysqlTableWriter::insert_row(vectorized::Block& block, size_t row) { _insert_stmt_buffer.clear(); - fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _mysql_tbl); + fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _conn_info.table_name); int num_columns = _vec_output_expr_ctxs.size(); for (int i = 0; i < num_columns; ++i) { diff --git a/be/src/vec/sink/vmysql_table_writer.h 
b/be/src/vec/sink/vmysql_table_writer.h index 51f62a4db5..62de5e8dbf 100644 --- a/be/src/vec/sink/vmysql_table_writer.h +++ b/be/src/vec/sink/vmysql_table_writer.h @@ -24,6 +24,7 @@ #include #include +#include "async_result_writer.h" #include "common/status.h" #include "vec/exprs/vexpr_fwd.h" @@ -35,6 +36,7 @@ struct MysqlConnInfo { std::string user; std::string passwd; std::string db; + std::string table_name; int port; std::string charset; @@ -43,27 +45,21 @@ struct MysqlConnInfo { class Block; -class VMysqlTableWriter { +class VMysqlTableWriter final : public AsyncResultWriter { public: - VMysqlTableWriter(const VExprContextSPtrs& output_exprs); - ~VMysqlTableWriter(); + VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + ~VMysqlTableWriter() override; // connect to mysql server - Status open(const MysqlConnInfo& conn_info, const std::string& tbl); + Status open() override; - Status begin_trans() { return Status::OK(); } - - Status append(vectorized::Block* block); - - Status abort_tarns() { return Status::OK(); } - - Status finish_tarns() { return Status::OK(); } + Status append_block(vectorized::Block& block) override; private: Status insert_row(vectorized::Block& block, size_t row); + MysqlConnInfo _conn_info; const VExprContextSPtrs& _vec_output_expr_ctxs; fmt::memory_buffer _insert_stmt_buffer; - std::string _mysql_tbl; MYSQL* _mysql_conn; }; } // namespace vectorized diff --git a/be/src/vec/sink/vtable_sink.h b/be/src/vec/sink/vtable_sink.h index 0c45d567f3..247d8072b7 100644 --- a/be/src/vec/sink/vtable_sink.h +++ b/be/src/vec/sink/vtable_sink.h @@ -54,6 +54,8 @@ public: const RowDescriptor& row_desc() { return _row_desc; } + virtual bool can_write() { return true; } + protected: // owned by RuntimeState ObjectPool* _pool; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 6edc8d6744..32829d2db5 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -322,7 +322,7 @@ public class Coordinator { this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin; // Only enable pipeline query engine in query, not load this.enablePipelineEngine = context.getSessionVariable().getEnablePipelineEngine() - && (fragments.size() > 0 && fragments.get(0).getSink() instanceof ResultSink); + && fragments.size() > 0; initQueryOptions(context); setFromUserProperty(context);