[CP] Modify direct load heart rpc to high prio

This commit is contained in:
obdev 2024-02-07 19:10:37 +00:00 committed by ob-robot
parent 128c9c5143
commit 07bed38c8e
7 changed files with 74 additions and 35 deletions

View File

@ -639,6 +639,8 @@ OB_INLINE int64_t ObResourceGroup::max_worker_cnt() const
cnt = 2; // one for take snapshot, one for purge
} else if (share::OBCG_DBA_COMMAND == group_id_) {
cnt = 1;
} else if (share::OBCG_DIRECT_LOAD_HIGH_PRIO == group_id_) {
cnt = 1;
}
return cnt;
}

View File

@ -14,6 +14,7 @@
#include "ob_table_direct_load_rpc_struct.h"
#include "observer/table_load/ob_table_load_rpc_executor.h"
#include "share/resource_manager/ob_cgroup_ctrl.h"
#include "share/table/ob_table_rpc_proxy.h"
namespace oceanbase
@ -37,7 +38,7 @@ public:
{
};
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_1(name, pcode, Arg) \
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_1(prio, name, pcode, Arg) \
int name(const Arg &arg) \
{ \
int ret = OB_SUCCESS; \
@ -51,6 +52,9 @@ public:
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
.timeout(timeout_) \
.by(tenant_id_) \
.group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \
? share::OBCG_DIRECT_LOAD_HIGH_PRIO \
: share::OBCG_DEFAULT) \
.direct_load(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
} else if (OB_UNLIKELY(result.header_.operation_type_ != pcode)) { \
@ -63,7 +67,7 @@ public:
return ret; \
}
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_2(name, pcode, Arg, Res) \
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_2(prio, name, pcode, Arg, Res) \
int name(const Arg &arg, Res &res) \
{ \
int ret = OB_SUCCESS; \
@ -77,6 +81,9 @@ public:
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
.timeout(timeout_) \
.by(tenant_id_) \
.group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \
? share::OBCG_DIRECT_LOAD_HIGH_PRIO \
: share::OBCG_DEFAULT) \
.direct_load(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
} else if (OB_UNLIKELY(result.header_.operation_type_ != pcode)) { \
@ -88,14 +95,15 @@ public:
return ret; \
}
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(name, pcode, ...) \
CONCAT(OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_, ARGS_NUM(__VA_ARGS__))(name, pcode, __VA_ARGS__)
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(prio, name, pcode, ...) \
CONCAT(OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_, ARGS_NUM(__VA_ARGS__)) \
(prio, name, pcode, __VA_ARGS__)
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC(name, pcode, Processor, ...) \
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC(prio, name, pcode, Processor, ...) \
OB_DEFINE_TABLE_LOAD_RPC(ObTableDirectLoadRpc, pcode, Processor, \
table::ObTableDirectLoadRequest, table::ObTableDirectLoadResult, \
__VA_ARGS__) \
OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(name, pcode, __VA_ARGS__)
OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(ObTableLoadRpcPriority::prio, name, pcode, __VA_ARGS__)
public:
ObTableDirectLoadRpcProxy(obrpc::ObTableRpcProxy &rpc_proxy)
@ -133,25 +141,29 @@ public:
table::ObTableDirectLoadResult &result);
// begin
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(begin, table::ObTableDirectLoadOperationType::BEGIN,
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, begin, table::ObTableDirectLoadOperationType::BEGIN,
ObTableDirectLoadBeginExecutor, ObTableDirectLoadBeginArg,
ObTableDirectLoadBeginRes);
// commit
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(commit, table::ObTableDirectLoadOperationType::COMMIT,
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, commit,
table::ObTableDirectLoadOperationType::COMMIT,
ObTableDirectLoadCommitExecutor, ObTableDirectLoadCommitArg);
// abort
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(abort, table::ObTableDirectLoadOperationType::ABORT,
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, abort, table::ObTableDirectLoadOperationType::ABORT,
ObTableDirectLoadAbortExecutor, ObTableDirectLoadAbortArg);
// get_status
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(get_status, table::ObTableDirectLoadOperationType::GET_STATUS,
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, get_status,
table::ObTableDirectLoadOperationType::GET_STATUS,
ObTableDirectLoadGetStatusExecutor, ObTableDirectLoadGetStatusArg,
ObTableDirectLoadGetStatusRes);
// insert
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(insert, table::ObTableDirectLoadOperationType::INSERT,
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, insert,
table::ObTableDirectLoadOperationType::INSERT,
ObTableDirectLoadInsertExecutor, ObTableDirectLoadInsertArg);
// heart_beat
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(heartbeat, table::ObTableDirectLoadOperationType::HEART_BEAT,
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(HIGH_PRIO, heartbeat,
table::ObTableDirectLoadOperationType::HEART_BEAT,
ObTableDirectLoadHeartBeatExecutor, ObTableDirectLoadHeartBeatArg,
ObTableDirectLoadHeartBeatRes);

View File

@ -46,7 +46,7 @@ public:
{
};
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_1(name, pcode, Arg) \
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_1(prio, name, pcode, Arg) \
int name(const Arg &arg) \
{ \
int ret = OB_SUCCESS; \
@ -59,6 +59,9 @@ public:
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
.timeout(timeout_) \
.by(tenant_id_) \
.group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \
? share::OBCG_DIRECT_LOAD_HIGH_PRIO \
: share::OBCG_DEFAULT) \
.direct_load_control(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \
} else if (OB_UNLIKELY(result.command_type_ != pcode)) { \
@ -71,7 +74,7 @@ public:
return ret; \
}
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_2(name, pcode, Arg, Res) \
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_2(prio, name, pcode, Arg, Res) \
int name(const Arg &arg, Res &res) \
{ \
int ret = OB_SUCCESS; \
@ -84,6 +87,9 @@ public:
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
.timeout(timeout_) \
.by(tenant_id_) \
.group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \
? share::OBCG_DIRECT_LOAD_HIGH_PRIO \
: share::OBCG_DEFAULT) \
.direct_load_control(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \
} else if (OB_UNLIKELY(result.command_type_ != pcode)) { \
@ -95,13 +101,14 @@ public:
return ret; \
}
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(name, pcode, ...) \
CONCAT(OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_, ARGS_NUM(__VA_ARGS__))(name, pcode, __VA_ARGS__)
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(prio, name, pcode, ...) \
CONCAT(OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_, ARGS_NUM(__VA_ARGS__)) \
(prio, name, pcode, __VA_ARGS__)
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC(name, pcode, Processor, ...) \
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC(prio, name, pcode, Processor, ...) \
OB_DEFINE_TABLE_LOAD_RPC(ObTableLoadControlRpc, pcode, Processor, ObDirectLoadControlRequest, \
ObDirectLoadControlResult, __VA_ARGS__) \
OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(name, pcode, __VA_ARGS__)
OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(ObTableLoadRpcPriority::prio, name, pcode, __VA_ARGS__)
public:
ObTableLoadControlRpcProxy(obrpc::ObSrvRpcProxy &rpc_proxy)
@ -133,70 +140,79 @@ public:
common::ObIAllocator &allocator);
// pre_begin
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_begin, ObDirectLoadControlCommandType::PRE_BEGIN,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_begin,
ObDirectLoadControlCommandType::PRE_BEGIN,
ObDirectLoadControlPreBeginExecutor,
ObDirectLoadControlPreBeginArg);
// confirm_begin
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(confirm_begin, ObDirectLoadControlCommandType::CONFIRM_BEGIN,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, confirm_begin,
ObDirectLoadControlCommandType::CONFIRM_BEGIN,
ObDirectLoadControlConfirmBeginExecutor,
ObDirectLoadControlConfirmBeginArg);
// pre_merge
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_merge, ObDirectLoadControlCommandType::PRE_MERGE,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_merge,
ObDirectLoadControlCommandType::PRE_MERGE,
ObDirectLoadControlPreMergeExecutor,
ObDirectLoadControlPreMergeArg);
// start_merge
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(start_merge, ObDirectLoadControlCommandType::START_MERGE,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, start_merge,
ObDirectLoadControlCommandType::START_MERGE,
ObDirectLoadControlStartMergeExecutor,
ObDirectLoadControlStartMergeArg);
// commit
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(commit, ObDirectLoadControlCommandType::COMMIT,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, commit, ObDirectLoadControlCommandType::COMMIT,
ObDirectLoadControlCommitExecutor, ObDirectLoadControlCommitArg,
ObDirectLoadControlCommitRes);
// abort
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(abort, ObDirectLoadControlCommandType::ABORT,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, abort, ObDirectLoadControlCommandType::ABORT,
ObDirectLoadControlAbortExecutor, ObDirectLoadControlAbortArg,
ObDirectLoadControlAbortRes);
// get_status
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(get_status, ObDirectLoadControlCommandType::GET_STATUS,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, get_status,
ObDirectLoadControlCommandType::GET_STATUS,
ObDirectLoadControlGetStatusExecutor,
ObDirectLoadControlGetStatusArg,
ObDirectLoadControlGetStatusRes);
// heart_beat
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(heart_beat, ObDirectLoadControlCommandType::HEART_BEAT,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(HIGH_PRIO, heart_beat,
ObDirectLoadControlCommandType::HEART_BEAT,
ObDirectLoadControlHeartBeatExecutor,
ObDirectLoadControlHeartBeatArg);
/// trans
// pre_start_trans
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_start_trans, ObDirectLoadControlCommandType::PRE_START_TRANS,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_start_trans,
ObDirectLoadControlCommandType::PRE_START_TRANS,
ObDirectLoadControlPreStartTransExecutor,
ObDirectLoadControlPreStartTransArg);
// confirm_start_trans
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(confirm_start_trans,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, confirm_start_trans,
ObDirectLoadControlCommandType::CONFIRM_START_TRANS,
ObDirectLoadControlConfirmStartTransExecutor,
ObDirectLoadControlConfirmStartTransArg);
// pre_finish_trans
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_finish_trans,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_finish_trans,
ObDirectLoadControlCommandType::PRE_FINISH_TRANS,
ObDirectLoadControlPreFinishTransExecutor,
ObDirectLoadControlPreFinishTransArg);
// confirm_finish_trans
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(confirm_finish_trans,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, confirm_finish_trans,
ObDirectLoadControlCommandType::CONFIRM_FINISH_TRANS,
ObDirectLoadControlConfirmFinishTransExecutor,
ObDirectLoadControlConfirmFinishTransArg);
// abandon_trans
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(abandon_trans, ObDirectLoadControlCommandType::ABANDON_TRANS,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, abandon_trans,
ObDirectLoadControlCommandType::ABANDON_TRANS,
ObDirectLoadControlAbandonTransExecutor,
ObDirectLoadControlAbandonTransArg);
// get_trans_status
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(get_trans_status,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, get_trans_status,
ObDirectLoadControlCommandType::GET_TRANS_STATUS,
ObDirectLoadControlGetTransStatusExecutor,
ObDirectLoadControlGetTransStatusArg,
ObDirectLoadControlGetTransStatusRes);
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(insert_trans, ObDirectLoadControlCommandType::INSERT_TRANS,
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, insert_trans,
ObDirectLoadControlCommandType::INSERT_TRANS,
ObDirectLoadControlInsertTransExecutor,
ObDirectLoadControlInsertTransArg);

View File

@ -509,6 +509,9 @@ int ObTableLoadCoordinator::check_peers_merge_result(bool &is_finish)
if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == res.status_)) {
ret = res.error_code_;
LOG_WARN("store has error", KR(ret), K(addr), K(res.status_));
} else if (OB_UNLIKELY(ObTableLoadStatusType::ABORT == res.status_)) {
ret = OB_SUCCESS != res.error_code_ ? res.error_code_ : OB_CANCELED;
LOG_WARN("store has abort", KR(ret), K(addr), K(res.status_));
} else if (OB_UNLIKELY(ObTableLoadStatusType::MERGING != res.status_ &&
ObTableLoadStatusType::MERGED != res.status_)) {
ret = OB_ERR_UNEXPECTED;

View File

@ -21,6 +21,12 @@ namespace oceanbase
{
namespace observer
{
enum class ObTableLoadRpcPriority
{
NORMAL_PRIO = 0,
HIGH_PRIO = 1,
};
// template <typename pcode, typename IGNORE = void>
// struct ObTableLoadRpc
// {

View File

@ -383,7 +383,7 @@ int ObTableLoadStoreCtx::check_status(ObTableLoadStatusType status) const
if (ObTableLoadStatusType::ERROR == status_) {
ret = error_code_;
} else if (ObTableLoadStatusType::ABORT == status_) {
ret = OB_CANCELED;
ret = OB_SUCCESS != error_code_ ? error_code_ : OB_CANCELED;
} else {
ret = OB_STATE_NOT_MATCH;
}

View File

@ -33,5 +33,5 @@ CGID_DEF(OBCG_DBA_COMMAND, 18, 1)
CGID_DEF(OBCG_STORAGE, 19)
CGID_DEF(OBCG_LOCK, 20)
CGID_DEF(OBCG_UNLOCK, 21)
// CGID_DEF(OBCG_DIRECT_LOAD_HIGH_PRIO, 22)
CGID_DEF(OBCG_DIRECT_LOAD_HIGH_PRIO, 22)
CGID_DEF(OBCG_LQ, 100)