From b66583551c331f8dc37bb854f3f74cbb5bf1cb8d Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Thu, 22 Feb 2024 09:09:35 +0800 Subject: [PATCH] [fix](group_commit)Fix bound checking problem when reading wal block (#31112) --- be/src/vec/exec/format/wal/wal_reader.cpp | 21 ++- be/test/vec/exec/vwal_scanner_test.cpp | 155 +++++++++++++--------- 2 files changed, 106 insertions(+), 70 deletions(-) diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp index cb89dd9bcf..981e740d05 100644 --- a/be/src/vec/exec/format/wal/wal_reader.cpp +++ b/be/src/vec/exec/format/wal/wal_reader.cpp @@ -18,6 +18,7 @@ #include "wal_reader.h" #include "common/logging.h" +#include "common/sync_point.h" #include "gutil/strings/split.h" #include "olap/wal/wal_manager.h" #include "runtime/runtime_state.h" @@ -61,11 +62,17 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { //convert to dst block vectorized::Block dst_block; int index = 0; - auto columns = block->get_columns_with_type_and_name(); - if (_column_id_count != columns.size() || columns.size() != _tuple_descriptor->slots().size()) { + auto output_block_columns = block->get_columns_with_type_and_name(); + size_t output_block_column_size = output_block_columns.size(); + TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count", &_column_id_count); + TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size", &output_block_column_size); + if (_column_id_count != src_block.columns() || + output_block_column_size != _tuple_descriptor->slots().size()) { return Status::InternalError( - "not equal _column_id_count={} vs columns size={} vs tuple_descriptor size={}", - std::to_string(_column_id_count), std::to_string(columns.size()), + "not equal wal _column_id_count={} vs wal block columns size={}, " + "output block columns size={} vs tuple_descriptor size={}", + std::to_string(_column_id_count), std::to_string(src_block.columns()), + std::to_string(output_block_column_size), std::to_string(_tuple_descriptor->slots().size())); } for (auto slot_desc : _tuple_descriptor->slots()) { @@ -78,9 +85,9 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { if (column_ptr != nullptr && slot_desc->is_nullable()) { column_ptr = make_nullable(column_ptr); } - dst_block.insert( - index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), columns[index].type, - columns[index].name)); + dst_block.insert(index, vectorized::ColumnWithTypeAndName( + std::move(column_ptr), output_block_columns[index].type, + output_block_columns[index].name)); index++; } block->swap(dst_block); diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index 8d4fe6ad75..af29264036 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -20,6 +20,7 @@ #include #include "common/object_pool.h" +#include "common/sync_point.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" #include "io/fs/local_file_system.h" @@ -40,12 +41,16 @@ public: init(); _profile = _runtime_state.runtime_profile(); _runtime_state.init_mem_trackers(); - static_cast(_runtime_state.init(unique_id, query_options, query_globals, _env)); + WARN_IF_ERROR(_runtime_state.init(_unique_id, _query_options, _query_globals, _env), + "fail to init _runtime_state"); } void init(); + void generate_scanner(std::shared_ptr& scanner); void TearDown() override { - static_cast(io::global_local_filesystem()->delete_directory(wal_dir)); + WARN_IF_ERROR(_scan_node->close(&_runtime_state), "fail to close scan_node") + WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir), + fmt::format("fail to delete dir={}", _wal_dir)); SAFE_STOP(_env->_wal_manager); } @@ -53,16 +58,16 @@ protected: virtual void SetUp() override {} private: - void init_desc_table(); + void _init_desc_table(); ExecEnv* _env = nullptr; - std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test"; - int64_t db_id = 1; - int64_t tb_id = 2; - int64_t txn_id = 789; - int64_t version = 0; - int64_t backend_id = 1001; - std::string label = "test"; + std::string _wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test"; + int64_t _db_id = 1; + int64_t _tb_id = 2; + int64_t _txn_id = 789; + int64_t _version = 0; + int64_t _backend_id = 1001; + std::string _label = "test"; TupleId _dst_tuple_id = 0; RuntimeState _runtime_state; @@ -73,12 +78,18 @@ private: ScannerCounter _counter; std::vector _pre_filter; TPlanNode _tnode; - TUniqueId unique_id; - TQueryOptions query_options; - TQueryGlobals query_globals; + TUniqueId _unique_id; + TQueryOptions _query_options; + TQueryGlobals _query_globals; + std::shared_ptr _scan_node = nullptr; + std::vector _ranges; + TFileRangeDesc _range_desc; + TFileScanRange _scan_range; + std::unique_ptr _kv_cache = nullptr; + std::unique_ptr _master_info = nullptr; }; -void VWalScannerTest::init_desc_table() { +void VWalScannerTest::_init_desc_table() { TDescriptorTable t_desc_table; // table descriptors @@ -118,6 +129,7 @@ void VWalScannerTest::init_desc_table() { slot_desc.nullIndicatorBit = -1; slot_desc.colName = "c1"; slot_desc.slotIdx = 1; + slot_desc.col_unique_id = 0; slot_desc.isMaterialized = true; t_desc_table.slotDescriptors.push_back(slot_desc); @@ -145,6 +157,7 @@ void VWalScannerTest::init_desc_table() { slot_desc.nullIndicatorBit = -1; slot_desc.colName = "c2"; slot_desc.slotIdx = 2; + slot_desc.col_unique_id = 1; slot_desc.isMaterialized = true; t_desc_table.slotDescriptors.push_back(slot_desc); @@ -172,6 +185,7 @@ void VWalScannerTest::init_desc_table() { slot_desc.nullIndicatorBit = -1; slot_desc.colName = "c3"; slot_desc.slotIdx = 3; + slot_desc.col_unique_id = 2; slot_desc.isMaterialized = true; t_desc_table.slotDescriptors.push_back(slot_desc); @@ -196,9 +210,10 @@ void VWalScannerTest::init_desc_table() { void VWalScannerTest::init() { config::group_commit_wal_max_disk_limit = "100M"; - init_desc_table(); - static_cast(io::global_local_filesystem()->create_directory( - wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id))); + _init_desc_table(); + WARN_IF_ERROR(io::global_local_filesystem()->create_directory( + _wal_dir + "/" + std::to_string(_db_id) + "/" + std::to_string(_tb_id)), + "fail to creat directory"); // Node Id _tnode.node_id = 0; @@ -210,73 +225,87 @@ void VWalScannerTest::init() { _tnode.file_scan_node.tuple_id = 0; _tnode.__isset.file_scan_node = true; + _scan_node = std::make_shared(&_obj_pool, _tnode, *_desc_tbl); + _scan_node->_output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id); + WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init scan_node"); + WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to prepare scan_node"); + + _range_desc.start_offset = 0; + _range_desc.size = 1000; + _ranges.push_back(_range_desc); + _scan_range.ranges = _ranges; + _scan_range.__isset.params = true; + _scan_range.params.format_type = TFileFormatType::FORMAT_WAL; + _kv_cache.reset(new ShardedKVCache(48)); + + _runtime_state._wal_id = _txn_id; + + _master_info.reset(new TMasterInfo()); _env = ExecEnv::GetInstance(); - _env->_master_info = new TMasterInfo(); + _env->_master_info = _master_info.get(); _env->_master_info->network_address.hostname = "host name"; - _env->_master_info->network_address.port = backend_id; + _env->_master_info->network_address.port = _backend_id; _env->_master_info->backend_id = 1001; - _env->_wal_manager = WalManager::create_shared(_env, wal_dir); + _env->_wal_manager = WalManager::create_shared(_env, _wal_dir); std::string base_path; auto st = _env->_wal_manager->_init_wal_dirs_info(); - st = _env->_wal_manager->create_wal_path(db_id, tb_id, txn_id, label, base_path); + st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id, _label, base_path); std::string src = "./be/test/exec/test_data/wal_scanner/wal"; - std::string dst = wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id) + "/" + - std::to_string(version) + "_" + std::to_string(backend_id) + "_" + - std::to_string(txn_id) + "_" + label; + std::string dst = _wal_dir + "/" + std::to_string(_db_id) + "/" + std::to_string(_tb_id) + "/" + + std::to_string(_version) + "_" + std::to_string(_backend_id) + "_" + + std::to_string(_txn_id) + "_" + _label; std::filesystem::copy(src, dst); } -TEST_F(VWalScannerTest, normal) { - std::vector index_vector; - index_vector.emplace_back(0); - index_vector.emplace_back(1); - index_vector.emplace_back(2); - // config::group_commit_replay_wal_dir = wal_dir; - NewFileScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); - scan_node._output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id); - static_cast(scan_node.init(_tnode, &_runtime_state)); - auto status = scan_node.prepare(&_runtime_state); - EXPECT_TRUE(status.ok()); - - std::vector ranges; - TFileRangeDesc range_desc; - { - range_desc.start_offset = 0; - range_desc.size = 1000; - } - ranges.push_back(range_desc); - TFileScanRange scan_range; - scan_range.ranges = ranges; - scan_range.__isset.params = true; - scan_range.params.format_type = TFileFormatType::FORMAT_WAL; - std::unique_ptr _kv_cache; - _kv_cache.reset(new ShardedKVCache(48)); - _runtime_state._wal_id = txn_id; - VFileScanner scanner(&_runtime_state, &scan_node, -1, scan_range, _profile, _kv_cache.get()); - scanner._is_load = false; +void VWalScannerTest::generate_scanner(std::shared_ptr& scanner) { + scanner = std::make_shared(&_runtime_state, _scan_node.get(), -1, _scan_range, + _profile, _kv_cache.get()); + scanner->_is_load = false; vectorized::VExprContextSPtrs _conjuncts; std::unordered_map _colname_to_value_range; std::unordered_map _colname_to_slot_id; - static_cast(scanner.prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id)); + WARN_IF_ERROR(scanner->prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id), + "fail to prepare scanner"); +} +TEST_F(VWalScannerTest, normal) { + std::shared_ptr scanner = nullptr; + generate_scanner(scanner); std::unique_ptr block(new vectorized::Block()); bool eof = false; - auto st = scanner.get_block(&_runtime_state, block.get(), &eof); - EXPECT_EQ(3, block->rows()); + auto st = scanner->get_block(&_runtime_state, block.get(), &eof); ASSERT_TRUE(st.ok()); + EXPECT_EQ(3, block->rows()); block->clear(); - st = scanner.get_block(&_runtime_state, block.get(), &eof); + st = scanner->get_block(&_runtime_state, block.get(), &eof); ASSERT_TRUE(st.ok()); EXPECT_EQ(0, block->rows()); ASSERT_TRUE(eof); - static_cast(scanner.close(&_runtime_state)); - static_cast(scan_node.close(&_runtime_state)); + WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner"); +} - { - std::stringstream ss; - scan_node.runtime_profile()->pretty_print(&ss); - LOG(INFO) << ss.str(); - } +TEST_F(VWalScannerTest, fail_with_not_equal) { + auto sp = SyncPoint::get_instance(); + Defer defer {[sp] { + sp->clear_call_back("WalReader::set_column_id_count"); + sp->clear_call_back("WalReader::set_out_block_column_size"); + }}; + sp->set_call_back("WalReader::set_column_id_count", + [](auto&& args) { *try_any_cast(args[0]) = 2; }); + sp->set_call_back("WalReader::set_out_block_column_size", + [](auto&& args) { *try_any_cast(args[0]) = 2; }); + sp->enable_processing(); + + std::shared_ptr scanner = nullptr; + generate_scanner(scanner); + std::unique_ptr block(new vectorized::Block()); + bool eof = false; + auto st = scanner->get_block(&_runtime_state, block.get(), &eof); + ASSERT_FALSE(st.ok()); + auto msg = st.to_string(); + auto pos = msg.find("not equal"); + ASSERT_TRUE(pos != msg.npos); + WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner"); } } // namespace vectorized