[MDS] add check_transfer_in_redo_written() interface on tablet
This commit is contained in:
@ -81,7 +81,6 @@ ob_unittest_observer(test_ob_tablet_to_ls_operator test_ob_tablet_to_ls_operator
|
|||||||
ob_unittest_observer(test_tenant_transfer_service test_tenant_transfer_service.cpp)
|
ob_unittest_observer(test_tenant_transfer_service test_tenant_transfer_service.cpp)
|
||||||
ob_unittest_observer(test_schema_service_sql_impl test_schema_service_sql_impl.cpp)
|
ob_unittest_observer(test_schema_service_sql_impl test_schema_service_sql_impl.cpp)
|
||||||
ob_unittest_observer(test_location_service test_location_service.cpp)
|
ob_unittest_observer(test_location_service test_location_service.cpp)
|
||||||
ob_unittest_observer(test_mds_transaction test_mds_transaction.cpp)
|
|
||||||
ob_unittest_observer(test_mds_tx_ctx_recover_mem_leak test_mds_tx_ctx_recover_mem_leak.cpp)
|
ob_unittest_observer(test_mds_tx_ctx_recover_mem_leak test_mds_tx_ctx_recover_mem_leak.cpp)
|
||||||
ob_unittest_observer(test_role_change_service test_role_change_service.cpp)
|
ob_unittest_observer(test_role_change_service test_role_change_service.cpp)
|
||||||
ob_unittest_observer(test_arbitration_service_rpc test_arbitration_service_rpc.cpp)
|
ob_unittest_observer(test_arbitration_service_rpc test_arbitration_service_rpc.cpp)
|
||||||
@ -122,4 +121,5 @@ ob_ha_unittest_observer(test_transfer_complete_restart_without_mds_flush storage
|
|||||||
ob_ha_unittest_observer(test_transfer_doing_stage_restart_with_mds_flush storage_ha/test_transfer_doing_stage_restart_with_mds_flush.cpp)
|
ob_ha_unittest_observer(test_transfer_doing_stage_restart_with_mds_flush storage_ha/test_transfer_doing_stage_restart_with_mds_flush.cpp)
|
||||||
ob_ha_unittest_observer(test_transfer_complete_restart_with_mds_flush storage_ha/test_transfer_complete_restart_with_mds_flush.cpp)
|
ob_ha_unittest_observer(test_transfer_complete_restart_with_mds_flush storage_ha/test_transfer_complete_restart_with_mds_flush.cpp)
|
||||||
ob_ha_unittest_observer(test_transfer_with_empty_shell storage_ha/test_transfer_with_empty_shell.cpp)
|
ob_ha_unittest_observer(test_transfer_with_empty_shell storage_ha/test_transfer_with_empty_shell.cpp)
|
||||||
|
ob_ha_unittest_observer(test_mds_transaction test_mds_transaction.cpp)
|
||||||
errsim_ha_unittest_observer(errsim_test_transfer_handler errsim/storage_ha/errsim_test_transfer_handler.cpp)
|
errsim_ha_unittest_observer(errsim_test_transfer_handler errsim/storage_ha/errsim_test_transfer_handler.cpp)
|
||||||
@ -13,6 +13,9 @@
|
|||||||
#ifndef DEBUG_FOR_MDS
|
#ifndef DEBUG_FOR_MDS
|
||||||
#define DEBUG_FOR_MDS
|
#define DEBUG_FOR_MDS
|
||||||
#include "lib/ob_errno.h"
|
#include "lib/ob_errno.h"
|
||||||
|
#include "storage/multi_data_source/test/common_define.h"
|
||||||
|
#include "storage/tablet/ob_tablet_create_delete_mds_user_data.h"
|
||||||
|
#include "storage/tablet/ob_tablet_status.h"
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#define TEST_MDS_TRANSACTION
|
#define TEST_MDS_TRANSACTION
|
||||||
@ -84,6 +87,14 @@ class TestMdsTransactionTest : public ObSimpleClusterTestBase
|
|||||||
public:
|
public:
|
||||||
// 指定case运行目录前缀 test_ob_simple_cluster_
|
// 指定case运行目录前缀 test_ob_simple_cluster_
|
||||||
TestMdsTransactionTest() : ObSimpleClusterTestBase("test_mds_transaction_") {}
|
TestMdsTransactionTest() : ObSimpleClusterTestBase("test_mds_transaction_") {}
|
||||||
|
virtual void SetUp() override {
|
||||||
|
ObSimpleClusterTestBase::SetUp();
|
||||||
|
OB_LOGGER.set_log_level("TRACE");
|
||||||
|
}
|
||||||
|
virtual void TearDown() override {
|
||||||
|
OB_LOGGER.set_log_level("WDIAG");
|
||||||
|
ObSimpleClusterTestBase::TearDown();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
TEST_F(TestMdsTransactionTest, simple_test)
|
TEST_F(TestMdsTransactionTest, simple_test)
|
||||||
@ -136,9 +147,6 @@ TEST_F(TestMdsTransactionTest, test_for_each_kv_in_unit_in_tablet)
|
|||||||
data_2.data_version_ = 2;
|
data_2.data_version_ = 2;
|
||||||
data_3.data_version_ = 3;
|
data_3.data_version_ = 3;
|
||||||
data_11.data_version_ = 11;
|
data_11.data_version_ = 11;
|
||||||
auto mock_equal = [](const ObMediumCompactionInfo &lhs, const ObMediumCompactionInfo &rhs) -> bool {
|
|
||||||
return lhs.data_version_ == rhs.data_version_;
|
|
||||||
};
|
|
||||||
MTL_SWITCH(OB_SYS_TENANT_ID)
|
MTL_SWITCH(OB_SYS_TENANT_ID)
|
||||||
{
|
{
|
||||||
int64_t _;
|
int64_t _;
|
||||||
@ -180,17 +188,15 @@ TEST_F(TestMdsTransactionTest, test_for_each_kv_in_unit_in_tablet)
|
|||||||
|
|
||||||
TEST_F(TestMdsTransactionTest, test_mds_table_gc_and_recycle)
|
TEST_F(TestMdsTransactionTest, test_mds_table_gc_and_recycle)
|
||||||
{
|
{
|
||||||
|
MDS_LOG(INFO, "test_mds_table_gc_and_recycle");
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTabletBindingMdsUserData data_to_write;
|
ObTabletBindingMdsUserData data_to_write;
|
||||||
data_to_write.schema_version_ = 1;
|
data_to_write.schema_version_ = 1;
|
||||||
|
data_to_write.snapshot_version_ = 1;
|
||||||
data_to_write.data_tablet_id_ = ObTabletID(2);
|
data_to_write.data_tablet_id_ = ObTabletID(2);
|
||||||
data_to_write.hidden_tablet_id_ = ObTabletID(3);
|
data_to_write.hidden_tablet_id_ = ObTabletID(3);
|
||||||
data_to_write.lob_meta_tablet_id_ = ObTabletID(4);
|
data_to_write.lob_meta_tablet_id_ = ObTabletID(4);
|
||||||
data_to_write.lob_piece_tablet_id_ = ObTabletID(5);
|
data_to_write.lob_piece_tablet_id_ = ObTabletID(5);
|
||||||
auto mock_equal = [](const ObMediumCompactionInfo &lhs,
|
|
||||||
const ObMediumCompactionInfo &rhs) -> bool {
|
|
||||||
return lhs.data_version_ == rhs.data_version_;
|
|
||||||
};
|
|
||||||
MTL_SWITCH(OB_SYS_TENANT_ID)
|
MTL_SWITCH(OB_SYS_TENANT_ID)
|
||||||
{
|
{
|
||||||
int64_t _;
|
int64_t _;
|
||||||
@ -214,7 +220,7 @@ TEST_F(TestMdsTransactionTest, test_mds_table_gc_and_recycle)
|
|||||||
// 4. 从ls_id找到ls
|
// 4. 从ls_id找到ls
|
||||||
storage::ObLSHandle ls_handle;
|
storage::ObLSHandle ls_handle;
|
||||||
ASSERT_EQ(OB_SUCCESS, MTL(storage::ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::TRANS_MOD));
|
ASSERT_EQ(OB_SUCCESS, MTL(storage::ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::TRANS_MOD));
|
||||||
// 5. 从LS找到tablet结构
|
// 5. 从LS找到tablet结构1
|
||||||
storage::ObTabletHandle tablet_handle;
|
storage::ObTabletHandle tablet_handle;
|
||||||
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle));
|
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle));
|
||||||
// 6. 调用tablet接口写入多源数据,提交
|
// 6. 调用tablet接口写入多源数据,提交
|
||||||
@ -228,13 +234,15 @@ TEST_F(TestMdsTransactionTest, test_mds_table_gc_and_recycle)
|
|||||||
// ASSERT_EQ(OB_SUCCESS, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.get_rec_scn(rec_scn));
|
// ASSERT_EQ(OB_SUCCESS, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.get_rec_scn(rec_scn));
|
||||||
// ASSERT_EQ(mock_scn(10), rec_scn);
|
// ASSERT_EQ(mock_scn(10), rec_scn);
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(5));
|
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||||
ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->mds_table_flush(share::SCN::max_scn()));
|
share::SCN max_decided_scn;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_max_decided_scn(max_decided_scn));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->mds_table_flush(max_decided_scn));
|
||||||
// 7. 检查mds table的存在情况
|
// 7. 检查mds table的存在情况
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(5));
|
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||||
ASSERT_EQ(true, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid());
|
ASSERT_EQ(true, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid());
|
||||||
ASSERT_EQ(OB_SUCCESS, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.get_rec_scn(rec_scn));
|
ASSERT_EQ(OB_SUCCESS, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.get_rec_scn(rec_scn));
|
||||||
ASSERT_EQ(share::SCN::max_scn(), rec_scn);
|
ASSERT_EQ(share::SCN::max_scn(), rec_scn);
|
||||||
MDS_LOG(ERROR, "change mock_tablet_oldest_scn", K(tablet_id));
|
MDS_LOG(INFO, "change mock_tablet_oldest_scn", K(tablet_id));
|
||||||
mock_tablet_oldest_scn = unittest::mock_scn(2074916885902668817);
|
mock_tablet_oldest_scn = unittest::mock_scn(2074916885902668817);
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(15));
|
std::this_thread::sleep_for(std::chrono::seconds(15));
|
||||||
ASSERT_EQ(false, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid());
|
ASSERT_EQ(false, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid());
|
||||||
@ -243,6 +251,76 @@ TEST_F(TestMdsTransactionTest, test_mds_table_gc_and_recycle)
|
|||||||
OB_ASSERT(data_to_write.schema_version_ == data_to_read.schema_version_);
|
OB_ASSERT(data_to_write.schema_version_ == data_to_read.schema_version_);
|
||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
}));
|
}));
|
||||||
|
mock_tablet_oldest_scn = unittest::mock_scn(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TestMdsTransactionTest, test_mds_table_get_tablet_status_transfer_in_written_state)
|
||||||
|
{
|
||||||
|
MDS_LOG(INFO, "test_mds_table_get_tablet_status_transfer_in_written_state");
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
bool written = false;
|
||||||
|
ObTabletCreateDeleteMdsUserData data_to_write;
|
||||||
|
data_to_write.tablet_status_ = ObTabletStatus::TRANSFER_IN;
|
||||||
|
data_to_write.create_commit_scn_ = mock_scn(10);
|
||||||
|
data_to_write.create_commit_version_ = 10;
|
||||||
|
data_to_write.transfer_ls_id_ = ObLSID(10);
|
||||||
|
data_to_write.transfer_scn_ = mock_scn(10);
|
||||||
|
MTL_SWITCH(OB_SYS_TENANT_ID)
|
||||||
|
{
|
||||||
|
int64_t _;
|
||||||
|
// 1. 新建一个tablet
|
||||||
|
ASSERT_EQ(OB_SUCCESS, GCTX.ddl_sql_proxy_->write(OB_SYS_TENANT_ID, "create table test_mds_table3(a int)", _));
|
||||||
|
// 2. 从表名拿到它的tablet_id
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ObTableAccessHelper::read_single_row(OB_SYS_TENANT_ID,
|
||||||
|
{"tablet_id"},
|
||||||
|
OB_ALL_TABLE_TNAME,
|
||||||
|
"where table_name = 'test_mds_table3'",
|
||||||
|
tablet_id));
|
||||||
|
// 3. 从tablet_id拿到它的ls_id
|
||||||
|
ObLSID ls_id;
|
||||||
|
char where_condition[512] = { 0 };
|
||||||
|
databuff_printf(where_condition, 512, "where tablet_id = %ld", tablet_id.id());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ObTableAccessHelper::read_single_row(OB_SYS_TENANT_ID,
|
||||||
|
{"ls_id"},
|
||||||
|
OB_ALL_TABLET_TO_LS_TNAME,
|
||||||
|
where_condition,
|
||||||
|
ls_id));
|
||||||
|
// 4. 从ls_id找到ls
|
||||||
|
storage::ObLSHandle ls_handle;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, MTL(storage::ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::TRANS_MOD));
|
||||||
|
// 5. 从LS找到tablet结构
|
||||||
|
storage::ObTabletHandle tablet_handle;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle));
|
||||||
|
// 6. 调用tablet接口写入多源数据,提交
|
||||||
|
MdsCtx ctx1(mds::MdsWriter(ObTransID(1)));
|
||||||
|
share::SCN rec_scn;
|
||||||
|
ASSERT_EQ(OB_STATE_NOT_MATCH, tablet_handle.get_obj()->check_transfer_in_redo_written(written));// 这个时候因为tablet status不是TRANSFER IN所以查不出来
|
||||||
|
ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->set(data_to_write, ctx1));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->check_transfer_in_redo_written(written));// 这个时候tablet status是TRANSFER IN, 但事务还没写日志,所以可以查出结果,但结果是false
|
||||||
|
ASSERT_EQ(false, written);
|
||||||
|
ctx1.single_log_commit(mock_scn(10), mock_scn(10000000));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->check_transfer_in_redo_written(written));// 这个时候tablet status是TRANSFER IN, 并且事务已经提交,所以可以查出结果,并且结果是true
|
||||||
|
ASSERT_EQ(true, written);
|
||||||
|
ASSERT_EQ(true, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid());
|
||||||
|
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||||
|
share::SCN max_decided_scn;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_max_decided_scn(max_decided_scn));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->mds_table_flush(max_decided_scn));
|
||||||
|
// 7. 检查mds table的存在情况
|
||||||
|
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||||
|
MDS_LOG(INFO, "print tablet id", K(tablet_id));
|
||||||
|
ASSERT_EQ(true, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid());
|
||||||
|
ASSERT_EQ(OB_SUCCESS, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.get_rec_scn(rec_scn));
|
||||||
|
ASSERT_EQ(share::SCN::max_scn(), rec_scn);
|
||||||
|
MDS_LOG(INFO, "change mock_tablet_oldest_scn", K(tablet_id));
|
||||||
|
mock_tablet_oldest_scn = unittest::mock_scn(2074916885902668817);
|
||||||
|
std::this_thread::sleep_for(std::chrono::seconds(15));
|
||||||
|
ASSERT_EQ(false, static_cast<ObTabletPointer*>(tablet_handle.get_obj()->pointer_hdl_.get_resource_ptr())->mds_table_handler_.mds_table_handle_.is_valid());// mds table已经释放
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle));// 重新获取一下tablet handle
|
||||||
|
ASSERT_EQ(OB_SUCCESS, tablet_handle.get_obj()->check_transfer_in_redo_written(written));// 这个时候tablet status是TRANSFER IN, 并且事务已经提交,所以可以查出结果,并且结果是true
|
||||||
|
ASSERT_EQ(true, written);
|
||||||
|
mock_tablet_oldest_scn = unittest::mock_scn(1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -261,7 +339,7 @@ int main(int argc, char **argv)
|
|||||||
{
|
{
|
||||||
int c = 0;
|
int c = 0;
|
||||||
int time_sec = 0;
|
int time_sec = 0;
|
||||||
char *log_level = (char*)"INFO";
|
char *log_level = (char*)"WDIAG";
|
||||||
while(EOF != (c = getopt(argc,argv,"t:l:"))) {
|
while(EOF != (c = getopt(argc,argv,"t:l:"))) {
|
||||||
switch(c) {
|
switch(c) {
|
||||||
case 't':
|
case 't':
|
||||||
|
|||||||
@ -24,11 +24,11 @@ bool FlushOp::operator()(const ObTabletID &, MdsTableBase *&mds_table)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (mds_table->is_switched_to_empty_shell()) {
|
if (mds_table->is_switched_to_empty_shell()) {
|
||||||
MDS_LOG_RET(INFO, ret, "skip empty shell tablet mds_table flush",
|
MDS_LOG(INFO, "skip empty shell tablet mds_table flush",
|
||||||
KPC(mds_table), K(scan_mds_table_cnt_), K_(max_consequent_callbacked_scn));
|
KPC(mds_table), K(scan_mds_table_cnt_), K_(max_consequent_callbacked_scn));
|
||||||
} else if (checkpoint::INVALID_TRACE_ID != trace_id_ && FALSE_IT(mds_table->set_trace_id(trace_id_))) {
|
} else if (checkpoint::INVALID_TRACE_ID != trace_id_ && FALSE_IT(mds_table->set_trace_id(trace_id_))) {
|
||||||
} else if (OB_FAIL(mds_table->flush(do_flush_limit_scn_, max_consequent_callbacked_scn_))) {
|
} else if (OB_FAIL(mds_table->flush(do_flush_limit_scn_, max_consequent_callbacked_scn_))) {
|
||||||
MDS_LOG_RET(WARN, ret, "flush mds table failed",
|
MDS_LOG(WARN, "flush mds table failed",
|
||||||
KR(ret), KPC(mds_table), K_(scan_mds_table_cnt), K_(max_consequent_callbacked_scn));
|
KR(ret), KPC(mds_table), K_(scan_mds_table_cnt), K_(max_consequent_callbacked_scn));
|
||||||
if (OB_SIZE_OVERFLOW == ret) {
|
if (OB_SIZE_OVERFLOW == ret) {
|
||||||
is_dag_full_ = true;
|
is_dag_full_ = true;
|
||||||
|
|||||||
@ -143,6 +143,8 @@ public:
|
|||||||
ObFunction<int(void *)> &op,
|
ObFunction<int(void *)> &op,
|
||||||
bool &is_committed,
|
bool &is_committed,
|
||||||
const int64_t read_seq) const = 0;
|
const int64_t read_seq) const = 0;
|
||||||
|
virtual int get_tablet_status_node(ObFunction<int(void *)> &op,
|
||||||
|
const int64_t read_seq) const = 0;
|
||||||
virtual int get_snapshot(int64_t unit_id,
|
virtual int get_snapshot(int64_t unit_id,
|
||||||
void *key,
|
void *key,
|
||||||
ObFunction<int(void *)> &op,
|
ObFunction<int(void *)> &op,
|
||||||
|
|||||||
@ -15,6 +15,8 @@
|
|||||||
#include "lib/ob_errno.h"
|
#include "lib/ob_errno.h"
|
||||||
#include "mds_table_impl.h"
|
#include "mds_table_impl.h"
|
||||||
#include "lib/guard/ob_light_shared_gaurd.h"
|
#include "lib/guard/ob_light_shared_gaurd.h"
|
||||||
|
#include "storage/multi_data_source/compile_utility/mds_dummy_key.h"
|
||||||
|
#include "storage/tablet/ob_tablet_create_delete_mds_user_data.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -54,6 +56,8 @@ public:
|
|||||||
int replay(T &&data, MdsCtx &ctx, const share::SCN &scn);
|
int replay(T &&data, MdsCtx &ctx, const share::SCN &scn);
|
||||||
template <typename T, typename OP, ENABLE_IF_LIKE_FUNCTION(OP, int(const T&))>
|
template <typename T, typename OP, ENABLE_IF_LIKE_FUNCTION(OP, int(const T&))>
|
||||||
int get_latest(OP &&read_op, bool &is_committed, const int64_t read_seq = 0) const;
|
int get_latest(OP &&read_op, bool &is_committed, const int64_t read_seq = 0) const;
|
||||||
|
template <typename OP, ENABLE_IF_LIKE_FUNCTION(OP, int(const UserMdsNode<DummyKey, ObTabletCreateDeleteMdsUserData>&))>
|
||||||
|
int get_tablet_status_node(OP &&read_op, const int64_t read_seq = 0) const;
|
||||||
template <typename T, typename OP, ENABLE_IF_LIKE_FUNCTION(OP, int(const T&))>
|
template <typename T, typename OP, ENABLE_IF_LIKE_FUNCTION(OP, int(const T&))>
|
||||||
int get_snapshot(OP &&read_op,
|
int get_snapshot(OP &&read_op,
|
||||||
const share::SCN snapshot = share::SCN::max_scn(),
|
const share::SCN snapshot = share::SCN::max_scn(),
|
||||||
|
|||||||
@ -14,6 +14,7 @@
|
|||||||
#define STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_HANDLE_IPP
|
#define STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_HANDLE_IPP
|
||||||
|
|
||||||
#include "lib/ob_errno.h"
|
#include "lib/ob_errno.h"
|
||||||
|
#include "storage/multi_data_source/mds_node.h"
|
||||||
#ifndef STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_HANDLE_H_IPP
|
#ifndef STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_HANDLE_H_IPP
|
||||||
#define STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_HANDLE_H_IPP
|
#define STORAGE_MULTI_DATE_SOURCE_MDS_TABLE_HANDLE_H_IPP
|
||||||
#include "mds_table_handle.h"
|
#include "mds_table_handle.h"
|
||||||
@ -240,6 +241,30 @@ int MdsTableHandle::get_latest(OP &&read_op, bool &is_committed, const int64_t r
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename OP>
|
||||||
|
struct GetTabletStatusNodeOpWrapper {
|
||||||
|
GetTabletStatusNodeOpWrapper(OP &op) : op_(op) {}
|
||||||
|
int operator()(void *tablet_status_node) {
|
||||||
|
return op_(*reinterpret_cast<const UserMdsNode<DummyKey, ObTabletCreateDeleteMdsUserData> *>(tablet_status_node));
|
||||||
|
}
|
||||||
|
OP &op_;
|
||||||
|
};
|
||||||
|
template <typename OP,
|
||||||
|
typename std::enable_if<OB_TRAIT_IS_FUNCTION_LIKE(OP,
|
||||||
|
int(const UserMdsNode<DummyKey, ObTabletCreateDeleteMdsUserData>&)), bool>::type>
|
||||||
|
int MdsTableHandle::get_tablet_status_node(OP &&read_op, const int64_t read_seq) const
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
CHECK_MDS_TABLE_INIT();
|
||||||
|
ObFunction<int(void *)> function = GetTabletStatusNodeOpWrapper<OP>(read_op);
|
||||||
|
if (OB_FAIL(p_mds_table_base_->get_tablet_status_node(function, read_seq))) {
|
||||||
|
if (OB_UNLIKELY(OB_SNAPSHOT_DISCARDED != ret)) {
|
||||||
|
MDS_LOG(WARN, "fail to call get_latest", KR(ret), K(read_seq));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
template <typename T, typename OP,
|
template <typename T, typename OP,
|
||||||
typename std::enable_if<OB_TRAIT_IS_FUNCTION_LIKE(OP, int(const T&)), bool>::type>
|
typename std::enable_if<OB_TRAIT_IS_FUNCTION_LIKE(OP, int(const T&)), bool>::type>
|
||||||
int MdsTableHandle::get_snapshot(OP &&read_op,
|
int MdsTableHandle::get_snapshot(OP &&read_op,
|
||||||
|
|||||||
@ -118,6 +118,8 @@ public:
|
|||||||
ObFunction<int(void *)> &op,
|
ObFunction<int(void *)> &op,
|
||||||
bool &is_committed,
|
bool &is_committed,
|
||||||
const int64_t read_seq) const override;
|
const int64_t read_seq) const override;
|
||||||
|
virtual int get_tablet_status_node(ObFunction<int(void *)> &op,
|
||||||
|
const int64_t read_seq) const override;
|
||||||
virtual int get_snapshot(int64_t unit_id,
|
virtual int get_snapshot(int64_t unit_id,
|
||||||
void *key,
|
void *key,
|
||||||
ObFunction<int(void *)> &op,
|
ObFunction<int(void *)> &op,
|
||||||
|
|||||||
@ -18,6 +18,7 @@
|
|||||||
#include "ob_clock_generator.h"
|
#include "ob_clock_generator.h"
|
||||||
#include "share/ob_errno.h"
|
#include "share/ob_errno.h"
|
||||||
#include "storage/multi_data_source/mds_table_base.h"
|
#include "storage/multi_data_source/mds_table_base.h"
|
||||||
|
#include "storage/multi_data_source/mds_table_impl.h"
|
||||||
#ifndef STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_IMPL_H_IPP
|
#ifndef STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_IMPL_H_IPP
|
||||||
#define STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_IMPL_H_IPP
|
#define STORAGE_MULTI_DATA_SOURCE_MDS_TABLE_IMPL_H_IPP
|
||||||
#include "mds_table_impl.h"
|
#include "mds_table_impl.h"
|
||||||
@ -551,6 +552,26 @@ int MdsTableImpl<MdsTableType>::get_latest(int64_t unit_id,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// only normal mds table support this method, and only for transfer
|
||||||
|
template <typename MdsTableType>
|
||||||
|
int MdsTableImpl<MdsTableType>::get_tablet_status_node(ObFunction<int(void *)> &op, const int64_t read_seq) const
|
||||||
|
{
|
||||||
|
return OB_NOT_SUPPORTED;
|
||||||
|
}
|
||||||
|
struct GetTabletSetusNodeOpWrapper {
|
||||||
|
GetTabletSetusNodeOpWrapper(ObFunction<int(void *)> &op) : op_(op) {}
|
||||||
|
int operator()(const UserMdsNode<DummyKey, ObTabletCreateDeleteMdsUserData> &node) {
|
||||||
|
return op_((void *)&node);
|
||||||
|
}
|
||||||
|
ObFunction<int(void *)> &op_;
|
||||||
|
};
|
||||||
|
template <>
|
||||||
|
inline int MdsTableImpl<NormalMdsTable>::get_tablet_status_node(ObFunction<int(void *)> &op, const int64_t read_seq) const
|
||||||
|
{
|
||||||
|
return const_cast<NormalMdsTable &>(unit_tuple_).element<MdsUnit<DummyKey, ObTabletCreateDeleteMdsUserData>>().
|
||||||
|
get_latest(GetTabletSetusNodeOpWrapper(op), read_seq);
|
||||||
|
}
|
||||||
|
|
||||||
template <typename MdsTableImpl>
|
template <typename MdsTableImpl>
|
||||||
struct GetSnapshotHelper {
|
struct GetSnapshotHelper {
|
||||||
GetSnapshotHelper(const MdsTableImpl &mds_table_impl,
|
GetSnapshotHelper(const MdsTableImpl &mds_table_impl,
|
||||||
|
|||||||
@ -64,6 +64,14 @@ public:
|
|||||||
"tablet_id", get_table_id_(), KP(get_tablet_pointer_()));
|
"tablet_id", get_table_id_(), KP(get_tablet_pointer_()));
|
||||||
int get_mds_table_rec_log_scn(share::SCN &rec_scn);
|
int get_mds_table_rec_log_scn(share::SCN &rec_scn);
|
||||||
int mds_table_flush(const share::SCN &recycle_scn);
|
int mds_table_flush(const share::SCN &recycle_scn);
|
||||||
|
// get tablet status from MDS, and check whether state is TRANSFER_IN and redo scn is valid.
|
||||||
|
// @param [in] written : if current tablet status is TRANSFER_IN, set true if redo_scn is valid, otherwise set fasle
|
||||||
|
// @return OB_STATE_NOT_MATCH : tablet status is not TRANSFER_IN.
|
||||||
|
// OB_EMPTY_RESULT : never has tablet status written.
|
||||||
|
// OB_LS_OFFLINE : read meet ls offline
|
||||||
|
// other error...
|
||||||
|
// CAUTIONS: this interface is only for transfer! anyone else shouldn't call this!
|
||||||
|
int check_transfer_in_redo_written(bool &written);
|
||||||
protected:// implemented by ObTablet
|
protected:// implemented by ObTablet
|
||||||
virtual bool check_is_inited_() const = 0;
|
virtual bool check_is_inited_() const = 0;
|
||||||
virtual const ObTabletMdsData &get_mds_data_() const = 0;
|
virtual const ObTabletMdsData &get_mds_data_() const = 0;
|
||||||
|
|||||||
@ -1,7 +1,11 @@
|
|||||||
#ifndef INCLUDE_OB_TABLET_MDS_PART_IPP
|
#ifndef INCLUDE_OB_TABLET_MDS_PART_IPP
|
||||||
#define INCLUDE_OB_TABLET_MDS_PART_IPP
|
#define INCLUDE_OB_TABLET_MDS_PART_IPP
|
||||||
|
#include "lib/ob_errno.h"
|
||||||
#include "ob_i_tablet_mds_interface.h"
|
#include "ob_i_tablet_mds_interface.h"
|
||||||
#include "share/ob_errno.h"
|
#include "share/ob_errno.h"
|
||||||
|
#include "storage/multi_data_source/compile_utility/mds_dummy_key.h"
|
||||||
|
#include "storage/multi_data_source/mds_node.h"
|
||||||
|
#include "storage/tablet/ob_tablet_status.h"
|
||||||
#endif
|
#endif
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -227,6 +231,7 @@ inline int ObITabletMdsInterface::get_mds_data_from_tablet<ObTabletBindingMdsUse
|
|||||||
MDS_LOG_GET(WARN, "failed to load aux tablet info");
|
MDS_LOG_GET(WARN, "failed to load aux tablet info");
|
||||||
} else if (!aux_tablet_info.is_valid()) {
|
} else if (!aux_tablet_info.is_valid()) {
|
||||||
ret = OB_EMPTY_RESULT;
|
ret = OB_EMPTY_RESULT;
|
||||||
|
MDS_LOG_GET(DEBUG, "get empty aux_tablet_info");
|
||||||
} else if (CLICK_FAIL(read_op(aux_tablet_info))) {
|
} else if (CLICK_FAIL(read_op(aux_tablet_info))) {
|
||||||
MDS_LOG_GET(WARN, "failed to read_op");
|
MDS_LOG_GET(WARN, "failed to read_op");
|
||||||
}
|
}
|
||||||
@ -779,5 +784,96 @@ inline int ObITabletMdsInterface::fill_virtual_info(ObIArray<mds::MdsNodeInfoFor
|
|||||||
#undef PRINT_WRAPPER
|
#undef PRINT_WRAPPER
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/********************************************this is special logic*****************************************************/
|
||||||
|
struct GetTabletStatuaNodeFromMdsTableOp {
|
||||||
|
GetTabletStatuaNodeFromMdsTableOp(ObTabletCreateDeleteMdsUserData &tablet_status, share::SCN &redo_scn)
|
||||||
|
: tablet_status_(tablet_status),
|
||||||
|
redo_scn_(redo_scn) {}
|
||||||
|
int operator()(const mds::UserMdsNode<mds::DummyKey, ObTabletCreateDeleteMdsUserData> &node) {
|
||||||
|
tablet_status_.assign(node.user_data_);
|
||||||
|
redo_scn_ = node.redo_scn_;
|
||||||
|
MDS_LOG(TRACE, "read tablet status in mds_table", K(node));
|
||||||
|
return OB_SUCCESS;
|
||||||
|
}
|
||||||
|
ObTabletCreateDeleteMdsUserData &tablet_status_;
|
||||||
|
share::SCN &redo_scn_;
|
||||||
|
};
|
||||||
|
struct GetTabletStatuaFromTabletOp {
|
||||||
|
GetTabletStatuaFromTabletOp(ObTabletCreateDeleteMdsUserData &tablet_status)
|
||||||
|
: tablet_status_(tablet_status) {}
|
||||||
|
int operator()(const ObTabletCreateDeleteMdsUserData& data) {
|
||||||
|
tablet_status_.assign(data);
|
||||||
|
return OB_SUCCESS;
|
||||||
|
}
|
||||||
|
ObTabletCreateDeleteMdsUserData &tablet_status_;
|
||||||
|
};
|
||||||
|
inline int ObITabletMdsInterface::check_transfer_in_redo_written(bool &written)
|
||||||
|
{
|
||||||
|
#define PRINT_WRAPPER KR(ret), K(*this), K(written)
|
||||||
|
MDS_TG(10_ms);
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
bool is_online = false;
|
||||||
|
ObTabletCreateDeleteMdsUserData tablet_status;
|
||||||
|
share::SCN redo_scn;
|
||||||
|
do {
|
||||||
|
mds::MdsTableHandle handle;
|
||||||
|
ObLSSwitchChecker ls_switch_checker;
|
||||||
|
if (CLICK_FAIL(get_mds_table_handle_(handle, false))) {
|
||||||
|
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||||
|
MDS_LOG_GET(WARN, "failed to get_mds_table");
|
||||||
|
} else {
|
||||||
|
MDS_LOG_GET(TRACE, "failed to get_mds_table");
|
||||||
|
}
|
||||||
|
} else if (!handle.is_valid()) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
MDS_LOG_GET(WARN, "mds cannot be NULL");
|
||||||
|
} else if (OB_ISNULL(get_tablet_pointer_())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
MDS_LOG_GET(WARN, "tablet pointer is null", K(ret), KPC(this));
|
||||||
|
} else if (MDS_FAIL(ls_switch_checker.check_ls_switch_state(get_tablet_pointer_()->get_ls(), is_online))) {
|
||||||
|
MDS_LOG_GET(WARN, "check ls online state failed", K(ret), KPC(this));
|
||||||
|
} else if (CLICK_FAIL(handle.get_tablet_status_node(GetTabletStatuaNodeFromMdsTableOp(tablet_status, redo_scn)))) {
|
||||||
|
if (OB_SNAPSHOT_DISCARDED != ret) {
|
||||||
|
MDS_LOG_GET(WARN, "failed to get mds data");
|
||||||
|
} else {
|
||||||
|
MDS_LOG_GET(TRACE, "failed to get mds data");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (tablet_status.get_tablet_status() != ObTabletStatus::TRANSFER_IN) {
|
||||||
|
ret = OB_STATE_NOT_MATCH;
|
||||||
|
} else if (!redo_scn.is_valid() || redo_scn.is_max()) {
|
||||||
|
written = false;
|
||||||
|
MDS_LOG_GET(TRACE, "get transfer in status on mds_table, but redo scn is not valid");
|
||||||
|
} else {
|
||||||
|
written = true;
|
||||||
|
MDS_LOG_GET(TRACE, "get transfer in status on mds_table, and redo scn is valid");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (CLICK_FAIL(ret)) {
|
||||||
|
if (OB_ENTRY_NOT_EXIST == ret || OB_SNAPSHOT_DISCARDED == ret) {
|
||||||
|
if (CLICK_FAIL(get_mds_data_from_tablet<ObTabletCreateDeleteMdsUserData>(GetTabletStatuaFromTabletOp(tablet_status)))) {
|
||||||
|
MDS_LOG_GET(WARN, "failed to get latest data from tablet");
|
||||||
|
} else if (tablet_status.get_tablet_status() != ObTabletStatus::TRANSFER_IN) {
|
||||||
|
ret = OB_STATE_NOT_MATCH;
|
||||||
|
} else {
|
||||||
|
written = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
if (is_online && MDS_FAIL(ls_switch_checker.double_check_epoch(is_online))) {
|
||||||
|
if (!is_online) {
|
||||||
|
ret = OB_LS_OFFLINE;
|
||||||
|
}
|
||||||
|
MDS_LOG_GET(WARN, "failed to double check ls online");
|
||||||
|
} else {
|
||||||
|
MDS_LOG_GET(TRACE, "success to check_transfer_in_redo_written");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (ret == OB_VERSION_NOT_MATCH && is_online);
|
||||||
|
return ret;
|
||||||
|
#undef PRINT_WRAPPER
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user