[CP] [MDS] deserialize buffer_ctx by multi_data_source id
This commit is contained in:
@ -26,6 +26,7 @@
|
||||
#include "storage/multi_data_source/runtime_utility/mds_tenant_service.h"
|
||||
#include "storage/tx/ob_trans_define.h"
|
||||
#include "storage/multi_data_source/runtime_utility/mds_tenant_service.h"
|
||||
#include "storage/tx/ob_trans_define.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -117,17 +118,65 @@ int deserialize_<BufferCtxTupleHelper::get_element_size()>(BufferCtx *&ctx_,
|
||||
return ret;
|
||||
}
|
||||
|
||||
/****************************************************for compat********************************************************/
|
||||
transaction::ObTxBufferNode *get_current_tx_buffer_node() {
|
||||
transaction::ObTxBufferNode *tx_buffer_node = nullptr;
|
||||
if (transaction::TLOCAL_P_TX_BUFFER_NODE_ARRAY) {
|
||||
transaction::ObTxBufferNodeArray &array = *transaction::TLOCAL_P_TX_BUFFER_NODE_ARRAY;
|
||||
for (int64_t idx = 0; idx < array.count(); ++idx) {
|
||||
if (!array[idx].has_deserialized_buffer_ctx()) {
|
||||
tx_buffer_node = &array[idx];
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return tx_buffer_node;
|
||||
}
|
||||
int get_ctx_type_id_by_multi_data_source_type_idx(const transaction::ObTxDataSourceType multi_data_source_type, int64_t &ctx_type_idx) {
|
||||
int ret = OB_SUCCESS;
|
||||
switch (multi_data_source_type) {
|
||||
#define NEED_GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION
|
||||
#define _GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION_(HELPER_CLASS, BUFFER_CTX_TYPE, ID, ENUM_NAME) \
|
||||
case transaction::ObTxDataSourceType::ENUM_NAME:\
|
||||
{\
|
||||
ctx_type_idx = TupleTypeIdx<BufferCtxTupleHelper, BUFFER_CTX_TYPE>::value;\
|
||||
}\
|
||||
break;
|
||||
#include "storage/multi_data_source/compile_utility/mds_register.h"
|
||||
#undef _GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION_
|
||||
#undef NEED_GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION
|
||||
default:// this is an old MDS out of FRAME code, for example: table lock
|
||||
MDS_LOG(INFO, "this multi data source is out of frame", KR(ret), K(ctx_type_idx), K(multi_data_source_type));
|
||||
break;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
/**********************************************************************************************************************/
|
||||
int BufferCtxNode::deserialize(const char *buf, const int64_t buf_len, int64_t &pos, ObIAllocator &allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MDS_TG(10_ms);
|
||||
int64_t type_idx = INVALID_VALUE;
|
||||
if (MDS_FAIL(serialization::decode(buf, buf_len, pos, type_idx))) {
|
||||
MDS_LOG(ERROR, "fail to deserialize buffer ctx id", KR(ret), K(type_idx));
|
||||
} else if (INVALID_VALUE == type_idx) {
|
||||
MDS_LOG(DEBUG, "deserialized INVALD buffer ctx", KR(ret), K(type_idx), K(buf_len), K(pos));
|
||||
} else if (MDS_FAIL(deserialize_<0>(ctx_, type_idx, buf, buf_len, pos, allocator))) {
|
||||
MDS_LOG(WARN, "deserialized buffer ctx failed", KR(ret), K(type_idx));
|
||||
int64_t ctx_type_idx = INVALID_VALUE;
|
||||
transaction::ObTxBufferNode *tx_buffer_node = get_current_tx_buffer_node();
|
||||
if (MDS_FAIL(serialization::decode(buf, buf_len, pos, ctx_type_idx))) {
|
||||
MDS_LOG(ERROR, "fail to deserialize buffer ctx id", KR(ret), K(ctx_type_idx));
|
||||
} else if (INVALID_VALUE == ctx_type_idx) {
|
||||
MDS_LOG(DEBUG, "deserialized INVALD buffer ctx", KR(ret), K(ctx_type_idx), K(buf_len), K(pos));
|
||||
} else {
|
||||
if (tx_buffer_node) {
|
||||
if (OB_FAIL(get_ctx_type_id_by_multi_data_source_type_idx(tx_buffer_node->get_data_source_type(), ctx_type_idx))) {
|
||||
MDS_LOG(ERROR, "fail get_ctx_type_id_by_multi_data_source_type_idx", KR(ret), K(ctx_type_idx));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (MDS_FAIL(deserialize_<0>(ctx_, ctx_type_idx, buf, buf_len, pos, allocator))) {
|
||||
MDS_LOG(WARN, "deserialized buffer ctx failed", KR(ret), K(ctx_type_idx));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (tx_buffer_node) {
|
||||
tx_buffer_node->set_has_deserialized_buffer_ctx();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -88,6 +88,7 @@ int ObTabletCreateMdsCtx::serialize(char *buf, const int64_t buf_len, int64_t &p
|
||||
int ObTabletCreateMdsCtx::deserialize(const char *buf, const int64_t buf_len, int64_t &pos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t origin_pos = pos;
|
||||
int64_t tmp_pos = pos;
|
||||
int32_t magic = -1;
|
||||
int32_t version = -1;
|
||||
@ -100,13 +101,19 @@ int ObTabletCreateMdsCtx::deserialize(const char *buf, const int64_t buf_len, in
|
||||
LOG_WARN("invalid args", K(ret), K(buf), K(buf_len), K(pos));
|
||||
} else if (OB_FAIL(MdsCtx::deserialize(buf, buf_len, tmp_pos))) {
|
||||
LOG_WARN("fail to deserialize mds ctx", K(ret), K(buf_len), K(tmp_pos));
|
||||
} else if (OB_FAIL(serialization::decode(buf, buf_len, tmp_pos, magic))) {
|
||||
LOG_WARN("failed to deserialize magic", K(ret), K(buf_len), K(tmp_pos));
|
||||
} else if (OB_UNLIKELY(magic != MAGIC)) {
|
||||
FLOG_INFO("magic does not match, maybe this is old version data", K(ret), K(magic), LITERAL_K(MAGIC));
|
||||
version_ = VERSION;
|
||||
ls_id_ = ObLSID::INVALID_LS_ID;
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
pos = tmp_pos;
|
||||
bool is_old_data = false;
|
||||
if (tmp_pos == buf_len) {
|
||||
LOG_WARN("buffer is not enough for magic deserialize", K(ret), K(buf_len), K(tmp_pos));
|
||||
is_old_data = true;
|
||||
} else if (OB_TMP_FAIL(serialization::decode(buf, buf_len, tmp_pos, magic))) {
|
||||
LOG_WARN("decode magic from buffer failed", K(tmp_ret), K(ret), K(buf_len), K(tmp_pos));
|
||||
is_old_data = true;
|
||||
} else if (magic != MAGIC) {
|
||||
LOG_WARN("magic not match", K(tmp_ret), K(ret), K(buf_len), K(tmp_pos));
|
||||
is_old_data = true;
|
||||
} else if (OB_FAIL(serialization::decode(buf, buf_len, tmp_pos, version))) {
|
||||
LOG_WARN("failed to deserialize version", K(ret), K(buf_len), K(tmp_pos));
|
||||
} else if (OB_UNLIKELY(VERSION != version)) {
|
||||
@ -114,9 +121,9 @@ int ObTabletCreateMdsCtx::deserialize(const char *buf, const int64_t buf_len, in
|
||||
LOG_WARN("version does not match", K(ret), K(version));
|
||||
} else if (OB_FAIL(serialization::decode_i64(buf, buf_len, tmp_pos, &serialize_size))) {
|
||||
LOG_WARN("failed to deserialize serialize size", K(ret), K(buf_len), K(tmp_pos));
|
||||
} else if (tmp_pos - pos < serialize_size && OB_FAIL(ls_id_.deserialize(buf, buf_len, tmp_pos))) {
|
||||
} else if (tmp_pos - origin_pos < serialize_size && OB_FAIL(ls_id_.deserialize(buf, buf_len, tmp_pos))) {
|
||||
LOG_WARN("failed to deserialize ls id", K(ret), K(buf_len), K(tmp_pos));
|
||||
} else if (OB_UNLIKELY(tmp_pos - pos != serialize_size)) {
|
||||
} else if (OB_UNLIKELY(tmp_pos - origin_pos != serialize_size)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("deserialize length does not match", K(ret), K(buf_len), K(pos), K(tmp_pos), K(serialize_size));
|
||||
} else {
|
||||
@ -124,6 +131,13 @@ int ObTabletCreateMdsCtx::deserialize(const char *buf, const int64_t buf_len, in
|
||||
pos = tmp_pos;
|
||||
}
|
||||
|
||||
if (is_old_data) {
|
||||
FLOG_INFO("maybe meet old version data", K(ret), K(magic), LITERAL_K(MAGIC));
|
||||
version_ = VERSION;
|
||||
ls_id_ = ObLSID::INVALID_LS_ID;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -24,6 +24,8 @@ namespace oceanbase
|
||||
namespace transaction
|
||||
{
|
||||
|
||||
thread_local ObTxBufferNodeArray *TLOCAL_P_TX_BUFFER_NODE_ARRAY = nullptr;// FIXME: for compat issue, should be removed after barrier version
|
||||
|
||||
ObTxBufferNode::ObTxBufferNode()
|
||||
: seq_no_(),
|
||||
type_(ObTxDataSourceType::UNKNOWN),
|
||||
@ -72,6 +74,7 @@ void ObTxBufferNode::reset()
|
||||
has_submitted_ = false;
|
||||
has_synced_ = false;
|
||||
mds_base_scn_.reset();
|
||||
has_deserialized_buffer_ctx_ = false;
|
||||
}
|
||||
|
||||
int ObTxBufferNode::set_mds_register_no(const uint64_t register_no)
|
||||
|
||||
@ -77,6 +77,9 @@ public:
|
||||
void set_synced() { has_synced_ = true; }
|
||||
bool is_synced() const { return has_synced_; }
|
||||
|
||||
void set_has_deserialized_buffer_ctx() { has_deserialized_buffer_ctx_ = true; }
|
||||
bool has_deserialized_buffer_ctx() const { return has_deserialized_buffer_ctx_; }
|
||||
|
||||
const share::SCN &get_base_scn() { return mds_base_scn_; }
|
||||
|
||||
bool operator==(const ObTxBufferNode &buffer_node) const;
|
||||
@ -92,12 +95,14 @@ public:
|
||||
K(has_submitted_),
|
||||
K(has_synced_),
|
||||
"type", ObMultiDataSourcePrinter::to_str_mds_type(type_),
|
||||
K(data_.length()));
|
||||
K(data_.length()),
|
||||
K(has_deserialized_buffer_ctx_));
|
||||
private:
|
||||
uint64_t register_no_;
|
||||
ObTxSEQ seq_no_;
|
||||
bool has_submitted_;
|
||||
bool has_synced_;
|
||||
bool has_deserialized_buffer_ctx_;// FIXME: for compat issue, should be removed after barrier version
|
||||
share::SCN mds_base_scn_;
|
||||
ObTxDataSourceType type_;
|
||||
common::ObString data_;
|
||||
@ -128,6 +133,7 @@ private:
|
||||
|
||||
typedef common::ObSEArray<ObTxBufferNode, 1> ObTxBufferNodeArray;
|
||||
typedef common::ObSEArray<storage::mds::BufferCtxNode , 1> ObTxBufferCtxArray;
|
||||
extern thread_local ObTxBufferNodeArray *TLOCAL_P_TX_BUFFER_NODE_ARRAY;// FIXME: for compat issue, should be removed after barrier version
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -217,8 +217,11 @@ int ObTxCtxTableRecoverHelper::recover(const blocksstable::ObDatumRow &row,
|
||||
int64_t pos = 0;
|
||||
bool tx_ctx_existed = true;
|
||||
ctx_info_.set_compatible_version(curr_meta.get_version());
|
||||
if (OB_FAIL(ctx_info_.deserialize(deserialize_buf, deserialize_buf_length, pos, tx_data_table))) {
|
||||
if (FALSE_IT(TLOCAL_P_TX_BUFFER_NODE_ARRAY = &ctx_info_.exec_info_.multi_data_source_)) {// FIXME: for compat issue, should be removed after barrier version
|
||||
} else if (OB_FAIL(ctx_info_.deserialize(deserialize_buf, deserialize_buf_length, pos, tx_data_table))) {
|
||||
STORAGE_LOG(WARN, "failed to deserialize status_info", K(ret), K_(ctx_info));
|
||||
TLOCAL_P_TX_BUFFER_NODE_ARRAY = nullptr;// FIXME: for compat issue, should be removed after barrier version
|
||||
} else if (FALSE_IT(TLOCAL_P_TX_BUFFER_NODE_ARRAY = nullptr)) {// FIXME: for compat issue, should be removed after barrier version
|
||||
} else if (FALSE_IT(ctx_info_.exec_info_.mrege_buffer_ctx_array_to_multi_data_source())) {
|
||||
} else if (OB_FAIL(recover_one_tx_ctx_(ls_tx_ctx_mgr, ctx_info_))) {
|
||||
// heap memory needed be freed, but can not do this in destruction, cause tx_buffer_node has no value sematics
|
||||
|
||||
@ -68,6 +68,7 @@ storage_unittest(test_mvcc_callback memtable/mvcc/test_mvcc_callback.cpp)
|
||||
# storage_unittest(test_mds_compile multi_data_source/test_mds_compile.cpp)
|
||||
storage_unittest(test_mds_list multi_data_source/test_mds_list.cpp)
|
||||
storage_unittest(test_mds_node multi_data_source/test_mds_node.cpp)
|
||||
storage_unittest(test_mds_new_ctx_deserialized multi_data_source/test_mds_new_ctx_deserialized.cpp)
|
||||
# storage_unittest(test_mds_row multi_data_source/test_mds_row.cpp)
|
||||
# storage_unittest(test_mds_unit multi_data_source/test_mds_unit.cpp)
|
||||
storage_unittest(test_mds_table multi_data_source/test_mds_table.cpp)
|
||||
|
||||
@ -0,0 +1,122 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
#include "storage/multi_data_source/compile_utility/compile_mapper.h"
|
||||
#include "storage/multi_data_source/mds_ctx.h"
|
||||
#include "storage/multi_data_source/mds_writer.h"
|
||||
#include "storage/multi_data_source/ob_tablet_create_mds_ctx.h"
|
||||
#include "storage/tx/ob_trans_define.h"
|
||||
#define UNITTEST_DEBUG
|
||||
#include "lib/utility/utility.h"
|
||||
#include <atomic>
|
||||
#include <gtest/gtest.h>
|
||||
#define private public
|
||||
#define protected public
|
||||
#include "storage/multi_data_source/compile_utility/mds_dummy_key.h"
|
||||
#include "storage/multi_data_source/runtime_utility/common_define.h"
|
||||
|
||||
#include <thread>
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include <chrono>
|
||||
#include "storage/multi_data_source/runtime_utility/mds_factory.h"
|
||||
#include "common/ob_clock_generator.h"
|
||||
#include "storage/multi_data_source/mds_node.h"
|
||||
#include "common/meta_programming/ob_type_traits.h"
|
||||
#include "storage/multi_data_source/mds_row.h"
|
||||
namespace oceanbase {
|
||||
namespace storage
|
||||
{
|
||||
namespace mds
|
||||
{
|
||||
void *MdsAllocator::alloc(const int64_t size)
|
||||
{
|
||||
void *ptr = ob_malloc(size, "MDS");
|
||||
ATOMIC_INC(&alloc_times_);
|
||||
MDS_LOG(DEBUG, "alloc obj", KP(ptr), K(size), K(lbt()));
|
||||
return ptr;
|
||||
}
|
||||
void MdsAllocator::free(void *ptr) {
|
||||
ATOMIC_INC(&free_times_);
|
||||
MDS_LOG(DEBUG, "free obj", KP(ptr), K(lbt()));
|
||||
ob_free(ptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
namespace unittest {
|
||||
|
||||
using namespace common;
|
||||
using namespace std;
|
||||
using namespace storage;
|
||||
using namespace mds;
|
||||
|
||||
class TestMdsNewCtxDeserialized: public ::testing::Test
|
||||
{
|
||||
public:
|
||||
TestMdsNewCtxDeserialized() {};
|
||||
virtual ~TestMdsNewCtxDeserialized() {};
|
||||
virtual void SetUp() {
|
||||
};
|
||||
virtual void TearDown() {
|
||||
};
|
||||
private:
|
||||
// disallow copy
|
||||
DISALLOW_COPY_AND_ASSIGN(TestMdsNewCtxDeserialized);
|
||||
};
|
||||
|
||||
TEST_F(TestMdsNewCtxDeserialized, deserialized_from_mds_ctx) {
|
||||
MdsCtx old_ctx;
|
||||
old_ctx.set_writer(MdsWriter(transaction::ObTransID(1)));
|
||||
old_ctx.set_binding_type_id(TupleTypeIdx<mds::BufferCtxTupleHelper, MdsCtx>::value);
|
||||
char buffer[1024];
|
||||
for (auto &ch : buffer)
|
||||
ch = 0xff;
|
||||
int64_t pos = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, old_ctx.serialize(buffer, 1024, pos));
|
||||
int64_t buffer_len = pos;
|
||||
pos = 0;
|
||||
ObTabletCreateMdsCtx new_ctx1;
|
||||
ASSERT_EQ(OB_SUCCESS, new_ctx1.deserialize(buffer, buffer_len, pos));
|
||||
pos = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, new_ctx1.deserialize(buffer, buffer_len + 1, pos));
|
||||
pos = 0;
|
||||
for (int idx = buffer_len; idx < 1024; ++idx)
|
||||
buffer[idx] = 0;
|
||||
ASSERT_EQ(OB_SUCCESS, new_ctx1.deserialize(buffer, buffer_len + 10, pos));
|
||||
pos = 0;
|
||||
ObStartTransferInMdsCtx new_ctx2;
|
||||
ASSERT_EQ(OB_SUCCESS, new_ctx2.deserialize(buffer, buffer_len, pos));
|
||||
pos = 0;
|
||||
ObFinishTransferInMdsCtx new_ctx3;
|
||||
ASSERT_EQ(OB_SUCCESS, new_ctx3.deserialize(buffer, buffer_len, pos));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
system("rm -rf test_mds_new_ctx_deserialized.log");
|
||||
oceanbase::common::ObLogger &logger = oceanbase::common::ObLogger::get_logger();
|
||||
logger.set_file_name("test_mds_new_ctx_deserialized.log", false);
|
||||
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
int ret = RUN_ALL_TESTS();
|
||||
int64_t alloc_times = oceanbase::storage::mds::MdsAllocator::get_alloc_times();
|
||||
int64_t free_times = oceanbase::storage::mds::MdsAllocator::get_free_times();
|
||||
if (alloc_times != free_times) {
|
||||
MDS_LOG(ERROR, "memory may leak", K(free_times), K(alloc_times));
|
||||
ret = -1;
|
||||
} else {
|
||||
MDS_LOG(INFO, "all memory released", K(free_times), K(alloc_times));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
Reference in New Issue
Block a user