implement ob_table_load_shared_allocator with ob_table_load_handle

This commit is contained in:
coolfishchen
2023-07-13 10:24:18 +00:00
committed by ob-robot
parent 0182ddf347
commit 5ac2af6588
9 changed files with 43 additions and 192 deletions

View File

@ -441,7 +441,7 @@ int ObTableDirectLoadInsertExecutor::decode_payload(const ObString &payload,
LOG_WARN("invalid args", KR(ret), K(payload));
} else {
ObTableLoadSharedAllocatorHandle allocator_handle =
ObTableLoadSharedAllocatorHandle::make_handle();
ObTableLoadSharedAllocatorHandle::make_handle("TLD_share_alloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
const int64_t data_len = payload.length();
char *buf = nullptr;
int64_t pos = 0;

View File

@ -598,7 +598,7 @@ int ObDirectLoadControlInsertTransExecutor::process()
ObTableLoadTableCtx *table_ctx = nullptr;
ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_);
ObTableLoadSharedAllocatorHandle allocator_handle =
ObTableLoadSharedAllocatorHandle::make_handle();
ObTableLoadSharedAllocatorHandle::make_handle("TLD_share_alloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
int64_t data_len = arg_.payload_.length();
char *buf = nullptr;
if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) {

View File

@ -240,7 +240,7 @@ int ObTableLoadTransBucketWriter::handle_partition_with_autoinc_identity(
} else if (OB_FAIL(storage_datum.to_obj_enhance(obj_row.cells_[obj_index],
column_schema->get_meta_type()))) {
LOG_WARN("fail to obj enhance", KR(ret), K(obj_row.cells_[obj_index]));
} else if (OB_FAIL(ob_write_obj(obj_row.get_allocator_handler()->get_allocator(), obj_row.cells_[obj_index],
} else if (OB_FAIL(ob_write_obj(*(obj_row.get_allocator_handler()), obj_row.cells_[obj_index],
obj_row.cells_[obj_index]))) {
LOG_WARN("fail to deep copy obj", KR(ret), K(obj_row.cells_[obj_index]));
}

View File

@ -236,7 +236,6 @@ ob_set_subtarget(ob_share common_mixed
table/ob_table.cpp
table/ob_table_rpc_struct.cpp
table/ob_table_load_define.cpp
table/ob_table_load_shared_allocator.cpp
table/ob_table_load_row.cpp
transfer/ob_transfer_info.cpp
transfer/ob_transfer_task_operator.cpp

View File

@ -13,30 +13,26 @@ namespace oceanbase
namespace table
{
template<class T, class... Args>
template<class T>
class ObTableLoadHandle
{
class Object
{
public:
template<class... Args>
Object(Args... args) : ref_count_(0), object_(args...) {}
public:
int32_t ref_count_;
int64_t ref_count_;
T object_;
};
public:
ObTableLoadHandle() : ptr_(nullptr) {}
virtual ~ObTableLoadHandle() {
int32_t ref_count = ATOMIC_AAF(&(ptr_->ref_count_), -1);
if (ref_count == 0) {
if (ptr_ != nullptr) {
ptr_->~Object();
ob_free(ptr_);
}
}
reset();
}
template<class... Args >
static ObTableLoadHandle make_handle(Args... args)
{
ObMemAttr attr(MTL_ID(), "TLD_Handle");
@ -46,35 +42,50 @@ public:
return handle;
}
ObTableLoadHandle(ObTableLoadHandle &other) {
ptr_ = other.ptr_;
ATOMIC_AAF(&(ptr_->ref_count_), 1);
ObTableLoadHandle(const ObTableLoadHandle &other) : ptr_(nullptr) {
*this = other;
}
ObTableLoadHandle(ObTableLoadHandle &&other) {
ptr_ = other.ptr_;
other.ptr_ = nullptr;
ObTableLoadHandle(ObTableLoadHandle &&other) : ptr_(nullptr) {
if (this != &other) {
reset();
ptr_ = other.ptr_;
other.ptr_ = nullptr;
}
}
void operator= (ObTableLoadHandle &other) {
ptr_ = other.ptr_;
ATOMIC_AAF(&(ptr_->ref_count_), 1);
void operator = (const ObTableLoadHandle &other) {
if (this != &other) {
reset();
ptr_ = other.ptr_;
if (ptr_ != nullptr) {
ATOMIC_AAF(&(ptr_->ref_count_), 1);
}
}
}
operator bool() {
operator bool() const {
return ptr_ != nullptr;
}
T *operator->() {
T *operator->() const {
return &(ptr_->object_);
}
T &operator*() {
T &operator*() const {
return ptr_->object_;
}
private:
DISALLOW_COPY_AND_ASSIGN(ObTableLoadHandle);
void reset() {
if (ptr_ != nullptr) {
int64_t ref_count = ATOMIC_AAF(&(ptr_->ref_count_), -1);
if (ref_count == 0) {
ptr_->~Object();
ob_free(ptr_);
}
ptr_ = nullptr;
}
}
private:
// data members

View File

@ -114,7 +114,7 @@ int ObTableLoadRow<T>::deep_copy_and_assign(const T *row, int64_t count,
}
for (int64_t i = 0; OB_SUCC(ret) && i < count; i ++) {
if (OB_FAIL(observer::ObTableLoadUtils::deep_copy(row[i],
cells[i], allocator_handle->get_allocator()))) {
cells[i], *allocator_handle))) {
OB_LOG(WARN, "fail to deep copy object", KR(ret));
}
}
@ -185,4 +185,4 @@ public:
};
} // namespace table
} // namespace oceanbase
} // namespace oceanbase

View File

@ -1,126 +0,0 @@
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
// yuya.yu <>
#define USING_LOG_PREFIX SERVER
#include "share/table/ob_table_load_shared_allocator.h"
#include "share/ob_errno.h"
#include "share/rc/ob_tenant_base.h"
namespace oceanbase
{
namespace table
{
using namespace common;
ObTableLoadSharedAllocator::ObTableLoadSharedAllocator()
: allocator_("TLD_share_alloc"),
ref_count_(0)
{
allocator_.set_tenant_id(MTL_ID());
}
ObTableLoadSharedAllocator::~ObTableLoadSharedAllocator()
{
OB_ASSERT(0 == get_ref_count());
}
void *ObTableLoadSharedAllocator::alloc(const int64_t size)
{
return allocator_.alloc(size);
}
void ObTableLoadSharedAllocator::free(void *ptr)
{
allocator_.free(ptr);
}
ObTableLoadSharedAllocatorHandle::ObTableLoadSharedAllocatorHandle(
ObTableLoadSharedAllocator *allocator)
: allocator_(allocator)
{
if (OB_NOT_NULL(allocator_)) {
allocator_->inc_ref_count();
}
}
ObTableLoadSharedAllocatorHandle::ObTableLoadSharedAllocatorHandle(
const ObTableLoadSharedAllocatorHandle &other)
{
allocator_ = other.allocator_;
if (OB_NOT_NULL(allocator_)) {
allocator_->inc_ref_count();
}
}
ObTableLoadSharedAllocatorHandle::~ObTableLoadSharedAllocatorHandle()
{
if (OB_NOT_NULL(allocator_)) {
if (allocator_->dec_ref_count() == 0) {
allocator_->~ObTableLoadSharedAllocator();
ob_free(allocator_);
}
allocator_ = nullptr;
}
}
ObTableLoadSharedAllocatorHandle &ObTableLoadSharedAllocatorHandle::operator=(
const ObTableLoadSharedAllocatorHandle &other)
{
if (other.allocator_ != allocator_) {
allocator_ = other.allocator_;
if (OB_NOT_NULL(allocator_)) {
allocator_->inc_ref_count();
}
}
return *this;
}
ObTableLoadSharedAllocator *ObTableLoadSharedAllocatorHandle::operator->()
{
return allocator_;
}
ObTableLoadSharedAllocator *ObTableLoadSharedAllocatorHandle::operator->() const
{
return allocator_;
}
ObTableLoadSharedAllocator &ObTableLoadSharedAllocatorHandle::operator*()
{
return *allocator_;
}
ObTableLoadSharedAllocatorHandle::operator bool () const
{
return OB_NOT_NULL(allocator_);
}
ObTableLoadSharedAllocatorHandle ObTableLoadSharedAllocatorHandle::make_handle()
{
int ret = OB_SUCCESS;
ObTableLoadSharedAllocator *shared_allocator = (ObTableLoadSharedAllocator *)ob_malloc(
sizeof(ObTableLoadSharedAllocator), ObMemAttr(MTL_ID(), "TLD_share_alloc"));
if (OB_ISNULL(shared_allocator)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", KR(ret));
} else {
new (shared_allocator) ObTableLoadSharedAllocator;
}
return ObTableLoadSharedAllocatorHandle(shared_allocator);
}
void ObTableLoadSharedAllocatorHandle::reset()
{
if (OB_NOT_NULL(allocator_)) {
if (allocator_->dec_ref_count() == 0) {
allocator_->~ObTableLoadSharedAllocator();
ob_free(allocator_);
}
allocator_ = nullptr;
}
}
} // namespace table
} // namespace oceanbase

View File

@ -5,47 +5,14 @@
#pragma once
#include "lib/allocator/page_arena.h"
#include "share/table/ob_table_load_handle.h"
namespace oceanbase
{
namespace table
{
class ObTableLoadSharedAllocator
{
public:
ObTableLoadSharedAllocator();
~ObTableLoadSharedAllocator();
void *alloc(const int64_t size);
void free(void *ptr);
int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); }
int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); }
int64_t dec_ref_count() { return ATOMIC_AAF(&ref_count_, -1); }
common::ObArenaAllocator &get_allocator() { return allocator_; }
typedef ObTableLoadHandle<common::ObArenaAllocator> ObTableLoadSharedAllocatorHandle;
private:
common::ObArenaAllocator allocator_;
int64_t ref_count_;
};
class ObTableLoadSharedAllocatorHandle
{
public:
ObTableLoadSharedAllocatorHandle() : allocator_(nullptr) {}
ObTableLoadSharedAllocatorHandle(ObTableLoadSharedAllocator *allocator);
ObTableLoadSharedAllocatorHandle(const ObTableLoadSharedAllocatorHandle &other);
~ObTableLoadSharedAllocatorHandle();
ObTableLoadSharedAllocatorHandle &operator=(const ObTableLoadSharedAllocatorHandle &other);
ObTableLoadSharedAllocator *operator->();
ObTableLoadSharedAllocator *operator->() const;
ObTableLoadSharedAllocator &operator*();
operator bool () const;
static ObTableLoadSharedAllocatorHandle make_handle();
void reset();
private:
ObTableLoadSharedAllocator *allocator_;
};
} // namespace table
} // namespace oceanbase
} // namespace oceanbase

View File

@ -1319,7 +1319,7 @@ int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(TaskHandle *hand
while (OB_SUCC(ret) && !is_iter_end) {
// 每个新的batch需要分配一个新的shared_allocator
ObTableLoadSharedAllocatorHandle allocator_handle =
ObTableLoadSharedAllocatorHandle::make_handle();
ObTableLoadSharedAllocatorHandle::make_handle("TLD_share_alloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
if (!allocator_handle) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to make allocator handle", KR(ret));