Fix ddl kv mgr deadlock with get_rec_scn
This commit is contained in:
@ -125,7 +125,7 @@ int ObTabletDDLKvMgr::ddl_start_nolock(const ObITable::TableKey &table_key,
|
||||
table_key_ = table_key;
|
||||
data_format_version_ = data_format_version;
|
||||
execution_id_ = execution_id;
|
||||
start_scn_ = start_scn;
|
||||
start_scn_.atomic_store(start_scn);
|
||||
max_freeze_scn_ = SCN::max(start_scn, checkpoint_scn);
|
||||
}
|
||||
}
|
||||
@ -158,10 +158,10 @@ int ObTabletDDLKvMgr::ddl_start(ObTablet &tablet,
|
||||
// save variables under lock
|
||||
saved_start_scn = start_scn_;
|
||||
saved_snapshot_version = table_key_.get_snapshot_version();
|
||||
commit_scn_ = get_commit_scn_nolock(tablet.get_tablet_meta());
|
||||
commit_scn_.atomic_store(get_commit_scn(tablet.get_tablet_meta()));
|
||||
if (checkpoint_scn.is_valid_and_not_min()) {
|
||||
if (tablet.get_tablet_meta().table_store_flag_.with_major_sstable() && tablet.get_tablet_meta().ddl_commit_scn_.is_valid_and_not_min()) {
|
||||
success_start_scn_ = tablet.get_tablet_meta().ddl_start_scn_;
|
||||
success_start_scn_.atomic_store(tablet.get_tablet_meta().ddl_start_scn_);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -185,7 +185,7 @@ int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, const SCN &commit_scn)
|
||||
} else if (!is_started()) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
LOG_WARN("ddl not started", K(ret));
|
||||
} else if (start_scn < start_scn_) {
|
||||
} else if (start_scn < get_start_scn()) {
|
||||
ret = OB_TASK_EXPIRED;
|
||||
LOG_INFO("skip ddl commit log", K(start_scn), K(*this));
|
||||
} else if (OB_FAIL(set_commit_scn(commit_scn))) {
|
||||
@ -236,7 +236,7 @@ int ObTabletDDLKvMgr::schedule_ddl_merge_task(const SCN &start_scn, const SCN &c
|
||||
LOG_WARN("not init", K(ret), K(is_inited_));
|
||||
} else if (is_commit_success()) {
|
||||
FLOG_INFO("ddl commit already succeed", K(start_scn), K(commit_scn), K(*this));
|
||||
} else if (start_scn < start_scn_) {
|
||||
} else if (start_scn < get_start_scn()) {
|
||||
ret = OB_TASK_EXPIRED;
|
||||
LOG_INFO("skip ddl commit log", K(start_scn), K(commit_scn), K(*this));
|
||||
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
@ -275,7 +275,7 @@ int ObTabletDDLKvMgr::wait_ddl_merge_success(const SCN &start_scn, const SCN &co
|
||||
} else if (!is_started()) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
LOG_WARN("ddl not started", K(ret));
|
||||
} else if (start_scn > start_scn_) {
|
||||
} else if (start_scn > get_start_scn()) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("start log ts not match", K(ret), K(start_scn), K(start_scn_), K(ls_id_), K(tablet_id_));
|
||||
} else {
|
||||
@ -305,17 +305,21 @@ int ObTabletDDLKvMgr::wait_ddl_merge_success(const SCN &start_scn, const SCN &co
|
||||
int ObTabletDDLKvMgr::get_ddl_major_merge_param(const ObTabletMeta &tablet_meta, ObDDLTableMergeDagParam ¶m)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLatchRGuard state_guard(state_lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
|
||||
ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
|
||||
if (can_schedule_major_compaction_nolock(tablet_meta)) {
|
||||
uint32_t lock_tid = 0;
|
||||
if (OB_FAIL(rdlock(TRY_LOCK_TIMEOUT, lock_tid))) {
|
||||
LOG_WARN("failed to rdlock", K(ret), KPC(this));
|
||||
} else if (can_schedule_major_compaction_nolock(tablet_meta)) {
|
||||
param.ls_id_ = ls_id_;
|
||||
param.tablet_id_ = tablet_id_;
|
||||
param.rec_scn_ = get_commit_scn_nolock(tablet_meta);
|
||||
param.rec_scn_ = get_commit_scn(tablet_meta);
|
||||
param.is_commit_ = true;
|
||||
param.start_scn_ = start_scn_;
|
||||
} else {
|
||||
ret = OB_EAGAIN;
|
||||
}
|
||||
if (0 != lock_tid) {
|
||||
unlock(lock_tid);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -342,10 +346,10 @@ int ObTabletDDLKvMgr::get_rec_scn(SCN &rec_scn)
|
||||
// rec scn of ddl start log
|
||||
if (OB_SUCC(ret)) {
|
||||
const ObTabletMeta &tablet_meta = tablet_handle.get_obj()->get_tablet_meta();
|
||||
ObLatchRGuard state_guard(state_lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
|
||||
if (start_scn_.is_valid_and_not_min() && start_scn_ != tablet_meta.ddl_start_scn_) {
|
||||
const SCN start_scn = get_start_scn();
|
||||
if (start_scn.is_valid_and_not_min() && start_scn != tablet_meta.ddl_start_scn_) {
|
||||
// has a latest start log and not flushed to tablet meta, keep it
|
||||
rec_scn = SCN::min(rec_scn, start_scn_);
|
||||
rec_scn = SCN::min(rec_scn, start_scn);
|
||||
}
|
||||
}
|
||||
|
||||
@ -403,12 +407,12 @@ int ObTabletDDLKvMgr::set_commit_scn(const SCN &commit_scn)
|
||||
LOG_WARN("get tablet handle failed", K(ret), K(ls_id_), K(tablet_id_));
|
||||
} else {
|
||||
ObLatchWGuard state_guard(state_lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
|
||||
const SCN old_commit_scn = get_commit_scn_nolock(tablet_handle.get_obj()->get_tablet_meta());
|
||||
const SCN old_commit_scn = get_commit_scn(tablet_handle.get_obj()->get_tablet_meta());
|
||||
if (old_commit_scn.is_valid_and_not_min() && old_commit_scn != commit_scn) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("already committed by others", K(ret), K(commit_scn), K(*this));
|
||||
} else {
|
||||
commit_scn_ = commit_scn;
|
||||
commit_scn_.atomic_store(commit_scn);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -416,18 +420,13 @@ int ObTabletDDLKvMgr::set_commit_scn(const SCN &commit_scn)
|
||||
|
||||
SCN ObTabletDDLKvMgr::get_commit_scn(const ObTabletMeta &tablet_meta)
|
||||
{
|
||||
ObLatchRGuard state_guard(state_lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
|
||||
return get_commit_scn_nolock(tablet_meta);
|
||||
}
|
||||
|
||||
SCN ObTabletDDLKvMgr::get_commit_scn_nolock(const ObTabletMeta &tablet_meta)
|
||||
{
|
||||
SCN mgr_commit_scn = commit_scn_.atomic_load();
|
||||
SCN commit_scn = SCN::min_scn();
|
||||
if (tablet_meta.ddl_commit_scn_.is_valid_and_not_min() || commit_scn_.is_valid_and_not_min()) {
|
||||
if (tablet_meta.ddl_commit_scn_.is_valid_and_not_min() || mgr_commit_scn.is_valid_and_not_min()) {
|
||||
if (tablet_meta.ddl_commit_scn_.is_valid_and_not_min()) {
|
||||
commit_scn = tablet_meta.ddl_commit_scn_;
|
||||
} else {
|
||||
commit_scn = commit_scn_;
|
||||
commit_scn = mgr_commit_scn;
|
||||
}
|
||||
} else {
|
||||
commit_scn = SCN::min_scn();
|
||||
@ -460,7 +459,7 @@ int ObTabletDDLKvMgr::set_commit_success(const SCN &start_scn)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
success_start_scn_ = start_scn;
|
||||
success_start_scn_.atomic_store(start_scn);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -468,24 +467,20 @@ int ObTabletDDLKvMgr::set_commit_success(const SCN &start_scn)
|
||||
|
||||
bool ObTabletDDLKvMgr::is_commit_success()
|
||||
{
|
||||
ObLatchRGuard state_guard(state_lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
|
||||
return is_commit_success_unlock();
|
||||
}
|
||||
|
||||
bool ObTabletDDLKvMgr::is_commit_success_unlock() const
|
||||
{
|
||||
return success_start_scn_ > SCN::min_scn() && success_start_scn_ == start_scn_;
|
||||
const SCN success_start_scn = success_start_scn_.atomic_load();
|
||||
const SCN start_scn = start_scn_.atomic_load();
|
||||
return success_start_scn > SCN::min_scn() && success_start_scn == start_scn;
|
||||
}
|
||||
|
||||
void ObTabletDDLKvMgr::reset_commit_success()
|
||||
{
|
||||
ObLatchWGuard state_guard(state_lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
|
||||
success_start_scn_.set_min();
|
||||
success_start_scn_.atomic_store(SCN::min_scn());
|
||||
}
|
||||
|
||||
bool ObTabletDDLKvMgr::can_schedule_major_compaction_nolock(const ObTabletMeta &tablet_meta)
|
||||
{
|
||||
return get_commit_scn_nolock(tablet_meta).is_valid_and_not_min() && !is_commit_success_unlock();
|
||||
return get_commit_scn(tablet_meta).is_valid_and_not_min() && !is_commit_success();
|
||||
}
|
||||
|
||||
int ObTabletDDLKvMgr::cleanup()
|
||||
@ -516,11 +511,11 @@ void ObTabletDDLKvMgr::cleanup_unlock()
|
||||
}
|
||||
table_key_.reset();
|
||||
data_format_version_ = 0;
|
||||
start_scn_.set_min();
|
||||
commit_scn_.set_min();
|
||||
start_scn_.atomic_store(SCN::min_scn());
|
||||
commit_scn_.atomic_store(SCN::min_scn());
|
||||
max_freeze_scn_.set_min();
|
||||
execution_id_ = -1;
|
||||
success_start_scn_.set_min();
|
||||
success_start_scn_.atomic_store(SCN::min_scn());
|
||||
}
|
||||
|
||||
bool ObTabletDDLKvMgr::is_execution_id_older(const int64_t execution_id)
|
||||
@ -528,33 +523,6 @@ bool ObTabletDDLKvMgr::is_execution_id_older(const int64_t execution_id)
|
||||
return execution_id < execution_id_;
|
||||
}
|
||||
|
||||
int ObTabletDDLKvMgr::set_execution_id_nolock(const int64_t execution_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (execution_id < execution_id_) {
|
||||
ret = OB_TASK_EXPIRED;
|
||||
LOG_WARN("ddl task expired", K(ret), K(execution_id), K(*this));
|
||||
} else {
|
||||
execution_id_ = execution_id;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLKvMgr::set_execution_id(const int64_t execution_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret), K(is_inited_));
|
||||
} else {
|
||||
ObLatchWGuard state_guard(state_lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
|
||||
if (OB_FAIL(set_execution_id_nolock(execution_id))) {
|
||||
LOG_WARN("failed to set execution id", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLKvMgr::online()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -809,7 +777,7 @@ int ObTabletDDLKvMgr::update_ddl_major_sstable()
|
||||
storage_schema,
|
||||
ls_handle.get_ls()->get_rebuild_seq());
|
||||
param.ddl_info_.keep_old_ddl_sstable_ = true;
|
||||
param.ddl_info_.ddl_commit_scn_ = commit_scn_;
|
||||
param.ddl_info_.ddl_commit_scn_ = get_commit_scn(tablet_handle.get_obj()->get_tablet_meta());
|
||||
if (OB_FAIL(ls_handle.get_ls()->update_tablet_table_store(tablet_id_, param, new_tablet_handle))) {
|
||||
LOG_WARN("failed to update tablet table store", K(ret), K(ls_id_), K(tablet_id_), K(param));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user