add heartbeat rpc

This commit is contained in:
yongshige
2023-08-18 11:10:20 +00:00
committed by ob-robot
parent 4e92b9f5cc
commit 9d62ca9170
11 changed files with 119 additions and 9 deletions

View File

@ -139,8 +139,8 @@ int ObTableDirectLoadBeginExecutor::process()
if (OB_ISNULL(client_task_ = ObTableLoadClientService::alloc_task())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc client task", KR(ret));
} else if (OB_FAIL(
client_task_->init(tenant_id, user_id, database_id, table_id, arg_.timeout_))) {
} else if (OB_FAIL(client_task_->init(tenant_id, user_id, database_id, table_id,
arg_.timeout_, arg_.heartbeat_timeout_))) {
LOG_WARN("fail to init client task", KR(ret));
} else {
// create table ctx
@ -522,5 +522,35 @@ int ObTableDirectLoadInsertExecutor::set_batch_seq_no(int64_t batch_id,
return ret;
}
//heart_beat
int ObTableDirectLoadHeartBeatExecutor::check_args()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_ID == arg_.table_id_ || 0 == arg_.task_id_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(arg_));
}
return ret;
}
int ObTableDirectLoadHeartBeatExecutor::process()
{
int ret = OB_SUCCESS;
LOG_INFO("table direct load heart beat", K_(arg));
ObTableLoadClientTask *client_task = nullptr;
ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_);
if (OB_FAIL(ObTableLoadClientService::get_task(key, client_task))) {
LOG_WARN("fail to get client task", KR(ret), K(key));
} else {
client_task->get_exec_ctx()->last_heartbeat_time_ = ObTimeUtil::current_time();
client_task->get_status(res_.status_, res_.error_code_);
}
if (nullptr != client_task) {
ObTableLoadClientService::revert_task(client_task);
client_task = nullptr;
}
return ret;
}
} // namespace observer
} // namespace oceanbase

View File

@ -161,5 +161,26 @@ private:
int set_batch_seq_no(int64_t batch_id, table::ObTableLoadObjRowArray &obj_row_array);
};
// heart_beat
class ObTableDirectLoadHeartBeatExecutor
: public ObTableDirectLoadRpcExecutor<table::ObTableDirectLoadOperationType::HEART_BEAT>
{
typedef ObTableDirectLoadRpcExecutor<table::ObTableDirectLoadOperationType::HEART_BEAT>
ParentType;
public:
ObTableDirectLoadHeartBeatExecutor(ObTableDirectLoadExecContext &ctx,
const table::ObTableDirectLoadRequest &request,
table::ObTableDirectLoadResult &result)
: ParentType(ctx, request, result)
{
}
virtual ~ObTableDirectLoadHeartBeatExecutor() = default;
protected:
int check_args() override;
int process() override;
};
} // namespace observer
} // namespace oceanbase

View File

@ -37,6 +37,7 @@ int ObTableDirectLoadRpcProxy::dispatch(ObTableDirectLoadExecContext &ctx,
OB_TABLE_DIRECT_LOAD_RPC_DISPATCH(ObTableDirectLoadOperationType::ABORT);
OB_TABLE_DIRECT_LOAD_RPC_DISPATCH(ObTableDirectLoadOperationType::GET_STATUS);
OB_TABLE_DIRECT_LOAD_RPC_DISPATCH(ObTableDirectLoadOperationType::INSERT);
OB_TABLE_DIRECT_LOAD_RPC_DISPATCH(ObTableDirectLoadOperationType::HEART_BEAT);
default:
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "unexpected command type", K(ret), K(request));

View File

@ -26,6 +26,7 @@ class ObTableDirectLoadCommitExecutor;
class ObTableDirectLoadAbortExecutor;
class ObTableDirectLoadGetStatusExecutor;
class ObTableDirectLoadInsertExecutor;
class ObTableDirectLoadHeartBeatExecutor;
class ObTableDirectLoadRpcProxy
{
@ -149,6 +150,11 @@ public:
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(insert, table::ObTableDirectLoadOperationType::INSERT,
ObTableDirectLoadInsertExecutor, ObTableDirectLoadInsertArg);
// heart_beat
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(heartbeat, table::ObTableDirectLoadOperationType::HEART_BEAT,
ObTableDirectLoadHeartBeatExecutor, ObTableDirectLoadHeartBeatArg,
ObTableDirectLoadHeartBeatRes);
private:
obrpc::ObTableRpcProxy &rpc_proxy_;
ObArenaAllocator allocator_;

View File

@ -26,6 +26,7 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginArg,
max_error_row_count_,
dup_action_,
timeout_,
heartbeat_timeout_,
force_create_);
OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadBeginRes,
@ -60,5 +61,14 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadInsertArg,
task_id_,
payload_);
// heart_beat
OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadHeartBeatArg,
table_id_,
task_id_);
OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadHeartBeatRes,
status_,
error_code_);
} // namespace observer
} // namespace oceanbase

View File

@ -31,17 +31,19 @@ public:
max_error_row_count_(0),
dup_action_(sql::ObLoadDupActionType::LOAD_INVALID_MODE),
timeout_(0),
heartbeat_timeout_(0),
force_create_(false)
{
}
TO_STRING_KV(K_(table_name), K_(parallel), K_(max_error_row_count), K_(dup_action), K_(timeout),
K_(force_create));
K_(heartbeat_timeout), K_(force_create));
public:
ObString table_name_;
int64_t parallel_;
uint64_t max_error_row_count_;
sql::ObLoadDupActionType dup_action_;
int64_t timeout_;
int64_t heartbeat_timeout_;
bool force_create_;
};
@ -124,5 +126,30 @@ public:
ObString payload_;
};
struct ObTableDirectLoadHeartBeatArg
{
OB_UNIS_VERSION(1);
public:
ObTableDirectLoadHeartBeatArg() : table_id_(common::OB_INVALID_ID), task_id_(0) {}
TO_STRING_KV(K_(table_id), K_(task_id));
public:
uint64_t table_id_;
int64_t task_id_;
};
struct ObTableDirectLoadHeartBeatRes
{
OB_UNIS_VERSION(1);
public:
ObTableDirectLoadHeartBeatRes()
: status_(table::ObTableLoadClientStatus::MAX_STATUS), error_code_(0)
{
}
TO_STRING_KV(K_(status), K_(error_code));
public:
table::ObTableLoadClientStatus status_;
int32_t error_code_;
};
} // namespace observer
} // namespace oceanbase

View File

@ -78,7 +78,7 @@ ObTableLoadClientTask::~ObTableLoadClientTask()
}
int ObTableLoadClientTask::init(uint64_t tenant_id, uint64_t user_id, uint64_t database_id,
uint64_t table_id, int64_t timeout_us)
uint64_t table_id, int64_t timeout_us, int64_t heartbeat_timeout_us)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -98,7 +98,7 @@ int ObTableLoadClientTask::init(uint64_t tenant_id, uint64_t user_id, uint64_t d
LOG_WARN("fail to create session info", KR(ret));
} else if (OB_FAIL(init_column_names_and_idxs())) {
LOG_WARN("fail to init column names and idxs", KR(ret));
} else if (OB_FAIL(init_exec_ctx(timeout_us))) {
} else if (OB_FAIL(init_exec_ctx(timeout_us, heartbeat_timeout_us))) {
LOG_WARN("fail to init client exec ctx", KR(ret));
} else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", MTL_ID()))) {
LOG_WARN("fail to init task allocator", KR(ret));
@ -212,7 +212,7 @@ int ObTableLoadClientTask::init_column_names_and_idxs()
return ret;
}
int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us)
int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(exec_ctx_ = OB_NEWx(ObTableLoadClientExecCtx, &allocator_))) {
@ -222,6 +222,8 @@ int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us)
exec_ctx_->allocator_ = &allocator_;
exec_ctx_->session_info_ = session_info_;
exec_ctx_->timeout_ts_ = ObTimeUtil::current_time() + timeout_us;
exec_ctx_->last_heartbeat_time_ = ObTimeUtil::current_time();
exec_ctx_->heartbeat_timeout_us_ = heartbeat_timeout_us;
}
return ret;
}

View File

@ -33,7 +33,7 @@ public:
ObTableLoadClientTask();
~ObTableLoadClientTask();
int init(uint64_t tenant_id, uint64_t user_id, uint64_t database_id, uint64_t table_id,
int64_t timeout_us);
int64_t timeout_us, int64_t heartbeat_timeout_us);
bool is_inited() const { return is_inited_; }
int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); }
int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); }
@ -70,7 +70,7 @@ private:
sql::ObSQLSessionInfo *&session_info,
sql::ObFreeSessionCtx &free_session_ctx);
int init_column_names_and_idxs();
int init_exec_ctx(int64_t timeout_us);
int init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us);
public:
uint64_t tenant_id_;
uint64_t user_id_;

View File

@ -74,6 +74,9 @@ int ObTableLoadClientExecCtx::check_status()
} else if (OB_UNLIKELY(timeout_ts_ < ObTimeUtil::current_time())) {
ret = OB_TIMEOUT;
LOG_WARN("table load is timeout", KR(ret), K_(timeout_ts));
} else if (OB_UNLIKELY(ObTimeUtil::current_time() - last_heartbeat_time_ > heartbeat_timeout_us_)) {
ret = OB_TIMEOUT;
LOG_WARN("heart beat is timeout", KR(ret), K(last_heartbeat_time_), K(heartbeat_timeout_us_));
} else if (OB_ISNULL(session_info_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session info is null");

View File

@ -56,7 +56,14 @@ public:
class ObTableLoadClientExecCtx : public ObTableLoadExecCtx
{
public:
ObTableLoadClientExecCtx() : allocator_(nullptr), session_info_(nullptr), timeout_ts_(0) {}
ObTableLoadClientExecCtx()
: allocator_(nullptr),
session_info_(nullptr),
timeout_ts_(0),
heartbeat_timeout_us_(0),
last_heartbeat_time_(0)
{
}
virtual ~ObTableLoadClientExecCtx() = default;
common::ObIAllocator *get_allocator() override { return allocator_; }
sql::ObSQLSessionInfo *get_session_info() override { return session_info_; }
@ -68,6 +75,8 @@ public:
common::ObIAllocator *allocator_;
sql::ObSQLSessionInfo *session_info_;
int64_t timeout_ts_;
int64_t heartbeat_timeout_us_;
int64_t last_heartbeat_time_;
};
} // namespace observer

View File

@ -900,6 +900,7 @@ enum class ObTableDirectLoadOperationType {
ABORT = 2,
GET_STATUS = 3,
INSERT = 4,
HEART_BEAT = 5,
MAX_TYPE
};