[Fix](core) Fix wal space back pressure core and add regression test (#27311)
This commit is contained in:
@ -18,6 +18,9 @@
|
||||
#include "runtime/group_commit_mgr.h"
|
||||
|
||||
#include <gen_cpp/Types_types.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include <atomic>
|
||||
|
||||
#include "client_cache.h"
|
||||
#include "common/config.h"
|
||||
@ -33,14 +36,15 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block)
|
||||
DCHECK(block->get_schema_version() == schema_version);
|
||||
std::unique_lock l(mutex);
|
||||
RETURN_IF_ERROR(_status);
|
||||
while (*_all_block_queues_bytes > config::group_commit_max_queue_size) {
|
||||
while (_all_block_queues_bytes->load(std::memory_order_relaxed) >
|
||||
config::group_commit_max_queue_size) {
|
||||
_put_cond.wait_for(
|
||||
l, std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
|
||||
}
|
||||
if (block->rows() > 0) {
|
||||
_block_queue.push_back(block);
|
||||
*_all_block_queues_bytes += block->bytes();
|
||||
*_single_block_queue_bytes += block->bytes();
|
||||
_all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
|
||||
_single_block_queue_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
|
||||
}
|
||||
_get_cond.notify_all();
|
||||
return Status::OK();
|
||||
@ -81,11 +85,11 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
|
||||
fblock->swap_future_block(future_block);
|
||||
*find_block = true;
|
||||
_block_queue.pop_front();
|
||||
*_all_block_queues_bytes -= fblock->bytes();
|
||||
*_single_block_queue_bytes -= block->bytes();
|
||||
_all_block_queues_bytes->fetch_sub(fblock->bytes(), std::memory_order_relaxed);
|
||||
_single_block_queue_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed);
|
||||
}
|
||||
if (_block_queue.empty() && need_commit && _load_ids.empty()) {
|
||||
CHECK(*_single_block_queue_bytes == 0);
|
||||
CHECK_EQ(_single_block_queue_bytes->load(), 0);
|
||||
*eos = true;
|
||||
} else {
|
||||
*eos = false;
|
||||
@ -121,8 +125,8 @@ void LoadBlockQueue::cancel(const Status& st) {
|
||||
auto& future_block = _block_queue.front();
|
||||
std::unique_lock<std::mutex> l0(*(future_block->lock));
|
||||
future_block->set_result(st, future_block->rows(), 0);
|
||||
*_all_block_queues_bytes -= future_block->bytes();
|
||||
*_single_block_queue_bytes -= future_block->bytes();
|
||||
_all_block_queues_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed);
|
||||
_single_block_queue_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed);
|
||||
future_block->cv->notify_all();
|
||||
}
|
||||
_block_queue.pop_front();
|
||||
|
||||
Reference in New Issue
Block a user