From 0a930a8024fa3586aca35627f896146bbd296bc3 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 4 Nov 2022 09:35:56 +0000 Subject: [PATCH] use tenant memory in bf_queue --- deps/oblib/src/lib/queue/ob_dedup_queue.cpp | 22 ++++++++++++------- deps/oblib/src/lib/queue/ob_dedup_queue.h | 4 +++- .../compaction/ob_tenant_tablet_scheduler.cpp | 11 ++++++++-- .../compaction/ob_tenant_tablet_scheduler.h | 6 +++++ 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/deps/oblib/src/lib/queue/ob_dedup_queue.cpp b/deps/oblib/src/lib/queue/ob_dedup_queue.cpp index 8ab8aa10f3..e4a9a626f6 100644 --- a/deps/oblib/src/lib/queue/ob_dedup_queue.cpp +++ b/deps/oblib/src/lib/queue/ob_dedup_queue.cpp @@ -62,17 +62,19 @@ int ObDedupQueue::init(int32_t thread_num /*= DEFAULT_THREAD_NUM*/, const int64_t task_map_size /*= TASK_MAP_SIZE*/, const int64_t total_mem_limit /*= TOTAL_LIMIT*/, const int64_t hold_mem_limit /*= HOLD_LIMIT*/, - const int64_t page_size /*= PAGE_SIZE*/) + const int64_t page_size /*= PAGE_SIZE*/, + const uint64_t tenant_id /*= OB_SERVER_TENANT_ID*/, + const lib::ObLabel &label /*= "DedupQueue"*/) { int ret = OB_SUCCESS; if (is_inited_) { ret = OB_INIT_TWICE; } else if (thread_num <= 0 || thread_num > MAX_THREAD_NUM || total_mem_limit <= 0 || hold_mem_limit <= 0 - || page_size <= 0) { + || page_size <= 0 || OB_INVALID_TENANT_ID == tenant_id) { ret = OB_INVALID_ARGUMENT; COMMON_LOG(WARN, "invalid argument", K(thread_num), K(queue_size), - K(total_mem_limit), K(hold_mem_limit), K(page_size)); + K(total_mem_limit), K(hold_mem_limit), K(page_size), K(tenant_id)); } else if (OB_FAIL(task_queue_sync_.init(ObWaitEventIds::DEDUP_QUEUE_COND_WAIT))) { COMMON_LOG(WARN, "fail to init task queue sync cond, ", K(ret)); } else if (OB_FAIL(work_thread_sync_.init(ObWaitEventIds::DEFAULT_COND_WAIT))) { @@ -82,14 +84,18 @@ int ObDedupQueue::init(int32_t thread_num /*= DEFAULT_THREAD_NUM*/, thread_num_ = thread_num; work_thread_num_ = thread_num; set_thread_count(thread_num); - if (OB_SUCCESS != (ret = allocator_.init(total_mem_limit, hold_mem_limit, page_size))) { - COMMON_LOG(WARN, "allocator init fail", K(total_mem_limit), K(hold_mem_limit), K(page_size), - K(ret)); + + if (OB_SUCCESS != (ret = allocator_.init(page_size, label, tenant_id, total_mem_limit))) { + COMMON_LOG(WARN, "allocator init fail", K(page_size), K(label), K(tenant_id), + K(total_mem_limit), K(ret)); } else if (OB_SUCCESS != (ret = task_map_.create(task_map_size, &hash_allocator_, - ObModIds::OB_HASH_BUCKET_TASK_MAP))) { + ObModIds::OB_HASH_BUCKET_TASK_MAP))) { COMMON_LOG(WARN, "task_map create fail", K(ret)); - } else if (OB_SUCCESS != (ret = task_queue_.init(queue_size))) { + } else if (OB_SUCCESS != (ret = task_queue_.init(queue_size, &allocator_))) { COMMON_LOG(WARN, "task_queue init fail", K(ret)); + } + + if (OB_FAIL(ret)) { } else if (OB_FAIL(start())) { COMMON_LOG(WARN, "start thread fail", K(ret)); } else { diff --git a/deps/oblib/src/lib/queue/ob_dedup_queue.h b/deps/oblib/src/lib/queue/ob_dedup_queue.h index 07b2928f0e..e3c7ea9131 100644 --- a/deps/oblib/src/lib/queue/ob_dedup_queue.h +++ b/deps/oblib/src/lib/queue/ob_dedup_queue.h @@ -166,7 +166,9 @@ public: const int64_t task_map_size = TASK_MAP_SIZE, const int64_t total_mem_limit = TOTAL_LIMIT, const int64_t hold_mem_limit = HOLD_LIMIT, - const int64_t page_size = PAGE_SIZE); + const int64_t page_size = PAGE_SIZE, + const uint64_t tenant_id = OB_SERVER_TENANT_ID, + const lib::ObLabel &label = "DedupQueue"); void destroy(); public: int add_task(const IObDedupTask &task); diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index 7bf6e5023c..306bdadba9 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -187,11 +187,18 @@ int ObTenantTabletScheduler::init() ret = OB_INIT_TWICE; LOG_WARN("ObTenantTabletScheduler has inited", K(ret)); } else if (FALSE_IT(bf_queue_.set_run_wrapper(MTL_CTX()))) { - } else if (OB_FAIL(bf_queue_.init(BLOOM_FILTER_LOAD_BUILD_THREAD_CNT, "BFBuildTask"))) { + } else if (OB_FAIL(bf_queue_.init(BLOOM_FILTER_LOAD_BUILD_THREAD_CNT, + "BFBuildTask", + BF_TASK_QUEUE_SIZE, + BF_TASK_MAP_SIZE, + BF_TASK_TOTAL_LIMIT, + BF_TASK_HOLD_LIMIT, + BF_TASK_PAGE_SIZE, + MTL_ID(), + "bf_queue"))) { LOG_WARN("Fail to init bloom filter queue", K(ret)); } else { schedule_interval_ = schedule_interval; - bf_queue_.set_label("bf_queue"); is_inited_ = true; } diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.h b/src/storage/compaction/ob_tenant_tablet_scheduler.h index 8b2227badb..d9b79de711 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.h +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.h @@ -179,6 +179,12 @@ public: private: static const int64_t BLOOM_FILTER_LOAD_BUILD_THREAD_CNT = 1; + static const int64_t BF_TASK_QUEUE_SIZE = 10L * 1000; + static const int64_t BF_TASK_MAP_SIZE = 10L * 1000; + static const int64_t BF_TASK_TOTAL_LIMIT = 512L * 1024L * 1024L; + static const int64_t BF_TASK_HOLD_LIMIT = 256L * 1024L * 1024L; + static const int64_t BF_TASK_PAGE_SIZE = common::OB_MALLOC_MIDDLE_BLOCK_SIZE; //64K + static const int64_t NO_MAJOR_MERGE_TYPE_CNT = 3; static constexpr ObMergeType MERGE_TYPES[] = { MINI_MINOR_MERGE, BUF_MINOR_MERGE, HISTORY_MINI_MINOR_MERGE};