diff --git a/mittest/mtlenv/storage/test_tablet_create_delete_helper.cpp b/mittest/mtlenv/storage/test_tablet_create_delete_helper.cpp index 294de388fa..1d8f2d1d9c 100644 --- a/mittest/mtlenv/storage/test_tablet_create_delete_helper.cpp +++ b/mittest/mtlenv/storage/test_tablet_create_delete_helper.cpp @@ -1143,6 +1143,8 @@ TEST_F(TestTabletCreateDeleteHelper, abort_create_tablets_no_redo) trans_flags.for_replay_ = false; trans_flags.tx_id_ = 1; trans_flags.scn_ = share::SCN::invalid_scn(); + trans_flags.redo_submitted_ = true; + trans_flags.redo_synced_ = true; // mock, no prepare(as if prepare create failed) // and log ts is invalid @@ -1847,7 +1849,7 @@ TEST_F(TestTabletCreateDeleteHelper, roll_back_prepare_remove_tablet) ObMulSourceDataNotifyArg trans_flags; trans_flags.for_replay_ = false; trans_flags.tx_id_ = 1; - const share::SCN val = share::SCN::minus(share::SCN::max_scn(), 200); + const share::SCN val = share::SCN::minus(share::SCN::max_scn(), 300); trans_flags.scn_ = val; ObLSHandle ls_handle; @@ -1906,6 +1908,10 @@ TEST_F(TestTabletCreateDeleteHelper, roll_back_prepare_remove_tablet) ASSERT_EQ(OB_SUCCESS, ret); tablet_handle.get_obj()->get_tx_data(tx_data); ASSERT_EQ(ObTabletStatus::NORMAL, tx_data.tablet_status_); + + trans_flags.scn_ = share::SCN::plus(trans_flags.scn_, 1); + ret = ls_tablet_service.on_abort_remove_tablets(arg, trans_flags); + ASSERT_NE(OB_SUCCESS, ret); } } @@ -2369,7 +2375,7 @@ TEST_F(TestTabletCreateDeleteHelper, partial_prepare_remove_and_full_abort_remov int ret = OB_SUCCESS; ObMulSourceDataNotifyArg trans_flags; - trans_flags.for_replay_ = false; + trans_flags.for_replay_ = true; trans_flags.tx_id_ = 1; trans_flags.scn_ = share::SCN::minus(share::SCN::max_scn(), 100); @@ -2427,7 +2433,7 @@ TEST_F(TestTabletCreateDeleteHelper, partial_prepare_remove_and_full_abort_remov ASSERT_EQ(OB_SUCCESS, ret); trans_flags.tx_id_ = 2; - trans_flags.scn_ = share::SCN::invalid_scn(); + trans_flags.scn_ = share::SCN::minus(share::SCN::max_scn(), 95); // mock partial prepare remove ret = ls_tablet_service.on_prepare_remove_tablets(partial_arg, trans_flags); ASSERT_EQ(OB_SUCCESS, ret); @@ -2447,7 +2453,6 @@ TEST_F(TestTabletCreateDeleteHelper, partial_prepare_remove_and_full_abort_remov ASSERT_EQ(ObTabletStatus::NORMAL, tx_data.tablet_status_); ASSERT_TRUE(tablet_handle.get_obj()->is_data_tablet()); - key.tablet_id_ = 2; ret = ObTabletCreateDeleteHelper::get_tablet(key, tablet_handle); ASSERT_EQ(OB_SUCCESS, ret); @@ -2913,7 +2918,7 @@ TEST_F(TestTabletCreateDeleteHelper, tablet_not_exist_commit) int main(int argc, char **argv) { system("rm -f test_tablet_create_delete_helper.log*"); - OB_LOGGER.set_file_name("test_tablet_create_delete_helper.log", true); + OB_LOGGER.set_file_name("test_tablet_create_delete_helper.log"); OB_LOGGER.set_log_level("INFO"); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index e7e000080c..2b7903eff0 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -266,6 +266,17 @@ void ObMemtable::destroy() snapshot_version_.set_max(); } +int ObMemtable::safe_to_destroy(bool &is_safe) +{ + int ret = OB_SUCCESS; + + is_safe = (0 == get_ref() && + 0 == get_write_ref() && + 0 == get_unsubmitted_cnt() && + 0 == get_unsynced_cnt()); + + return ret; +} //////////////////////////////////////////////////////////////////////////////////////////////////// // Public Functions: set/lock @@ -1775,7 +1786,10 @@ bool ObMemtable::ready_for_flush_() migration_clog_checkpoint_scn >= get_end_scn() && 0 != unsynced_cnt && multi_source_data_.get_all_unsync_cnt_for_multi_data() == unsynced_cnt) { + ATOMIC_STORE(&unsubmitted_cnt_, 0); + ATOMIC_STORE(&unsynced_cnt_, 0); bool_ret = true; + TRANS_LOG(INFO, "skip ready for flush for migration", KPC(this)); } if (bool_ret) { if (OB_FAIL(resolve_snapshot_version_())) { diff --git a/src/storage/memtable/ob_memtable.h b/src/storage/memtable/ob_memtable.h index cde32f2c71..6dbb5e1ed0 100644 --- a/src/storage/memtable/ob_memtable.h +++ b/src/storage/memtable/ob_memtable.h @@ -174,6 +174,7 @@ public: const int64_t schema_version, const uint32_t freeze_clock); virtual void destroy(); + virtual int safe_to_destroy(bool &is_safe); OB_INLINE void reset() { destroy(); } public: diff --git a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp index b65b70ea9e..412323f1ca 100644 --- a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp +++ b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp @@ -288,10 +288,13 @@ int ObTenantMetaMemMgr::gc_tables_in_queue(bool &all_table_cleaned) } else { TableGCItem *item = static_cast(ptr); ObITable *table = item->table_; + bool is_safe = false; if (OB_ISNULL(table)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("the table in gc item is nullptr", K(ret), KP(item)); - } else { + } else if (OB_FAIL(table->safe_to_destroy(is_safe))) { + LOG_WARN("fail to check safe_to_destroy", K(ret), KPC(table)); + } else if (is_safe) { ObITable::TableType table_type = item->table_type_; int64_t index = static_cast(table_type); if (OB_UNLIKELY(!ObITable::is_table_type_valid(table_type) || nullptr == pool_arr_[index])) { @@ -301,9 +304,11 @@ int ObTenantMetaMemMgr::gc_tables_in_queue(bool &all_table_cleaned) pool_arr_[index]->free_obj(static_cast(table)); table_cnt_arr[index]++; } + } else if (TC_REACH_TIME_INTERVAL(1000 * 1000)) { + LOG_INFO("the table is unsafe to destroy", KPC(table)); } - if (OB_SUCC(ret)) { + if (OB_SUCC(ret) && is_safe) { left_recycle_cnt--; ob_free(item); item = nullptr; diff --git a/src/storage/ob_i_table.cpp b/src/storage/ob_i_table.cpp index 5cb4feb1eb..32584e2de1 100644 --- a/src/storage/ob_i_table.cpp +++ b/src/storage/ob_i_table.cpp @@ -119,6 +119,12 @@ void ObITable::reset() key_.reset(); } +int ObITable::safe_to_destroy(bool &is_safe) +{ + is_safe = true; + return OB_SUCCESS; +} + int ObITable::exist( ObStoreCtx &ctx, const uint64_t table_id, diff --git a/src/storage/ob_i_table.h b/src/storage/ob_i_table.h index 1b121ab0cd..c6f4555112 100644 --- a/src/storage/ob_i_table.h +++ b/src/storage/ob_i_table.h @@ -160,6 +160,7 @@ public: int init(const TableKey &table_key); void reset(); + virtual int safe_to_destroy(bool &is_safe); OB_INLINE const TableKey &get_key() const { return key_; } void set_scn_range(share::ObScnRange scn_range) { key_.scn_range_ = scn_range; } void set_table_type(ObITable::TableType table_type) { key_.table_type_ = table_type; } diff --git a/src/storage/tablet/ob_tablet_create_delete_helper.cpp b/src/storage/tablet/ob_tablet_create_delete_helper.cpp index 6dbe36a127..2cc3651dcb 100644 --- a/src/storage/tablet/ob_tablet_create_delete_helper.cpp +++ b/src/storage/tablet/ob_tablet_create_delete_helper.cpp @@ -918,7 +918,7 @@ int ObTabletCreateDeleteHelper::do_abort_create_tablet( // If tx log ts equals SCN::max_scn(), it means redo callback has not been called. // Thus, we should handle this situation ret = OB_ERR_UNEXPECTED; - LOG_WARN("log ts is smaller than tx log ts", K(ret), K(tablet_id), K(trans_flags), K(tx_data)); + LOG_WARN("log ts is no bigger than tx log ts", K(ret), K(tablet_id), K(trans_flags), K(tx_data)); } else if (OB_UNLIKELY(trans_flags.tx_id_ != tx_data.tx_id_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tx id does not equal", K(ret), K(tablet_id), K(trans_flags), K(tx_data)); @@ -1240,7 +1240,7 @@ int ObTabletCreateDeleteHelper::do_abort_remove_tablet( && trans_flags.scn_ != SCN::invalid_scn() && trans_flags.scn_ <= tx_data.tx_scn_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("log ts is smaller than tx log ts", K(ret), K(key), K(trans_flags), K(tx_data)); + LOG_WARN("log ts is no bigger than tx log ts", K(ret), K(key), K(trans_flags), K(tx_data)); } else if (OB_UNLIKELY(trans_flags.tx_id_ != tx_data.tx_id_)) { is_valid = false; LOG_INFO("tx id does not equal", K(ret), K(key), K(trans_flags), K(tx_data)); diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 09369ab977..3e055a0ec5 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -5653,22 +5653,27 @@ int ObPartTransCtx::notify_data_source_(const NotifyType notify_type, const bool is_force_kill) { int ret = OB_SUCCESS; - ObMulSourceDataNotifyArg arg; - arg.tx_id_ = trans_id_; - arg.scn_ = log_ts; - arg.trans_version_ = ctx_tx_data_.get_commit_version(); - arg.for_replay_ = for_replay; - arg.notify_type_ = notify_type; - arg.is_force_kill_ = is_force_kill; + if (is_exiting_ && !is_follower_()) { + // do nothing + } else { + ObMulSourceDataNotifyArg arg; + arg.tx_id_ = trans_id_; + arg.scn_ = log_ts; + arg.trans_version_ = ctx_tx_data_.get_commit_version(); + arg.for_replay_ = for_replay; + arg.notify_type_ = notify_type; + arg.is_force_kill_ = is_force_kill; - int64_t total_time = 0; + int64_t total_time = 0; - if (OB_FAIL(ObMulSourceTxDataNotifier::notify(notify_array, notify_type, arg, this, total_time))) { - TRANS_LOG(WARN, "notify data source failed", K(ret), K(arg)); - } - if (notify_array.count() > 0) { - TRANS_LOG(INFO, "notify MDS", K(ret), K(trans_id_), K(ls_id_), K(notify_type), K(log_ts), - K(notify_array), K(total_time)); + if (OB_FAIL( + ObMulSourceTxDataNotifier::notify(notify_array, notify_type, arg, this, total_time))) { + TRANS_LOG(WARN, "notify data source failed", K(ret), K(arg)); + } + if (notify_array.count() > 0) { + TRANS_LOG(INFO, "notify MDS", K(ret), K(trans_id_), K(ls_id_), K(notify_type), K(log_ts), + K(notify_array), K(total_time)); + } } return ret; } @@ -5693,6 +5698,9 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou } else if (is_follower_()) { ret = OB_NOT_MASTER; TRANS_LOG(WARN, "can not register mds on a follower", K(ret), K(data_source_type), K(len), KPC(this)); + } else if (is_committing_()) { + ret = OB_TRANS_HAS_DECIDED; + TRANS_LOG(WARN, "can not register mds in committing part_ctx", K(ret), KPC(this)); } else if (OB_ISNULL(ptr = mtl_malloc(len, "MultiTxData"))) { ret = OB_ALLOCATE_MEMORY_FAILED; TRANS_LOG(WARN, "allocate memory failed", KR(ret), K(data_source_type), K(len)); @@ -6440,10 +6448,10 @@ int ObPartTransCtx::do_force_kill_tx_() if (OB_FAIL(gen_total_mds_array_(tmp_array))) { TRANS_LOG(WARN, "gen total mds array failed", KR(ret), K(*this)); - // } else if (OB_FAIL(notify_data_source_(NotifyType::ON_ABORT, - // ctx_tx_data_.get_end_log_ts() /*invalid_scn*/, false, - // tmp_array, true /*is_force_kill*/))) { - // TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this)); + } else if (OB_FAIL(notify_data_source_(NotifyType::ON_ABORT, + ctx_tx_data_.get_end_log_ts() /*invalid_scn*/, false, + tmp_array, true /*is_force_kill*/))) { + TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this)); } else { trans_kill_(); // Force kill cannot guarantee the consistency, so we just set end_log_ts diff --git a/unittest/storage/test_sstable_log_ts_range_cut.cpp b/unittest/storage/test_sstable_log_ts_range_cut.cpp index d8aa05e9d7..168365b4bf 100644 --- a/unittest/storage/test_sstable_log_ts_range_cut.cpp +++ b/unittest/storage/test_sstable_log_ts_range_cut.cpp @@ -118,6 +118,7 @@ TEST_F(TestSSTableScnRangeCut, sstable_scn_range_no_cross_and_continue) ASSERT_EQ(OB_SUCCESS, ret); ret = tablet_table_store.cut_ha_sstable_scn_range_(minor_sstables, tables_handle); ASSERT_EQ(OB_SUCCESS, ret); + tables_handle.meta_mem_mgr_ = nullptr; ASSERT_EQ(0, tables_handle.get_table(0)->key_.scn_range_.start_scn_.get_val_for_inner_table_field()); ASSERT_EQ(100, tables_handle.get_table(0)->key_.scn_range_.end_scn_.get_val_for_inner_table_field()); @@ -160,6 +161,7 @@ TEST_F(TestSSTableScnRangeCut, sstable_scn_range_is_not_continue) ASSERT_EQ(OB_SUCCESS, ret); ret = tablet_table_store.cut_ha_sstable_scn_range_(minor_sstables, tables_handle); ASSERT_EQ(OB_ERR_UNEXPECTED, ret); + tables_handle.meta_mem_mgr_ = nullptr; } @@ -193,6 +195,7 @@ TEST_F(TestSSTableScnRangeCut, sstable_scn_range_contain) ASSERT_EQ(OB_SUCCESS, ret); ret = tablet_table_store.cut_ha_sstable_scn_range_(minor_sstables, tables_handle); ASSERT_EQ(OB_SUCCESS, ret); + tables_handle.meta_mem_mgr_ = nullptr; ASSERT_EQ(0, tables_handle.get_table(0)->key_.scn_range_.start_scn_.get_val_for_inner_table_field()); ASSERT_EQ(100, tables_handle.get_table(0)->key_.scn_range_.end_scn_.get_val_for_inner_table_field()); @@ -235,6 +238,7 @@ TEST_F(TestSSTableScnRangeCut, sstable_scn_range_has_overlap) ASSERT_EQ(OB_SUCCESS, ret); ret = tablet_table_store.cut_ha_sstable_scn_range_(minor_sstables, tables_handle); ASSERT_EQ(OB_SUCCESS, ret); + tables_handle.meta_mem_mgr_ = nullptr; ASSERT_EQ(0, tables_handle.get_table(0)->key_.scn_range_.start_scn_.get_val_for_inner_table_field()); ASSERT_EQ(100, tables_handle.get_table(0)->key_.scn_range_.end_scn_.get_val_for_inner_table_field()); @@ -252,6 +256,8 @@ TEST_F(TestSSTableScnRangeCut, sstable_scn_range_has_overlap) int main(int argc, char** argv) { + system("rm -f test_sstable_log_ts_range_cut.log*"); + OB_LOGGER.set_file_name("test_sstable_log_ts_range_cut.log"); OB_LOGGER.set_log_level("INFO"); testing::InitGoogleTest(&argc, argv); oceanbase::lib::set_memory_limit(40L << 30);