From 8668b506b8a6cc3fcd2a41454c7dbbc07e62075d Mon Sep 17 00:00:00 2001 From: coolfishchen Date: Tue, 12 Dec 2023 17:12:43 +0000 Subject: [PATCH] [CP] add retry for direct load if unit has migrated Co-authored-by: suz-yang --- src/observer/mysql/ob_query_retry_ctrl.cpp | 15 ++++++++++++++ .../table_load/ob_table_load_redef_table.cpp | 3 +++ .../table_load/ob_table_load_service.cpp | 20 +++++++++++-------- src/share/ob_ddl_common.h | 8 ++++++++ src/sql/engine/cmd/ob_load_data_executor.cpp | 12 +++++------ src/sql/engine/cmd/ob_load_data_executor.h | 2 +- .../engine/cmd/ob_table_direct_insert_ctx.cpp | 1 + .../engine/cmd/ob_table_direct_insert_ctx.h | 10 ++++++++-- 8 files changed, 54 insertions(+), 17 deletions(-) diff --git a/src/observer/mysql/ob_query_retry_ctrl.cpp b/src/observer/mysql/ob_query_retry_ctrl.cpp index a2200415fd..83affe2e58 100644 --- a/src/observer/mysql/ob_query_retry_ctrl.cpp +++ b/src/observer/mysql/ob_query_retry_ctrl.cpp @@ -259,6 +259,13 @@ class ObStmtTypeRetryPolicy : public ObRetryPolicy public: ObStmtTypeRetryPolicy() = default; ~ObStmtTypeRetryPolicy() = default; + + bool is_direct_load(ObRetryParam &v) const + { + ObExecContext &exec_ctx = v.result_.get_exec_context(); + return exec_ctx.get_table_direct_insert_ctx().get_is_direct(); + } + virtual void test(ObRetryParam &v) const override { int err = v.err_; @@ -280,6 +287,14 @@ public: v.retry_type_ = RETRY_TYPE_NONE; } v.no_more_test_ = true; + } else if (is_direct_load(v)) { + if (is_direct_load_retry_err(err)) { + try_packet_retry(v); + } else { + v.client_ret_ = err; + v.retry_type_ = RETRY_TYPE_NONE; + } + v.no_more_test_ = true; } } }; diff --git a/src/observer/table_load/ob_table_load_redef_table.cpp b/src/observer/table_load/ob_table_load_redef_table.cpp index e2dc8fbddb..3c2ffe1804 100644 --- a/src/observer/table_load/ob_table_load_redef_table.cpp +++ b/src/observer/table_load/ob_table_load_redef_table.cpp @@ -113,6 +113,9 @@ int ObTableLoadRedefTable::finish(const ObTableLoadRedefTableFinishArg &arg, if (OB_FAIL(ObDDLServerClient::finish_redef_table( finish_redef_table_arg, build_single_replica_response_arg, session_info))) { LOG_WARN("failed to finish redef table", KR(ret), K(finish_redef_table_arg)); + if (ret == OB_NOT_MASTER) { //sql cannot be retried here, so change errcode + ret = OB_DIRECT_LOAD_COMMIT_ERROR; + } } else { LOG_INFO("succeed to finish redef table", KR(ret), K(finish_redef_table_arg)); } diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index 38971903d1..5a2a15505b 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -60,16 +60,12 @@ void ObTableLoadService::ObCheckTenantTask::runTimerTask() LOG_WARN("ObTableLoadService::ObCheckTenantTask not init", KR(ret), KP(this)); } else { LOG_DEBUG("table load check tenant", K(tenant_id_)); - ObTenant *tenant = nullptr; - if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id_, tenant))) { - LOG_WARN("fail to get tenant", KR(ret), K(tenant_id_)); - } else if (OB_UNLIKELY(ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL != - tenant->get_unit_status())) { - LOG_DEBUG("tenant unit status not normal, clear", K(tenant_id_), KPC(tenant)); + if (OB_FAIL(ObTableLoadService::check_tenant())) { + LOG_WARN("fail to check_tenant", KR(ret)); // abort all client task service_.abort_all_client_task(); // fail all current tasks - service_.fail_all_ctx(OB_ERR_UNEXPECTED_UNIT_STATUS); + service_.fail_all_ctx(ret); } } } @@ -372,6 +368,9 @@ int ObTableLoadService::mtl_init(ObTableLoadService *&service) return ret; } + + + int ObTableLoadService::check_tenant() { int ret = OB_SUCCESS; @@ -379,8 +378,13 @@ int ObTableLoadService::check_tenant() ObTenant *tenant = nullptr; if (OB_FAIL(GCTX.omt_->get_tenant(tenant_id, tenant))) { LOG_WARN("fail to get tenant", KR(ret), K(tenant_id)); + } else if (tenant->get_unit_status() == ObUnitInfoGetter::ObUnitStatus::UNIT_MARK_DELETING + || tenant->get_unit_status() == ObUnitInfoGetter::ObUnitStatus::UNIT_WAIT_GC_IN_OBSERVER + || tenant->get_unit_status() == ObUnitInfoGetter::ObUnitStatus::UNIT_DELETING_IN_OBSERVER) { + ret = OB_EAGAIN; + LOG_WARN("unit is migrate out, should retry direct load", KR(ret), K(tenant->get_unit_status())); } else if (OB_UNLIKELY(ObUnitInfoGetter::ObUnitStatus::UNIT_NORMAL != - tenant->get_unit_status())) { + tenant->get_unit_status() && ObUnitInfoGetter::ObUnitStatus::UNIT_MIGRATE_OUT != tenant->get_unit_status())) { ret = OB_ERR_UNEXPECTED_UNIT_STATUS; LOG_WARN("unit status not normal", KR(ret), K(tenant->get_unit_status())); } diff --git a/src/share/ob_ddl_common.h b/src/share/ob_ddl_common.h index c563dad7a7..f1ac58332f 100644 --- a/src/share/ob_ddl_common.h +++ b/src/share/ob_ddl_common.h @@ -251,6 +251,14 @@ static inline bool is_ddl_stmt_packet_retry_err(const int ret) ; } +static inline bool is_direct_load_retry_err(const int ret) +{ + return is_ddl_stmt_packet_retry_err(ret) || ret == OB_TABLET_NOT_EXIST || ret == OB_LS_NOT_EXIST + || ret == OB_NOT_MASTER + || ret == OB_TASK_EXPIRED + ; +} + enum ObCheckExistedDDLMode { INVALID_DDL_MODE = 0, diff --git a/src/sql/engine/cmd/ob_load_data_executor.cpp b/src/sql/engine/cmd/ob_load_data_executor.cpp index 0f4101e781..a98d263492 100644 --- a/src/sql/engine/cmd/ob_load_data_executor.cpp +++ b/src/sql/engine/cmd/ob_load_data_executor.cpp @@ -24,7 +24,7 @@ namespace oceanbase namespace sql { -int ObLoadDataExecutor::check_is_direct_load(const ObLoadDataHint &load_hint, bool &check_ret) +int ObLoadDataExecutor::check_is_direct_load(ObTableDirectInsertCtx &ctx, const ObLoadDataHint &load_hint) { int ret = OB_SUCCESS; int64_t enable_direct = 0; @@ -34,9 +34,9 @@ int ObLoadDataExecutor::check_is_direct_load(const ObLoadDataHint &load_hint, bo } else if (OB_FAIL(load_hint.get_value(ObLoadDataHint::APPEND, append))) { LOG_WARN("fail to get value of APPEND", K(ret)); } else if ((enable_direct != 0 || append != 0) && GCONF._ob_enable_direct_load) { - check_ret = true; + ctx.set_is_direct(true); } else { - check_ret = false; + ctx.set_is_direct(false); } return ret; } @@ -44,15 +44,15 @@ int ObLoadDataExecutor::check_is_direct_load(const ObLoadDataHint &load_hint, bo int ObLoadDataExecutor::execute(ObExecContext &ctx, ObLoadDataStmt &stmt) { int ret = OB_SUCCESS; + ObTableDirectInsertCtx &table_direct_insert_ctx = ctx.get_table_direct_insert_ctx(); ObLoadDataBase *load_impl = NULL; - bool is_direct_load = false; if (!stmt.get_load_arguments().is_csv_format_) { ret = OB_NOT_SUPPORTED; LOG_WARN("invalid resolver results", K(ret)); - } else if (OB_FAIL(check_is_direct_load(stmt.get_hints(), is_direct_load))) { + } else if (OB_FAIL(check_is_direct_load(table_direct_insert_ctx, stmt.get_hints()))) { LOG_WARN("fail to check is load mode", KR(ret)); } else { - if (!is_direct_load) { + if (!table_direct_insert_ctx.get_is_direct()) { if (OB_ISNULL(load_impl = OB_NEWx(ObLoadDataSPImpl, (&ctx.get_allocator())))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory failed", K(ret)); diff --git a/src/sql/engine/cmd/ob_load_data_executor.h b/src/sql/engine/cmd/ob_load_data_executor.h index b0398cc51d..2fc071bdc6 100644 --- a/src/sql/engine/cmd/ob_load_data_executor.h +++ b/src/sql/engine/cmd/ob_load_data_executor.h @@ -26,7 +26,7 @@ public: int execute(ObExecContext &ctx, ObLoadDataStmt &stmt); private: - int check_is_direct_load(const ObLoadDataHint &load_hint, bool &check_ret); + int check_is_direct_load(ObTableDirectInsertCtx &ctx, const ObLoadDataHint &load_hint); private: // disallow copy DISALLOW_COPY_AND_ASSIGN(ObLoadDataExecutor); diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp index fd8094c10e..2620c50fb4 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp @@ -45,6 +45,7 @@ int ObTableDirectInsertCtx::init(ObExecContext *exec_ctx, ret = OB_INVALID_ARGUMENT; LOG_WARN("exec_ctx cannot be null", KR(ret)); } else { + is_direct_ = true; if (OB_ISNULL(load_exec_ctx_ = OB_NEWx(ObTableLoadSqlExecCtx, &exec_ctx->get_allocator()))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadSqlExecCtx", KR(ret)); diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.h b/src/sql/engine/cmd/ob_table_direct_insert_ctx.h index c806828497..bfe680b449 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.h +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.h @@ -32,7 +32,8 @@ public: ObTableDirectInsertCtx() : load_exec_ctx_(nullptr), table_load_instance_(nullptr), - is_inited_(false) {} + is_inited_(false), + is_direct_(false) {} ~ObTableDirectInsertCtx(); TO_STRING_KV(K_(is_inited)); public: @@ -40,6 +41,10 @@ public: int commit(); int finish(); void destroy(); + + bool get_is_direct() const { return is_direct_; } + void set_is_direct(bool is_direct) { is_direct_ = is_direct; } + private: int init_store_column_idxs(const uint64_t tenant_id, const uint64_t table_id, common::ObIArray &store_column_idxs); @@ -47,6 +52,7 @@ private: observer::ObTableLoadSqlExecCtx *load_exec_ctx_; observer::ObTableLoadInstance *table_load_instance_; bool is_inited_; + bool is_direct_; //indict whether the plan is direct load plan including insert into append and load data direct }; } // namespace observer -} // namespace oceanbase \ No newline at end of file +} // namespace oceanbase