Table direct load support obproxy

This commit is contained in:
suz-yang 2023-09-01 04:40:32 +00:00 committed by ob-robot
parent e344e593fd
commit 9a3e84dc86
9 changed files with 74 additions and 27 deletions

View File

@ -31,7 +31,7 @@ ObTableDirectLoadP::ObTableDirectLoadP(const ObGlobalContext &gctx)
int ObTableDirectLoadP::check_arg()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(arg_.operation_type_ == ObTableDirectLoadOperationType::MAX_TYPE ||
if (OB_UNLIKELY(arg_.header_.operation_type_ == ObTableDirectLoadOperationType::MAX_TYPE ||
arg_.arg_content_.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(arg_));
@ -60,7 +60,7 @@ ObTableAPITransCb *ObTableDirectLoadP::new_callback(rpc::ObRequest *req)
uint64_t ObTableDirectLoadP::get_request_checksum()
{
uint64_t checksum = 0;
checksum = ob_crc64(checksum, &result_.operation_type_, sizeof(result_.operation_type_));
checksum = ob_crc64(checksum, &result_.header_, sizeof(result_.header_));
checksum = ob_crc64(checksum, result_.res_content_.ptr(), result_.res_content_.length());
return checksum;
}

View File

@ -13,6 +13,7 @@
#define USING_LOG_PREFIX SERVER
#include "ob_table_direct_load_rpc_executor.h"
#include "observer/ob_server.h"
#include "observer/omt/ob_multi_tenant.h"
#include "observer/omt/ob_tenant.h"
#include "observer/table_load/ob_table_load_client_service.h"
@ -27,6 +28,7 @@ namespace oceanbase
{
namespace observer
{
using namespace observer;
using namespace omt;
using namespace share::schema;
using namespace sql;
@ -64,6 +66,16 @@ int ObTableDirectLoadBeginExecutor::check_args()
return ret;
}
int ObTableDirectLoadBeginExecutor::set_result_header()
{
int ret = OB_SUCCESS;
ret = ParentType::set_result_header();
if (OB_SUCC(ret)) {
this->result_.header_.addr_ = ObServer::get_instance().get_self();
}
return ret;
}
int ObTableDirectLoadBeginExecutor::process()
{
int ret = OB_SUCCESS;
@ -483,7 +495,8 @@ int ObTableDirectLoadInsertExecutor::decode_payload(const ObString &payload,
LOG_WARN("invalid args", KR(ret), K(payload));
} else {
ObTableLoadSharedAllocatorHandle allocator_handle =
ObTableLoadSharedAllocatorHandle::make_handle("TLD_share_alloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
ObTableLoadSharedAllocatorHandle::make_handle("TLD_share_alloc", OB_MALLOC_NORMAL_BLOCK_SIZE,
MTL_ID());
const int64_t data_len = payload.length();
char *buf = nullptr;
int64_t pos = 0;
@ -522,7 +535,7 @@ int ObTableDirectLoadInsertExecutor::set_batch_seq_no(int64_t batch_id,
return ret;
}
//heart_beat
// heart_beat
int ObTableDirectLoadHeartBeatExecutor::check_args()
{
int ret = OB_SUCCESS;

View File

@ -40,11 +40,12 @@ public:
protected:
int deserialize() override { return this->request_.get_arg(this->arg_); }
int serialize() override
int set_result_header() override
{
this->result_.operation_type_ = pcode;
return this->result_.set_res(this->res_, ctx_.get_allocator());
this->result_.header_.operation_type_ = pcode;
return OB_SUCCESS;
}
int serialize() override { return this->result_.set_res(this->res_, ctx_.get_allocator()); }
protected:
ObTableDirectLoadExecContext &ctx_;
@ -64,6 +65,7 @@ public:
protected:
int check_args() override;
int set_result_header() override;
int process() override;
private:

View File

@ -31,7 +31,7 @@ int ObTableDirectLoadRpcProxy::dispatch(ObTableDirectLoadExecContext &ctx,
break;
int ret = OB_SUCCESS;
switch (request.operation_type_) {
switch (request.header_.operation_type_) {
OB_TABLE_DIRECT_LOAD_RPC_DISPATCH(ObTableDirectLoadOperationType::BEGIN);
OB_TABLE_DIRECT_LOAD_RPC_DISPATCH(ObTableDirectLoadOperationType::COMMIT);
OB_TABLE_DIRECT_LOAD_RPC_DISPATCH(ObTableDirectLoadOperationType::ABORT);

View File

@ -43,8 +43,8 @@ public:
int ret = OB_SUCCESS; \
table::ObTableDirectLoadRequest request; \
table::ObTableDirectLoadResult result; \
request.header_.operation_type_ = pcode; \
request.credential_ = credential_; \
request.operation_type_ = pcode; \
result.allocator_ = &allocator_; \
if (OB_FAIL(request.set_arg(arg, allocator_))) { \
SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \
@ -53,7 +53,7 @@ public:
.by(tenant_id_) \
.direct_load(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
} else if (OB_UNLIKELY(result.operation_type_ != pcode)) { \
} else if (OB_UNLIKELY(result.header_.operation_type_ != pcode)) { \
ret = OB_ERR_UNEXPECTED; \
SERVER_LOG(WARN, "unexpected operation type", K(ret), K(request), K(result)); \
} else if (OB_UNLIKELY(!result.res_content_.empty())) { \
@ -69,8 +69,8 @@ public:
int ret = OB_SUCCESS; \
table::ObTableDirectLoadRequest request; \
table::ObTableDirectLoadResult result; \
request.header_.operation_type_ = pcode; \
request.credential_ = credential_; \
request.operation_type_ = pcode; \
result.allocator_ = &allocator_; \
if (OB_FAIL(request.set_arg(arg, allocator_))) { \
SERVER_LOG(WARN, "fail to set arg", K(ret), K(arg)); \
@ -79,7 +79,7 @@ public:
.by(tenant_id_) \
.direct_load(request, result))) { \
SERVER_LOG(WARN, "fail to rpc call direct load", K(ret), K_(addr), K(request)); \
} else if (OB_UNLIKELY(result.operation_type_ != pcode)) { \
} else if (OB_UNLIKELY(result.header_.operation_type_ != pcode)) { \
ret = OB_ERR_UNEXPECTED; \
SERVER_LOG(WARN, "unexpected operation type", K(ret), K(request), K(result)); \
} else if (OB_FAIL(result.get_res(res))) { \

View File

@ -40,11 +40,12 @@ public:
protected:
int deserialize() override { return this->request_.get_arg(this->arg_); }
int serialize() override
int set_result_header() override
{
this->result_.command_type_ = pcode;
return this->result_.set_res(this->res_, allocator_);
return OB_SUCCESS;
}
int serialize() override { return this->result_.set_res(this->res_, allocator_); }
protected:
common::ObIAllocator &allocator_;

View File

@ -107,6 +107,8 @@ public:
SERVER_LOG(WARN, "fail to check args", K(ret));
} else if (OB_FAIL(process())) {
SERVER_LOG(WARN, "fail to process", K(ret));
} else if (OB_FAIL(set_result_header())) {
SERVER_LOG(WARN, "fail to set result header", K(ret));
} else if (OB_FAIL(serialize())) {
SERVER_LOG(WARN, "fail to do serialize", K(ret));
}
@ -118,6 +120,7 @@ protected:
virtual int deserialize() = 0;
virtual int check_args() = 0;
virtual int process() = 0;
virtual int set_result_header() = 0;
// serialize res to result
virtual int serialize() = 0;

View File

@ -92,17 +92,25 @@ OB_SERIALIZE_MEMBER((ObTableQuerySyncRequest, ObTableQueryRequest),
query_type_
);
////////////////////////////////////////////////////////////////
OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadRequestHeader,
addr_,
operation_type_);
OB_SERIALIZE_MEMBER(ObTableDirectLoadRequest,
header_,
credential_,
operation_type_,
arg_content_);
OB_SERIALIZE_MEMBER_SIMPLE(ObTableDirectLoadResultHeader,
addr_,
operation_type_);
OB_UNIS_DEF_SERIALIZE(ObTableDirectLoadResult,
operation_type_,
header_,
res_content_);
OB_UNIS_DEF_SERIALIZE_SIZE(ObTableDirectLoadResult,
operation_type_,
header_,
res_content_);
OB_DEF_DESERIALIZE(ObTableDirectLoadResult)
@ -114,7 +122,7 @@ OB_DEF_DESERIALIZE(ObTableDirectLoadResult)
} else {
ObString tmp_res_content;
LST_DO_CODE(OB_UNIS_DECODE,
operation_type_,
header_,
tmp_res_content);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ob_write_string(*allocator_, tmp_res_content, res_content_))) {

View File

@ -260,11 +260,22 @@ public:
ObQueryOperationType query_type_;
};
struct ObTableDirectLoadRequestHeader
{
OB_UNIS_VERSION(1);
public:
ObTableDirectLoadRequestHeader() : operation_type_(ObTableDirectLoadOperationType::MAX_TYPE) {}
TO_STRING_KV(K_(addr), K_(operation_type));
public:
ObAddr addr_;
ObTableDirectLoadOperationType operation_type_;
};
class ObTableDirectLoadRequest
{
OB_UNIS_VERSION(1);
public:
ObTableDirectLoadRequest() : operation_type_(ObTableDirectLoadOperationType::MAX_TYPE) {}
ObTableDirectLoadRequest() {}
template <class Arg>
int set_arg(const Arg &arg, common::ObIAllocator &allocator)
{
@ -295,22 +306,31 @@ public:
}
return ret;
}
TO_STRING_KV("credential", common::ObHexStringWrap(credential_), K_(operation_type),
TO_STRING_KV(K_(header),
"credential", common::ObHexStringWrap(credential_),
"arg_content", common::ObHexStringWrap(arg_content_));
public:
ObTableDirectLoadRequestHeader header_;
ObString credential_;
ObTableDirectLoadOperationType operation_type_;
ObString arg_content_;
};
struct ObTableDirectLoadResultHeader
{
OB_UNIS_VERSION(1);
public:
ObTableDirectLoadResultHeader() : operation_type_(ObTableDirectLoadOperationType::MAX_TYPE) {}
TO_STRING_KV(K_(addr), K_(operation_type));
public:
ObAddr addr_;
ObTableDirectLoadOperationType operation_type_;
};
class ObTableDirectLoadResult
{
OB_UNIS_VERSION(1);
public:
ObTableDirectLoadResult()
: allocator_(nullptr), operation_type_(ObTableDirectLoadOperationType::MAX_TYPE)
{
}
ObTableDirectLoadResult() : allocator_(nullptr) {}
template <class Res>
int set_res(const Res &res, common::ObIAllocator &allocator)
{
@ -343,10 +363,10 @@ public:
}
return ret;
}
TO_STRING_KV(K_(operation_type), "res_content", common::ObHexStringWrap(res_content_));
TO_STRING_KV(K_(header), "res_content", common::ObHexStringWrap(res_content_));
public:
common::ObIAllocator *allocator_; // for deserialize
ObTableDirectLoadOperationType operation_type_;
ObTableDirectLoadResultHeader header_;
ObString res_content_;
};