use tenant memory in bf_queue
This commit is contained in:
22
deps/oblib/src/lib/queue/ob_dedup_queue.cpp
vendored
22
deps/oblib/src/lib/queue/ob_dedup_queue.cpp
vendored
@ -62,17 +62,19 @@ int ObDedupQueue::init(int32_t thread_num /*= DEFAULT_THREAD_NUM*/,
|
|||||||
const int64_t task_map_size /*= TASK_MAP_SIZE*/,
|
const int64_t task_map_size /*= TASK_MAP_SIZE*/,
|
||||||
const int64_t total_mem_limit /*= TOTAL_LIMIT*/,
|
const int64_t total_mem_limit /*= TOTAL_LIMIT*/,
|
||||||
const int64_t hold_mem_limit /*= HOLD_LIMIT*/,
|
const int64_t hold_mem_limit /*= HOLD_LIMIT*/,
|
||||||
const int64_t page_size /*= PAGE_SIZE*/)
|
const int64_t page_size /*= PAGE_SIZE*/,
|
||||||
|
const uint64_t tenant_id /*= OB_SERVER_TENANT_ID*/,
|
||||||
|
const lib::ObLabel &label /*= "DedupQueue"*/)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (is_inited_) {
|
if (is_inited_) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
} else if (thread_num <= 0 || thread_num > MAX_THREAD_NUM
|
} else if (thread_num <= 0 || thread_num > MAX_THREAD_NUM
|
||||||
|| total_mem_limit <= 0 || hold_mem_limit <= 0
|
|| total_mem_limit <= 0 || hold_mem_limit <= 0
|
||||||
|| page_size <= 0) {
|
|| page_size <= 0 || OB_INVALID_TENANT_ID == tenant_id) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
COMMON_LOG(WARN, "invalid argument", K(thread_num), K(queue_size),
|
COMMON_LOG(WARN, "invalid argument", K(thread_num), K(queue_size),
|
||||||
K(total_mem_limit), K(hold_mem_limit), K(page_size));
|
K(total_mem_limit), K(hold_mem_limit), K(page_size), K(tenant_id));
|
||||||
} else if (OB_FAIL(task_queue_sync_.init(ObWaitEventIds::DEDUP_QUEUE_COND_WAIT))) {
|
} else if (OB_FAIL(task_queue_sync_.init(ObWaitEventIds::DEDUP_QUEUE_COND_WAIT))) {
|
||||||
COMMON_LOG(WARN, "fail to init task queue sync cond, ", K(ret));
|
COMMON_LOG(WARN, "fail to init task queue sync cond, ", K(ret));
|
||||||
} else if (OB_FAIL(work_thread_sync_.init(ObWaitEventIds::DEFAULT_COND_WAIT))) {
|
} else if (OB_FAIL(work_thread_sync_.init(ObWaitEventIds::DEFAULT_COND_WAIT))) {
|
||||||
@ -82,14 +84,18 @@ int ObDedupQueue::init(int32_t thread_num /*= DEFAULT_THREAD_NUM*/,
|
|||||||
thread_num_ = thread_num;
|
thread_num_ = thread_num;
|
||||||
work_thread_num_ = thread_num;
|
work_thread_num_ = thread_num;
|
||||||
set_thread_count(thread_num);
|
set_thread_count(thread_num);
|
||||||
if (OB_SUCCESS != (ret = allocator_.init(total_mem_limit, hold_mem_limit, page_size))) {
|
|
||||||
COMMON_LOG(WARN, "allocator init fail", K(total_mem_limit), K(hold_mem_limit), K(page_size),
|
if (OB_SUCCESS != (ret = allocator_.init(page_size, label, tenant_id, total_mem_limit))) {
|
||||||
K(ret));
|
COMMON_LOG(WARN, "allocator init fail", K(page_size), K(label), K(tenant_id),
|
||||||
|
K(total_mem_limit), K(ret));
|
||||||
} else if (OB_SUCCESS != (ret = task_map_.create(task_map_size, &hash_allocator_,
|
} else if (OB_SUCCESS != (ret = task_map_.create(task_map_size, &hash_allocator_,
|
||||||
ObModIds::OB_HASH_BUCKET_TASK_MAP))) {
|
ObModIds::OB_HASH_BUCKET_TASK_MAP))) {
|
||||||
COMMON_LOG(WARN, "task_map create fail", K(ret));
|
COMMON_LOG(WARN, "task_map create fail", K(ret));
|
||||||
} else if (OB_SUCCESS != (ret = task_queue_.init(queue_size))) {
|
} else if (OB_SUCCESS != (ret = task_queue_.init(queue_size, &allocator_))) {
|
||||||
COMMON_LOG(WARN, "task_queue init fail", K(ret));
|
COMMON_LOG(WARN, "task_queue init fail", K(ret));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
} else if (OB_FAIL(start())) {
|
} else if (OB_FAIL(start())) {
|
||||||
COMMON_LOG(WARN, "start thread fail", K(ret));
|
COMMON_LOG(WARN, "start thread fail", K(ret));
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
4
deps/oblib/src/lib/queue/ob_dedup_queue.h
vendored
4
deps/oblib/src/lib/queue/ob_dedup_queue.h
vendored
@ -166,7 +166,9 @@ public:
|
|||||||
const int64_t task_map_size = TASK_MAP_SIZE,
|
const int64_t task_map_size = TASK_MAP_SIZE,
|
||||||
const int64_t total_mem_limit = TOTAL_LIMIT,
|
const int64_t total_mem_limit = TOTAL_LIMIT,
|
||||||
const int64_t hold_mem_limit = HOLD_LIMIT,
|
const int64_t hold_mem_limit = HOLD_LIMIT,
|
||||||
const int64_t page_size = PAGE_SIZE);
|
const int64_t page_size = PAGE_SIZE,
|
||||||
|
const uint64_t tenant_id = OB_SERVER_TENANT_ID,
|
||||||
|
const lib::ObLabel &label = "DedupQueue");
|
||||||
void destroy();
|
void destroy();
|
||||||
public:
|
public:
|
||||||
int add_task(const IObDedupTask &task);
|
int add_task(const IObDedupTask &task);
|
||||||
|
|||||||
@ -187,11 +187,18 @@ int ObTenantTabletScheduler::init()
|
|||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
LOG_WARN("ObTenantTabletScheduler has inited", K(ret));
|
LOG_WARN("ObTenantTabletScheduler has inited", K(ret));
|
||||||
} else if (FALSE_IT(bf_queue_.set_run_wrapper(MTL_CTX()))) {
|
} else if (FALSE_IT(bf_queue_.set_run_wrapper(MTL_CTX()))) {
|
||||||
} else if (OB_FAIL(bf_queue_.init(BLOOM_FILTER_LOAD_BUILD_THREAD_CNT, "BFBuildTask"))) {
|
} else if (OB_FAIL(bf_queue_.init(BLOOM_FILTER_LOAD_BUILD_THREAD_CNT,
|
||||||
|
"BFBuildTask",
|
||||||
|
BF_TASK_QUEUE_SIZE,
|
||||||
|
BF_TASK_MAP_SIZE,
|
||||||
|
BF_TASK_TOTAL_LIMIT,
|
||||||
|
BF_TASK_HOLD_LIMIT,
|
||||||
|
BF_TASK_PAGE_SIZE,
|
||||||
|
MTL_ID(),
|
||||||
|
"bf_queue"))) {
|
||||||
LOG_WARN("Fail to init bloom filter queue", K(ret));
|
LOG_WARN("Fail to init bloom filter queue", K(ret));
|
||||||
} else {
|
} else {
|
||||||
schedule_interval_ = schedule_interval;
|
schedule_interval_ = schedule_interval;
|
||||||
bf_queue_.set_label("bf_queue");
|
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -179,6 +179,12 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
static const int64_t BLOOM_FILTER_LOAD_BUILD_THREAD_CNT = 1;
|
static const int64_t BLOOM_FILTER_LOAD_BUILD_THREAD_CNT = 1;
|
||||||
|
static const int64_t BF_TASK_QUEUE_SIZE = 10L * 1000;
|
||||||
|
static const int64_t BF_TASK_MAP_SIZE = 10L * 1000;
|
||||||
|
static const int64_t BF_TASK_TOTAL_LIMIT = 512L * 1024L * 1024L;
|
||||||
|
static const int64_t BF_TASK_HOLD_LIMIT = 256L * 1024L * 1024L;
|
||||||
|
static const int64_t BF_TASK_PAGE_SIZE = common::OB_MALLOC_MIDDLE_BLOCK_SIZE; //64K
|
||||||
|
|
||||||
static const int64_t NO_MAJOR_MERGE_TYPE_CNT = 3;
|
static const int64_t NO_MAJOR_MERGE_TYPE_CNT = 3;
|
||||||
static constexpr ObMergeType MERGE_TYPES[] = {
|
static constexpr ObMergeType MERGE_TYPES[] = {
|
||||||
MINI_MINOR_MERGE, BUF_MINOR_MERGE, HISTORY_MINI_MINOR_MERGE};
|
MINI_MINOR_MERGE, BUF_MINOR_MERGE, HISTORY_MINI_MINOR_MERGE};
|
||||||
|
|||||||
Reference in New Issue
Block a user