[fix](schema change) fix memory exceeded when schema change (#11748)
In row mode schema change, it will fail sometime because memory exceeded. When the left memory is enough for sorting but not enough for next block, it will not flush row_block_arr which data in memory and continue to alloc next block so it can't alloc the memory and return directly. And if it can't alloc the memory for block, it need to flush row_block_arr and try it again unless row_block_arr is empty.
This commit is contained in:
@ -963,14 +963,8 @@ Status RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bool n
|
||||
size_t row_block_size = _row_len * num_rows;
|
||||
|
||||
if (_memory_limitation > 0 && _tracker->consumption() + row_block_size > _memory_limitation) {
|
||||
LOG(WARNING)
|
||||
<< "RowBlockAllocator::alocate() memory exceeded. "
|
||||
<< "m_memory_allocated=" << _tracker->consumption() << " "
|
||||
<< "mem limit for schema change=" << _memory_limitation << " "
|
||||
<< "You can increase the memory "
|
||||
<< "by changing the Config.memory_limitation_per_thread_for_schema_change_bytes";
|
||||
*row_block = nullptr;
|
||||
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
|
||||
return Status::OLAPInternalError(OLAP_ERR_FETCH_MEMORY_EXCEEDED);
|
||||
}
|
||||
|
||||
// TODO(lijiao) : Why abandon the original m_row_block_buffer
|
||||
@ -1356,11 +1350,15 @@ Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_read
|
||||
RowBlock* ref_row_block = nullptr;
|
||||
rowset_reader->next_block(&ref_row_block);
|
||||
while (ref_row_block != nullptr && ref_row_block->has_remaining()) {
|
||||
if (!_row_block_allocator->allocate(&new_row_block, ref_row_block->row_block_info().row_num,
|
||||
true)) {
|
||||
LOG(WARNING) << "failed to allocate RowBlock.";
|
||||
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
|
||||
} else {
|
||||
auto st = _row_block_allocator->allocate(&new_row_block,
|
||||
ref_row_block->row_block_info().row_num, true);
|
||||
// if OLAP_ERR_FETCH_MEMORY_EXCEEDED == st.precise_code()
|
||||
// that mean RowBlockAllocator::alocate() memory exceeded.
|
||||
// But we can flush row_block_arr if row_block_arr is not empty.
|
||||
// Don't return directly.
|
||||
if (OLAP_ERR_MALLOC_ERROR == st.precise_code()) {
|
||||
return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
|
||||
} else if (st) {
|
||||
// do memory check for sorting, in case schema change task fail at row block sorting because of
|
||||
// not doing internal sorting first
|
||||
if (!_row_block_allocator->is_memory_enough_for_sorting(
|
||||
@ -1375,8 +1373,11 @@ Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_read
|
||||
if (new_row_block == nullptr) {
|
||||
if (row_block_arr.empty()) {
|
||||
LOG(WARNING) << "Memory limitation is too small for Schema Change."
|
||||
<< "memory_limitation=" << _memory_limitation;
|
||||
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
|
||||
<< "memory_limitation=" << _memory_limitation
|
||||
<< "You can increase the memory "
|
||||
<< "by changing the "
|
||||
"Config.memory_limitation_per_thread_for_schema_change_bytes";
|
||||
return Status::OLAPInternalError(OLAP_ERR_FETCH_MEMORY_EXCEEDED);
|
||||
}
|
||||
|
||||
// enter here while memory limitation is reached.
|
||||
|
||||
Reference in New Issue
Block a user