Fix some bugs for tx module
This commit is contained in:
@ -162,7 +162,7 @@ int ObLS::init(const share::ObLSID &ls_id,
|
||||
LOG_WARN("init keep_alive_ls_handler failed", K(ret));
|
||||
} else if (OB_FAIL(gc_handler_.init(this))) {
|
||||
LOG_WARN("init gc handler failed", K(ret));
|
||||
} else if (OB_FAIL(ls_wrs_handler_.init())) {
|
||||
} else if (OB_FAIL(ls_wrs_handler_.init(ls_meta_.ls_id_))) {
|
||||
LOG_WARN("ls loop worker init failed", K(ret));
|
||||
} else if (OB_FAIL(ls_restore_handler_.init(this))) {
|
||||
LOG_WARN("init ls restore handler", K(ret));
|
||||
|
||||
@ -319,6 +319,7 @@ void ObPartTransCtx::default_init_()
|
||||
state_info_array_.reset();
|
||||
lastest_snapshot_.reset();
|
||||
standby_part_collected_.reset();
|
||||
trace_log_.reset();
|
||||
}
|
||||
|
||||
int ObPartTransCtx::init_log_cbs_(const ObLSID &ls_id, const ObTransID &tx_id)
|
||||
|
||||
@ -243,7 +243,7 @@ int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result)
|
||||
SCN gts_scn;
|
||||
if (OB_FAIL(get_info_from_result_(*result, bl_key, ls_info))) {
|
||||
TRANS_LOG(WARN, "get_info_from_result_ fail ", KR(ret), K(result));
|
||||
} else if (LEADER == ls_info.ls_state_) {
|
||||
} else if (ls_info.is_leader()) {
|
||||
// cannot add leader into blacklist
|
||||
} else if (ls_info.weak_read_scn_ == 0 && ls_info.migrate_status_ == OB_MIGRATE_STATUS_NONE) {
|
||||
// log stream is initializing, should't be put into blacklist
|
||||
@ -292,7 +292,7 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey
|
||||
int64_t port = 0;
|
||||
int64_t tenant_id = 0;
|
||||
int64_t id = ObLSID::INVALID_LS_ID;
|
||||
ObString ls_state_str;
|
||||
int64_t ls_role = -1;
|
||||
uint64_t weak_read_scn_uint = 0;
|
||||
int64_t migrate_status_int = -1;
|
||||
|
||||
@ -300,25 +300,22 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey
|
||||
(void)GET_COL_IGNORE_NULL(result.get_int, "svr_port", port);
|
||||
(void)GET_COL_IGNORE_NULL(result.get_int, "tenant_id", tenant_id);
|
||||
(void)GET_COL_IGNORE_NULL(result.get_int, "ls_id", id);
|
||||
(void)GET_COL_IGNORE_NULL(result.get_varchar, "role", ls_state_str);
|
||||
(void)GET_COL_IGNORE_NULL(result.get_int, "role", ls_role);
|
||||
(void)GET_COL_IGNORE_NULL(result.get_uint, "weak_read_scn", weak_read_scn_uint);
|
||||
(void)GET_COL_IGNORE_NULL(result.get_int, "migrate_status", migrate_status_int);
|
||||
|
||||
ObLSID ls_id(id);
|
||||
common::ObAddr server;
|
||||
ObRole ls_state = INVALID_ROLE;
|
||||
int64_t weak_read_scn = static_cast<int64_t>(weak_read_scn_uint);
|
||||
ObMigrateStatus migrate_status = ObMigrateStatus(migrate_status_int);
|
||||
|
||||
if (false == server.set_ip_addr(ip, static_cast<uint32_t>(port))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "invalid server address", K(ip), K(port));
|
||||
} else if (OB_FAIL(string_to_role(ls_state_str, ls_state))) {
|
||||
TRANS_LOG(WARN, "string_to_role fail", K(ls_state_str));
|
||||
} else if (OB_FAIL(bl_key.init(server, tenant_id, ls_id))) {
|
||||
TRANS_LOG(WARN, "bl_key init fail", K(server), K(tenant_id), K(ls_id));
|
||||
} else if (OB_FAIL(ls_info.init(ls_state, weak_read_scn, migrate_status))) {
|
||||
TRANS_LOG(WARN, "ls_info init fail", K(ls_state), K(weak_read_scn), K(migrate_status));
|
||||
} else if (OB_FAIL(ls_info.init(ls_role, weak_read_scn, migrate_status))) {
|
||||
TRANS_LOG(WARN, "ls_info init fail", K(ls_role), K(weak_read_scn), K(migrate_status));
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
@ -31,9 +31,9 @@
|
||||
|
||||
// 查询 __all_virtual_ls_info 的语句,设置了2s超时时间
|
||||
#define BLACK_LIST_SELECT_LS_INFO_STMT \
|
||||
"select /*+query_timeout(2000000)*/ a.svr_ip, a.svr_port, a.tenant_id, a.ls_id, b.role, a.weak_read_scn, a.migrate_status \
|
||||
from oceanbase.__all_virtual_ls_info a, oceanbase.__all_virtual_log_stat b \
|
||||
where a.svr_ip = b.svr_ip and a.svr_port = b.svr_port and a.tenant_id = b.tenant_id and a.ls_id = b.ls_id;"
|
||||
"select /*+query_timeout(2000000)*/ a.svr_ip, a.svr_port, a.tenant_id, a.ls_id, a.role, nvl(b.weak_read_scn, 1), nvl(b.migrate_status, 0) \
|
||||
from oceanbase.__all_virtual_ls_meta_table a left join oceanbase.__all_virtual_ls_info b \
|
||||
on a.svr_ip = b.svr_ip and a.svr_port = b.svr_port and a.tenant_id = b.tenant_id and a.ls_id = b.ls_id;"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -124,11 +124,11 @@ struct ObLsInfo
|
||||
{
|
||||
public:
|
||||
ObLsInfo()
|
||||
: ls_state_(INVALID_ROLE),
|
||||
: ls_state_(-1),
|
||||
weak_read_scn_(0),
|
||||
migrate_status_(OB_MIGRATE_STATUS_MAX)
|
||||
{}
|
||||
int init(ObRole ls_state, int64_t weak_read_scn, ObMigrateStatus migrate_status)
|
||||
int init(const int64_t ls_state, int64_t weak_read_scn, ObMigrateStatus migrate_status)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_MIGRATE_STATUS_MAX == migrate_status) {
|
||||
@ -140,14 +140,15 @@ public:
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
bool is_leader() const { return ls_state_ == 1; }
|
||||
bool is_valid() const
|
||||
{
|
||||
return OB_MIGRATE_STATUS_MAX != migrate_status_;
|
||||
}
|
||||
TO_STRING_KV(K_(ls_state), K_(weak_read_scn), K_(migrate_status));
|
||||
|
||||
// 日志流状态(角色):LEADER、FOLLOWER,其他角色对于日志流是没有意义的
|
||||
ObRole ls_state_;
|
||||
// 日志流状态(角色):LEADER(1)、FOLLOWER(2),其他角色对于日志流是没有意义的
|
||||
int64_t ls_state_;
|
||||
// 弱读时间戳,如果落后超过一定时间就要加入黑名单,单位ns
|
||||
int64_t weak_read_scn_;
|
||||
// 迁移状态,正在迁移的日志流一定不可读
|
||||
|
||||
@ -25,7 +25,7 @@ using namespace share;
|
||||
|
||||
namespace storage
|
||||
{
|
||||
int ObLSWRSHandler::init()
|
||||
int ObLSWRSHandler::init(const share::ObLSID &ls_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -35,7 +35,8 @@ int ObLSWRSHandler::init()
|
||||
} else {
|
||||
is_enabled_ = true;
|
||||
is_inited_ = true;
|
||||
STORAGE_LOG(INFO, "ObLSWRSHandler init success", K(this));
|
||||
ls_id_ = ls_id;
|
||||
STORAGE_LOG(INFO, "ObLSWRSHandler init success", K(*this));
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -46,6 +47,7 @@ void ObLSWRSHandler::reset()
|
||||
is_inited_ = false;
|
||||
ls_weak_read_ts_.set_min();
|
||||
is_enabled_ = false;
|
||||
ls_id_.reset();
|
||||
}
|
||||
|
||||
int ObLSWRSHandler::offline()
|
||||
@ -54,7 +56,7 @@ int ObLSWRSHandler::offline()
|
||||
ObSpinLockGuard guard(lock_);
|
||||
is_enabled_ = false;
|
||||
ls_weak_read_ts_.set_min();
|
||||
STORAGE_LOG(INFO, "weak read handler disabled", K(this));
|
||||
STORAGE_LOG(INFO, "weak read handler disabled", K(*this));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -64,11 +66,11 @@ int ObLSWRSHandler::online()
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "ObLSWRSHandler not init", K(ret), K(is_inited_), K(this));
|
||||
STORAGE_LOG(WARN, "ObLSWRSHandler not init", K(ret), K(*this));
|
||||
} else {
|
||||
ObSpinLockGuard guard(lock_);
|
||||
is_enabled_ = true;
|
||||
STORAGE_LOG(INFO, "weak read handler enabled", K(this));
|
||||
STORAGE_LOG(INFO, "weak read handler enabled", K(*this));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -82,7 +84,6 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls,
|
||||
int ret = OB_SUCCESS;
|
||||
SCN timestamp;
|
||||
SCN gts_scn;
|
||||
const ObLSID &ls_id = ls.get_ls_id();
|
||||
need_skip = false;
|
||||
ObMigrationStatus status = ObMigrationStatus::OB_MIGRATION_STATUS_NONE;
|
||||
|
||||
@ -94,14 +95,14 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls,
|
||||
// do nothing
|
||||
need_skip = true;
|
||||
if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) {
|
||||
STORAGE_LOG(INFO, "weak read handler not enabled", K(ls_id), K(this));
|
||||
STORAGE_LOG(INFO, "weak read handler not enabled", K(*this));
|
||||
}
|
||||
} else if (OB_FAIL(generate_weak_read_timestamp_(ls, max_stale_time, timestamp))) {
|
||||
STORAGE_LOG(DEBUG, "fail to generate weak read timestamp", KR(ret), K(max_stale_time));
|
||||
need_skip = true;
|
||||
ret = OB_SUCCESS;
|
||||
} else if (OB_TS_MGR.get_gts(MTL_ID(), NULL, gts_scn)) {
|
||||
TRANS_LOG(WARN, "get gts scn error", K(ls_id), K(max_stale_time));
|
||||
TRANS_LOG(WARN, "get gts scn error", K(max_stale_time), K(*this));
|
||||
} else if (OB_FAIL(ls.get_migration_status(status))
|
||||
|| ObMigrationStatus::OB_MIGRATION_STATUS_NONE == status ) {
|
||||
// check the weak read timestamp of the migrated ls
|
||||
@ -124,7 +125,7 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls,
|
||||
|
||||
// check replica type
|
||||
if (OB_SUCC(ret) && false == need_skip) {
|
||||
if (ls_id.is_sys_ls()) {
|
||||
if (ls_id_.is_sys_ls()) {
|
||||
is_user_ls = false;
|
||||
} else {
|
||||
is_user_ls = true;
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
#include "lib/lock/ob_spin_lock.h"
|
||||
#include "lib/utility/ob_macro_utils.h"
|
||||
#include "share/scn.h"
|
||||
#include "share/ob_ls_id.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace clog
|
||||
@ -34,7 +35,7 @@ class ObLSWRSHandler
|
||||
public:
|
||||
ObLSWRSHandler() { reset(); }
|
||||
~ObLSWRSHandler() { reset(); }
|
||||
int init();
|
||||
int init(const share::ObLSID &ls_id);
|
||||
void reset();
|
||||
int offline();
|
||||
int online();
|
||||
@ -45,6 +46,9 @@ public:
|
||||
const int64_t max_stale_time);
|
||||
share::SCN get_ls_weak_read_ts() const { return ls_weak_read_ts_; }
|
||||
bool can_skip_ls() const { return !is_enabled_; }
|
||||
|
||||
TO_STRING_KV(K_(is_inited), K_(is_enabled), K_(ls_id), K_(ls_weak_read_ts));
|
||||
|
||||
private:
|
||||
int generate_weak_read_timestamp_(oceanbase::storage::ObLS &ls, const int64_t max_stale_time, share::SCN ×tamp);
|
||||
|
||||
@ -56,6 +60,7 @@ protected:
|
||||
common::ObSpinLock lock_;
|
||||
bool is_inited_;
|
||||
bool is_enabled_;
|
||||
share::ObLSID ls_id_;
|
||||
share::SCN ls_weak_read_ts_;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user