From ced89ec6bdf1c4497a3ff1a3242a041b880fed5a Mon Sep 17 00:00:00 2001 From: Handora Date: Thu, 28 Mar 2024 02:15:21 +0000 Subject: [PATCH] [BUG] only set safe_to_destroy when all data on memtable has been synced --- mittest/simple_server/CMakeLists.txt | 1 + .../test_memtable_new_safe_to_destroy.cpp | 264 ++++++++++++++++++ src/logservice/ob_log_service.h | 1 + src/storage/memtable/ob_memtable.cpp | 68 ++++- src/storage/memtable/ob_memtable.h | 4 +- unittest/storage/test_compaction_policy.cpp | 1 - 6 files changed, 327 insertions(+), 12 deletions(-) create mode 100644 mittest/simple_server/test_memtable_new_safe_to_destroy.cpp diff --git a/mittest/simple_server/CMakeLists.txt b/mittest/simple_server/CMakeLists.txt index 1d5f1e6c9..87c2e0f48 100644 --- a/mittest/simple_server/CMakeLists.txt +++ b/mittest/simple_server/CMakeLists.txt @@ -111,6 +111,7 @@ ob_unittest_observer(test_callbacks_with_reverse_order test_callbacks_with_rever ob_unittest_observer(test_transfer_tx_data test_transfer_with_smaller_tx_data.cpp) ob_unittest_observer(test_transfer_in_after_abort test_transfer_in_after_abort.cpp) ob_unittest_observer(test_transfer_commit_action test_transfer_with_commit_action.cpp) +ob_unittest_observer(test_memtable_new_safe_to_destroy test_memtable_new_safe_to_destroy.cpp) # TODO(muwei.ym): open later ob_ha_unittest_observer(test_transfer_handler storage_ha/test_transfer_handler.cpp) ob_ha_unittest_observer(test_transfer_and_restart_basic storage_ha/test_transfer_and_restart_basic.cpp) diff --git a/mittest/simple_server/test_memtable_new_safe_to_destroy.cpp b/mittest/simple_server/test_memtable_new_safe_to_destroy.cpp new file mode 100644 index 000000000..ee4a95f3f --- /dev/null +++ b/mittest/simple_server/test_memtable_new_safe_to_destroy.cpp @@ -0,0 +1,264 @@ +/** + * Copyright (c) 2024 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. 
+ * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include +#include +#define protected public +#define private public + +#include "env/ob_simple_cluster_test_base.h" +#include "mittest/env/ob_simple_server_helper.h" +#include "storage/tx_storage/ob_ls_service.h" +#include "storage/tx/ob_tx_loop_worker.h" +#include "storage/tx/ob_trans_part_ctx.h" +#include "storage/tx/ob_trans_submit_log_cb.h" + + +static const char *TEST_FILE_NAME = "test_memtable_new_safe_to_destroy"; + +namespace oceanbase +{ + +ObTransID qcc_tx_id; +namespace transaction +{ +int ObTxLogCb::on_success() +{ + int ret = OB_SUCCESS; + const ObTransID tx_id = trans_id_; + + if (!is_inited_) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "ObTxLogCb not inited", K(ret)); + } else if (NULL == ctx_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "ctx is null", K(ret), K(tx_id), KP(ctx_)); + } else { + ObPartTransCtx *part_ctx = static_cast(ctx_); + while (qcc_tx_id == tx_id) { + TRANS_LOG(INFO, "qcc debug", KPC(part_ctx), K(tx_id)); + fprintf(stdout, "qcc debug\n"); + usleep(1 * 1000 * 1000); + } + if (OB_FAIL(part_ctx->on_success(this))) { + TRANS_LOG(WARN, "sync log success callback error", K(ret), K(tx_id)); + } + } + + return ret; +} + +} + + +namespace unittest +{ + + +#define EXE_SQL(sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + +#define EXE_SQL_FMT(...) 
\ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + +#define WRITE_SQL_BY_CONN(conn, sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define WRITE_SQL_FMT_BY_CONN(conn, ...) \ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define READ_SQL_BY_CONN(conn, sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), read_res)); + + +class ObTestMemtableNewSafeToDestroy : public ObSimpleClusterTestBase +{ +public: + ObTestMemtableNewSafeToDestroy() : ObSimpleClusterTestBase(TEST_FILE_NAME) {} + void create_test_tenant(uint64_t &tenant_id) + { + TRANS_LOG(INFO, "create_tenant start"); + ASSERT_EQ(OB_SUCCESS, create_tenant()); + ASSERT_EQ(OB_SUCCESS, get_tenant_id(tenant_id)); + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); + TRANS_LOG(INFO, "create_tenant end", K(tenant_id)); + } + void prepare_sys_env() + { + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy(); + int64_t affected_rows = 0; + ObSqlString sql; + EXE_SQL("alter system set debug_sync_timeout = '2000s'"); + } + void prepare_tenant_env() + { + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + int64_t affected_rows = 0; + ObSqlString sql; + sqlclient::ObISQLConnection *connection = nullptr; + ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection)); + ASSERT_NE(nullptr, connection); + WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_timeout = 10000000000"); + WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_idle_timeout = 10000000000"); + WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_query_timeout = 10000000000"); + WRITE_SQL_BY_CONN(connection, "alter system set undo_retention = 0"); + } + + void 
get_ls(const uint64_t tenant_id, + const share::ObLSID ls_id, + ObLS *&ls) + { + ls = nullptr; + share::ObTenantSwitchGuard tenant_guard; + ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(tenant_id)); + + ObLSService *ls_svr = MTL(ObLSService*); + ASSERT_NE(nullptr, ls_svr); + ObLSHandle handle; + ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, handle, ObLSGetMod::STORAGE_MOD)); + ASSERT_NE(nullptr, ls = handle.get_ls()); + } + + void get_memtable(const uint64_t tenant_id, + const share::ObLSID ls_id, + const ObTabletID tablet_id, + ObTableHandleV2 &handle) + { + ObLS *ls = NULL; + get_ls(tenant_id, ls_id, ls); + ObTabletHandle tablet_handle; + ObTablet *tablet = nullptr; + ASSERT_EQ(OB_SUCCESS, ls->get_tablet_svr()->get_tablet(tablet_id, tablet_handle)); + tablet = tablet_handle.get_obj(); + ASSERT_EQ(OB_SUCCESS, tablet->get_active_memtable(handle)); + } + + void get_tablet(const uint64_t tenant_id, + const share::ObLSID ls_id, + const ObTabletID tablet_id, + ObTabletHandle &tablet_handle) + { + ObLS *ls = NULL; + get_ls(tenant_id, ls_id, ls); + ASSERT_EQ(OB_SUCCESS, ls->get_tablet_svr()->get_tablet(tablet_id, tablet_handle)); + } + +}; + + +TEST_F(ObTestMemtableNewSafeToDestroy, test_safe_to_destroy) +{ + ObSqlString sql; + int64_t affected_rows = 0; + + // ============================== Phase1. 
create tenant and table ============================== + TRANS_LOG(INFO, "create tenant start"); + uint64_t tenant_id = 0; + create_test_tenant(tenant_id); + TRANS_LOG(INFO, "create tenant end"); + + prepare_sys_env(); + + share::ObTenantSwitchGuard tenant_guard; + ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(tenant_id)); + + TRANS_LOG(INFO, "create table start"); + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + EXE_SQL("create table qcc (a int)"); + // wait minor freeze when create table + usleep(10 * 1000 * 1000); + TRANS_LOG(INFO, "create_table end"); + + prepare_tenant_env(); + + sqlclient::ObISQLConnection *user_connection = nullptr; + ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(user_connection)); + ASSERT_NE(nullptr, user_connection); + + TRANS_LOG(INFO, "start the txn"); + WRITE_SQL_BY_CONN(user_connection, "begin;"); + WRITE_SQL_FMT_BY_CONN(user_connection, "insert into qcc values(1);"); + + + + ASSERT_EQ(0, SSH::find_tx(user_connection, qcc_tx_id)); + ObTableHandleV2 handle; + ObTabletID tablet_id; + ASSERT_EQ(0, SSH::select_table_tablet(tenant_id, "qcc", tablet_id)); + get_memtable(tenant_id, share::ObLSID(1001), tablet_id, handle); + memtable::ObIMemtable *imemtable; + handle.get_memtable(imemtable); + memtable::ObMemtable *memtable = dynamic_cast(imemtable); + TRANS_LOG(INFO, "qcc print", KPC(memtable)); + std::thread th([user_connection] () { + user_connection->commit(); + TRANS_LOG(INFO, "qcc not debug"); + fprintf(stdout, "qcc not debug\n"); + }); + + usleep(5 * 1000 * 1000); + + TRANS_LOG(INFO, "qcc print", KPC(memtable)); + bool is_safe = false; + // EXPECT_EQ(3, memtable->ref_cnt_); + // EXPECT_EQ(0, memtable->write_ref_cnt_); + // EXPECT_EQ(0, memtable->unsubmitted_cnt_); + + handle.reset(); + + storage::ObTabletMemtableMgr *memtable_mgr = memtable->get_memtable_mgr_(); + EXPECT_EQ(OB_SUCCESS, memtable_mgr->release_memtables()); + + TRANS_LOG(INFO, "qcc print2", KPC(memtable));; + + ObTabletHandle tablet_handle; + 
get_tablet(tenant_id, share::ObLSID(1001), tablet_id, tablet_handle); + tablet_handle.get_obj()->reset_memtable(); + + TRANS_LOG(INFO, "qcc print3", KPC(memtable));; + + usleep(5 * 1000 * 1000); + + ObPartTransCtx *ctx = nullptr; + ASSERT_EQ(0, SSH::get_tx_ctx(tenant_id, share::ObLSID(1001), qcc_tx_id, ctx)); + + EXPECT_EQ(2, ctx->mt_ctx_.trans_mgr_.callback_list_.get_length()); + EXPECT_EQ(OB_SUCCESS, memtable->safe_to_destroy(is_safe)); + EXPECT_EQ(false, is_safe); + EXPECT_EQ(OB_SUCCESS, ctx->mt_ctx_.trans_mgr_.callback_list_.tx_print_callback()); + + qcc_tx_id.reset(); + + usleep(1 * 1000 * 1000); + th.join(); + +} + +} +} + +int main(int argc, char **argv) +{ + using namespace oceanbase::unittest; + + oceanbase::unittest::init_log_and_gtest(argc, argv); + OB_LOGGER.set_log_level("info"); + ::testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} diff --git a/src/logservice/ob_log_service.h b/src/logservice/ob_log_service.h index 343e01b99..6d7f66b09 100644 --- a/src/logservice/ob_log_service.h +++ b/src/logservice/ob_log_service.h @@ -225,6 +225,7 @@ public: cdc::ObCdcService *get_cdc_service() { return &cdc_service_; } ObLogRestoreService *get_log_restore_service() { return &restore_service_; } ObLogReplayService *get_log_replay_service() { return &replay_service_; } + ObLogApplyService *get_log_apply_service() { return &apply_service_; } #ifdef OB_BUILD_ARBITRATION ObArbitrationService *get_arbitration_service() { return &arb_service_; } #endif diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 420d78fc6..4886d07b4 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -49,9 +49,10 @@ #include "storage/tx_storage/ob_tenant_freezer.h" #include "storage/column_store/ob_column_oriented_sstable.h" #include "storage/access/ob_row_sample_iterator.h" - #include "storage/concurrency_control/ob_trans_stat_row.h" +#include "logservice/ob_log_service.h" + namespace 
oceanbase { using namespace common; @@ -123,7 +124,6 @@ ObMemtable::ObMemtable() freeze_scn_(SCN::max_scn()), max_end_scn_(ObScnRange::MIN_SCN), rec_scn_(SCN::max_scn()), - state_(ObMemtableState::INVALID), freeze_state_(ObMemtableFreezeState::INVALID), timestamp_(0), is_tablet_freeze_(false), @@ -193,7 +193,6 @@ int ObMemtable::init(const ObITable::TableKey &table_key, } else { mode_ = MTL(lib::Worker::CompatMode); } - state_ = ObMemtableState::ACTIVE; freeze_state_ = ObMemtableFreezeState::NOT_READY_FOR_FLUSH; timestamp_ = ObTimeUtility::current_time(); is_inited_ = true; @@ -261,7 +260,6 @@ void ObMemtable::destroy() max_data_schema_version_ = 0; max_column_cnt_ = 0; mt_stat_.reset(); - state_ = ObMemtableState::INVALID; freeze_state_ = ObMemtableFreezeState::INVALID; unsubmitted_cnt_ = 0; logging_blocked_ = false; @@ -296,6 +294,64 @@ int ObMemtable::safe_to_destroy(bool &is_safe) is_safe = (0 == unsubmitted_cnt); } + if (is_safe) { + // In scenarios where the memtable is forcefully removed (such as when the + // table is dropped or the ls goes offline), relying solely on the + // previously mentioned conditions (write_ref and unsubmitted_cnt) cannot + // guarantee that all the data on the memtable has been synced. This can + // lead to the memtable being destroyed prematurely, which in turn can cause + // later txns to encounter a coredump when trying to access data from the + // destroyed memtable. Therefore, we need to ensure that all data has indeed + // been synced before the memtable is safe to destroy. The solutions to + // the problem can be unified into the following two scenarios: + // 1. If the ls has not gone offline: + // In this case, we can rely on max decided scn to ensure that all data on + // the memtable has been synced. + // 2. If the ls has gone offline: + // In this case, the ls cannot provide a decided scn. Therefore, we rely + // on the apply status of apply service to decide whether all data has + // been synced.
+ share::SCN max_decided_scn = share::ObScnRange::MIN_SCN; + if (!is_inited_) { + is_safe = true; + TRANS_LOG(INFO, "memtable is not inited and safe to destroy", KPC(this)); + } else if (OB_FAIL(ls_handle_.get_ls()->get_max_decided_scn(max_decided_scn))) { + TRANS_LOG(WARN, "fail to get max decided scn", K(ret), K(max_decided_scn)); + is_safe = false; + } else { + is_safe = max_decided_scn >= get_max_end_scn(); + } + + // OB_STATE_NOT_MATCH means the ls is offline and we need to rely on the apply + // service to guarantee all logs have been applied + if (!is_safe && ret == OB_STATE_NOT_MATCH) { + ret = OB_SUCCESS; + bool is_done = false; + share::LSN end_lsn; + if (OB_FAIL(MTL(logservice::ObLogService*)->get_log_apply_service()-> + is_apply_done(ls_handle_.get_ls()->get_ls_id(), + is_done, + end_lsn))) { + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + is_safe = true; + TRANS_LOG(INFO, "apply is decided after ls removed when safe to destroy", + K(ret), K(end_lsn), K(is_done)); + } else { + TRANS_LOG(WARN, "fail to is_apply_done", K(ret), K(max_decided_scn)); + } + } else { + TRANS_LOG(INFO, "apply is decided after ls offlined when safe to destroy", + K(ret), K(end_lsn), K(is_done)); + if (is_done) { + is_safe = true; + } else { + is_safe = false; + } + } + } + } + return ret; } //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -2417,10 +2473,6 @@ bool ObMemtable::is_active_memtable() const bool ObMemtable::is_frozen_memtable() const { - // return ObMemtableState::MAJOR_FROZEN == state_ - // || ObMemtableState::MAJOR_MERGING == state_ - // || ObMemtableState::MINOR_FROZEN == state_ - // || ObMemtableState::MINOR_MERGING == state_; // Note (yanyuan.cxf) log_frozen_memstore_info() will use this func after local_allocator_ init // Now freezer_ and ls_ will not be released before memtable const uint32_t logstream_freeze_clock = OB_NOT_NULL(freezer_) ?
freezer_->get_freeze_clock() : 0; diff --git a/src/storage/memtable/ob_memtable.h b/src/storage/memtable/ob_memtable.h index b6ccca697..e0728ef33 100644 --- a/src/storage/memtable/ob_memtable.h +++ b/src/storage/memtable/ob_memtable.h @@ -433,7 +433,6 @@ public: virtual bool is_inner_tablet() const { return key_.tablet_id_.is_inner_tablet(); } ObTabletID get_tablet_id() const { return key_.tablet_id_; } int set_snapshot_version(const share::SCN snapshot_version); - int64_t get_memtable_state() const { return state_; } int64_t get_freeze_state() const { return freeze_state_; } int64_t get_protection_clock() const { return local_allocator_.get_protection_clock(); } int64_t get_retire_clock() const { return local_allocator_.get_retire_clock(); } @@ -603,7 +602,7 @@ public: } } - INHERIT_TO_STRING_KV("ObITable", ObITable, KP(this), K_(timestamp), K_(state), + INHERIT_TO_STRING_KV("ObITable", ObITable, KP(this), K_(timestamp), K_(freeze_clock), K_(max_schema_version), K_(max_data_schema_version), K_(max_column_cnt), K_(write_ref_cnt), K_(local_allocator), K_(unsubmitted_cnt), K_(logging_blocked), K_(unset_active_memtable_logging_blocked), K_(resolved_active_memtable_left_boundary), @@ -749,7 +748,6 @@ private: share::SCN freeze_scn_; share::SCN max_end_scn_; share::SCN rec_scn_; - int64_t state_; int64_t freeze_state_; int64_t timestamp_; share::SCN migration_clog_checkpoint_scn_; diff --git a/unittest/storage/test_compaction_policy.cpp b/unittest/storage/test_compaction_policy.cpp index 165a1dcbf..d216aa33d 100644 --- a/unittest/storage/test_compaction_policy.cpp +++ b/unittest/storage/test_compaction_policy.cpp @@ -369,7 +369,6 @@ int TestCompactionPolicy::mock_memtable( memtable->write_ref_cnt_ = 0; memtable->unsubmitted_cnt_ = 0; memtable->is_tablet_freeze_ = true; - memtable->state_ = ObMemtableState::MINOR_FROZEN; memtable->set_resolved_active_memtable_left_boundary(true); memtable->set_frozen(); memtable->location_ = 
storage::checkpoint::ObFreezeCheckpointLocation::PREPARE;