add compaction dag limit & batch size

This commit is contained in:
yangqise7en
2023-09-12 08:40:21 +00:00
committed by ob-robot
parent f990b986b5
commit 7b4f5eeffe
13 changed files with 63 additions and 101 deletions

View File

@ -848,6 +848,12 @@ DEF_BOOL(_enable_parallel_minor_merge, OB_TENANT_PARAMETER, "True",
DEF_BOOL(_enable_adaptive_compaction, OB_TENANT_PARAMETER, "True",
"specifies whether allow adaptive compaction schedule and information collection",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// Tenant-level cap on the dag count used for dags whose priority is one of the
// compaction priorities (HIGH / MID / LOW); other priorities keep the scheduler's
// general dag limit. Reloaded dynamically by ObTenantDagScheduler::reload_config().
DEF_INT(compaction_dag_cnt_limit, OB_TENANT_PARAMETER, "15000", "[10000,200000]",
"the compaction dag count limit. Range: [10000,200000] in integer. default value is 15000",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// Tenant-level batch size handed to ObCompactionScheduleIterator::build_iter() by the
// tablet scheduler for its minor and medium scheduling loops.
DEF_INT(compaction_schedule_tablet_batch_cnt, OB_TENANT_PARAMETER, "50000", "[10000,200000]",
"the batch size when scheduling tablet to execute compaction task. Range: [10000,200000] in integer. default value is 50000",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(compaction_low_thread_score, OB_TENANT_PARAMETER, "0", "[0,100]",
"the current work thread score of low priority compaction. Range: [0,100] in integer. Especially, 0 means default value",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -1574,6 +1574,7 @@ void ObTenantDagScheduler::reload_config()
set_thread_score(ObDagPrio::DAG_PRIO_HA_LOW, tenant_config->ha_low_thread_score);
set_thread_score(ObDagPrio::DAG_PRIO_DDL, tenant_config->ddl_thread_score);
set_thread_score(ObDagPrio::DAG_PRIO_TTL, tenant_config->ttl_thread_score);
compaction_dag_limit_ = tenant_config->compaction_dag_cnt_limit;
}
}
@ -1626,6 +1627,7 @@ int ObTenantDagScheduler::init(
get_default_config();
dag_limit_ = dag_limit;
compaction_dag_limit_ = dag_limit;
if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::DagScheduler, tg_id_))) {
COMMON_LOG(WARN, "TG create dag scheduler failed", K(ret));
}
@ -1672,6 +1674,7 @@ void ObTenantDagScheduler::reset()
WEAK_BARRIER();
int tmp_ret = OB_SUCCESS;
int64_t abort_dag_cnt = 0;
dump_dag_status();
for (int64_t j = 0; j < DAG_LIST_MAX; ++j) {
for (int64_t i = 0; i < PriorityDagList::PRIO_CNT; ++i) {
ObIDag *head = dag_list_[j].get_head(i);
@ -1680,6 +1683,7 @@ void ObTenantDagScheduler::reset()
ObIDagNet *tmp_dag_net = nullptr;
while (NULL != cur_dag && head != cur_dag) {
next = cur_dag->get_next();
FLOG_INFO("destroy dag", "dag_list", j, "prio", i, KPC(cur_dag));
if (cur_dag->get_dag_id().is_valid()
&& OB_TMP_FAIL(ObSysTaskStatMgr::get_instance().del_task(cur_dag->get_dag_id()))) {
if (OB_ENTRY_NOT_EXIST != tmp_ret) {
@ -3364,6 +3368,17 @@ int ObTenantDagScheduler::check_dag_net_exist(
return ret;
}
// Returns the dag-count ceiling that applies to the given priority:
// compaction_dag_limit_ for the three compaction priorities, dag_limit_ for all others.
OB_INLINE int64_t ObTenantDagScheduler::get_dag_limit(const ObDagPrio::ObDagPrioEnum dag_prio)
{
  const bool is_compaction_prio = (ObDagPrio::DAG_PRIO_COMPACTION_HIGH == dag_prio)
      || (ObDagPrio::DAG_PRIO_COMPACTION_MID == dag_prio)
      || (ObDagPrio::DAG_PRIO_COMPACTION_LOW == dag_prio);
  return is_compaction_prio ? compaction_dag_limit_ : dag_limit_;
}
int ObTenantDagScheduler::inner_add_dag(
const bool emergency,
const bool check_size_overflow,
@ -3377,7 +3392,7 @@ int ObTenantDagScheduler::inner_add_dag(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("inner dag dag get invalid argument", K(ret), KPC(dag));
} else {
if (check_size_overflow && dag_cnts_[dag->get_type()] >= dag_limit_) {
if (check_size_overflow && dag_cnts_[dag->get_type()] >= get_dag_limit((ObDagPrio::ObDagPrioEnum)dag->get_priority())) {
ret = OB_SIZE_OVERFLOW;
COMMON_LOG(WARN, "ObTenantDagScheduler is full", K(ret), K_(dag_limit), KPC(dag));
} else if (OB_FAIL(add_dag_into_list_and_map_(

View File

@ -936,6 +936,7 @@ private:
int generate_next_dag_(ObIDag *dag);
int try_move_child_to_ready_list(ObIDag &dag);
void inner_free_dag(ObIDag &dag);
OB_INLINE int64_t get_dag_limit(const ObDagPrio::ObDagPrioEnum dag_prio);
private:
bool is_inited_;
@ -947,6 +948,7 @@ private:
lib::ObMutex dag_net_map_lock_;
int64_t dag_cnt_;
int64_t dag_limit_;
int64_t compaction_dag_limit_;
int64_t check_period_;
int64_t loop_waiting_dag_list_period_;
int64_t total_worker_cnt_;

View File

@ -555,10 +555,10 @@ int64_t ObMediumCompactionInfo::to_string(char* buf, const int64_t buf_len) cons
if (OB_ISNULL(buf) || buf_len <= 0) {
} else {
J_OBJ_START();
J_KV(K_(cluster_id), K_(medium_compat_version), K_(data_version),
"compaction_type", ObMediumCompactionInfo::get_compaction_type_str((ObCompactionType)compaction_type_),
J_KV("compaction_type", ObMediumCompactionInfo::get_compaction_type_str((ObCompactionType)compaction_type_),
"medium_merge_reason", ObAdaptiveMergePolicy::merge_reason_to_str(medium_merge_reason_),
K_(is_schema_changed), K_(tenant_id), K_(cluster_id), K_(medium_snapshot), K_(last_medium_snapshot), K_(storage_schema),
K_(medium_snapshot), K_(last_medium_snapshot), K_(tenant_id), K_(cluster_id),
K_(medium_compat_version), K_(data_version), K_(is_schema_changed), K_(storage_schema),
K_(contain_parallel_range), K_(parallel_merge_info));
J_OBJ_END();
}

View File

@ -187,8 +187,10 @@ void ObTabletMediumCompactionInfoRecorder::free_allocated_info()
{
if (OB_NOT_NULL(allocator_)) {
if (OB_NOT_NULL(logcb_ptr_)) {
logcb_ptr_->~ObStorageCLogCb();
tablet_handle_ptr_->reset();
tablet_handle_ptr_->~ObTabletHandle();
mds_ctx_->~MdsCtx();
allocator_->free(logcb_ptr_);
logcb_ptr_ = nullptr;
tablet_handle_ptr_ = nullptr;

View File

@ -257,12 +257,14 @@ int ObTenantTabletScheduler::init()
{
int ret = OB_SUCCESS;
int64_t schedule_interval = DEFAULT_COMPACTION_SCHEDULE_INTERVAL;
int64_t schedule_batch_size = DEFAULT_COMPACTION_SCHEDULE_BATCH_SIZE;
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
schedule_interval = tenant_config->ob_compaction_schedule_interval;
enable_adaptive_compaction_ = tenant_config->_enable_adaptive_compaction;
fast_freeze_checker_.reload_config(tenant_config->_ob_enable_fast_freeze);
schedule_batch_size = tenant_config->compaction_schedule_tablet_batch_cnt;
}
} // end of ObTenantConfigGuard
if (IS_INIT) {
@ -285,6 +287,7 @@ int ObTenantTabletScheduler::init()
LOG_WARN("Fail to create prohibit medium ls id map", K(ret));
} else {
schedule_interval_ = schedule_interval;
schedule_tablet_batch_size_ = schedule_batch_size;
is_inited_ = true;
}
@ -330,12 +333,16 @@ int ObTenantTabletScheduler::reload_tenant_config()
{
int ret = OB_SUCCESS;
int64_t merge_schedule_interval = DEFAULT_COMPACTION_SCHEDULE_INTERVAL;
int64_t schedule_batch_size = DEFAULT_COMPACTION_SCHEDULE_BATCH_SIZE;
bool tenant_config_valid = false;
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
tenant_config_valid = true;
merge_schedule_interval = tenant_config->ob_compaction_schedule_interval;
enable_adaptive_compaction_ = tenant_config->_enable_adaptive_compaction;
fast_freeze_checker_.reload_config(tenant_config->_ob_enable_fast_freeze);
schedule_batch_size = tenant_config->compaction_schedule_tablet_batch_cnt;
}
} // end of ObTenantConfigGuard
if (IS_NOT_INIT) {
@ -350,9 +357,13 @@ int ObTenantTabletScheduler::reload_tenant_config()
LOG_WARN("failed to reload new merge schedule interval", K(merge_schedule_interval));
} else {
schedule_interval_ = merge_schedule_interval;
LOG_INFO("succeeded to reload new merge schedule interval", K(merge_schedule_interval));
LOG_INFO("succeeded to reload new merge schedule interval", K(merge_schedule_interval), K(tenant_config_valid));
}
}
if (OB_SUCC(ret) && schedule_tablet_batch_size_ != schedule_batch_size) {
schedule_tablet_batch_size_ = schedule_batch_size;
LOG_INFO("succeeded to reload new merge schedule tablet batch cnt", K(schedule_tablet_batch_size_), K(tenant_config_valid));
}
return ret;
}
@ -479,7 +490,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_minor()
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("The ObTenantTabletScheduler has not been inited", K(ret));
} else if (OB_FAIL(minor_ls_tablet_iter_.build_iter())) {
} else if (OB_FAIL(minor_ls_tablet_iter_.build_iter(schedule_tablet_batch_size_))) {
LOG_WARN("failed to init iterator", K(ret));
} else {
LOG_INFO("start schedule all tablet minor merge", K(minor_ls_tablet_iter_));
@ -1431,7 +1442,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
LOG_WARN("failed to add suspect info", K(tmp_ret));
}
}
} else if (OB_FAIL(medium_ls_tablet_iter_.build_iter())) {
} else if (OB_FAIL(medium_ls_tablet_iter_.build_iter(schedule_tablet_batch_size_))) {
LOG_WARN("failed to init iterator", K(ret));
} else {
bool all_ls_weak_read_ts_ready = true;
@ -1618,10 +1629,13 @@ int ObTenantTabletScheduler::update_report_scn_as_ls_leader(ObLS &ls)
}
// ------------------- ObCompactionScheduleIterator -------------------- //
int ObCompactionScheduleIterator::build_iter()
int ObCompactionScheduleIterator::build_iter(const int64_t batch_tablet_cnt)
{
int ret = OB_SUCCESS;
if (!is_valid()) {
if (OB_UNLIKELY(batch_tablet_cnt <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(batch_tablet_cnt));
} else if (!is_valid()) {
ls_ids_.reuse();
if (OB_FAIL(MTL(ObLSService *)->get_ls_ids(ls_ids_))) {
LOG_WARN("failed to get all ls id", K(ret));
@ -1646,6 +1660,9 @@ int ObCompactionScheduleIterator::build_iter()
} else { // iter is invalid, no need to build, just set var to start cur batch
(void) start_cur_batch();
}
if (OB_SUCC(ret)) {
max_batch_tablet_cnt_ = batch_tablet_cnt;
}
return ret;
}

View File

@ -61,8 +61,7 @@ class ObCompactionScheduleIterator
public:
ObCompactionScheduleIterator(
const bool is_major,
ObLSGetMod mod = ObLSGetMod::STORAGE_MOD,
const int64_t batch_tablet_cnt = SCHEDULE_TABLET_BATCH_CNT)
ObLSGetMod mod = ObLSGetMod::STORAGE_MOD)
: mod_(mod),
is_major_(is_major),
scan_finish_(false),
@ -72,7 +71,7 @@ public:
ls_idx_(-1),
tablet_idx_(0),
schedule_tablet_cnt_(0),
max_batch_tablet_cnt_(batch_tablet_cnt),
max_batch_tablet_cnt_(SCHEDULE_TABLET_BATCH_CNT),
ls_tablet_svr_(nullptr),
ls_ids_(),
tablet_ids_()
@ -81,7 +80,7 @@ public:
tablet_ids_.set_attr(ObMemAttr(MTL_ID(), "CompIter"));
}
~ObCompactionScheduleIterator() { reset(); }
int build_iter();
int build_iter(const int64_t batch_tablet_cnt);
int get_next_ls(ObLSHandle &ls_handle);
int get_next_tablet(ObTabletHandle &tablet_handle);
bool is_scan_finish() const { return scan_finish_; }
@ -365,6 +364,7 @@ private:
static const int64_t SSTABLE_GC_INTERVAL = 30 * 1000 * 1000L; // 30s
static const int64_t INFO_POOL_RESIZE_INTERVAL = 30 * 1000 * 1000L; // 30s
static const int64_t DEFAULT_COMPACTION_SCHEDULE_INTERVAL = 30 * 1000 * 1000L; // 30s
// 50,000: fallback batch size used when the tenant config is not valid; equals the
// default of the compaction_schedule_tablet_batch_cnt parameter.
static const int64_t DEFAULT_COMPACTION_SCHEDULE_BATCH_SIZE = 50 * 1000L;
static const int64_t ADD_LOOP_EVENT_INTERVAL = 120 * 1000 * 1000L; // 120s
static const int64_t WAIT_MEDIUM_CHECK_THRESHOLD = 10 * 60 * 1000 * 1000 * 1000L; // 10m // ns
static const int64_t PRINT_LOG_INVERVAL = 2 * 60 * 1000 * 1000L; // 2m
@ -379,6 +379,7 @@ private:
int sstable_gc_tg_id_; // thread
int info_pool_resize_tg_id_; // thread
int64_t schedule_interval_;
int64_t schedule_tablet_batch_size_;
common::ObDedupQueue bf_queue_;
mutable obsys::ObRWLock frozen_version_lock_;

View File

@ -974,12 +974,6 @@ int ObTabletTableBackfillTXTask::update_merge_sstable_()
is_major_merge_type(tablet_merge_ctx_.param_.merge_type_),
tablet_merge_ctx_.merged_sstable_.get_end_scn());
if (ObMergeType::MINI_MERGE == tablet_merge_ctx_.param_.merge_type_) {
if (OB_FAIL(read_msd_from_memtable_(param))) {
LOG_WARN("failed to read msd from memtable", K(ret), K(tablet_merge_ctx_));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ls->update_tablet_table_store(
tablet_id_, param, new_tablet_handle))) {
@ -993,65 +987,6 @@ int ObTabletTableBackfillTXTask::update_merge_sstable_()
return ret;
}
// Asks traverse_frozen_memtable_() for each of the three multi source data units
// (tx data, binding info, autoinc seq), targeting the matching field of `param`.
// Stops at the first failing unit and returns its error code; logs the resulting
// fields once all three calls have succeeded.
int ObTabletTableBackfillTXTask::read_msd_from_memtable_(ObUpdateTableStoreParam &param)
{
  int ret = OB_SUCCESS;

  if (OB_FAIL(traverse_frozen_memtable_(memtable::MultiSourceDataUnitType::TABLET_TX_DATA, &param.tx_data_))) {
    LOG_WARN("failed to read tx data from memtable", K(ret));
  }

  if (OB_SUCC(ret)
      && OB_FAIL(traverse_frozen_memtable_(memtable::MultiSourceDataUnitType::TABLET_BINDING_INFO, &param.binding_info_))) {
    LOG_WARN("failed to read binding info from memtable", K(ret));
  }

  if (OB_SUCC(ret)
      && OB_FAIL(traverse_frozen_memtable_(memtable::MultiSourceDataUnitType::TABLET_SEQ, &param.autoinc_seq_))) {
    LOG_WARN("failed to read tablet seq from memtable", K(ret));
  }

  if (OB_SUCC(ret)) {
    LOG_INFO("succeeded to read msd from memtable", K(ret),
             "ls_id", tablet_merge_ctx_.param_.ls_id_,
             "tablet_id", tablet_merge_ctx_.param_.tablet_id_,
             "tx_data", param.tx_data_,
             "binding_info", param.binding_info_,
             "auto_inc_seq", param.autoinc_seq_);
  }
  return ret;
}
// Checks the task state, the output pointer `msd`, and the table held by
// table_handle_, returning OB_NOT_INIT / OB_INVALID_ARGUMENT / OB_ERR_UNEXPECTED on
// the respective failures. The code that used to copy the unit of kind `type` from a
// data memtable into `msd` is commented out below, so at present `msd` is never
// written and `type` is unused.
int ObTabletTableBackfillTXTask::traverse_frozen_memtable_(
    const memtable::MultiSourceDataUnitType &type,
    memtable::ObIMultiSourceDataUnit *msd)
{
  int ret = OB_SUCCESS;
  ObITable *table = nullptr;
  memtable::ObMemtable *memtable = nullptr;

  if (!is_inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("tablet table backfill tx task do not init", K(ret));
  } else if (OB_ISNULL(msd)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", K(ret));
  } else if (!table_handle_.is_valid()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("tablet handle is invalid", K(ret), K(tablet_merge_ctx_), K(table_handle_));
  } else {
    // The handle is valid; fetch the table only now, as the original chain did.
    table = table_handle_.get_table();
    if (OB_ISNULL(table)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("table should not be NULL or table type is unexpected", K(ret), K(tablet_merge_ctx_));
    } else if (table->is_data_memtable()) {
      // TODO(lixia) delete this code
      // memtable = static_cast<memtable::ObMemtable*>(table);
      // if (!memtable->is_frozen_memtable()) {
      //   ret = OB_ERR_UNEXPECTED;
      //   LOG_WARN("memtable should be frozen memtable", K(ret), KPC(memtable));
      // } else if (memtable->has_multi_source_data_unit(type)) {
      //   if (OB_FAIL(memtable->get_multi_source_data_unit(msd, nullptr/*allocator*/))) {
      //     LOG_WARN("failed to get msd from memtable", K(ret), K(type));
      //   }
      // } else {
      //   LOG_INFO("memtable do not has multi source data unit", KPC(memtable), K(type));
      // }
    }
  }
  return ret;
}
/******************ObFinishTabletBackfillTXTask*********************/
ObFinishTabletBackfillTXTask::ObFinishTabletBackfillTXTask()
: ObITask(TASK_TYPE_MIGRATE_PREPARE),

View File

@ -149,10 +149,6 @@ private:
int do_backfill_tx_();
int prepare_partition_merge_();
int update_merge_sstable_();
int read_msd_from_memtable_(ObUpdateTableStoreParam &param);
int traverse_frozen_memtable_(
const memtable::MultiSourceDataUnitType &type,
memtable::ObIMultiSourceDataUnit *msd);
private:
bool is_inited_;

View File

@ -352,9 +352,6 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
allow_duplicate_sstable_(false),
need_check_transfer_seq_(false),
transfer_seq_(-1),
tx_data_(),
binding_info_(),
autoinc_seq_(),
merge_type_(MERGE_TYPE_MAX)
{
clog_checkpoint_scn_.set_min();
@ -386,9 +383,6 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
allow_duplicate_sstable_(allow_duplicate_sstable),
need_check_transfer_seq_(need_check_transfer_seq),
transfer_seq_(transfer_seq),
tx_data_(),
binding_info_(),
autoinc_seq_(),
merge_type_(merge_type)
{
clog_checkpoint_scn_ = clog_checkpoint_scn;
@ -416,9 +410,6 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
allow_duplicate_sstable_(false),
need_check_transfer_seq_(false),
transfer_seq_(-1),
tx_data_(),
binding_info_(),
autoinc_seq_(),
merge_type_(merge_type)
{
clog_checkpoint_scn_.set_min();

View File

@ -374,7 +374,7 @@ struct ObUpdateTableStoreParam
TO_STRING_KV(KP_(sstable), K_(snapshot_version), K_(clog_checkpoint_scn), K_(multi_version_start),
K_(need_report), KPC_(storage_schema), K_(rebuild_seq), K_(update_with_major_flag),
K_(need_check_sstable), K_(ddl_info), K_(allow_duplicate_sstable),
K_(tx_data), K_(binding_info), K_(autoinc_seq), "merge_type", merge_type_to_str(merge_type_),
"merge_type", merge_type_to_str(merge_type_),
K_(need_check_transfer_seq), K_(transfer_seq));
const blocksstable::ObSSTable *sstable_;
@ -390,11 +390,6 @@ struct ObUpdateTableStoreParam
bool allow_duplicate_sstable_;
bool need_check_transfer_seq_;
int64_t transfer_seq_;
// msd
ObTabletTxMultiSourceDataUnit tx_data_;
ObTabletBindingInfo binding_info_;
share::ObTabletAutoincSeq autoinc_seq_;
ObMergeType merge_type_; // set merge_type only when update tablet in compaction
};

View File

@ -39,9 +39,11 @@ cache_wash_threshold
clog_sync_time_warn_threshold
cluster
cluster_id
compaction_dag_cnt_limit
compaction_high_thread_score
compaction_low_thread_score
compaction_mid_thread_score
compaction_schedule_tablet_batch_cnt
compatible
config_additional_dir
connection_control_failed_connections_threshold

View File

@ -31,11 +31,10 @@ namespace unittest
class MockObCompactionScheduleIterator : public storage::ObCompactionScheduleIterator
{
public:
MockObCompactionScheduleIterator(const int64_t max_batch_tablet_cnt)
MockObCompactionScheduleIterator()
: ObCompactionScheduleIterator(
true/*is_major, no meaning*/,
ObLSGetMod::STORAGE_MOD,
max_batch_tablet_cnt),
ObLSGetMod::STORAGE_MOD),
mock_tablet_id_cnt_(0),
error_tablet_idx_(-1),
errno_(OB_SUCCESS)
@ -89,11 +88,12 @@ void TestCompactionIter::test_iter(
const int64_t error_tablet_idx,
const int input_errno)
{
MockObCompactionScheduleIterator iter(max_batch_tablet_cnt);
MockObCompactionScheduleIterator iter;
iter.prepare_ls_id_array(ls_cnt);
iter.mock_tablet_id_cnt_ = tablet_cnt_per_ls;
iter.error_tablet_idx_ = error_tablet_idx;
iter.errno_ = input_errno;
iter.max_batch_tablet_cnt_ = max_batch_tablet_cnt;
int ret = OB_SUCCESS;
int iter_cnt = 0;