change to scn.
This commit is contained in:
parent
8a4d14122f
commit
97a2a94d7c
@ -1047,6 +1047,9 @@ int ObService::check_modify_time_elapsed(
|
||||
}
|
||||
} else if (OB_FAIL(txs->get_max_commit_version(tmp_scn))) {
|
||||
LOG_WARN("fail to get max commit version", K(ret));
|
||||
} else if (OB_UNLIKELY(!tmp_scn.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, scn is invalid", K(ret), K(tmp_scn));
|
||||
} else {
|
||||
result.snapshot_ = tmp_scn.get_val_for_tx();
|
||||
LOG_INFO("succeed to wait transaction end", K(arg));
|
||||
|
@ -608,7 +608,7 @@ int ObConstraintTask::hold_snapshot(const int64_t snapshot_version)
|
||||
} else if (OB_UNLIKELY(snapshot_version < 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(snapshot_version));
|
||||
} else if (OB_FAIL(snapshot_scn.convert_for_lsn_allocator(snapshot_version))) {
|
||||
} else if (OB_FAIL(snapshot_scn.convert_for_tx(snapshot_version))) {
|
||||
LOG_WARN("failed to convert", K(snapshot_version), K(ret));
|
||||
} else if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, object_id_, tablet_ids))) {
|
||||
LOG_WARN("failed to get tablet snapshots", K(ret));
|
||||
|
@ -323,7 +323,7 @@ int ObDDLRedefinitionTask::hold_snapshot(const int64_t snapshot_version)
|
||||
} else if (OB_UNLIKELY(snapshot_version < 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(snapshot_version));
|
||||
} else if (OB_FAIL(snapshot_scn.convert_for_lsn_allocator(snapshot_version))) {
|
||||
} else if (OB_FAIL(snapshot_scn.convert_for_tx(snapshot_version))) {
|
||||
LOG_WARN("failed to convert", K(snapshot_version), K(ret));
|
||||
} else if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id_, object_id_, tablet_ids))) {
|
||||
LOG_WARN("failed to get data table snapshot", K(ret));
|
||||
|
@ -524,7 +524,7 @@ int ObDDLTask::batch_release_snapshot(
|
||||
if (OB_ISNULL(root_service)) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("error sys, root service must not be nullptr", K(ret));
|
||||
} else if (OB_FAIL(snapshot_scn.convert_for_lsn_allocator(snapshot_version))) {
|
||||
} else if (OB_FAIL(snapshot_scn.convert_for_tx(snapshot_version))) {
|
||||
LOG_WARN("failed to convert scn", K(snapshot_scn), K(ret));
|
||||
} else if (OB_FAIL(trans.start(&root_service->get_ddl_service().get_sql_proxy(), tenant_id_))) {
|
||||
LOG_WARN("fail to start trans", K(ret));
|
||||
@ -1539,7 +1539,7 @@ int ObDDLTaskRecordOperator::fill_task_record(
|
||||
EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "ddl_stmt_str_unhex", ddl_stmt_str);
|
||||
if (OB_SUCC(ret)) {
|
||||
palf::SCN check_snapshot_version;
|
||||
if (OB_FAIL(check_snapshot_version.convert_for_inner_table_field(task_record.snapshot_version_))) {
|
||||
if (OB_FAIL(check_snapshot_version.convert_for_tx(task_record.snapshot_version_))) {
|
||||
LOG_WARN("convert for inner table field failed", K(ret), K(task_record.snapshot_version_));
|
||||
} else if (!check_snapshot_version.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
@ -531,7 +531,7 @@ int ObIndexBuildTask::hold_snapshot(const int64_t snapshot)
|
||||
} else if (snapshot <= 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("snapshot version not valid", K(ret), K(snapshot));
|
||||
} else if (OB_FAIL(snapshot_scn.convert_for_lsn_allocator(snapshot))) {
|
||||
} else if (OB_FAIL(snapshot_scn.convert_for_tx(snapshot))) {
|
||||
LOG_WARN("failed to convert", K(snapshot), K(ret));
|
||||
} else {
|
||||
ObDDLService &ddl_service = root_service_->get_ddl_service();
|
||||
|
@ -133,7 +133,7 @@ int ObDDLMacroBlockClogCb::on_success()
|
||||
} else {
|
||||
macro_block.block_type_ = redo_info_.block_type_;
|
||||
macro_block.logic_id_ = redo_info_.logic_id_;
|
||||
macro_block.log_scn_ = __get_scn();
|
||||
macro_block.scn_ = __get_scn();
|
||||
macro_block.buf_ = redo_info_.data_buffer_.ptr();
|
||||
macro_block.size_ = redo_info_.data_buffer_.length();
|
||||
macro_block.ddl_start_scn_ = redo_info_.start_scn_;
|
||||
|
@ -51,7 +51,7 @@ int ObDDLRedoLogReplayer::init(ObLS *ls)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLRedoLogReplayer::replay_start(const ObDDLStartLog &log, const palf::SCN &log_scn)
|
||||
int ObDDLRedoLogReplayer::replay_start(const ObDDLStartLog &log, const palf::SCN &scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTabletHandle tablet_handle;
|
||||
@ -65,9 +65,9 @@ int ObDDLRedoLogReplayer::replay_start(const ObDDLStartLog &log, const palf::SCN
|
||||
} else if (OB_UNLIKELY(!log.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(log));
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, log_scn, log_scn, need_replay, tablet_handle))) {
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, scn, scn, need_replay, tablet_handle))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(log_scn));
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(scn));
|
||||
}
|
||||
} else if (!need_replay) {
|
||||
// do nothing
|
||||
@ -76,15 +76,15 @@ int ObDDLRedoLogReplayer::replay_start(const ObDDLStartLog &log, const palf::SCN
|
||||
LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle, true/*try_create*/))) {
|
||||
LOG_WARN("create ddl kv mgr failed", K(ret), K(table_key));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(table_key, log_scn, log.get_cluster_version()))) {
|
||||
LOG_WARN("start ddl log failed", K(ret), K(log), K(log_scn));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_start(table_key, scn, log.get_cluster_version()))) {
|
||||
LOG_WARN("start ddl log failed", K(ret), K(log), K(scn));
|
||||
} else {
|
||||
LOG_INFO("succeed to replay ddl start log", K(ret), K(log));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLRedoLogReplayer::replay_redo(const ObDDLRedoLog &log, const palf::SCN &log_scn)
|
||||
int ObDDLRedoLogReplayer::replay_redo(const ObDDLRedoLog &log, const palf::SCN &scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObDDLMacroBlockRedoInfo &redo_info = log.get_redo_info();
|
||||
@ -102,9 +102,9 @@ int ObDDLRedoLogReplayer::replay_redo(const ObDDLRedoLog &log, const palf::SCN &
|
||||
} else if (OB_UNLIKELY(!log.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(log));
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, redo_info.start_scn_, log_scn, need_replay, tablet_handle))) {
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, redo_info.start_scn_, scn, need_replay, tablet_handle))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(log_scn));
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(scn));
|
||||
}
|
||||
} else if (!need_replay) {
|
||||
// do nothing
|
||||
@ -135,7 +135,7 @@ int ObDDLRedoLogReplayer::replay_redo(const ObDDLRedoLog &log, const palf::SCN &
|
||||
} else {
|
||||
macro_block.block_type_ = redo_info.block_type_;
|
||||
macro_block.logic_id_ = redo_info.logic_id_;
|
||||
macro_block.log_scn_ = log_scn;
|
||||
macro_block.scn_ = scn;
|
||||
macro_block.buf_ = redo_info.data_buffer_.ptr();
|
||||
macro_block.size_ = redo_info.data_buffer_.length();
|
||||
macro_block.ddl_start_scn_ = redo_info.start_scn_;
|
||||
@ -147,7 +147,7 @@ int ObDDLRedoLogReplayer::replay_redo(const ObDDLRedoLog &log, const palf::SCN &
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLRedoLogReplayer::replay_prepare(const ObDDLPrepareLog &log, const palf::SCN &log_scn)
|
||||
int ObDDLRedoLogReplayer::replay_prepare(const ObDDLPrepareLog &log, const palf::SCN &scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTabletHandle tablet_handle;
|
||||
@ -166,9 +166,9 @@ int ObDDLRedoLogReplayer::replay_prepare(const ObDDLPrepareLog &log, const palf:
|
||||
} else if (OB_UNLIKELY(!log.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(log));
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, log.get_start_scn(), log_scn, need_replay, tablet_handle))) {
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, log.get_start_scn(), scn, need_replay, tablet_handle))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(log_scn));
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(scn));
|
||||
}
|
||||
} else if (!need_replay) {
|
||||
// do nothing
|
||||
@ -177,15 +177,15 @@ int ObDDLRedoLogReplayer::replay_prepare(const ObDDLPrepareLog &log, const palf:
|
||||
LOG_WARN("need replay but tablet handle is invalid", K(ret), K(need_replay), K(tablet_handle));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
|
||||
LOG_WARN("get ddl kv mgr failed", K(ret));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(log.get_start_scn(), log_scn))) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_prepare(log.get_start_scn(), scn))) {
|
||||
LOG_WARN("replay ddl prepare log failed", K(ret), K(log));
|
||||
} else {
|
||||
LOG_INFO("replay ddl prepare log success", K(ret), K(table_key), K(log_scn));
|
||||
LOG_INFO("replay ddl prepare log success", K(ret), K(table_key), K(scn));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLRedoLogReplayer::replay_commit(const ObDDLCommitLog &log, const palf::SCN &log_scn)
|
||||
int ObDDLRedoLogReplayer::replay_commit(const ObDDLCommitLog &log, const palf::SCN &scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTabletHandle tablet_handle;
|
||||
@ -204,9 +204,9 @@ int ObDDLRedoLogReplayer::replay_commit(const ObDDLCommitLog &log, const palf::S
|
||||
} else if (OB_UNLIKELY(!log.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(log));
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, log.get_start_scn(), log_scn, need_replay, tablet_handle))) {
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, log.get_start_scn(), scn, need_replay, tablet_handle))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(log_scn));
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(scn));
|
||||
}
|
||||
} else if (!need_replay) {
|
||||
// do nothing
|
||||
@ -228,7 +228,7 @@ int ObDDLRedoLogReplayer::replay_commit(const ObDDLCommitLog &log, const palf::S
|
||||
|
||||
int ObDDLRedoLogReplayer::check_need_replay_ddl_log(const ObITable::TableKey &table_key,
|
||||
const palf::SCN &ddl_start_scn,
|
||||
const palf::SCN &log_scn,
|
||||
const palf::SCN &scn,
|
||||
bool &need_replay,
|
||||
ObTabletHandle &tablet_handle)
|
||||
{
|
||||
@ -256,7 +256,7 @@ int ObDDLRedoLogReplayer::check_need_replay_ddl_log(const ObITable::TableKey &ta
|
||||
LOG_INFO("tablet not exist, so ddl log skip replay", "ls_id", ls_->get_ls_id(), "tablet_id", table_key.tablet_id_);
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("fail to get tablet", K(ret), "tablet_id", table_key.tablet_id_, K(log_scn));
|
||||
LOG_WARN("fail to get tablet", K(ret), "tablet_id", table_key.tablet_id_, K(scn));
|
||||
}
|
||||
} else if (OB_ISNULL(tablet = tablet_handle.get_obj())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -267,17 +267,17 @@ int ObDDLRedoLogReplayer::check_need_replay_ddl_log(const ObITable::TableKey &ta
|
||||
LOG_INFO("no need to replay ddl log, because tablet will be deleted",
|
||||
K(table_key), "tablet_meta", tablet->get_tablet_meta());
|
||||
}
|
||||
} else if (log_scn <= tablet->get_tablet_meta().ddl_checkpoint_scn_) {
|
||||
} else if (scn <= tablet->get_tablet_meta().ddl_checkpoint_scn_) {
|
||||
need_replay = false;
|
||||
if (REACH_COUNT_INTERVAL(1000L)) {
|
||||
LOG_INFO("no need to replay ddl log, because the log ts is less than the ddl checkpoint ts",
|
||||
K(table_key), K(log_scn), "ddl_checkpoint_ts", tablet->get_tablet_meta().ddl_checkpoint_scn_);
|
||||
K(table_key), K(scn), "ddl_checkpoint_ts", tablet->get_tablet_meta().ddl_checkpoint_scn_);
|
||||
}
|
||||
} else if (OB_FAIL(tablet->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
LOG_WARN("get ddl kv manager failed", K(ret), K(table_key));
|
||||
} else {
|
||||
need_replay = (ddl_start_scn == log_scn); // only replay start log if ddl kv mgr is null
|
||||
need_replay = (ddl_start_scn == scn); // only replay start log if ddl kv mgr is null
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else if (ddl_start_scn < ddl_kv_mgr_handle.get_obj()->get_start_scn()) {
|
||||
|
@ -31,15 +31,15 @@ public:
|
||||
~ObDDLRedoLogReplayer();
|
||||
int init(ObLS *ls);
|
||||
void reset() { destroy(); }
|
||||
int replay_start(const ObDDLStartLog &log, const palf::SCN &log_scn);
|
||||
int replay_redo(const ObDDLRedoLog &log, const palf::SCN &log_scn);
|
||||
int replay_prepare(const ObDDLPrepareLog &log, const palf::SCN &log_scn);
|
||||
int replay_commit(const ObDDLCommitLog &log, const palf::SCN &log_scn);
|
||||
int replay_start(const ObDDLStartLog &log, const palf::SCN &scn);
|
||||
int replay_redo(const ObDDLRedoLog &log, const palf::SCN &scn);
|
||||
int replay_prepare(const ObDDLPrepareLog &log, const palf::SCN &scn);
|
||||
int replay_commit(const ObDDLCommitLog &log, const palf::SCN &scn);
|
||||
private:
|
||||
void destroy();
|
||||
int check_need_replay_ddl_log(const ObITable::TableKey &table_key,
|
||||
const palf::SCN &ddl_start_scn,
|
||||
const palf::SCN &log_scn,
|
||||
const palf::SCN &scn,
|
||||
bool &need_replay,
|
||||
ObTabletHandle &tablet_handle);
|
||||
private:
|
||||
|
@ -556,8 +556,8 @@ int ObDDLRedoLogWriter::write(
|
||||
|
||||
palf::LSN lsn;
|
||||
const bool need_nonblock= false;
|
||||
palf::SCN base_log_scn = palf::SCN::min_scn();
|
||||
palf::SCN log_scn;
|
||||
palf::SCN base_scn = palf::SCN::min_scn();
|
||||
palf::SCN scn;
|
||||
if (!log.is_valid() || nullptr == log_handler || !ls_id.is_valid() || OB_INVALID_TENANT_ID == tenant_id || nullptr == buffer) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(log), K(ls_id), K(tenant_id), KP(buffer));
|
||||
@ -574,17 +574,17 @@ int ObDDLRedoLogWriter::write(
|
||||
LOG_WARN("fail to seriaize ddl redo log", K(ret));
|
||||
} else if (OB_FAIL(log_handler->append(buffer,
|
||||
buffer_size,
|
||||
base_log_scn,
|
||||
base_scn,
|
||||
need_nonblock,
|
||||
cb,
|
||||
lsn,
|
||||
log_scn))) {
|
||||
scn))) {
|
||||
LOG_WARN("fail to submit ddl redo log", K(ret), K(buffer), K(buffer_size));
|
||||
} else {
|
||||
handle.cb_ = cb;
|
||||
cb = nullptr;
|
||||
handle.log_scn_ = log_scn;
|
||||
LOG_INFO("submit ddl redo log succeed", K(lsn), K(base_log_scn), K(log_scn));
|
||||
handle.scn_ = scn;
|
||||
LOG_INFO("submit ddl redo log succeed", K(lsn), K(base_scn), K(scn));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
int64_t real_sleep_us = 0;
|
||||
@ -618,7 +618,7 @@ int ObDDLRedoLogWriter::write_ddl_start_log(const ObDDLStartLog &log, ObLogHandl
|
||||
|
||||
palf::LSN lsn;
|
||||
const bool need_nonblock= false;
|
||||
palf::SCN log_scn;
|
||||
palf::SCN scn;
|
||||
bool is_external_consistent = false;
|
||||
if (OB_ISNULL(cb = op_alloc(ObDDLClogCb))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
@ -635,7 +635,7 @@ int ObDDLRedoLogWriter::write_ddl_start_log(const ObDDLStartLog &log, ObLogHandl
|
||||
need_nonblock,
|
||||
cb,
|
||||
lsn,
|
||||
log_scn))) {
|
||||
scn))) {
|
||||
LOG_WARN("fail to submit ddl start log", K(ret), K(buffer_size));
|
||||
} else {
|
||||
ObDDLClogCb *tmp_cb = cb;
|
||||
@ -659,7 +659,7 @@ int ObDDLRedoLogWriter::write_ddl_start_log(const ObDDLStartLog &log, ObLogHandl
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
start_scn = log_scn;
|
||||
start_scn = scn;
|
||||
}
|
||||
tmp_cb->try_release(); // release the memory no matter succ or not
|
||||
}
|
||||
@ -691,7 +691,7 @@ int ObDDLRedoLogWriter::write_ddl_finish_log(const T &log, const ObDDLClogType c
|
||||
palf::LSN lsn;
|
||||
const bool need_nonblock= false;
|
||||
palf::SCN base_scn = palf::SCN::base_scn();
|
||||
palf::SCN log_scn;
|
||||
palf::SCN scn;
|
||||
bool is_external_consistent = false;
|
||||
if (OB_ISNULL(buffer = static_cast<char *>(ob_malloc(buffer_size)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
@ -713,14 +713,14 @@ int ObDDLRedoLogWriter::write_ddl_finish_log(const T &log, const ObDDLClogType c
|
||||
need_nonblock,
|
||||
cb,
|
||||
lsn,
|
||||
log_scn))) {
|
||||
scn))) {
|
||||
LOG_WARN("fail to submit ddl commit log", K(ret), K(buffer), K(buffer_size));
|
||||
} else {
|
||||
ObDDLClogCb *tmp_cb = cb;
|
||||
cb = nullptr;
|
||||
bool need_retry = true;
|
||||
while (need_retry) {
|
||||
if (OB_FAIL(OB_TS_MGR.wait_gts_elapse(MTL_ID(), log_scn))) {
|
||||
if (OB_FAIL(OB_TS_MGR.wait_gts_elapse(MTL_ID(), scn))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("fail to wait gts elapse", K(ret), K(log));
|
||||
} else {
|
||||
@ -732,8 +732,8 @@ int ObDDLRedoLogWriter::write_ddl_finish_log(const T &log, const ObDDLClogType c
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
handle.cb_ = tmp_cb;
|
||||
handle.commit_scn_ = log_scn;
|
||||
LOG_INFO("submit ddl commit log succeed", K(lsn), K(base_scn), K(log_scn));
|
||||
handle.commit_scn_ = scn;
|
||||
LOG_INFO("submit ddl commit log succeed", K(lsn), K(base_scn), K(scn));
|
||||
} else {
|
||||
tmp_cb->try_release(); // release the memory
|
||||
}
|
||||
@ -752,7 +752,7 @@ int ObDDLRedoLogWriter::write_ddl_finish_log(const T &log, const ObDDLClogType c
|
||||
}
|
||||
|
||||
ObDDLRedoLogHandle::ObDDLRedoLogHandle()
|
||||
: cb_(nullptr), log_scn_()
|
||||
: cb_(nullptr), scn_()
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -163,10 +163,10 @@ public:
|
||||
~ObDDLRedoLogHandle();
|
||||
int wait(const int64_t timeout = DDL_REDO_LOG_TIMEOUT);
|
||||
void reset();
|
||||
bool is_valid() const { return nullptr != cb_ && log_scn_.is_valid(); }
|
||||
bool is_valid() const { return nullptr != cb_ && scn_.is_valid(); }
|
||||
public:
|
||||
ObDDLMacroBlockClogCb *cb_;
|
||||
palf::SCN log_scn_;
|
||||
palf::SCN scn_;
|
||||
};
|
||||
|
||||
class ObDDLCommitLogHandle final
|
||||
|
@ -87,7 +87,7 @@ int ObDDLMacroHandle::reset_macro_block_ref()
|
||||
}
|
||||
|
||||
ObDDLMacroBlock::ObDDLMacroBlock()
|
||||
: block_handle_(), logic_id_(), block_type_(DDL_MB_INVALID_TYPE), ddl_start_scn_(), log_scn_(), buf_(nullptr), size_(0)
|
||||
: block_handle_(), logic_id_(), block_type_(DDL_MB_INVALID_TYPE), ddl_start_scn_(), scn_(), buf_(nullptr), size_(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -113,7 +113,7 @@ int ObDDLMacroBlock::deep_copy(ObDDLMacroBlock &dst_block, common::ObIAllocator
|
||||
dst_block.block_handle_ = block_handle_;
|
||||
dst_block.logic_id_ = logic_id_;
|
||||
dst_block.ddl_start_scn_ = ddl_start_scn_;
|
||||
dst_block.log_scn_ = log_scn_;
|
||||
dst_block.scn_ = scn_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -124,7 +124,7 @@ bool ObDDLMacroBlock::is_valid() const
|
||||
&& logic_id_.is_valid()
|
||||
&& DDL_MB_INVALID_TYPE != block_type_
|
||||
&& ddl_start_scn_.is_valid()
|
||||
&& log_scn_.is_valid()
|
||||
&& scn_.is_valid()
|
||||
&& nullptr != buf_
|
||||
&& size_ > 0;
|
||||
}
|
||||
@ -258,7 +258,7 @@ int ObDDLKV::set_macro_block(const ObDDLMacroBlock ¯o_block)
|
||||
LOG_INFO("ddl start log ts too small, maybe from old build task, ignore", K(ret),
|
||||
K(ls_id_), K(tablet_id_), K(ddl_start_scn_), K(macro_block));
|
||||
}
|
||||
} else if (macro_block.log_scn_ > freeze_scn_) {
|
||||
} else if (macro_block.scn_ > freeze_scn_) {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_INFO("this ddl kv is freezed, retry other ddl kv", K(ret), K(ls_id_), K(tablet_id_), K(macro_block), K(freeze_scn_));
|
||||
} else if (OB_FAIL(index_block_rebuilder_->append_macro_row(
|
||||
@ -267,8 +267,8 @@ int ObDDLKV::set_macro_block(const ObDDLMacroBlock ¯o_block)
|
||||
} else if (OB_FAIL(ddl_blocks_.push_back(macro_block.block_handle_))) {
|
||||
LOG_WARN("push back block handle failed", K(ret), K(macro_block.block_handle_));
|
||||
} else {
|
||||
min_scn_ = palf::SCN::min(min_scn_, macro_block.log_scn_);
|
||||
max_scn_ = palf::SCN::max(max_scn_, macro_block.log_scn_);
|
||||
min_scn_ = palf::SCN::min(min_scn_, macro_block.scn_);
|
||||
max_scn_ = palf::SCN::max(max_scn_, macro_block.scn_);
|
||||
LOG_INFO("succeed to set macro block into ddl kv", K(macro_block));
|
||||
}
|
||||
}
|
||||
@ -376,7 +376,7 @@ int ObDDLKV::wait_pending()
|
||||
if (OB_FAIL(ls_handle.get_ls()->get_max_decided_scn(max_decided_scn))) {
|
||||
LOG_WARN("get max decided log ts failed", K(ret), K(ls_id_));
|
||||
} else {
|
||||
// max_decided_scn is the left border log_scn - 1
|
||||
// max_decided_scn is the left border scn - 1
|
||||
wait_ls_ts = palf::SCN::plus(max_decided_scn, 1) < freeze_scn_;
|
||||
}
|
||||
}
|
||||
@ -503,18 +503,18 @@ int ObDDLKVsHandle::get_ddl_kv(const int64_t idx, ObDDLKV *&kv)
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObDDLKVPendingGuard::ObDDLKVPendingGuard(ObTablet *tablet, const palf::SCN &log_scn)
|
||||
ObDDLKVPendingGuard::ObDDLKVPendingGuard(ObTablet *tablet, const palf::SCN &scn)
|
||||
: tablet_(tablet), kv_handle_(), ret_(OB_SUCCESS)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObDDLKV *curr_kv = nullptr;
|
||||
ObDDLKvMgrHandle ddl_kv_mgr_handle;
|
||||
if (OB_UNLIKELY(nullptr == tablet || !log_scn.is_valid())) {
|
||||
if (OB_UNLIKELY(nullptr == tablet || !scn.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), KP(tablet), K(log_scn));
|
||||
LOG_WARN("invalid arguments", K(ret), KP(tablet), K(scn));
|
||||
} else if (OB_FAIL(tablet->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
|
||||
LOG_WARN("get ddl kv mgr failed", K(ret));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_or_create_ddl_kv(log_scn, kv_handle_))) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_or_create_ddl_kv(scn, kv_handle_))) {
|
||||
LOG_WARN("acquire ddl kv failed", K(ret));
|
||||
} else if (OB_FAIL(kv_handle_.get_ddl_kv(curr_kv))) {
|
||||
LOG_WARN("fail to get ddl kv", K(ret));
|
||||
@ -522,7 +522,7 @@ ObDDLKVPendingGuard::ObDDLKVPendingGuard(ObTablet *tablet, const palf::SCN &log_
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, active ddl kv must not be nullptr", K(ret));
|
||||
} else {
|
||||
log_scn_ = log_scn;
|
||||
scn_ = scn;
|
||||
curr_kv->inc_pending_cnt();
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -568,7 +568,7 @@ int ObDDLKVPendingGuard::set_macro_block(ObTablet *tablet, const ObDDLMacroBlock
|
||||
int64_t try_count = 0;
|
||||
while ((OB_SUCCESS == ret || OB_EAGAIN == ret) && try_count < MAX_RETRY_COUNT) {
|
||||
ObDDLKV *ddl_kv = nullptr;
|
||||
ObDDLKVPendingGuard guard(tablet, macro_block.log_scn_);
|
||||
ObDDLKVPendingGuard guard(tablet, macro_block.scn_);
|
||||
if (OB_FAIL(guard.get_ddl_kv(ddl_kv))) {
|
||||
LOG_WARN("get ddl kv failed", K(ret));
|
||||
} else if (OB_FAIL(ddl_kv->set_macro_block(macro_block))) {
|
||||
|
@ -57,13 +57,13 @@ public:
|
||||
const blocksstable::MacroBlockId &get_block_id() const { return block_handle_.get_block_id(); }
|
||||
int deep_copy(ObDDLMacroBlock &dst_block, common::ObIAllocator &allocator) const;
|
||||
bool is_valid() const;
|
||||
TO_STRING_KV(K_(block_handle), K_(logic_id), K_(block_type), K_(ddl_start_scn), K_(log_scn), KP_(buf), K_(size));
|
||||
TO_STRING_KV(K_(block_handle), K_(logic_id), K_(block_type), K_(ddl_start_scn), K_(scn), KP_(buf), K_(size));
|
||||
public:
|
||||
ObDDLMacroHandle block_handle_;
|
||||
blocksstable::ObLogicMacroBlockId logic_id_;
|
||||
blocksstable::ObDDLMacroBlockType block_type_;
|
||||
palf::SCN ddl_start_scn_;
|
||||
palf::SCN log_scn_;
|
||||
palf::SCN scn_;
|
||||
const char *buf_;
|
||||
int64_t size_;
|
||||
};
|
||||
@ -161,13 +161,13 @@ class ObDDLKVPendingGuard final
|
||||
public:
|
||||
static int set_macro_block(ObTablet *tablet, const ObDDLMacroBlock ¯o_block);
|
||||
public:
|
||||
ObDDLKVPendingGuard(ObTablet *tablet, const palf::SCN &log_scn);
|
||||
ObDDLKVPendingGuard(ObTablet *tablet, const palf::SCN &scn);
|
||||
~ObDDLKVPendingGuard();
|
||||
int get_ret() const { return ret_; }
|
||||
int get_ddl_kv(ObDDLKV *&kv);
|
||||
private:
|
||||
ObTablet *tablet_;
|
||||
palf::SCN log_scn_;
|
||||
palf::SCN scn_;
|
||||
ObDDLKVHandle kv_handle_;
|
||||
int ret_;
|
||||
};
|
||||
|
@ -12,13 +12,14 @@
|
||||
|
||||
#define USING_LOG_PREFIX STORAGE
|
||||
#include "ob_tablet_barrier_log.h"
|
||||
#include "logservice/palf/scn.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
ObTabletBarrierLogState::ObTabletBarrierLogState()
|
||||
: state_(TABLET_BARRIER_LOG_INIT), log_ts_(0), schema_version_(0)
|
||||
: state_(TABLET_BARRIER_LOG_INIT), scn_(), schema_version_(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -44,17 +45,17 @@ ObTabletBarrierLogStateEnum ObTabletBarrierLogState::to_persistent_state() const
|
||||
void ObTabletBarrierLogState::reset()
|
||||
{
|
||||
state_ = TABLET_BARRIER_LOG_INIT;
|
||||
log_ts_ = 0;
|
||||
scn_.reset();
|
||||
schema_version_ = 0;
|
||||
}
|
||||
|
||||
void ObTabletBarrierLogState::set_log_info(
|
||||
const ObTabletBarrierLogStateEnum state,
|
||||
const int64_t log_ts,
|
||||
const palf::SCN &scn,
|
||||
const int64_t schema_version)
|
||||
{
|
||||
state_ = state;
|
||||
log_ts_ = log_ts;
|
||||
scn_ = scn;
|
||||
schema_version_ = schema_version;
|
||||
}
|
||||
|
||||
@ -64,8 +65,8 @@ int ObTabletBarrierLogState::serialize(char *buf, const int64_t buf_len, int64_t
|
||||
const ObTabletBarrierLogStateEnum persistent_state = to_persistent_state();
|
||||
if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, persistent_state))) {
|
||||
LOG_WARN("fail to encode state", K(ret));
|
||||
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, log_ts_))) {
|
||||
LOG_WARN("encode log ts failed", K(ret));
|
||||
} else if (OB_FAIL(scn_.fixed_serialize(buf, buf_len, pos))) {
|
||||
LOG_WARN("serialize scn failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -76,8 +77,8 @@ int ObTabletBarrierLogState::deserialize(const char *buf, const int64_t data_len
|
||||
int64_t tmp_state = 0;
|
||||
if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &tmp_state))) {
|
||||
LOG_WARN("fail to decode state", K(ret));
|
||||
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &log_ts_))) {
|
||||
LOG_WARN("decode log ts failed", K(ret));
|
||||
} else if (OB_FAIL(scn_.fixed_deserialize(buf, data_len, pos))) {
|
||||
LOG_WARN("deserialize scn failed", K(ret));
|
||||
} else {
|
||||
state_ = static_cast<ObTabletBarrierLogStateEnum>(tmp_state);
|
||||
}
|
||||
@ -88,7 +89,7 @@ int64_t ObTabletBarrierLogState::get_serialize_size() const
|
||||
{
|
||||
int64_t len = 0;
|
||||
len += serialization::encoded_length_i64(to_persistent_state());
|
||||
len += serialization::encoded_length_i64(log_ts_);
|
||||
len += scn_.get_fixed_serialize_size();
|
||||
return len;
|
||||
}
|
||||
}
|
||||
|
@ -14,6 +14,7 @@
|
||||
#define SRC_STORAGE_OB_TABLET_BARRIER_LOG_H_
|
||||
|
||||
#include "lib/utility/ob_print_utils.h"
|
||||
#include "logservice/palf/scn.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -34,13 +35,13 @@ public:
|
||||
~ObTabletBarrierLogState() = default;
|
||||
|
||||
ObTabletBarrierLogStateEnum &get_state() { return state_; }
|
||||
int64_t get_log_ts() const { return log_ts_; }
|
||||
palf::SCN get_scn() const { return scn_; }
|
||||
int64_t get_schema_version() const { return schema_version_; }
|
||||
|
||||
void reset();
|
||||
void set_log_info(
|
||||
const ObTabletBarrierLogStateEnum state,
|
||||
const int64_t log_ts,
|
||||
const palf::SCN &scn,
|
||||
const int64_t schema_version);
|
||||
NEED_SERIALIZE_AND_DESERIALIZE;
|
||||
TO_STRING_KV(K_(state));
|
||||
@ -48,7 +49,7 @@ private:
|
||||
ObTabletBarrierLogStateEnum to_persistent_state() const;
|
||||
private:
|
||||
ObTabletBarrierLogStateEnum state_;
|
||||
int64_t log_ts_;
|
||||
palf::SCN scn_;
|
||||
int64_t schema_version_;
|
||||
};
|
||||
}
|
||||
|
@ -430,7 +430,7 @@ int ObTabletDDLKvMgr::get_active_ddl_kv_impl(ObDDLKVHandle &kv_handle)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLKvMgr::get_or_create_ddl_kv(const palf::SCN &log_scn, ObDDLKVHandle &kv_handle)
|
||||
int ObTabletDDLKvMgr::get_or_create_ddl_kv(const palf::SCN &scn, ObDDLKVHandle &kv_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
kv_handle.reset();
|
||||
@ -438,12 +438,12 @@ int ObTabletDDLKvMgr::get_or_create_ddl_kv(const palf::SCN &log_scn, ObDDLKVHand
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret));
|
||||
} else if (!log_scn.is_valid()) {
|
||||
} else if (!scn.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(log_scn));
|
||||
LOG_WARN("invalid argument", K(ret), K(scn));
|
||||
} else {
|
||||
TCRLockGuard guard(lock_);
|
||||
try_get_ddl_kv_unlock(log_scn, kv);
|
||||
try_get_ddl_kv_unlock(scn, kv);
|
||||
if (nullptr != kv) {
|
||||
// increase or decrease the reference count must be under the lock
|
||||
if (OB_FAIL(kv_handle.set_ddl_kv(kv))) {
|
||||
@ -453,7 +453,7 @@ int ObTabletDDLKvMgr::get_or_create_ddl_kv(const palf::SCN &log_scn, ObDDLKVHand
|
||||
}
|
||||
if (OB_SUCC(ret) && nullptr == kv) {
|
||||
TCWLockGuard guard(lock_);
|
||||
try_get_ddl_kv_unlock(log_scn, kv);
|
||||
try_get_ddl_kv_unlock(scn, kv);
|
||||
if (nullptr != kv) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(alloc_ddl_kv(kv))) {
|
||||
@ -469,7 +469,7 @@ int ObTabletDDLKvMgr::get_or_create_ddl_kv(const palf::SCN &log_scn, ObDDLKVHand
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTabletDDLKvMgr::try_get_ddl_kv_unlock(const palf::SCN &log_scn, ObDDLKV *&kv)
|
||||
void ObTabletDDLKvMgr::try_get_ddl_kv_unlock(const palf::SCN &scn, ObDDLKV *&kv)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (get_count() > 0) {
|
||||
@ -478,7 +478,7 @@ void ObTabletDDLKvMgr::try_get_ddl_kv_unlock(const palf::SCN &log_scn, ObDDLKV *
|
||||
if (OB_ISNULL(tmp_kv)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ddl kv is null", K(ret), K(ls_id_), K(tablet_id_), KP(tmp_kv), K(i), K(head_), K(tail_));
|
||||
} else if (log_scn <= tmp_kv->get_freeze_scn()) {
|
||||
} else if (scn <= tmp_kv->get_freeze_scn()) {
|
||||
kv = tmp_kv;
|
||||
break;
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ public:
|
||||
int ddl_commit(const palf::SCN &start_scn, const palf::SCN &prepare_scn, const bool is_replay); // try wait build major sstable
|
||||
int wait_ddl_commit(const palf::SCN &start_scn, const palf::SCN &prepare_scn);
|
||||
int get_ddl_param(ObTabletDDLParam &ddl_param);
|
||||
int get_or_create_ddl_kv(const palf::SCN &log_scn, ObDDLKVHandle &kv_handle); // used in active ddl kv guard
|
||||
int get_or_create_ddl_kv(const palf::SCN &scn, ObDDLKVHandle &kv_handle); // used in active ddl kv guard
|
||||
int get_freezed_ddl_kv(const palf::SCN &freeze_scn, ObDDLKVHandle &kv_handle); // locate ddl kv with exeact freeze log ts
|
||||
int get_ddl_kvs(const bool frozen_only, ObDDLKVsHandle &ddl_kvs_handle); // get all freeze ddl kvs
|
||||
int freeze_ddl_kv(const palf::SCN &freeze_scn = palf::SCN::invalid_scn()); // freeze the active ddl kv, when memtable freeze or ddl commit
|
||||
@ -68,7 +68,7 @@ private:
|
||||
int alloc_ddl_kv(ObDDLKV *&kv);
|
||||
void free_ddl_kv(const int64_t idx);
|
||||
int get_active_ddl_kv_impl(ObDDLKVHandle &kv_handle);
|
||||
void try_get_ddl_kv_unlock(const palf::SCN &log_scn, ObDDLKV *&kv);
|
||||
void try_get_ddl_kv_unlock(const palf::SCN &scn, ObDDLKV *&kv);
|
||||
int update_tablet(const palf::SCN &start_scn, const int64_t snapshot_version, const palf::SCN &ddl_checkpoint_scn);
|
||||
void destroy();
|
||||
private:
|
||||
|
@ -46,7 +46,7 @@ OB_SERIALIZE_MEMBER(ObPGReportStatus,
|
||||
snapshot_version_);
|
||||
|
||||
ObPartitionBarrierLogState::ObPartitionBarrierLogState()
|
||||
: state_(BARRIER_LOG_INIT), log_id_(0), log_ts_(0), schema_version_(0)
|
||||
: state_(BARRIER_LOG_INIT), log_id_(0), scn_(), schema_version_(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -69,11 +69,11 @@ ObPartitionBarrierLogStateEnum ObPartitionBarrierLogState::to_persistent_state()
|
||||
return persistent_state;
|
||||
}
|
||||
|
||||
void ObPartitionBarrierLogState::set_log_info(const ObPartitionBarrierLogStateEnum state, const int64_t log_id, const int64_t log_ts, const int64_t schema_version)
|
||||
void ObPartitionBarrierLogState::set_log_info(const ObPartitionBarrierLogStateEnum state, const int64_t log_id, const palf::SCN &scn, const int64_t schema_version)
|
||||
{
|
||||
state_ = state;
|
||||
log_id_ = log_id;
|
||||
log_ts_ = log_ts;
|
||||
scn_ = scn;
|
||||
schema_version_ = schema_version;
|
||||
}
|
||||
|
||||
@ -85,8 +85,8 @@ int ObPartitionBarrierLogState::serialize(char *buf, const int64_t buf_len, int6
|
||||
LOG_WARN("fail to encode state", K(ret));
|
||||
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, log_id_))) {
|
||||
LOG_WARN("encode log id failed", K(ret));
|
||||
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, log_ts_))) {
|
||||
LOG_WARN("encode log ts failed", K(ret));
|
||||
} else if (OB_FAIL(scn_.fixed_serialize(buf, buf_len, pos))) {
|
||||
LOG_WARN("fix serialized failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -99,8 +99,8 @@ int ObPartitionBarrierLogState::deserialize(const char *buf, const int64_t data_
|
||||
LOG_WARN("fail to decode state", K(ret));
|
||||
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &log_id_))) {
|
||||
LOG_WARN("decode log id failed", K(ret));
|
||||
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &log_ts_))) {
|
||||
LOG_WARN("decode log ts failed", K(ret));
|
||||
} else if (OB_FAIL(scn_.fixed_deserialize(buf, data_len, pos))) {
|
||||
LOG_WARN("fixed deserialize failed", K(ret));
|
||||
} else {
|
||||
state_ = static_cast<ObPartitionBarrierLogStateEnum>(tmp_state);
|
||||
}
|
||||
@ -112,7 +112,7 @@ int64_t ObPartitionBarrierLogState::get_serialize_size() const
|
||||
int64_t len = 0;
|
||||
len += serialization::encoded_length_i64(to_persistent_state());
|
||||
len += serialization::encoded_length_i64(log_id_);
|
||||
len += serialization::encoded_length_i64(log_ts_);
|
||||
len += scn_.get_fixed_serialize_size();
|
||||
return len;
|
||||
}
|
||||
|
||||
|
@ -223,9 +223,9 @@ public:
|
||||
~ObPartitionBarrierLogState() = default;
|
||||
ObPartitionBarrierLogStateEnum &get_state() { return state_; }
|
||||
int64_t get_log_id() { return log_id_; }
|
||||
int64_t get_log_ts() { return log_ts_; }
|
||||
palf::SCN get_scn() { return scn_; }
|
||||
int64_t get_schema_version() { return schema_version_; }
|
||||
void set_log_info(const ObPartitionBarrierLogStateEnum state, const int64_t log_id, const int64_t log_ts, const int64_t schema_version);
|
||||
void set_log_info(const ObPartitionBarrierLogStateEnum state, const int64_t log_id, const palf::SCN &scn, const int64_t schema_version);
|
||||
NEED_SERIALIZE_AND_DESERIALIZE;
|
||||
TO_STRING_KV(K_(state));
|
||||
private:
|
||||
@ -233,7 +233,7 @@ private:
|
||||
private:
|
||||
ObPartitionBarrierLogStateEnum state_;
|
||||
int64_t log_id_;
|
||||
int64_t log_ts_;
|
||||
palf::SCN scn_;
|
||||
int64_t schema_version_;
|
||||
};
|
||||
|
||||
|
@ -2314,6 +2314,9 @@ int ObTablet::check_schema_version_elapsed(
|
||||
transaction::ObTransService *txs = MTL(transaction::ObTransService*);
|
||||
if (OB_FAIL(txs->get_max_commit_version(max_commit_scn))) {
|
||||
LOG_WARN("fail to get max commit version", K(ret));
|
||||
} else if (OB_UNLIKELY(!max_commit_scn.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, scn is invalid", K(ret), K(max_commit_scn));
|
||||
} else {
|
||||
max_commit_version = max_commit_scn.get_val_for_tx();
|
||||
}
|
||||
@ -2551,7 +2554,7 @@ int ObTablet::update_ddl_info(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTabletPointer *tablet_ptr = static_cast<ObTabletPointer*>(pointer_hdl_.get_resource_ptr());
|
||||
if (OB_FAIL(tablet_ptr->ddl_info_.update(schema_version, scn.get_val_for_inner_table_field(), schema_refreshed_ts))) {
|
||||
if (OB_FAIL(tablet_ptr->ddl_info_.update(schema_version, scn, schema_refreshed_ts))) {
|
||||
LOG_WARN("fail to update ddl info", K(ret), K(schema_version), K(scn));
|
||||
}
|
||||
return ret;
|
||||
|
@ -638,7 +638,7 @@ int ObTabletBindingHelper::modify_tablet_binding_for_unbind(
|
||||
info.hidden_tablet_ids_.reset();
|
||||
if (arg.is_redefined()) {
|
||||
info.redefined_ = true;
|
||||
info.snapshot_version_ = commit_version.get_val_for_lsn_allocator();
|
||||
info.snapshot_version_ = commit_version.get_val_for_tx();
|
||||
}
|
||||
if (OB_FAIL(tablet->set_multi_data_for_commit(info, scn, for_replay, MemtableRefOp::NONE))) {
|
||||
LOG_WARN("failed to save tablet binding info", K(ret));
|
||||
@ -664,7 +664,7 @@ int ObTabletBindingHelper::modify_tablet_binding_for_unbind(
|
||||
LOG_WARN("failed to get ddl data", K(ret));
|
||||
} else {
|
||||
info.redefined_ = false;
|
||||
info.snapshot_version_ = commit_version.get_val_for_lsn_allocator();
|
||||
info.snapshot_version_ = commit_version.get_val_for_tx();
|
||||
info.schema_version_ = arg.schema_version_;
|
||||
if (OB_FAIL(tablet->set_multi_data_for_commit(info, scn, for_replay, MemtableRefOp::NONE))) {
|
||||
LOG_WARN("failed to save tablet binding info", K(ret));
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "lib/ob_define.h"
|
||||
#include "lib/utility/ob_print_utils.h"
|
||||
#include "lib/utility/utility.h"
|
||||
#include "logservice/palf/scn.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -24,7 +25,7 @@ namespace storage
|
||||
ObTabletDDLInfo::ObTabletDDLInfo()
|
||||
: ddl_schema_version_(0),
|
||||
ddl_schema_refreshed_ts_(OB_INVALID_TIMESTAMP),
|
||||
schema_version_change_log_ts_(OB_INVALID_TIMESTAMP),
|
||||
schema_version_change_scn_(),
|
||||
rwlock_()
|
||||
{
|
||||
}
|
||||
@ -33,7 +34,7 @@ ObTabletDDLInfo &ObTabletDDLInfo::operator=(const ObTabletDDLInfo &other)
|
||||
{
|
||||
ddl_schema_version_ = other.ddl_schema_version_;
|
||||
ddl_schema_refreshed_ts_ = other.ddl_schema_refreshed_ts_;
|
||||
schema_version_change_log_ts_ = other.schema_version_change_log_ts_;
|
||||
schema_version_change_scn_ = other.schema_version_change_scn_;
|
||||
return *this;
|
||||
}
|
||||
|
||||
@ -41,7 +42,7 @@ void ObTabletDDLInfo::reset()
|
||||
{
|
||||
ddl_schema_version_ = 0;
|
||||
ddl_schema_refreshed_ts_ = OB_INVALID_TIMESTAMP;
|
||||
schema_version_change_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
schema_version_change_scn_.reset();
|
||||
}
|
||||
|
||||
int ObTabletDDLInfo::get(int64_t &schema_version, int64_t &schema_refreshed_ts)
|
||||
@ -53,17 +54,17 @@ int ObTabletDDLInfo::get(int64_t &schema_version, int64_t &schema_refreshed_ts)
|
||||
return ret;
|
||||
}
|
||||
int ObTabletDDLInfo::update(const int64_t schema_version,
|
||||
const int64_t log_ts,
|
||||
const palf::SCN &scn,
|
||||
int64_t &schema_refreshed_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
TCWLockGuard guard(rwlock_);
|
||||
if (schema_version <= 0 || log_ts <= 0) {
|
||||
if (schema_version <= 0 || !scn.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(schema_version), K(log_ts));
|
||||
LOG_WARN("invalid arguments", K(ret), K(schema_version), K(scn));
|
||||
} else if (ddl_schema_version_ < schema_version) {
|
||||
ddl_schema_refreshed_ts_ = common::max(ObTimeUtility::current_time(), ddl_schema_refreshed_ts_);
|
||||
schema_version_change_log_ts_ = log_ts;
|
||||
schema_version_change_scn_ = scn;
|
||||
ddl_schema_version_ = schema_version;
|
||||
} else {
|
||||
// do nothing
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include <stdint.h>
|
||||
#include "lib/lock/ob_tc_rwlock.h"
|
||||
#include "lib/utility/ob_print_utils.h"
|
||||
#include "logservice/palf/scn.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -31,13 +32,13 @@ public:
|
||||
void reset();
|
||||
int get(int64_t &schema_version, int64_t &schema_refreshed_ts);
|
||||
int update(const int64_t schema_version,
|
||||
const int64_t log_ts,
|
||||
const palf::SCN &scn,
|
||||
int64_t &schema_refreshed_ts);
|
||||
TO_STRING_KV(K_(ddl_schema_version), K_(ddl_schema_refreshed_ts), K_(schema_version_change_log_ts));
|
||||
TO_STRING_KV(K_(ddl_schema_version), K_(ddl_schema_refreshed_ts), K_(schema_version_change_scn));
|
||||
private:
|
||||
int64_t ddl_schema_version_;
|
||||
int64_t ddl_schema_refreshed_ts_;
|
||||
int64_t schema_version_change_log_ts_;
|
||||
palf::SCN schema_version_change_scn_;
|
||||
common::TCRWLock rwlock_;
|
||||
};
|
||||
} // namespace storage
|
||||
|
@ -222,12 +222,11 @@ int ObTabletMeta::init(
|
||||
LOG_WARN("failed to assign ddl data", K(ret));
|
||||
} else if (OB_FAIL(autoinc_seq_.assign(autoinc_seq))) {
|
||||
LOG_WARN("failed to assign autoinc seq", K(ret));
|
||||
} else if (OB_FAIL(ddl_start_scn_.convert_for_gts(MAX(old_tablet_meta.ddl_start_scn_.get_val_for_gts(), ddl_start_scn)))) {
|
||||
// TODO yiren, Do not forget to use palf::SCN::max rather than MAX.
|
||||
} else if (OB_FAIL(ddl_start_scn_.convert_for_tx(MAX(old_tablet_meta.ddl_start_scn_.get_val_for_tx(), ddl_start_scn)))) {
|
||||
LOG_WARN("fail to convert scn", K(ret), K(ddl_start_scn));
|
||||
} else if (OB_FAIL(clog_checkpoint_scn_.convert_for_gts(MAX(old_tablet_meta.clog_checkpoint_scn_.get_val_for_gts(), clog_checkpoint_ts)))) {
|
||||
} else if (OB_FAIL(clog_checkpoint_scn_.convert_for_tx(MAX(old_tablet_meta.clog_checkpoint_scn_.get_val_for_tx(), clog_checkpoint_ts)))) {
|
||||
LOG_WARN("fail to convert scn", K(ret), K(clog_checkpoint_ts));
|
||||
} else if (OB_FAIL(ddl_checkpoint_scn_.convert_for_gts(MAX(old_tablet_meta.ddl_checkpoint_scn_.get_val_for_gts(), ddl_checkpoint_ts)))) {
|
||||
} else if (OB_FAIL(ddl_checkpoint_scn_.convert_for_tx(MAX(old_tablet_meta.ddl_checkpoint_scn_.get_val_for_tx(), ddl_checkpoint_ts)))) {
|
||||
LOG_WARN("fail to convert scn", K(ret), K(ddl_checkpoint_ts));
|
||||
} else {
|
||||
version_ = TABLET_META_VERSION;
|
||||
|
Loading…
x
Reference in New Issue
Block a user