let transfer src read as fast commit
This commit is contained in:
		@ -1,3 +1,12 @@
 | 
				
			|||||||
 | 
					add_library(mit_env env/ob_simple_server_helper.cpp)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					target_include_directories(mit_env PUBLIC
 | 
				
			||||||
 | 
					  ${CMAKE_SOURCE_DIR}/unittest ${CMAKE_SOURCE_DIR}/mittest)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					target_link_libraries(mit_env PUBLIC
 | 
				
			||||||
 | 
					  oceanbase
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
add_subdirectory(logservice)
 | 
					add_subdirectory(logservice)
 | 
				
			||||||
add_subdirectory(simple_server)
 | 
					add_subdirectory(simple_server)
 | 
				
			||||||
add_subdirectory(mtlenv)
 | 
					add_subdirectory(mtlenv)
 | 
				
			||||||
 | 
				
			|||||||
@ -9,9 +9,11 @@ add_library(simple_replica_test ${OBSERVER_TEST_SRCS})
 | 
				
			|||||||
target_include_directories(
 | 
					target_include_directories(
 | 
				
			||||||
  simple_replica_test PUBLIC
 | 
					  simple_replica_test PUBLIC
 | 
				
			||||||
  ${CMAKE_SOURCE_DIR}/unittest ${CMAKE_SOURCE_DIR}/mittest)
 | 
					  ${CMAKE_SOURCE_DIR}/unittest ${CMAKE_SOURCE_DIR}/mittest)
 | 
				
			||||||
  target_link_libraries(simple_replica_test PUBLIC
 | 
					
 | 
				
			||||||
 | 
					target_link_libraries(simple_replica_test PUBLIC
 | 
				
			||||||
  oceanbase
 | 
					  oceanbase
 | 
				
			||||||
  )
 | 
					  mit_env
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
function(ob_unittest_multi_replica case)
 | 
					function(ob_unittest_multi_replica case)
 | 
				
			||||||
  ob_unittest(${ARGV})
 | 
					  ob_unittest(${ARGV})
 | 
				
			||||||
@ -26,3 +28,4 @@ ob_unittest_multi_replica(test_ob_dup_table_restart)
 | 
				
			|||||||
ob_unittest_multi_replica(test_ob_dup_table_leader_switch)
 | 
					ob_unittest_multi_replica(test_ob_dup_table_leader_switch)
 | 
				
			||||||
ob_unittest_multi_replica(test_ob_dup_table_tablet_gc)
 | 
					ob_unittest_multi_replica(test_ob_dup_table_tablet_gc)
 | 
				
			||||||
ob_unittest_multi_replica(test_mds_replay_from_ctx_table)
 | 
					ob_unittest_multi_replica(test_mds_replay_from_ctx_table)
 | 
				
			||||||
 | 
					ob_unittest_multi_replica(test_multi_transfer_tx)
 | 
				
			||||||
 | 
				
			|||||||
@ -728,7 +728,8 @@ int ObMultiReplicaTestBase::close()
 | 
				
			|||||||
int ObMultiReplicaTestBase::create_tenant(const char *tenant_name,
 | 
					int ObMultiReplicaTestBase::create_tenant(const char *tenant_name,
 | 
				
			||||||
                                          const char *memory_size,
 | 
					                                          const char *memory_size,
 | 
				
			||||||
                                          const char *log_disk_size,
 | 
					                                          const char *log_disk_size,
 | 
				
			||||||
                                          const bool oracle_mode)
 | 
					                                          const bool oracle_mode,
 | 
				
			||||||
 | 
					                                          const char *primary_zone)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  SERVER_LOG(INFO, "create tenant start");
 | 
					  SERVER_LOG(INFO, "create tenant start");
 | 
				
			||||||
  int32_t log_level;
 | 
					  int32_t log_level;
 | 
				
			||||||
@ -787,9 +788,9 @@ int ObMultiReplicaTestBase::create_tenant(const char *tenant_name,
 | 
				
			|||||||
    ObSqlString sql;
 | 
					    ObSqlString sql;
 | 
				
			||||||
    if (OB_FAIL(ret)) {
 | 
					    if (OB_FAIL(ret)) {
 | 
				
			||||||
    } else if (OB_FAIL(sql.assign_fmt(
 | 
					    } else if (OB_FAIL(sql.assign_fmt(
 | 
				
			||||||
                   "create tenant %s replica_num = 3, primary_zone='zone1', "
 | 
					                   "create tenant %s replica_num = 3, primary_zone='%s', "
 | 
				
			||||||
                   "resource_pool_list=('pool_ym_%s') set ob_tcp_invited_nodes='%%'%s",
 | 
					                   "resource_pool_list=('pool_ym_%s') set ob_tcp_invited_nodes='%%'%s",
 | 
				
			||||||
                   tenant_name, tenant_name,
 | 
					                   tenant_name, primary_zone, tenant_name,
 | 
				
			||||||
                   oracle_mode ? ", ob_compatibility_mode='oracle'" : ";"))) {
 | 
					                   oracle_mode ? ", ob_compatibility_mode='oracle'" : ";"))) {
 | 
				
			||||||
      SERVER_LOG(WARN, "create_tenant", K(ret));
 | 
					      SERVER_LOG(WARN, "create_tenant", K(ret));
 | 
				
			||||||
    } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) {
 | 
					    } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) {
 | 
				
			||||||
 | 
				
			|||||||
@ -81,7 +81,8 @@ public:
 | 
				
			|||||||
  int create_tenant(const char *tenant_name = DEFAULT_TEST_TENANT_NAME,
 | 
					  int create_tenant(const char *tenant_name = DEFAULT_TEST_TENANT_NAME,
 | 
				
			||||||
                    const char *memory_size = "2G",
 | 
					                    const char *memory_size = "2G",
 | 
				
			||||||
                    const char *log_disk_size = "2G",
 | 
					                    const char *log_disk_size = "2G",
 | 
				
			||||||
                    const bool oracle_mode = false);
 | 
					                    const bool oracle_mode = false,
 | 
				
			||||||
 | 
					                    const char *primary_zone = "zone1");
 | 
				
			||||||
  int delete_tenant(const char *tenant_name = DEFAULT_TEST_TENANT_NAME);
 | 
					  int delete_tenant(const char *tenant_name = DEFAULT_TEST_TENANT_NAME);
 | 
				
			||||||
  int get_tenant_id(uint64_t &tenant_id, const char *tenant_name = DEFAULT_TEST_TENANT_NAME);
 | 
					  int get_tenant_id(uint64_t &tenant_id, const char *tenant_name = DEFAULT_TEST_TENANT_NAME);
 | 
				
			||||||
  int exec_write_sql_sys(const char *sql_str, int64_t &affected_rows);
 | 
					  int exec_write_sql_sys(const char *sql_str, int64_t &affected_rows);
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										335
									
								
								mittest/multi_replica/test_multi_transfer_tx.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										335
									
								
								mittest/multi_replica/test_multi_transfer_tx.cpp
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,335 @@
 | 
				
			|||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * Copyright (c) 2021 OceanBase
 | 
				
			||||||
 | 
					 * OceanBase CE is licensed under Mulan PubL v2.
 | 
				
			||||||
 | 
					 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
				
			||||||
 | 
					 * You may obtain a copy of Mulan PubL v2 at:
 | 
				
			||||||
 | 
					 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
				
			||||||
 | 
					 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
				
			||||||
 | 
					 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
				
			||||||
 | 
					 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
				
			||||||
 | 
					 * See the Mulan PubL v2 for more details.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					#include <gtest/gtest.h>
 | 
				
			||||||
 | 
					#define USING_LOG_PREFIX SERVER
 | 
				
			||||||
 | 
					#define protected public
 | 
				
			||||||
 | 
					#define private public
 | 
				
			||||||
 | 
					#include "env/ob_multi_replica_test_base.h"
 | 
				
			||||||
 | 
					#include "env/ob_multi_replica_util.h"
 | 
				
			||||||
 | 
					#include "mittest/env/ob_simple_server_helper.h"
 | 
				
			||||||
 | 
					#include "lib/mysqlclient/ob_mysql_result.h"
 | 
				
			||||||
 | 
					#include "storage/tx/ob_trans_part_ctx.h"
 | 
				
			||||||
 | 
					#include "storage/compaction/ob_tablet_merge_task.h"
 | 
				
			||||||
 | 
					#include "storage/compaction/ob_tablet_merge_ctx.h"
 | 
				
			||||||
 | 
					#include "storage/high_availability/ob_tablet_backfill_tx.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#define CUR_TEST_CASE_NAME ObTestMultiTransferTx
 | 
				
			||||||
 | 
					DEFINE_MULTI_ZONE_TEST_CASE_CLASS
 | 
				
			||||||
 | 
					MULTI_REPLICA_TEST_MAIN_FUNCTION(test_multi_transfer_tx_);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static ObTabletID MULTI_TRANSFER_TX_CHOOSEN_TABLET_ID;
 | 
				
			||||||
 | 
					static oceanbase::transaction::ObTransID MULTI_TRANSFER_TX_CHOOSEN_TX_ID;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace oceanbase
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace transaction
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					int ObPartTransCtx::replay_rollback_to(const ObTxRollbackToLog &log,
 | 
				
			||||||
 | 
					                                       const palf::LSN &offset,
 | 
				
			||||||
 | 
					                                       const SCN ×tamp,
 | 
				
			||||||
 | 
					                                       const int64_t &part_log_no,
 | 
				
			||||||
 | 
					                                       const bool is_tx_log_queue,
 | 
				
			||||||
 | 
					                                       const bool pre_barrier)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					  common::ObTimeGuard timeguard("replay_rollback_to", 10 * 1000);
 | 
				
			||||||
 | 
					  // int64_t start = ObTimeUtility::fast_current_time();
 | 
				
			||||||
 | 
					  CtxLockGuard guard(lock_);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (trans_id_ == MULTI_TRANSFER_TX_CHOOSEN_TX_ID) {
 | 
				
			||||||
 | 
					    TRANS_LOG(INFO, "qianchen debuf2", KPC(this));
 | 
				
			||||||
 | 
					    return OB_EAGAIN;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  bool need_replay = true;
 | 
				
			||||||
 | 
					  ObTxSEQ from = log.get_from();
 | 
				
			||||||
 | 
					  ObTxSEQ to = log.get_to();
 | 
				
			||||||
 | 
					  if (OB_UNLIKELY(from.get_branch() != to.get_branch())) {
 | 
				
			||||||
 | 
					    ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
 | 
					    TRANS_LOG(ERROR, "invalid savepoint", K(log));
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  //
 | 
				
			||||||
 | 
					  // the log is replay in txn log queue
 | 
				
			||||||
 | 
					  // for parallel replay, a global savepoint after the serial final log
 | 
				
			||||||
 | 
					  // must set the pre-barrier replay flag
 | 
				
			||||||
 | 
					  // some branch savepoint also need this, but we can't distinguish
 | 
				
			||||||
 | 
					  // hence only sanity check for global savepoint
 | 
				
			||||||
 | 
					  //
 | 
				
			||||||
 | 
					  else if (is_tx_log_queue) {
 | 
				
			||||||
 | 
					    if (is_parallel_logging()             // has enter parallel logging
 | 
				
			||||||
 | 
					        && to.get_branch() == 0           // is a global savepoint
 | 
				
			||||||
 | 
					        && timestamp > exec_info_.serial_final_scn_  // it is after the serial final log
 | 
				
			||||||
 | 
					        && !pre_barrier) {
 | 
				
			||||||
 | 
					      ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
 | 
					      TRANS_LOG(ERROR, "missing pre barrier flag for parallel replay", KR(ret), K(*this));
 | 
				
			||||||
 | 
					      usleep(5000);
 | 
				
			||||||
 | 
					      ob_abort();
 | 
				
			||||||
 | 
					    } else if (OB_FAIL(check_replay_avaliable_(offset, timestamp, part_log_no, need_replay))) {
 | 
				
			||||||
 | 
					      TRANS_LOG(WARN, "check replay available failed", KR(ret), K(offset), K(timestamp), K(*this));
 | 
				
			||||||
 | 
					    } else if (!need_replay) {
 | 
				
			||||||
 | 
					      TRANS_LOG(INFO, "need not replay log", K(log), K(timestamp), K(offset), K(*this));
 | 
				
			||||||
 | 
					    } else if (OB_FAIL((update_replaying_log_no_(timestamp, part_log_no)))) {
 | 
				
			||||||
 | 
					      TRANS_LOG(WARN, "update replaying log no failed", K(ret), K(timestamp), K(part_log_no));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  } else if (exec_info_.need_checksum_ && !has_replay_serial_final_()) {
 | 
				
			||||||
 | 
					    ret = OB_EAGAIN;
 | 
				
			||||||
 | 
					    TRANS_LOG(INFO, "branch savepoint should wait replay serial final because of calc checksum",
 | 
				
			||||||
 | 
					              K(ret), K(timestamp), KP(this), K(trans_id_), K(ls_id_), K(exec_info_));
 | 
				
			||||||
 | 
					  } else if (!ctx_tx_data_.get_start_log_ts().is_valid() && OB_FAIL(ctx_tx_data_.set_start_log_ts(timestamp))) {
 | 
				
			||||||
 | 
					    // update start_log_ts for branch savepoint, because it may replayed before first log in txn queue
 | 
				
			||||||
 | 
					    TRANS_LOG(WARN, "set tx data start log ts fail", K(ret), K(timestamp), KPC(this));
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  //
 | 
				
			||||||
 | 
					  // Step1, add Undo into TxData, both for parallel replay and serial replay
 | 
				
			||||||
 | 
					  //
 | 
				
			||||||
 | 
					  if (OB_SUCC(ret) && need_replay && OB_FAIL(rollback_to_savepoint_(log.get_from(), log.get_to(), timestamp))) {
 | 
				
			||||||
 | 
					    TRANS_LOG(WARN, "replay savepoint_rollback fail", K(ret), K(log), K(offset), K(timestamp),
 | 
				
			||||||
 | 
					              KPC(this));
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // this is compatible code, since 4.3, redo_lsn not collect during replay
 | 
				
			||||||
 | 
					  if (OB_SUCC(ret) && OB_FAIL(check_and_merge_redo_lsns_(offset))) {
 | 
				
			||||||
 | 
					    TRANS_LOG(WARN, "check and merge redo lsns failed", K(ret), K(trans_id_), K(timestamp), K(offset));
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  //
 | 
				
			||||||
 | 
					  // Step2, remove TxNode(s)
 | 
				
			||||||
 | 
					  //
 | 
				
			||||||
 | 
					  if (OB_SUCC(ret) && !need_replay) {
 | 
				
			||||||
 | 
					    if (OB_FAIL(mt_ctx_.rollback(log.get_to(), log.get_from(), timestamp))) {
 | 
				
			||||||
 | 
					      TRANS_LOG(WARN, "mt ctx rollback fail", K(ret), K(log), KPC(this));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (OB_FAIL(ret) && OB_EAGAIN != ret) {
 | 
				
			||||||
 | 
					    TRANS_LOG(WARN, "[Replay Tx] Replay RollbackToLog in TxCtx Failed", K(timestamp), K(offset),
 | 
				
			||||||
 | 
					              K(ret), K(need_replay), K(log), KPC(this));
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					#ifndef NDEBUG
 | 
				
			||||||
 | 
					    TRANS_LOG(INFO, "[Replay Tx] Replay RollbackToLog in TxCtx", K(timestamp), K(offset), K(ret),
 | 
				
			||||||
 | 
					              K(need_replay), K(log), KPC(this));
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (OB_EAGAIN != ret) {
 | 
				
			||||||
 | 
					    REC_TRANS_TRACE_EXT(tlog_,
 | 
				
			||||||
 | 
					                        replay_rollback_to,
 | 
				
			||||||
 | 
					                        OB_ID(ret),
 | 
				
			||||||
 | 
					                        ret,
 | 
				
			||||||
 | 
					                        OB_ID(used),
 | 
				
			||||||
 | 
					                        timeguard.get_diff(),
 | 
				
			||||||
 | 
					                        OB_Y(need_replay),
 | 
				
			||||||
 | 
					                        OB_ID(offset),
 | 
				
			||||||
 | 
					                        offset.val_,
 | 
				
			||||||
 | 
					                        OB_ID(t),
 | 
				
			||||||
 | 
					                        timestamp,
 | 
				
			||||||
 | 
					                        OB_ID(ref),
 | 
				
			||||||
 | 
					                        get_ref());
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace storage
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					int ObTransferWorkerMgr::do_transfer_backfill_tx_(const ObTransferBackfillTXParam ¶m)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					  STORAGE_LOG(INFO, "qianchen debuf3", K(param));
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace compaction
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					int ObTabletMergeFinishTask::process()
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					  int tmp_ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					  ObTaskController::get().switch_task(share::ObTaskType::DATA_MAINTAIN);
 | 
				
			||||||
 | 
					  ObTabletMergeCtx *ctx_ptr = nullptr;
 | 
				
			||||||
 | 
					  DEBUG_SYNC(MERGE_PARTITION_FINISH_TASK);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (IS_NOT_INIT) {
 | 
				
			||||||
 | 
					    ret = OB_NOT_INIT;
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "not inited yet", K(ret));
 | 
				
			||||||
 | 
					  } else if (OB_UNLIKELY(nullptr == merge_dag_
 | 
				
			||||||
 | 
					      || (nullptr == (ctx_ptr = static_cast<ObTabletMergeCtx *>(merge_dag_->get_ctx()))))) {
 | 
				
			||||||
 | 
					    ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "get unexpected null ctx", K(ret));
 | 
				
			||||||
 | 
					  } else if (FALSE_IT(SET_MEM_CTX(ctx_ptr->mem_ctx_))) {
 | 
				
			||||||
 | 
					  } else if (ctx_ptr->get_tablet_id() == MULTI_TRANSFER_TX_CHOOSEN_TABLET_ID) {
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "qianchen debuf find tablet", KR(ret), KPC(ctx_ptr));
 | 
				
			||||||
 | 
					    ret = OB_EAGAIN;
 | 
				
			||||||
 | 
					  } else if (OB_FAIL(ctx_ptr->update_tablet_after_merge())) {
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "failed to update tablet after merge", KR(ret), KPC(ctx_ptr));
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  if (OB_FAIL(ret)) {
 | 
				
			||||||
 | 
					    STORAGE_LOG(WARN, "sstable merge failed", K(ret), KPC(ctx_ptr), "task", *(static_cast<ObITask *>(this)));
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    ObITable *sstable = ctx_ptr->merged_table_handle_.get_table();
 | 
				
			||||||
 | 
					    // ATTENTION! Critical diagnostic log, DO NOT CHANGE!!!
 | 
				
			||||||
 | 
					    STORAGE_LOG(INFO, "sstable merge finish", K(ret), "merge_info", ctx_ptr->get_merge_info(),
 | 
				
			||||||
 | 
					        KPC(sstable), "mem_peak", ctx_ptr->mem_ctx_.get_total_mem_peak(), "compat_mode", merge_dag_->get_compat_mode(),
 | 
				
			||||||
 | 
					        "time_guard", ctx_ptr->info_collector_.time_guard_);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  return ret;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					namespace unittest
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#define EXE_SQL(sql_str)                                            \
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str));                       \
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#define EXE_SQL_FMT(...)                                            \
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__));               \
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void get_tablet_info_with_table_name(common::ObMySQLProxy &sql_proxy,
 | 
				
			||||||
 | 
					                                     const char *name,
 | 
				
			||||||
 | 
					                                     int64_t &table_id,
 | 
				
			||||||
 | 
					                                     int64_t &object_id,
 | 
				
			||||||
 | 
					                                     int64_t &tablet_id,
 | 
				
			||||||
 | 
					                                     int64_t &ls_id)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					  ObSqlString sql;
 | 
				
			||||||
 | 
					  int64_t affected_rows = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("SELECT table_id, object_id, tablet_id, ls_id FROM oceanbase.DBA_OB_TABLE_LOCATIONS WHERE TABLE_NAME= '%s';", name));
 | 
				
			||||||
 | 
					  SMART_VAR(ObMySQLProxy::MySQLResult, res) {
 | 
				
			||||||
 | 
					    ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr()));
 | 
				
			||||||
 | 
					    sqlclient::ObMySQLResult *result = res.get_result();
 | 
				
			||||||
 | 
					    ASSERT_NE(nullptr, result);
 | 
				
			||||||
 | 
					    ASSERT_EQ(OB_SUCCESS, result->next());
 | 
				
			||||||
 | 
					    ASSERT_EQ(OB_SUCCESS, result->get_int("table_id", table_id));
 | 
				
			||||||
 | 
					    ASSERT_EQ(OB_SUCCESS, result->get_int("object_id", object_id));
 | 
				
			||||||
 | 
					    ASSERT_EQ(OB_SUCCESS, result->get_int("tablet_id", tablet_id));
 | 
				
			||||||
 | 
					    ASSERT_EQ(OB_SUCCESS, result->get_int("ls_id", ls_id));
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					TEST_F(GET_ZONE_TEST_CLASS_NAME(1), create_test_env)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					  ObSqlString sql;
 | 
				
			||||||
 | 
					  int64_t affected_rows = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // ============================== Phase1. create tenant ==============================
 | 
				
			||||||
 | 
					  uint64_t tenant_id;
 | 
				
			||||||
 | 
					  SERVER_LOG(INFO, "create_tenant start");
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, create_tenant(DEFAULT_TEST_TENANT_NAME,
 | 
				
			||||||
 | 
					                                      "2G",
 | 
				
			||||||
 | 
					                                      "2G",
 | 
				
			||||||
 | 
					                                      false,
 | 
				
			||||||
 | 
					                                      "zone1, zone2"));
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, get_tenant_id(tenant_id));
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
 | 
				
			||||||
 | 
					  SERVER_LOG(INFO, "create_tenant end", K(tenant_id));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  SERVER_LOG(INFO, "[ObMultiReplicaTestBase] create test tenant success", K(tenant_id));
 | 
				
			||||||
 | 
					  common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
 | 
				
			||||||
 | 
					  sqlclient::ObISQLConnection *connection_qc = nullptr;
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection_qc));
 | 
				
			||||||
 | 
					  ASSERT_NE(nullptr, connection_qc);
 | 
				
			||||||
 | 
					  WRITE_SQL_BY_CONN(connection_qc, "alter system set partition_balance_schedule_interval = '10s';");
 | 
				
			||||||
 | 
					  WRITE_SQL_BY_CONN(connection_qc, "alter system set _enable_active_txn_transfer = True;");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // ============================== Phase2. create table ==============================
 | 
				
			||||||
 | 
					  TRANS_LOG(INFO, "create table qcc1 start");
 | 
				
			||||||
 | 
					  EXE_SQL("create table qcc1 (a int)");
 | 
				
			||||||
 | 
					  TRANS_LOG(INFO, "create_table qcc1 end");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  TRANS_LOG(INFO, "create table qcc2 start");
 | 
				
			||||||
 | 
					  EXE_SQL("create table qcc2 (a int)");
 | 
				
			||||||
 | 
					  TRANS_LOG(INFO, "create_table qcc2 end");
 | 
				
			||||||
 | 
					  usleep(3 * 1000 * 1000);
 | 
				
			||||||
 | 
					  ObLSID loc1, loc2;
 | 
				
			||||||
 | 
					  ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1));
 | 
				
			||||||
 | 
					  ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2));
 | 
				
			||||||
 | 
					  int64_t table1, table2;
 | 
				
			||||||
 | 
					  int64_t object1, object2;
 | 
				
			||||||
 | 
					  int64_t tablet1, tablet2;
 | 
				
			||||||
 | 
					  int64_t ls1, ls2;
 | 
				
			||||||
 | 
					  get_tablet_info_with_table_name(sql_proxy, "qcc1", table1, object1, tablet1, ls1);
 | 
				
			||||||
 | 
					  get_tablet_info_with_table_name(sql_proxy, "qcc2", table2, object2, tablet2, ls2);
 | 
				
			||||||
 | 
					  fprintf(stdout, "qcc is created successfully, loc1: %ld, loc2: %ld, table1: %ld, table2: %ld, tablet1: %ld, tablet2: %ld, ls1: %ld, ls2: %ld\n",
 | 
				
			||||||
 | 
					          loc1.id(), loc2.id(), table1, table2, tablet1, tablet2, ls1, ls2);
 | 
				
			||||||
 | 
					  ASSERT_NE(loc1, loc2);
 | 
				
			||||||
 | 
					  EXE_SQL("create tablegroup tg1 sharding='NONE';");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  MULTI_TRANSFER_TX_CHOOSEN_TABLET_ID = tablet2;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // ============================== Phase5. start the user txn ==============================
 | 
				
			||||||
 | 
					  sqlclient::ObISQLConnection *user_connection = nullptr;
 | 
				
			||||||
 | 
					  ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(user_connection));
 | 
				
			||||||
 | 
					  ASSERT_NE(nullptr, user_connection);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_timeout = 10000000000");
 | 
				
			||||||
 | 
					  WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_idle_timeout = 10000000000");
 | 
				
			||||||
 | 
					  WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_query_timeout = 10000000000");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  TRANS_LOG(INFO, "start the txn");
 | 
				
			||||||
 | 
					  WRITE_SQL_BY_CONN(user_connection, "begin;");
 | 
				
			||||||
 | 
					  WRITE_SQL_BY_CONN(user_connection, "savepoint qc1");
 | 
				
			||||||
 | 
					  WRITE_SQL_BY_CONN(user_connection, "insert into qcc1 values(1);");
 | 
				
			||||||
 | 
					  WRITE_SQL_BY_CONN(user_connection, "insert into qcc2 values(1);");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  ObTransID tx_id;
 | 
				
			||||||
 | 
					  ASSERT_EQ(0, SSH::find_tx(user_connection, tx_id));
 | 
				
			||||||
 | 
					  MULTI_TRANSFER_TX_CHOOSEN_TX_ID = tx_id;
 | 
				
			||||||
 | 
					  fprintf(stdout, "starting the user txn, %lu\n", tx_id.get_id());
 | 
				
			||||||
 | 
					  TRANS_LOG(INFO, "starting the user txn", K(tx_id));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // ============================== Phase5. start the transfer ==============================
 | 
				
			||||||
 | 
					  EXE_SQL("alter tablegroup tg1 add qcc1,qcc2;");
 | 
				
			||||||
 | 
					  usleep(1 * 1000 * 1000);
 | 
				
			||||||
 | 
					  int64_t begin_time = ObTimeUtility::current_time();
 | 
				
			||||||
 | 
					  while (true) {
 | 
				
			||||||
 | 
					    ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1));
 | 
				
			||||||
 | 
					    ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2));
 | 
				
			||||||
 | 
					    if (loc1 == loc2) {
 | 
				
			||||||
 | 
					      fprintf(stdout, "succeed wait for balancer\n");
 | 
				
			||||||
 | 
					      break;
 | 
				
			||||||
 | 
					    } else if (ObTimeUtility::current_time() - begin_time > 300 * 1000 * 1000) {
 | 
				
			||||||
 | 
					      fprintf(stdout, "ERROR: fail to wait for balancer\n");
 | 
				
			||||||
 | 
					      break;
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					      usleep(1 * 1000 * 1000);
 | 
				
			||||||
 | 
					      fprintf(stdout, "wait for balancer\n");
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  ASSERT_EQ(loc1, loc2);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  WRITE_SQL_BY_CONN(user_connection, "rollback to qc1");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  HEAP_VAR(ObMySQLProxy::MySQLResult, res)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    ASSERT_EQ(OB_SUCCESS,
 | 
				
			||||||
 | 
					              user_connection->execute_read(OB_SYS_TENANT_ID,
 | 
				
			||||||
 | 
					                                          "select /*+log_level(debug)*/* from qcc2;",
 | 
				
			||||||
 | 
					                                          res));
 | 
				
			||||||
 | 
					    common::sqlclient::ObMySQLResult *result = res.mysql_result();
 | 
				
			||||||
 | 
					    ASSERT_EQ(OB_ITER_END, result->next());
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					} // namespace unittest
 | 
				
			||||||
 | 
					} // namespace oceanbase
 | 
				
			||||||
@ -2,7 +2,6 @@ set(OBSERVER_TEST_SRCS
 | 
				
			|||||||
        env/ob_simple_server.cpp
 | 
					        env/ob_simple_server.cpp
 | 
				
			||||||
        env/ob_simple_server_restart_helper.cpp
 | 
					        env/ob_simple_server_restart_helper.cpp
 | 
				
			||||||
        env/ob_simple_cluster_test_base.cpp
 | 
					        env/ob_simple_cluster_test_base.cpp
 | 
				
			||||||
        env/ob_simple_server_helper.cpp
 | 
					 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
add_library(observer_test ${OBSERVER_TEST_SRCS})
 | 
					add_library(observer_test ${OBSERVER_TEST_SRCS})
 | 
				
			||||||
@ -11,6 +10,7 @@ target_include_directories(observer_test PUBLIC
 | 
				
			|||||||
  ${CMAKE_SOURCE_DIR}/unittest ${CMAKE_SOURCE_DIR}/mittest)
 | 
					  ${CMAKE_SOURCE_DIR}/unittest ${CMAKE_SOURCE_DIR}/mittest)
 | 
				
			||||||
target_link_libraries(observer_test PUBLIC
 | 
					target_link_libraries(observer_test PUBLIC
 | 
				
			||||||
  oceanbase
 | 
					  oceanbase
 | 
				
			||||||
 | 
					  mit_env
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
function(ob_unittest_observer case)
 | 
					function(ob_unittest_observer case)
 | 
				
			||||||
 | 
				
			|||||||
@ -18,7 +18,7 @@
 | 
				
			|||||||
#include "env/ob_simple_cluster_test_base.h"
 | 
					#include "env/ob_simple_cluster_test_base.h"
 | 
				
			||||||
#include "rootserver/ob_tenant_balance_service.h"
 | 
					#include "rootserver/ob_tenant_balance_service.h"
 | 
				
			||||||
#include "share/balance/ob_balance_job_table_operator.h"
 | 
					#include "share/balance/ob_balance_job_table_operator.h"
 | 
				
			||||||
#include "mittest/simple_server/env/ob_simple_server_helper.h"
 | 
					#include "mittest/env/ob_simple_server_helper.h"
 | 
				
			||||||
#include "storage/tx_storage/ob_ls_service.h"
 | 
					#include "storage/tx_storage/ob_ls_service.h"
 | 
				
			||||||
#include "storage/tx/ob_tx_loop_worker.h"
 | 
					#include "storage/tx/ob_tx_loop_worker.h"
 | 
				
			||||||
#include "storage/tx/ob_trans_part_ctx.h"
 | 
					#include "storage/tx/ob_trans_part_ctx.h"
 | 
				
			||||||
 | 
				
			|||||||
@ -26,7 +26,7 @@
 | 
				
			|||||||
#include "rootserver/ob_balance_group_ls_stat_operator.h"
 | 
					#include "rootserver/ob_balance_group_ls_stat_operator.h"
 | 
				
			||||||
#include "storage/tablet/ob_tablet.h"
 | 
					#include "storage/tablet/ob_tablet.h"
 | 
				
			||||||
#include "logservice/ob_log_service.h"
 | 
					#include "logservice/ob_log_service.h"
 | 
				
			||||||
#include "mittest/simple_server/env/ob_simple_server_helper.h"
 | 
					#include "mittest/env/ob_simple_server_helper.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
namespace oceanbase
 | 
					namespace oceanbase
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
				
			|||||||
@ -18,7 +18,7 @@
 | 
				
			|||||||
#include "env/ob_simple_cluster_test_base.h"
 | 
					#include "env/ob_simple_cluster_test_base.h"
 | 
				
			||||||
#include "rootserver/ob_tenant_balance_service.h"
 | 
					#include "rootserver/ob_tenant_balance_service.h"
 | 
				
			||||||
#include "share/balance/ob_balance_job_table_operator.h"
 | 
					#include "share/balance/ob_balance_job_table_operator.h"
 | 
				
			||||||
#include "mittest/simple_server/env/ob_simple_server_helper.h"
 | 
					#include "mittest/env/ob_simple_server_helper.h"
 | 
				
			||||||
#include "storage/tx_storage/ob_ls_service.h"
 | 
					#include "storage/tx_storage/ob_ls_service.h"
 | 
				
			||||||
#include "storage/tx/ob_tx_loop_worker.h"
 | 
					#include "storage/tx/ob_tx_loop_worker.h"
 | 
				
			||||||
#include "storage/tx/ob_trans_part_ctx.h"
 | 
					#include "storage/tx/ob_trans_part_ctx.h"
 | 
				
			||||||
 | 
				
			|||||||
@ -106,6 +106,7 @@ int ObMvccEngine::try_compact_row_when_mvcc_read_(const SCN &snapshot_version,
 | 
				
			|||||||
int ObMvccEngine::get(ObMvccAccessCtx &ctx,
 | 
					int ObMvccEngine::get(ObMvccAccessCtx &ctx,
 | 
				
			||||||
                      const ObQueryFlag &query_flag,
 | 
					                      const ObQueryFlag &query_flag,
 | 
				
			||||||
                      const ObMemtableKey *parameter_key,
 | 
					                      const ObMemtableKey *parameter_key,
 | 
				
			||||||
 | 
					                      const share::ObLSID memtable_ls_id,
 | 
				
			||||||
                      ObMemtableKey *returned_key,
 | 
					                      ObMemtableKey *returned_key,
 | 
				
			||||||
                      ObMvccValueIterator &value_iter,
 | 
					                      ObMvccValueIterator &value_iter,
 | 
				
			||||||
                      ObStoreRowLockState &lock_state)
 | 
					                      ObStoreRowLockState &lock_state)
 | 
				
			||||||
@ -139,6 +140,7 @@ int ObMvccEngine::get(ObMvccAccessCtx &ctx,
 | 
				
			|||||||
    if (OB_FAIL(value_iter.init(ctx,
 | 
					    if (OB_FAIL(value_iter.init(ctx,
 | 
				
			||||||
                                returned_key,
 | 
					                                returned_key,
 | 
				
			||||||
                                value,
 | 
					                                value,
 | 
				
			||||||
 | 
					                                memtable_ls_id,
 | 
				
			||||||
                                query_flag))) {
 | 
					                                query_flag))) {
 | 
				
			||||||
      TRANS_LOG(WARN, "ObMvccValueIterator init fail", KR(ret));
 | 
					      TRANS_LOG(WARN, "ObMvccValueIterator init fail", KR(ret));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -153,6 +155,7 @@ int ObMvccEngine::scan(
 | 
				
			|||||||
    ObMvccAccessCtx &ctx,
 | 
					    ObMvccAccessCtx &ctx,
 | 
				
			||||||
    const ObQueryFlag &query_flag,
 | 
					    const ObQueryFlag &query_flag,
 | 
				
			||||||
    const ObMvccScanRange &range,
 | 
					    const ObMvccScanRange &range,
 | 
				
			||||||
 | 
					    const share::ObLSID memtable_ls_id,
 | 
				
			||||||
    ObMvccRowIterator &row_iter)
 | 
					    ObMvccRowIterator &row_iter)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
@ -165,6 +168,7 @@ int ObMvccEngine::scan(
 | 
				
			|||||||
  } else if (OB_FAIL(row_iter.init(*query_engine_,
 | 
					  } else if (OB_FAIL(row_iter.init(*query_engine_,
 | 
				
			||||||
                                   ctx,
 | 
					                                   ctx,
 | 
				
			||||||
                                   range,
 | 
					                                   range,
 | 
				
			||||||
 | 
					                                   memtable_ls_id,
 | 
				
			||||||
                                   query_flag))) {
 | 
					                                   query_flag))) {
 | 
				
			||||||
    TRANS_LOG(WARN, "row_iter init fail", K(ret));
 | 
					    TRANS_LOG(WARN, "row_iter init fail", K(ret));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
 | 
				
			|||||||
@ -91,12 +91,14 @@ public:
 | 
				
			|||||||
  int get(ObMvccAccessCtx &ctx,
 | 
					  int get(ObMvccAccessCtx &ctx,
 | 
				
			||||||
          const ObQueryFlag &query_flag,
 | 
					          const ObQueryFlag &query_flag,
 | 
				
			||||||
          const ObMemtableKey *parameter_key,
 | 
					          const ObMemtableKey *parameter_key,
 | 
				
			||||||
 | 
					          const share::ObLSID memtable_ls_id,
 | 
				
			||||||
          ObMemtableKey *internal_key,
 | 
					          ObMemtableKey *internal_key,
 | 
				
			||||||
          ObMvccValueIterator &value_iter,
 | 
					          ObMvccValueIterator &value_iter,
 | 
				
			||||||
          storage::ObStoreRowLockState &lock_state);
 | 
					          storage::ObStoreRowLockState &lock_state);
 | 
				
			||||||
  int scan(ObMvccAccessCtx &ctx,
 | 
					  int scan(ObMvccAccessCtx &ctx,
 | 
				
			||||||
           const ObQueryFlag &query_flag,
 | 
					           const ObQueryFlag &query_flag,
 | 
				
			||||||
           const ObMvccScanRange &range,
 | 
					           const ObMvccScanRange &range,
 | 
				
			||||||
 | 
					           const share::ObLSID memtable_ls_id,
 | 
				
			||||||
           ObMvccRowIterator &row_iter);
 | 
					           ObMvccRowIterator &row_iter);
 | 
				
			||||||
  int scan(ObMvccAccessCtx &ctx,
 | 
					  int scan(ObMvccAccessCtx &ctx,
 | 
				
			||||||
           const ObMvccScanRange &range,
 | 
					           const ObMvccScanRange &range,
 | 
				
			||||||
 | 
				
			|||||||
@ -19,6 +19,7 @@
 | 
				
			|||||||
#include "storage/memtable/ob_memtable_context.h"
 | 
					#include "storage/memtable/ob_memtable_context.h"
 | 
				
			||||||
#include "storage/memtable/ob_row_conflict_handler.h"
 | 
					#include "storage/memtable/ob_row_conflict_handler.h"
 | 
				
			||||||
#include "storage/tx/ob_trans_ctx.h"
 | 
					#include "storage/tx/ob_trans_ctx.h"
 | 
				
			||||||
 | 
					#include "storage/ls/ob_ls.h"
 | 
				
			||||||
#include "common/ob_clock_generator.h"
 | 
					#include "common/ob_clock_generator.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
namespace oceanbase
 | 
					namespace oceanbase
 | 
				
			||||||
@ -33,6 +34,7 @@ namespace memtable
 | 
				
			|||||||
int ObMvccValueIterator::init(ObMvccAccessCtx &ctx,
 | 
					int ObMvccValueIterator::init(ObMvccAccessCtx &ctx,
 | 
				
			||||||
                              const ObMemtableKey *key,
 | 
					                              const ObMemtableKey *key,
 | 
				
			||||||
                              ObMvccRow *value,
 | 
					                              ObMvccRow *value,
 | 
				
			||||||
 | 
					                              const share::ObLSID memtable_ls_id,
 | 
				
			||||||
                              const ObQueryFlag &query_flag)
 | 
					                              const ObQueryFlag &query_flag)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
@ -46,6 +48,7 @@ int ObMvccValueIterator::init(ObMvccAccessCtx &ctx,
 | 
				
			|||||||
    is_inited_ = true;
 | 
					    is_inited_ = true;
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    value_ = value;
 | 
					    value_ = value;
 | 
				
			||||||
 | 
					    memtable_ls_id_ = memtable_ls_id;
 | 
				
			||||||
    if (OB_FAIL(lock_for_read_(query_flag))) {
 | 
					    if (OB_FAIL(lock_for_read_(query_flag))) {
 | 
				
			||||||
      TRANS_LOG(WARN, "fail to find start pos for iterator", K(ret));
 | 
					      TRANS_LOG(WARN, "fail to find start pos for iterator", K(ret));
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
@ -53,12 +56,13 @@ int ObMvccValueIterator::init(ObMvccAccessCtx &ctx,
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  TRANS_LOG(TRACE, "value_iter.init", K(ret),
 | 
					  TRANS_LOG(TRACE, "value_iter.init", K(ret),
 | 
				
			||||||
          KPC(value),
 | 
					            KPC(value),
 | 
				
			||||||
          KPC_(version_iter),
 | 
					            KPC_(version_iter),
 | 
				
			||||||
          K(query_flag.is_read_latest()),
 | 
					            K(query_flag.is_read_latest()),
 | 
				
			||||||
          KPC(key),
 | 
					            KPC(key),
 | 
				
			||||||
          K(ctx),
 | 
					            K(ctx),
 | 
				
			||||||
          K(lbt()));
 | 
					            K(memtable_ls_id),
 | 
				
			||||||
 | 
					            K(lbt()));
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -149,6 +153,10 @@ int ObMvccValueIterator::lock_for_read_inner_(const ObQueryFlag &flag,
 | 
				
			|||||||
  if ((is_committed || is_aborted || (is_elr && !is_delayed_cleanout))
 | 
					  if ((is_committed || is_aborted || (is_elr && !is_delayed_cleanout))
 | 
				
			||||||
      // Opt2: data is not decided while we donot need cleanout
 | 
					      // Opt2: data is not decided while we donot need cleanout
 | 
				
			||||||
      || (!is_delayed_cleanout
 | 
					      || (!is_delayed_cleanout
 | 
				
			||||||
 | 
					          && (!ctx_->get_tx_table_guards().src_tx_table_guard_.is_valid() ||
 | 
				
			||||||
 | 
					              (memtable_ls_id_.is_valid() &&
 | 
				
			||||||
 | 
					               ctx_->get_tx_table_guards().src_tx_table_guard_.get_tx_table()->
 | 
				
			||||||
 | 
					               get_ls_id() != memtable_ls_id_))
 | 
				
			||||||
          && (// Opt2.1: snapshot reads the data written by snapshot
 | 
					          && (// Opt2.1: snapshot reads the data written by snapshot
 | 
				
			||||||
            data_tx_id == ctx_->snapshot_.tx_id_ ||
 | 
					            data_tx_id == ctx_->snapshot_.tx_id_ ||
 | 
				
			||||||
            // Opt2.2: read reader's latest is matched
 | 
					            // Opt2.2: read reader's latest is matched
 | 
				
			||||||
@ -357,6 +365,7 @@ int ObMvccValueIterator::check_row_locked(ObStoreRowLockState &lock_state)
 | 
				
			|||||||
ObMvccRowIterator::ObMvccRowIterator()
 | 
					ObMvccRowIterator::ObMvccRowIterator()
 | 
				
			||||||
    : is_inited_(false),
 | 
					    : is_inited_(false),
 | 
				
			||||||
      ctx_(NULL),
 | 
					      ctx_(NULL),
 | 
				
			||||||
 | 
					      memtable_ls_id_(),
 | 
				
			||||||
      query_flag_(),
 | 
					      query_flag_(),
 | 
				
			||||||
      value_iter_(),
 | 
					      value_iter_(),
 | 
				
			||||||
      query_engine_(NULL),
 | 
					      query_engine_(NULL),
 | 
				
			||||||
@ -373,6 +382,7 @@ int ObMvccRowIterator::init(
 | 
				
			|||||||
    ObQueryEngine &query_engine,
 | 
					    ObQueryEngine &query_engine,
 | 
				
			||||||
    ObMvccAccessCtx &ctx,
 | 
					    ObMvccAccessCtx &ctx,
 | 
				
			||||||
    const ObMvccScanRange &range,
 | 
					    const ObMvccScanRange &range,
 | 
				
			||||||
 | 
					    const share::ObLSID memtable_ls_id,
 | 
				
			||||||
    const ObQueryFlag &query_flag)
 | 
					    const ObQueryFlag &query_flag)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
@ -390,6 +400,7 @@ int ObMvccRowIterator::init(
 | 
				
			|||||||
    query_flag_ = query_flag;
 | 
					    query_flag_ = query_flag;
 | 
				
			||||||
    query_engine_ = &query_engine;
 | 
					    query_engine_ = &query_engine;
 | 
				
			||||||
    query_engine_iter_->set_version(ctx.snapshot_.version_.get_val_for_tx());
 | 
					    query_engine_iter_->set_version(ctx.snapshot_.version_.get_val_for_tx());
 | 
				
			||||||
 | 
					    memtable_ls_id_ = memtable_ls_id;
 | 
				
			||||||
    is_inited_ = true;
 | 
					    is_inited_ = true;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
@ -435,6 +446,7 @@ int ObMvccRowIterator::get_next_row(
 | 
				
			|||||||
      if (OB_FAIL(value_iter_.init(*ctx_,
 | 
					      if (OB_FAIL(value_iter_.init(*ctx_,
 | 
				
			||||||
                                   tmp_key,
 | 
					                                   tmp_key,
 | 
				
			||||||
                                   value,
 | 
					                                   value,
 | 
				
			||||||
 | 
					                                   memtable_ls_id_,
 | 
				
			||||||
                                   query_flag_))) {
 | 
					                                   query_flag_))) {
 | 
				
			||||||
        TRANS_LOG(WARN, "value iter init fail", K(ret), "ctx", *ctx_, KP(value), K(*value));
 | 
					        TRANS_LOG(WARN, "value iter init fail", K(ret), "ctx", *ctx_, KP(value), K(*value));
 | 
				
			||||||
      } else if (!value_iter_.is_exist()) {
 | 
					      } else if (!value_iter_.is_exist()) {
 | 
				
			||||||
@ -455,6 +467,7 @@ void ObMvccRowIterator::reset()
 | 
				
			|||||||
{
 | 
					{
 | 
				
			||||||
  is_inited_ = false;
 | 
					  is_inited_ = false;
 | 
				
			||||||
  ctx_ = NULL;
 | 
					  ctx_ = NULL;
 | 
				
			||||||
 | 
					  memtable_ls_id_.reset();
 | 
				
			||||||
  query_flag_.reset();
 | 
					  query_flag_.reset();
 | 
				
			||||||
  value_iter_.reset();
 | 
					  value_iter_.reset();
 | 
				
			||||||
  if (NULL != query_engine_iter_) {
 | 
					  if (NULL != query_engine_iter_) {
 | 
				
			||||||
 | 
				
			|||||||
@ -95,10 +95,11 @@ class ObMvccValueIterator
 | 
				
			|||||||
{
 | 
					{
 | 
				
			||||||
public:
 | 
					public:
 | 
				
			||||||
  ObMvccValueIterator()
 | 
					  ObMvccValueIterator()
 | 
				
			||||||
      : is_inited_(false),
 | 
					    : is_inited_(false),
 | 
				
			||||||
        ctx_(NULL),
 | 
					    ctx_(NULL),
 | 
				
			||||||
        value_(NULL),
 | 
					    value_(NULL),
 | 
				
			||||||
        version_iter_(NULL)
 | 
					    memtable_ls_id_(),
 | 
				
			||||||
 | 
					    version_iter_(NULL)
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  virtual ~ObMvccValueIterator() {}
 | 
					  virtual ~ObMvccValueIterator() {}
 | 
				
			||||||
@ -106,6 +107,7 @@ public:
 | 
				
			|||||||
  int init(ObMvccAccessCtx &ctx,
 | 
					  int init(ObMvccAccessCtx &ctx,
 | 
				
			||||||
           const ObMemtableKey *key,
 | 
					           const ObMemtableKey *key,
 | 
				
			||||||
           ObMvccRow *value,
 | 
					           ObMvccRow *value,
 | 
				
			||||||
 | 
					           const share::ObLSID memtable_ls_id,
 | 
				
			||||||
           const ObQueryFlag &query_flag);
 | 
					           const ObQueryFlag &query_flag);
 | 
				
			||||||
  OB_INLINE bool is_exist()
 | 
					  OB_INLINE bool is_exist()
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
@ -117,6 +119,7 @@ public:
 | 
				
			|||||||
    is_inited_ = false;
 | 
					    is_inited_ = false;
 | 
				
			||||||
    ctx_ = NULL;
 | 
					    ctx_ = NULL;
 | 
				
			||||||
    value_ = NULL;
 | 
					    value_ = NULL;
 | 
				
			||||||
 | 
					    memtable_ls_id_.reset();
 | 
				
			||||||
    version_iter_ = NULL;
 | 
					    version_iter_ = NULL;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  int check_row_locked(storage::ObStoreRowLockState &lock_state);
 | 
					  int check_row_locked(storage::ObStoreRowLockState &lock_state);
 | 
				
			||||||
@ -138,7 +141,7 @@ public:
 | 
				
			|||||||
  transaction::ObTransID get_reader_tx_id() const { return ctx_->tx_id_; }
 | 
					  transaction::ObTransID get_reader_tx_id() const { return ctx_->tx_id_; }
 | 
				
			||||||
  transaction::ObTransID get_snapshot_tx_id() const { return ctx_->snapshot_.tx_id_; }
 | 
					  transaction::ObTransID get_snapshot_tx_id() const { return ctx_->snapshot_.tx_id_; }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  TO_STRING_KV(KPC_(value), KPC_(version_iter), KPC_(ctx));
 | 
					  TO_STRING_KV(KPC_(value), KPC_(version_iter), KPC_(ctx), K_(memtable_ls_id));
 | 
				
			||||||
private:
 | 
					private:
 | 
				
			||||||
  int lock_for_read_(const ObQueryFlag &flag);
 | 
					  int lock_for_read_(const ObQueryFlag &flag);
 | 
				
			||||||
  int lock_for_read_inner_(const ObQueryFlag &flag, ObMvccTransNode *&iter);
 | 
					  int lock_for_read_inner_(const ObQueryFlag &flag, ObMvccTransNode *&iter);
 | 
				
			||||||
@ -153,6 +156,7 @@ private:
 | 
				
			|||||||
  bool is_inited_;
 | 
					  bool is_inited_;
 | 
				
			||||||
  ObMvccAccessCtx *ctx_;
 | 
					  ObMvccAccessCtx *ctx_;
 | 
				
			||||||
  ObMvccRow *value_;
 | 
					  ObMvccRow *value_;
 | 
				
			||||||
 | 
					  share::ObLSID memtable_ls_id_;
 | 
				
			||||||
  ObMvccTransNode *version_iter_;
 | 
					  ObMvccTransNode *version_iter_;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -167,6 +171,7 @@ public:
 | 
				
			|||||||
  int init(ObQueryEngine &query_engine,
 | 
					  int init(ObQueryEngine &query_engine,
 | 
				
			||||||
           ObMvccAccessCtx &ctx,
 | 
					           ObMvccAccessCtx &ctx,
 | 
				
			||||||
           const ObMvccScanRange &range,
 | 
					           const ObMvccScanRange &range,
 | 
				
			||||||
 | 
					           const share::ObLSID memtable_ls_id,
 | 
				
			||||||
           const ObQueryFlag &query_flag);
 | 
					           const ObQueryFlag &query_flag);
 | 
				
			||||||
  int get_next_row(const ObMemtableKey *&key,
 | 
					  int get_next_row(const ObMemtableKey *&key,
 | 
				
			||||||
                   ObMvccValueIterator *&value_iter,
 | 
					                   ObMvccValueIterator *&value_iter,
 | 
				
			||||||
@ -187,6 +192,7 @@ private:
 | 
				
			|||||||
private:
 | 
					private:
 | 
				
			||||||
  bool is_inited_;
 | 
					  bool is_inited_;
 | 
				
			||||||
  ObMvccAccessCtx *ctx_;
 | 
					  ObMvccAccessCtx *ctx_;
 | 
				
			||||||
 | 
					  share::ObLSID memtable_ls_id_;
 | 
				
			||||||
  ObQueryFlag query_flag_;
 | 
					  ObQueryFlag query_flag_;
 | 
				
			||||||
  ObMvccValueIterator value_iter_;
 | 
					  ObMvccValueIterator value_iter_;
 | 
				
			||||||
  ObQueryEngine *query_engine_;
 | 
					  ObQueryEngine *query_engine_;
 | 
				
			||||||
 | 
				
			|||||||
@ -682,6 +682,7 @@ int ObMemtable::exist(
 | 
				
			|||||||
  } else if (OB_FAIL(mvcc_engine_.get(context.store_ctx_->mvcc_acc_ctx_,
 | 
					  } else if (OB_FAIL(mvcc_engine_.get(context.store_ctx_->mvcc_acc_ctx_,
 | 
				
			||||||
                                      query_flag,
 | 
					                                      query_flag,
 | 
				
			||||||
                                      ¶meter_mtk,
 | 
					                                      ¶meter_mtk,
 | 
				
			||||||
 | 
					                                      ls_id_,
 | 
				
			||||||
                                      NULL,
 | 
					                                      NULL,
 | 
				
			||||||
                                      value_iter,
 | 
					                                      value_iter,
 | 
				
			||||||
                                      lock_state))) {
 | 
					                                      lock_state))) {
 | 
				
			||||||
@ -814,6 +815,7 @@ int ObMemtable::get(
 | 
				
			|||||||
    } else if (OB_FAIL(mvcc_engine_.get(context.store_ctx_->mvcc_acc_ctx_,
 | 
					    } else if (OB_FAIL(mvcc_engine_.get(context.store_ctx_->mvcc_acc_ctx_,
 | 
				
			||||||
                                        context.query_flag_,
 | 
					                                        context.query_flag_,
 | 
				
			||||||
                                        ¶meter_mtk,
 | 
					                                        ¶meter_mtk,
 | 
				
			||||||
 | 
					                                        ls_id_,
 | 
				
			||||||
                                        &returned_mtk,
 | 
					                                        &returned_mtk,
 | 
				
			||||||
                                        value_iter,
 | 
					                                        value_iter,
 | 
				
			||||||
                                        lock_state))) {
 | 
					                                        lock_state))) {
 | 
				
			||||||
 | 
				
			|||||||
@ -254,6 +254,7 @@ int ObMemtableScanIterator::prepare_scan()
 | 
				
			|||||||
  ObMemtableKey* start_key = NULL;
 | 
					  ObMemtableKey* start_key = NULL;
 | 
				
			||||||
  ObMemtableKey* end_key = NULL;
 | 
					  ObMemtableKey* end_key = NULL;
 | 
				
			||||||
  const ObColDescIArray *out_cols = nullptr;
 | 
					  const ObColDescIArray *out_cols = nullptr;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (OB_UNLIKELY(!is_inited_)) {
 | 
					  if (OB_UNLIKELY(!is_inited_)) {
 | 
				
			||||||
    ret = OB_NOT_INIT;
 | 
					    ret = OB_NOT_INIT;
 | 
				
			||||||
  } else if (is_scan_start_) {
 | 
					  } else if (is_scan_start_) {
 | 
				
			||||||
@ -273,6 +274,9 @@ int ObMemtableScanIterator::prepare_scan()
 | 
				
			|||||||
  } else if (OB_FAIL(ObMemtableKey::build(
 | 
					  } else if (OB_FAIL(ObMemtableKey::build(
 | 
				
			||||||
              end_key, *out_cols, &range.get_end_key().get_store_rowkey(), *context_->get_range_allocator()))) {
 | 
					              end_key, *out_cols, &range.get_end_key().get_store_rowkey(), *context_->get_range_allocator()))) {
 | 
				
			||||||
    TRANS_LOG(WARN, "end key build fail", K(param_->table_id_), K(range));
 | 
					    TRANS_LOG(WARN, "end key build fail", K(param_->table_id_), K(range));
 | 
				
			||||||
 | 
					  } else if (OB_ISNULL(memtable_)) {
 | 
				
			||||||
 | 
					    ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
 | 
					    TRANS_LOG(WARN, "fail to get memtable", K(ret));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    ObMvccEngine& mvcc_engine = ((ObMemtable*)memtable_)->get_mvcc_engine();
 | 
					    ObMvccEngine& mvcc_engine = ((ObMemtable*)memtable_)->get_mvcc_engine();
 | 
				
			||||||
    ObMvccScanRange mvcc_scan_range;
 | 
					    ObMvccScanRange mvcc_scan_range;
 | 
				
			||||||
@ -283,6 +287,7 @@ int ObMemtableScanIterator::prepare_scan()
 | 
				
			|||||||
    if (OB_FAIL(mvcc_engine.scan(context_->store_ctx_->mvcc_acc_ctx_,
 | 
					    if (OB_FAIL(mvcc_engine.scan(context_->store_ctx_->mvcc_acc_ctx_,
 | 
				
			||||||
                                 context_->query_flag_,
 | 
					                                 context_->query_flag_,
 | 
				
			||||||
                                 mvcc_scan_range,
 | 
					                                 mvcc_scan_range,
 | 
				
			||||||
 | 
					                                 memtable_->get_ls_id(),
 | 
				
			||||||
                                 row_iter_))) {
 | 
					                                 row_iter_))) {
 | 
				
			||||||
      TRANS_LOG(WARN, "mvcc engine scan fail", K(ret), K(mvcc_scan_range));
 | 
					      TRANS_LOG(WARN, "mvcc engine scan fail", K(ret), K(mvcc_scan_range));
 | 
				
			||||||
    } else if (OB_FAIL(bitmap_.init(read_info_->get_request_count(), read_info_->get_schema_rowkey_count()))) {
 | 
					    } else if (OB_FAIL(bitmap_.init(read_info_->get_request_count(), read_info_->get_schema_rowkey_count()))) {
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user