diff --git a/mittest/CMakeLists.txt b/mittest/CMakeLists.txt index 138d7f4f9..eb18cc658 100644 --- a/mittest/CMakeLists.txt +++ b/mittest/CMakeLists.txt @@ -1,3 +1,12 @@ +add_library(mit_env env/ob_simple_server_helper.cpp) + +target_include_directories(mit_env PUBLIC + ${CMAKE_SOURCE_DIR}/unittest ${CMAKE_SOURCE_DIR}/mittest) + +target_link_libraries(mit_env PUBLIC + oceanbase +) + add_subdirectory(logservice) add_subdirectory(simple_server) add_subdirectory(mtlenv) diff --git a/mittest/simple_server/env/ob_simple_server_helper.cpp b/mittest/env/ob_simple_server_helper.cpp similarity index 100% rename from mittest/simple_server/env/ob_simple_server_helper.cpp rename to mittest/env/ob_simple_server_helper.cpp diff --git a/mittest/simple_server/env/ob_simple_server_helper.h b/mittest/env/ob_simple_server_helper.h similarity index 100% rename from mittest/simple_server/env/ob_simple_server_helper.h rename to mittest/env/ob_simple_server_helper.h diff --git a/mittest/multi_replica/CMakeLists.txt b/mittest/multi_replica/CMakeLists.txt index dfb933f4b..0a13feff8 100644 --- a/mittest/multi_replica/CMakeLists.txt +++ b/mittest/multi_replica/CMakeLists.txt @@ -9,9 +9,11 @@ add_library(simple_replica_test ${OBSERVER_TEST_SRCS}) target_include_directories( simple_replica_test PUBLIC ${CMAKE_SOURCE_DIR}/unittest ${CMAKE_SOURCE_DIR}/mittest) - target_link_libraries(simple_replica_test PUBLIC + +target_link_libraries(simple_replica_test PUBLIC oceanbase - ) + mit_env +) function(ob_unittest_multi_replica case) ob_unittest(${ARGV}) @@ -26,3 +28,4 @@ ob_unittest_multi_replica(test_ob_dup_table_restart) ob_unittest_multi_replica(test_ob_dup_table_leader_switch) ob_unittest_multi_replica(test_ob_dup_table_tablet_gc) ob_unittest_multi_replica(test_mds_replay_from_ctx_table) +ob_unittest_multi_replica(test_multi_transfer_tx) diff --git a/mittest/multi_replica/env/ob_multi_replica_test_base.cpp b/mittest/multi_replica/env/ob_multi_replica_test_base.cpp index 310aac166..cae2a2920 100644 --- a/mittest/multi_replica/env/ob_multi_replica_test_base.cpp +++ b/mittest/multi_replica/env/ob_multi_replica_test_base.cpp @@ -728,7 +728,8 @@ int ObMultiReplicaTestBase::close() int ObMultiReplicaTestBase::create_tenant(const char *tenant_name, const char *memory_size, const char *log_disk_size, - const bool oracle_mode) + const bool oracle_mode, + const char *primary_zone) { SERVER_LOG(INFO, "create tenant start"); int32_t log_level; @@ -787,9 +788,9 @@ int ObMultiReplicaTestBase::create_tenant(const char *tenant_name, ObSqlString sql; if (OB_FAIL(ret)) { } else if (OB_FAIL(sql.assign_fmt( - "create tenant %s replica_num = 3, primary_zone='zone1', " + "create tenant %s replica_num = 3, primary_zone='%s', " "resource_pool_list=('pool_ym_%s') set ob_tcp_invited_nodes='%%'%s", - tenant_name, tenant_name, + tenant_name, primary_zone, tenant_name, oracle_mode ? ", ob_compatibility_mode='oracle'" : ";"))) { SERVER_LOG(WARN, "create_tenant", K(ret)); } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) { diff --git a/mittest/multi_replica/env/ob_multi_replica_test_base.h b/mittest/multi_replica/env/ob_multi_replica_test_base.h index 5381a3c79..a23482ec7 100644 --- a/mittest/multi_replica/env/ob_multi_replica_test_base.h +++ b/mittest/multi_replica/env/ob_multi_replica_test_base.h @@ -81,7 +81,8 @@ public: int create_tenant(const char *tenant_name = DEFAULT_TEST_TENANT_NAME, const char *memory_size = "2G", const char *log_disk_size = "2G", - const bool oracle_mode = false); + const bool oracle_mode = false, + const char *primary_zone = "zone1"); int delete_tenant(const char *tenant_name = DEFAULT_TEST_TENANT_NAME); int get_tenant_id(uint64_t &tenant_id, const char *tenant_name = DEFAULT_TEST_TENANT_NAME); int exec_write_sql_sys(const char *sql_str, int64_t &affected_rows); diff --git a/mittest/multi_replica/test_multi_transfer_tx.cpp b/mittest/multi_replica/test_multi_transfer_tx.cpp new file mode 100644 index 000000000..a5582f6e2 --- /dev/null +++ b/mittest/multi_replica/test_multi_transfer_tx.cpp @@ -0,0 +1,335 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#include +#define USING_LOG_PREFIX SERVER +#define protected public +#define private public +#include "env/ob_multi_replica_test_base.h" +#include "env/ob_multi_replica_util.h" +#include "mittest/env/ob_simple_server_helper.h" +#include "lib/mysqlclient/ob_mysql_result.h" +#include "storage/tx/ob_trans_part_ctx.h" +#include "storage/compaction/ob_tablet_merge_task.h" +#include "storage/compaction/ob_tablet_merge_ctx.h" +#include "storage/high_availability/ob_tablet_backfill_tx.h" + +#define CUR_TEST_CASE_NAME ObTestMultiTransferTx +DEFINE_MULTI_ZONE_TEST_CASE_CLASS +MULTI_REPLICA_TEST_MAIN_FUNCTION(test_multi_transfer_tx_); + +static ObTabletID MULTI_TRANSFER_TX_CHOOSEN_TABLET_ID; +static oceanbase::transaction::ObTransID MULTI_TRANSFER_TX_CHOOSEN_TX_ID; + +namespace oceanbase +{ + +namespace transaction +{ +int ObPartTransCtx::replay_rollback_to(const ObTxRollbackToLog &log, + const palf::LSN &offset, + const SCN ×tamp, + const int64_t &part_log_no, + const bool is_tx_log_queue, + const bool pre_barrier) +{ + int ret = OB_SUCCESS; + common::ObTimeGuard timeguard("replay_rollback_to", 10 * 1000); + // int64_t start = ObTimeUtility::fast_current_time(); + CtxLockGuard guard(lock_); + + if (trans_id_ == MULTI_TRANSFER_TX_CHOOSEN_TX_ID) { + TRANS_LOG(INFO, "qianchen debuf2", KPC(this)); + return OB_EAGAIN; + } + + bool need_replay = true; + ObTxSEQ from = log.get_from(); + ObTxSEQ to = log.get_to(); + if (OB_UNLIKELY(from.get_branch() != to.get_branch())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "invalid savepoint", K(log)); + } + // + // the log is replay in txn log queue + // for parallel replay, a global savepoint after the serial final log + // must set the pre-barrier replay flag + // some branch savepoint also need this, but we can't distinguish + // hence only sanity check for global savepoint + // + else if (is_tx_log_queue) { + if (is_parallel_logging() // has enter parallel logging + && to.get_branch() == 0 // is a global savepoint + && timestamp > exec_info_.serial_final_scn_ // it is after the serial final log + && !pre_barrier) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "missing pre barrier flag for parallel replay", KR(ret), K(*this)); + usleep(5000); + ob_abort(); + } else if (OB_FAIL(check_replay_avaliable_(offset, timestamp, part_log_no, need_replay))) { + TRANS_LOG(WARN, "check replay available failed", KR(ret), K(offset), K(timestamp), K(*this)); + } else if (!need_replay) { + TRANS_LOG(INFO, "need not replay log", K(log), K(timestamp), K(offset), K(*this)); + } else if (OB_FAIL((update_replaying_log_no_(timestamp, part_log_no)))) { + TRANS_LOG(WARN, "update replaying log no failed", K(ret), K(timestamp), K(part_log_no)); + } + } else if (exec_info_.need_checksum_ && !has_replay_serial_final_()) { + ret = OB_EAGAIN; + TRANS_LOG(INFO, "branch savepoint should wait replay serial final because of calc checksum", + K(ret), K(timestamp), KP(this), K(trans_id_), K(ls_id_), K(exec_info_)); + } else if (!ctx_tx_data_.get_start_log_ts().is_valid() && OB_FAIL(ctx_tx_data_.set_start_log_ts(timestamp))) { + // update start_log_ts for branch savepoint, because it may replayed before first log in txn queue + TRANS_LOG(WARN, "set tx data start log ts fail", K(ret), K(timestamp), KPC(this)); + } + + // + // Step1, add Undo into TxData, both for parallel replay and serial replay + // + if (OB_SUCC(ret) && need_replay && OB_FAIL(rollback_to_savepoint_(log.get_from(), log.get_to(), timestamp))) { + TRANS_LOG(WARN, "replay savepoint_rollback fail", K(ret), K(log), K(offset), K(timestamp), + KPC(this)); + } + + // this is compatible code, since 4.3, redo_lsn not collect during replay + if (OB_SUCC(ret) && OB_FAIL(check_and_merge_redo_lsns_(offset))) { + TRANS_LOG(WARN, "check and merge redo lsns failed", K(ret), K(trans_id_), K(timestamp), K(offset)); + } + + // + // Step2, remove TxNode(s) + // + if (OB_SUCC(ret) && !need_replay) { + if (OB_FAIL(mt_ctx_.rollback(log.get_to(), log.get_from(), timestamp))) { + TRANS_LOG(WARN, "mt ctx rollback fail", K(ret), K(log), KPC(this)); + } + } + + if (OB_FAIL(ret) && OB_EAGAIN != ret) { + TRANS_LOG(WARN, "[Replay Tx] Replay RollbackToLog in TxCtx Failed", K(timestamp), K(offset), + K(ret), K(need_replay), K(log), KPC(this)); + } else { +#ifndef NDEBUG + TRANS_LOG(INFO, "[Replay Tx] Replay RollbackToLog in TxCtx", K(timestamp), K(offset), K(ret), + K(need_replay), K(log), KPC(this)); +#endif + } + + if (OB_EAGAIN != ret) { + REC_TRANS_TRACE_EXT(tlog_, + replay_rollback_to, + OB_ID(ret), + ret, + OB_ID(used), + timeguard.get_diff(), + OB_Y(need_replay), + OB_ID(offset), + offset.val_, + OB_ID(t), + timestamp, + OB_ID(ref), + get_ref()); + } + + return ret; +} +} + +namespace storage +{ +int ObTransferWorkerMgr::do_transfer_backfill_tx_(const ObTransferBackfillTXParam ¶m) +{ + int ret = OB_SUCCESS; + STORAGE_LOG(INFO, "qianchen debuf3", K(param)); + return ret; +} +} + +namespace compaction +{ +int ObTabletMergeFinishTask::process() +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + ObTaskController::get().switch_task(share::ObTaskType::DATA_MAINTAIN); + ObTabletMergeCtx *ctx_ptr = nullptr; + DEBUG_SYNC(MERGE_PARTITION_FINISH_TASK); + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + STORAGE_LOG(WARN, "not inited yet", K(ret)); + } else if (OB_UNLIKELY(nullptr == merge_dag_ + || (nullptr == (ctx_ptr = static_cast(merge_dag_->get_ctx()))))) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "get unexpected null ctx", K(ret)); + } else if (FALSE_IT(SET_MEM_CTX(ctx_ptr->mem_ctx_))) { + } else if (ctx_ptr->get_tablet_id() == MULTI_TRANSFER_TX_CHOOSEN_TABLET_ID) { + STORAGE_LOG(WARN, "qianchen debuf find tablet", KR(ret), KPC(ctx_ptr)); + ret = OB_EAGAIN; + } else if (OB_FAIL(ctx_ptr->update_tablet_after_merge())) { + STORAGE_LOG(WARN, "failed to update tablet after merge", KR(ret), KPC(ctx_ptr)); + } + if (OB_FAIL(ret)) { + STORAGE_LOG(WARN, "sstable merge failed", K(ret), KPC(ctx_ptr), "task", *(static_cast(this))); + } else { + ObITable *sstable = ctx_ptr->merged_table_handle_.get_table(); + // ATTENTION! Critical diagnostic log, DO NOT CHANGE!!! + STORAGE_LOG(INFO, "sstable merge finish", K(ret), "merge_info", ctx_ptr->get_merge_info(), + KPC(sstable), "mem_peak", ctx_ptr->mem_ctx_.get_total_mem_peak(), "compat_mode", merge_dag_->get_compat_mode(), + "time_guard", ctx_ptr->info_collector_.time_guard_); + } + return ret; +} +} + +namespace unittest +{ + +#define EXE_SQL(sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + +#define EXE_SQL_FMT(...) \ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + + +void get_tablet_info_with_table_name(common::ObMySQLProxy &sql_proxy, + const char *name, + int64_t &table_id, + int64_t &object_id, + int64_t &tablet_id, + int64_t &ls_id) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + int64_t affected_rows = 0; + + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("SELECT table_id, object_id, tablet_id, ls_id FROM oceanbase.DBA_OB_TABLE_LOCATIONS WHERE TABLE_NAME= '%s';", name)); + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr())); + sqlclient::ObMySQLResult *result = res.get_result(); + ASSERT_NE(nullptr, result); + ASSERT_EQ(OB_SUCCESS, result->next()); + ASSERT_EQ(OB_SUCCESS, result->get_int("table_id", table_id)); + ASSERT_EQ(OB_SUCCESS, result->get_int("object_id", object_id)); + ASSERT_EQ(OB_SUCCESS, result->get_int("tablet_id", tablet_id)); + ASSERT_EQ(OB_SUCCESS, result->get_int("ls_id", ls_id)); + } +} + +TEST_F(GET_ZONE_TEST_CLASS_NAME(1), create_test_env) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + int64_t affected_rows = 0; + + // ============================== Phase1. create tenant ============================== + uint64_t tenant_id; + SERVER_LOG(INFO, "create_tenant start"); + ASSERT_EQ(OB_SUCCESS, create_tenant(DEFAULT_TEST_TENANT_NAME, + "2G", + "2G", + false, + "zone1, zone2")); + ASSERT_EQ(OB_SUCCESS, get_tenant_id(tenant_id)); + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); + SERVER_LOG(INFO, "create_tenant end", K(tenant_id)); + + SERVER_LOG(INFO, "[ObMultiReplicaTestBase] create test tenant success", K(tenant_id)); + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + sqlclient::ObISQLConnection *connection_qc = nullptr; + ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection_qc)); + ASSERT_NE(nullptr, connection_qc); + WRITE_SQL_BY_CONN(connection_qc, "alter system set partition_balance_schedule_interval = '10s';"); + WRITE_SQL_BY_CONN(connection_qc, "alter system set _enable_active_txn_transfer = True;"); + + // ============================== Phase2. create table ============================== + TRANS_LOG(INFO, "create table qcc1 start"); + EXE_SQL("create table qcc1 (a int)"); + TRANS_LOG(INFO, "create_table qcc1 end"); + + TRANS_LOG(INFO, "create table qcc2 start"); + EXE_SQL("create table qcc2 (a int)"); + TRANS_LOG(INFO, "create_table qcc2 end"); + usleep(3 * 1000 * 1000); + ObLSID loc1, loc2; + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1)); + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2)); + int64_t table1, table2; + int64_t object1, object2; + int64_t tablet1, tablet2; + int64_t ls1, ls2; + get_tablet_info_with_table_name(sql_proxy, "qcc1", table1, object1, tablet1, ls1); + get_tablet_info_with_table_name(sql_proxy, "qcc2", table2, object2, tablet2, ls2); + fprintf(stdout, "qcc is created successfully, loc1: %ld, loc2: %ld, table1: %ld, table2: %ld, tablet1: %ld, tablet2: %ld, ls1: %ld, ls2: %ld\n", + loc1.id(), loc2.id(), table1, table2, tablet1, tablet2, ls1, ls2); + ASSERT_NE(loc1, loc2); + EXE_SQL("create tablegroup tg1 sharding='NONE';"); + + MULTI_TRANSFER_TX_CHOOSEN_TABLET_ID = tablet2; + + // ============================== Phase5. start the user txn ============================== + sqlclient::ObISQLConnection *user_connection = nullptr; + ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(user_connection)); + ASSERT_NE(nullptr, user_connection); + + WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_timeout = 10000000000"); + WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_idle_timeout = 10000000000"); + WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_query_timeout = 10000000000"); + + TRANS_LOG(INFO, "start the txn"); + WRITE_SQL_BY_CONN(user_connection, "begin;"); + WRITE_SQL_BY_CONN(user_connection, "savepoint qc1"); + WRITE_SQL_BY_CONN(user_connection, "insert into qcc1 values(1);"); + WRITE_SQL_BY_CONN(user_connection, "insert into qcc2 values(1);"); + + ObTransID tx_id; + ASSERT_EQ(0, SSH::find_tx(user_connection, tx_id)); + MULTI_TRANSFER_TX_CHOOSEN_TX_ID = tx_id; + fprintf(stdout, "starting the user txn, %lu\n", tx_id.get_id()); + TRANS_LOG(INFO, "starting the user txn", K(tx_id)); + + // ============================== Phase5. start the transfer ============================== + EXE_SQL("alter tablegroup tg1 add qcc1,qcc2;"); + usleep(1 * 1000 * 1000); + int64_t begin_time = ObTimeUtility::current_time(); + while (true) { + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1)); + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2)); + if (loc1 == loc2) { + fprintf(stdout, "succeed wait for balancer\n"); + break; + } else if (ObTimeUtility::current_time() - begin_time > 300 * 1000 * 1000) { + fprintf(stdout, "ERROR: fail to wait for balancer\n"); + break; + } else { + usleep(1 * 1000 * 1000); + fprintf(stdout, "wait for balancer\n"); + } + } + + ASSERT_EQ(loc1, loc2); + + WRITE_SQL_BY_CONN(user_connection, "rollback to qc1"); + + HEAP_VAR(ObMySQLProxy::MySQLResult, res) + { + ASSERT_EQ(OB_SUCCESS, + user_connection->execute_read(OB_SYS_TENANT_ID, + "select /*+log_level(debug)*/* from qcc2;", + res)); + common::sqlclient::ObMySQLResult *result = res.mysql_result(); + ASSERT_EQ(OB_ITER_END, result->next()); + } +} + +} // namespace unittest +} // namespace oceanbase diff --git a/mittest/simple_server/CMakeLists.txt b/mittest/simple_server/CMakeLists.txt index ed3f08122..62b7328a5 100644 --- a/mittest/simple_server/CMakeLists.txt +++ b/mittest/simple_server/CMakeLists.txt @@ -2,7 +2,6 @@ set(OBSERVER_TEST_SRCS env/ob_simple_server.cpp env/ob_simple_server_restart_helper.cpp env/ob_simple_cluster_test_base.cpp - env/ob_simple_server_helper.cpp ) add_library(observer_test ${OBSERVER_TEST_SRCS}) @@ -11,6 +10,7 @@ target_include_directories(observer_test PUBLIC ${CMAKE_SOURCE_DIR}/unittest ${CMAKE_SOURCE_DIR}/mittest) target_link_libraries(observer_test PUBLIC oceanbase + mit_env ) function(ob_unittest_observer case) diff --git a/mittest/simple_server/test_transfer_in_after_abort.cpp b/mittest/simple_server/test_transfer_in_after_abort.cpp index cee7c1a7d..eb0cb1fd5 100644 --- a/mittest/simple_server/test_transfer_in_after_abort.cpp +++ b/mittest/simple_server/test_transfer_in_after_abort.cpp @@ -18,7 +18,7 @@ #include "env/ob_simple_cluster_test_base.h" #include "rootserver/ob_tenant_balance_service.h" #include "share/balance/ob_balance_job_table_operator.h" -#include "mittest/simple_server/env/ob_simple_server_helper.h" +#include "mittest/env/ob_simple_server_helper.h" #include "storage/tx_storage/ob_ls_service.h" #include "storage/tx/ob_tx_loop_worker.h" #include "storage/tx/ob_trans_part_ctx.h" diff --git a/mittest/simple_server/test_transfer_tx.cpp b/mittest/simple_server/test_transfer_tx.cpp index 3310c81d8..830653709 100644 --- a/mittest/simple_server/test_transfer_tx.cpp +++ b/mittest/simple_server/test_transfer_tx.cpp @@ -26,7 +26,7 @@ #include "rootserver/ob_balance_group_ls_stat_operator.h" #include "storage/tablet/ob_tablet.h" #include "logservice/ob_log_service.h" -#include "mittest/simple_server/env/ob_simple_server_helper.h" +#include "mittest/env/ob_simple_server_helper.h" namespace oceanbase { diff --git a/mittest/simple_server/test_transfer_with_smaller_tx_data.cpp b/mittest/simple_server/test_transfer_with_smaller_tx_data.cpp index 309f292f6..80f95df71 100644 --- a/mittest/simple_server/test_transfer_with_smaller_tx_data.cpp +++ b/mittest/simple_server/test_transfer_with_smaller_tx_data.cpp @@ -18,7 +18,7 @@ #include "env/ob_simple_cluster_test_base.h" #include "rootserver/ob_tenant_balance_service.h" #include "share/balance/ob_balance_job_table_operator.h" -#include "mittest/simple_server/env/ob_simple_server_helper.h" +#include "mittest/env/ob_simple_server_helper.h" #include "storage/tx_storage/ob_ls_service.h" #include "storage/tx/ob_tx_loop_worker.h" #include "storage/tx/ob_trans_part_ctx.h" diff --git a/src/storage/memtable/mvcc/ob_mvcc_engine.cpp b/src/storage/memtable/mvcc/ob_mvcc_engine.cpp index f9539d2fa..6f1121aa1 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_engine.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_engine.cpp @@ -106,6 +106,7 @@ int ObMvccEngine::try_compact_row_when_mvcc_read_(const SCN &snapshot_version, int ObMvccEngine::get(ObMvccAccessCtx &ctx, const ObQueryFlag &query_flag, const ObMemtableKey *parameter_key, + const share::ObLSID memtable_ls_id, ObMemtableKey *returned_key, ObMvccValueIterator &value_iter, ObStoreRowLockState &lock_state) @@ -139,6 +140,7 @@ int ObMvccEngine::get(ObMvccAccessCtx &ctx, if (OB_FAIL(value_iter.init(ctx, returned_key, value, + memtable_ls_id, query_flag))) { TRANS_LOG(WARN, "ObMvccValueIterator init fail", KR(ret)); } @@ -153,6 +155,7 @@ int ObMvccEngine::scan( ObMvccAccessCtx &ctx, const ObQueryFlag &query_flag, const ObMvccScanRange &range, + const share::ObLSID memtable_ls_id, ObMvccRowIterator &row_iter) { int ret = OB_SUCCESS; @@ -165,6 +168,7 @@ int ObMvccEngine::scan( } else if (OB_FAIL(row_iter.init(*query_engine_, ctx, range, + memtable_ls_id, query_flag))) { TRANS_LOG(WARN, "row_iter init fail", K(ret)); } else { diff --git a/src/storage/memtable/mvcc/ob_mvcc_engine.h b/src/storage/memtable/mvcc/ob_mvcc_engine.h index e9a5f1a84..ceeee2bce 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_engine.h +++ b/src/storage/memtable/mvcc/ob_mvcc_engine.h @@ -91,12 +91,14 @@ public: int get(ObMvccAccessCtx &ctx, const ObQueryFlag &query_flag, const ObMemtableKey *parameter_key, + const share::ObLSID memtable_ls_id, ObMemtableKey *internal_key, ObMvccValueIterator &value_iter, storage::ObStoreRowLockState &lock_state); int scan(ObMvccAccessCtx &ctx, const ObQueryFlag &query_flag, const ObMvccScanRange &range, + const share::ObLSID memtable_ls_id, ObMvccRowIterator &row_iter); int scan(ObMvccAccessCtx &ctx, const ObMvccScanRange &range, diff --git a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp index 468c51284..edde61fe5 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp @@ -19,6 +19,7 @@ #include "storage/memtable/ob_memtable_context.h" #include "storage/memtable/ob_row_conflict_handler.h" #include "storage/tx/ob_trans_ctx.h" +#include "storage/ls/ob_ls.h" #include "common/ob_clock_generator.h" namespace oceanbase @@ -33,6 +34,7 @@ namespace memtable int ObMvccValueIterator::init(ObMvccAccessCtx &ctx, const ObMemtableKey *key, ObMvccRow *value, + const share::ObLSID memtable_ls_id, const ObQueryFlag &query_flag) { int ret = OB_SUCCESS; @@ -46,6 +48,7 @@ int ObMvccValueIterator::init(ObMvccAccessCtx &ctx, is_inited_ = true; } else { value_ = value; + memtable_ls_id_ = memtable_ls_id; if (OB_FAIL(lock_for_read_(query_flag))) { TRANS_LOG(WARN, "fail to find start pos for iterator", K(ret)); } else { @@ -53,12 +56,13 @@ int ObMvccValueIterator::init(ObMvccAccessCtx &ctx, } } TRANS_LOG(TRACE, "value_iter.init", K(ret), - KPC(value), - KPC_(version_iter), - K(query_flag.is_read_latest()), - KPC(key), - K(ctx), - K(lbt())); + KPC(value), + KPC_(version_iter), + K(query_flag.is_read_latest()), + KPC(key), + K(ctx), + K(memtable_ls_id), + K(lbt())); return ret; } @@ -149,6 +153,10 @@ int ObMvccValueIterator::lock_for_read_inner_(const ObQueryFlag &flag, if ((is_committed || is_aborted || (is_elr && !is_delayed_cleanout)) // Opt2: data is not decided while we donot need cleanout || (!is_delayed_cleanout + && (!ctx_->get_tx_table_guards().src_tx_table_guard_.is_valid() || + (memtable_ls_id_.is_valid() && + ctx_->get_tx_table_guards().src_tx_table_guard_.get_tx_table()-> + get_ls_id() != memtable_ls_id_)) && (// Opt2.1: snapshot reads the data written by snapshot data_tx_id == ctx_->snapshot_.tx_id_ || // Opt2.2: read reader's latest is matched @@ -357,6 +365,7 @@ int ObMvccValueIterator::check_row_locked(ObStoreRowLockState &lock_state) ObMvccRowIterator::ObMvccRowIterator() : is_inited_(false), ctx_(NULL), + memtable_ls_id_(), query_flag_(), value_iter_(), query_engine_(NULL), @@ -373,6 +382,7 @@ int ObMvccRowIterator::init( ObQueryEngine &query_engine, ObMvccAccessCtx &ctx, const ObMvccScanRange &range, + const share::ObLSID memtable_ls_id, const ObQueryFlag &query_flag) { int ret = OB_SUCCESS; @@ -390,6 +400,7 @@ int ObMvccRowIterator::init( query_flag_ = query_flag; query_engine_ = &query_engine; query_engine_iter_->set_version(ctx.snapshot_.version_.get_val_for_tx()); + memtable_ls_id_ = memtable_ls_id; is_inited_ = true; } return ret; @@ -435,6 +446,7 @@ int ObMvccRowIterator::get_next_row( if (OB_FAIL(value_iter_.init(*ctx_, tmp_key, value, + memtable_ls_id_, query_flag_))) { TRANS_LOG(WARN, "value iter init fail", K(ret), "ctx", *ctx_, KP(value), K(*value)); } else if (!value_iter_.is_exist()) { @@ -455,6 +467,7 @@ void ObMvccRowIterator::reset() { is_inited_ = false; ctx_ = NULL; + memtable_ls_id_.reset(); query_flag_.reset(); value_iter_.reset(); if (NULL != query_engine_iter_) { diff --git a/src/storage/memtable/mvcc/ob_mvcc_iterator.h b/src/storage/memtable/mvcc/ob_mvcc_iterator.h index 1653e9b64..8c3693e65 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_iterator.h +++ b/src/storage/memtable/mvcc/ob_mvcc_iterator.h @@ -95,10 +95,11 @@ class ObMvccValueIterator { public: ObMvccValueIterator() - : is_inited_(false), - ctx_(NULL), - value_(NULL), - version_iter_(NULL) + : is_inited_(false), + ctx_(NULL), + value_(NULL), + memtable_ls_id_(), + version_iter_(NULL) { } virtual ~ObMvccValueIterator() {} @@ -106,6 +107,7 @@ public: int init(ObMvccAccessCtx &ctx, const ObMemtableKey *key, ObMvccRow *value, + const share::ObLSID memtable_ls_id, const ObQueryFlag &query_flag); OB_INLINE bool is_exist() { @@ -117,6 +119,7 @@ public: is_inited_ = false; ctx_ = NULL; value_ = NULL; + memtable_ls_id_.reset(); version_iter_ = NULL; } int check_row_locked(storage::ObStoreRowLockState &lock_state); @@ -138,7 +141,7 @@ public: transaction::ObTransID get_reader_tx_id() const { return ctx_->tx_id_; } transaction::ObTransID get_snapshot_tx_id() const { return ctx_->snapshot_.tx_id_; } - TO_STRING_KV(KPC_(value), KPC_(version_iter), KPC_(ctx)); + TO_STRING_KV(KPC_(value), KPC_(version_iter), KPC_(ctx), K_(memtable_ls_id)); private: int lock_for_read_(const ObQueryFlag &flag); int lock_for_read_inner_(const ObQueryFlag &flag, ObMvccTransNode *&iter); @@ -153,6 +156,7 @@ private: bool is_inited_; ObMvccAccessCtx *ctx_; ObMvccRow *value_; + share::ObLSID memtable_ls_id_; ObMvccTransNode *version_iter_; }; @@ -167,6 +171,7 @@ public: int init(ObQueryEngine &query_engine, ObMvccAccessCtx &ctx, const ObMvccScanRange &range, + const share::ObLSID memtable_ls_id, const ObQueryFlag &query_flag); int get_next_row(const ObMemtableKey *&key, ObMvccValueIterator *&value_iter, @@ -187,6 +192,7 @@ private: private: bool is_inited_; ObMvccAccessCtx *ctx_; + share::ObLSID memtable_ls_id_; ObQueryFlag query_flag_; ObMvccValueIterator value_iter_; ObQueryEngine *query_engine_; diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index a6aafe14e..cf428343d 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -682,6 +682,7 @@ int ObMemtable::exist( } else if (OB_FAIL(mvcc_engine_.get(context.store_ctx_->mvcc_acc_ctx_, query_flag, ¶meter_mtk, + ls_id_, NULL, value_iter, lock_state))) { @@ -814,6 +815,7 @@ int ObMemtable::get( } else if (OB_FAIL(mvcc_engine_.get(context.store_ctx_->mvcc_acc_ctx_, context.query_flag_, ¶meter_mtk, + ls_id_, &returned_mtk, value_iter, lock_state))) { diff --git a/src/storage/memtable/ob_memtable_iterator.cpp b/src/storage/memtable/ob_memtable_iterator.cpp index 4cf972d86..85ea6b7e8 100644 --- a/src/storage/memtable/ob_memtable_iterator.cpp +++ b/src/storage/memtable/ob_memtable_iterator.cpp @@ -254,6 +254,7 @@ int ObMemtableScanIterator::prepare_scan() ObMemtableKey* start_key = NULL; ObMemtableKey* end_key = NULL; const ObColDescIArray *out_cols = nullptr; + if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; } else if (is_scan_start_) { @@ -273,6 +274,9 @@ int ObMemtableScanIterator::prepare_scan() } else if (OB_FAIL(ObMemtableKey::build( end_key, *out_cols, &range.get_end_key().get_store_rowkey(), *context_->get_range_allocator()))) { TRANS_LOG(WARN, "end key build fail", K(param_->table_id_), K(range)); + } else if (OB_ISNULL(memtable_)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "fail to get memtable", K(ret)); } else { ObMvccEngine& mvcc_engine = ((ObMemtable*)memtable_)->get_mvcc_engine(); ObMvccScanRange mvcc_scan_range; @@ -283,6 +287,7 @@ int ObMemtableScanIterator::prepare_scan() if (OB_FAIL(mvcc_engine.scan(context_->store_ctx_->mvcc_acc_ctx_, context_->query_flag_, mvcc_scan_range, + memtable_->get_ls_id(), row_iter_))) { TRANS_LOG(WARN, "mvcc engine scan fail", K(ret), K(mvcc_scan_range)); } else if (OB_FAIL(bitmap_.init(read_info_->get_request_count(), read_info_->get_schema_rowkey_count()))) {