From be27d4d92144d31e600372fac2a88d4dc82c5c08 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Thu, 12 Oct 2023 17:04:29 +0800 Subject: [PATCH] [fix](broker-load) fix use_count() issue when doing broker load in debug mode (#25288) When executing broker load in ASAN mode, BE may crash with error: ``` F20231010 18:18:17.044978 185490 block.cpp:694] Check failed: d.column->use_count() == 1 (3 vs. 1) *** Check failure stack trace: *** @ 0x55e9d94c4e46 google::LogMessage::SendToLog() @ 0x55e9d94c1410 google::LogMessage::Flush() @ 0x55e9d94c5689 google::LogMessageFatal::~LogMessageFatal() @ 0x55e9c509f80d doris::vectorized::Block::clear_column_data() @ 0x55e9b6c170b3 doris::PlanFragmentExecutor::get_vectorized_internal() @ 0x55e9b6c147e6 doris::PlanFragmentExecutor::open_vectorized_internal() @ 0x55e9b6c12d9a doris::PlanFragmentExecutor::open() @ 0x55e9b6c18426 doris::PlanFragmentExecutor::execute() @ 0x55e9b6945cca doris::FragmentMgr::_exec_actual() @ 0x55e9b696456c doris::FragmentMgr::exec_plan_fragment()::$_0::operator()() ``` It may happen when there is column maping like: ``` (k1,v2,v3,v4,v5,v6,v7,v8) set (k2=v4,k3=v4,k4=v4) ``` in load stmt. Case is covered by Baidu test cases --- be/src/vec/core/block.cpp | 8 +++++ be/src/vec/core/block.h | 4 +++ be/src/vec/exec/scan/vfile_scanner.cpp | 48 ++++++++++++++++---------- be/src/vec/exec/scan/vfile_scanner.h | 1 + 4 files changed, 42 insertions(+), 19 deletions(-) diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 9988ae9944..8492044215 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -674,6 +674,14 @@ void Block::clear() { row_same_bit.clear(); } +std::string Block::print_use_count() { + std::stringstream ss; + for (auto& d : data) { + ss << ", [" << d.name << ", " << d.column->use_count() << "]"; + } + return ss.str(); +} + void Block::clear_column_data(int column_size) noexcept { // data.size() greater than column_size, means here have some // function exec result in block, need erase it here diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 5f52258972..927ed5c655 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -401,6 +401,10 @@ public: void clear_same_bit() { row_same_bit.clear(); } + // return string contains use_count() of each columns + // for debug purpose. + std::string print_use_count(); + private: void erase_impl(size_t position); }; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 1b40e31d4d..ddb8df3ce0 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -208,6 +208,10 @@ Status VFileScanner::prepare( RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc)); RETURN_IF_ERROR(conjunct->open(_state)); } + + _dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(), + std::vector({_output_tuple_desc->id()}), + std::vector({false}))); } _default_val_row_desc.reset(new RowDescriptor(_state->desc_tbl(), @@ -507,7 +511,7 @@ Status VFileScanner::_pre_filter_src_block() { auto old_rows = _src_block_ptr->rows(); RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, origin_column_num)); - _counter.num_rows_unselected += old_rows - _src_block.rows(); + _counter.num_rows_unselected += old_rows - _src_block_ptr->rows(); } return Status::OK(); } @@ -519,17 +523,24 @@ Status VFileScanner::_convert_to_output_block(Block* block) { SCOPED_TIMER(_convert_to_output_block_timer); // The block is passed from scanner context's free blocks, - // which is initialized by src columns. - // But for load job, the block should be filled with dest columns. - // So need to clear it first. - block->clear(); + // which is initialized by output columns + // so no need to clear it + // block->clear(); int ctx_idx = 0; - size_t rows = _src_block.rows(); + size_t rows = _src_block_ptr->rows(); auto filter_column = vectorized::ColumnUInt8::create(rows, 1); auto& filter_map = filter_column->get_data(); - for (auto slot_desc : _output_tuple_desc->slots()) { + // After convert, the column_ptr should be copied into output block. + // Can not use block->insert() because it may cause use_count() non-zero bug + MutableBlock mutable_output_block = + VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc); + auto& mutable_output_columns = mutable_output_block.mutable_columns(); + + // for (auto slot_desc : _output_tuple_desc->slots()) { + for (int i = 0; i < mutable_output_columns.size(); ++i) { + auto slot_desc = _output_tuple_desc->slots()[i]; if (!slot_desc->is_materialized()) { continue; } @@ -539,8 +550,8 @@ Status VFileScanner::_convert_to_output_block(Block* block) { auto& ctx = _dest_vexpr_ctx[dest_index]; int result_column_id = -1; // PT1 => dest primitive type - RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id)); - column_ptr = _src_block.get_by_position(result_column_id).column; + RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id)); + column_ptr = _src_block_ptr->get_by_position(result_column_id).column; // column_ptr maybe a ColumnConst, convert it to a normal column column_ptr = column_ptr->convert_to_full_column_if_const(); DCHECK(column_ptr != nullptr); @@ -553,16 +564,16 @@ Status VFileScanner::_convert_to_output_block(Block* block) { for (int i = 0; i < rows; ++i) { if (filter_map[i] && nullable_column->is_null_at(i)) { if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) && - !_src_block.get_by_position(_dest_slot_to_src_slot_index[dest_index]) + !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]) .column->is_null_at(i)) { RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { - return _src_block.dump_one_line(i, _num_of_columns_from_file); + return _src_block_ptr->dump_one_line(i, + _num_of_columns_from_file); }, [&]() -> std::string { - auto raw_value = - _src_block.get_by_position(ctx_idx).column->get_data_at( - i); + auto raw_value = _src_block_ptr->get_by_position(ctx_idx) + .column->get_data_at(i); std::string raw_string = raw_value.to_string(); fmt::memory_buffer error_msg; fmt::format_to(error_msg, @@ -577,7 +588,8 @@ Status VFileScanner::_convert_to_output_block(Block* block) { } else if (!slot_desc->is_nullable()) { RETURN_IF_ERROR(_state->append_error_msg_to_file( [&]() -> std::string { - return _src_block.dump_one_line(i, _num_of_columns_from_file); + return _src_block_ptr->dump_one_line(i, + _num_of_columns_from_file); }, [&]() -> std::string { fmt::memory_buffer error_msg; @@ -598,14 +610,12 @@ Status VFileScanner::_convert_to_output_block(Block* block) { } else if (slot_desc->is_nullable()) { column_ptr = make_nullable(column_ptr); } - block->insert(dest_index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); + mutable_output_columns[i]->insert_range_from(*column_ptr, 0, rows); ctx_idx++; } // after do the dest block insert operation, clear _src_block to remove the reference of origin column - _src_block.clear(); + _src_block_ptr->clear(); size_t dest_size = block->columns(); // do filter diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index fc9760b961..d5b1edb9e5 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -142,6 +142,7 @@ protected: // For load task vectorized::VExprContextSPtrs _pre_conjunct_ctxs; std::unique_ptr _src_row_desc; + std::unique_ptr _dest_row_desc; // row desc for default exprs std::unique_ptr _default_val_row_desc; // owned by scan node