fix major schedule not init parallel range & fix mysqltest
This commit is contained in:
@ -40,11 +40,6 @@ ObMediumCompactionScheduleFunc::ChooseMediumScn ObMediumCompactionScheduleFunc::
|
||||
ObMediumCompactionScheduleFunc::choose_major_snapshot,
|
||||
};
|
||||
|
||||
ObMediumCompactionScheduleFunc::PrepareTableIter ObMediumCompactionScheduleFunc::prepare_table_iter[MEDIUM_FUNC_CNT]
|
||||
= { ObMediumCompactionScheduleFunc::prepare_iter_for_medium,
|
||||
ObMediumCompactionScheduleFunc::prepare_iter_for_major,
|
||||
};
|
||||
|
||||
int64_t ObMediumCompactionScheduleFunc::to_string(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
int64_t pos = 0;
|
||||
@ -149,8 +144,12 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot(
|
||||
medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE;
|
||||
medium_info.medium_snapshot_ = freeze_info.freeze_version;
|
||||
result.schema_version_ = freeze_info.schema_version;
|
||||
LOG_TRACE("choose_major_snapshot", K(ret), "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_, K(medium_info), K(freeze_info));
|
||||
if (OB_FAIL(get_result_for_major(tablet, medium_info, result))) {
|
||||
LOG_WARN("failed get result for major", K(ret), K(medium_info));
|
||||
} else {
|
||||
LOG_TRACE("choose_major_snapshot", K(ret), "ls_id", ls.get_ls_id(),
|
||||
"tablet_id", tablet.get_tablet_meta().tablet_id_, K(medium_info), K(freeze_info), K(result));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -296,26 +295,27 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const ObTabletID &tablet_id = tablet_.get_tablet_meta().tablet_id_;
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
uint64_t compat_version = 0;
|
||||
LOG_TRACE("decide_medium_snapshot", K(ret), KPC(this), K(tablet_id));
|
||||
if (OB_FAIL(tablet_.get_max_sync_medium_scn(max_sync_medium_scn))) {
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(ls_.add_dependent_medium_tablet(tablet_id))) { // add dependent_id in ObLSReservedSnapshotMgr
|
||||
LOG_WARN("failed to add dependent tablet", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
|
||||
LOG_WARN("fail to get data version", K(ret));
|
||||
} else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_1_0_0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid data version to schedule medium compaction", K(ret), K(compat_version));
|
||||
} else {
|
||||
int64_t max_reserved_snapshot = 0;
|
||||
ObGetMergeTablesResult result;
|
||||
ObMediumCompactionInfo medium_info;
|
||||
uint64_t compat_version = 0;
|
||||
medium_info.data_version_ = compat_version;
|
||||
|
||||
if (OB_FAIL(choose_medium_scn[is_major](ls_, tablet_, merge_reason, allocator_, medium_info, result))) {
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this));
|
||||
}
|
||||
} else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
|
||||
LOG_WARN("fail to get data version", K(ret));
|
||||
} else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_1_0_0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid data version to schedule medium compaction", K(ret), K(compat_version));
|
||||
} else if (FALSE_IT(medium_info.data_version_ = compat_version)) {
|
||||
} else if (is_major) {
|
||||
// do nothing
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
@ -396,8 +396,7 @@ int ObMediumCompactionScheduleFunc::init_parallel_range(
|
||||
|
||||
int64_t expected_task_count = 0;
|
||||
const int64_t tablet_size = medium_info.storage_schema_.get_tablet_size();
|
||||
const ObSSTable *first_sstable =
|
||||
static_cast<const ObSSTable*>(tablet_.get_table_store().get_major_sstables().get_boundary_table(true/*last*/));
|
||||
const ObSSTable *first_sstable = static_cast<const ObSSTable *>(result.handle_.get_table(0));
|
||||
if (OB_ISNULL(first_sstable)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sstable is unexpected null", K(ret), K(tablet_));
|
||||
@ -429,7 +428,7 @@ int ObMediumCompactionScheduleFunc::init_parallel_range(
|
||||
range.set_start_key(ObStoreRowkey::MIN_STORE_ROWKEY);
|
||||
range.set_end_key(ObStoreRowkey::MAX_STORE_ROWKEY);
|
||||
const bool is_major = medium_info.is_major_compaction();
|
||||
if (OB_FAIL(prepare_table_iter[is_major](tablet_, result, medium_info, table_iter))) {
|
||||
if (OB_FAIL(prepare_iter(result, table_iter))) {
|
||||
LOG_WARN("failed to get table iter", K(ret), K(range_array));
|
||||
} else if (OB_FAIL(input_range_array.push_back(range))) {
|
||||
LOG_WARN("failed to push back range", K(ret), K(range));
|
||||
@ -448,15 +447,12 @@ int ObMediumCompactionScheduleFunc::init_parallel_range(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMediumCompactionScheduleFunc::prepare_iter_for_major(
|
||||
ObTablet &tablet,
|
||||
const ObGetMergeTablesResult &result,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObTableStoreIterator &table_iter)
|
||||
int ObMediumCompactionScheduleFunc::get_result_for_major(
|
||||
ObTablet &tablet,
|
||||
const ObMediumCompactionInfo &medium_info,
|
||||
ObGetMergeTablesResult &result)
|
||||
{
|
||||
UNUSED(result);
|
||||
int ret = OB_SUCCESS;
|
||||
table_iter.reset();
|
||||
ObSSTable *base_table = nullptr;
|
||||
const ObTabletTableStore &table_store = tablet.get_table_store();
|
||||
|
||||
@ -468,31 +464,30 @@ int ObMediumCompactionScheduleFunc::prepare_iter_for_major(
|
||||
LOG_WARN("major sstable not exist", K(ret), K(table_store));
|
||||
} else if (base_table->get_snapshot_version() >= medium_info.medium_snapshot_) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (OB_FAIL(table_iter.add_table(base_table))) {
|
||||
} else if (OB_FAIL(result.handle_.add_table(base_table))) {
|
||||
LOG_WARN("failed to add table into iterator", K(ret), KP(base_table));
|
||||
} else {
|
||||
const ObSSTableArray &minor_tables = table_store.get_minor_sstables();
|
||||
int64_t start_idx = 0;
|
||||
bool start_add_table_flag = false;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < minor_tables.count_; ++i) {
|
||||
if (OB_ISNULL(minor_tables[i])) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("table must not null", K(ret), K(i), K(minor_tables));
|
||||
} else if (minor_tables[i]->get_upper_trans_version() >= base_table->get_snapshot_version()) {
|
||||
start_idx = i;
|
||||
break;
|
||||
} else if (!start_add_table_flag
|
||||
&& minor_tables[i]->get_upper_trans_version() >= base_table->get_snapshot_version()) {
|
||||
start_add_table_flag = true;
|
||||
}
|
||||
if (OB_SUCC(ret) && start_add_table_flag) {
|
||||
if (OB_FAIL(result.handle_.add_table(minor_tables[i]))) {
|
||||
LOG_WARN("failed to add table", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (FAILEDx(table_iter.add_tables(minor_tables.array_ + start_idx, minor_tables.count_ - start_idx))) {
|
||||
LOG_WARN("failed to add tables", K(ret), K(start_idx));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int ObMediumCompactionScheduleFunc::prepare_iter_for_medium(
|
||||
ObTablet &tablet,
|
||||
int ObMediumCompactionScheduleFunc::prepare_iter(
|
||||
const ObGetMergeTablesResult &result,
|
||||
ObMediumCompactionInfo &medium_info,
|
||||
ObTableStoreIterator &table_iter)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
Reference in New Issue
Block a user