fix direct load unable to handle exceptions resulting in process stuck bug
This commit is contained in:
@ -80,7 +80,7 @@ int ObTableLoadRedefTable::finish(const ObTableLoadRedefTableFinishArg &arg,
|
|||||||
copy_table_dependents_arg.copy_constraints_ = true;
|
copy_table_dependents_arg.copy_constraints_ = true;
|
||||||
copy_table_dependents_arg.copy_triggers_ = false;
|
copy_table_dependents_arg.copy_triggers_ = false;
|
||||||
copy_table_dependents_arg.ignore_errors_ = false;
|
copy_table_dependents_arg.ignore_errors_ = false;
|
||||||
if (OB_FAIL(ObDDLServerClient::copy_table_dependents(copy_table_dependents_arg))) {
|
if (OB_FAIL(ObDDLServerClient::copy_table_dependents(copy_table_dependents_arg, session_info))) {
|
||||||
LOG_WARN("failed to copy table dependents", KR(ret), K(copy_table_dependents_arg));
|
LOG_WARN("failed to copy table dependents", KR(ret), K(copy_table_dependents_arg));
|
||||||
} else {
|
} else {
|
||||||
LOG_INFO("succeed to copy table dependents", K(copy_table_dependents_arg));
|
LOG_INFO("succeed to copy table dependents", K(copy_table_dependents_arg));
|
||||||
|
|||||||
@ -66,9 +66,9 @@ public:
|
|||||||
int64_t &affected_rows);
|
int64_t &affected_rows);
|
||||||
static int wait_build_index_finish(const uint64_t tenant_id, const int64_t task_id, bool &is_finish);
|
static int wait_build_index_finish(const uint64_t tenant_id, const int64_t task_id, bool &is_finish);
|
||||||
static int handle_session_exception(ObSQLSessionInfo &session);
|
static int handle_session_exception(ObSQLSessionInfo &session);
|
||||||
|
static int cancel_ddl_task(const int64_t tenant_id, obrpc::ObCommonRpcProxy *common_rpc_proxy);
|
||||||
private:
|
private:
|
||||||
static inline bool is_server_stopped() { return observer::ObServer::get_instance().is_stopped(); }
|
static inline bool is_server_stopped() { return observer::ObServer::get_instance().is_stopped(); }
|
||||||
static int cancel_ddl_task(const int64_t tenant_id, obrpc::ObCommonRpcProxy *common_rpc_proxy);
|
|
||||||
private:
|
private:
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObDDLExecutorUtil);
|
DISALLOW_COPY_AND_ASSIGN(ObDDLExecutorUtil);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -111,9 +111,10 @@ int ObDDLServerClient::start_redef_table(const obrpc::ObStartRedefTableArg &arg,
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
int ObDDLServerClient::copy_table_dependents(const obrpc::ObCopyTableDependentsArg &arg)
|
int ObDDLServerClient::copy_table_dependents(const obrpc::ObCopyTableDependentsArg &arg, sql::ObSQLSessionInfo &session)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
const uint64_t tenant_id = arg.tenant_id_;
|
||||||
const int64_t retry_interval = 100 * 1000L;
|
const int64_t retry_interval = 100 * 1000L;
|
||||||
ObAddr rs_leader_addr;
|
ObAddr rs_leader_addr;
|
||||||
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
|
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
|
||||||
@ -126,11 +127,15 @@ int ObDDLServerClient::copy_table_dependents(const obrpc::ObCopyTableDependentsA
|
|||||||
} else {
|
} else {
|
||||||
while (OB_SUCC(ret)) {
|
while (OB_SUCC(ret)) {
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
omt::ObTenant *tenant = nullptr;
|
if (OB_FAIL(check_need_stop(tenant_id))) {
|
||||||
if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) {
|
LOG_WARN("fail to basic check", K(ret), K(tenant_id));
|
||||||
|
} else if (OB_FAIL(ObDDLExecutorUtil::handle_session_exception(session))) {
|
||||||
|
LOG_WARN("session execption happened", K(ret));
|
||||||
|
if (OB_TMP_FAIL(ObDDLExecutorUtil::cancel_ddl_task(tenant_id, common_rpc_proxy))) {
|
||||||
|
LOG_WARN("cancel ddl task failed", K(tmp_ret));
|
||||||
|
}
|
||||||
|
} else if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) {
|
||||||
LOG_WARN("fail to rootservice address", K(tmp_ret));
|
LOG_WARN("fail to rootservice address", K(tmp_ret));
|
||||||
} else if (OB_FAIL(GCTX.omt_->get_tenant(arg.tenant_id_, tenant))) {
|
|
||||||
LOG_WARN("fail to get tenant, maybe tenant deleted", K_(arg.tenant_id));
|
|
||||||
} else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).copy_table_dependents(arg))) {
|
} else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).copy_table_dependents(arg))) {
|
||||||
LOG_WARN("copy table dependents failed", K(ret), K(arg));
|
LOG_WARN("copy table dependents failed", K(ret), K(arg));
|
||||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||||
@ -156,6 +161,7 @@ int ObDDLServerClient::copy_table_dependents(const obrpc::ObCopyTableDependentsA
|
|||||||
int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg, sql::ObSQLSessionInfo &session)
|
int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg, sql::ObSQLSessionInfo &session)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
const uint64_t tenant_id = arg.tenant_id_;
|
||||||
const int64_t retry_interval = 100 * 1000L;
|
const int64_t retry_interval = 100 * 1000L;
|
||||||
ObAddr rs_leader_addr;
|
ObAddr rs_leader_addr;
|
||||||
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
|
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
|
||||||
@ -168,11 +174,10 @@ int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg,
|
|||||||
} else {
|
} else {
|
||||||
while (OB_SUCC(ret)) {
|
while (OB_SUCC(ret)) {
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
omt::ObTenant *tenant = nullptr;
|
if (OB_FAIL(check_need_stop(tenant_id))) {
|
||||||
if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) {
|
LOG_WARN("fail to basic check", K(ret), K(tenant_id));
|
||||||
|
} else if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) {
|
||||||
LOG_WARN("fail to get rootservice address", K(tmp_ret));
|
LOG_WARN("fail to get rootservice address", K(tmp_ret));
|
||||||
} else if (OB_FAIL(GCTX.omt_->get_tenant(arg.tenant_id_, tenant))) {
|
|
||||||
LOG_WARN("fail to get tenant, maybe tenant deleted", K_(arg.tenant_id));
|
|
||||||
} else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).abort_redef_table(arg))) {
|
} else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).abort_redef_table(arg))) {
|
||||||
LOG_WARN("abort redef table failed", K(ret), K(arg));
|
LOG_WARN("abort redef table failed", K(ret), K(arg));
|
||||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||||
@ -217,6 +222,7 @@ int ObDDLServerClient::finish_redef_table(const obrpc::ObFinishRedefTableArg &fi
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
|
const uint64_t tenant_id = finish_redef_arg.tenant_id_;
|
||||||
const int64_t retry_interval = 100 * 1000L;
|
const int64_t retry_interval = 100 * 1000L;
|
||||||
ObAddr rs_leader_addr;
|
ObAddr rs_leader_addr;
|
||||||
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
|
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
|
||||||
@ -229,11 +235,16 @@ int ObDDLServerClient::finish_redef_table(const obrpc::ObFinishRedefTableArg &fi
|
|||||||
} else {
|
} else {
|
||||||
while (OB_SUCC(ret)) {
|
while (OB_SUCC(ret)) {
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
omt::ObTenant *tenant = nullptr;
|
if (OB_FAIL(check_need_stop(tenant_id))) {
|
||||||
if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) {
|
LOG_WARN("fail to basic check", K(ret), K(tenant_id));
|
||||||
|
} else if (OB_FAIL(ObDDLExecutorUtil::handle_session_exception(session))) {
|
||||||
|
LOG_WARN("session execption happened", K(ret));
|
||||||
|
if (OB_TMP_FAIL(ObDDLExecutorUtil::cancel_ddl_task(tenant_id, common_rpc_proxy))) {
|
||||||
|
LOG_WARN("cancel ddl task failed", K(tmp_ret));
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
}
|
||||||
|
} else if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) {
|
||||||
LOG_WARN("fail to rootservice address", K(tmp_ret));
|
LOG_WARN("fail to rootservice address", K(tmp_ret));
|
||||||
} else if (OB_FAIL(GCTX.omt_->get_tenant(finish_redef_arg.tenant_id_, tenant))) {
|
|
||||||
LOG_WARN("fail to get tenant, maybe tenant deleted", K_(finish_redef_arg.tenant_id));
|
|
||||||
} else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).finish_redef_table(finish_redef_arg))) {
|
} else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).finish_redef_table(finish_redef_arg))) {
|
||||||
LOG_WARN("finish redef table failed", K(ret), K(finish_redef_arg));
|
LOG_WARN("finish redef table failed", K(ret), K(finish_redef_arg));
|
||||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||||
@ -345,6 +356,36 @@ int ObDDLServerClient::heart_beat_clear(const int64_t task_id)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObDDLServerClient::check_need_stop(const uint64_t tenant_id)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
int tmp_ret = OB_SUCCESS;
|
||||||
|
bool is_tenant_dropped = false;
|
||||||
|
bool is_tenant_standby = false;
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
} else if (OB_TMP_FAIL(GSCHEMASERVICE.check_if_tenant_has_been_dropped(tenant_id, is_tenant_dropped))) {
|
||||||
|
LOG_WARN("check if tenant has been droopped failed", K(tmp_ret), K(tenant_id));
|
||||||
|
} else if (is_tenant_dropped) {
|
||||||
|
ret = OB_TENANT_HAS_BEEN_DROPPED;
|
||||||
|
LOG_WARN("tenant has been dropped", K(ret), K(tenant_id));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
} else if (OB_TMP_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) {
|
||||||
|
LOG_WARN("check is standby tenant failed", K(tmp_ret), K(tenant_id));
|
||||||
|
} else if (is_tenant_standby) {
|
||||||
|
ret = OB_STANDBY_READ_ONLY;
|
||||||
|
LOG_WARN("tenant is standby now, stop wait", K(ret), K(tenant_id));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
} else if (observer::ObServer::get_instance().is_stopped()) {
|
||||||
|
ret = OB_TIMEOUT;
|
||||||
|
LOG_WARN("server is stopping", K(ret), K(tenant_id));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
} // end of namespace storage
|
} // end of namespace storage
|
||||||
} // end of namespace oceanbase
|
} // end of namespace oceanbase
|
||||||
|
|||||||
@ -25,7 +25,7 @@ class ObDDLServerClient final
|
|||||||
public:
|
public:
|
||||||
static int create_hidden_table(const obrpc::ObCreateHiddenTableArg &arg, obrpc::ObCreateHiddenTableRes &res, int64_t &snapshot_version, sql::ObSQLSessionInfo &session);
|
static int create_hidden_table(const obrpc::ObCreateHiddenTableArg &arg, obrpc::ObCreateHiddenTableRes &res, int64_t &snapshot_version, sql::ObSQLSessionInfo &session);
|
||||||
static int start_redef_table(const obrpc::ObStartRedefTableArg &arg, obrpc::ObStartRedefTableRes &res, sql::ObSQLSessionInfo &session);
|
static int start_redef_table(const obrpc::ObStartRedefTableArg &arg, obrpc::ObStartRedefTableRes &res, sql::ObSQLSessionInfo &session);
|
||||||
static int copy_table_dependents(const obrpc::ObCopyTableDependentsArg &arg);
|
static int copy_table_dependents(const obrpc::ObCopyTableDependentsArg &arg, sql::ObSQLSessionInfo &session);
|
||||||
static int finish_redef_table(const obrpc::ObFinishRedefTableArg &finish_redef_arg,
|
static int finish_redef_table(const obrpc::ObFinishRedefTableArg &finish_redef_arg,
|
||||||
const obrpc::ObDDLBuildSingleReplicaResponseArg &build_single_arg,
|
const obrpc::ObDDLBuildSingleReplicaResponseArg &build_single_arg,
|
||||||
sql::ObSQLSessionInfo &session);
|
sql::ObSQLSessionInfo &session);
|
||||||
@ -34,6 +34,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
static int wait_task_reach_pending(const uint64_t tenant_id, const int64_t task_id, int64_t &snapshot_version, ObMySQLProxy &sql_proxy, sql::ObSQLSessionInfo &session);
|
static int wait_task_reach_pending(const uint64_t tenant_id, const int64_t task_id, int64_t &snapshot_version, ObMySQLProxy &sql_proxy, sql::ObSQLSessionInfo &session);
|
||||||
static int heart_beat_clear(const int64_t task_id);
|
static int heart_beat_clear(const int64_t task_id);
|
||||||
|
static int check_need_stop(const uint64_t tenant_id);
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end of namespace observer
|
} // end of namespace observer
|
||||||
|
|||||||
Reference in New Issue
Block a user