From c58d18147b0ba361f008d95391b4697e226b5cea Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Mon, 8 Jan 2024 00:00:33 +0800 Subject: [PATCH] [refactor](resultwriter) rename append_block to write method is more meaningful (#29635) --------- Co-authored-by: yiguolei --- be/src/pipeline/exec/result_sink_operator.cpp | 2 +- be/src/runtime/result_writer.h | 3 ++- be/src/service/point_query_executor.cpp | 2 +- be/src/vec/sink/async_writer_sink.h | 2 +- be/src/vec/sink/multi_cast_data_stream_sink.h | 3 +-- be/src/vec/sink/varrow_flight_result_writer.cpp | 2 +- be/src/vec/sink/varrow_flight_result_writer.h | 2 +- be/src/vec/sink/vmysql_result_writer.cpp | 2 +- be/src/vec/sink/vmysql_result_writer.h | 2 +- be/src/vec/sink/vresult_sink.cpp | 2 +- be/src/vec/sink/writer/async_result_writer.cpp | 2 +- be/src/vec/sink/writer/async_result_writer.h | 9 +++------ be/src/vec/sink/writer/vfile_result_writer.cpp | 2 +- be/src/vec/sink/writer/vfile_result_writer.h | 2 +- be/src/vec/sink/writer/vjdbc_table_writer.cpp | 2 +- be/src/vec/sink/writer/vjdbc_table_writer.h | 2 +- be/src/vec/sink/writer/vmysql_table_writer.cpp | 2 +- be/src/vec/sink/writer/vmysql_table_writer.h | 2 +- be/src/vec/sink/writer/vodbc_table_writer.cpp | 2 +- be/src/vec/sink/writer/vodbc_table_writer.h | 2 +- be/src/vec/sink/writer/vtablet_writer.cpp | 6 +++--- be/src/vec/sink/writer/vtablet_writer.h | 2 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 6 +++--- be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +- .../vec/data_types/serde/data_type_serde_mysql_test.cpp | 2 +- 25 files changed, 32 insertions(+), 35 deletions(-) diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index dcf0d996c6..8dc6eed299 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -136,7 +136,7 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, if (_fetch_option.use_two_phase_fetch && block->rows() > 0) { RETURN_IF_ERROR(_second_phase_fetch_data(state, block)); } - RETURN_IF_ERROR(local_state._writer->append_block(*block)); + RETURN_IF_ERROR(local_state._writer->write(*block)); if (_fetch_option.use_two_phase_fetch) { // Block structure may be changed by calling _second_phase_fetch_data(). // So we should clear block in case of unmatched columns diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h index f199d2f6b9..f65f06399b 100644 --- a/be/src/runtime/result_writer.h +++ b/be/src/runtime/result_writer.h @@ -44,7 +44,8 @@ public: [[nodiscard]] bool output_object_data() const { return _output_object_data; } - virtual Status append_block(vectorized::Block& block) = 0; + // Write is sync, it will do real IO work. + virtual Status write(vectorized::Block& block) = 0; virtual bool can_sink() { return true; } diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index d122a9d3cb..a86d5ed90b 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -334,7 +334,7 @@ template Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block, PTabletKeyLookupResponse* response) { block.clear_names(); - RETURN_IF_ERROR(mysql_writer.append_block(block)); + RETURN_IF_ERROR(mysql_writer.write(block)); assert(mysql_writer.results().size() == 1); uint8_t* buf = nullptr; uint32_t len = 0; diff --git a/be/src/vec/sink/async_writer_sink.h b/be/src/vec/sink/async_writer_sink.h index 8963f9a4ec..600eb60928 100644 --- a/be/src/vec/sink/async_writer_sink.h +++ b/be/src/vec/sink/async_writer_sink.h @@ -82,7 +82,7 @@ public: SCOPED_TIMER(_exec_timer); COUNTER_UPDATE(_blocks_sent_counter, 1); COUNTER_UPDATE(_output_rows_counter, block->rows()); - return _writer->append_block(*block); + return _writer->write(*block); } Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) override { diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h b/be/src/vec/sink/multi_cast_data_stream_sink.h index b222914283..7cc057013a 100644 --- a/be/src/vec/sink/multi_cast_data_stream_sink.h +++ b/be/src/vec/sink/multi_cast_data_stream_sink.h @@ -36,8 +36,7 @@ public: SCOPED_TIMER(_exec_timer); COUNTER_UPDATE(_blocks_sent_counter, 1); COUNTER_UPDATE(_output_rows_counter, block->rows()); - static_cast(_multi_cast_data_streamer->push(state, block, eos)); - return Status::OK(); + return _multi_cast_data_streamer->push(state, block, eos); }; Status open(doris::RuntimeState* state) override { return Status::OK(); }; diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp b/be/src/vec/sink/varrow_flight_result_writer.cpp index 771040bfb8..49885b3420 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.cpp +++ b/be/src/vec/sink/varrow_flight_result_writer.cpp @@ -52,7 +52,7 @@ void VArrowFlightResultWriter::_init_profile() { _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES); } -Status VArrowFlightResultWriter::append_block(Block& input_block) { +Status VArrowFlightResultWriter::write(Block& input_block) { SCOPED_TIMER(_append_row_batch_timer); Status status = Status::OK(); if (UNLIKELY(input_block.rows() == 0)) { diff --git a/be/src/vec/sink/varrow_flight_result_writer.h b/be/src/vec/sink/varrow_flight_result_writer.h index b9b44d1dfd..774d938bc9 100644 --- a/be/src/vec/sink/varrow_flight_result_writer.h +++ b/be/src/vec/sink/varrow_flight_result_writer.h @@ -43,7 +43,7 @@ public: Status init(RuntimeState* state) override; - Status append_block(Block& block) override; + Status write(Block& block) override; bool can_sink() override; diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp index 6a7e16a484..41f0682b1c 100644 --- a/be/src/vec/sink/vmysql_result_writer.cpp +++ b/be/src/vec/sink/vmysql_result_writer.cpp @@ -105,7 +105,7 @@ void VMysqlResultWriter::_init_profile() { } template -Status VMysqlResultWriter::append_block(Block& input_block) { +Status VMysqlResultWriter::write(Block& input_block) { SCOPED_TIMER(_append_row_batch_timer); Status status = Status::OK(); if (UNLIKELY(input_block.rows() == 0)) { diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h index 10a0b7e9e0..8227d09dcc 100644 --- a/be/src/vec/sink/vmysql_result_writer.h +++ b/be/src/vec/sink/vmysql_result_writer.h @@ -47,7 +47,7 @@ public: Status init(RuntimeState* state) override; - Status append_block(Block& block) override; + Status write(Block& block) override; bool can_sink() override; diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index c6e7e7b87d..3fa2e03597 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -141,7 +141,7 @@ Status VResultSink::send(RuntimeState* state, Block* block, bool eos) { DCHECK(_sink_type == TResultSinkType::MYSQL_PROTOCAL); RETURN_IF_ERROR(second_phase_fetch_data(state, block)); } - RETURN_IF_ERROR(_writer->append_block(*block)); + RETURN_IF_ERROR(_writer->write(*block)); if (_fetch_option.use_two_phase_fetch) { // Block structure may be changed by calling _second_phase_fetch_data(). // So we should clear block in case of unmatched columns diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 422dc2efef..93b86ca843 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -111,7 +111,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi } auto block = _get_block_from_queue(); - auto status = write(block); + auto status = write(*block); if (!status.ok()) [[unlikely]] { std::unique_lock l(_m); _writer_status = status; diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index e91ff1a070..d48e41daa6 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -51,7 +51,7 @@ class Block; * * The Sub class of AsyncResultWriter need to impl two virtual function * * Status open() the first time IO work like: create file/ connect networking - * * Status append_block() do the real IO work for block + * * Status write() do the real IO work for block */ class AsyncResultWriter : public ResultWriter { public: @@ -71,8 +71,6 @@ public: virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0; - Status write(std::unique_ptr& block) { return append_block(*block); } - bool can_write() { std::lock_guard l(_m); return _data_queue_is_available() || _is_finished(); @@ -80,9 +78,7 @@ public: [[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; } - void process_block(RuntimeState* state, RuntimeProfile* profile); - - // sink the block date to date queue + // sink the block date to date queue, it is async Status sink(Block* block, bool eos); // Add the IO thread task process block() to thread pool to dispose the IO @@ -99,6 +95,7 @@ protected: void _return_free_block(std::unique_ptr); private: + void process_block(RuntimeState* state, RuntimeProfile* profile); [[nodiscard]] bool _data_queue_is_available() const { return _data_queue.size() < QUEUE_SIZE; } [[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || _eos; } diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index 67c386c24d..f7015e17c9 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -221,7 +221,7 @@ std::string VFileResultWriter::_file_format_to_name() { } } -Status VFileResultWriter::append_block(Block& block) { +Status VFileResultWriter::write(Block& block) { if (block.rows() == 0) { return Status::OK(); } diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h index 0e7e0ccefc..864d0966a7 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.h +++ b/be/src/vec/sink/writer/vfile_result_writer.h @@ -58,7 +58,7 @@ public: VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); - Status append_block(Block& block) override; + Status write(Block& block) override; Status close(Status s = Status::OK()) override; diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp b/be/src/vec/sink/writer/vjdbc_table_writer.cpp index d4cb90d0a5..a4805921ef 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp +++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp @@ -54,7 +54,7 @@ VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_expr_ctxs) : AsyncResultWriter(output_expr_ctxs), JdbcConnector(create_connect_param(t_sink)) {} -Status VJdbcTableWriter::append_block(vectorized::Block& block) { +Status VJdbcTableWriter::write(vectorized::Block& block) { Block output_block; RETURN_IF_ERROR(_projection_block(block, &output_block)); auto num_rows = output_block.rows(); diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h b/be/src/vec/sink/writer/vjdbc_table_writer.h index b1e4d5ad28..735c023fce 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.h +++ b/be/src/vec/sink/writer/vjdbc_table_writer.h @@ -44,7 +44,7 @@ public: return init_to_write(profile); } - Status append_block(vectorized::Block& block) override; + Status write(vectorized::Block& block) override; Status close(Status s) override { return JdbcConnector::close(s); } diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp b/be/src/vec/sink/writer/vmysql_table_writer.cpp index 4a3513c80f..d9ca6d96f9 100644 --- a/be/src/vec/sink/writer/vmysql_table_writer.cpp +++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp @@ -109,7 +109,7 @@ Status VMysqlTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { return Status::OK(); } -Status VMysqlTableWriter::append_block(vectorized::Block& block) { +Status VMysqlTableWriter::write(vectorized::Block& block) { Block output_block; RETURN_IF_ERROR(_projection_block(block, &output_block)); auto num_rows = output_block.rows(); diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h b/be/src/vec/sink/writer/vmysql_table_writer.h index a88c5730cb..856d0a21ec 100644 --- a/be/src/vec/sink/writer/vmysql_table_writer.h +++ b/be/src/vec/sink/writer/vmysql_table_writer.h @@ -51,7 +51,7 @@ public: // connect to mysql server Status open(RuntimeState* state, RuntimeProfile* profile) override; - Status append_block(vectorized::Block& block) override; + Status write(vectorized::Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp b/be/src/vec/sink/writer/vodbc_table_writer.cpp index caa5c2c3ab..da068c3d67 100644 --- a/be/src/vec/sink/writer/vodbc_table_writer.cpp +++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp @@ -45,7 +45,7 @@ VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink& t_sink, const VExprContextSPtrs& output_expr_ctxs) : AsyncResultWriter(output_expr_ctxs), ODBCConnector(create_connect_param(t_sink)) {} -Status VOdbcTableWriter::append_block(vectorized::Block& block) { +Status VOdbcTableWriter::write(vectorized::Block& block) { Block output_block; RETURN_IF_ERROR(_projection_block(block, &output_block)); auto num_rows = output_block.rows(); diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h b/be/src/vec/sink/writer/vodbc_table_writer.h index 4c0e6a19f6..a28947355e 100644 --- a/be/src/vec/sink/writer/vodbc_table_writer.h +++ b/be/src/vec/sink/writer/vodbc_table_writer.h @@ -44,7 +44,7 @@ public: return init_to_write(profile); } - Status append_block(vectorized::Block& block) override; + Status write(vectorized::Block& block) override; Status close(Status s) override { return ODBCConnector::close(s); } diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 3741e80fab..a216a5034c 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1337,9 +1337,9 @@ Status VTabletWriter::_send_new_partition_batch() { // these order is only. // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block. // 2. deal batched block - // 3. now reuse the column of lval block. cuz append_block doesn't real adjust it. it generate a new block from that. + // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. _row_distribution.clear_batching_stats(); - RETURN_IF_ERROR(this->append_block(tmp_block)); + RETURN_IF_ERROR(this->write(tmp_block)); _row_distribution._batching_block->set_mutable_columns( tmp_block.mutate_columns()); // Recovery back _row_distribution._batching_block->clear_column_data(); @@ -1606,7 +1606,7 @@ void VTabletWriter::_generate_index_channels_payloads( } } -Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { +Status VTabletWriter::write(doris::vectorized::Block& input_block) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 05a9c455ca..a93c2bffc7 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -518,7 +518,7 @@ public: Status init_properties(ObjectPool* pool); - Status append_block(Block& block) override; + Status write(Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index e02fba7c21..bf4033eebe 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -365,7 +365,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, return Status::OK(); } -Status VTabletWriterV2::append_block(Block& input_block) { +Status VTabletWriterV2::write(Block& input_block) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); @@ -470,9 +470,9 @@ Status VTabletWriterV2::_send_new_partition_batch() { // these order is only. // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block. // 2. deal batched block - // 3. now reuse the column of lval block. cuz append_block doesn't real adjust it. it generate a new block from that. + // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. _row_distribution.clear_batching_stats(); - RETURN_IF_ERROR(this->append_block(tmp_block)); + RETURN_IF_ERROR(this->write(tmp_block)); _row_distribution._batching_block->set_mutable_columns( tmp_block.mutate_columns()); // Recovery back _row_distribution._batching_block->clear_column_data(); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index c829b254c7..e4cdcdca09 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -108,7 +108,7 @@ public: Status init_properties(ObjectPool* pool); - Status append_block(Block& block) override; + Status write(Block& block) override; Status open(RuntimeState* state, RuntimeProfile* profile) override; diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp index 5fff0d75cc..e781960c5e 100644 --- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp @@ -319,7 +319,7 @@ void serialize_and_deserialize_mysql_test() { // mysql_writer init vectorized::VMysqlResultWriter mysql_writer(nullptr, _output_vexpr_ctxs, nullptr); - Status st = mysql_writer.append_block(block); + Status st = mysql_writer.write(block); EXPECT_TRUE(st.ok()); }