[pipelineX](sink) Support async writer (#24793)

Author: Gabriel
Date: 2023-09-22 23:04:23 +08:00
Committed by: GitHub
Parent: 8ca300f70b
Commit: 3d786c58de

26 changed files with 578 additions and 115 deletions

View File

@@ -928,7 +928,7 @@ Status AggSinkOperatorX<LocalStateType>::sink(doris::RuntimeState* state,
}
template <typename DependencyType, typename Derived>
Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state) {
Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(Base::profile()->total_time_counter());
SCOPED_TIMER(Base::_close_timer);
if (Base::_closed) {
@@ -943,7 +943,7 @@ Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state) {
std::vector<size_t> tmp_hash_values;
_hash_values.swap(tmp_hash_values);
return Base::close(state);
return Base::close(state, exec_status);
}
class StreamingAggSinkLocalState;
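
The change repeated across every sink in this commit is the widened local-state close signature: close(RuntimeState*) becomes close(RuntimeState*, Status exec_status), so the query's final execution status is threaded down through every derived state to the base class. A minimal standalone sketch of the forwarding pattern, using simplified stand-ins for Status and RuntimeState rather than Doris's real types:

#include <iostream>
#include <string>

// Stand-ins for doris::Status and RuntimeState (assumptions, not the real types).
struct Status {
    bool is_ok = true;
    std::string msg;
    static Status OK() { return {}; }
    bool ok() const { return is_ok; }
};
struct RuntimeState {};

struct PipelineXSinkLocalStateLike {
    bool closed = false;
    virtual ~PipelineXSinkLocalStateLike() = default;
    // close() now receives the query's final execution status.
    virtual Status close(RuntimeState* /*state*/, Status /*exec_status*/) {
        closed = true;
        return Status::OK();
    }
};

struct AggSinkLocalStateLike : PipelineXSinkLocalStateLike {
    Status close(RuntimeState* state, Status exec_status) override {
        if (closed) {
            return Status::OK();
        }
        // ... release hash tables and counters here ...
        return PipelineXSinkLocalStateLike::close(state, exec_status); // forward the status
    }
};

int main() {
    RuntimeState state;
    AggSinkLocalStateLike local_state;
    std::cout << local_state.close(&state, Status::OK()).ok() << '\n'; // prints 1
}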

View File

@@ -56,7 +56,7 @@ public:
virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
virtual Status close(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Status try_spill_disk(bool eos = false);

View File

@@ -248,7 +248,7 @@ Status DistinctStreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::
return Status::OK();
}
Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state) {
Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
@@ -256,7 +256,7 @@ Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state) {
// finish should be set, if not set here means error.
_shared_state->data_queue->set_canceled();
}
return Base::close(state);
return Base::close(state, exec_status);
}
} // namespace doris::pipeline

View File

@@ -90,7 +90,7 @@ public:
return Status::OK();
}
Status close(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
vectorized::Block* out_block);

View File

@@ -238,30 +238,6 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_name = "ExchangeSinkOperatorX";
}
ExchangeSinkOperatorX::ExchangeSinkOperatorX(
const RowDescriptor& row_desc, PlanNodeId dest_node_id,
const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch)
: DataSinkOperatorX(dest_node_id),
_row_desc(row_desc),
_part_type(TPartitionType::UNPARTITIONED),
_dests(destinations),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
_dest_node_id(dest_node_id) {
_cur_pb_block = &_pb_block1;
_name = "ExchangeSinkOperatorX";
}
ExchangeSinkOperatorX::ExchangeSinkOperatorX(const RowDescriptor& row_desc,
bool send_query_statistics_with_every_batch)
: DataSinkOperatorX(0),
_row_desc(row_desc),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
_dest_node_id(0) {
_cur_pb_block = &_pb_block1;
_name = "ExchangeSinkOperatorX";
}
Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tsink));
const TDataStreamSink& t_stream_sink = tsink.stream_sink;
@@ -541,7 +517,7 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch
return Status::OK();
}
Status ExchangeSinkOperatorX::try_close(RuntimeState* state) {
Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status) {
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
local_state._serializer.reset_block();
Status final_st = Status::OK();
@@ -554,7 +530,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state) {
return final_st;
}
Status ExchangeSinkLocalState::close(RuntimeState* state) {
Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
@@ -571,7 +547,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state) {
}
_sink_buffer->update_profile(profile());
_sink_buffer->close();
return PipelineXSinkLocalState<>::close(state);
return PipelineXSinkLocalState<>::close(state, exec_status);
}
WriteDependency* ExchangeSinkOperatorX::wait_for_dependency(RuntimeState* state) {

View File

@@ -161,7 +161,7 @@ public:
_serializer(this) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status close(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1);
void register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
@@ -248,11 +248,6 @@ public:
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch);
ExchangeSinkOperatorX(const RowDescriptor& row_desc, PlanNodeId dest_node_id,
const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch);
ExchangeSinkOperatorX(const RowDescriptor& row_desc,
bool send_query_statistics_with_every_batch);
Status init(const TDataSink& tsink) override;
RuntimeState* state() { return _state; }
@@ -266,7 +261,7 @@ public:
Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* src, PBlock* dest,
int num_receivers = 1);
Status try_close(RuntimeState* state) override;
Status try_close(RuntimeState* state, Status exec_status) override;
WriteDependency* wait_for_dependency(RuntimeState* state) override;
bool is_pending_finish(RuntimeState* state) const override;
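
ExchangeSinkOperatorX also loses its two test-only constructors and gains the status-carrying try_close. The shape of that try_close — drop any buffered block, begin closing every channel, and keep the first error while still visiting the rest — can be sketched standalone like this (simplified Status/Channel stand-ins, not the real ones; the channel loop body is elided in the hunk above, so this is an assumption about its contents):

#include <iostream>
#include <string>
#include <vector>

struct Status { // stand-in for doris::Status
    bool is_ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return is_ok; }
};

struct Channel { // stand-in channel that may fail while closing
    Status close_wait_result = Status::OK();
    Status close_wait() { return close_wait_result; }
};

// Every channel is visited even after a failure; the first failure is returned.
Status try_close(std::vector<Channel>& channels) {
    Status final_st = Status::OK();
    for (auto& channel : channels) {
        Status st = channel.close_wait();
        if (!st.ok() && final_st.ok()) {
            final_st = st;
        }
    }
    return final_st;
}

int main() {
    std::vector<Channel> channels(3);
    channels[1].close_wait_result = Status::Error("rpc failed");
    std::cout << try_close(channels).msg << '\n'; // "rpc failed"
}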

View File

@@ -18,8 +18,13 @@
#include "result_file_sink_operator.h"
#include <memory>
#include <random>
#include "pipeline/exec/exchange_sink_buffer.h"
#include "pipeline/exec/operator.h"
#include "runtime/buffer_control_block.h"
#include "runtime/result_buffer_mgr.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vresult_file_sink.h"
namespace doris {
@@ -38,4 +43,238 @@ OperatorPtr ResultFileSinkOperatorBuilder::build_operator() {
ResultFileSinkOperator::ResultFileSinkOperator(OperatorBuilderBase* operator_builder,
DataSink* sink)
: DataSinkOperator(operator_builder, sink) {};
} // namespace doris::pipeline
ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
: AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>(parent, state),
_serializer(this) {}
ResultFileSinkOperatorX::ResultFileSinkOperatorX(const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr)
: DataSinkOperatorX(0),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_is_top_sink(true) {}
ResultFileSinkOperatorX::ResultFileSinkOperatorX(
const RowDescriptor& row_desc, const TResultFileSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch, const std::vector<TExpr>& t_output_expr,
DescriptorTbl& descs)
: DataSinkOperatorX(0),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_dests(destinations),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
_output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false),
_is_top_sink(false) {
CHECK_EQ(destinations.size(), 1);
}
Status ResultFileSinkOperatorX::init(const TDataSink& tsink) {
auto& sink = tsink.result_file_sink;
CHECK(sink.__isset.file_options);
_file_opts.reset(new vectorized::ResultFileOptions(sink.file_options));
CHECK(sink.__isset.storage_backend_type);
_storage_type = sink.storage_backend_type;
//for impl csv_with_name and csv_with_names_and_types
_header_type = sink.header_type;
_header = sink.header;
// From the thrift expressions create the real exprs.
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
return Status::OK();
}
Status ResultFileSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::prepare(state));
return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
}
Status ResultFileSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::open(state));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
_sender_id = info.sender_id;
_brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait");
auto& p = _parent->cast<ResultFileSinkOperatorX>();
CHECK(p._file_opts.get() != nullptr);
if (p._is_top_sink) {
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), p._buf_size, &_sender, state->enable_pipeline_exec(),
state->execution_timeout()));
// create writer
_writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
p._file_opts.get(), p._storage_type, state->fragment_instance_id(),
_output_vexpr_ctxs, _sender.get(), nullptr, state->return_object_data_as_binary(),
p._output_row_descriptor));
} else {
// init channel
_output_block = vectorized::Block::create_unique(
p._output_row_descriptor.tuple_descriptors()[0]->slots(), 1);
_writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
p._file_opts.get(), p._storage_type, state->fragment_instance_id(),
_output_vexpr_ctxs, nullptr, _output_block.get(),
state->return_object_data_as_binary(), p._output_row_descriptor));
std::map<int64_t, int64_t> fragment_id_to_channel_index;
for (int i = 0; i < p._dests.size(); ++i) {
_channels.push_back(new vectorized::Channel(
this, p._row_desc, p._dests[i].brpc_server, state->fragment_instance_id(),
info.tsink.result_file_sink.dest_node_id, false,
p._send_query_statistics_with_every_batch));
}
std::random_device rd;
std::mt19937 g(rd());
shuffle(_channels.begin(), _channels.end(), g);
int local_size = 0;
for (int i = 0; i < _channels.size(); ++i) {
RETURN_IF_ERROR(_channels[i]->init(state));
if (_channels[i]->is_local()) {
local_size++;
}
}
_only_local_exchange = local_size == _channels.size();
}
_writer->set_dependency(_async_writer_dependency.get());
_writer->set_header_info(p._header_type, p._header);
return Status::OK();
}
Status ResultFileSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
return Base::open(state);
}
Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (Base::_closed) {
return Status::OK();
}
SCOPED_TIMER(_close_timer);
SCOPED_TIMER(profile()->total_time_counter());
auto& p = _parent->cast<ResultFileSinkOperatorX>();
if (_closed) {
return Status::OK();
}
Status final_status = exec_status;
// close the writer
if (_writer && _writer->need_normal_close()) {
Status st = _writer->close();
if (!st.ok() && exec_status.ok()) {
// close file writer failed, should return this error to client
final_status = st;
}
}
if (p._is_top_sink) {
// close sender, this is normal path end
if (_sender) {
_sender->update_num_written_rows(_writer == nullptr ? 0 : _writer->get_written_rows());
_sender->close(final_status);
}
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
state->fragment_instance_id());
} else {
if (final_status.ok()) {
bool all_receiver_eof = true;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
all_receiver_eof = false;
break;
}
}
if (all_receiver_eof) {
return Status::EndOfFile("all data stream channels EOF");
}
// 1. serialize depends on it is not local exchange
// 2. send block
// 3. rollover block
if (_only_local_exchange) {
if (!_output_block->empty()) {
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
status = channel->send_local_block(_output_block.get());
HANDLE_CHANNEL_STATUS(state, channel, status);
}
}
}
} else {
{
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
bool serialized = false;
RETURN_IF_ERROR(_serializer.next_serialized_block(
_output_block.get(), _block_holder->get_block(), _channels.size(),
&serialized, true));
if (serialized) {
auto cur_block = _serializer.get_block()->to_block();
if (!cur_block.empty()) {
RETURN_IF_ERROR(_serializer.serialize_block(
&cur_block, _block_holder->get_block(), _channels.size()));
} else {
_block_holder->get_block()->Clear();
}
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status =
channel->send_block(_block_holder.get(), nullptr, true);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
}
}
cur_block.clear_column_data();
_serializer.get_block()->set_muatable_columns(cur_block.mutate_columns());
}
}
}
}
_output_block->clear();
}
return Base::close(state, exec_status);
}
template <typename ChannelPtrType>
void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
Status st) {
channel->set_receiver_eof(st);
channel->close(state);
}
Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) {
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
return local_state.sink(state, in_block, source_state);
}
bool ResultFileSinkOperatorX::is_pending_finish(RuntimeState* state) const {
auto& local_state = state->get_sink_local_state(id())->cast<ResultFileSinkLocalState>();
return local_state.is_pending_finish();
}
WriteDependency* ResultFileSinkOperatorX::wait_for_dependency(RuntimeState* state) {
CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
return local_state.write_blocked_by();
}
} // namespace doris::pipeline
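
One detail worth calling out in ResultFileSinkLocalState::close above is the status precedence: a failure from the file writer's close is surfaced to the client only when the query itself finished OK, while an earlier execution error always wins. A compact standalone restatement (stand-in Status type, not the real one):

#include <iostream>
#include <string>

struct Status { // stand-in for doris::Status
    bool is_ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return is_ok; }
};

// Same precedence rule as the close() above: exec_status first, then the
// writer-close error if (and only if) execution itself succeeded.
Status final_close_status(Status exec_status, Status writer_close_st) {
    Status final_status = exec_status;
    if (!writer_close_st.ok() && exec_status.ok()) {
        final_status = writer_close_st; // report the file-close failure
    }
    return final_status;
}

int main() {
    std::cout << final_close_status(Status::OK(), Status::Error("close failed")).msg << '\n';
    std::cout << final_close_status(Status::Error("exec failed"), Status::Error("close failed")).msg << '\n';
}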

View File

@@ -20,6 +20,7 @@
#include <stdint.h>
#include "operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/sink/vresult_file_sink.h"
namespace doris {
@@ -42,5 +43,83 @@ public:
bool can_write() override { return true; }
};
class ResultFileSinkOperatorX;
class ResultFileSinkLocalState final
: public AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX> {
public:
using Base = AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
ENABLE_FACTORY_CREATOR(ResultFileSinkLocalState);
ResultFileSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
int sender_id() const { return _sender_id; }
RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
private:
friend class ResultFileSinkOperatorX;
template <typename ChannelPtrType>
void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st);
std::unique_ptr<vectorized::Block> _output_block = nullptr;
std::shared_ptr<BufferControlBlock> _sender;
std::vector<vectorized::Channel<ResultFileSinkLocalState>*> _channels;
bool _only_local_exchange = false;
vectorized::BlockSerializer<ResultFileSinkLocalState> _serializer;
std::unique_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
RuntimeProfile::Counter* _brpc_wait_timer;
int _sender_id;
};
class ResultFileSinkOperatorX final : public DataSinkOperatorX<ResultFileSinkLocalState> {
public:
ResultFileSinkOperatorX(const RowDescriptor& row_desc, const std::vector<TExpr>& t_output_expr);
ResultFileSinkOperatorX(const RowDescriptor& row_desc, const TResultFileSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);
Status init(const TDataSink& thrift_sink) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
WriteDependency* wait_for_dependency(RuntimeState* state) override;
bool is_pending_finish(RuntimeState* state) const override;
private:
friend class ResultFileSinkLocalState;
template <typename Writer, typename Parent>
friend class AsyncWriterSink;
const RowDescriptor& _row_desc;
const std::vector<TExpr>& _t_output_expr;
const std::vector<TPlanFragmentDestination> _dests;
bool _send_query_statistics_with_every_batch;
// set file options when sink type is FILE
std::unique_ptr<vectorized::ResultFileOptions> _file_opts;
TStorageBackendType::type _storage_type;
// Owned by the RuntimeState.
RowDescriptor _output_row_descriptor;
int _buf_size = 1024; // Allocated from _pool
bool _is_top_sink = true;
std::string _header;
std::string _header_type;
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
};
} // namespace pipeline
} // namespace doris

View File

@@ -167,7 +167,7 @@ Status ResultSinkOperatorX::_second_phase_fetch_data(RuntimeState* state,
return Status::OK();
}
Status ResultSinkLocalState::close(RuntimeState* state) {
Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
@@ -202,7 +202,7 @@ Status ResultSinkLocalState::close(RuntimeState* state) {
state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
state->fragment_instance_id());
RETURN_IF_ERROR(PipelineXSinkLocalState<>::close(state));
RETURN_IF_ERROR(PipelineXSinkLocalState<>::close(state, exec_status));
return final_status;
}

View File

@@ -79,7 +79,7 @@ public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
private:
friend class ResultSinkOperatorX;

View File

@@ -374,7 +374,7 @@ Status StreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* i
return Status::OK();
}
Status StreamingAggSinkLocalState::close(RuntimeState* state) {
Status StreamingAggSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
@@ -387,7 +387,7 @@ Status StreamingAggSinkLocalState::close(RuntimeState* state) {
COUNTER_SET(_queue_byte_size_counter, _shared_state->data_queue->max_bytes_in_queue());
}
_preagg_block.clear();
return Base::close(state);
return Base::close(state, exec_status);
}
} // namespace doris::pipeline

View File

@@ -83,7 +83,7 @@ public:
StreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status close(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block);
private:

View File

@@ -715,7 +715,7 @@ void PipelineFragmentContext::close_if_prepare_failed() {
}
for (auto& task : _tasks) {
DCHECK(!task->is_pending_finish());
WARN_IF_ERROR(task->close(), "close_if_prepare_failed failed: ");
WARN_IF_ERROR(task->close(Status::OK()), "close_if_prepare_failed failed: ");
close_a_pipeline();
}
}

View File

@@ -319,7 +319,7 @@ Status PipelineTask::finalize() {
return _sink->finalize(_state);
}
Status PipelineTask::try_close() {
Status PipelineTask::try_close(Status exec_status) {
if (_try_close_flag) {
return Status::OK();
}
@@ -329,7 +329,7 @@ Status PipelineTask::try_close() {
return status1.ok() ? status2 : status1;
}
Status PipelineTask::close() {
Status PipelineTask::close(Status exec_status) {
int64_t close_ns = 0;
Defer defer {[&]() {
if (_task_queue) {

View File

@@ -123,10 +123,10 @@ public:
// Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
// this task will enter the `PENDING_FINISH` state.
virtual Status try_close();
virtual Status try_close(Status exec_status);
// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
virtual Status close();
virtual Status close(Status exec_status);
void put_in_runnable_queue() {
_schedule_time++;
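
With the new parameter, the task's two-phase shutdown reads: try_close(exec_status) gives the sink (and then the source) a chance to abort early with the real query status, and close(exec_status) performs the final release. A standalone sketch of that ordering, using stand-in types rather than Doris's:

#include <iostream>
#include <string>

struct Status { // stand-in for doris::Status
    bool is_ok = true;
    std::string msg;
    static Status OK() { return {}; }
    bool ok() const { return is_ok; }
};

struct Sink {
    Status try_close(Status /*exec_status*/) { return Status::OK(); }
    Status close(Status /*exec_status*/) { return Status::OK(); }
};
struct Source {
    Status try_close() { return Status::OK(); }
};

struct PipelineTaskLike {
    Sink sink;
    Source source;
    bool try_close_flag = false;

    // Phase 1: idempotent; the sink sees the real execution status first so an
    // async writer can be force-closed before anything else is torn down.
    Status try_close(Status exec_status) {
        if (try_close_flag) {
            return Status::OK();
        }
        try_close_flag = true;
        Status status1 = sink.try_close(exec_status);
        Status status2 = source.try_close();
        return status1.ok() ? status2 : status1; // first error wins
    }

    // Phase 2: final resource release, also carrying the execution status.
    Status close(Status exec_status) { return sink.close(exec_status); }
};

int main() {
    PipelineTaskLike task;
    task.try_close(Status::OK());
    std::cout << task.close(Status::OK()).ok() << '\n'; // prints 1
}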

View File

@@ -598,5 +598,13 @@ private:
PartitionSortNodeSharedState _partition_sort_state;
};
class AsyncWriterDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
AsyncWriterDependency(int id) : WriteDependency(id, "AsyncWriterDependency") {}
~AsyncWriterDependency() override = default;
void* shared_state() override { return nullptr; }
};
} // namespace pipeline
} // namespace doris
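
AsyncWriterDependency is the handle the async writer uses to tell the scheduler whether the sink may accept more data. Under the assumption (consistent with the rest of this diff) that WriteDependency exposes block_writing/set_ready_for_write and that the scheduler polls write_blocked_by, a minimal standalone model looks like:

#include <atomic>
#include <iostream>

class WriteDependencyLike {
public:
    void block_writing() { _ready.store(false); }
    void set_ready_for_write() { _ready.store(true); }
    // Returns nullptr when writing may proceed, otherwise the blocking dependency.
    WriteDependencyLike* write_blocked_by() { return _ready.load() ? nullptr : this; }

private:
    std::atomic<bool> _ready{true};
};

int main() {
    WriteDependencyLike dep;
    dep.block_writing();
    std::cout << (dep.write_blocked_by() != nullptr) << '\n'; // 1: sink is blocked
    dep.set_ready_for_write();
    std::cout << (dep.write_blocked_by() != nullptr) << '\n'; // 0: sink is runnable
}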

View File

@@ -38,6 +38,7 @@
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
#include "pipeline/exec/result_file_sink_operator.h"
#include "pipeline/exec/result_sink_operator.h"
#include "pipeline/exec/select_operator.h"
#include "pipeline/exec/sort_sink_operator.h"
@@ -373,6 +374,7 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
#define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>;
DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(ResultSinkLocalState)
DECLARE_OPERATOR_X(ResultFileSinkLocalState)
DECLARE_OPERATOR_X(AnalyticSinkLocalState)
DECLARE_OPERATOR_X(SortSinkLocalState)
DECLARE_OPERATOR_X(BlockingAggSinkLocalState)

View File

@@ -53,6 +53,7 @@ struct LocalSinkStateInfo {
RuntimeProfile* parent_profile;
const int sender_id;
Dependency* dependency;
const TDataSink& tsink;
};
class PipelineXLocalStateBase {
@@ -217,7 +218,7 @@ public:
virtual bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const { return true; }
virtual Status close(RuntimeState* state) override;
Status close(RuntimeState* state) override;
virtual Dependency* wait_for_dependency(RuntimeState* state) { return nullptr; }
@@ -261,11 +262,11 @@ public:
[[nodiscard]] int64_t limit() const { return _limit; }
[[nodiscard]] virtual const RowDescriptor& row_desc() override {
[[nodiscard]] const RowDescriptor& row_desc() override {
return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor;
}
[[nodiscard]] virtual bool is_source() const override { return false; }
[[nodiscard]] bool is_source() const override { return false; }
Status get_next_after_projects(RuntimeState* state, vectorized::Block* block,
SourceState& source_state);
@@ -318,7 +319,7 @@ public:
: PipelineXLocalStateBase(state, parent) {}
virtual ~PipelineXLocalState() {}
virtual Status init(RuntimeState* state, LocalStateInfo& info) override {
Status init(RuntimeState* state, LocalStateInfo& info) override {
_runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
" (id=" + std::to_string(_parent->id()) + ")"));
_runtime_profile->set_metadata(_parent->id());
@@ -358,7 +359,7 @@ public:
return Status::OK();
}
virtual Status close(RuntimeState* state) override {
Status close(RuntimeState* state) override {
if (_closed) {
return Status::OK();
}
@@ -373,7 +374,7 @@ public:
return Status::OK();
}
virtual std::string debug_string(int indentation_level = 0) const override;
[[nodiscard]] std::string debug_string(int indentation_level = 0) const override;
protected:
DependencyType* _dependency;
@@ -386,7 +387,7 @@ class PipelineXSinkLocalStateBase {
public:
PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent_, RuntimeState* state_)
: _parent(parent_), _state(state_) {}
virtual ~PipelineXSinkLocalStateBase() {}
virtual ~PipelineXSinkLocalStateBase() = default;
// Do initialization. This step should be executed only once and in bthread, so we can do some
// lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
@@ -395,7 +396,8 @@ public:
// Do initialization. This step can be executed multiple times, so we should make sure it is
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
virtual Status close(RuntimeState* state) = 0;
virtual Status close(RuntimeState* state, Status exec_status) = 0;
virtual Status try_close(RuntimeState* state, Status exec_status) = 0;
virtual std::string debug_string(int indentation_level) const;
@@ -456,12 +458,15 @@ public:
DataSinkOperatorXBase(const int id, std::vector<int>& sources)
: OperatorBase(nullptr), _id(id), _dests_id(sources) {}
virtual ~DataSinkOperatorXBase() override = default;
~DataSinkOperatorXBase() override = default;
// For agg/sort/join sink.
virtual Status init(const TPlanNode& tnode, RuntimeState* state);
virtual Status init(const TDataSink& tsink) override;
Status init(const TDataSink& tsink) override;
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override { return Status::OK(); }
virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0;
@@ -485,8 +490,12 @@ public:
virtual void get_dependency(std::vector<DependencySPtr>& dependency) = 0;
virtual Status close(RuntimeState* state) override {
return state->get_sink_local_state(id())->close(state);
Status close(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
}
Status try_close(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
}
bool can_read() override {
@@ -508,7 +517,7 @@ public:
virtual bool is_pending_finish(RuntimeState* state) const { return false; }
std::string debug_string() const override { return ""; }
[[nodiscard]] std::string debug_string() const override { return ""; }
virtual std::string debug_string(int indentation_level) const;
@@ -518,7 +527,13 @@ public:
[[nodiscard]] bool is_source() const override { return false; }
virtual Status close(RuntimeState* state, Status exec_status) { return Status::OK(); }
virtual Status close(RuntimeState* state, Status exec_status) {
return state->get_sink_local_state(id())->close(state, exec_status);
}
virtual Status try_close(RuntimeState* state, Status exec_status) {
return state->get_sink_local_state(id())->try_close(state, exec_status);
}
[[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
@@ -537,6 +552,9 @@ public:
virtual bool should_dry_run(RuntimeState* state) { return false; }
protected:
template <typename Writer, typename Parent>
friend class AsyncWriterSink;
const int _id;
std::vector<int> _dests_id;
@@ -564,8 +582,6 @@ public:
Status setup_local_states(RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) override;
void get_dependency(std::vector<DependencySPtr>& dependency) override;
void get_dependency(DependencySPtr& dependency);
using LocalState = LocalStateType;
};
@@ -577,7 +593,7 @@ public:
: PipelineXSinkLocalStateBase(parent, state) {}
~PipelineXSinkLocalState() override = default;
virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(
_parent->get_name() + " (id=" + std::to_string(_parent->id()) + ")"));
@@ -597,9 +613,11 @@ public:
return Status::OK();
}
virtual Status open(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override { return Status::OK(); }
Status close(RuntimeState* state) override {
Status try_close(RuntimeState* state, Status exec_status) override { return Status::OK(); }
Status close(RuntimeState* state, Status exec_status) override {
if (_closed) {
return Status::OK();
}
@@ -610,7 +628,7 @@ public:
return Status::OK();
}
std::string debug_string(int indentation_level) const override;
[[nodiscard]] std::string debug_string(int indentation_level) const override;
typename DependencyType::SharedState*& get_shared_state() { return _shared_state; }
protected:
@@ -660,4 +678,65 @@ public:
[[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const = 0;
};
template <typename Writer, typename Parent>
class AsyncWriterSink : public PipelineXSinkLocalState<> {
public:
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalState<>(parent, state), _async_writer_dependency(nullptr) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
_output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size());
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
RETURN_IF_ERROR(_parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(
state, _output_vexpr_ctxs[i]));
}
_writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
_async_writer_dependency = AsyncWriterDependency::create_shared(_parent->id());
_writer->set_dependency(_async_writer_dependency.get());
return Status::OK();
}
Status open(RuntimeState* state) override {
RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
_writer->start_writer(state, _profile);
return Status::OK();
}
Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) {
return _writer->sink(block, source_state == SourceState::FINISHED);
}
WriteDependency* write_blocked_by() { return _writer->write_blocked_by(); }
Status close(RuntimeState* state, Status exec_status) override {
if (_closed) {
return Status::OK();
}
if (_writer->need_normal_close()) {
if (exec_status.ok() && !state->is_cancelled()) {
RETURN_IF_ERROR(_writer->commit_trans());
}
RETURN_IF_ERROR(_writer->close(exec_status));
}
return PipelineXSinkLocalState<>::close(state, exec_status);
}
Status try_close(RuntimeState* state, Status exec_status) override {
if (state->is_cancelled() || !exec_status.ok()) {
_writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
}
return Status::OK();
}
bool is_pending_finish() { return _writer->is_pending_finish(); }
protected:
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
std::unique_ptr<Writer> _writer;
std::shared_ptr<AsyncWriterDependency> _async_writer_dependency;
};
} // namespace doris::pipeline
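
AsyncWriterSink above is the glue for the new feature: init clones the output expressions and wires the writer to its dependency, open starts the writer's background thread, sink only enqueues, and close/try_close forward the final status. A condensed standalone model of that producer/consumer split (simplified types, not Doris's AsyncResultWriter):

#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>

struct Block { int rows = 0; }; // stand-in for vectorized::Block

class AsyncWriterLike {
public:
    void start() { _thread = std::thread([this] { _run(); }); }

    // Producer side: the pipeline thread only enqueues and returns.
    void sink(Block b, bool eos) {
        std::lock_guard l(_m);
        _queue.push_back(b);
        _eos = _eos || eos;
        _cv.notify_one();
    }

    void close() {
        {
            std::lock_guard l(_m);
            _eos = true;
            _cv.notify_one();
        }
        if (_thread.joinable()) _thread.join();
    }

private:
    // Consumer side: drains blocks off the pipeline's execution thread.
    void _run() {
        std::unique_lock l(_m);
        while (true) {
            _cv.wait(l, [this] { return !_queue.empty() || _eos; });
            while (!_queue.empty()) {
                Block b = _queue.front();
                _queue.pop_front();
                l.unlock();
                std::cout << "wrote " << b.rows << " rows\n"; // real writer does file I/O here
                l.lock();
            }
            if (_eos) return;
        }
    }

    std::mutex _m;
    std::condition_variable _cv;
    std::deque<Block> _queue;
    bool _eos = false;
    std::thread _thread;
};

int main() {
    AsyncWriterLike writer;
    writer.start();
    writer.sink({100}, /*eos=*/false);
    writer.sink({42}, /*eos=*/true);
    writer.close();
}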

View File

@@ -62,6 +62,7 @@
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
#include "pipeline/exec/result_file_sink_operator.h"
#include "pipeline/exec/result_sink_operator.h"
#include "pipeline/exec/scan_operator.h"
#include "pipeline/exec/select_operator.h"
@@ -204,14 +205,15 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
_runtime_state->obj_pool(), request, *_query_ctx->desc_tbl, &_root_op, root_pipeline));
// 3. Create sink operator
if (request.fragment.__isset.output_sink) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs, request, root_pipeline->output_row_desc(),
_runtime_state.get(), *desc_tbl, root_pipeline->id()));
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
root_pipeline->set_sink(_sink);
if (!request.fragment.__isset.output_sink) {
return Status::InternalError("No output sink in this fragment!");
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink, request.fragment.output_exprs,
request, root_pipeline->output_row_desc(), _runtime_state.get(), *desc_tbl,
root_pipeline->id()));
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
root_pipeline->set_sink(_sink);
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
@@ -255,6 +257,26 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
_sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink));
break;
}
case TDataSinkType::RESULT_FILE_SINK: {
if (!thrift_sink.__isset.result_file_sink) {
return Status::InternalError("Missing result file sink.");
}
// TODO: figure out good buffer size based on size of output row
bool send_query_statistics_with_every_batch =
params.__isset.send_query_statistics_with_every_batch
? params.send_query_statistics_with_every_batch
: false;
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
_sink.reset(new ResultFileSinkOperatorX(
row_desc, thrift_sink.result_file_sink, params.destinations,
send_query_statistics_with_every_batch, output_exprs, desc_tbl));
} else {
_sink.reset(new ResultFileSinkOperatorX(row_desc, output_exprs));
}
break;
}
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
@@ -382,7 +404,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
pipeline_id_to_task[dep]->get_downstream_dependency());
}
}
RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), local_params));
RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), local_params,
request.fragment.output_sink));
}
{
@@ -790,7 +813,7 @@ void PipelineXFragmentContext::close_if_prepare_failed() {
for (auto& task : _tasks) {
for (auto& t : task) {
DCHECK(!t->is_pending_finish());
WARN_IF_ERROR(t->close(), "close_if_prepare_failed failed: ");
WARN_IF_ERROR(t->close(Status::OK()), "close_if_prepare_failed failed: ");
close_a_pipeline();
}
}
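
The new RESULT_FILE_SINK branch in _create_data_sink picks a constructor by whether the fragment has downstream destinations: with destinations the file sink feeds a data-stream channel, without them it is the top sink answering the client. The dispatch, reduced to a standalone sketch with stand-in types:

#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Destination { std::string brpc_server; }; // stand-in for TPlanFragmentDestination

struct ResultFileSinkBase {
    virtual ~ResultFileSinkBase() = default;
    virtual bool is_top_sink() const = 0;
};
struct TopResultFileSink : ResultFileSinkBase {
    bool is_top_sink() const override { return true; } // streams results to the client
};
struct ChannelResultFileSink : ResultFileSinkBase {
    bool is_top_sink() const override { return false; } // forwards to an exchange channel
};

// Same decision as the new case above: destinations present means "not the
// top sink" (and the real constructor CHECKs there is exactly one destination).
std::unique_ptr<ResultFileSinkBase> make_result_file_sink(
        const std::vector<Destination>& destinations) {
    if (!destinations.empty()) {
        return std::make_unique<ChannelResultFileSink>();
    }
    return std::make_unique<TopResultFileSink>();
}

int main() {
    std::cout << make_result_file_sink({})->is_top_sink() << '\n';             // 1
    std::cout << make_result_file_sink({{"be1:8060"}})->is_top_sink() << '\n'; // 0
}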

View File

@@ -55,7 +55,8 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState
_sink->get_dependency(_downstream_dependency);
}
Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params) {
Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
const TDataSink& tsink) {
DCHECK(_sink);
DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state);
_init_profile();
@@ -69,7 +70,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
std::vector<LocalSinkStateInfo> infos;
for (auto& dep : deps) {
infos.push_back(LocalSinkStateInfo {_pipeline->pipeline_profile(),
local_params.sender_id, dep.get()});
local_params.sender_id, dep.get(), tsink});
}
RETURN_IF_ERROR(_sink->setup_local_states(state, infos));
}
@@ -249,17 +250,17 @@ Status PipelineXTask::finalize() {
return _sink->finalize(_state);
}
Status PipelineXTask::try_close() {
Status PipelineXTask::try_close(Status exec_status) {
if (_try_close_flag) {
return Status::OK();
}
_try_close_flag = true;
Status status1 = _sink->try_close(_state);
Status status1 = _sink->try_close(_state, exec_status);
Status status2 = _source->try_close(_state);
return status1.ok() ? status2 : status1;
}
Status PipelineXTask::close() {
Status PipelineXTask::close(Status exec_status) {
int64_t close_ns = 0;
Defer defer {[&]() {
if (_task_queue) {
@@ -269,7 +270,7 @@ Status PipelineXTask::close() {
Status s;
{
SCOPED_RAW_TIMER(&close_ns);
s = _sink->close(_state);
s = _sink->close(_state, exec_status);
for (auto& op : _operators) {
auto tem = op->close(_state);
if (!tem.ok() && s.ok()) {

View File

@@ -57,16 +57,17 @@ public:
return Status::InternalError("Should not reach here!");
}
Status prepare(RuntimeState* state, const TPipelineInstanceParams& local_params);
Status prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
const TDataSink& tsink);
Status execute(bool* eos) override;
// Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
// this task will enter the `PENDING_FINISH` state.
Status try_close() override;
Status try_close(Status exec_status) override;
// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
Status close() override;
Status close(Status exec_status) override;
bool source_can_read() override {
if (_dry_run) {

View File

@@ -277,7 +277,7 @@ void TaskScheduler::_do_work(size_t index) {
// exec failed,cancel all fragment instance
fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string());
fragment_ctx->send_report(true);
_try_close_task(task, PipelineTaskState::CANCELED);
_try_close_task(task, PipelineTaskState::CANCELED, status);
continue;
}
@@ -291,8 +291,10 @@ void TaskScheduler::_do_work(size_t index) {
fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
"finalize fail:" + status.to_string());
} else {
_try_close_task(task, fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
: PipelineTaskState::FINISHED);
_try_close_task(task,
fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
: PipelineTaskState::FINISHED,
status);
}
continue;
}
@@ -315,11 +317,12 @@ void TaskScheduler::_do_work(size_t index) {
}
}
void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) {
auto status = task->try_close();
void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
Status exec_status) {
auto status = task->try_close(exec_status);
if (!status.ok() && state != PipelineTaskState::CANCELED) {
// Call `close` if `try_close` failed to make sure allocated resources are released
task->close();
task->close(exec_status);
task->query_context()->cancel(true, status.to_string(),
Status::Cancelled(status.to_string()));
state = PipelineTaskState::CANCELED;
@@ -328,7 +331,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state)
_blocked_task_scheduler->add_blocked_task(task);
return;
} else {
status = task->close();
status = task->close(exec_status);
if (!status.ok() && state != PipelineTaskState::CANCELED) {
task->query_context()->cancel(true, status.to_string(),
Status::Cancelled(status.to_string()));

View File

@@ -106,6 +106,7 @@ private:
void _do_work(size_t index);
// after _try_close_task, task maybe destructed.
void _try_close_task(PipelineTask* task, PipelineTaskState state);
void _try_close_task(PipelineTask* task, PipelineTaskState state,
Status exec_status = Status::OK());
};
} // namespace doris::pipeline
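
The header keeps _try_close_task source-compatible by defaulting the new parameter, so existing two-argument call sites still compile while the error paths in task_scheduler.cpp pass the real status through. The mechanics, in isolation (stand-in types, hypothetical free function):

#include <iostream>
#include <string>

struct Status { // stand-in for doris::Status
    bool is_ok = true;
    std::string msg;
    static Status OK() { return {}; }
    bool ok() const { return is_ok; }
};
enum class PipelineTaskState { RUNNABLE, CANCELED, FINISHED };

// Defaulting exec_status keeps the old two-argument call shape valid.
void try_close_task(int task_id, PipelineTaskState /*state*/,
                    Status exec_status = Status::OK()) {
    std::cout << "task " << task_id << " closing, ok=" << exec_status.ok() << '\n';
}

int main() {
    try_close_task(1, PipelineTaskState::FINISHED);                          // old call shape
    try_close_task(2, PipelineTaskState::CANCELED, Status{false, "failed"}); // new call shape
}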

View File

@@ -34,6 +34,7 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/result_file_sink_operator.h"
#include "runtime/descriptors.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
@@ -125,13 +126,18 @@ Status Channel<Parent>::send_current_block(bool eos) {
template <typename Parent>
Status Channel<Parent>::send_local_block(bool eos) {
SCOPED_TIMER(_parent->local_send_timer());
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
SCOPED_TIMER(_parent->local_send_timer());
}
Block block = _serializer.get_block()->to_block();
_serializer.get_block()->set_muatable_columns(block.clone_empty_columns());
if (_recvr_is_valid()) {
COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes());
COUNTER_UPDATE(_parent->local_sent_rows(), block.rows());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes());
COUNTER_UPDATE(_parent->local_sent_rows(), block.rows());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
}
_local_recvr->add_block(&block, _parent->sender_id(), true);
if (eos) {
_local_recvr->remove_sender(_parent->sender_id(), _be_number);
@@ -145,11 +151,15 @@ Status Channel<Parent>::send_local_block(bool eos) {
template <typename Parent>
Status Channel<Parent>::send_local_block(Block* block) {
SCOPED_TIMER(_parent->local_send_timer());
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
SCOPED_TIMER(_parent->local_send_timer());
}
if (_recvr_is_valid()) {
COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
}
_local_recvr->add_block(block, _parent->sender_id(), false);
return Status::OK();
} else {
@@ -159,8 +169,11 @@ Status Channel<Parent>::send_local_block(Block* block) {
template <typename Parent>
Status Channel<Parent>::send_block(PBlock* block, bool eos) {
SCOPED_TIMER(_parent->brpc_send_timer());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
SCOPED_TIMER(_parent->brpc_send_timer());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
}
if (_closure == nullptr) {
_closure = new RefCountClosure<PTransmitDataResult>();
_closure->ref();
@@ -191,7 +204,7 @@ Status Channel<Parent>::send_block(PBlock* block, bool eos) {
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
if (enable_http_send_block(_brpc_request, _parent->transfer_large_data_by_brpc())) {
if (enable_http_send_block(_brpc_request, config::transfer_large_data_by_brpc)) {
RETURN_IF_ERROR(transmit_block_http(_state->exec_env(), _closure, _brpc_request,
_brpc_dest_addr));
} else {
@@ -744,12 +757,16 @@ Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
if (rows) {
if (rows->size() > 0) {
SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
}
const int* begin = &(*rows)[0];
_mutable_block->add_rows(block, begin, begin + rows->size());
}
} else if (!block->empty()) {
SCOPED_TIMER(_parent->merge_block_timer());
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
SCOPED_TIMER(_parent->merge_block_timer());
}
RETURN_IF_ERROR(_mutable_block->merge(*block));
}
}
@@ -779,7 +796,7 @@ Status BlockSerializer<Parent>::serialize_block(PBlock* dest, int num_receivers)
template <typename Parent>
Status BlockSerializer<Parent>::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
{
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
SCOPED_TIMER(_parent->_serialize_batch_timer);
dest->Clear();
size_t uncompressed_bytes = 0, compressed_bytes = 0;
@@ -845,6 +862,8 @@ bool VDataStreamSender::channel_all_can_write() {
template class Channel<pipeline::ExchangeSinkLocalState>;
template class Channel<VDataStreamSender>;
template class Channel<pipeline::ResultFileSinkLocalState>;
template class BlockSerializer<pipeline::ResultFileSinkLocalState>;
template class BlockSerializer<pipeline::ExchangeSinkLocalState>;
template class BlockSerializer<VDataStreamSender>;
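
The Channel and BlockSerializer templates are now also instantiated for pipeline::ResultFileSinkLocalState, which does not expose the exchange-specific profile counters, so the counter updates are compiled out with if constexpr instead of adding dummy counters to the new parent. A minimal illustration of that compile-time gating (toy types, not Doris's):

#include <iostream>
#include <type_traits>

struct ExchangeParent { // has the counter, like ExchangeSinkLocalState
    void bump_blocks_sent() { ++blocks_sent; }
    int blocks_sent = 0;
};
struct FileSinkParent {}; // no counters, like ResultFileSinkLocalState

template <typename Parent>
void send_block(Parent& parent) {
    if constexpr (!std::is_same_v<FileSinkParent, Parent>) {
        parent.bump_blocks_sent(); // only instantiated when Parent has the counter
    }
    // ... actual send path shared by both parents ...
}

int main() {
    ExchangeParent e;
    FileSinkParent f;
    send_block(e);
    send_block(f); // compiles even though FileSinkParent lacks bump_blocks_sent
    std::cout << e.blocks_sent << '\n'; // 1
}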

View File

@@ -17,6 +17,7 @@
#include "async_result_writer.h"
#include "pipeline/pipeline_x/dependency.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_state.h"
@@ -32,7 +33,7 @@ class TExpr;
namespace vectorized {
AsyncResultWriter::AsyncResultWriter(const doris::vectorized::VExprContextSPtrs& output_expr_ctxs)
: _vec_output_expr_ctxs(output_expr_ctxs) {};
: _vec_output_expr_ctxs(output_expr_ctxs), _dependency(nullptr) {};
Status AsyncResultWriter::sink(Block* block, bool eos) {
auto rows = block->rows();
@@ -50,8 +51,14 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
std::lock_guard l(_m);
_eos = eos;
if (_dependency && _is_finished()) {
_dependency->set_ready_for_write();
}
if (rows) {
_data_queue.emplace_back(std::move(add_block));
if (_dependency && !_data_queue_is_available() && !_is_finished()) {
_dependency->block_writing();
}
} else if (_eos && _data_queue.empty()) {
status = Status::EndOfFile("Run out of sink data");
}
@@ -65,6 +72,9 @@ std::unique_ptr<Block> AsyncResultWriter::get_block_from_queue() {
DCHECK(!_data_queue.empty());
auto block = std::move(_data_queue.front());
_data_queue.pop_front();
if (_dependency && _data_queue_is_available()) {
_dependency->set_ready_for_write();
}
return block;
}
@@ -99,6 +109,9 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
if (!status.ok()) {
std::unique_lock l(_m);
_writer_status = status;
if (_dependency && _is_finished()) {
_dependency->set_ready_for_write();
}
break;
}
}
@@ -128,6 +141,9 @@ Status AsyncResultWriter::_projection_block(doris::vectorized::Block& input_bloc
void AsyncResultWriter::force_close(Status s) {
std::lock_guard l(_m);
_writer_status = s;
if (_dependency && _is_finished()) {
_dependency->set_ready_for_write();
}
_cv.notify_one();
}
@@ -145,5 +161,11 @@ std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Blo
return b;
}
pipeline::WriteDependency* AsyncResultWriter::write_blocked_by() {
std::lock_guard l(_m);
DCHECK(_dependency != nullptr);
return _dependency->write_blocked_by();
}
} // namespace vectorized
} // namespace doris
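
The dependency signalling added here implements backpressure around the bounded block queue: the producer blocks the dependency when the queue fills (unless the writer already hit eos or an error), and the consumer re-readies it as soon as space frees. A condensed single-threaded model of those two transitions (stand-in types; the real code does this under _m with the writer thread on the other side):

#include <deque>
#include <iostream>

struct DependencyLike {
    bool ready = true;
    void block_writing() { ready = false; }
    void set_ready_for_write() { ready = true; }
};

class WriterLike {
public:
    explicit WriterLike(DependencyLike* dep) : _dep(dep) {}

    // Producer: enqueue, then block the dependency once the queue is full.
    void sink(int block) {
        _queue.push_back(block);
        if (_dep && _queue.size() >= kQueueSize && !_finished) {
            _dep->block_writing();
        }
    }

    // Consumer: dequeue, then re-ready the dependency once there is room again.
    int pop() {
        int b = _queue.front();
        _queue.pop_front();
        if (_dep && _queue.size() < kQueueSize) {
            _dep->set_ready_for_write();
        }
        return b;
    }

private:
    static constexpr size_t kQueueSize = 3; // QUEUE_SIZE in the diff
    std::deque<int> _queue;
    DependencyLike* _dep;
    bool _finished = false; // would be set on error/eos, forcing "ready"
};

int main() {
    DependencyLike dep;
    WriterLike writer(&dep);
    writer.sink(1); writer.sink(2); writer.sink(3);
    std::cout << dep.ready << '\n'; // 0: queue full, producer blocked
    writer.pop();
    std::cout << dep.ready << '\n'; // 1: space freed, producer runnable
}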

View File

@@ -32,6 +32,12 @@ class RuntimeProfile;
class TDataSink;
class TExpr;
namespace pipeline {
class AsyncWriterDependency;
class WriteDependency;
} // namespace pipeline
namespace vectorized {
class Block;
/*
@@ -50,6 +56,8 @@ class AsyncResultWriter : public ResultWriter {
public:
AsyncResultWriter(const VExprContextSPtrs& output_expr_ctxs);
void set_dependency(pipeline::AsyncWriterDependency* dep) { _dependency = dep; }
void force_close(Status s);
virtual bool in_transaction() { return false; }
@@ -66,9 +74,11 @@ public:
bool can_write() {
std::lock_guard l(_m);
return _data_queue.size() < QUEUE_SIZE || !_writer_status.ok() || _eos;
return _data_queue_is_available() || _is_finished();
}
pipeline::WriteDependency* write_blocked_by();
[[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }
void process_block(RuntimeState* state, RuntimeProfile* profile);
@@ -90,6 +100,8 @@ protected:
void _return_free_block(std::unique_ptr<Block>);
private:
[[nodiscard]] bool _data_queue_is_available() const { return _data_queue.size() < QUEUE_SIZE; }
[[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || _eos; }
static constexpr auto QUEUE_SIZE = 3;
std::mutex _m;
std::condition_variable _cv;
@@ -99,6 +111,9 @@ private:
bool _need_normal_close = true;
bool _writer_thread_closed = false;
// Used by pipelineX
pipeline::AsyncWriterDependency* _dependency;
moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks;
};