diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index f0c9874578..7aa6c5b321 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -87,7 +87,7 @@ public: // Construct from thrift struct which is generated by FE. VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& texprs, bool group_commit); - + // the real writer will construct in (actually, father's) init but not constructor Status init(const TDataSink& sink) override; Status close(RuntimeState* state, Status exec_status) override; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 7035653ee7..cf9e660f9d 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -17,6 +17,7 @@ #include "vtablet_writer.h" +#include #include #include #include @@ -579,11 +580,11 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, std::unique_ptr& thread_pool_token) { - auto st = none_of({_cancelled, _send_finished}); - if (!st.ok()) { + if (_cancelled || _send_finished) { // not run return 0; } + // set closure for sending block. if (!_add_block_closure->try_set_in_flight()) { // There is packet in flight, skip. return _send_finished ? 0 : 1; @@ -591,16 +592,15 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, // We are sure that try_send_batch is not running if (_pending_batches_num > 0) { - auto s = thread_pool_token->submit_func( - std::bind(&VNodeChannel::try_send_pending_block, this, state)); + auto s = thread_pool_token->submit_func([this, state] { try_send_pending_block(state); }); if (!s.ok()) { _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed"); - // clear in flight + // sending finished. clear in flight _add_block_closure->clear_in_flight(); } // in_flight is cleared in closure::Run } else { - // clear in flight + // sending finished. clear in flight _add_block_closure->clear_in_flight(); } return _send_finished ? 0 : 1; @@ -610,7 +610,7 @@ void VNodeChannel::_cancel_with_msg(const std::string& msg) { LOG(WARNING) << "cancel node channel " << channel_info() << ", error message: " << msg; { std::lock_guard l(_cancel_msg_lock); - if (_cancel_msg == "") { + if (_cancel_msg.empty()) { _cancel_msg = msg; } } @@ -718,7 +718,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { } } - // eos request must be the last request + // eos request must be the last request. it's a signal makeing callback function to set _add_batch_finished true. _add_block_closure->end_mark(); _send_finished = true; CHECK(_pending_batches_num == 0) << _pending_batches_num; @@ -923,7 +923,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) { } // waiting for finished, it may take a long time, so we couldn't set a timeout - // In pipeline, is_close_done() is false at this time, will not bock. + // In pipeline, is_close_done() is false at this time, will not block. while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { bthread_usleep(1000); } @@ -959,7 +959,7 @@ void VNodeChannel::mark_close() { _cur_add_block_request.set_eos(true); { std::lock_guard l(_pending_batches_lock); - if (!_cur_mutable_block) { + if (!_cur_mutable_block) [[unlikely]] { // add a dummy block _cur_mutable_block = vectorized::MutableBlock::create_unique(); } @@ -996,28 +996,47 @@ void VTabletWriter::_send_batch_process() { SCOPED_ATTACH_TASK(_state); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); - bool had_effect = false; while (true) { // incremental open will temporarily make channels into abnormal state. stop checking when this. std::unique_lock l(_stop_check_channel); int running_channels_num = 0; + int opened_nodes = 0; for (const auto& index_channel : _channels) { index_channel->for_each_node_channel([&running_channels_num, this](const std::shared_ptr& ch) { + // if this channel all completed(cancelled), got 0. else 1. running_channels_num += ch->try_send_and_fetch_status(_state, this->_send_batch_thread_pool_token); }); + opened_nodes += index_channel->num_node_channels(); } - // if there is no channel, maybe auto partition table. so check does there have had running channels ever. - if (running_channels_num == 0 && had_effect) { - LOG(INFO) << "all node channels are stopped(maybe finished/offending/cancelled), " + // auto partition table may have no node channel temporarily. wait to open. + if (opened_nodes != 0 && running_channels_num == 0) { + LOG(INFO) << "All node channels are stopped(maybe finished/offending/cancelled), " "sender thread exit. " << print_id(_load_id); return; - } else if (running_channels_num != 0) { - had_effect = true; + } + + // for auto partition tables, there's a situation: we haven't open any node channel but decide to cancel the task. + // then the judge in front will never be true because opened_nodes won't increase. so we have to specially check wether we called close. + // we must RECHECK opened_nodes below, after got closed signal, because it may changed. Think of this: + // checked opened_nodes = 0 ---> new block arrived ---> task finished, close() was called ---> we got _try_close here + // if we don't check again, we may lose the last package. + if (_try_close) { + opened_nodes = 0; + std::ranges::for_each(_channels, + [&opened_nodes](const std::shared_ptr& ich) { + opened_nodes += ich->num_node_channels(); + }); + if (opened_nodes == 0) { + LOG(INFO) << "No node channel have ever opened but now we have to close. sender " + "thread exit. " + << print_id(_load_id); + return; + } } bthread_usleep(config::olap_table_sink_send_interval_ms * 1000); } @@ -1065,7 +1084,7 @@ Status VTabletWriter::open(doris::RuntimeState* state, doris::RuntimeProfile* pr _send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism); - // start to send batch continually + // start to send batch continually. this must be called after _init if (bthread_start_background(&_sender_thread, nullptr, periodic_send_batch, (void*)this) != 0) { return Status::Error("bthread_start_backgroud failed"); } @@ -1219,7 +1238,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer)); } - _prepare = true; + _inited = true; return Status::OK(); } @@ -1442,6 +1461,7 @@ void VTabletWriter::_cancel_all_channel(Status status) { Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) { SCOPED_TIMER(_close_timer); Status status = exec_status; + _try_close = true; if (status.ok()) { // only if status is ok can we call this _profile->total_time_counter(). // if status is not ok, this sink may not be prepared, so that _profile is null @@ -1470,7 +1490,7 @@ Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) { if (!status.ok()) { _cancel_all_channel(status); _close_status = status; - _try_close = true; + _close_wait = true; } return Status::OK(); @@ -1478,7 +1498,7 @@ Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) { bool VTabletWriter::is_close_done() { // Only after try_close, need to wait rpc end. - if (!_try_close) { + if (!_close_wait) { return true; } bool close_done = true; @@ -1492,7 +1512,7 @@ bool VTabletWriter::is_close_done() { } Status VTabletWriter::close(Status exec_status) { - if (!_prepare) { + if (!_inited) { DCHECK(!exec_status.ok()); _cancel_all_channel(exec_status); _close_status = exec_status; @@ -1502,6 +1522,7 @@ Status VTabletWriter::close(Status exec_status) { SCOPED_TIMER(_close_timer); SCOPED_TIMER(_profile->total_time_counter()); + // will make the last batch of request. close_wait will wait this finished. static_cast(try_close(_state, exec_status)); // If _close_status is not ok, all nodes have been canceled in try_close. diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 1e7937cc80..4e95b444d7 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -256,7 +256,7 @@ public: Status add_block(vectorized::Block* block, const Payload* payload, bool is_append = false); - // @return: unfinished running channels. + // @return: 1 if running, 0 if finished. // @caller: VOlapTabletSink::_send_batch_process. it's a continual asynchronous process. int try_send_and_fetch_status(RuntimeState* state, std::unique_ptr& thread_pool_token); @@ -671,8 +671,11 @@ private: int32_t _send_batch_parallelism = 1; // Save the status of try_close() and close() method Status _close_status; + // if we called try_close(), for auto partition the periodic send thread should stop if it's still waiting for node channels first-time open. bool _try_close = false; - bool _prepare = false; + // for non-pipeline, if close() did something, close_wait() should wait it. + bool _close_wait = false; + bool _inited = false; // User can change this config at runtime, avoid it being modified during query or loading process. bool _transfer_large_data_by_brpc = false;