[chery-pick](branch-2.1) Pick "[Fix](group commit) Fix group commit block queue mem estimate fault" (#37379)

Pick [Fix](group commit) Fix group commit block queue mem estimate faule
#35314

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->

**Problem:** When `group commit=async_mode` and NULL data is imported
into a `variant` type column, it causes incorrect memory statistics for
group commit backpressure, leading to a stuck issue. **Cause:** In group
commit mode, blocks are first added to a queue in batches using `add
block`, and then blocks are retrieved from the queue using `get block`.
To track memory usage during backpressure, we add the block size to the
memory statistics during `add block` and subtract the block size from
the memory statistics during `get block`. However, for `variant` types,
during the `add block` write to WAL, serialization occurs, which can
merge types (e.g., merging `int` and `bigint` into `bigint`), thereby
changing the block size. This results in a discrepancy between the block
size during `get block` and `add block`, causing memory statistics to
overflow.
**Solution:** Record the block size at the time of `add block` and use
this recorded size during `get block` instead of the actual block size.
This ensures consistency in the memory addition and subtraction.

## Further comments

If this is a relatively large or complex change, kick off the discussion
at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why
you chose the solution you did and what alternatives you considered,
etc...

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->
This commit is contained in:
abmdocrt
2024-07-07 18:27:49 +08:00
committed by GitHub
parent 32529ecda2
commit 7d423b3a6a
5 changed files with 131 additions and 8 deletions

View File

@ -62,9 +62,27 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
RETURN_IF_ERROR(status);
if (block->rows() > 0) {
if (!config::group_commit_wait_replay_wal_finish) {
_block_queue.push_back(block);
_block_queue.emplace_back(block);
_data_bytes += block->bytes();
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
std::stringstream ss;
ss << "[";
for (const auto& id : _load_ids) {
ss << id.to_string() << ", ";
}
ss << "]";
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
<< "block queue size is " << _block_queue.size() << ", block rows is "
<< block->rows() << ", block bytes is " << block->bytes()
<< ", before add block, all block queues bytes is "
<< before_block_queues_bytes
<< ", after add block, all block queues bytes is "
<< _all_block_queues_bytes->load() << ", txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", load_ids=" << ss.str() << ", runtime_state=" << runtime_state
<< ", the block is " << block->dump_data() << ", the block column size is "
<< block->columns_bytes();
}
if (write_wal || config::group_commit_wait_replay_wal_finish) {
auto st = _v_wal_writer->write_wal(block.get());
@ -132,11 +150,29 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
return st;
}
if (!_block_queue.empty()) {
auto fblock = _block_queue.front();
block->swap(*fblock.get());
const BlockData block_data = _block_queue.front();
block->swap(*block_data.block);
*find_block = true;
_block_queue.pop_front();
_all_block_queues_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed);
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
std::stringstream ss;
ss << "[";
for (const auto& id : _load_ids) {
ss << id.to_string() << ", ";
}
ss << "]";
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
<< "block queue size is " << _block_queue.size() << ", block rows is "
<< block->rows() << ", block bytes is " << block->bytes()
<< ", before remove block, all block queues bytes is "
<< before_block_queues_bytes
<< ", after remove block, all block queues bytes is "
<< _all_block_queues_bytes->load() << ", txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", load_ids=" << ss.str() << ", runtime_state=" << runtime_state
<< ", the block is " << block->dump_data() << ", the block column size is "
<< block->columns_bytes();
}
if (_block_queue.empty() && _need_commit && _load_ids.empty()) {
*eos = true;
@ -176,10 +212,26 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
<< ", status=" << st.to_string();
status = st;
while (!_block_queue.empty()) {
{
auto& future_block = _block_queue.front();
_all_block_queues_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed);
const BlockData& block_data = _block_queue.front().block;
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
std::stringstream ss;
ss << "[";
for (const auto& id : _load_ids) {
ss << id.to_string() << ", ";
}
ss << "]";
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). "
<< "block queue size is " << _block_queue.size() << ", block rows is "
<< block_data.block->rows() << ", block bytes is " << block_data.block->bytes()
<< ", before remove block, all block queues bytes is "
<< before_block_queues_bytes
<< ", after remove block, all block queues bytes is "
<< _all_block_queues_bytes->load() << ", txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", load_ids=" << ss.str() << ", the block is "
<< block_data.block->dump_data() << ", the block column size is "
<< block_data.block->columns_bytes();
_block_queue.pop_front();
}
}