[Enhancement](wal) Add timout for wal memory back pressure (#29178)
This commit is contained in:
@ -21,6 +21,7 @@
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
@ -40,14 +41,29 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool write_wal) {
|
||||
Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
|
||||
std::shared_ptr<vectorized::Block> block, bool write_wal) {
|
||||
std::unique_lock l(mutex);
|
||||
RETURN_IF_ERROR(status);
|
||||
while (_all_block_queues_bytes->load(std::memory_order_relaxed) >
|
||||
config::group_commit_max_queue_size) {
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
while (!runtime_state->is_cancelled() && status.ok() &&
|
||||
_all_block_queues_bytes->load(std::memory_order_relaxed) >
|
||||
config::group_commit_queue_mem_limit) {
|
||||
_put_cond.wait_for(
|
||||
l, std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::steady_clock::now() - start);
|
||||
if (duration.count() > LoadBlockQueue::WAL_MEM_BACK_PRESSURE_TIME_OUT) {
|
||||
return Status::TimedOut(
|
||||
"Wal memory back pressure wait too much time! Load block queue txn id: {}, "
|
||||
"label: {}, instance id: {}",
|
||||
txn_id, label, load_instance_id.to_string());
|
||||
}
|
||||
}
|
||||
if (runtime_state->is_cancelled()) {
|
||||
return Status::Cancelled(runtime_state->cancel_reason());
|
||||
}
|
||||
RETURN_IF_ERROR(status);
|
||||
if (block->rows() > 0) {
|
||||
_block_queue.push_back(block);
|
||||
if (write_wal) {
|
||||
|
||||
Reference in New Issue
Block a user