retry remote write ddl clog.

This commit is contained in:
obdev 2023-04-13 06:57:17 +00:00 committed by ob-robot
parent a36c4299c1
commit 93292ad68d
3 changed files with 85 additions and 16 deletions

View File

@ -1975,6 +1975,7 @@ int ObRpcRemoteWriteDDLRedoLogP::process()
LOG_WARN("invalid arguments", K(ret), K_(arg));
} else {
MTL_SWITCH(tenant_id) {
ObRole role = INVALID_ROLE;
ObDDLSSTableRedoWriter sstable_redo_writer;
MacroBlockId macro_block_id;
ObMacroBlockHandle macro_handle;
@ -1983,6 +1984,7 @@ int ObRpcRemoteWriteDDLRedoLogP::process()
ObLSHandle ls_handle;
ObTabletHandle tablet_handle;
ObDDLKvMgrHandle ddl_kv_mgr_handle;
ObLS *ls = nullptr;
// restruct write_info
write_info.buffer_ = arg_.redo_info_.data_buffer_.ptr();
@ -1991,7 +1993,15 @@ int ObRpcRemoteWriteDDLRedoLogP::process()
const int64_t io_timeout_ms = max(DDL_FLUSH_MACRO_BLOCK_TIMEOUT / 1000L, GCONF._data_storage_io_timeout / 1000L);
if (OB_FAIL(ls_service->get_ls(arg_.ls_id_, ls_handle, ObLSGetMod::OBSERVER_MOD))) {
LOG_WARN("get ls failed", K(ret), K(arg_));
} else if (OB_FAIL(ls_handle.get_ls()->get_tablet(arg_.redo_info_.table_key_.tablet_id_, tablet_handle))) {
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), K(MTL_ID()), K(arg_.ls_id_));
} else if (OB_FAIL(ls->get_ls_role(role))) {
LOG_WARN("get role failed", K(ret), K(MTL_ID()), K(arg_.ls_id_));
} else if (ObRole::LEADER != role) {
ret = OB_NOT_MASTER;
LOG_INFO("leader may not have finished replaying clog, caller retry", K(ret), K(MTL_ID()), K(arg_.ls_id_));
} else if (OB_FAIL(ls->get_tablet(arg_.redo_info_.table_key_.tablet_id_, tablet_handle))) {
LOG_WARN("get tablet failed", K(ret));
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
LOG_WARN("get ddl kv manager failed", K(ret));
@ -2018,18 +2028,28 @@ int ObRpcRemoteWriteDDLCommitLogP::process()
uint64_t tenant_id = arg_.tenant_id_;
MTL_SWITCH(tenant_id) {
ObRole role = INVALID_ROLE;
const ObITable::TableKey &table_key = arg_.table_key_;
ObDDLSSTableRedoWriter sstable_redo_writer;
ObLSService *ls_service = MTL(ObLSService*);
ObLSHandle ls_handle;
ObTabletHandle tablet_handle;
ObDDLKvMgrHandle ddl_kv_mgr_handle;
ObLS *ls = nullptr;
if (OB_UNLIKELY(!arg_.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K_(arg));
} else if (OB_FAIL(ls_service->get_ls(arg_.ls_id_, ls_handle, ObLSGetMod::OBSERVER_MOD))) {
LOG_WARN("get ls failed", K(ret), K(arg_));
} else if (OB_FAIL(ls_handle.get_ls()->get_tablet(table_key.tablet_id_, tablet_handle))) {
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), K(MTL_ID()), K(arg_.ls_id_));
} else if (OB_FAIL(ls->get_ls_role(role))) {
LOG_WARN("get role failed", K(ret), K(MTL_ID()), K(arg_.ls_id_));
} else if (ObRole::LEADER != role) {
ret = OB_NOT_MASTER;
LOG_INFO("leader may not have finished replaying clog, caller retry", K(ret), K(MTL_ID()), K(arg_.ls_id_));
} else if (OB_FAIL(ls->get_tablet(table_key.tablet_id_, tablet_handle))) {
LOG_WARN("get tablet failed", K(ret));
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
if (OB_ENTRY_NOT_EXIST == ret) {

View File

@ -1182,11 +1182,8 @@ int ObDDLSSTableRedoWriter::write_redo_log(const ObDDLMacroBlockRedoInfo &redo_i
}
if (OB_SUCC(ret) && remote_write_) {
if (OB_FAIL(ObDDLMacroBlockRedoWriter::remote_write_macro_redo(task_id,
leader_addr_,
leader_ls_id_,
redo_info))) {
LOG_WARN("fail to remote write ddl redo log", K(ret), K_(leader_ls_id), K_(leader_addr));
if (OB_FAIL(retry_remote_write_ddl_clog( [&]() { return remote_write_macro_redo(task_id, redo_info); }))) {
LOG_WARN("remote write redo failed", K(ret), K(task_id));
}
}
return ret;
@ -1265,18 +1262,11 @@ int ObDDLSSTableRedoWriter::write_commit_log(ObTabletHandle &tablet_handle,
}
}
if (OB_SUCC(ret) && remote_write_) {
ObSrvRpcProxy *srv_rpc_proxy = GCTX.srv_rpc_proxy_;
obrpc::ObRpcRemoteWriteDDLCommitLogArg arg;
obrpc::Int64 log_ns;
if (OB_FAIL(arg.init(MTL_ID(), leader_ls_id_, table_key, get_start_scn(), table_id, execution_id, ddl_task_id))) {
LOG_WARN("fail to init ObRpcRemoteWriteDDLCommitLogArg", K(ret));
} else if (OB_ISNULL(srv_rpc_proxy)) {
ret = OB_ERR_SYS;
LOG_WARN("srv rpc proxy or location service is null", K(ret), KP(srv_rpc_proxy));
} else if (OB_FAIL(srv_rpc_proxy->to(leader_addr_).by(MTL_ID()).remote_write_ddl_commit_log(arg, log_ns))) {
LOG_WARN("fail to remote write ddl redo log", K(ret), K(arg));
} else if (OB_FAIL(commit_scn.convert_for_tx(log_ns))) {
LOG_WARN("convert for tx failed", K(ret));
} else if (OB_FAIL(retry_remote_write_ddl_clog( [&]() { return remote_write_commit_log(arg, commit_scn); }))) {
LOG_WARN("remote write ddl commit log failed", K(ret), K(arg));
}
}
return ret;
@ -1310,6 +1300,61 @@ int ObDDLSSTableRedoWriter::switch_to_remote_write()
return ret;
}
template <typename T>
int ObDDLSSTableRedoWriter::retry_remote_write_ddl_clog(T function)
{
int ret = OB_SUCCESS;
int retry_cnt = 0;
const int64_t MAX_REMOTE_WRITE_RETRY_CNT = 800;
while (OB_SUCC(ret)) {
if (OB_FAIL(switch_to_remote_write())) {
LOG_WARN("flush ls leader location failed", K(ret));
} else if (OB_FAIL(function())) {
if (OB_NOT_MASTER == ret && retry_cnt++ < MAX_REMOTE_WRITE_RETRY_CNT) {
ob_usleep(10 * 1000); // 10 ms.
ret = OB_SUCCESS;
} else {
LOG_WARN("remote write macro redo failed", K(ret), K_(leader_ls_id), K_(leader_addr));
}
} else {
break; // remote write ddl clog successfully.
}
}
return ret;
}
int ObDDLSSTableRedoWriter::remote_write_macro_redo(const int64_t task_id, const ObDDLMacroBlockRedoInfo &redo_info)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObDDLMacroBlockRedoWriter::remote_write_macro_redo(task_id,
leader_addr_,
leader_ls_id_,
redo_info))) {
LOG_WARN("remote write macro redo failed", K(ret));
}
return ret;
}
int ObDDLSSTableRedoWriter::remote_write_commit_log(const obrpc::ObRpcRemoteWriteDDLCommitLogArg &arg, SCN &commit_scn)
{
int ret = OB_SUCCESS;
ObSrvRpcProxy *srv_rpc_proxy = GCTX.srv_rpc_proxy_;
obrpc::Int64 log_ns;
int retry_cnt = 0;
if (OB_ISNULL(srv_rpc_proxy)) {
ret = OB_ERR_SYS;
LOG_WARN("srv rpc proxy or location service is null", K(ret), KP(srv_rpc_proxy));
} else if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(arg));
} else if (OB_FAIL(srv_rpc_proxy->to(leader_addr_).by(MTL_ID()).remote_write_ddl_commit_log(arg, log_ns))) {
LOG_WARN("remote write macro redo failed", K(ret), K_(leader_ls_id), K_(leader_addr));
} else if (OB_FAIL(commit_scn.convert_for_tx(log_ns))) {
LOG_WARN("convert for tx failed", K(ret));
}
return ret;
}
ObDDLSSTableRedoWriter::~ObDDLSSTableRedoWriter()
{
if (nullptr != buffer_) {

View File

@ -293,6 +293,10 @@ public:
OB_INLINE share::SCN get_start_scn() const { return start_scn_.atomic_get(); }
private:
int switch_to_remote_write();
int remote_write_macro_redo(const int64_t task_id, const ObDDLMacroBlockRedoInfo &redo_info);
int remote_write_commit_log(const obrpc::ObRpcRemoteWriteDDLCommitLogArg &arg, SCN &commit_scn);
template <typename T>
int retry_remote_write_ddl_clog(T function);
private:
bool is_inited_;
bool remote_write_;