diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index eee1cfc065..8a2969534c 100755 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -162,7 +162,7 @@ int ObLS::init(const share::ObLSID &ls_id, LOG_WARN("init keep_alive_ls_handler failed", K(ret)); } else if (OB_FAIL(gc_handler_.init(this))) { LOG_WARN("init gc handler failed", K(ret)); - } else if (OB_FAIL(ls_wrs_handler_.init())) { + } else if (OB_FAIL(ls_wrs_handler_.init(ls_meta_.ls_id_))) { LOG_WARN("ls loop worker init failed", K(ret)); } else if (OB_FAIL(ls_restore_handler_.init(this))) { LOG_WARN("init ls restore handler", K(ret)); diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index e6a7c41012..881bdef2aa 100755 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -319,6 +319,7 @@ void ObPartTransCtx::default_init_() state_info_array_.reset(); lastest_snapshot_.reset(); standby_part_collected_.reset(); + trace_log_.reset(); } int ObPartTransCtx::init_log_cbs_(const ObLSID &ls_id, const ObTransID &tx_id) diff --git a/src/storage/tx/wrs/ob_black_list.cpp b/src/storage/tx/wrs/ob_black_list.cpp index e93410b5b6..3470bd7a09 100644 --- a/src/storage/tx/wrs/ob_black_list.cpp +++ b/src/storage/tx/wrs/ob_black_list.cpp @@ -243,7 +243,7 @@ int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result) SCN gts_scn; if (OB_FAIL(get_info_from_result_(*result, bl_key, ls_info))) { TRANS_LOG(WARN, "get_info_from_result_ fail ", KR(ret), K(result)); - } else if (LEADER == ls_info.ls_state_) { + } else if (ls_info.is_leader()) { // cannot add leader into blacklist } else if (ls_info.weak_read_scn_ == 0 && ls_info.migrate_status_ == OB_MIGRATE_STATUS_NONE) { // log stream is initializing, should't be put into blacklist @@ -292,7 +292,7 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey int64_t port = 0; int64_t tenant_id = 0; int64_t id = ObLSID::INVALID_LS_ID; - ObString ls_state_str; + int64_t ls_role = -1; uint64_t weak_read_scn_uint = 0; int64_t migrate_status_int = -1; @@ -300,25 +300,22 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey (void)GET_COL_IGNORE_NULL(result.get_int, "svr_port", port); (void)GET_COL_IGNORE_NULL(result.get_int, "tenant_id", tenant_id); (void)GET_COL_IGNORE_NULL(result.get_int, "ls_id", id); - (void)GET_COL_IGNORE_NULL(result.get_varchar, "role", ls_state_str); + (void)GET_COL_IGNORE_NULL(result.get_int, "role", ls_role); (void)GET_COL_IGNORE_NULL(result.get_uint, "weak_read_scn", weak_read_scn_uint); (void)GET_COL_IGNORE_NULL(result.get_int, "migrate_status", migrate_status_int); ObLSID ls_id(id); common::ObAddr server; - ObRole ls_state = INVALID_ROLE; int64_t weak_read_scn = static_cast(weak_read_scn_uint); ObMigrateStatus migrate_status = ObMigrateStatus(migrate_status_int); if (false == server.set_ip_addr(ip, static_cast(port))) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "invalid server address", K(ip), K(port)); - } else if (OB_FAIL(string_to_role(ls_state_str, ls_state))) { - TRANS_LOG(WARN, "string_to_role fail", K(ls_state_str)); } else if (OB_FAIL(bl_key.init(server, tenant_id, ls_id))) { TRANS_LOG(WARN, "bl_key init fail", K(server), K(tenant_id), K(ls_id)); - } else if (OB_FAIL(ls_info.init(ls_state, weak_read_scn, migrate_status))) { - TRANS_LOG(WARN, "ls_info init fail", K(ls_state), K(weak_read_scn), K(migrate_status)); + } else if (OB_FAIL(ls_info.init(ls_role, weak_read_scn, migrate_status))) { + TRANS_LOG(WARN, "ls_info init fail", K(ls_role), K(weak_read_scn), K(migrate_status)); } return ret; diff --git a/src/storage/tx/wrs/ob_black_list.h b/src/storage/tx/wrs/ob_black_list.h index d50f4ea6cd..e69b974a33 100644 --- a/src/storage/tx/wrs/ob_black_list.h +++ b/src/storage/tx/wrs/ob_black_list.h @@ -31,9 +31,9 @@ // 查询 __all_virtual_ls_info 的语句,设置了2s超时时间 #define BLACK_LIST_SELECT_LS_INFO_STMT \ - "select /*+query_timeout(2000000)*/ a.svr_ip, a.svr_port, a.tenant_id, a.ls_id, b.role, a.weak_read_scn, a.migrate_status \ - from oceanbase.__all_virtual_ls_info a, oceanbase.__all_virtual_log_stat b \ - where a.svr_ip = b.svr_ip and a.svr_port = b.svr_port and a.tenant_id = b.tenant_id and a.ls_id = b.ls_id;" + "select /*+query_timeout(2000000)*/ a.svr_ip, a.svr_port, a.tenant_id, a.ls_id, a.role, nvl(b.weak_read_scn, 1), nvl(b.migrate_status, 0) \ + from oceanbase.__all_virtual_ls_meta_table a left join oceanbase.__all_virtual_ls_info b \ + on a.svr_ip = b.svr_ip and a.svr_port = b.svr_port and a.tenant_id = b.tenant_id and a.ls_id = b.ls_id;" namespace oceanbase { @@ -124,11 +124,11 @@ struct ObLsInfo { public: ObLsInfo() - : ls_state_(INVALID_ROLE), + : ls_state_(-1), weak_read_scn_(0), migrate_status_(OB_MIGRATE_STATUS_MAX) {} - int init(ObRole ls_state, int64_t weak_read_scn, ObMigrateStatus migrate_status) + int init(const int64_t ls_state, int64_t weak_read_scn, ObMigrateStatus migrate_status) { int ret = OB_SUCCESS; if (OB_MIGRATE_STATUS_MAX == migrate_status) { @@ -140,14 +140,15 @@ public: } return ret; } + bool is_leader() const { return ls_state_ == 1; } bool is_valid() const { return OB_MIGRATE_STATUS_MAX != migrate_status_; } TO_STRING_KV(K_(ls_state), K_(weak_read_scn), K_(migrate_status)); - // 日志流状态(角色):LEADER、FOLLOWER,其他角色对于日志流是没有意义的 - ObRole ls_state_; + // 日志流状态(角色):LEADER(1)、FOLLOWER(2),其他角色对于日志流是没有意义的 + int64_t ls_state_; // 弱读时间戳,如果落后超过一定时间就要加入黑名单,单位ns int64_t weak_read_scn_; // 迁移状态,正在迁移的日志流一定不可读 diff --git a/src/storage/tx/wrs/ob_ls_wrs_handler.cpp b/src/storage/tx/wrs/ob_ls_wrs_handler.cpp index e0175480f2..c2684658f3 100644 --- a/src/storage/tx/wrs/ob_ls_wrs_handler.cpp +++ b/src/storage/tx/wrs/ob_ls_wrs_handler.cpp @@ -25,7 +25,7 @@ using namespace share; namespace storage { -int ObLSWRSHandler::init() +int ObLSWRSHandler::init(const share::ObLSID &ls_id) { int ret = OB_SUCCESS; @@ -35,7 +35,8 @@ int ObLSWRSHandler::init() } else { is_enabled_ = true; is_inited_ = true; - STORAGE_LOG(INFO, "ObLSWRSHandler init success", K(this)); + ls_id_ = ls_id; + STORAGE_LOG(INFO, "ObLSWRSHandler init success", K(*this)); } return ret; @@ -46,6 +47,7 @@ void ObLSWRSHandler::reset() is_inited_ = false; ls_weak_read_ts_.set_min(); is_enabled_ = false; + ls_id_.reset(); } int ObLSWRSHandler::offline() @@ -54,7 +56,7 @@ int ObLSWRSHandler::offline() ObSpinLockGuard guard(lock_); is_enabled_ = false; ls_weak_read_ts_.set_min(); - STORAGE_LOG(INFO, "weak read handler disabled", K(this)); + STORAGE_LOG(INFO, "weak read handler disabled", K(*this)); return ret; } @@ -64,11 +66,11 @@ int ObLSWRSHandler::online() if (!is_inited_) { ret = OB_NOT_INIT; - STORAGE_LOG(WARN, "ObLSWRSHandler not init", K(ret), K(is_inited_), K(this)); + STORAGE_LOG(WARN, "ObLSWRSHandler not init", K(ret), K(*this)); } else { ObSpinLockGuard guard(lock_); is_enabled_ = true; - STORAGE_LOG(INFO, "weak read handler enabled", K(this)); + STORAGE_LOG(INFO, "weak read handler enabled", K(*this)); } return ret; } @@ -82,7 +84,6 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls, int ret = OB_SUCCESS; SCN timestamp; SCN gts_scn; - const ObLSID &ls_id = ls.get_ls_id(); need_skip = false; ObMigrationStatus status = ObMigrationStatus::OB_MIGRATION_STATUS_NONE; @@ -94,14 +95,14 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls, // do nothing need_skip = true; if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) { - STORAGE_LOG(INFO, "weak read handler not enabled", K(ls_id), K(this)); + STORAGE_LOG(INFO, "weak read handler not enabled", K(*this)); } } else if (OB_FAIL(generate_weak_read_timestamp_(ls, max_stale_time, timestamp))) { STORAGE_LOG(DEBUG, "fail to generate weak read timestamp", KR(ret), K(max_stale_time)); need_skip = true; ret = OB_SUCCESS; } else if (OB_TS_MGR.get_gts(MTL_ID(), NULL, gts_scn)) { - TRANS_LOG(WARN, "get gts scn error", K(ls_id), K(max_stale_time)); + TRANS_LOG(WARN, "get gts scn error", K(max_stale_time), K(*this)); } else if (OB_FAIL(ls.get_migration_status(status)) || ObMigrationStatus::OB_MIGRATION_STATUS_NONE == status ) { // check the weak read timestamp of the migrated ls @@ -124,7 +125,7 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls, // check replica type if (OB_SUCC(ret) && false == need_skip) { - if (ls_id.is_sys_ls()) { + if (ls_id_.is_sys_ls()) { is_user_ls = false; } else { is_user_ls = true; diff --git a/src/storage/tx/wrs/ob_ls_wrs_handler.h b/src/storage/tx/wrs/ob_ls_wrs_handler.h index 79b8928460..66ad738ac6 100644 --- a/src/storage/tx/wrs/ob_ls_wrs_handler.h +++ b/src/storage/tx/wrs/ob_ls_wrs_handler.h @@ -18,6 +18,7 @@ #include "lib/lock/ob_spin_lock.h" #include "lib/utility/ob_macro_utils.h" #include "share/scn.h" +#include "share/ob_ls_id.h" namespace oceanbase { namespace clog @@ -34,7 +35,7 @@ class ObLSWRSHandler public: ObLSWRSHandler() { reset(); } ~ObLSWRSHandler() { reset(); } - int init(); + int init(const share::ObLSID &ls_id); void reset(); int offline(); int online(); @@ -45,6 +46,9 @@ public: const int64_t max_stale_time); share::SCN get_ls_weak_read_ts() const { return ls_weak_read_ts_; } bool can_skip_ls() const { return !is_enabled_; } + + TO_STRING_KV(K_(is_inited), K_(is_enabled), K_(ls_id), K_(ls_weak_read_ts)); + private: int generate_weak_read_timestamp_(oceanbase::storage::ObLS &ls, const int64_t max_stale_time, share::SCN ×tamp); @@ -56,6 +60,7 @@ protected: common::ObSpinLock lock_; bool is_inited_; bool is_enabled_; + share::ObLSID ls_id_; share::SCN ls_weak_read_ts_; };