fix meta merge && optimize meta merge scheduling

This commit is contained in:
Fengjingkun
2023-11-10 03:44:41 +00:00
committed by ob-robot
parent df342f05ff
commit 751a0c5876
24 changed files with 421 additions and 323 deletions

View File

@ -1380,72 +1380,65 @@ int ObAdaptiveMergePolicy::get_meta_merge_tables(
const ObMergeType merge_type = param.merge_type_;
result.reset();
if (OB_UNLIKELY(META_MAJOR_MERGE != merge_type)) {
if (OB_UNLIKELY(META_MAJOR_MERGE != merge_type && MEDIUM_MERGE != merge_type)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), "merge_type", merge_type_to_str(merge_type));
} else if (OB_FAIL(find_meta_major_tables(tablet, result))) {
} else if (OB_FAIL(find_adaptive_merge_tables(merge_type, tablet, result))) {
if (OB_NO_NEED_MERGE != ret) {
LOG_WARN("Failed to find minor merge tables", K(ret));
}
} else if (OB_FAIL(result.handle_.check_continues(nullptr))) {
LOG_WARN("failed to check continues", K(ret), K(result));
} else if (FALSE_IT(result.version_range_.snapshot_version_ =
MIN(tablet.get_snapshot_version(), result.version_range_.snapshot_version_))) {
// chosen version should less than tablet::snapshot
} else if (OB_FAIL(ObPartitionMergePolicy::get_multi_version_start(
param.merge_type_, ls, tablet, result.version_range_, result.snapshot_info_))) {
LOG_WARN("failed to get multi version_start", K(ret));
} else {
FLOG_INFO("succeed to get meta major merge tables", K(result), K(tablet));
} else if (MEDIUM_MERGE == merge_type) {
result.version_range_.snapshot_version_ = MIN(tablet.get_snapshot_version(), result.version_range_.snapshot_version_);
if (OB_FAIL(ObPartitionMergePolicy::get_multi_version_start(
merge_type, ls, tablet, result.version_range_, result.snapshot_info_))) {
LOG_WARN("failed to get multi version_start", K(ret));
}
}
if (OB_SUCC(ret)) {
FLOG_INFO("succeed to get meta major merge tables", K(merge_type), K(result), K(tablet));
}
return ret;
}
int ObAdaptiveMergePolicy::find_meta_major_tables(
int ObAdaptiveMergePolicy::find_adaptive_merge_tables(
const ObMergeType &merge_type,
const storage::ObTablet &tablet,
ObGetMergeTablesResult &result)
{
int ret = OB_SUCCESS;
int64_t min_snapshot = 0;
int64_t max_snapshot = 0;
int64_t base_row_cnt = 0;
int64_t inc_row_cnt = 0;
int64_t tx_determ_table_cnt = 0;
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
const ObTabletTableStore *table_store = nullptr;
ObSSTable *base_table = nullptr;
if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) {
LOG_WARN("fail to fetch table store", K(ret));
} else if (OB_UNLIKELY(NULL == (table_store = table_store_wrapper.get_member()) || !table_store->is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ObTabletTableStore is not valid", K(ret), K(table_store_wrapper));
} else if (table_store->get_minor_sstables().empty() || table_store->get_major_sstables().empty()) {
ret = OB_NO_NEED_MERGE;
LOG_DEBUG("no minor/major sstable to do meta major merge", K(ret), KPC(table_store));
} else if (OB_ISNULL(base_table = nullptr == table_store->get_meta_major_sstable()
? static_cast<ObSSTable*>(table_store->get_major_sstables().get_boundary_table(true/*last*/))
: table_store->get_meta_major_sstable())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null base table", K(ret), KPC(table_store), K(tablet));
} else if (OB_FAIL(ObPartitionMergePolicy::get_boundary_snapshot_version(tablet, min_snapshot, max_snapshot))) {
LOG_WARN("failed to get boundary snapshot version", K(ret), KPC(base_table), K(min_snapshot), K(max_snapshot));
} else if (base_table->get_snapshot_version() < min_snapshot || max_snapshot != INT64_MAX /*exist next freeze info*/) {
ret = OB_NO_NEED_MERGE;
LOG_DEBUG("no need meta merge when the tablet is doing major merge", K(ret), K(min_snapshot), K(max_snapshot), KPC(base_table));
} else if (OB_FAIL(add_meta_merge_result(base_table, table_store_wrapper.get_meta_handle(), result, true))) {
LOG_WARN("failed to add base table to meta merge result", K(ret), KPC(base_table), K(result));
} else {
ObITable *last_major = table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true);
ObITable *base_table = table_store_wrapper.get_member()->get_meta_major_sstable();
const ObSSTableArray &minor_tables = table_store_wrapper.get_member()->get_minor_sstables();
if (minor_tables.empty() || nullptr == last_major) {
ret = OB_NO_NEED_MERGE;
LOG_DEBUG("no minor/major sstable to do meta major merge", K(ret), K(minor_tables), KPC(last_major));
} else if (FALSE_IT(base_table = nullptr == table_store->get_meta_major_sstable()
? last_major
: table_store->get_meta_major_sstable())) {
} else if (OB_FAIL(ObPartitionMergePolicy::get_boundary_snapshot_version(
tablet, min_snapshot, max_snapshot, false/*check_table_cnt*/, false/*is_multi_version_merge*/))) {
if (OB_NO_NEED_MERGE != ret) {
LOG_WARN("Failed to find meta merge base table", K(ret), KPC(last_major), KPC(base_table));
}
} else if (base_table->get_snapshot_version() < min_snapshot || max_snapshot != INT64_MAX) {
// max_snapshot == INT64_MAX means there's no next freeze_info
ret = OB_NO_NEED_MERGE;
LOG_DEBUG("no need meta merge when the tablet is doing major merge", K(ret), K(min_snapshot), K(max_snapshot), KPC(base_table));
} else if (OB_FAIL(add_meta_merge_result(base_table, table_store_wrapper.get_meta_handle(), result, true/*update_snapshot*/))) {
LOG_WARN("failed to add base table to meta merge result", K(ret), KPC(base_table), K(result));
} else {
base_row_cnt = static_cast<ObSSTable *>(base_table)->get_row_count();
++tx_determ_table_cnt; // inc for base_table
}
int64_t tx_determ_table_cnt = 1;
int64_t inc_row_cnt = 0;
const ObSSTableArray &minor_tables = table_store->get_minor_sstables();
bool found_undeterm_table = false;
for (int64_t i = 0; OB_SUCC(ret) && i < minor_tables.count(); ++i) {
ObITable *table = minor_tables[i];
@ -1454,39 +1447,53 @@ int ObAdaptiveMergePolicy::find_meta_major_tables(
LOG_WARN("get unexpected table", K(ret), K(i), K(PRINT_TS_WRAPPER(table_store_wrapper)));
} else if (result.handle_.get_count() <= 1 && table->get_upper_trans_version() <= base_table->get_snapshot_version()) {
continue; // skip minor sstable which has been merged
} else if (!found_undeterm_table && table->is_trans_state_deterministic()) {
++tx_determ_table_cnt;
ObSSTableMetaHandle inc_handle;
if (OB_FAIL(static_cast<ObSSTable *>(table)->get_meta(inc_handle))) {
LOG_WARN("failed to inc table meta", K(ret), KPC(table));
} else if (!table->is_trans_state_deterministic()) {
if (is_meta_major_merge(merge_type)) {
break;
} else {
inc_row_cnt += inc_handle.get_sstable_meta().get_row_count();
found_undeterm_table = true;
}
} else {
found_undeterm_table = true;
} else if (!found_undeterm_table) {
++tx_determ_table_cnt;
inc_row_cnt += static_cast<ObSSTable *>(table)->get_row_count();
}
if (FAILEDx(add_meta_merge_result(
table, table_store_wrapper.get_meta_handle(), result, !found_undeterm_table))) {
if (FAILEDx(add_meta_merge_result(table, table_store_wrapper.get_meta_handle(), result, !found_undeterm_table))) {
LOG_WARN("failed to add minor table to meta merge result", K(ret));
}
}
} // end for
bool scanty_tx_determ_table = tx_determ_table_cnt < 2;
bool scanty_inc_row_cnt = inc_row_cnt < TRANS_STATE_DETERM_ROW_CNT_THRESHOLD
|| inc_row_cnt < INC_ROW_COUNT_PERCENTAGE_THRESHOLD * base_table->get_row_count();
#ifdef ERRSIM
#define META_POLICY_ERRSIM(tracepoint) \
do { \
if (OB_SUCC(ret)) { \
ret = OB_E((EventTable::tracepoint)) OB_SUCCESS; \
if (OB_FAIL(ret)) { \
ret = OB_SUCCESS; \
STORAGE_LOG(INFO, "ERRSIM " #tracepoint); \
scanty_tx_determ_table = false; \
scanty_inc_row_cnt = false; \
} \
} \
} while(0);
META_POLICY_ERRSIM(EN_COMPACTION_SCHEDULE_META_MERGE);
#undef META_POLICY_ERRSIM
#endif
if (OB_FAIL(ret)) {
} else if (tx_determ_table_cnt < 2) {
} else if (scanty_tx_determ_table || scanty_inc_row_cnt) {
ret = OB_NO_NEED_MERGE;
if (REACH_TENANT_TIME_INTERVAL(60 * 1000 * 1000/*60s*/)) {
LOG_INFO("no enough table for meta merge", K(ret), K(result), K(PRINT_TS_WRAPPER(table_store_wrapper)));
}
} else if (inc_row_cnt < TRANS_STATE_DETERM_ROW_CNT_THRESHOLD
|| inc_row_cnt < INC_ROW_COUNT_PERCENTAGE_THRESHOLD * base_row_cnt) {
ret = OB_NO_NEED_MERGE;
if (REACH_TENANT_TIME_INTERVAL(60 * 1000 * 1000/*60s*/)) {
LOG_INFO("found sstable could merge is not enough", K(ret), K(inc_row_cnt), K(base_row_cnt));
if (REACH_TENANT_TIME_INTERVAL(30_s)) {
LOG_INFO("no enough table or no enough rows for meta merge", K(ret),
K(scanty_tx_determ_table), K(scanty_inc_row_cnt), K(result), K(PRINT_TS_WRAPPER(table_store_wrapper)));
}
} else if (result.version_range_.snapshot_version_ < tablet.get_multi_version_start()) {
ret = OB_NO_NEED_MERGE;
if (REACH_TENANT_TIME_INTERVAL(60 * 1000 * 1000/*60s*/)) {
if (REACH_TENANT_TIME_INTERVAL(30_s)) {
LOG_INFO("chosen snapshot is abandoned", K(ret), K(result), K(tablet.get_multi_version_start()));
}
}
@ -1508,40 +1515,6 @@ int ObAdaptiveMergePolicy::find_meta_major_tables(
return ret;
}
int ObAdaptiveMergePolicy::find_base_table_and_inc_version(
ObITable *last_major_table,
ObITable *last_minor_table,
ObITable *&meta_base_table,
int64_t &merge_inc_version)
{
int ret = OB_SUCCESS;
// find meta base table
if (OB_NOT_NULL(last_major_table)) {
if (OB_ISNULL(meta_base_table)) {
meta_base_table = last_major_table;
} else if (OB_UNLIKELY(meta_base_table->get_snapshot_version() <= last_major_table->get_snapshot_version())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("meta major table covered by major", K(ret), KPC(meta_base_table), KPC(last_major_table));
}
}
// find meta merge inc version
if (OB_FAIL(ret)) {
} else if (OB_NOT_NULL(last_major_table) && OB_NOT_NULL(last_minor_table)) {
merge_inc_version = MAX(last_major_table->get_snapshot_version(), last_minor_table->get_max_merged_trans_version());
} else if (OB_NOT_NULL(last_major_table)) {
merge_inc_version = last_major_table->get_snapshot_version();
} else if (OB_NOT_NULL(last_minor_table)){
merge_inc_version = last_minor_table->get_max_merged_trans_version();
}
if (OB_SUCC(ret) && (NULL == meta_base_table || merge_inc_version <= 0)) {
ret = OB_NO_NEED_MERGE;
LOG_WARN("cannot meta merge with null base table or inc version", K(ret), K(meta_base_table), K(merge_inc_version));
}
return ret;
}
int ObAdaptiveMergePolicy::add_meta_merge_result(
ObITable *table,
const ObStorageMetaHandle &table_meta_handle,
@ -1555,7 +1528,7 @@ int ObAdaptiveMergePolicy::add_meta_merge_result(
LOG_WARN("get invalid argument", K(ret), KPC(table));
} else if (OB_FAIL(result.handle_.add_sstable(table, table_meta_handle))) {
LOG_WARN("failed to add table", K(ret), KPC(table));
} else if (table->is_meta_major_sstable() || table->is_major_sstable()) {
} else if (table->is_major_sstable()) {
result.version_range_.base_version_ = 0;
result.version_range_.multi_version_start_ = table->get_snapshot_version();
result.version_range_.snapshot_version_ = table->get_snapshot_version();