[CP] Modify direct load heart rpc to high prio
This commit is contained in:
@ -639,6 +639,8 @@ OB_INLINE int64_t ObResourceGroup::max_worker_cnt() const
|
|||||||
cnt = 2; // one for take snapshot, one for purge
|
cnt = 2; // one for take snapshot, one for purge
|
||||||
} else if (share::OBCG_DBA_COMMAND == group_id_) {
|
} else if (share::OBCG_DBA_COMMAND == group_id_) {
|
||||||
cnt = 1;
|
cnt = 1;
|
||||||
|
} else if (share::OBCG_DIRECT_LOAD_HIGH_PRIO == group_id_) {
|
||||||
|
cnt = 1;
|
||||||
}
|
}
|
||||||
return cnt;
|
return cnt;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
#include "ob_table_direct_load_rpc_struct.h"
|
#include "ob_table_direct_load_rpc_struct.h"
|
||||||
#include "observer/table_load/ob_table_load_rpc_executor.h"
|
#include "observer/table_load/ob_table_load_rpc_executor.h"
|
||||||
|
#include "share/resource_manager/ob_cgroup_ctrl.h"
|
||||||
#include "share/table/ob_table_rpc_proxy.h"
|
#include "share/table/ob_table_rpc_proxy.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
@ -37,7 +38,7 @@ public:
|
|||||||
{
|
{
|
||||||
};
|
};
|
||||||
|
|
||||||
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_1(name, pcode, Arg) \
|
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_1(prio, name, pcode, Arg) \
|
||||||
int name(const Arg &arg) \
|
int name(const Arg &arg) \
|
||||||
{ \
|
{ \
|
||||||
int ret = OB_SUCCESS; \
|
int ret = OB_SUCCESS; \
|
||||||
@ -51,6 +52,9 @@ public:
|
|||||||
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
|
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
|
||||||
.timeout(timeout_) \
|
.timeout(timeout_) \
|
||||||
.by(tenant_id_) \
|
.by(tenant_id_) \
|
||||||
|
.group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \
|
||||||
|
? share::OBCG_DIRECT_LOAD_HIGH_PRIO \
|
||||||
|
: share::OBCG_DEFAULT) \
|
||||||
.direct_load(request, result))) { \
|
.direct_load(request, result))) { \
|
||||||
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
|
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
|
||||||
} else if (OB_UNLIKELY(result.header_.operation_type_ != pcode)) { \
|
} else if (OB_UNLIKELY(result.header_.operation_type_ != pcode)) { \
|
||||||
@ -63,7 +67,7 @@ public:
|
|||||||
return ret; \
|
return ret; \
|
||||||
}
|
}
|
||||||
|
|
||||||
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_2(name, pcode, Arg, Res) \
|
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_2(prio, name, pcode, Arg, Res) \
|
||||||
int name(const Arg &arg, Res &res) \
|
int name(const Arg &arg, Res &res) \
|
||||||
{ \
|
{ \
|
||||||
int ret = OB_SUCCESS; \
|
int ret = OB_SUCCESS; \
|
||||||
@ -77,6 +81,9 @@ public:
|
|||||||
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
|
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
|
||||||
.timeout(timeout_) \
|
.timeout(timeout_) \
|
||||||
.by(tenant_id_) \
|
.by(tenant_id_) \
|
||||||
|
.group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \
|
||||||
|
? share::OBCG_DIRECT_LOAD_HIGH_PRIO \
|
||||||
|
: share::OBCG_DEFAULT) \
|
||||||
.direct_load(request, result))) { \
|
.direct_load(request, result))) { \
|
||||||
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
|
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
|
||||||
} else if (OB_UNLIKELY(result.header_.operation_type_ != pcode)) { \
|
} else if (OB_UNLIKELY(result.header_.operation_type_ != pcode)) { \
|
||||||
@ -88,14 +95,15 @@ public:
|
|||||||
return ret; \
|
return ret; \
|
||||||
}
|
}
|
||||||
|
|
||||||
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(name, pcode, ...) \
|
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(prio, name, pcode, ...) \
|
||||||
CONCAT(OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_, ARGS_NUM(__VA_ARGS__))(name, pcode, __VA_ARGS__)
|
CONCAT(OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_, ARGS_NUM(__VA_ARGS__)) \
|
||||||
|
(prio, name, pcode, __VA_ARGS__)
|
||||||
|
|
||||||
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC(name, pcode, Processor, ...) \
|
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC(prio, name, pcode, Processor, ...) \
|
||||||
OB_DEFINE_TABLE_LOAD_RPC(ObTableDirectLoadRpc, pcode, Processor, \
|
OB_DEFINE_TABLE_LOAD_RPC(ObTableDirectLoadRpc, pcode, Processor, \
|
||||||
table::ObTableDirectLoadRequest, table::ObTableDirectLoadResult, \
|
table::ObTableDirectLoadRequest, table::ObTableDirectLoadResult, \
|
||||||
__VA_ARGS__) \
|
__VA_ARGS__) \
|
||||||
OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(name, pcode, __VA_ARGS__)
|
OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(ObTableLoadRpcPriority::prio, name, pcode, __VA_ARGS__)
|
||||||
|
|
||||||
public:
|
public:
|
||||||
ObTableDirectLoadRpcProxy(obrpc::ObTableRpcProxy &rpc_proxy)
|
ObTableDirectLoadRpcProxy(obrpc::ObTableRpcProxy &rpc_proxy)
|
||||||
@ -133,25 +141,29 @@ public:
|
|||||||
table::ObTableDirectLoadResult &result);
|
table::ObTableDirectLoadResult &result);
|
||||||
|
|
||||||
// begin
|
// begin
|
||||||
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(begin, table::ObTableDirectLoadOperationType::BEGIN,
|
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, begin, table::ObTableDirectLoadOperationType::BEGIN,
|
||||||
ObTableDirectLoadBeginExecutor, ObTableDirectLoadBeginArg,
|
ObTableDirectLoadBeginExecutor, ObTableDirectLoadBeginArg,
|
||||||
ObTableDirectLoadBeginRes);
|
ObTableDirectLoadBeginRes);
|
||||||
// commit
|
// commit
|
||||||
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(commit, table::ObTableDirectLoadOperationType::COMMIT,
|
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, commit,
|
||||||
|
table::ObTableDirectLoadOperationType::COMMIT,
|
||||||
ObTableDirectLoadCommitExecutor, ObTableDirectLoadCommitArg);
|
ObTableDirectLoadCommitExecutor, ObTableDirectLoadCommitArg);
|
||||||
// abort
|
// abort
|
||||||
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(abort, table::ObTableDirectLoadOperationType::ABORT,
|
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, abort, table::ObTableDirectLoadOperationType::ABORT,
|
||||||
ObTableDirectLoadAbortExecutor, ObTableDirectLoadAbortArg);
|
ObTableDirectLoadAbortExecutor, ObTableDirectLoadAbortArg);
|
||||||
// get_status
|
// get_status
|
||||||
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(get_status, table::ObTableDirectLoadOperationType::GET_STATUS,
|
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, get_status,
|
||||||
|
table::ObTableDirectLoadOperationType::GET_STATUS,
|
||||||
ObTableDirectLoadGetStatusExecutor, ObTableDirectLoadGetStatusArg,
|
ObTableDirectLoadGetStatusExecutor, ObTableDirectLoadGetStatusArg,
|
||||||
ObTableDirectLoadGetStatusRes);
|
ObTableDirectLoadGetStatusRes);
|
||||||
// insert
|
// insert
|
||||||
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(insert, table::ObTableDirectLoadOperationType::INSERT,
|
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, insert,
|
||||||
|
table::ObTableDirectLoadOperationType::INSERT,
|
||||||
ObTableDirectLoadInsertExecutor, ObTableDirectLoadInsertArg);
|
ObTableDirectLoadInsertExecutor, ObTableDirectLoadInsertArg);
|
||||||
|
|
||||||
// heart_beat
|
// heart_beat
|
||||||
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(heartbeat, table::ObTableDirectLoadOperationType::HEART_BEAT,
|
OB_DEFINE_TABLE_DIRECT_LOAD_RPC(HIGH_PRIO, heartbeat,
|
||||||
|
table::ObTableDirectLoadOperationType::HEART_BEAT,
|
||||||
ObTableDirectLoadHeartBeatExecutor, ObTableDirectLoadHeartBeatArg,
|
ObTableDirectLoadHeartBeatExecutor, ObTableDirectLoadHeartBeatArg,
|
||||||
ObTableDirectLoadHeartBeatRes);
|
ObTableDirectLoadHeartBeatRes);
|
||||||
|
|
||||||
|
|||||||
@ -46,7 +46,7 @@ public:
|
|||||||
{
|
{
|
||||||
};
|
};
|
||||||
|
|
||||||
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_1(name, pcode, Arg) \
|
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_1(prio, name, pcode, Arg) \
|
||||||
int name(const Arg &arg) \
|
int name(const Arg &arg) \
|
||||||
{ \
|
{ \
|
||||||
int ret = OB_SUCCESS; \
|
int ret = OB_SUCCESS; \
|
||||||
@ -59,6 +59,9 @@ public:
|
|||||||
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
|
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
|
||||||
.timeout(timeout_) \
|
.timeout(timeout_) \
|
||||||
.by(tenant_id_) \
|
.by(tenant_id_) \
|
||||||
|
.group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \
|
||||||
|
? share::OBCG_DIRECT_LOAD_HIGH_PRIO \
|
||||||
|
: share::OBCG_DEFAULT) \
|
||||||
.direct_load_control(request, result))) { \
|
.direct_load_control(request, result))) { \
|
||||||
SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \
|
SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \
|
||||||
} else if (OB_UNLIKELY(result.command_type_ != pcode)) { \
|
} else if (OB_UNLIKELY(result.command_type_ != pcode)) { \
|
||||||
@ -71,7 +74,7 @@ public:
|
|||||||
return ret; \
|
return ret; \
|
||||||
}
|
}
|
||||||
|
|
||||||
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_2(name, pcode, Arg, Res) \
|
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_2(prio, name, pcode, Arg, Res) \
|
||||||
int name(const Arg &arg, Res &res) \
|
int name(const Arg &arg, Res &res) \
|
||||||
{ \
|
{ \
|
||||||
int ret = OB_SUCCESS; \
|
int ret = OB_SUCCESS; \
|
||||||
@ -84,6 +87,9 @@ public:
|
|||||||
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
|
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
|
||||||
.timeout(timeout_) \
|
.timeout(timeout_) \
|
||||||
.by(tenant_id_) \
|
.by(tenant_id_) \
|
||||||
|
.group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \
|
||||||
|
? share::OBCG_DIRECT_LOAD_HIGH_PRIO \
|
||||||
|
: share::OBCG_DEFAULT) \
|
||||||
.direct_load_control(request, result))) { \
|
.direct_load_control(request, result))) { \
|
||||||
SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \
|
SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \
|
||||||
} else if (OB_UNLIKELY(result.command_type_ != pcode)) { \
|
} else if (OB_UNLIKELY(result.command_type_ != pcode)) { \
|
||||||
@ -95,13 +101,14 @@ public:
|
|||||||
return ret; \
|
return ret; \
|
||||||
}
|
}
|
||||||
|
|
||||||
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(name, pcode, ...) \
|
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(prio, name, pcode, ...) \
|
||||||
CONCAT(OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_, ARGS_NUM(__VA_ARGS__))(name, pcode, __VA_ARGS__)
|
CONCAT(OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_, ARGS_NUM(__VA_ARGS__)) \
|
||||||
|
(prio, name, pcode, __VA_ARGS__)
|
||||||
|
|
||||||
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC(name, pcode, Processor, ...) \
|
#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC(prio, name, pcode, Processor, ...) \
|
||||||
OB_DEFINE_TABLE_LOAD_RPC(ObTableLoadControlRpc, pcode, Processor, ObDirectLoadControlRequest, \
|
OB_DEFINE_TABLE_LOAD_RPC(ObTableLoadControlRpc, pcode, Processor, ObDirectLoadControlRequest, \
|
||||||
ObDirectLoadControlResult, __VA_ARGS__) \
|
ObDirectLoadControlResult, __VA_ARGS__) \
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(name, pcode, __VA_ARGS__)
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(ObTableLoadRpcPriority::prio, name, pcode, __VA_ARGS__)
|
||||||
|
|
||||||
public:
|
public:
|
||||||
ObTableLoadControlRpcProxy(obrpc::ObSrvRpcProxy &rpc_proxy)
|
ObTableLoadControlRpcProxy(obrpc::ObSrvRpcProxy &rpc_proxy)
|
||||||
@ -133,70 +140,79 @@ public:
|
|||||||
common::ObIAllocator &allocator);
|
common::ObIAllocator &allocator);
|
||||||
|
|
||||||
// pre_begin
|
// pre_begin
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_begin, ObDirectLoadControlCommandType::PRE_BEGIN,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_begin,
|
||||||
|
ObDirectLoadControlCommandType::PRE_BEGIN,
|
||||||
ObDirectLoadControlPreBeginExecutor,
|
ObDirectLoadControlPreBeginExecutor,
|
||||||
ObDirectLoadControlPreBeginArg);
|
ObDirectLoadControlPreBeginArg);
|
||||||
// confirm_begin
|
// confirm_begin
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(confirm_begin, ObDirectLoadControlCommandType::CONFIRM_BEGIN,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, confirm_begin,
|
||||||
|
ObDirectLoadControlCommandType::CONFIRM_BEGIN,
|
||||||
ObDirectLoadControlConfirmBeginExecutor,
|
ObDirectLoadControlConfirmBeginExecutor,
|
||||||
ObDirectLoadControlConfirmBeginArg);
|
ObDirectLoadControlConfirmBeginArg);
|
||||||
// pre_merge
|
// pre_merge
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_merge, ObDirectLoadControlCommandType::PRE_MERGE,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_merge,
|
||||||
|
ObDirectLoadControlCommandType::PRE_MERGE,
|
||||||
ObDirectLoadControlPreMergeExecutor,
|
ObDirectLoadControlPreMergeExecutor,
|
||||||
ObDirectLoadControlPreMergeArg);
|
ObDirectLoadControlPreMergeArg);
|
||||||
// start_merge
|
// start_merge
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(start_merge, ObDirectLoadControlCommandType::START_MERGE,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, start_merge,
|
||||||
|
ObDirectLoadControlCommandType::START_MERGE,
|
||||||
ObDirectLoadControlStartMergeExecutor,
|
ObDirectLoadControlStartMergeExecutor,
|
||||||
ObDirectLoadControlStartMergeArg);
|
ObDirectLoadControlStartMergeArg);
|
||||||
// commit
|
// commit
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(commit, ObDirectLoadControlCommandType::COMMIT,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, commit, ObDirectLoadControlCommandType::COMMIT,
|
||||||
ObDirectLoadControlCommitExecutor, ObDirectLoadControlCommitArg,
|
ObDirectLoadControlCommitExecutor, ObDirectLoadControlCommitArg,
|
||||||
ObDirectLoadControlCommitRes);
|
ObDirectLoadControlCommitRes);
|
||||||
// abort
|
// abort
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(abort, ObDirectLoadControlCommandType::ABORT,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, abort, ObDirectLoadControlCommandType::ABORT,
|
||||||
ObDirectLoadControlAbortExecutor, ObDirectLoadControlAbortArg,
|
ObDirectLoadControlAbortExecutor, ObDirectLoadControlAbortArg,
|
||||||
ObDirectLoadControlAbortRes);
|
ObDirectLoadControlAbortRes);
|
||||||
// get_status
|
// get_status
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(get_status, ObDirectLoadControlCommandType::GET_STATUS,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, get_status,
|
||||||
|
ObDirectLoadControlCommandType::GET_STATUS,
|
||||||
ObDirectLoadControlGetStatusExecutor,
|
ObDirectLoadControlGetStatusExecutor,
|
||||||
ObDirectLoadControlGetStatusArg,
|
ObDirectLoadControlGetStatusArg,
|
||||||
ObDirectLoadControlGetStatusRes);
|
ObDirectLoadControlGetStatusRes);
|
||||||
// heart_beat
|
// heart_beat
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(heart_beat, ObDirectLoadControlCommandType::HEART_BEAT,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(HIGH_PRIO, heart_beat,
|
||||||
|
ObDirectLoadControlCommandType::HEART_BEAT,
|
||||||
ObDirectLoadControlHeartBeatExecutor,
|
ObDirectLoadControlHeartBeatExecutor,
|
||||||
ObDirectLoadControlHeartBeatArg);
|
ObDirectLoadControlHeartBeatArg);
|
||||||
|
|
||||||
/// trans
|
/// trans
|
||||||
// pre_start_trans
|
// pre_start_trans
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_start_trans, ObDirectLoadControlCommandType::PRE_START_TRANS,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_start_trans,
|
||||||
|
ObDirectLoadControlCommandType::PRE_START_TRANS,
|
||||||
ObDirectLoadControlPreStartTransExecutor,
|
ObDirectLoadControlPreStartTransExecutor,
|
||||||
ObDirectLoadControlPreStartTransArg);
|
ObDirectLoadControlPreStartTransArg);
|
||||||
// confirm_start_trans
|
// confirm_start_trans
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(confirm_start_trans,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, confirm_start_trans,
|
||||||
ObDirectLoadControlCommandType::CONFIRM_START_TRANS,
|
ObDirectLoadControlCommandType::CONFIRM_START_TRANS,
|
||||||
ObDirectLoadControlConfirmStartTransExecutor,
|
ObDirectLoadControlConfirmStartTransExecutor,
|
||||||
ObDirectLoadControlConfirmStartTransArg);
|
ObDirectLoadControlConfirmStartTransArg);
|
||||||
// pre_finish_trans
|
// pre_finish_trans
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_finish_trans,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_finish_trans,
|
||||||
ObDirectLoadControlCommandType::PRE_FINISH_TRANS,
|
ObDirectLoadControlCommandType::PRE_FINISH_TRANS,
|
||||||
ObDirectLoadControlPreFinishTransExecutor,
|
ObDirectLoadControlPreFinishTransExecutor,
|
||||||
ObDirectLoadControlPreFinishTransArg);
|
ObDirectLoadControlPreFinishTransArg);
|
||||||
// confirm_finish_trans
|
// confirm_finish_trans
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(confirm_finish_trans,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, confirm_finish_trans,
|
||||||
ObDirectLoadControlCommandType::CONFIRM_FINISH_TRANS,
|
ObDirectLoadControlCommandType::CONFIRM_FINISH_TRANS,
|
||||||
ObDirectLoadControlConfirmFinishTransExecutor,
|
ObDirectLoadControlConfirmFinishTransExecutor,
|
||||||
ObDirectLoadControlConfirmFinishTransArg);
|
ObDirectLoadControlConfirmFinishTransArg);
|
||||||
// abandon_trans
|
// abandon_trans
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(abandon_trans, ObDirectLoadControlCommandType::ABANDON_TRANS,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, abandon_trans,
|
||||||
|
ObDirectLoadControlCommandType::ABANDON_TRANS,
|
||||||
ObDirectLoadControlAbandonTransExecutor,
|
ObDirectLoadControlAbandonTransExecutor,
|
||||||
ObDirectLoadControlAbandonTransArg);
|
ObDirectLoadControlAbandonTransArg);
|
||||||
// get_trans_status
|
// get_trans_status
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(get_trans_status,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, get_trans_status,
|
||||||
ObDirectLoadControlCommandType::GET_TRANS_STATUS,
|
ObDirectLoadControlCommandType::GET_TRANS_STATUS,
|
||||||
ObDirectLoadControlGetTransStatusExecutor,
|
ObDirectLoadControlGetTransStatusExecutor,
|
||||||
ObDirectLoadControlGetTransStatusArg,
|
ObDirectLoadControlGetTransStatusArg,
|
||||||
ObDirectLoadControlGetTransStatusRes);
|
ObDirectLoadControlGetTransStatusRes);
|
||||||
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(insert_trans, ObDirectLoadControlCommandType::INSERT_TRANS,
|
OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, insert_trans,
|
||||||
|
ObDirectLoadControlCommandType::INSERT_TRANS,
|
||||||
ObDirectLoadControlInsertTransExecutor,
|
ObDirectLoadControlInsertTransExecutor,
|
||||||
ObDirectLoadControlInsertTransArg);
|
ObDirectLoadControlInsertTransArg);
|
||||||
|
|
||||||
|
|||||||
@ -509,6 +509,9 @@ int ObTableLoadCoordinator::check_peers_merge_result(bool &is_finish)
|
|||||||
if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == res.status_)) {
|
if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == res.status_)) {
|
||||||
ret = res.error_code_;
|
ret = res.error_code_;
|
||||||
LOG_WARN("store has error", KR(ret), K(addr), K(res.status_));
|
LOG_WARN("store has error", KR(ret), K(addr), K(res.status_));
|
||||||
|
} else if (OB_UNLIKELY(ObTableLoadStatusType::ABORT == res.status_)) {
|
||||||
|
ret = OB_SUCCESS != res.error_code_ ? res.error_code_ : OB_CANCELED;
|
||||||
|
LOG_WARN("store has abort", KR(ret), K(addr), K(res.status_));
|
||||||
} else if (OB_UNLIKELY(ObTableLoadStatusType::MERGING != res.status_ &&
|
} else if (OB_UNLIKELY(ObTableLoadStatusType::MERGING != res.status_ &&
|
||||||
ObTableLoadStatusType::MERGED != res.status_)) {
|
ObTableLoadStatusType::MERGED != res.status_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
|||||||
@ -21,6 +21,12 @@ namespace oceanbase
|
|||||||
{
|
{
|
||||||
namespace observer
|
namespace observer
|
||||||
{
|
{
|
||||||
|
enum class ObTableLoadRpcPriority
|
||||||
|
{
|
||||||
|
NORMAL_PRIO = 0,
|
||||||
|
HIGH_PRIO = 1,
|
||||||
|
};
|
||||||
|
|
||||||
// template <typename pcode, typename IGNORE = void>
|
// template <typename pcode, typename IGNORE = void>
|
||||||
// struct ObTableLoadRpc
|
// struct ObTableLoadRpc
|
||||||
// {
|
// {
|
||||||
|
|||||||
@ -383,7 +383,7 @@ int ObTableLoadStoreCtx::check_status(ObTableLoadStatusType status) const
|
|||||||
if (ObTableLoadStatusType::ERROR == status_) {
|
if (ObTableLoadStatusType::ERROR == status_) {
|
||||||
ret = error_code_;
|
ret = error_code_;
|
||||||
} else if (ObTableLoadStatusType::ABORT == status_) {
|
} else if (ObTableLoadStatusType::ABORT == status_) {
|
||||||
ret = OB_CANCELED;
|
ret = OB_SUCCESS != error_code_ ? error_code_ : OB_CANCELED;
|
||||||
} else {
|
} else {
|
||||||
ret = OB_STATE_NOT_MATCH;
|
ret = OB_STATE_NOT_MATCH;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -33,5 +33,5 @@ CGID_DEF(OBCG_DBA_COMMAND, 18, 1)
|
|||||||
CGID_DEF(OBCG_STORAGE, 19)
|
CGID_DEF(OBCG_STORAGE, 19)
|
||||||
CGID_DEF(OBCG_LOCK, 20)
|
CGID_DEF(OBCG_LOCK, 20)
|
||||||
CGID_DEF(OBCG_UNLOCK, 21)
|
CGID_DEF(OBCG_UNLOCK, 21)
|
||||||
// CGID_DEF(OBCG_DIRECT_LOAD_HIGH_PRIO, 22)
|
CGID_DEF(OBCG_DIRECT_LOAD_HIGH_PRIO, 22)
|
||||||
CGID_DEF(OBCG_LQ, 100)
|
CGID_DEF(OBCG_LQ, 100)
|
||||||
|
|||||||
Reference in New Issue
Block a user