[CP] add retry for direct load if unit has migrated

This commit is contained in:
obdev
2024-02-07 17:49:30 +00:00
committed by ob-robot
parent 66317b738e
commit 1e03c39854
8 changed files with 54 additions and 17 deletions

View File

@ -259,6 +259,13 @@ class ObStmtTypeRetryPolicy : public ObRetryPolicy
public: public:
ObStmtTypeRetryPolicy() = default; ObStmtTypeRetryPolicy() = default;
~ObStmtTypeRetryPolicy() = default; ~ObStmtTypeRetryPolicy() = default;
bool is_direct_load(ObRetryParam &v) const
{
ObExecContext &exec_ctx = v.result_.get_exec_context();
return exec_ctx.get_table_direct_insert_ctx().get_is_direct();
}
virtual void test(ObRetryParam &v) const override virtual void test(ObRetryParam &v) const override
{ {
int err = v.err_; int err = v.err_;
@ -280,6 +287,14 @@ public:
v.retry_type_ = RETRY_TYPE_NONE; v.retry_type_ = RETRY_TYPE_NONE;
} }
v.no_more_test_ = true; v.no_more_test_ = true;
} else if (is_direct_load(v)) {
if (is_direct_load_retry_err(err)) {
try_packet_retry(v);
} else {
v.client_ret_ = err;
v.retry_type_ = RETRY_TYPE_NONE;
}
v.no_more_test_ = true;
} }
} }
}; };

View File

@ -113,6 +113,9 @@ int ObTableLoadRedefTable::finish(const ObTableLoadRedefTableFinishArg &arg,
if (OB_FAIL(ObDDLServerClient::finish_redef_table( if (OB_FAIL(ObDDLServerClient::finish_redef_table(
finish_redef_table_arg, build_single_replica_response_arg, session_info))) { finish_redef_table_arg, build_single_replica_response_arg, session_info))) {
LOG_WARN("failed to finish redef table", KR(ret), K(finish_redef_table_arg)); LOG_WARN("failed to finish redef table", KR(ret), K(finish_redef_table_arg));
if (ret == OB_NOT_MASTER) { //sql cannot be retried here, so change errcode
ret = OB_DIRECT_LOAD_COMMIT_ERROR;
}
} else { } else {
LOG_INFO("succeed to finish redef table", KR(ret), K(finish_redef_table_arg)); LOG_INFO("succeed to finish redef table", KR(ret), K(finish_redef_table_arg));
} }

View File

@ -60,16 +60,12 @@ void ObTableLoadService::ObCheckTenantTask::runTimerTask()
LOG_WARN("ObTableLoadService::ObCheckTenantTask not init", KR(ret), KP(this)); LOG_WARN("ObTableLoadService::ObCheckTenantTask not init", KR(ret), KP(this));
} else { } else {
LOG_DEBUG("table load check tenant", K(tenant_id_)); LOG_DEBUG("table load check tenant", K(tenant_id_));
ObTenant *tenant = nullptr; if (OB_FAIL(ObTableLoadService::check_tenant())) {
if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id_, tenant))) { LOG_WARN("fail to check_tenant", KR(ret));
LOG_WARN("fail to get tenant", KR(ret), K(tenant_id_));
} else if (OB_UNLIKELY(ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL !=
tenant->get_unit_status())) {
LOG_DEBUG("tenant unit status not normal, clear", K(tenant_id_), KPC(tenant));
// abort all client task // abort all client task
service_.abort_all_client_task(); service_.abort_all_client_task();
// fail all current tasks // fail all current tasks
service_.fail_all_ctx(OB_ERR_UNEXPECTED_UNIT_STATUS); service_.fail_all_ctx(ret);
} }
} }
} }
@ -372,6 +368,9 @@ int ObTableLoadService::mtl_init(ObTableLoadService *&service)
return ret; return ret;
} }
int ObTableLoadService::check_tenant() int ObTableLoadService::check_tenant()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -379,8 +378,13 @@ int ObTableLoadService::check_tenant()
ObTenant *tenant = nullptr; ObTenant *tenant = nullptr;
if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) { if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) {
LOG_WARN("fail to get tenant", KR(ret), K(tenant_id)); LOG_WARN("fail to get tenant", KR(ret), K(tenant_id));
} else if (tenant->get_unit_status() == ObUnitInfoGetter::ObUnitStatus::UNIT_MARK_DELETING
|| tenant->get_unit_status() == ObUnitInfoGetter::ObUnitStatus::UNIT_WAIT_GC_IN_OBSERVER
|| tenant->get_unit_status() == ObUnitInfoGetter::ObUnitStatus::UNIT_DELETING_IN_OBSERVER) {
ret = OB_EAGAIN;
LOG_WARN("unit is migrate out, should retry direct load", KR(ret), K(tenant->get_unit_status()));
} else if (OB_UNLIKELY(ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL != } else if (OB_UNLIKELY(ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL !=
tenant->get_unit_status())) { tenant->get_unit_status() && ObUnitInfoGetter::ObUnitStatus::UNIT_MIGRATE_OUT != tenant->get_unit_status())) {
ret = OB_ERR_UNEXPECTED_UNIT_STATUS; ret = OB_ERR_UNEXPECTED_UNIT_STATUS;
LOG_WARN("unit status not normal", KR(ret), K(tenant->get_unit_status())); LOG_WARN("unit status not normal", KR(ret), K(tenant->get_unit_status()));
} }

View File

@ -251,6 +251,14 @@ static inline bool is_ddl_stmt_packet_retry_err(const int ret)
; ;
} }
static inline bool is_direct_load_retry_err(const int ret)
{
return is_ddl_stmt_packet_retry_err(ret) || ret == OB_TABLET_NOT_EXIST || ret == OB_LS_NOT_EXIST
|| ret == OB_NOT_MASTER
|| ret == OB_TASK_EXPIRED
;
}
enum ObCheckExistedDDLMode enum ObCheckExistedDDLMode
{ {
INVALID_DDL_MODE = 0, INVALID_DDL_MODE = 0,

View File

@ -24,7 +24,7 @@ namespace oceanbase
namespace sql namespace sql
{ {
int ObLoadDataExecutor::check_is_direct_load(const ObLoadDataHint &load_hint, bool &check_ret) int ObLoadDataExecutor::check_is_direct_load(ObTableDirectInsertCtx &ctx, const ObLoadDataHint &load_hint)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t enable_direct = 0; int64_t enable_direct = 0;
@ -34,9 +34,9 @@ int ObLoadDataExecutor::check_is_direct_load(const ObLoadDataHint &load_hint, bo
} else if (OB_FAIL(load_hint.get_value(ObLoadDataHint::APPEND, append))) { } else if (OB_FAIL(load_hint.get_value(ObLoadDataHint::APPEND, append))) {
LOG_WARN("fail to get value of APPEND", K(ret)); LOG_WARN("fail to get value of APPEND", K(ret));
} else if ((enable_direct != 0 || append != 0) && GCONF._ob_enable_direct_load) { } else if ((enable_direct != 0 || append != 0) && GCONF._ob_enable_direct_load) {
check_ret = true; ctx.set_is_direct(true);
} else { } else {
check_ret = false; ctx.set_is_direct(false);
} }
return ret; return ret;
} }
@ -44,15 +44,15 @@ int ObLoadDataExecutor::check_is_direct_load(const ObLoadDataHint &load_hint, bo
int ObLoadDataExecutor::execute(ObExecContext &ctx, ObLoadDataStmt &stmt) int ObLoadDataExecutor::execute(ObExecContext &ctx, ObLoadDataStmt &stmt)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTableDirectInsertCtx &table_direct_insert_ctx = ctx.get_table_direct_insert_ctx();
ObLoadDataBase *load_impl = NULL; ObLoadDataBase *load_impl = NULL;
bool is_direct_load = false;
if (!stmt.get_load_arguments().is_csv_format_) { if (!stmt.get_load_arguments().is_csv_format_) {
ret = OB_NOT_SUPPORTED; ret = OB_NOT_SUPPORTED;
LOG_WARN("invalid resolver results", K(ret)); LOG_WARN("invalid resolver results", K(ret));
} else if (OB_FAIL(check_is_direct_load(stmt.get_hints(), is_direct_load))) { } else if (OB_FAIL(check_is_direct_load(table_direct_insert_ctx, stmt.get_hints()))) {
LOG_WARN("fail to check is load mode", KR(ret)); LOG_WARN("fail to check is load mode", KR(ret));
} else { } else {
if (!is_direct_load) { if (!table_direct_insert_ctx.get_is_direct()) {
if (OB_ISNULL(load_impl = OB_NEWx(ObLoadDataSPImpl, (&ctx.get_allocator())))) { if (OB_ISNULL(load_impl = OB_NEWx(ObLoadDataSPImpl, (&ctx.get_allocator())))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret)); LOG_WARN("allocate memory failed", K(ret));

View File

@ -26,7 +26,7 @@ public:
int execute(ObExecContext &ctx, ObLoadDataStmt &stmt); int execute(ObExecContext &ctx, ObLoadDataStmt &stmt);
private: private:
int check_is_direct_load(const ObLoadDataHint &load_hint, bool &check_ret); int check_is_direct_load(ObTableDirectInsertCtx &ctx, const ObLoadDataHint &load_hint);
private: private:
// disallow copy // disallow copy
DISALLOW_COPY_AND_ASSIGN(ObLoadDataExecutor); DISALLOW_COPY_AND_ASSIGN(ObLoadDataExecutor);

View File

@ -45,6 +45,7 @@ int ObTableDirectInsertCtx::init(ObExecContext *exec_ctx,
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("exec_ctx cannot be null", KR(ret)); LOG_WARN("exec_ctx cannot be null", KR(ret));
} else { } else {
is_direct_ = true;
if (OB_ISNULL(load_exec_ctx_ = OB_NEWx(ObTableLoadSqlExecCtx, &exec_ctx->get_allocator()))) { if (OB_ISNULL(load_exec_ctx_ = OB_NEWx(ObTableLoadSqlExecCtx, &exec_ctx->get_allocator()))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadSqlExecCtx", KR(ret)); LOG_WARN("fail to new ObTableLoadSqlExecCtx", KR(ret));

View File

@ -32,7 +32,8 @@ public:
ObTableDirectInsertCtx() ObTableDirectInsertCtx()
: load_exec_ctx_(nullptr), : load_exec_ctx_(nullptr),
table_load_instance_(nullptr), table_load_instance_(nullptr),
is_inited_(false) {} is_inited_(false),
is_direct_(false) {}
~ObTableDirectInsertCtx(); ~ObTableDirectInsertCtx();
TO_STRING_KV(K_(is_inited)); TO_STRING_KV(K_(is_inited));
public: public:
@ -40,6 +41,10 @@ public:
int commit(); int commit();
int finish(); int finish();
void destroy(); void destroy();
bool get_is_direct() const { return is_direct_; }
void set_is_direct(bool is_direct) { is_direct_ = is_direct; }
private: private:
int init_store_column_idxs(const uint64_t tenant_id, const uint64_t table_id, int init_store_column_idxs(const uint64_t tenant_id, const uint64_t table_id,
common::ObIArray<int64_t> &store_column_idxs); common::ObIArray<int64_t> &store_column_idxs);
@ -47,6 +52,7 @@ private:
observer::ObTableLoadSqlExecCtx *load_exec_ctx_; observer::ObTableLoadSqlExecCtx *load_exec_ctx_;
observer::ObTableLoadInstance *table_load_instance_; observer::ObTableLoadInstance *table_load_instance_;
bool is_inited_; bool is_inited_;
bool is_direct_; //indict whether the plan is direct load plan including insert into append and load data direct
}; };
} // namespace observer } // namespace observer
} // namespace oceanbase } // namespace oceanbase