1098 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1098 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 | 
						|
 * Copyright (c) 2021 OceanBase
 | 
						|
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
						|
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
 * You may obtain a copy of Mulan PubL v2 at:
 | 
						|
 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
 * See the Mulan PubL v2 for more details.
 | 
						|
 */
 | 
						|
 | 
						|
#include <gmock/gmock.h>
 | 
						|
#include "../../clog/mock_ob_partition_log_service.h"
 | 
						|
#include "../mockcontainer/mock_ob_partition_service.h"
 | 
						|
#include "../mockcontainer/mock_ob_end_trans_callback.h"
 | 
						|
#include "../mockcontainer/mock_ob_partition.h"
 | 
						|
#include "../mockcontainer/mock_ob_trans_service.h"
 | 
						|
#include "../mock_ob_partition_component_factory.h"
 | 
						|
#define private public
 | 
						|
#define protected public
 | 
						|
#include "storage/transaction/ob_trans_service.h"
 | 
						|
#include "storage/transaction/ob_trans_define.h"
 | 
						|
#include "clog/ob_partition_log_service.h"
 | 
						|
#include "lib/stat/ob_session_stat.h"
 | 
						|
#include "share/ob_errno.h"
 | 
						|
#include "lib/oblog/ob_log.h"
 | 
						|
#include "common/ob_partition_key.h"
 | 
						|
#include "storage/transaction/ob_trans_rpc.h"
 | 
						|
#include "lib/objectpool/ob_resource_pool.h"
 | 
						|
#include "lib/container/ob_array_iterator.h"
 | 
						|
#include "sql/resolver/ob_stmt_type.h"
 | 
						|
#include "sql/ob_sql_define.h"
 | 
						|
 | 
						|
namespace oceanbase {
 | 
						|
 | 
						|
using namespace common;
 | 
						|
using namespace transaction;
 | 
						|
using namespace storage;
 | 
						|
using namespace share;
 | 
						|
using namespace obrpc;
 | 
						|
using namespace memtable;
 | 
						|
using namespace sql;
 | 
						|
using namespace clog;
 | 
						|
 | 
						|
namespace unittest {
 | 
						|
 | 
						|
LeaderActiveArg leader_active_arg;
 | 
						|
 | 
						|
// Mock transaction module ObRpcProxy associated with ObTransRpc.
 | 
						|
// (1) Packaage sent does not go through the network communication.
 | 
						|
// (2) OB_SUCCESS is returned for the function post_trans_msg.
 | 
						|
class MockObTransRpc : public ObITransRpc, common::ObSimpleThreadPool {
 | 
						|
public:
 | 
						|
  MockObTransRpc() : trans_service_(NULL)
 | 
						|
  {}
 | 
						|
  virtual ~MockObTransRpc()
 | 
						|
  {
 | 
						|
    ObSimpleThreadPool::destroy();
 | 
						|
  }
 | 
						|
  int init(ObTransService* trans_service, const ObAddr& self)
 | 
						|
  {
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
    UNUSED(self);
 | 
						|
 | 
						|
    if (OB_SUCCESS != (ret = ObSimpleThreadPool::init(get_cpu_num() / 2, 100000))) {
 | 
						|
      TRANS_LOG(WARN, "ObSimpleThreadPool init error.", K(ret));
 | 
						|
    } else {
 | 
						|
      trans_service_ = trans_service;
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
 | 
						|
  void handle(void* task)
 | 
						|
  {
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
 | 
						|
    TransRpcTask* rpc_task = static_cast<TransRpcTask*>(task);
 | 
						|
    ObTransRpcResult result;
 | 
						|
 | 
						|
    if (NULL == trans_service_) {
 | 
						|
      TRANS_LOG(WARN, "tranaction service is null");
 | 
						|
    } else if (NULL == rpc_task) {
 | 
						|
      TRANS_LOG(WARN, "tranaction rpc task is null");
 | 
						|
    } else if (OB_SUCCESS != (ret = trans_service_->handle_trans_msg(rpc_task->get_msg(), result))) {
 | 
						|
      TRANS_LOG(WARN, "handle transaction message error", K(ret));
 | 
						|
    } else {
 | 
						|
      TRANS_LOG(INFO, "transaction message handle success", K(rpc_task->get_msg()));
 | 
						|
    }
 | 
						|
 | 
						|
    if (NULL != rpc_task) {
 | 
						|
      op_reclaim_free(rpc_task);
 | 
						|
      rpc_task = NULL;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int init(obrpc::ObTransRpcProxy* rpc_proxy, ObTransService* trans_service, const common::ObAddr& self)
 | 
						|
  {
 | 
						|
    UNUSED(rpc_proxy);
 | 
						|
    UNUSED(trans_service);
 | 
						|
    UNUSED(self);
 | 
						|
    start();
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int start()
 | 
						|
  {
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual void stop()
 | 
						|
  {}
 | 
						|
  virtual void wait()
 | 
						|
  {}
 | 
						|
  virtual void destroy()
 | 
						|
  {
 | 
						|
    trans_service_ = NULL;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int post_trans_msg(const common::ObAddr& server, const ObTransMsg& msg, const int64_t msg_type)
 | 
						|
  {
 | 
						|
    UNUSED(server);
 | 
						|
    UNUSED(msg_type);
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
 | 
						|
    TransRpcTask* rpc_task = NULL;
 | 
						|
    if (NULL == (rpc_task = op_reclaim_alloc(TransRpcTask))) {
 | 
						|
      ret = OB_ERR_UNEXPECTED;
 | 
						|
    } else {
 | 
						|
      if (OB_FAIL(rpc_task->init(msg, ObTransRetryTaskType::ERROR_MSG_TASK))) {
 | 
						|
        TRANS_LOG(WARN, "rpc task init error", K(ret), "rpc_task", *rpc_task);
 | 
						|
      } else if (OB_SUCCESS != (ret = push(rpc_task))) {
 | 
						|
        TRANS_LOG(WARN, "push task error", K(ret));
 | 
						|
        op_reclaim_free(rpc_task);
 | 
						|
        rpc_task = NULL;
 | 
						|
      } else {
 | 
						|
        TRANS_LOG(DEBUG, "transaction message push success", K(msg));
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
  virtual int post_trans_msg(
 | 
						|
      const uint64_t tenant_id, const common::ObAddr& server, const ObTransMsg& msg, const int64_t msg_type)
 | 
						|
  {
 | 
						|
    UNUSED(tenant_id);
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
 | 
						|
    if (OB_SUCCESS != (ret = post_trans_msg(server, msg, msg_type))) {
 | 
						|
      TRANS_LOG(WARN, "post transaction message error", K(ret));
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
  virtual int post_trans_msg(
 | 
						|
      const uint64_t tenant_id, const common::ObAddr& server, const ObTrxMsgBase& msg, const int64_t msg_type)
 | 
						|
  {
 | 
						|
    UNUSED(tenant_id);
 | 
						|
    UNUSED(server);
 | 
						|
    UNUSED(msg);
 | 
						|
    UNUSED(msg_type);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int post_batch_msg(const uint64_t tenant_id, const ObAddr& server, const obrpc::ObIFill& msg,
 | 
						|
      const int64_t msg_type, const ObPartitionKey& pkey)
 | 
						|
  {
 | 
						|
    UNUSED(tenant_id);
 | 
						|
    UNUSED(server);
 | 
						|
    UNUSED(msg);
 | 
						|
    UNUSED(msg_type);
 | 
						|
    UNUSED(pkey);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int post_trans_resp_msg(const uint64_t tenant_id, const ObAddr& server, const ObTransMsg& msg)
 | 
						|
  {
 | 
						|
    UNUSED(tenant_id);
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
 | 
						|
    if (OB_SUCCESS != (ret = post_trans_msg(server, msg, msg.get_msg_type()))) {
 | 
						|
      TRANS_LOG(WARN, "post transaction message error", K(ret));
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
 | 
						|
private:
 | 
						|
  ObTransService* trans_service_;
 | 
						|
};
 | 
						|
 | 
						|
class MockObLocationAdapter : public ObILocationAdapter {
 | 
						|
public:
 | 
						|
  MockObLocationAdapter()
 | 
						|
  {}
 | 
						|
  ~MockObLocationAdapter()
 | 
						|
  {}
 | 
						|
  int init(ObIPartitionLocationCache* location_cache, share::schema::ObMultiVersionSchemaService* schema_service)
 | 
						|
  {
 | 
						|
    UNUSED(location_cache);
 | 
						|
    UNUSED(schema_service);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  void destroy()
 | 
						|
  {}
 | 
						|
  int nonblock_get_strong_leader(const ObPartitionKey& partition, ObAddr& server)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    server = self_;
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  int get_strong_leader(const ObPartitionKey& partition, ObAddr& server)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    server = self_;
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  int nonblock_renew(const ObPartitionKey& partition, const int64_t expire_renew_time)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    UNUSED(expire_renew_time);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  int nonblock_get(const uint64_t table_id, const int64_t partition_id, share::ObPartitionLocation& location)
 | 
						|
  {
 | 
						|
    UNUSED(table_id);
 | 
						|
    UNUSED(partition_id);
 | 
						|
    UNUSED(location);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  void set_self(const ObAddr& self)
 | 
						|
  {
 | 
						|
    self_ = self;
 | 
						|
  }
 | 
						|
 | 
						|
private:
 | 
						|
  ObAddr self_;
 | 
						|
};
 | 
						|
 | 
						|
struct MySubmitLogTask {
 | 
						|
  MySubmitLogTask() : cb(NULL)
 | 
						|
  {}
 | 
						|
  ~MySubmitLogTask()
 | 
						|
  {}
 | 
						|
  ObITransSubmitLogCb* cb;
 | 
						|
  ObPartitionKey partition;
 | 
						|
};
 | 
						|
 | 
						|
class MockObClogAdapter : public ObIClogAdapter, public ObSimpleThreadPool {
 | 
						|
public:
 | 
						|
  MockObClogAdapter()
 | 
						|
  {}
 | 
						|
  ~MockObClogAdapter()
 | 
						|
  {
 | 
						|
    destroy();
 | 
						|
  }
 | 
						|
  int init(ObPartitionService* partition_service, const ObAddr& self)
 | 
						|
  {
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
 | 
						|
    UNUSED(partition_service);
 | 
						|
    UNUSED(self);
 | 
						|
 | 
						|
    if (OB_SUCCESS != (ret = ObSimpleThreadPool::init(8, 100000))) {
 | 
						|
      TRANS_LOG(WARN, "ObSimpleThreadPool init error", K(ret));
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int init(storage::ObPartitionService* partition_service)
 | 
						|
  {
 | 
						|
    UNUSED(partition_service);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int get_log_id_timestamp(const ObPartitionKey& partition, const int64_t prepare_version,
 | 
						|
      storage::ObIPartitionGroup* pg, ObLogMeta& log_meta)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    UNUSED(prepare_version);
 | 
						|
    UNUSED(pg);
 | 
						|
    UNUSED(log_meta);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int submit_log_id_alloc_task(const int64_t log_type, ObTransCtx* ctx)
 | 
						|
  {
 | 
						|
    UNUSED(log_type);
 | 
						|
    UNUSED(ctx);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int submit_rollback_trans_task(ObTransCtx* ctx)
 | 
						|
  {
 | 
						|
    UNUSED(ctx);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int submit_log(const common::ObPartitionKey& partition, const common::ObVersion& version, const char* buff,
 | 
						|
      const int64_t size, const uint64_t log_id, const int64_t ts, ObITransSubmitLogCb* cb)
 | 
						|
  {
 | 
						|
    UNUSED(version);
 | 
						|
    UNUSED(log_id);
 | 
						|
    UNUSED(ts);
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
 | 
						|
    if (OB_FAIL(submit_log(partition, buff, size, cb))) {
 | 
						|
      TRANS_LOG(WARN, "MockObClogAdapter submit log error", K(ret));
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int submit_log(const common::ObPartitionKey& partition, const common::ObVersion& version, const char* buff,
 | 
						|
      const int64_t size, ObITransSubmitLogCb* cb, storage::ObIPartitionGroup* pg, uint64_t& cur_log_id,
 | 
						|
      int64_t& cur_log_timestamp)
 | 
						|
  {
 | 
						|
    UNUSED(version);
 | 
						|
    UNUSED(cur_log_id);
 | 
						|
    UNUSED(cur_log_timestamp);
 | 
						|
    UNUSED(pg);
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
 | 
						|
    if (OB_SUCCESS != (ret = submit_log(partition, buff, size, cb))) {
 | 
						|
      TRANS_LOG(WARN, "MockObClogAdapter submit log error", K(ret));
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int submit_log(const common::ObPartitionKey& partition, const common::ObVersion& version, const char* buff,
 | 
						|
      const int64_t size, const int64_t base_ts, ObITransSubmitLogCb* cb, storage::ObIPartitionGroup* pg,
 | 
						|
      uint64_t& cur_log_id, int64_t& cur_log_timestamp)
 | 
						|
  {
 | 
						|
    UNUSED(version);
 | 
						|
    UNUSED(base_ts);
 | 
						|
    UNUSED(pg);
 | 
						|
    UNUSED(cur_log_id);
 | 
						|
    UNUSED(cur_log_timestamp);
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
 | 
						|
    if (OB_SUCCESS != (ret = submit_log(partition, buff, size, cb))) {
 | 
						|
      TRANS_LOG(WARN, "MockObClogAdapter submit log error", K(ret));
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
  virtual int submit_aggre_log(const common::ObPartitionKey& partition, const common::ObVersion& version,
 | 
						|
      const char* buff, const int64_t size, ObITransSubmitLogCb* cb, storage::ObIPartitionGroup* pg,
 | 
						|
      const int64_t base_ts, ObTransLogBufferAggreContainer& container)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    UNUSED(version);
 | 
						|
    UNUSED(buff);
 | 
						|
    UNUSED(size);
 | 
						|
    UNUSED(cb);
 | 
						|
    UNUSED(pg);
 | 
						|
    UNUSED(base_ts);
 | 
						|
    UNUSED(container);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int flush_aggre_log(const common::ObPartitionKey& partition, const common::ObVersion& version,
 | 
						|
      storage::ObIPartitionGroup* pg, ObTransLogBufferAggreContainer& container)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    UNUSED(version);
 | 
						|
    UNUSED(pg);
 | 
						|
    UNUSED(container);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  int batch_submit_log(const transaction::ObTransID& trans_id, const common::ObPartitionArray& partition_array,
 | 
						|
      const clog::ObLogInfoArray& log_info_array, const clog::ObISubmitLogCbArray& cb_array)
 | 
						|
  {
 | 
						|
    UNUSED(trans_id);
 | 
						|
    UNUSED(partition_array);
 | 
						|
    UNUSED(log_info_array);
 | 
						|
    UNUSED(cb_array);
 | 
						|
    return common::OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  int submit_log_task(const common::ObPartitionKey& partition, const common::ObVersion& version, const char* buff,
 | 
						|
      const int64_t size, const bool with_need_update_version, const int64_t local_trans_version,
 | 
						|
      const bool with_base_ts, const int64_t base_ts, ObITransSubmitLogCb* cb)
 | 
						|
  {
 | 
						|
    UNUSED(version);
 | 
						|
    UNUSED(with_need_update_version);
 | 
						|
    UNUSED(local_trans_version);
 | 
						|
    UNUSED(with_base_ts);
 | 
						|
    UNUSED(base_ts);
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
 | 
						|
    if (OB_SUCCESS != (ret = submit_log(partition, buff, size, cb))) {
 | 
						|
      TRANS_LOG(WARN, "MockObClogAdapter submit log error", K(ret));
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
 | 
						|
  int submit_log_task(const common::ObPartitionKey& partition, const common::ObVersion& version, const char* buff,
 | 
						|
      const int64_t size, ObITransSubmitLogCb* cb)
 | 
						|
  {
 | 
						|
    UNUSED(version);
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
 | 
						|
    if (OB_SUCCESS != (ret = submit_log(partition, buff, size, cb))) {
 | 
						|
      TRANS_LOG(WARN, "MockObClogAdapter submit log error", K(ret));
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
  int start()
 | 
						|
  {
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  void stop()
 | 
						|
  {}
 | 
						|
  void wait()
 | 
						|
  {}
 | 
						|
 | 
						|
  void destroy()
 | 
						|
  {
 | 
						|
    ObSimpleThreadPool::destroy();
 | 
						|
  }
 | 
						|
 | 
						|
  int get_status(const common::ObPartitionKey& partition, const bool check_election, int& clog_status,
 | 
						|
      int64_t& leader_epoch, common::ObTsWindows& changing_leader_windows)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    UNUSED(check_election);
 | 
						|
    UNUSED(changing_leader_windows);
 | 
						|
    UNUSED(leader_epoch);
 | 
						|
    clog_status = OB_SUCCESS;
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  int get_status(storage::ObIPartitionGroup* partition, const bool check_election, int& clog_status,
 | 
						|
      int64_t& leader_epoch, common::ObTsWindows& changing_leader_windows)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    UNUSED(check_election);
 | 
						|
    UNUSED(changing_leader_windows);
 | 
						|
    UNUSED(leader_epoch);
 | 
						|
    clog_status = OB_SUCCESS;
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  int get_status_unsafe(const common::ObPartitionKey& partition, int& clog_status, int64_t& leader_epoch,
 | 
						|
      common::ObTsWindows& changing_leader_windows)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    UNUSED(changing_leader_windows);
 | 
						|
    UNUSED(leader_epoch);
 | 
						|
    clog_status = OB_SUCCESS;
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  int submit_log(const ObPartitionKey& partition, const char* buff, const int64_t size, ObITransSubmitLogCb* cb)
 | 
						|
  {
 | 
						|
    UNUSED(buff);
 | 
						|
    UNUSED(size);
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
    MySubmitLogTask* task = NULL;
 | 
						|
 | 
						|
    if (NULL == (task = op_reclaim_alloc(MySubmitLogTask))) {
 | 
						|
      TRANS_LOG(WARN, "new MySubmitLogTask error");
 | 
						|
      ret = OB_ERR_UNEXPECTED;
 | 
						|
    } else {
 | 
						|
      task->cb = cb;
 | 
						|
      task->partition = partition;
 | 
						|
      if (OB_SUCCESS != (ret = push(task))) {
 | 
						|
        TRANS_LOG(WARN, "push task error", K(ret));
 | 
						|
      } else {
 | 
						|
        TRANS_LOG(DEBUG, "push task success");
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int backfill_nop_log(
 | 
						|
      const common::ObPartitionKey& partition, storage::ObIPartitionGroup* pg, const ObLogMeta& log_meta)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    UNUSED(pg);
 | 
						|
    UNUSED(log_meta);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int submit_backfill_nop_log_task(const common::ObPartitionKey& partition, const ObLogMeta& log_meta)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    UNUSED(log_meta);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int submit_big_trans_callback_task(const common::ObPartitionKey& partition, const int64_t log_type,
 | 
						|
      const uint64_t log_id, const int64_t log_timestamp, ObTransCtx* ctx)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    UNUSED(log_type);
 | 
						|
    UNUSED(log_id);
 | 
						|
    UNUSED(log_timestamp);
 | 
						|
    UNUSED(ctx);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int get_last_submit_timestamp(const common::ObPartitionKey& partition, int64_t& timestamp)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    UNUSED(timestamp);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  void handle(void* task)
 | 
						|
  {
 | 
						|
    static int64_t log_id;
 | 
						|
    MySubmitLogTask* log_task = static_cast<MySubmitLogTask*>(task);
 | 
						|
    clog::ObLogType log_type = clog::OB_LOG_SUBMIT;
 | 
						|
    log_task->cb->on_success(log_task->partition, log_type, ++log_id, ObClockGenerator::getClock(), false, false);
 | 
						|
    TRANS_LOG(DEBUG, "handle log task sucess");
 | 
						|
    op_reclaim_free(log_task);
 | 
						|
  }
 | 
						|
  bool can_start_trans()
 | 
						|
  {
 | 
						|
    return true;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
class MyMockObPartitionLogService : public MockPartitionLogService {
 | 
						|
public:
 | 
						|
  int submit_log(const char* buff, const int64_t size, const storage::ObStorageLogType log_type,
 | 
						|
      const common::ObVersion& version, ObITransSubmitLogCb* cb)
 | 
						|
  {
 | 
						|
    UNUSED(buff);
 | 
						|
    UNUSED(cb);
 | 
						|
    UNUSED(log_type);
 | 
						|
    UNUSED(version);
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
    if (1 == size) {
 | 
						|
      TRANS_LOG(WARN, "invalid argument", K(size));
 | 
						|
      ret = OB_INVALID_ARGUMENT;
 | 
						|
    } else {
 | 
						|
      TRANS_LOG(INFO, "submit log success.");
 | 
						|
    }
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
class MyMockObPartitionComponentFactory : public MockObIPartitionComponentFactory {
 | 
						|
public:
 | 
						|
  virtual void free(ObIPartitionGroup* partition)
 | 
						|
  {
 | 
						|
    UNUSED(partition);
 | 
						|
    // delete partition;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
class MyMockObPartition : public MockObIPartitionGroup {
 | 
						|
public:
 | 
						|
  MyMockObPartition() : partition_log_service_(NULL), pg_file_(NULL), pkey_(nullptr)
 | 
						|
  {}
 | 
						|
  virtual ~MyMockObPartition()
 | 
						|
  {}
 | 
						|
  void set_pkey(const ObPartitionKey* pkey)
 | 
						|
  {
 | 
						|
    pkey_ = const_cast<ObPartitionKey*>(pkey);
 | 
						|
  }
 | 
						|
  void set_log_service(ObIPartitionLogService* log_service)
 | 
						|
  {
 | 
						|
    partition_log_service_ = log_service;
 | 
						|
  }
 | 
						|
  virtual blocksstable::ObStorageFile* get_storage_file()
 | 
						|
  {
 | 
						|
    return pg_file_;
 | 
						|
  }
 | 
						|
  virtual const blocksstable::ObStorageFile* get_storage_file() const
 | 
						|
  {
 | 
						|
    return pg_file_;
 | 
						|
  }
 | 
						|
  virtual blocksstable::ObStorageFileHandle& get_storage_file_handle()
 | 
						|
  {
 | 
						|
    return file_handle_;
 | 
						|
  }
 | 
						|
  virtual int remove_election_from_mgr()
 | 
						|
  {
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  // get partition log service
 | 
						|
  ObIPartitionLogService* get_log_service()
 | 
						|
  {
 | 
						|
    return partition_log_service_;
 | 
						|
  }
 | 
						|
  const ObPartitionKey& get_partition_key() const
 | 
						|
  {
 | 
						|
    return *pkey_;
 | 
						|
  }
 | 
						|
 | 
						|
private:
 | 
						|
  ObIPartitionLogService* partition_log_service_;
 | 
						|
  blocksstable::ObStorageFile* pg_file_;
 | 
						|
  blocksstable::ObStorageFileHandle file_handle_;
 | 
						|
  ObPartitionKey* pkey_;
 | 
						|
};
 | 
						|
 | 
						|
class MyMockObPartitionService : public MockObIPartitionService {
 | 
						|
public:
 | 
						|
  MyMockObPartitionService() : partition_(NULL)
 | 
						|
  {
 | 
						|
    partition_ = new MyMockObPartition();
 | 
						|
    cp_fty_ = new MyMockObPartitionComponentFactory();
 | 
						|
  }
 | 
						|
  virtual ~MyMockObPartitionService()
 | 
						|
  {
 | 
						|
    if (NULL != partition_) {
 | 
						|
      delete partition_;
 | 
						|
      delete cp_fty_;
 | 
						|
      partition_ = NULL;
 | 
						|
      cp_fty_ = NULL;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  int get_partition(const common::ObPartitionKey& pkey, ObIPartitionGroup*& partition) const
 | 
						|
  {
 | 
						|
    MyMockObPartitionLogService* log_service = NULL;
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
    if (!pkey.is_valid()) {
 | 
						|
      partition_->set_pkey(&pkey);
 | 
						|
      partition = partition_;
 | 
						|
      partition_->set_log_service(new MyMockObPartitionLogService());
 | 
						|
      TRANS_LOG(WARN, "invalid argument, pkey is invalid.", K(pkey));
 | 
						|
      ret = OB_INVALID_ARGUMENT;
 | 
						|
    } else if (1 == pkey.table_id_) {
 | 
						|
      TRANS_LOG(INFO, "get partition success.", K(pkey.table_id_));
 | 
						|
      partition_->set_pkey(&pkey);
 | 
						|
      partition = partition_;
 | 
						|
      partition_->set_log_service(new MyMockObPartitionLogService());
 | 
						|
    } else if (2 == pkey.table_id_) {
 | 
						|
      TRANS_LOG(WARN, "invalid argument, get partition error.", K(pkey.table_id_));
 | 
						|
      partition_->set_pkey(&pkey);
 | 
						|
      partition = partition_;
 | 
						|
      partition_->set_log_service(new MyMockObPartitionLogService());
 | 
						|
      // ret = OB_ERR_UNEXPECTED;
 | 
						|
    } else if (3 == pkey.table_id_) {
 | 
						|
      TRANS_LOG(INFO, "get partition success.", K(pkey.table_id_));
 | 
						|
      partition_->set_pkey(&pkey);
 | 
						|
      partition = partition_;
 | 
						|
      partition_->set_log_service(new MyMockObPartitionLogService());
 | 
						|
    } else if (4 == pkey.table_id_) {
 | 
						|
      TRANS_LOG(INFO, "get partition success.", K(pkey.table_id_));
 | 
						|
      partition_->set_pkey(&pkey);
 | 
						|
      partition = partition_;
 | 
						|
      partition_->set_log_service(new MyMockObPartitionLogService());
 | 
						|
    } else {
 | 
						|
      partition_->set_pkey(&pkey);
 | 
						|
      partition = partition_;
 | 
						|
      partition_->set_log_service(new MyMockObPartitionLogService());
 | 
						|
      TRANS_LOG(INFO, "test info", K(pkey));
 | 
						|
      // do nothing.
 | 
						|
    }
 | 
						|
    if (NULL != log_service) {
 | 
						|
      delete log_service;
 | 
						|
      log_service = NULL;
 | 
						|
    }
 | 
						|
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
  int get_partition(const common::ObPartitionKey& pkey, ObIPartitionGroupGuard& guard) const
 | 
						|
  {
 | 
						|
    int ret = common::OB_SUCCESS;
 | 
						|
    ObIPartitionGroup* partition = NULL;
 | 
						|
    if (OB_FAIL(get_partition(pkey, partition))) {
 | 
						|
      STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret));
 | 
						|
    } else {
 | 
						|
      guard.set_partition_group(this->get_pg_mgr(), *partition);
 | 
						|
    }
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
  virtual int get_curr_member_list(const common::ObPartitionKey& pkey, common::ObMemberList& member_list) const
 | 
						|
  {
 | 
						|
    UNUSED(pkey);
 | 
						|
    UNUSED(member_list);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
private:
 | 
						|
  MyMockObPartition* partition_;
 | 
						|
};
 | 
						|
 | 
						|
// define local_ip
 | 
						|
static const char* LOCAL_IP = "127.0.0.1";
 | 
						|
static const int32_t PORT = 8080;
 | 
						|
static const ObAddr::VER IP_TYPE = ObAddr::IPV4;
 | 
						|
 | 
						|
class TestPerformance : public ::testing::Test {
 | 
						|
public:
 | 
						|
  TestPerformance() : self_(IP_TYPE, LOCAL_IP, PORT), sp_trans_(false), access_mod_(ObTransAccessMode::UNKNOWN)
 | 
						|
  {}
 | 
						|
  virtual ~TestPerformance()
 | 
						|
  {
 | 
						|
    destroy();
 | 
						|
  }
 | 
						|
  virtual void SetUp()
 | 
						|
  {
 | 
						|
    init();
 | 
						|
  }
 | 
						|
  virtual void TearDown()
 | 
						|
  {}
 | 
						|
  int init();
 | 
						|
  void destroy();
 | 
						|
 | 
						|
public:
 | 
						|
  int smoke();
 | 
						|
  int get_random_partition(ObPartitionLeaderArray& array);
 | 
						|
  void set_sp_trans(const bool sp_trans)
 | 
						|
  {
 | 
						|
    sp_trans_ = sp_trans;
 | 
						|
  }
 | 
						|
  void set_access_mod(const int32_t access_mod)
 | 
						|
  {
 | 
						|
    access_mod_ = access_mod;
 | 
						|
  }
 | 
						|
 | 
						|
private:
 | 
						|
  int do_trans();
 | 
						|
 | 
						|
private:
 | 
						|
  static const uint64_t TENANT_ID = 1234;
 | 
						|
  static const int32_t PARTITION_COUNT_PER_TABLE = 5;
 | 
						|
  static const int32_t MAX_TABLE = 10;
 | 
						|
  static const int32_t TABLE_NUMBER_START = 20000;
 | 
						|
  int generate_participant_arr_guard_(const ObPartitionArray& participants, ObIPartitionArrayGuard& pkey_guard_arr);
 | 
						|
 | 
						|
private:
 | 
						|
  ObAddr self_;
 | 
						|
  MockObTransRpc rpc_;
 | 
						|
  share::schema::ObMultiVersionSchemaService* schema_service_;
 | 
						|
  MockObLocationAdapter location_adapter_;
 | 
						|
  MockObClogAdapter clog_adapter_;
 | 
						|
  ObTransService trans_service_;
 | 
						|
  MyMockObPartitionService partition_service_;
 | 
						|
  ObPartitionArray partition_array_;
 | 
						|
  ObAddrArray addr_array_;
 | 
						|
  bool sp_trans_;
 | 
						|
  int32_t access_mod_;
 | 
						|
  ObLtsSource lts_source_;
 | 
						|
  MockObTsMgr* ts_mgr_;
 | 
						|
};
 | 
						|
 | 
						|
int TestPerformance::init()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  ts_mgr_ = new MockObTsMgr(lts_source_);
 | 
						|
  // ObKVGlobalCache::get_instance().init();
 | 
						|
  oceanbase::common::ObClusterVersion::get_instance().refresh_cluster_version("1.3.0");
 | 
						|
  schema_service_ = new share::schema::ObMultiVersionSchemaService();
 | 
						|
  rpc_.init(&trans_service_, self_);
 | 
						|
  location_adapter_.init(NULL, schema_service_);
 | 
						|
  location_adapter_.set_self(self_);
 | 
						|
  clog_adapter_.init(&partition_service_, self_);
 | 
						|
  trans_service_.init(self_, &rpc_, &location_adapter_, &clog_adapter_, &partition_service_, schema_service_, ts_mgr_);
 | 
						|
 | 
						|
  trans_service_.start();
 | 
						|
 | 
						|
  for (int32_t i = 0; i < MAX_TABLE; i++) {
 | 
						|
    for (int32_t j = 0; j < PARTITION_COUNT_PER_TABLE; j++) {
 | 
						|
      ObPartitionKey partition(combine_id(TENANT_ID, i + TABLE_NUMBER_START), j, PARTITION_COUNT_PER_TABLE);
 | 
						|
      partition_array_.push_back(partition);
 | 
						|
      addr_array_.push_back(self_);
 | 
						|
      trans_service_.add_partition(partition);
 | 
						|
      trans_service_.leader_takeover(partition, leader_active_arg);
 | 
						|
      trans_service_.leader_active(partition, leader_active_arg);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  sp_trans_ = true;
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
void TestPerformance::destroy()
 | 
						|
{
 | 
						|
  delete schema_service_;
 | 
						|
  delete ts_mgr_;
 | 
						|
}
 | 
						|
 | 
						|
int TestPerformance::generate_participant_arr_guard_(
 | 
						|
    const ObPartitionArray& participants, ObIPartitionArrayGuard& pkey_guard_arr)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  pkey_guard_arr.set_pg_mgr(partition_service_.get_pg_mgr());
 | 
						|
 | 
						|
  for (int64_t i = 0; OB_SUCC(ret) && i < participants.count(); ++i) {
 | 
						|
    const ObPartitionKey& pkey = participants.at(i);
 | 
						|
    ObIPartitionGroupGuard pkey_guard;
 | 
						|
    if (OB_FAIL(partition_service_.get_partition(pkey, pkey_guard))) {
 | 
						|
      STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret));
 | 
						|
    } else if (NULL == pkey_guard.get_partition_group()) {
 | 
						|
      ret = OB_ERR_UNEXPECTED;
 | 
						|
      STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret));
 | 
						|
    } else if (OB_FAIL(pkey_guard_arr.push_back(pkey_guard.get_partition_group()))) {
 | 
						|
      STORAGE_LOG(WARN, "pkey guard push back error", K(ret), K(i), K(participants));
 | 
						|
    } else {
 | 
						|
      STORAGE_LOG(INFO, "pkey guard : ", K(pkey_guard.get_partition_group()->get_partition_key()), K(pkey));
 | 
						|
      // do nothing
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int TestPerformance::do_trans()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  const uint64_t tenant_id = TENANT_ID;
 | 
						|
  const uint64_t thread_id = 100;
 | 
						|
  const int64_t cur_ts = ObClockGenerator::getClock();
 | 
						|
  const int64_t trans_expired_time = cur_ts + 10000000;
 | 
						|
  const int64_t stmt_expired_time = cur_ts + 1000000;
 | 
						|
  ObStartTransParam trans_param;
 | 
						|
  ObTransDesc trans_desc;
 | 
						|
  ObPartitionLeaderArray pla;
 | 
						|
  ObPartitionArray out;
 | 
						|
  ObStmtDesc& stmt_desc = trans_desc.get_cur_stmt_desc();
 | 
						|
  MockObEndTransCallback cb_;
 | 
						|
  // non-sp trans, each participant executes tree sql requests
 | 
						|
  static const int64_t STMT_PER_TRANS = 3;
 | 
						|
 | 
						|
  if (access_mod_ == ObTransAccessMode::READ_ONLY) {
 | 
						|
    trans_param.set_access_mode(ObTransAccessMode::READ_ONLY);
 | 
						|
    stmt_desc.stmt_type_ = stmt::T_SELECT;
 | 
						|
    stmt_desc.is_sfu_ = false;
 | 
						|
  } else {
 | 
						|
    trans_param.set_access_mode(ObTransAccessMode::READ_WRITE);
 | 
						|
    stmt_desc.stmt_type_ = stmt::T_UPDATE;
 | 
						|
    stmt_desc.is_sfu_ = false;
 | 
						|
  }
 | 
						|
  trans_param.set_type(ObTransType::TRANS_USER);
 | 
						|
  trans_param.set_isolation(ObTransIsolation::READ_COMMITED);
 | 
						|
  trans_param.set_autocommit(true);
 | 
						|
  trans_param.set_cluster_version(GET_MIN_CLUSTER_VERSION());
 | 
						|
 | 
						|
  ObStmtParam stmt_param;
 | 
						|
 | 
						|
  stmt_desc.phy_plan_type_ = OB_PHY_PLAN_LOCAL;
 | 
						|
  stmt_desc.consistency_level_ = ObTransConsistencyLevel::STRONG;
 | 
						|
 | 
						|
  if (OB_SUCCESS != (ret = get_random_partition(pla))) {
 | 
						|
    TRANS_LOG(WARN, "get random partition error", K(ret));
 | 
						|
  } else if (OB_SUCCESS != (ret = trans_service_.start_trans(
 | 
						|
                                tenant_id, thread_id, trans_param, trans_expired_time, 0, 0, trans_desc))) {
 | 
						|
    TRANS_LOG(WARN, "start trans error", K(ret));
 | 
						|
  } else {
 | 
						|
    const int64_t stmt_count = sp_trans_ ? 1 : STMT_PER_TRANS;
 | 
						|
    for (int64_t i = 0; OB_SUCC(ret) && i < stmt_count; ++i) {
 | 
						|
      if (OB_FAIL(stmt_param.init(tenant_id, stmt_expired_time, false))) {
 | 
						|
        TRANS_LOG(WARN, "ObStmtParam init error", K(ret), K(stmt_desc));
 | 
						|
      } else if (OB_SUCCESS != (ret = trans_service_.start_stmt(stmt_param, trans_desc, pla, out))) {
 | 
						|
        TRANS_LOG(WARN, "start stmt error", K(ret));
 | 
						|
      } else {
 | 
						|
        ObPartitionEpochArray partition_epoch_arr;
 | 
						|
        ObIPartitionArrayGuard pkey_guard_arr;
 | 
						|
        generate_participant_arr_guard_(pla.get_partitions(), pkey_guard_arr);
 | 
						|
        ObPartitionArray participants;
 | 
						|
        if (OB_SUCCESS != (ret = trans_service_.start_participant(
 | 
						|
                               trans_desc, pla.get_partitions(), partition_epoch_arr, pkey_guard_arr))) {
 | 
						|
          TRANS_LOG(WARN, "start participant error", K(ret));
 | 
						|
        } else if (OB_SUCCESS !=
 | 
						|
                   (ret = trans_service_.end_participant(false /*is_rollback*/, trans_desc, pla.get_partitions()))) {
 | 
						|
          TRANS_LOG(WARN, "end participant error", K(ret));
 | 
						|
        } else {
 | 
						|
          // do nothing
 | 
						|
        }
 | 
						|
        const bool is_rollback = (OB_SUCCESS == ret) ? false : true;
 | 
						|
        // overwrite retcode
 | 
						|
        ObPartitionEpochArray epoch_arr;
 | 
						|
        bool incomplete = false;
 | 
						|
        if (OB_SUCCESS !=
 | 
						|
            (ret = trans_service_.end_stmt(
 | 
						|
                 is_rollback, incomplete, pla.get_partitions(), epoch_arr, participants, pla, trans_desc))) {
 | 
						|
          TRANS_LOG(WARN, "end stmt error", K(ret));
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
    const bool is_rollback = (OB_SUCCESS == ret) ? false : true;
 | 
						|
    if (OB_SUCCESS != (ret = trans_service_.end_trans(is_rollback, trans_desc, cb_, stmt_expired_time))) {
 | 
						|
      TRANS_LOG(WARN, "end trans error", K(ret));
 | 
						|
    } else {
 | 
						|
      ret = cb_.wait();
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int TestPerformance::smoke()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  const int64_t MAX_LOOP = 1000;
 | 
						|
 | 
						|
  for (int64_t loop = 0; OB_SUCC(ret) && loop < MAX_LOOP; ++loop) {
 | 
						|
    ret = do_trans();
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int TestPerformance::get_random_partition(ObPartitionLeaderArray& array)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  int64_t count = 0;
 | 
						|
 | 
						|
  if (sp_trans_) {
 | 
						|
    count = 1;
 | 
						|
  } else {
 | 
						|
    count = (rand() % 15) + 2;
 | 
						|
  }
 | 
						|
  array.reset();
 | 
						|
  // Any two of participants of a sentence are not allowed to be the same.
 | 
						|
  for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) {
 | 
						|
    // const int64_t idx = ObRandom::rand(0, partition_array_.count() - 1);
 | 
						|
    const int64_t idx = i;
 | 
						|
    ret = array.push(partition_array_[idx], addr_array_[idx]);
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
static void* thr_fn(void* arg)
 | 
						|
{
 | 
						|
  TestPerformance* p = reinterpret_cast<TestPerformance*>(arg);
 | 
						|
  p->smoke();
 | 
						|
  p->rpc_.destroy();
 | 
						|
  return (void*)0;
 | 
						|
}
 | 
						|
//////////////////////test///////////////////////////
 | 
						|
// test read only transactions without concurrency
 | 
						|
TEST_F(TestPerformance, sp_readonly_performance)
 | 
						|
{
 | 
						|
  TRANS_LOG(INFO, "called", "func", test_info_->name());
 | 
						|
  set_sp_trans(true);
 | 
						|
  set_access_mod(ObTransAccessMode::READ_ONLY);
 | 
						|
  const int64_t start = ObClockGenerator::getClock();
 | 
						|
  EXPECT_EQ(OB_SUCCESS, smoke());
 | 
						|
  const int64_t end = ObClockGenerator::getClock();
 | 
						|
  TRANS_LOG(INFO, "sp_readonly_performance statistic", "total_used", end - start);
 | 
						|
}
 | 
						|
 | 
						|
// test ready only transactions wiht concurrency
 | 
						|
TEST_F(TestPerformance, sp_readonly_concurrent_performance)
 | 
						|
{
 | 
						|
  TRANS_LOG(INFO, "called", "func", test_info_->name());
 | 
						|
 | 
						|
  const int64_t THREAD_NUM = get_cpu_num() * 2;
 | 
						|
  pthread_t tids[THREAD_NUM];
 | 
						|
 | 
						|
  set_sp_trans(true);
 | 
						|
  set_access_mod(ObTransAccessMode::READ_ONLY);
 | 
						|
 | 
						|
  const int64_t start = ObClockGenerator::getClock();
 | 
						|
  for (int64_t i = 0; i < THREAD_NUM; ++i) {
 | 
						|
    EXPECT_EQ(0, pthread_create(&tids[i], NULL, thr_fn, this));
 | 
						|
  }
 | 
						|
 | 
						|
  // collect resources of threads
 | 
						|
  for (int64_t i = 0; i < THREAD_NUM; ++i) {
 | 
						|
    pthread_join(tids[i], NULL);
 | 
						|
  }
 | 
						|
  const int64_t end = ObClockGenerator::getClock();
 | 
						|
  TRANS_LOG(INFO, "sp_readonly_concurrent_performance statistic", "total_used", end - start);
 | 
						|
}
 | 
						|
 | 
						|
// test read-only transactions without concurrency
 | 
						|
TEST_F(TestPerformance, mp_readonly_performance)
 | 
						|
{
 | 
						|
  TRANS_LOG(INFO, "called", "func", test_info_->name());
 | 
						|
  set_sp_trans(false);
 | 
						|
  set_access_mod(ObTransAccessMode::READ_ONLY);
 | 
						|
  const int64_t start = ObClockGenerator::getClock();
 | 
						|
  EXPECT_EQ(OB_SUCCESS, smoke());
 | 
						|
  const int64_t end = ObClockGenerator::getClock();
 | 
						|
  TRANS_LOG(INFO, "mp_readonly_performance statistic", "total_used", end - start);
 | 
						|
}
 | 
						|
 | 
						|
// test read-only transactions with concurrency
 | 
						|
TEST_F(TestPerformance, mp_readonly_concurrent_performance)
 | 
						|
{
 | 
						|
  TRANS_LOG(INFO, "called", "func", test_info_->name());
 | 
						|
 | 
						|
  const int64_t THREAD_NUM = get_cpu_num() * 2;
 | 
						|
  pthread_t tids[THREAD_NUM];
 | 
						|
 | 
						|
  set_sp_trans(false);
 | 
						|
  set_access_mod(ObTransAccessMode::READ_ONLY);
 | 
						|
 | 
						|
  const int64_t start = ObClockGenerator::getClock();
 | 
						|
  for (int64_t i = 0; i < THREAD_NUM; ++i) {
 | 
						|
    EXPECT_EQ(0, pthread_create(&tids[i], NULL, thr_fn, this));
 | 
						|
  }
 | 
						|
 | 
						|
  // collect resources of threads
 | 
						|
  for (int64_t i = 0; i < THREAD_NUM; ++i) {
 | 
						|
    pthread_join(tids[i], NULL);
 | 
						|
  }
 | 
						|
  const int64_t end = ObClockGenerator::getClock();
 | 
						|
  TRANS_LOG(INFO, "mp_readonly_concurrent_performance statistic", "total_used", end - start);
 | 
						|
}
 | 
						|
 | 
						|
// test read-write transactions without concurrency
 | 
						|
TEST_F(TestPerformance, sp_read_write_performance)
 | 
						|
{
 | 
						|
  TRANS_LOG(INFO, "called", "func", test_info_->name());
 | 
						|
  set_sp_trans(true);
 | 
						|
  set_access_mod(ObTransAccessMode::READ_WRITE);
 | 
						|
  const int64_t start = ObClockGenerator::getClock();
 | 
						|
  EXPECT_EQ(OB_SUCCESS, smoke());
 | 
						|
  const int64_t end = ObClockGenerator::getClock();
 | 
						|
  TRANS_LOG(INFO, "sp_read_write_performance statistic", "total_used", end - start);
 | 
						|
}
 | 
						|
 | 
						|
// test read-write transactions with concurrency
 | 
						|
TEST_F(TestPerformance, sp_read_write_concurrent_performance)
 | 
						|
{
 | 
						|
  TRANS_LOG(INFO, "called", "func", test_info_->name());
 | 
						|
 | 
						|
  const int64_t THREAD_NUM = get_cpu_num() * 2;
 | 
						|
  pthread_t tids[THREAD_NUM];
 | 
						|
 | 
						|
  set_sp_trans(true);
 | 
						|
  set_access_mod(ObTransAccessMode::READ_WRITE);
 | 
						|
 | 
						|
  const int64_t start = ObClockGenerator::getClock();
 | 
						|
  for (int64_t i = 0; i < THREAD_NUM; ++i) {
 | 
						|
    EXPECT_EQ(0, pthread_create(&tids[i], NULL, thr_fn, this));
 | 
						|
  }
 | 
						|
 | 
						|
  // collect resources of threads
 | 
						|
  for (int64_t i = 0; i < THREAD_NUM; ++i) {
 | 
						|
    pthread_join(tids[i], NULL);
 | 
						|
  }
 | 
						|
  const int64_t end = ObClockGenerator::getClock();
 | 
						|
  TRANS_LOG(INFO, "sp_read_write_concurrent_performance statistic", "total_used", end - start);
 | 
						|
}
 | 
						|
 | 
						|
// test read-write transactions without concurrency
 | 
						|
TEST_F(TestPerformance, mp_read_write_performance)
 | 
						|
{
 | 
						|
  TRANS_LOG(INFO, "called", "func", test_info_->name());
 | 
						|
  set_sp_trans(false);
 | 
						|
  set_access_mod(ObTransAccessMode::READ_WRITE);
 | 
						|
  const int64_t start = ObClockGenerator::getClock();
 | 
						|
  EXPECT_EQ(OB_SUCCESS, smoke());
 | 
						|
  const int64_t end = ObClockGenerator::getClock();
 | 
						|
  TRANS_LOG(INFO, "mp_read_write_performance statistic", "total_used", end - start);
 | 
						|
}
 | 
						|
 | 
						|
// test read-write transactions with concurrency
 | 
						|
TEST_F(TestPerformance, mp_read_write_concurrent_performance)
 | 
						|
{
 | 
						|
  TRANS_LOG(INFO, "called", "func", test_info_->name());
 | 
						|
 | 
						|
  const int64_t THREAD_NUM = get_cpu_num() * 2;
 | 
						|
  pthread_t tids[THREAD_NUM];
 | 
						|
 | 
						|
  set_sp_trans(false);
 | 
						|
  set_access_mod(ObTransAccessMode::READ_WRITE);
 | 
						|
 | 
						|
  const int64_t start = ObClockGenerator::getClock();
 | 
						|
  for (int64_t i = 0; i < THREAD_NUM; ++i) {
 | 
						|
    EXPECT_EQ(0, pthread_create(&tids[i], NULL, thr_fn, this));
 | 
						|
  }
 | 
						|
 | 
						|
  // collect resources of threads
 | 
						|
  for (int64_t i = 0; i < THREAD_NUM; ++i) {
 | 
						|
    pthread_join(tids[i], NULL);
 | 
						|
  }
 | 
						|
  const int64_t end = ObClockGenerator::getClock();
 | 
						|
  TRANS_LOG(INFO, "mp_read_write_concurrent_performance statistic", "total_used", end - start);
 | 
						|
}
 | 
						|
 | 
						|
}  // namespace unittest
 | 
						|
}  // namespace oceanbase
 | 
						|
 | 
						|
using namespace oceanbase;
 | 
						|
using namespace oceanbase::common;
 | 
						|
 | 
						|
int main(int argc, char** argv)
 | 
						|
{
 | 
						|
  int ret = 1;
 | 
						|
  ObLogger& logger = ObLogger::get_logger();
 | 
						|
  logger.set_file_name("test_performance.log", true);
 | 
						|
  logger.set_log_level(OB_LOG_LEVEL_INFO);
 | 
						|
  if (OB_SUCCESS != ObClockGenerator::init()) {
 | 
						|
    TRANS_LOG(WARN, "clock generator init error!");
 | 
						|
  } else {
 | 
						|
    testing::InitGoogleTest(&argc, argv);
 | 
						|
    ret = RUN_ALL_TESTS();
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 |