diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp index 10a979f89d..cce5d14d69 100644 --- a/be/src/olap/wal/wal_manager.cpp +++ b/be/src/olap/wal/wal_manager.cpp @@ -205,11 +205,12 @@ size_t WalManager::get_wal_queue_size(int64_t table_id) { } Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, - const std::string& label, std::string& base_path) { + const std::string& label, std::string& base_path, + uint32_t wal_version) { base_path = _wal_dirs_info->get_available_random_wal_dir(); std::stringstream ss; ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" - << _wal_version << "_" << _exec_env->master_info()->backend_id << "_" + << std::to_string(wal_version) << "_" << _exec_env->master_info()->backend_id << "_" << std::to_string(wal_id) << "_" << label; { std::lock_guard wrlock(_wal_path_lock); diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h index 3710b9e31c..9cc9f95bd7 100644 --- a/be/src/olap/wal/wal_manager.h +++ b/be/src/olap/wal/wal_manager.h @@ -73,7 +73,7 @@ public: // replay wal Status create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, - const std::string& label, std::string& base_path); + const std::string& label, std::string& base_path, uint32_t wal_version); Status get_wal_path(int64_t wal_id, std::string& wal_path); Status delete_wal(int64_t table_id, int64_t wal_id); Status rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t wal_id); @@ -144,7 +144,6 @@ private: std::shared_mutex _wal_queue_lock; std::unordered_map> _wal_queues; - int64_t _wal_version = 0; std::atomic _first_replay; // for test relay @@ -154,4 +153,8 @@ private: std::shared_mutex _wal_cv_lock; std::unordered_map _wal_cv_map; }; + +// In doris 2.1.0, wal version is 0, now need to upgrade it to 1 to solve compatibility issues. +// see https://github.com/apache/doris/pull/32299 +constexpr inline uint32_t WAL_VERSION = 1; } // namespace doris diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp index d1263daf1c..9e4618b2bc 100644 --- a/be/src/olap/wal/wal_reader.cpp +++ b/be/src/olap/wal/wal_reader.cpp @@ -85,7 +85,7 @@ Status WalReader::read_block(PBlock& block) { return Status::OK(); } -Status WalReader::read_header(std::string& col_ids) { +Status WalReader::read_header(uint32_t& version, std::string& col_ids) { if (file_reader->size() == 0) { return Status::DataQualityError("empty file"); } @@ -101,7 +101,7 @@ Status WalReader::read_header(std::string& col_ids) { RETURN_IF_ERROR( file_reader->read_at(_offset, {version_buf, WalWriter::VERSION_SIZE}, &bytes_read)); _offset += WalWriter::VERSION_SIZE; - _version = decode_fixed32_le(version_buf); + version = decode_fixed32_le(version_buf); uint8_t len_buf[WalWriter::LENGTH_SIZE]; RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf, WalWriter::LENGTH_SIZE}, &bytes_read)); _offset += WalWriter::LENGTH_SIZE; diff --git a/be/src/olap/wal/wal_reader.h b/be/src/olap/wal/wal_reader.h index 1f26a7598f..c47d029331 100644 --- a/be/src/olap/wal/wal_reader.h +++ b/be/src/olap/wal/wal_reader.h @@ -32,13 +32,12 @@ public: Status finalize(); Status read_block(PBlock& block); - Status read_header(std::string& col_ids); + Status read_header(uint32_t& version, std::string& col_ids); private: Status _check_checksum(const char* binary, size_t size, uint32_t checksum); std::string _file_name; - uint32_t _version = 0; size_t _offset; io::FileReaderSPtr file_reader; }; diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index 14e3779748..641ef8c664 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -321,7 +321,10 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id, Status WalTable::_read_wal_header(const std::string& wal_path, std::string& columns) { std::shared_ptr wal_reader = std::make_shared(wal_path); RETURN_IF_ERROR(wal_reader->init()); - RETURN_IF_ERROR(wal_reader->read_header(columns)); + uint32_t version = 0; + RETURN_IF_ERROR(wal_reader->read_header(version, columns)); + VLOG_DEBUG << "wal=" << wal_path << ",version=" << std::to_string(version) + << ",columns=" << columns; RETURN_IF_ERROR(wal_reader->finalize()); return Status::OK(); } diff --git a/be/src/olap/wal/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp index 43f021c9d3..5658eded56 100644 --- a/be/src/olap/wal/wal_writer.cpp +++ b/be/src/olap/wal/wal_writer.cpp @@ -90,7 +90,7 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) { return Status::OK(); } -Status WalWriter::append_header(uint32_t version, std::string col_ids) { +Status WalWriter::append_header(std::string col_ids) { if (!_file_writer) { return Status::InternalError("wal writer is null,fail to write file={}", _file_name); } @@ -105,7 +105,7 @@ Status WalWriter::append_header(uint32_t version, std::string col_ids) { offset += k_wal_magic_length; uint8_t version_buf[sizeof(uint32_t)]; - encode_fixed32_le(version_buf, version); + encode_fixed32_le(version_buf, WAL_VERSION); RETURN_IF_ERROR(_file_writer->append({version_buf, sizeof(uint32_t)})); offset += VERSION_SIZE; uint8_t len_buf[sizeof(uint64_t)]; diff --git a/be/src/olap/wal/wal_writer.h b/be/src/olap/wal/wal_writer.h index 08d2a4eb71..f730e02666 100644 --- a/be/src/olap/wal/wal_writer.h +++ b/be/src/olap/wal/wal_writer.h @@ -36,7 +36,7 @@ public: Status finalize(); Status append_blocks(const PBlockArray& blocks); - Status append_header(uint32_t version, std::string col_ids); + Status append_header(std::string col_ids); std::string file_name() { return _file_name; }; diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index cadff23176..646b58c5fd 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -314,8 +314,12 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version) { st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, pipeline_params); if (!st.ok()) { - static_cast(_finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id, - st, nullptr)); + auto finish_st = _finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id, + st, nullptr); + if (!finish_st.ok()) { + LOG(WARNING) << "finish group commit error, label=" << label + << ", st=" << finish_st.to_string(); + } } return st; } @@ -432,8 +436,12 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id, const TExecPlanFragmentParams& params, const TPipelineFragmentParams& pipeline_params) { auto finish_cb = [db_id, table_id, label, txn_id, this](RuntimeState* state, Status* status) { - static_cast(_finish_group_commit_load(db_id, table_id, label, txn_id, - state->fragment_instance_id(), *status, state)); + auto finish_st = _finish_group_commit_load(db_id, table_id, label, txn_id, + state->fragment_instance_id(), *status, state); + if (!finish_st.ok()) { + LOG(WARNING) << "finish group commit error, label=" << label + << ", st=" << finish_st.to_string(); + } }; if (is_pipeline) { return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, finish_cb); @@ -512,8 +520,8 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, std::string real_label = config::group_commit_wait_replay_wal_finish ? import_label + "_test_wait" : import_label; - RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path(db_id, tb_id, wal_id, - real_label, _wal_base_path)); + RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path( + db_id, tb_id, wal_id, real_label, _wal_base_path, WAL_VERSION)); _v_wal_writer = std::make_shared( db_id, tb_id, wal_id, real_label, wal_manager, slot_desc, be_exe_version); return _v_wal_writer->init(); diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp index 981e740d05..1fbd395ad2 100644 --- a/be/src/vec/exec/format/wal/wal_reader.cpp +++ b/be/src/vec/exec/format/wal/wal_reader.cpp @@ -47,6 +47,15 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { //read src block PBlock pblock; auto st = _wal_reader->read_block(pblock); + // Due to historical reasons, be_exec_version=3 will use the new way to serialize block + // in doris 2.1.0, now it has been corrected to use the old way to do serialize and deserialize + // in the latest version. So if a wal is created by 2.1.0 (wal version=0 && be_exec_version=3), + // it should upgrade the be_exec_version to 4 to use the new way to deserialize pblock to solve + // compatibility issues.see https://github.com/apache/doris/pull/32299 + if (_version == 0 && pblock.has_be_exec_version() && pblock.be_exec_version() == 3) { + VLOG_DEBUG << "need to set be_exec_version to 4 to solve compatibility issues"; + pblock.set_be_exec_version(4); + } if (st.is()) { LOG(INFO) << "read eof on wal:" << _wal_path; *read_rows = 0; @@ -99,7 +108,7 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { Status WalReader::get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) { std::string col_ids; - RETURN_IF_ERROR(_wal_reader->read_header(col_ids)); + RETURN_IF_ERROR(_wal_reader->read_header(_version, col_ids)); std::vector column_id_vector = strings::Split(col_ids, ",", strings::SkipWhitespace()); _column_id_count = column_id_vector.size(); diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h index d2636d5495..09311496c1 100644 --- a/be/src/vec/exec/format/wal/wal_reader.h +++ b/be/src/vec/exec/format/wal/wal_reader.h @@ -41,6 +41,7 @@ private: // column_id, column_pos std::map _column_pos_map; int64_t _column_id_count; + uint32_t _version = 0; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index 569f9bcd65..76e0bf0679 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -60,7 +60,7 @@ Status VWalWriter::init() { ss << std::to_string(slot_desc.col_unique_id) << ","; } std::string col_ids = ss.str().substr(0, ss.str().size() - 1); - RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids)); + RETURN_IF_ERROR(_wal_writer->append_header(col_ids)); return Status::OK(); } diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h index f22250cb5d..a9fa218f33 100644 --- a/be/src/vec/sink/writer/vwal_writer.h +++ b/be/src/vec/sink/writer/vwal_writer.h @@ -44,8 +44,6 @@ private: int64_t _db_id; int64_t _tb_id; int64_t _wal_id; - // TODO version should in olap/wal_writer - uint32_t _version = 0; std::string _label; WalManager* _wal_manager; std::vector& _slot_descs; diff --git a/be/test/exec/test_data/wal_scanner/wal b/be/test/exec/test_data/wal_scanner/wal deleted file mode 100644 index 2c5fe90963..0000000000 Binary files a/be/test/exec/test_data/wal_scanner/wal and /dev/null differ diff --git a/be/test/exec/test_data/wal_scanner/wal_version0 b/be/test/exec/test_data/wal_scanner/wal_version0 new file mode 100644 index 0000000000..ddce750894 Binary files /dev/null and b/be/test/exec/test_data/wal_scanner/wal_version0 differ diff --git a/be/test/exec/test_data/wal_scanner/wal_version1 b/be/test/exec/test_data/wal_scanner/wal_version1 new file mode 100644 index 0000000000..f84b280b1f Binary files /dev/null and b/be/test/exec/test_data/wal_scanner/wal_version1 differ diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index af29264036..cb32fef740 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -64,10 +64,13 @@ private: std::string _wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test"; int64_t _db_id = 1; int64_t _tb_id = 2; - int64_t _txn_id = 789; - int64_t _version = 0; + int64_t _txn_id_1 = 123; + int64_t _txn_id_2 = 456; + uint32_t _version_0 = 0; + uint32_t _version_1 = 1; int64_t _backend_id = 1001; - std::string _label = "test"; + std::string _label_1 = "test1"; + std::string _label_2 = "test2"; TupleId _dst_tuple_id = 0; RuntimeState _runtime_state; @@ -238,8 +241,6 @@ void VWalScannerTest::init() { _scan_range.params.format_type = TFileFormatType::FORMAT_WAL; _kv_cache.reset(new ShardedKVCache(48)); - _runtime_state._wal_id = _txn_id; - _master_info.reset(new TMasterInfo()); _env = ExecEnv::GetInstance(); _env->_master_info = _master_info.get(); @@ -249,11 +250,19 @@ void VWalScannerTest::init() { _env->_wal_manager = WalManager::create_shared(_env, _wal_dir); std::string base_path; auto st = _env->_wal_manager->_init_wal_dirs_info(); - st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id, _label, base_path); - std::string src = "./be/test/exec/test_data/wal_scanner/wal"; + st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1, _label_1, base_path, + _version_0); + std::string src = "./be/test/exec/test_data/wal_scanner/wal_version0"; std::string dst = _wal_dir + "/" + std::to_string(_db_id) + "/" + std::to_string(_tb_id) + "/" + - std::to_string(_version) + "_" + std::to_string(_backend_id) + "_" + - std::to_string(_txn_id) + "_" + _label; + std::to_string(_version_0) + "_" + std::to_string(_backend_id) + "_" + + std::to_string(_txn_id_1) + "_" + _label_1; + std::filesystem::copy(src, dst); + st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_2, _label_2, base_path, + _version_1); + src = "./be/test/exec/test_data/wal_scanner/wal_version1"; + dst = _wal_dir + "/" + std::to_string(_db_id) + "/" + std::to_string(_tb_id) + "/" + + std::to_string(_version_1) + "_" + std::to_string(_backend_id) + "_" + + std::to_string(_txn_id_2) + "_" + _label_2; std::filesystem::copy(src, dst); } @@ -269,6 +278,8 @@ void VWalScannerTest::generate_scanner(std::shared_ptr& scanner) { } TEST_F(VWalScannerTest, normal) { + // read wal file with wal_version=0 + _runtime_state._wal_id = _txn_id_1; std::shared_ptr scanner = nullptr; generate_scanner(scanner); std::unique_ptr block(new vectorized::Block()); @@ -282,6 +293,19 @@ TEST_F(VWalScannerTest, normal) { EXPECT_EQ(0, block->rows()); ASSERT_TRUE(eof); WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner"); + // read wal file with wal_version=1 + eof = false; + _runtime_state._wal_id = _txn_id_2; + generate_scanner(scanner); + st = scanner->get_block(&_runtime_state, block.get(), &eof); + ASSERT_TRUE(st.ok()); + EXPECT_EQ(3, block->rows()); + block->clear(); + st = scanner->get_block(&_runtime_state, block.get(), &eof); + ASSERT_TRUE(st.ok()); + EXPECT_EQ(0, block->rows()); + ASSERT_TRUE(eof); + WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner"); } TEST_F(VWalScannerTest, fail_with_not_equal) { @@ -296,6 +320,7 @@ TEST_F(VWalScannerTest, fail_with_not_equal) { [](auto&& args) { *try_any_cast(args[0]) = 2; }); sp->enable_processing(); + _runtime_state._wal_id = _txn_id_1; std::shared_ptr scanner = nullptr; generate_scanner(scanner); std::unique_ptr block(new vectorized::Block());