From 99abe0b97e9b98ceea88ce52585004ea4133a31d Mon Sep 17 00:00:00 2001 From: maosy <630014370@qq.com> Date: Sat, 12 Oct 2024 05:56:01 +0000 Subject: [PATCH] =?UTF-8?q?[CP]=E5=A4=87=E5=BA=93=E6=88=90=E5=91=98?= =?UTF-8?q?=E5=8F=98=E6=9B=B4=E9=98=B2=E5=9B=9E=E9=80=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: linqiucen Co-authored-by: BinChenn --- mittest/simple_server/CMakeLists.txt | 1 + .../simple_server/test_ls_recovery_stat.cpp | 120 ++++ .../ob_reconfig_checker_adapter.cpp | 12 +- src/logservice/ob_reconfig_checker_adapter.h | 1 + src/logservice/palf/log_meta_info.h | 1 + src/observer/ob_service.cpp | 21 +- .../virtual_table/ob_all_virtual_log_stat.cpp | 3 +- .../virtual_table/ob_all_virtual_log_stat.h | 4 +- src/rootserver/ob_ls_recovery_reportor.cpp | 19 +- .../ob_ls_recovery_stat_handler.cpp | 560 +++++++++++++++--- src/rootserver/ob_ls_recovery_stat_handler.h | 196 +++++- src/rootserver/ob_ls_service_helper.cpp | 11 + .../standby/ob_recovery_ls_service.cpp | 75 ++- .../standby/ob_recovery_ls_service.h | 11 +- src/share/ls/ob_ls_recovery_stat_operator.cpp | 9 +- src/share/ls/ob_ls_recovery_stat_operator.h | 6 +- src/share/ob_rpc_struct.cpp | 4 +- src/share/ob_rpc_struct.h | 10 +- src/storage/ls/ob_ls.h | 5 - 19 files changed, 896 insertions(+), 173 deletions(-) create mode 100644 mittest/simple_server/test_ls_recovery_stat.cpp diff --git a/mittest/simple_server/CMakeLists.txt b/mittest/simple_server/CMakeLists.txt index a4228d6f2..1a8902eca 100644 --- a/mittest/simple_server/CMakeLists.txt +++ b/mittest/simple_server/CMakeLists.txt @@ -74,6 +74,7 @@ ob_unittest_observer(test_ob_simple_cluster test_ob_simple_cluster.cpp) ob_unittest_observer(test_ob_partition_balance test_ob_partition_balance.cpp) ob_unittest_observer(test_ls_status_operator test_ls_status_operator.cpp) ob_unittest_observer(test_balance_operator test_tenant_balance_operator.cpp) +ob_unittest_observer(test_ls_recovery_stat test_ls_recovery_stat.cpp) ob_unittest_observer(test_transfer_partition_task test_transfer_partition_task.cpp) ob_unittest_observer(test_mds_table_checkpoint test_mds_table_checkpoint.cpp) ob_unittest_observer(test_ob_black_list_service test_ob_black_list_service.cpp) diff --git a/mittest/simple_server/test_ls_recovery_stat.cpp b/mittest/simple_server/test_ls_recovery_stat.cpp new file mode 100644 index 000000000..8750e2d1f --- /dev/null +++ b/mittest/simple_server/test_ls_recovery_stat.cpp @@ -0,0 +1,120 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX RS + +#include +#include +#define private public +#define protected public + +#include "env/ob_simple_cluster_test_base.h" +#include "lib/ob_errno.h" +#include "lib/oblog/ob_log.h" +#include "rootserver/ob_ls_recovery_stat_handler.h"// +#include "share/ob_ls_id.h" +#include "share/unit/ob_unit_info.h" +#include "share/ls/ob_ls_status_operator.h" +#include "share/ls/ob_ls_i_life_manager.h" + +namespace oceanbase +{ +using namespace unittest; +using namespace share; +using namespace common; +namespace rootserver +{ +using ::testing::_; +using ::testing::Invoke; +using ::testing::Return; +class TestLSRecoveryGuard : public unittest::ObSimpleClusterTestBase +{ +public: + TestLSRecoveryGuard() : unittest::ObSimpleClusterTestBase("test_ls_recovery_stat") {} +protected: + + uint64_t tenant_id_; +}; + +TEST_F(TestLSRecoveryGuard, sys_recovery_guard) +{ + int ret = OB_SUCCESS; + { + //不init直接析构 + ObLSRecoveryGuard guard; + } + { + //init 系统租户,析构 + ObLSRecoveryGuard guard; + ASSERT_EQ(OB_SUCCESS, guard.init(OB_SYS_TENANT_ID, SYS_LS)); + } + { + //init不存在的租户,或者日志流 + ObLSRecoveryGuard guard; + ASSERT_EQ(OB_TENANT_NOT_IN_SERVER, guard.init(1002, SYS_LS)); + } +} + +TEST_F(TestLSRecoveryGuard, user_recovery_guard) +{ + int ret = OB_SUCCESS; + ASSERT_EQ(OB_SUCCESS, create_tenant()); + tenant_id_ = 1002; + SCN readable_scn; + palf::LogConfigVersion config_version; + { + ObLSRecoveryGuard guard; + ObLSID ls_id(10000000); + ASSERT_EQ(OB_LS_NOT_EXIST, guard.init(tenant_id_, ls_id)); + } + { + ObLSRecoveryGuard guard; + //加锁成功后,不在汇报,但是可以在统计 + ASSERT_EQ(OB_SUCCESS, guard.init(tenant_id_, SYS_LS, 300 * 1000)); + readable_scn = guard.ls_recovery_stat_->readable_scn_upper_limit_; + //内存中的scn还是可以推高的 + SCN readable_scn_memory = guard.ls_recovery_stat_->replicas_scn_.at(0).get_readable_scn(); + config_version = guard.ls_recovery_stat_->config_version_in_inner_; + ASSERT_EQ(1, guard.ls_recovery_stat_->ref_cnt_); + ObLSRecoveryGuard guard1; + //不能加锁成功 + ASSERT_EQ(OB_EAGAIN, guard1.init(tenant_id_, SYS_LS, 2 * 1000 * 1000)); + ASSERT_EQ(OB_INIT_TWICE, guard.init(tenant_id_, SYS_LS)); + ASSERT_EQ(1, guard.ls_recovery_stat_->ref_cnt_); + ASSERT_EQ(OB_SUCCESS, guard.ls_recovery_stat_->reset_inner_readable_scn()); + usleep(3000 * 1000);//sleep 300ms,应该设置成最新 + ASSERT_EQ(readable_scn.get_val_for_sql(), guard.ls_recovery_stat_->readable_scn_upper_limit_.get_val_for_sql()); + ASSERT_NE(readable_scn_memory.get_val_for_sql(), guard.ls_recovery_stat_->replicas_scn_.at(0).readable_scn_.get_val_for_sql()); + } + //释放后,可以推过 + usleep(3000 * 1000); + { + ObLSRecoveryGuard guard; + ASSERT_EQ(OB_SUCCESS, guard.init(tenant_id_, SYS_LS, 300 * 1000)); + ASSERT_NE(readable_scn.get_val_for_sql(), guard.ls_recovery_stat_->readable_scn_upper_limit_.get_val_for_sql()); + readable_scn = guard.ls_recovery_stat_->readable_scn_upper_limit_; + ASSERT_EQ(OB_SUCCESS, guard.ls_recovery_stat_->reset_inner_readable_scn()); + ASSERT_EQ(OB_NEED_RETRY, guard.ls_recovery_stat_->set_inner_readable_scn(config_version,readable_scn, true)); + ASSERT_EQ(OB_SUCCESS, guard.ls_recovery_stat_->set_inner_readable_scn(config_version, readable_scn, false)); + + } +} +} // namespace share +} // namespace oceanbase + +int main(int argc, char **argv) +{ + oceanbase::unittest::init_log_and_gtest(argc, argv); + OB_LOGGER.set_log_level("INFO"); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/logservice/ob_reconfig_checker_adapter.cpp b/src/logservice/ob_reconfig_checker_adapter.cpp index 58b094e9d..de828ff6e 100644 --- a/src/logservice/ob_reconfig_checker_adapter.cpp +++ b/src/logservice/ob_reconfig_checker_adapter.cpp @@ -21,7 +21,7 @@ namespace oceanbase namespace logservice { -ObReconfigCheckerAdapter::ObReconfigCheckerAdapter() +ObReconfigCheckerAdapter::ObReconfigCheckerAdapter() : guard_() { tenant_id_ = OB_INVALID_TENANT_ID; ls_id_.reset(); @@ -43,8 +43,8 @@ int ObReconfigCheckerAdapter::init(const uint64_t tenant_id, tenant_id_ = tenant_id; ls_id_ = ls_id; timeout_ = timeout; + ret = guard_.init(tenant_id, ls_id, timeout); } - return ret; } @@ -71,6 +71,10 @@ int ObReconfigCheckerAdapter::check_can_add_member(const ObAddr &server, } } while (OB_FAIL(ret) && ret != OB_TIMEOUT); + if (OB_SUCC(ret)) { + ret = guard_.check_can_add_member(server, timeout_us); + } + return ret; } @@ -78,9 +82,7 @@ int ObReconfigCheckerAdapter::check_can_change_memberlist(const ObMemberList &ne const int64_t paxos_replica_num, const int64_t timeout_us) { - int ret = OB_SUCCESS; - UNUSEDx(new_member_list, paxos_replica_num, timeout_us); - return ret; + return guard_.check_can_change_member(new_member_list, paxos_replica_num, timeout_us); } } // end namespace logservice diff --git a/src/logservice/ob_reconfig_checker_adapter.h b/src/logservice/ob_reconfig_checker_adapter.h index 6a2e7f8ff..0b09b12d3 100644 --- a/src/logservice/ob_reconfig_checker_adapter.h +++ b/src/logservice/ob_reconfig_checker_adapter.h @@ -39,6 +39,7 @@ private: uint64_t tenant_id_; share::ObLSID ls_id_; int64_t timeout_; + rootserver::ObLSRecoveryGuard guard_; }; } // logservice diff --git a/src/logservice/palf/log_meta_info.h b/src/logservice/palf/log_meta_info.h index 356ee161f..0a88b7dc2 100644 --- a/src/logservice/palf/log_meta_info.h +++ b/src/logservice/palf/log_meta_info.h @@ -81,6 +81,7 @@ public: LogConfigVersion(); ~LogConfigVersion(); void operator=(const LogConfigVersion &config_version); + static const int64_t CONFIG_VERSION_LEN = 128; public: int generate(const int64_t proposal_id, const int64_t config_seq); diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 56dfd3434..8c9f34973 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -102,6 +102,7 @@ #include "storage/shared_storage/ob_disk_space_manager.h" #endif #include "storage/column_store/ob_column_store_replica_util.h" +#include "rootserver/ob_ls_recovery_stat_handler.h"//get_all_replica_min_readable_scn namespace oceanbase { @@ -3675,6 +3676,8 @@ int ObService::init_tenant_config( return OB_SUCCESS; } +ERRSIM_POINT_DEF(ERRSIM_GET_LS_READABLE_SCN_ERROR); +ERRSIM_POINT_DEF(ERRSIM_GET_LS_READABLE_SCN_OLD); int ObService::get_ls_replayed_scn( const ObGetLSReplayedScnArg &arg, ObGetLSReplayedScnRes &result) @@ -3692,6 +3695,9 @@ int ObService::get_ls_replayed_scn( LOG_WARN("arg is invaild", KR(ret), K(arg)); } else if (arg.get_tenant_id() != MTL_ID() && OB_FAIL(guard.switch_to(arg.get_tenant_id()))) { LOG_WARN("switch tenant failed", KR(ret), K(arg)); + } else if (ERRSIM_GET_LS_READABLE_SCN_ERROR) { + ret = ERRSIM_GET_LS_READABLE_SCN_ERROR; + LOG_WARN("failed to get ls replica readable scn for errsim", KR(ret), K(arg)); } if (OB_SUCC(ret)) { ObLSService *ls_svr = MTL(ObLSService*); @@ -3709,10 +3715,21 @@ int ObService::get_ls_replayed_scn( } else if (OB_FAIL(ls->get_max_decided_scn(cur_readable_scn))) { LOG_WARN("failed to get_max_decided_scn", KR(ret), K(arg), KPC(ls)); } else if (arg.is_all_replica()) { - if (OB_FAIL(ls->get_all_replica_min_readable_scn(cur_readable_scn))) { - LOG_WARN("failed to get all replica readable scn", KR(ret)); + if (OB_ISNULL(ls->get_ls_recovery_stat_handler())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get ls recovery stat", KR(ret), K(arg)); + } else if (OB_FAIL(ls->get_ls_recovery_stat_handler() + ->get_all_replica_min_readable_scn(cur_readable_scn))) { + LOG_WARN("failed to get all replica min readable_scn", KR(ret), K(arg)); } } + if (OB_SUCC(ret) && ERRSIM_GET_LS_READABLE_SCN_OLD) { + const int64_t current_time = ObTimeUtility::current_time() - + GCONF.internal_sql_execute_timeout; + cur_readable_scn.convert_from_ts(current_time); + LOG_WARN("set ls replica readble_scn small", K(arg), K(cur_readable_scn), + K(current_time)); + } if (FAILEDx(ls->get_offline_scn(offline_scn))) { LOG_WARN("failed to get offline scn", KR(ret), K(arg), KPC(ls)); } else if (OB_FAIL(result.init(arg.get_tenant_id(), arg.get_ls_id(), diff --git a/src/observer/virtual_table/ob_all_virtual_log_stat.cpp b/src/observer/virtual_table/ob_all_virtual_log_stat.cpp index 875e6e53b..cccc2d42a 100644 --- a/src/observer/virtual_table/ob_all_virtual_log_stat.cpp +++ b/src/observer/virtual_table/ob_all_virtual_log_stat.cpp @@ -138,7 +138,8 @@ int ObAllVirtualPalfStat::insert_log_stat_(const logservice::ObLogStat &log_stat break; } case OB_APP_MIN_COLUMN_ID + 6: { - if (0 >= palf_stat.config_version_.to_string(config_version_buf_, VARCHAR_128)) { + if (0 >= palf_stat.config_version_.to_string(config_version_buf_, + palf::LogConfigVersion::CONFIG_VERSION_LEN)) { SERVER_LOG(WARN, "config_version_ to_string failed", K(ret), K(palf_stat)); } else { cur_row_.cells_[i].set_varchar(ObString::make_string(config_version_buf_)); diff --git a/src/observer/virtual_table/ob_all_virtual_log_stat.h b/src/observer/virtual_table/ob_all_virtual_log_stat.h index 09479aa51..b9bee8d0d 100644 --- a/src/observer/virtual_table/ob_all_virtual_log_stat.h +++ b/src/observer/virtual_table/ob_all_virtual_log_stat.h @@ -17,6 +17,7 @@ #include "share/ob_scanner.h" #include "common/row/ob_row.h" #include "logservice/palf/palf_handle.h" +#include "logservice/palf/log_meta_info.h"//CONFIG_VERSION_LEN namespace oceanbase { @@ -42,14 +43,13 @@ private: private: static const int64_t VARCHAR_32 = 32; static const int64_t VARCHAR_64 = 64; - static const int64_t VARCHAR_128 = 128; char role_str_[VARCHAR_32] = {'\0'}; char access_mode_str_[VARCHAR_32] = {'\0'}; char ip_[common::OB_IP_PORT_STR_BUFF] = {'\0'}; ObSqlString member_list_buf_; char arbitration_member_buf_[MAX_SINGLE_MEMBER_LENGTH] = {'\0'}; char degraded_list_buf_[MAX_LEARNER_LIST_LENGTH] = {'\0'}; - char config_version_buf_[VARCHAR_128] = {'\0'}; + char config_version_buf_[palf::LogConfigVersion::CONFIG_VERSION_LEN] = {'\0'}; char replica_type_str_[VARCHAR_32] = {'\0'}; char learner_list_buf_[MAX_LEARNER_LIST_LENGTH] = {'\0'}; omt::ObMultiTenant *omt_; diff --git a/src/rootserver/ob_ls_recovery_reportor.cpp b/src/rootserver/ob_ls_recovery_reportor.cpp index 795df8041..2fb131a78 100755 --- a/src/rootserver/ob_ls_recovery_reportor.cpp +++ b/src/rootserver/ob_ls_recovery_reportor.cpp @@ -264,12 +264,12 @@ int ObLSRecoveryReportor::update_ls_recovery_stat_() } else if (OB_TMP_FAIL(ls->update_ls_replayable_point(replayable_scn))) { LOG_WARN("failed to update_ls_replayable_point", KR(tmp_ret), KPC(ls), K(replayable_scn)); } - if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_3_0_0) { - } else if (OB_TMP_FAIL(ls->gather_replica_readable_scn())) { + //兼容性和是否为leader的判断包在了接口中 + if (OB_TMP_FAIL(ls->gather_replica_readable_scn())) { if (OB_NOT_MASTER == tmp_ret) { - tmp_ret = OB_SUCCESS; } else { - LOG_WARN("failed to gather replica readable scn", KR(tmp_ret)); + const ObLSID ls_id = ls->get_ls_id(); + LOG_WARN("failed to gather replica readable scn", KR(tmp_ret), K(ls_id)); } } @@ -301,10 +301,20 @@ int ObLSRecoveryReportor::update_ls_recovery( int ret = OB_SUCCESS; ObLSRecoveryStat ls_recovery_stat; ObMySQLTransaction trans; + ObLSRecoveryGuard guard; const uint64_t exec_tenant_id = ObLSLifeIAgent::get_exec_tenant_id(tenant_id_); if (OB_ISNULL(ls) || OB_ISNULL(sql_proxy)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("ls or sql proxy is null", KR(ret), KP(ls), KP(sql_proxy)); + } else if (OB_FAIL(guard.init(tenant_id_, ls->get_ls_id()))) { + if (OB_EAGAIN != ret) { + LOG_WARN("failed to init ls recovery guard", KR(ret), K(tenant_id_), "ls_id", ls->get_ls_id()); + } else if (REACH_TENANT_TIME_INTERVAL(1 * 1000 * 1000)) { + //Reduce Exception Throwing + LOG_WARN("can not report recovery stat, maybe member_list change", KR(ret), K(tenant_id_), + "ls_id", ls->get_ls_id()); + ret = OB_SUCCESS; + } } else if (OB_FAIL(ls->get_ls_level_recovery_stat(ls_recovery_stat))) { if (OB_NOT_MASTER == ret) { LOG_TRACE("follower doesn't need to report ls recovery stat", KR(ret), KPC(ls)); @@ -329,7 +339,6 @@ int ObLSRecoveryReportor::update_ls_recovery( ret = OB_SUCC(ret) ? tmp_ret : ret; } } - if (ls_recovery_stat.is_valid()) { const int64_t PRINT_INTERVAL = 10 * 1000 * 1000L; if (REACH_TIME_INTERVAL(PRINT_INTERVAL)) { diff --git a/src/rootserver/ob_ls_recovery_stat_handler.cpp b/src/rootserver/ob_ls_recovery_stat_handler.cpp index c9f87d0f5..f05ddb478 100755 --- a/src/rootserver/ob_ls_recovery_stat_handler.cpp +++ b/src/rootserver/ob_ls_recovery_stat_handler.cpp @@ -38,10 +38,126 @@ int ObLSReplicaReadableSCN::init(const common::ObAddr &server, const share::SCN } return ret; } + +int ObLSRecoveryGuard::init(const uint64_t tenant_id, const share::ObLSID &ls_id, + const int64_t &timeout) +{ + int ret = OB_SUCCESS; + if (OB_NOT_NULL(ls_recovery_stat_) || is_valid_tenant_id(tenant_id_)) { + ret = OB_INIT_TWICE; + LOG_WARN("already init", KR(ret), K(tenant_id)); + } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)) + || !ls_id.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id)); + } else if (skip_check_member_list_change_(tenant_id)) { + //meta and sys no transfer, no need report and wait readable_scn + //primary tenant no need check member list change + ls_recovery_stat_ = NULL; + tenant_id_ = tenant_id; + } else { + MTL_SWITCH(tenant_id) { + ObLSService *ls_svr = MTL(ObLSService *); + storage::ObLS *ls = NULL; + + if (OB_ISNULL(ls_svr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls service is null", KR(ret)); + } else if (OB_FAIL(ls_svr->get_ls(ls_id, ls_handle_, storage::ObLSGetMod::RS_MOD))) { + LOG_WARN("failed to get ls", KR(ret), K(ls_id)); + } else if (OB_ISNULL(ls = ls_handle_.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("ls is NULL", KR(ret), K(ls_handle_)); + } else { + ObLSRecoveryStatHandler* ls_recovery_stat = ls->get_ls_recovery_stat_handler(); + if (OB_ISNULL(ls_recovery_stat)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get ls recovery stat", KR(ret), K(tenant_id), + "ls_id", ls->get_ls_id()); + } else if (OB_FAIL(ls_recovery_stat->inc_ref(timeout))) { + LOG_WARN("failed to inc ref", KR(ret), K(timeout), K(tenant_id)); + } else { + ls_recovery_stat_ = ls_recovery_stat; + tenant_id_ = tenant_id; + } + } + } + } + return ret; +} + +ObLSRecoveryGuard::~ObLSRecoveryGuard() +{ + if (OB_ISNULL(ls_recovery_stat_)) { + //not init, nothing todo + } else { + ls_recovery_stat_->reset_add_replica_server(); + ls_recovery_stat_->dec_ref(); + LOG_TRACE("release ls recovery stat guard", K(tenant_id_), KPC(ls_recovery_stat_)); + ls_recovery_stat_ = NULL; + } +} + +bool ObLSRecoveryGuard::skip_check_member_list_change_(const uint64_t tenant_id) +{ + bool bret = false; + if (!is_user_tenant(tenant_id)) { + bret = true; + LOG_INFO("not user tenant, no need to check member list change", K(tenant_id)); + } else { + int ret = OB_SUCCESS;//use to MTL_SWITCH + MTL_SWITCH(tenant_id) { + if (MTL_TENANT_ROLE_CACHE_IS_PRIMARY()) { + bret = true; + LOG_INFO("is primary tenant, no need check readable_scn"); + } + } + } + return bret; +} + +int ObLSRecoveryGuard::check_can_add_member(const ObAddr &server, const int64_t timeout) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!server.is_valid() || 0 >= timeout)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(server), K(timeout)); + } else if (skip_check_member_list_change_(tenant_id_)) { + //if not user tenant, no need to check and add member + } else if (OB_ISNULL(ls_recovery_stat_)) { + ret = OB_NOT_INIT; + LOG_WARN("ls recovery stat is null, not init", KR(ret), K_(tenant_id)); + } else if (OB_FAIL(ls_recovery_stat_->set_add_replica_server(server))) { + LOG_WARN("failed to set add replica server", KR(ret), K(server)); + } else if (OB_FAIL(ls_recovery_stat_->wait_server_readable_scn(server, timeout))) { + LOG_WARN("failed to wait readable scn", KR(ret), K(server)); + } + return ret; +} + +int ObLSRecoveryGuard::check_can_change_member(const ObMemberList &new_member_list, + const int64_t paxos_replica_num, const int64_t timeout) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!new_member_list.is_valid() || 0 >= paxos_replica_num || 0 >= timeout)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(new_member_list), K(paxos_replica_num), K(timeout)); + } else if (skip_check_member_list_change_(tenant_id_)) { + //if not user tenant, no need to check and add member + } else if (OB_ISNULL(ls_recovery_stat_)) { + ret = OB_NOT_INIT; + LOG_WARN("ls recovery stat is null, not init", KR(ret), K_(tenant_id)); + } else if (OB_FAIL(ls_recovery_stat_->wait_can_change_member_list(new_member_list, + paxos_replica_num, timeout))) { + LOG_WARN("failed to check can change member", KR(ret), K(new_member_list), K(paxos_replica_num), K(timeout)); + } + return ret; +} + int ObLSRecoveryStatHandler::init(const uint64_t tenant_id, ObLS *ls) { int ret = OB_SUCCESS; - + reset(true); if (is_inited_) { ret = OB_INIT_TWICE; LOG_WARN("ObLSRecoveryStatHandler init twice", KR(ret), K_(is_inited)); @@ -51,30 +167,135 @@ int ObLSRecoveryStatHandler::init(const uint64_t tenant_id, ObLS *ls) } else { ls_ = ls; tenant_id_ = tenant_id; + SpinWLockGuard guard(lock_); + last_dump_ts_ = ObTimeUtility::current_time(); is_inited_ = true; - replicas_scn_.set_tenant_id(tenant_id); - replicas_scn_.set_label("LSReadableSCN"); LOG_INFO("ObLSRecoveryStatHandler init success", K(this)); } - return ret; } -void ObLSRecoveryStatHandler::reset() +void ObLSRecoveryStatHandler::reset(const bool is_init) { is_inited_ = false; ls_ = NULL; tenant_id_ = OB_INVALID_TENANT_ID; SpinWLockGuard guard(lock_); - readable_scn_in_inner_.reset(); + readable_scn_upper_limit_.reset(); config_version_in_inner_.reset(); extra_server_.reset(); config_version_.reset(); replicas_scn_.reset(); + last_dump_ts_ = OB_INVALID_TIMESTAMP; + if (is_init) { + ATOMIC_SET(&ref_cnt_, 0); + } else if (0 != ATOMIC_LOAD(&ref_cnt_)) { + LOG_ERROR_RET(OB_ERR_UNEXPECTED, "has reference", K(ref_cnt_)); + ref_cond_.signal(); + } } -int ObLSRecoveryStatHandler::check_inner_stat_() +void ObLSRecoveryStatHandler::dec_ref() { + ATOMIC_CAS(&ref_cnt_, 1, 0); + ref_cond_.signal(); +} + +int ObLSRecoveryStatHandler::inc_ref(const int64_t timeout) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("check inner stat error", KR(ret)); + } else { + int64_t curr_timeout = timeout; + const int64_t TIME_WAIT = 100 * 1000; + int tmp_ret = OB_SUCCESS; + ret = OB_EAGAIN; + do { + if (0 == ATOMIC_CAS(&ref_cnt_, 0, 1)) { + ret = OB_SUCCESS; + } else if (curr_timeout > 0) { + LOG_INFO("wait for inc ref", K(curr_timeout), K(timeout)); + if (OB_TMP_FAIL(ref_cond_.timedwait(TIME_WAIT))) { + LOG_WARN("failed to timedwait", KR(ret), KR(tmp_ret)); + } + curr_timeout -= TIME_WAIT; + } + } while (curr_timeout > 0 && OB_EAGAIN == ret); + } + return ret; +} + +int ObLSRecoveryStatHandler::set_inner_readable_scn(const palf::LogConfigVersion &config_version, + const share::SCN &readable_scn, bool check_inner_config_valid) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("check inner stat error", KR(ret)); + } else if (OB_UNLIKELY(!config_version.is_valid() || !readable_scn.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(config_version), K(readable_scn)); + } else { + SpinWLockGuard guard(lock_); + if (check_inner_config_valid && (!config_version_in_inner_.is_valid() + || config_version_in_inner_ != config_version)) { + //如果需要校验config_version,只能校验config_version相等 + //不能无脑推高config_version_in_inner_,这个值要严格和内部表保持一致 + //如果本地统计的config_version大于config_version_in_inner_ + //readable_scn_upper_limit_也没必要推高,总是要先把config_version更新成功后,才会更新内部表成功 + ret = OB_NEED_RETRY; + LOG_WARN("config version in inner is invalid, can not update upper limit readable_scn", + KR(ret), K(check_inner_config_valid), K(config_version_in_inner_)); + } else if (config_version_in_inner_.is_valid() + && config_version_in_inner_ > config_version) { + //内存中的config_version不会回退,但是内存中存储的readable_scn_upper_limit可能会回退, + //可能会由于统计不到某些副本导致可读点回退,但是内存不回退,但是也不报错 + ret = OB_NEED_RETRY; + LOG_WARN("config version not match or readable_scn fallback", KR(ret), K(config_version), + K(config_version_in_inner_), K(readable_scn_upper_limit_), K(readable_scn)); + } else { + config_version_in_inner_ = config_version; + const int64_t PRINT_INTERVAL = 1 * 1000 * 1000; + if (readable_scn_upper_limit_ > readable_scn && REACH_TENANT_TIME_INTERVAL(PRINT_INTERVAL)) { + const ObLSID ls_id = ls_->get_ls_id(); + LOG_INFO("readable scn fallback", K(ls_id), K(readable_scn_upper_limit_), K(readable_scn)); + } + readable_scn_upper_limit_ = SCN::max(readable_scn_upper_limit_, readable_scn); + } + } + return ret; +} +int ObLSRecoveryStatHandler::reset_inner_readable_scn() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("check inner stat error", KR(ret)); + } else { + SpinWLockGuard guard(lock_); + config_version_in_inner_.reset(); + readable_scn_upper_limit_.reset(); + } + return ret; +} + +int ObLSRecoveryStatHandler::wait_can_change_member_list( + const ObMemberList &new_member_list, const int64_t paxos_replica_num, + const int64_t timeout) +{ + int ret = OB_SUCCESS; + const int64_t now = ObTimeUtility::current_time(); + if (OB_UNLIKELY(!new_member_list.is_valid() || 0 >= paxos_replica_num || 0 >= timeout)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(new_member_list), K(paxos_replica_num), K(timeout)); + } else if (OB_FAIL(wait_func_with_timeout_(timeout, new_member_list, paxos_replica_num))) { + LOG_WARN("failed to wait func with timeout", KR(ret), K(new_member_list), K(paxos_replica_num), K(timeout)); + } + LOG_INFO("finish wait for change member_list", KR(ret), K(new_member_list), K(paxos_replica_num), + K(timeout), "cost", ObTimeUtility::current_time() - now); + return ret; +} + +int ObLSRecoveryStatHandler::check_inner_stat_() { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; @@ -85,7 +306,6 @@ int ObLSRecoveryStatHandler::check_inner_stat_() } return ret; } - int ObLSRecoveryStatHandler::get_ls_replica_readable_scn(share::SCN &readable_scn) { int ret = OB_SUCCESS; @@ -111,23 +331,19 @@ int ObLSRecoveryStatHandler::get_ls_replica_readable_scn(share::SCN &readable_sc return ret; } - int ObLSRecoveryStatHandler::get_all_replica_min_readable_scn(share::SCN &readable_scn) { int ret = OB_SUCCESS; palf::PalfStat palf_stat_first; palf::PalfStat palf_stat_second; readable_scn = SCN::max_scn(); - logservice::ObLogService *ls_svr = MTL(logservice::ObLogService*); int64_t first_proposal_id = 0; ObRole first_role; if (OB_FAIL(check_inner_stat_())) { LOG_WARN("inner stat error", KR(ret), K_(is_inited)); } else if (OB_FAIL(get_latest_palf_stat_(palf_stat_first))) { LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls)); - } else if (OB_FAIL(ls_svr->get_palf_role(ls_->get_ls_id(), first_role, first_proposal_id))) { - LOG_WARN("failed to get first role", KR(ret), K(ls_->get_ls_id()), KP(ls_svr), KPC_(ls)); - } else if (!is_strong_leader(first_role)) { + } else if (!is_strong_leader(palf_stat_first.role_)) { ret = OB_NOT_MASTER; LOG_WARN("not master, need retry", KR(ret), K(ls_->get_ls_id())); } else { @@ -159,17 +375,80 @@ int ObLSRecoveryStatHandler::get_all_replica_min_readable_scn(share::SCN &readab } //TODO maybe need consider readable scn in inner table ObLSID ls_id = ls_->get_ls_id(); - LOG_INFO("all ls readable scn", K(ls_id), K(readable_scn), K(replicas_scn_)); + LOG_INFO("all ls readable scn", KR(ret), K(ls_id), K(readable_scn), K(replicas_scn_), + "member_list", paxos_member_list, "replicas config_version", config_version_, + "current_config_version", palf_stat_first.config_version_); } if (FAILEDx(get_latest_palf_stat_(palf_stat_second))) { LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls)); } else if (palf_stat_first.config_version_ != palf_stat_second.config_version_) { - ret = OB_EAGAIN; + ret = OB_NEED_RETRY; LOG_WARN("config_version changed, try again", KR(ret), K(palf_stat_first), K(palf_stat_second)); } return ret; } +int ObLSRecoveryStatHandler::wait_server_readable_scn( + const common::ObAddr &server, const int64_t timeout) +{ + int ret = OB_SUCCESS; + int64_t now = ObTimeUtility::current_time(); + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret)); + } else if (OB_UNLIKELY(!server.is_valid() || 0 >= timeout)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(server), K(timeout)); + } else { + if (OB_FAIL(wait_func_with_timeout_(timeout, server))) { + LOG_WARN("failed wait func with timeout", KR(ret), K(server), K(timeout)); + } + } + LOG_INFO("finish wait for add member", KR(ret), K(server), + K(timeout), "cost", ObTimeUtility::current_time() - now); + + return ret; +} + +int ObLSRecoveryStatHandler::check_member_change_valid_( + const common::ObAddr &server, bool &is_valid) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret)); + } else if (OB_UNLIKELY(!server.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(server)); + } else { + SCN readable_scn; + is_valid = false; + const int64_t TIME_WAIT = 100 * 1000; + bool found = false; + SpinRLockGuard guard(lock_); + for (int64_t i = 0; i < replicas_scn_.count() && OB_SUCC(ret) && !found; + ++i) { + if (replicas_scn_.at(i).get_server() == server) { + found = true; + readable_scn = replicas_scn_.at(i).get_readable_scn(); + if (readable_scn >= readable_scn_upper_limit_) { + is_valid = true; + LOG_INFO("can change member list", K(server), K(readable_scn), + K(readable_scn_upper_limit_), "ls_id", ls_->get_ls_id()); + } else if (REACH_TENANT_TIME_INTERVAL(TIME_WAIT)) { + LOG_INFO("server readable scn is not larger enough", K(server), + K(readable_scn), K(readable_scn_upper_limit_), "ls_id", ls_->get_ls_id()); + } + } + } // end for + if (OB_SUCC(ret) && !found && REACH_TENANT_TIME_INTERVAL(TIME_WAIT)) { + is_valid = false; + LOG_INFO("cannot find server readable scn", K(server), K(replicas_scn_), + K(readable_scn_upper_limit_), "ls_id", ls_->get_ls_id()); + } + } + return ret; +} + + int ObLSRecoveryStatHandler::increase_ls_replica_readable_scn_(SCN &readable_scn) { int ret = OB_SUCCESS; @@ -251,7 +530,7 @@ int ObLSRecoveryStatHandler::get_ls_level_recovery_stat(ObLSRecoveryStat &ls_rec LOG_WARN("failed to get lastest palf stat", KR(ret)); } else if (palf_stat_first.config_version_ != palf_stat_second.config_version_ || !is_strong_leader(palf_stat_second.role_)) { - ret = OB_EAGAIN; + ret = OB_NEED_RETRY; LOG_INFO("role changed, try again", KR(ret), K(palf_stat_first), K(palf_stat_second)); } @@ -262,7 +541,9 @@ int ObLSRecoveryStatHandler::set_add_replica_server( const common::ObAddr &server) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!server.is_valid())) { + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret), K_(is_inited)); + } else if (OB_UNLIKELY(!server.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("server is invalid", KR(ret), K(server)); } else { @@ -273,6 +554,13 @@ int ObLSRecoveryStatHandler::set_add_replica_server( return ret; } + +void ObLSRecoveryStatHandler::reset_add_replica_server() +{ + SpinWLockGuard guard(lock_); + extra_server_.reset(); +} + int ObLSRecoveryStatHandler::do_get_ls_level_readable_scn_(SCN &read_scn) { int ret = OB_SUCCESS; @@ -346,15 +634,12 @@ int ObLSRecoveryStatHandler::construct_new_member_list_( return ret; } -int ObLSRecoveryStatHandler::try_reload_and_fix_config_version_(const palf::LogConfigVersion ¤t_version) +int ObLSRecoveryStatHandler::try_reload_and_fix_config_version_( + const palf::LogConfigVersion ¤t_version) { int ret = OB_SUCCESS; bool need_update = false; share::SCN readable_scn; - const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_); - uint64_t tenant_data_version = 0; - ObLSRecoveryStatOperator op; - ObLSID ls_id; if (OB_FAIL(check_inner_stat_())) { LOG_WARN("inner stat error", KR(ret)); } else if (OB_UNLIKELY(!current_version.is_valid())) { @@ -363,35 +648,33 @@ int ObLSRecoveryStatHandler::try_reload_and_fix_config_version_(const palf::LogC } else if (OB_ISNULL(GCTX.sql_proxy_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("sql proxy is null", KR(ret)); - } else if (FALSE_IT(ls_id = ls_->get_ls_id())) { - //can not be there - } else if (OB_FAIL(GET_MIN_DATA_VERSION(meta_tenant_id, tenant_data_version))) { - LOG_WARN("failed to get min data version", KR(ret), K(tenant_id_), K(meta_tenant_id)); - } else if (tenant_data_version < MOCK_DATA_VERSION_4_2_4_0) { - //内部表config_version的汇报最开始是在4300版本上提交的 - //后面patch到424版本上,由于是在4300分支的第一个版本号提交 - //所以版本号判断直接小于等于424即可 - need_update = false; - LOG_INFO("not ready to load and update config version", KR(ret), K(tenant_data_version)); } else { SpinRLockGuard guard(lock_); - if (current_version != config_version_in_inner_) { + if (config_version_in_inner_.is_valid() && config_version_in_inner_ > current_version) { + ret = OB_NEED_RETRY; + LOG_WARN("config version is fallback", KR(ret), K(current_version), K(config_version_in_inner_)); + } else if (current_version == config_version_in_inner_) { + need_update = false; + LOG_DEBUG("config version not change", KR(ret), K(current_version)); + } else { need_update = true; FLOG_INFO("config version not match, need update", - K(config_version_in_inner_), K(current_version), K(ls_id)); + K(config_version_in_inner_), K(current_version), "ls_id", + ls_->get_ls_id()); } } if (OB_SUCC(ret) && need_update) { + ObLSRecoveryStatOperator op; + ObLSID ls_id = ls_->get_ls_id(); + int tmp_ret = OB_SUCCESS; if (OB_FAIL(op.update_ls_config_version(tenant_id_, ls_id, current_version, *GCTX.sql_proxy_, readable_scn))) { LOG_WARN("failed to update ls config version", KR(ret), K(tenant_id_), K(ls_id), K(current_version)); - //set invalid config version - SpinWLockGuard guard(lock_); - config_version_in_inner_.reset(); - } else { - SpinWLockGuard guard(lock_); - readable_scn_in_inner_ = readable_scn; - config_version_in_inner_ = current_version; + if (OB_TMP_FAIL(reset_inner_readable_scn())) { + LOG_ERROR("failed to reset config version", KR(ret), KR(tmp_ret)); + } + } else if (OB_FAIL(set_inner_readable_scn(current_version, readable_scn, false))) { + LOG_ERROR("failed to set readable_scn", KR(ret), K(current_version), K(readable_scn)); } } return ret; @@ -443,27 +726,40 @@ int ObLSRecoveryStatHandler::get_latest_palf_stat_( return ret; } -int ObLSRecoveryStatHandler::gather_replica_readable_scn() +int ObLSRecoveryStatHandler::check_can_use_new_version_(bool &is_valid_use) { int ret = OB_SUCCESS; - ObArray replicas_scn; - palf::PalfStat palf_stat_first; - palf::PalfStat palf_stat_second; - ObArray addr_list; + is_valid_use = false; + const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_); + uint64_t tenant_data_version = 0; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret)); + } else if (OB_FAIL(GET_MIN_DATA_VERSION(meta_tenant_id, tenant_data_version))) { + LOG_WARN("failed to get min data version", KR(ret), K(tenant_id_), K(meta_tenant_id)); + } else if (tenant_data_version < MOCK_DATA_VERSION_4_2_4_0) { + //由于config_version在430版本已经存在了,所以兼容性判断为小于等于4240不支持 + is_valid_use = false; + LOG_INFO("not ready to to use new version", KR(ret), K(tenant_data_version)); + } else { + is_valid_use = true; + } + return ret; +} +int ObLSRecoveryStatHandler::construct_addr_list_( + const palf::PalfStat &palf_stat, + ObIArray &addr_list) +{ + int ret = OB_SUCCESS; + addr_list.reset(); if (OB_FAIL(check_inner_stat_())) { LOG_WARN("inner stat error", KR(ret), K_(is_inited)); - } else if (OB_FAIL(get_latest_palf_stat_(palf_stat_first))) { - LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls)); - } else if (!is_strong_leader(palf_stat_first.role_)) { - ret = OB_NOT_MASTER; - LOG_TRACE("not leader", KR(ret), K(palf_stat_first)); - } else if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_3_0_0) { - ret = OB_NEED_WAIT; - LOG_WARN("not ready to gather replica readable scn", KR(ret)); + } else if (OB_UNLIKELY(!palf_stat.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("palf stat is invalid", KR(ret), K(palf_stat)); } else { common::ObMember member; - common::ObMemberList &member_list = palf_stat_first.paxos_member_list_; + const common::ObMemberList &member_list = palf_stat.paxos_member_list_; for (int64_t i = 0; OB_SUCC(ret) && i < member_list.get_member_number(); ++i) { if (OB_FAIL(member_list.get_member_by_index(i, member))) { LOG_WARN("failed to get member by index", KR(ret), K(i)); @@ -478,34 +774,116 @@ int ObLSRecoveryStatHandler::gather_replica_readable_scn() } } } - if (FAILEDx(do_get_each_replica_readable_scn_(addr_list, replicas_scn))) { + return ret; +} + +int ObLSRecoveryStatHandler::dump_all_replica_readable_scn_(const bool force_dump) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret), K_(is_inited)); + } else { + ObLSID ls_id = ls_->get_ls_id(); + SpinRLockGuard guard(lock_); + const int64_t PRINT_INTERVAL = 5 * 1000 * 1000L; + const int64_t now = ObTimeUtility::current_time(); + if (force_dump || now - last_dump_ts_ > PRINT_INTERVAL) { + LOG_INFO("ls readable scn in memory", K(ls_id), K(replicas_scn_), + K(last_dump_ts_), K(force_dump)); + last_dump_ts_ = now; + } else { + LOG_TRACE("ls readable scn in memory", KR(ret), K(ls_id), + K(readable_scn_upper_limit_), K(replicas_scn_)); + } + } + return ret; +} + +int ObLSRecoveryStatHandler::check_member_change_valid_( + const ObMemberList &new_member_list, const int64_t paxos_replica_num, bool &is_valid) +{ + int ret = OB_SUCCESS; + SCN readable_scn; + palf::PalfStat palf_stat; + ObArray addr_list; + is_valid = false; + if (OB_UNLIKELY(!new_member_list.is_valid() || 0 >= paxos_replica_num)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(new_member_list), K(paxos_replica_num)); + } else if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret), K_(is_inited)); + } else if (OB_FAIL(get_palf_stat_(palf_stat))) { + LOG_WARN("failed to get palf stat", KR(ret)); + } else if (OB_FAIL(new_member_list.get_addr_array(addr_list))) { + LOG_WARN("failed to get addr array", KR(ret), K(new_member_list)); + } else if (OB_FAIL(do_get_majority_readable_scn_V2_(addr_list, + rootserver::majority(paxos_replica_num), palf_stat.config_version_, readable_scn))) { + LOG_WARN("failed to get majority readable scn", KR(ret), K(addr_list), + K(paxos_replica_num), K(palf_stat)); + } else { + SpinRLockGuard guard(lock_); + if (readable_scn >= readable_scn_upper_limit_) { + is_valid = true; + LOG_INFO("can change member list", K(new_member_list), K(paxos_replica_num), + "new readable_scn", readable_scn, "current readable_scn", readable_scn_upper_limit_, + "ls_id", ls_->get_ls_id(), K(palf_stat)); + } else if (REACH_TENANT_TIME_INTERVAL(1 * 1000 * 1000L)) { + LOG_INFO("can not change member list", K(new_member_list), K(paxos_replica_num), + K(readable_scn), K(readable_scn_upper_limit_), + "ls_id", ls_->get_ls_id(), K(palf_stat)); + } + } + return ret; +} + +int ObLSRecoveryStatHandler::gather_replica_readable_scn() +{ + int ret = OB_SUCCESS; + ObArray replicas_scn; + palf::PalfStat palf_stat_first; + palf::PalfStat palf_stat_second; + ObArray addr_list; + int tmp_ret = OB_SUCCESS; + bool is_valid_use = false; + + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret), K_(is_inited)); + } else if (OB_FAIL(get_latest_palf_stat_(palf_stat_first))) { + LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls)); + } else if (!is_strong_leader(palf_stat_first.role_)) { + ret = OB_NOT_MASTER; + LOG_TRACE("not leader", KR(ret), K(palf_stat_first)); + } else if (OB_FAIL(check_can_use_new_version_(is_valid_use))) { + LOG_WARN("failed to check can use new version", KR(ret)); + } else if (!is_valid_use) { + ret = OB_NEED_RETRY; + LOG_WARN("not valid to use new version", KR(ret)); + } else if (OB_FAIL(construct_addr_list_(palf_stat_first, addr_list))) { + LOG_WARN("failed to construct addr list", KR(ret), K(palf_stat_first)); + } else if (OB_FAIL(do_get_each_replica_readable_scn_(addr_list, replicas_scn))) { LOG_WARN("failed to get each replica readable", KR(ret), K(addr_list)); } else if (OB_FAIL(get_palf_stat_(palf_stat_second))) { LOG_WARN("failed to get palf stat", KR(ret)); } else if (palf_stat_second.config_version_ != palf_stat_first.config_version_) { - ret = OB_EAGAIN; + ret = OB_NEED_RETRY; LOG_WARN("config version change", KR(ret), K(palf_stat_second), K(palf_stat_first)); } else { SpinWLockGuard guard(lock_); - ObLSID ls_id = ls_->get_ls_id(); config_version_ = palf_stat_second.config_version_; - replicas_scn_.reset(); if (OB_FAIL(replicas_scn_.assign(replicas_scn))) { LOG_WARN("failed to replicas scn", KR(ret), K(replicas_scn)); } - const int64_t PRINT_INTERVAL = 1 * 1000 * 1000L; - if (REACH_TENANT_TIME_INTERVAL(PRINT_INTERVAL)) { - LOG_INFO("ls readable scn in memory", KR(ret), K(ls_id), K(replicas_scn_)); - } else { - LOG_TRACE("ls readable scn in memory", KR(ret), K(ls_id), K(replicas_scn_)); - } } - if (is_strong_leader(palf_stat_second.role_)) { - //优先把正确的config_version更新到内部表和内存中 - int tmp_ret = OB_SUCCESS; + if (is_strong_leader(palf_stat_second.role_) && is_valid_use) { if (OB_TMP_FAIL(try_reload_and_fix_config_version_(palf_stat_second.config_version_))) { ret = OB_SUCC(ret) ? tmp_ret : ret; - LOG_WARN("failed to try reload and fix config version", KR(tmp_ret), KR(ret), K(palf_stat_second)); + LOG_WARN("failed to try reload and fix config version", + KR(tmp_ret), KR(ret), K(palf_stat_second)); + } + } + if (OB_NOT_MASTER != ret) { + if (OB_TMP_FAIL(dump_all_replica_readable_scn_(OB_FAIL(ret)))) { + LOG_WARN("failed to dump replica readable scn", KR(ret), KR(tmp_ret)); } } return ret; @@ -521,6 +899,9 @@ int ObLSRecoveryStatHandler::do_get_each_replica_readable_scn_( } else if (OB_UNLIKELY(0 >= ob_member_list.count())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(ob_member_list)); + } else if (OB_ISNULL(GCTX.srv_rpc_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rpc proxy is null", KR(ret)); } else { obrpc::ObGetLSReplayedScnArg arg; ObGetLSReplayedScnProxy proxy( @@ -529,6 +910,7 @@ int ObLSRecoveryStatHandler::do_get_each_replica_readable_scn_( int tmp_ret = OB_SUCCESS; ObArray return_code_array; ObLSReplicaReadableSCN replica_scn; + int group_id = share::OBCG_DBA_COMMAND; if (OB_FAIL(arg.init(tenant_id_, ls_->get_ls_id(), false))) { LOG_WARN("failed to init arg", KR(ret), K_(tenant_id), KPC_(ls)); } else if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) { @@ -536,18 +918,10 @@ int ObLSRecoveryStatHandler::do_get_each_replica_readable_scn_( } for (int64_t i = 0; OB_SUCC(ret) && i < ob_member_list.count(); ++i) { const ObAddr &member = ob_member_list.at(i); - if (GCTX.self_addr() == member) { - SCN self_readable_scn; - if (OB_FAIL(ls_->get_max_decided_scn(self_readable_scn))) { - LOG_WARN("failed to get max decide scn", KR(ret)); - } else if (OB_FAIL(replica_scn.init(member, self_readable_scn))) { - LOG_WARN("failed to init replica scn", KR(ret), K(member), K(self_readable_scn)); - } else if (OB_FAIL(replicas_scn.push_back(replica_scn))) { - LOG_WARN("failed to push back replica scn", KR(ret), K(replica_scn)); - } - } else if (OB_TMP_FAIL(proxy.call(member, ctx.get_timeout(), tenant_id_, arg))) { - LOG_WARN("failed to send rpc", KR(ret), K(member), K(i), K(ctx), - K_(tenant_id), K(arg), K(ob_member_list)); + if (OB_TMP_FAIL(proxy.call(member, ctx.get_timeout(), + GCONF.cluster_id, tenant_id_, group_id, arg))) { + LOG_WARN("failed to send rpc", KR(ret), KR(tmp_ret), K(member), K(i), K(ctx), + K_(tenant_id), K(arg), K(ob_member_list), K(group_id)); } } if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { @@ -587,6 +961,7 @@ int ObLSRecoveryStatHandler::get_majority_readable_scn_( palf::PalfStat palf_stat_second; ObArray member_list_new; int64_t paxos_replica_number_new = 0; + bool is_valid_to_use = false; if (OB_FAIL(check_inner_stat_())) { LOG_WARN("inner stat error", KR(ret), K_(is_inited)); @@ -601,13 +976,20 @@ int ObLSRecoveryStatHandler::get_majority_readable_scn_( member_list_new, paxos_replica_number_new))) { LOG_WARN("construct_new_member_list failed", KR(ret), KPC_(ls), K(palf_stat_first)); - } else if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_3_0_0) { + } else if (OB_FAIL(check_can_use_new_version_(is_valid_to_use))) { + LOG_WARN("failed to check can use new version", KR(ret)); + } else if (is_valid_to_use) { //use readable in memory if (OB_FAIL(do_get_majority_readable_scn_V2_(member_list_new, rootserver::majority(paxos_replica_number_new), palf_stat_first.config_version_, majority_min_readable_scn))) { LOG_WARN("failed to get majority readable scn", KR(ret), K(palf_stat_first), K(paxos_replica_number_new), K(member_list_new)); + } else if (OB_FAIL(set_inner_readable_scn(palf_stat_first.config_version_, + majority_min_readable_scn, true))) { + //尝试更新内存中的可读点 + LOG_WARN("failed to set inner readable scn", KR(ret), K(palf_stat_first), + K(majority_min_readable_scn)); } } else if (OB_FAIL(do_get_majority_readable_scn_(member_list_new, leader_readable_scn, rootserver::majority(paxos_replica_number_new), majority_min_readable_scn))) { @@ -617,7 +999,7 @@ int ObLSRecoveryStatHandler::get_majority_readable_scn_( if (FAILEDx(get_latest_palf_stat_(palf_stat_second))) { LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls)); } else if (palf_stat_first.config_version_ != palf_stat_second.config_version_) { - ret = OB_EAGAIN; + ret = OB_NEED_RETRY; LOG_WARN("config_version changed, try again", KR(ret), K(palf_stat_first), K(palf_stat_second)); } @@ -738,12 +1120,12 @@ int ObLSRecoveryStatHandler::do_get_majority_readable_scn_V2_( if (OB_FAIL(check_inner_stat_())) { LOG_WARN("inner stat error", KR(ret), K_(is_inited)); - } else if ( ob_member_list.count() <= 0 + } else if (ob_member_list.count() <= 0 || 0 >= need_query_member_cnt || !config_version.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(need_query_member_cnt), - K(ob_member_list), K(config_version)); + K(ob_member_list), K(config_version)); } else { SpinRLockGuard guard(lock_); if (config_version_ != config_version) { @@ -752,8 +1134,8 @@ int ObLSRecoveryStatHandler::do_get_majority_readable_scn_V2_( K(config_version_)); } for (int64_t i = 0; OB_SUCC(ret) && i < replicas_scn_.count(); ++i) { - ObAddr &server = replicas_scn_.at(i).get_server(); - SCN readable_scn= replicas_scn_.at(i).get_readable_scn(); + const ObAddr &server = replicas_scn_.at(i).get_server(); + const SCN &readable_scn = replicas_scn_.at(i).get_readable_scn(); if (has_exist_in_array(ob_member_list, server)) { if (OB_FAIL(replica_readble_scn.push_back(readable_scn))) { LOG_WARN("failed to push back", KR(ret), K(i), K(server), K(readable_scn)); @@ -762,7 +1144,7 @@ int ObLSRecoveryStatHandler::do_get_majority_readable_scn_V2_( } } if (FAILEDx(do_calc_majority_min_readable_scn_(need_query_member_cnt, - replica_readble_scn, majority_min_readable_scn))) { + replica_readble_scn, majority_min_readable_scn))) { LOG_WARN("failed to calc majority readable scn", KR(ret), K(need_query_member_cnt), K(replica_readble_scn)); } @@ -800,7 +1182,7 @@ int ObLSRecoveryStatHandler::calc_majority_min_readable_scn_( if (OB_SUCCESS != tmp_ret) { LOG_WARN("send rpc is failed", KR(tmp_ret), K(i), K(return_code_array)); } else { - const auto *result = proxy.get_results().at(i); + const ObGetLSReplayedScnRes *result = proxy.get_results().at(i); if (OB_ISNULL(result)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("result is null", KR(ret), K(i), K(return_code_array)); diff --git a/src/rootserver/ob_ls_recovery_stat_handler.h b/src/rootserver/ob_ls_recovery_stat_handler.h index a628dafae..a94d67bf4 100644 --- a/src/rootserver/ob_ls_recovery_stat_handler.h +++ b/src/rootserver/ob_ls_recovery_stat_handler.h @@ -24,11 +24,13 @@ namespace oceanbase namespace storage { -class ObLS; +class ObLSHandle; } namespace rootserver { +class ObLSRecoveryStatHandler; +class TestLSRecoveryGuard; struct ObLSReplicaReadableSCN { public: @@ -36,20 +38,77 @@ public: ~ObLSReplicaReadableSCN() {} int init(const common::ObAddr &server, const share::SCN &readable_scn); - share::SCN get_readable_scn()const + share::SCN get_readable_scn() const { return readable_scn_; } - common::ObAddr &get_server() + common::ObAddr get_server() const { return server_; } - TO_STRING_KV(K_(server), K_(readable_scn)); + TO_STRING_KV(K_(server), K_(readable_scn)); private: common::ObAddr server_; share::SCN readable_scn_; }; +class ObLSRecoveryGuard +{ + //To implement a mutex for ls_recovery_stat reporting and member_list changes, + //it needs to be used on the server that is the leader of the LS for this tenant. + //Successful initialization indicates that the mutex has been acquired successfully. + //This interface can also be invoked by meta and sys tenants without errors and without mutual exclusion. + //These two types of tenants do not report on ls_recovery_stat; the interface is used solely to standardize member changes. + // The detructor of the class will clear the reference count and reset any set information +public: + ObLSRecoveryGuard() : + tenant_id_(OB_INVALID_TENANT_ID), ls_handle_(), ls_recovery_stat_(NULL) {} + ~ObLSRecoveryGuard(); + /** + * @description: + * In this interface, implement mutual exclusion, + * which will wait for the reference count to drop to zero + * within the timeout period, and then increment the reference count. + * meta and sys only set tenant_id, nothing todo + * + * @param[in] tenant_id: tenant_id + * @param[in] ls_id: use to get ls_recovery_stat from ObLS + * @param[in] timeout: Timeout period, with a default value of -1, which indicates no waiting. + * The reference count will be incremented if possible without waiting in case of failure. + * The system will attempt to increase the reference count successfully within the timeout period. + * @return : OB_SUCCESS: Reference increment successful. + * OB_EAGAIN:Wait for reference increment to fail within the timeout period due to other reference points not being successfully released. + */ + + int init(const uint64_t tenant_id, const share::ObLSID &ls_id, + const int64_t &timeout = -1); + //TODO 判断是否可以加减成员, meta or sys directly return success + /* + * @description: Used to check whether the readable_SCN of this member has surpassed the reported readable_SCN, + * for the purpose of judging member changes and the addition of replicas during timeout. + * @param[in] server: target server + * @param[in] timeout: Timeout period + * */ + int check_can_add_member(const ObAddr &server, const int64_t timeout); + /* + * @description: Within the timeout period, determine whether the new member list can ensure that the readable_SCN does not regress. + * @param[in] new_member_list : new_member_list + * @param[in] paxos_replica_num : paxos_replica_num of new member_list + * @param[in] timeout: Timeout period + * **/ + int check_can_change_member(const ObMemberList &new_member_list, + const int64_t paxos_replica_num, + const int64_t timeout); + friend class TestLSRecoveryGuard; +private: + bool skip_check_member_list_change_(const uint64_t tenant_id); +private: + + uint64_t tenant_id_; + ObLSHandle ls_handle_; + ObLSRecoveryStatHandler *ls_recovery_stat_; +}; + /** * @description: * ObLSRecoveryStatHandler exists on the LS of each observer and is responsible for @@ -58,10 +117,10 @@ private: class ObLSRecoveryStatHandler { public: - ObLSRecoveryStatHandler() { reset(); } - ~ObLSRecoveryStatHandler() { reset(); } + ObLSRecoveryStatHandler() { reset(true); } + ~ObLSRecoveryStatHandler() { reset(false); } + void reset(const bool is_init = false); int init(const uint64_t tenant_id, ObLS *ls); - void reset(); /** * @description: * get ls readable_scn considering readable scn, sync scn and replayable scn. @@ -69,15 +128,6 @@ public: * @return return code */ int get_ls_replica_readable_scn(share::SCN &readable_scn); - - /* - * @description: - * Get the readable_scn of other replicas - * @param[out] readable_scn ls readable_scn - * @return return code - * */ - int get_all_replica_min_readable_scn(share::SCN &readable_scn); - /** * @description: * get ls level recovery_stat by LS leader. @@ -88,13 +138,65 @@ public: int get_ls_level_recovery_stat(share::ObLSRecoveryStat &ls_recovery_stat); int set_add_replica_server(const common::ObAddr &server); + void reset_add_replica_server(); /* * @description: * get all ls replica readable and set to replicas_scn_; */ int gather_replica_readable_scn(); + /* + * @description: + * Add a reference, which may be used for reporting or member list changes. + * @param[in] timeout : Add a reference within the timeout range; if unsuccessful, wait for 100ms. + * */ + int inc_ref(const int64_t timeout); + /* + * @description: + * Subtract a reference. + * */ + void dec_ref(); + /* + * @description: + * Set a readable_scn of inner_table in memory and use config_version for verification. + * */ + int set_inner_readable_scn(const palf::LogConfigVersion &config_version, + const share::SCN &readable_scn, bool check_inner_config_valid); + /* + * @description: + * clear readable_scn and config_version of inner_table in memory; + * */ + int reset_inner_readable_scn(); - TO_STRING_KV(K_(tenant_id), K_(ls)); + /* + * @description: get ls all paxos replica min readable_scn + * @param[out] min readable_scn of all paxos replica + * @return: + * OB_NOT_MASTER : replica not master, can not get readable_scn of other replica + * OB_NEED_RETRY : If there's no readable scn with all replicas; + * the config_version corresponding to the current cached readable scn does not match the latest config_version; + * or the config_version has changed during the statistical process. + * */ + int get_all_replica_min_readable_scn(share::SCN &readable_scn); + /* + * @description: Used to check whether the readable_SCN of this member has surpassed the reported readable_SCN, + * for the purpose of judging member changes and the addition of replicas during timeout. + * @param[in] server: target server + * @param[in] timeout: Timeout period + * @return: + */ + int wait_server_readable_scn(const common::ObAddr &server, const int64_t timeout); + /* + * @description: Within the timeout period, determine whether the new member list can ensure that the readable_SCN does not regress. + * @param[in] new_member_list : new_member_list + * @param[in] paxos_replica_num : paxos_replica_num of new member_list + * @param[in] timeout: Timeout period + * @return: + */ + int wait_can_change_member_list(const ObMemberList &new_member_list, + const int64_t paxos_replica_num, const int64_t timeout); + TO_STRING_KV(K_(tenant_id), K_(ls), K(ref_cnt_)); + friend class TestLSRecoveryGuard; + friend class ObLSRecoveryGuard; private: int check_inner_stat_(); @@ -159,31 +261,83 @@ private: const int64_t majority_cnt, ObArray &readable_scn_list, share::SCN &majority_min_readable_scn); - int construct_new_member_list_( const common::ObMemberList &member_list_ori, const common::GlobalLearnerList °raded_list, const int64_t paxos_replica_number_ori, ObIArray &member_list_new, int64_t &paxos_replica_number_new); + int check_can_use_new_version_(bool &vaild_to_use); + int construct_addr_list_(const palf::PalfStat &palf_stat, + ObIArray &addr_list); + int dump_all_replica_readable_scn_(const bool force_dump); int try_reload_and_fix_config_version_(const palf::LogConfigVersion ¤t_version); + template + int wait_func_with_timeout_(const int64_t timeout, Args &&... args); + int check_member_change_valid_(const common::ObAddr &server, bool &is_valid); + int check_member_change_valid_(const ObMemberList &new_member_list, + const int64_t paxos_replica_num, bool &is_valid); DISALLOW_COPY_AND_ASSIGN(ObLSRecoveryStatHandler); private: bool is_inited_; uint64_t tenant_id_; ObLS *ls_; + ObCond ref_cond_;//用于加锁等待唤醒 + int64_t ref_cnt_;//use for Concurrency control common::SpinRWLock lock_; - share::SCN readable_scn_in_inner_;//readable_scn of inner_table + //存储内部表和统计出来的最大值 + //只在config_version_in_inner_合法的时候有效 + share::SCN readable_scn_upper_limit_;//the max readable_scn of inner_table and memory palf::LogConfigVersion config_version_in_inner_;//config_version in inner_table common::ObAddr extra_server_;//for add replica, need to gather add_replica's readable_scn - palf::LogConfigVersion config_version_; - ObArray replicas_scn_; + int64_t last_dump_ts_;//用于记录上次打印内存可读点的时间戳 + palf::LogConfigVersion config_version_;//记录统计可读点replicas_scn_使用的config_version + //成员列表里面最多只有OB_MAX_MEMBER_NUMBER,这个时候可能触发迁移,所以需要增加一个成员 + ObSEArray replicas_scn_;//缓存每个副本的可读位点 }; +template +inline int ObLSRecoveryStatHandler::wait_func_with_timeout_( + const int64_t timeout, Args &&...args) +{ + int ret = OB_SUCCESS; + bool is_valid_use = false; + if (OB_UNLIKELY(timeout <= 0)) { + ret = OB_INVALID_ARGUMENT; + RS_LOG(WARN, "invalid argument", KR(ret), K(timeout)); + } else if (OB_FAIL(check_can_use_new_version_(is_valid_use))) { + RS_LOG(WARN, "failed to check use new version", KR(ret)); + } else if (!is_valid_use) { + RS_LOG(INFO, "can not use readable in memory, no need to check", K(timeout)); + } else { + int64_t current_timeout = timeout; + const int64_t TIME_WAIT = 100 * 1000; + bool is_finish = false; + do { + if (OB_FAIL(check_member_change_valid_(std::forward(args)..., is_finish))) { + RS_LOG(WARN, "failed to check", KR(ret)); + } else if (current_timeout > 0) { + usleep(TIME_WAIT); + current_timeout -= TIME_WAIT; + } else if (OB_SUCC(ret)) { + ret = OB_TIMEOUT; + RS_LOG(WARN, "failed to wait server readable scn", KR(ret), K(timeout)); + } + } while (current_timeout > 0 && OB_SUCC(ret) && !is_finish); + } + if (OB_FAIL(ret)) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(dump_all_replica_readable_scn_(true))) { + RS_LOG(WARN, "failed to dump all replica readable scn", KR(ret), KR(tmp_ret)); + } + } + return ret; } + +} // namespace rootserver } #endif // OCEANBASE_STORAGE_OB_LS_RECOVERY_STAT_HANDLER diff --git a/src/rootserver/ob_ls_service_helper.cpp b/src/rootserver/ob_ls_service_helper.cpp index 3ce7ce8c6..7ca3f9ceb 100755 --- a/src/rootserver/ob_ls_service_helper.cpp +++ b/src/rootserver/ob_ls_service_helper.cpp @@ -1521,6 +1521,7 @@ int ObTenantLSInfo::get_next_primary_zone( } return ret; } + int ObLSServiceHelper::check_transfer_task_replay(const uint64_t tenant_id, const share::ObLSID &src_ls, const share::ObLSID &dest_ls, @@ -1532,12 +1533,19 @@ int ObLSServiceHelper::check_transfer_task_replay(const uint64_t tenant_id, share::ObLSStatusInfo ls_status; SCN readble_scn; replay_finish = true; + uint64_t data_version = 0; if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !src_ls.is_valid() || !dest_ls.is_valid() || !transfer_scn.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(src_ls), K(dest_ls), K(transfer_scn)); + } else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) { + LOG_WARN("failed to get min data version", KR(ret), K(tenant_id)); + } else if (MOCK_DATA_VERSION_4_2_4_0 > data_version) { + //高版本是从4.3.0开始支持,兼容性不需要改变 + replay_finish = true; + LOG_INFO("no need check all ls replica", K(tenant_id), K(data_version)); } else if (OB_FAIL(check_ls_transfer_replay_(tenant_id, src_ls, transfer_scn, replay_finish))) { LOG_WARN("failed to check ls transfer replay", KR(ret), K(tenant_id), K(src_ls), K(transfer_scn)); } else if (!replay_finish) { @@ -1618,6 +1626,9 @@ int ObLSServiceHelper::get_ls_all_replica_readable_scn_(const uint64_t tenant_id K(timeout), K(arg)); } else if (OB_FAIL(proxy.wait())) { LOG_WARN("failed to get wait all rpc", KR(ret), K(tenant_id), K(ls_id), K(leader)); + } else if (1 != proxy.get_results().count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result count not expected", KR(ret), "result", proxy.get_results()); } else if (OB_ISNULL(proxy.get_results().at(0))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("result is null", KR(ret), K(tenant_id), K(leader), K(ls_id)); diff --git a/src/rootserver/standby/ob_recovery_ls_service.cpp b/src/rootserver/standby/ob_recovery_ls_service.cpp index 1fc372479..4a10749c9 100755 --- a/src/rootserver/standby/ob_recovery_ls_service.cpp +++ b/src/rootserver/standby/ob_recovery_ls_service.cpp @@ -488,6 +488,7 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC commit_log.get_multi_source_data(); const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_); ObMySQLTransaction trans; + ObLSRecoveryGuard guard; for (int64_t i = 0; OB_SUCC(ret) && i < source_data.count(); ++i) { const ObTxBufferNode &node = source_data.at(i); if (OB_FAIL(try_cancel_clone_job_for_standby_tenant_(node))) { @@ -534,6 +535,9 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC * 这个日志流已经变成了NORMAL状态,所以可能会导致tenant_info的sync_scn小于系统日志流的sync_scn,这里增加一个校验 * 是为了防止这种情况 * */ + } else if (OB_FAIL(guard.init(tenant_id_, SYS_LS))) { + LOG_WARN("failed to init guard", KR(ret), K(tenant_id_)); + } else if (OB_FAIL(report_sys_ls_recovery_stat_in_trans_(sync_scn, false, trans, "report recovery stat and has multi data source", true/*need_check_sync_scn*/))) { LOG_WARN("failed to report sys ls recovery stat", KR(ret), K(sync_scn)); @@ -680,6 +684,27 @@ int ObRecoveryLSService::process_upgrade_log_( return ret; } +int ObRecoveryLSService::get_ls_(storage::ObLSHandle &ls_handle, storage::ObLS *&ls) +{ + int ret = OB_SUCCESS; + ObLSService *ls_svr = MTL(ObLSService *); + ls = NULL; + + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret), K(inited_)); + } else if (OB_ISNULL(ls_svr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls service is null", KR(ret)); + } else if (OB_FAIL(ls_svr->get_ls(SYS_LS, ls_handle, storage::ObLSGetMod::RS_MOD))) { + LOG_WARN("failed to get ls", KR(ret)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("ls is NULL", KR(ret), K(ls_handle)); + } + return ret; +} + int ObRecoveryLSService::process_upgrade_data_version_log_( const share::SCN &sync_scn, const ObTxBufferNode &node, @@ -839,21 +864,17 @@ int ObRecoveryLSService::construct_sys_ls_recovery_stat_based_on_sync_scn_( { int ret = OB_SUCCESS; ls_stat.reset(); - ObLSService *ls_svr = MTL(ObLSService *); - ObLSHandle ls_handle; - ObLS *ls = NULL; ObLSRecoveryStat tmp_ls_stat; SCN readable_scn; + storage::ObLSHandle ls_handle; + storage::ObLS *ls = NULL; if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret), K(inited_)); - } else if (OB_ISNULL(ls_svr)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls service is null", KR(ret)); - } else if (OB_FAIL(ls_svr->get_ls(SYS_LS, ls_handle, storage::ObLSGetMod::RS_MOD))) { - LOG_WARN("failed to get ls", KR(ret)); - } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + } else if (OB_FAIL(get_ls_(ls_handle, ls))) { + LOG_WARN("failed to get sys ls", KR(ret)); + } else if (OB_ISNULL(ls)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("ls is NULL", KR(ret), K(ls_handle)); } else if (OB_FAIL(ls->get_ls_level_recovery_stat(tmp_ls_stat))) { @@ -1318,17 +1339,12 @@ int ObRecoveryLSService::try_do_ls_balance_task_( LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task)); } } else if (ls_balance_task.get_task_op().is_transfer_end()) { - can_remove = true; - bool is_replay_finish = true; if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay( tenant_id_, ls_balance_task.get_src_ls(), ls_balance_task.get_dest_ls(), ls_balance_task.get_operation_scn(), - is_replay_finish))) { + can_remove))) { LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_), K(ls_balance_task), K(tenant_info)); - } else if (!is_replay_finish) { - ret = OB_NEED_RETRY; - LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task)); } } else if (ls_balance_task.get_task_op().is_transfer_begin()) { if (OB_FAIL(check_transfer_begin_can_remove_(ls_balance_task, tenant_info, can_remove))) { @@ -1349,10 +1365,10 @@ int ObRecoveryLSService::try_do_ls_balance_task_( LOG_INFO("task can be remove", KR(ret), K(ls_balance_task)); ROOTSERVICE_EVENT_ADD("standby_tenant", "remove_balance_task", K_(tenant_id), "task_type", ls_balance_task.get_task_op(), - "task_scn", ls_balance_task.get_operation_scn(), + "task_scn", ls_balance_task.get_operation_scn().get_val_for_inner_table_field(), "switchover_status", tenant_info.get_switchover_status(), - "src_ls", ls_balance_task.get_src_ls(), - "dest_ls", ls_balance_task.get_dest_ls()); + "src_ls", ls_balance_task.get_src_ls().id(), + "dest_ls", ls_balance_task.get_dest_ls().id()); } END_TRANSACTION(trans) } @@ -1380,7 +1396,6 @@ int ObRecoveryLSService::check_transfer_begin_can_remove_( //find transfer end, or tenant is in flashback ObBalanceTaskHelper transfer_end_task; SCN transfer_scn; - bool is_replay_finish = false; ret = ObBalanceTaskHelperTableOperator::try_find_transfer_end(tenant_id_, ls_balance_task.get_operation_scn(), ls_balance_task.get_src_ls(), ls_balance_task.get_dest_ls(), *proxy_, transfer_end_task); @@ -1409,17 +1424,19 @@ KR(ret), K(tenant_id_), K(tenant_info), K(ls_balance_task)); LOG_WARN("can not find transfer end task, can not end transfer begin task", KR(ret), K(tenant_info), K(ls_balance_task)); ret = OB_SUCCESS; } - if (OB_FAIL(ret) || !can_remove) { - } else if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay( - tenant_id_, ls_balance_task.get_src_ls(), - ls_balance_task.get_dest_ls(), transfer_scn, is_replay_finish))) { - LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_), K(ls_balance_task), - K(tenant_info), K(transfer_scn)); - } else if (!is_replay_finish) { - ret = OB_NEED_RETRY; - LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task), K(transfer_scn)); + if (OB_SUCC(ret) && can_remove) { + if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay( + tenant_id_, ls_balance_task.get_src_ls(), + ls_balance_task.get_dest_ls(), transfer_scn, can_remove))) { + LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_), + K(ls_balance_task), K(transfer_scn)); + } else if (can_remove) { + FLOG_INFO("ls all replica replay to newest, can remove", K(ls_balance_task)); + } else if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) { + // 10s + LOG_WARN("can not remove ls balance task helper", K(ls_balance_task), K(tenant_info)); + } } - } return ret; } diff --git a/src/rootserver/standby/ob_recovery_ls_service.h b/src/rootserver/standby/ob_recovery_ls_service.h index aa2321049..60dffdce3 100755 --- a/src/rootserver/standby/ob_recovery_ls_service.h +++ b/src/rootserver/standby/ob_recovery_ls_service.h @@ -35,6 +35,7 @@ class ObMySQLTransaction; namespace transaction { class ObTxBufferNode; +class ObTxLogBlock; } namespace share { @@ -49,15 +50,16 @@ class ObMultiVersionSchemaService; class ObTenantSchema; } } +namespace storage +{ +class ObLS; +class ObLSHandle; +} namespace logservice { class ObLogHandler; class ObGCLSLog; } -namespace transaction -{ -class ObTxLogBlock; -} namespace palf { class PalfHandleGuard; @@ -143,6 +145,7 @@ private: bool &can_remove); int do_ls_balance_alter_task_(const share::ObBalanceTaskHelper &ls_balance_task, common::ObMySQLTransaction &trans); + int get_ls_(storage::ObLSHandle &ls_handle, storage::ObLS *&ls); int reset_restore_proxy_(ObRestoreSourceServiceAttr &service_attr); int get_ls_all_replica_readable_scn_(const share::ObLSID &ls_id, share::SCN &reabable_scn); #ifdef OB_BUILD_LOG_STORAGE_COMPRESS diff --git a/src/share/ls/ob_ls_recovery_stat_operator.cpp b/src/share/ls/ob_ls_recovery_stat_operator.cpp index 336c30794..8df7ff500 100644 --- a/src/share/ls/ob_ls_recovery_stat_operator.cpp +++ b/src/share/ls/ob_ls_recovery_stat_operator.cpp @@ -24,6 +24,7 @@ #include "share/ob_share_util.h" #include "share/ls/ob_ls_status_operator.h" #include "share/scn.h" +#include "logservice/palf/log_meta_info.h"//LogConfigVersion using namespace oceanbase; using namespace oceanbase::common; @@ -536,11 +537,12 @@ int ObLSRecoveryStatOperator::update_ls_config_version( common::ObSqlString sql; ObString config_version_str; ObArenaAllocator allocator("VersionStr"); - char config_version_val[128] = {0}; + const int64_t CONFIG_LEN = palf::LogConfigVersion::CONFIG_VERSION_LEN + 1;//128 + \0 + char config_version_val[CONFIG_LEN] = {0}; if (OB_FAIL(type_to_hex_str(config_version, allocator, config_version_str))) { LOG_WARN("failed to type to hex", KR(ret), K(config_version)); - } else if (0 > config_version.to_string(config_version_val, 128)) { + } else if (0 > config_version.to_string(config_version_val, CONFIG_LEN)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("config_version to string failed", KR(ret), K(config_version)); } else if (OB_FAIL(sql.assign_fmt("UPDATE %s SET config_version = '%s', bconfig_version = '%.*s' " @@ -569,7 +571,8 @@ int ObLSRecoveryStatOperator::update_ls_config_version( int ObLSRecoveryStatOperator::get_min_create_scn_( const uint64_t tenant_id, const common::ObSqlString &sql, - ObISQLClient &client, SCN &min_create_scn) { + ObISQLClient &client, SCN &min_create_scn) +{ int ret = OB_SUCCESS; min_create_scn = SCN::base_scn(); if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { diff --git a/src/share/ls/ob_ls_recovery_stat_operator.h b/src/share/ls/ob_ls_recovery_stat_operator.h index 7aeccd05e..a6811ea69 100644 --- a/src/share/ls/ob_ls_recovery_stat_operator.h +++ b/src/share/ls/ob_ls_recovery_stat_operator.h @@ -267,9 +267,13 @@ public: * description: update ls config_version and return current readable_scn * @param[in] tenant_id * @param[in] ls_id - * @param[in] config_version :will not fallback + * @param[in] config_version : If the new and old config_version are equal, no update is performed, and it directly returns success. + * If the new config_version is greater than the old config_version, update it to the internal table. + * If the new config_version is less than the old config_version, report an error. * @param[in] client * @param[out] readable scn + * return: OB_SUCCESS: new and current config version maybe equal, or new bigger than current + * OB_NEED_RETRY: new config_version smaller than current */ int update_ls_config_version(const uint64_t tenant_id, const ObLSID &ls_id, const palf::LogConfigVersion &config_version, diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index c94d7bed5..98dcb63d8 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -7424,6 +7424,8 @@ bool ObGetLSReplayedScnRes::is_valid() const return OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid() && cur_readable_scn_.is_valid_and_not_min(); + //no need check offline_scn,offline_scn可能没有并且有升级兼容性问题 + //no need check server valid } int ObGetLSReplayedScnRes::init( const uint64_t tenant_id, @@ -7437,7 +7439,7 @@ int ObGetLSReplayedScnRes::init( || !ls_id.is_valid() || !cur_readable_scn.is_valid_and_not_min() || !server.is_valid())) { - //不用校验offline_scn,可能就是一个非法的 + //不用校验offline_scn,可能就是一个非法的 ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(cur_readable_scn), K(server)); } else { diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index bc65170fd..8d930fcc3 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -8648,12 +8648,12 @@ public: ObGetLSReplayedScnRes(): tenant_id_(OB_INVALID_TENANT_ID), ls_id_(), cur_readable_scn_(share::SCN::min_scn()), - offline_scn_(), - self_addr_() {} + offline_scn_(), self_addr_() {} ~ObGetLSReplayedScnRes() {} bool is_valid() const; - int init(const uint64_t tenant_id, const share::ObLSID &ls_id, const share::SCN &cur_readable_scn, - const share::SCN &offline_scn, const common::ObAddr &server); + int init(const uint64_t tenant_id, const share::ObLSID &ls_id, + const share::SCN &cur_readable_scn, + const share::SCN &offline_scn, const common::ObAddr &server); int assign(const ObGetLSReplayedScnRes &other); TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(cur_readable_scn), K_(offline_scn), K(self_addr_)); uint64_t get_tenant_id() const @@ -8684,7 +8684,7 @@ private: share::ObLSID ls_id_; share::SCN cur_readable_scn_; share::SCN offline_scn_;//add in 4.2.2.0 - common::ObAddr self_addr_;//add in 4.3.0 + common::ObAddr self_addr_;//add in 4.2.3/4.3.0 }; diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index 0c118be01..3a751db1e 100644 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -732,11 +732,6 @@ public: //int gather_replica_readable_scn(); DELEGATE_WITH_RET(ls_recovery_stat_handler_, gather_replica_readable_scn, int); - // get all ls readable_scn: it will be failed while has replica is offline - // @param[out] readable_scn ls readable_scn - // int get_all_replica_min_readable_scn(share::SCN &readable_scn) - DELEGATE_WITH_RET(ls_recovery_stat_handler_, get_all_replica_min_readable_scn, int); - // disable clog sync. // with ls read lock and log write lock. int disable_sync();