[FEAT MERGE] resource manage patch to direct_load_res_patch431
Co-authored-by: coolfishchen <coolfishchen@gmail.com>
This commit is contained in:
@ -15,6 +15,7 @@
|
||||
#include "storage/direct_load/ob_direct_load_external_block_reader.h"
|
||||
#include "storage/direct_load/ob_direct_load_external_table.h"
|
||||
#include "storage/direct_load/ob_direct_load_mem_sample.h"
|
||||
#include "observer/table_load/ob_table_load_service.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -22,6 +23,7 @@ namespace storage
|
||||
{
|
||||
using namespace common;
|
||||
using namespace blocksstable;
|
||||
using namespace observer;
|
||||
|
||||
/**
|
||||
* ObDirectLoadMemLoader
|
||||
@ -102,8 +104,18 @@ int ObDirectLoadMemLoader::work()
|
||||
LOG_WARN("fail to allocate mem", KR(ret));
|
||||
} else {
|
||||
ATOMIC_AAF(&(mem_ctx_->fly_mem_chunk_count_), 1);
|
||||
if (OB_FAIL(chunk->init(MTL_ID(), mem_ctx_->table_data_desc_.mem_chunk_size_))) {
|
||||
LOG_WARN("fail to init external sort", KR(ret));
|
||||
int64_t sort_memory = 0;
|
||||
if (mem_ctx_->table_data_desc_.exe_mode_ == observer::ObTableLoadExeMode::MAX_TYPE) {
|
||||
sort_memory = mem_ctx_->table_data_desc_.mem_chunk_size_;
|
||||
} else if (OB_FAIL(ObTableLoadService::get_sort_memory(sort_memory))) {
|
||||
LOG_WARN("fail to get sort memory", KR(ret));
|
||||
} else {
|
||||
sort_memory /= mem_ctx_->table_data_desc_.max_mem_chunk_count_;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(chunk->init(MTL_ID(), sort_memory))) {
|
||||
LOG_WARN("fail to init external sort", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
#include "storage/direct_load/ob_direct_load_mem_sample.h"
|
||||
#include "storage/direct_load/ob_direct_load_multiple_heap_table_map.h"
|
||||
#include "storage/direct_load/ob_direct_load_multiple_heap_table_builder.h"
|
||||
#include "observer/table_load/ob_table_load_service.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -24,6 +25,7 @@ namespace storage
|
||||
{
|
||||
using namespace common;
|
||||
using namespace blocksstable;
|
||||
using namespace observer;
|
||||
|
||||
ObDirectLoadMultipleHeapTableSorter::ObDirectLoadMultipleHeapTableSorter(
|
||||
ObDirectLoadMemContext *mem_ctx)
|
||||
@ -200,12 +202,21 @@ int ObDirectLoadMultipleHeapTableSorter::work()
|
||||
LOG_WARN("some error ocurr", KR(ret));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
chunk = OB_NEW(ChunkType, ObMemAttr(MTL_ID(), "TLD_MemChunkVal"), mem_ctx_->table_data_desc_.heap_table_mem_chunk_size_);
|
||||
if (chunk == nullptr) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to allocate mem", KR(ret));
|
||||
} else if (OB_FAIL(chunk->init())) {
|
||||
LOG_WARN("fail to init external sort", KR(ret));
|
||||
int64_t sort_memory = 0;
|
||||
if (mem_ctx_->table_data_desc_.exe_mode_ == observer::ObTableLoadExeMode::MAX_TYPE) {
|
||||
sort_memory = mem_ctx_->table_data_desc_.heap_table_mem_chunk_size_;
|
||||
} else if (OB_FAIL(ObTableLoadService::get_sort_memory(sort_memory))) {
|
||||
LOG_WARN("fail to get sort memory", KR(ret));
|
||||
} else {
|
||||
sort_memory /= mem_ctx_->table_data_desc_.session_count_;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
chunk = OB_NEW(ChunkType, ObMemAttr(MTL_ID(), "TLD_MemChunkVal"), sort_memory);
|
||||
if (chunk == nullptr) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
} else if (OB_FAIL(chunk->init())) {
|
||||
LOG_WARN("fail to init external sort", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ namespace oceanbase
|
||||
namespace storage
|
||||
{
|
||||
using namespace common;
|
||||
using namespace observer;
|
||||
|
||||
ObDirectLoadTableDataDesc::ObDirectLoadTableDataDesc()
|
||||
: rowkey_column_num_(0),
|
||||
@ -31,7 +32,9 @@ ObDirectLoadTableDataDesc::ObDirectLoadTableDataDesc()
|
||||
mem_chunk_size_(0),
|
||||
max_mem_chunk_count_(0),
|
||||
merge_count_per_round_(0),
|
||||
heap_table_mem_chunk_size_(0)
|
||||
heap_table_mem_chunk_size_(0),
|
||||
session_count_(0),
|
||||
exe_mode_(ObTableLoadExeMode::MAX_TYPE)
|
||||
{
|
||||
}
|
||||
|
||||
@ -53,6 +56,8 @@ void ObDirectLoadTableDataDesc::reset()
|
||||
max_mem_chunk_count_ = 0;
|
||||
merge_count_per_round_ = 0;
|
||||
heap_table_mem_chunk_size_ = 0;
|
||||
session_count_ = 0;
|
||||
exe_mode_ = ObTableLoadExeMode::MAX_TYPE;
|
||||
}
|
||||
|
||||
bool ObDirectLoadTableDataDesc::is_valid() const
|
||||
@ -63,8 +68,8 @@ bool ObDirectLoadTableDataDesc::is_valid() const
|
||||
sstable_index_block_size_ > 0 && sstable_index_block_size_ % DIO_ALIGN_SIZE == 0 &&
|
||||
sstable_data_block_size_ > 0 && sstable_data_block_size_ % DIO_ALIGN_SIZE == 0 &&
|
||||
extra_buf_size_ > 0 && extra_buf_size_ % DIO_ALIGN_SIZE == 0 &&
|
||||
compressor_type_ > ObCompressorType::INVALID_COMPRESSOR && mem_chunk_size_ > 0 &&
|
||||
max_mem_chunk_count_ > 0 && merge_count_per_round_ > 0 && heap_table_mem_chunk_size_ > 0;
|
||||
compressor_type_ > ObCompressorType::INVALID_COMPRESSOR &&
|
||||
max_mem_chunk_count_ > 0 && merge_count_per_round_ > 0;
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
|
||||
@ -13,6 +13,7 @@
|
||||
|
||||
#include "lib/compress/ob_compressor.h"
|
||||
#include "lib/utility/ob_print_utils.h"
|
||||
#include "observer/table_load/ob_table_load_struct.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -30,7 +31,7 @@ public:
|
||||
TO_STRING_KV(K_(rowkey_column_num), K_(column_count), K_(external_data_block_size),
|
||||
K_(sstable_index_block_size), K_(sstable_data_block_size), K_(extra_buf_size),
|
||||
K_(compressor_type), K_(is_heap_table), K_(mem_chunk_size), K_(max_mem_chunk_count),
|
||||
K_(merge_count_per_round), K_(heap_table_mem_chunk_size));
|
||||
K_(merge_count_per_round), K_(heap_table_mem_chunk_size), K_(session_count), K_(exe_mode));
|
||||
public:
|
||||
int64_t rowkey_column_num_;
|
||||
int64_t column_count_;
|
||||
@ -47,6 +48,8 @@ public:
|
||||
|
||||
//heap sort param
|
||||
int64_t heap_table_mem_chunk_size_;
|
||||
int32_t session_count_;
|
||||
observer::ObTableLoadExeMode exe_mode_;
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
|
||||
@ -69,6 +69,7 @@
|
||||
#include "storage/high_availability/ob_rebuild_service.h"
|
||||
#include "observer/table/ttl/ob_ttl_service.h"
|
||||
#include "observer/table/ttl/ob_tenant_tablet_ttl_mgr.h"
|
||||
#include "observer/table_load/resource/ob_table_load_resource_service.h"
|
||||
#include "share/wr/ob_wr_service.h"
|
||||
#include "rootserver/mview/ob_mview_maintenance_service.h"
|
||||
|
||||
@ -968,6 +969,7 @@ int ObLS::register_sys_service()
|
||||
REGISTER_TO_LOGSERVICE(WORKLOAD_REPOSITORY_SERVICE_LOG_BASE_TYPE, GCTX.wr_service_);
|
||||
REGISTER_TO_LOGSERVICE(HEARTBEAT_SERVICE_LOG_BASE_TYPE, MTL(ObHeartbeatService *));
|
||||
REGISTER_TO_LOGSERVICE(MVIEW_MAINTENANCE_SERVICE_LOG_BASE_TYPE, MTL(ObMViewMaintenanceService *));
|
||||
REGISTER_TO_LOGSERVICE(TABLE_LOAD_RESOURCE_SERVICE_LOG_BASE_TYPE, MTL(observer::ObTableLoadResourceService *));
|
||||
}
|
||||
if (is_meta_tenant(tenant_id)) {
|
||||
REGISTER_TO_LOGSERVICE(SNAPSHOT_SCHEDULER_LOG_BASE_TYPE, MTL(ObTenantSnapshotScheduler *));
|
||||
@ -995,6 +997,7 @@ int ObLS::register_user_service()
|
||||
REGISTER_TO_RESTORESERVICE(NET_STANDBY_TNT_SERVICE_LOG_BASE_TYPE, MTL(ObCreateStandbyFromNetActor *));
|
||||
REGISTER_TO_LOGSERVICE(TTL_LOG_BASE_TYPE, MTL(table::ObTTLService *));
|
||||
REGISTER_TO_LOGSERVICE(MVIEW_MAINTENANCE_SERVICE_LOG_BASE_TYPE, MTL(ObMViewMaintenanceService *));
|
||||
REGISTER_TO_LOGSERVICE(TABLE_LOAD_RESOURCE_SERVICE_LOG_BASE_TYPE, MTL(observer::ObTableLoadResourceService *));
|
||||
}
|
||||
|
||||
if (ls_id.is_user_ls()) {
|
||||
@ -1092,6 +1095,7 @@ void ObLS::unregister_sys_service_()
|
||||
ObHeartbeatService * heartbeat_service = MTL(ObHeartbeatService*);
|
||||
UNREGISTER_FROM_LOGSERVICE(HEARTBEAT_SERVICE_LOG_BASE_TYPE, heartbeat_service);
|
||||
UNREGISTER_FROM_LOGSERVICE(MVIEW_MAINTENANCE_SERVICE_LOG_BASE_TYPE, MTL(ObMViewMaintenanceService *));
|
||||
UNREGISTER_FROM_LOGSERVICE(TABLE_LOAD_RESOURCE_SERVICE_LOG_BASE_TYPE, MTL(observer::ObTableLoadResourceService *));
|
||||
}
|
||||
if (is_meta_tenant(MTL_ID())) {
|
||||
ObTenantSnapshotScheduler * snapshot_scheduler = MTL(ObTenantSnapshotScheduler*);
|
||||
@ -1123,6 +1127,7 @@ void ObLS::unregister_user_service_()
|
||||
UNREGISTER_FROM_RESTORESERVICE(NET_STANDBY_TNT_SERVICE_LOG_BASE_TYPE, net_standby_tnt_service);
|
||||
UNREGISTER_FROM_LOGSERVICE(TTL_LOG_BASE_TYPE, MTL(table::ObTTLService *));
|
||||
UNREGISTER_FROM_LOGSERVICE(MVIEW_MAINTENANCE_SERVICE_LOG_BASE_TYPE, MTL(ObMViewMaintenanceService *));
|
||||
UNREGISTER_FROM_LOGSERVICE(TABLE_LOAD_RESOURCE_SERVICE_LOG_BASE_TYPE, MTL(observer::ObTableLoadResourceService *));
|
||||
}
|
||||
if (ls_meta_.ls_id_.is_user_ls()) {
|
||||
UNREGISTER_FROM_LOGSERVICE(TTL_LOG_BASE_TYPE, tablet_ttl_mgr_);
|
||||
|
||||
Reference in New Issue
Block a user