From 84d9d1ebe2a1a5a316fef3e786c0914b18cfc58f Mon Sep 17 00:00:00 2001 From: obdev Date: Tue, 6 Feb 2024 16:28:50 +0000 Subject: [PATCH] fix deserialize arg in multi-transation commit --- src/share/ob_rpc_struct.cpp | 114 ++++++++++++ src/share/ob_rpc_struct.h | 12 ++ .../tablet/ob_tablet_binding_helper.cpp | 58 ++++++ src/storage/tablet/ob_tablet_binding_helper.h | 2 + .../tablet/ob_tablet_create_delete_helper.h | 61 ++++--- unittest/storage/CMakeLists.txt | 1 + .../multi_data_source/test_is_old_mds.cpp | 168 ++++++++++++++++++ 7 files changed, 389 insertions(+), 27 deletions(-) create mode 100644 unittest/storage/multi_data_source/test_is_old_mds.cpp diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index f89aba862..7799a8cd6 100755 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -7818,6 +7818,58 @@ int ObBatchRemoveTabletArg::init(const ObIArray &tablet_ids, return ret; } +int ObBatchRemoveTabletArg::skip_array_len(const char *buf, + int64_t data_len, + int64_t &pos) +{ + int ret = OB_SUCCESS; + int64_t count = 0; + if (pos > data_len) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos)); + } else if (OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &count))) { + TRANS_LOG(WARN, "failed to decode array count", K(ret), KP(buf), K(data_len)); + } else if (count <= 0) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos), K(count)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) { + ObTabletID tablet_id; + OB_UNIS_DECODE(tablet_id); + } + } + return ret; +} + +int ObBatchRemoveTabletArg::is_old_mds(const char *buf, + int64_t data_len, + bool &is_old_mds) +{ + int ret = OB_SUCCESS; + int64_t pos = 0; + is_old_mds = false; + int64_t version = 0; + int64_t len = 0; + share::ObLSID id; + + if (OB_ISNULL(buf) || OB_UNLIKELY(data_len <= 0)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len)); + } else { + LST_DO_CODE(OB_UNIS_DECODE, version, len); + if (OB_FAIL(ret)) { + } + // tablets array + else if (OB_FAIL(skip_array_len(buf, data_len, pos))) { + TRANS_LOG(WARN, "failed to skip_unis_array_len", K(ret), KP(buf), K(data_len), K(pos), K(version), K(len), K(id)); + } else { + LST_DO_CODE(OB_UNIS_DECODE, id, is_old_mds); + } + } + + return ret; +} + DEF_TO_STRING(ObBatchRemoveTabletArg) { int64_t pos = 0; @@ -8086,6 +8138,33 @@ int ObBatchCreateTabletArg::serialize_for_create_tablet_schemas(char *buf, return ret; } +int ObBatchCreateTabletArg::skip_unis_array_len(const char *buf, + int64_t data_len, + int64_t &pos) +{ + int ret = OB_SUCCESS; + int64_t count = 0; + if (pos > data_len) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos)); + } else if (OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &count))) { + TRANS_LOG(WARN, "failed to decode array count", K(ret), KP(buf), K(data_len)); + } else if (count < 0) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos), K(count)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) { + int64_t version = 0; + int64_t len = 0; + OB_UNIS_DECODEx(version); + OB_UNIS_DECODEx(len); + CHECK_VERSION_LENGTH(1, version, len); + pos += len; + } + } + return ret; +} + int64_t ObBatchCreateTabletArg::get_serialize_size_for_create_tablet_schemas() const { int ret = OB_SUCCESS; @@ -8145,6 +8224,41 @@ int ObBatchCreateTabletArg::deserialize_create_tablet_schemas(const char *buf, return ret; } +int ObBatchCreateTabletArg::is_old_mds(const char *buf, + int64_t data_len, + bool &is_old_mds) +{ + int ret = OB_SUCCESS; + int64_t pos = 0; + is_old_mds = false; + int64_t version = 0; + int64_t len = 0; + share::ObLSID id; + share::SCN major_frozen_scn; + bool need_check_tablet_cnt = false; + + if (OB_ISNULL(buf) || OB_UNLIKELY(data_len <= 0)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len)); + } else { + LST_DO_CODE(OB_UNIS_DECODE, version, len, id, major_frozen_scn); + if (OB_FAIL(ret)) { + } + // tablets array + else if (OB_FAIL(skip_unis_array_len(buf, data_len, pos))) { + TRANS_LOG(WARN, "failed to skip_unis_array_len", K(ret), KP(buf), K(data_len), K(pos)); + } + // schema array + else if (OB_FAIL(skip_unis_array_len(buf, data_len, pos))) { + TRANS_LOG(WARN, "failed to skip_unis_array_len", K(ret), KP(buf), K(data_len), K(pos)); + } else { + LST_DO_CODE(OB_UNIS_DECODE, need_check_tablet_cnt, is_old_mds); + } + } + + return ret; +} + DEF_TO_STRING(ObBatchCreateTabletArg) { int64_t pos = 0; diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index e9526a048..811d327c6 100755 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -3455,6 +3455,12 @@ public: int deserialize_create_tablet_schemas(const char *buf, const int64_t data_len, int64_t &pos); + static int is_old_mds(const char *buf, + int64_t data_len, + bool &is_old_mds); + static int skip_unis_array_len(const char *buf, + int64_t data_len, + int64_t &pos); DECLARE_TO_STRING; public: @@ -3480,6 +3486,12 @@ public: int assign (const ObBatchRemoveTabletArg &arg); int init(const ObIArray &tablet_ids, const share::ObLSID id); + static int is_old_mds(const char *buf, + int64_t data_len, + bool &is_old_mds); + static int skip_array_len(const char *buf, + int64_t data_len, + int64_t &pos); DECLARE_TO_STRING; public: diff --git a/src/storage/tablet/ob_tablet_binding_helper.cpp b/src/storage/tablet/ob_tablet_binding_helper.cpp index ffee6e0f1..3bb1e121f 100644 --- a/src/storage/tablet/ob_tablet_binding_helper.cpp +++ b/src/storage/tablet/ob_tablet_binding_helper.cpp @@ -66,6 +66,64 @@ int ObBatchUnbindTabletArg::assign(const ObBatchUnbindTabletArg &other) return ret; } +int ObBatchUnbindTabletArg::skip_array_len(const char *buf, + int64_t data_len, + int64_t &pos) +{ + int ret = OB_SUCCESS; + int64_t count = 0; + if (pos > data_len) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos)); + } else if (OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &count))) { + TRANS_LOG(WARN, "failed to decode array count", K(ret), KP(buf), K(data_len)); + } else if (count < 0) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos), K(count)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) { + ObTabletID tablet_id; + OB_UNIS_DECODE(tablet_id); + } + } + return ret; +} + +int ObBatchUnbindTabletArg::is_old_mds(const char *buf, + int64_t data_len, + bool &is_old_mds) +{ + int ret = OB_SUCCESS; + int64_t pos = 0; + is_old_mds = false; + int64_t version = 0; + int64_t len = 0; + uint64_t tenant_id; + share::ObLSID id; + int64_t schema_version; + + if (OB_ISNULL(buf) || OB_UNLIKELY(data_len <= 0)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len)); + } else { + LST_DO_CODE(OB_UNIS_DECODE, version, len, tenant_id, id, schema_version); + if (OB_FAIL(ret)) { + } + // orig tablets array + else if (OB_FAIL(skip_array_len(buf, data_len, pos))) { + TRANS_LOG(WARN, "failed to skip_unis_array_len", K(ret), KP(buf), K(data_len), K(pos)); + } + // hidden tablets array + else if (OB_FAIL(skip_array_len(buf, data_len, pos))) { + TRANS_LOG(WARN, "failed to skip_unis_array_len", K(ret), KP(buf), K(data_len), K(pos)); + } else { + LST_DO_CODE(OB_UNIS_DECODE, is_old_mds); + } + } + + return ret; +} + OB_DEF_SERIALIZE(ObBatchUnbindTabletArg) { int ret = OB_SUCCESS; diff --git a/src/storage/tablet/ob_tablet_binding_helper.h b/src/storage/tablet/ob_tablet_binding_helper.h index 4f015f435..e25832748 100644 --- a/src/storage/tablet/ob_tablet_binding_helper.h +++ b/src/storage/tablet/ob_tablet_binding_helper.h @@ -62,6 +62,8 @@ public: inline bool is_redefined() const { return schema_version_ != OB_INVALID_VERSION; } TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(schema_version), K_(orig_tablet_ids), K_(hidden_tablet_ids)); bool is_valid() { return true; } + static int is_old_mds(const char *buf, const int64_t len, bool &is_old_mds); + static int skip_array_len(const char *buf, int64_t data_len, int64_t &pos); OB_UNIS_VERSION_V(1); public: diff --git a/src/storage/tablet/ob_tablet_create_delete_helper.h b/src/storage/tablet/ob_tablet_create_delete_helper.h index f50872731..d6312e377 100644 --- a/src/storage/tablet/ob_tablet_create_delete_helper.h +++ b/src/storage/tablet/ob_tablet_create_delete_helper.h @@ -208,11 +208,14 @@ int ObTabletCreateDeleteHelper::process_for_old_mds( { int ret = OB_SUCCESS; Arg arg; + bool is_old_mds = false; if (OB_ISNULL(buf) || OB_UNLIKELY(len <= 0)) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(len)); - } else { + } else if (OB_FAIL(Arg::is_old_mds(buf, len, is_old_mds))) { + TRANS_LOG(WARN, "failed to is_old_mds", K(ret), KP(buf), K(len)); + } else if (is_old_mds) { do { int64_t pos = 0; if (OB_FAIL(arg.deserialize(buf, len, pos))) { @@ -224,37 +227,41 @@ int ObTabletCreateDeleteHelper::process_for_old_mds( } } } while (OB_FAIL(ret) && !notify_arg.for_replay_); - } - - if (OB_FAIL(ret)) { - } else if (OB_UNLIKELY(!arg.is_valid())) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "arg is invalid", K(ret), K(arg)); - } else if (arg.is_old_mds_) { - mds::MdsCtx mds_ctx; - mds_ctx.set_binding_type_id(mds::TupleTypeIdx::value); - mds_ctx.set_writer(mds::MdsWriter(notify_arg.tx_id_)); - - if (notify_arg.for_replay_) { - if (OB_FAIL(Helper::replay_process(arg, notify_arg.scn_, mds_ctx))) { - ret = OB_EAGAIN; - TRANS_LOG(WARN, "failed to replay_process", K(ret), K(notify_arg), K(arg)); - } - } else { - do { - if (OB_FAIL(Helper::register_process(arg, mds_ctx))) { - TRANS_LOG(ERROR, "fail to register_process, retry", K(ret), K(arg), K(notify_arg)); - usleep(100 * 1000); - } - } while (OB_FAIL(ret)); - } if (OB_FAIL(ret)) { + } else if (OB_UNLIKELY(!arg.is_valid())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "arg is invalid", K(ret), K(arg)); + } else if (!arg.is_old_mds_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "arg is not old mds, but buf is old mds", K(ret), K(arg)); } else { - mds_ctx.single_log_commit(notify_arg.trans_version_, notify_arg.scn_); - TRANS_LOG(INFO, "replay create commit for old_mds", KR(ret), K(arg)); + mds::MdsCtx mds_ctx; + mds_ctx.set_binding_type_id(mds::TupleTypeIdx::value); + mds_ctx.set_writer(mds::MdsWriter(notify_arg.tx_id_)); + + if (notify_arg.for_replay_) { + if (OB_FAIL(Helper::replay_process(arg, notify_arg.scn_, mds_ctx))) { + ret = OB_EAGAIN; + TRANS_LOG(WARN, "failed to replay_process", K(ret), K(notify_arg), K(arg)); + } + } else { + do { + if (OB_FAIL(Helper::register_process(arg, mds_ctx))) { + TRANS_LOG(ERROR, "fail to register_process, retry", K(ret), K(arg), K(notify_arg)); + usleep(100 * 1000); + } + } while (OB_FAIL(ret)); + } + + if (OB_FAIL(ret)) { + } else { + mds_ctx.single_log_commit(notify_arg.trans_version_, notify_arg.scn_); + TRANS_LOG(INFO, "replay create commit for old_mds", KR(ret), K(arg)); + } } } + return ret; }; diff --git a/unittest/storage/CMakeLists.txt b/unittest/storage/CMakeLists.txt index 8b72b29ef..f065f981a 100644 --- a/unittest/storage/CMakeLists.txt +++ b/unittest/storage/CMakeLists.txt @@ -58,6 +58,7 @@ storage_unittest(test_mds_dump_kv multi_data_source/test_mds_dump_kv.cpp) #storage_unittest(test_memtable_multi_version_row_iterator memtable/test_memtable_multi_version_row_iterator.cpp) #storage_unittest(test_new_table_store) storage_unittest(test_mds_table_handle multi_data_source/test_mds_table_handle.cpp) +storage_unittest(test_is_old_mds multi_data_source/test_is_old_mds.cpp) storage_unittest(test_fixed_size_block_allocator) storage_unittest(test_dag_warning_history) storage_unittest(test_storage_schema) diff --git a/unittest/storage/multi_data_source/test_is_old_mds.cpp b/unittest/storage/multi_data_source/test_is_old_mds.cpp new file mode 100644 index 000000000..12afff630 --- /dev/null +++ b/unittest/storage/multi_data_source/test_is_old_mds.cpp @@ -0,0 +1,168 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "share/ob_errno.h" +#include +#include "storage/schema_utils.h" +#include "storage/test_tablet_helper.h" + +#define USING_LOG_PREFIX STORAGE + +#define UNITTEST + +namespace oceanbase +{ +namespace storage +{ +namespace checkpoint +{ +class TestIsOldMds : public ::testing::Test +{ +public: + TestIsOldMds() {} + virtual ~TestIsOldMds() = default; + + static void SetUpTestCase() + { + } + + static void TearDownTestCase() + { + } +}; + +TEST_F(TestIsOldMds, test_is_old_mds_for_unbind) { + int ret = OB_SUCCESS; + ObBatchUnbindTabletArg arg; + for (int64_t i =0; OB_SUCC(ret) && i < 5; i++) { + arg.orig_tablet_ids_.push_back(ObTabletID(1)); + ASSERT_EQ(OB_SUCCESS, ret); + } + + for (int64_t i =0; OB_SUCC(ret) && i < 5; i++) { + arg.hidden_tablet_ids_.push_back(ObTabletID(1)); + ASSERT_EQ(OB_SUCCESS, ret); + } + + int64_t len = 5000; + char buf[5000] = {0}; + int64_t pos = 0; + bool is_old_mds; + + arg.is_old_mds_ = false; + ret = arg.serialize(buf, len, pos); + ASSERT_EQ(OB_SUCCESS, ret); + ret = arg.is_old_mds(buf, len, is_old_mds); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(false, is_old_mds); + + pos = 0; + arg.is_old_mds_ = true; + memset(buf, 0, 5000); + ret = arg.serialize(buf, len, pos); + ASSERT_EQ(OB_SUCCESS, ret); + ret = arg.is_old_mds(buf, len, is_old_mds); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(true, is_old_mds); +} + +TEST_F(TestIsOldMds, test_is_old_mds_for_delete) { + int ret = OB_SUCCESS; + ObBatchRemoveTabletArg arg; + for (int64_t i =0; OB_SUCC(ret) && i < 5; i++) { + arg.tablet_ids_.push_back(ObTabletID(1)); + ASSERT_EQ(OB_SUCCESS, ret); + } + + int64_t len = 5000; + char buf[5000] = {0}; + int64_t pos = 0; + bool is_old_mds; + + arg.is_old_mds_ = false; + ret = arg.serialize(buf, len, pos); + ASSERT_EQ(OB_SUCCESS, ret); + ret = arg.is_old_mds(buf, len, is_old_mds); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(false, is_old_mds); + + pos = 0; + arg.is_old_mds_ = true; + memset(buf, 0, 5000); + ret = arg.serialize(buf, len, pos); + ASSERT_EQ(OB_SUCCESS, ret); + ret = arg.is_old_mds(buf, len, is_old_mds); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(true, is_old_mds); +} + +TEST_F(TestIsOldMds, test_is_old_mds_for_create) { + int ret = OB_SUCCESS; + ObBatchCreateTabletArg arg; + ObTableSchema table_schema; + TestSchemaUtils::prepare_data_schema(table_schema); + ret = arg.table_schemas_.push_back(table_schema); + ASSERT_EQ(OB_SUCCESS, ret); + ret = arg.table_schemas_.push_back(table_schema); + ASSERT_EQ(OB_SUCCESS, ret); + for (int64_t i =0; OB_SUCC(ret) && i < 5; i++) { + obrpc::ObCreateTabletInfo create_tablet_info; + create_tablet_info.compat_mode_ = lib::Worker::CompatMode::MYSQL; + create_tablet_info.data_tablet_id_ = ObTabletID(1); + ASSERT_EQ(OB_SUCCESS, ret); + for (int64_t j = 0; OB_SUCC(ret) && j < 5; ++j) { + create_tablet_info.tablet_ids_.push_back(ObTabletID(1)); + ASSERT_EQ(OB_SUCCESS, ret); + create_tablet_info.table_schema_index_.push_back(0); + ASSERT_EQ(OB_SUCCESS, ret); + } + arg.tablets_.push_back(create_tablet_info); + ASSERT_EQ(OB_SUCCESS, ret); + } + + int64_t len = 5000; + char buf[5000] = {0}; + int64_t pos = 0; + bool is_old_mds; + + arg.is_old_mds_ = false; + ret = arg.serialize(buf, len, pos); + ASSERT_EQ(OB_SUCCESS, ret); + ret = arg.is_old_mds(buf, len, is_old_mds); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(false, is_old_mds); + + pos = 0; + arg.is_old_mds_ = true; + memset(buf, 0, 5000); + ret = arg.serialize(buf, len, pos); + ASSERT_EQ(OB_SUCCESS, ret); + ret = arg.is_old_mds(buf, len, is_old_mds); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(true, is_old_mds); +} + +} +} // end namespace storage +} // end namespace oceanbase + +int main(int argc, char **argv) +{ + int ret = 0; + system("rm -f test_is_old_mds.log*"); + OB_LOGGER.set_file_name("test_is_old_mds.log", true); + OB_LOGGER.set_log_level("INFO"); + signal(49, SIG_IGN); + testing::InitGoogleTest(&argc, argv); + ret = RUN_ALL_TESTS(); + return ret; +}