[fix](broker-load) fix use_count() issue when doing broker load in debug mode (#25288)
When executing broker load in ASAN mode, BE may crash with error:
```
F20231010 18:18:17.044978 185490 block.cpp:694] Check failed: d.column->use_count() == 1 (3 vs. 1)
*** Check failure stack trace: ***
@ 0x55e9d94c4e46 google::LogMessage::SendToLog()
@ 0x55e9d94c1410 google::LogMessage::Flush()
@ 0x55e9d94c5689 google::LogMessageFatal::~LogMessageFatal()
@ 0x55e9c509f80d doris::vectorized::Block::clear_column_data()
@ 0x55e9b6c170b3 doris::PlanFragmentExecutor::get_vectorized_internal()
@ 0x55e9b6c147e6 doris::PlanFragmentExecutor::open_vectorized_internal()
@ 0x55e9b6c12d9a doris::PlanFragmentExecutor::open()
@ 0x55e9b6c18426 doris::PlanFragmentExecutor::execute()
@ 0x55e9b6945cca doris::FragmentMgr::_exec_actual()
@ 0x55e9b696456c doris::FragmentMgr::exec_plan_fragment()::$_0::operator()()
```
It may happen when there is a column mapping like:
```
(k1,v2,v3,v4,v5,v6,v7,v8)
set (k2=v4,k3=v4,k4=v4)
```
in load stmt.
Case is covered by Baidu test cases
This commit is contained in:
@ -674,6 +674,14 @@ void Block::clear() {
|
||||
row_same_bit.clear();
|
||||
}
|
||||
|
||||
// Debug helper: walks every entry of `data` and appends ", [<name>, <use_count>]"
// for it to a string stream, then returns the accumulated text. The result
// therefore starts with ", " whenever `data` is non-empty and is empty otherwise.
// NOTE(review): d.column is dereferenced without a null check — confirm that
// every entry in `data` holds a column when this is called.
std::string Block::print_use_count() {
|
||||
std::stringstream ss;
|
||||
for (auto& d : data) {
|
||||
ss << ", [" << d.name << ", " << d.column->use_count() << "]";
|
||||
}
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
void Block::clear_column_data(int column_size) noexcept {
|
||||
// data.size() greater than column_size, means here have some
|
||||
// function exec result in block, need erase it here
|
||||
|
||||
@ -401,6 +401,10 @@ public:
|
||||
|
||||
// Empties row_same_bit by calling clear() on it.
void clear_same_bit() { row_same_bit.clear(); }
|
||||
|
||||
// Returns a string containing the use_count() of each column,
|
||||
// for debug purpose.
|
||||
std::string print_use_count();
|
||||
|
||||
private:
|
||||
void erase_impl(size_t position);
|
||||
};
|
||||
|
||||
@ -208,6 +208,10 @@ Status VFileScanner::prepare(
|
||||
RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc));
|
||||
RETURN_IF_ERROR(conjunct->open(_state));
|
||||
}
|
||||
|
||||
_dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
|
||||
std::vector<TupleId>({_output_tuple_desc->id()}),
|
||||
std::vector<bool>({false})));
|
||||
}
|
||||
|
||||
_default_val_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
|
||||
@ -507,7 +511,7 @@ Status VFileScanner::_pre_filter_src_block() {
|
||||
auto old_rows = _src_block_ptr->rows();
|
||||
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr,
|
||||
origin_column_num));
|
||||
_counter.num_rows_unselected += old_rows - _src_block.rows();
|
||||
_counter.num_rows_unselected += old_rows - _src_block_ptr->rows();
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -519,17 +523,24 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
|
||||
|
||||
SCOPED_TIMER(_convert_to_output_block_timer);
|
||||
// The block is passed from scanner context's free blocks,
|
||||
// which is initialized by src columns.
|
||||
// But for load job, the block should be filled with dest columns.
|
||||
// So need to clear it first.
|
||||
block->clear();
|
||||
// which is initialized by output columns
|
||||
// so no need to clear it
|
||||
// block->clear();
|
||||
|
||||
int ctx_idx = 0;
|
||||
size_t rows = _src_block.rows();
|
||||
size_t rows = _src_block_ptr->rows();
|
||||
auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
|
||||
auto& filter_map = filter_column->get_data();
|
||||
|
||||
for (auto slot_desc : _output_tuple_desc->slots()) {
|
||||
// After convert, the column_ptr should be copied into output block.
|
||||
// Cannot use block->insert() because it may leave a column with use_count() != 1
|
||||
MutableBlock mutable_output_block =
|
||||
VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
|
||||
auto& mutable_output_columns = mutable_output_block.mutable_columns();
|
||||
|
||||
// for (auto slot_desc : _output_tuple_desc->slots()) {
|
||||
for (int i = 0; i < mutable_output_columns.size(); ++i) {
|
||||
auto slot_desc = _output_tuple_desc->slots()[i];
|
||||
if (!slot_desc->is_materialized()) {
|
||||
continue;
|
||||
}
|
||||
@ -539,8 +550,8 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
|
||||
auto& ctx = _dest_vexpr_ctx[dest_index];
|
||||
int result_column_id = -1;
|
||||
// PT1 => dest primitive type
|
||||
RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
|
||||
column_ptr = _src_block.get_by_position(result_column_id).column;
|
||||
RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id));
|
||||
column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
|
||||
// column_ptr maybe a ColumnConst, convert it to a normal column
|
||||
column_ptr = column_ptr->convert_to_full_column_if_const();
|
||||
DCHECK(column_ptr != nullptr);
|
||||
@ -553,16 +564,16 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
|
||||
for (int i = 0; i < rows; ++i) {
|
||||
if (filter_map[i] && nullable_column->is_null_at(i)) {
|
||||
if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
|
||||
!_src_block.get_by_position(_dest_slot_to_src_slot_index[dest_index])
|
||||
!_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index])
|
||||
.column->is_null_at(i)) {
|
||||
RETURN_IF_ERROR(_state->append_error_msg_to_file(
|
||||
[&]() -> std::string {
|
||||
return _src_block.dump_one_line(i, _num_of_columns_from_file);
|
||||
return _src_block_ptr->dump_one_line(i,
|
||||
_num_of_columns_from_file);
|
||||
},
|
||||
[&]() -> std::string {
|
||||
auto raw_value =
|
||||
_src_block.get_by_position(ctx_idx).column->get_data_at(
|
||||
i);
|
||||
auto raw_value = _src_block_ptr->get_by_position(ctx_idx)
|
||||
.column->get_data_at(i);
|
||||
std::string raw_string = raw_value.to_string();
|
||||
fmt::memory_buffer error_msg;
|
||||
fmt::format_to(error_msg,
|
||||
@ -577,7 +588,8 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
|
||||
} else if (!slot_desc->is_nullable()) {
|
||||
RETURN_IF_ERROR(_state->append_error_msg_to_file(
|
||||
[&]() -> std::string {
|
||||
return _src_block.dump_one_line(i, _num_of_columns_from_file);
|
||||
return _src_block_ptr->dump_one_line(i,
|
||||
_num_of_columns_from_file);
|
||||
},
|
||||
[&]() -> std::string {
|
||||
fmt::memory_buffer error_msg;
|
||||
@ -598,14 +610,12 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
|
||||
} else if (slot_desc->is_nullable()) {
|
||||
column_ptr = make_nullable(column_ptr);
|
||||
}
|
||||
block->insert(dest_index, vectorized::ColumnWithTypeAndName(std::move(column_ptr),
|
||||
slot_desc->get_data_type_ptr(),
|
||||
slot_desc->col_name()));
|
||||
mutable_output_columns[i]->insert_range_from(*column_ptr, 0, rows);
|
||||
ctx_idx++;
|
||||
}
|
||||
|
||||
// After the dest block insert operation, clear _src_block to remove the references to the original columns
|
||||
_src_block.clear();
|
||||
_src_block_ptr->clear();
|
||||
|
||||
size_t dest_size = block->columns();
|
||||
// do filter
|
||||
|
||||
@ -142,6 +142,7 @@ protected:
|
||||
// For load task
|
||||
vectorized::VExprContextSPtrs _pre_conjunct_ctxs;
|
||||
std::unique_ptr<RowDescriptor> _src_row_desc;
|
||||
std::unique_ptr<RowDescriptor> _dest_row_desc;
|
||||
// row desc for default exprs
|
||||
std::unique_ptr<RowDescriptor> _default_val_row_desc;
|
||||
// owned by scan node
|
||||
|
||||
Reference in New Issue
Block a user