[refactor](resultwriter) rename append_block to write method is more meaningful (#29635)

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
This commit is contained in:
yiguolei
2024-01-08 00:00:33 +08:00
committed by GitHub
parent c497f749ce
commit c58d18147b
25 changed files with 32 additions and 35 deletions

View File

@ -136,7 +136,7 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
}
RETURN_IF_ERROR(local_state._writer->append_block(*block));
RETURN_IF_ERROR(local_state._writer->write(*block));
if (_fetch_option.use_two_phase_fetch) {
// Block structure may be changed by calling _second_phase_fetch_data().
// So we should clear block in case of unmatched columns

View File

@ -44,7 +44,8 @@ public:
[[nodiscard]] bool output_object_data() const { return _output_object_data; }
virtual Status append_block(vectorized::Block& block) = 0;
// Write is sync, it will do real IO work.
virtual Status write(vectorized::Block& block) = 0;
virtual bool can_sink() { return true; }

View File

@ -334,7 +334,7 @@ template <typename MysqlWriter>
Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block,
PTabletKeyLookupResponse* response) {
block.clear_names();
RETURN_IF_ERROR(mysql_writer.append_block(block));
RETURN_IF_ERROR(mysql_writer.write(block));
assert(mysql_writer.results().size() == 1);
uint8_t* buf = nullptr;
uint32_t len = 0;

View File

@ -82,7 +82,7 @@ public:
SCOPED_TIMER(_exec_timer);
COUNTER_UPDATE(_blocks_sent_counter, 1);
COUNTER_UPDATE(_output_rows_counter, block->rows());
return _writer->append_block(*block);
return _writer->write(*block);
}
Status sink(RuntimeState* state, vectorized::Block* block, bool eos = false) override {

View File

@ -36,8 +36,7 @@ public:
SCOPED_TIMER(_exec_timer);
COUNTER_UPDATE(_blocks_sent_counter, 1);
COUNTER_UPDATE(_output_rows_counter, block->rows());
static_cast<void>(_multi_cast_data_streamer->push(state, block, eos));
return Status::OK();
return _multi_cast_data_streamer->push(state, block, eos);
};
Status open(doris::RuntimeState* state) override { return Status::OK(); };

View File

@ -52,7 +52,7 @@ void VArrowFlightResultWriter::_init_profile() {
_bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES);
}
Status VArrowFlightResultWriter::append_block(Block& input_block) {
Status VArrowFlightResultWriter::write(Block& input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {

View File

@ -43,7 +43,7 @@ public:
Status init(RuntimeState* state) override;
Status append_block(Block& block) override;
Status write(Block& block) override;
bool can_sink() override;

View File

@ -105,7 +105,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
}
template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) {
SCOPED_TIMER(_append_row_batch_timer);
Status status = Status::OK();
if (UNLIKELY(input_block.rows() == 0)) {

View File

@ -47,7 +47,7 @@ public:
Status init(RuntimeState* state) override;
Status append_block(Block& block) override;
Status write(Block& block) override;
bool can_sink() override;

View File

@ -141,7 +141,7 @@ Status VResultSink::send(RuntimeState* state, Block* block, bool eos) {
DCHECK(_sink_type == TResultSinkType::MYSQL_PROTOCAL);
RETURN_IF_ERROR(second_phase_fetch_data(state, block));
}
RETURN_IF_ERROR(_writer->append_block(*block));
RETURN_IF_ERROR(_writer->write(*block));
if (_fetch_option.use_two_phase_fetch) {
// Block structure may be changed by calling _second_phase_fetch_data().
// So we should clear block in case of unmatched columns

View File

@ -111,7 +111,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
}
auto block = _get_block_from_queue();
auto status = write(block);
auto status = write(*block);
if (!status.ok()) [[unlikely]] {
std::unique_lock l(_m);
_writer_status = status;

View File

@ -51,7 +51,7 @@ class Block;
*
* The Sub class of AsyncResultWriter need to impl two virtual function
* * Status open() the first time IO work like: create file/ connect networking
* * Status append_block() do the real IO work for block
* * Status write() do the real IO work for block
*/
class AsyncResultWriter : public ResultWriter {
public:
@ -71,8 +71,6 @@ public:
virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0;
Status write(std::unique_ptr<Block>& block) { return append_block(*block); }
bool can_write() {
std::lock_guard l(_m);
return _data_queue_is_available() || _is_finished();
@ -80,9 +78,7 @@ public:
[[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }
void process_block(RuntimeState* state, RuntimeProfile* profile);
// sink the block date to date queue
// sink the block date to date queue, it is async
Status sink(Block* block, bool eos);
// Add the IO thread task process block() to thread pool to dispose the IO
@ -99,6 +95,7 @@ protected:
void _return_free_block(std::unique_ptr<Block>);
private:
void process_block(RuntimeState* state, RuntimeProfile* profile);
[[nodiscard]] bool _data_queue_is_available() const { return _data_queue.size() < QUEUE_SIZE; }
[[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || _eos; }

View File

@ -221,7 +221,7 @@ std::string VFileResultWriter::_file_format_to_name() {
}
}
Status VFileResultWriter::append_block(Block& block) {
Status VFileResultWriter::write(Block& block) {
if (block.rows() == 0) {
return Status::OK();
}

View File

@ -58,7 +58,7 @@ public:
VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
Status append_block(Block& block) override;
Status write(Block& block) override;
Status close(Status s = Status::OK()) override;

View File

@ -54,7 +54,7 @@ VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
const VExprContextSPtrs& output_expr_ctxs)
: AsyncResultWriter(output_expr_ctxs), JdbcConnector(create_connect_param(t_sink)) {}
Status VJdbcTableWriter::append_block(vectorized::Block& block) {
Status VJdbcTableWriter::write(vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();

View File

@ -44,7 +44,7 @@ public:
return init_to_write(profile);
}
Status append_block(vectorized::Block& block) override;
Status write(vectorized::Block& block) override;
Status close(Status s) override { return JdbcConnector::close(s); }

View File

@ -109,7 +109,7 @@ Status VMysqlTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
return Status::OK();
}
Status VMysqlTableWriter::append_block(vectorized::Block& block) {
Status VMysqlTableWriter::write(vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();

View File

@ -51,7 +51,7 @@ public:
// connect to mysql server
Status open(RuntimeState* state, RuntimeProfile* profile) override;
Status append_block(vectorized::Block& block) override;
Status write(vectorized::Block& block) override;
Status close(Status) override;

View File

@ -45,7 +45,7 @@ VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink& t_sink,
const VExprContextSPtrs& output_expr_ctxs)
: AsyncResultWriter(output_expr_ctxs), ODBCConnector(create_connect_param(t_sink)) {}
Status VOdbcTableWriter::append_block(vectorized::Block& block) {
Status VOdbcTableWriter::write(vectorized::Block& block) {
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
auto num_rows = output_block.rows();

View File

@ -44,7 +44,7 @@ public:
return init_to_write(profile);
}
Status append_block(vectorized::Block& block) override;
Status write(vectorized::Block& block) override;
Status close(Status s) override { return ODBCConnector::close(s); }

View File

@ -1337,9 +1337,9 @@ Status VTabletWriter::_send_new_partition_batch() {
// these order is only.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
// 3. now reuse the column of lval block. cuz append_block doesn't real adjust it. it generate a new block from that.
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
_row_distribution.clear_batching_stats();
RETURN_IF_ERROR(this->append_block(tmp_block));
RETURN_IF_ERROR(this->write(tmp_block));
_row_distribution._batching_block->set_mutable_columns(
tmp_block.mutate_columns()); // Recovery back
_row_distribution._batching_block->clear_column_data();
@ -1606,7 +1606,7 @@ void VTabletWriter::_generate_index_channels_payloads(
}
}
Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
Status VTabletWriter::write(doris::vectorized::Block& input_block) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();

View File

@ -518,7 +518,7 @@ public:
Status init_properties(ObjectPool* pool);
Status append_block(Block& block) override;
Status write(Block& block) override;
Status close(Status) override;

View File

@ -365,7 +365,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id,
return Status::OK();
}
Status VTabletWriterV2::append_block(Block& input_block) {
Status VTabletWriterV2::write(Block& input_block) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
@ -470,9 +470,9 @@ Status VTabletWriterV2::_send_new_partition_batch() {
// these order is only.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
// 3. now reuse the column of lval block. cuz append_block doesn't real adjust it. it generate a new block from that.
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
_row_distribution.clear_batching_stats();
RETURN_IF_ERROR(this->append_block(tmp_block));
RETURN_IF_ERROR(this->write(tmp_block));
_row_distribution._batching_block->set_mutable_columns(
tmp_block.mutate_columns()); // Recovery back
_row_distribution._batching_block->clear_column_data();

View File

@ -108,7 +108,7 @@ public:
Status init_properties(ObjectPool* pool);
Status append_block(Block& block) override;
Status write(Block& block) override;
Status open(RuntimeState* state, RuntimeProfile* profile) override;

View File

@ -319,7 +319,7 @@ void serialize_and_deserialize_mysql_test() {
// mysql_writer init
vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, _output_vexpr_ctxs, nullptr);
Status st = mysql_writer.append_block(block);
Status st = mysql_writer.write(block);
EXPECT_TRUE(st.ok());
}