Fix direct load rpc get res memory sanity abort

This commit is contained in:
suz-yang 2023-07-13 15:48:16 +00:00 committed by ob-robot
parent 3c24bfb3f0
commit 6ccb87449f
8 changed files with 117 additions and 162 deletions

View File

@ -36,49 +36,55 @@ public:
{
};
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_1(name, pcode, Arg) \
int name(const Arg &arg) \
{ \
int ret = OB_SUCCESS; \
obrpc::ObTableRpcProxy rpc_proxy = rpc_proxy_.to(addr_); \
table::ObTableDirectLoadRequest request; \
table::ObTableDirectLoadResult result; \
request.credential_ = credential_; \
request.operation_type_ = pcode; \
if (OB_FAIL(request.set_arg(arg, allocator_))) { \
SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \
} else if (OB_FAIL(rpc_proxy.timeout(timeout_).by(tenant_id_).direct_load(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
} else if (OB_UNLIKELY(result.operation_type_ != pcode)) { \
ret = OB_ERR_UNEXPECTED; \
SERVER_LOG(WARN, "unexpected operation type", K(ret), K(request), K(result)); \
} else if (OB_UNLIKELY(!result.res_content_.empty())) { \
ret = OB_ERR_UNEXPECTED; \
SERVER_LOG(WARN, "unexpected non empty res content", K(ret), K(result)); \
} \
return ret; \
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_1(name, pcode, Arg) \
int name(const Arg &arg) \
{ \
int ret = OB_SUCCESS; \
table::ObTableDirectLoadRequest request; \
table::ObTableDirectLoadResult result; \
request.credential_ = credential_; \
request.operation_type_ = pcode; \
result.allocator_ = &allocator_; \
if (OB_FAIL(request.set_arg(arg, allocator_))) { \
SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
.timeout(timeout_) \
.by(tenant_id_) \
.direct_load(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
} else if (OB_UNLIKELY(result.operation_type_ != pcode)) { \
ret = OB_ERR_UNEXPECTED; \
SERVER_LOG(WARN, "unexpected operation type", K(ret), K(request), K(result)); \
} else if (OB_UNLIKELY(!result.res_content_.empty())) { \
ret = OB_ERR_UNEXPECTED; \
SERVER_LOG(WARN, "unexpected non empty res content", K(ret), K(result)); \
} \
return ret; \
}
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_2(name, pcode, Arg, Res) \
int name(const Arg &arg, Res &res) \
{ \
int ret = OB_SUCCESS; \
obrpc::ObTableRpcProxy rpc_proxy = rpc_proxy_.to(addr_); \
table::ObTableDirectLoadRequest request; \
table::ObTableDirectLoadResult result; \
request.credential_ = credential_; \
request.operation_type_ = pcode; \
if (OB_FAIL(request.set_arg(arg, allocator_))) { \
SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \
} else if (OB_FAIL(rpc_proxy.timeout(timeout_).by(tenant_id_).direct_load(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
} else if (OB_UNLIKELY(result.operation_type_ != pcode)) { \
ret = OB_ERR_UNEXPECTED; \
SERVER_LOG(WARN, "unexpected operation type", K(ret), K(request), K(result)); \
} else if (OB_FAIL(result.get_res(res))) { \
SERVER_LOG(WARN, "fail to get res", K(ret), K(result)); \
} \
return ret; \
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL_2(name, pcode, Arg, Res) \
int name(const Arg &arg, Res &res) \
{ \
int ret = OB_SUCCESS; \
table::ObTableDirectLoadRequest request; \
table::ObTableDirectLoadResult result; \
request.credential_ = credential_; \
request.operation_type_ = pcode; \
result.allocator_ = &allocator_; \
if (OB_FAIL(request.set_arg(arg, allocator_))) { \
SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
.timeout(timeout_) \
.by(tenant_id_) \
.direct_load(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
} else if (OB_UNLIKELY(result.operation_type_ != pcode)) { \
ret = OB_ERR_UNEXPECTED; \
SERVER_LOG(WARN, "unexpected operation type", K(ret), K(request), K(result)); \
} else if (OB_FAIL(result.get_res(res))) { \
SERVER_LOG(WARN, "fail to get res", K(ret), K(result)); \
} \
return ret; \
}
#define OB_DEFINE_TABLE_DIRECT_LOAD_RPC_CALL(name, pcode, ...) \

View File

@ -49,13 +49,14 @@ public:
int name(const Arg &arg) \
{ \
int ret = OB_SUCCESS; \
obrpc::ObSrvRpcProxy rpc_proxy = rpc_proxy_.to(addr_); \
ObDirectLoadControlRequest request; \
ObDirectLoadControlResult result; \
request.command_type_ = pcode; \
result.allocator_ = &allocator_; \
if (OB_FAIL(request.set_arg(arg, allocator_))) { \
SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \
} else if (OB_FAIL(rpc_proxy.timeout(timeout_) \
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
.timeout(timeout_) \
.by(tenant_id_) \
.direct_load_control(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \
@ -73,13 +74,14 @@ public:
int name(const Arg &arg, Res &res) \
{ \
int ret = OB_SUCCESS; \
obrpc::ObSrvRpcProxy rpc_proxy = rpc_proxy_.to(addr_); \
ObDirectLoadControlRequest request; \
ObDirectLoadControlResult result; \
request.command_type_ = pcode; \
result.allocator_ = &allocator_; \
if (OB_FAIL(request.set_arg(arg, allocator_))) { \
SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \
} else if (OB_FAIL(rpc_proxy.timeout(timeout_) \
} else if (OB_FAIL(rpc_proxy_.to(addr_) \
.timeout(timeout_) \
.by(tenant_id_) \
.direct_load_control(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load control", K(ret), K_(addr), K(request)); \

View File

@ -26,9 +26,32 @@ OB_SERIALIZE_MEMBER(ObDirectLoadControlRequest,
command_type_,
arg_content_);
OB_SERIALIZE_MEMBER(ObDirectLoadControlResult,
command_type_,
res_content_);
OB_UNIS_DEF_SERIALIZE(ObDirectLoadControlResult,
command_type_,
res_content_);
OB_UNIS_DEF_SERIALIZE_SIZE(ObDirectLoadControlResult,
command_type_,
res_content_);
OB_DEF_DESERIALIZE(ObDirectLoadControlResult)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(allocator_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null allocator in deserialize", K(ret));
} else {
ObString tmp_res_content;
LST_DO_CODE(OB_UNIS_DECODE,
command_type_,
tmp_res_content);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ob_write_string(*allocator_, tmp_res_content, res_content_))) {
LOG_WARN("fail to copy string", K(ret));
}
}
return ret;
}
// pre_begin
ObDirectLoadControlPreBeginArg::ObDirectLoadControlPreBeginArg()

View File

@ -96,7 +96,10 @@ class ObDirectLoadControlResult
OB_UNIS_VERSION(1);
public:
ObDirectLoadControlResult() : command_type_(observer::ObDirectLoadControlCommandType::MAX_TYPE) {}
ObDirectLoadControlResult()
: allocator_(nullptr), command_type_(observer::ObDirectLoadControlCommandType::MAX_TYPE)
{
}
template <class Res>
int set_res(const Res &res, common::ObIAllocator &allocator)
{
@ -132,6 +135,7 @@ public:
TO_STRING_KV(K_(command_type), "res_content", common::ObHexStringWrap(res_content_));
public:
common::ObIAllocator *allocator_; // for deserialize
observer::ObDirectLoadControlCommandType command_type_;
ObString res_content_;
};

View File

@ -1,107 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include "observer/table_load/ob_table_load_commit_processor.h"
#include "observer/table_load/ob_table_load_coordinator.h"
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_store.h"
#include "sql/engine/ob_exec_context.h"
namespace oceanbase
{
namespace observer
{
using namespace table;
using namespace sql;
/**
* ObTableLoadCommitP
*/
int ObTableLoadCommitP::process()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_user_access(arg_.credential_))) {
LOG_WARN("fail to check_user_access", KR(ret));
} else if (OB_FAIL(ObTableLoadService::check_tenant())) {
LOG_WARN("fail to check tenant", KR(ret));
} else {
ObTableLoadTableCtx *table_ctx = nullptr;
ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_);
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
LOG_WARN("fail to get table ctx", KR(ret), K(key));
} else {
ObTableLoadCoordinator coordinator(table_ctx);
if (OB_FAIL(coordinator.init())) {
LOG_WARN("fail to init coordinator", KR(ret));
} else if (OB_FAIL(coordinator.commit(result_.result_info_))) {
LOG_WARN("fail to coordinator commit", KR(ret));
} else if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
LOG_WARN("fail to remove table ctx", KR(ret), K(key));
}
}
if (OB_NOT_NULL(table_ctx)) {
ObTableLoadService::put_ctx(table_ctx);
table_ctx = nullptr;
}
}
return ret;
}
int ObTableLoadCommitP::check_user_access(const ObString &credential_str)
{
return ObTableLoadUtils::check_user_access(credential_str, gctx_, credential_);
}
/**
* ObTableLoadCommitPeerP
*/
int ObTableLoadCommitPeerP::process()
{
int ret = OB_SUCCESS;
if (OB_FAIL(check_user_access(arg_.credential_))) {
LOG_WARN("fail to check_user_access", KR(ret));
} else if (OB_FAIL(ObTableLoadService::check_tenant())) {
LOG_WARN("fail to check tenant", KR(ret));
} else {
ObTableLoadTableCtx *table_ctx = nullptr;
ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_);
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {
LOG_WARN("fail to get table ctx", KR(ret), K(key));
} else {
ObTableLoadStore store(table_ctx);
if (OB_FAIL(store.init())) {
LOG_WARN("fail to init store", KR(ret));
} else if (OB_FAIL(store.commit(result_.result_info_))) {
LOG_WARN("fail to store commit", KR(ret));
} else if (OB_FAIL(ObTableLoadService::remove_ctx(table_ctx))) {
LOG_WARN("fail to remove table ctx", KR(ret), K(key));
}
}
if (OB_NOT_NULL(table_ctx)) {
ObTableLoadService::put_ctx(table_ctx);
table_ctx = nullptr;
}
}
return ret;
}
int ObTableLoadCommitPeerP::check_user_access(const ObString &credential_str)
{
return ObTableLoadUtils::check_user_access(credential_str, gctx_, credential_);
}
} // namespace observer
} // namespace oceanbase

View File

@ -31,7 +31,7 @@ using namespace share;
using namespace sql;
#define TABLE_LOAD_CONTROL_RPC_CALL(name, addr, arg, ...) \
if (OB_SUCC(ret)) { \
({ \
ObTableLoadControlRpcProxy proxy(*GCTX.srv_rpc_proxy_); \
ObTimeoutCtx ctx; \
if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, DEFAULT_TIMEOUT_US))) { \
@ -42,7 +42,7 @@ using namespace sql;
.name(arg, ##__VA_ARGS__))) { \
LOG_WARN("fail to rpc call " #name, KR(ret), K(addr), K(arg)); \
} \
}
})
ObTableLoadCoordinator::ObTableLoadCoordinator(ObTableLoadTableCtx *ctx)
: ctx_(ctx),

View File

@ -97,6 +97,29 @@ OB_SERIALIZE_MEMBER(ObTableDirectLoadRequest,
operation_type_,
arg_content_);
OB_SERIALIZE_MEMBER(ObTableDirectLoadResult,
operation_type_,
res_content_);
OB_UNIS_DEF_SERIALIZE(ObTableDirectLoadResult,
operation_type_,
res_content_);
OB_UNIS_DEF_SERIALIZE_SIZE(ObTableDirectLoadResult,
operation_type_,
res_content_);
OB_DEF_DESERIALIZE(ObTableDirectLoadResult)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(allocator_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null allocator in deserialize", K(ret));
} else {
ObString tmp_res_content;
LST_DO_CODE(OB_UNIS_DECODE,
operation_type_,
tmp_res_content);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ob_write_string(*allocator_, tmp_res_content, res_content_))) {
LOG_WARN("fail to copy string", K(ret));
}
}
return ret;
}

View File

@ -307,7 +307,10 @@ class ObTableDirectLoadResult
{
OB_UNIS_VERSION(1);
public:
ObTableDirectLoadResult() : operation_type_(ObTableDirectLoadOperationType::MAX_TYPE) {}
ObTableDirectLoadResult()
: allocator_(nullptr), operation_type_(ObTableDirectLoadOperationType::MAX_TYPE)
{
}
template <class Res>
int set_res(const Res &res, common::ObIAllocator &allocator)
{
@ -342,6 +345,7 @@ public:
}
TO_STRING_KV(K_(operation_type), "res_content", common::ObHexStringWrap(res_content_));
public:
common::ObIAllocator *allocator_; // for deserialize
ObTableDirectLoadOperationType operation_type_;
ObString res_content_;
};