From de08581e98c0916e2e07a7228438020a176871ac Mon Sep 17 00:00:00 2001 From: suz-yang Date: Mon, 7 Aug 2023 03:24:25 +0000 Subject: [PATCH] Fix direct load client task not auto abort --- .../ob_table_direct_load_rpc_executor.cpp | 38 +++++++++++++++ .../ob_table_direct_load_rpc_struct.cpp | 3 +- .../client/ob_table_direct_load_rpc_struct.h | 7 ++- .../ob_table_load_client_service.cpp | 34 +++++++++++++ .../table_load/ob_table_load_client_service.h | 2 + .../table_load/ob_table_load_schema.cpp | 3 +- .../table_load/ob_table_load_schema.h | 2 +- .../table_load/ob_table_load_service.cpp | 48 +++++++++++++++++++ .../table_load/ob_table_load_service.h | 19 +++++++- 9 files changed, 150 insertions(+), 6 deletions(-) diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp index 5673f638a9..b1d8133193 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp @@ -90,6 +90,41 @@ int ObTableDirectLoadBeginExecutor::process() ret = OB_SUCCESS; client_task_ = nullptr; } + } else { + bool need_wait_finish = false; + ObTableLoadClientStatus wait_client_status; + ObTableLoadClientStatus client_status = client_task_->get_status(); + switch (client_status) { + case ObTableLoadClientStatus::RUNNING: + case ObTableLoadClientStatus::COMMITTING: + if (arg_.force_create_) { + if (OB_FAIL(ObTableLoadClientService::abort_task(client_task_))) { + LOG_WARN("fail to abort client task", KR(ret)); + } else { + need_wait_finish = true; + wait_client_status = ObTableLoadClientStatus::ABORT; + } + } + break; + case ObTableLoadClientStatus::COMMIT: + case ObTableLoadClientStatus::ABORT: + need_wait_finish = true; + wait_client_status = client_status; + break; + default: + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected client status", KR(ret), KPC(client_task_), K(client_status)); + break; + } + if (OB_SUCC(ret) && need_wait_finish) { + if (OB_FAIL(ObTableLoadClientService::wait_task_finish(client_task_, wait_client_status))) { + LOG_WARN("fail to wait client task finish", KR(ret), KPC(client_task_), + K(wait_client_status)); + } else { + ObTableLoadClientService::revert_task(client_task_); + client_task_ = nullptr; + } + } } } @@ -322,6 +357,9 @@ int ObTableDirectLoadAbortExecutor::process() LOG_WARN("fail to get client task", KR(ret), K(key)); } else if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) { LOG_WARN("fail to abort client task", KR(ret)); + } else if (OB_FAIL(ObTableLoadClientService::wait_task_finish(client_task, + ObTableLoadClientStatus::ABORT))) { + LOG_WARN("fail to abort client task", KR(ret)); } if (nullptr != client_task) { ObTableLoadClientService::revert_task(client_task); diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp index 3f32358869..390860d222 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp @@ -25,7 +25,8 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginArg, parallel_, max_error_row_count_, dup_action_, - timeout_); + timeout_, + force_create_); OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginRes, table_id_, diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h index aa8311ee72..4c20187a73 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h @@ -30,16 +30,19 @@ public: : parallel_(0), max_error_row_count_(0), dup_action_(sql::ObLoadDupActionType::LOAD_INVALID_MODE), - timeout_(0) + timeout_(0), + force_create_(false) { } - TO_STRING_KV(K_(table_name), K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout)); + TO_STRING_KV(K_(table_name), K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout), + K_(force_create)); public: ObString table_name_; int64_t parallel_; uint64_t max_error_row_count_; sql::ObLoadDupActionType dup_action_; int64_t timeout_; + bool force_create_; }; struct ObTableDirectLoadBeginRes diff --git a/src/observer/table_load/ob_table_load_client_service.cpp b/src/observer/table_load/ob_table_load_client_service.cpp index 353f452f8d..6bfd879974 100644 --- a/src/observer/table_load/ob_table_load_client_service.cpp +++ b/src/observer/table_load/ob_table_load_client_service.cpp @@ -528,6 +528,40 @@ int ObTableLoadClientService::abort_task(ObTableLoadClientTask *client_task) return ret; } +int ObTableLoadClientService::wait_task_finish(ObTableLoadClientTask *client_task, + ObTableLoadClientStatus client_status) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(nullptr == client_task)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KPC(client_task)); + } else if (OB_FAIL(client_task->check_status(client_status))) { + LOG_WARN("fail to check status", KR(ret), KPC(client_task), K(client_status)); + } else { + ObTimeoutCtx ctx; + if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, 10LL * 1000 * 1000))) { + LOG_WARN("fail to set default timeout ctx", KR(ret)); + } + while (OB_SUCC(ret)) { + if (ctx.is_timeouted()) { + ret = OB_TIMEOUT; + LOG_WARN("timeouted", KR(ret), K(ctx)); + } else { + ObTableLoadClientStatus client_status = client_task->get_status(); + if (client_task->get_ref_count() > 2) { + // wait + ob_usleep(100LL * 1000); + } else if (OB_FAIL(remove_task(client_task))) { + LOG_WARN("fail to remove client task", KR(ret), KPC(client_task)); + } else { + break; + } + } + } + } + return ret; +} + int ObTableLoadClientService::add_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task) { diff --git a/src/observer/table_load/ob_table_load_client_service.h b/src/observer/table_load/ob_table_load_client_service.h index 1ded3f3a90..9a75fa6712 100644 --- a/src/observer/table_load/ob_table_load_client_service.h +++ b/src/observer/table_load/ob_table_load_client_service.h @@ -48,6 +48,8 @@ public: static int get_task(const ObTableLoadKey &key, ObTableLoadClientTask *&client_task); static int commit_task(ObTableLoadClientTask *client_task); static int abort_task(ObTableLoadClientTask *client_task); + static int wait_task_finish(ObTableLoadClientTask *client_task, + table::ObTableLoadClientStatus client_status); int add_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task); int remove_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task); diff --git a/src/observer/table_load/ob_table_load_schema.cpp b/src/observer/table_load/ob_table_load_schema.cpp index b9faa61192..bf64576c4d 100644 --- a/src/observer/table_load/ob_table_load_schema.cpp +++ b/src/observer/table_load/ob_table_load_schema.cpp @@ -218,6 +218,7 @@ void ObTableLoadSchema::reset() column_descs_.reset(); multi_version_column_descs_.reset(); datum_utils_.reset(); + cmp_funcs_.reset(); partition_ids_.reset(); allocator_.reset(); is_inited_ = false; @@ -327,7 +328,7 @@ int ObTableLoadSchema::prepare_col_desc(const ObTableSchema *table_schema, commo return ret; } -int ObTableLoadSchema::init_cmp_funcs(const ObArray &col_descs, +int ObTableLoadSchema::init_cmp_funcs(const ObIArray &col_descs, const bool is_oracle_mode) { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_schema.h b/src/observer/table_load/ob_table_load_schema.h index 8ba41c30ab..af5dca7cd7 100644 --- a/src/observer/table_load/ob_table_load_schema.h +++ b/src/observer/table_load/ob_table_load_schema.h @@ -48,7 +48,7 @@ public: private: int init_table_schema(const share::schema::ObTableSchema *table_schema); int prepare_col_desc(const ObTableSchema *table_schema, common::ObIArray &col_descs); - int init_cmp_funcs(const common::ObArray &column_descs, + int init_cmp_funcs(const common::ObIArray &column_descs, const bool is_oracle_mode); public: common::ObArenaAllocator allocator_; diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index 10b7af7975..cd56f6e2d0 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -175,6 +175,48 @@ void ObTableLoadService::ObReleaseTask::runTimerTask() } } +/** + * ObClientTaskAutoAbortTask + */ + +int ObTableLoadService::ObClientTaskAutoAbortTask::init(uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("ObTableLoadService::ObClientTaskAutoAbortTask init twice", KR(ret), KP(this)); + } else { + tenant_id_ = tenant_id; + is_inited_ = true; + } + return ret; +} + +void ObTableLoadService::ObClientTaskAutoAbortTask::runTimerTask() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLoadService::ObClientTaskAutoAbortTask not init", KR(ret), KP(this)); + } else { + LOG_DEBUG("table load auto abort client task", K(tenant_id_)); + ObArray client_task_array; + if (OB_FAIL(service_.get_client_service().get_all_client_task(client_task_array))) { + LOG_WARN("fail to get all client task", KR(ret)); + } else { + for (int64_t i = 0; i < client_task_array.count(); ++i) { + ObTableLoadClientTask *client_task = client_task_array.at(i); + if (ObTableLoadClientStatus::ERROR == client_task->get_status()) { + if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) { + LOG_WARN("fail to abort client task", KR(ret), KPC(client_task)); + } + } + ObTableLoadClientService::revert_task(client_task); + } + } + } +} + /** * ObClientTaskPurgeTask */ @@ -372,6 +414,7 @@ ObTableLoadService::ObTableLoadService() : check_tenant_task_(*this), gc_task_(*this), release_task_(*this), + client_task_auto_abort_task_(*this), client_task_purge_task_(*this), is_stop_(false), is_inited_(false) @@ -394,6 +437,8 @@ int ObTableLoadService::init(uint64_t tenant_id) LOG_WARN("fail to init gc task", KR(ret)); } else if (OB_FAIL(release_task_.init(tenant_id))) { LOG_WARN("fail to init release task", KR(ret)); + } else if (OB_FAIL(client_task_auto_abort_task_.init(tenant_id))) { + LOG_WARN("fail to init client task auto abort task", KR(ret)); } else if (OB_FAIL(client_task_purge_task_.init(tenant_id))) { LOG_WARN("fail to init client task purge task", KR(ret)); } else { @@ -418,6 +463,9 @@ int ObTableLoadService::start() LOG_WARN("fail to schedule gc task", KR(ret)); } else if (OB_FAIL(timer_.schedule(release_task_, RELEASE_INTERVAL, true))) { LOG_WARN("fail to schedule release task", KR(ret)); + } else if (OB_FAIL(timer_.schedule(client_task_auto_abort_task_, + CLIENT_TASK_AUTO_ABORT_INTERVAL, true))) { + LOG_WARN("fail to schedule client task auto abort task", KR(ret)); } else if (OB_FAIL( timer_.schedule(client_task_purge_task_, CLIENT_TASK_PURGE_INTERVAL, true))) { LOG_WARN("fail to schedule client task purge task", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_service.h b/src/observer/table_load/ob_table_load_service.h index 528fbd1b70..e36febbc5d 100644 --- a/src/observer/table_load/ob_table_load_service.h +++ b/src/observer/table_load/ob_table_load_service.h @@ -55,7 +55,8 @@ private: static const int64_t CHECK_TENANT_INTERVAL = 1LL * 1000 * 1000; // 1s static const int64_t GC_INTERVAL = 30LL * 1000 * 1000; // 30s static const int64_t RELEASE_INTERVAL = 1LL * 1000 * 1000; // 1s - static const int64_t CLIENT_TASK_PURGE_INTERVAL = 1LL * 1000 * 1000; // 30s + static const int64_t CLIENT_TASK_AUTO_ABORT_INTERVAL = 1LL * 1000 * 1000; // 1s + static const int64_t CLIENT_TASK_PURGE_INTERVAL = 1LL * 1000 * 1000; // 1s class ObCheckTenantTask : public common::ObTimerTask { public: @@ -95,6 +96,21 @@ private: uint64_t tenant_id_; bool is_inited_; }; + class ObClientTaskAutoAbortTask : public common::ObTimerTask + { + public: + ObClientTaskAutoAbortTask(ObTableLoadService &service) + : service_(service), tenant_id_(common::OB_INVALID_ID), is_inited_(false) + { + } + virtual ~ObClientTaskAutoAbortTask() = default; + int init(uint64_t tenant_id); + void runTimerTask() override; + private: + ObTableLoadService &service_; + uint64_t tenant_id_; + bool is_inited_; + }; class ObClientTaskPurgeTask : public common::ObTimerTask { public: @@ -117,6 +133,7 @@ private: ObCheckTenantTask check_tenant_task_; ObGCTask gc_task_; ObReleaseTask release_task_; + ObClientTaskAutoAbortTask client_task_auto_abort_task_; ObClientTaskPurgeTask client_task_purge_task_; volatile bool is_stop_; bool is_inited_;