diff --git a/mittest/simple_server/CMakeLists.txt b/mittest/simple_server/CMakeLists.txt index 1fe872edb..3f04175b9 100644 --- a/mittest/simple_server/CMakeLists.txt +++ b/mittest/simple_server/CMakeLists.txt @@ -29,6 +29,7 @@ ob_unittest_observer(test_observer_expand_shrink test_observer_expand_shrink.cpp ob_unittest_observer(test_replay_from_middle test_replay_from_middle.cpp) ob_unittest_observer(test_special_tablet_flush test_special_tablet_flush.cpp) ob_unittest_observer(test_tx_data_table_mit test_tx_data_table_mit.cpp) +ob_unittest_observer(test_tx_ctx_table_mit test_tx_ctx_table_mit.cpp) ob_unittest_observer(test_lock_table_persistence test_lock_table_persistence.cpp) ob_unittest_observer(test_tx_recover test_tx_recovery.cpp) ob_unittest_observer(test_tx_recover2 test_tx_recovery2.cpp) diff --git a/mittest/simple_server/test_tx_ctx_table_mit.cpp b/mittest/simple_server/test_tx_ctx_table_mit.cpp new file mode 100644 index 000000000..74a126483 --- /dev/null +++ b/mittest/simple_server/test_tx_ctx_table_mit.cpp @@ -0,0 +1,361 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include +#define USING_LOG_PREFIX STORAGE +#define protected public +#define private public + +#include "env/ob_simple_cluster_test_base.h" +#include "env/ob_simple_server_restart_helper.h" +#include "lib/mysqlclient/ob_mysql_result.h" +#include "storage/access/ob_rows_info.h" +#include "storage/checkpoint/ob_data_checkpoint.h" +#include "storage/compaction/ob_schedule_dag_func.h" +#include "storage/compaction/ob_tablet_merge_task.h" +#include "storage/ls/ob_freezer.h" +#include "storage/ls/ob_ls.h" +#include "storage/ls/ob_ls_meta.h" +#include "storage/ls/ob_ls_tablet_service.h" +#include "storage/ls/ob_ls_tx_service.h" +#include "storage/meta_mem/ob_tablet_handle.h" +#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h" +#include "storage/ob_relative_table.h" +#include "storage/ob_storage_table_guard.h" +#include "storage/tx_storage/ob_ls_map.h" +#include "storage/tx_storage/ob_ls_service.h" +#include "storage/tx/ob_trans_part_ctx.h" + +#undef private +#undef protected + +static const char *TEST_FILE_NAME = "test_tx_ctx_table_mit"; +static const char *BORN_CASE_NAME = "ObTxCtxTableTest"; +static const char *RESTART_CASE_NAME = "ObTxCtxTableRestartTest"; + +bool SLEEP_BEFORE_DUMP_TX_CTX = false; +bool HAS_GOT_TX_CTX = false; +int64_t TX_CTX_TABLE_LAST_CHECKPOINT = 0; + + +namespace oceanbase +{ +using namespace transaction; +using namespace storage; +using namespace palf; +using namespace share; + +namespace storage +{ + +int ObTxCtxMemtableScanIterator::get_next_tx_ctx_table_info_(transaction::ObPartTransCtx *&tx_ctx, + ObTxCtxTableInfo &ctx_info) +{ + int ret = OB_SUCCESS; + bool need_retry = true; + + while (OB_SUCC(ret) && need_retry) { + if (OB_FAIL(ls_tx_ctx_iter_.get_next_tx_ctx(tx_ctx))) { + if (OB_ITER_END != ret) { + STORAGE_LOG(WARN, "ls_tx_ctx_iter_.get_next_tx_ctx failed", K(ret)); + } + } else if (OB_FAIL(tx_ctx->get_tx_ctx_table_info(ctx_info))) { + if (OB_TRANS_CTX_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + STORAGE_LOG(WARN, "tx_ctx->get_tx_ctx_table_info failed", K(ret)); + } + ls_tx_ctx_iter_.revert_tx_ctx(tx_ctx); + } else { + need_retry = false; + } + } + + if (OB_FAIL(ret)) { + STORAGE_LOG(INFO, "get next tx ctx table info failed", KR(ret), KPC(tx_ctx), K(ctx_info.tx_data_guard_)); + } else if (SLEEP_BEFORE_DUMP_TX_CTX) { + fprintf(stdout, "ready to dump tx ctx, undo status node ptr : %p\n", ctx_info.tx_data_guard_.tx_data()->undo_status_list_.head_); + fprintf(stdout, "sleep 20 seconds before dump\n"); + HAS_GOT_TX_CTX = true; + SLEEP_BEFORE_DUMP_TX_CTX = false; + ::sleep(20); + } + + return ret; +} + +}; + +namespace unittest +{ + + +class TestRunCtx +{ +public: + uint64_t tenant_id_ = 0; +}; + +TestRunCtx RunCtx; + +class ObTxCtxTableTest : public ObSimpleClusterTestBase +{ +public: + ObTxCtxTableTest() : ObSimpleClusterTestBase(TEST_FILE_NAME) {} + + void dump_ctx_with_merged_undo_action(); + +private: +}; + +#define EXE_SQL(sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + +#define EXE_SQL_FMT(...) \ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + +#define WRITE_SQL_BY_CONN(conn, sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define WRITE_SQL_FMT_BY_CONN(conn, ...) \ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define DEF_VAL_FOR_SQL \ + int ret = OB_SUCCESS; \ + ObSqlString sql; \ + int64_t affected_rows = 0; \ + +void ObTxCtxTableTest::dump_ctx_with_merged_undo_action() +{ + DEF_VAL_FOR_SQL; + common::ObMySQLProxy &tt1_proxy = get_curr_simple_server().get_sql_proxy2(); + common::ObMySQLProxy &sys_proxy = get_curr_simple_server().get_sql_proxy(); + sqlclient::ObISQLConnection *tt1_conn = nullptr; + sqlclient::ObISQLConnection *sys_conn = nullptr; + + + ASSERT_EQ(OB_SUCCESS, tt1_proxy.acquire(tt1_conn)); + ASSERT_NE(nullptr, tt1_conn); + ASSERT_EQ(OB_SUCCESS, sys_proxy.acquire(sys_conn)); + ASSERT_NE(nullptr, sys_conn); + + // 初始化测试用例所需参数 + WRITE_SQL_BY_CONN(sys_conn, "alter system set _private_buffer_size = '1B';"); + WRITE_SQL_BY_CONN(tt1_conn, "set ob_trx_timeout = 3000000000"); + WRITE_SQL_BY_CONN(tt1_conn, "set ob_trx_idle_timeout = 3000000000"); + WRITE_SQL_BY_CONN(tt1_conn, "set ob_query_timeout = 3000000000"); + WRITE_SQL_BY_CONN(tt1_conn, "alter system set undo_retention = 1800"); + sleep(5); + + // 建表 + WRITE_SQL_BY_CONN(tt1_conn, "create table if not exists test_tx_ctx_t (a int, b int)"); + + // 开事务 + WRITE_SQL_BY_CONN(tt1_conn, "begin"); + WRITE_SQL_BY_CONN(tt1_conn, "insert into test_tx_ctx_t values(1, 1);"); + for (int i = 0; i < 4; i++) { + WRITE_SQL_BY_CONN(tt1_conn, "insert into test_tx_ctx_t select a, b from test_tx_ctx_t;"); + } + + // 执行一次事务上下文表的转储,保证建表语句产生的多源事务能够被转储下去,retain ctx可以被释放 + SLEEP_BEFORE_DUMP_TX_CTX = false; + WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant all;"); + sleep(5); + + + // 确认当前上下文中有且只有这一个事务 + HEAP_VAR(ObMySQLProxy::MySQLResult, res_0) + { + common::sqlclient::ObMySQLResult *result = nullptr; + sql.assign( + "SELECT count(*) as cnt FROM oceanbase.__all_virtual_trans_stat where tenant_id = 1002 and ls_id = 1001;"); + int retry_times = 20; + int64_t cnt = 0; + + while (--retry_times >= 0) { + res_0.reuse(); + ASSERT_EQ(OB_SUCCESS, sys_conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), res_0)); + result = res_0.mysql_result(); + ASSERT_EQ(OB_SUCCESS, result->next()); + + ASSERT_EQ(OB_SUCCESS, result->get_int("cnt", cnt)); + if (1 == cnt) { + break; + } else { + fprintf(stdout, "waitting for tx ctx table mini merge to clear retain ctx ... \n"); + sleep(1); + } + } + ASSERT_EQ(1, cnt); + } + + // 获取当前上下文表的转储位点,用于确认下一次转储是否成功 + HEAP_VAR(ObMySQLProxy::MySQLResult, res_1) + { + common::sqlclient::ObMySQLResult *result = nullptr; + sql.assign( + "SELECT end_log_scn FROM oceanbase.__all_virtual_table_mgr where tenant_id = 1002 and ls_id = 1001 and tablet_id = 49401 and table_type = 12;"); + + res_1.reuse(); + ASSERT_EQ(OB_SUCCESS, sys_conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), res_1)); + result = res_1.mysql_result(); + + ASSERT_EQ(OB_SUCCESS, result->next()); + ASSERT_EQ(OB_SUCCESS, result->get_int("end_log_scn", TX_CTX_TABLE_LAST_CHECKPOINT)); + } + + WRITE_SQL_BY_CONN(tt1_conn, "savepoint x1"); + for (int i = 0; i < 2; i++) { + WRITE_SQL_BY_CONN(tt1_conn, "insert into test_tx_ctx_t select a, b from test_tx_ctx_t;"); + } + + + // 产生一个undo action,同时插入一些新的数据 + WRITE_SQL_BY_CONN(tt1_conn, "savepoint x2"); + for (int i = 0; i < 2; i++) { + WRITE_SQL_BY_CONN(tt1_conn, "insert into test_tx_ctx_t select a, b from test_tx_ctx_t;"); + } + WRITE_SQL_BY_CONN(tt1_conn, "rollback to savepoint x2"); + WRITE_SQL_BY_CONN(tt1_conn, "insert into test_tx_ctx_t select a, b from test_tx_ctx_t;"); + + + // 转储ctx table,设置SLEEP_BEFORE_DUMP_TX_CTX标志 + SLEEP_BEFORE_DUMP_TX_CTX = true; + HAS_GOT_TX_CTX = false; + WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant tt1 ls 1001 tablet_id = 49401;"); + + // 等待一段时间保证上下文表的转储已经拿到了带有undo node的tx data + while (!HAS_GOT_TX_CTX) { + fprintf(stdout, "waitting for scheduling tx ctx table merge dag ...\n"); + sleep(2); + } + + // 再次执行rollback,预期能merge掉第一次回滚产生的undo action + WRITE_SQL_BY_CONN(tt1_conn, "rollback to savepoint x1"); + { + sqlclient::ObISQLConnection *tt1_conn_2 = nullptr; + ASSERT_EQ(OB_SUCCESS, tt1_proxy.acquire(tt1_conn_2)); + ASSERT_NE(nullptr, tt1_conn_2); + + int64_t insert_start_ts = ObTimeUtil::fast_current_time(); + fprintf(stdout, "doing insert while dump tx ctx table sleeping...\n"); + while (ObTimeUtil::fast_current_time() - insert_start_ts < 30 * 1000 * 1000/* 30 seconds */) { + WRITE_SQL_BY_CONN(tt1_conn_2, "insert into test_tx_ctx_t values(3, 3)"); + if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) { + } + } + } + + // 确认上下文表转储成功 + HEAP_VAR(ObMySQLProxy::MySQLResult, res_2) + { + common::sqlclient::ObMySQLResult *result = nullptr; + sql.assign("SELECT end_log_scn FROM oceanbase.__all_virtual_table_mgr where tenant_id = 1002 and ls_id = 1001 and " + "tablet_id = 49401 and table_type = 12;"); + + int retry_times = 40; + int64_t end_log_scn = 0; + while (--retry_times >= 0) { + res_2.reuse(); + ASSERT_EQ(OB_SUCCESS, sys_conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), res_2)); + result = res_2.mysql_result(); + ASSERT_EQ(OB_SUCCESS, result->next()); + ASSERT_EQ(OB_SUCCESS, result->get_int("end_log_scn", end_log_scn)); + if (end_log_scn > TX_CTX_TABLE_LAST_CHECKPOINT) { + break; + } else { + fprintf(stdout, "waitting for tx ctx table mini merge to dump rollback tx data\n"); + } + } + ASSERT_GT(end_log_scn, TX_CTX_TABLE_LAST_CHECKPOINT); + } +} + +TEST_F(ObTxCtxTableTest, observer_start) { SERVER_LOG(INFO, "observer_start succ"); } + +TEST_F(ObTxCtxTableTest, add_tenant) +{ + // create tenant + ASSERT_EQ(OB_SUCCESS, create_tenant()); + ASSERT_EQ(OB_SUCCESS, get_tenant_id(RunCtx.tenant_id_)); + ASSERT_NE(0, RunCtx.tenant_id_); + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); + sleep(5); +} + +TEST_F(ObTxCtxTableTest, dump_ctx_with_merged_undo_action) +{ + SLEEP_BEFORE_DUMP_TX_CTX = false; + HAS_GOT_TX_CTX = false; + dump_ctx_with_merged_undo_action(); + fprintf(stdout, "dump ctx with merge undo action done\n"); +} + +class ObTxCtxTableRestartTest : public ObSimpleClusterTestBase +{ +public: + ObTxCtxTableRestartTest() : ObSimpleClusterTestBase(TEST_FILE_NAME) {} + +}; + +TEST_F(ObTxCtxTableRestartTest, observer_restart) +{ + // init sql proxy2 to use tenant tt1 + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); + SERVER_LOG(INFO, "observer restart succ"); +} + +TEST_F(ObTxCtxTableRestartTest, test_to_do) +{ +} + +} // namespace unittest +} // namespace oceanbase + +int main(int argc, char **argv) +{ + int c = 0; + int time_sec = 0; + int concurrency = 1; + char *log_level = (char *)"INFO"; + while (EOF != (c = getopt(argc, argv, "t:l:"))) { + switch (c) { + case 't': + time_sec = atoi(optarg); + break; + case 'l': + log_level = optarg; + oceanbase::unittest::ObSimpleClusterTestBase::enable_env_warn_log_ = false; + break; + case 'c': + concurrency = atoi(optarg); + break; + default: + break; + } + } + std::string gtest_file_name = std::string(TEST_FILE_NAME) + "_gtest.log"; + oceanbase::unittest::init_gtest_output(gtest_file_name); + + int ret = 0; + ObSimpleServerRestartHelper restart_helper(argc, argv, TEST_FILE_NAME, BORN_CASE_NAME, + RESTART_CASE_NAME); + restart_helper.set_sleep_sec(time_sec); + restart_helper.run(); + + return ret; +} diff --git a/src/storage/tx/ob_ctx_tx_data.cpp b/src/storage/tx/ob_ctx_tx_data.cpp index 65c26eb3f..64cafaca1 100644 --- a/src/storage/tx/ob_ctx_tx_data.cpp +++ b/src/storage/tx/ob_ctx_tx_data.cpp @@ -68,7 +68,6 @@ void ObCtxTxData::reset() ctx_mgr_ = nullptr; tx_data_guard_.reset(); read_only_ = false; - tx_commit_data_.reset(); } void ObCtxTxData::destroy() @@ -88,7 +87,6 @@ int ObCtxTxData::insert_into_tx_table() GET_TX_TABLE_(tx_table) if (OB_FAIL(ret)) { } else { - tx_commit_data_ = *(tx_data_guard_.tx_data()); if (OB_FAIL(insert_tx_data_(tx_table, tx_data_guard_.tx_data()))) { TRANS_LOG(WARN, "insert tx data failed", K(ret), K(*this)); } else { @@ -100,7 +98,7 @@ int ObCtxTxData::insert_into_tx_table() return ret; } -int ObCtxTxData::recover_tx_data(const ObTxData &tmp_tx_data) +int ObCtxTxData::recover_tx_data(ObTxDataGuard &rhs) { int ret = OB_SUCCESS; WLockGuard guard(lock_); @@ -109,12 +107,12 @@ int ObCtxTxData::recover_tx_data(const ObTxData &tmp_tx_data) if (OB_FAIL(ret)) { } else if (OB_FAIL(check_tx_data_writable_())) { - TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this)); - } else if (OB_FAIL(tx_table->alloc_tx_data(tx_data_guard_))) { - TRANS_LOG(WARN, "alloc tx data failed", KR(ret), K(tmp_tx_data)); - } else { - ObTxData *tx_data = tx_data_guard_.tx_data(); - *tx_data = tmp_tx_data; + TRANS_LOG(WARN, "tx data is not writeable", K(ret), KPC(this)); + } else if (OB_ISNULL(rhs.tx_data())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "input tx data guard is unexpected nullptr", K(ret), KPC(this)); + } else if (OB_FAIL(tx_data_guard_.init(rhs.tx_data()))) { + TRANS_LOG(WARN, "init tx data guard failed", K(ret), KPC(this)); } return ret; @@ -288,43 +286,86 @@ int32_t ObCtxTxData::get_state() const { RLockGuard guard(lock_); const ObTxData *tx_data = tx_data_guard_.tx_data(); - return (NULL != tx_data ? ATOMIC_LOAD(&tx_data->state_): ATOMIC_LOAD(&tx_commit_data_.state_)); + if (OB_ISNULL(tx_data)) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "tx data is unexpected nullptr", KPC(this)); + return 0; + } else { + return ATOMIC_LOAD(&tx_data->state_); + } } const SCN ObCtxTxData::get_commit_version() const { RLockGuard guard(lock_); const ObTxData *tx_data = tx_data_guard_.tx_data(); - return (NULL != tx_data ? tx_data->commit_version_.atomic_load() : tx_commit_data_.commit_version_.atomic_load()); + if (OB_ISNULL(tx_data)) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "tx data is unexpected nullptr", KPC(this)); + return SCN::invalid_scn(); + } else { + return tx_data->commit_version_.atomic_load(); + } } const SCN ObCtxTxData::get_start_log_ts() const { RLockGuard guard(lock_); const ObTxData *tx_data = tx_data_guard_.tx_data(); - SCN ctx_scn = (NULL != tx_data ? tx_data->start_scn_.atomic_load() : tx_commit_data_.start_scn_.atomic_load()); - // if (ctx_scn.is_max()) { - // ctx_scn.reset(); - // } - return ctx_scn; + if (OB_ISNULL(tx_data)) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "tx data is unexpected nullptr", KPC(this)); + return SCN::invalid_scn(); + } else { + return tx_data->start_scn_.atomic_load(); + } } const SCN ObCtxTxData::get_end_log_ts() const { RLockGuard guard(lock_); const ObTxData *tx_data = tx_data_guard_.tx_data(); - SCN ctx_scn = (NULL != tx_data ? tx_data->end_scn_.atomic_load() : tx_commit_data_.end_scn_.atomic_load()); - // if (ctx_scn.is_max()) { - // ctx_scn.reset(); - // } - return ctx_scn; + if (OB_ISNULL(tx_data)) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "tx data is unexpected nullptr", KPC(this)); + return SCN::invalid_scn(); + } else { + return tx_data->end_scn_.atomic_load(); + } } ObTransID ObCtxTxData::get_tx_id() const { RLockGuard guard(lock_); const ObTxData *tx_data = tx_data_guard_.tx_data(); - return (NULL != tx_data ? tx_data->tx_id_ : tx_commit_data_.tx_id_); + if (OB_ISNULL(tx_data)) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "tx data is unexpected nullptr", KPC(this)); + return ObTransID(0); + } else { + return tx_data->tx_id_; + } +} + +int ObCtxTxData::get_tx_data(storage::ObTxDataGuard &tx_data_guard) +{ + int ret = OB_SUCCESS; + ObTxData *tx_data = tx_data_guard_.tx_data(); + if (OB_ISNULL(tx_data)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "tx data is unexpected nullptr", KR(ret), KPC(this)); + } else { + ret = tx_data_guard.init(tx_data); + } + return ret; +} + +int ObCtxTxData::get_tx_data_ptr(storage::ObTxData *&tx_data_ptr) +{ + int ret = OB_SUCCESS; + ObTxData *tx_data = tx_data_guard_.tx_data(); + if (OB_ISNULL(tx_data)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "tx data is unexpected nullptr", KR(ret), KPC(this)); + } else { + tx_data_ptr = tx_data; + } + return ret; } int ObCtxTxData::prepare_add_undo_action(ObUndoAction &undo_action, @@ -401,25 +442,6 @@ int ObCtxTxData::add_undo_action(ObUndoAction &undo_action, storage::ObUndoStatu return ret; } -int ObCtxTxData::Guard::get_tx_data(const ObTxData *&tx_data) const -{ - int ret = OB_SUCCESS; - auto tmp_tx_data = host_.tx_data_guard_.tx_data(); - if (NULL == tmp_tx_data) { - ret = OB_TRANS_CTX_NOT_EXIST; - } else { - tx_data = tmp_tx_data; - } - return ret; -} - -int ObCtxTxData::get_tx_commit_data(const ObTxCommitData *&tx_commit_data) const -{ - int ret = OB_SUCCESS; - tx_commit_data = &tx_commit_data_; - return ret; -} - int ObCtxTxData::check_tx_data_writable_() { int ret = OB_SUCCESS; @@ -481,6 +503,7 @@ int ObCtxTxData::deep_copy_tx_data_(ObTxTable *tx_table, storage::ObTxDataGuard return ret; } + } // namespace transaction } // namespace oceanbase diff --git a/src/storage/tx/ob_ctx_tx_data.h b/src/storage/tx/ob_ctx_tx_data.h index 8c1ecc85e..d536348ed 100644 --- a/src/storage/tx/ob_ctx_tx_data.h +++ b/src/storage/tx/ob_ctx_tx_data.h @@ -38,7 +38,7 @@ public: bool is_read_only() const { return read_only_; } int insert_into_tx_table(); - int recover_tx_data(const storage::ObTxData &tmp_tx_data); + int recover_tx_data(storage::ObTxDataGuard &rhs); int replace_tx_data(storage::ObTxData *tmp_tx_data); int deep_copy_tx_data_out(storage::ObTxDataGuard &tmp_tx_data_guard); int alloc_tmp_tx_data(storage::ObTxDataGuard &tmp_tx_data); @@ -66,19 +66,13 @@ public: int commit_add_undo_action(ObUndoAction &undo_action, storage::ObUndoStatusNode *tmp_undo_status); int add_undo_action(ObUndoAction &undo_action, storage::ObUndoStatusNode *tmp_undo_status = NULL); - int get_tx_commit_data(const storage::ObTxCommitData *&tx_commit_data) const; + int get_tx_data(storage::ObTxDataGuard &tx_data_guard); + + // ATTENTION : use get_tx_data_ptr only if you can make sure the life cycle of ctx_tx_data is longer than your usage + int get_tx_data_ptr(storage::ObTxData *&tx_data_ptr); + + TO_STRING_KV(KP(ctx_mgr_), K(tx_data_guard_), K(read_only_)); - TO_STRING_KV(KP(ctx_mgr_), K(tx_data_guard_), K(tx_commit_data_), K(read_only_)); -public: - class Guard { // TODO(yunxing.cyx): remove it - friend class ObCtxTxData; - Guard(ObCtxTxData &host) : host_(host) { } - ObCtxTxData &host_; - public: - ~Guard() { } - int get_tx_data(const storage::ObTxData *&tx_data) const; - }; - Guard get_tx_data() { return Guard(*this); } public: //only for unittest void test_init(storage::ObTxData &tx_data, ObLSTxCtxMgr *ctx_mgr) @@ -113,7 +107,6 @@ private: private: ObLSTxCtxMgr *ctx_mgr_; storage::ObTxDataGuard tx_data_guard_; - storage::ObTxCommitData tx_commit_data_; bool read_only_; // lock for tx_data_ pointer RWLock lock_; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index cff1da91a..3220667db 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1362,7 +1362,7 @@ int ObPartTransCtx::recover_tx_ctx_table_info(ObTxCtxTableInfo &ctx_info) // TRANS_LOG(WARN, "unexpected null ptr", K(*this)); } else if (OB_FAIL(mt_ctx_.recover_from_table_lock_durable_info(ctx_info.table_lock_info_))) { TRANS_LOG(ERROR, "recover_from_table_lock_durable_info failed", K(ret)); - } else if (OB_FAIL(ctx_tx_data_.recover_tx_data(ctx_info.state_info_))) { + } else if (OB_FAIL(ctx_tx_data_.recover_tx_data(ctx_info.tx_data_guard_))) { TRANS_LOG(WARN, "recover tx data failed", K(ret), K(ctx_tx_data_)); } else { trans_id_ = ctx_info.tx_id_; @@ -5313,9 +5313,8 @@ int ObPartTransCtx::check_with_tx_data(ObITxDataCheckFunctor &fn) { // NB: You need notice the lock is not acquired during check int ret = OB_SUCCESS; - const ObTxData *tx_data_ptr = NULL; - auto guard = ctx_tx_data_.get_tx_data(); - if (OB_FAIL(guard.get_tx_data(tx_data_ptr))) { + ObTxData *tx_data_ptr = NULL; + if (OB_FAIL(ctx_tx_data_.get_tx_data_ptr(tx_data_ptr))) { } else { // const ObTxData &tx_data = *tx_data_ptr; // NB: we must read the state then the version without lock. If you are interested in the @@ -5434,41 +5433,24 @@ int ObPartTransCtx::refresh_rec_log_ts_() int ObPartTransCtx::get_tx_ctx_table_info_(ObTxCtxTableInfo &info) { int ret = OB_SUCCESS; - { - const ObTxData *tx_data = NULL; - const ObTxCommitData *tx_commit_data = NULL; - auto guard = ctx_tx_data_.get_tx_data(); - if (OB_FAIL(guard.get_tx_data(tx_data))) { - TRANS_LOG(WARN, "get tx data failed", K(ret)); - // rewrite ret - ret = OB_SUCCESS; - if (OB_FAIL(ctx_tx_data_.get_tx_commit_data(tx_commit_data))) { - TRANS_LOG(WARN, "get tx commit data failed", K(ret)); - } else { - info.state_info_ = *tx_commit_data; - } + + if (OB_FAIL(ctx_tx_data_.get_tx_data(info.tx_data_guard_))) { + TRANS_LOG(WARN, "get tx data failed", K(ret)); + } else if (OB_FAIL(mt_ctx_.calc_checksum_before_scn( + exec_info_.max_applied_log_ts_, exec_info_.checksum_, exec_info_.checksum_scn_))) { + TRANS_LOG(ERROR, "calc checksum before log ts failed", K(ret), KPC(this)); + } else { + info.tx_id_ = trans_id_; + info.ls_id_ = ls_id_; + info.exec_info_ = exec_info_; + info.cluster_id_ = cluster_id_; + if (OB_FAIL(mt_ctx_.get_table_lock_store_info(info.table_lock_info_))) { + TRANS_LOG(WARN, "get_table_lock_store_info failed", K(ret), K(info)); } else { - info.state_info_ = *tx_data; + TRANS_LOG(INFO, "store ctx_info: ", K(ret), K(info), KPC(this)); } } - if (OB_SUCC(ret)) { - if (OB_FAIL(mt_ctx_.calc_checksum_before_scn(exec_info_.max_applied_log_ts_, - exec_info_.checksum_, - exec_info_.checksum_scn_))) { - TRANS_LOG(ERROR, "calc checksum before log ts failed", K(ret), KPC(this)); - } else { - info.tx_id_ = trans_id_; - info.ls_id_ = ls_id_; - info.exec_info_ = exec_info_; - info.cluster_id_ = cluster_id_; - if (OB_FAIL(mt_ctx_.get_table_lock_store_info(info.table_lock_info_))) { - TRANS_LOG(WARN, "get_table_lock_store_info failed", K(ret), K(info)); - } else { - TRANS_LOG(INFO, "store ctx_info: ", K(ret), K(info), KPC(this)); - } - } - } return ret; } @@ -6702,9 +6684,9 @@ int ObPartTransCtx::dump_2_text(FILE *fd) fprintf(fd, "********** ObPartTransCtx ***********\n\n"); fprintf(fd, "%s\n", buf); - auto guard = ctx_tx_data_.get_tx_data(); - if (OB_FAIL(guard.get_tx_data(tx_data_ptr))) { - } else if (OB_ISNULL(tx_data_ptr)) { + ObTxDataGuard tx_data_guard; + ctx_tx_data_.get_tx_data(tx_data_guard); + if (OB_ISNULL(tx_data_ptr = tx_data_guard.tx_data())) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected nullptr", KR(ret)); } else { diff --git a/src/storage/tx/ob_tx_data_define.cpp b/src/storage/tx/ob_tx_data_define.cpp index 68a6ab52e..a0eef393b 100644 --- a/src/storage/tx/ob_tx_data_define.cpp +++ b/src/storage/tx/ob_tx_data_define.cpp @@ -23,6 +23,7 @@ namespace storage int ObUndoStatusList::serialize(char *buf, const int64_t buf_len, int64_t &pos) const { + SpinRLockGuard guard(lock_); int ret = OB_SUCCESS; const int64_t len = get_serialize_size_(); if (OB_UNLIKELY(OB_ISNULL(buf) || buf_len <= 0 || pos > buf_len)) { @@ -73,6 +74,7 @@ int ObUndoStatusList::deserialize(const char *buf, int ret = OB_SUCCESS; int64_t version = 0; int64_t undo_status_list_len = 0; + SpinWLockGuard guard(lock_); if (OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &version))) { STORAGE_LOG(WARN, "decode version fail", K(version), K(data_len), K(pos), K(ret)); @@ -145,6 +147,7 @@ int ObUndoStatusList::deserialize_(const char *buf, int64_t ObUndoStatusList::get_serialize_size() const { + SpinRLockGuard guard(lock_); int64_t data_len = get_serialize_size_(); int64_t len = 0; len += serialization::encoded_length_vi64(UNIS_VERSION); @@ -485,7 +488,6 @@ int ObTxData::add_undo_action(ObTxTable *tx_table, transaction::ObUndoAction &ne if (OB_NOT_NULL(undo_node)) { tx_data_table->free_undo_status_node(undo_node); } - return ret; } @@ -506,6 +508,7 @@ int ObTxData::merge_undo_actions_(ObTxDataTable *tx_data_table, } if (0 == node->size_) { + // fprintf(stdout, "free undo node, node ptr = %p \n", node); // all undo actions in this node are merged, free it // STORAGE_LOG(DEBUG, "current node is empty, now free it"); ObUndoStatusNode *node_to_free = node; diff --git a/src/storage/tx_table/ob_tx_ctx_table.cpp b/src/storage/tx_table/ob_tx_ctx_table.cpp index 03af7832a..e7c6d31ee 100644 --- a/src/storage/tx_table/ob_tx_ctx_table.cpp +++ b/src/storage/tx_table/ob_tx_ctx_table.cpp @@ -128,8 +128,8 @@ int ObTxCtxTableRecoverHelper::recover_one_tx_ctx_(transaction::ObLSTxCtxMgr* ls } int ObTxCtxTableRecoverHelper::recover(const blocksstable::ObDatumRow &row, - ObSliceAlloc &slice_allocator, - transaction::ObLSTxCtxMgr* ls_tx_ctx_mgr) + ObTxDataTable &tx_data_table, + transaction::ObLSTxCtxMgr *ls_tx_ctx_mgr) { int ret = OB_SUCCESS; @@ -206,7 +206,7 @@ int ObTxCtxTableRecoverHelper::recover(const blocksstable::ObDatumRow &row, int64_t pos = 0; bool tx_ctx_existed = true; - if (OB_FAIL(ctx_info_.deserialize(deserialize_buf, deserialize_buf_length, pos, slice_allocator))) { + if (OB_FAIL(ctx_info_.deserialize(deserialize_buf, deserialize_buf_length, pos, tx_data_table))) { STORAGE_LOG(WARN, "failed to deserialize status_info", K(ret), K_(ctx_info)); } else if (OB_FAIL(recover_one_tx_ctx_(ls_tx_ctx_mgr, ctx_info_))) { STORAGE_LOG(WARN, "failed to recover_one_tx_ctx_", K(ret), K(ctx_info_)); @@ -313,9 +313,9 @@ int ObTxCtxTable::offline() return ret; } -int ObTxCtxTable::recover(const blocksstable::ObDatumRow &row, ObSliceAlloc &slice_allocator) +int ObTxCtxTable::recover(const blocksstable::ObDatumRow &row, ObTxDataTable &tx_data_table) { - return recover_helper_.recover(row, slice_allocator, ls_tx_ctx_mgr_); + return recover_helper_.recover(row, tx_data_table, ls_tx_ctx_mgr_); } int ObTxCtxTable::check_with_tx_data(const transaction::ObTransID tx_id, ObITxDataCheckFunctor &fn) diff --git a/src/storage/tx_table/ob_tx_ctx_table.h b/src/storage/tx_table/ob_tx_ctx_table.h index 8b6257823..6e8b2bd3e 100644 --- a/src/storage/tx_table/ob_tx_ctx_table.h +++ b/src/storage/tx_table/ob_tx_ctx_table.h @@ -35,8 +35,8 @@ public: void reset(); void destroy(); int recover(const blocksstable::ObDatumRow &row, - ObSliceAlloc &slice_allocator, - transaction::ObLSTxCtxMgr* ls_tx_ctx_mgr); + ObTxDataTable &tx_data_table, + transaction::ObLSTxCtxMgr *ls_tx_ctx_mgr); TO_STRING_KV(K_(in_multi_row_state), K_(prev_meta), K_(prev_end_pos)); @@ -109,7 +109,7 @@ public: int release_ref_(); // We use the method to recover the tx_ctx_table for reboot. - int recover(const blocksstable::ObDatumRow &row, ObSliceAlloc &slice_allocator); + int recover(const blocksstable::ObDatumRow &row, ObTxDataTable &tx_data_table); int check_with_tx_data(const transaction::ObTransID tx_id, ObITxDataCheckFunctor &fn); diff --git a/src/storage/tx_table/ob_tx_table.cpp b/src/storage/tx_table/ob_tx_table.cpp index 05ad698a0..2c592e26c 100644 --- a/src/storage/tx_table/ob_tx_table.cpp +++ b/src/storage/tx_table/ob_tx_table.cpp @@ -688,7 +688,7 @@ int ObTxTable::restore_tx_ctx_table_(ObITable &trans_sstable) if (OB_ITER_END != ret) { LOG_WARN("failed to get next row", K(ret)); } - } else if (OB_FAIL(tx_ctx_table_.recover(*row, *tx_data_table_.get_slice_allocator()))) { + } else if (OB_FAIL(tx_ctx_table_.recover(*row, tx_data_table_))) { LOG_WARN("failed to recover tx ctx table", K(ret)); } } diff --git a/src/storage/tx_table/ob_tx_table_define.cpp b/src/storage/tx_table/ob_tx_table_define.cpp index 8ac42629d..cd3bd824b 100644 --- a/src/storage/tx_table/ob_tx_table_define.cpp +++ b/src/storage/tx_table/ob_tx_table_define.cpp @@ -11,6 +11,7 @@ */ #include "storage/tx_table/ob_tx_table_define.h" +#include "storage/tx_table/ob_tx_data_table.h" namespace oceanbase { @@ -71,7 +72,10 @@ int ObTxCtxTableInfo::serialize(char *buf, int ret = OB_SUCCESS; const int64_t data_len = get_serialize_size_(); ObTxCtxTableCommonHeader header(MAGIC_VERSION, data_len); - if (OB_FAIL(header.serialize(buf, buf_len, pos))) { + if (OB_ISNULL(tx_data_guard_.tx_data())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "invalid tx data guard", KR(ret), KPC(this)); + } else if (OB_FAIL(header.serialize(buf, buf_len, pos))) { TRANS_LOG(WARN, "encode header fail", K(buf_len), K(pos), K(ret)); } else if (OB_FAIL(serialize_(buf, buf_len, pos))) { TRANS_LOG(WARN, "serialize fail", K(ret)); @@ -91,7 +95,7 @@ int ObTxCtxTableInfo::serialize_(char *buf, TRANS_LOG(WARN, "serialize ls_id fail.", KR(ret), K(pos), K(buf_len)); } else if (OB_FAIL(serialization::encode_vi64(buf, buf_len, pos, cluster_id_))) { TRANS_LOG(WARN, "encode cluster id failed", K(cluster_id_), K(buf_len), K(pos), K(ret)); - } else if (OB_FAIL(state_info_.serialize(buf, buf_len, pos))) { + } else if (OB_FAIL(tx_data_guard_.tx_data()->serialize(buf, buf_len, pos))) { TRANS_LOG(WARN, "serialize state_info fail.", KR(ret), K(pos), K(buf_len)); } else if (OB_FAIL(exec_info_.serialize(buf, buf_len, pos))) { TRANS_LOG(WARN, "serialize exec_info fail.", KR(ret), K(pos), K(buf_len)); @@ -105,14 +109,16 @@ int ObTxCtxTableInfo::serialize_(char *buf, int ObTxCtxTableInfo::deserialize(const char *buf, const int64_t buf_len, int64_t &pos, - ObSliceAlloc &slice_allocator) + ObTxDataTable &tx_data_table) { int ret = OB_SUCCESS; ObTxCtxTableCommonHeader header(MAGIC_VERSION, 0); - if (OB_FAIL(header.deserialize(buf, buf_len, pos))) { + if (OB_FAIL(tx_data_table.alloc_tx_data(tx_data_guard_))) { + STORAGE_LOG(WARN, "alloc tx data failed", KR(ret)); + } else if (OB_FAIL(header.deserialize(buf, buf_len, pos))) { TRANS_LOG(WARN, "deserialize header fail", K(buf_len), K(pos), K(ret)); - } else if (OB_FAIL(deserialize_(buf, buf_len, pos, slice_allocator))) { + } else if (OB_FAIL(deserialize_(buf, buf_len, pos, tx_data_table))) { TRANS_LOG(INFO, "deserialize_ fail", "buf_len", buf_len, K(pos), K(ret)); } return ret; @@ -121,7 +127,7 @@ int ObTxCtxTableInfo::deserialize(const char *buf, int ObTxCtxTableInfo::deserialize_(const char *buf, const int64_t buf_len, int64_t &pos, - ObSliceAlloc &slice_allocator) + ObTxDataTable &tx_data_table) { int ret = OB_SUCCESS; if (OB_FAIL(tx_id_.deserialize(buf, buf_len, pos))) { @@ -130,7 +136,7 @@ int ObTxCtxTableInfo::deserialize_(const char *buf, TRANS_LOG(WARN, "deserialize ls_id fail.", KR(ret), K(pos), K(buf_len)); } else if (OB_FAIL(serialization::decode_vi64(buf, buf_len, pos, &cluster_id_))) { TRANS_LOG(WARN, "encode cluster_id fail", K(cluster_id_), K(buf_len), K(pos), K(ret)); - } else if (OB_FAIL(state_info_.deserialize(buf, buf_len, pos, slice_allocator))) { + } else if (OB_FAIL(tx_data_guard_.tx_data()->deserialize(buf, buf_len, pos, *tx_data_table.get_slice_allocator()))) { TRANS_LOG(WARN, "deserialize state_info fail.", KR(ret), K(pos), K(buf_len)); } else if (OB_FAIL(exec_info_.deserialize(buf, buf_len, pos))) { TRANS_LOG(WARN, "deserialize exec_info fail.", KR(ret), K(pos), K(buf_len)); @@ -159,7 +165,7 @@ int64_t ObTxCtxTableInfo::get_serialize_size_(void) const len += tx_id_.get_serialize_size(); len += ls_id_.get_serialize_size(); len += serialization::encoded_length_vi64(cluster_id_); - len += state_info_.get_serialize_size(); + len += (OB_NOT_NULL(tx_data_guard_.tx_data()) ? tx_data_guard_.tx_data()->get_serialize_size() : 0); len += exec_info_.get_serialize_size(); len += table_lock_info_.get_serialize_size(); return len; diff --git a/src/storage/tx_table/ob_tx_table_define.h b/src/storage/tx_table/ob_tx_table_define.h index 421e59dbd..425349e11 100644 --- a/src/storage/tx_table/ob_tx_table_define.h +++ b/src/storage/tx_table/ob_tx_table_define.h @@ -75,12 +75,12 @@ private: const static int64_t MAGIC_VERSION = MAGIC_NUM + UNIS_VERSION; public: int serialize(char *buf, const int64_t buf_len, int64_t &pos) const; - int deserialize(const char *buf, const int64_t buf_len, int64_t &pos, ObSliceAlloc &slice_allocator); + int deserialize(const char *buf, const int64_t buf_len, int64_t &pos, ObTxDataTable &tx_data_table); int64_t get_serialize_size() const; private: int serialize_(char *buf, const int64_t buf_len, int64_t &pos) const; - int deserialize_(const char *buf, const int64_t buf_len, int64_t &pos, ObSliceAlloc &slice_allocator); + int deserialize_(const char *buf, const int64_t buf_len, int64_t &pos, ObTxDataTable &tx_data_table); int64_t get_serialize_size_() const; public: @@ -92,16 +92,16 @@ public: tx_id_.reset(); ls_id_.reset(); cluster_id_ = OB_INVALID_CLUSTER_ID; - state_info_.reset(); + tx_data_guard_.reset(); exec_info_.reset(); table_lock_info_.reset(); } void destroy() { reset(); } - TO_STRING_KV(K_(tx_id), K_(ls_id), K_(cluster_id), K_(state_info), K_(exec_info)); + TO_STRING_KV(K_(tx_id), K_(ls_id), K_(cluster_id), K_(tx_data_guard), K_(exec_info)); transaction::ObTransID tx_id_; share::ObLSID ls_id_; int64_t cluster_id_; - ObTxData state_info_; + ObTxDataGuard tx_data_guard_; transaction::ObTxExecInfo exec_info_; transaction::tablelock::ObTableLockInfo table_lock_info_; }; diff --git a/unittest/storage/tx/test_ob_standby_read.cpp b/unittest/storage/tx/test_ob_standby_read.cpp index db7023dfc..6ef6c2bfd 100644 --- a/unittest/storage/tx/test_ob_standby_read.cpp +++ b/unittest/storage/tx/test_ob_standby_read.cpp @@ -314,7 +314,12 @@ TEST_F(TestObStandbyRead, trans_check_for_standby) part1.set_downstream_state(ObTxState::PREPARE); part1.exec_info_.prepare_version_.convert_for_tx(90); part2.set_downstream_state(ObTxState::COMMIT); - part2.ctx_tx_data_.tx_commit_data_.commit_version_.convert_for_tx(90); + ObTxData part2_tx_data; + ObSliceAlloc slice_allocator; + part2_tx_data.ref_cnt_ = 1000; + part2_tx_data.slice_allocator_ = &slice_allocator; + part2.ctx_tx_data_.tx_data_guard_.init(&part2_tx_data); + part2.ctx_tx_data_.tx_data_guard_.tx_data()->commit_version_.convert_for_tx(90); part3.set_downstream_state(ObTxState::UNKNOWN); can_read = false; part1.state_info_array_.reset(); @@ -343,7 +348,7 @@ TEST_F(TestObStandbyRead, trans_check_for_standby) part1.set_downstream_state(ObTxState::PREPARE); coord.exec_info_.prepare_version_.convert_for_tx(90); part2.set_downstream_state(ObTxState::COMMIT); - part2.ctx_tx_data_.tx_commit_data_.commit_version_.convert_for_tx(300); + part2.ctx_tx_data_.tx_data_guard_.tx_data()->commit_version_.convert_for_tx(300); part3.set_downstream_state(ObTxState::UNKNOWN); can_read = true; part1.state_info_array_.reset(); diff --git a/unittest/storage/tx_table/test_tx_ctx_table.cpp b/unittest/storage/tx_table/test_tx_ctx_table.cpp index bb57915d4..976e201f7 100644 --- a/unittest/storage/tx_table/test_tx_ctx_table.cpp +++ b/unittest/storage/tx_table/test_tx_ctx_table.cpp @@ -258,9 +258,10 @@ TEST_F(TestTxCtxTable, test_tx_ctx_memtable_mgr) // ObTransSSTableDurableCtxInfo ctx_info; ObTxCtxTableInfo ctx_info; ObSliceAlloc slice_allocator; + ObTxDataTable tx_data_table; ObMemAttr attr; attr.tenant_id_ = MTL_ID(); - slice_allocator.init(sizeof(ObTxData), OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, attr); + tx_data_table.slice_allocator_.init(sizeof(ObTxData), OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, attr); ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01)); @@ -299,7 +300,7 @@ TEST_F(TestTxCtxTable, test_tx_ctx_memtable_mgr) row_copy.storage_datums_[TX_CTX_TABLE_META_COLUMN] = row_copy.storage_datums_[meta_col]; row_copy.storage_datums_[TX_CTX_TABLE_VAL_COLUMN] = row_copy.storage_datums_[value_col]; TRANS_LOG(INFO, "row_info projected", K(row_copy)); - ASSERT_EQ(OB_SUCCESS, recover_helper.recover(row_copy, slice_allocator, ls_tx_ctx_mgr_recover)); + ASSERT_EQ(OB_SUCCESS, recover_helper.recover(row_copy, tx_data_table, ls_tx_ctx_mgr_recover)); } while (tx_ctx_memtable_iter->has_unmerged_buf_); ObTxCtxTableInfo* ctx_info = recover_helper.get_tx_ctx_table_info();