fix 4090 problem when scheduling medium compaction

This commit is contained in:
Fengjingkun
2023-08-03 07:42:58 +00:00
committed by ob-robot
parent 09e015fb7b
commit d320c6210c
2 changed files with 41 additions and 56 deletions

View File

@ -1342,81 +1342,66 @@ int ObAdaptiveMergePolicy::find_meta_major_tables(
int64_t tx_determ_table_cnt = 0; int64_t tx_determ_table_cnt = 0;
ObSSTableMetaHandle meta_handle; ObSSTableMetaHandle meta_handle;
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper; ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
const ObTabletTableStore *table_store = nullptr;
if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) { if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) {
LOG_WARN("fail to fetch table store", K(ret)); LOG_WARN("fail to fetch table store", K(ret));
} else if (!table_store_wrapper.get_member()->is_valid()) { } else if (OB_UNLIKELY(NULL == (table_store = table_store_wrapper.get_member()) || !table_store->is_valid())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("ObTabletTableStore is not valid", K(ret), K(table_store_wrapper)); LOG_WARN("ObTabletTableStore is not valid", K(ret), K(table_store_wrapper));
} else { } else {
ObITable *last_major = table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true); ObITable *last_major = table_store->get_major_sstables().get_boundary_table(true);
ObITable *last_minor = table_store_wrapper.get_member()->get_minor_sstables().get_boundary_table(true); ObITable *base_table = nullptr;
ObITable *base_table = table_store_wrapper.get_member()->get_meta_major_sstable(); const ObSSTableArray &minor_tables = table_store->get_minor_sstables();
const ObSSTableArray &minor_tables = table_store_wrapper.get_member()->get_minor_sstables(); if (minor_tables.empty() || nullptr == last_major) {
if (nullptr == last_minor || nullptr == last_major) {
ret = OB_NO_NEED_MERGE; ret = OB_NO_NEED_MERGE;
LOG_WARN("no minor/major sstable to do meta major merge", K(ret), KPC(last_minor), KPC(last_major)); LOG_WARN("no minor/major sstable to do meta major merge", K(ret), K(minor_tables), KPC(last_major));
} else if (FALSE_IT(base_table = nullptr == table_store->get_meta_major_sstable()
? last_major
: table_store->get_meta_major_sstable())) {
} else if (OB_FAIL(ObPartitionMergePolicy::get_boundary_snapshot_version( } else if (OB_FAIL(ObPartitionMergePolicy::get_boundary_snapshot_version(
tablet, min_snapshot, max_snapshot, tablet, min_snapshot, max_snapshot, false/*check_table_cnt*/, false/*is_multi_version_merge*/))) {
false/*check_table_cnt*/, false/*is_multi_version_merge*/))) {
if (OB_NO_NEED_MERGE != ret) { if (OB_NO_NEED_MERGE != ret) {
LOG_WARN("Failed to find meta merge base table", K(ret), KPC(last_major), KPC(last_major), KPC(base_table)); LOG_WARN("Failed to find meta merge base table", K(ret), KPC(last_major), KPC(base_table));
} }
} else if (FALSE_IT(base_table = nullptr == base_table ? last_major : base_table)) {
} else if (OB_ISNULL(base_table)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("base table is unexpected null", K(ret), KP(base_table));
} else if (base_table->get_snapshot_version() < min_snapshot || max_snapshot != INT64_MAX) { } else if (base_table->get_snapshot_version() < min_snapshot || max_snapshot != INT64_MAX) {
// max_snapshot == INT64_MAX means there's no next freeze_info // max_snapshot == INT64_MAX means there's no next freeze_info
ret = OB_NO_NEED_MERGE; ret = OB_NO_NEED_MERGE;
LOG_DEBUG("no need meta merge when the tablet is doing major merge", K(ret), K(min_snapshot), K(max_snapshot), KPC(base_table)); LOG_DEBUG("no need meta merge when the tablet is doing major merge", K(ret), K(min_snapshot), K(max_snapshot), KPC(base_table));
} else if (OB_FAIL(add_meta_merge_result( } else if (OB_FAIL(add_meta_merge_result(base_table, table_store_wrapper.get_meta_handle(), result, true/*update_snapshot*/))) {
base_table, table_store_wrapper.get_meta_handle(), result, true/*update_snapshot*/))) {
LOG_WARN("failed to add base table to meta merge result", K(ret), KPC(base_table), K(result)); LOG_WARN("failed to add base table to meta merge result", K(ret), KPC(base_table), K(result));
} else if (OB_FAIL(static_cast<ObSSTable *>(base_table)->get_meta(meta_handle))) { } else if (OB_FAIL(static_cast<ObSSTable *>(base_table)->get_meta(meta_handle))) {
LOG_WARN("failed to base table meta", K(ret), KPC(base_table)); LOG_WARN("failed to base table meta", K(ret), KPC(base_table));
} else { } else {
++tx_determ_table_cnt; // inc for base_table
bool found_undeterm_table = false;
base_row_cnt = meta_handle.get_sstable_meta().get_row_count(); base_row_cnt = meta_handle.get_sstable_meta().get_row_count();
ObITable *table = nullptr; ++tx_determ_table_cnt; // inc for base_table
for (int64_t i = 0; OB_SUCC(ret) && i < minor_tables.count(); ++i) { }
if (OB_ISNULL(table = minor_tables[i]) || !table->is_multi_version_minor_sstable()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected table", K(ret), K(i), K(PRINT_TS_WRAPPER(table_store_wrapper)));
} else if (table->get_upper_trans_version() <= base_table->get_snapshot_version()) {
// skip minor sstable which has been merged
continue;
} else if (!found_undeterm_table && table->is_trans_state_deterministic()) {
++tx_determ_table_cnt;
ObSSTableMetaHandle inc_handle;
if (OB_FAIL(static_cast<ObSSTable *>(table)->get_meta(inc_handle))) {
LOG_WARN("failed to inc table meta", K(ret), KPC(table));
} else {
inc_row_cnt += inc_handle.get_sstable_meta().get_row_count();
}
} else {
found_undeterm_table = true;
}
if (FAILEDx(add_meta_merge_result( bool found_undeterm_table = false;
table, table_store_wrapper.get_meta_handle(), result, !found_undeterm_table))) { for (int64_t i = 0; OB_SUCC(ret) && i < minor_tables.count(); ++i) {
LOG_WARN("failed to add minor table to meta merge result", K(ret)); ObITable *table = minor_tables[i];
if (OB_UNLIKELY(NULL == table || !table->is_multi_version_minor_sstable())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected table", K(ret), K(i), K(PRINT_TS_WRAPPER(table_store_wrapper)));
} else if (result.handle_.get_count() <= 1 && table->get_upper_trans_version() <= base_table->get_snapshot_version()) {
continue; // skip minor sstable which has been merged
} else if (!found_undeterm_table && table->is_trans_state_deterministic()) {
++tx_determ_table_cnt;
ObSSTableMetaHandle inc_handle;
if (OB_FAIL(static_cast<ObSSTable *>(table)->get_meta(inc_handle))) {
LOG_WARN("failed to inc table meta", K(ret), KPC(table));
} else {
inc_row_cnt += inc_handle.get_sstable_meta().get_row_count();
} }
} // end of for } else {
if (OB_FAIL(ret)) { found_undeterm_table = true;
} else if (tx_determ_table_cnt < 2) {
ret = OB_NO_NEED_MERGE;
LOG_INFO("no enough table for meta merge", K(ret), K(result), K(PRINT_TS_WRAPPER(table_store_wrapper)));
} else if (inc_row_cnt < TRANS_STATE_DETERM_ROW_CNT_THRESHOLD
|| inc_row_cnt < INC_ROW_COUNT_PERCENTAGE_THRESHOLD * base_row_cnt) {
ret = OB_NO_NEED_MERGE;
LOG_INFO("found sstable could merge is not enough", K(ret), K(inc_row_cnt), K(base_row_cnt));
} else if (result.version_range_.snapshot_version_ < tablet.get_multi_version_start()) {
ret = OB_NO_NEED_MERGE;
LOG_INFO("chosen snapshot is abandoned", K(ret), K(result), K(tablet.get_multi_version_start()));
} }
} // end of for
if (FAILEDx(add_meta_merge_result(
table, table_store_wrapper.get_meta_handle(), result, !found_undeterm_table))) {
LOG_WARN("failed to add minor table to meta merge result", K(ret));
}
}
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (tx_determ_table_cnt < 2) { } else if (tx_determ_table_cnt < 2) {
@ -1425,7 +1410,7 @@ int ObAdaptiveMergePolicy::find_meta_major_tables(
LOG_INFO("no enough table for meta merge", K(ret), K(result), K(PRINT_TS_WRAPPER(table_store_wrapper))); LOG_INFO("no enough table for meta merge", K(ret), K(result), K(PRINT_TS_WRAPPER(table_store_wrapper)));
} }
} else if (inc_row_cnt < TRANS_STATE_DETERM_ROW_CNT_THRESHOLD } else if (inc_row_cnt < TRANS_STATE_DETERM_ROW_CNT_THRESHOLD
|| inc_row_cnt < INC_ROW_COUNT_PERCENTAGE_THRESHOLD * base_row_cnt) { || inc_row_cnt < INC_ROW_COUNT_PERCENTAGE_THRESHOLD * base_row_cnt) {
ret = OB_NO_NEED_MERGE; ret = OB_NO_NEED_MERGE;
if (REACH_TENANT_TIME_INTERVAL(60 * 1000 * 1000/*60s*/)) { if (REACH_TENANT_TIME_INTERVAL(60 * 1000 * 1000/*60s*/)) {
LOG_INFO("found sstable could merge is not enough", K(ret), K(inc_row_cnt), K(base_row_cnt)); LOG_INFO("found sstable could merge is not enough", K(ret), K(inc_row_cnt), K(base_row_cnt));
@ -1450,7 +1435,7 @@ int ObAdaptiveMergePolicy::find_meta_major_tables(
} }
#endif #endif
} } // else
return ret; return ret;
} }

View File

@ -361,7 +361,7 @@ private:
static constexpr int64_t CHECK_INTERVAL = 120L * 1000L * 1000L; //120s static constexpr int64_t CHECK_INTERVAL = 120L * 1000L * 1000L; //120s
static constexpr int64_t CHECK_RUNNING_TIME_INTERVAL = 120L * 1000L * 1000L; //120s static constexpr int64_t CHECK_RUNNING_TIME_INTERVAL = 120L * 1000L * 1000L; //120s
static constexpr int64_t CHECK_SYS_STAT_INTERVAL = 10 * 1000LL * 1000LL; //10s static constexpr int64_t CHECK_SYS_STAT_INTERVAL = 10 * 1000LL * 1000LL; //10s
static constexpr int32_t DEFAULT_MAX_FREE_STREAM_CNT = 10000; static constexpr int32_t DEFAULT_MAX_FREE_STREAM_CNT = 5000;
static constexpr int32_t DEFAULT_UP_LIMIT_STREAM_CNT = 20000; static constexpr int32_t DEFAULT_UP_LIMIT_STREAM_CNT = 20000;
static constexpr int32_t DEFAULT_BUCKET_NUM = 1000; static constexpr int32_t DEFAULT_BUCKET_NUM = 1000;
static constexpr int32_t DEFAULT_MAX_PENDING_CNT = 40000; static constexpr int32_t DEFAULT_MAX_PENDING_CNT = 40000;