/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include #include "../../clog/mock_ob_partition_log_service.h" #include "../mockcontainer/mock_ob_partition_service.h" #include "../mockcontainer/mock_ob_end_trans_callback.h" #include "../mockcontainer/mock_ob_partition.h" #include "../mockcontainer/mock_ob_trans_service.h" #include "../mock_ob_partition_component_factory.h" #define private public #define protected public #include "storage/transaction/ob_trans_service.h" #include "storage/transaction/ob_trans_define.h" #include "clog/ob_partition_log_service.h" #include "lib/stat/ob_session_stat.h" #include "share/ob_errno.h" #include "lib/oblog/ob_log.h" #include "common/ob_partition_key.h" #include "storage/transaction/ob_trans_rpc.h" #include "lib/objectpool/ob_resource_pool.h" #include "lib/container/ob_array_iterator.h" #include "sql/resolver/ob_stmt_type.h" #include "sql/ob_sql_define.h" namespace oceanbase { using namespace common; using namespace transaction; using namespace storage; using namespace share; using namespace obrpc; using namespace memtable; using namespace sql; using namespace clog; namespace unittest { LeaderActiveArg leader_active_arg; // Mock transaction module ObRpcProxy associated with ObTransRpc. // (1) Packaage sent does not go through the network communication. // (2) OB_SUCCESS is returned for the function post_trans_msg. class MockObTransRpc : public ObITransRpc, common::ObSimpleThreadPool { public: MockObTransRpc() : trans_service_(NULL) {} virtual ~MockObTransRpc() { ObSimpleThreadPool::destroy(); } int init(ObTransService* trans_service, const ObAddr& self) { int ret = OB_SUCCESS; UNUSED(self); if (OB_SUCCESS != (ret = ObSimpleThreadPool::init(get_cpu_num() / 2, 100000))) { TRANS_LOG(WARN, "ObSimpleThreadPool init error.", K(ret)); } else { trans_service_ = trans_service; } return ret; } void handle(void* task) { int ret = OB_SUCCESS; TransRpcTask* rpc_task = static_cast(task); ObTransRpcResult result; if (NULL == trans_service_) { TRANS_LOG(WARN, "tranaction service is null"); } else if (NULL == rpc_task) { TRANS_LOG(WARN, "tranaction rpc task is null"); } else if (OB_SUCCESS != (ret = trans_service_->handle_trans_msg(rpc_task->get_msg(), result))) { TRANS_LOG(WARN, "handle transaction message error", K(ret)); } else { TRANS_LOG(INFO, "transaction message handle success", K(rpc_task->get_msg())); } if (NULL != rpc_task) { op_reclaim_free(rpc_task); rpc_task = NULL; } } virtual int init(obrpc::ObTransRpcProxy* rpc_proxy, ObTransService* trans_service, const common::ObAddr& self) { UNUSED(rpc_proxy); UNUSED(trans_service); UNUSED(self); start(); return OB_SUCCESS; } virtual int start() { return OB_SUCCESS; } virtual void stop() {} virtual void wait() {} virtual void destroy() { trans_service_ = NULL; } virtual int post_trans_msg(const common::ObAddr& server, const ObTransMsg& msg, const int64_t msg_type) { UNUSED(server); UNUSED(msg_type); int ret = OB_SUCCESS; TransRpcTask* rpc_task = NULL; if (NULL == (rpc_task = op_reclaim_alloc(TransRpcTask))) { ret = OB_ERR_UNEXPECTED; } else { if (OB_FAIL(rpc_task->init(msg, ObTransRetryTaskType::ERROR_MSG_TASK))) { TRANS_LOG(WARN, "rpc task init error", K(ret), "rpc_task", *rpc_task); } else if (OB_SUCCESS != (ret = push(rpc_task))) { TRANS_LOG(WARN, "push task error", K(ret)); op_reclaim_free(rpc_task); rpc_task = NULL; } else { TRANS_LOG(DEBUG, "transaction message push success", K(msg)); } } return ret; } virtual int post_trans_msg( const uint64_t tenant_id, const common::ObAddr& server, const ObTransMsg& msg, const int64_t msg_type) { UNUSED(tenant_id); int ret = OB_SUCCESS; if (OB_SUCCESS != (ret = post_trans_msg(server, msg, msg_type))) { TRANS_LOG(WARN, "post transaction message error", K(ret)); } return ret; } virtual int post_trans_msg( const uint64_t tenant_id, const common::ObAddr& server, const ObTrxMsgBase& msg, const int64_t msg_type) { UNUSED(tenant_id); UNUSED(server); UNUSED(msg); UNUSED(msg_type); return OB_SUCCESS; } virtual int post_batch_msg(const uint64_t tenant_id, const ObAddr& server, const obrpc::ObIFill& msg, const int64_t msg_type, const ObPartitionKey& pkey) { UNUSED(tenant_id); UNUSED(server); UNUSED(msg); UNUSED(msg_type); UNUSED(pkey); return OB_SUCCESS; } virtual int post_trans_resp_msg(const uint64_t tenant_id, const ObAddr& server, const ObTransMsg& msg) { UNUSED(tenant_id); int ret = OB_SUCCESS; if (OB_SUCCESS != (ret = post_trans_msg(server, msg, msg.get_msg_type()))) { TRANS_LOG(WARN, "post transaction message error", K(ret)); } return ret; } private: ObTransService* trans_service_; }; class MockObLocationAdapter : public ObILocationAdapter { public: MockObLocationAdapter() {} ~MockObLocationAdapter() {} int init(ObIPartitionLocationCache* location_cache, share::schema::ObMultiVersionSchemaService* schema_service) { UNUSED(location_cache); UNUSED(schema_service); return OB_SUCCESS; } void destroy() {} int nonblock_get_strong_leader(const ObPartitionKey& partition, ObAddr& server) { UNUSED(partition); server = self_; return OB_SUCCESS; } int get_strong_leader(const ObPartitionKey& partition, ObAddr& server) { UNUSED(partition); server = self_; return OB_SUCCESS; } int nonblock_renew(const ObPartitionKey& partition, const int64_t expire_renew_time) { UNUSED(partition); UNUSED(expire_renew_time); return OB_SUCCESS; } int nonblock_get(const uint64_t table_id, const int64_t partition_id, share::ObPartitionLocation& location) { UNUSED(table_id); UNUSED(partition_id); UNUSED(location); return OB_SUCCESS; } void set_self(const ObAddr& self) { self_ = self; } private: ObAddr self_; }; struct MySubmitLogTask { MySubmitLogTask() : cb(NULL) {} ~MySubmitLogTask() {} ObITransSubmitLogCb* cb; ObPartitionKey partition; }; class MockObClogAdapter : public ObIClogAdapter, public ObSimpleThreadPool { public: MockObClogAdapter() {} ~MockObClogAdapter() { destroy(); } int init(ObPartitionService* partition_service, const ObAddr& self) { int ret = OB_SUCCESS; UNUSED(partition_service); UNUSED(self); if (OB_SUCCESS != (ret = ObSimpleThreadPool::init(8, 100000))) { TRANS_LOG(WARN, "ObSimpleThreadPool init error", K(ret)); } return ret; } virtual int init(storage::ObPartitionService* partition_service) { UNUSED(partition_service); return OB_SUCCESS; } virtual int get_log_id_timestamp(const ObPartitionKey& partition, const int64_t prepare_version, storage::ObIPartitionGroup* pg, ObLogMeta& log_meta) { UNUSED(partition); UNUSED(prepare_version); UNUSED(pg); UNUSED(log_meta); return OB_SUCCESS; } virtual int submit_log_id_alloc_task(const int64_t log_type, ObTransCtx* ctx) { UNUSED(log_type); UNUSED(ctx); return OB_SUCCESS; } virtual int submit_rollback_trans_task(ObTransCtx* ctx) { UNUSED(ctx); return OB_SUCCESS; } virtual int submit_log(const common::ObPartitionKey& partition, const common::ObVersion& version, const char* buff, const int64_t size, const uint64_t log_id, const int64_t ts, ObITransSubmitLogCb* cb) { UNUSED(version); UNUSED(log_id); UNUSED(ts); int ret = OB_SUCCESS; if (OB_FAIL(submit_log(partition, buff, size, cb))) { TRANS_LOG(WARN, "MockObClogAdapter submit log error", K(ret)); } return ret; } virtual int submit_log(const common::ObPartitionKey& partition, const common::ObVersion& version, const char* buff, const int64_t size, ObITransSubmitLogCb* cb, storage::ObIPartitionGroup* pg, uint64_t& cur_log_id, int64_t& cur_log_timestamp) { UNUSED(version); UNUSED(cur_log_id); UNUSED(cur_log_timestamp); UNUSED(pg); int ret = OB_SUCCESS; if (OB_SUCCESS != (ret = submit_log(partition, buff, size, cb))) { TRANS_LOG(WARN, "MockObClogAdapter submit log error", K(ret)); } return ret; } virtual int submit_log(const common::ObPartitionKey& partition, const common::ObVersion& version, const char* buff, const int64_t size, const int64_t base_ts, ObITransSubmitLogCb* cb, storage::ObIPartitionGroup* pg, uint64_t& cur_log_id, int64_t& cur_log_timestamp) { UNUSED(version); UNUSED(base_ts); UNUSED(pg); UNUSED(cur_log_id); UNUSED(cur_log_timestamp); int ret = OB_SUCCESS; if (OB_SUCCESS != (ret = submit_log(partition, buff, size, cb))) { TRANS_LOG(WARN, "MockObClogAdapter submit log error", K(ret)); } return ret; } virtual int submit_aggre_log(const common::ObPartitionKey& partition, const common::ObVersion& version, const char* buff, const int64_t size, ObITransSubmitLogCb* cb, storage::ObIPartitionGroup* pg, const int64_t base_ts, ObTransLogBufferAggreContainer& container) { UNUSED(partition); UNUSED(version); UNUSED(buff); UNUSED(size); UNUSED(cb); UNUSED(pg); UNUSED(base_ts); UNUSED(container); return OB_SUCCESS; } virtual int flush_aggre_log(const common::ObPartitionKey& partition, const common::ObVersion& version, storage::ObIPartitionGroup* pg, ObTransLogBufferAggreContainer& container) { UNUSED(partition); UNUSED(version); UNUSED(pg); UNUSED(container); return OB_SUCCESS; } int batch_submit_log(const transaction::ObTransID& trans_id, const common::ObPartitionArray& partition_array, const clog::ObLogInfoArray& log_info_array, const clog::ObISubmitLogCbArray& cb_array) { UNUSED(trans_id); UNUSED(partition_array); UNUSED(log_info_array); UNUSED(cb_array); return common::OB_SUCCESS; } int submit_log_task(const common::ObPartitionKey& partition, const common::ObVersion& version, const char* buff, const int64_t size, const bool with_need_update_version, const int64_t local_trans_version, const bool with_base_ts, const int64_t base_ts, ObITransSubmitLogCb* cb) { UNUSED(version); UNUSED(with_need_update_version); UNUSED(local_trans_version); UNUSED(with_base_ts); UNUSED(base_ts); int ret = OB_SUCCESS; if (OB_SUCCESS != (ret = submit_log(partition, buff, size, cb))) { TRANS_LOG(WARN, "MockObClogAdapter submit log error", K(ret)); } return ret; } int submit_log_task(const common::ObPartitionKey& partition, const common::ObVersion& version, const char* buff, const int64_t size, ObITransSubmitLogCb* cb) { UNUSED(version); int ret = OB_SUCCESS; if (OB_SUCCESS != (ret = submit_log(partition, buff, size, cb))) { TRANS_LOG(WARN, "MockObClogAdapter submit log error", K(ret)); } return ret; } int start() { return OB_SUCCESS; } void stop() {} void wait() {} void destroy() { ObSimpleThreadPool::destroy(); } int get_status(const common::ObPartitionKey& partition, const bool check_election, int& clog_status, int64_t& leader_epoch, common::ObTsWindows& changing_leader_windows) { UNUSED(partition); UNUSED(check_election); UNUSED(changing_leader_windows); UNUSED(leader_epoch); clog_status = OB_SUCCESS; return OB_SUCCESS; } int get_status(storage::ObIPartitionGroup* partition, const bool check_election, int& clog_status, int64_t& leader_epoch, common::ObTsWindows& changing_leader_windows) { UNUSED(partition); UNUSED(check_election); UNUSED(changing_leader_windows); UNUSED(leader_epoch); clog_status = OB_SUCCESS; return OB_SUCCESS; } int get_status_unsafe(const common::ObPartitionKey& partition, int& clog_status, int64_t& leader_epoch, common::ObTsWindows& changing_leader_windows) { UNUSED(partition); UNUSED(changing_leader_windows); UNUSED(leader_epoch); clog_status = OB_SUCCESS; return OB_SUCCESS; } int submit_log(const ObPartitionKey& partition, const char* buff, const int64_t size, ObITransSubmitLogCb* cb) { UNUSED(buff); UNUSED(size); int ret = OB_SUCCESS; MySubmitLogTask* task = NULL; if (NULL == (task = op_reclaim_alloc(MySubmitLogTask))) { TRANS_LOG(WARN, "new MySubmitLogTask error"); ret = OB_ERR_UNEXPECTED; } else { task->cb = cb; task->partition = partition; if (OB_SUCCESS != (ret = push(task))) { TRANS_LOG(WARN, "push task error", K(ret)); } else { TRANS_LOG(DEBUG, "push task success"); } } return ret; } virtual int backfill_nop_log( const common::ObPartitionKey& partition, storage::ObIPartitionGroup* pg, const ObLogMeta& log_meta) { UNUSED(partition); UNUSED(pg); UNUSED(log_meta); return OB_SUCCESS; } virtual int submit_backfill_nop_log_task(const common::ObPartitionKey& partition, const ObLogMeta& log_meta) { UNUSED(partition); UNUSED(log_meta); return OB_SUCCESS; } virtual int submit_big_trans_callback_task(const common::ObPartitionKey& partition, const int64_t log_type, const uint64_t log_id, const int64_t log_timestamp, ObTransCtx* ctx) { UNUSED(partition); UNUSED(log_type); UNUSED(log_id); UNUSED(log_timestamp); UNUSED(ctx); return OB_SUCCESS; } virtual int get_last_submit_timestamp(const common::ObPartitionKey& partition, int64_t& timestamp) { UNUSED(partition); UNUSED(timestamp); return OB_SUCCESS; } void handle(void* task) { static int64_t log_id; MySubmitLogTask* log_task = static_cast(task); clog::ObLogType log_type = clog::OB_LOG_SUBMIT; log_task->cb->on_success(log_task->partition, log_type, ++log_id, ObClockGenerator::getClock(), false, false); TRANS_LOG(DEBUG, "handle log task sucess"); op_reclaim_free(log_task); } bool can_start_trans() { return true; } }; class MyMockObPartitionLogService : public MockPartitionLogService { public: int submit_log(const char* buff, const int64_t size, const storage::ObStorageLogType log_type, const common::ObVersion& version, ObITransSubmitLogCb* cb) { UNUSED(buff); UNUSED(cb); UNUSED(log_type); UNUSED(version); int ret = OB_SUCCESS; if (1 == size) { TRANS_LOG(WARN, "invalid argument", K(size)); ret = OB_INVALID_ARGUMENT; } else { TRANS_LOG(INFO, "submit log success."); } return ret; } }; class MyMockObPartitionComponentFactory : public MockObIPartitionComponentFactory { public: virtual void free(ObIPartitionGroup* partition) { UNUSED(partition); // delete partition; } }; class MyMockObPartition : public MockObIPartitionGroup { public: MyMockObPartition() : partition_log_service_(NULL), pg_file_(NULL), pkey_(nullptr) {} virtual ~MyMockObPartition() {} void set_pkey(const ObPartitionKey* pkey) { pkey_ = const_cast(pkey); } void set_log_service(ObIPartitionLogService* log_service) { partition_log_service_ = log_service; } virtual blocksstable::ObStorageFile* get_storage_file() { return pg_file_; } virtual const blocksstable::ObStorageFile* get_storage_file() const { return pg_file_; } virtual blocksstable::ObStorageFileHandle& get_storage_file_handle() { return file_handle_; } virtual int remove_election_from_mgr() { return OB_SUCCESS; } // get partition log service ObIPartitionLogService* get_log_service() { return partition_log_service_; } const ObPartitionKey& get_partition_key() const { return *pkey_; } private: ObIPartitionLogService* partition_log_service_; blocksstable::ObStorageFile* pg_file_; blocksstable::ObStorageFileHandle file_handle_; ObPartitionKey* pkey_; }; class MyMockObPartitionService : public MockObIPartitionService { public: MyMockObPartitionService() : partition_(NULL) { partition_ = new MyMockObPartition(); cp_fty_ = new MyMockObPartitionComponentFactory(); } virtual ~MyMockObPartitionService() { if (NULL != partition_) { delete partition_; delete cp_fty_; partition_ = NULL; cp_fty_ = NULL; } } int get_partition(const common::ObPartitionKey& pkey, ObIPartitionGroup*& partition) const { MyMockObPartitionLogService* log_service = NULL; int ret = OB_SUCCESS; if (!pkey.is_valid()) { partition_->set_pkey(&pkey); partition = partition_; partition_->set_log_service(new MyMockObPartitionLogService()); TRANS_LOG(WARN, "invalid argument, pkey is invalid.", K(pkey)); ret = OB_INVALID_ARGUMENT; } else if (1 == pkey.table_id_) { TRANS_LOG(INFO, "get partition success.", K(pkey.table_id_)); partition_->set_pkey(&pkey); partition = partition_; partition_->set_log_service(new MyMockObPartitionLogService()); } else if (2 == pkey.table_id_) { TRANS_LOG(WARN, "invalid argument, get partition error.", K(pkey.table_id_)); partition_->set_pkey(&pkey); partition = partition_; partition_->set_log_service(new MyMockObPartitionLogService()); // ret = OB_ERR_UNEXPECTED; } else if (3 == pkey.table_id_) { TRANS_LOG(INFO, "get partition success.", K(pkey.table_id_)); partition_->set_pkey(&pkey); partition = partition_; partition_->set_log_service(new MyMockObPartitionLogService()); } else if (4 == pkey.table_id_) { TRANS_LOG(INFO, "get partition success.", K(pkey.table_id_)); partition_->set_pkey(&pkey); partition = partition_; partition_->set_log_service(new MyMockObPartitionLogService()); } else { partition_->set_pkey(&pkey); partition = partition_; partition_->set_log_service(new MyMockObPartitionLogService()); TRANS_LOG(INFO, "test info", K(pkey)); // do nothing. } if (NULL != log_service) { delete log_service; log_service = NULL; } return ret; } int get_partition(const common::ObPartitionKey& pkey, ObIPartitionGroupGuard& guard) const { int ret = common::OB_SUCCESS; ObIPartitionGroup* partition = NULL; if (OB_FAIL(get_partition(pkey, partition))) { STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret)); } else { guard.set_partition_group(this->get_pg_mgr(), *partition); } return ret; } virtual int get_curr_member_list(const common::ObPartitionKey& pkey, common::ObMemberList& member_list) const { UNUSED(pkey); UNUSED(member_list); return OB_SUCCESS; } private: MyMockObPartition* partition_; }; // define local_ip static const char* LOCAL_IP = "127.0.0.1"; static const int32_t PORT = 8080; static const ObAddr::VER IP_TYPE = ObAddr::IPV4; class TestPerformance : public ::testing::Test { public: TestPerformance() : self_(IP_TYPE, LOCAL_IP, PORT), sp_trans_(false), access_mod_(ObTransAccessMode::UNKNOWN) {} virtual ~TestPerformance() { destroy(); } virtual void SetUp() { init(); } virtual void TearDown() {} int init(); void destroy(); public: int smoke(); int get_random_partition(ObPartitionLeaderArray& array); void set_sp_trans(const bool sp_trans) { sp_trans_ = sp_trans; } void set_access_mod(const int32_t access_mod) { access_mod_ = access_mod; } private: int do_trans(); private: static const uint64_t TENANT_ID = 1234; static const int32_t PARTITION_COUNT_PER_TABLE = 5; static const int32_t MAX_TABLE = 10; static const int32_t TABLE_NUMBER_START = 20000; int generate_participant_arr_guard_(const ObPartitionArray& participants, ObIPartitionArrayGuard& pkey_guard_arr); private: ObAddr self_; MockObTransRpc rpc_; share::schema::ObMultiVersionSchemaService* schema_service_; MockObLocationAdapter location_adapter_; MockObClogAdapter clog_adapter_; ObTransService trans_service_; MyMockObPartitionService partition_service_; ObPartitionArray partition_array_; ObAddrArray addr_array_; bool sp_trans_; int32_t access_mod_; ObLtsSource lts_source_; MockObTsMgr* ts_mgr_; }; int TestPerformance::init() { int ret = OB_SUCCESS; ts_mgr_ = new MockObTsMgr(lts_source_); // ObKVGlobalCache::get_instance().init(); oceanbase::common::ObClusterVersion::get_instance().refresh_cluster_version("1.3.0"); schema_service_ = new share::schema::ObMultiVersionSchemaService(); rpc_.init(&trans_service_, self_); location_adapter_.init(NULL, schema_service_); location_adapter_.set_self(self_); clog_adapter_.init(&partition_service_, self_); trans_service_.init(self_, &rpc_, &location_adapter_, &clog_adapter_, &partition_service_, schema_service_, ts_mgr_); trans_service_.start(); for (int32_t i = 0; i < MAX_TABLE; i++) { for (int32_t j = 0; j < PARTITION_COUNT_PER_TABLE; j++) { ObPartitionKey partition(combine_id(TENANT_ID, i + TABLE_NUMBER_START), j, PARTITION_COUNT_PER_TABLE); partition_array_.push_back(partition); addr_array_.push_back(self_); trans_service_.add_partition(partition); trans_service_.leader_takeover(partition, leader_active_arg); trans_service_.leader_active(partition, leader_active_arg); } } sp_trans_ = true; return ret; } void TestPerformance::destroy() { delete schema_service_; delete ts_mgr_; } int TestPerformance::generate_participant_arr_guard_( const ObPartitionArray& participants, ObIPartitionArrayGuard& pkey_guard_arr) { int ret = OB_SUCCESS; pkey_guard_arr.set_pg_mgr(partition_service_.get_pg_mgr()); for (int64_t i = 0; OB_SUCC(ret) && i < participants.count(); ++i) { const ObPartitionKey& pkey = participants.at(i); ObIPartitionGroupGuard pkey_guard; if (OB_FAIL(partition_service_.get_partition(pkey, pkey_guard))) { STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret)); } else if (NULL == pkey_guard.get_partition_group()) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret)); } else if (OB_FAIL(pkey_guard_arr.push_back(pkey_guard.get_partition_group()))) { STORAGE_LOG(WARN, "pkey guard push back error", K(ret), K(i), K(participants)); } else { STORAGE_LOG(INFO, "pkey guard : ", K(pkey_guard.get_partition_group()->get_partition_key()), K(pkey)); // do nothing } } return ret; } int TestPerformance::do_trans() { int ret = OB_SUCCESS; const uint64_t tenant_id = TENANT_ID; const uint64_t thread_id = 100; const int64_t cur_ts = ObClockGenerator::getClock(); const int64_t trans_expired_time = cur_ts + 10000000; const int64_t stmt_expired_time = cur_ts + 1000000; ObStartTransParam trans_param; ObTransDesc trans_desc; ObPartitionLeaderArray pla; ObPartitionArray out; ObStmtDesc& stmt_desc = trans_desc.get_cur_stmt_desc(); MockObEndTransCallback cb_; // non-sp trans, each participant executes tree sql requests static const int64_t STMT_PER_TRANS = 3; if (access_mod_ == ObTransAccessMode::READ_ONLY) { trans_param.set_access_mode(ObTransAccessMode::READ_ONLY); stmt_desc.stmt_type_ = stmt::T_SELECT; stmt_desc.is_sfu_ = false; } else { trans_param.set_access_mode(ObTransAccessMode::READ_WRITE); stmt_desc.stmt_type_ = stmt::T_UPDATE; stmt_desc.is_sfu_ = false; } trans_param.set_type(ObTransType::TRANS_USER); trans_param.set_isolation(ObTransIsolation::READ_COMMITED); trans_param.set_autocommit(true); trans_param.set_cluster_version(GET_MIN_CLUSTER_VERSION()); ObStmtParam stmt_param; stmt_desc.phy_plan_type_ = OB_PHY_PLAN_LOCAL; stmt_desc.consistency_level_ = ObTransConsistencyLevel::STRONG; if (OB_SUCCESS != (ret = get_random_partition(pla))) { TRANS_LOG(WARN, "get random partition error", K(ret)); } else if (OB_SUCCESS != (ret = trans_service_.start_trans( tenant_id, thread_id, trans_param, trans_expired_time, 0, 0, trans_desc))) { TRANS_LOG(WARN, "start trans error", K(ret)); } else { const int64_t stmt_count = sp_trans_ ? 1 : STMT_PER_TRANS; for (int64_t i = 0; OB_SUCC(ret) && i < stmt_count; ++i) { if (OB_FAIL(stmt_param.init(tenant_id, stmt_expired_time, false))) { TRANS_LOG(WARN, "ObStmtParam init error", K(ret), K(stmt_desc)); } else if (OB_SUCCESS != (ret = trans_service_.start_stmt(stmt_param, trans_desc, pla, out))) { TRANS_LOG(WARN, "start stmt error", K(ret)); } else { ObPartitionEpochArray partition_epoch_arr; ObIPartitionArrayGuard pkey_guard_arr; generate_participant_arr_guard_(pla.get_partitions(), pkey_guard_arr); ObPartitionArray participants; if (OB_SUCCESS != (ret = trans_service_.start_participant( trans_desc, pla.get_partitions(), partition_epoch_arr, pkey_guard_arr))) { TRANS_LOG(WARN, "start participant error", K(ret)); } else if (OB_SUCCESS != (ret = trans_service_.end_participant(false /*is_rollback*/, trans_desc, pla.get_partitions()))) { TRANS_LOG(WARN, "end participant error", K(ret)); } else { // do nothing } const bool is_rollback = (OB_SUCCESS == ret) ? false : true; // overwrite retcode ObPartitionEpochArray epoch_arr; bool incomplete = false; if (OB_SUCCESS != (ret = trans_service_.end_stmt( is_rollback, incomplete, pla.get_partitions(), epoch_arr, participants, pla, trans_desc))) { TRANS_LOG(WARN, "end stmt error", K(ret)); } } } const bool is_rollback = (OB_SUCCESS == ret) ? false : true; if (OB_SUCCESS != (ret = trans_service_.end_trans(is_rollback, trans_desc, cb_, stmt_expired_time))) { TRANS_LOG(WARN, "end trans error", K(ret)); } else { ret = cb_.wait(); } } return ret; } int TestPerformance::smoke() { int ret = OB_SUCCESS; const int64_t MAX_LOOP = 1000; for (int64_t loop = 0; OB_SUCC(ret) && loop < MAX_LOOP; ++loop) { ret = do_trans(); } return ret; } int TestPerformance::get_random_partition(ObPartitionLeaderArray& array) { int ret = OB_SUCCESS; int64_t count = 0; if (sp_trans_) { count = 1; } else { count = (rand() % 15) + 2; } array.reset(); // Any two of participants of a sentence are not allowed to be the same. for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) { // const int64_t idx = ObRandom::rand(0, partition_array_.count() - 1); const int64_t idx = i; ret = array.push(partition_array_[idx], addr_array_[idx]); } return ret; } static void* thr_fn(void* arg) { TestPerformance* p = reinterpret_cast(arg); p->smoke(); p->rpc_.destroy(); return (void*)0; } //////////////////////test/////////////////////////// // test read only transactions without concurrency TEST_F(TestPerformance, sp_readonly_performance) { TRANS_LOG(INFO, "called", "func", test_info_->name()); set_sp_trans(true); set_access_mod(ObTransAccessMode::READ_ONLY); const int64_t start = ObClockGenerator::getClock(); EXPECT_EQ(OB_SUCCESS, smoke()); const int64_t end = ObClockGenerator::getClock(); TRANS_LOG(INFO, "sp_readonly_performance statistic", "total_used", end - start); } // test ready only transactions wiht concurrency TEST_F(TestPerformance, sp_readonly_concurrent_performance) { TRANS_LOG(INFO, "called", "func", test_info_->name()); const int64_t THREAD_NUM = get_cpu_num() * 2; pthread_t tids[THREAD_NUM]; set_sp_trans(true); set_access_mod(ObTransAccessMode::READ_ONLY); const int64_t start = ObClockGenerator::getClock(); for (int64_t i = 0; i < THREAD_NUM; ++i) { EXPECT_EQ(0, pthread_create(&tids[i], NULL, thr_fn, this)); } // collect resources of threads for (int64_t i = 0; i < THREAD_NUM; ++i) { pthread_join(tids[i], NULL); } const int64_t end = ObClockGenerator::getClock(); TRANS_LOG(INFO, "sp_readonly_concurrent_performance statistic", "total_used", end - start); } // test read-only transactions without concurrency TEST_F(TestPerformance, mp_readonly_performance) { TRANS_LOG(INFO, "called", "func", test_info_->name()); set_sp_trans(false); set_access_mod(ObTransAccessMode::READ_ONLY); const int64_t start = ObClockGenerator::getClock(); EXPECT_EQ(OB_SUCCESS, smoke()); const int64_t end = ObClockGenerator::getClock(); TRANS_LOG(INFO, "mp_readonly_performance statistic", "total_used", end - start); } // test read-only transactions with concurrency TEST_F(TestPerformance, mp_readonly_concurrent_performance) { TRANS_LOG(INFO, "called", "func", test_info_->name()); const int64_t THREAD_NUM = get_cpu_num() * 2; pthread_t tids[THREAD_NUM]; set_sp_trans(false); set_access_mod(ObTransAccessMode::READ_ONLY); const int64_t start = ObClockGenerator::getClock(); for (int64_t i = 0; i < THREAD_NUM; ++i) { EXPECT_EQ(0, pthread_create(&tids[i], NULL, thr_fn, this)); } // collect resources of threads for (int64_t i = 0; i < THREAD_NUM; ++i) { pthread_join(tids[i], NULL); } const int64_t end = ObClockGenerator::getClock(); TRANS_LOG(INFO, "mp_readonly_concurrent_performance statistic", "total_used", end - start); } // test read-write transactions without concurrency TEST_F(TestPerformance, sp_read_write_performance) { TRANS_LOG(INFO, "called", "func", test_info_->name()); set_sp_trans(true); set_access_mod(ObTransAccessMode::READ_WRITE); const int64_t start = ObClockGenerator::getClock(); EXPECT_EQ(OB_SUCCESS, smoke()); const int64_t end = ObClockGenerator::getClock(); TRANS_LOG(INFO, "sp_read_write_performance statistic", "total_used", end - start); } // test read-write transactions with concurrency TEST_F(TestPerformance, sp_read_write_concurrent_performance) { TRANS_LOG(INFO, "called", "func", test_info_->name()); const int64_t THREAD_NUM = get_cpu_num() * 2; pthread_t tids[THREAD_NUM]; set_sp_trans(true); set_access_mod(ObTransAccessMode::READ_WRITE); const int64_t start = ObClockGenerator::getClock(); for (int64_t i = 0; i < THREAD_NUM; ++i) { EXPECT_EQ(0, pthread_create(&tids[i], NULL, thr_fn, this)); } // collect resources of threads for (int64_t i = 0; i < THREAD_NUM; ++i) { pthread_join(tids[i], NULL); } const int64_t end = ObClockGenerator::getClock(); TRANS_LOG(INFO, "sp_read_write_concurrent_performance statistic", "total_used", end - start); } // test read-write transactions without concurrency TEST_F(TestPerformance, mp_read_write_performance) { TRANS_LOG(INFO, "called", "func", test_info_->name()); set_sp_trans(false); set_access_mod(ObTransAccessMode::READ_WRITE); const int64_t start = ObClockGenerator::getClock(); EXPECT_EQ(OB_SUCCESS, smoke()); const int64_t end = ObClockGenerator::getClock(); TRANS_LOG(INFO, "mp_read_write_performance statistic", "total_used", end - start); } // test read-write transactions with concurrency TEST_F(TestPerformance, mp_read_write_concurrent_performance) { TRANS_LOG(INFO, "called", "func", test_info_->name()); const int64_t THREAD_NUM = get_cpu_num() * 2; pthread_t tids[THREAD_NUM]; set_sp_trans(false); set_access_mod(ObTransAccessMode::READ_WRITE); const int64_t start = ObClockGenerator::getClock(); for (int64_t i = 0; i < THREAD_NUM; ++i) { EXPECT_EQ(0, pthread_create(&tids[i], NULL, thr_fn, this)); } // collect resources of threads for (int64_t i = 0; i < THREAD_NUM; ++i) { pthread_join(tids[i], NULL); } const int64_t end = ObClockGenerator::getClock(); TRANS_LOG(INFO, "mp_read_write_concurrent_performance statistic", "total_used", end - start); } } // namespace unittest } // namespace oceanbase using namespace oceanbase; using namespace oceanbase::common; int main(int argc, char** argv) { int ret = 1; ObLogger& logger = ObLogger::get_logger(); logger.set_file_name("test_performance.log", true); logger.set_log_level(OB_LOG_LEVEL_INFO); if (OB_SUCCESS != ObClockGenerator::init()) { TRANS_LOG(WARN, "clock generator init error!"); } else { testing::InitGoogleTest(&argc, argv); ret = RUN_ALL_TESTS(); } return ret; }