wait ddl commit log replay when it is written remotely
This commit is contained in:
parent
935697c85a
commit
ac9fb77cff
@ -2148,19 +2148,15 @@ int ObRpcRemoteWriteDDLCommitLogP::process()
|
||||
} else if (FALSE_IT(sstable_redo_writer.set_start_scn(arg_.start_scn_))) {
|
||||
} else {
|
||||
SCN commit_scn;
|
||||
bool is_remote_write = false;
|
||||
if (OB_FAIL(sstable_redo_writer.write_commit_log(tablet_handle,
|
||||
ddl_kv_mgr_handle,
|
||||
false,
|
||||
table_key,
|
||||
arg_.table_id_,
|
||||
arg_.execution_id_,
|
||||
arg_.ddl_task_id_,
|
||||
commit_scn))) {
|
||||
commit_scn,
|
||||
is_remote_write))) {
|
||||
LOG_WARN("fail to remote write commit log", K(ret), K(table_key), K_(arg));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(arg_.start_scn_,
|
||||
commit_scn,
|
||||
arg_.table_id_,
|
||||
arg_.ddl_task_id_))) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(arg_.start_scn_, commit_scn))) {
|
||||
LOG_WARN("failed to do ddl kv commit", K(ret), K(arg_));
|
||||
} else {
|
||||
result_ = commit_scn.get_val_for_tx();
|
||||
|
@ -7659,25 +7659,17 @@ ObRpcRemoteWriteDDLCommitLogArg::ObRpcRemoteWriteDDLCommitLogArg()
|
||||
int ObRpcRemoteWriteDDLCommitLogArg::init(const uint64_t tenant_id,
|
||||
const share::ObLSID &ls_id,
|
||||
const storage::ObITable::TableKey &table_key,
|
||||
const SCN &start_scn,
|
||||
const int64_t table_id,
|
||||
const int64_t execution_id,
|
||||
const int64_t ddl_task_id)
|
||||
const SCN &start_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(tenant_id == OB_INVALID_ID || !ls_id.is_valid() || !table_key.is_valid() || !start_scn.is_valid_and_not_min()
|
||||
|| table_id <= 0 || execution_id < 0 || ddl_task_id <= 0)) {
|
||||
if (OB_UNLIKELY(tenant_id == OB_INVALID_ID || !ls_id.is_valid() || !table_key.is_valid() || !start_scn.is_valid_and_not_min())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("tablet id is not valid", K(ret), K(tenant_id), K(ls_id), K(table_key), K(start_scn),
|
||||
K(table_id), K(execution_id), K(ddl_task_id));
|
||||
LOG_WARN("tablet id is not valid", K(ret), K(tenant_id), K(ls_id), K(table_key), K(start_scn));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
ls_id_ = ls_id;
|
||||
table_key_ = table_key;
|
||||
start_scn_ = start_scn;
|
||||
table_id_ = table_id;
|
||||
execution_id_ = execution_id;
|
||||
ddl_task_id_ = ddl_task_id;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -8359,14 +8359,10 @@ public:
|
||||
int init(const uint64_t tenant_id,
|
||||
const share::ObLSID &ls_id,
|
||||
const storage::ObITable::TableKey &table_key,
|
||||
const share::SCN &start_scn,
|
||||
const int64_t table_id,
|
||||
const int64_t execution_id,
|
||||
const int64_t ddl_task_id);
|
||||
const share::SCN &start_scn);
|
||||
bool is_valid() const
|
||||
{
|
||||
return tenant_id_ != OB_INVALID_ID && ls_id_.is_valid() && table_key_.is_valid() && start_scn_.is_valid_and_not_min()
|
||||
&& table_id_ > 0 && execution_id_ >= 0 && ddl_task_id_ > 0;
|
||||
return tenant_id_ != OB_INVALID_ID && ls_id_.is_valid() && table_key_.is_valid() && start_scn_.is_valid_and_not_min();
|
||||
}
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(table_key), K_(start_scn), K_(table_id),
|
||||
K_(execution_id), K_(ddl_task_id));
|
||||
@ -8375,9 +8371,9 @@ public:
|
||||
share::ObLSID ls_id_;
|
||||
storage::ObITable::TableKey table_key_;
|
||||
share::SCN start_scn_;
|
||||
int64_t table_id_;
|
||||
int64_t execution_id_;
|
||||
int64_t ddl_task_id_;
|
||||
int64_t table_id_; // depercated
|
||||
int64_t execution_id_; // depercated
|
||||
int64_t ddl_task_id_; // depercated
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObRpcRemoteWriteDDLCommitLogArg);
|
||||
};
|
||||
|
@ -794,6 +794,8 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(ObTabletHandle &tab
|
||||
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
|
||||
LOG_WARN("schedule ddl merge dag failed", K(ret), K(param));
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("schedule ddl merge task for major sstable success", K(param));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -45,27 +45,20 @@ public:
|
||||
tablet_id_(),
|
||||
rec_scn_(share::SCN::min_scn()),
|
||||
is_commit_(false),
|
||||
start_scn_(share::SCN::min_scn()),
|
||||
table_id_(0),
|
||||
execution_id_(-1),
|
||||
ddl_task_id_(0)
|
||||
start_scn_(share::SCN::min_scn())
|
||||
{ }
|
||||
bool is_valid() const
|
||||
{
|
||||
return ls_id_.is_valid() && tablet_id_.is_valid() && start_scn_.is_valid_and_not_min();
|
||||
}
|
||||
virtual ~ObDDLTableMergeDagParam() = default;
|
||||
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(rec_scn), K_(is_commit), K_(start_scn),
|
||||
K_(table_id), K_(execution_id), K_(ddl_task_id));
|
||||
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(rec_scn), K_(is_commit), K_(start_scn));
|
||||
public:
|
||||
share::ObLSID ls_id_;
|
||||
ObTabletID tablet_id_;
|
||||
share::SCN rec_scn_;
|
||||
bool is_commit_;
|
||||
share::SCN start_scn_; // start log ts at schedule, for skipping expired task
|
||||
uint64_t table_id_; // used for report ddl checksum
|
||||
int64_t execution_id_; // used for report ddl checksum
|
||||
int64_t ddl_task_id_; // used for report ddl checksum
|
||||
};
|
||||
|
||||
class ObDDLTableMergeDag : public share::ObIDag
|
||||
|
@ -1190,6 +1190,7 @@ int ObDDLSSTableRedoWriter::end_ddl_redo_and_create_ddl_sstable(
|
||||
ObLSID ls_id;
|
||||
SCN ddl_start_scn = get_start_scn();
|
||||
SCN commit_scn = SCN::min_scn();
|
||||
bool is_remote_write = false;
|
||||
if (OB_ISNULL(ls = ls_handle.get_ls()) || OB_UNLIKELY(!table_key.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid ls", K(ret), K(table_key));
|
||||
@ -1198,7 +1199,7 @@ int ObDDLSSTableRedoWriter::end_ddl_redo_and_create_ddl_sstable(
|
||||
LOG_WARN("get tablet failed", K(ret));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
|
||||
LOG_WARN("get ddl kv manager failed", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_FAIL(write_commit_log(tablet_handle, ddl_kv_mgr_handle, true, table_key, table_id, execution_id, ddl_task_id, commit_scn))) {
|
||||
} else if (OB_FAIL(write_commit_log(tablet_handle, ddl_kv_mgr_handle, true, table_key, commit_scn, is_remote_write))) {
|
||||
if (OB_TASK_EXPIRED == ret) {
|
||||
LOG_INFO("ddl task expired", K(ret), K(table_key), K(table_id), K(execution_id), K(ddl_task_id));
|
||||
} else {
|
||||
@ -1219,17 +1220,18 @@ int ObDDLSSTableRedoWriter::end_ddl_redo_and_create_ddl_sstable(
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(ddl_start_scn,
|
||||
commit_scn,
|
||||
table_id,
|
||||
ddl_task_id))) {
|
||||
} else if (is_remote_write) {
|
||||
LOG_INFO("ddl commit log is written in remote, need wait replay", K(ddl_task_id), K(tablet_id), K(ddl_start_scn), K(commit_scn));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->ddl_commit(ddl_start_scn, commit_scn))) {
|
||||
if (OB_TASK_EXPIRED == ret) {
|
||||
LOG_INFO("ddl task expired", K(ret), K(ls_id), K(tablet_id),
|
||||
K(ddl_start_scn), "new_ddl_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn());
|
||||
} else {
|
||||
LOG_WARN("failed to do ddl kv commit", K(ret), K(ddl_start_scn), K(commit_scn));
|
||||
}
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_merge_success(ddl_start_scn, commit_scn, table_id, ddl_task_id))) {
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->wait_ddl_merge_success(ddl_start_scn, commit_scn))) {
|
||||
if (OB_TASK_EXPIRED == ret) {
|
||||
LOG_INFO("ddl task expired, but return success", K(ret), K(ls_id), K(tablet_id),
|
||||
K(ddl_start_scn), "new_ddl_start_scn",
|
||||
@ -1342,22 +1344,18 @@ int ObDDLSSTableRedoWriter::write_commit_log(ObTabletHandle &tablet_handle,
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
const bool allow_remote_write,
|
||||
const ObITable::TableKey &table_key,
|
||||
const int64_t table_id,
|
||||
const int64_t execution_id,
|
||||
const int64_t ddl_task_id,
|
||||
SCN &commit_scn)
|
||||
SCN &commit_scn,
|
||||
bool &is_remote_write)
|
||||
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
#ifdef ERRSIM
|
||||
SERVER_EVENT_SYNC_ADD("storage_ddl", "before_write_prepare_log",
|
||||
"table_key", table_key,
|
||||
"table_id", table_id,
|
||||
"execution_id", execution_id,
|
||||
"ddl_task_id", ddl_task_id);
|
||||
"table_key", table_key);
|
||||
DEBUG_SYNC(BEFORE_DDL_WRITE_PREPARE_LOG);
|
||||
#endif
|
||||
commit_scn.set_min();
|
||||
is_remote_write = false;
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
ObDDLCommitLog log;
|
||||
@ -1392,10 +1390,12 @@ int ObDDLSSTableRedoWriter::write_commit_log(ObTabletHandle &tablet_handle,
|
||||
}
|
||||
if (OB_SUCC(ret) && remote_write_) {
|
||||
obrpc::ObRpcRemoteWriteDDLCommitLogArg arg;
|
||||
if (OB_FAIL(arg.init(MTL_ID(), leader_ls_id_, table_key, get_start_scn(), table_id, execution_id, ddl_task_id))) {
|
||||
if (OB_FAIL(arg.init(MTL_ID(), leader_ls_id_, table_key, get_start_scn()))) {
|
||||
LOG_WARN("fail to init ObRpcRemoteWriteDDLCommitLogArg", K(ret));
|
||||
} else if (OB_FAIL(retry_remote_write_ddl_clog( [&]() { return remote_write_commit_log(arg, commit_scn); }))) {
|
||||
LOG_WARN("remote write ddl commit log failed", K(ret), K(arg));
|
||||
} else {
|
||||
is_remote_write = !(leader_addr_ == GCTX.self_addr());
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -303,10 +303,8 @@ public:
|
||||
ObDDLKvMgrHandle &ddl_kv_mgr_handle,
|
||||
const bool allow_remote_write,
|
||||
const ObITable::TableKey &table_key,
|
||||
const int64_t table_id,
|
||||
const int64_t execution_id,
|
||||
const int64_t ddl_task_id,
|
||||
share::SCN &commit_scn);
|
||||
share::SCN &commit_scn,
|
||||
bool &is_remote_write);
|
||||
OB_INLINE void set_start_scn(const share::SCN &start_scn) { start_scn_.atomic_set(start_scn); }
|
||||
OB_INLINE share::SCN get_start_scn() const { return start_scn_.atomic_get(); }
|
||||
private:
|
||||
|
@ -175,10 +175,7 @@ int ObTabletDDLKvMgr::ddl_start(ObTablet &tablet,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn,
|
||||
const SCN &commit_scn,
|
||||
const uint64_t table_id,
|
||||
const int64_t ddl_task_id)
|
||||
int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn, const SCN &commit_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
@ -211,34 +208,25 @@ int ObTabletDDLKvMgr::ddl_commit(const SCN &start_scn,
|
||||
param.rec_scn_ = commit_scn;
|
||||
param.is_commit_ = true;
|
||||
param.start_scn_ = start_scn;
|
||||
param.table_id_ = table_id;
|
||||
param.execution_id_ = execution_id_;
|
||||
param.ddl_task_id_ = ddl_task_id;
|
||||
const int64_t start_ts = ObTimeUtility::fast_current_time();
|
||||
|
||||
while (OB_SUCC(ret) && is_started()) {
|
||||
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
|
||||
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
|
||||
LOG_WARN("schedule ddl merge dag failed", K(ret), K(param));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
ob_usleep(10L * 1000L);
|
||||
if (REACH_TIME_INTERVAL(10L * 1000L * 1000L)) {
|
||||
LOG_INFO("retry schedule ddl commit task",
|
||||
K(start_scn), K(commit_scn), K(table_id), K(ddl_task_id), K(*this),
|
||||
"wait_elpased_s", (ObTimeUtility::fast_current_time() - start_ts) / 1000000L);
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
|
||||
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
|
||||
LOG_WARN("schedule ddl merge dag failed", K(ret), K(param));
|
||||
} else {
|
||||
LOG_INFO("schedule ddl commit task success", K(start_scn), K(commit_scn), K(table_id), K(ddl_task_id), K(*this));
|
||||
break;
|
||||
ret = OB_SUCCESS; // the backgroud scheduler will reschedule again
|
||||
LOG_INFO("schedule ddl merge task need retry",
|
||||
K(start_scn), K(commit_scn), K(*this),
|
||||
"wait_elpased_s", (ObTimeUtility::fast_current_time() - start_ts) / 1000000L);
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("schedule ddl commit task success", K(start_scn), K(commit_scn), K(*this));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLKvMgr::schedule_ddl_merge_task(const SCN &start_scn, const SCN &commit_scn, const bool is_replay, const uint64_t table_id, const int64_t ddl_task_id)
|
||||
int ObTabletDDLKvMgr::schedule_ddl_merge_task(const SCN &start_scn, const SCN &commit_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
@ -260,9 +248,6 @@ int ObTabletDDLKvMgr::schedule_ddl_merge_task(const SCN &start_scn, const SCN &c
|
||||
param.rec_scn_ = commit_scn;
|
||||
param.is_commit_ = true;
|
||||
param.start_scn_ = start_scn;
|
||||
param.table_id_ = table_id;
|
||||
param.execution_id_ = execution_id_;
|
||||
param.ddl_task_id_ = ddl_task_id;
|
||||
// retry submit dag in case of the previous dag failed
|
||||
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
|
||||
if (OB_SIZE_OVERFLOW == ret || OB_EAGAIN == ret) {
|
||||
@ -274,20 +259,10 @@ int ObTabletDDLKvMgr::schedule_ddl_merge_task(const SCN &start_scn, const SCN &c
|
||||
ret = OB_EAGAIN; // until major sstable is ready
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret) && is_replay) {
|
||||
if (OB_TABLET_NOT_EXIST == ret || OB_TASK_EXPIRED == ret) {
|
||||
ret = OB_SUCCESS; // think as succcess for replay
|
||||
} else {
|
||||
if (REACH_TIME_INTERVAL(10L * 1000L * 1000L)) {
|
||||
LOG_INFO("replay ddl commit", K(ret), K(start_scn), K(commit_scn), K(*this));
|
||||
}
|
||||
ret = OB_EAGAIN; // retry by replay service
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLKvMgr::wait_ddl_merge_success(const SCN &start_scn, const SCN &commit_scn, const uint64_t table_id, const int64_t ddl_task_id)
|
||||
int ObTabletDDLKvMgr::wait_ddl_merge_success(const SCN &start_scn, const SCN &commit_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
@ -307,7 +282,7 @@ int ObTabletDDLKvMgr::wait_ddl_merge_success(const SCN &start_scn, const SCN &co
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(THIS_WORKER.check_status())) {
|
||||
LOG_WARN("check status failed", K(ret));
|
||||
} else if (OB_FAIL(schedule_ddl_merge_task(start_scn, commit_scn, false/*is_replay*/, table_id, ddl_task_id))) {
|
||||
} else if (OB_FAIL(schedule_ddl_merge_task(start_scn, commit_scn))) {
|
||||
if (OB_EAGAIN == ret) {
|
||||
ob_usleep(100L); // 100us.
|
||||
ret = OB_SUCCESS; // retry
|
||||
@ -336,11 +311,6 @@ int ObTabletDDLKvMgr::get_ddl_major_merge_param(const ObTabletMeta &tablet_meta,
|
||||
param.rec_scn_ = get_commit_scn_nolock(tablet_meta);
|
||||
param.is_commit_ = true;
|
||||
param.start_scn_ = start_scn_;
|
||||
|
||||
// no checksum report
|
||||
param.table_id_ = 0;
|
||||
param.execution_id_ = -1;
|
||||
param.ddl_task_id_ = 0;
|
||||
} else {
|
||||
ret = OB_EAGAIN;
|
||||
}
|
||||
|
@ -38,9 +38,9 @@ public:
|
||||
int init(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id); // init before memtable mgr
|
||||
int ddl_start_nolock(const ObITable::TableKey &table_key, const share::SCN &start_scn, const int64_t data_format_version, const int64_t execution_id, const share::SCN &checkpoint_scn);
|
||||
int ddl_start(ObTablet &tablet, const ObITable::TableKey &table_key, const share::SCN &start_scn, const int64_t data_format_version, const int64_t execution_id, const share::SCN &checkpoint_scn);
|
||||
int ddl_commit(const share::SCN &start_scn, const share::SCN &commit_scn, const uint64_t table_id = 0, const int64_t ddl_task_id = 0); // schedule build a major sstable
|
||||
int schedule_ddl_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool is_replay, const uint64_t table_id, const int64_t ddl_task_id); // try wait build major sstable
|
||||
int wait_ddl_merge_success(const share::SCN &start_scn, const share::SCN &commit_scn, const uint64_t table_id, const int64_t ddl_task_id);
|
||||
int ddl_commit(const share::SCN &start_scn, const share::SCN &commit_scn); // schedule build a major sstable
|
||||
int schedule_ddl_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn); // try wait build major sstable
|
||||
int wait_ddl_merge_success(const share::SCN &start_scn, const share::SCN &commit_scn);
|
||||
int get_ddl_param(ObTabletDDLParam &ddl_param);
|
||||
int get_or_create_ddl_kv(const share::SCN &start_scn, const share::SCN &scn, ObTableHandleV2 &kv_handle); // used in active ddl kv guard
|
||||
int get_freezed_ddl_kv(const share::SCN &freeze_scn, ObTableHandleV2 &kv_handle); // locate ddl kv with exeact freeze log ts
|
||||
|
Loading…
x
Reference in New Issue
Block a user