revert bugfix for migrate
This commit is contained in:
@ -133,14 +133,14 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
|
|||||||
// Keep max_snapshot_version currently because major merge must be done step by step
|
// Keep max_snapshot_version currently because major merge must be done step by step
|
||||||
int64_t max_snapshot_version = freeze_info.next.freeze_ts;
|
int64_t max_snapshot_version = freeze_info.next.freeze_ts;
|
||||||
ObITable *last_table = tablet.get_table_store().get_minor_sstables().get_boundary_table(true/*last*/);
|
ObITable *last_table = tablet.get_table_store().get_minor_sstables().get_boundary_table(true/*last*/);
|
||||||
const int64_t last_minor_log_ts = nullptr == last_table ? tablet.get_clog_checkpoint_ts() : last_table->get_end_log_ts();
|
const int64_t clog_checkpoint_ts = tablet.get_clog_checkpoint_ts();
|
||||||
|
|
||||||
// Freezing in the restart phase may not satisfy end >= last_max_sstable,
|
// Freezing in the restart phase may not satisfy end >= last_max_sstable,
|
||||||
// so the memtable cannot be filtered by log_ts
|
// so the memtable cannot be filtered by log_ts
|
||||||
// can only take out all frozen memtable
|
// can only take out all frozen memtable
|
||||||
ObIMemtable *memtable = nullptr;
|
ObIMemtable *memtable = nullptr;
|
||||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||||
int64_t log_ts_included_memtable_cnt = 0;
|
bool contain_force_freeze_memtable = false;
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < memtable_handles.count(); ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < memtable_handles.count(); ++i) {
|
||||||
if (OB_ISNULL(memtable = static_cast<ObIMemtable *>(memtable_handles.at(i).get_table()))) {
|
if (OB_ISNULL(memtable = static_cast<ObIMemtable *>(memtable_handles.at(i).get_table()))) {
|
||||||
ret = OB_ERR_SYS;
|
ret = OB_ERR_SYS;
|
||||||
@ -151,9 +151,12 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
|
|||||||
} else if (!memtable->can_be_minor_merged()) {
|
} else if (!memtable->can_be_minor_merged()) {
|
||||||
FLOG_INFO("memtable cannot mini merge now", K(ret), K(i), KPC(memtable), K(max_snapshot_version), K(memtable_handles), K(param));
|
FLOG_INFO("memtable cannot mini merge now", K(ret), K(i), KPC(memtable), K(max_snapshot_version), K(memtable_handles), K(param));
|
||||||
break;
|
break;
|
||||||
} else if (memtable->get_end_log_ts() <= last_minor_log_ts) {
|
} else if (memtable->get_end_log_ts() <= clog_checkpoint_ts) {
|
||||||
if (!tablet_id.is_special_merge_tablet()) {
|
if (!tablet_id.is_special_merge_tablet()) {
|
||||||
++log_ts_included_memtable_cnt;
|
if (static_cast<ObMemtable *>(memtable)->get_is_force_freeze()
|
||||||
|
&& memtable->get_snapshot_version() > tablet.get_tablet_meta().snapshot_version_) {
|
||||||
|
contain_force_freeze_memtable = true;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG_DEBUG("memtable wait to release", K(param), KPC(memtable));
|
LOG_DEBUG("memtable wait to release", K(param), KPC(memtable));
|
||||||
continue;
|
continue;
|
||||||
@ -189,9 +192,15 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
|
|||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
result.suggest_merge_type_ = param.merge_type_;
|
result.suggest_merge_type_ = param.merge_type_;
|
||||||
result.version_range_.multi_version_start_ = tablet.get_multi_version_start();
|
result.version_range_.multi_version_start_ = tablet.get_multi_version_start();
|
||||||
if (log_ts_included_memtable_cnt > 0 && log_ts_included_memtable_cnt == result.handle_.get_count()) {
|
if (result.handle_.empty()) {
|
||||||
result.update_tablet_directly_ = true;
|
ret = OB_NO_NEED_MERGE;
|
||||||
LOG_INFO("meet one empty force freeze memtable, could update tablet directly", K(ret), K(result));
|
} else if (result.log_ts_range_.end_log_ts_ <= clog_checkpoint_ts) {
|
||||||
|
if (contain_force_freeze_memtable) {
|
||||||
|
result.update_tablet_directly_ = true;
|
||||||
|
LOG_INFO("meet empty force freeze memtable, could update tablet directly", K(ret), K(result));
|
||||||
|
} else {
|
||||||
|
ret = OB_NO_NEED_MERGE;
|
||||||
|
}
|
||||||
} else if (OB_FAIL(refine_mini_merge_result(tablet, result))) {
|
} else if (OB_FAIL(refine_mini_merge_result(tablet, result))) {
|
||||||
if (OB_NO_NEED_MERGE != ret) {
|
if (OB_NO_NEED_MERGE != ret) {
|
||||||
LOG_WARN("failed to refine mini merge result", K(ret), K(tablet_id));
|
LOG_WARN("failed to refine mini merge result", K(ret), K(tablet_id));
|
||||||
@ -1010,9 +1019,6 @@ int ObPartitionMergePolicy::refine_mini_merge_result(
|
|||||||
if (OB_UNLIKELY(!table_store.is_valid())) {
|
if (OB_UNLIKELY(!table_store.is_valid())) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("Table store not valid", K(ret), K(table_store));
|
LOG_WARN("Table store not valid", K(ret), K(table_store));
|
||||||
} else if (result.handle_.empty()){
|
|
||||||
ret = OB_NO_NEED_MERGE;
|
|
||||||
// LOG_WARN("no need mini merge", K(ret), K(result), K(table_store));
|
|
||||||
} else if (OB_ISNULL(last_table = table_store.get_minor_sstables().get_boundary_table(true/*last*/))) {
|
} else if (OB_ISNULL(last_table = table_store.get_minor_sstables().get_boundary_table(true/*last*/))) {
|
||||||
// no minor sstable, skip to cut memtable's boundary
|
// no minor sstable, skip to cut memtable's boundary
|
||||||
} else if (result.log_ts_range_.start_log_ts_ > last_table->get_end_log_ts()) {
|
} else if (result.log_ts_range_.start_log_ts_ > last_table->get_end_log_ts()) {
|
||||||
|
|||||||
@ -758,15 +758,15 @@ int ObTabletMergeCtx::update_tablet_or_release_memtable(const ObGetMergeTablesRe
|
|||||||
} else if (schema_ctx_.storage_schema_->get_schema_version() > old_tablet->get_storage_schema().get_schema_version()) {
|
} else if (schema_ctx_.storage_schema_->get_schema_version() > old_tablet->get_storage_schema().get_schema_version()) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("can't have larger storage schema", K(ret), K(schema_ctx_.storage_schema_), K(old_tablet->get_storage_schema()));
|
LOG_WARN("can't have larger storage schema", K(ret), K(schema_ctx_.storage_schema_), K(old_tablet->get_storage_schema()));
|
||||||
|
} else if (OB_UNLIKELY(get_merge_table_result.log_ts_range_.end_log_ts_ > old_tablet->get_tablet_meta().clog_checkpoint_ts_)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("can't have larger end_log_ts", K(ret), K(get_merge_table_result), K(old_tablet->get_tablet_meta()));
|
||||||
} else if (get_merge_table_result.version_range_.snapshot_version_ > old_tablet->get_snapshot_version()) {
|
} else if (get_merge_table_result.version_range_.snapshot_version_ > old_tablet->get_snapshot_version()) {
|
||||||
// need write slog to update snapshot_version on tablet_meta
|
// need write slog to update snapshot_version on tablet_meta
|
||||||
update_table_store_flag = true;
|
update_table_store_flag = true;
|
||||||
} else if (get_merge_table_result.log_ts_range_.end_log_ts_ > old_tablet->get_clog_checkpoint_ts()) {
|
|
||||||
// need write slog to update clog_checkpoint_log_ts on tablet_meta
|
|
||||||
update_table_store_flag = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const int64_t release_memtable_log_ts = get_merge_table_result.log_ts_range_.end_log_ts_;
|
const int64_t release_memtable_log_ts = old_tablet->get_clog_checkpoint_ts();
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
} else if (update_table_store_flag && OB_FAIL(update_tablet_directly(get_merge_table_result))) {
|
} else if (update_table_store_flag && OB_FAIL(update_tablet_directly(get_merge_table_result))) {
|
||||||
LOG_WARN("failed to update tablet directly", K(ret), K(get_merge_table_result), K(update_table_store_flag));
|
LOG_WARN("failed to update tablet directly", K(ret), K(get_merge_table_result), K(update_table_store_flag));
|
||||||
@ -783,7 +783,6 @@ int ObTabletMergeCtx::update_tablet_directly(const ObGetMergeTablesResult &get_m
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const int64_t rebuild_seq = ls_handle_.get_ls()->get_rebuild_seq();
|
const int64_t rebuild_seq = ls_handle_.get_ls()->get_rebuild_seq();
|
||||||
log_ts_range_ = get_merge_table_result.log_ts_range_;
|
log_ts_range_ = get_merge_table_result.log_ts_range_;
|
||||||
int64_t clog_checkpoint_ts = get_merge_table_result.log_ts_range_.end_log_ts_;
|
|
||||||
|
|
||||||
ObTableHandleV2 empty_table_handle;
|
ObTableHandleV2 empty_table_handle;
|
||||||
ObUpdateTableStoreParam param(
|
ObUpdateTableStoreParam param(
|
||||||
@ -793,7 +792,7 @@ int ObTabletMergeCtx::update_tablet_directly(const ObGetMergeTablesResult &get_m
|
|||||||
schema_ctx_.storage_schema_,
|
schema_ctx_.storage_schema_,
|
||||||
rebuild_seq,
|
rebuild_seq,
|
||||||
param_.is_major_merge(),
|
param_.is_major_merge(),
|
||||||
clog_checkpoint_ts);
|
0/*clog_checkpoint_ts*/);
|
||||||
ObTabletHandle new_tablet_handle;
|
ObTabletHandle new_tablet_handle;
|
||||||
if (OB_FAIL(ls_handle_.get_ls()->update_tablet_table_store(
|
if (OB_FAIL(ls_handle_.get_ls()->update_tablet_table_store(
|
||||||
param_.tablet_id_, param, new_tablet_handle))) {
|
param_.tablet_id_, param, new_tablet_handle))) {
|
||||||
|
|||||||
@ -711,25 +711,11 @@ TEST_F(TestCompactionPolicy, check_mini_merge_basic)
|
|||||||
ObGetMergeTablesParam param;
|
ObGetMergeTablesParam param;
|
||||||
param.merge_type_ = ObMergeType::MINI_MERGE;
|
param.merge_type_ = ObMergeType::MINI_MERGE;
|
||||||
ObGetMergeTablesResult result;
|
ObGetMergeTablesResult result;
|
||||||
ret = ObPartitionMergePolicy::get_mini_merge_tables(param, 0, *tablet_handle_.get_obj(), result);
|
|
||||||
ASSERT_EQ(OB_SUCCESS, ret);
|
|
||||||
ASSERT_EQ(3, result.handle_.get_count());
|
|
||||||
|
|
||||||
tablet_handle_.get_obj()->tablet_meta_.clog_checkpoint_ts_ = 300;
|
tablet_handle_.get_obj()->tablet_meta_.clog_checkpoint_ts_ = 300;
|
||||||
tablet_handle_.get_obj()->tablet_meta_.snapshot_version_ = 300;
|
tablet_handle_.get_obj()->tablet_meta_.snapshot_version_ = 300;
|
||||||
result.reset();
|
|
||||||
ret = ObPartitionMergePolicy::get_mini_merge_tables(param, 0, *tablet_handle_.get_obj(), result);
|
ret = ObPartitionMergePolicy::get_mini_merge_tables(param, 0, *tablet_handle_.get_obj(), result);
|
||||||
ASSERT_EQ(OB_SUCCESS, ret);
|
ASSERT_EQ(OB_NO_NEED_MERGE, ret);
|
||||||
ASSERT_EQ(result.update_tablet_directly_, true);
|
ASSERT_EQ(result.update_tablet_directly_, false);
|
||||||
|
|
||||||
tablet_handle_.get_obj()->tablet_meta_.clog_checkpoint_ts_ = 280;
|
|
||||||
tablet_handle_.get_obj()->tablet_meta_.snapshot_version_ = 280;
|
|
||||||
result.reset();
|
|
||||||
ret = ObPartitionMergePolicy::get_mini_merge_tables(param, 0, *tablet_handle_.get_obj(), result);
|
|
||||||
ASSERT_EQ(OB_SUCCESS, ret);
|
|
||||||
ASSERT_EQ(3, result.handle_.get_count());
|
|
||||||
ASSERT_EQ(300, result.log_ts_range_.end_log_ts_);
|
|
||||||
ASSERT_EQ(result.update_tablet_directly_, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(TestCompactionPolicy, check_minor_merge_basic)
|
TEST_F(TestCompactionPolicy, check_minor_merge_basic)
|
||||||
@ -877,7 +863,7 @@ TEST_F(TestCompactionPolicy, check_no_need_major_merge)
|
|||||||
int main(int argc, char **argv)
|
int main(int argc, char **argv)
|
||||||
{
|
{
|
||||||
system("rm -rf test_compaction_policy.log");
|
system("rm -rf test_compaction_policy.log");
|
||||||
OB_LOGGER.set_file_name("test_compaction_policy.log", true);
|
OB_LOGGER.set_file_name("test_compaction_policy.log");
|
||||||
OB_LOGGER.set_log_level("DEBUG");
|
OB_LOGGER.set_log_level("DEBUG");
|
||||||
CLOG_LOG(INFO, "begin unittest: test_compaction_policy");
|
CLOG_LOG(INFO, "begin unittest: test_compaction_policy");
|
||||||
::testing::InitGoogleTest(&argc, argv);
|
::testing::InitGoogleTest(&argc, argv);
|
||||||
|
|||||||
Reference in New Issue
Block a user