Co-authored-by: Minionyh <minions.yh@gmail.com> Co-authored-by: BinChenn <binchenn.bc@gmail.com> Co-authored-by: ChangerR <dingjinchengyx@163.com>
		
			
				
	
	
		
			379 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			379 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 | 
						|
 * Copyright (c) 2021 OceanBase
 | 
						|
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
						|
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
 * You may obtain a copy of Mulan PubL v2 at:
 | 
						|
 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
 * See the Mulan PubL v2 for more details.
 | 
						|
 */
 | 
						|
 | 
						|
#define USING_LOG_PREFIX STORAGE
 | 
						|
#include <gtest/gtest.h>
 | 
						|
#include "mtlenv/mock_tenant_module_env.h"
 | 
						|
#include "storage/mockcontainer/mock_ob_iterator.h"
 | 
						|
#include "storage/mockcontainer/mock_ob_end_trans_callback.h"
 | 
						|
#include "storage/init_basic_struct.h"
 | 
						|
#include "storage/test_tablet_helper.h"
 | 
						|
#include "share/schema/ob_table_dml_param.h"
 | 
						|
#include "observer/ob_safe_destroy_thread.h"
 | 
						|
#include "storage/ob_dml_running_ctx.h"
 | 
						|
#include "storage/tx/ob_trans_part_ctx.h"
 | 
						|
 | 
						|
namespace oceanbase
 | 
						|
{
 | 
						|
 | 
						|
namespace storage
 | 
						|
{
 | 
						|
int ObDMLRunningCtx::init(
 | 
						|
    const common::ObIArray<uint64_t> *column_ids,
 | 
						|
    const common::ObIArray<uint64_t> *upd_col_ids,
 | 
						|
    ObMultiVersionSchemaService *schema_service,
 | 
						|
    ObTabletHandle &tablet_handle)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
 | 
						|
  if (IS_INIT) {
 | 
						|
    ret = OB_INIT_TWICE;
 | 
						|
    LOG_WARN("init twice", K(ret));
 | 
						|
  } else if (OB_UNLIKELY(!store_ctx_.is_valid())
 | 
						|
      || OB_UNLIKELY(!dml_param_.is_valid())
 | 
						|
      || OB_ISNULL(dml_param_.table_param_)
 | 
						|
      || OB_ISNULL(schema_service)) {
 | 
						|
    ret = OB_INVALID_ARGUMENT;
 | 
						|
    LOG_WARN("invalid argument", K(ret), K(store_ctx_),
 | 
						|
        K(dml_param_), KP(schema_service));
 | 
						|
  } else {
 | 
						|
    const uint64_t tenant_id = MTL_ID();
 | 
						|
    const uint64_t table_id = dml_param_.table_param_->get_data_table().get_table_id();
 | 
						|
    const int64_t version = dml_param_.schema_version_;
 | 
						|
    const int64_t tenant_schema_version = dml_param_.tenant_schema_version_;
 | 
						|
  }
 | 
						|
 | 
						|
  if (OB_FAIL(ret)) {
 | 
						|
    // do nothing
 | 
						|
  } else if (OB_FAIL(prepare_relative_table(
 | 
						|
      dml_param_.table_param_->get_data_table(),
 | 
						|
      tablet_handle,
 | 
						|
      store_ctx_.mvcc_acc_ctx_.get_snapshot_version()))) {
 | 
						|
    LOG_WARN("failed to get relative table", K(ret), K(dml_param_));
 | 
						|
  } else if (NULL != column_ids && OB_FAIL(prepare_column_info(*column_ids))) {
 | 
						|
    LOG_WARN("fail to get column descriptions and column map", K(ret), K(*column_ids));
 | 
						|
  } else {
 | 
						|
    store_ctx_.table_version_ = dml_param_.schema_version_;
 | 
						|
    column_ids_ = column_ids;
 | 
						|
    is_inited_ = true;
 | 
						|
  }
 | 
						|
 | 
						|
  if (IS_NOT_INIT) {
 | 
						|
    relative_table_.destroy();
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
}
 | 
						|
namespace transaction
 | 
						|
{
 | 
						|
 | 
						|
int ObLocationAdapter::get_leader_(const int64_t cluster_id,
 | 
						|
                                   const int64_t tenant_id,
 | 
						|
                                   const ObLSID &ls_id,
 | 
						|
                                   common::ObAddr &leader,
 | 
						|
                                   const bool is_sync)
 | 
						|
{
 | 
						|
  UNUSED(cluster_id);
 | 
						|
  UNUSED(tenant_id);
 | 
						|
  UNUSED(ls_id);
 | 
						|
  UNUSED(is_sync);
 | 
						|
  leader = GCTX.self_addr();
 | 
						|
  return OB_SUCCESS;
 | 
						|
}
 | 
						|
}
 | 
						|
 | 
						|
using namespace storage;
 | 
						|
 | 
						|
class TestTrans : public ::testing::Test
 | 
						|
{
 | 
						|
public:
 | 
						|
  static void SetUpTestCase()
 | 
						|
  {
 | 
						|
    LOG_INFO("SetUpTestCase");
 | 
						|
    ASSERT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init());
 | 
						|
    SAFE_DESTROY_INSTANCE.init();
 | 
						|
    SAFE_DESTROY_INSTANCE.start();
 | 
						|
    ObServerCheckpointSlogHandler::get_instance().is_started_ = true;
 | 
						|
  }
 | 
						|
  static void TearDownTestCase()
 | 
						|
  {
 | 
						|
    LOG_INFO("TearDownTestCase");
 | 
						|
    SAFE_DESTROY_INSTANCE.stop();
 | 
						|
    SAFE_DESTROY_INSTANCE.wait();
 | 
						|
    SAFE_DESTROY_INSTANCE.destroy();
 | 
						|
    MockTenantModuleEnv::get_instance().destroy();
 | 
						|
  }
 | 
						|
 | 
						|
  void create_ls(uint64_t tenant_id, ObLSID &ls_id, ObLS *&ls);
 | 
						|
  void insert_rows(ObLSID &ls_id, ObTabletID &tablet_id, ObTxDesc &tx_desc, ObTxReadSnapshot snapshot, const char* in_str);
 | 
						|
  void prepare_tx_desc(ObTxDesc *&tx_desc, ObTxReadSnapshot &snapshot);
 | 
						|
private:
 | 
						|
  static share::schema::ObTableSchema table_schema_;
 | 
						|
};
 | 
						|
 | 
						|
share::schema::ObTableSchema TestTrans::table_schema_;
 | 
						|
 | 
						|
void TestTrans::create_ls(uint64_t tenant_id, ObLSID &ls_id, ObLS *&ls)
 | 
						|
{
 | 
						|
  LOG_INFO("create log stream");
 | 
						|
  ObCreateLSArg arg;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, gen_create_ls_arg(tenant_id, ls_id, arg));
 | 
						|
  ObLSService* ls_svr = MTL(ObLSService*);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ls_svr->create_ls(arg));
 | 
						|
 | 
						|
  // set member list
 | 
						|
  LOG_INFO("set member list");
 | 
						|
  ObLSHandle handle;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, handle, ObLSGetMod::STORAGE_MOD));
 | 
						|
  ls = handle.get_ls();
 | 
						|
  ObMemberList member_list;
 | 
						|
  int64_t paxos_replica_num = 1;
 | 
						|
  (void) member_list.add_server(MockTenantModuleEnv::get_instance().self_addr_);
 | 
						|
  GlobalLearnerList learner_list;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ls->set_initial_member_list(member_list,
 | 
						|
                                                    paxos_replica_num,
 | 
						|
                                                    learner_list));
 | 
						|
 | 
						|
  // check leader
 | 
						|
  LOG_INFO("check leader");
 | 
						|
  for (int i = 0; i < 15; i++) {
 | 
						|
    ObRole role;
 | 
						|
    int64_t leader_epoch = 0;
 | 
						|
    ASSERT_EQ(OB_SUCCESS, ls->get_log_handler()->get_role(role, leader_epoch));
 | 
						|
    if (role == ObRole::LEADER) {
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    ::sleep(1);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void TestTrans::insert_rows(ObLSID &ls_id, ObTabletID &tablet_id, ObTxDesc &tx_desc, ObTxReadSnapshot snapshot, const char* ins_str)
 | 
						|
{
 | 
						|
  int64_t affected_rows = 0;
 | 
						|
  ObMockNewRowIterator ins_iter;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ins_iter.from(ins_str));
 | 
						|
 | 
						|
  ObArenaAllocator allocator;
 | 
						|
  share::schema::ObTableDMLParam table_dml_param(allocator);
 | 
						|
 | 
						|
  ObSEArray<const ObTableSchema *, 1> index_schema_array;
 | 
						|
  ObSEArray<uint64_t, 1> column_ids;
 | 
						|
  column_ids.push_back(16); // pk
 | 
						|
  ASSERT_EQ(OB_SUCCESS, table_dml_param.convert(&table_schema_, 1000, column_ids));
 | 
						|
  ObDMLBaseParam dml_param;
 | 
						|
  dml_param.timeout_ = ObTimeUtility::current_time() + 100000000;
 | 
						|
  dml_param.schema_version_ = 1000;
 | 
						|
  dml_param.table_param_ = &table_dml_param;
 | 
						|
  dml_param.snapshot_ = snapshot;
 | 
						|
 | 
						|
  auto das = MTL(ObAccessService*);
 | 
						|
  LOG_INFO("storage access by dml");
 | 
						|
  ASSERT_EQ(OB_SUCCESS, das->insert_rows(ls_id,
 | 
						|
                                         tablet_id,
 | 
						|
                                         tx_desc,
 | 
						|
                                         dml_param,
 | 
						|
                                         column_ids,
 | 
						|
                                         &ins_iter,
 | 
						|
                                         affected_rows));
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
void TestTrans::prepare_tx_desc(ObTxDesc *&tx_desc, ObTxReadSnapshot &snapshot)
 | 
						|
{
 | 
						|
  ObTxParam tx_p;
 | 
						|
  tx_p.timeout_us_ = ObTimeUtility::current_time() + 100000000; // us
 | 
						|
  tx_p.access_mode_ = transaction::ObTxAccessMode::RW;
 | 
						|
  tx_p.isolation_ = transaction::ObTxIsolationLevel::RC;
 | 
						|
  tx_p.cluster_id_ = 1;
 | 
						|
  ObTransService *tx_service = MTL(ObTransService*);
 | 
						|
  // alloc tx_desc
 | 
						|
  ASSERT_EQ(OB_SUCCESS, tx_service->acquire_tx(tx_desc, 100));
 | 
						|
  // init tx_desc and put in map
 | 
						|
  ASSERT_EQ(OB_SUCCESS, tx_service->start_tx(*tx_desc, tx_p));
 | 
						|
 | 
						|
  LOG_INFO("get snapshot for dml");
 | 
						|
  {
 | 
						|
    int64_t expire_ts = ObTimeUtility::current_time() + 100000000;
 | 
						|
    ObTxIsolationLevel isolation = ObTxIsolationLevel::RC;
 | 
						|
    ASSERT_EQ(OB_SUCCESS, tx_service->get_read_snapshot(*tx_desc, isolation, expire_ts, snapshot));
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestTrans, create_ls_and_tablet)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ObCurTraceId::get_trace_id()->set("Y1-1111111111111111"));
 | 
						|
  ObLSID ls_id(100);
 | 
						|
  ObLS *ls = nullptr;
 | 
						|
  uint64_t tenant_id = MTL_ID();
 | 
						|
  create_ls(tenant_id, ls_id, ls);
 | 
						|
 | 
						|
  LOG_INFO("create tablet");
 | 
						|
  obrpc::ObBatchCreateTabletArg create_tablet_arg;
 | 
						|
  ObTabletID tablet_id(1001);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, gen_create_tablet_arg(tenant_id, ls_id, tablet_id, create_tablet_arg, 1, &table_schema_));
 | 
						|
  ObLSTabletService *ls_tablet_svr = ls->get_tablet_svr();
 | 
						|
  ASSERT_EQ(OB_SUCCESS, TestTabletHelper::create_tablet(*ls_tablet_svr, create_tablet_arg));
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestTrans, basic)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  uint64_t tenant_id = MTL_ID();
 | 
						|
  // create log stream
 | 
						|
  ObLSID ls_id(100);
 | 
						|
  ObTabletID tablet_id(1001);
 | 
						|
 | 
						|
  LOG_INFO("start transaction");
 | 
						|
  ObTxDesc *tx_desc = NULL;
 | 
						|
  ObTxReadSnapshot snapshot;
 | 
						|
  prepare_tx_desc(tx_desc, snapshot);
 | 
						|
  // prepare insert param
 | 
						|
  const char *ins_str =
 | 
						|
      "bigint    dml          \n"
 | 
						|
      "50        T_DML_INSERT \n";
 | 
						|
  insert_rows(ls_id, tablet_id, *tx_desc, snapshot, ins_str);
 | 
						|
 | 
						|
  ObTransService *tx_service = MTL(ObTransService*);
 | 
						|
  ObPartTransCtx *part_ctx;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, tx_service->tx_ctx_mgr_.get_tx_ctx(ls_id, tx_desc->tx_id_, false, part_ctx));
 | 
						|
  ASSERT_EQ(OB_SUCCESS, tx_service->tx_ctx_mgr_.revert_tx_ctx(part_ctx));
 | 
						|
 | 
						|
  ObLSHandle ls_handle;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
 | 
						|
  ObTabletHandle tablet_handle;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tablet_svr()->get_tablet(tablet_id, tablet_handle, 0));
 | 
						|
  ObIMemtableMgr *mt_mgr = tablet_handle.get_obj()->get_memtable_mgr();
 | 
						|
  ASSERT_EQ(1, mt_mgr->get_memtable_count_());
 | 
						|
  ObTableHandleV2 mt_handle;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, mt_mgr->get_active_memtable(mt_handle));
 | 
						|
  memtable::ObMemtable *mt;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, mt_handle.get_data_memtable(mt));
 | 
						|
 | 
						|
  printf("mt size=%ld occ_size=%ld bree_item_count=%ld rec_scn=%s end_scn=%s\n", mt->get_size(), mt->get_occupied_size(), mt->get_btree_item_count(), to_cstring(mt->get_rec_scn()),to_cstring(mt->get_max_end_scn()));
 | 
						|
 | 
						|
  /*
 | 
						|
  ObStoreRow store_row;
 | 
						|
  store_row.flag_.set_flag(ObDmlFlag::DF_INSERT);
 | 
						|
  ObNewRow *new_row;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ins_iter.get_next_row(new_row));
 | 
						|
  store_row.row_val_ = *new_row;
 | 
						|
  ObStoreRowkey tmp_key;
 | 
						|
  memtable::ObMemtableKey mtk;
 | 
						|
 | 
						|
  ASSERT_EQ(OB_SUCCESS, tmp_key.assign(store_row.row_val_.cells_, 1));
 | 
						|
  ASSERT_EQ(OB_SUCCESS, mtk.encode(columns, &tmp_key));
 | 
						|
  mt->get_query_engine().get();
 | 
						|
  */
 | 
						|
 | 
						|
  memtable::ObTransCallbackMgr &trans_mgr = part_ctx->mt_ctx_.trans_mgr_;
 | 
						|
  printf("part_ctx trans_mgr:%ld\n",trans_mgr.callback_list_.get_length());
 | 
						|
  auto cb = trans_mgr.callback_list_.head_.next_;
 | 
						|
  for (int i = 0; i < trans_mgr.callback_list_.get_length(); i++) {
 | 
						|
    memtable::ObMvccRowCallback *row_cb = dynamic_cast<memtable::ObMvccRowCallback*>(cb);
 | 
						|
    if (row_cb != nullptr) {
 | 
						|
      printf("mvcc row: ptr=%p node_cnt=%ld\n", &row_cb->value_, row_cb->value_.get_total_trans_node_cnt());
 | 
						|
    } else {
 | 
						|
      ObOBJLockCallback *tl_cb = dynamic_cast<ObOBJLockCallback*>(cb);
 | 
						|
      if (tl_cb != nullptr) {
 | 
						|
        printf("table lock node: lock_id=%ld op_type=%c\n", tl_cb->lock_op_->lock_op_.lock_id_.obj_id_, tl_cb->lock_op_->lock_op_.op_type_);
 | 
						|
      } else {
 | 
						|
        abort();
 | 
						|
      }
 | 
						|
    }
 | 
						|
    cb = cb->next_;
 | 
						|
  }
 | 
						|
  LOG_INFO("commit transaction");
 | 
						|
  ASSERT_EQ(OB_SUCCESS, tx_service->commit_tx(*tx_desc, ObTimeUtility::current_time() + 100000000));
 | 
						|
 | 
						|
  LOG_INFO("release transaction");
 | 
						|
  tx_service->release_tx(*tx_desc);
 | 
						|
 | 
						|
  //ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->remove_ls(ls_id, false));
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestTrans, dist_trans)
 | 
						|
{
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ObCurTraceId::get_trace_id()->set("Y2-2222222222222222222222"));
 | 
						|
  uint64_t tenant_id = MTL_ID();
 | 
						|
  ObLSID ls_id(100);
 | 
						|
  ObTabletID tablet_id(1001);
 | 
						|
 | 
						|
  ObLSID ls_id2(101);
 | 
						|
  ObLS *ls2 = nullptr;
 | 
						|
  create_ls(tenant_id, ls_id2, ls2);
 | 
						|
 | 
						|
  // create tablet
 | 
						|
  LOG_INFO("create tablet");
 | 
						|
  obrpc::ObBatchCreateTabletArg create_tablet_arg;
 | 
						|
  ObTabletID tablet_id2(1002);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, gen_create_tablet_arg(tenant_id, ls_id2, tablet_id2, create_tablet_arg, 1, &table_schema_));
 | 
						|
 | 
						|
  ObLSTabletService *ls_tablet_svr = ls2->get_tablet_svr();
 | 
						|
  ASSERT_EQ(OB_SUCCESS, TestTabletHelper::create_tablet(*ls_tablet_svr, create_tablet_arg));
 | 
						|
 | 
						|
 | 
						|
  LOG_INFO("start transaction");
 | 
						|
  ObTxDesc *tx_desc = NULL;
 | 
						|
  ObTxReadSnapshot snapshot;
 | 
						|
  prepare_tx_desc(tx_desc, snapshot);
 | 
						|
 | 
						|
  const char *ins_str =
 | 
						|
      "bigint    dml          \n"
 | 
						|
      "100        T_DML_INSERT \n";
 | 
						|
  insert_rows(ls_id, tablet_id, *tx_desc, snapshot, ins_str);
 | 
						|
 | 
						|
  const char *ins_str2 =
 | 
						|
      "bigint    dml          \n"
 | 
						|
      "200        T_DML_INSERT \n";
 | 
						|
  insert_rows(ls_id2, tablet_id2, *tx_desc, snapshot, ins_str2);
 | 
						|
 | 
						|
  ObTransService *tx_service = MTL(ObTransService*);
 | 
						|
  LOG_INFO("commit transaction");
 | 
						|
  ASSERT_EQ(OB_SUCCESS, tx_service->commit_tx(*tx_desc, ObTimeUtility::current_time() + 10000000000));
 | 
						|
 | 
						|
  LOG_INFO("release transaction");
 | 
						|
  tx_service->release_tx(*tx_desc);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
TEST_F(TestTrans, freeze)
 | 
						|
{
 | 
						|
  ObLSID ls_id(100);
 | 
						|
  ObLSHandle ls_handle;
 | 
						|
  ObLS *ls;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
 | 
						|
  ls = ls_handle.get_ls();
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ls->logstream_freeze());
 | 
						|
}
 | 
						|
*/
 | 
						|
 | 
						|
TEST_F(TestTrans, remove_ls)
 | 
						|
{
 | 
						|
  ObLSID ls_id(100);
 | 
						|
  ObLSID ls_id2(101);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->remove_ls(ls_id, false));
 | 
						|
  ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->remove_ls(ls_id2, false));
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
} // end oceanbase
 | 
						|
 | 
						|
int main(int argc, char **argv)
 | 
						|
{
 | 
						|
  system("rm -rf test_trans.log*");
 | 
						|
  OB_LOGGER.set_file_name("test_trans.log",true, false, "test_trans.log", "test_trans.log");
 | 
						|
  OB_LOGGER.set_log_level("INFO");
 | 
						|
  ::testing::InitGoogleTest(&argc, argv);
 | 
						|
  return RUN_ALL_TESTS();
 | 
						|
}
 |