fix ddl kv freeze and release speed inconsistent bug
This commit is contained in:
@ -146,23 +146,29 @@ int ObDDLCtrlSpeedItem::cal_limit(const int64_t bytes, int64_t &next_available_t
|
||||
int ObDDLCtrlSpeedItem::do_sleep(
|
||||
const int64_t next_available_ts,
|
||||
const int64_t task_id,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
int64_t &real_sleep_us)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
real_sleep_us = 0;
|
||||
bool is_exist = true;
|
||||
bool is_need_stop_write = false;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (next_available_ts <= 0 || task_id == 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument.", K(ret), K(next_available_ts), K(task_id));
|
||||
} else if (OB_UNLIKELY(need_stop_write_)) /*clog disk used exceeds threshold*/ {
|
||||
} else if (OB_TMP_FAIL(check_need_stop_write(ddl_kv_mgr_handle, is_need_stop_write))) {
|
||||
LOG_WARN("fail to check need stop write", K(tmp_ret), K(ddl_kv_mgr_handle));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (is_need_stop_write) /*clog disk used exceeds threshold*/ {
|
||||
int64_t loop_cnt = 0;
|
||||
ObMySQLProxy *sql_proxy = GCTX.sql_proxy_;
|
||||
while (OB_SUCC(ret) && need_stop_write_) {
|
||||
while (OB_SUCC(ret) && is_need_stop_write) {
|
||||
// TODO YIREN (FIXME-20221017), exit when task is canceled, etc.
|
||||
int64_t tmp_ret = OB_SUCCESS;
|
||||
ob_usleep(SLEEP_INTERVAL);
|
||||
if (0 == loop_cnt % 100) {
|
||||
if (OB_TMP_FAIL(rootserver::ObDDLTaskRecordOperator::check_task_id_exist(*sql_proxy, task_id, is_exist))) {
|
||||
@ -178,7 +184,11 @@ int ObDDLCtrlSpeedItem::do_sleep(
|
||||
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
|
||||
ObTaskController::get().allow_next_syslog();
|
||||
FLOG_INFO("stop write ddl clog", K(ret), K(ls_id_),
|
||||
K(write_speed_), K(need_stop_write_), K(ref_cnt_), K(disk_used_stop_write_threshold_));
|
||||
K(write_speed_), K(need_stop_write_), K(ref_cnt_),
|
||||
K(disk_used_stop_write_threshold_));
|
||||
}
|
||||
if (OB_TMP_FAIL(check_need_stop_write(ddl_kv_mgr_handle, is_need_stop_write))) {
|
||||
LOG_WARN("fail to check need stop write", K(tmp_ret), K(ddl_kv_mgr_handle));
|
||||
}
|
||||
loop_cnt++;
|
||||
}
|
||||
@ -190,10 +200,27 @@ int ObDDLCtrlSpeedItem::do_sleep(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLCtrlSpeedItem::check_need_stop_write(ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
bool &is_need_stop_write)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_need_stop_write = false;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else {
|
||||
int64_t ddl_kv_count = ddl_kv_mgr_handle.get_obj()->get_count();
|
||||
is_need_stop_write = (ddl_kv_count >= ObTabletDDLKvMgr::MAX_DDL_KV_CNT_IN_STORAGE - 1);
|
||||
is_need_stop_write = (is_need_stop_write || need_stop_write_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// calculate the sleep time for the input bytes, sleep.
|
||||
int ObDDLCtrlSpeedItem::limit_and_sleep(
|
||||
const int64_t bytes,
|
||||
const int64_t task_id,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
int64_t &real_sleep_us)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -217,7 +244,7 @@ int ObDDLCtrlSpeedItem::limit_and_sleep(
|
||||
INT64_MAX,
|
||||
&transmit_sleep_us))) {
|
||||
LOG_WARN("fail to limit out and sleep", K(ret), K(bytes), K(transmit_sleep_us));
|
||||
} else if (OB_FAIL(do_sleep(next_available_ts, task_id, real_sleep_us))) {
|
||||
} else if (OB_FAIL(do_sleep(next_available_ts, task_id, ddl_kv_mgr_handle, real_sleep_us))) {
|
||||
LOG_WARN("fail to sleep", K(ret), K(next_available_ts), K(real_sleep_us));
|
||||
} else {/* do nothing. */}
|
||||
return ret;
|
||||
@ -307,12 +334,12 @@ int ObDDLCtrlSpeedHandle::init()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLCtrlSpeedHandle::limit_and_sleep(
|
||||
const uint64_t tenant_id,
|
||||
const share::ObLSID &ls_id,
|
||||
const int64_t bytes,
|
||||
const int64_t task_id,
|
||||
int64_t &real_sleep_us)
|
||||
int ObDDLCtrlSpeedHandle::limit_and_sleep(const uint64_t tenant_id,
|
||||
const share::ObLSID &ls_id,
|
||||
const int64_t bytes,
|
||||
const int64_t task_id,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
int64_t &real_sleep_us)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SpeedHandleKey speed_handle_key;
|
||||
@ -339,6 +366,7 @@ int ObDDLCtrlSpeedHandle::limit_and_sleep(
|
||||
LOG_WARN("unexpected err, ctrl speed item is nullptr", K(ret), K(speed_handle_key));
|
||||
} else if (OB_FAIL(speed_handle_item->limit_and_sleep(bytes,
|
||||
task_id,
|
||||
ddl_kv_mgr_handle,
|
||||
real_sleep_us))) {
|
||||
LOG_WARN("fail to limit and sleep", K(ret), K(bytes), K(task_id), K(real_sleep_us));
|
||||
}
|
||||
@ -623,13 +651,14 @@ int ObDDLRedoLogWriter::write(
|
||||
uint32_t lock_tid = 0;
|
||||
int64_t real_sleep_us = 0;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (!log.is_valid() || nullptr == log_handler || !ls_id.is_valid() || OB_INVALID_TENANT_ID == tenant_id || nullptr == buffer || 0 == task_id) {
|
||||
if (!log.is_valid() || nullptr == log_handler || !ls_id.is_valid()
|
||||
|| OB_INVALID_TENANT_ID == tenant_id
|
||||
|| nullptr == buffer || 0 == task_id) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(log), K(ls_id), K(tenant_id), KP(buffer));
|
||||
} else if (OB_TMP_FAIL(ObDDLCtrlSpeedHandle::get_instance().limit_and_sleep(tenant_id, ls_id, buffer_size, task_id, real_sleep_us))) {
|
||||
} else if (OB_TMP_FAIL(ObDDLCtrlSpeedHandle::get_instance().limit_and_sleep(tenant_id, ls_id, buffer_size, task_id, ddl_kv_mgr_handle, real_sleep_us))) {
|
||||
LOG_WARN("fail to limit and sleep", K(tmp_ret), K(tenant_id), K(task_id), K(ls_id), K(buffer_size), K(real_sleep_us));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->rdlock(ObDDLRedoLogHandle::DDL_REDO_LOG_TIMEOUT, lock_tid))) {
|
||||
LOG_WARN("failed to rdlock", K(ret));
|
||||
@ -986,26 +1015,38 @@ void ObDDLCommitLogHandle::reset()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int ObDDLMacroBlockRedoWriter::write_macro_redo(ObTabletHandle &tablet_handle,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
const ObDDLMacroBlockRedoInfo &redo_info,
|
||||
const share::ObLSID &ls_id,
|
||||
const int64_t task_id,
|
||||
ObLogHandler *log_handler,
|
||||
logservice::ObLogHandler *log_handler,
|
||||
const blocksstable::MacroBlockId ¯o_block_id,
|
||||
char *buffer,
|
||||
ObDDLRedoLogHandle &handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!redo_info.is_valid() || nullptr == log_handler || nullptr == buffer || 0 == task_id)) {
|
||||
if (OB_UNLIKELY(!redo_info.is_valid()
|
||||
|| nullptr == log_handler
|
||||
|| nullptr == buffer
|
||||
|| 0 == task_id
|
||||
|| !ls_id.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(redo_info), KP(log_handler), KP(buffer), K(task_id));
|
||||
LOG_WARN("invalid arguments", K(ret), K(redo_info), KP(log_handler), KP(buffer), K(task_id), K(ls_id));
|
||||
} else {
|
||||
ObDDLRedoLog log;
|
||||
int64_t tmp_ret = OB_SUCCESS;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
if (OB_FAIL(log.init(redo_info))) {
|
||||
LOG_WARN("fail to init DDLRedoLog", K(ret), K(redo_info));
|
||||
} else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write(tablet_handle, ddl_kv_mgr_handle, log, tenant_id, task_id, ls_id, log_handler, macro_block_id, buffer, handle))) {
|
||||
} else if (OB_FAIL(ObDDLRedoLogWriter::get_instance().write(tablet_handle,
|
||||
ddl_kv_mgr_handle,
|
||||
log, tenant_id,
|
||||
task_id, ls_id,
|
||||
log_handler,
|
||||
macro_block_id, buffer,
|
||||
handle))) {
|
||||
LOG_WARN("fail to write ddl redo log item", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
@ -57,8 +57,10 @@ public:
|
||||
int refresh();
|
||||
int limit_and_sleep(const int64_t bytes,
|
||||
const int64_t task_id,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
int64_t &real_sleep_us);
|
||||
|
||||
int check_need_stop_write(ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
bool &is_need_stop_write);
|
||||
// for ref_cnt_
|
||||
void inc_ref() { ATOMIC_INC(&ref_cnt_); }
|
||||
int64_t dec_ref() { return ATOMIC_SAF(&ref_cnt_, 1); }
|
||||
@ -70,6 +72,7 @@ private:
|
||||
int cal_limit(const int64_t bytes, int64_t &next_available_ts);
|
||||
int do_sleep(const int64_t next_available_ts,
|
||||
const int64_t task_id,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
int64_t &real_sleep_us);
|
||||
private:
|
||||
static const int64_t MIN_WRITE_SPEED = 50L;
|
||||
@ -93,6 +96,7 @@ public:
|
||||
const share::ObLSID &ls_id,
|
||||
const int64_t bytes,
|
||||
const int64_t task_id,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
int64_t &real_sleep_us);
|
||||
|
||||
private:
|
||||
@ -252,7 +256,7 @@ class ObDDLMacroBlockRedoWriter final
|
||||
public:
|
||||
static int write_macro_redo(ObTabletHandle &tablet_handle,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
const blocksstable::ObDDLMacroBlockRedoInfo &redo_info,
|
||||
const ObDDLMacroBlockRedoInfo &redo_info,
|
||||
const share::ObLSID &ls_id,
|
||||
const int64_t task_id,
|
||||
logservice::ObLogHandler *log_handler,
|
||||
@ -266,6 +270,8 @@ public:
|
||||
private:
|
||||
ObDDLMacroBlockRedoWriter() = default;
|
||||
~ObDDLMacroBlockRedoWriter() = default;
|
||||
private:
|
||||
static const int64_t SLEEP_INTERVAL = 1 * 1000; // 1ms
|
||||
};
|
||||
|
||||
// This class should be the entrance to write redo log and commit log
|
||||
|
||||
@ -878,8 +878,17 @@ int ObTabletDDLKvMgr::get_freezed_ddl_kv(const SCN &freeze_scn, ObTableHandleV2
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int64_t ObTabletDDLKvMgr::get_count()
|
||||
{
|
||||
int64_t ddl_kv_count = 0;
|
||||
{
|
||||
ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
|
||||
ddl_kv_count = tail_ - head_;
|
||||
}
|
||||
return ddl_kv_count;
|
||||
}
|
||||
|
||||
int64_t ObTabletDDLKvMgr::get_count() const
|
||||
int64_t ObTabletDDLKvMgr::get_count_nolock() const
|
||||
{
|
||||
return tail_ - head_;
|
||||
}
|
||||
@ -893,7 +902,7 @@ int ObTabletDDLKvMgr::get_active_ddl_kv_impl(ObTableHandleV2 &kv_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
kv_handle.reset();
|
||||
if (get_count() == 0) {
|
||||
if (get_count_nolock() == 0) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
} else {
|
||||
ObTableHandleV2 &tail_kv_handle = ddl_kv_handles_[get_idx(tail_ - 1)];
|
||||
@ -961,7 +970,7 @@ void ObTabletDDLKvMgr::try_get_ddl_kv_unlock(const SCN &scn, ObTableHandleV2 &kv
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
kv_handle.reset();
|
||||
if (get_count() > 0) {
|
||||
if (get_count_nolock() > 0) {
|
||||
for (int64_t i = tail_ - 1; OB_SUCC(ret) && i >= head_ && !kv_handle.is_valid(); --i) {
|
||||
ObTableHandleV2 &tmp_kv_handle = ddl_kv_handles_[get_idx(i)];
|
||||
ObDDLKV *tmp_kv = static_cast<ObDDLKV *>(tmp_kv_handle.get_table());
|
||||
@ -984,7 +993,7 @@ int ObTabletDDLKvMgr::freeze_ddl_kv(const SCN &freeze_scn)
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret));
|
||||
} else if (0 == get_count()) {
|
||||
} else if (0 == get_count_nolock()) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(get_active_ddl_kv_impl(kv_handle))) {
|
||||
LOG_WARN("fail to get active ddl kv", K(ret));
|
||||
@ -1142,7 +1151,7 @@ int ObTabletDDLKvMgr::check_has_effective_ddl_kv(bool &has_ddl_kv)
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret));
|
||||
} else {
|
||||
has_ddl_kv = 0 != get_count();
|
||||
has_ddl_kv = 0 != get_count_nolock();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1160,7 +1169,7 @@ int ObTabletDDLKvMgr::alloc_ddl_kv(ObTableHandleV2 &kv_handle)
|
||||
} else if (OB_UNLIKELY(!is_started())) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("ddl kv manager not started", K(ret));
|
||||
} else if (get_count() == MAX_DDL_KV_CNT_IN_STORAGE) {
|
||||
} else if (get_count_nolock() == MAX_DDL_KV_CNT_IN_STORAGE) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, too much ddl kv count", K(ret));
|
||||
} else if (OB_FAIL(t3m->acquire_ddl_kv(tmp_kv_handle))) {
|
||||
@ -1181,7 +1190,7 @@ int ObTabletDDLKvMgr::alloc_ddl_kv(ObTableHandleV2 &kv_handle)
|
||||
tail_++;
|
||||
ddl_kv_handles_[idx] = tmp_kv_handle;
|
||||
kv_handle = tmp_kv_handle;
|
||||
FLOG_INFO("succeed to add ddl kv", K(ls_id_), K(tablet_id_), K(head_), K(tail_), "ddl_kv_cnt", get_count(), KP(kv));
|
||||
FLOG_INFO("succeed to add ddl kv", K(ls_id_), K(tablet_id_), K(head_), K(tail_), "ddl_kv_cnt", get_count_nolock(), KP(kv));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -72,6 +72,7 @@ public:
|
||||
int wrlock(const int64_t timeout_us, uint32_t &lock_tid);
|
||||
void unlock(const uint32_t lock_tid);
|
||||
int update_tablet(const share::SCN &start_scn, const int64_t snapshot_version, const share::SCN &ddl_checkpoint_scn);
|
||||
int64_t get_count();
|
||||
OB_INLINE void inc_ref() { ATOMIC_INC(&ref_cnt_); }
|
||||
OB_INLINE int64_t dec_ref() { return ATOMIC_SAF(&ref_cnt_, 1 /* just sub 1 */); }
|
||||
OB_INLINE int64_t get_ref() const { return ATOMIC_LOAD(&ref_cnt_); }
|
||||
@ -85,12 +86,12 @@ public:
|
||||
|
||||
private:
|
||||
int64_t get_idx(const int64_t pos) const;
|
||||
int64_t get_count() const;
|
||||
int alloc_ddl_kv(ObTableHandleV2 &kv_handle);
|
||||
void free_ddl_kv(const int64_t idx);
|
||||
int get_active_ddl_kv_impl(ObTableHandleV2 &kv_handle);
|
||||
void try_get_ddl_kv_unlock(const share::SCN &scn, ObTableHandleV2 &kv_handle);
|
||||
int get_ddl_kvs_unlock(const bool frozen_only, ObTablesHandleArray &kv_handle_array);
|
||||
int64_t get_count_nolock() const;
|
||||
int update_ddl_major_sstable();
|
||||
int create_empty_ddl_sstable(ObTableHandleV2 &table_handle);
|
||||
void cleanup_unlock();
|
||||
|
||||
Reference in New Issue
Block a user