diff --git a/src/observer/omt/ob_tenant.h b/src/observer/omt/ob_tenant.h index 02e48ca0b..de5f21eeb 100644 --- a/src/observer/omt/ob_tenant.h +++ b/src/observer/omt/ob_tenant.h @@ -639,6 +639,8 @@ OB_INLINE int64_t ObResourceGroup::max_worker_cnt() const cnt = 2; // one for take snapshot, one for purge } else if (share::OBCG_DBA_COMMAND == group_id_) { cnt = 1; + } else if (share::OBCG_DIRECT_LOAD_HIGH_PRIO == group_id_) { + cnt = 1; } return cnt; } diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h b/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h index a915d717f..3ae22ab91 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h @@ -14,6 +14,7 @@ #include "ob_table_direct_load_rpc_struct.h" #include "observer/table_load/ob_table_load_rpc_executor.h" +#include "share/resource_manager/ob_cgroup_ctrl.h" #include "share/table/ob_table_rpc_proxy.h" namespace oceanbase @@ -37,7 +38,7 @@ public: { }; -#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_1(name, pcode, Arg) \ +#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_1(prio, name, pcode, Arg) \ int name(const Arg &arg) \ { \ int ret = OB_SUCCESS; \ @@ -51,6 +52,9 @@ public: } else if (OB_FAIL(rpc_proxy_.to(addr_) \ .timeout(timeout_) \ .by(tenant_id_) \ + .group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \ + ? share::OBCG_DIRECT_LOAD_HIGH_PRIO \ + : share::OBCG_DEFAULT) \ .direct_load(request, result))) { \ SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \ } else if (OB_UNLIKELY(result.header_.operation_type_ != pcode)) { \ @@ -63,7 +67,7 @@ public: return ret; \ } -#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_2(name, pcode, Arg, Res) \ +#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_2(prio, name, pcode, Arg, Res) \ int name(const Arg &arg, Res &res) \ { \ int ret = OB_SUCCESS; \ @@ -77,6 +81,9 @@ public: } else if (OB_FAIL(rpc_proxy_.to(addr_) \ .timeout(timeout_) \ .by(tenant_id_) \ + .group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \ + ? share::OBCG_DIRECT_LOAD_HIGH_PRIO \ + : share::OBCG_DEFAULT) \ .direct_load(request, result))) { \ SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \ } else if (OB_UNLIKELY(result.header_.operation_type_ != pcode)) { \ @@ -88,14 +95,15 @@ public: return ret; \ } -#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(name, pcode, ...) \ - CONCAT(OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_, ARGS_NUM(__VA_ARGS__))(name, pcode, __VA_ARGS__) +#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(prio, name, pcode, ...) \ + CONCAT(OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_, ARGS_NUM(__VA_ARGS__)) \ + (prio, name, pcode, __VA_ARGS__) -#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC(name, pcode, Processor, ...) \ +#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC(prio, name, pcode, Processor, ...) \ OB_DEFINE_TABLE_LOAD_RPC(ObTableDirectLoadRpc, pcode, Processor, \ table::ObTableDirectLoadRequest, table::ObTableDirectLoadResult, \ __VA_ARGS__) \ - OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(name, pcode, __VA_ARGS__) + OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(ObTableLoadRpcPriority::prio, name, pcode, __VA_ARGS__) public: ObTableDirectLoadRpcProxy(obrpc::ObTableRpcProxy &rpc_proxy) @@ -133,25 +141,29 @@ public: table::ObTableDirectLoadResult &result); // begin - OB_DEFINE_TABLE_DIRECT_LOAD_RPC(begin, table::ObTableDirectLoadOperationType::BEGIN, + OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, begin, table::ObTableDirectLoadOperationType::BEGIN, ObTableDirectLoadBeginExecutor, ObTableDirectLoadBeginArg, ObTableDirectLoadBeginRes); // commit - OB_DEFINE_TABLE_DIRECT_LOAD_RPC(commit, table::ObTableDirectLoadOperationType::COMMIT, + OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, commit, + table::ObTableDirectLoadOperationType::COMMIT, ObTableDirectLoadCommitExecutor, ObTableDirectLoadCommitArg); // abort - OB_DEFINE_TABLE_DIRECT_LOAD_RPC(abort, table::ObTableDirectLoadOperationType::ABORT, + OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, abort, table::ObTableDirectLoadOperationType::ABORT, ObTableDirectLoadAbortExecutor, ObTableDirectLoadAbortArg); // get_status - OB_DEFINE_TABLE_DIRECT_LOAD_RPC(get_status, table::ObTableDirectLoadOperationType::GET_STATUS, + OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, get_status, + table::ObTableDirectLoadOperationType::GET_STATUS, ObTableDirectLoadGetStatusExecutor, ObTableDirectLoadGetStatusArg, ObTableDirectLoadGetStatusRes); // insert - OB_DEFINE_TABLE_DIRECT_LOAD_RPC(insert, table::ObTableDirectLoadOperationType::INSERT, + OB_DEFINE_TABLE_DIRECT_LOAD_RPC(NORMAL_PRIO, insert, + table::ObTableDirectLoadOperationType::INSERT, ObTableDirectLoadInsertExecutor, ObTableDirectLoadInsertArg); // heart_beat - OB_DEFINE_TABLE_DIRECT_LOAD_RPC(heartbeat, table::ObTableDirectLoadOperationType::HEART_BEAT, + OB_DEFINE_TABLE_DIRECT_LOAD_RPC(HIGH_PRIO, heartbeat, + table::ObTableDirectLoadOperationType::HEART_BEAT, ObTableDirectLoadHeartBeatExecutor, ObTableDirectLoadHeartBeatArg, ObTableDirectLoadHeartBeatRes); diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h b/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h index a47f77bab..32aefb830 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h +++ b/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h @@ -46,7 +46,7 @@ public: { }; -#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_1(name, pcode, Arg) \ +#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_1(prio, name, pcode, Arg) \ int name(const Arg &arg) \ { \ int ret = OB_SUCCESS; \ @@ -59,6 +59,9 @@ public: } else if (OB_FAIL(rpc_proxy_.to(addr_) \ .timeout(timeout_) \ .by(tenant_id_) \ + .group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \ + ? share::OBCG_DIRECT_LOAD_HIGH_PRIO \ + : share::OBCG_DEFAULT) \ .direct_load_control(request, result))) { \ SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \ } else if (OB_UNLIKELY(result.command_type_ != pcode)) { \ @@ -71,7 +74,7 @@ public: return ret; \ } -#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_2(name, pcode, Arg, Res) \ +#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_2(prio, name, pcode, Arg, Res) \ int name(const Arg &arg, Res &res) \ { \ int ret = OB_SUCCESS; \ @@ -84,6 +87,9 @@ public: } else if (OB_FAIL(rpc_proxy_.to(addr_) \ .timeout(timeout_) \ .by(tenant_id_) \ + .group_id(ObTableLoadRpcPriority::HIGH_PRIO == prio \ + ? share::OBCG_DIRECT_LOAD_HIGH_PRIO \ + : share::OBCG_DEFAULT) \ .direct_load_control(request, result))) { \ SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \ } else if (OB_UNLIKELY(result.command_type_ != pcode)) { \ @@ -95,13 +101,14 @@ public: return ret; \ } -#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(name, pcode, ...) \ - CONCAT(OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_, ARGS_NUM(__VA_ARGS__))(name, pcode, __VA_ARGS__) +#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(prio, name, pcode, ...) \ + CONCAT(OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL_, ARGS_NUM(__VA_ARGS__)) \ + (prio, name, pcode, __VA_ARGS__) -#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC(name, pcode, Processor, ...) \ +#define OB_DEFINE_TABLE_LOAD_CONTROL_RPC(prio, name, pcode, Processor, ...) \ OB_DEFINE_TABLE_LOAD_RPC(ObTableLoadControlRpc, pcode, Processor, ObDirectLoadControlRequest, \ ObDirectLoadControlResult, __VA_ARGS__) \ - OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(name, pcode, __VA_ARGS__) + OB_DEFINE_TABLE_LOAD_CONTROL_RPC_CALL(ObTableLoadRpcPriority::prio, name, pcode, __VA_ARGS__) public: ObTableLoadControlRpcProxy(obrpc::ObSrvRpcProxy &rpc_proxy) @@ -133,70 +140,79 @@ public: common::ObIAllocator &allocator); // pre_begin - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_begin, ObDirectLoadControlCommandType::PRE_BEGIN, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_begin, + ObDirectLoadControlCommandType::PRE_BEGIN, ObDirectLoadControlPreBeginExecutor, ObDirectLoadControlPreBeginArg); // confirm_begin - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(confirm_begin, ObDirectLoadControlCommandType::CONFIRM_BEGIN, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, confirm_begin, + ObDirectLoadControlCommandType::CONFIRM_BEGIN, ObDirectLoadControlConfirmBeginExecutor, ObDirectLoadControlConfirmBeginArg); // pre_merge - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_merge, ObDirectLoadControlCommandType::PRE_MERGE, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_merge, + ObDirectLoadControlCommandType::PRE_MERGE, ObDirectLoadControlPreMergeExecutor, ObDirectLoadControlPreMergeArg); // start_merge - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(start_merge, ObDirectLoadControlCommandType::START_MERGE, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, start_merge, + ObDirectLoadControlCommandType::START_MERGE, ObDirectLoadControlStartMergeExecutor, ObDirectLoadControlStartMergeArg); // commit - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(commit, ObDirectLoadControlCommandType::COMMIT, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, commit, ObDirectLoadControlCommandType::COMMIT, ObDirectLoadControlCommitExecutor, ObDirectLoadControlCommitArg, ObDirectLoadControlCommitRes); // abort - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(abort, ObDirectLoadControlCommandType::ABORT, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, abort, ObDirectLoadControlCommandType::ABORT, ObDirectLoadControlAbortExecutor, ObDirectLoadControlAbortArg, ObDirectLoadControlAbortRes); // get_status - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(get_status, ObDirectLoadControlCommandType::GET_STATUS, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, get_status, + ObDirectLoadControlCommandType::GET_STATUS, ObDirectLoadControlGetStatusExecutor, ObDirectLoadControlGetStatusArg, ObDirectLoadControlGetStatusRes); // heart_beat - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(heart_beat, ObDirectLoadControlCommandType::HEART_BEAT, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(HIGH_PRIO, heart_beat, + ObDirectLoadControlCommandType::HEART_BEAT, ObDirectLoadControlHeartBeatExecutor, ObDirectLoadControlHeartBeatArg); /// trans // pre_start_trans - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_start_trans, ObDirectLoadControlCommandType::PRE_START_TRANS, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_start_trans, + ObDirectLoadControlCommandType::PRE_START_TRANS, ObDirectLoadControlPreStartTransExecutor, ObDirectLoadControlPreStartTransArg); // confirm_start_trans - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(confirm_start_trans, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, confirm_start_trans, ObDirectLoadControlCommandType::CONFIRM_START_TRANS, ObDirectLoadControlConfirmStartTransExecutor, ObDirectLoadControlConfirmStartTransArg); // pre_finish_trans - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(pre_finish_trans, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, pre_finish_trans, ObDirectLoadControlCommandType::PRE_FINISH_TRANS, ObDirectLoadControlPreFinishTransExecutor, ObDirectLoadControlPreFinishTransArg); // confirm_finish_trans - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(confirm_finish_trans, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, confirm_finish_trans, ObDirectLoadControlCommandType::CONFIRM_FINISH_TRANS, ObDirectLoadControlConfirmFinishTransExecutor, ObDirectLoadControlConfirmFinishTransArg); // abandon_trans - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(abandon_trans, ObDirectLoadControlCommandType::ABANDON_TRANS, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, abandon_trans, + ObDirectLoadControlCommandType::ABANDON_TRANS, ObDirectLoadControlAbandonTransExecutor, ObDirectLoadControlAbandonTransArg); // get_trans_status - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(get_trans_status, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, get_trans_status, ObDirectLoadControlCommandType::GET_TRANS_STATUS, ObDirectLoadControlGetTransStatusExecutor, ObDirectLoadControlGetTransStatusArg, ObDirectLoadControlGetTransStatusRes); - OB_DEFINE_TABLE_LOAD_CONTROL_RPC(insert_trans, ObDirectLoadControlCommandType::INSERT_TRANS, + OB_DEFINE_TABLE_LOAD_CONTROL_RPC(NORMAL_PRIO, insert_trans, + ObDirectLoadControlCommandType::INSERT_TRANS, ObDirectLoadControlInsertTransExecutor, ObDirectLoadControlInsertTransArg); diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index 231a6a932..ae644fed9 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -509,6 +509,9 @@ int ObTableLoadCoordinator::check_peers_merge_result(bool &is_finish) if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == res.status_)) { ret = res.error_code_; LOG_WARN("store has error", KR(ret), K(addr), K(res.status_)); + } else if (OB_UNLIKELY(ObTableLoadStatusType::ABORT == res.status_)) { + ret = OB_SUCCESS != res.error_code_ ? res.error_code_ : OB_CANCELED; + LOG_WARN("store has abort", KR(ret), K(addr), K(res.status_)); } else if (OB_UNLIKELY(ObTableLoadStatusType::MERGING != res.status_ && ObTableLoadStatusType::MERGED != res.status_)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/observer/table_load/ob_table_load_rpc_executor.h b/src/observer/table_load/ob_table_load_rpc_executor.h index 493425f29..6f03b6468 100644 --- a/src/observer/table_load/ob_table_load_rpc_executor.h +++ b/src/observer/table_load/ob_table_load_rpc_executor.h @@ -21,6 +21,12 @@ namespace oceanbase { namespace observer { +enum class ObTableLoadRpcPriority +{ + NORMAL_PRIO = 0, + HIGH_PRIO = 1, +}; + // template // struct ObTableLoadRpc // { diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index fe771087a..c3daa59b7 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -383,7 +383,7 @@ int ObTableLoadStoreCtx::check_status(ObTableLoadStatusType status) const if (ObTableLoadStatusType::ERROR == status_) { ret = error_code_; } else if (ObTableLoadStatusType::ABORT == status_) { - ret = OB_CANCELED; + ret = OB_SUCCESS != error_code_ ? error_code_ : OB_CANCELED; } else { ret = OB_STATE_NOT_MATCH; } diff --git a/src/share/resource_manager/ob_group_list.h b/src/share/resource_manager/ob_group_list.h index e753bf75d..c9d8e5dcf 100644 --- a/src/share/resource_manager/ob_group_list.h +++ b/src/share/resource_manager/ob_group_list.h @@ -33,5 +33,5 @@ CGID_DEF(OBCG_DBA_COMMAND, 18, 1) CGID_DEF(OBCG_STORAGE, 19) CGID_DEF(OBCG_LOCK, 20) CGID_DEF(OBCG_UNLOCK, 21) -// CGID_DEF(OBCG_DIRECT_LOAD_HIGH_PRIO, 22) +CGID_DEF(OBCG_DIRECT_LOAD_HIGH_PRIO, 22) CGID_DEF(OBCG_LQ, 100)