[CP] [OBCDC] Fix memory usage low and memory fragmentation issue

This commit is contained in:
SanmuWangZJU 2023-09-21 06:48:23 +00:00 committed by ob-robot
parent dc4430d197
commit 7636476502
18 changed files with 158 additions and 76 deletions

View File

@ -56,6 +56,7 @@ void ObCDCAutoConfigMgr::reset()
void ObCDCAutoConfigMgr::init(const ObLogConfig &config) void ObCDCAutoConfigMgr::init(const ObLogConfig &config)
{ {
init_queue_length_(config); init_queue_length_(config);
init_initial_config_(config);
refresh_dynamic_config_(config); refresh_dynamic_config_(config);
LOG_INFO("init ObCDCAutoConfigMgr succ"); LOG_INFO("init ObCDCAutoConfigMgr succ");
} }
@ -96,17 +97,24 @@ void ObCDCAutoConfigMgr::init_queue_length_(const ObLogConfig &config)
REFRESH_NUM_FIELD_WITH_CONFIG(reader_queue_length, DEFAULT_STORAGE_QUEUE_LENGTH, config.reader_queue_length.get()); REFRESH_NUM_FIELD_WITH_CONFIG(reader_queue_length, DEFAULT_STORAGE_QUEUE_LENGTH, config.reader_queue_length.get());
} }
void ObCDCAutoConfigMgr::init_initial_config_(const ObLogConfig &config)
{
const int64_t factor = factor_;
const int64_t part_trans_task_prealloc_count = (1 << (factor_ - 11)) * 20000;
REFRESH_NUM_FIELD_WITH_CONFIG(part_trans_task_prealloc_count, part_trans_task_prealloc_count, config.part_trans_task_prealloc_count.get());
}
void ObCDCAutoConfigMgr::refresh_dynamic_config_(const ObLogConfig &config) void ObCDCAutoConfigMgr::refresh_dynamic_config_(const ObLogConfig &config)
{ {
refresh_factor_(config); refresh_factor_(config);
const static int64_t DEFAULT_STORAGER_MEM_PERCENT = 1; const static int64_t DEFAULT_STORAGER_MEM_PERCENT = 1;
const static int64_t DEFAULT_STORAGER_TASK_UPPER_BOUND = 100; const static int64_t DEFAULT_STORAGER_TASK_UPPER_BOUND = 100;
const int64_t redo_dispatcher_limit = (1 << (factor_ - 11)) * 32 * _M_; const int64_t redo_dispatcher_limit = (1 << (factor_ - 11)) * 32 * _M_;
const int64_t auto_part_trans_task_upper_bound = 2000 * (factor_ - 1); const int64_t auto_part_trans_task_upper_bound = (1 << (factor_ - 11)) * 20000;
const int64_t active_part_trans_task_upper_bound = auto_part_trans_task_upper_bound; const int64_t active_part_trans_task_upper_bound = auto_part_trans_task_upper_bound;
const int64_t reusable_part_trans_task_upper_bound = auto_part_trans_task_upper_bound; const int64_t reusable_part_trans_task_upper_bound = auto_part_trans_task_upper_bound;
const int64_t ready_to_seq_task_upper_bound = auto_part_trans_task_upper_bound; const int64_t ready_to_seq_task_upper_bound = auto_part_trans_task_upper_bound;
const int64_t extra_redo_dispatch_memory_size = 1 * _K_ + (1 << (factor_ - 11)) * (factor_ - 11) * _M_; const int64_t extra_redo_dispatch_memory_size = 1 * _K_ + (1 << (factor_ - 9)) * (factor_ - 11) * _M_;
const int64_t redo_dispatch_exceed_ratio = factor_ <= 12 ? 1 : (1 << (factor_ - 13)); const int64_t redo_dispatch_exceed_ratio = factor_ <= 12 ? 1 : (1 << (factor_ - 13));
REFRESH_NUM_FIELD_WITH_CONFIG(redo_dispatcher_memory_limit, redo_dispatcher_limit, config.redo_dispatcher_memory_limit.get()); REFRESH_NUM_FIELD_WITH_CONFIG(redo_dispatcher_memory_limit, redo_dispatcher_limit, config.redo_dispatcher_memory_limit.get());

View File

@ -37,6 +37,8 @@ private:
private: private:
void refresh_factor_(const ObLogConfig &config); void refresh_factor_(const ObLogConfig &config);
void init_queue_length_(const ObLogConfig &config); void init_queue_length_(const ObLogConfig &config);
// should invoke after init_queue_length_ to ensure factor_ is valid
void init_initial_config_(const ObLogConfig &config);
void refresh_dynamic_config_(const ObLogConfig &config); void refresh_dynamic_config_(const ObLogConfig &config);
int64_t get_log2_(int64_t value); int64_t get_log2_(int64_t value);
private: private:
@ -52,9 +54,10 @@ private:
// | factor | 11 | 12 | 13 | 14 | 15 | 17 | // | factor | 11 | 12 | 13 | 14 | 15 | 17 |
// | auto_queue_length | 256 | 512 | 1024 | 2048 | 4096 | 16384 | // | auto_queue_length | 256 | 512 | 1024 | 2048 | 4096 | 16384 |
// | br_queue_length | 8192 | 16384 | 32768 | 65536 | 10W | 10W | // | br_queue_length | 8192 | 16384 | 32768 | 65536 | 10W | 10W |
// | auto_part_trans_task_upper_bound | 20K | 22K | 24K | 26K | 28K | 32K | // | part_trans_task_prealloc_count | 2W | 4W | 8W | 16W | 32W | 128W |
// | auto_part_trans_task_upper_bound | 2W | 4W | 8W | 16W | 32W | 128W |
// | redo_dispatcher_memory_limit | 32M | 64M | 128M | 256M | 512M | 2G | // | redo_dispatcher_memory_limit | 32M | 64M | 128M | 256M | 512M | 2G |
// | extra_redo_dispatch_memory_size | 1K | 2M | 8M | 24M | 64M | 256M | // | extra_redo_dispatch_memory_size | 1K | 8M | 32M | 96M | 256M | 1.5G |
// | redo_dispatch_exceed_ratio | 1 | 1 | 1 | 2 | 4 | 16 | // | redo_dispatch_exceed_ratio | 1 | 1 | 1 | 2 | 4 | 16 |
int64_t factor_; int64_t factor_;
DEFINE_FIELD_WITH_GETTER(int64_t, br_queue_length); DEFINE_FIELD_WITH_GETTER(int64_t, br_queue_length);
@ -70,6 +73,9 @@ DEFINE_FIELD_WITH_GETTER(int64_t, resource_collector_queue_length);
DEFINE_FIELD_WITH_GETTER(int64_t, formatter_queue_length); DEFINE_FIELD_WITH_GETTER(int64_t, formatter_queue_length);
DEFINE_FIELD_WITH_GETTER(int64_t, dml_parser_queue_length); DEFINE_FIELD_WITH_GETTER(int64_t, dml_parser_queue_length);
// initial-value can't change after init
DEFINE_FIELD_WITH_GETTER(int64_t, part_trans_task_prealloc_count);
// flow controll // flow controll
DEFINE_FIELD_WITH_GETTER(int64_t, memory_limit); DEFINE_FIELD_WITH_GETTER(int64_t, memory_limit);
DEFINE_FIELD_WITH_GETTER(int64_t, redo_dispatcher_memory_limit); DEFINE_FIELD_WITH_GETTER(int64_t, redo_dispatcher_memory_limit);

View File

@ -113,11 +113,11 @@ public:
DEF_INT(br_queue_length, OB_CLUSTER_PARAMETER, "0", "[0, ]", "user_binlog_record queue length"); DEF_INT(br_queue_length, OB_CLUSTER_PARAMETER, "0", "[0, ]", "user_binlog_record queue length");
DEF_INT(cached_schema_version_count, OB_CLUSTER_PARAMETER, "32", "[1,]", "cached schema version count"); DEF_INT(cached_schema_version_count, OB_CLUSTER_PARAMETER, "32", "[1,]", "cached schema version count");
DEF_INT(history_schema_version_count, OB_CLUSTER_PARAMETER, "16", "[1,]", "history schema version count"); DEF_INT(history_schema_version_count, OB_CLUSTER_PARAMETER, "16", "[1,]", "history schema version count");
DEF_INT(resource_collector_thread_num, OB_CLUSTER_PARAMETER, "10", "[1,]", "resource collector thread number"); DEF_INT(resource_collector_thread_num, OB_CLUSTER_PARAMETER, "11", "[1,]", "resource collector thread number");
DEF_INT(resource_collector_thread_num_for_br, OB_CLUSTER_PARAMETER, "7", "[1,]", "binlog record resource collector thread number"); DEF_INT(resource_collector_thread_num_for_br, OB_CLUSTER_PARAMETER, "7", "[1,]", "binlog record resource collector thread number");
DEF_INT(instance_num, OB_CLUSTER_PARAMETER, "1", "[1,]", "store instance number"); DEF_INT(instance_num, OB_CLUSTER_PARAMETER, "1", "[1,]", "store instance number");
DEF_INT(instance_index, OB_CLUSTER_PARAMETER, "0", "[0,]", "store instance index, start from 0"); DEF_INT(instance_index, OB_CLUSTER_PARAMETER, "0", "[0,]", "store instance index, start from 0");
DEF_INT(part_trans_task_prealloc_count, OB_CLUSTER_PARAMETER, "300000", "[1,]", DEF_INT(part_trans_task_prealloc_count, OB_CLUSTER_PARAMETER, "0", "[0,]",
"part trans task pre-alloc count"); "part trans task pre-alloc count");
DEF_INT(part_trans_task_active_count_upper_bound, OB_CLUSTER_PARAMETER, "0", "[0,]", DEF_INT(part_trans_task_active_count_upper_bound, OB_CLUSTER_PARAMETER, "0", "[0,]",
"active part trans task count upper bound"); "active part trans task count upper bound");
@ -372,13 +372,13 @@ public:
T_DEF_INT_INFT(blacklist_survival_time_penalty_period_min, OB_CLUSTER_PARAMETER, 1, 1, "blacklist survival time punish interval in minute"); T_DEF_INT_INFT(blacklist_survival_time_penalty_period_min, OB_CLUSTER_PARAMETER, 1, 1, "blacklist survival time punish interval in minute");
// Blacklist history expiration time, used to delete history // Blacklist history expiration time, used to delete history
T_DEF_INT_INFT(blacklist_history_overdue_time_min, OB_CLUSTER_PARAMETER, 30, 10, "blacklist history overdue in minute"); T_DEF_INT_INFT(blacklist_history_overdue_time_min, OB_CLUSTER_PARAMETER, 3, 1, "blacklist history overdue in minute");
// Clear blacklist history period, unit: minutes // Clear blacklist history period, unit: minutes
T_DEF_INT_INFT(blacklist_history_clear_interval_min, OB_CLUSTER_PARAMETER, 20, 10, "blacklist history clear interval in minute"); T_DEF_INT_INFT(blacklist_history_clear_interval_min, OB_CLUSTER_PARAMETER, 2, 1, "blacklist history clear interval in minute");
// Check the need for active cut-off cycles, in minutes // Check the need for active cut-off cycles, in minutes
T_DEF_INT_INFT(check_switch_server_interval_min, OB_CLUSTER_PARAMETER, 30, 1, "check switch server interval in minute"); T_DEF_INT_INFT(check_switch_server_interval_min, OB_CLUSTER_PARAMETER, 10, 1, "check switch server interval in minute");
// Print the number of LSs with the slowest progress of the Fetcher module // Print the number of LSs with the slowest progress of the Fetcher module
T_DEF_INT_INFT(print_fetcher_slowest_ls_num, OB_CLUSTER_PARAMETER, 10, 1, "print fetcher slowest ls num"); T_DEF_INT_INFT(print_fetcher_slowest_ls_num, OB_CLUSTER_PARAMETER, 10, 1, "print fetcher slowest ls num");

View File

@ -23,8 +23,10 @@ namespace libobcdc
{ {
ObLogEntryTaskPool::ObLogEntryTaskPool() ObLogEntryTaskPool::ObLogEntryTaskPool()
:inited_(false), : inited_(false),
pool_() alloc_cnt_(0),
block_alloc_(),
allocator_()
{ {
} }
@ -36,16 +38,22 @@ ObLogEntryTaskPool::~ObLogEntryTaskPool()
int ObLogEntryTaskPool::init(const int64_t fixed_task_count) int ObLogEntryTaskPool::init(const int64_t fixed_task_count)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
lib::ObMemAttr mem_attr(OB_SERVER_TENANT_ID, "CDCEntryTaskPol");
if (OB_UNLIKELY(inited_)) { if (OB_UNLIKELY(inited_)) {
LOG_ERROR("RowDataTaskPool has been initialized"); LOG_ERROR("RowDataTaskPool has been initialized");
ret = OB_INIT_TWICE; ret = OB_INIT_TWICE;
} else if (OB_UNLIKELY(fixed_task_count <= 0)) { } else if (OB_UNLIKELY(fixed_task_count <= 0)) {
LOG_ERROR("invalid argument", K(fixed_task_count));
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(pool_.init(fixed_task_count, "LEntryTaskPool"))) { LOG_ERROR("invalid argument", K(fixed_task_count));
LOG_ERROR("row data task pool init fail", KR(ret), K(fixed_task_count)); } else if (OB_FAIL(allocator_.init(
sizeof(ObLogEntryTask),
common::OB_MALLOC_NORMAL_BLOCK_SIZE,
block_alloc_,
mem_attr))) {
LOG_ERROR("init allocator for ObLogEntryTaskPool failed", KR(ret));
} else { } else {
allocator_.set_nway(4);
inited_ = true; inited_ = true;
LOG_INFO("LogEntryTaskPool init success", K(fixed_task_count)); LOG_INFO("LogEntryTaskPool init success", K(fixed_task_count));
} }
@ -55,25 +63,28 @@ int ObLogEntryTaskPool::init(const int64_t fixed_task_count)
void ObLogEntryTaskPool::destroy() void ObLogEntryTaskPool::destroy()
{ {
try_purge_pool();
inited_ = false; inited_ = false;
pool_.destroy(); alloc_cnt_ = 0;
allocator_.destroy();
} }
int ObLogEntryTaskPool::alloc(ObLogEntryTask *&log_entry_task, int ObLogEntryTaskPool::alloc(
void *host) ObLogEntryTask *&log_entry_task,
PartTransTask &host)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
void *ptr = nullptr;
if (OB_UNLIKELY(! inited_)) { if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("RowDataTaskPool has not been initialized");
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
} else if (OB_FAIL(pool_.alloc(log_entry_task))) { LOG_ERROR("RowDataTaskPool has not been initialized", KR(ret));
LOG_ERROR("alloc binlog record fail", KR(ret)); } else if (OB_ISNULL(ptr = allocator_.alloc())) {
} else if (OB_ISNULL(log_entry_task)) {
LOG_ERROR("alloc binlog record fail", K(log_entry_task));
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("alloc log_entry_task failed", KR(ret), K_(alloc_cnt), "memory_hold", allocator_.hold());
} else { } else {
log_entry_task->set_host(host); log_entry_task = new(ptr) ObLogEntryTask(host);
ATOMIC_INC(&alloc_cnt_);
} }
return ret; return ret;
@ -83,27 +94,34 @@ void ObLogEntryTaskPool::free(ObLogEntryTask *log_entry_task)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_LIKELY(inited_) && OB_LIKELY(NULL != log_entry_task)) { if (OB_LIKELY(inited_) && OB_NOT_NULL(log_entry_task)) {
// Timely memory recycling // Timely memory recycling
log_entry_task->reset(); log_entry_task->~ObLogEntryTask();
allocator_.free(log_entry_task);
if (OB_FAIL(pool_.free(log_entry_task))) { log_entry_task = nullptr;
LOG_ERROR("free binlog record fail", KR(ret), K(log_entry_task)); ATOMIC_DEC(&alloc_cnt_);
} else {
log_entry_task = NULL;
}
} }
} }
int64_t ObLogEntryTaskPool::get_alloc_count() const int64_t ObLogEntryTaskPool::get_alloc_count() const
{ {
return pool_.get_alloc_count(); return ATOMIC_LOAD(&alloc_cnt_);
} }
void ObLogEntryTaskPool::print_stat_info() const void ObLogEntryTaskPool::print_stat_info()
{ {
_LOG_INFO("[STAT] [LOG_ENTRY_TASK_POOL] TOTAL=%ld FREE=%ld FIXED=%ld", LOG_INFO("[STAT] [LOG_ENTRY_TASK_POOL]",
pool_.get_alloc_count(), pool_.get_free_count(), pool_.get_fixed_count()); K_(alloc_cnt), "memory_used",
SIZE_TO_STR(alloc_cnt_ * sizeof(ObLogEntryTask)),
"allocated memory", SIZE_TO_STR(allocator_.hold()),
K_(allocator));
}
void ObLogEntryTaskPool::try_purge_pool()
{
if (inited_) {
allocator_.try_purge();
}
} }
} // namespace libobcdc } // namespace libobcdc

View File

@ -15,7 +15,7 @@
#ifndef OCEANBASE_SRC_LIBOBLOG_OB_LOG_ENTRY_TASK_POOL_ #ifndef OCEANBASE_SRC_LIBOBLOG_OB_LOG_ENTRY_TASK_POOL_
#define OCEANBASE_SRC_LIBOBLOG_OB_LOG_ENTRY_TASK_POOL_ #define OCEANBASE_SRC_LIBOBLOG_OB_LOG_ENTRY_TASK_POOL_
#include "lib/objectpool/ob_small_obj_pool.h" // ObSmallObjPool #include "lib/allocator/ob_slice_alloc.h" // ObSliceAlloc
#include "ob_log_part_trans_task.h" // ObLogEntryTask #include "ob_log_part_trans_task.h" // ObLogEntryTask
namespace oceanbase namespace oceanbase
@ -28,11 +28,13 @@ public:
virtual ~IObLogEntryTaskPool() {} virtual ~IObLogEntryTaskPool() {}
public: public:
virtual int alloc(ObLogEntryTask *&task, virtual int alloc(
void *host) = 0; ObLogEntryTask *&task,
PartTransTask &host) = 0;
virtual void free(ObLogEntryTask *task) = 0; virtual void free(ObLogEntryTask *task) = 0;
virtual int64_t get_alloc_count() const = 0; virtual int64_t get_alloc_count() const = 0;
virtual void print_stat_info() const = 0; virtual void print_stat_info() = 0;
virtual void try_purge_pool() = 0;
}; };
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -40,26 +42,29 @@ public:
// ObLogEntryTaskPool // ObLogEntryTaskPool
class ObLogEntryTaskPool : public IObLogEntryTaskPool class ObLogEntryTaskPool : public IObLogEntryTaskPool
{ {
typedef common::ObSmallObjPool<ObLogEntryTask> LogEntryTaskPool; typedef ObBlockAllocMgr BlockAlloc;
public: public:
ObLogEntryTaskPool(); ObLogEntryTaskPool();
virtual ~ObLogEntryTaskPool(); virtual ~ObLogEntryTaskPool();
public: public:
int alloc(ObLogEntryTask *&log_entry_task, int alloc(
void *host); ObLogEntryTask *&log_entry_task,
void free(ObLogEntryTask *log_entry_task); PartTransTask &host) override;
int64_t get_alloc_count() const; void free(ObLogEntryTask *log_entry_task) override;
void print_stat_info() const; int64_t get_alloc_count() const override;
void print_stat_info() override;
void try_purge_pool() override;
public: public:
int init(const int64_t fixed_task_count); int init(const int64_t fixed_task_count);
void destroy(); void destroy();
private: private:
bool inited_; bool inited_;
LogEntryTaskPool pool_; int64_t alloc_cnt_;
BlockAlloc block_alloc_;
ObSliceAlloc allocator_;
private: private:
DISALLOW_COPY_AND_ASSIGN(ObLogEntryTaskPool); DISALLOW_COPY_AND_ASSIGN(ObLogEntryTaskPool);

View File

@ -560,12 +560,14 @@ int ObLogInstance::init_common_(uint64_t start_tstamp_ns, ERROR_CALLBACK err_cb)
LOG_ERROR("check config fail", KR(ret)); LOG_ERROR("check config fail", KR(ret));
} else if (OB_FAIL(dump_config_())) { } else if (OB_FAIL(dump_config_())) {
LOG_ERROR("dump_config_ fail", KR(ret)); LOG_ERROR("dump_config_ fail", KR(ret));
} else if (OB_FAIL(trans_task_pool_alloc_.init(TASK_POOL_ALLOCATOR_TOTAL_LIMIT, } else if (OB_FAIL(trans_task_pool_alloc_.init(
TASK_POOL_ALLOCATOR_TOTAL_LIMIT,
TASK_POOL_ALLOCATOR_HOLD_LIMIT, TASK_POOL_ALLOCATOR_HOLD_LIMIT,
TASK_POOL_ALLOCATOR_PAGE_SIZE))) { TASK_POOL_ALLOCATOR_PAGE_SIZE))) {
LOG_ERROR("init fifo allocator fail", KR(ret)); LOG_ERROR("init fifo allocator fail", KR(ret));
} else if (OB_FAIL(trans_task_pool_.init(&trans_task_pool_alloc_, } else if (OB_FAIL(trans_task_pool_.init(
TCONF.part_trans_task_prealloc_count, &trans_task_pool_alloc_,
CDC_CFG_MGR.get_part_trans_task_prealloc_count(),
1 == TCONF.part_trans_task_dynamic_alloc, 1 == TCONF.part_trans_task_dynamic_alloc,
TCONF.part_trans_task_prealloc_page_count))) { TCONF.part_trans_task_prealloc_page_count))) {
LOG_ERROR("init task pool fail", KR(ret)); LOG_ERROR("init task pool fail", KR(ret));
@ -744,6 +746,13 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
LOG_INFO("set working mode", K(working_mode_str), K(working_mode_), "working_mode", print_working_mode(working_mode_)); LOG_INFO("set working mode", K(working_mode_str), K(working_mode_), "working_mode", print_working_mode(working_mode_));
} }
// init ObClockGenerator
if (OB_SUCC(ret)) {
if (OB_FAIL(common::ObClockGenerator::init())) {
LOG_ERROR("failed to init ob clock generator", KR(ret));
}
}
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
if (OB_UNLIKELY(! is_refresh_mode_valid(refresh_mode))) { if (OB_UNLIKELY(! is_refresh_mode_valid(refresh_mode))) {
ret = OB_INVALID_CONFIG; ret = OB_INVALID_CONFIG;
@ -840,13 +849,6 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
} }
} }
// init ObClockGenerator
if (OB_SUCC(ret)) {
if (OB_FAIL(common::ObClockGenerator::init())) {
LOG_ERROR("failed to init ob clock generator", KR(ret));
}
}
INIT(log_entry_task_pool_, ObLogEntryTaskPool, TCONF.log_entry_task_prealloc_count); INIT(log_entry_task_pool_, ObLogEntryTaskPool, TCONF.log_entry_task_prealloc_count);
INIT(store_service_, RocksDbStoreService, store_service_path); INIT(store_service_, RocksDbStoreService, store_service_path);
@ -2474,6 +2476,7 @@ void ObLogInstance::global_flow_control_()
bool condition3 = (storager_task_count > storager_task_count_upper_bound) && (memory_hold >= storager_mem_percentage * memory_limit); bool condition3 = (storager_task_count > storager_task_count_upper_bound) && (memory_hold >= storager_mem_percentage * memory_limit);
need_slow_down_fetcher = (condition1 && (condition2 || need_pause_dispatch || is_seq_queue_not_empty)) || condition3; need_slow_down_fetcher = (condition1 && (condition2 || need_pause_dispatch || is_seq_queue_not_empty)) || condition3;
if (need_slow_down_fetcher) { if (need_slow_down_fetcher) {
if (condition2) { if (condition2) {
reason = "MEMORY_LIMIT_AND_REUSABLE_PART_TOO_MUCH"; reason = "MEMORY_LIMIT_AND_REUSABLE_PART_TOO_MUCH";

View File

@ -546,6 +546,8 @@ int PartTransDispatcher::alloc_task(const PartTransID &part_trans_id, PartTransT
task->revert(); task->revert();
task = NULL; task = NULL;
LOG_ERROR("insert part trans task fail", KR(ret), K(part_trans_id)); LOG_ERROR("insert part trans task fail", KR(ret), K(part_trans_id));
} else if (OB_FAIL(task->init_log_entry_task_allocator())) {
LOG_ERROR("init log_entry_task base allocator failed", KR(ret), KPC(task));
} else { } else {
task->set_trans_id(part_trans_id.get_tx_id()); task->set_trans_id(part_trans_id.get_tx_id());
// This task is only in MAP, not in QUEUE // This task is only in MAP, not in QUEUE

View File

@ -181,7 +181,7 @@ int ObLogPartTransParser::parse_ddl_redo_log_(PartTransTask &task, volatile bool
ObLogTenant *tenant = NULL; ObLogTenant *tenant = NULL;
ObLogTenantGuard guard; ObLogTenantGuard guard;
// just declear here // just declear here
ObLogEntryTask invalid_redo_log_entry_task; ObLogEntryTask invalid_redo_log_entry_task(task);
// DDL data/non-PG partitioned data need to be deserialized in whole rows, not filtered // DDL data/non-PG partitioned data need to be deserialized in whole rows, not filtered
// otherwise need to get tenant structure and perform filtering // otherwise need to get tenant structure and perform filtering
@ -763,7 +763,7 @@ int ObLogPartTransParser::parse_ddl_lob_aux_stmts_(
PartTransTask &part_trans_task) PartTransTask &part_trans_task)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObLogEntryTask invalid_log_entry_task; ObLogEntryTask invalid_log_entry_task(part_trans_task);
// For DDL Lob Aux meta: DmlStmtTask needs to allocate memory based on PartTransTask // For DDL Lob Aux meta: DmlStmtTask needs to allocate memory based on PartTransTask
DmlStmtTask *stmt_task = static_cast<DmlStmtTask *>(part_trans_task.alloc(sizeof(DmlStmtTask))); DmlStmtTask *stmt_task = static_cast<DmlStmtTask *>(part_trans_task.alloc(sizeof(DmlStmtTask)));
ObCDCGlobalInfo &globl_info = TCTX.global_info_; ObCDCGlobalInfo &globl_info = TCTX.global_info_;

View File

@ -1777,8 +1777,8 @@ void DdlStmtTask::reset()
//////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////
ObLogEntryTask::ObLogEntryTask() : ObLogEntryTask::ObLogEntryTask(PartTransTask &host) :
host_(NULL), host_(&host),
participant_(NULL), participant_(NULL),
tls_id_(), tls_id_(),
trans_id_(), trans_id_(),
@ -1787,7 +1787,7 @@ ObLogEntryTask::ObLogEntryTask() :
stmt_list_(), stmt_list_(),
formatted_stmt_num_(0), formatted_stmt_num_(0),
row_ref_cnt_(0), row_ref_cnt_(0),
arena_allocator_("LogEntryTask", OB_MALLOC_MIDDLE_BLOCK_SIZE) arena_allocator_(host.get_log_entry_task_base_allocator(), "LogEntryTask", host.get_tenant_id())
{ {
} }
@ -2183,7 +2183,8 @@ PartTransTask::PartTransTask() :
wait_data_ready_cond_(), wait_data_ready_cond_(),
wait_formatted_cond_(NULL), wait_formatted_cond_(NULL),
output_br_count_by_turn_(0), output_br_count_by_turn_(0),
allocator_() allocator_(),
log_entry_task_base_allocator_()
{ {
} }
@ -2319,6 +2320,23 @@ void PartTransTask::reset()
output_br_count_by_turn_ = 0; output_br_count_by_turn_ = 0;
// reuse memory // reuse memory
allocator_.reset(); allocator_.reset();
log_entry_task_base_allocator_.destroy();
}
int PartTransTask::init_log_entry_task_allocator()
{
int ret = OB_SUCCESS;
lib::ObMemAttr attr(tls_id_.get_tenant_id(), "LogEntryTaskBas");
const int64_t cache_block_count = 4; // nway for vslice_alloc
if (OB_FAIL(log_entry_task_base_allocator_.init(
OB_MALLOC_MIDDLE_BLOCK_SIZE,
attr,
cache_block_count))) {
LOG_ERROR("init log_entry_task_base_allocator_ failed", KR(ret), KPC(this));
}
return ret;
} }
int PartTransTask::push_redo_log( int PartTransTask::push_redo_log(

View File

@ -39,6 +39,7 @@
#include "ob_log_callback.h" // ObILogCallback #include "ob_log_callback.h" // ObILogCallback
#include "ob_cdc_lob_ctx.h" // ObLobDataOutRowCtxList #include "ob_cdc_lob_ctx.h" // ObLobDataOutRowCtxList
#include "ob_cdc_lob_aux_table_schema_info.h" // ObCDCLobAuxTableSchemaInfo #include "ob_cdc_lob_aux_table_schema_info.h" // ObCDCLobAuxTableSchemaInfo
#include "lib/allocator/ob_lf_fifo_allocator.h" // ObConcurrentFIFOAllocator
#include "ob_log_safe_arena.h" #include "ob_log_safe_arena.h"
namespace oceanbase namespace oceanbase
@ -635,7 +636,7 @@ typedef LightyList<IStmtTask> StmtList;
class ObLogEntryTask class ObLogEntryTask
{ {
public: public:
ObLogEntryTask(); ObLogEntryTask(PartTransTask &host);
virtual ~ObLogEntryTask(); virtual ~ObLogEntryTask();
void reset(); void reset();
bool is_valid() const; bool is_valid() const;
@ -773,6 +774,8 @@ public:
public: public:
void reset(); void reset();
int init_log_entry_task_allocator();
/// The initialisation process of a transaction task is divided into four stages. /// The initialisation process of a transaction task is divided into four stages.
/// where: the DML transaction task processing process, where the maintenance of the completion status is completed, and the disassembly, maintenance and distribution of the task. /// where: the DML transaction task processing process, where the maintenance of the completion status is completed, and the disassembly, maintenance and distribution of the task.
/// ///
@ -1140,6 +1143,7 @@ public:
int check_for_ddl_trans( int check_for_ddl_trans(
bool &is_not_barrier, bool &is_not_barrier,
ObSchemaOperationType &op_type) const; ObSchemaOperationType &op_type) const;
ObIAllocator &get_log_entry_task_base_allocator() { return log_entry_task_base_allocator_; };
TO_STRING_KV( TO_STRING_KV(
"state", serve_state_, "state", serve_state_,
@ -1321,6 +1325,7 @@ private:
// trace_id/trace_info/part_trans_info_str_/participant_ // trace_id/trace_info/part_trans_info_str_/participant_
// MutatorRow(DDL)/DdlStmtTask // MutatorRow(DDL)/DdlStmtTask
ObSmallArena allocator_; ObSmallArena allocator_;
ObLfFIFOAllocator log_entry_task_base_allocator_;
private: private:
DISALLOW_COPY_AND_ASSIGN(PartTransTask); DISALLOW_COPY_AND_ASSIGN(PartTransTask);

View File

@ -80,7 +80,7 @@ int ObLogResourceCollector::init(const int64_t thread_num,
ret = OB_INIT_TWICE; ret = OB_INIT_TWICE;
} else if (OB_UNLIKELY(thread_num <= 0) } else if (OB_UNLIKELY(thread_num <= 0)
|| OB_UNLIKELY(thread_num_for_br <= 0) || OB_UNLIKELY(thread_num_for_br <= 0)
|| OB_UNLIKELY(thread_num_for_br >= thread_num) || OB_UNLIKELY(thread_num_for_br + 1 >= thread_num)
|| OB_UNLIKELY(queue_size <= 0) || OB_UNLIKELY(queue_size <= 0)
|| OB_ISNULL(br_pool) || OB_ISNULL(br_pool)
|| OB_ISNULL(trans_ctx_mgr) || OB_ISNULL(trans_ctx_mgr)
@ -398,9 +398,12 @@ int ObLogResourceCollector::push_task_into_queue_(ObLogResourceRecycleTask &task
static uint64_t br_push_seq = 0; static uint64_t br_push_seq = 0;
uint64_t hash_value = 0; uint64_t hash_value = 0;
// thread [0] for LOB_DATA_CLEAN_TASK
// thread [1, br_thread_num] for BR_TASK
// thread [br_thread_num + 1, thread_num] for PART_TRANS_TASK
if (task.is_part_trans_task()) { if (task.is_part_trans_task()) {
hash_value = ATOMIC_FAA(&part_trans_task_push_seq, 1); hash_value = ATOMIC_FAA(&part_trans_task_push_seq, 1);
hash_value = (hash_value % (RCThread::get_thread_num() - br_thread_num_)) + br_thread_num_; hash_value = (hash_value % (RCThread::get_thread_num() - br_thread_num_ - 1)) + br_thread_num_ + 1;
PartTransTask *part_trans_task = static_cast<PartTransTask *>(&task); PartTransTask *part_trans_task = static_cast<PartTransTask *>(&task);
@ -414,8 +417,11 @@ int ObLogResourceCollector::push_task_into_queue_(ObLogResourceRecycleTask &task
(void)ATOMIC_AAF(&br_count_, 1); (void)ATOMIC_AAF(&br_count_, 1);
hash_value = ATOMIC_FAA(&br_push_seq, 1); hash_value = ATOMIC_FAA(&br_push_seq, 1);
hash_value = hash_value % br_thread_num_; hash_value = (hash_value % br_thread_num_) + 1;
} else {} } else {
// LOB_DATA_CLEAN_TASK, use thread 0
// hash_value = 0
}
// push to thread queue, asynchronous recycling // push to thread queue, asynchronous recycling
while (OB_SUCC(ret) && ! RCThread::is_stoped()) { while (OB_SUCC(ret) && ! RCThread::is_stoped()) {

View File

@ -33,6 +33,19 @@ public:
int64_t ctx_id = 0) : int64_t ctx_id = 0) :
arena_(label, page_size, tenant_id, ctx_id), arena_(label, page_size, tenant_id, ctx_id),
lock_() {} lock_() {}
ObCdcSafeArena(
ObIAllocator &base_allocator,
const lib::ObLabel &label = ObModIds::OB_MODULE_PAGE_ALLOCATOR,
int64_t tenant_id = OB_SERVER_TENANT_ID,
const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE,
int64_t ctx_id = 0) :
arena_(base_allocator, page_size)
{
ObMemAttr attr(tenant_id, label, ctx_id);
arena_.set_attr(attr);
}
virtual ~ObCdcSafeArena() {} virtual ~ObCdcSafeArena() {}
virtual void *alloc(const int64_t size) override virtual void *alloc(const int64_t size) override
{ {

View File

@ -67,7 +67,7 @@ class ObLogTransTaskPool
typedef common::QLink PoolElemType; typedef common::QLink PoolElemType;
typedef common::ObFixedQueue<void> PagePool; typedef common::ObFixedQueue<void> PagePool;
static const int64_t LARGE_ALLOCATOR_PAGE_SIZE = (1LL << 22); // 4M static const int64_t LARGE_ALLOCATOR_PAGE_SIZE = OB_MALLOC_BIG_BLOCK_SIZE; // 2M - 17KB
static const int64_t LARGE_ALLOCATOR_TOTAL_LIMIT = (1LL << 37); // 127G static const int64_t LARGE_ALLOCATOR_TOTAL_LIMIT = (1LL << 37); // 127G
static const int64_t LARGE_ALLOCATOR_HOLD_LIMIT = (1LL << 26); // 64M static const int64_t LARGE_ALLOCATOR_HOLD_LIMIT = (1LL << 26); // 64M

View File

@ -131,7 +131,6 @@ int ObCDCTimeZoneInfoGetter::init(
mysql_proxy_ = &mysql_proxy; mysql_proxy_ = &mysql_proxy;
systable_helper_ = &systable_helper; systable_helper_ = &systable_helper;
err_handler_ = &err_handler; err_handler_ = &err_handler;
allocator_.set_nway(NWAY);
inited_ = true; inited_ = true;
LOG_INFO("init timezone info getter succ", LOG_INFO("init timezone info getter succ",

View File

@ -190,7 +190,6 @@ private:
private: private:
static const int64_t TENANT_TZ_INFO_VALUE_SIZE = sizeof(ObCDCTenantTimeZoneInfo); static const int64_t TENANT_TZ_INFO_VALUE_SIZE = sizeof(ObCDCTenantTimeZoneInfo);
static const int NWAY = 4;
static const int MAP_BUCKET_NUM = 4; static const int MAP_BUCKET_NUM = 4;
private: private:
bool inited_; bool inited_;

View File

@ -109,7 +109,7 @@ class ObLogTransCtxMgr : public IObLogTransCtxMgr
}; };
public: public:
static const int64_t BLOCK_SIZE = 1 << 24; static const int64_t BLOCK_SIZE = common::OB_MALLOC_MIDDLE_BLOCK_SIZE; // 64KB - 128
static const int64_t PRINT_STATE_INTERVAL = 10 * 1000 * 1000; static const int64_t PRINT_STATE_INTERVAL = 10 * 1000 * 1000;
typedef ObEasyHazardMap<TenantTransID, TransCtx> TransCtxMap; typedef ObEasyHazardMap<TenantTransID, TransCtx> TransCtxMap;

View File

@ -408,7 +408,7 @@ int ObLogTransRedoDispatcher::dispatch_redo_(PartTransTask &part_trans, DmlRedoL
} }
// alloc an ObLogEntryTask as redo_read_task // alloc an ObLogEntryTask as redo_read_task
int ObLogTransRedoDispatcher::alloc_task_for_redo_(const PartTransTask &part_task, int ObLogTransRedoDispatcher::alloc_task_for_redo_(PartTransTask &part_task,
DmlRedoLogNode &redo_node, DmlRedoLogNode &redo_node,
ObLogEntryTask *&log_entry_task) ObLogEntryTask *&log_entry_task)
{ {
@ -417,7 +417,7 @@ int ObLogTransRedoDispatcher::alloc_task_for_redo_(const PartTransTask &part_tas
if (OB_UNLIKELY(OB_ISNULL(TCTX.log_entry_task_pool_))) { if (OB_UNLIKELY(OB_ISNULL(TCTX.log_entry_task_pool_))) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_ERROR("log_entry_task_pool is null!", KR(ret)); LOG_ERROR("log_entry_task_pool is null!", KR(ret));
} else if (OB_FAIL(TCTX.log_entry_task_pool_->alloc(log_entry_task, (void *)&part_task))) { } else if (OB_FAIL(TCTX.log_entry_task_pool_->alloc(log_entry_task, part_task))) {
LOG_ERROR("log_entry_task_pool_ alloc fail", KR(ret), KPC(log_entry_task), K(part_task)); LOG_ERROR("log_entry_task_pool_ alloc fail", KR(ret), KPC(log_entry_task), K(part_task));
} else if (OB_ISNULL(log_entry_task)) { } else if (OB_ISNULL(log_entry_task)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;

View File

@ -104,7 +104,7 @@ private:
// dispatch redo, part_budget should not be null if dispatch by turn // dispatch redo, part_budget should not be null if dispatch by turn
int dispatch_redo_(PartTransTask &part_trans, DmlRedoLogNode &redo_node, volatile bool &stop_flag); int dispatch_redo_(PartTransTask &part_trans, DmlRedoLogNode &redo_node, volatile bool &stop_flag);
// alloc an ObLogEntryTask as redo_read_task // alloc an ObLogEntryTask as redo_read_task
int alloc_task_for_redo_(const PartTransTask &part_task, int alloc_task_for_redo_(PartTransTask &part_task,
DmlRedoLogNode &redo_node, DmlRedoLogNode &redo_node,
ObLogEntryTask *&log_entry_task); ObLogEntryTask *&log_entry_task);
// push redo_read_task(ObLogEntryTask) to ObLogRedoReader // push redo_read_task(ObLogEntryTask) to ObLogRedoReader