Add blocked ls into blacklist for RTO
This commit is contained in:
@ -193,11 +193,15 @@ int ObAllVirtualLSInfo::process_curr_tenant(ObNewRow *&row)
|
||||
// tablet_change_checkpoint_scn
|
||||
cur_row_.cells_[i].set_uint64(!ls_info.tablet_change_checkpoint_scn_.is_valid() ? 0 : ls_info.tablet_change_checkpoint_scn_.get_val_for_inner_table_field());
|
||||
break;
|
||||
default:
|
||||
case OB_APP_MIN_COLUMN_ID + 15:
|
||||
// transfer_scn
|
||||
cur_row_.cells_[i].set_uint64(!ls_info.transfer_scn_.is_valid() ? 0 : ls_info.transfer_scn_.get_val_for_inner_table_field());
|
||||
break;
|
||||
case OB_APP_MIN_COLUMN_ID + 16:
|
||||
// tx blocked
|
||||
cur_row_.cells_[i].set_int(ls_info.tx_blocked_);
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
SERVER_LOG(WARN, "invalid col_id", K(ret), K(col_id));
|
||||
break;
|
||||
|
||||
@ -9521,6 +9521,21 @@ int ObInnerTableSchema::all_virtual_ls_info_schema(ObTableSchema &table_schema)
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ADD_COLUMN_SCHEMA("tx_blocked", //column_name
|
||||
++column_id, //column_id
|
||||
0, //rowkey_id
|
||||
0, //index_id
|
||||
0, //part_key_pos
|
||||
ObUInt64Type, //column_type
|
||||
CS_TYPE_INVALID, //column_collation_type
|
||||
sizeof(uint64_t), //column_length
|
||||
-1, //column_precision
|
||||
-1, //column_scale
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
table_schema.get_part_option().set_part_num(1);
|
||||
table_schema.set_part_level(PARTITION_LEVEL_ONE);
|
||||
|
||||
@ -10833,6 +10833,7 @@ def_table_schema(
|
||||
('rebuild_seq', 'int'),
|
||||
('tablet_change_checkpoint_scn', 'uint'),
|
||||
('transfer_scn', 'uint'),
|
||||
('tx_blocked', 'uint'),
|
||||
],
|
||||
partition_columns = ['svr_ip', 'svr_port'],
|
||||
vtable_route_policy = 'distributed',
|
||||
|
||||
@ -1218,6 +1218,7 @@ int ObLS::get_ls_info(ObLSVTInfo &ls_info)
|
||||
bool is_log_sync = false;
|
||||
bool is_need_rebuild = false;
|
||||
ObMigrationStatus migrate_status;
|
||||
bool tx_blocked = false;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ls is not inited", K(ret));
|
||||
@ -1228,6 +1229,8 @@ int ObLS::get_ls_info(ObLSVTInfo &ls_info)
|
||||
LOG_WARN("get ls need rebuild info failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(ls_meta_.get_migration_status(migrate_status))) {
|
||||
LOG_WARN("get ls migrate status failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(ls_tx_svr_.check_tx_blocked(tx_blocked))) {
|
||||
LOG_WARN("check tx ls state error", K(ret),KPC(this));
|
||||
} else {
|
||||
ls_info.ls_id_ = ls_meta_.ls_id_;
|
||||
ls_info.replica_type_ = ls_meta_.get_replica_type();
|
||||
@ -1241,6 +1244,7 @@ int ObLS::get_ls_info(ObLSVTInfo &ls_info)
|
||||
ls_info.rebuild_seq_ = ls_meta_.get_rebuild_seq();
|
||||
ls_info.tablet_change_checkpoint_scn_ = ls_meta_.get_tablet_change_checkpoint_scn();
|
||||
ls_info.transfer_scn_ = ls_meta_.get_transfer_scn();
|
||||
ls_info.tx_blocked_ = tx_blocked;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -93,6 +93,7 @@ struct ObLSVTInfo
|
||||
int64_t rebuild_seq_;
|
||||
share::SCN tablet_change_checkpoint_scn_;
|
||||
share::SCN transfer_scn_;
|
||||
bool tx_blocked_;
|
||||
};
|
||||
|
||||
// 诊断虚表统计信息
|
||||
|
||||
@ -743,6 +743,18 @@ int ObLSTxService::set_max_replay_commit_version(share::SCN commit_version)
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSTxService::check_tx_blocked(bool &tx_blocked) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(mgr_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", KR(ret), K_(ls_id));
|
||||
} else {
|
||||
tx_blocked = mgr_->is_blocked();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
} // transaction
|
||||
|
||||
} // oceanbase
|
||||
|
||||
@ -174,6 +174,9 @@ public:
|
||||
int check_in_leader_serving_state(bool& bool_ret);
|
||||
int set_max_replay_commit_version(share::SCN commit_version);
|
||||
transaction::ObTxRetainCtxMgr *get_retain_ctx_mgr();
|
||||
|
||||
// check tx ls blocked
|
||||
int check_tx_blocked(bool &tx_blocked) const;
|
||||
private:
|
||||
void reset_();
|
||||
|
||||
|
||||
@ -511,6 +511,9 @@ public:
|
||||
// check is master
|
||||
bool is_master() const { return is_master_(); }
|
||||
|
||||
// check is blocked
|
||||
bool is_blocked() const { return is_blocked_(); }
|
||||
|
||||
// Switch the prev_aggre_log_ts and aggre_log_ts during dump starts
|
||||
int refresh_aggre_rec_scn();
|
||||
|
||||
|
||||
@ -252,7 +252,7 @@ int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result)
|
||||
} else {
|
||||
max_stale_time = get_tenant_max_stale_time_(bl_key.get_tenant_id());
|
||||
int64_t max_stale_time_ns = max_stale_time * 1000;
|
||||
if (gts_scn.get_val_for_gts() > ls_info.weak_read_scn_ + max_stale_time_ns) {
|
||||
if (gts_scn.get_val_for_gts() > ls_info.weak_read_scn_ + max_stale_time_ns || ls_info.tx_blocked_) {
|
||||
// scn is out-of-time,add this log stream into blacklist
|
||||
if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) {
|
||||
TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info));
|
||||
@ -295,7 +295,9 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey
|
||||
int64_t ls_role = -1;
|
||||
int64_t weak_read_scn = 0;
|
||||
int64_t migrate_status_int = -1;
|
||||
int64_t tx_blocked = 0;
|
||||
common::number::ObNumber weak_read_number;
|
||||
common::number::ObNumber tx_blocked_number;
|
||||
|
||||
(void)GET_COL_IGNORE_NULL(result.get_varchar, "svr_ip", ip);
|
||||
(void)GET_COL_IGNORE_NULL(result.get_int, "svr_port", port);
|
||||
@ -304,6 +306,7 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey
|
||||
(void)GET_COL_IGNORE_NULL(result.get_int, "role", ls_role);
|
||||
(void)GET_COL_IGNORE_NULL(result.get_number, "weak_read_scn", weak_read_number);
|
||||
(void)GET_COL_IGNORE_NULL(result.get_int, "migrate_status", migrate_status_int);
|
||||
(void)GET_COL_IGNORE_NULL(result.get_number, "tx_blocked", tx_blocked_number);
|
||||
|
||||
ObLSID ls_id(id);
|
||||
common::ObAddr server;
|
||||
@ -314,10 +317,12 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey
|
||||
TRANS_LOG(WARN, "invalid server address", K(ip), K(port));
|
||||
} else if (OB_FAIL(weak_read_number.cast_to_int64(weak_read_scn))) {
|
||||
TRANS_LOG(WARN, "failed to cast int", K(ret), K(weak_read_number));
|
||||
} else if (OB_FAIL(tx_blocked_number.cast_to_int64(tx_blocked))) {
|
||||
TRANS_LOG(WARN, "failed to cast int", K(ret), K(tx_blocked_number));
|
||||
} else if (OB_FAIL(bl_key.init(server, tenant_id, ls_id))) {
|
||||
TRANS_LOG(WARN, "bl_key init fail", K(server), K(tenant_id), K(ls_id));
|
||||
} else if (OB_FAIL(ls_info.init(ls_role, weak_read_scn, migrate_status))) {
|
||||
TRANS_LOG(WARN, "ls_info init fail", K(ls_role), K(weak_read_scn), K(migrate_status));
|
||||
} else if (OB_FAIL(ls_info.init(ls_role, weak_read_scn, migrate_status, tx_blocked))) {
|
||||
TRANS_LOG(WARN, "ls_info init fail", K(ls_role), K(weak_read_scn), K(migrate_status), K(tx_blocked));
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
@ -32,7 +32,7 @@
|
||||
// 查询 __all_virtual_ls_info 的语句,设置了2s超时时间
|
||||
#define BLACK_LIST_SELECT_LS_INFO_STMT \
|
||||
"select /*+query_timeout(2000000)*/ a.svr_ip, a.svr_port, a.tenant_id, a.ls_id, a.role, \
|
||||
nvl(b.weak_read_scn, 1) as weak_read_scn, nvl(b.migrate_status, 0) as migrate_status \
|
||||
nvl(b.weak_read_scn, 1) as weak_read_scn, nvl(b.migrate_status, 0) as migrate_status, nvl(b.tx_blocked, 0) as tx_blocked \
|
||||
from oceanbase.__all_virtual_ls_meta_table a left join oceanbase.__all_virtual_ls_info b \
|
||||
on a.svr_ip = b.svr_ip and a.svr_port = b.svr_port and a.tenant_id = b.tenant_id and a.ls_id = b.ls_id;"
|
||||
|
||||
@ -127,9 +127,10 @@ public:
|
||||
ObLsInfo()
|
||||
: ls_state_(-1),
|
||||
weak_read_scn_(0),
|
||||
migrate_status_(OB_MIGRATE_STATUS_MAX)
|
||||
migrate_status_(OB_MIGRATE_STATUS_MAX),
|
||||
tx_blocked_(false)
|
||||
{}
|
||||
int init(const int64_t ls_state, int64_t weak_read_scn, ObMigrateStatus migrate_status)
|
||||
int init(const int64_t ls_state, int64_t weak_read_scn, ObMigrateStatus migrate_status, bool tx_blocked)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_MIGRATE_STATUS_MAX == migrate_status) {
|
||||
@ -138,6 +139,7 @@ public:
|
||||
ls_state_ = ls_state;
|
||||
weak_read_scn_ = weak_read_scn;
|
||||
migrate_status_ = migrate_status;
|
||||
tx_blocked_ = tx_blocked;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -154,6 +156,8 @@ public:
|
||||
int64_t weak_read_scn_;
|
||||
// 迁移状态,正在迁移的日志流一定不可读
|
||||
ObMigrateStatus migrate_status_;
|
||||
// transaction ls blocked
|
||||
bool tx_blocked_;
|
||||
};
|
||||
|
||||
// blacklist value
|
||||
|
||||
@ -56,7 +56,7 @@ TEST_F(TestObBlackList, black_list_init_invalid)
|
||||
// update
|
||||
ObLsInfo ls_info;
|
||||
uint64_t curr_time = static_cast<uint64_t>(ObTimeUtility::current_time());
|
||||
EXPECT_EQ(OB_SUCCESS, ls_info.init(FOLLOWER, curr_time, OB_MIGRATE_STATUS_NONE));
|
||||
EXPECT_EQ(OB_SUCCESS, ls_info.init(0, curr_time, OB_MIGRATE_STATUS_NONE, 0));
|
||||
EXPECT_EQ(OB_SUCCESS, bl_service.ls_bl_mgr_.update(key, ls_info));
|
||||
|
||||
// check
|
||||
@ -241,7 +241,7 @@ TEST_F(TestObBlackList, black_list_parallel_2)
|
||||
int rand = 0;
|
||||
ObBLKey key;
|
||||
ObLsInfo ls_info;
|
||||
EXPECT_EQ(OB_SUCCESS, ls_info.init(FOLLOWER, 1, OB_MIGRATE_STATUS_NONE));
|
||||
EXPECT_EQ(OB_SUCCESS, ls_info.init(0, 1, OB_MIGRATE_STATUS_NONE, 0));
|
||||
std::srand((unsigned)std::time(NULL));
|
||||
for (int i = 1; i <= loop_cnt; i++) {
|
||||
rand = std::rand() % 1000 + 1;
|
||||
|
||||
Reference in New Issue
Block a user