diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp index 836878355..6075addbb 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp @@ -94,7 +94,7 @@ int ObTableDirectLoadBeginExecutor::process() } // get the existing client task if it exists - if (OB_SUCC(ret)) { + while (OB_SUCC(ret)) { ObTableLoadKey key(tenant_id, table_id); if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task_))) { if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { @@ -102,6 +102,7 @@ int ObTableDirectLoadBeginExecutor::process() } else { ret = OB_SUCCESS; client_task_ = nullptr; + break; } } else { bool need_wait_finish = false; @@ -129,13 +130,15 @@ int ObTableDirectLoadBeginExecutor::process() LOG_WARN("unexpected client status", KR(ret), KPC(client_task_), K(client_status)); break; } - if (OB_SUCC(ret) && need_wait_finish) { - if (OB_FAIL(ObTableLoadClientService::wait_task_finish(client_task_, wait_client_status))) { - LOG_WARN("fail to wait client task finish", KR(ret), KPC(client_task_), - K(wait_client_status)); - } else { - ObTableLoadClientService::revert_task(client_task_); - client_task_ = nullptr; + if (OB_FAIL(ret)) { + } else if (!need_wait_finish) { + break; + } else { + ObTableLoadUniqueKey task_key(table_id, client_task_->ddl_param_.task_id_); + ObTableLoadClientService::revert_task(client_task_); + client_task_ = nullptr; + if (OB_FAIL(ObTableLoadClientService::wait_task_finish(task_key))) { + LOG_WARN("fail to wait client task finish", KR(ret), K(task_key), K(wait_client_status)); } } } @@ -370,14 +373,14 @@ int ObTableDirectLoadAbortExecutor::process() LOG_WARN("fail to get client task", KR(ret), K(key)); } else if (OB_FAIL(ObTableLoadClientService::abort_task(client_task))) { LOG_WARN("fail to abort client task", KR(ret)); - } else if (OB_FAIL(ObTableLoadClientService::wait_task_finish(client_task, - ObTableLoadClientStatus::ABORT))) { - LOG_WARN("fail to abort client task", KR(ret)); } if (nullptr != client_task) { ObTableLoadClientService::revert_task(client_task); client_task = nullptr; } + if (OB_SUCC(ret) && OB_FAIL(ObTableLoadClientService::wait_task_finish(key))) { + LOG_WARN("fail to wait client task finish", KR(ret), K(key)); + } return ret; } diff --git a/src/observer/table_load/ob_table_load_client_service.cpp b/src/observer/table_load/ob_table_load_client_service.cpp index 6bfd87997..4ef71b772 100644 --- a/src/observer/table_load/ob_table_load_client_service.cpp +++ b/src/observer/table_load/ob_table_load_client_service.cpp @@ -490,6 +490,21 @@ int ObTableLoadClientService::get_task(const ObTableLoadKey &key, return ret; } +int ObTableLoadClientService::exist_task(const ObTableLoadUniqueKey &key, bool &is_exist) +{ + int ret = OB_SUCCESS; + ObTableLoadService *service = nullptr; + if (OB_ISNULL(service = MTL(ObTableLoadService *))) { + ret = OB_ERR_SYS; + LOG_WARN("null table load service", KR(ret)); + } else { + if (OB_FAIL(service->get_client_service().exist_client_task(key, is_exist))) { + LOG_WARN("fail to check exist client task", KR(ret), K(key)); + } + } + return ret; +} + int ObTableLoadClientService::commit_task(ObTableLoadClientTask *client_task) { int ret = OB_SUCCESS; @@ -528,34 +543,27 @@ int ObTableLoadClientService::abort_task(ObTableLoadClientTask *client_task) return ret; } -int ObTableLoadClientService::wait_task_finish(ObTableLoadClientTask *client_task, - ObTableLoadClientStatus client_status) +int ObTableLoadClientService::wait_task_finish(const ObTableLoadUniqueKey &key) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(nullptr == client_task)) { + if (OB_UNLIKELY(!key.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), KPC(client_task)); - } else if (OB_FAIL(client_task->check_status(client_status))) { - LOG_WARN("fail to check status", KR(ret), KPC(client_task), K(client_status)); + LOG_WARN("invalid args", KR(ret), K(key)); } else { + bool is_exist = true; ObTimeoutCtx ctx; if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, 10LL * 1000 * 1000))) { LOG_WARN("fail to set default timeout ctx", KR(ret)); } - while (OB_SUCC(ret)) { + while (OB_SUCC(ret) && is_exist) { if (ctx.is_timeouted()) { ret = OB_TIMEOUT; LOG_WARN("timeouted", KR(ret), K(ctx)); - } else { - ObTableLoadClientStatus client_status = client_task->get_status(); - if (client_task->get_ref_count() > 2) { - // wait - ob_usleep(100LL * 1000); - } else if (OB_FAIL(remove_task(client_task))) { - LOG_WARN("fail to remove client task", KR(ret), KPC(client_task)); - } else { - break; - } + } else if (OB_FAIL(exist_task(key, is_exist))) { + LOG_WARN("fail to check exist client task", KR(ret), K(key)); + } else if (is_exist) { + // wait + ob_usleep(100LL * 1000); } } } @@ -731,6 +739,29 @@ int ObTableLoadClientService::get_client_task_by_table_id(uint64_t table_id, return ret; } +int ObTableLoadClientService::exist_client_task(const ObTableLoadUniqueKey &key, bool &is_exist) +{ + int ret = OB_SUCCESS; + is_exist = false; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObTableLoadClientService not init", KR(ret), KP(this)); + } else { + obsys::ObRLockGuard guard(rwlock_); + ObTableLoadClientTask *client_task = nullptr; + if (OB_FAIL(client_task_map_.get_refactored(key, client_task))) { + if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { + LOG_WARN("fail to get refactored", KR(ret), K(key)); + } else { + ret = OB_ENTRY_NOT_EXIST; + } + } else { + is_exist = true; + } + } + return ret; +} + int64_t ObTableLoadClientService::get_client_task_count() const { obsys::ObRLockGuard guard(rwlock_); diff --git a/src/observer/table_load/ob_table_load_client_service.h b/src/observer/table_load/ob_table_load_client_service.h index 9a75fa671..63034594e 100644 --- a/src/observer/table_load/ob_table_load_client_service.h +++ b/src/observer/table_load/ob_table_load_client_service.h @@ -46,16 +46,17 @@ public: static int remove_task(ObTableLoadClientTask *client_task); static int get_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *&client_task); static int get_task(const ObTableLoadKey &key, ObTableLoadClientTask *&client_task); + static int exist_task(const ObTableLoadUniqueKey &key, bool &is_exist); static int commit_task(ObTableLoadClientTask *client_task); static int abort_task(ObTableLoadClientTask *client_task); - static int wait_task_finish(ObTableLoadClientTask *client_task, - table::ObTableLoadClientStatus client_status); + static int wait_task_finish(const ObTableLoadUniqueKey &key); int add_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task); int remove_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *client_task); int get_all_client_task(common::ObIArray &client_task_array); int get_client_task(const ObTableLoadUniqueKey &key, ObTableLoadClientTask *&client_task); int get_client_task_by_table_id(uint64_t table_id, ObTableLoadClientTask *&client_task); + int exist_client_task(const ObTableLoadUniqueKey &key, bool &is_exist); int64_t get_client_task_count() const; void purge_client_task();