fix balcklist bug which make RTO > 10s
This commit is contained in:
parent
73c41fb9d8
commit
2bb9937cf7
@ -30,7 +30,7 @@ int ObBLService::init()
|
||||
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
TRANS_LOG(ERROR, "BLService init twice", K(*this));
|
||||
TRANS_LOG(ERROR, "BLService init twice", KR(ret), K(*this));
|
||||
} else if (OB_FAIL(ls_bl_mgr_.init())) {
|
||||
TRANS_LOG(ERROR, "ls_bl_mgr_ init fail", KR(ret));
|
||||
} else if (OB_FAIL(ObThreadPool::init())) {
|
||||
@ -110,7 +110,7 @@ int ObBLService::add(const ObBLKey &bl_key)
|
||||
}
|
||||
default: {
|
||||
ret = OB_UNKNOWN_OBJ;
|
||||
TRANS_LOG(ERROR, "unknown key type", K(bl_key));
|
||||
TRANS_LOG(ERROR, "unknown key type", KR(ret), K(bl_key));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -124,7 +124,7 @@ void ObBLService::remove(const ObBLKey &bl_key)
|
||||
} else {
|
||||
switch(bl_key.get_type()) {
|
||||
case BLTYPE_LS: {
|
||||
ls_bl_mgr_.remove(bl_key);
|
||||
(void)ls_bl_mgr_.remove(bl_key);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
@ -134,7 +134,7 @@ void ObBLService::remove(const ObBLKey &bl_key)
|
||||
}
|
||||
}
|
||||
|
||||
int ObBLService::check_in_black_list(const ObBLKey &bl_key, bool &in_black_list) const
|
||||
int ObBLService::check_in_black_list(const ObBLKey &bl_key, bool &in_black_list)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -146,12 +146,27 @@ int ObBLService::check_in_black_list(const ObBLKey &bl_key, bool &in_black_list)
|
||||
case BLTYPE_LS: {
|
||||
if (OB_FAIL(ls_bl_mgr_.check_in_black_list(bl_key, in_black_list))) {
|
||||
TRANS_LOG(WARN, "ls_bl_mgr_ check failed", KR(ret), K(bl_key));
|
||||
} else if (in_black_list) {
|
||||
ObAddr leader;
|
||||
if (GCTX.is_inited() &&
|
||||
OB_FAIL(GCTX.location_service_->nonblock_get_leader(GCONF.cluster_id,
|
||||
bl_key.get_tenant_id(),
|
||||
bl_key.get_ls_id(),
|
||||
leader))) {
|
||||
TRANS_LOG(WARN, "nonblock get ls location failed", KR(ret), K(bl_key));
|
||||
// don't return error, and assume it's not leader
|
||||
ret = OB_SUCCESS;
|
||||
} else if(leader == bl_key.get_server() && check_need_skip_leader_(bl_key.get_tenant_id())) {
|
||||
in_black_list = false;
|
||||
// remove primary leader from blacklist
|
||||
(void)ls_bl_mgr_.remove(bl_key);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_UNKNOWN_OBJ;
|
||||
TRANS_LOG(ERROR, "unknown blacklist key type", K(bl_key));
|
||||
TRANS_LOG(ERROR, "unknown blacklist key type", KR(ret), K(bl_key));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -200,7 +215,7 @@ void ObBLService::do_thread_task_(const int64_t begin_tstamp,
|
||||
TRANS_LOG(WARN, "sql_proxy is null", KR(ret));
|
||||
} else if (OB_FAIL(sql.assign_fmt(BLACK_LIST_SELECT_LS_INFO_STMT))) {
|
||||
TRANS_LOG(WARN, "fail to append sql", KR(ret));
|
||||
} else if (OB_FAIL(sql_proxy->read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
|
||||
} else if (OB_FAIL(sql_proxy->read(res, OB_SYS_TENANT_ID, sql.ptr(), nullptr, INNER_SQL_QUERY_TIMEOUT))) {
|
||||
TRANS_LOG(WARN, "fail to execute sql", KR(ret), K(sql));
|
||||
} else if (NULL == (result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -246,14 +261,17 @@ int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result)
|
||||
ObBLKey bl_key;
|
||||
ObLsInfo ls_info;
|
||||
SCN gts_scn;
|
||||
bool need_remove = false;
|
||||
if (OB_FAIL(get_info_from_result_(*result, bl_key, ls_info))) {
|
||||
TRANS_LOG(WARN, "get_info_from_result_ fail ", KR(ret), K(result));
|
||||
} else if (ls_info.is_leader() && check_need_skip_leader_(bl_key.get_tenant_id())) {
|
||||
// cannot add leader into blacklist
|
||||
need_remove = true;
|
||||
} else if (ls_info.weak_read_scn_ == 0) {
|
||||
// log stream is initializing, should't be put into blacklist
|
||||
need_remove = true;
|
||||
} else if (OB_FAIL(OB_TS_MGR.get_gts(bl_key.get_tenant_id(), NULL, gts_scn))) {
|
||||
TRANS_LOG(WARN, "get gts scn error", K(ret), K(bl_key));
|
||||
TRANS_LOG(WARN, "get gts scn error", KR(ret), K(bl_key));
|
||||
} else {
|
||||
max_stale_time = get_tenant_max_stale_time_(bl_key.get_tenant_id());
|
||||
max_stale_time_ns = max_stale_time * 1000;
|
||||
@ -262,19 +280,26 @@ int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result)
|
||||
if (gts_scn.get_val_for_gts() > ls_info.weak_read_scn_ + max_stale_time_ns
|
||||
|| ls_info.tx_blocked_
|
||||
|| ls_info.migrate_status_ != ObMigrationStatus::OB_MIGRATION_STATUS_NONE) {
|
||||
// scn is out-of-time,add this log stream into blacklist
|
||||
// scn is out-of-time,add this log stream into blacklist (insert or update)
|
||||
if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) {
|
||||
TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info));
|
||||
TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", KR(ret), K(bl_key), K(ls_info));
|
||||
}
|
||||
TRANS_LOG(INFO, "ls_bl_mgr_ add finish ", KR(ret), KTIME(gts), KTIME(weak_read_scn), K(max_stale_time), K(bl_key), K(ls_info));
|
||||
} else if (gts_scn.get_val_for_gts() + BLACK_LIST_WHITEWASH_INTERVAL_NS < ls_info.weak_read_scn_ + max_stale_time_ns) {
|
||||
// scn is new enough,remove this log stream in the blacklist
|
||||
ls_bl_mgr_.remove(bl_key);
|
||||
need_remove = true;
|
||||
} else {
|
||||
// do nothing
|
||||
// only update, don't insert
|
||||
bool only_update = true;
|
||||
if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info, only_update))) {
|
||||
TRANS_LOG(WARN, "ls_bl_mgr_ update fail ", KR(ret), K(bl_key), K(ls_info));
|
||||
}
|
||||
TRANS_LOG(INFO, "ls_bl_mgr_ do nothing ", KR(ret), KTIME(gts), KTIME(weak_read_scn), K(max_stale_time), K(bl_key), K(ls_info));
|
||||
}
|
||||
}
|
||||
if (need_remove) {
|
||||
(void)ls_bl_mgr_.remove(bl_key);
|
||||
}
|
||||
}
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
@ -339,20 +364,20 @@ int ObBLService::get_info_from_result_(sqlclient::ObMySQLResult &result, ObBLKey
|
||||
|
||||
if (false == server.set_ip_addr(ip, static_cast<uint32_t>(port))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "invalid server address", K(ip), K(port));
|
||||
TRANS_LOG(WARN, "invalid server address", KR(ret), K(ip), K(port));
|
||||
} else if (OB_FAIL(weak_read_number.cast_to_int64(weak_read_scn))) {
|
||||
TRANS_LOG(WARN, "failed to cast int", K(ret), K(weak_read_number));
|
||||
TRANS_LOG(WARN, "failed to cast int", KR(ret), K(weak_read_number));
|
||||
} else if (OB_FAIL(bl_key.init(server, tenant_id, ls_id))) {
|
||||
TRANS_LOG(WARN, "bl_key init fail", K(server), K(tenant_id), K(ls_id));
|
||||
TRANS_LOG(WARN, "bl_key init fail", KR(ret), K(server), K(tenant_id), K(ls_id));
|
||||
} else if (OB_FAIL(ls_info.init(ls_role, weak_read_scn, migrate_status, (1 == tx_blocked) ? true : false))) {
|
||||
TRANS_LOG(WARN, "ls_info init fail", K(ls_role), K(weak_read_scn), K(migrate_status), K(tx_blocked));
|
||||
TRANS_LOG(WARN, "ls_info init fail", KR(ret), K(ls_role), K(weak_read_scn), K(migrate_status), K(tx_blocked));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (1 == tx_blocked) {
|
||||
TRANS_LOG(INFO, "current ls is blocked, need to put blacklist", K(bl_key), K(ls_info));
|
||||
} else if (0 != tx_blocked) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected tx blocked", K(ret), K(tx_blocked), K(bl_key), K(ls_info));
|
||||
TRANS_LOG(ERROR, "unexpected tx blocked", KR(ret), K(tx_blocked), K(bl_key), K(ls_info));
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
|
@ -21,13 +21,15 @@
|
||||
// 定期更新黑名单的时间间隔(us)
|
||||
#define BLACK_LIST_REFRESH_INTERVAL 3000000 // 3s
|
||||
// 判断时间戳是否赶上/落后的缓冲时间(ns),避免阈值附近的日志流反复加入/移出黑名单
|
||||
#define BLACK_LIST_WHITEWASH_INTERVAL_NS 1000000000 // 1s
|
||||
#define BLACK_LIST_WHITEWASH_INTERVAL_NS 1000000000L // 1s
|
||||
// 黑名单信息打印时间间隔(us)
|
||||
#define BLACK_LIST_PRINT_INTERVAL 5000000 // 5s
|
||||
// 清理超时对象的时间间隔(us),这些对象不会出现在 SQLResult 中,比如切换server之后旧server上的日志流
|
||||
#define BLACK_LIST_CLEAN_UP_INTERVAL 5000000 // 5s
|
||||
// 最大连续失败次数,连续刷新黑名单失败 达到 该次数则清空黑名单
|
||||
#define BLACK_LIST_MAX_FAIL_COUNT 3
|
||||
// 执行内部sql的超时时间,内部sql的hint不生效,需要在接口中指定超时时间
|
||||
#define INNER_SQL_QUERY_TIMEOUT 2000000L // 2s
|
||||
|
||||
// 查询 __all_virtual_ls_info 的语句,设置了2s超时时间
|
||||
// select /*+query_timeout(2000000)*/ a.svr_ip, a.svr_port, a.tenant_id, a.ls_id, a.role, nvl(b.weak_read_scn, 1) as weak_read_scn, nvl(b.migrate_status, 0) as migrate_status, nvl(b.tx_blocked, 0) as tx_blocked from oceanbase.__all_virtual_ls_meta_table a left join oceanbase.__all_virtual_ls_info b on a.svr_ip = b.svr_ip and a.svr_port = b.svr_port and a.tenant_id = b.tenant_id and a.ls_id = b.ls_id;
|
||||
@ -107,6 +109,7 @@ public:
|
||||
const ObLSID &get_ls_id() const { return ls_id_; }
|
||||
// TODO: different keys return different types, default return BLTYPE_UNKNOWN
|
||||
BLType get_type() const { return BLType::BLTYPE_LS; }
|
||||
const ObAddr &get_server() const { return server_; }
|
||||
bool is_valid() const {
|
||||
return server_.is_valid() && OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid();
|
||||
}
|
||||
@ -177,7 +180,7 @@ public:
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int init(const ObBLKey &key, const ObLsInfo ls_info)
|
||||
int init(const ObBLKey &key, const ObLsInfo &ls_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!key.is_valid() || !ls_info.is_valid()) {
|
||||
@ -285,7 +288,7 @@ public:
|
||||
return ret;
|
||||
}
|
||||
// update or create
|
||||
int update(const BLKey &bl_key, const ObLsInfo &ls_info)
|
||||
int update(const BLKey &bl_key, const ObLsInfo &ls_info, bool only_update = false)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
BLValue *value = NULL;
|
||||
@ -297,7 +300,9 @@ public:
|
||||
TRANS_LOG(WARN, "map get error", KR(ret), K(bl_key), K(ls_info));
|
||||
} else if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
// key不存在,创建value并插入map
|
||||
if (OB_FAIL(map_.create(bl_key, value))) {
|
||||
if (only_update) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(map_.create(bl_key, value))) {
|
||||
// 可能前面get时还没有这个key,但是在create之前别的线程把这个key插入map了
|
||||
TRANS_LOG(WARN, "map create error", KR(ret), K(bl_key), K(ls_info));
|
||||
} else {
|
||||
@ -317,11 +322,19 @@ public:
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
void remove(const BLKey &bl_key)
|
||||
int remove(const BLKey &bl_key)
|
||||
{
|
||||
if (bl_key.is_valid()) {
|
||||
map_.del(bl_key);
|
||||
int ret = OB_SUCCESS;
|
||||
if (!bl_key.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
} else if(OB_FAIL(map_.del(bl_key))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
TRANS_LOG(ERROR, "map remove fail ", KR(ret), K(bl_key));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int check_in_black_list(const BLKey &bl_key, bool &in_black_list) const
|
||||
{
|
||||
@ -362,7 +375,7 @@ public:
|
||||
|
||||
int add(const ObBLKey &bl_key);
|
||||
void remove(const ObBLKey &bl_key);
|
||||
int check_in_black_list(const ObBLKey &bl_key, bool &in_black_list) const;
|
||||
int check_in_black_list(const ObBLKey &bl_key, bool &in_black_list);
|
||||
|
||||
void run1();
|
||||
TO_STRING_KV(K_(is_inited), K_(is_running));
|
||||
|
Loading…
x
Reference in New Issue
Block a user