[enhancement](memory) return an error if memory allocation fails during the add_rows method (#35085)

* return an error when add_rows fails

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
Author: yiguolei
Date: 2024-05-21 10:53:40 +08:00
Committer: yiguolei
Commit: f38ecd349c (parent: b11f2ad9f0)
18 changed files with 104 additions and 80 deletions

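In short, this commit changes the MutableBlock::add_rows overloads, and the MemTable call chain built on them, from void to Status, so that a failed allocation while copying rows surfaces as an error the caller can handle instead of crashing the process. Below is a minimal, self-contained sketch of the pattern, using toy stand-ins for Doris's Status, RETURN_IF_ERROR, and RETURN_IF_CATCH_EXCEPTION — only the macro names match the real codebase; every type and function here is simplified for illustration:

    #include <new>
    #include <string>
    #include <vector>

    // Toy stand-in for doris::Status (illustration only).
    struct Status {
        std::string err; // empty means OK
        static Status OK() { return {}; }
        static Status MemoryLimitExceeded(std::string m) { return {std::move(m)}; }
        bool ok() const { return err.empty(); }
    };

    // Simplified analogue of Doris's RETURN_IF_CATCH_EXCEPTION: run the body
    // and turn a thrown std::bad_alloc into an error Status instead of
    // letting it terminate the process.
    #define RETURN_IF_CATCH_EXCEPTION(...)                    \
        do {                                                  \
            try {                                             \
                __VA_ARGS__;                                  \
            } catch (const std::bad_alloc& e) {               \
                return Status::MemoryLimitExceeded(e.what()); \
            }                                                 \
        } while (false)

    // Simplified analogue of RETURN_IF_ERROR: propagate a non-OK Status.
    #define RETURN_IF_ERROR(stmt)          \
        do {                               \
            Status _st = (stmt);           \
            if (!_st.ok()) return _st;     \
        } while (false)

    // Before: void add_rows(...) gave an allocation failure nowhere to go.
    // After: the copy is wrapped once and the caller checks the Status.
    Status add_rows(std::vector<int>& dst, const std::vector<int>& src) {
        RETURN_IF_CATCH_EXCEPTION({
            dst.insert(dst.end(), src.begin(), src.end()); // may throw bad_alloc
        });
        return Status::OK();
    }

    Status caller(std::vector<int>& dst, const std::vector<int>& src) {
        RETURN_IF_ERROR(add_rows(dst, src)); // error propagates, no crash
        return Status::OK();
    }

The key design choice is to catch the exception once, at the lowest layer that actually allocates (add_rows), so every caller above it only needs the cheap RETURN_IF_ERROR check.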
View File

@@ -158,7 +158,7 @@ Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool
     int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
     vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
-    mblock.add_rows(_active_query_block.get(), _row_idx, current_batch_rows);
+    RETURN_IF_ERROR(mblock.add_rows(_active_query_block.get(), _row_idx, current_batch_rows));
     _row_idx += current_batch_rows;
     *eos = _row_idx == _total_rows;

View File

@@ -84,7 +84,7 @@ Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block,
     int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
     vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
-    mblock.add_rows(_task_stats_block.get(), _row_idx, current_batch_rows);
+    RETURN_IF_ERROR(mblock.add_rows(_task_stats_block.get(), _row_idx, current_batch_rows));
     _row_idx += current_batch_rows;
     *eos = _row_idx == _total_rows;

View File

@@ -162,7 +162,7 @@ Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos
     int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
     vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
-    mblock.add_rows(_routines_block.get(), _row_idx, current_batch_rows);
+    RETURN_IF_ERROR(mblock.add_rows(_routines_block.get(), _row_idx, current_batch_rows));
     _row_idx += current_batch_rows;
     *eos = _row_idx == _total_rows;

View File

@@ -135,7 +135,7 @@ Status SchemaWorkloadGroupsScanner::get_next_block(vectorized::Block* block, boo
     int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
     vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
-    mblock.add_rows(_workload_groups_block.get(), _row_idx, current_batch_rows);
+    RETURN_IF_ERROR(mblock.add_rows(_workload_groups_block.get(), _row_idx, current_batch_rows));
     _row_idx += current_batch_rows;
     *eos = _row_idx == _total_rows;

View File

@@ -127,7 +127,7 @@ Status SchemaWorkloadSchedulePolicyScanner::get_next_block(vectorized::Block* bl
     int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
     vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
-    mblock.add_rows(_block.get(), _row_idx, current_batch_rows);
+    RETURN_IF_ERROR(mblock.add_rows(_block.get(), _row_idx, current_batch_rows));
     _row_idx += current_batch_rows;
     *eos = _row_idx == _total_rows;

View File

@@ -178,7 +178,8 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r
                                      *_pblock, -1);
 }
 
-void MemTable::insert(const vectorized::Block* input_block, const std::vector<uint32_t>& row_idxs) {
+Status MemTable::insert(const vectorized::Block* input_block,
+                        const std::vector<uint32_t>& row_idxs) {
     vectorized::Block target_block = *input_block;
     target_block = input_block->copy_block(_column_offset);
     if (_is_first_insertion) {
@@ -209,7 +210,8 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<ui
     auto num_rows = row_idxs.size();
     size_t cursor_in_mutableblock = _input_mutable_block.rows();
     auto block_size0 = _input_mutable_block.allocated_bytes();
-    _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows);
+    RETURN_IF_ERROR(_input_mutable_block.add_rows(&target_block, row_idxs.data(),
+                                                  row_idxs.data() + num_rows));
     auto block_size1 = _input_mutable_block.allocated_bytes();
     g_memtable_input_block_allocated_size << block_size1 - block_size0;
     auto input_size = size_t(target_block.bytes() * num_rows / target_block.rows() *
@@ -221,6 +223,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<ui
     }
 
     _stat.raw_rows += num_rows;
+    return Status::OK();
 }
 
 void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block,
@@ -245,7 +248,7 @@ void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlo
                                        src_row->_row_pos, _arena.get());
     }
 }
-void MemTable::_put_into_output(vectorized::Block& in_block) {
+Status MemTable::_put_into_output(vectorized::Block& in_block) {
     SCOPED_RAW_TIMER(&_stat.put_into_output_ns);
     std::vector<uint32_t> row_pos_vec;
     DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
@@ -253,8 +256,8 @@ void MemTable::_put_into_output(vectorized::Block& in_block) {
     for (int i = 0; i < _row_in_blocks.size(); i++) {
         row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos);
     }
-    _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
-                                   row_pos_vec.data() + in_block.rows());
+    return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
+                                          row_pos_vec.data() + in_block.rows());
 }
 
 size_t MemTable::_sort() {
@@ -298,7 +301,7 @@ size_t MemTable::_sort() {
     return same_keys_num;
 }
 
-void MemTable::_sort_by_cluster_keys() {
+Status MemTable::_sort_by_cluster_keys() {
     SCOPED_RAW_TIMER(&_stat.sort_ns);
     _stat.sort_times++;
     // sort all rows
@@ -344,8 +347,8 @@ void MemTable::_sort_by_cluster_keys() {
     for (int i = 0; i < row_in_blocks.size(); i++) {
         row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos);
     }
-    _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
-                                   row_pos_vec.data() + in_block.rows());
+    return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
+                                          row_pos_vec.data() + in_block.rows());
 }
 
 void MutableBlock::_sort_one_column is declared below; diff continues:
 void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
@@ -502,27 +505,28 @@ bool MemTable::need_agg() const {
     return false;
 }
 
-std::unique_ptr<vectorized::Block> MemTable::to_block() {
+Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
     size_t same_keys_num = _sort();
     if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
         if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) {
             _output_mutable_block.swap(_input_mutable_block);
         } else {
             vectorized::Block in_block = _input_mutable_block.to_block();
-            _put_into_output(in_block);
+            RETURN_IF_ERROR(_put_into_output(in_block));
        }
     } else {
         _aggregate<true>();
     }
     if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow &&
         !_tablet_schema->cluster_key_idxes().empty()) {
-        _sort_by_cluster_keys();
+        RETURN_IF_ERROR(_sort_by_cluster_keys());
     }
     g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
     _input_mutable_block.clear();
     _insert_mem_tracker->release(_mem_usage);
     _mem_usage = 0;
-    return vectorized::Block::create_unique(_output_mutable_block.to_block());
+    *res = vectorized::Block::create_unique(_output_mutable_block.to_block());
+    return Status::OK();
 }
 
 } // namespace doris

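Because MemTable::to_block() can now fail midway, its result moves to an out-parameter and the return value carries the error. A hedged sketch of the calling convention the flush path adopts below — toy Status, Block, and function names, not the Doris ones:

    #include <memory>
    #include <string>

    struct Status { // compact stand-in, as in the first sketch
        std::string err;
        static Status OK() { return {}; }
        bool ok() const { return err.empty(); }
    };
    #define RETURN_IF_ERROR(stmt) \
        do { Status _st = (stmt); if (!_st.ok()) return _st; } while (false)

    struct Block {};

    // The new calling convention for to_block: the block comes back through
    // an out-parameter, the return value carries any failure.
    Status to_block(std::unique_ptr<Block>* res) {
        *res = std::make_unique<Block>(); // real code sorts/aggregates first
        return Status::OK();
    }

    Status do_flush() {
        std::unique_ptr<Block> block;
        RETURN_IF_ERROR(to_block(&block)); // unique_ptr untouched on failure
        // ... hand *block to the rowset writer ...
        return Status::OK();
    }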
View File

@@ -181,7 +181,7 @@ public:
         _flush_mem_tracker->consumption();
     }
 
     // insert tuple from (row_pos) to (row_pos+num_rows)
-    void insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);
+    Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);
 
     void shrink_memtable_by_agg();
@@ -189,7 +189,7 @@ public:
     bool need_agg() const;
 
-    std::unique_ptr<vectorized::Block> to_block();
+    Status to_block(std::unique_ptr<vectorized::Block>* res);
 
     bool empty() const { return _input_mutable_block.rows() == 0; }
@@ -244,7 +244,7 @@ private:
     //return number of same keys
     size_t _sort();
-    void _sort_by_cluster_keys();
+    Status _sort_by_cluster_keys();
     void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
                           std::function<int(const RowInBlock*, const RowInBlock*)> cmp);
     template <bool is_final>
@@ -252,7 +252,7 @@ private:
                                     int row_pos);
     template <bool is_final>
     void _aggregate();
-    void _put_into_output(vectorized::Block& in_block);
+    Status _put_into_output(vectorized::Block& in_block);
 
     bool _is_first_insertion;
 
     void _init_agg_functions(const vectorized::Block* block);

View File

@@ -141,7 +141,8 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
     signal::set_signal_task_id(_rowset_writer->load_id());
     {
         SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker());
-        std::unique_ptr<vectorized::Block> block = memtable->to_block();
+        std::unique_ptr<vectorized::Block> block;
+        RETURN_IF_ERROR(memtable->to_block(&block));
         RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
     }
     _memtable_stat += memtable->stat();

View File

@@ -109,7 +109,7 @@ Status MemTableWriter::write(const vectorized::Block* block,
     }
 
     _total_received_rows += row_idxs.size();
-    _mem_table->insert(block, row_idxs);
+    RETURN_IF_ERROR(_mem_table->insert(block, row_idxs));
 
     if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
         _mem_table->shrink_memtable_by_agg();

View File

@@ -602,8 +602,8 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
             partitioned_blocks[i] =
                     vectorized::MutableBlock::create_unique(input_block->clone_empty());
         }
-        partitioned_blocks[i]->add_rows(input_block, &(partition_indexes[i][0]),
-                                        &(partition_indexes[i][count]));
+        RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(input_block, &(partition_indexes[i][0]),
+                                                        &(partition_indexes[i][count])));
 
         if (partitioned_blocks[i]->rows() > 2 * 1024 * 1024 ||
             (eos && partitioned_blocks[i]->rows() > 0)) {

View File

@@ -203,7 +203,14 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
         {
             SCOPED_TIMER(_partition_shuffle_timer);
-            partition_block->add_rows(&build_block, begin, end);
+            Status st = partition_block->add_rows(&build_block, begin, end);
+            if (!st.ok()) {
+                std::unique_lock<std::mutex> lock(_spill_lock);
+                _spill_status = st;
+                _spill_status_ok = false;
+                _dependency->set_ready();
+                return;
+            }
             partitions_indexes[partition_idx].clear();
         }
@@ -336,8 +343,8 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
             partitioned_blocks[i] =
                     vectorized::MutableBlock::create_unique(in_block->clone_empty());
         }
-        partitioned_blocks[i]->add_rows(in_block, &(partition_indexes[i][0]),
-                                        &(partition_indexes[i][count]));
+        RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(in_block, &(partition_indexes[i][0]),
+                                                        &(partition_indexes[i][count])));
     }
 
     return Status::OK();

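Note that the first hunk above stashes the failed Status in shared spill state instead of returning it: the add_rows call runs inside a callback that returns void, so the error is parked under a lock and the dependency is woken for a later pipeline step to observe. A minimal sketch of that pattern — all types and names below are illustrative stand-ins, not the actual Doris classes:

    #include <mutex>
    #include <string>

    struct Status { // toy stand-in
        std::string err;
        static Status OK() { return {}; }
        bool ok() const { return err.empty(); }
    };

    // Shared between the spill worker and the pipeline task that polls it.
    struct SpillState {
        std::mutex lock;
        Status status = Status::OK();
        bool status_ok = true;
    };

    // Inside a void callback: record the error instead of returning it.
    void on_add_rows_failure(SpillState& state, const Status& st) {
        std::unique_lock<std::mutex> guard(state.lock);
        state.status = st;
        state.status_ok = false;
        // the real code also wakes its dependency (_dependency->set_ready())
        // so the consumer runs and sees the failure promptly
    }

    // The polling side checks the flag before continuing the spill.
    Status check_spill(SpillState& state) {
        std::unique_lock<std::mutex> guard(state.lock);
        return state.status_ok ? Status::OK() : state.status;
    }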
View File

@@ -44,26 +44,28 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
     PartitionedBlock partitioned_block;
     std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
 
-    auto get_data = [&](vectorized::Block* result_block) {
+    auto get_data = [&](vectorized::Block* result_block) -> Status {
         do {
             const auto* offset_start = &((
                     *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
             auto block_wrapper = partitioned_block.first;
             local_state._shared_state->sub_mem_usage(
                     local_state._channel_id, block_wrapper->data_block.allocated_bytes(), false);
-            mutable_block->add_rows(&block_wrapper->data_block, offset_start,
-                                    offset_start + std::get<2>(partitioned_block.second));
+            RETURN_IF_ERROR(
+                    mutable_block->add_rows(&block_wrapper->data_block, offset_start,
+                                            offset_start + std::get<2>(partitioned_block.second)));
             block_wrapper->unref(local_state._shared_state);
         } while (mutable_block->rows() < state->batch_size() &&
                  _data_queue[local_state._channel_id].try_dequeue(partitioned_block));
         *result_block = mutable_block->to_block();
+        return Status::OK();
     };
 
     if (_running_sink_operators == 0) {
         if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
             SCOPED_TIMER(local_state._copy_data_timer);
             mutable_block = vectorized::MutableBlock::create_unique(
                     partitioned_block.first->data_block.clone_empty());
-            get_data(block);
+            RETURN_IF_ERROR(get_data(block));
         } else {
             COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
@@ -72,7 +74,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
             SCOPED_TIMER(local_state._copy_data_timer);
             mutable_block = vectorized::MutableBlock::create_unique(
                     partitioned_block.first->data_block.clone_empty());
-            get_data(block);
+            RETURN_IF_ERROR(get_data(block));
         } else {
             COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             local_state._dependency->block();
@@ -244,7 +246,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
                                 LocalExchangeSinkLocalState& local_state) {
     for (size_t i = 0; i < _num_partitions; i++) {
         auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty());
-        mutable_block->add_rows(in_block, 0, in_block->rows());
+        RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows()));
         _data_queue[i].enqueue(mutable_block->to_block());
         local_state._shared_state->set_ready_to_read(i);
     }
@@ -335,7 +337,7 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
         if (size > 0) {
             std::unique_ptr<vectorized::MutableBlock> mutable_block =
                     vectorized::MutableBlock::create_unique(block->clone_empty());
-            mutable_block->add_rows(block, start, size);
+            RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
             auto new_block = mutable_block->to_block();
             local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes());
             data_queue[i].enqueue(std::move(new_block));

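A small prerequisite visible in the ShuffleExchanger hunk: once RETURN_IF_ERROR is used inside the get_data lambda, the lambda is given an explicit -> Status return type so the macro's early returns and the final return Status::OK() agree, and the call sites can themselves be wrapped. A toy sketch, reusing the same simplified Status and RETURN_IF_ERROR stand-ins as earlier (not the real Doris definitions):

    #include <string>

    struct Status { // toy stand-in
        std::string err;
        static Status OK() { return {}; }
        bool ok() const { return err.empty(); }
    };
    #define RETURN_IF_ERROR(stmt) \
        do { Status _st = (stmt); if (!_st.ok()) return _st; } while (false)

    Status add_rows_stub() { return Status::OK(); } // stand-in for add_rows(...)

    Status get_block_like() {
        // The explicit "-> Status" makes the lambda's contract visible and
        // lets RETURN_IF_ERROR expand to "return _st;" with the right type.
        auto get_data = [&]() -> Status {
            RETURN_IF_ERROR(add_rows_stub());
            return Status::OK();
        };
        RETURN_IF_ERROR(get_data()); // the result is now checked at the call site
        return Status::OK();
    }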
View File

@@ -970,44 +970,53 @@ void MutableBlock::add_row(const Block* block, int row) {
     }
 }
 
-void MutableBlock::add_rows(const Block* block, const uint32_t* row_begin,
-                            const uint32_t* row_end) {
-    DCHECK_LE(columns(), block->columns());
-    const auto& block_data = block->get_columns_with_type_and_name();
-    for (size_t i = 0; i < _columns.size(); ++i) {
-        DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
-        auto& dst = _columns[i];
-        const auto& src = *block_data[i].column.get();
-        DCHECK_GE(src.size(), row_end - row_begin);
-        dst->insert_indices_from(src, row_begin, row_end);
-    }
-}
-
-void MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) {
-    DCHECK_LE(columns(), block->columns());
-    const auto& block_data = block->get_columns_with_type_and_name();
-    for (size_t i = 0; i < _columns.size(); ++i) {
-        DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
-        auto& dst = _columns[i];
-        const auto& src = *block_data[i].column.get();
-        dst->insert_range_from(src, row_begin, length);
-    }
-}
-
-void MutableBlock::add_rows(const Block* block, std::vector<int64_t> rows) {
-    DCHECK_LE(columns(), block->columns());
-    const auto& block_data = block->get_columns_with_type_and_name();
-    const size_t length = std::ranges::distance(rows);
-    for (size_t i = 0; i < _columns.size(); ++i) {
-        DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
-        auto& dst = _columns[i];
-        const auto& src = *block_data[i].column.get();
-        dst->reserve(dst->size() + length);
-        for (size_t row : rows) {
-            // we can introduce a new function like `insert_assume_reserved` for IColumn.
-            dst->insert_from(src, row);
-        }
-    }
-}
+Status MutableBlock::add_rows(const Block* block, const uint32_t* row_begin,
+                              const uint32_t* row_end) {
+    RETURN_IF_CATCH_EXCEPTION({
+        DCHECK_LE(columns(), block->columns());
+        const auto& block_data = block->get_columns_with_type_and_name();
+        for (size_t i = 0; i < _columns.size(); ++i) {
+            DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
+            auto& dst = _columns[i];
+            const auto& src = *block_data[i].column.get();
+            DCHECK_GE(src.size(), row_end - row_begin);
+            dst->insert_indices_from(src, row_begin, row_end);
+        }
+    });
+    return Status::OK();
+}
+
+Status MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) {
+    RETURN_IF_CATCH_EXCEPTION({
+        DCHECK_LE(columns(), block->columns());
+        const auto& block_data = block->get_columns_with_type_and_name();
+        for (size_t i = 0; i < _columns.size(); ++i) {
+            DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
+            auto& dst = _columns[i];
+            const auto& src = *block_data[i].column.get();
+            dst->insert_range_from(src, row_begin, length);
+        }
+    });
+    return Status::OK();
+}
+
+Status MutableBlock::add_rows(const Block* block, std::vector<int64_t> rows) {
+    RETURN_IF_CATCH_EXCEPTION({
+        DCHECK_LE(columns(), block->columns());
+        const auto& block_data = block->get_columns_with_type_and_name();
+        const size_t length = std::ranges::distance(rows);
+        for (size_t i = 0; i < _columns.size(); ++i) {
+            DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
+            auto& dst = _columns[i];
+            const auto& src = *block_data[i].column.get();
+            dst->reserve(dst->size() + length);
+            for (size_t row : rows) {
+                // we can introduce a new function like `insert_assume_reserved` for IColumn.
+                dst->insert_from(src, row);
+            }
+        }
+    });
+    return Status::OK();
+}
 
 void MutableBlock::erase(const String& name) {

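For reference, the three overloads correspond to three ways of addressing source rows: a pointer pair over an index array (backed by insert_indices_from), a contiguous range (insert_range_from), and an arbitrary index list inserted row by row after a reserve. A toy single-column illustration of the three access patterns — the real MutableBlock applies them to every IColumn, and the Status wrapping shown above is omitted here for brevity:

    #include <cstdint>
    #include <vector>

    // Toy single-column "block" (illustration only, not the Doris type).
    struct ToyBlock {
        std::vector<int> col;

        // (1) pointer pair over an index array, like insert_indices_from
        void add_rows(const ToyBlock& src, const uint32_t* begin, const uint32_t* end) {
            for (const uint32_t* p = begin; p != end; ++p) col.push_back(src.col[*p]);
        }
        // (2) contiguous range, like insert_range_from
        void add_rows(const ToyBlock& src, size_t row_begin, size_t length) {
            col.insert(col.end(), src.col.begin() + row_begin,
                       src.col.begin() + row_begin + length);
        }
        // (3) arbitrary row list, reserving first as the real code does
        void add_rows(const ToyBlock& src, const std::vector<int64_t>& rows) {
            col.reserve(col.size() + rows.size());
            for (int64_t r : rows) col.push_back(src.col[r]);
        }
    };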
View File

@@ -603,9 +603,10 @@ public:
     void swap(MutableBlock&& other) noexcept;
 
     void add_row(const Block* block, int row);
-    void add_rows(const Block* block, const uint32_t* row_begin, const uint32_t* row_end);
-    void add_rows(const Block* block, size_t row_begin, size_t length);
-    void add_rows(const Block* block, std::vector<int64_t> rows);
+    // Batch add row should return error status if allocate memory failed.
+    Status add_rows(const Block* block, const uint32_t* row_begin, const uint32_t* row_end);
+    Status add_rows(const Block* block, size_t row_begin, size_t length);
+    Status add_rows(const Block* block, std::vector<int64_t> rows);
 
     /// remove the column with the specified name
     void erase(const String& name);

View File

@@ -1132,7 +1132,7 @@ Status AggregationNode::_spill_hash_table(HashTableCtxType& agg_method, HashTabl
         for (size_t j = 0; j < partitioned_indices.size(); ++j) {
             if (partitioned_indices[j] != i) {
                 if (length > 0) {
-                    mutable_block.add_rows(&block, begin, length);
+                    RETURN_IF_ERROR(mutable_block.add_rows(&block, begin, length));
                 }
                 length = 0;
                 continue;
@@ -1145,7 +1145,7 @@ Status AggregationNode::_spill_hash_table(HashTableCtxType& agg_method, HashTabl
         }
 
         if (length > 0) {
-            mutable_block.add_rows(&block, begin, length);
+            RETURN_IF_ERROR(mutable_block.add_rows(&block, begin, length));
         }
 
         CHECK_EQ(mutable_block.rows(), blocks_rows[i]);

View File

@@ -377,7 +377,7 @@ Status VCollectIterator::_topn_next(Block* block) {
                 size_t base = mutable_block.rows();
                 // append block to mutable_block
-                mutable_block.add_rows(block, 0, rows_to_copy);
+                RETURN_IF_ERROR(mutable_block.add_rows(block, 0, rows_to_copy));
                 // insert appended rows pos in mutable_block to sorted_row_pos and sort it
                 for (size_t i = 0; i < rows_to_copy; i++) {
                     sorted_row_pos.insert(base + i);

View File

@@ -885,7 +885,7 @@ Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
         if (!rows->empty()) {
             SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
             const auto* begin = rows->data();
-            _mutable_block->add_rows(block, begin, begin + rows->size());
+            RETURN_IF_ERROR(_mutable_block->add_rows(block, begin, begin + rows->size()));
         }
     } else if (!block->empty()) {
         SCOPED_TIMER(_parent->merge_block_timer());

View File

@@ -51,7 +51,7 @@ Status VRowDistribution::_save_missing_values(
         int col_size, Block* block, std::vector<int64_t> filter,
         const std::vector<const NullMap*>& col_null_maps) {
     // de-duplication for new partitions but save all rows.
-    _batching_block->add_rows(block, filter);
+    RETURN_IF_ERROR(_batching_block->add_rows(block, filter));
     std::vector<TNullableStringLiteral> cur_row_values;
     for (int row = 0; row < col_strs[0].size(); ++row) {
         cur_row_values.clear();