diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h b/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h index fda634ff4d..1e68b74c63 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_proxy.h @@ -36,49 +36,55 @@ public: { }; -#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_1(name, pcode, Arg) \ - int name(const Arg &arg) \ - { \ - int ret = OB_SUCCESS; \ - obrpc::ObTableRpcProxy rpc_proxy = rpc_proxy_.to(addr_); \ - table::ObTableDirectLoadRequest request; \ - table::ObTableDirectLoadResult result; \ - request.credential_ = credential_; \ - request.operation_type_ = pcode; \ - if (OB_FAIL(request.set_arg(arg, allocator_))) { \ - SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \ - } else if (OB_FAIL(rpc_proxy.timeout(timeout_).by(tenant_id_).direct_load(request, result))) { \ - SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \ - } else if (OB_UNLIKELY(result.operation_type_ != pcode)) { \ - ret = OB_ERR_UNEXPECTED; \ - SERVER_LOG(WARN, "unexpected operation type", K(ret), K(request), K(result)); \ - } else if (OB_UNLIKELY(!result.res_content_.empty())) { \ - ret = OB_ERR_UNEXPECTED; \ - SERVER_LOG(WARN, "unexpected non empty res content", K(ret), K(result)); \ - } \ - return ret; \ +#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_1(name, pcode, Arg) \ + int name(const Arg &arg) \ + { \ + int ret = OB_SUCCESS; \ + table::ObTableDirectLoadRequest request; \ + table::ObTableDirectLoadResult result; \ + request.credential_ = credential_; \ + request.operation_type_ = pcode; \ + result.allocator_ = &allocator_; \ + if (OB_FAIL(request.set_arg(arg, allocator_))) { \ + SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \ + } else if (OB_FAIL(rpc_proxy_.to(addr_) \ + .timeout(timeout_) \ + .by(tenant_id_) \ + .direct_load(request, result))) { \ + SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \ + } else if (OB_UNLIKELY(result.operation_type_ != pcode)) { \ + ret = OB_ERR_UNEXPECTED; \ + SERVER_LOG(WARN, "unexpected operation type", K(ret), K(request), K(result)); \ + } else if (OB_UNLIKELY(!result.res_content_.empty())) { \ + ret = OB_ERR_UNEXPECTED; \ + SERVER_LOG(WARN, "unexpected non empty res content", K(ret), K(result)); \ + } \ + return ret; \ } -#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_2(name, pcode, Arg, Res) \ - int name(const Arg &arg, Res &res) \ - { \ - int ret = OB_SUCCESS; \ - obrpc::ObTableRpcProxy rpc_proxy = rpc_proxy_.to(addr_); \ - table::ObTableDirectLoadRequest request; \ - table::ObTableDirectLoadResult result; \ - request.credential_ = credential_; \ - request.operation_type_ = pcode; \ - if (OB_FAIL(request.set_arg(arg, allocator_))) { \ - SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \ - } else if (OB_FAIL(rpc_proxy.timeout(timeout_).by(tenant_id_).direct_load(request, result))) { \ - SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \ - } else if (OB_UNLIKELY(result.operation_type_ != pcode)) { \ - ret = OB_ERR_UNEXPECTED; \ - SERVER_LOG(WARN, "unexpected operation type", K(ret), K(request), K(result)); \ - } else if (OB_FAIL(result.get_res(res))) { \ - SERVER_LOG(WARN, "fail to get res", K(ret), K(result)); \ - } \ - return ret; \ +#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_2(name, pcode, Arg, Res) \ + int name(const Arg &arg, Res &res) \ + { \ + int ret = OB_SUCCESS; \ + table::ObTableDirectLoadRequest request; \ + table::ObTableDirectLoadResult result; \ + request.credential_ = credential_; \ + request.operation_type_ = pcode; \ + result.allocator_ = &allocator_; \ + if (OB_FAIL(request.set_arg(arg, allocator_))) { \ + SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \ + } else if (OB_FAIL(rpc_proxy_.to(addr_) \ + .timeout(timeout_) \ + .by(tenant_id_) \ + .direct_load(request, result))) { \ + SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \ + } else if (OB_UNLIKELY(result.operation_type_ != pcode)) { \ + ret = OB_ERR_UNEXPECTED; \ + SERVER_LOG(WARN, "unexpected operation type", K(ret), K(request), K(result)); \ + } else if (OB_FAIL(result.get_res(res))) { \ + SERVER_LOG(WARN, "fail to get res", K(ret), K(result)); \ + } \ + return ret; \ } #define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(name, pcode, ...) \ diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h b/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h index 31dffdb494..77c6d1b69c 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h +++ b/src/observer/table_load/control/ob_table_load_control_rpc_proxy.h @@ -49,13 +49,14 @@ public: int name(const Arg &arg) \ { \ int ret = OB_SUCCESS; \ - obrpc::ObSrvRpcProxy rpc_proxy = rpc_proxy_.to(addr_); \ ObDirectLoadControlRequest request; \ ObDirectLoadControlResult result; \ request.command_type_ = pcode; \ + result.allocator_ = &allocator_; \ if (OB_FAIL(request.set_arg(arg, allocator_))) { \ SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \ - } else if (OB_FAIL(rpc_proxy.timeout(timeout_) \ + } else if (OB_FAIL(rpc_proxy_.to(addr_) \ + .timeout(timeout_) \ .by(tenant_id_) \ .direct_load_control(request, result))) { \ SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \ @@ -73,13 +74,14 @@ public: int name(const Arg &arg, Res &res) \ { \ int ret = OB_SUCCESS; \ - obrpc::ObSrvRpcProxy rpc_proxy = rpc_proxy_.to(addr_); \ ObDirectLoadControlRequest request; \ ObDirectLoadControlResult result; \ request.command_type_ = pcode; \ + result.allocator_ = &allocator_; \ if (OB_FAIL(request.set_arg(arg, allocator_))) { \ SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \ - } else if (OB_FAIL(rpc_proxy.timeout(timeout_) \ + } else if (OB_FAIL(rpc_proxy_.to(addr_) \ + .timeout(timeout_) \ .by(tenant_id_) \ .direct_load_control(request, result))) { \ SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \ diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp index 8fbd244434..ba4c83998a 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp @@ -26,9 +26,32 @@ OB_SERIALIZE_MEMBER(ObDirectLoadControlRequest, command_type_, arg_content_); -OB_SERIALIZE_MEMBER(ObDirectLoadControlResult, - command_type_, - res_content_); +OB_UNIS_DEF_SERIALIZE(ObDirectLoadControlResult, + command_type_, + res_content_); + +OB_UNIS_DEF_SERIALIZE_SIZE(ObDirectLoadControlResult, + command_type_, + res_content_); + +OB_DEF_DESERIALIZE(ObDirectLoadControlResult) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(allocator_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null allocator in deserialize", K(ret)); + } else { + ObString tmp_res_content; + LST_DO_CODE(OB_UNIS_DECODE, + command_type_, + tmp_res_content); + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ob_write_string(*allocator_, tmp_res_content, res_content_))) { + LOG_WARN("fail to copy string", K(ret)); + } + } + return ret; +} // pre_begin ObDirectLoadControlPreBeginArg::ObDirectLoadControlPreBeginArg() diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_struct.h b/src/observer/table_load/control/ob_table_load_control_rpc_struct.h index 2c9ea8fd1e..7ed57199e7 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_struct.h +++ b/src/observer/table_load/control/ob_table_load_control_rpc_struct.h @@ -96,7 +96,10 @@ class ObDirectLoadControlResult OB_UNIS_VERSION(1); public: - ObDirectLoadControlResult() : command_type_(observer::ObDirectLoadControlCommandType::MAX_TYPE) {} + ObDirectLoadControlResult() + : allocator_(nullptr), command_type_(observer::ObDirectLoadControlCommandType::MAX_TYPE) + { + } template int set_res(const Res &res, common::ObIAllocator &allocator) { @@ -132,6 +135,7 @@ public: TO_STRING_KV(K_(command_type), "res_content", common::ObHexStringWrap(res_content_)); public: + common::ObIAllocator *allocator_; // for deserialize observer::ObDirectLoadControlCommandType command_type_; ObString res_content_; }; diff --git a/src/observer/table_load/ob_table_load_commit_processor.cpp b/src/observer/table_load/ob_table_load_commit_processor.cpp deleted file mode 100644 index bf685079d1..0000000000 --- a/src/observer/table_load/ob_table_load_commit_processor.cpp +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Copyright (c) 2021 OceanBase - * OceanBase CE is licensed under Mulan PubL v2. - * You can use this software according to the terms and conditions of the Mulan PubL v2. - * You may obtain a copy of Mulan PubL v2 at: - * http://license.coscl.org.cn/MulanPubL-2.0 - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, - * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, - * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. - * See the Mulan PubL v2 for more details. - */ - -#define USING_LOG_PREFIX SERVER - -#include "observer/table_load/ob_table_load_commit_processor.h" -#include "observer/table_load/ob_table_load_coordinator.h" -#include "observer/table_load/ob_table_load_service.h" -#include "observer/table_load/ob_table_load_store.h" -#include "sql/engine/ob_exec_context.h" - -namespace oceanbase -{ -namespace observer -{ -using namespace table; -using namespace sql; - -/** - * ObTableLoadCommitP - */ - -int ObTableLoadCommitP::process() -{ - int ret = OB_SUCCESS; - if (OB_FAIL(check_user_access(arg_.credential_))) { - LOG_WARN("fail to check_user_access", KR(ret)); - } else if (OB_FAIL(ObTableLoadService::check_tenant())) { - LOG_WARN("fail to check tenant", KR(ret)); - } else { - ObTableLoadTableCtx *table_ctx = nullptr; - ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_); - if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) { - LOG_WARN("fail to get table ctx", KR(ret), K(key)); - } else { - ObTableLoadCoordinator coordinator(table_ctx); - if (OB_FAIL(coordinator.init())) { - LOG_WARN("fail to init coordinator", KR(ret)); - } else if (OB_FAIL(coordinator.commit(result_.result_info_))) { - LOG_WARN("fail to coordinator commit", KR(ret)); - } else if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) { - LOG_WARN("fail to remove table ctx", KR(ret), K(key)); - } - } - if (OB_NOT_NULL(table_ctx)) { - ObTableLoadService::put_ctx(table_ctx); - table_ctx = nullptr; - } - } - return ret; -} - -int ObTableLoadCommitP::check_user_access(const ObString &credential_str) -{ - return ObTableLoadUtils::check_user_access(credential_str, gctx_, credential_); -} - -/** - * ObTableLoadCommitPeerP - */ - -int ObTableLoadCommitPeerP::process() -{ - int ret = OB_SUCCESS; - if (OB_FAIL(check_user_access(arg_.credential_))) { - LOG_WARN("fail to check_user_access", KR(ret)); - } else if (OB_FAIL(ObTableLoadService::check_tenant())) { - LOG_WARN("fail to check tenant", KR(ret)); - } else { - ObTableLoadTableCtx *table_ctx = nullptr; - ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_); - if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) { - LOG_WARN("fail to get table ctx", KR(ret), K(key)); - } else { - ObTableLoadStore store(table_ctx); - if (OB_FAIL(store.init())) { - LOG_WARN("fail to init store", KR(ret)); - } else if (OB_FAIL(store.commit(result_.result_info_))) { - LOG_WARN("fail to store commit", KR(ret)); - } else if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) { - LOG_WARN("fail to remove table ctx", KR(ret), K(key)); - } - } - if (OB_NOT_NULL(table_ctx)) { - ObTableLoadService::put_ctx(table_ctx); - table_ctx = nullptr; - } - } - return ret; -} - -int ObTableLoadCommitPeerP::check_user_access(const ObString &credential_str) -{ - return ObTableLoadUtils::check_user_access(credential_str, gctx_, credential_); -} - -} // namespace observer -} // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index bfe389b6b7..b3272e46f1 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -31,7 +31,7 @@ using namespace share; using namespace sql; #define TABLE_LOAD_CONTROL_RPC_CALL(name, addr, arg, ...) \ - if (OB_SUCC(ret)) { \ + ({ \ ObTableLoadControlRpcProxy proxy(*GCTX.srv_rpc_proxy_); \ ObTimeoutCtx ctx; \ if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, DEFAULT_TIMEOUT_US))) { \ @@ -42,7 +42,7 @@ using namespace sql; .name(arg, ##__VA_ARGS__))) { \ LOG_WARN("fail to rpc call " #name, KR(ret), K(addr), K(arg)); \ } \ - } + }) ObTableLoadCoordinator::ObTableLoadCoordinator(ObTableLoadTableCtx *ctx) : ctx_(ctx), diff --git a/src/share/table/ob_table_rpc_struct.cpp b/src/share/table/ob_table_rpc_struct.cpp index 0f1b3f05d4..685d1c2071 100644 --- a/src/share/table/ob_table_rpc_struct.cpp +++ b/src/share/table/ob_table_rpc_struct.cpp @@ -97,6 +97,29 @@ OB_SERIALIZE_MEMBER(ObTableDirectLoadRequest, operation_type_, arg_content_); -OB_SERIALIZE_MEMBER(ObTableDirectLoadResult, - operation_type_, - res_content_); +OB_UNIS_DEF_SERIALIZE(ObTableDirectLoadResult, + operation_type_, + res_content_); + +OB_UNIS_DEF_SERIALIZE_SIZE(ObTableDirectLoadResult, + operation_type_, + res_content_); + +OB_DEF_DESERIALIZE(ObTableDirectLoadResult) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(allocator_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null allocator in deserialize", K(ret)); + } else { + ObString tmp_res_content; + LST_DO_CODE(OB_UNIS_DECODE, + operation_type_, + tmp_res_content); + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ob_write_string(*allocator_, tmp_res_content, res_content_))) { + LOG_WARN("fail to copy string", K(ret)); + } + } + return ret; +} diff --git a/src/share/table/ob_table_rpc_struct.h b/src/share/table/ob_table_rpc_struct.h index 27f578edc5..41e0be0b1e 100644 --- a/src/share/table/ob_table_rpc_struct.h +++ b/src/share/table/ob_table_rpc_struct.h @@ -307,7 +307,10 @@ class ObTableDirectLoadResult { OB_UNIS_VERSION(1); public: - ObTableDirectLoadResult() : operation_type_(ObTableDirectLoadOperationType::MAX_TYPE) {} + ObTableDirectLoadResult() + : allocator_(nullptr), operation_type_(ObTableDirectLoadOperationType::MAX_TYPE) + { + } template int set_res(const Res &res, common::ObIAllocator &allocator) { @@ -342,6 +345,7 @@ public: } TO_STRING_KV(K_(operation_type), "res_content", common::ObHexStringWrap(res_content_)); public: + common::ObIAllocator *allocator_; // for deserialize ObTableDirectLoadOperationType operation_type_; ObString res_content_; };