[CP] Add replica into blacklist when migrating

This commit is contained in:
Naynahs
2023-10-12 12:09:44 +00:00
committed by ob-robot
parent 8535879c8f
commit 0fe674397d
3 changed files with 14 additions and 12 deletions

View File

@ -14,8 +14,8 @@
#include "share/ob_thread_mgr.h" // set_thread_name #include "share/ob_thread_mgr.h" // set_thread_name
#include "observer/ob_server_struct.h" // for GCTX #include "observer/ob_server_struct.h" // for GCTX
#include "deps/oblib/src/common/ob_role.h" // role #include "deps/oblib/src/common/ob_role.h" // role
#include "src/storage/tx/wrs/ob_weak_read_util.h" // ObWeakReadUtil #include "storage/tx/wrs/ob_weak_read_util.h" // ObWeakReadUtil
#include "src/storage/tx/ob_ts_mgr.h" #include "storage/tx/ob_ts_mgr.h"
namespace oceanbase namespace oceanbase
{ {
@ -252,7 +252,9 @@ int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result)
} else { } else {
max_stale_time = get_tenant_max_stale_time_(bl_key.get_tenant_id()); max_stale_time = get_tenant_max_stale_time_(bl_key.get_tenant_id());
int64_t max_stale_time_ns = max_stale_time * 1000; int64_t max_stale_time_ns = max_stale_time * 1000;
if (gts_scn.get_val_for_gts() > ls_info.weak_read_scn_ + max_stale_time_ns || ls_info.tx_blocked_) { if (gts_scn.get_val_for_gts() > ls_info.weak_read_scn_ + max_stale_time_ns
|| ls_info.tx_blocked_
|| ls_info.migrate_status_ != ObMigrationStatus::OB_MIGRATION_STATUS_NONE) {
// scn is out-of-time,add this log stream into blacklist // scn is out-of-time,add this log stream into blacklist
if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) { if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) {
TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info)); TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info));
@ -324,7 +326,7 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey
ObLSID ls_id(id); ObLSID ls_id(id);
common::ObAddr server; common::ObAddr server;
ObMigrateStatus migrate_status = ObMigrateStatus(migrate_status_int); ObMigrationStatus migrate_status = ObMigrationStatus(migrate_status_int);
if (false == server.set_ip_addr(ip, static_cast<uint32_t>(port))) { if (false == server.set_ip_addr(ip, static_cast<uint32_t>(port))) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;

View File

@ -16,7 +16,7 @@
#include "common/ob_queue_thread.h" // ObCond #include "common/ob_queue_thread.h" // ObCond
#include "common/ob_role.h" // ObRole #include "common/ob_role.h" // ObRole
#include "storage/tx/ob_trans_define.h" // ObLSID, LinkHashNode #include "storage/tx/ob_trans_define.h" // ObLSID, LinkHashNode
#include "storage/ob_storage_struct.h" // ObMigrateStatus #include "storage/high_availability/ob_storage_ha_struct.h" // ObMigrateStatus
// 定期更新黑名单的时间间隔(us) // 定期更新黑名单的时间间隔(us)
#define BLACK_LIST_REFRESH_INTERVAL 3000000 // 3s #define BLACK_LIST_REFRESH_INTERVAL 3000000 // 3s
@ -128,13 +128,13 @@ public:
ObLsInfo() ObLsInfo()
: ls_state_(-1), : ls_state_(-1),
weak_read_scn_(0), weak_read_scn_(0),
migrate_status_(OB_MIGRATE_STATUS_MAX), migrate_status_(OB_MIGRATION_STATUS_MAX),
tx_blocked_(false) tx_blocked_(false)
{} {}
int init(const int64_t ls_state, int64_t weak_read_scn, ObMigrateStatus migrate_status, bool tx_blocked) int init(const int64_t ls_state, int64_t weak_read_scn, ObMigrationStatus migrate_status, bool tx_blocked)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_MIGRATE_STATUS_MAX == migrate_status) { if (OB_MIGRATION_STATUS_MAX == migrate_status) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
} else { } else {
ls_state_ = ls_state; ls_state_ = ls_state;
@ -147,7 +147,7 @@ public:
bool is_leader() const { return ls_state_ == 1; } bool is_leader() const { return ls_state_ == 1; }
bool is_valid() const bool is_valid() const
{ {
return OB_MIGRATE_STATUS_MAX != migrate_status_; return OB_MIGRATION_STATUS_MAX != migrate_status_;
} }
TO_STRING_KV(K_(ls_state), K_(weak_read_scn), K_(migrate_status), K_(tx_blocked)); TO_STRING_KV(K_(ls_state), K_(weak_read_scn), K_(migrate_status), K_(tx_blocked));
@ -156,7 +156,7 @@ public:
// 弱读时间戳,如果落后超过一定时间就要加入黑名单,单位ns // 弱读时间戳,如果落后超过一定时间就要加入黑名单,单位ns
int64_t weak_read_scn_; int64_t weak_read_scn_;
// 迁移状态,正在迁移的日志流一定不可读 // 迁移状态,正在迁移的日志流一定不可读
ObMigrateStatus migrate_status_; ObMigrationStatus migrate_status_;
// transaction ls blocked // transaction ls blocked
bool tx_blocked_; bool tx_blocked_;
}; };

View File

@ -56,7 +56,7 @@ TEST_F(TestObBlackList, black_list_init_invalid)
// update // update
ObLsInfo ls_info; ObLsInfo ls_info;
uint64_t curr_time = static_cast<uint64_t>(ObTimeUtility::current_time()); uint64_t curr_time = static_cast<uint64_t>(ObTimeUtility::current_time());
EXPECT_EQ(OB_SUCCESS, ls_info.init(0, curr_time, OB_MIGRATE_STATUS_NONE, 0)); EXPECT_EQ(OB_SUCCESS, ls_info.init(0, curr_time, OB_MIGRATION_STATUS_NONE, 0));
EXPECT_EQ(OB_SUCCESS, bl_service.ls_bl_mgr_.update(key, ls_info)); EXPECT_EQ(OB_SUCCESS, bl_service.ls_bl_mgr_.update(key, ls_info));
// check // check
@ -241,7 +241,7 @@ TEST_F(TestObBlackList, black_list_parallel_2)
int rand = 0; int rand = 0;
ObBLKey key; ObBLKey key;
ObLsInfo ls_info; ObLsInfo ls_info;
EXPECT_EQ(OB_SUCCESS, ls_info.init(0, 1, OB_MIGRATE_STATUS_NONE, 0)); EXPECT_EQ(OB_SUCCESS, ls_info.init(0, 1, OB_MIGRATION_STATUS_NONE, 0));
std::srand((unsigned)std::time(NULL)); std::srand((unsigned)std::time(NULL));
for (int i = 1; i <= loop_cnt; i++) { for (int i = 1; i <= loop_cnt; i++) {
rand = std::rand() % 1000 + 1; rand = std::rand() % 1000 + 1;