fix write ddl clog do_sleep() function unable to determine if the current node is a leader bug
This commit is contained in:
@ -144,6 +144,30 @@ int ObDDLCtrlSpeedItem::cal_limit(const int64_t bytes, int64_t &next_available_t
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObDDLCtrlSpeedItem::check_cur_node_is_leader(bool &is_leader)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
is_leader = true;
|
||||||
|
ObRole role = INVALID_ROLE;
|
||||||
|
ObLS *ls = nullptr;
|
||||||
|
ObLSHandle handle;
|
||||||
|
ObLSService *ls_svr = MTL(ObLSService*);
|
||||||
|
if (OB_ISNULL(ls_svr)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("ls_svr is nullptr", K(ret));
|
||||||
|
} else if (OB_FAIL(ls_svr->get_ls(ls_id_, handle, ObLSGetMod::STORAGE_MOD))) {
|
||||||
|
LOG_WARN("fail to get ls handle", K(ret), K_(ls_id));
|
||||||
|
} else if (OB_ISNULL(ls = handle.get_ls())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("ls is nullptr", K(ret));
|
||||||
|
} else if (OB_FAIL(ls->get_ls_role(role))) {
|
||||||
|
LOG_WARN("get ls role failed", K(ret));
|
||||||
|
} else if (role != ObRole::LEADER) {
|
||||||
|
is_leader = false;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObDDLCtrlSpeedItem::do_sleep(
|
int ObDDLCtrlSpeedItem::do_sleep(
|
||||||
const int64_t next_available_ts,
|
const int64_t next_available_ts,
|
||||||
const int64_t task_id,
|
const int64_t task_id,
|
||||||
@ -154,6 +178,7 @@ int ObDDLCtrlSpeedItem::do_sleep(
|
|||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
real_sleep_us = 0;
|
real_sleep_us = 0;
|
||||||
bool is_exist = true;
|
bool is_exist = true;
|
||||||
|
|
||||||
bool is_need_stop_write = false;
|
bool is_need_stop_write = false;
|
||||||
if (OB_UNLIKELY(!is_inited_)) {
|
if (OB_UNLIKELY(!is_inited_)) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -210,9 +235,18 @@ int ObDDLCtrlSpeedItem::check_need_stop_write(ObDDLKvMgrHandle &ddl_kv_mgr_handl
|
|||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("not init", K(ret));
|
LOG_WARN("not init", K(ret));
|
||||||
} else {
|
} else {
|
||||||
int64_t ddl_kv_count = ddl_kv_mgr_handle.get_obj()->get_count();
|
bool is_leader = true;
|
||||||
is_need_stop_write = (ddl_kv_count >= ObTabletDDLKvMgr::MAX_DDL_KV_CNT_IN_STORAGE - 1);
|
if (OB_FAIL(check_cur_node_is_leader(is_leader))) {
|
||||||
is_need_stop_write = (is_need_stop_write || need_stop_write_);
|
LOG_WARN("check cur node is leader failed", K(ret));
|
||||||
|
} else {
|
||||||
|
if (is_leader) {
|
||||||
|
int64_t ddl_kv_count = ddl_kv_mgr_handle.get_obj()->get_count();
|
||||||
|
is_need_stop_write = (ddl_kv_count >= ObTabletDDLKvMgr::MAX_DDL_KV_CNT_IN_STORAGE - 1);
|
||||||
|
is_need_stop_write = (is_need_stop_write || need_stop_write_);
|
||||||
|
} else {
|
||||||
|
is_need_stop_write = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -69,6 +69,7 @@ public:
|
|||||||
TO_STRING_KV(K_(is_inited), K_(ls_id), K_(next_available_write_ts),
|
TO_STRING_KV(K_(is_inited), K_(ls_id), K_(next_available_write_ts),
|
||||||
K_(write_speed), K_(disk_used_stop_write_threshold), K_(need_stop_write), K_(ref_cnt));
|
K_(write_speed), K_(disk_used_stop_write_threshold), K_(need_stop_write), K_(ref_cnt));
|
||||||
private:
|
private:
|
||||||
|
int check_cur_node_is_leader(bool &is_leader);
|
||||||
int cal_limit(const int64_t bytes, int64_t &next_available_ts);
|
int cal_limit(const int64_t bytes, int64_t &next_available_ts);
|
||||||
int do_sleep(const int64_t next_available_ts,
|
int do_sleep(const int64_t next_available_ts,
|
||||||
const int64_t task_id,
|
const int64_t task_id,
|
||||||
|
Reference in New Issue
Block a user