[Pipeline](exec) support async writer in pipelien query engine (#22901)

This commit is contained in:
HappenLee
2023-08-15 17:32:53 +08:00
committed by GitHub
parent 50f66b1246
commit 9b2323b7fd
12 changed files with 258 additions and 47 deletions

View File

@ -67,6 +67,11 @@ public:
return Status::NotSupported("Not support send block");
}
// Send a Block into this sink, not blocked thredd API only use in pipeline exec engine
virtual Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) {
return send(state, block, eos);
}
[[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) {
return Status::OK();
}

View File

@ -273,12 +273,15 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED);
// TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
if (st.template is<ErrorCode::END_OF_FILE>()) {
return Status::OK();
if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
auto st = _sink->sink(state, in_block, source_state == SourceState::FINISHED);
// TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
if (st.template is<ErrorCode::END_OF_FILE>()) {
return Status::OK();
}
return st;
}
return st;
return Status::OK();
}
Status try_close(RuntimeState* state) override {

View File

@ -38,7 +38,7 @@ public:
TableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
: DataSinkOperator(operator_builder, sink) {}
bool can_write() override { return true; }
bool can_write() override { return _sink->can_write(); }
};
OperatorPtr TableSinkOperatorBuilder::build_operator() {

View File

@ -139,6 +139,8 @@ public:
void coordinator_callback(const ReportStatusRequest& req);
ThreadPool* get_thread_pool() { return _thread_pool.get(); }
private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const FinishCallback& cb);

View File

@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "async_result_writer.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
namespace doris {
class ObjectPool;
class RowDescriptor;
class TExpr;
namespace vectorized {
Status AsyncResultWriter::sink(RuntimeState* state, Block* block, bool eos) {
auto rows = block->rows();
auto status = Status::OK();
std::unique_ptr<Block> add_block;
if (rows) {
add_block = block->create_same_struct_block(0);
}
std::lock_guard l(_m);
// if io task failed, just return error status to
// end the query
if (_writer_status.ok()) {
return _writer_status;
}
_eos = eos;
if (rows) {
if (!_data_queue.empty() && ((*_data_queue.end())->rows() + rows) <= state->batch_size()) {
RETURN_IF_ERROR(
MutableBlock::build_mutable_block(_data_queue.end()->get()).merge(*block));
} else {
RETURN_IF_ERROR(MutableBlock::build_mutable_block(add_block.get()).merge(*block));
_data_queue.emplace_back(std::move(add_block));
}
} else if (_eos && _data_queue.empty()) {
status = Status::EndOfFile("Run out of sink data");
}
_cv.notify_one();
return status;
}
std::unique_ptr<Block> AsyncResultWriter::get_block_from_queue() {
std::lock_guard l(_m);
DCHECK(!_data_queue.empty());
auto block = std::move(_data_queue.front());
_data_queue.pop_front();
return block;
}
void AsyncResultWriter::start_writer() {
ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
[this]() { this->process_block(); });
}
void AsyncResultWriter::process_block() {
if (!_is_open) {
_writer_status = open();
_is_open = true;
}
if (_writer_status.ok()) {
while (true) {
{
std::unique_lock l(_m);
while (!_eos && _data_queue.empty()) {
_cv.wait(l);
}
}
if (_eos && _data_queue.empty()) {
break;
}
auto status = write(get_block_from_queue());
std::unique_lock l(_m);
_writer_status = status;
if (!status.ok()) {
break;
}
}
}
_writer_thread_closed = true;
}
} // namespace vectorized
} // namespace doris

View File

@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <condition_variable>
#include <queue>
#include "runtime/result_writer.h"
namespace doris {
class ObjectPool;
class RowDescriptor;
class RuntimeState;
class TDataSink;
class TExpr;
namespace vectorized {
class Block;
/*
* In the pipeline execution engine, there are usually a large number of io operations on the sink side that
* will block the limited execution threads of the pipeline execution engine, resulting in a sharp performance
* degradation of the pipeline execution engine when there are import tasks.
*
* So all ResultWriter in Sink should use AsyncResultWriter to do the real IO task in thread pool to keep the
* pipeline execution engine performance.
*
* The Sub class of AsyncResultWriter need to impl two virtual function
* * Status open() the first time IO work like: create file/ connect networking
* * Status append_block() do the real IO work for block
*/
class AsyncResultWriter : public ResultWriter {
public:
Status close() override { return Status::OK(); }
Status init(RuntimeState* state) override { return Status::OK(); }
virtual Status open() { return Status::OK(); }
Status write(std::unique_ptr<Block> block) { return append_block(*block); }
bool can_write() {
std::lock_guard l(_m);
return _data_queue.size() < QUEUE_SIZE || !_writer_status.ok() || _eos;
}
[[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }
void process_block();
// sink the block date to date queue
Status sink(RuntimeState* state, Block* block, bool eos);
std::unique_ptr<Block> get_block_from_queue();
// Add the IO thread task process block() to thread pool to dispose the IO
void start_writer();
private:
static constexpr auto QUEUE_SIZE = 3;
bool _is_open = false;
std::mutex _m;
std::condition_variable _cv;
std::deque<std::unique_ptr<Block>> _data_queue;
Status _writer_status = Status::OK();
bool _eos = false;
bool _writer_thread_closed = false;
};
} // namespace vectorized
} // namespace doris

View File

@ -40,28 +40,24 @@ VMysqlTableSink::VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc
Status VMysqlTableSink::init(const TDataSink& t_sink) {
RETURN_IF_ERROR(VTableSink::init(t_sink));
const TMysqlTableSink& t_mysql_sink = t_sink.mysql_table_sink;
_conn_info.host = t_mysql_sink.host;
_conn_info.port = t_mysql_sink.port;
_conn_info.user = t_mysql_sink.user;
_conn_info.passwd = t_mysql_sink.passwd;
_conn_info.db = t_mysql_sink.db;
_table_name = t_mysql_sink.table;
_conn_info.charset = t_mysql_sink.charset;
// create writer
_writer.reset(new VMysqlTableWriter(t_sink, _output_vexpr_ctxs));
return Status::OK();
}
Status VMysqlTableSink::open(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(VTableSink::open(state));
// create writer
_writer.reset(new VMysqlTableWriter(_output_vexpr_ctxs));
RETURN_IF_ERROR(_writer->open(_conn_info, _table_name));
if (state->enable_pipeline_exec()) {
_writer->start_writer();
} else {
RETURN_IF_ERROR(_writer->open());
}
return Status::OK();
}
Status VMysqlTableSink::send(RuntimeState* state, Block* block, bool eos) {
return _writer->append(block);
return _writer->append_block(*block);
}
Status VMysqlTableSink::close(RuntimeState* state, Status exec_status) {

View File

@ -44,10 +44,15 @@ public:
Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;
Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) override {
return _writer->sink(state, block, eos);
}
Status close(RuntimeState* state, Status exec_status) override;
bool is_close_done() override { return !_writer->is_pending_finish(); }
private:
MysqlConnInfo _conn_info;
std::unique_ptr<VMysqlTableWriter> _writer;
};
} // namespace vectorized

View File

@ -17,6 +17,7 @@
#include "vec/sink/vmysql_table_writer.h"
#include <gen_cpp/DataSinks_types.h>
#include <glog/logging.h>
#include <mysql/mysql.h>
#include <stdint.h>
@ -55,12 +56,22 @@ std::string MysqlConnInfo::debug_string() const {
std::stringstream ss;
ss << "(host=" << host << ",port=" << port << ",user=" << user << ",db=" << db
<< ",passwd=" << passwd << ",charset=" << charset << ")";
<< ",table=" << table_name << ",passwd=" << passwd << ",charset=" << charset << ")";
return ss.str();
}
VMysqlTableWriter::VMysqlTableWriter(const VExprContextSPtrs& output_expr_ctxs)
: _vec_output_expr_ctxs(output_expr_ctxs) {}
VMysqlTableWriter::VMysqlTableWriter(const TDataSink& t_sink,
const VExprContextSPtrs& output_expr_ctxs)
: _vec_output_expr_ctxs(output_expr_ctxs) {
const auto& t_mysql_sink = t_sink.mysql_table_sink;
_conn_info.host = t_mysql_sink.host;
_conn_info.port = t_mysql_sink.port;
_conn_info.user = t_mysql_sink.user;
_conn_info.passwd = t_mysql_sink.passwd;
_conn_info.db = t_mysql_sink.db;
_conn_info.table_name = t_mysql_sink.table;
_conn_info.charset = t_mysql_sink.charset;
}
VMysqlTableWriter::~VMysqlTableWriter() {
if (_mysql_conn) {
@ -68,16 +79,17 @@ VMysqlTableWriter::~VMysqlTableWriter() {
}
}
Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string& tbl) {
Status VMysqlTableWriter::open() {
_mysql_conn = mysql_init(nullptr);
if (_mysql_conn == nullptr) {
return Status::InternalError("Call mysql_init failed.");
}
MYSQL* res = mysql_real_connect(_mysql_conn, conn_info.host.c_str(), conn_info.user.c_str(),
conn_info.passwd.c_str(), conn_info.db.c_str(), conn_info.port,
nullptr, // unix socket
0); // flags
MYSQL* res =
mysql_real_connect(_mysql_conn, _conn_info.host.c_str(), _conn_info.user.c_str(),
_conn_info.passwd.c_str(), _conn_info.db.c_str(), _conn_info.port,
nullptr, // unix socket
0); // flags
if (res == nullptr) {
fmt::memory_buffer err_ss;
fmt::format_to(err_ss, "mysql_real_connect failed because : {}.", mysql_error(_mysql_conn));
@ -85,26 +97,24 @@ Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string
}
// set character
if (mysql_set_character_set(_mysql_conn, conn_info.charset.c_str())) {
if (mysql_set_character_set(_mysql_conn, _conn_info.charset.c_str())) {
fmt::memory_buffer err_ss;
fmt::format_to(err_ss, "mysql_set_character_set failed because : {}.",
mysql_error(_mysql_conn));
return Status::InternalError(fmt::to_string(err_ss.data()));
}
_mysql_tbl = tbl;
return Status::OK();
}
Status VMysqlTableWriter::append(vectorized::Block* block) {
Status VMysqlTableWriter::append_block(vectorized::Block& block) {
Status status = Status::OK();
if (block == nullptr || block->rows() == 0) {
if (block.rows() == 0) {
return status;
}
Block output_block;
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
_vec_output_expr_ctxs, *block, &output_block));
_vec_output_expr_ctxs, block, &output_block));
auto num_rows = output_block.rows();
materialize_block_inplace(output_block);
for (int i = 0; i < num_rows; ++i) {
@ -115,7 +125,7 @@ Status VMysqlTableWriter::append(vectorized::Block* block) {
Status VMysqlTableWriter::insert_row(vectorized::Block& block, size_t row) {
_insert_stmt_buffer.clear();
fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _mysql_tbl);
fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _conn_info.table_name);
int num_columns = _vec_output_expr_ctxs.size();
for (int i = 0; i < num_columns; ++i) {

View File

@ -24,6 +24,7 @@
#include <string>
#include <vector>
#include "async_result_writer.h"
#include "common/status.h"
#include "vec/exprs/vexpr_fwd.h"
@ -35,6 +36,7 @@ struct MysqlConnInfo {
std::string user;
std::string passwd;
std::string db;
std::string table_name;
int port;
std::string charset;
@ -43,27 +45,21 @@ struct MysqlConnInfo {
class Block;
class VMysqlTableWriter {
class VMysqlTableWriter final : public AsyncResultWriter {
public:
VMysqlTableWriter(const VExprContextSPtrs& output_exprs);
~VMysqlTableWriter();
VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
~VMysqlTableWriter() override;
// connect to mysql server
Status open(const MysqlConnInfo& conn_info, const std::string& tbl);
Status open() override;
Status begin_trans() { return Status::OK(); }
Status append(vectorized::Block* block);
Status abort_tarns() { return Status::OK(); }
Status finish_tarns() { return Status::OK(); }
Status append_block(vectorized::Block& block) override;
private:
Status insert_row(vectorized::Block& block, size_t row);
MysqlConnInfo _conn_info;
const VExprContextSPtrs& _vec_output_expr_ctxs;
fmt::memory_buffer _insert_stmt_buffer;
std::string _mysql_tbl;
MYSQL* _mysql_conn;
};
} // namespace vectorized

View File

@ -54,6 +54,8 @@ public:
const RowDescriptor& row_desc() { return _row_desc; }
virtual bool can_write() { return true; }
protected:
// owned by RuntimeState
ObjectPool* _pool;

View File

@ -322,7 +322,7 @@ public class Coordinator {
this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin;
// Only enable pipeline query engine in query, not load
this.enablePipelineEngine = context.getSessionVariable().getEnablePipelineEngine()
&& (fragments.size() > 0 && fragments.get(0).getSink() instanceof ResultSink);
&& fragments.size() > 0;
initQueryOptions(context);
setFromUserProperty(context);