optimize memtable releasing in mini compaction
This commit is contained in:
@ -201,8 +201,9 @@ int ObPartitionMergePolicy::get_mini_merge_tables(
|
|||||||
if (OB_NO_NEED_MERGE != ret) {
|
if (OB_NO_NEED_MERGE != ret) {
|
||||||
LOG_WARN("failed to find mini merge tables", K(ret), K(freeze_info));
|
LOG_WARN("failed to find mini merge tables", K(ret), K(freeze_info));
|
||||||
}
|
}
|
||||||
} else if (!result.update_tablet_directly_
|
} else if (result.update_tablet_directly_) {
|
||||||
&& OB_FAIL(deal_with_minor_result(merge_type, ls, tablet, result))) {
|
// do nothing
|
||||||
|
} else if (OB_FAIL(deal_with_minor_result(merge_type, ls, tablet, result))) {
|
||||||
LOG_WARN("failed to deal with minor merge result", K(ret));
|
LOG_WARN("failed to deal with minor merge result", K(ret));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,7 +230,7 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
|
|||||||
// can only take out all frozen memtable
|
// can only take out all frozen memtable
|
||||||
ObIMemtable *memtable = nullptr;
|
ObIMemtable *memtable = nullptr;
|
||||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||||
bool need_update_snapshot_version = false;
|
bool has_release_memtable = false;
|
||||||
|
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < memtable_handles.count(); ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < memtable_handles.count(); ++i) {
|
||||||
if (OB_ISNULL(memtable = static_cast<ObIMemtable *>(memtable_handles.at(i).get_table()))) {
|
if (OB_ISNULL(memtable = static_cast<ObIMemtable *>(memtable_handles.at(i).get_table()))) {
|
||||||
@ -242,15 +243,12 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
|
|||||||
FLOG_INFO("memtable cannot mini merge now", K(ret), K(i), KPC(memtable), K(max_snapshot_version), K(memtable_handles), K(param));
|
FLOG_INFO("memtable cannot mini merge now", K(ret), K(i), KPC(memtable), K(max_snapshot_version), K(memtable_handles), K(param));
|
||||||
break;
|
break;
|
||||||
} else if (memtable->get_end_scn() <= clog_checkpoint_scn) {
|
} else if (memtable->get_end_scn() <= clog_checkpoint_scn) {
|
||||||
if (!tablet_id.is_special_merge_tablet() &&
|
has_release_memtable = true;
|
||||||
memtable->get_snapshot_version() > tablet.get_tablet_meta().snapshot_version_) {
|
|
||||||
need_update_snapshot_version = true;
|
|
||||||
} else {
|
|
||||||
LOG_DEBUG("memtable wait to release", K(param), KPC(memtable));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} else if (result.handle_.get_count() > 0) {
|
} else if (result.handle_.get_count() > 0) {
|
||||||
if (result.scn_range_.end_scn_ < memtable->get_start_scn()) {
|
if (result.scn_range_.end_scn_ <= clog_checkpoint_scn) {
|
||||||
|
// meet the first memtable should be merged, reset the result to remove the memtable should be released.
|
||||||
|
result.reset();
|
||||||
|
} else if (result.scn_range_.end_scn_ < memtable->get_start_scn()) {
|
||||||
FLOG_INFO("scn range not continues, reset previous minor merge tables",
|
FLOG_INFO("scn range not continues, reset previous minor merge tables",
|
||||||
"last_end_scn", result.scn_range_.end_scn_, KPC(memtable), K(tablet));
|
"last_end_scn", result.scn_range_.end_scn_, KPC(memtable), K(tablet));
|
||||||
// mini merge always use the oldest memtable to dump
|
// mini merge always use the oldest memtable to dump
|
||||||
@ -264,32 +262,28 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (FAILEDx(result.handle_.add_memtable(memtable))) {
|
||||||
if (OB_FAIL(result.handle_.add_memtable(memtable))) {
|
LOG_WARN("Failed to add memtable", K(ret), KPC(memtable));
|
||||||
LOG_WARN("Failed to add memtable", KPC(memtable), K(ret));
|
} else {
|
||||||
} else {
|
// update end_scn/snapshot_version
|
||||||
// update end_scn/snapshot_version
|
if (1 == result.handle_.get_count()) {
|
||||||
if (1 == result.handle_.get_count()) {
|
result.scn_range_.start_scn_ = memtable->get_start_scn();
|
||||||
result.scn_range_.start_scn_ = memtable->get_start_scn();
|
|
||||||
}
|
|
||||||
result.scn_range_.end_scn_ = memtable->get_end_scn();
|
|
||||||
result.version_range_.snapshot_version_ = MAX(memtable->get_snapshot_version(), result.version_range_.snapshot_version_);
|
|
||||||
}
|
}
|
||||||
|
result.scn_range_.end_scn_ = memtable->get_end_scn();
|
||||||
|
result.version_range_.snapshot_version_ = MAX(memtable->get_snapshot_version(), result.version_range_.snapshot_version_);
|
||||||
}
|
}
|
||||||
} // end for
|
} // end for
|
||||||
|
|
||||||
bool need_check_tablet = false;
|
if (OB_SUCC(ret)) {
|
||||||
if (OB_FAIL(ret)) {
|
bool need_check_tablet = false;
|
||||||
} else {
|
|
||||||
result.suggest_merge_type_ = param.merge_type_;
|
result.suggest_merge_type_ = param.merge_type_;
|
||||||
result.version_range_.multi_version_start_ = tablet.get_multi_version_start();
|
result.version_range_.multi_version_start_ = tablet.get_multi_version_start();
|
||||||
if (result.handle_.empty()) {
|
|
||||||
ret = OB_NO_NEED_MERGE;
|
if (result.scn_range_.end_scn_ <= clog_checkpoint_scn) {
|
||||||
} else if (result.scn_range_.end_scn_ <= clog_checkpoint_scn) {
|
if (has_release_memtable) {
|
||||||
if (need_update_snapshot_version) {
|
|
||||||
result.update_tablet_directly_ = true;
|
result.update_tablet_directly_ = true;
|
||||||
result.version_range_.multi_version_start_ = 0; // set multi_version_start to pass tablet::init check
|
result.version_range_.multi_version_start_ = 0; // set multi_version_start to pass tablet::init check
|
||||||
LOG_INFO("meet empty force freeze memtable, could update tablet directly", K(ret), K(result));
|
FLOG_INFO("no memtable should be merged, but has memtable should be released", K(ret), K(result), K(memtable_handles));
|
||||||
} else {
|
} else {
|
||||||
ret = OB_NO_NEED_MERGE;
|
ret = OB_NO_NEED_MERGE;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -783,12 +783,6 @@ int ObTabletMergeCtx::inner_init_for_mini(bool &skip_rest_operation)
|
|||||||
// TODO(@DanLin) optimize this interface
|
// TODO(@DanLin) optimize this interface
|
||||||
if (OB_NO_NEED_MERGE != ret) {
|
if (OB_NO_NEED_MERGE != ret) {
|
||||||
LOG_WARN("failed to get merge tables", K(ret), KPC(this), K(get_merge_table_result));
|
LOG_WARN("failed to get merge tables", K(ret), KPC(this), K(get_merge_table_result));
|
||||||
} else { // OB_NO_NEED_MERGE
|
|
||||||
int tmp_ret = OB_SUCCESS;
|
|
||||||
// then release memtable
|
|
||||||
if (OB_TMP_FAIL(tablet->release_memtables(tablet->get_tablet_meta().clog_checkpoint_scn_))) {
|
|
||||||
LOG_WARN("failed to release memtable", K(tmp_ret), K(tablet->get_tablet_meta().clog_checkpoint_scn_));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else if (FALSE_IT(time_guard_.click(ObCompactionTimeGuard::COMPACTION_POLICY))) {
|
} else if (FALSE_IT(time_guard_.click(ObCompactionTimeGuard::COMPACTION_POLICY))) {
|
||||||
} else if (get_merge_table_result.update_tablet_directly_) {
|
} else if (get_merge_table_result.update_tablet_directly_) {
|
||||||
@ -829,44 +823,26 @@ int ObTabletMergeCtx::get_schema_and_gene_from_result(const ObGetMergeTablesResu
|
|||||||
int ObTabletMergeCtx::update_tablet_or_release_memtable(const ObGetMergeTablesResult &get_merge_table_result)
|
int ObTabletMergeCtx::update_tablet_or_release_memtable(const ObGetMergeTablesResult &get_merge_table_result)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t old_storage_schema_version = 0;
|
|
||||||
ObTablet *old_tablet = tablet_handle_.get_obj();
|
ObTablet *old_tablet = tablet_handle_.get_obj();
|
||||||
// check whether snapshot is updated or have storage_schema
|
|
||||||
bool update_table_store_flag = false;
|
|
||||||
ObArenaAllocator temp_allocator("DirUpdateTablet", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
|
|
||||||
const ObStorageSchema *schema_on_tablet = nullptr;
|
|
||||||
|
|
||||||
if (OB_UNLIKELY(!is_mini_merge(param_.merge_type_))) {
|
if (OB_UNLIKELY(!is_mini_merge(param_.merge_type_))) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("can only update tablet in mini merge", K(ret), KPC(this));
|
LOG_WARN("can only update tablet in mini merge", K(ret), KPC(this));
|
||||||
|
} else if (OB_UNLIKELY(get_merge_table_result.scn_range_.end_scn_ > old_tablet->get_tablet_meta().clog_checkpoint_scn_)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("can't have larger end_log_ts", K(ret), K(get_merge_table_result), K(old_tablet->get_tablet_meta()));
|
||||||
|
} else if (old_tablet->get_tablet_meta().snapshot_version_ >= get_merge_table_result.version_range_.snapshot_version_) {
|
||||||
|
// do nothing, no need to advance snapshot version on tablet meta.
|
||||||
} else if (OB_FAIL(get_storage_schema_to_merge(get_merge_table_result.handle_))) {
|
} else if (OB_FAIL(get_storage_schema_to_merge(get_merge_table_result.handle_))) {
|
||||||
LOG_WARN("failed to get storage schema", K(ret), K(get_merge_table_result));
|
LOG_WARN("failed to get storage schema", K(ret), K(get_merge_table_result));
|
||||||
} else if (OB_ISNULL(schema_ctx_.storage_schema_)) {
|
} else if (OB_ISNULL(schema_ctx_.storage_schema_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("storage schema is unexpected null", K(ret), KPC(this));
|
LOG_WARN("storage schema is unexpected null", K(ret), KPC(this));
|
||||||
} else if (OB_FAIL(old_tablet->load_storage_schema(temp_allocator, schema_on_tablet))) {
|
|
||||||
LOG_WARN("failed to load storage schema", K(ret), KPC(old_tablet));
|
|
||||||
} else if (OB_UNLIKELY(get_merge_table_result.scn_range_.end_scn_ > old_tablet->get_tablet_meta().clog_checkpoint_scn_)) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("can't have larger end_log_ts", K(ret), K(get_merge_table_result), K(old_tablet->get_tablet_meta()));
|
|
||||||
} else if (get_merge_table_result.version_range_.snapshot_version_ > old_tablet->get_snapshot_version()
|
|
||||||
|| schema_on_tablet->compare_schema_newer(*schema_ctx_.storage_schema_)) {
|
|
||||||
// need write slog to update snapshot_version on tablet_meta
|
|
||||||
// if schema on memtable is newer, need update into tablet
|
|
||||||
// (written rows are rolled back on leader, no logging, end_scn won't be updated)
|
|
||||||
update_table_store_flag = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const SCN &release_memtable_scn = old_tablet->get_clog_checkpoint_scn();
|
if (FAILEDx(update_tablet_directly(get_merge_table_result))) {
|
||||||
if (OB_FAIL(ret)) {
|
LOG_WARN("failed to update tablet directly", K(ret), K(get_merge_table_result));
|
||||||
} else if (update_table_store_flag && OB_FAIL(update_tablet_directly(get_merge_table_result))) {
|
|
||||||
LOG_WARN("failed to update tablet directly", K(ret), K(get_merge_table_result), K(update_table_store_flag));
|
|
||||||
} else if (OB_FAIL(old_tablet->release_memtables(release_memtable_scn))) {
|
|
||||||
LOG_WARN("failed to release memtable", K(ret), K(release_memtable_scn));
|
|
||||||
} else {
|
|
||||||
LOG_INFO("success to release memtable", K(ret), K_(param), K(release_memtable_scn));
|
|
||||||
}
|
}
|
||||||
ObTablet::free_storage_schema(temp_allocator, schema_on_tablet);
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -891,6 +867,12 @@ int ObTabletMergeCtx::update_tablet_directly(const ObGetMergeTablesResult &get_m
|
|||||||
if (OB_FAIL(ls_handle_.get_ls()->update_tablet_table_store(
|
if (OB_FAIL(ls_handle_.get_ls()->update_tablet_table_store(
|
||||||
param_.tablet_id_, param, new_tablet_handle))) {
|
param_.tablet_id_, param, new_tablet_handle))) {
|
||||||
LOG_WARN("failed to update tablet table store", K(ret), K(param));
|
LOG_WARN("failed to update tablet table store", K(ret), K(param));
|
||||||
|
} else if (OB_TMP_FAIL(new_tablet_handle.get_obj()->release_memtables(new_tablet_handle.get_obj()->get_tablet_meta().clog_checkpoint_scn_))) {
|
||||||
|
LOG_WARN("failed to release memtable", K(tmp_ret), K(param));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
} else if (OB_ISNULL(schema_ctx_.storage_schema_)) {
|
||||||
} else if (OB_FAIL(merge_info_.init(*this, false/*need_check*/))) {
|
} else if (OB_FAIL(merge_info_.init(*this, false/*need_check*/))) {
|
||||||
LOG_WARN("failed to inie merge info", K(ret));
|
LOG_WARN("failed to inie merge info", K(ret));
|
||||||
} else if (OB_FAIL(tables_handle_.assign(get_merge_table_result.handle_))) { // assgin for generate_participant_table_info
|
} else if (OB_FAIL(tables_handle_.assign(get_merge_table_result.handle_))) { // assgin for generate_participant_table_info
|
||||||
|
|||||||
@ -888,8 +888,7 @@ TEST_F(TestCompactionPolicy, check_mini_merge_basic)
|
|||||||
tablet_handle_.get_obj()->tablet_meta_.clog_checkpoint_scn_.convert_for_tx(300);
|
tablet_handle_.get_obj()->tablet_meta_.clog_checkpoint_scn_.convert_for_tx(300);
|
||||||
tablet_handle_.get_obj()->tablet_meta_.snapshot_version_ = 300;
|
tablet_handle_.get_obj()->tablet_meta_.snapshot_version_ = 300;
|
||||||
ret = ObPartitionMergePolicy::get_mini_merge_tables(param, ls, *tablet_handle_.get_obj(), result);
|
ret = ObPartitionMergePolicy::get_mini_merge_tables(param, ls, *tablet_handle_.get_obj(), result);
|
||||||
ASSERT_EQ(OB_NO_NEED_MERGE, ret);
|
ASSERT_EQ(result.update_tablet_directly_, true);
|
||||||
ASSERT_EQ(result.update_tablet_directly_, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(TestCompactionPolicy, check_minor_merge_basic)
|
TEST_F(TestCompactionPolicy, check_minor_merge_basic)
|
||||||
|
|||||||
Reference in New Issue
Block a user