Fix unexpected ddl start after ddl commit log
This commit is contained in:
@ -71,16 +71,24 @@ ObDDLStartClogCb::ObDDLStartClogCb()
|
||||
{
|
||||
}
|
||||
|
||||
int ObDDLStartClogCb::init(const uint32_t lock_tid, ObDDLKvMgrHandle &ddl_kv_mgr_handle)
|
||||
int ObDDLStartClogCb::init(const ObITable::TableKey &table_key,
|
||||
const int64_t data_format_version,
|
||||
const int64_t execution_id,
|
||||
const uint32_t lock_tid,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("init twice", K(ret));
|
||||
} else if (OB_UNLIKELY(0 == lock_tid || !ddl_kv_mgr_handle.is_valid())) {
|
||||
} else if (OB_UNLIKELY(!table_key.is_valid() || execution_id < 0 || data_format_version < 0
|
||||
|| 0 == lock_tid || !ddl_kv_mgr_handle.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret));
|
||||
} else {
|
||||
table_key_ = table_key;
|
||||
data_format_version_ = data_format_version;
|
||||
execution_id_ = execution_id;
|
||||
lock_tid_ = lock_tid;
|
||||
ddl_kv_mgr_handle_ = ddl_kv_mgr_handle;
|
||||
is_inited_ = true;
|
||||
@ -91,12 +99,15 @@ int ObDDLStartClogCb::init(const uint32_t lock_tid, ObDDLKvMgrHandle &ddl_kv_mgr
|
||||
int ObDDLStartClogCb::on_success()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const SCN &start_scn = __get_scn();
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else {
|
||||
ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_);
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle_.get_obj()->ddl_start_nolock(table_key_, start_scn, data_format_version_,
|
||||
execution_id_, SCN::min_scn()/*checkpoint_scn*/))) {
|
||||
LOG_WARN("failed to start ddl in cb", K(ret), K(table_key_), K(start_scn), K(execution_id_));
|
||||
}
|
||||
ddl_kv_mgr_handle_.get_obj()->unlock(lock_tid_);
|
||||
status_.set_ret_code(ret);
|
||||
status_.set_state(STATE_SUCCESS);
|
||||
try_release();
|
||||
|
@ -80,7 +80,7 @@ class ObDDLStartClogCb : public logservice::AppendCb
|
||||
public:
|
||||
ObDDLStartClogCb();
|
||||
virtual ~ObDDLStartClogCb() = default;
|
||||
int init(const uint32_t lock_tid, ObDDLKvMgrHandle &ddl_kv_mgr_handle);
|
||||
int init(const ObITable::TableKey &table_key, const int64_t data_format_version, const int64_t execution_id, const uint32_t lock_tid, ObDDLKvMgrHandle &ddl_kv_mgr_handle);
|
||||
virtual int on_success() override;
|
||||
virtual int on_failure() override;
|
||||
inline bool is_success() const { return status_.is_success(); }
|
||||
@ -88,10 +88,13 @@ public:
|
||||
inline bool is_finished() const { return status_.is_finished(); }
|
||||
int get_ret_code() const { return status_.get_ret_code(); }
|
||||
void try_release();
|
||||
TO_STRING_KV(K(is_inited_), K(status_), K_(lock_tid));
|
||||
TO_STRING_KV(K(is_inited_), K(status_), K_(table_key), K_(data_format_version), K_(execution_id), K_(lock_tid));
|
||||
private:
|
||||
bool is_inited_;
|
||||
ObDDLClogCbStatus status_;
|
||||
ObITable::TableKey table_key_;
|
||||
int64_t data_format_version_;
|
||||
int64_t execution_id_;
|
||||
uint32_t lock_tid_;
|
||||
ObDDLKvMgrHandle ddl_kv_mgr_handle_;
|
||||
};
|
||||
|
@ -704,7 +704,7 @@ int ObDDLRedoLogWriter::write_ddl_start_log(ObTabletHandle &tablet_handle,
|
||||
} else if (OB_ISNULL(cb = op_alloc(ObDDLStartClogCb))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc memory", K(ret));
|
||||
} else if (OB_FAIL(cb->init(lock_tid, ddl_kv_mgr_handle))) {
|
||||
} else if (OB_FAIL(cb->init(log.get_table_key(), log.get_data_format_version(), log.get_execution_id(), lock_tid, ddl_kv_mgr_handle))) {
|
||||
LOG_WARN("failed to init cb", K(ret));
|
||||
} else if (OB_FAIL(base_header.serialize(buffer, buffer_size, pos))) {
|
||||
LOG_WARN("failed to serialize log base header", K(ret));
|
||||
@ -747,15 +747,13 @@ int ObDDLRedoLogWriter::write_ddl_start_log(ObTabletHandle &tablet_handle,
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
const int64_t saved_snapshot_version = log.get_table_key().get_snapshot_version();
|
||||
start_scn = scn;
|
||||
if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(*tablet_handle.get_obj(),
|
||||
log.get_table_key(),
|
||||
start_scn,
|
||||
log.get_data_format_version(),
|
||||
log.get_execution_id(),
|
||||
SCN::min_scn()/*checkpoint_scn*/))) {
|
||||
LOG_WARN("start ddl log failed", K(ret), K(start_scn), K(log));
|
||||
// remove ddl sstable if exists and flush ddl start log ts and snapshot version into tablet meta
|
||||
if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->update_tablet(start_scn, saved_snapshot_version, start_scn))) {
|
||||
LOG_WARN("clean up ddl sstable failed", K(ret), K(log));
|
||||
}
|
||||
FLOG_INFO("start ddl kv mgr finished", K(ret), K(start_scn), K(log));
|
||||
}
|
||||
tmp_cb->try_release(); // release the memory no matter succ or not
|
||||
}
|
||||
|
@ -87,22 +87,14 @@ int ObTabletDDLKvMgr::init(const share::ObLSID &ls_id, const common::ObTabletID
|
||||
return ret;
|
||||
}
|
||||
|
||||
// ddl start from log
|
||||
// cleanup ddl sstable
|
||||
// ddl start from checkpoint
|
||||
// keep ddl sstable table
|
||||
|
||||
int ObTabletDDLKvMgr::ddl_start(ObTablet &tablet,
|
||||
const ObITable::TableKey &table_key,
|
||||
const SCN &start_scn,
|
||||
const int64_t data_format_version,
|
||||
const int64_t execution_id,
|
||||
const SCN &checkpoint_scn)
|
||||
int ObTabletDDLKvMgr::ddl_start_nolock(const ObITable::TableKey &table_key,
|
||||
const SCN &start_scn,
|
||||
const int64_t data_format_version,
|
||||
const int64_t execution_id,
|
||||
const SCN &checkpoint_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_brand_new = false;
|
||||
SCN saved_start_scn;
|
||||
int64_t saved_snapshot_version = 0;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret), K(is_inited_));
|
||||
@ -114,7 +106,6 @@ int ObTabletDDLKvMgr::ddl_start(ObTablet &tablet,
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("tablet id not same", K(ret), K(table_key), K(tablet_id_));
|
||||
} else {
|
||||
ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
|
||||
if (start_scn_.is_valid_and_not_min()) {
|
||||
if (execution_id >= execution_id_ && start_scn >= start_scn_) {
|
||||
LOG_INFO("execution id changed, need cleanup", K(ls_id_), K(tablet_id_), K(execution_id_), K(execution_id), K(start_scn_), K(start_scn));
|
||||
@ -137,7 +128,33 @@ int ObTabletDDLKvMgr::ddl_start(ObTablet &tablet,
|
||||
start_scn_ = start_scn;
|
||||
max_freeze_scn_ = SCN::max(start_scn, checkpoint_scn);
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// ddl start from log
|
||||
// cleanup ddl sstable
|
||||
// ddl start from checkpoint
|
||||
// keep ddl sstable table
|
||||
|
||||
int ObTabletDDLKvMgr::ddl_start(ObTablet &tablet,
|
||||
const ObITable::TableKey &table_key,
|
||||
const SCN &start_scn,
|
||||
const int64_t data_format_version,
|
||||
const int64_t execution_id,
|
||||
const SCN &checkpoint_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SCN saved_start_scn;
|
||||
int64_t saved_snapshot_version = 0;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret), K(is_inited_));
|
||||
} else {
|
||||
ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
|
||||
if (OB_FAIL(ddl_start_nolock(table_key, start_scn, data_format_version, execution_id, checkpoint_scn))) {
|
||||
LOG_WARN("failed to ddl start", K(ret));
|
||||
} else {
|
||||
// save variables under lock
|
||||
saved_start_scn = start_scn_;
|
||||
saved_snapshot_version = table_key_.get_snapshot_version();
|
||||
@ -149,7 +166,7 @@ int ObTabletDDLKvMgr::ddl_start(ObTablet &tablet,
|
||||
LOG_WARN("clean up ddl sstable failed", K(ret), K(ls_id_), K(tablet_id_));
|
||||
}
|
||||
}
|
||||
FLOG_INFO("start ddl kv mgr finished", K(ret), K(is_brand_new), K(start_scn), K(execution_id), K(checkpoint_scn), K(*this));
|
||||
FLOG_INFO("start ddl kv mgr finished", K(ret), K(start_scn), K(execution_id), K(checkpoint_scn), K(*this));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -36,6 +36,7 @@ public:
|
||||
ObTabletDDLKvMgr();
|
||||
~ObTabletDDLKvMgr();
|
||||
int init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id); // init before memtable mgr
|
||||
int ddl_start_nolock(const ObITable::TableKey &table_key, const share::SCN &start_scn, const int64_t data_format_version, const int64_t execution_id, const share::SCN &checkpoint_scn);
|
||||
int ddl_start(ObTablet &tablet, const ObITable::TableKey &table_key, const share::SCN &start_scn, const int64_t data_format_version, const int64_t execution_id, const share::SCN &checkpoint_scn);
|
||||
int ddl_commit(const share::SCN &start_scn, const share::SCN &commit_scn, const uint64_t table_id = 0, const int64_t ddl_task_id = 0); // schedule build a major sstable
|
||||
int schedule_ddl_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool is_replay); // try wait build major sstable
|
||||
@ -70,6 +71,7 @@ public:
|
||||
int rdlock(const int64_t timeout_us, uint32_t &lock_tid);
|
||||
int wrlock(const int64_t timeout_us, uint32_t &lock_tid);
|
||||
void unlock(const uint32_t lock_tid);
|
||||
int update_tablet(const share::SCN &start_scn, const int64_t snapshot_version, const share::SCN &ddl_checkpoint_scn);
|
||||
OB_INLINE void inc_ref() { ATOMIC_INC(&ref_cnt_); }
|
||||
OB_INLINE int64_t dec_ref() { return ATOMIC_SAF(&ref_cnt_, 1 /* just sub 1 */); }
|
||||
OB_INLINE int64_t get_ref() const { return ATOMIC_LOAD(&ref_cnt_); }
|
||||
@ -90,7 +92,6 @@ private:
|
||||
int get_active_ddl_kv_impl(ObTableHandleV2 &kv_handle);
|
||||
void try_get_ddl_kv_unlock(const share::SCN &scn, ObTableHandleV2 &kv_handle);
|
||||
int get_ddl_kvs_unlock(const bool frozen_only, ObTablesHandleArray &kv_handle_array);
|
||||
int update_tablet(const share::SCN &start_scn, const int64_t snapshot_version, const share::SCN &ddl_checkpoint_scn);
|
||||
int update_ddl_major_sstable();
|
||||
int create_empty_ddl_sstable(ObTableHandleV2 &table_handle);
|
||||
void cleanup_unlock();
|
||||
|
Reference in New Issue
Block a user