diff --git a/src/observer/virtual_table/ob_all_virtual_ls_info.cpp b/src/observer/virtual_table/ob_all_virtual_ls_info.cpp index e9d38a6ecc..ca3aa0830c 100644 --- a/src/observer/virtual_table/ob_all_virtual_ls_info.cpp +++ b/src/observer/virtual_table/ob_all_virtual_ls_info.cpp @@ -193,11 +193,15 @@ int ObAllVirtualLSInfo::process_curr_tenant(ObNewRow *&row) // tablet_change_checkpoint_scn cur_row_.cells_[i].set_uint64(!ls_info.tablet_change_checkpoint_scn_.is_valid() ? 0 : ls_info.tablet_change_checkpoint_scn_.get_val_for_inner_table_field()); break; - default: case OB_APP_MIN_COLUMN_ID + 15: // transfer_scn cur_row_.cells_[i].set_uint64(!ls_info.transfer_scn_.is_valid() ? 0 : ls_info.transfer_scn_.get_val_for_inner_table_field()); break; + case OB_APP_MIN_COLUMN_ID + 16: + // tx blocked + cur_row_.cells_[i].set_int(ls_info.tx_blocked_); + break; + default: ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "invalid col_id", K(ret), K(col_id)); break; diff --git a/src/share/inner_table/ob_inner_table_schema.12251_12300.cpp b/src/share/inner_table/ob_inner_table_schema.12251_12300.cpp index fb32cebbdf..9b038764ef 100644 --- a/src/share/inner_table/ob_inner_table_schema.12251_12300.cpp +++ b/src/share/inner_table/ob_inner_table_schema.12251_12300.cpp @@ -9521,6 +9521,21 @@ int ObInnerTableSchema::all_virtual_ls_info_schema(ObTableSchema &table_schema) false, //is_nullable false); //is_autoincrement } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("tx_blocked", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObUInt64Type, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(uint64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } if (OB_SUCC(ret)) { table_schema.get_part_option().set_part_num(1); table_schema.set_part_level(PARTITION_LEVEL_ONE); diff --git a/src/share/inner_table/ob_inner_table_schema_def.py b/src/share/inner_table/ob_inner_table_schema_def.py index 237a40ff29..bf82832c10 100644 --- a/src/share/inner_table/ob_inner_table_schema_def.py +++ b/src/share/inner_table/ob_inner_table_schema_def.py @@ -10833,6 +10833,7 @@ def_table_schema( ('rebuild_seq', 'int'), ('tablet_change_checkpoint_scn', 'uint'), ('transfer_scn', 'uint'), + ('tx_blocked', 'uint'), ], partition_columns = ['svr_ip', 'svr_port'], vtable_route_policy = 'distributed', diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index c11a9454e9..2011208995 100755 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -1218,6 +1218,7 @@ int ObLS::get_ls_info(ObLSVTInfo &ls_info) bool is_log_sync = false; bool is_need_rebuild = false; ObMigrationStatus migrate_status; + bool tx_blocked = false; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ls is not inited", K(ret)); @@ -1228,6 +1229,8 @@ int ObLS::get_ls_info(ObLSVTInfo &ls_info) LOG_WARN("get ls need rebuild info failed", K(ret), KPC(this)); } else if (OB_FAIL(ls_meta_.get_migration_status(migrate_status))) { LOG_WARN("get ls migrate status failed", K(ret), KPC(this)); + } else if (OB_FAIL(ls_tx_svr_.check_tx_blocked(tx_blocked))) { + LOG_WARN("check tx ls state error", K(ret),KPC(this)); } else { ls_info.ls_id_ = ls_meta_.ls_id_; ls_info.replica_type_ = ls_meta_.get_replica_type(); @@ -1241,6 +1244,7 @@ int ObLS::get_ls_info(ObLSVTInfo &ls_info) ls_info.rebuild_seq_ = ls_meta_.get_rebuild_seq(); ls_info.tablet_change_checkpoint_scn_ = ls_meta_.get_tablet_change_checkpoint_scn(); ls_info.transfer_scn_ = ls_meta_.get_transfer_scn(); + ls_info.tx_blocked_ = tx_blocked; } return ret; } diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index 367133e2e6..c4d3f64cbc 100755 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -93,6 +93,7 @@ struct ObLSVTInfo int64_t rebuild_seq_; share::SCN tablet_change_checkpoint_scn_; share::SCN transfer_scn_; + bool tx_blocked_; }; // 诊断虚表统计信息 diff --git a/src/storage/ls/ob_ls_tx_service.cpp b/src/storage/ls/ob_ls_tx_service.cpp index 400a962419..3aaa0f7eee 100644 --- a/src/storage/ls/ob_ls_tx_service.cpp +++ b/src/storage/ls/ob_ls_tx_service.cpp @@ -743,6 +743,18 @@ int ObLSTxService::set_max_replay_commit_version(share::SCN commit_version) } return ret; } + +int ObLSTxService::check_tx_blocked(bool &tx_blocked) const +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(mgr_)) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "not init", KR(ret), K_(ls_id)); + } else { + tx_blocked = mgr_->is_blocked(); + } + return ret; +} } // transaction } // oceanbase diff --git a/src/storage/ls/ob_ls_tx_service.h b/src/storage/ls/ob_ls_tx_service.h index af0c2977a2..adb1be47b0 100644 --- a/src/storage/ls/ob_ls_tx_service.h +++ b/src/storage/ls/ob_ls_tx_service.h @@ -174,6 +174,9 @@ public: int check_in_leader_serving_state(bool& bool_ret); int set_max_replay_commit_version(share::SCN commit_version); transaction::ObTxRetainCtxMgr *get_retain_ctx_mgr(); + + // check tx ls blocked + int check_tx_blocked(bool &tx_blocked) const; private: void reset_(); diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.h b/src/storage/tx/ob_trans_ctx_mgr_v4.h index 5c8d41a9f1..1955a49c15 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.h +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.h @@ -511,6 +511,9 @@ public: // check is master bool is_master() const { return is_master_(); } + // check is blocked + bool is_blocked() const { return is_blocked_(); } + // Switch the prev_aggre_log_ts and aggre_log_ts during dump starts int refresh_aggre_rec_scn(); diff --git a/src/storage/tx/wrs/ob_black_list.cpp b/src/storage/tx/wrs/ob_black_list.cpp index 8f809fa6fd..51f1da9c6a 100644 --- a/src/storage/tx/wrs/ob_black_list.cpp +++ b/src/storage/tx/wrs/ob_black_list.cpp @@ -252,7 +252,7 @@ int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result) } else { max_stale_time = get_tenant_max_stale_time_(bl_key.get_tenant_id()); int64_t max_stale_time_ns = max_stale_time * 1000; - if (gts_scn.get_val_for_gts() > ls_info.weak_read_scn_ + max_stale_time_ns) { + if (gts_scn.get_val_for_gts() > ls_info.weak_read_scn_ + max_stale_time_ns || ls_info.tx_blocked_) { // scn is out-of-time,add this log stream into blacklist if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) { TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info)); @@ -295,7 +295,9 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey int64_t ls_role = -1; int64_t weak_read_scn = 0; int64_t migrate_status_int = -1; + int64_t tx_blocked = 0; common::number::ObNumber weak_read_number; + common::number::ObNumber tx_blocked_number; (void)GET_COL_IGNORE_NULL(result.get_varchar, "svr_ip", ip); (void)GET_COL_IGNORE_NULL(result.get_int, "svr_port", port); @@ -304,6 +306,7 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey (void)GET_COL_IGNORE_NULL(result.get_int, "role", ls_role); (void)GET_COL_IGNORE_NULL(result.get_number, "weak_read_scn", weak_read_number); (void)GET_COL_IGNORE_NULL(result.get_int, "migrate_status", migrate_status_int); + (void)GET_COL_IGNORE_NULL(result.get_number, "tx_blocked", tx_blocked_number); ObLSID ls_id(id); common::ObAddr server; @@ -314,10 +317,12 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey TRANS_LOG(WARN, "invalid server address", K(ip), K(port)); } else if (OB_FAIL(weak_read_number.cast_to_int64(weak_read_scn))) { TRANS_LOG(WARN, "failed to cast int", K(ret), K(weak_read_number)); + } else if (OB_FAIL(tx_blocked_number.cast_to_int64(tx_blocked))) { + TRANS_LOG(WARN, "failed to cast int", K(ret), K(tx_blocked_number)); } else if (OB_FAIL(bl_key.init(server, tenant_id, ls_id))) { TRANS_LOG(WARN, "bl_key init fail", K(server), K(tenant_id), K(ls_id)); - } else if (OB_FAIL(ls_info.init(ls_role, weak_read_scn, migrate_status))) { - TRANS_LOG(WARN, "ls_info init fail", K(ls_role), K(weak_read_scn), K(migrate_status)); + } else if (OB_FAIL(ls_info.init(ls_role, weak_read_scn, migrate_status, tx_blocked))) { + TRANS_LOG(WARN, "ls_info init fail", K(ls_role), K(weak_read_scn), K(migrate_status), K(tx_blocked)); } return ret; diff --git a/src/storage/tx/wrs/ob_black_list.h b/src/storage/tx/wrs/ob_black_list.h index daf57de6d2..d8dde16e34 100644 --- a/src/storage/tx/wrs/ob_black_list.h +++ b/src/storage/tx/wrs/ob_black_list.h @@ -32,7 +32,7 @@ // 查询 __all_virtual_ls_info 的语句,设置了2s超时时间 #define BLACK_LIST_SELECT_LS_INFO_STMT \ "select /*+query_timeout(2000000)*/ a.svr_ip, a.svr_port, a.tenant_id, a.ls_id, a.role, \ - nvl(b.weak_read_scn, 1) as weak_read_scn, nvl(b.migrate_status, 0) as migrate_status \ + nvl(b.weak_read_scn, 1) as weak_read_scn, nvl(b.migrate_status, 0) as migrate_status, nvl(b.tx_blocked, 0) as tx_blocked \ from oceanbase.__all_virtual_ls_meta_table a left join oceanbase.__all_virtual_ls_info b \ on a.svr_ip = b.svr_ip and a.svr_port = b.svr_port and a.tenant_id = b.tenant_id and a.ls_id = b.ls_id;" @@ -127,9 +127,10 @@ public: ObLsInfo() : ls_state_(-1), weak_read_scn_(0), - migrate_status_(OB_MIGRATE_STATUS_MAX) + migrate_status_(OB_MIGRATE_STATUS_MAX), + tx_blocked_(false) {} - int init(const int64_t ls_state, int64_t weak_read_scn, ObMigrateStatus migrate_status) + int init(const int64_t ls_state, int64_t weak_read_scn, ObMigrateStatus migrate_status, bool tx_blocked) { int ret = OB_SUCCESS; if (OB_MIGRATE_STATUS_MAX == migrate_status) { @@ -138,6 +139,7 @@ public: ls_state_ = ls_state; weak_read_scn_ = weak_read_scn; migrate_status_ = migrate_status; + tx_blocked_ = tx_blocked; } return ret; } @@ -154,6 +156,8 @@ public: int64_t weak_read_scn_; // 迁移状态,正在迁移的日志流一定不可读 ObMigrateStatus migrate_status_; + // transaction ls blocked + bool tx_blocked_; }; // blacklist value diff --git a/unittest/storage/tx/test_ob_black_list.cpp b/unittest/storage/tx/test_ob_black_list.cpp index 05461606fc..22250c022c 100644 --- a/unittest/storage/tx/test_ob_black_list.cpp +++ b/unittest/storage/tx/test_ob_black_list.cpp @@ -56,7 +56,7 @@ TEST_F(TestObBlackList, black_list_init_invalid) // update ObLsInfo ls_info; uint64_t curr_time = static_cast(ObTimeUtility::current_time()); - EXPECT_EQ(OB_SUCCESS, ls_info.init(FOLLOWER, curr_time, OB_MIGRATE_STATUS_NONE)); + EXPECT_EQ(OB_SUCCESS, ls_info.init(0, curr_time, OB_MIGRATE_STATUS_NONE, 0)); EXPECT_EQ(OB_SUCCESS, bl_service.ls_bl_mgr_.update(key, ls_info)); // check @@ -241,7 +241,7 @@ TEST_F(TestObBlackList, black_list_parallel_2) int rand = 0; ObBLKey key; ObLsInfo ls_info; - EXPECT_EQ(OB_SUCCESS, ls_info.init(FOLLOWER, 1, OB_MIGRATE_STATUS_NONE)); + EXPECT_EQ(OB_SUCCESS, ls_info.init(0, 1, OB_MIGRATE_STATUS_NONE, 0)); std::srand((unsigned)std::time(NULL)); for (int i = 1; i <= loop_cnt; i++) { rand = std::rand() % 1000 + 1;