[CP]备库成员变更防回退

Co-authored-by: linqiucen <lqcgrace@outlook.com>
Co-authored-by: BinChenn <binchenn.bc@gmail.com>
This commit is contained in:
maosy 2024-10-12 05:56:01 +00:00 committed by ob-robot
parent 55c15c3d18
commit 99abe0b97e
19 changed files with 896 additions and 173 deletions

View File

@ -74,6 +74,7 @@ ob_unittest_observer(test_ob_simple_cluster test_ob_simple_cluster.cpp)
ob_unittest_observer(test_ob_partition_balance test_ob_partition_balance.cpp)
ob_unittest_observer(test_ls_status_operator test_ls_status_operator.cpp)
ob_unittest_observer(test_balance_operator test_tenant_balance_operator.cpp)
ob_unittest_observer(test_ls_recovery_stat test_ls_recovery_stat.cpp)
ob_unittest_observer(test_transfer_partition_task test_transfer_partition_task.cpp)
ob_unittest_observer(test_mds_table_checkpoint test_mds_table_checkpoint.cpp)
ob_unittest_observer(test_ob_black_list_service test_ob_black_list_service.cpp)

View File

@ -0,0 +1,120 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX RS
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#define private public
#define protected public
#include "env/ob_simple_cluster_test_base.h"
#include "lib/ob_errno.h"
#include "lib/oblog/ob_log.h"
#include "rootserver/ob_ls_recovery_stat_handler.h"//
#include "share/ob_ls_id.h"
#include "share/unit/ob_unit_info.h"
#include "share/ls/ob_ls_status_operator.h"
#include "share/ls/ob_ls_i_life_manager.h"
namespace oceanbase
{
using namespace unittest;
using namespace share;
using namespace common;
namespace rootserver
{
using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
// Test fixture for ObLSRecoveryGuard, built on the simple-cluster test base and
// registered under the name "test_ls_recovery_stat".
class TestLSRecoveryGuard : public unittest::ObSimpleClusterTestBase
{
public:
TestLSRecoveryGuard() : unittest::ObSimpleClusterTestBase("test_ls_recovery_stat") {}
protected:
// Tenant under test. The constructor does not initialize it, so a test must
// assign it before reading it (user_recovery_guard does).
uint64_t tenant_id_;
};
// Covers the ObLSRecoveryGuard paths that need no user tenant:
// destruction without init, init on the sys tenant, and init on a tenant that
// is not present on this server.
TEST_F(TestLSRecoveryGuard, sys_recovery_guard)
{
  {
    // Destroy a guard that was never initialized.
    ObLSRecoveryGuard guard;
  }
  {
    // Initialize on the sys tenant, then destroy.
    ObLSRecoveryGuard guard;
    ASSERT_EQ(OB_SUCCESS, guard.init(OB_SYS_TENANT_ID, SYS_LS));
  }
  {
    // Initialize on a tenant (or log stream) that does not exist.
    ObLSRecoveryGuard guard;
    ASSERT_EQ(OB_TENANT_NOT_IN_SERVER, guard.init(1002, SYS_LS));
  }
}
// Exercises ObLSRecoveryGuard against a freshly created user tenant (id 1002):
// init on a missing log stream, exclusive acquisition (a second guard gets
// OB_EAGAIN, re-init gets OB_INIT_TWICE), and the state of the handler's
// readable_scn_upper_limit_ while the guard is held and after it is released.
// The test reads private members of the guard/handler (the file defines
// private/protected as public) and relies on wall-clock sleeps.
TEST_F(TestLSRecoveryGuard, user_recovery_guard)
{
int ret = OB_SUCCESS;
ASSERT_EQ(OB_SUCCESS, create_tenant());
tenant_id_ = 1002;
SCN readable_scn;
palf::LogConfigVersion config_version;
{
// A log stream id that does not exist in the tenant.
ObLSRecoveryGuard guard;
ObLSID ls_id(10000000);
ASSERT_EQ(OB_LS_NOT_EXIST, guard.init(tenant_id_, ls_id));
}
{
ObLSRecoveryGuard guard;
//Once the guard is held, reporting stops, but gathering statistics continues
ASSERT_EQ(OB_SUCCESS, guard.init(tenant_id_, SYS_LS, 300 * 1000));
readable_scn = guard.ls_recovery_stat_->readable_scn_upper_limit_;
//The readable SCN kept in memory can still advance
SCN readable_scn_memory = guard.ls_recovery_stat_->replicas_scn_.at(0).get_readable_scn();
config_version = guard.ls_recovery_stat_->config_version_in_inner_;
ASSERT_EQ(1, guard.ls_recovery_stat_->ref_cnt_);
ObLSRecoveryGuard guard1;
//A second guard must not be able to acquire the handler
ASSERT_EQ(OB_EAGAIN, guard1.init(tenant_id_, SYS_LS, 2 * 1000 * 1000));
ASSERT_EQ(OB_INIT_TWICE, guard.init(tenant_id_, SYS_LS));
ASSERT_EQ(1, guard.ls_recovery_stat_->ref_cnt_);
ASSERT_EQ(OB_SUCCESS, guard.ls_recovery_stat_->reset_inner_readable_scn());
usleep(3000 * 1000);//sleeps 3 s (3000 * 1000 us); the upper limit is expected to have been set again by then
// The upper limit equals the value captured before the reset, while the
// in-memory replica readable SCN has moved.
ASSERT_EQ(readable_scn.get_val_for_sql(), guard.ls_recovery_stat_->readable_scn_upper_limit_.get_val_for_sql());
ASSERT_NE(readable_scn_memory.get_val_for_sql(), guard.ls_recovery_stat_->replicas_scn_.at(0).readable_scn_.get_val_for_sql());
}
//After the guard is released, the upper limit can advance
usleep(3000 * 1000);
{
ObLSRecoveryGuard guard;
ASSERT_EQ(OB_SUCCESS, guard.init(tenant_id_, SYS_LS, 300 * 1000));
ASSERT_NE(readable_scn.get_val_for_sql(), guard.ls_recovery_stat_->readable_scn_upper_limit_.get_val_for_sql());
readable_scn = guard.ls_recovery_stat_->readable_scn_upper_limit_;
ASSERT_EQ(OB_SUCCESS, guard.ls_recovery_stat_->reset_inner_readable_scn());
// With the inner config version reset, a checked set is refused and an
// unchecked set is accepted.
ASSERT_EQ(OB_NEED_RETRY, guard.ls_recovery_stat_->set_inner_readable_scn(config_version,readable_scn, true));
ASSERT_EQ(OB_SUCCESS, guard.ls_recovery_stat_->set_inner_readable_scn(config_version, readable_scn, false));
}
}
} // namespace rootserver
} // namespace oceanbase
// Test entry point: set up logging and gtest, then run every registered test.
int main(int argc, char **argv)
{
  oceanbase::unittest::init_log_and_gtest(argc, argv);
  OB_LOGGER.set_log_level("INFO");
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}

View File

@ -21,7 +21,7 @@ namespace oceanbase
namespace logservice
{
ObReconfigCheckerAdapter::ObReconfigCheckerAdapter()
ObReconfigCheckerAdapter::ObReconfigCheckerAdapter() : guard_()
{
tenant_id_ = OB_INVALID_TENANT_ID;
ls_id_.reset();
@ -43,8 +43,8 @@ int ObReconfigCheckerAdapter::init(const uint64_t tenant_id,
tenant_id_ = tenant_id;
ls_id_ = ls_id;
timeout_ = timeout;
ret = guard_.init(tenant_id, ls_id, timeout);
}
return ret;
}
@ -71,6 +71,10 @@ int ObReconfigCheckerAdapter::check_can_add_member(const ObAddr &server,
}
} while (OB_FAIL(ret) && ret != OB_TIMEOUT);
if (OB_SUCC(ret)) {
ret = guard_.check_can_add_member(server, timeout_us);
}
return ret;
}
@ -78,9 +82,7 @@ int ObReconfigCheckerAdapter::check_can_change_memberlist(const ObMemberList &ne
const int64_t paxos_replica_num,
const int64_t timeout_us)
{
int ret = OB_SUCCESS;
UNUSEDx(new_member_list, paxos_replica_num, timeout_us);
return ret;
return guard_.check_can_change_member(new_member_list, paxos_replica_num, timeout_us);
}
} // end namespace logservice

View File

@ -39,6 +39,7 @@ private:
uint64_t tenant_id_;
share::ObLSID ls_id_;
int64_t timeout_;
rootserver::ObLSRecoveryGuard guard_;
};
} // logservice

View File

@ -81,6 +81,7 @@ public:
LogConfigVersion();
~LogConfigVersion();
void operator=(const LogConfigVersion &config_version);
static const int64_t CONFIG_VERSION_LEN = 128;
public:
int generate(const int64_t proposal_id, const int64_t config_seq);

View File

@ -102,6 +102,7 @@
#include "storage/shared_storage/ob_disk_space_manager.h"
#endif
#include "storage/column_store/ob_column_store_replica_util.h"
#include "rootserver/ob_ls_recovery_stat_handler.h"//get_all_replica_min_readable_scn
namespace oceanbase
{
@ -3675,6 +3676,8 @@ int ObService::init_tenant_config(
return OB_SUCCESS;
}
ERRSIM_POINT_DEF(ERRSIM_GET_LS_READABLE_SCN_ERROR);
ERRSIM_POINT_DEF(ERRSIM_GET_LS_READABLE_SCN_OLD);
int ObService::get_ls_replayed_scn(
const ObGetLSReplayedScnArg &arg,
ObGetLSReplayedScnRes &result)
@ -3692,6 +3695,9 @@ int ObService::get_ls_replayed_scn(
LOG_WARN("arg is invaild", KR(ret), K(arg));
} else if (arg.get_tenant_id() != MTL_ID() && OB_FAIL(guard.switch_to(arg.get_tenant_id()))) {
LOG_WARN("switch tenant failed", KR(ret), K(arg));
} else if (ERRSIM_GET_LS_READABLE_SCN_ERROR) {
ret = ERRSIM_GET_LS_READABLE_SCN_ERROR;
LOG_WARN("failed to get ls replica readable scn for errsim", KR(ret), K(arg));
}
if (OB_SUCC(ret)) {
ObLSService *ls_svr = MTL(ObLSService*);
@ -3709,10 +3715,21 @@ int ObService::get_ls_replayed_scn(
} else if (OB_FAIL(ls->get_max_decided_scn(cur_readable_scn))) {
LOG_WARN("failed to get_max_decided_scn", KR(ret), K(arg), KPC(ls));
} else if (arg.is_all_replica()) {
if (OB_FAIL(ls->get_all_replica_min_readable_scn(cur_readable_scn))) {
LOG_WARN("failed to get all replica readable scn", KR(ret));
if (OB_ISNULL(ls->get_ls_recovery_stat_handler())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get ls recovery stat", KR(ret), K(arg));
} else if (OB_FAIL(ls->get_ls_recovery_stat_handler()
->get_all_replica_min_readable_scn(cur_readable_scn))) {
LOG_WARN("failed to get all replica min readable_scn", KR(ret), K(arg));
}
}
if (OB_SUCC(ret) && ERRSIM_GET_LS_READABLE_SCN_OLD) {
const int64_t current_time = ObTimeUtility::current_time() -
GCONF.internal_sql_execute_timeout;
cur_readable_scn.convert_from_ts(current_time);
LOG_WARN("set ls replica readble_scn small", K(arg), K(cur_readable_scn),
K(current_time));
}
if (FAILEDx(ls->get_offline_scn(offline_scn))) {
LOG_WARN("failed to get offline scn", KR(ret), K(arg), KPC(ls));
} else if (OB_FAIL(result.init(arg.get_tenant_id(), arg.get_ls_id(),

View File

@ -138,7 +138,8 @@ int ObAllVirtualPalfStat::insert_log_stat_(const logservice::ObLogStat &log_stat
break;
}
case OB_APP_MIN_COLUMN_ID + 6: {
if (0 >= palf_stat.config_version_.to_string(config_version_buf_, VARCHAR_128)) {
if (0 >= palf_stat.config_version_.to_string(config_version_buf_,
palf::LogConfigVersion::CONFIG_VERSION_LEN)) {
SERVER_LOG(WARN, "config_version_ to_string failed", K(ret), K(palf_stat));
} else {
cur_row_.cells_[i].set_varchar(ObString::make_string(config_version_buf_));

View File

@ -17,6 +17,7 @@
#include "share/ob_scanner.h"
#include "common/row/ob_row.h"
#include "logservice/palf/palf_handle.h"
#include "logservice/palf/log_meta_info.h"//CONFIG_VERSION_LEN
namespace oceanbase
{
@ -42,14 +43,13 @@ private:
private:
static const int64_t VARCHAR_32 = 32;
static const int64_t VARCHAR_64 = 64;
static const int64_t VARCHAR_128 = 128;
char role_str_[VARCHAR_32] = {'\0'};
char access_mode_str_[VARCHAR_32] = {'\0'};
char ip_[common::OB_IP_PORT_STR_BUFF] = {'\0'};
ObSqlString member_list_buf_;
char arbitration_member_buf_[MAX_SINGLE_MEMBER_LENGTH] = {'\0'};
char degraded_list_buf_[MAX_LEARNER_LIST_LENGTH] = {'\0'};
char config_version_buf_[VARCHAR_128] = {'\0'};
char config_version_buf_[palf::LogConfigVersion::CONFIG_VERSION_LEN] = {'\0'};
char replica_type_str_[VARCHAR_32] = {'\0'};
char learner_list_buf_[MAX_LEARNER_LIST_LENGTH] = {'\0'};
omt::ObMultiTenant *omt_;

View File

@ -264,12 +264,12 @@ int ObLSRecoveryReportor::update_ls_recovery_stat_()
} else if (OB_TMP_FAIL(ls->update_ls_replayable_point(replayable_scn))) {
LOG_WARN("failed to update_ls_replayable_point", KR(tmp_ret), KPC(ls), K(replayable_scn));
}
if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_3_0_0) {
} else if (OB_TMP_FAIL(ls->gather_replica_readable_scn())) {
//兼容性和是否为leader的判断包在了接口中
if (OB_TMP_FAIL(ls->gather_replica_readable_scn())) {
if (OB_NOT_MASTER == tmp_ret) {
tmp_ret = OB_SUCCESS;
} else {
LOG_WARN("failed to gather replica readable scn", KR(tmp_ret));
const ObLSID ls_id = ls->get_ls_id();
LOG_WARN("failed to gather replica readable scn", KR(tmp_ret), K(ls_id));
}
}
@ -301,10 +301,20 @@ int ObLSRecoveryReportor::update_ls_recovery(
int ret = OB_SUCCESS;
ObLSRecoveryStat ls_recovery_stat;
ObMySQLTransaction trans;
ObLSRecoveryGuard guard;
const uint64_t exec_tenant_id = ObLSLifeIAgent::get_exec_tenant_id(tenant_id_);
if (OB_ISNULL(ls) || OB_ISNULL(sql_proxy)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("ls or sql proxy is null", KR(ret), KP(ls), KP(sql_proxy));
} else if (OB_FAIL(guard.init(tenant_id_, ls->get_ls_id()))) {
if (OB_EAGAIN != ret) {
LOG_WARN("failed to init ls recovery guard", KR(ret), K(tenant_id_), "ls_id", ls->get_ls_id());
} else if (REACH_TENANT_TIME_INTERVAL(1 * 1000 * 1000)) {
//Reduce Exception Throwing
LOG_WARN("can not report recovery stat, maybe member_list change", KR(ret), K(tenant_id_),
"ls_id", ls->get_ls_id());
ret = OB_SUCCESS;
}
} else if (OB_FAIL(ls->get_ls_level_recovery_stat(ls_recovery_stat))) {
if (OB_NOT_MASTER == ret) {
LOG_TRACE("follower doesn't need to report ls recovery stat", KR(ret), KPC(ls));
@ -329,7 +339,6 @@ int ObLSRecoveryReportor::update_ls_recovery(
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
}
if (ls_recovery_stat.is_valid()) {
const int64_t PRINT_INTERVAL = 10 * 1000 * 1000L;
if (REACH_TIME_INTERVAL(PRINT_INTERVAL)) {

View File

@ -38,10 +38,126 @@ int ObLSReplicaReadableSCN::init(const common::ObAddr &server, const share::SCN
}
return ret;
}
// Binds this guard to the recovery-stat handler of (tenant_id, ls_id).
// @param tenant_id  tenant owning the log stream; must be a valid tenant id
// @param ls_id      log stream to guard; must be valid
// @param timeout    forwarded to ObLSRecoveryStatHandler::inc_ref()
// @return OB_INIT_TWICE if the guard already holds a handler or a tenant id,
//         OB_INVALID_ARGUMENT on bad arguments, OB_ERR_UNEXPECTED when a
//         required object is NULL, otherwise the error of the failing step.
// When skip_check_member_list_change_() says the check is unnecessary, only
// tenant_id_ is recorded and no handler is acquired.
int ObLSRecoveryGuard::init(const uint64_t tenant_id, const share::ObLSID &ls_id,
                            const int64_t &timeout)
{
  int ret = OB_SUCCESS;
  if (OB_NOT_NULL(ls_recovery_stat_) || is_valid_tenant_id(tenant_id_)) {
    ret = OB_INIT_TWICE;
    LOG_WARN("already init", KR(ret), K(tenant_id));
  } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))
             || !ls_id.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id));
  } else if (skip_check_member_list_change_(tenant_id)) {
    // Nothing to hold in this case; remember the tenant only.
    ls_recovery_stat_ = NULL;
    tenant_id_ = tenant_id;
  } else {
    MTL_SWITCH(tenant_id) {
      ObLSService *ls_service = MTL(ObLSService *);
      storage::ObLS *target_ls = NULL;
      ObLSRecoveryStatHandler *handler = NULL;
      if (OB_ISNULL(ls_service)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("ls service is null", KR(ret));
      } else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle_, storage::ObLSGetMod::RS_MOD))) {
        LOG_WARN("failed to get ls", KR(ret), K(ls_id));
      } else if (OB_ISNULL(target_ls = ls_handle_.get_ls())) {
        ret = OB_ERR_UNEXPECTED;
        LOG_ERROR("ls is NULL", KR(ret), K(ls_handle_));
      } else if (OB_ISNULL(handler = target_ls->get_ls_recovery_stat_handler())) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("failed to get ls recovery stat", KR(ret), K(tenant_id),
                 "ls_id", target_ls->get_ls_id());
      } else if (OB_FAIL(handler->inc_ref(timeout))) {
        LOG_WARN("failed to inc ref", KR(ret), K(timeout), K(tenant_id));
      } else {
        // The reference is held from here on; the destructor gives it back.
        ls_recovery_stat_ = handler;
        tenant_id_ = tenant_id;
      }
    }
  }
  return ret;
}
// Releases the handler acquired by init(), if any: clears the handler's
// add-replica server, drops the reference and forgets the pointer.
// A guard that holds no handler has nothing to release.
ObLSRecoveryGuard::~ObLSRecoveryGuard()
{
  if (OB_NOT_NULL(ls_recovery_stat_)) {
    ls_recovery_stat_->reset_add_replica_server();
    ls_recovery_stat_->dec_ref();
    LOG_TRACE("release ls recovery stat guard", K(tenant_id_), KPC(ls_recovery_stat_));
    ls_recovery_stat_ = NULL;
  }
}
// Tells whether the member-list-change check can be skipped for tenant_id.
// @return true for a non-user tenant, or for a user tenant whose cached role
//         is primary; false otherwise. If switching to the tenant fails, the
//         error is not propagated and the result stays false.
bool ObLSRecoveryGuard::skip_check_member_list_change_(const uint64_t tenant_id)
{
  bool can_skip = !is_user_tenant(tenant_id);
  if (can_skip) {
    LOG_INFO("not user tenant, no need to check member list change", K(tenant_id));
  } else {
    // MTL_SWITCH needs a local named `ret`.
    int ret = OB_SUCCESS;
    MTL_SWITCH(tenant_id) {
      if (MTL_TENANT_ROLE_CACHE_IS_PRIMARY()) {
        can_skip = true;
        LOG_INFO("is primary tenant, no need check readable_scn");
      }
    }
  }
  return can_skip;
}
// Checks whether `server` may be added as a member.
// @param server   server to add; must be valid
// @param timeout  must be positive; forwarded to wait_server_readable_scn()
// @return OB_INVALID_ARGUMENT on bad arguments; OB_SUCCESS without further
//         work when skip_check_member_list_change_() allows skipping;
//         OB_NOT_INIT when no handler is held; otherwise the result of
//         registering the server on the handler and waiting on it.
int ObLSRecoveryGuard::check_can_add_member(const ObAddr &server, const int64_t timeout)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!server.is_valid() || 0 >= timeout)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(server), K(timeout));
  } else if (!skip_check_member_list_change_(tenant_id_)) {
    if (OB_ISNULL(ls_recovery_stat_)) {
      ret = OB_NOT_INIT;
      LOG_WARN("ls recovery stat is null, not init", KR(ret), K_(tenant_id));
    } else if (OB_FAIL(ls_recovery_stat_->set_add_replica_server(server))) {
      LOG_WARN("failed to set add replica server", KR(ret), K(server));
    } else if (OB_FAIL(ls_recovery_stat_->wait_server_readable_scn(server, timeout))) {
      LOG_WARN("failed to wait readable scn", KR(ret), K(server));
    }
  } else {
    // Check not required for this tenant; nothing to register or wait for.
  }
  return ret;
}
// Checks whether the member list may be changed to `new_member_list`.
// @param new_member_list    target member list; must be valid
// @param paxos_replica_num  must be positive
// @param timeout            must be positive; forwarded to the handler
// @return OB_INVALID_ARGUMENT on bad arguments; OB_SUCCESS without further
//         work when skip_check_member_list_change_() allows skipping;
//         OB_NOT_INIT when no handler is held; otherwise the result of
//         ObLSRecoveryStatHandler::wait_can_change_member_list().
int ObLSRecoveryGuard::check_can_change_member(const ObMemberList &new_member_list,
    const int64_t paxos_replica_num, const int64_t timeout)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!new_member_list.is_valid() || 0 >= paxos_replica_num || 0 >= timeout)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(new_member_list), K(paxos_replica_num), K(timeout));
  } else if (!skip_check_member_list_change_(tenant_id_)) {
    if (OB_ISNULL(ls_recovery_stat_)) {
      ret = OB_NOT_INIT;
      LOG_WARN("ls recovery stat is null, not init", KR(ret), K_(tenant_id));
    } else if (OB_FAIL(ls_recovery_stat_->wait_can_change_member_list(new_member_list,
                                                                      paxos_replica_num, timeout))) {
      LOG_WARN("failed to check can change member", KR(ret), K(new_member_list), K(paxos_replica_num), K(timeout));
    }
  } else {
    // Check not required for this tenant.
  }
  return ret;
}
int ObLSRecoveryStatHandler::init(const uint64_t tenant_id, ObLS *ls)
{
int ret = OB_SUCCESS;
reset(true);
if (is_inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("ObLSRecoveryStatHandler init twice", KR(ret), K_(is_inited));
@ -51,30 +167,135 @@ int ObLSRecoveryStatHandler::init(const uint64_t tenant_id, ObLS *ls)
} else {
ls_ = ls;
tenant_id_ = tenant_id;
SpinWLockGuard guard(lock_);
last_dump_ts_ = ObTimeUtility::current_time();
is_inited_ = true;
replicas_scn_.set_tenant_id(tenant_id);
replicas_scn_.set_label("LSReadableSCN");
LOG_INFO("ObLSRecoveryStatHandler init success", K(this));
}
return ret;
}
void ObLSRecoveryStatHandler::reset()
// Returns the handler to its un-initialized state: clears is_inited_, ls_ and
// tenant_id_, then, holding the write lock, resets the cached SCN and
// config-version members, the extra server, the replica SCN array and the
// dump timestamp.
// @param is_init  true: ref_cnt_ is forced to 0 (init() passes true);
//                 false: ref_cnt_ is left untouched, but a non-zero value is
//                 logged as an error and ref_cond_ is signaled.
void ObLSRecoveryStatHandler::reset(const bool is_init)
{
is_inited_ = false;
ls_ = NULL;
tenant_id_ = OB_INVALID_TENANT_ID;
SpinWLockGuard guard(lock_);
readable_scn_in_inner_.reset();
readable_scn_upper_limit_.reset();
config_version_in_inner_.reset();
extra_server_.reset();
config_version_.reset();
replicas_scn_.reset();
last_dump_ts_ = OB_INVALID_TIMESTAMP;
if (is_init) {
ATOMIC_SET(&ref_cnt_, 0);
} else if (0 != ATOMIC_LOAD(&ref_cnt_)) {
// A reference is still outstanding while the handler is being reset.
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "has reference", K(ref_cnt_));
ref_cond_.signal();
}
}
int ObLSRecoveryStatHandler::check_inner_stat_()
// Gives back the reference taken by inc_ref(): moves ref_cnt_ from 1 to 0 with
// a compare-and-swap (a count that is not 1 is left unchanged), then signals
// ref_cond_ so that a caller waiting in inc_ref() can retry.
void ObLSRecoveryStatHandler::dec_ref()
{
ATOMIC_CAS(&ref_cnt_, 1, 0);
ref_cond_.signal();
}
int ObLSRecoveryStatHandler::inc_ref(const int64_t timeout)
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("check inner stat error", KR(ret));
} else {
int64_t curr_timeout = timeout;
const int64_t TIME_WAIT = 100 * 1000;
int tmp_ret = OB_SUCCESS;
ret = OB_EAGAIN;
do {
if (0 == ATOMIC_CAS(&ref_cnt_, 0, 1)) {
ret = OB_SUCCESS;
} else if (curr_timeout > 0) {
LOG_INFO("wait for inc ref", K(curr_timeout), K(timeout));
if (OB_TMP_FAIL(ref_cond_.timedwait(TIME_WAIT))) {
LOG_WARN("failed to timedwait", KR(ret), KR(tmp_ret));
}
curr_timeout -= TIME_WAIT;
}
} while (curr_timeout > 0 && OB_EAGAIN == ret);
}
return ret;
}
// Records the config version and readable SCN and raises
// readable_scn_upper_limit_ accordingly.
// @param config_version            config version that goes with readable_scn; must be valid
// @param readable_scn              readable SCN to fold into the upper limit; must be valid
// @param check_inner_config_valid  when true, the call is only accepted if
//                                  config_version_in_inner_ is valid and equal
//                                  to config_version
// @return OB_INVALID_ARGUMENT on bad arguments, OB_NEED_RETRY when the config
//         version check refuses the update, otherwise OB_SUCCESS (or the error
//         of check_inner_stat_()).
int ObLSRecoveryStatHandler::set_inner_readable_scn(const palf::LogConfigVersion &config_version,
    const share::SCN &readable_scn, bool check_inner_config_valid)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(check_inner_stat_())) {
    LOG_WARN("check inner stat error", KR(ret));
  } else if (OB_UNLIKELY(!config_version.is_valid() || !readable_scn.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(config_version), K(readable_scn));
  } else {
    SpinWLockGuard guard(lock_);
    const bool inner_valid = config_version_in_inner_.is_valid();
    const bool strict_check_failed = check_inner_config_valid
        && (!inner_valid || config_version_in_inner_ != config_version);
    if (strict_check_failed) {
      // When the config version has to be verified, only an exact match is
      // accepted. config_version_in_inner_ must not be advanced blindly here:
      // per the original author's note it has to stay strictly in step with
      // the inner table, and a locally gathered config version that is newer
      // gives no reason to raise readable_scn_upper_limit_ either, because the
      // config version has to be updated first.
      ret = OB_NEED_RETRY;
      LOG_WARN("config version in inner is invalid, can not update upper limit readable_scn",
          KR(ret), K(check_inner_config_valid), K(config_version_in_inner_));
    } else if (inner_valid && config_version_in_inner_ > config_version) {
      // The config version kept in memory never moves backwards, so an older
      // one is refused.
      ret = OB_NEED_RETRY;
      LOG_WARN("config version not match or readable_scn fallback", KR(ret), K(config_version),
          K(config_version_in_inner_), K(readable_scn_upper_limit_), K(readable_scn));
    } else {
      const int64_t PRINT_INTERVAL = 1 * 1000 * 1000;
      config_version_in_inner_ = config_version;
      // A smaller incoming readable SCN (per the original note, e.g. because
      // some replicas could not be counted) is only logged, rate-limited; the
      // stored upper limit is combined with SCN::max and no error is returned.
      if (readable_scn_upper_limit_ > readable_scn && REACH_TENANT_TIME_INTERVAL(PRINT_INTERVAL)) {
        LOG_INFO("readable scn fallback", "ls_id", ls_->get_ls_id(),
            K(readable_scn_upper_limit_), K(readable_scn));
      }
      readable_scn_upper_limit_ = SCN::max(readable_scn_upper_limit_, readable_scn);
    }
  }
  return ret;
}
// Invalidates config_version_in_inner_ and readable_scn_upper_limit_ under the
// write lock.
// @return OB_SUCCESS, or the error reported by check_inner_stat_().
int ObLSRecoveryStatHandler::reset_inner_readable_scn()
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(check_inner_stat_())) {
    LOG_WARN("check inner stat error", KR(ret));
  } else {
    SpinWLockGuard write_guard(lock_);
    config_version_in_inner_.reset();
    readable_scn_upper_limit_.reset();
  }
  return ret;
}
int ObLSRecoveryStatHandler::wait_can_change_member_list(
const ObMemberList &new_member_list, const int64_t paxos_replica_num,
const int64_t timeout)
{
int ret = OB_SUCCESS;
const int64_t now = ObTimeUtility::current_time();
if (OB_UNLIKELY(!new_member_list.is_valid() || 0 >= paxos_replica_num || 0 >= timeout)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(new_member_list), K(paxos_replica_num), K(timeout));
} else if (OB_FAIL(wait_func_with_timeout_(timeout, new_member_list, paxos_replica_num))) {
LOG_WARN("failed to wait func with timeout", KR(ret), K(new_member_list), K(paxos_replica_num), K(timeout));
}
LOG_INFO("finish wait for change member_list", KR(ret), K(new_member_list), K(paxos_replica_num),
K(timeout), "cost", ObTimeUtility::current_time() - now);
return ret;
}
int ObLSRecoveryStatHandler::check_inner_stat_() {
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
@ -85,7 +306,6 @@ int ObLSRecoveryStatHandler::check_inner_stat_()
}
return ret;
}
int ObLSRecoveryStatHandler::get_ls_replica_readable_scn(share::SCN &readable_scn)
{
int ret = OB_SUCCESS;
@ -111,23 +331,19 @@ int ObLSRecoveryStatHandler::get_ls_replica_readable_scn(share::SCN &readable_sc
return ret;
}
int ObLSRecoveryStatHandler::get_all_replica_min_readable_scn(share::SCN &readable_scn)
{
int ret = OB_SUCCESS;
palf::PalfStat palf_stat_first;
palf::PalfStat palf_stat_second;
readable_scn = SCN::max_scn();
logservice::ObLogService *ls_svr = MTL(logservice::ObLogService*);
int64_t first_proposal_id = 0;
ObRole first_role;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("inner stat error", KR(ret), K_(is_inited));
} else if (OB_FAIL(get_latest_palf_stat_(palf_stat_first))) {
LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls));
} else if (OB_FAIL(ls_svr->get_palf_role(ls_->get_ls_id(), first_role, first_proposal_id))) {
LOG_WARN("failed to get first role", KR(ret), K(ls_->get_ls_id()), KP(ls_svr), KPC_(ls));
} else if (!is_strong_leader(first_role)) {
} else if (!is_strong_leader(palf_stat_first.role_)) {
ret = OB_NOT_MASTER;
LOG_WARN("not master, need retry", KR(ret), K(ls_->get_ls_id()));
} else {
@ -159,17 +375,80 @@ int ObLSRecoveryStatHandler::get_all_replica_min_readable_scn(share::SCN &readab
}
//TODO maybe need consider readable scn in inner table
ObLSID ls_id = ls_->get_ls_id();
LOG_INFO("all ls readable scn", K(ls_id), K(readable_scn), K(replicas_scn_));
LOG_INFO("all ls readable scn", KR(ret), K(ls_id), K(readable_scn), K(replicas_scn_),
"member_list", paxos_member_list, "replicas config_version", config_version_,
"current_config_version", palf_stat_first.config_version_);
}
if (FAILEDx(get_latest_palf_stat_(palf_stat_second))) {
LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls));
} else if (palf_stat_first.config_version_ != palf_stat_second.config_version_) {
ret = OB_EAGAIN;
ret = OB_NEED_RETRY;
LOG_WARN("config_version changed, try again", KR(ret), K(palf_stat_first), K(palf_stat_second));
}
return ret;
}
// Waits, through wait_func_with_timeout_(), on `server`, and logs how long the
// call took.
// @param server   server to wait for; must be valid
// @param timeout  must be positive
// @return the error of check_inner_stat_(), OB_INVALID_ARGUMENT on bad
//         arguments, otherwise the result of wait_func_with_timeout_().
int ObLSRecoveryStatHandler::wait_server_readable_scn(
    const common::ObAddr &server, const int64_t timeout)
{
  int ret = OB_SUCCESS;
  const int64_t begin_ts = ObTimeUtility::current_time();
  if (OB_FAIL(check_inner_stat_())) {
    LOG_WARN("inner stat error", KR(ret));
  } else if (OB_UNLIKELY(!server.is_valid() || 0 >= timeout)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(server), K(timeout));
  } else if (OB_FAIL(wait_func_with_timeout_(timeout, server))) {
    LOG_WARN("failed wait func with timeout", KR(ret), K(server), K(timeout));
  }
  // Logged on every path, success or failure.
  LOG_INFO("finish wait for add member", KR(ret), K(server),
      K(timeout), "cost", ObTimeUtility::current_time() - begin_ts);
  return ret;
}
// Decides whether `server` has caught up far enough to take part in a member
// change: its recorded readable SCN in replicas_scn_ must be at least
// readable_scn_upper_limit_.
// @param server    server to look up; must be valid
// @param is_valid  out: true only when the server is found and has caught up.
//                  It is set to false before any early exit, so callers never
//                  observe an unassigned value (same contract as the
//                  member-list overload of this function).
// @return the error of check_inner_stat_(), OB_INVALID_ARGUMENT for an invalid
//         server, otherwise OB_SUCCESS (a server that is missing or lagging is
//         reported through is_valid, not through the return code).
int ObLSRecoveryStatHandler::check_member_change_valid_(
    const common::ObAddr &server, bool &is_valid)
{
  int ret = OB_SUCCESS;
  is_valid = false;
  if (OB_FAIL(check_inner_stat_())) {
    LOG_WARN("inner stat error", KR(ret));
  } else if (OB_UNLIKELY(!server.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(server));
  } else {
    // Minimum spacing between two "not ready" log lines.
    const int64_t TIME_WAIT = 100 * 1000;
    bool found = false;
    SpinRLockGuard guard(lock_);
    for (int64_t i = 0; !found && i < replicas_scn_.count(); ++i) {
      if (replicas_scn_.at(i).get_server() == server) {
        found = true;
        const SCN readable_scn = replicas_scn_.at(i).get_readable_scn();
        if (readable_scn >= readable_scn_upper_limit_) {
          is_valid = true;
          LOG_INFO("can change member list", K(server), K(readable_scn),
              K(readable_scn_upper_limit_), "ls_id", ls_->get_ls_id());
        } else if (REACH_TENANT_TIME_INTERVAL(TIME_WAIT)) {
          LOG_INFO("server readable scn is not larger enough", K(server),
              K(readable_scn), K(readable_scn_upper_limit_), "ls_id", ls_->get_ls_id());
        }
      }
    } // end for
    if (!found && REACH_TENANT_TIME_INTERVAL(TIME_WAIT)) {
      LOG_INFO("cannot find server readable scn", K(server), K(replicas_scn_),
          K(readable_scn_upper_limit_), "ls_id", ls_->get_ls_id());
    }
  }
  return ret;
}
int ObLSRecoveryStatHandler::increase_ls_replica_readable_scn_(SCN &readable_scn)
{
int ret = OB_SUCCESS;
@ -251,7 +530,7 @@ int ObLSRecoveryStatHandler::get_ls_level_recovery_stat(ObLSRecoveryStat &ls_rec
LOG_WARN("failed to get lastest palf stat", KR(ret));
} else if (palf_stat_first.config_version_ != palf_stat_second.config_version_
|| !is_strong_leader(palf_stat_second.role_)) {
ret = OB_EAGAIN;
ret = OB_NEED_RETRY;
LOG_INFO("role changed, try again", KR(ret), K(palf_stat_first), K(palf_stat_second));
}
@ -262,7 +541,9 @@ int ObLSRecoveryStatHandler::set_add_replica_server(
const common::ObAddr &server)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!server.is_valid())) {
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("inner stat error", KR(ret), K_(is_inited));
} else if (OB_UNLIKELY(!server.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("server is invalid", KR(ret), K(server));
} else {
@ -273,6 +554,13 @@ int ObLSRecoveryStatHandler::set_add_replica_server(
return ret;
}
// Clears extra_server_ while holding the write lock.
// ObLSRecoveryGuard's destructor calls this before dropping its reference.
void ObLSRecoveryStatHandler::reset_add_replica_server()
{
SpinWLockGuard guard(lock_);
extra_server_.reset();
}
int ObLSRecoveryStatHandler::do_get_ls_level_readable_scn_(SCN &read_scn)
{
int ret = OB_SUCCESS;
@ -346,15 +634,12 @@ int ObLSRecoveryStatHandler::construct_new_member_list_(
return ret;
}
int ObLSRecoveryStatHandler::try_reload_and_fix_config_version_(const palf::LogConfigVersion &current_version)
int ObLSRecoveryStatHandler::try_reload_and_fix_config_version_(
const palf::LogConfigVersion &current_version)
{
int ret = OB_SUCCESS;
bool need_update = false;
share::SCN readable_scn;
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_);
uint64_t tenant_data_version = 0;
ObLSRecoveryStatOperator op;
ObLSID ls_id;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("inner stat error", KR(ret));
} else if (OB_UNLIKELY(!current_version.is_valid())) {
@ -363,35 +648,33 @@ int ObLSRecoveryStatHandler::try_reload_and_fix_config_version_(const palf::LogC
} else if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql proxy is null", KR(ret));
} else if (FALSE_IT(ls_id = ls_->get_ls_id())) {
//can not be there
} else if (OB_FAIL(GET_MIN_DATA_VERSION(meta_tenant_id, tenant_data_version))) {
LOG_WARN("failed to get min data version", KR(ret), K(tenant_id_), K(meta_tenant_id));
} else if (tenant_data_version < MOCK_DATA_VERSION_4_2_4_0) {
//内部表config_version的汇报最开始是在4300版本上提交的
//后面patch到424版本上,由于是在4300分支的第一个版本号提交
//所以版本号判断直接小于等于424即可
need_update = false;
LOG_INFO("not ready to load and update config version", KR(ret), K(tenant_data_version));
} else {
SpinRLockGuard guard(lock_);
if (current_version != config_version_in_inner_) {
if (config_version_in_inner_.is_valid() && config_version_in_inner_ > current_version) {
ret = OB_NEED_RETRY;
LOG_WARN("config version is fallback", KR(ret), K(current_version), K(config_version_in_inner_));
} else if (current_version == config_version_in_inner_) {
need_update = false;
LOG_DEBUG("config version not change", KR(ret), K(current_version));
} else {
need_update = true;
FLOG_INFO("config version not match, need update",
K(config_version_in_inner_), K(current_version), K(ls_id));
K(config_version_in_inner_), K(current_version), "ls_id",
ls_->get_ls_id());
}
}
if (OB_SUCC(ret) && need_update) {
ObLSRecoveryStatOperator op;
ObLSID ls_id = ls_->get_ls_id();
int tmp_ret = OB_SUCCESS;
if (OB_FAIL(op.update_ls_config_version(tenant_id_, ls_id, current_version,
*GCTX.sql_proxy_, readable_scn))) {
LOG_WARN("failed to update ls config version", KR(ret), K(tenant_id_), K(ls_id), K(current_version));
//set invalid config version
SpinWLockGuard guard(lock_);
config_version_in_inner_.reset();
} else {
SpinWLockGuard guard(lock_);
readable_scn_in_inner_ = readable_scn;
config_version_in_inner_ = current_version;
if (OB_TMP_FAIL(reset_inner_readable_scn())) {
LOG_ERROR("failed to reset config version", KR(ret), KR(tmp_ret));
}
} else if (OB_FAIL(set_inner_readable_scn(current_version, readable_scn, false))) {
LOG_ERROR("failed to set readable_scn", KR(ret), K(current_version), K(readable_scn));
}
}
return ret;
@ -443,27 +726,40 @@ int ObLSRecoveryStatHandler::get_latest_palf_stat_(
return ret;
}
int ObLSRecoveryStatHandler::gather_replica_readable_scn()
// Reports whether the tenant's data version allows the new code path.
// @param is_valid_use  out: true only when the min data version of the meta
//                      tenant is at least MOCK_DATA_VERSION_4_2_4_0; false on
//                      every other path, including errors
// @return the error of check_inner_stat_() or GET_MIN_DATA_VERSION(),
//         otherwise OB_SUCCESS.
int ObLSRecoveryStatHandler::check_can_use_new_version_(bool &is_valid_use)
{
  int ret = OB_SUCCESS;
  is_valid_use = false;
  const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_);
  uint64_t tenant_data_version = 0;
  if (OB_FAIL(check_inner_stat_())) {
    LOG_WARN("inner stat error", KR(ret));
  } else if (OB_FAIL(GET_MIN_DATA_VERSION(meta_tenant_id, tenant_data_version))) {
    LOG_WARN("failed to get min data version", KR(ret), K(tenant_id_), K(meta_tenant_id));
  } else if (tenant_data_version < MOCK_DATA_VERSION_4_2_4_0) {
    // Per the original note, config_version already exists in 4.3.0, so the
    // compatibility gate is placed at 4.2.4.0: anything older is unsupported.
    // NOTE(review): the original comment said "less than or equal", the code
    // tests strictly "less than" - confirm which one is intended.
    LOG_INFO("not ready to to use new version", KR(ret), K(tenant_data_version));
  } else {
    is_valid_use = true;
  }
  return ret;
}
int ObLSRecoveryStatHandler::construct_addr_list_(
const palf::PalfStat &palf_stat,
ObIArray<common::ObAddr> &addr_list)
{
int ret = OB_SUCCESS;
addr_list.reset();
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("inner stat error", KR(ret), K_(is_inited));
} else if (OB_FAIL(get_latest_palf_stat_(palf_stat_first))) {
LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls));
} else if (!is_strong_leader(palf_stat_first.role_)) {
ret = OB_NOT_MASTER;
LOG_TRACE("not leader", KR(ret), K(palf_stat_first));
} else if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_3_0_0) {
ret = OB_NEED_WAIT;
LOG_WARN("not ready to gather replica readable scn", KR(ret));
} else if (OB_UNLIKELY(!palf_stat.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("palf stat is invalid", KR(ret), K(palf_stat));
} else {
common::ObMember member;
common::ObMemberList &member_list = palf_stat_first.paxos_member_list_;
const common::ObMemberList &member_list = palf_stat.paxos_member_list_;
for (int64_t i = 0; OB_SUCC(ret) && i < member_list.get_member_number(); ++i) {
if (OB_FAIL(member_list.get_member_by_index(i, member))) {
LOG_WARN("failed to get member by index", KR(ret), K(i));
@ -478,34 +774,116 @@ int ObLSRecoveryStatHandler::gather_replica_readable_scn()
}
}
}
if (FAILEDx(do_get_each_replica_readable_scn_(addr_list, replicas_scn))) {
return ret;
}
int ObLSRecoveryStatHandler::dump_all_replica_readable_scn_(const bool force_dump)
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("inner stat error", KR(ret), K_(is_inited));
} else {
ObLSID ls_id = ls_->get_ls_id();
SpinRLockGuard guard(lock_);
const int64_t PRINT_INTERVAL = 5 * 1000 * 1000L;
const int64_t now = ObTimeUtility::current_time();
if (force_dump || now - last_dump_ts_ > PRINT_INTERVAL) {
LOG_INFO("ls readable scn in memory", K(ls_id), K(replicas_scn_),
K(last_dump_ts_), K(force_dump));
last_dump_ts_ = now;
} else {
LOG_TRACE("ls readable scn in memory", KR(ret), K(ls_id),
K(readable_scn_upper_limit_), K(replicas_scn_));
}
}
return ret;
}
// Decides whether the member list may be changed to `new_member_list`: the
// majority readable SCN computed over the new members must be at least
// readable_scn_upper_limit_.
// @param new_member_list    target member list; must be valid
// @param paxos_replica_num  must be positive; its majority is passed on to
//                           do_get_majority_readable_scn_V2_()
// @param is_valid           out: true only when the change is allowed; false
//                           on every other path, including errors
// @return OB_INVALID_ARGUMENT on bad arguments, otherwise the error of the
//         first failing step, or OB_SUCCESS.
int ObLSRecoveryStatHandler::check_member_change_valid_(
    const ObMemberList &new_member_list, const int64_t paxos_replica_num, bool &is_valid)
{
  int ret = OB_SUCCESS;
  is_valid = false;
  SCN readable_scn;
  palf::PalfStat palf_stat;
  ObArray<ObAddr> addr_list;
  const bool args_ok = new_member_list.is_valid() && paxos_replica_num > 0;
  if (OB_UNLIKELY(!args_ok)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(new_member_list), K(paxos_replica_num));
  } else if (OB_FAIL(check_inner_stat_())) {
    LOG_WARN("inner stat error", KR(ret), K_(is_inited));
  } else if (OB_FAIL(get_palf_stat_(palf_stat))) {
    LOG_WARN("failed to get palf stat", KR(ret));
  } else if (OB_FAIL(new_member_list.get_addr_array(addr_list))) {
    LOG_WARN("failed to get addr array", KR(ret), K(new_member_list));
  } else if (OB_FAIL(do_get_majority_readable_scn_V2_(addr_list,
      rootserver::majority(paxos_replica_num), palf_stat.config_version_, readable_scn))) {
    LOG_WARN("failed to get majority readable scn", KR(ret), K(addr_list),
        K(paxos_replica_num), K(palf_stat));
  } else {
    SpinRLockGuard guard(lock_);
    const bool caught_up = readable_scn >= readable_scn_upper_limit_;
    if (caught_up) {
      is_valid = true;
      LOG_INFO("can change member list", K(new_member_list), K(paxos_replica_num),
          "new readable_scn", readable_scn, "current readable_scn", readable_scn_upper_limit_,
          "ls_id", ls_->get_ls_id(), K(palf_stat));
    } else if (REACH_TENANT_TIME_INTERVAL(1 * 1000 * 1000L)) {
      // Still behind; say so at most once per interval.
      LOG_INFO("can not change member list", K(new_member_list), K(paxos_replica_num),
          K(readable_scn), K(readable_scn_upper_limit_),
          "ls_id", ls_->get_ls_id(), K(palf_stat));
    }
  }
  return ret;
}
int ObLSRecoveryStatHandler::gather_replica_readable_scn()
{
int ret = OB_SUCCESS;
ObArray<ObLSReplicaReadableSCN> replicas_scn;
palf::PalfStat palf_stat_first;
palf::PalfStat palf_stat_second;
ObArray<common::ObAddr> addr_list;
int tmp_ret = OB_SUCCESS;
bool is_valid_use = false;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("inner stat error", KR(ret), K_(is_inited));
} else if (OB_FAIL(get_latest_palf_stat_(palf_stat_first))) {
LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls));
} else if (!is_strong_leader(palf_stat_first.role_)) {
ret = OB_NOT_MASTER;
LOG_TRACE("not leader", KR(ret), K(palf_stat_first));
} else if (OB_FAIL(check_can_use_new_version_(is_valid_use))) {
LOG_WARN("failed to check can use new version", KR(ret));
} else if (!is_valid_use) {
ret = OB_NEED_RETRY;
LOG_WARN("not valid to use new version", KR(ret));
} else if (OB_FAIL(construct_addr_list_(palf_stat_first, addr_list))) {
LOG_WARN("failed to construct addr list", KR(ret), K(palf_stat_first));
} else if (OB_FAIL(do_get_each_replica_readable_scn_(addr_list, replicas_scn))) {
LOG_WARN("failed to get each replica readable", KR(ret), K(addr_list));
} else if (OB_FAIL(get_palf_stat_(palf_stat_second))) {
LOG_WARN("failed to get palf stat", KR(ret));
} else if (palf_stat_second.config_version_ != palf_stat_first.config_version_) {
ret = OB_EAGAIN;
ret = OB_NEED_RETRY;
LOG_WARN("config version change", KR(ret), K(palf_stat_second), K(palf_stat_first));
} else {
SpinWLockGuard guard(lock_);
ObLSID ls_id = ls_->get_ls_id();
config_version_ = palf_stat_second.config_version_;
replicas_scn_.reset();
if (OB_FAIL(replicas_scn_.assign(replicas_scn))) {
LOG_WARN("failed to replicas scn", KR(ret), K(replicas_scn));
}
const int64_t PRINT_INTERVAL = 1 * 1000 * 1000L;
if (REACH_TENANT_TIME_INTERVAL(PRINT_INTERVAL)) {
LOG_INFO("ls readable scn in memory", KR(ret), K(ls_id), K(replicas_scn_));
} else {
LOG_TRACE("ls readable scn in memory", KR(ret), K(ls_id), K(replicas_scn_));
}
}
if (is_strong_leader(palf_stat_second.role_)) {
//Give priority to updating the correct config_version into the inner table and memory
int tmp_ret = OB_SUCCESS;
if (is_strong_leader(palf_stat_second.role_) && is_valid_use) {
if (OB_TMP_FAIL(try_reload_and_fix_config_version_(palf_stat_second.config_version_))) {
ret = OB_SUCC(ret) ? tmp_ret : ret;
LOG_WARN("failed to try reload and fix config version", KR(tmp_ret), KR(ret), K(palf_stat_second));
LOG_WARN("failed to try reload and fix config version",
KR(tmp_ret), KR(ret), K(palf_stat_second));
}
}
if (OB_NOT_MASTER != ret) {
if (OB_TMP_FAIL(dump_all_replica_readable_scn_(OB_FAIL(ret)))) {
LOG_WARN("failed to dump replica readable scn", KR(ret), KR(tmp_ret));
}
}
return ret;
@ -521,6 +899,9 @@ int ObLSRecoveryStatHandler::do_get_each_replica_readable_scn_(
} else if (OB_UNLIKELY(0 >= ob_member_list.count())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(ob_member_list));
} else if (OB_ISNULL(GCTX.srv_rpc_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rpc proxy is null", KR(ret));
} else {
obrpc::ObGetLSReplayedScnArg arg;
ObGetLSReplayedScnProxy proxy(
@ -529,6 +910,7 @@ int ObLSRecoveryStatHandler::do_get_each_replica_readable_scn_(
int tmp_ret = OB_SUCCESS;
ObArray<int> return_code_array;
ObLSReplicaReadableSCN replica_scn;
int group_id = share::OBCG_DBA_COMMAND;
if (OB_FAIL(arg.init(tenant_id_, ls_->get_ls_id(), false))) {
LOG_WARN("failed to init arg", KR(ret), K_(tenant_id), KPC_(ls));
} else if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
@ -536,18 +918,10 @@ int ObLSRecoveryStatHandler::do_get_each_replica_readable_scn_(
}
for (int64_t i = 0; OB_SUCC(ret) && i < ob_member_list.count(); ++i) {
const ObAddr &member = ob_member_list.at(i);
if (GCTX.self_addr() == member) {
SCN self_readable_scn;
if (OB_FAIL(ls_->get_max_decided_scn(self_readable_scn))) {
LOG_WARN("failed to get max decide scn", KR(ret));
} else if (OB_FAIL(replica_scn.init(member, self_readable_scn))) {
LOG_WARN("failed to init replica scn", KR(ret), K(member), K(self_readable_scn));
} else if (OB_FAIL(replicas_scn.push_back(replica_scn))) {
LOG_WARN("failed to push back replica scn", KR(ret), K(replica_scn));
}
} else if (OB_TMP_FAIL(proxy.call(member, ctx.get_timeout(), tenant_id_, arg))) {
LOG_WARN("failed to send rpc", KR(ret), K(member), K(i), K(ctx),
K_(tenant_id), K(arg), K(ob_member_list));
if (OB_TMP_FAIL(proxy.call(member, ctx.get_timeout(),
GCONF.cluster_id, tenant_id_, group_id, arg))) {
LOG_WARN("failed to send rpc", KR(ret), KR(tmp_ret), K(member), K(i), K(ctx),
K_(tenant_id), K(arg), K(ob_member_list), K(group_id));
}
}
if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
@ -587,6 +961,7 @@ int ObLSRecoveryStatHandler::get_majority_readable_scn_(
palf::PalfStat palf_stat_second;
ObArray<common::ObAddr> member_list_new;
int64_t paxos_replica_number_new = 0;
bool is_valid_to_use = false;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("inner stat error", KR(ret), K_(is_inited));
@ -601,13 +976,20 @@ int ObLSRecoveryStatHandler::get_majority_readable_scn_(
member_list_new,
paxos_replica_number_new))) {
LOG_WARN("construct_new_member_list failed", KR(ret), KPC_(ls), K(palf_stat_first));
} else if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_3_0_0) {
} else if (OB_FAIL(check_can_use_new_version_(is_valid_to_use))) {
LOG_WARN("failed to check can use new version", KR(ret));
} else if (is_valid_to_use) {
//use readable in memory
if (OB_FAIL(do_get_majority_readable_scn_V2_(member_list_new,
rootserver::majority(paxos_replica_number_new), palf_stat_first.config_version_,
majority_min_readable_scn))) {
LOG_WARN("failed to get majority readable scn", KR(ret), K(palf_stat_first),
K(paxos_replica_number_new), K(member_list_new));
} else if (OB_FAIL(set_inner_readable_scn(palf_stat_first.config_version_,
majority_min_readable_scn, true))) {
//try to update the readable scn in memory
LOG_WARN("failed to set inner readable scn", KR(ret), K(palf_stat_first),
K(majority_min_readable_scn));
}
} else if (OB_FAIL(do_get_majority_readable_scn_(member_list_new,
leader_readable_scn, rootserver::majority(paxos_replica_number_new), majority_min_readable_scn))) {
@ -617,7 +999,7 @@ int ObLSRecoveryStatHandler::get_majority_readable_scn_(
if (FAILEDx(get_latest_palf_stat_(palf_stat_second))) {
LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls));
} else if (palf_stat_first.config_version_ != palf_stat_second.config_version_) {
ret = OB_EAGAIN;
ret = OB_NEED_RETRY;
LOG_WARN("config_version changed, try again", KR(ret), K(palf_stat_first), K(palf_stat_second));
}
@ -738,12 +1120,12 @@ int ObLSRecoveryStatHandler::do_get_majority_readable_scn_V2_(
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("inner stat error", KR(ret), K_(is_inited));
} else if ( ob_member_list.count() <= 0
} else if (ob_member_list.count() <= 0
|| 0 >= need_query_member_cnt
|| !config_version.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(need_query_member_cnt),
K(ob_member_list), K(config_version));
K(ob_member_list), K(config_version));
} else {
SpinRLockGuard guard(lock_);
if (config_version_ != config_version) {
@ -752,8 +1134,8 @@ int ObLSRecoveryStatHandler::do_get_majority_readable_scn_V2_(
K(config_version_));
}
for (int64_t i = 0; OB_SUCC(ret) && i < replicas_scn_.count(); ++i) {
ObAddr &server = replicas_scn_.at(i).get_server();
SCN readable_scn= replicas_scn_.at(i).get_readable_scn();
const ObAddr &server = replicas_scn_.at(i).get_server();
const SCN &readable_scn = replicas_scn_.at(i).get_readable_scn();
if (has_exist_in_array(ob_member_list, server)) {
if (OB_FAIL(replica_readble_scn.push_back(readable_scn))) {
LOG_WARN("failed to push back", KR(ret), K(i), K(server), K(readable_scn));
@ -762,7 +1144,7 @@ int ObLSRecoveryStatHandler::do_get_majority_readable_scn_V2_(
}
}
if (FAILEDx(do_calc_majority_min_readable_scn_(need_query_member_cnt,
replica_readble_scn, majority_min_readable_scn))) {
replica_readble_scn, majority_min_readable_scn))) {
LOG_WARN("failed to calc majority readable scn", KR(ret),
K(need_query_member_cnt), K(replica_readble_scn));
}
@ -800,7 +1182,7 @@ int ObLSRecoveryStatHandler::calc_majority_min_readable_scn_(
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("send rpc is failed", KR(tmp_ret), K(i), K(return_code_array));
} else {
const auto *result = proxy.get_results().at(i);
const ObGetLSReplayedScnRes *result = proxy.get_results().at(i);
if (OB_ISNULL(result)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result is null", KR(ret), K(i), K(return_code_array));

View File

@ -24,11 +24,13 @@ namespace oceanbase
namespace storage
{
class ObLS;
class ObLSHandle;
}
namespace rootserver
{
class ObLSRecoveryStatHandler;
class TestLSRecoveryGuard;
struct ObLSReplicaReadableSCN
{
public:
@ -36,20 +38,77 @@ public:
~ObLSReplicaReadableSCN() {}
int init(const common::ObAddr &server, const share::SCN &readable_scn);
share::SCN get_readable_scn()const
share::SCN get_readable_scn() const
{
return readable_scn_;
}
common::ObAddr &get_server()
common::ObAddr get_server() const
{
return server_;
}
TO_STRING_KV(K_(server), K_(readable_scn));
TO_STRING_KV(K_(server), K_(readable_scn));
private:
common::ObAddr server_;
share::SCN readable_scn_;
};
class ObLSRecoveryGuard
{
// Guard that implements mutual exclusion between ls_recovery_stat reporting and
// member_list changes.
// It needs to be used on the server that is the leader of the LS for this tenant.
// A successful init() indicates that the mutex has been acquired.
// The interface can also be invoked for meta and sys tenants without errors and without
// mutual exclusion: these two types of tenants do not report ls_recovery_stat, and the
// interface is used solely to standardize member changes.
// The destructor of the class will clear the reference count and reset any set information.
public:
ObLSRecoveryGuard() :
tenant_id_(OB_INVALID_TENANT_ID), ls_handle_(), ls_recovery_stat_(NULL) {}
~ObLSRecoveryGuard();
/**
* @description:
* Implements the mutual exclusion: waits for the reference count to drop to zero
* within the timeout period, and then increments the reference count.
* For meta and sys tenants only tenant_id is set; nothing else is done.
*
* @param[in] tenant_id: tenant_id
* @param[in] ls_id: used to get ls_recovery_stat from ObLS
* @param[in] timeout: Timeout period, with a default value of -1, which indicates no waiting.
* With no waiting, the reference count is incremented only if that is possible immediately.
* Otherwise the increment is attempted until it succeeds or the timeout period expires.
* @return : OB_SUCCESS: Reference increment successful.
* OB_EAGAIN: The reference increment failed within the timeout period because other
* references were not released.
*/
int init(const uint64_t tenant_id, const share::ObLSID &ls_id,
const int64_t &timeout = -1);
//TODO: check whether members can be added or removed; meta or sys directly return success
/*
* @description: Used to check whether the readable_SCN of this member has surpassed the reported readable_SCN,
* for the purpose of judging member changes and the addition of replicas, waiting up to the timeout.
* @param[in] server: target server
* @param[in] timeout: Timeout period
* */
int check_can_add_member(const ObAddr &server, const int64_t timeout);
/*
* @description: Within the timeout period, determine whether the new member list can ensure that the readable_SCN does not regress.
* @param[in] new_member_list : new_member_list
* @param[in] paxos_replica_num : paxos_replica_num of new member_list
* @param[in] timeout: Timeout period
* **/
int check_can_change_member(const ObMemberList &new_member_list,
const int64_t paxos_replica_num,
const int64_t timeout);
friend class TestLSRecoveryGuard;
private:
// NOTE(review): presumably returns true for meta/sys tenants, which skip the check
// as described in the class comment - confirm against the definition.
bool skip_check_member_list_change_(const uint64_t tenant_id);
private:
// OB_INVALID_TENANT_ID until init() sets it.
uint64_t tenant_id_;
// NOTE(review): presumably keeps the LS alive while ls_recovery_stat_ is in use - confirm in init().
ObLSHandle ls_handle_;
// NULL until set; the handler whose reference count this guard holds (see class comment).
ObLSRecoveryStatHandler *ls_recovery_stat_;
};
/**
* @description:
* ObLSRecoveryStatHandler exists on the LS of each observer and is responsible for
@ -58,10 +117,10 @@ private:
class ObLSRecoveryStatHandler
{
public:
ObLSRecoveryStatHandler() { reset(); }
~ObLSRecoveryStatHandler() { reset(); }
ObLSRecoveryStatHandler() { reset(true); }
~ObLSRecoveryStatHandler() { reset(false); }
void reset(const bool is_init = false);
int init(const uint64_t tenant_id, ObLS *ls);
void reset();
/**
* @description:
* get ls readable_scn considering readable scn, sync scn and replayable scn.
@ -69,15 +128,6 @@ public:
* @return return code
*/
int get_ls_replica_readable_scn(share::SCN &readable_scn);
/*
* @description:
* Get the readable_scn of other replicas
* @param[out] readable_scn ls readable_scn
* @return return code
* */
int get_all_replica_min_readable_scn(share::SCN &readable_scn);
/**
* @description:
* get ls level recovery_stat by LS leader.
@ -88,13 +138,65 @@ public:
int get_ls_level_recovery_stat(share::ObLSRecoveryStat &ls_recovery_stat);
int set_add_replica_server(const common::ObAddr &server);
void reset_add_replica_server();
/*
* @description:
* get all ls replica readable and set to replicas_scn_;
*/
int gather_replica_readable_scn();
/*
* @description:
* Add a reference, which may be used for reporting or member list changes.
* @param[in] timeout : Add a reference within the timeout range; if unsuccessful, wait for 100ms.
* */
int inc_ref(const int64_t timeout);
/*
* @description:
* Subtract a reference.
* */
void dec_ref();
/*
* @description:
* Set a readable_scn of inner_table in memory and use config_version for verification.
* */
int set_inner_readable_scn(const palf::LogConfigVersion &config_version,
const share::SCN &readable_scn, bool check_inner_config_valid);
/*
* @description:
* clear readable_scn and config_version of inner_table in memory;
* */
int reset_inner_readable_scn();
TO_STRING_KV(K_(tenant_id), K_(ls));
/*
* @description: get ls all paxos replica min readable_scn
* @param[out] min readable_scn of all paxos replica
* @return:
* OB_NOT_MASTER : replica not master, can not get readable_scn of other replica
* OB_NEED_RETRY : If there's no readable scn with all replicas;
* the config_version corresponding to the current cached readable scn does not match the latest config_version;
* or the config_version has changed during the statistical process.
* */
int get_all_replica_min_readable_scn(share::SCN &readable_scn);
/*
* @description: Used to check whether the readable_SCN of this member has surpassed the reported readable_SCN,
* for the purpose of judging member changes and the addition of replicas during timeout.
* @param[in] server: target server
* @param[in] timeout: Timeout period
* @return:
*/
int wait_server_readable_scn(const common::ObAddr &server, const int64_t timeout);
/*
* @description: Within the timeout period, determine whether the new member list can ensure that the readable_SCN does not regress.
* @param[in] new_member_list : new_member_list
* @param[in] paxos_replica_num : paxos_replica_num of new member_list
* @param[in] timeout: Timeout period
* @return:
*/
int wait_can_change_member_list(const ObMemberList &new_member_list,
const int64_t paxos_replica_num, const int64_t timeout);
TO_STRING_KV(K_(tenant_id), K_(ls), K(ref_cnt_));
friend class TestLSRecoveryGuard;
friend class ObLSRecoveryGuard;
private:
int check_inner_stat_();
@ -159,31 +261,83 @@ private:
const int64_t majority_cnt,
ObArray<SCN> &readable_scn_list,
share::SCN &majority_min_readable_scn);
int construct_new_member_list_(
const common::ObMemberList &member_list_ori,
const common::GlobalLearnerList &degraded_list,
const int64_t paxos_replica_number_ori,
ObIArray<common::ObAddr> &member_list_new,
int64_t &paxos_replica_number_new);
int check_can_use_new_version_(bool &vaild_to_use);
int construct_addr_list_(const palf::PalfStat &palf_stat,
ObIArray<common::ObAddr> &addr_list);
int dump_all_replica_readable_scn_(const bool force_dump);
int try_reload_and_fix_config_version_(const palf::LogConfigVersion &current_version);
template<typename... Args>
int wait_func_with_timeout_(const int64_t timeout, Args &&... args);
int check_member_change_valid_(const common::ObAddr &server, bool &is_valid);
int check_member_change_valid_(const ObMemberList &new_member_list,
const int64_t paxos_replica_num, bool &is_valid);
DISALLOW_COPY_AND_ASSIGN(ObLSRecoveryStatHandler);
private:
bool is_inited_;
uint64_t tenant_id_;
ObLS *ls_;
ObCond ref_cond_;//用于加锁等待唤醒
int64_t ref_cnt_;//use for Concurrency control
common::SpinRWLock lock_;
share::SCN readable_scn_in_inner_;//readable_scn of inner_table
//stores the max of the inner-table value and the value gathered in memory
//only meaningful when config_version_in_inner_ is valid
share::SCN readable_scn_upper_limit_;//the max readable_scn of inner_table and memory
palf::LogConfigVersion config_version_in_inner_;//config_version in inner_table
common::ObAddr extra_server_;//for add replica, need to gather add_replica's readable_scn
palf::LogConfigVersion config_version_;
ObArray<ObLSReplicaReadableSCN> replicas_scn_;
int64_t last_dump_ts_;//用于记录上次打印内存可读点的时间戳
palf::LogConfigVersion config_version_;//记录统计可读点replicas_scn_使用的config_version
//成员列表里面最多只有OB_MAX_MEMBER_NUMBER,这个时候可能触发迁移,所以需要增加一个成员
ObSEArray<ObLSReplicaReadableSCN, OB_MAX_MEMBER_NUMBER + 1, ObNullAllocator> replicas_scn_;//缓存每个副本的可读位点
};
/*
 * @description:
 *   Poll check_member_change_valid_(args..., is_finish) until it reports that the
 *   change is valid, a check fails, or the timeout budget is used up.
 *   If check_can_use_new_version_() says the in-memory readable scn can not be used,
 *   nothing is checked and OB_SUCCESS is returned.
 *   On any failure the cached replica readable scn are dumped for diagnosis.
 * @param[in] timeout: total time budget, must be > 0; the check is re-evaluated
 *                     every TIME_WAIT and once more when the budget is exhausted
 * @param[in] args: arguments forwarded to the matching check_member_change_valid_
 *                  overload (a trailing bool& out-parameter is appended here)
 * @return OB_SUCCESS: the check passed (or is not applicable)
 *         OB_INVALID_ARGUMENT: timeout <= 0
 *         OB_TIMEOUT: the check did not pass within the timeout
 *         other: error returned by check_can_use_new_version_ or the check itself
 */
template <typename... Args>
inline int ObLSRecoveryStatHandler::wait_func_with_timeout_(
    const int64_t timeout, Args &&...args)
{
  int ret = OB_SUCCESS;
  bool is_valid_use = false;
  if (OB_UNLIKELY(timeout <= 0)) {
    ret = OB_INVALID_ARGUMENT;
    RS_LOG(WARN, "invalid argument", KR(ret), K(timeout));
  } else if (OB_FAIL(check_can_use_new_version_(is_valid_use))) {
    RS_LOG(WARN, "failed to check use new version", KR(ret));
  } else if (!is_valid_use) {
    RS_LOG(INFO, "can not use readable in memory, no need to check", K(timeout));
  } else {
    const int64_t TIME_WAIT = 100 * 1000;
    int64_t remaining_timeout = timeout;
    bool is_finish = false;
    while (OB_SUCC(ret) && !is_finish) {
      // The pack is passed as lvalues on purpose: it is used once per iteration, so it
      // must never be forwarded as an rvalue (that could leave moved-from arguments
      // for the next round). Both overloads take const references / values.
      if (OB_FAIL(check_member_change_valid_(args..., is_finish))) {
        RS_LOG(WARN, "failed to check", KR(ret));
      } else if (is_finish) {
        // Check passed: return immediately instead of sleeping one more round.
      } else if (remaining_timeout <= 0) {
        // Budget exhausted and the last check still did not pass: report it to the
        // caller instead of silently falling out of the loop with OB_SUCCESS.
        ret = OB_TIMEOUT;
        RS_LOG(WARN, "failed to wait server readable scn", KR(ret), K(timeout));
      } else {
        // Never sleep longer than what is left of the budget.
        const int64_t sleep_time = remaining_timeout < TIME_WAIT ? remaining_timeout : TIME_WAIT;
        usleep(sleep_time);
        remaining_timeout -= sleep_time;
      }
    }
  }
  if (OB_FAIL(ret)) {
    int tmp_ret = OB_SUCCESS;
    if (OB_TMP_FAIL(dump_all_replica_readable_scn_(true))) {
      RS_LOG(WARN, "failed to dump all replica readable scn", KR(ret), KR(tmp_ret));
    }
  }
  return ret;
}
} // namespace rootserver
}
#endif // OCEANBASE_STORAGE_OB_LS_RECOVERY_STAT_HANDLER

View File

@ -1521,6 +1521,7 @@ int ObTenantLSInfo::get_next_primary_zone(
}
return ret;
}
int ObLSServiceHelper::check_transfer_task_replay(const uint64_t tenant_id,
const share::ObLSID &src_ls,
const share::ObLSID &dest_ls,
@ -1532,12 +1533,19 @@ int ObLSServiceHelper::check_transfer_task_replay(const uint64_t tenant_id,
share::ObLSStatusInfo ls_status;
SCN readble_scn;
replay_finish = true;
uint64_t data_version = 0;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
|| !src_ls.is_valid() || !dest_ls.is_valid()
|| !transfer_scn.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(src_ls),
K(dest_ls), K(transfer_scn));
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
LOG_WARN("failed to get min data version", KR(ret), K(tenant_id));
} else if (MOCK_DATA_VERSION_4_2_4_0 > data_version) {
//高版本是从4.3.0开始支持,兼容性不需要改变
replay_finish = true;
LOG_INFO("no need check all ls replica", K(tenant_id), K(data_version));
} else if (OB_FAIL(check_ls_transfer_replay_(tenant_id, src_ls, transfer_scn, replay_finish))) {
LOG_WARN("failed to check ls transfer replay", KR(ret), K(tenant_id), K(src_ls), K(transfer_scn));
} else if (!replay_finish) {
@ -1618,6 +1626,9 @@ int ObLSServiceHelper::get_ls_all_replica_readable_scn_(const uint64_t tenant_id
K(timeout), K(arg));
} else if (OB_FAIL(proxy.wait())) {
LOG_WARN("failed to get wait all rpc", KR(ret), K(tenant_id), K(ls_id), K(leader));
} else if (1 != proxy.get_results().count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result count not expected", KR(ret), "result", proxy.get_results());
} else if (OB_ISNULL(proxy.get_results().at(0))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("result is null", KR(ret), K(tenant_id), K(leader), K(ls_id));

View File

@ -488,6 +488,7 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC
commit_log.get_multi_source_data();
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_);
ObMySQLTransaction trans;
ObLSRecoveryGuard guard;
for (int64_t i = 0; OB_SUCC(ret) && i < source_data.count(); ++i) {
const ObTxBufferNode &node = source_data.at(i);
if (OB_FAIL(try_cancel_clone_job_for_standby_tenant_(node))) {
@ -534,6 +535,9 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC
* NORMAL状态tenant_info的sync_scn小于系统日志流的sync_scn
*
* */
} else if (OB_FAIL(guard.init(tenant_id_, SYS_LS))) {
LOG_WARN("failed to init guard", KR(ret), K(tenant_id_));
} else if (OB_FAIL(report_sys_ls_recovery_stat_in_trans_(sync_scn, false, trans,
"report recovery stat and has multi data source", true/*need_check_sync_scn*/))) {
LOG_WARN("failed to report sys ls recovery stat", KR(ret), K(sync_scn));
@ -680,6 +684,27 @@ int ObRecoveryLSService::process_upgrade_log_(
return ret;
}
/*
 * @description: fetch the SYS_LS of the current tenant from the tenant's ObLSService.
 * @param[out] ls_handle: handle filled by ObLSService::get_ls for SYS_LS
 * @param[out] ls: pointer taken from ls_handle; NULL unless OB_SUCCESS is returned
 * @return OB_NOT_INIT if the service is not inited, OB_ERR_UNEXPECTED if the LS
 *         service or the LS pointer is NULL, or the error of ObLSService::get_ls
 */
int ObRecoveryLSService::get_ls_(storage::ObLSHandle &ls_handle, storage::ObLS *&ls)
{
  int ret = OB_SUCCESS;
  ls = NULL;
  ObLSService *ls_service = MTL(ObLSService *);
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret), K(inited_));
  } else if (OB_ISNULL(ls_service)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("ls service is null", KR(ret));
  } else if (OB_FAIL(ls_service->get_ls(SYS_LS, ls_handle, storage::ObLSGetMod::RS_MOD))) {
    LOG_WARN("failed to get ls", KR(ret));
  } else {
    ls = ls_handle.get_ls();
    if (OB_ISNULL(ls)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("ls is NULL", KR(ret), K(ls_handle));
    }
  }
  return ret;
}
int ObRecoveryLSService::process_upgrade_data_version_log_(
const share::SCN &sync_scn,
const ObTxBufferNode &node,
@ -839,21 +864,17 @@ int ObRecoveryLSService::construct_sys_ls_recovery_stat_based_on_sync_scn_(
{
int ret = OB_SUCCESS;
ls_stat.reset();
ObLSService *ls_svr = MTL(ObLSService *);
ObLSHandle ls_handle;
ObLS *ls = NULL;
ObLSRecoveryStat tmp_ls_stat;
SCN readable_scn;
storage::ObLSHandle ls_handle;
storage::ObLS *ls = NULL;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(inited_));
} else if (OB_ISNULL(ls_svr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls service is null", KR(ret));
} else if (OB_FAIL(ls_svr->get_ls(SYS_LS, ls_handle, storage::ObLSGetMod::RS_MOD))) {
LOG_WARN("failed to get ls", KR(ret));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
} else if (OB_FAIL(get_ls_(ls_handle, ls))) {
LOG_WARN("failed to get sys ls", KR(ret));
} else if (OB_ISNULL(ls)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls is NULL", KR(ret), K(ls_handle));
} else if (OB_FAIL(ls->get_ls_level_recovery_stat(tmp_ls_stat))) {
@ -1318,17 +1339,12 @@ int ObRecoveryLSService::try_do_ls_balance_task_(
LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task));
}
} else if (ls_balance_task.get_task_op().is_transfer_end()) {
can_remove = true;
bool is_replay_finish = true;
if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay(
tenant_id_, ls_balance_task.get_src_ls(),
ls_balance_task.get_dest_ls(), ls_balance_task.get_operation_scn(),
is_replay_finish))) {
can_remove))) {
LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_),
K(ls_balance_task), K(tenant_info));
} else if (!is_replay_finish) {
ret = OB_NEED_RETRY;
LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task));
}
} else if (ls_balance_task.get_task_op().is_transfer_begin()) {
if (OB_FAIL(check_transfer_begin_can_remove_(ls_balance_task, tenant_info, can_remove))) {
@ -1349,10 +1365,10 @@ int ObRecoveryLSService::try_do_ls_balance_task_(
LOG_INFO("task can be remove", KR(ret), K(ls_balance_task));
ROOTSERVICE_EVENT_ADD("standby_tenant", "remove_balance_task",
K_(tenant_id), "task_type", ls_balance_task.get_task_op(),
"task_scn", ls_balance_task.get_operation_scn(),
"task_scn", ls_balance_task.get_operation_scn().get_val_for_inner_table_field(),
"switchover_status", tenant_info.get_switchover_status(),
"src_ls", ls_balance_task.get_src_ls(),
"dest_ls", ls_balance_task.get_dest_ls());
"src_ls", ls_balance_task.get_src_ls().id(),
"dest_ls", ls_balance_task.get_dest_ls().id());
}
END_TRANSACTION(trans)
}
@ -1380,7 +1396,6 @@ int ObRecoveryLSService::check_transfer_begin_can_remove_(
//find transfer end, or tenant is in flashback
ObBalanceTaskHelper transfer_end_task;
SCN transfer_scn;
bool is_replay_finish = false;
ret = ObBalanceTaskHelperTableOperator::try_find_transfer_end(tenant_id_,
ls_balance_task.get_operation_scn(), ls_balance_task.get_src_ls(),
ls_balance_task.get_dest_ls(), *proxy_, transfer_end_task);
@ -1409,17 +1424,19 @@ KR(ret), K(tenant_id_), K(tenant_info), K(ls_balance_task));
LOG_WARN("can not find transfer end task, can not end transfer begin task", KR(ret), K(tenant_info), K(ls_balance_task));
ret = OB_SUCCESS;
}
if (OB_FAIL(ret) || !can_remove) {
} else if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay(
tenant_id_, ls_balance_task.get_src_ls(),
ls_balance_task.get_dest_ls(), transfer_scn, is_replay_finish))) {
LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_), K(ls_balance_task),
K(tenant_info), K(transfer_scn));
} else if (!is_replay_finish) {
ret = OB_NEED_RETRY;
LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task), K(transfer_scn));
if (OB_SUCC(ret) && can_remove) {
if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay(
tenant_id_, ls_balance_task.get_src_ls(),
ls_balance_task.get_dest_ls(), transfer_scn, can_remove))) {
LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_),
K(ls_balance_task), K(transfer_scn));
} else if (can_remove) {
FLOG_INFO("ls all replica replay to newest, can remove", K(ls_balance_task));
} else if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) {
// 10s
LOG_WARN("can not remove ls balance task helper", K(ls_balance_task), K(tenant_info));
}
}
}
return ret;
}

View File

@ -35,6 +35,7 @@ class ObMySQLTransaction;
namespace transaction
{
class ObTxBufferNode;
class ObTxLogBlock;
}
namespace share
{
@ -49,15 +50,16 @@ class ObMultiVersionSchemaService;
class ObTenantSchema;
}
}
namespace storage
{
class ObLS;
class ObLSHandle;
}
namespace logservice
{
class ObLogHandler;
class ObGCLSLog;
}
namespace transaction
{
class ObTxLogBlock;
}
namespace palf
{
class PalfHandleGuard;
@ -143,6 +145,7 @@ private:
bool &can_remove);
int do_ls_balance_alter_task_(const share::ObBalanceTaskHelper &ls_balance_task,
common::ObMySQLTransaction &trans);
int get_ls_(storage::ObLSHandle &ls_handle, storage::ObLS *&ls);
int reset_restore_proxy_(ObRestoreSourceServiceAttr &service_attr);
int get_ls_all_replica_readable_scn_(const share::ObLSID &ls_id, share::SCN &reabable_scn);
#ifdef OB_BUILD_LOG_STORAGE_COMPRESS

View File

@ -24,6 +24,7 @@
#include "share/ob_share_util.h"
#include "share/ls/ob_ls_status_operator.h"
#include "share/scn.h"
#include "logservice/palf/log_meta_info.h"//LogConfigVersion
using namespace oceanbase;
using namespace oceanbase::common;
@ -536,11 +537,12 @@ int ObLSRecoveryStatOperator::update_ls_config_version(
common::ObSqlString sql;
ObString config_version_str;
ObArenaAllocator allocator("VersionStr");
char config_version_val[128] = {0};
const int64_t CONFIG_LEN = palf::LogConfigVersion::CONFIG_VERSION_LEN + 1;//128 + \0
char config_version_val[CONFIG_LEN] = {0};
if (OB_FAIL(type_to_hex_str(config_version, allocator,
config_version_str))) {
LOG_WARN("failed to type to hex", KR(ret), K(config_version));
} else if (0 > config_version.to_string(config_version_val, 128)) {
} else if (0 > config_version.to_string(config_version_val, CONFIG_LEN)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("config_version to string failed", KR(ret), K(config_version));
} else if (OB_FAIL(sql.assign_fmt("UPDATE %s SET config_version = '%s', bconfig_version = '%.*s' "
@ -569,7 +571,8 @@ int ObLSRecoveryStatOperator::update_ls_config_version(
int ObLSRecoveryStatOperator::get_min_create_scn_(
const uint64_t tenant_id, const common::ObSqlString &sql,
ObISQLClient &client, SCN &min_create_scn) {
ObISQLClient &client, SCN &min_create_scn)
{
int ret = OB_SUCCESS;
min_create_scn = SCN::base_scn();
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {

View File

@ -267,9 +267,13 @@ public:
* description: update ls config_version and return current readable_scn
* @param[in] tenant_id
* @param[in] ls_id
* @param[in] config_version :will not fallback
* @param[in] config_version : If the new and old config_version are equal, no update is performed, and it directly returns success.
* If the new config_version is greater than the old config_version, update it to the internal table.
* If the new config_version is less than the old config_version, report an error.
* @param[in] client
* @param[out] readable scn
* return: OB_SUCCESS: new and current config version maybe equal, or new bigger than current
* OB_NEED_RETRY: new config_version smaller than current
*/
int update_ls_config_version(const uint64_t tenant_id,
const ObLSID &ls_id, const palf::LogConfigVersion &config_version,

View File

@ -7424,6 +7424,8 @@ bool ObGetLSReplayedScnRes::is_valid() const
return OB_INVALID_TENANT_ID != tenant_id_
&& ls_id_.is_valid()
&& cur_readable_scn_.is_valid_and_not_min();
//no need to check offline_scn: it may be absent, and checking it would cause upgrade-compatibility problems
//no need check server valid
}
int ObGetLSReplayedScnRes::init(
const uint64_t tenant_id,
@ -7437,7 +7439,7 @@ int ObGetLSReplayedScnRes::init(
|| !ls_id.is_valid()
|| !cur_readable_scn.is_valid_and_not_min()
|| !server.is_valid())) {
//不用校验offline_scn,可能就是一个非法的
//不用校验offline_scn,可能就是一个非法的
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(cur_readable_scn), K(server));
} else {

View File

@ -8648,12 +8648,12 @@ public:
ObGetLSReplayedScnRes(): tenant_id_(OB_INVALID_TENANT_ID),
ls_id_(),
cur_readable_scn_(share::SCN::min_scn()),
offline_scn_(),
self_addr_() {}
offline_scn_(), self_addr_() {}
~ObGetLSReplayedScnRes() {}
bool is_valid() const;
int init(const uint64_t tenant_id, const share::ObLSID &ls_id, const share::SCN &cur_readable_scn,
const share::SCN &offline_scn, const common::ObAddr &server);
int init(const uint64_t tenant_id, const share::ObLSID &ls_id,
const share::SCN &cur_readable_scn,
const share::SCN &offline_scn, const common::ObAddr &server);
int assign(const ObGetLSReplayedScnRes &other);
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(cur_readable_scn), K_(offline_scn), K(self_addr_));
uint64_t get_tenant_id() const
@ -8684,7 +8684,7 @@ private:
share::ObLSID ls_id_;
share::SCN cur_readable_scn_;
share::SCN offline_scn_;//add in 4.2.2.0
common::ObAddr self_addr_;//add in 4.3.0
common::ObAddr self_addr_;//add in 4.2.3/4.3.0
};

View File

@ -732,11 +732,6 @@ public:
//int gather_replica_readable_scn();
DELEGATE_WITH_RET(ls_recovery_stat_handler_, gather_replica_readable_scn, int);
// get all ls readable_scn: it will be failed while has replica is offline
// @param[out] readable_scn ls readable_scn
// int get_all_replica_min_readable_scn(share::SCN &readable_scn)
DELEGATE_WITH_RET(ls_recovery_stat_handler_, get_all_replica_min_readable_scn, int);
// disable clog sync.
// with ls read lock and log write lock.
int disable_sync();