diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp index 1d7ff1efb9..f39200798c 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp @@ -139,8 +139,8 @@ int ObTableDirectLoadBeginExecutor::process() if (OB_ISNULL(client_task_ = ObTableLoadClientService::alloc_task())) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc client task", KR(ret)); - } else if (OB_FAIL( - client_task_->init(tenant_id, user_id, database_id, table_id, arg_.timeout_))) { + } else if (OB_FAIL(client_task_->init(tenant_id, user_id, database_id, table_id, + arg_.timeout_, arg_.heartbeat_timeout_))) { LOG_WARN("fail to init client task", KR(ret)); } else { // create table ctx @@ -522,5 +522,35 @@ int ObTableDirectLoadInsertExecutor::set_batch_seq_no(int64_t batch_id, return ret; } +//heart_beat +int ObTableDirectLoadHeartBeatExecutor::check_args() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_INVALID_ID == arg_.table_id_ || 0 == arg_.task_id_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(arg_)); + } + return ret; +} + +int ObTableDirectLoadHeartBeatExecutor::process() +{ + int ret = OB_SUCCESS; + LOG_INFO("table direct load heart beat", K_(arg)); + ObTableLoadClientTask *client_task = nullptr; + ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_); + if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) { + LOG_WARN("fail to get client task", KR(ret), K(key)); + } else { + client_task->get_exec_ctx()->last_heartbeat_time_ = ObTimeUtil::current_time(); + client_task->get_status(res_.status_, res_.error_code_); + } + if (nullptr != client_task) { + ObTableLoadClientService::revert_task(client_task); + client_task = nullptr; + } + return ret; +} + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h index a98cc83cc5..7a8238ec40 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h @@ -161,5 +161,26 @@ private: int set_batch_seq_no(int64_t batch_id, table::ObTableLoadObjRowArray &obj_row_array); }; +// heart_beat +class ObTableDirectLoadHeartBeatExecutor + : public ObTableDirectLoadRpcExecutor +{ + typedef ObTableDirectLoadRpcExecutor + ParentType; + +public: + ObTableDirectLoadHeartBeatExecutor(ObTableDirectLoadExecContext &ctx, + const table::ObTableDirectLoadRequest &request, + table::ObTableDirectLoadResult &result) + : ParentType(ctx, request, result) + { + } + virtual ~ObTableDirectLoadHeartBeatExecutor() = default; + +protected: + int check_args() override; + int process() override; +}; + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.cpp index 773798ac16..c329459054 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.cpp @@ -37,6 +37,7 @@ int ObTableDirectLoadRpcProxy::dispatch(ObTableDirectLoadExecContext &ctx, OB_TABLE_DIRECT_LOAD_RPC_DISPATCH(ObTableDirectLoadOperationType::ABORT); OB_TABLE_DIRECT_LOAD_RPC_DISPATCH(ObTableDirectLoadOperationType::GET_STATUS); OB_TABLE_DIRECT_LOAD_RPC_DISPATCH(ObTableDirectLoadOperationType::INSERT); + OB_TABLE_DIRECT_LOAD_RPC_DISPATCH(ObTableDirectLoadOperationType::HEART_BEAT); default: ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "unexpected command type", K(ret), K(request)); diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h b/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h index 1e68b74c63..bd116b5d92 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h @@ -26,6 +26,7 @@ class ObTableDirectLoadCommitExecutor; class ObTableDirectLoadAbortExecutor; class ObTableDirectLoadGetStatusExecutor; class ObTableDirectLoadInsertExecutor; +class ObTableDirectLoadHeartBeatExecutor; class ObTableDirectLoadRpcProxy { @@ -149,6 +150,11 @@ public: OB_DEFINE_TABLE_DIRECT_LOAD_RPC(insert, table::ObTableDirectLoadOperationType::INSERT, ObTableDirectLoadInsertExecutor, ObTableDirectLoadInsertArg); + // heart_beat + OB_DEFINE_TABLE_DIRECT_LOAD_RPC(heartbeat, table::ObTableDirectLoadOperationType::HEART_BEAT, + ObTableDirectLoadHeartBeatExecutor, ObTableDirectLoadHeartBeatArg, + ObTableDirectLoadHeartBeatRes); + private: obrpc::ObTableRpcProxy &rpc_proxy_; ObArenaAllocator allocator_; diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp index 390860d222..f3c9515b42 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.cpp @@ -26,6 +26,7 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginArg, max_error_row_count_, dup_action_, timeout_, + heartbeat_timeout_, force_create_); OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginRes, @@ -60,5 +61,14 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadInsertArg, task_id_, payload_); +// heart_beat +OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadHeartBeatArg, + table_id_, + task_id_); + +OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadHeartBeatRes, + status_, + error_code_); + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h index 4c20187a73..7f555f7235 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_struct.h @@ -31,17 +31,19 @@ public: max_error_row_count_(0), dup_action_(sql::ObLoadDupActionType::LOAD_INVALID_MODE), timeout_(0), + heartbeat_timeout_(0), force_create_(false) { } TO_STRING_KV(K_(table_name), K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout), - K_(force_create)); + K_(heartbeat_timeout), K_(force_create)); public: ObString table_name_; int64_t parallel_; uint64_t max_error_row_count_; sql::ObLoadDupActionType dup_action_; int64_t timeout_; + int64_t heartbeat_timeout_; bool force_create_; }; @@ -124,5 +126,30 @@ public: ObString payload_; }; +struct ObTableDirectLoadHeartBeatArg +{ + OB_UNIS_VERSION(1); +public: + ObTableDirectLoadHeartBeatArg() : table_id_(common::OB_INVALID_ID), task_id_(0) {} + TO_STRING_KV(K_(table_id), K_(task_id)); +public: + uint64_t table_id_; + int64_t task_id_; +}; + +struct ObTableDirectLoadHeartBeatRes +{ + OB_UNIS_VERSION(1); +public: + ObTableDirectLoadHeartBeatRes() + : status_(table::ObTableLoadClientStatus::MAX_STATUS), error_code_(0) + { + } + TO_STRING_KV(K_(status), K_(error_code)); +public: + table::ObTableLoadClientStatus status_; + int32_t error_code_; +}; + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_client_task.cpp b/src/observer/table_load/ob_table_load_client_task.cpp index c89e345af6..5342227658 100644 --- a/src/observer/table_load/ob_table_load_client_task.cpp +++ b/src/observer/table_load/ob_table_load_client_task.cpp @@ -78,7 +78,7 @@ ObTableLoadClientTask::~ObTableLoadClientTask() } int ObTableLoadClientTask::init(uint64_t tenant_id, uint64_t user_id, uint64_t database_id, - uint64_t table_id, int64_t timeout_us) + uint64_t table_id, int64_t timeout_us, int64_t heartbeat_timeout_us) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -98,7 +98,7 @@ int ObTableLoadClientTask::init(uint64_t tenant_id, uint64_t user_id, uint64_t d LOG_WARN("fail to create session info", KR(ret)); } else if (OB_FAIL(init_column_names_and_idxs())) { LOG_WARN("fail to init column names and idxs", KR(ret)); - } else if (OB_FAIL(init_exec_ctx(timeout_us))) { + } else if (OB_FAIL(init_exec_ctx(timeout_us, heartbeat_timeout_us))) { LOG_WARN("fail to init client exec ctx", KR(ret)); } else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", MTL_ID()))) { LOG_WARN("fail to init task allocator", KR(ret)); @@ -212,7 +212,7 @@ int ObTableLoadClientTask::init_column_names_and_idxs() return ret; } -int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us) +int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us) { int ret = OB_SUCCESS; if (OB_ISNULL(exec_ctx_ = OB_NEWx(ObTableLoadClientExecCtx, &allocator_))) { @@ -222,6 +222,8 @@ int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us) exec_ctx_->allocator_ = &allocator_; exec_ctx_->session_info_ = session_info_; exec_ctx_->timeout_ts_ = ObTimeUtil::current_time() + timeout_us; + exec_ctx_->last_heartbeat_time_ = ObTimeUtil::current_time(); + exec_ctx_->heartbeat_timeout_us_ = heartbeat_timeout_us; } return ret; } diff --git a/src/observer/table_load/ob_table_load_client_task.h b/src/observer/table_load/ob_table_load_client_task.h index bd89900419..79382269d7 100644 --- a/src/observer/table_load/ob_table_load_client_task.h +++ b/src/observer/table_load/ob_table_load_client_task.h @@ -33,7 +33,7 @@ public: ObTableLoadClientTask(); ~ObTableLoadClientTask(); int init(uint64_t tenant_id, uint64_t user_id, uint64_t database_id, uint64_t table_id, - int64_t timeout_us); + int64_t timeout_us, int64_t heartbeat_timeout_us); bool is_inited() const { return is_inited_; } int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); } int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); } @@ -70,7 +70,7 @@ private: sql::ObSQLSessionInfo *&session_info, sql::ObFreeSessionCtx &free_session_ctx); int init_column_names_and_idxs(); - int init_exec_ctx(int64_t timeout_us); + int init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us); public: uint64_t tenant_id_; uint64_t user_id_; diff --git a/src/observer/table_load/ob_table_load_exec_ctx.cpp b/src/observer/table_load/ob_table_load_exec_ctx.cpp index 56bcc95988..c4d629e060 100644 --- a/src/observer/table_load/ob_table_load_exec_ctx.cpp +++ b/src/observer/table_load/ob_table_load_exec_ctx.cpp @@ -74,6 +74,9 @@ int ObTableLoadClientExecCtx::check_status() } else if (OB_UNLIKELY(timeout_ts_ < ObTimeUtil::current_time())) { ret = OB_TIMEOUT; LOG_WARN("table load is timeout", KR(ret), K_(timeout_ts)); + } else if (OB_UNLIKELY(ObTimeUtil::current_time() - last_heartbeat_time_ > heartbeat_timeout_us_)) { + ret = OB_TIMEOUT; + LOG_WARN("heart beat is timeout", KR(ret), K(last_heartbeat_time_), K(heartbeat_timeout_us_)); } else if (OB_ISNULL(session_info_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("session info is null"); diff --git a/src/observer/table_load/ob_table_load_exec_ctx.h b/src/observer/table_load/ob_table_load_exec_ctx.h index 690546b0bd..ec17053e20 100644 --- a/src/observer/table_load/ob_table_load_exec_ctx.h +++ b/src/observer/table_load/ob_table_load_exec_ctx.h @@ -56,7 +56,14 @@ public: class ObTableLoadClientExecCtx : public ObTableLoadExecCtx { public: - ObTableLoadClientExecCtx() : allocator_(nullptr), session_info_(nullptr), timeout_ts_(0) {} + ObTableLoadClientExecCtx() + : allocator_(nullptr), + session_info_(nullptr), + timeout_ts_(0), + heartbeat_timeout_us_(0), + last_heartbeat_time_(0) + { + } virtual ~ObTableLoadClientExecCtx() = default; common::ObIAllocator *get_allocator() override { return allocator_; } sql::ObSQLSessionInfo *get_session_info() override { return session_info_; } @@ -68,6 +75,8 @@ public: common::ObIAllocator *allocator_; sql::ObSQLSessionInfo *session_info_; int64_t timeout_ts_; + int64_t heartbeat_timeout_us_; + int64_t last_heartbeat_time_; }; } // namespace observer diff --git a/src/share/table/ob_table.h b/src/share/table/ob_table.h index 0f7f1fa968..ead0d94050 100644 --- a/src/share/table/ob_table.h +++ b/src/share/table/ob_table.h @@ -900,6 +900,7 @@ enum class ObTableDirectLoadOperationType { ABORT = 2, GET_STATUS = 3, INSERT = 4, + HEART_BEAT = 5, MAX_TYPE };