fix parallel mini minor merge
This commit is contained in:
@ -53,8 +53,6 @@ ObPartitionMergePolicy::GetMergeTables ObPartitionMergePolicy::get_merge_tables[
|
||||
ObPartitionMergePolicy::get_medium_merge_tables,
|
||||
};
|
||||
|
||||
const int64_t ObPartitionMergePolicy::OB_MINOR_PARALLEL_SSTABLE_CNT_IN_DAG;
|
||||
const int64_t ObPartitionMergePolicy::OB_MINOR_PARALLEL_SSTABLE_CNT_TRIGGER;
|
||||
|
||||
int ObPartitionMergePolicy::get_neighbour_freeze_info(
|
||||
const int64_t snapshot_version,
|
||||
@ -996,23 +994,97 @@ int add_table_with_check(ObGetMergeTablesResult &result, ObITable *table)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartitionMergePolicy::push_result_with_merge(
|
||||
const int64_t minor_trigger,
|
||||
ObGetMergeTablesResult &input_result,
|
||||
ObIArray<ObGetMergeTablesResult> ¶llel_result)
|
||||
int ObPartitionMergePolicy::generate_input_result_array(
|
||||
const ObGetMergeTablesResult &input_result,
|
||||
ObMinorExecuteRangeMgr &minor_range_mgr,
|
||||
int64_t &fixed_input_table_cnt,
|
||||
ObIArray<ObGetMergeTablesResult> &input_result_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (input_result.handle_.get_count() > minor_trigger) {
|
||||
if (OB_FAIL(parallel_result.push_back(input_result))) {
|
||||
LOG_WARN("failed to push back table result", K(ret), K(input_result));
|
||||
fixed_input_table_cnt = 0;
|
||||
input_result_array.reset();
|
||||
ObGetMergeTablesResult tmp_result;
|
||||
|
||||
if (minor_range_mgr.exe_range_array_.empty()) {
|
||||
if (OB_FAIL(input_result_array.push_back(input_result))) {
|
||||
LOG_WARN("failed to add input result", K(ret), K(input_result));
|
||||
} else {
|
||||
fixed_input_table_cnt += input_result.handle_.get_count();
|
||||
}
|
||||
} else if (!parallel_result.empty()){
|
||||
ObGetMergeTablesResult &last_result = parallel_result.at(parallel_result.count() - 1);
|
||||
if (last_result.scn_range_.end_scn_ == input_result.scn_range_.start_scn_) {
|
||||
for (int i = 0; OB_SUCC(ret) && i < input_result.handle_.get_count(); ++i) {
|
||||
if (OB_FAIL(add_table_with_check(last_result, input_result.handle_.get_table(i)))) {
|
||||
LOG_WARN("failed to add table into result", K(ret), K(input_result), K(i));
|
||||
} else if (OB_FAIL(tmp_result.copy_basic_info(input_result))) {
|
||||
LOG_WARN("failed to copy basic info", K(ret), K(input_result));
|
||||
} else {
|
||||
const ObIArray<ObITable *> &table_array = input_result.handle_.get_tables();
|
||||
ObITable *table = nullptr;
|
||||
for (int64_t idx = 0; OB_SUCC(ret) && idx < table_array.count(); ++idx) {
|
||||
if (OB_ISNULL(table = table_array.at(idx))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table is unexpected null", K(ret), K(idx), K(table_array));
|
||||
} else if (minor_range_mgr.in_execute_range(table)) {
|
||||
if (tmp_result.handle_.get_count() < 2) {
|
||||
} else if (OB_FAIL(input_result_array.push_back(tmp_result))) {
|
||||
LOG_WARN("failed to add tmp result", K(ret), K(tmp_result));
|
||||
} else {
|
||||
fixed_input_table_cnt += tmp_result.handle_.get_count();
|
||||
}
|
||||
tmp_result.handle_.reset();
|
||||
tmp_result.scn_range_.reset();
|
||||
} else if (OB_FAIL(add_table_with_check(tmp_result, table))) {
|
||||
LOG_WARN("failed to add table into result", K(ret), K(tmp_result), KPC(table));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret) || tmp_result.handle_.get_count() < 2) {
|
||||
} else if (OB_FAIL(input_result_array.push_back(tmp_result))) {
|
||||
LOG_WARN("failed to add tmp result", K(ret), K(tmp_result));
|
||||
} else {
|
||||
fixed_input_table_cnt += tmp_result.handle_.get_count();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartitionMergePolicy::split_parallel_minor_range(
|
||||
const int64_t table_count_threshold,
|
||||
const ObGetMergeTablesResult &input_result,
|
||||
ObIArray<ObGetMergeTablesResult> ¶llel_result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t input_table_cnt = input_result.handle_.get_count();
|
||||
ObGetMergeTablesResult tmp_result;
|
||||
if (input_table_cnt < table_count_threshold) {
|
||||
// if there are no running minor dags, then the input_table_cnt must be greater than threshold.
|
||||
} else if (input_table_cnt < OB_MINOR_PARALLEL_SSTABLE_CNT_TRIGGER) {
|
||||
if (OB_FAIL(parallel_result.push_back(input_result))) {
|
||||
LOG_WARN("failed to add input result", K(ret), K(input_result));
|
||||
}
|
||||
} else if (OB_FAIL(tmp_result.copy_basic_info(input_result))) {
|
||||
LOG_WARN("failed to copy basic info", K(ret), K(input_result));
|
||||
} else {
|
||||
const int64_t parallel_dag_cnt = MAX(1, input_table_cnt / OB_MINOR_PARALLEL_SSTABLE_CNT_IN_DAG);
|
||||
const int64_t parallel_table_cnt = input_table_cnt / parallel_dag_cnt;
|
||||
const ObIArray<ObITable *> &table_array = input_result.handle_.get_tables();
|
||||
ObITable *table = nullptr;
|
||||
|
||||
int64_t start = 0;
|
||||
int64_t end = 0;
|
||||
for (int64_t seq = 0; OB_SUCC(ret) && seq < parallel_dag_cnt; ++seq) {
|
||||
start = parallel_table_cnt * seq;
|
||||
end = (parallel_dag_cnt - 1 == seq) ? table_array.count() : end + parallel_table_cnt;
|
||||
for (int64_t i = start; OB_SUCC(ret) && i < end; ++i) {
|
||||
if (OB_ISNULL(table = table_array.at(i))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table is unexpected null", K(ret), K(i), K(table_array));
|
||||
} else if (OB_FAIL(add_table_with_check(tmp_result, table))) {
|
||||
LOG_WARN("failed to add table into result", K(ret), K(tmp_result), KPC(table));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(parallel_result.push_back(tmp_result))) {
|
||||
LOG_WARN("failed to add tmp result", K(ret), K(tmp_result));
|
||||
} else {
|
||||
tmp_result.handle_.reset();
|
||||
tmp_result.scn_range_.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1020,64 +1092,39 @@ int ObPartitionMergePolicy::push_result_with_merge(
|
||||
}
|
||||
|
||||
int ObPartitionMergePolicy::generate_parallel_minor_interval(
|
||||
const int64_t minor_compact_trigger,
|
||||
const ObGetMergeTablesResult &input_result,
|
||||
ObMinorExecuteRangeMgr &minor_range_mgr,
|
||||
ObIArray<ObGetMergeTablesResult> ¶llel_result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t minor_compact_trigger = DEFAULT_MINOR_COMPACT_TRIGGER;
|
||||
{
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
|
||||
if (tenant_config.is_valid()) {
|
||||
minor_compact_trigger = tenant_config->minor_compact_trigger;
|
||||
}
|
||||
}
|
||||
const bool check_in_range = minor_range_mgr.exe_range_array_.count() > 0;
|
||||
minor_compact_trigger = check_in_range ? minor_compact_trigger << minor_range_mgr.exe_range_array_.count() : minor_compact_trigger;
|
||||
parallel_result.reset();
|
||||
ObSEArray<ObGetMergeTablesResult, 2> input_result_array;
|
||||
int64_t fixed_input_table_cnt = 0;
|
||||
|
||||
if (!storage::is_minor_merge(input_result.suggest_merge_type_)) {
|
||||
} else if (input_result.handle_.get_count() <= minor_compact_trigger) {
|
||||
// do nothing
|
||||
} else if (!check_in_range
|
||||
&& input_result.handle_.get_count() < OB_MINOR_PARALLEL_SSTABLE_CNT_TRIGGER) {
|
||||
if (OB_FAIL(parallel_result.push_back(input_result))) {
|
||||
LOG_WARN("failed to push back result", K(ret), K(input_result));
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (input_result.handle_.get_count() < minor_compact_trigger) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (OB_FAIL(generate_input_result_array(input_result, minor_range_mgr, fixed_input_table_cnt, input_result_array))) {
|
||||
LOG_WARN("failed to generate input result into array", K(ret), K(input_result));
|
||||
} else if (fixed_input_table_cnt < minor_compact_trigger) {
|
||||
// the quantity of table that should be merged is smaller than trigger, wait for the existing minor tasks to finish.
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
}
|
||||
|
||||
/*
|
||||
* When existing minor dag, we should ensure that the quantity of tables per parallel dag is a reasonable value:
|
||||
* 1. If compact_trigger is small, minor merge should be easier to schedule, we should lower the threshold;
|
||||
* 2. If compact_trigger is big, we should upper the threshold to prevent the creation of dag frequently.
|
||||
*/
|
||||
int64_t table_count_threshold = minor_range_mgr.exe_range_array_.empty()
|
||||
? minor_compact_trigger
|
||||
: MIN(OB_MINOR_PARALLEL_SSTABLE_CNT_IN_DAG / 2, minor_compact_trigger * 2);
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < input_result_array.count(); ++i) {
|
||||
if (OB_FAIL(split_parallel_minor_range(table_count_threshold, input_result_array.at(i), parallel_result))) {
|
||||
LOG_WARN("failed to split parallel minor range", K(ret), K(input_result_array.at(i)));
|
||||
}
|
||||
} else {
|
||||
minor_compact_trigger = MAX(minor_compact_trigger, OB_MINOR_PARALLEL_SSTABLE_CNT_IN_DAG / 2);
|
||||
const int64_t table_cnt = input_result.handle_.get_count();
|
||||
const ObIArray<ObITable *> &table_array = input_result.handle_.get_tables();
|
||||
ObITable *table = nullptr;
|
||||
ObGetMergeTablesResult tmp_result;
|
||||
if (OB_FAIL(tmp_result.copy_basic_info(input_result))) {
|
||||
LOG_WARN("failed to copy basic info", K(ret), K(input_result));
|
||||
}
|
||||
int64_t idx = 0;
|
||||
bool split_minor = input_result.handle_.get_count() >= OB_MINOR_PARALLEL_SSTABLE_CNT_TRIGGER;
|
||||
while (OB_SUCC(ret) && idx < table_cnt) {
|
||||
tmp_result.handle_.reset();
|
||||
tmp_result.scn_range_.reset();
|
||||
while (OB_SUCC(ret) && idx < table_cnt) {
|
||||
if (OB_ISNULL(table = table_array.at(idx++))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table is unexpected null", K(ret), K(idx), K(table_array));
|
||||
} else if (check_in_range && minor_range_mgr.in_execute_range(table)) {
|
||||
if (tmp_result.handle_.get_count() > 0) {
|
||||
break;
|
||||
}
|
||||
} else if (OB_FAIL(add_table_with_check(tmp_result, table))) {
|
||||
LOG_WARN("failed to add table into result", K(ret), K(tmp_result), KPC(table));
|
||||
} else if (split_minor && tmp_result.handle_.get_count() >= OB_MINOR_PARALLEL_SSTABLE_CNT_IN_DAG) {
|
||||
break;
|
||||
}
|
||||
} // end of while
|
||||
if (OB_FAIL(ret) || tmp_result.handle_.empty()) {
|
||||
} else if (OB_FAIL(push_result_with_merge(minor_compact_trigger, tmp_result, parallel_result))) {
|
||||
LOG_WARN("failed to merge result", K(ret), K(parallel_result));
|
||||
} else {
|
||||
LOG_DEBUG("success to push result", K(ret), K(tmp_result), K(parallel_result));
|
||||
}
|
||||
} // end of while
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -78,6 +78,7 @@ public:
|
||||
bool &can_merge,
|
||||
bool &need_force_freeze);
|
||||
static int generate_parallel_minor_interval(
|
||||
const int64_t minor_compact_trigger,
|
||||
const ObGetMergeTablesResult &input_result,
|
||||
ObMinorExecuteRangeMgr &minor_range_mgr,
|
||||
ObIArray<ObGetMergeTablesResult> ¶llel_result);
|
||||
@ -141,15 +142,21 @@ private:
|
||||
storage::ObTenantFreezeInfoMgr::NeighbourFreezeInfo &freeze_info);
|
||||
|
||||
static int64_t cal_hist_minor_merge_threshold();
|
||||
static int generate_input_result_array(
|
||||
const ObGetMergeTablesResult &input_result,
|
||||
ObMinorExecuteRangeMgr &minor_range_mgr,
|
||||
int64_t &fixed_input_table_cnt,
|
||||
common::ObIArray<ObGetMergeTablesResult> &input_result_array);
|
||||
|
||||
static int split_parallel_minor_range(
|
||||
const int64_t table_count_threshold,
|
||||
const ObGetMergeTablesResult &input_result,
|
||||
common::ObIArray<ObGetMergeTablesResult> ¶llel_result);
|
||||
|
||||
static int deal_hist_minor_merge(
|
||||
const ObTablet &tablet,
|
||||
int64_t &max_snapshot_version);
|
||||
|
||||
static int push_result_with_merge(
|
||||
const int64_t minor_trigger,
|
||||
ObGetMergeTablesResult &input_result,
|
||||
ObIArray<ObGetMergeTablesResult> ¶llel_result);
|
||||
// diagnose part
|
||||
static int diagnose_minor_dag(
|
||||
storage::ObMergeType merge_type,
|
||||
|
||||
@ -708,12 +708,28 @@ int ObTenantTabletScheduler::schedule_tablet_minor_merge(
|
||||
LOG_WARN("failed to check need merge", K(ret), "merge_type", MERGE_TYPES[i], K(tablet_handle));
|
||||
}
|
||||
} else {
|
||||
int64_t minor_compact_trigger = ObPartitionMergePolicy::DEFAULT_MINOR_COMPACT_TRIGGER;
|
||||
{
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
|
||||
if (tenant_config.is_valid()) {
|
||||
minor_compact_trigger = tenant_config->minor_compact_trigger;
|
||||
}
|
||||
}
|
||||
|
||||
ObMinorExecuteRangeMgr minor_range_mgr;
|
||||
MinorParallelResultArray parallel_results;
|
||||
if (OB_FAIL(minor_range_mgr.get_merge_ranges(ls_id, tablet_id))) {
|
||||
LOG_WARN("failed to get merge range", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_FAIL(ObPartitionMergePolicy::generate_parallel_minor_interval(result, minor_range_mgr, parallel_results))) {
|
||||
LOG_WARN("failed to generate parallel minor dag", K(ret), K(result));
|
||||
} else if (OB_FAIL(ObPartitionMergePolicy::generate_parallel_minor_interval(minor_compact_trigger, result, minor_range_mgr, parallel_results))) {
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to generate parallel minor dag", K(ret), K(result));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
LOG_DEBUG("tablet no need merge", K(ret), "merge_type", MERGE_TYPES[i], K(ls_id), K(tablet_id), K(result));
|
||||
}
|
||||
} else if (parallel_results.empty()) {
|
||||
LOG_DEBUG("parallel results is empty, cannot schedule parallel minor merge", K(ls_id), K(tablet_id),
|
||||
K(result), K(minor_range_mgr.exe_range_array_));
|
||||
} else {
|
||||
ObTabletMergeDagParam dag_param(MERGE_TYPES[i], ls_id, tablet_id);
|
||||
for (int k = 0; OB_SUCC(ret) && k < parallel_results.count(); ++k) {
|
||||
|
||||
Reference in New Issue
Block a user