From 5ac2af6588627f88ecbd525597ef41776421c545 Mon Sep 17 00:00:00 2001 From: coolfishchen Date: Thu, 13 Jul 2023 10:24:18 +0000 Subject: [PATCH] implement ob_table_load_shared_allocator with ob_table_load_handle --- .../ob_table_direct_load_rpc_executor.cpp | 2 +- .../ob_table_load_control_rpc_executor.cpp | 2 +- .../ob_table_load_trans_bucket_writer.cpp | 2 +- src/share/CMakeLists.txt | 1 - src/share/table/ob_table_load_handle.h | 57 ++++---- src/share/table/ob_table_load_row.h | 4 +- .../table/ob_table_load_shared_allocator.cpp | 126 ------------------ .../table/ob_table_load_shared_allocator.h | 39 +----- .../engine/cmd/ob_load_data_direct_impl.cpp | 2 +- 9 files changed, 43 insertions(+), 192 deletions(-) delete mode 100644 src/share/table/ob_table_load_shared_allocator.cpp diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp index e889f91a2e..5673f638a9 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp @@ -441,7 +441,7 @@ int ObTableDirectLoadInsertExecutor::decode_payload(const ObString &payload, LOG_WARN("invalid args", KR(ret), K(payload)); } else { ObTableLoadSharedAllocatorHandle allocator_handle = - ObTableLoadSharedAllocatorHandle::make_handle(); + ObTableLoadSharedAllocatorHandle::make_handle("TLD_share_alloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); const int64_t data_len = payload.length(); char *buf = nullptr; int64_t pos = 0; diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp index 50198da636..9610482aeb 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp @@ -598,7 +598,7 @@ int ObDirectLoadControlInsertTransExecutor::process() ObTableLoadTableCtx *table_ctx = nullptr; ObTableLoadUniqueKey key(arg_.table_id_, arg_.task_id_); ObTableLoadSharedAllocatorHandle allocator_handle = - ObTableLoadSharedAllocatorHandle::make_handle(); + ObTableLoadSharedAllocatorHandle::make_handle("TLD_share_alloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); int64_t data_len = arg_.payload_.length(); char *buf = nullptr; if (OB_FAIL(ObTableLoadService::get_ctx(key, table_ctx))) { diff --git a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp index 8e183ee980..15476c408a 100644 --- a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp +++ b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp @@ -240,7 +240,7 @@ int ObTableLoadTransBucketWriter::handle_partition_with_autoinc_identity( } else if (OB_FAIL(storage_datum.to_obj_enhance(obj_row.cells_[obj_index], column_schema->get_meta_type()))) { LOG_WARN("fail to obj enhance", KR(ret), K(obj_row.cells_[obj_index])); - } else if (OB_FAIL(ob_write_obj(obj_row.get_allocator_handler()->get_allocator(), obj_row.cells_[obj_index], + } else if (OB_FAIL(ob_write_obj(*(obj_row.get_allocator_handler()), obj_row.cells_[obj_index], obj_row.cells_[obj_index]))) { LOG_WARN("fail to deep copy obj", KR(ret), K(obj_row.cells_[obj_index])); } diff --git a/src/share/CMakeLists.txt b/src/share/CMakeLists.txt index bb5d40b479..01f370428e 100755 --- a/src/share/CMakeLists.txt +++ b/src/share/CMakeLists.txt @@ -236,7 +236,6 @@ ob_set_subtarget(ob_share common_mixed table/ob_table.cpp table/ob_table_rpc_struct.cpp table/ob_table_load_define.cpp - table/ob_table_load_shared_allocator.cpp table/ob_table_load_row.cpp transfer/ob_transfer_info.cpp transfer/ob_transfer_task_operator.cpp diff --git a/src/share/table/ob_table_load_handle.h b/src/share/table/ob_table_load_handle.h index 5ea1e80092..1997ac9ec3 100644 --- a/src/share/table/ob_table_load_handle.h +++ b/src/share/table/ob_table_load_handle.h @@ -13,30 +13,26 @@ namespace oceanbase namespace table { -template +template class ObTableLoadHandle { class Object { public: + template Object(Args... args) : ref_count_(0), object_(args...) {} public: - int32_t ref_count_; + int64_t ref_count_; T object_; }; public: ObTableLoadHandle() : ptr_(nullptr) {} virtual ~ObTableLoadHandle() { - int32_t ref_count = ATOMIC_AAF(&(ptr_->ref_count_), -1); - if (ref_count == 0) { - if (ptr_ != nullptr) { - ptr_->~Object(); - ob_free(ptr_); - } - } + reset(); } + template static ObTableLoadHandle make_handle(Args... args) { ObMemAttr attr(MTL_ID(), "TLD_Handle"); @@ -46,35 +42,50 @@ public: return handle; } - ObTableLoadHandle(ObTableLoadHandle &other) { - ptr_ = other.ptr_; - ATOMIC_AAF(&(ptr_->ref_count_), 1); + ObTableLoadHandle(const ObTableLoadHandle &other) : ptr_(nullptr) { + *this = other; } - ObTableLoadHandle(ObTableLoadHandle &&other) { - ptr_ = other.ptr_; - other.ptr_ = nullptr; + ObTableLoadHandle(ObTableLoadHandle &&other) : ptr_(nullptr) { + if (this != &other) { + reset(); + ptr_ = other.ptr_; + other.ptr_ = nullptr; + } } - void operator= (ObTableLoadHandle &other) { - ptr_ = other.ptr_; - ATOMIC_AAF(&(ptr_->ref_count_), 1); + void operator = (const ObTableLoadHandle &other) { + if (this != &other) { + reset(); + ptr_ = other.ptr_; + if (ptr_ != nullptr) { + ATOMIC_AAF(&(ptr_->ref_count_), 1); + } + } } - operator bool() { + operator bool() const { return ptr_ != nullptr; } - T *operator->() { + T *operator->() const { return &(ptr_->object_); } - T &operator*() { + T &operator*() const { return ptr_->object_; } -private: - DISALLOW_COPY_AND_ASSIGN(ObTableLoadHandle); + void reset() { + if (ptr_ != nullptr) { + int64_t ref_count = ATOMIC_AAF(&(ptr_->ref_count_), -1); + if (ref_count == 0) { + ptr_->~Object(); + ob_free(ptr_); + } + ptr_ = nullptr; + } + } private: // data members diff --git a/src/share/table/ob_table_load_row.h b/src/share/table/ob_table_load_row.h index 3b54909d71..3420b98683 100644 --- a/src/share/table/ob_table_load_row.h +++ b/src/share/table/ob_table_load_row.h @@ -114,7 +114,7 @@ int ObTableLoadRow::deep_copy_and_assign(const T *row, int64_t count, } for (int64_t i = 0; OB_SUCC(ret) && i < count; i ++) { if (OB_FAIL(observer::ObTableLoadUtils::deep_copy(row[i], - cells[i], allocator_handle->get_allocator()))) { + cells[i], *allocator_handle))) { OB_LOG(WARN, "fail to deep copy object", KR(ret)); } } @@ -185,4 +185,4 @@ public: }; } // namespace table -} // namespace oceanbase \ No newline at end of file +} // namespace oceanbase diff --git a/src/share/table/ob_table_load_shared_allocator.cpp b/src/share/table/ob_table_load_shared_allocator.cpp deleted file mode 100644 index 0c2c5bb0c9..0000000000 --- a/src/share/table/ob_table_load_shared_allocator.cpp +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved. -// Author: -// yuya.yu <> - -#define USING_LOG_PREFIX SERVER - -#include "share/table/ob_table_load_shared_allocator.h" -#include "share/ob_errno.h" -#include "share/rc/ob_tenant_base.h" - -namespace oceanbase -{ -namespace table -{ -using namespace common; - -ObTableLoadSharedAllocator::ObTableLoadSharedAllocator() - : allocator_("TLD_share_alloc"), - ref_count_(0) -{ - allocator_.set_tenant_id(MTL_ID()); -} - -ObTableLoadSharedAllocator::~ObTableLoadSharedAllocator() -{ - OB_ASSERT(0 == get_ref_count()); -} - -void *ObTableLoadSharedAllocator::alloc(const int64_t size) -{ - return allocator_.alloc(size); -} - -void ObTableLoadSharedAllocator::free(void *ptr) -{ - allocator_.free(ptr); -} - -ObTableLoadSharedAllocatorHandle::ObTableLoadSharedAllocatorHandle( - ObTableLoadSharedAllocator *allocator) - : allocator_(allocator) -{ - if (OB_NOT_NULL(allocator_)) { - allocator_->inc_ref_count(); - } -} - -ObTableLoadSharedAllocatorHandle::ObTableLoadSharedAllocatorHandle( - const ObTableLoadSharedAllocatorHandle &other) -{ - allocator_ = other.allocator_; - if (OB_NOT_NULL(allocator_)) { - allocator_->inc_ref_count(); - } -} - -ObTableLoadSharedAllocatorHandle::~ObTableLoadSharedAllocatorHandle() -{ - if (OB_NOT_NULL(allocator_)) { - if (allocator_->dec_ref_count() == 0) { - allocator_->~ObTableLoadSharedAllocator(); - ob_free(allocator_); - } - allocator_ = nullptr; - } -} - -ObTableLoadSharedAllocatorHandle &ObTableLoadSharedAllocatorHandle::operator=( - const ObTableLoadSharedAllocatorHandle &other) -{ - if (other.allocator_ != allocator_) { - allocator_ = other.allocator_; - if (OB_NOT_NULL(allocator_)) { - allocator_->inc_ref_count(); - } - } - return *this; -} - -ObTableLoadSharedAllocator *ObTableLoadSharedAllocatorHandle::operator->() -{ - return allocator_; -} - -ObTableLoadSharedAllocator *ObTableLoadSharedAllocatorHandle::operator->() const -{ - return allocator_; -} - -ObTableLoadSharedAllocator &ObTableLoadSharedAllocatorHandle::operator*() -{ - return *allocator_; -} - -ObTableLoadSharedAllocatorHandle::operator bool () const -{ - return OB_NOT_NULL(allocator_); -} - -ObTableLoadSharedAllocatorHandle ObTableLoadSharedAllocatorHandle::make_handle() -{ - int ret = OB_SUCCESS; - ObTableLoadSharedAllocator *shared_allocator = (ObTableLoadSharedAllocator *)ob_malloc( - sizeof(ObTableLoadSharedAllocator), ObMemAttr(MTL_ID(), "TLD_share_alloc")); - if (OB_ISNULL(shared_allocator)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate memory", KR(ret)); - } else { - new (shared_allocator) ObTableLoadSharedAllocator; - } - return ObTableLoadSharedAllocatorHandle(shared_allocator); -} - -void ObTableLoadSharedAllocatorHandle::reset() -{ - if (OB_NOT_NULL(allocator_)) { - if (allocator_->dec_ref_count() == 0) { - allocator_->~ObTableLoadSharedAllocator(); - ob_free(allocator_); - } - allocator_ = nullptr; - } -} - -} // namespace table -} // namespace oceanbase diff --git a/src/share/table/ob_table_load_shared_allocator.h b/src/share/table/ob_table_load_shared_allocator.h index 9b2f234d29..5bcf3a7313 100644 --- a/src/share/table/ob_table_load_shared_allocator.h +++ b/src/share/table/ob_table_load_shared_allocator.h @@ -5,47 +5,14 @@ #pragma once #include "lib/allocator/page_arena.h" +#include "share/table/ob_table_load_handle.h" namespace oceanbase { namespace table { -class ObTableLoadSharedAllocator -{ -public: - ObTableLoadSharedAllocator(); - ~ObTableLoadSharedAllocator(); - void *alloc(const int64_t size); - void free(void *ptr); - int64_t get_ref_count() const { return ATOMIC_LOAD(&ref_count_); } - int64_t inc_ref_count() { return ATOMIC_AAF(&ref_count_, 1); } - int64_t dec_ref_count() { return ATOMIC_AAF(&ref_count_, -1); } - common::ObArenaAllocator &get_allocator() { return allocator_; } +typedef ObTableLoadHandle ObTableLoadSharedAllocatorHandle; -private: - common::ObArenaAllocator allocator_; - int64_t ref_count_; -}; - -class ObTableLoadSharedAllocatorHandle -{ -public: - ObTableLoadSharedAllocatorHandle() : allocator_(nullptr) {} - ObTableLoadSharedAllocatorHandle(ObTableLoadSharedAllocator *allocator); - ObTableLoadSharedAllocatorHandle(const ObTableLoadSharedAllocatorHandle &other); - ~ObTableLoadSharedAllocatorHandle(); - - ObTableLoadSharedAllocatorHandle &operator=(const ObTableLoadSharedAllocatorHandle &other); - ObTableLoadSharedAllocator *operator->(); - ObTableLoadSharedAllocator *operator->() const; - ObTableLoadSharedAllocator &operator*(); - operator bool () const; - static ObTableLoadSharedAllocatorHandle make_handle(); - void reset(); - -private: - ObTableLoadSharedAllocator *allocator_; -}; } // namespace table -} // namespace oceanbase \ No newline at end of file +} // namespace oceanbase diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index fb9da7f5e4..9b86e2eb4d 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -1319,7 +1319,7 @@ int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(TaskHandle *hand while (OB_SUCC(ret) && !is_iter_end) { // 每个新的batch需要分配一个新的shared_allocator ObTableLoadSharedAllocatorHandle allocator_handle = - ObTableLoadSharedAllocatorHandle::make_handle(); + ObTableLoadSharedAllocatorHandle::make_handle("TLD_share_alloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); if (!allocator_handle) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to make allocator handle", KR(ret));