[fix](load) fix load rpc timeout (#25701)

This commit is contained in:
zclllyybb
2023-10-24 19:50:44 +08:00
committed by GitHub
parent 10f1957379
commit d4fca3c67e
3 changed files with 48 additions and 24 deletions

View File

@ -87,7 +87,7 @@ public:
// Construct from thrift struct which is generated by FE.
VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, bool group_commit);
// the real writer will construct in (actually, father's) init but not constructor
Status init(const TDataSink& sink) override;
Status close(RuntimeState* state, Status exec_status) override;

View File

@ -17,6 +17,7 @@
#include "vtablet_writer.h"
#include <bits/ranges_algo.h>
#include <brpc/http_method.h>
#include <bthread/bthread.h>
#include <fmt/format.h>
@ -579,11 +580,11 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
std::unique_ptr<ThreadPoolToken>& thread_pool_token) {
auto st = none_of({_cancelled, _send_finished});
if (!st.ok()) {
if (_cancelled || _send_finished) { // not run
return 0;
}
// set closure for sending block.
if (!_add_block_closure->try_set_in_flight()) {
// There is packet in flight, skip.
return _send_finished ? 0 : 1;
@ -591,16 +592,15 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
// We are sure that try_send_batch is not running
if (_pending_batches_num > 0) {
auto s = thread_pool_token->submit_func(
std::bind(&VNodeChannel::try_send_pending_block, this, state));
auto s = thread_pool_token->submit_func([this, state] { try_send_pending_block(state); });
if (!s.ok()) {
_cancel_with_msg("submit send_batch task to send_batch_thread_pool failed");
// clear in flight
// sending finished. clear in flight
_add_block_closure->clear_in_flight();
}
// in_flight is cleared in closure::Run
} else {
// clear in flight
// sending finished. clear in flight
_add_block_closure->clear_in_flight();
}
return _send_finished ? 0 : 1;
@ -610,7 +610,7 @@ void VNodeChannel::_cancel_with_msg(const std::string& msg) {
LOG(WARNING) << "cancel node channel " << channel_info() << ", error message: " << msg;
{
std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
if (_cancel_msg == "") {
if (_cancel_msg.empty()) {
_cancel_msg = msg;
}
}
@ -718,7 +718,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
}
}
// eos request must be the last request
// eos request must be the last request. it's a signal makeing callback function to set _add_batch_finished true.
_add_block_closure->end_mark();
_send_finished = true;
CHECK(_pending_batches_num == 0) << _pending_batches_num;
@ -923,7 +923,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
}
// waiting for finished, it may take a long time, so we couldn't set a timeout
// In pipeline, is_close_done() is false at this time, will not bock.
// In pipeline, is_close_done() is false at this time, will not block.
while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
bthread_usleep(1000);
}
@ -959,7 +959,7 @@ void VNodeChannel::mark_close() {
_cur_add_block_request.set_eos(true);
{
std::lock_guard<std::mutex> l(_pending_batches_lock);
if (!_cur_mutable_block) {
if (!_cur_mutable_block) [[unlikely]] {
// add a dummy block
_cur_mutable_block = vectorized::MutableBlock::create_unique();
}
@ -996,28 +996,47 @@ void VTabletWriter::_send_batch_process() {
SCOPED_ATTACH_TASK(_state);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
bool had_effect = false;
while (true) {
// incremental open will temporarily make channels into abnormal state. stop checking when this.
std::unique_lock<std::mutex> l(_stop_check_channel);
int running_channels_num = 0;
int opened_nodes = 0;
for (const auto& index_channel : _channels) {
index_channel->for_each_node_channel([&running_channels_num,
this](const std::shared_ptr<VNodeChannel>& ch) {
// if this channel all completed(cancelled), got 0. else 1.
running_channels_num +=
ch->try_send_and_fetch_status(_state, this->_send_batch_thread_pool_token);
});
opened_nodes += index_channel->num_node_channels();
}
// if there is no channel, maybe auto partition table. so check does there have had running channels ever.
if (running_channels_num == 0 && had_effect) {
LOG(INFO) << "all node channels are stopped(maybe finished/offending/cancelled), "
// auto partition table may have no node channel temporarily. wait to open.
if (opened_nodes != 0 && running_channels_num == 0) {
LOG(INFO) << "All node channels are stopped(maybe finished/offending/cancelled), "
"sender thread exit. "
<< print_id(_load_id);
return;
} else if (running_channels_num != 0) {
had_effect = true;
}
// for auto partition tables, there's a situation: we haven't open any node channel but decide to cancel the task.
// then the judge in front will never be true because opened_nodes won't increase. so we have to specially check wether we called close.
// we must RECHECK opened_nodes below, after got closed signal, because it may changed. Think of this:
// checked opened_nodes = 0 ---> new block arrived ---> task finished, close() was called ---> we got _try_close here
// if we don't check again, we may lose the last package.
if (_try_close) {
opened_nodes = 0;
std::ranges::for_each(_channels,
[&opened_nodes](const std::shared_ptr<IndexChannel>& ich) {
opened_nodes += ich->num_node_channels();
});
if (opened_nodes == 0) {
LOG(INFO) << "No node channel have ever opened but now we have to close. sender "
"thread exit. "
<< print_id(_load_id);
return;
}
}
bthread_usleep(config::olap_table_sink_send_interval_ms * 1000);
}
@ -1065,7 +1084,7 @@ Status VTabletWriter::open(doris::RuntimeState* state, doris::RuntimeProfile* pr
_send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
// start to send batch continually
// start to send batch continually. this must be called after _init
if (bthread_start_background(&_sender_thread, nullptr, periodic_send_batch, (void*)this) != 0) {
return Status::Error<INTERNAL_ERROR>("bthread_start_backgroud failed");
}
@ -1219,7 +1238,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer));
}
_prepare = true;
_inited = true;
return Status::OK();
}
@ -1442,6 +1461,7 @@ void VTabletWriter::_cancel_all_channel(Status status) {
Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(_close_timer);
Status status = exec_status;
_try_close = true;
if (status.ok()) {
// only if status is ok can we call this _profile->total_time_counter().
// if status is not ok, this sink may not be prepared, so that _profile is null
@ -1470,7 +1490,7 @@ Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) {
if (!status.ok()) {
_cancel_all_channel(status);
_close_status = status;
_try_close = true;
_close_wait = true;
}
return Status::OK();
@ -1478,7 +1498,7 @@ Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) {
bool VTabletWriter::is_close_done() {
// Only after try_close, need to wait rpc end.
if (!_try_close) {
if (!_close_wait) {
return true;
}
bool close_done = true;
@ -1492,7 +1512,7 @@ bool VTabletWriter::is_close_done() {
}
Status VTabletWriter::close(Status exec_status) {
if (!_prepare) {
if (!_inited) {
DCHECK(!exec_status.ok());
_cancel_all_channel(exec_status);
_close_status = exec_status;
@ -1502,6 +1522,7 @@ Status VTabletWriter::close(Status exec_status) {
SCOPED_TIMER(_close_timer);
SCOPED_TIMER(_profile->total_time_counter());
// will make the last batch of request. close_wait will wait this finished.
static_cast<void>(try_close(_state, exec_status));
// If _close_status is not ok, all nodes have been canceled in try_close.

View File

@ -256,7 +256,7 @@ public:
Status add_block(vectorized::Block* block, const Payload* payload, bool is_append = false);
// @return: unfinished running channels.
// @return: 1 if running, 0 if finished.
// @caller: VOlapTabletSink::_send_batch_process. it's a continual asynchronous process.
int try_send_and_fetch_status(RuntimeState* state,
std::unique_ptr<ThreadPoolToken>& thread_pool_token);
@ -671,8 +671,11 @@ private:
int32_t _send_batch_parallelism = 1;
// Save the status of try_close() and close() method
Status _close_status;
// if we called try_close(), for auto partition the periodic send thread should stop if it's still waiting for node channels first-time open.
bool _try_close = false;
bool _prepare = false;
// for non-pipeline, if close() did something, close_wait() should wait it.
bool _close_wait = false;
bool _inited = false;
// User can change this config at runtime, avoid it being modified during query or loading process.
bool _transfer_large_data_by_brpc = false;