fix deserialize arg in multi-transation commit

This commit is contained in:
obdev 2024-02-06 16:28:50 +00:00 committed by ob-robot
parent 9c5a7b79e8
commit 84d9d1ebe2
7 changed files with 389 additions and 27 deletions

View File

@ -7818,6 +7818,58 @@ int ObBatchRemoveTabletArg::init(const ObIArray<common::ObTabletID> &tablet_ids,
return ret;
}
int ObBatchRemoveTabletArg::skip_array_len(const char *buf,
int64_t data_len,
int64_t &pos)
{
int ret = OB_SUCCESS;
int64_t count = 0;
if (pos > data_len) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos));
} else if (OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &count))) {
TRANS_LOG(WARN, "failed to decode array count", K(ret), KP(buf), K(data_len));
} else if (count <= 0) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos), K(count));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) {
ObTabletID tablet_id;
OB_UNIS_DECODE(tablet_id);
}
}
return ret;
}
int ObBatchRemoveTabletArg::is_old_mds(const char *buf,
int64_t data_len,
bool &is_old_mds)
{
int ret = OB_SUCCESS;
int64_t pos = 0;
is_old_mds = false;
int64_t version = 0;
int64_t len = 0;
share::ObLSID id;
if (OB_ISNULL(buf) || OB_UNLIKELY(data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len));
} else {
LST_DO_CODE(OB_UNIS_DECODE, version, len);
if (OB_FAIL(ret)) {
}
// tablets array
else if (OB_FAIL(skip_array_len(buf, data_len, pos))) {
TRANS_LOG(WARN, "failed to skip_unis_array_len", K(ret), KP(buf), K(data_len), K(pos), K(version), K(len), K(id));
} else {
LST_DO_CODE(OB_UNIS_DECODE, id, is_old_mds);
}
}
return ret;
}
DEF_TO_STRING(ObBatchRemoveTabletArg)
{
int64_t pos = 0;
@ -8086,6 +8138,33 @@ int ObBatchCreateTabletArg::serialize_for_create_tablet_schemas(char *buf,
return ret;
}
int ObBatchCreateTabletArg::skip_unis_array_len(const char *buf,
int64_t data_len,
int64_t &pos)
{
int ret = OB_SUCCESS;
int64_t count = 0;
if (pos > data_len) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos));
} else if (OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &count))) {
TRANS_LOG(WARN, "failed to decode array count", K(ret), KP(buf), K(data_len));
} else if (count < 0) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos), K(count));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) {
int64_t version = 0;
int64_t len = 0;
OB_UNIS_DECODEx(version);
OB_UNIS_DECODEx(len);
CHECK_VERSION_LENGTH(1, version, len);
pos += len;
}
}
return ret;
}
int64_t ObBatchCreateTabletArg::get_serialize_size_for_create_tablet_schemas() const
{
int ret = OB_SUCCESS;
@ -8145,6 +8224,41 @@ int ObBatchCreateTabletArg::deserialize_create_tablet_schemas(const char *buf,
return ret;
}
int ObBatchCreateTabletArg::is_old_mds(const char *buf,
int64_t data_len,
bool &is_old_mds)
{
int ret = OB_SUCCESS;
int64_t pos = 0;
is_old_mds = false;
int64_t version = 0;
int64_t len = 0;
share::ObLSID id;
share::SCN major_frozen_scn;
bool need_check_tablet_cnt = false;
if (OB_ISNULL(buf) || OB_UNLIKELY(data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len));
} else {
LST_DO_CODE(OB_UNIS_DECODE, version, len, id, major_frozen_scn);
if (OB_FAIL(ret)) {
}
// tablets array
else if (OB_FAIL(skip_unis_array_len(buf, data_len, pos))) {
TRANS_LOG(WARN, "failed to skip_unis_array_len", K(ret), KP(buf), K(data_len), K(pos));
}
// schema array
else if (OB_FAIL(skip_unis_array_len(buf, data_len, pos))) {
TRANS_LOG(WARN, "failed to skip_unis_array_len", K(ret), KP(buf), K(data_len), K(pos));
} else {
LST_DO_CODE(OB_UNIS_DECODE, need_check_tablet_cnt, is_old_mds);
}
}
return ret;
}
DEF_TO_STRING(ObBatchCreateTabletArg)
{
int64_t pos = 0;

View File

@ -3455,6 +3455,12 @@ public:
int deserialize_create_tablet_schemas(const char *buf,
const int64_t data_len,
int64_t &pos);
static int is_old_mds(const char *buf,
int64_t data_len,
bool &is_old_mds);
static int skip_unis_array_len(const char *buf,
int64_t data_len,
int64_t &pos);
DECLARE_TO_STRING;
public:
@ -3480,6 +3486,12 @@ public:
int assign (const ObBatchRemoveTabletArg &arg);
int init(const ObIArray<common::ObTabletID> &tablet_ids,
const share::ObLSID id);
static int is_old_mds(const char *buf,
int64_t data_len,
bool &is_old_mds);
static int skip_array_len(const char *buf,
int64_t data_len,
int64_t &pos);
DECLARE_TO_STRING;
public:

View File

@ -66,6 +66,64 @@ int ObBatchUnbindTabletArg::assign(const ObBatchUnbindTabletArg &other)
return ret;
}
int ObBatchUnbindTabletArg::skip_array_len(const char *buf,
int64_t data_len,
int64_t &pos)
{
int ret = OB_SUCCESS;
int64_t count = 0;
if (pos > data_len) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos));
} else if (OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &count))) {
TRANS_LOG(WARN, "failed to decode array count", K(ret), KP(buf), K(data_len));
} else if (count < 0) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len), K(pos), K(count));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) {
ObTabletID tablet_id;
OB_UNIS_DECODE(tablet_id);
}
}
return ret;
}
int ObBatchUnbindTabletArg::is_old_mds(const char *buf,
int64_t data_len,
bool &is_old_mds)
{
int ret = OB_SUCCESS;
int64_t pos = 0;
is_old_mds = false;
int64_t version = 0;
int64_t len = 0;
uint64_t tenant_id;
share::ObLSID id;
int64_t schema_version;
if (OB_ISNULL(buf) || OB_UNLIKELY(data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(data_len));
} else {
LST_DO_CODE(OB_UNIS_DECODE, version, len, tenant_id, id, schema_version);
if (OB_FAIL(ret)) {
}
// orig tablets array
else if (OB_FAIL(skip_array_len(buf, data_len, pos))) {
TRANS_LOG(WARN, "failed to skip_unis_array_len", K(ret), KP(buf), K(data_len), K(pos));
}
// hidden tablets array
else if (OB_FAIL(skip_array_len(buf, data_len, pos))) {
TRANS_LOG(WARN, "failed to skip_unis_array_len", K(ret), KP(buf), K(data_len), K(pos));
} else {
LST_DO_CODE(OB_UNIS_DECODE, is_old_mds);
}
}
return ret;
}
OB_DEF_SERIALIZE(ObBatchUnbindTabletArg)
{
int ret = OB_SUCCESS;

View File

@ -62,6 +62,8 @@ public:
inline bool is_redefined() const { return schema_version_ != OB_INVALID_VERSION; }
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(schema_version), K_(orig_tablet_ids), K_(hidden_tablet_ids));
bool is_valid() { return true; }
static int is_old_mds(const char *buf, const int64_t len, bool &is_old_mds);
static int skip_array_len(const char *buf, int64_t data_len, int64_t &pos);
OB_UNIS_VERSION_V(1);
public:

View File

@ -208,11 +208,14 @@ int ObTabletCreateDeleteHelper::process_for_old_mds(
{
int ret = OB_SUCCESS;
Arg arg;
bool is_old_mds = false;
if (OB_ISNULL(buf) || OB_UNLIKELY(len <= 0)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), KP(buf), K(len));
} else {
} else if (OB_FAIL(Arg::is_old_mds(buf, len, is_old_mds))) {
TRANS_LOG(WARN, "failed to is_old_mds", K(ret), KP(buf), K(len));
} else if (is_old_mds) {
do {
int64_t pos = 0;
if (OB_FAIL(arg.deserialize(buf, len, pos))) {
@ -224,37 +227,41 @@ int ObTabletCreateDeleteHelper::process_for_old_mds(
}
}
} while (OB_FAIL(ret) && !notify_arg.for_replay_);
}
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "arg is invalid", K(ret), K(arg));
} else if (arg.is_old_mds_) {
mds::MdsCtx mds_ctx;
mds_ctx.set_binding_type_id(mds::TupleTypeIdx<mds::BufferCtxTupleHelper, mds::MdsCtx>::value);
mds_ctx.set_writer(mds::MdsWriter(notify_arg.tx_id_));
if (notify_arg.for_replay_) {
if (OB_FAIL(Helper::replay_process(arg, notify_arg.scn_, mds_ctx))) {
ret = OB_EAGAIN;
TRANS_LOG(WARN, "failed to replay_process", K(ret), K(notify_arg), K(arg));
}
} else {
do {
if (OB_FAIL(Helper::register_process(arg, mds_ctx))) {
TRANS_LOG(ERROR, "fail to register_process, retry", K(ret), K(arg), K(notify_arg));
usleep(100 * 1000);
}
} while (OB_FAIL(ret));
}
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "arg is invalid", K(ret), K(arg));
} else if (!arg.is_old_mds_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "arg is not old mds, but buf is old mds", K(ret), K(arg));
} else {
mds_ctx.single_log_commit(notify_arg.trans_version_, notify_arg.scn_);
TRANS_LOG(INFO, "replay create commit for old_mds", KR(ret), K(arg));
mds::MdsCtx mds_ctx;
mds_ctx.set_binding_type_id(mds::TupleTypeIdx<mds::BufferCtxTupleHelper, mds::MdsCtx>::value);
mds_ctx.set_writer(mds::MdsWriter(notify_arg.tx_id_));
if (notify_arg.for_replay_) {
if (OB_FAIL(Helper::replay_process(arg, notify_arg.scn_, mds_ctx))) {
ret = OB_EAGAIN;
TRANS_LOG(WARN, "failed to replay_process", K(ret), K(notify_arg), K(arg));
}
} else {
do {
if (OB_FAIL(Helper::register_process(arg, mds_ctx))) {
TRANS_LOG(ERROR, "fail to register_process, retry", K(ret), K(arg), K(notify_arg));
usleep(100 * 1000);
}
} while (OB_FAIL(ret));
}
if (OB_FAIL(ret)) {
} else {
mds_ctx.single_log_commit(notify_arg.trans_version_, notify_arg.scn_);
TRANS_LOG(INFO, "replay create commit for old_mds", KR(ret), K(arg));
}
}
}
return ret;
};

View File

@ -58,6 +58,7 @@ storage_unittest(test_mds_dump_kv multi_data_source/test_mds_dump_kv.cpp)
#storage_unittest(test_memtable_multi_version_row_iterator memtable/test_memtable_multi_version_row_iterator.cpp)
#storage_unittest(test_new_table_store)
storage_unittest(test_mds_table_handle multi_data_source/test_mds_table_handle.cpp)
storage_unittest(test_is_old_mds multi_data_source/test_is_old_mds.cpp)
storage_unittest(test_fixed_size_block_allocator)
storage_unittest(test_dag_warning_history)
storage_unittest(test_storage_schema)

View File

@ -0,0 +1,168 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "share/ob_errno.h"
#include <gtest/gtest.h>
#include "storage/schema_utils.h"
#include "storage/test_tablet_helper.h"
#define USING_LOG_PREFIX STORAGE
#define UNITTEST
namespace oceanbase
{
namespace storage
{
namespace checkpoint
{
class TestIsOldMds : public ::testing::Test
{
public:
TestIsOldMds() {}
virtual ~TestIsOldMds() = default;
static void SetUpTestCase()
{
}
static void TearDownTestCase()
{
}
};
TEST_F(TestIsOldMds, test_is_old_mds_for_unbind) {
int ret = OB_SUCCESS;
ObBatchUnbindTabletArg arg;
for (int64_t i =0; OB_SUCC(ret) && i < 5; i++) {
arg.orig_tablet_ids_.push_back(ObTabletID(1));
ASSERT_EQ(OB_SUCCESS, ret);
}
for (int64_t i =0; OB_SUCC(ret) && i < 5; i++) {
arg.hidden_tablet_ids_.push_back(ObTabletID(1));
ASSERT_EQ(OB_SUCCESS, ret);
}
int64_t len = 5000;
char buf[5000] = {0};
int64_t pos = 0;
bool is_old_mds;
arg.is_old_mds_ = false;
ret = arg.serialize(buf, len, pos);
ASSERT_EQ(OB_SUCCESS, ret);
ret = arg.is_old_mds(buf, len, is_old_mds);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(false, is_old_mds);
pos = 0;
arg.is_old_mds_ = true;
memset(buf, 0, 5000);
ret = arg.serialize(buf, len, pos);
ASSERT_EQ(OB_SUCCESS, ret);
ret = arg.is_old_mds(buf, len, is_old_mds);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(true, is_old_mds);
}
TEST_F(TestIsOldMds, test_is_old_mds_for_delete) {
int ret = OB_SUCCESS;
ObBatchRemoveTabletArg arg;
for (int64_t i =0; OB_SUCC(ret) && i < 5; i++) {
arg.tablet_ids_.push_back(ObTabletID(1));
ASSERT_EQ(OB_SUCCESS, ret);
}
int64_t len = 5000;
char buf[5000] = {0};
int64_t pos = 0;
bool is_old_mds;
arg.is_old_mds_ = false;
ret = arg.serialize(buf, len, pos);
ASSERT_EQ(OB_SUCCESS, ret);
ret = arg.is_old_mds(buf, len, is_old_mds);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(false, is_old_mds);
pos = 0;
arg.is_old_mds_ = true;
memset(buf, 0, 5000);
ret = arg.serialize(buf, len, pos);
ASSERT_EQ(OB_SUCCESS, ret);
ret = arg.is_old_mds(buf, len, is_old_mds);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(true, is_old_mds);
}
TEST_F(TestIsOldMds, test_is_old_mds_for_create) {
int ret = OB_SUCCESS;
ObBatchCreateTabletArg arg;
ObTableSchema table_schema;
TestSchemaUtils::prepare_data_schema(table_schema);
ret = arg.table_schemas_.push_back(table_schema);
ASSERT_EQ(OB_SUCCESS, ret);
ret = arg.table_schemas_.push_back(table_schema);
ASSERT_EQ(OB_SUCCESS, ret);
for (int64_t i =0; OB_SUCC(ret) && i < 5; i++) {
obrpc::ObCreateTabletInfo create_tablet_info;
create_tablet_info.compat_mode_ = lib::Worker::CompatMode::MYSQL;
create_tablet_info.data_tablet_id_ = ObTabletID(1);
ASSERT_EQ(OB_SUCCESS, ret);
for (int64_t j = 0; OB_SUCC(ret) && j < 5; ++j) {
create_tablet_info.tablet_ids_.push_back(ObTabletID(1));
ASSERT_EQ(OB_SUCCESS, ret);
create_tablet_info.table_schema_index_.push_back(0);
ASSERT_EQ(OB_SUCCESS, ret);
}
arg.tablets_.push_back(create_tablet_info);
ASSERT_EQ(OB_SUCCESS, ret);
}
int64_t len = 5000;
char buf[5000] = {0};
int64_t pos = 0;
bool is_old_mds;
arg.is_old_mds_ = false;
ret = arg.serialize(buf, len, pos);
ASSERT_EQ(OB_SUCCESS, ret);
ret = arg.is_old_mds(buf, len, is_old_mds);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(false, is_old_mds);
pos = 0;
arg.is_old_mds_ = true;
memset(buf, 0, 5000);
ret = arg.serialize(buf, len, pos);
ASSERT_EQ(OB_SUCCESS, ret);
ret = arg.is_old_mds(buf, len, is_old_mds);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(true, is_old_mds);
}
}
} // end namespace storage
} // end namespace oceanbase
int main(int argc, char **argv)
{
int ret = 0;
system("rm -f test_is_old_mds.log*");
OB_LOGGER.set_file_name("test_is_old_mds.log", true);
OB_LOGGER.set_log_level("INFO");
signal(49, SIG_IGN);
testing::InitGoogleTest(&argc, argv);
ret = RUN_ALL_TESTS();
return ret;
}