diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index df9fa123f0..7a695699e9 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -928,7 +928,7 @@ Status AggSinkOperatorX::sink(doris::RuntimeState* state,
 }
 
 template
-Status AggSinkLocalState::close(RuntimeState* state) {
+Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) {
     SCOPED_TIMER(Base::profile()->total_time_counter());
     SCOPED_TIMER(Base::_close_timer);
     if (Base::_closed) {
@@ -943,7 +943,7 @@ Status AggSinkLocalState::close(RuntimeState* state) {
     std::vector tmp_hash_values;
     _hash_values.swap(tmp_hash_values);
 
-    return Base::close(state);
+    return Base::close(state, exec_status);
 }
 
 class StreamingAggSinkLocalState;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index dd672777e8..34ba9e2449 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -56,7 +56,7 @@ public:
     virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
 
-    virtual Status close(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
 
     Status try_spill_disk(bool eos = false);
 
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
index f4712ca1c5..ade0bdc30c 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
@@ -248,7 +248,7 @@ Status DistinctStreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::
     return Status::OK();
 }
 
-Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state) {
+Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
     }
@@ -256,7 +256,7 @@ Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state) {
         // finish should be set, if not set here means error.
         _shared_state->data_queue->set_canceled();
     }
-    return Base::close(state);
+    return Base::close(state, exec_status);
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
index 8fc826fe05..0c909b413b 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -90,7 +90,7 @@ public:
         return Status::OK();
     }
 
-    Status close(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
 
     Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
                                                  vectorized::Block* out_block);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e8e87f758f..47ead19529 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -238,30 +238,6 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
     _name = "ExchangeSinkOperatorX";
 }
 
-ExchangeSinkOperatorX::ExchangeSinkOperatorX(
-        const RowDescriptor& row_desc, PlanNodeId dest_node_id,
-        const std::vector& destinations,
-        bool send_query_statistics_with_every_batch)
-        : DataSinkOperatorX(dest_node_id),
-          _row_desc(row_desc),
-          _part_type(TPartitionType::UNPARTITIONED),
-          _dests(destinations),
-          _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
-          _dest_node_id(dest_node_id) {
-    _cur_pb_block = &_pb_block1;
-    _name = "ExchangeSinkOperatorX";
-}
-
-ExchangeSinkOperatorX::ExchangeSinkOperatorX(const RowDescriptor& row_desc,
-                                             bool send_query_statistics_with_every_batch)
-        : DataSinkOperatorX(0),
-          _row_desc(row_desc),
-          _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
-          _dest_node_id(0) {
-    _cur_pb_block = &_pb_block1;
-    _name = "ExchangeSinkOperatorX";
-}
-
 Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tsink));
     const TDataStreamSink& t_stream_sink = tsink.stream_sink;
@@ -541,7 +517,7 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch
     return Status::OK();
 }
 
-Status ExchangeSinkOperatorX::try_close(RuntimeState* state) {
+Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status) {
     CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
     local_state._serializer.reset_block();
     Status final_st = Status::OK();
@@ -554,7 +530,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state) {
     return final_st;
 }
 
-Status ExchangeSinkLocalState::close(RuntimeState* state) {
+Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
     }
@@ -571,7 +547,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state) {
     }
     _sink_buffer->update_profile(profile());
     _sink_buffer->close();
-    return PipelineXSinkLocalState<>::close(state);
+    return PipelineXSinkLocalState<>::close(state, exec_status);
 }
 
 WriteDependency* ExchangeSinkOperatorX::wait_for_dependency(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 6ca9f5dded..0b05491901 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -161,7 +161,7 @@ public:
               _serializer(this) {}
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
-    Status close(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
 
     Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1);
     void register_channels(pipeline::ExchangeSinkBuffer* buffer);
@@ -248,11 +248,6 @@ public:
                           const TDataStreamSink& sink,
                           const std::vector& destinations,
                           bool send_query_statistics_with_every_batch);
-    ExchangeSinkOperatorX(const RowDescriptor& row_desc, PlanNodeId dest_node_id,
-                          const std::vector& destinations,
-                          bool send_query_statistics_with_every_batch);
-    ExchangeSinkOperatorX(const RowDescriptor& row_desc,
-                          bool send_query_statistics_with_every_batch);
     Status init(const TDataSink& tsink) override;
 
     RuntimeState* state() { return _state; }
@@ -266,7 +261,7 @@ public:
     Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* src, PBlock* dest,
                            int num_receivers = 1);
 
-    Status try_close(RuntimeState* state) override;
+    Status try_close(RuntimeState* state, Status exec_status) override;
     WriteDependency* wait_for_dependency(RuntimeState* state) override;
     bool is_pending_finish(RuntimeState* state) const override;
 
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index eeb4449ad6..26f8fed731 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -18,8 +18,13 @@
 #include "result_file_sink_operator.h"
 
 #include 
+#include 
 
+#include "pipeline/exec/exchange_sink_buffer.h"
 #include "pipeline/exec/operator.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/result_buffer_mgr.h"
+#include "vec/sink/vdata_stream_sender.h"
 #include "vec/sink/vresult_file_sink.h"
 
 namespace doris {
@@ -38,4 +43,238 @@ OperatorPtr ResultFileSinkOperatorBuilder::build_operator() {
 
 ResultFileSinkOperator::ResultFileSinkOperator(OperatorBuilderBase* operator_builder,
                                                DataSink* sink)
         : DataSinkOperator(operator_builder, sink) {};
-} // namespace doris::pipeline
\ No newline at end of file
+
+ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent,
+                                                   RuntimeState* state)
+        : AsyncWriterSink(parent, state),
+          _serializer(this) {}
+
+ResultFileSinkOperatorX::ResultFileSinkOperatorX(const RowDescriptor& row_desc,
+                                                 const std::vector& t_output_expr)
+        : DataSinkOperatorX(0),
+          _row_desc(row_desc),
+          _t_output_expr(t_output_expr),
+          _is_top_sink(true) {}
+
+ResultFileSinkOperatorX::ResultFileSinkOperatorX(
+        const RowDescriptor& row_desc, const TResultFileSink& sink,
+        const std::vector& destinations,
+        bool send_query_statistics_with_every_batch, const std::vector& t_output_expr,
+        DescriptorTbl& descs)
+        : DataSinkOperatorX(0),
+          _row_desc(row_desc),
+          _t_output_expr(t_output_expr),
+          _dests(destinations),
+          _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
+          _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false),
+          _is_top_sink(false) {
+    CHECK_EQ(destinations.size(), 1);
+}
+
+Status ResultFileSinkOperatorX::init(const TDataSink& tsink) {
+    auto& sink = tsink.result_file_sink;
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new vectorized::ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+
+    //for impl csv_with_name and csv_with_names_and_types
+    _header_type = sink.header_type;
+    _header = sink.header;
+
+    // From the thrift expressions create the real exprs.
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
+
+    return Status::OK();
+}
+
+Status ResultFileSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX::prepare(state));
+    return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
+}
+
+Status ResultFileSinkOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX::open(state));
+    return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+}
+
+Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(profile()->total_time_counter());
+    SCOPED_TIMER(_open_timer);
+    _sender_id = info.sender_id;
+
+    _brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait");
+    auto& p = _parent->cast();
+    CHECK(p._file_opts.get() != nullptr);
+    if (p._is_top_sink) {
+        // create sender
+        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
+                state->fragment_instance_id(), p._buf_size, &_sender, state->enable_pipeline_exec(),
+                state->execution_timeout()));
+        // create writer
+        _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
+                p._file_opts.get(), p._storage_type, state->fragment_instance_id(),
+                _output_vexpr_ctxs, _sender.get(), nullptr, state->return_object_data_as_binary(),
+                p._output_row_descriptor));
+    } else {
+        // init channel
+        _output_block = vectorized::Block::create_unique(
+                p._output_row_descriptor.tuple_descriptors()[0]->slots(), 1);
+        _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
+                p._file_opts.get(), p._storage_type, state->fragment_instance_id(),
+                _output_vexpr_ctxs, nullptr, _output_block.get(),
+                state->return_object_data_as_binary(), p._output_row_descriptor));
+
+        std::map fragment_id_to_channel_index;
+        for (int i = 0; i < p._dests.size(); ++i) {
+            _channels.push_back(new vectorized::Channel(
+                    this, p._row_desc, p._dests[i].brpc_server, state->fragment_instance_id(),
+                    info.tsink.result_file_sink.dest_node_id, false,
+                    p._send_query_statistics_with_every_batch));
+        }
+        std::random_device rd;
+        std::mt19937 g(rd());
+        shuffle(_channels.begin(), _channels.end(), g);
+
+        int local_size = 0;
+        for (int i = 0; i < _channels.size(); ++i) {
+            RETURN_IF_ERROR(_channels[i]->init(state));
+            if (_channels[i]->is_local()) {
+                local_size++;
+            }
+        }
+        _only_local_exchange = local_size == _channels.size();
+    }
+    _writer->set_dependency(_async_writer_dependency.get());
+    _writer->set_header_info(p._header_type, p._header);
+    return Status::OK();
+}
+
+Status ResultFileSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(profile()->total_time_counter());
+    SCOPED_TIMER(_open_timer);
+    return Base::open(state);
+}
+
+Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) {
+    if (Base::_closed) {
+        return Status::OK();
+    }
+    SCOPED_TIMER(_close_timer);
+    SCOPED_TIMER(profile()->total_time_counter());
+
+    auto& p = _parent->cast();
+    if (_closed) {
+        return Status::OK();
+    }
+
+    Status final_status = exec_status;
+    // close the writer
+    if (_writer && _writer->need_normal_close()) {
+        Status st = _writer->close();
+        if (!st.ok() && exec_status.ok()) {
+            // close file writer failed, should return this error to client
+            final_status = st;
+        }
+    }
+    if (p._is_top_sink) {
+        // close sender, this is normal path end
+        if (_sender) {
+            _sender->update_num_written_rows(_writer == nullptr ? 0 : _writer->get_written_rows());
+            _sender->close(final_status);
+        }
+        state->exec_env()->result_mgr()->cancel_at_time(
+                time(nullptr) + config::result_buffer_cancelled_interval_time,
+                state->fragment_instance_id());
+    } else {
+        if (final_status.ok()) {
+            bool all_receiver_eof = true;
+            for (auto channel : _channels) {
+                if (!channel->is_receiver_eof()) {
+                    all_receiver_eof = false;
+                    break;
+                }
+            }
+            if (all_receiver_eof) {
+                return Status::EndOfFile("all data stream channels EOF");
+            }
+            // 1. serialize depends on it is not local exchange
+            // 2. send block
+            // 3. rollover block
+            if (_only_local_exchange) {
+                if (!_output_block->empty()) {
+                    Status status;
+                    for (auto channel : _channels) {
+                        if (!channel->is_receiver_eof()) {
+                            status = channel->send_local_block(_output_block.get());
+                            HANDLE_CHANNEL_STATUS(state, channel, status);
+                        }
+                    }
+                }
+            } else {
+                {
+                    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                    bool serialized = false;
+                    RETURN_IF_ERROR(_serializer.next_serialized_block(
+                            _output_block.get(), _block_holder->get_block(), _channels.size(),
+                            &serialized, true));
+                    if (serialized) {
+                        auto cur_block = _serializer.get_block()->to_block();
+                        if (!cur_block.empty()) {
+                            RETURN_IF_ERROR(_serializer.serialize_block(
+                                    &cur_block, _block_holder->get_block(), _channels.size()));
+                        } else {
+                            _block_holder->get_block()->Clear();
+                        }
+                        Status status;
+                        for (auto channel : _channels) {
+                            if (!channel->is_receiver_eof()) {
+                                if (channel->is_local()) {
+                                    status = channel->send_local_block(&cur_block);
+                                } else {
+                                    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                                    status =
+                                            channel->send_block(_block_holder.get(), nullptr, true);
+                                }
+                                HANDLE_CHANNEL_STATUS(state, channel, status);
+                            }
+                        }
+                        cur_block.clear_column_data();
+                        _serializer.get_block()->set_muatable_columns(cur_block.mutate_columns());
+                    }
+                }
+            }
+        }
+        _output_block->clear();
+    }
+
+    return Base::close(state, exec_status);
+}
+
+template
+void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
+                                                   Status st) {
+    channel->set_receiver_eof(st);
+    channel->close(state);
+}
+
+Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
+                                     SourceState source_state) {
+    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    SCOPED_TIMER(local_state.profile()->total_time_counter());
+    COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
+    return local_state.sink(state, in_block, source_state);
+}
+
+bool ResultFileSinkOperatorX::is_pending_finish(RuntimeState* state) const {
+    auto& local_state = state->get_sink_local_state(id())->cast();
+    return local_state.is_pending_finish();
+}
+
+WriteDependency* ResultFileSinkOperatorX::wait_for_dependency(RuntimeState* state) {
+    CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
+    return local_state.write_blocked_by();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h
index beb04ff79c..fb77e7e8c9 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -20,6 +20,7 @@
 #include 
 
 #include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
 #include "vec/sink/vresult_file_sink.h"
 
 namespace doris {
@@ -42,5 +43,83 @@ public:
     bool can_write() override { return true; }
 };
 
+class ResultFileSinkOperatorX;
+class ResultFileSinkLocalState final
+        : public AsyncWriterSink {
+public:
+    using Base = AsyncWriterSink;
+    ENABLE_FACTORY_CREATOR(ResultFileSinkLocalState);
+    ResultFileSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
+
+    int sender_id() const { return _sender_id; }
+
+    RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
+
+private:
+    friend class ResultFileSinkOperatorX;
+
+    template
+    void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st);
+
+    std::unique_ptr _output_block = nullptr;
+    std::shared_ptr _sender;
+
+    std::vector*> _channels;
+    bool _only_local_exchange = false;
+    vectorized::BlockSerializer _serializer;
+    std::unique_ptr _block_holder;
+    RuntimeProfile::Counter* _brpc_wait_timer;
+
+    int _sender_id;
+};
+
+class ResultFileSinkOperatorX final : public DataSinkOperatorX {
+public:
+    ResultFileSinkOperatorX(const RowDescriptor& row_desc, const std::vector& t_output_expr);
+    ResultFileSinkOperatorX(const RowDescriptor& row_desc, const TResultFileSink& sink,
+                            const std::vector& destinations,
+                            bool send_query_statistics_with_every_batch,
+                            const std::vector& t_output_expr, DescriptorTbl& descs);
+    Status init(const TDataSink& thrift_sink) override;
+
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    WriteDependency* wait_for_dependency(RuntimeState* state) override;
+
+    bool is_pending_finish(RuntimeState* state) const override;
+
+private:
+    friend class ResultFileSinkLocalState;
+    template
+    friend class AsyncWriterSink;
+
+    const RowDescriptor& _row_desc;
+    const std::vector& _t_output_expr;
+
+    const std::vector _dests;
+    bool _send_query_statistics_with_every_batch;
+
+    // set file options when sink type is FILE
+    std::unique_ptr _file_opts;
+    TStorageBackendType::type _storage_type;
+
+    // Owned by the RuntimeState.
+    RowDescriptor _output_row_descriptor;
+    int _buf_size = 1024; // Allocated from _pool
+    bool _is_top_sink = true;
+    std::string _header;
+    std::string _header_type;
+
+    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+};
+
 } // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index 550885acb8..424d75b1d9 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -167,7 +167,7 @@ Status ResultSinkOperatorX::_second_phase_fetch_data(RuntimeState* state,
     return Status::OK();
 }
 
-Status ResultSinkLocalState::close(RuntimeState* state) {
+Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
     }
@@ -202,7 +202,7 @@ Status ResultSinkLocalState::close(RuntimeState* state) {
     state->exec_env()->result_mgr()->cancel_at_time(
             time(nullptr) + config::result_buffer_cancelled_interval_time,
             state->fragment_instance_id());
-    RETURN_IF_ERROR(PipelineXSinkLocalState<>::close(state));
+    RETURN_IF_ERROR(PipelineXSinkLocalState<>::close(state, exec_status));
     return final_status;
 }
 
diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index 27c229be1b..799389ffe3 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -79,7 +79,7 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
-    Status close(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
 
 private:
     friend class ResultSinkOperatorX;
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
index 444ae606ec..b739103c94 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
@@ -374,7 +374,7 @@ Status StreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* i
     return Status::OK();
 }
 
-Status StreamingAggSinkLocalState::close(RuntimeState* state) {
+Status StreamingAggSinkLocalState::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
     }
@@ -387,7 +387,7 @@ Status StreamingAggSinkLocalState::close(RuntimeState* state) {
         COUNTER_SET(_queue_byte_size_counter, _shared_state->data_queue->max_bytes_in_queue());
     }
     _preagg_block.clear();
-    return Base::close(state);
+    return Base::close(state, exec_status);
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index ddea7328d9..1796716804 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -83,7 +83,7 @@ public:
     StreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
-    Status close(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
     Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block);
 
 private:
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 9c2bfa6737..65cd718949 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -715,7 +715,7 @@ void PipelineFragmentContext::close_if_prepare_failed() {
     }
     for (auto& task : _tasks) {
        DCHECK(!task->is_pending_finish());
-        WARN_IF_ERROR(task->close(), "close_if_prepare_failed failed: ");
+        WARN_IF_ERROR(task->close(Status::OK()), "close_if_prepare_failed failed: ");
         close_a_pipeline();
     }
 }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index be0ecbffe7..07be52dd24 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -319,7 +319,7 @@ Status PipelineTask::finalize() {
     return _sink->finalize(_state);
 }
 
-Status PipelineTask::try_close() {
+Status PipelineTask::try_close(Status exec_status) {
     if (_try_close_flag) {
         return Status::OK();
     }
@@ -329,7 +329,7 @@
     return status1.ok() ? status2 : status1;
 }
 
-Status PipelineTask::close() {
+Status PipelineTask::close(Status exec_status) {
     int64_t close_ns = 0;
     Defer defer {[&]() {
         if (_task_queue) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index bc5ac96ae9..8a31fae647 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -123,10 +123,10 @@ public:
     // Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
     // this task will enter the `PENDING_FINISH` state.
-    virtual Status try_close();
+    virtual Status try_close(Status exec_status);
 
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
-    virtual Status close();
+    virtual Status close(Status exec_status);
 
     void put_in_runnable_queue() {
         _schedule_time++;
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index bff4665e15..f374e43b00 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -598,5 +598,13 @@ private:
     PartitionSortNodeSharedState _partition_sort_state;
 };
 
+class AsyncWriterDependency final : public WriteDependency {
+public:
+    ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
+    AsyncWriterDependency(int id) : WriteDependency(id, "AsyncWriterDependency") {}
+    ~AsyncWriterDependency() override = default;
+    void* shared_state() override { return nullptr; }
+};
+
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index eaa63bb1e3..a9490bd967 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -38,6 +38,7 @@
 #include "pipeline/exec/partition_sort_sink_operator.h"
 #include "pipeline/exec/partition_sort_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
+#include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
 #include "pipeline/exec/select_operator.h"
 #include "pipeline/exec/sort_sink_operator.h"
@@ -373,6 +374,7 @@ Status StatefulOperatorX::get_block(RuntimeState* state, vectori
 #define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX;
 DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
 DECLARE_OPERATOR_X(ResultSinkLocalState)
+DECLARE_OPERATOR_X(ResultFileSinkLocalState)
 DECLARE_OPERATOR_X(AnalyticSinkLocalState)
 DECLARE_OPERATOR_X(SortSinkLocalState)
 DECLARE_OPERATOR_X(BlockingAggSinkLocalState)
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 0de1378eea..f219b68f2e 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -53,6 +53,7 @@ struct LocalSinkStateInfo {
     RuntimeProfile* parent_profile;
     const int sender_id;
     Dependency* dependency;
+    const TDataSink& tsink;
 };
 
 class PipelineXLocalStateBase {
@@ -217,7 +218,7 @@ public:
     virtual bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const { return true; }
 
-    virtual Status close(RuntimeState* state) override;
+    Status close(RuntimeState* state) override;
 
     virtual Dependency* wait_for_dependency(RuntimeState* state) { return nullptr; }
 
@@ -261,11 +262,11 @@ public:
     [[nodiscard]] int64_t limit() const { return _limit; }
 
-    [[nodiscard]] virtual const RowDescriptor& row_desc() override {
+    [[nodiscard]] const RowDescriptor& row_desc() override {
         return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor;
     }
 
-    [[nodiscard]] virtual bool is_source() const override { return false; }
+    [[nodiscard]] bool is_source() const override { return false; }
 
     Status get_next_after_projects(RuntimeState* state, vectorized::Block* block,
                                    SourceState& source_state);
@@ -318,7 +319,7 @@ public:
             : PipelineXLocalStateBase(state, parent) {}
     virtual ~PipelineXLocalState() {}
 
-    virtual Status init(RuntimeState* state, LocalStateInfo& info) override {
+    Status init(RuntimeState* state, LocalStateInfo& info) override {
        _runtime_profile.reset(new RuntimeProfile(
                 _parent->get_name() + " (id=" + std::to_string(_parent->id()) + ")"));
         _runtime_profile->set_metadata(_parent->id());
@@ -358,7 +359,7 @@ public:
         return Status::OK();
     }
 
-    virtual Status close(RuntimeState* state) override {
+    Status close(RuntimeState* state) override {
         if (_closed) {
             return Status::OK();
         }
@@ -373,7 +374,7 @@ public:
         return Status::OK();
     }
 
-    virtual std::string debug_string(int indentation_level = 0) const override;
+    [[nodiscard]] std::string debug_string(int indentation_level = 0) const override;
 
 protected:
     DependencyType* _dependency;
@@ -386,7 +387,7 @@ class PipelineXSinkLocalStateBase {
 public:
     PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent_, RuntimeState* state_)
             : _parent(parent_), _state(state_) {}
-    virtual ~PipelineXSinkLocalStateBase() {}
+    virtual ~PipelineXSinkLocalStateBase() = default;
 
     // Do initialization. This step should be executed only once and in bthread, so we can do some
     // lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
@@ -395,7 +396,8 @@ public:
     // Do initialization. This step can be executed multiple times, so we should make sure it is
     // idempotent (e.g. wait for runtime filters).
     virtual Status open(RuntimeState* state) = 0;
-    virtual Status close(RuntimeState* state) = 0;
+    virtual Status close(RuntimeState* state, Status exec_status) = 0;
+    virtual Status try_close(RuntimeState* state, Status exec_status) = 0;
 
     virtual std::string debug_string(int indentation_level) const;
 
@@ -456,12 +458,15 @@ public:
     DataSinkOperatorXBase(const int id, std::vector& sources)
             : OperatorBase(nullptr), _id(id), _dests_id(sources) {}
 
-    virtual ~DataSinkOperatorXBase() override = default;
+    ~DataSinkOperatorXBase() override = default;
 
     // For agg/sort/join sink.
     virtual Status init(const TPlanNode& tnode, RuntimeState* state);
 
-    virtual Status init(const TDataSink& tsink) override;
+    Status init(const TDataSink& tsink) override;
+
+    Status prepare(RuntimeState* state) override { return Status::OK(); }
+    Status open(RuntimeState* state) override { return Status::OK(); }
 
     virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0;
 
@@ -485,8 +490,12 @@ public:
     virtual void get_dependency(std::vector& dependency) = 0;
 
-    virtual Status close(RuntimeState* state) override {
-        return state->get_sink_local_state(id())->close(state);
+    Status close(RuntimeState* state) override {
+        return Status::InternalError("Should not reach here!");
+    }
+
+    Status try_close(RuntimeState* state) override {
+        return Status::InternalError("Should not reach here!");
     }
 
     bool can_read() override {
@@ -508,7 +517,7 @@
     virtual bool is_pending_finish(RuntimeState* state) const { return false; }
 
-    std::string debug_string() const override { return ""; }
+    [[nodiscard]] std::string debug_string() const override { return ""; }
 
     virtual std::string debug_string(int indentation_level) const;
 
@@ -518,7 +527,13 @@ public:
     [[nodiscard]] bool is_source() const override { return false; }
 
-    virtual Status close(RuntimeState* state, Status exec_status) { return Status::OK(); }
+    virtual Status close(RuntimeState* state, Status exec_status) {
+        return state->get_sink_local_state(id())->close(state, exec_status);
+    }
+
+    virtual Status try_close(RuntimeState* state, Status exec_status) {
+        return state->get_sink_local_state(id())->try_close(state, exec_status);
+    }
 
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
@@ -537,6 +552,9 @@ public:
     virtual bool should_dry_run(RuntimeState* state) { return false; }
 
 protected:
+    template
+    friend class AsyncWriterSink;
+
     const int _id;
     std::vector _dests_id;
 
@@ -564,8 +582,6 @@ public:
     Status setup_local_states(RuntimeState* state, std::vector& infos) override;
 
     void get_dependency(std::vector& dependency) override;
 
-    void get_dependency(DependencySPtr& dependency);
-
     using LocalState = LocalStateType;
 };
 
@@ -577,7 +593,7 @@ public:
             : PipelineXSinkLocalStateBase(parent, state) {}
     ~PipelineXSinkLocalState() override = default;
 
-    virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
         // create profile
         _profile = state->obj_pool()->add(new RuntimeProfile(
                 _parent->get_name() + " (id=" + std::to_string(_parent->id()) + ")"));
@@ -597,9 +613,11 @@ public:
         return Status::OK();
     }
 
-    virtual Status open(RuntimeState* state) override { return Status::OK(); }
+    Status open(RuntimeState* state) override { return Status::OK(); }
 
-    Status close(RuntimeState* state) override {
+    Status try_close(RuntimeState* state, Status exec_status) override { return Status::OK(); }
+
+    Status close(RuntimeState* state, Status exec_status) override {
         if (_closed) {
             return Status::OK();
         }
@@ -610,7 +628,7 @@ public:
         return Status::OK();
     }
 
-    std::string debug_string(int indentation_level) const override;
+    [[nodiscard]] std::string debug_string(int indentation_level) const override;
 
     typename DependencyType::SharedState*& get_shared_state() { return _shared_state; }
 
 protected:
@@ -660,4 +678,65 @@ public:
     [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const = 0;
 };
 
+template
+class AsyncWriterSink : public PipelineXSinkLocalState<> {
+public:
+    AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : PipelineXSinkLocalState<>(parent, state), _async_writer_dependency(nullptr) {}
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
+        RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
+        _output_vexpr_ctxs.resize(_parent->cast()._output_vexpr_ctxs.size());
+        for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+            RETURN_IF_ERROR(_parent->cast()._output_vexpr_ctxs[i]->clone(
+                    state, _output_vexpr_ctxs[i]));
+        }
+
+        _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
+        _async_writer_dependency = AsyncWriterDependency::create_shared(_parent->id());
+        _writer->set_dependency(_async_writer_dependency.get());
+        return Status::OK();
+    }
+
+    Status open(RuntimeState* state) override {
+        RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
+        _writer->start_writer(state, _profile);
+        return Status::OK();
+    }
+
+    Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) {
+        return _writer->sink(block, source_state == SourceState::FINISHED);
+    }
+
+    WriteDependency* write_blocked_by() { return _writer->write_blocked_by(); }
+
+    Status close(RuntimeState* state, Status exec_status) override {
+        if (_closed) {
+            return Status::OK();
+        }
+        if (_writer->need_normal_close()) {
+            if (exec_status.ok() && !state->is_cancelled()) {
+                RETURN_IF_ERROR(_writer->commit_trans());
+            }
+            RETURN_IF_ERROR(_writer->close(exec_status));
+        }
+        return PipelineXSinkLocalState<>::close(state, exec_status);
+    }
+
+    Status try_close(RuntimeState* state, Status exec_status) override {
+        if (state->is_cancelled() || !exec_status.ok()) {
+            _writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
+        }
+        return Status::OK();
+    }
+
+    bool is_pending_finish() { return _writer->is_pending_finish(); }
+
+protected:
+    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+    std::unique_ptr _writer;
+
+    std::shared_ptr _async_writer_dependency;
+};
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 89679c4bf8..f8ea574c7c 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -62,6 +62,7 @@
 #include "pipeline/exec/partition_sort_sink_operator.h"
 #include "pipeline/exec/partition_sort_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
+#include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
 #include "pipeline/exec/scan_operator.h"
 #include "pipeline/exec/select_operator.h"
@@ -204,14 +205,15 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
             _runtime_state->obj_pool(), request, *_query_ctx->desc_tbl, &_root_op,
             root_pipeline));
 
     // 3. Create sink operator
-    if (request.fragment.__isset.output_sink) {
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
-                _runtime_state->obj_pool(), request.fragment.output_sink,
-                request.fragment.output_exprs, request, root_pipeline->output_row_desc(),
-                _runtime_state.get(), *desc_tbl, root_pipeline->id()));
-        RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
-        root_pipeline->set_sink(_sink);
+    if (!request.fragment.__isset.output_sink) {
+        return Status::InternalError("No output sink in this fragment!");
     }
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
+            _runtime_state->obj_pool(), request.fragment.output_sink, request.fragment.output_exprs,
+            request, root_pipeline->output_row_desc(), _runtime_state.get(), *desc_tbl,
+            root_pipeline->id()));
+    RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
+    root_pipeline->set_sink(_sink);
 
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
@@ -255,6 +257,26 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink));
         break;
     }
+    case TDataSinkType::RESULT_FILE_SINK: {
+        if (!thrift_sink.__isset.result_file_sink) {
+            return Status::InternalError("Missing result file sink.");
+        }
+
+        // TODO: figure out good buffer size based on size of output row
+        bool send_query_statistics_with_every_batch =
+                params.__isset.send_query_statistics_with_every_batch
+                        ? params.send_query_statistics_with_every_batch
+                        : false;
+        // Result file sink is not the top sink
+        if (params.__isset.destinations && params.destinations.size() > 0) {
+            _sink.reset(new ResultFileSinkOperatorX(
+                    row_desc, thrift_sink.result_file_sink, params.destinations,
+                    send_query_statistics_with_every_batch, output_exprs, desc_tbl));
+        } else {
+            _sink.reset(new ResultFileSinkOperatorX(row_desc, output_exprs));
+        }
+        break;
+    }
     case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
         DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
         DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
@@ -382,7 +404,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
                         pipeline_id_to_task[dep]->get_downstream_dependency());
             }
         }
-        RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), local_params));
+        RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), local_params,
+                                      request.fragment.output_sink));
     }
 
     {
@@ -790,7 +813,7 @@ void PipelineXFragmentContext::close_if_prepare_failed() {
     for (auto& task : _tasks) {
         for (auto& t : task) {
             DCHECK(!t->is_pending_finish());
-            WARN_IF_ERROR(t->close(), "close_if_prepare_failed failed: ");
+            WARN_IF_ERROR(t->close(Status::OK()), "close_if_prepare_failed failed: ");
             close_a_pipeline();
         }
     }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index c98a3fcfd6..1d56bb2f63 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -55,7 +55,8 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState
     _sink->get_dependency(_downstream_dependency);
 }
 
-Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params) {
+Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
+                              const TDataSink& tsink) {
     DCHECK(_sink);
     DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state);
     _init_profile();
@@ -69,7 +70,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
         std::vector infos;
         for (auto& dep : deps) {
             infos.push_back(LocalSinkStateInfo {_pipeline->pipeline_profile(),
-                                                local_params.sender_id, dep.get()});
+                                                local_params.sender_id, dep.get(), tsink});
         }
         RETURN_IF_ERROR(_sink->setup_local_states(state, infos));
     }
@@ -249,17 +250,17 @@ Status PipelineXTask::finalize() {
     return _sink->finalize(_state);
 }
 
-Status PipelineXTask::try_close() {
+Status PipelineXTask::try_close(Status exec_status) {
     if (_try_close_flag) {
         return Status::OK();
     }
     _try_close_flag = true;
 
-    Status status1 = _sink->try_close(_state);
+    Status status1 = _sink->try_close(_state, exec_status);
     Status status2 = _source->try_close(_state);
     return status1.ok() ? status2 : status1;
 }
 
-Status PipelineXTask::close() {
+Status PipelineXTask::close(Status exec_status) {
     int64_t close_ns = 0;
     Defer defer {[&]() {
         if (_task_queue) {
@@ -269,7 +270,7 @@ Status PipelineXTask::close() {
     Status s;
     {
         SCOPED_RAW_TIMER(&close_ns);
-        s = _sink->close(_state);
+        s = _sink->close(_state, exec_status);
         for (auto& op : _operators) {
             auto tem = op->close(_state);
             if (!tem.ok() && s.ok()) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 1d441a03ad..04b647b28a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -57,16 +57,17 @@ public:
         return Status::InternalError("Should not reach here!");
     }
 
-    Status prepare(RuntimeState* state, const TPipelineInstanceParams& local_params);
+    Status prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
+                   const TDataSink& tsink);
 
     Status execute(bool* eos) override;
 
     // Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
     // this task will enter the `PENDING_FINISH` state.
-    Status try_close() override;
+    Status try_close(Status exec_status) override;
 
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
-    Status close() override;
+    Status close(Status exec_status) override;
 
     bool source_can_read() override {
         if (_dry_run) {
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 51da7d811c..e71d8da0cd 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -277,7 +277,7 @@ void TaskScheduler::_do_work(size_t index) {
                 // exec failed,cancel all fragment instance
                 fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string());
                 fragment_ctx->send_report(true);
-                _try_close_task(task, PipelineTaskState::CANCELED);
+                _try_close_task(task, PipelineTaskState::CANCELED, status);
                 continue;
             }
 
@@ -291,8 +291,10 @@ void TaskScheduler::_do_work(size_t index) {
                 fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                      "finalize fail:" + status.to_string());
             } else {
-                _try_close_task(task, fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
-                                                                  : PipelineTaskState::FINISHED);
+                _try_close_task(task,
+                                fragment_ctx->is_canceled() ? PipelineTaskState::CANCELED
+                                                            : PipelineTaskState::FINISHED,
+                                status);
             }
             continue;
         }
@@ -315,11 +317,12 @@ void TaskScheduler::_do_work(size_t index) {
         }
     }
 }
 
-void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) {
-    auto status = task->try_close();
+void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
+                                    Status exec_status) {
+    auto status = task->try_close(exec_status);
     if (!status.ok() && state != PipelineTaskState::CANCELED) {
         // Call `close` if `try_close` failed to make sure allocated resources are released
-        task->close();
+        task->close(exec_status);
         task->query_context()->cancel(true, status.to_string(),
                                       Status::Cancelled(status.to_string()));
         state = PipelineTaskState::CANCELED;
@@ -328,7 +331,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state)
         _blocked_task_scheduler->add_blocked_task(task);
         return;
     } else {
-        status = task->close();
+        status = task->close(exec_status);
         if (!status.ok() && state != PipelineTaskState::CANCELED) {
             task->query_context()->cancel(true, status.to_string(),
                                           Status::Cancelled(status.to_string()));
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 8238968750..943b11a8a2 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -106,6 +106,7 @@ private:
     void _do_work(size_t index);
 
     // after _try_close_task, task maybe destructed.
-    void _try_close_task(PipelineTask* task, PipelineTaskState state);
+    void _try_close_task(PipelineTask* task, PipelineTaskState state,
+                         Status exec_status = Status::OK());
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 3b40da900b..263a063590 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -34,6 +34,7 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "pipeline/exec/exchange_sink_operator.h"
+#include "pipeline/exec/result_file_sink_operator.h"
 #include "runtime/descriptors.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/runtime_state.h"
@@ -125,13 +126,18 @@ Status Channel::send_current_block(bool eos) {
 
 template
 Status Channel::send_local_block(bool eos) {
-    SCOPED_TIMER(_parent->local_send_timer());
+    if constexpr (!std::is_same_v) {
+        SCOPED_TIMER(_parent->local_send_timer());
+    }
     Block block = _serializer.get_block()->to_block();
     _serializer.get_block()->set_muatable_columns(block.clone_empty_columns());
     if (_recvr_is_valid()) {
-        COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes());
-        COUNTER_UPDATE(_parent->local_sent_rows(), block.rows());
-        COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
+        if constexpr (!std::is_same_v) {
+            COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes());
+            COUNTER_UPDATE(_parent->local_sent_rows(), block.rows());
+            COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
+        }
+
         _local_recvr->add_block(&block, _parent->sender_id(), true);
         if (eos) {
             _local_recvr->remove_sender(_parent->sender_id(), _be_number);
@@ -145,11 +151,15 @@ template
 Status Channel::send_local_block(Block* block) {
-    SCOPED_TIMER(_parent->local_send_timer());
+    if constexpr (!std::is_same_v) {
+        SCOPED_TIMER(_parent->local_send_timer());
+    }
     if (_recvr_is_valid()) {
-        COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
-        COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
-        COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
+        if constexpr (!std::is_same_v) {
+            COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
+            COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
+            COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
+        }
         _local_recvr->add_block(block, _parent->sender_id(), false);
         return Status::OK();
     } else {
@@ -159,8 +169,11 @@ template
 Status Channel::send_block(PBlock* block, bool eos) {
-    SCOPED_TIMER(_parent->brpc_send_timer());
-    COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
+    if constexpr (!std::is_same_v) {
+        SCOPED_TIMER(_parent->brpc_send_timer());
+        COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
+    }
+
     if (_closure == nullptr) {
         _closure = new RefCountClosure();
         _closure->ref();
@@ -191,7 +204,7 @@ Status Channel::send_block(PBlock* block, bool eos) {
     {
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
-        if (enable_http_send_block(_brpc_request, _parent->transfer_large_data_by_brpc())) {
+        if (enable_http_send_block(_brpc_request, config::transfer_large_data_by_brpc)) {
             RETURN_IF_ERROR(transmit_block_http(_state->exec_env(), _closure, _brpc_request,
                                                 _brpc_dest_addr));
         } else {
@@ -744,12 +757,16 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest
         SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
         if (rows) {
             if (rows->size() > 0) {
-                SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
+                if constexpr (!std::is_same_v) {
+                    SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
+                }
                 const int* begin = &(*rows)[0];
                 _mutable_block->add_rows(block, begin, begin + rows->size());
             }
         } else if (!block->empty()) {
-            SCOPED_TIMER(_parent->merge_block_timer());
+            if constexpr (!std::is_same_v) {
+                SCOPED_TIMER(_parent->merge_block_timer());
+            }
             RETURN_IF_ERROR(_mutable_block->merge(*block));
         }
     }
@@ -779,7 +796,7 @@ Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers)
 
 template
 Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
-    {
+    if constexpr (!std::is_same_v) {
         SCOPED_TIMER(_parent->_serialize_batch_timer);
         dest->Clear();
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
@@ -845,6 +862,8 @@ bool VDataStreamSender::channel_all_can_write() {
 
 template class Channel;
 template class Channel;
+template class Channel;
+template class BlockSerializer;
 template class BlockSerializer;
 template class BlockSerializer;
 
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 55cf580f09..c1e7424c61 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -17,6 +17,7 @@
 
 #include "async_result_writer.h"
 
+#include "pipeline/pipeline_x/dependency.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/runtime_state.h"
@@ -32,7 +33,7 @@ class TExpr;
 
 namespace vectorized {
 
 AsyncResultWriter::AsyncResultWriter(const doris::vectorized::VExprContextSPtrs& output_expr_ctxs)
-        : _vec_output_expr_ctxs(output_expr_ctxs) {};
+        : _vec_output_expr_ctxs(output_expr_ctxs), _dependency(nullptr) {};
 
 Status AsyncResultWriter::sink(Block* block, bool eos) {
     auto rows = block->rows();
@@ -50,8 +51,14 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
 
     std::lock_guard l(_m);
     _eos = eos;
+    if (_dependency && _is_finished()) {
+        _dependency->set_ready_for_write();
+    }
     if (rows) {
         _data_queue.emplace_back(std::move(add_block));
+        if (_dependency && !_data_queue_is_available() && !_is_finished()) {
+            _dependency->block_writing();
+        }
     } else if (_eos && _data_queue.empty()) {
         status = Status::EndOfFile("Run out of sink data");
     }
@@ -65,6 +72,9 @@ std::unique_ptr AsyncResultWriter::get_block_from_queue() {
     DCHECK(!_data_queue.empty());
     auto block = std::move(_data_queue.front());
     _data_queue.pop_front();
+    if (_dependency && _data_queue_is_available()) {
+        _dependency->set_ready_for_write();
+    }
     return block;
 }
 
@@ -99,6 +109,9 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
         if (!status.ok()) {
             std::unique_lock l(_m);
             _writer_status = status;
+            if (_dependency && _is_finished()) {
+                _dependency->set_ready_for_write();
+            }
             break;
         }
     }
@@ -128,6 +141,9 @@ Status AsyncResultWriter::_projection_block(doris::vectorized::Block& input_bloc
 void AsyncResultWriter::force_close(Status s) {
     std::lock_guard l(_m);
     _writer_status = s;
+    if (_dependency && _is_finished()) {
+        _dependency->set_ready_for_write();
+    }
     _cv.notify_one();
 }
 
@@ -145,5 +161,11 @@ std::unique_ptr AsyncResultWriter::_get_free_block(doris::vectorized::Blo
     return b;
 }
 
+pipeline::WriteDependency* AsyncResultWriter::write_blocked_by() {
+    std::lock_guard l(_m);
+    DCHECK(_dependency != nullptr);
+    return _dependency->write_blocked_by();
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 371d30ea7c..27b8f2de13 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -32,6 +32,12 @@ class RuntimeProfile;
 class TDataSink;
 class TExpr;
 
+namespace pipeline {
+class AsyncWriterDependency;
+class WriteDependency;
+
+} // namespace pipeline
+
 namespace vectorized {
 class Block;
 /*
@@ -50,6 +56,8 @@ class AsyncResultWriter : public ResultWriter {
 public:
     AsyncResultWriter(const VExprContextSPtrs& output_expr_ctxs);
 
+    void set_dependency(pipeline::AsyncWriterDependency* dep) { _dependency = dep; }
+
     void force_close(Status s);
 
     virtual bool in_transaction() { return false; }
@@ -66,9 +74,11 @@ public:
     bool can_write() {
         std::lock_guard l(_m);
-        return _data_queue.size() < QUEUE_SIZE || !_writer_status.ok() || _eos;
+        return _data_queue_is_available() || _is_finished();
     }
 
+    pipeline::WriteDependency* write_blocked_by();
+
     [[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }
 
     void process_block(RuntimeState* state, RuntimeProfile* profile);
@@ -90,6 +100,8 @@ protected:
     void _return_free_block(std::unique_ptr);
 
 private:
+    [[nodiscard]] bool _data_queue_is_available() const { return _data_queue.size() < QUEUE_SIZE; }
+    [[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || _eos; }
     static constexpr auto QUEUE_SIZE = 3;
     std::mutex _m;
     std::condition_variable _cv;
@@ -99,6 +111,9 @@ private:
     bool _need_normal_close = true;
    bool _writer_thread_closed = false;
 
+    // Used by pipelineX
+    pipeline::AsyncWriterDependency* _dependency;
+
     moodycamel::ConcurrentQueue> _free_blocks;
 };
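
Reviewer note, an illustrative sketch rather than part of the patch: the diff threads two mechanisms through the pipeline. First, close()/try_close() now receive the task's final exec_status, so a sink can commit its transaction only on success while still releasing resources on failure (see AsyncWriterSink::close above). Second, an AsyncWriterDependency-style gate is flipped by the async writer via block_writing()/set_ready_for_write(), so a pipeline task yields instead of busy-polling can_write() while the writer's bounded queue (QUEUE_SIZE = 3) is full. The self-contained C++17 program below models both ideas with simplified stand-in types; Status, WriteGate, AsyncWriter and the file name sketch.cpp are assumptions for illustration, not the Doris classes (build with: g++ -std=c++17 -pthread sketch.cpp).

// sketch.cpp: a toy model of the exec_status-aware close() and the writer gate.
#include <atomic>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

// Minimal stand-in for doris::Status.
struct Status {
    bool is_ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return is_ok; }
};

// Stand-in for pipeline::AsyncWriterDependency: the flag a pipeline task polls
// instead of busy-waiting on the writer's queue.
class WriteGate {
public:
    void block_writing() { _ready = false; }
    void set_ready_for_write() { _ready = true; }
    bool ready() const { return _ready; }

private:
    std::atomic<bool> _ready {true};
};

// Bounded async writer, shaped like AsyncResultWriter::sink/get_block_from_queue:
// a producer enqueues blocks, a writer thread drains them, and the gate closes
// when the queue fills and reopens once the writer consumes a block.
class AsyncWriter {
public:
    static constexpr size_t kQueueSize = 3; // mirrors QUEUE_SIZE in the patch

    explicit AsyncWriter(WriteGate* gate) : _gate(gate) {
        _thread = std::thread([this] { _run(); });
    }

    Status sink(int block, bool eos) {
        std::lock_guard<std::mutex> l(_m);
        _eos = eos;
        _queue.push_back(block);
        if (_queue.size() >= kQueueSize && !_eos) {
            _gate->block_writing(); // producer must yield, like block_writing()
        }
        _cv.notify_one();
        return Status::OK();
    }

    // close(exec_status): commit only on success, always release resources.
    Status close(Status exec_status) {
        {
            std::lock_guard<std::mutex> l(_m);
            _eos = true;
            _cv.notify_one();
        }
        _thread.join(); // releasing the thread happens on every path
        if (exec_status.ok()) {
            std::cout << "commit transaction\n"; // the commit_trans() happy path
        }
        return exec_status; // surface the caller's status, as the patch does
    }

private:
    void _run() {
        std::unique_lock<std::mutex> l(_m);
        while (true) {
            _cv.wait(l, [this] { return _eos || !_queue.empty(); });
            if (_queue.empty() && _eos) break;
            int block = _queue.front();
            _queue.pop_front();
            _gate->set_ready_for_write(); // space freed: unblock the producer
            l.unlock();
            std::cout << "write block " << block << "\n";
            l.lock();
        }
    }

    WriteGate* _gate;
    std::mutex _m;
    std::condition_variable _cv;
    std::deque<int> _queue;
    bool _eos = false;
    std::thread _thread;
};

int main() {
    WriteGate gate;
    AsyncWriter writer(&gate);
    for (int i = 0; i < 10; ++i) {
        // A real scheduler would re-queue the blocked task; a toy producer spins.
        while (!gate.ready()) std::this_thread::yield();
        writer.sink(i, /*eos=*/i == 9);
    }
    return writer.close(Status::OK()).ok() ? 0 : 1;
}

The detail worth keeping in mind when reading AsyncResultWriter in the patch: every transition that can unblock a waiting producer (a block consumed in get_block_from_queue, a writer error in process_block, force_close, eos in sink) calls set_ready_for_write, which is why the diff touches all four sites.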