set ddl success with start log ts
This commit is contained in:

committed by
wangzelin.wzl

parent
56076b3e8c
commit
18e8a1e6aa
@ -1808,9 +1808,11 @@ int ObTenantDagScheduler::add_dag(
|
||||
} else {
|
||||
ObThreadCondGuard guard(scheduler_sync_);
|
||||
if (OB_FAIL(inner_add_dag(emergency, check_size_overflow, dag))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("failed to inner add dag", K(ret), KPC(dag));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -412,7 +412,7 @@ int ObDDLTableMergeTask::process()
|
||||
LOG_WARN("fail to submit tablet update task", K(ret), K(tenant_id), K(merge_param_));
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->set_commit_success())) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->set_commit_success(merge_param_.start_log_ts_))) {
|
||||
LOG_WARN("set is commit success failed", K(ret));
|
||||
} else {
|
||||
LOG_INFO("commit ddl sstable succ", K(ddl_param), K(merge_param_));
|
||||
|
@ -74,7 +74,7 @@ int ObDDLRedoLogReplayer::replay_start(const ObDDLStartLog &log, const int64_t l
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle, true/*try_create*/))) {
|
||||
LOG_WARN("create ddl kv mgr failed", K(ret), K(table_key));
|
||||
LOG_WARN("create ddl kv mgr failed", K(ret), K(table_key), K(log), K(log_ts));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(table_key,
|
||||
log_ts,
|
||||
log.get_cluster_version(),
|
||||
@ -82,9 +82,9 @@ int ObDDLRedoLogReplayer::replay_start(const ObDDLStartLog &log, const int64_t l
|
||||
0/*checkpoint_log_ts*/))) {
|
||||
LOG_WARN("start ddl log failed", K(ret), K(log), K(log_ts));
|
||||
} else {
|
||||
LOG_INFO("succeed to replay ddl start log", K(ret), K(log));
|
||||
LOG_INFO("succeed to replay ddl start log", K(ret), K(log), K(log_ts));
|
||||
}
|
||||
LOG_INFO("finish replay ddl start log", K(ret), K(need_replay), K(log));
|
||||
LOG_INFO("finish replay ddl start log", K(ret), K(need_replay), K(log), K(log_ts));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -171,21 +171,21 @@ int ObDDLRedoLogReplayer::replay_prepare(const ObDDLPrepareLog &log, const int64
|
||||
LOG_WARN("invalid arguments", K(ret), K(log));
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, log.get_start_log_ts(), log_ts, need_replay, tablet_handle))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(log_ts));
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(log_ts), K(log), K(log_ts));
|
||||
}
|
||||
} else if (!need_replay) {
|
||||
// do nothing
|
||||
} else if (OB_UNLIKELY(!tablet_handle.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle));
|
||||
LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle), K(log), K(log_ts));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
|
||||
LOG_WARN("get ddl kv mgr failed", K(ret));
|
||||
LOG_WARN("get ddl kv mgr failed", K(ret), K(log), K(log_ts));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(log.get_start_log_ts(), log_ts))) {
|
||||
LOG_WARN("replay ddl prepare log failed", K(ret), K(log));
|
||||
LOG_WARN("replay ddl prepare log failed", K(ret), K(log), K(log_ts));
|
||||
} else {
|
||||
LOG_INFO("replay ddl prepare log success", K(ret), K(table_key), K(log_ts));
|
||||
LOG_INFO("replay ddl prepare log success", K(ret), K(log), K(log_ts));
|
||||
}
|
||||
LOG_INFO("finish replay ddl prepare log", K(ret), K(need_replay), K(log));
|
||||
LOG_INFO("finish replay ddl prepare log", K(ret), K(need_replay), K(log), K(log_ts));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -209,24 +209,26 @@ int ObDDLRedoLogReplayer::replay_commit(const ObDDLCommitLog &log, const int64_t
|
||||
LOG_WARN("invalid arguments", K(ret), K(log));
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, log.get_start_log_ts(), log_ts, need_replay, tablet_handle))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(log_ts));
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(log_ts), K(log));
|
||||
}
|
||||
} else if (!need_replay) {
|
||||
// do nothing
|
||||
} else if (OB_UNLIKELY(!tablet_handle.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle));
|
||||
LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle), K(log), K(log_ts));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
|
||||
LOG_WARN("get ddl kv mgr failed", K(ret));
|
||||
LOG_WARN("get ddl kv mgr failed", K(ret), K(log), K(log_ts));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(log.get_start_log_ts(), log.get_prepare_log_ts(), true/*is_replay*/))) {
|
||||
LOG_WARN("replay ddl commit log failed", K(ret));
|
||||
if (OB_TABLET_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS; // allow tablet not exist
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("replay ddl commit log failed", K(ret), K(log), K(log_ts));
|
||||
}
|
||||
if (OB_TABLET_NOT_EXIST == ret || OB_TASK_EXPIRED == ret) {
|
||||
ret = OB_SUCCESS; // exit when tablet not exist or task expired
|
||||
}
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->unregister_from_tablet(log.get_start_log_ts(), ddl_kv_mgr_handle))) {
|
||||
LOG_WARN("unregister ddl kv mgr from tablet failed", K(ret));
|
||||
LOG_WARN("unregister ddl kv mgr from tablet failed", K(ret), K(log), K(log_ts));
|
||||
}
|
||||
LOG_INFO("finish replay ddl commit log", K(ret), K(need_replay), K(log));
|
||||
LOG_INFO("finish replay ddl commit log", K(ret), K(need_replay), K(log), K(log_ts));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ using namespace oceanbase::share;
|
||||
using namespace oceanbase::storage;
|
||||
|
||||
ObTabletDDLKvMgr::ObTabletDDLKvMgr()
|
||||
: is_inited_(false), is_commit_success_(false), ls_id_(), tablet_id_(), table_key_(), cluster_version_(0),
|
||||
: is_inited_(false), success_start_log_ts_(0), ls_id_(), tablet_id_(), table_key_(), cluster_version_(0),
|
||||
start_log_ts_(0), max_freeze_log_ts_(0),
|
||||
table_id_(0), execution_id_(0), head_(0), tail_(0), lock_(), ref_cnt_(0)
|
||||
{
|
||||
@ -61,7 +61,7 @@ void ObTabletDDLKvMgr::destroy()
|
||||
max_freeze_log_ts_ = 0;
|
||||
table_id_ = 0;
|
||||
execution_id_ = 0;
|
||||
is_commit_success_ = false;
|
||||
success_start_log_ts_ = 0;
|
||||
is_inited_ = false;
|
||||
}
|
||||
|
||||
@ -153,7 +153,7 @@ int ObTabletDDLKvMgr::ddl_prepare(const int64_t start_log_ts,
|
||||
LOG_WARN("ddl not started", K(ret));
|
||||
} else if (start_log_ts < start_log_ts_) {
|
||||
ret = OB_TASK_EXPIRED;
|
||||
LOG_INFO("skip ddl prepare log", K(start_log_ts), K(start_log_ts_), K(ls_id_), K(tablet_id_));
|
||||
LOG_INFO("skip ddl prepare log", K(start_log_ts), K(*this));
|
||||
} else if (OB_FAIL(freeze_ddl_kv(prepare_log_ts))) {
|
||||
LOG_WARN("freeze ddl kv failed", K(ret), K(prepare_log_ts));
|
||||
} else {
|
||||
@ -178,12 +178,13 @@ int ObTabletDDLKvMgr::ddl_prepare(const int64_t start_log_ts,
|
||||
ret = OB_SUCCESS;
|
||||
ob_usleep(10L * 1000L);
|
||||
if (REACH_TIME_INTERVAL(10L * 1000L * 1000L)) {
|
||||
LOG_INFO("retry schedule ddl commit task", K(ls_id_), K(table_key_), K(prepare_log_ts), K(max_freeze_log_ts_),
|
||||
LOG_INFO("retry schedule ddl commit task",
|
||||
K(start_log_ts), K(prepare_log_ts), K(table_id), K(ddl_task_id), K(*this),
|
||||
"wait_elpased_s", (ObTimeUtility::fast_current_time() - start_ts) / 1000000L);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("schedule ddl commit task success", K(ls_id_), K(tablet_id_), K(prepare_log_ts));
|
||||
LOG_INFO("schedule ddl commit task success", K(start_log_ts), K(prepare_log_ts), K(table_id), K(ddl_task_id), K(*this));
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -198,11 +199,11 @@ int ObTabletDDLKvMgr::ddl_commit(const int64_t start_log_ts, const int64_t prepa
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret), K(is_inited_));
|
||||
} else if (is_commit_success_) {
|
||||
LOG_INFO("ddl commit already succeed", K(ls_id_), K(tablet_id_), K(table_id_));
|
||||
} else if (is_commit_success()) {
|
||||
FLOG_INFO("ddl commit already succeed", K(start_log_ts), K(prepare_log_ts), K(*this));
|
||||
} else if (start_log_ts < start_log_ts_) {
|
||||
ret = OB_TASK_EXPIRED;
|
||||
LOG_INFO("skip ddl commit log", K(start_log_ts), K(start_log_ts_), K(ls_id_), K(tablet_id_));
|
||||
LOG_INFO("skip ddl commit log", K(start_log_ts), K(prepare_log_ts), K(*this));
|
||||
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
LOG_WARN("failed to get log stream", K(ret), K(ls_id_));
|
||||
} else {
|
||||
@ -231,7 +232,7 @@ int ObTabletDDLKvMgr::ddl_commit(const int64_t start_log_ts, const int64_t prepa
|
||||
ret = OB_SUCCESS; // think as succcess for replay
|
||||
} else {
|
||||
if (REACH_TIME_INTERVAL(10L * 1000L * 1000L)) {
|
||||
LOG_INFO("replay ddl commit", K(ret), K(ls_id_), K(tablet_id_), K(start_log_ts_), K(start_log_ts), K(prepare_log_ts), K(max_freeze_log_ts_));
|
||||
LOG_INFO("replay ddl commit", K(ret), K(start_log_ts), K(prepare_log_ts), K(*this));
|
||||
}
|
||||
ret = OB_EAGAIN; // retry by replay service
|
||||
}
|
||||
@ -276,18 +277,36 @@ int ObTabletDDLKvMgr::wait_ddl_commit(const int64_t start_log_ts, const int64_t
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLKvMgr::set_commit_success()
|
||||
int ObTabletDDLKvMgr::set_commit_success(const int64_t start_log_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret), K(is_inited_));
|
||||
} else if (OB_UNLIKELY(start_log_ts <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(start_log_ts));
|
||||
} else {
|
||||
is_commit_success_ = true;
|
||||
TCWLockGuard guard(lock_);
|
||||
if (start_log_ts < start_log_ts_) {
|
||||
ret = OB_TASK_EXPIRED;
|
||||
LOG_WARN("ddl task expired", K(ret), K(start_log_ts), K(*this));
|
||||
} else if (OB_UNLIKELY(start_log_ts > start_log_ts_)) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("sucess start log ts too large", K(ret), K(start_log_ts), K(*this));
|
||||
} else {
|
||||
success_start_log_ts_ = start_log_ts;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObTabletDDLKvMgr::is_commit_success() const
|
||||
{
|
||||
TCRLockGuard guard(lock_);
|
||||
return success_start_log_ts_ > 0 && success_start_log_ts_ == start_log_ts_;
|
||||
}
|
||||
|
||||
int ObTabletDDLKvMgr::cleanup()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -317,7 +336,7 @@ void ObTabletDDLKvMgr::cleanup_unlock()
|
||||
max_freeze_log_ts_ = 0;
|
||||
table_id_ = 0;
|
||||
execution_id_ = 0;
|
||||
is_commit_success_ = false;
|
||||
success_start_log_ts_ = 0;
|
||||
}
|
||||
|
||||
bool ObTabletDDLKvMgr::is_execution_id_older(const int64_t execution_id)
|
||||
|
@ -51,8 +51,8 @@ public:
|
||||
int get_ddl_kv_min_log_ts(int64_t &min_log_ts); // for calculate rec_log_ts of ls
|
||||
int64_t get_start_log_ts() const { return start_log_ts_; }
|
||||
bool is_started() const { return 0 != start_log_ts_; }
|
||||
int set_commit_success();
|
||||
bool is_commit_success() const { return is_commit_success_; }
|
||||
int set_commit_success(const int64_t start_log_ts);
|
||||
bool is_commit_success() const;
|
||||
common::ObTabletID get_tablet_id() const { return tablet_id_; }
|
||||
int cleanup();
|
||||
int online();
|
||||
@ -63,7 +63,7 @@ public:
|
||||
OB_INLINE int64_t dec_ref() { return ATOMIC_SAF(&ref_cnt_, 1 /* just sub 1 */); }
|
||||
OB_INLINE int64_t get_ref() const { return ATOMIC_LOAD(&ref_cnt_); }
|
||||
OB_INLINE void reset() { destroy(); }
|
||||
TO_STRING_KV(K_(is_inited), K_(is_commit_success), K_(ls_id), K_(tablet_id), K_(table_key),
|
||||
TO_STRING_KV(K_(is_inited), K_(success_start_log_ts), K_(ls_id), K_(tablet_id), K_(table_key),
|
||||
K_(cluster_version), K_(start_log_ts), K_(max_freeze_log_ts),
|
||||
K_(table_id), K_(execution_id), K_(ddl_task_id), K_(head), K_(tail), K_(ref_cnt));
|
||||
|
||||
@ -80,7 +80,7 @@ private:
|
||||
private:
|
||||
static const int64_t MAX_DDL_KV_CNT_IN_STORAGE = 64;
|
||||
bool is_inited_;
|
||||
bool is_commit_success_;
|
||||
int64_t success_start_log_ts_;
|
||||
share::ObLSID ls_id_;
|
||||
common::ObTabletID tablet_id_;
|
||||
ObITable::TableKey table_key_;
|
||||
|
Reference in New Issue
Block a user