From 30366c24eb2ddbe9b6d888fa11a560800023727f Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 8 Feb 2024 07:04:23 +0000 Subject: [PATCH] [CP] Fix direct load work thread hang when finish many trans at once --- .../ob_table_direct_load_rpc_executor.cpp | 26 +++------ .../ob_table_load_client_service.cpp | 58 ++++++++----------- .../table_load/ob_table_load_client_task.cpp | 23 -------- .../table_load/ob_table_load_client_task.h | 20 +++---- 4 files changed, 41 insertions(+), 86 deletions(-) diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp index 548d5333a..d0eaeb185 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp @@ -290,22 +290,15 @@ int ObTableDirectLoadBeginExecutor::do_begin() { int ret = OB_SUCCESS; ObTableLoadCoordinator coordinator(table_ctx_); + ObTableLoadTransId trans_id; if (OB_FAIL(coordinator.init())) { LOG_WARN("fail to init coordinator", KR(ret)); } else if (OB_FAIL(coordinator.begin())) { LOG_WARN("fail to coordinator begin", KR(ret)); - } - // start trans - for (int64_t i = 1; OB_SUCC(ret) && i <= table_ctx_->param_.session_count_; ++i) { - ObTableLoadSegmentID segment_id(i); - ObTableLoadTransId trans_id; - if (OB_FAIL(coordinator.start_trans(segment_id, trans_id))) { - LOG_WARN("fail to start trans", KR(ret), K(i)); - } else if (OB_FAIL(client_task_->add_trans_id(trans_id))) { - LOG_WARN("fail to add trans id", KR(ret), K(trans_id)); - } - } - if (OB_SUCC(ret)) { + } else if (OB_FAIL(coordinator.start_trans(ObTableLoadSegmentID(1), trans_id))) { + LOG_WARN("fail to start trans", KR(ret)); + } else { + client_task_->set_trans_id(trans_id); if (OB_FAIL(client_task_->set_status_running())) { LOG_WARN("fail to set status running", KR(ret)); } @@ -459,15 +452,14 @@ int ObTableDirectLoadInsertExecutor::process() LOG_WARN("fail to get table ctx", KR(ret)); } else { ObTableLoadCoordinator coordinator(table_ctx); - ObTableLoadTransId trans_id; - int64_t batch_id = client_task->get_next_batch_id(); + const ObTableLoadTransId &trans_id = client_task->get_trans_id(); + const int64_t batch_id = client_task->get_next_batch_id(); + const int32_t session_id = batch_id % table_ctx->param_.session_count_ + 1; if (OB_FAIL(set_batch_seq_no(batch_id, obj_rows))) { LOG_WARN("fail to set batch seq no", KR(ret)); - } else if (OB_FAIL(client_task->get_next_trans_id(trans_id))) { - LOG_WARN("fail to get next trans id", KR(ret)); } else if (OB_FAIL(coordinator.init())) { LOG_WARN("fail to init coordinator", KR(ret)); - } else if (OB_FAIL(coordinator.write(trans_id, obj_rows))) { + } else if (OB_FAIL(coordinator.write(trans_id, session_id, 0 /*seq_no*/, obj_rows))) { LOG_WARN("fail to coordinator write", KR(ret)); } } diff --git a/src/observer/table_load/ob_table_load_client_service.cpp b/src/observer/table_load/ob_table_load_client_service.cpp index 371b43198..907c0dfe5 100644 --- a/src/observer/table_load/ob_table_load_client_service.cpp +++ b/src/observer/table_load/ob_table_load_client_service.cpp @@ -106,16 +106,12 @@ private: if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) { LOG_WARN("fail to check status", KR(ret)); } else { - const ObIArray &trans_ids = client_task_->get_trans_ids(); ObTableLoadCoordinator coordinator(table_ctx_); + const ObTableLoadTransId &trans_id = client_task_->get_trans_id(); if (OB_FAIL(coordinator.init())) { LOG_WARN("fail to init coordinator", KR(ret)); - } - for (int64_t i = 0; OB_SUCC(ret) && i < trans_ids.count(); ++i) { - const ObTableLoadTransId &trans_id = trans_ids.at(i); - if (OB_FAIL(coordinator.finish_trans(trans_id))) { - LOG_WARN("fail to coordinator finish trans", KR(ret), K(i), K(trans_id)); - } + } else if (OB_FAIL(coordinator.finish_trans(trans_id))) { + LOG_WARN("fail to coordinator finish trans", KR(ret), K(trans_id)); } if (OB_FAIL(ret)) { client_task_->set_status_error(ret); @@ -126,7 +122,7 @@ private: int check_all_trans_commit() { int ret = OB_SUCCESS; - const ObIArray &trans_ids = client_task_->get_trans_ids(); + const ObTableLoadTransId &trans_id = client_task_->get_trans_id(); while (OB_SUCC(ret)) { if (OB_FAIL(client_task_->check_status(ObTableLoadClientStatus::COMMITTING))) { LOG_WARN("fail to check status", KR(ret)); @@ -134,37 +130,33 @@ private: if (OB_FAIL(client_task_->get_exec_ctx()->check_status())) { LOG_WARN("fail to check exec status", KR(ret)); } else { - bool all_commit = true; ObTableLoadCoordinator coordinator(table_ctx_); + ObTableLoadTransStatusType trans_status = ObTableLoadTransStatusType::NONE; + int error_code = OB_SUCCESS; + bool try_again = false; if (OB_FAIL(coordinator.init())) { LOG_WARN("fail to init coordinator", KR(ret)); - } - for (int64_t i = 0; OB_SUCC(ret) && i < trans_ids.count(); ++i) { - const ObTableLoadTransId &trans_id = trans_ids.at(i); - ObTableLoadTransStatusType trans_status = ObTableLoadTransStatusType::NONE; - int error_code = OB_SUCCESS; - if (OB_FAIL(coordinator.get_trans_status(trans_id, trans_status, error_code))) { - LOG_WARN("fail to coordinator get status", KR(ret), K(i), K(trans_id)); - } else { - switch (trans_status) { - case ObTableLoadTransStatusType::FROZEN: - all_commit = false; - break; - case ObTableLoadTransStatusType::COMMIT: - break; - case ObTableLoadTransStatusType::ERROR: - ret = error_code; - LOG_WARN("trans has error", KR(ret)); - break; - default: - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected trans status", KR(ret), K(trans_status)); - break; - } + } else if (OB_FAIL(coordinator.get_trans_status(trans_id, trans_status, error_code))) { + LOG_WARN("fail to coordinator get status", KR(ret), K(trans_id)); + } else { + switch (trans_status) { + case ObTableLoadTransStatusType::FROZEN: + try_again = true; + break; + case ObTableLoadTransStatusType::COMMIT: + break; + case ObTableLoadTransStatusType::ERROR: + ret = error_code; + LOG_WARN("trans has error", KR(ret)); + break; + default: + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected trans status", KR(ret), K(trans_status)); + break; } } if (OB_SUCC(ret)) { - if (all_commit) { + if (!try_again) { break; } else { ob_usleep(1000 * 1000); diff --git a/src/observer/table_load/ob_table_load_client_task.cpp b/src/observer/table_load/ob_table_load_client_task.cpp index 534222765..6cb2d3eb5 100644 --- a/src/observer/table_load/ob_table_load_client_task.cpp +++ b/src/observer/table_load/ob_table_load_client_task.cpp @@ -37,7 +37,6 @@ ObTableLoadClientTask::ObTableLoadClientTask() session_info_(nullptr), exec_ctx_(nullptr), task_scheduler_(nullptr), - next_trans_idx_(0), next_batch_id_(0), table_ctx_(nullptr), client_status_(ObTableLoadClientStatus::MAX_STATUS), @@ -262,28 +261,6 @@ int ObTableLoadClientTask::get_table_ctx(ObTableLoadTableCtx *&table_ctx) return ret; } -int ObTableLoadClientTask::add_trans_id(const ObTableLoadTransId &trans_id) -{ - int ret = OB_SUCCESS; - if (OB_FAIL(trans_ids_.push_back(trans_id))) { - LOG_WARN("fail to push back trans id", KR(ret), K(trans_id)); - } - return ret; -} - -int ObTableLoadClientTask::get_next_trans_id(ObTableLoadTransId &trans_id) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(trans_ids_.empty())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected empty trans id", KR(ret)); - } else { - const int64_t trans_idx = ATOMIC_FAA(&next_trans_idx_, 1) % trans_ids_.count(); - trans_id = trans_ids_.at(trans_idx); - } - return ret; -} - int ObTableLoadClientTask::set_status_running() { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_client_task.h b/src/observer/table_load/ob_table_load_client_task.h index 79382269d..d8f30247e 100644 --- a/src/observer/table_load/ob_table_load_client_task.h +++ b/src/observer/table_load/ob_table_load_client_task.h @@ -42,13 +42,9 @@ public: int get_table_ctx(ObTableLoadTableCtx *&table_ctx); OB_INLINE sql::ObSQLSessionInfo *get_session_info() { return session_info_; } OB_INLINE ObTableLoadClientExecCtx *get_exec_ctx() { return exec_ctx_; } - int add_trans_id(const table::ObTableLoadTransId &trans_id); - int get_next_trans_id(table::ObTableLoadTransId &trans_id); + OB_INLINE void set_trans_id(const table::ObTableLoadTransId &trans_id) { trans_id_ = trans_id; } + OB_INLINE const table::ObTableLoadTransId &get_trans_id() const { return trans_id_; } int64_t get_next_batch_id() { return ATOMIC_FAA(&next_batch_id_, 1); } - OB_INLINE const common::ObIArray &get_trans_ids() const - { - return trans_ids_; - } int set_status_running(); int set_status_committing(); int set_status_commit(); @@ -62,13 +58,12 @@ public: int add_task(ObTableLoadTask *task); TO_STRING_KV(K_(tenant_id), K_(user_id), K_(database_id), K_(table_id), K_(ddl_param), K_(column_names), K_(column_idxs), K_(result_info), KP_(session_info), - K_(free_session_ctx), KP_(exec_ctx), KP_(task_scheduler), K_(trans_ids), - K_(next_trans_idx), KP_(table_ctx), K_(client_status), K_(error_code), - K_(ref_count)); + K_(free_session_ctx), KP_(exec_ctx), KP_(task_scheduler), K_(trans_id), + KP_(table_ctx), K_(client_status), K_(error_code), K_(ref_count)); private: int create_session_info(uint64_t user_id, uint64_t database_id, uint64_t table_id, - sql::ObSQLSessionInfo *&session_info, - sql::ObFreeSessionCtx &free_session_ctx); + sql::ObSQLSessionInfo *&session_info, + sql::ObFreeSessionCtx &free_session_ctx); int init_column_names_and_idxs(); int init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us); public: @@ -87,8 +82,7 @@ private: ObTableLoadClientExecCtx *exec_ctx_; ObTableLoadObjectAllocator task_allocator_; ObITableLoadTaskScheduler *task_scheduler_; - common::ObArray trans_ids_; - int64_t next_trans_idx_; + table::ObTableLoadTransId trans_id_; int64_t next_batch_id_ CACHE_ALIGNED; mutable obsys::ObRWLock rw_lock_; ObTableLoadTableCtx *table_ctx_;