Files
oceanbase/unittest/storage/fake_partition_mgr.h
gm 4a92b6d7df reformat source code
according to code styles, 'AccessModifierOffset' should be -2.
2021-06-17 10:40:36 +08:00

442 lines
15 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_OBSERVER_FAKE_PARTITION_MGR_H_
#define OCEANBASE_OBSERVER_FAKE_PARTITION_MGR_H_
#include "lib/container/ob_array.h"
#include "lib/thread/ob_async_task_queue.h"
#include "storage/ob_i_partition_group.h"
#include "storage/ob_i_partition_mgr.h"
#include "storage/ob_i_partition_storage.h"
#include "storage/ob_major_freeze_status.h"
#define private public
#include "storage/ob_replay_status.h"
namespace oceanbase {
namespace storage {
class ObFakePartitionStorage : public ObIPartitionStorage {
public:
ObFakePartitionStorage();
virtual ~ObFakePartitionStorage();
virtual int init(const common::ObPartitionKey& pkey, ObIPartitionComponentFactory* cp_fty,
share::schema::ObMultiVersionSchemaService* schema_service, ObIPartitionReport* report,
blocksstable::ObStorageCacheSuite& cache_suite);
virtual void destroy();
//
// scan table partition
//
// @param ctx [in] transaction context
// @param param [in] query param
// @param result [out] iterator to get the result set
//
// @return result iterator
//
virtual int table_scan(const ObStoreCtx& ctx, ObTableScanParam& param, common::ObNewRowIterator*& result);
//
// release scan iterator
//
// @param iter [in] iterator to be reverted
//
virtual int revert_scan_iter(common::ObNewRowIterator* iter);
//
// delete rows
// delete table rows and index rows
//
// @param trans_desc [in] transaction handle
// @param timeout [in] process timeout
// @param table_id [in] table
// @param index_included [in] need to delete index too
// @param column_ids [in] all column referenced.rowkey first
// @param row_iter [in] primary keys to be deleted
// @param affected_rows [out]
//
// @retval OB_TRANSACTION_SET_VIOLATION
// @retval OB_SNAPSHOT_DISCARDED
// @retval OB_TRANS_NOT_FOUND
// @retval OB_TRANS_ROLLBACKED
// @retval OB_TRANS_IS_READONLY
// @retval OB_ERR_EXCLUSIVE_LOCK_CONFLICT
//
virtual int delete_rows(const ObStoreCtx& ctx, const int64_t timeout, const common::ObIArray<uint64_t>& column_ids,
common::ObNewRowIterator* row_iter, int64_t& affected_rows);
//
// insert rows
// insert table rows and index rows
//
// @param trans_desc [in] transaction handle
// @param timeout [in] process timeout
// @param table_id [in] table
// @param column_ids [in] insert columns
// @param row_iter [in] insert values
// @param affected_rows [out]
//
// @retval OB_TRANS_NOT_FOUND
// @retval OB_TRANS_ROLLBACKED
// @retval OB_TRANS_IS_READONLY
// @retval OB_ERR_EXCLUSIVE_LOCK_CONFLICT
// @retval OB_ERR_PRIMARY_KEY_DUPLICATE
//
virtual int insert_rows(const ObStoreCtx& ctx, const int64_t timeout, const common::ObIArray<uint64_t>& column_ids,
common::ObNewRowIterator* row_iter, int64_t& affected_rows);
//
// update rows
// update table rows and index rows
//
// @param trans_desc [in] transaction handle
// @param timeout [in] process timeout
// @param table_id [in] table
// @param index_included [in] if index need to be updated
// @param column_ids [in] all columns related
// @param column_ids [in] updated columns
// @param row_iter [in] odd rows are old and even rows are new ones
// @param affected_rows [out]
//
// @retval OB_TRANSACTION_SET_VIOLATION
// @retval OB_SNAPSHOT_DISCARDED
// @retval OB_TRANS_NOT_FOUND
// @retval OB_TRANS_ROLLBACKED
// @retval OB_TRANS_IS_READONLY
// @retval OB_ERR_EXCLUSIVE_LOCK_CONFLICT
// @retval OB_ERR_PRIMARY_KEY_DUPLICATE
//
virtual int update_rows(const ObStoreCtx& ctx, const int64_t timeout, const common::ObIArray<uint64_t>& column_ids,
const common::ObIArray<uint64_t>& updated_column_ids, common::ObNewRowIterator* row_iter, int64_t& affected_rows);
//
// lock rows
// lock table rows
//
// @param trans_desc [in] transaction handle
// @param timeout [in] process timeout
// @param table_id [in] table id
// @param row_iter [in] rowkey iterator
// @param lock_flag [in] lock flags: LF_WRITE or LF_NONE
// @param affected_rows [out]
//
// @retval OB_TRANSACTION_SET_VIOLATION
// @retval OB_SNAPSHOT_DISCARDED
// @retval OB_TRANS_NOT_FOUND
// @retval OB_TRANS_ROLLBACKED
// @retval OB_TRANS_IS_READONLY
// @retval OB_ERR_EXCLUSIVE_LOCK_CONFLICT
//
virtual int lock_rows(const ObStoreCtx& ctx, const int64_t timeout, common::ObNewRowIterator* row_iter,
ObLockFlag lock_flag, int64_t& affected_rows);
//
// do freeze
//
// @param freeze_type [in] major/minor freeze
// @param frozen_version [out]
//
// @return
//
virtual int freeze_memtable(const FreezeType freeze_type, uint64_t frozen_version);
//
// migrate the static and increment data for a given version partition
//
// @param migrate_version [in] target migrate version for a partition
//
// @return
// SUCC / FAILED
virtual int migrate(uint64_t migrate_version);
virtual int merge(uint64_t frozen_version);
// NOT thread safe, add store to the partition storage.
// Without lock protection, only should be invoked while initializing or log replaying
//@param store [in] the store may be a ssstore or memstore.
//@param write_redo_log [in] true if need write redo log, only used for ssstore
//@return OB_SUCCESS or other error code
virtual int add_store(ObIStore& store, const bool write_redo_log);
// NOT thread safe, remove store from the partition storage.
// Without lock protection, only should be invoked while initializing or log replaying
//@param version [in] the version of remove store
//@param write_redo_log [in] true if need write redo log, only used for ssstore
//@return OB_SUCCESS, OB_ENTRY_NOT_EXIST or other error code
virtual int remove_store(const common::ObVersion& version, const bool write_redo_log);
//
// replay log
//
// @param mutator [in] new replayed data
//
// @return
//
virtual int replay_mutator(common::ObMutator& mutator);
virtual enum ObPartitionStorageStatus get_status(const uint64_t frozen_version);
virtual int save_base_storage_info(const common::ObBaseStorageInfo& info);
virtual int get_base_storage_info(common::ObBaseStorageInfo& info);
virtual int get_saved_storage_info(ObSavedStorageInfo& info);
virtual int replay(const ObStoreCtx& ctx, const char* data, const int64_t data_len);
virtual int lock(const ObStoreCtx& ctx);
//
// return cost of doing real scan described in param.
//
virtual int64_t get_scan_cost(const ObTableScanParam& param);
// write ssstore objects @version tree to data file , used by write_check_point
virtual int serialize(ObArenaAllocator& allocator, char*& new_buf, int64_t& serialize_size);
// read ssstore objects from data file to construct partition storage's version tree.
virtual int deserialize(const char* buf, const int64_t buf_len, int64_t& pos);
virtual int get_serialize_size() const;
virtual int64_t get_storage_log_seq_num();
virtual const common::ObPartitionKey& get_partition_key() const;
virtual common::ObIAllocator& get_memstore_allocator()
{
return memstore_allocator_;
}
virtual const share::schema::ObMultiVersionSchemaService* get_schema_service() const
{
return NULL;
}
private:
common::ObPartitionKey partition_key_;
uint64_t merged_version_;
enum ObPartitionStorageStatus state_;
common::DefaultPageAllocator memstore_allocator_;
lib::ObMutex mutex_;
};
class ObFakePartition : public ObIPartition {
public:
ObFakePartition();
virtual ~ObFakePartition()
{}
virtual int init(const common::ObPartitionKey& key, ObIPartitionComponentFactory* cp_fty,
share::schema::ObMultiVersionSchemaService* schema_service, clog::ObIPartitionLogService* pls);
virtual void destroy();
virtual int replay_base_storage_log(
const int64_t log_seq_num, const int64_t subcmd, const char* buf, const int64_t len, int64_t& pos);
// get partition log service
virtual const clog::ObIPartitionLogService* get_log_service() const;
virtual clog::ObIPartitionLogService* get_log_service();
// get partition storage
virtual ObIPartitionStorage* get_storage();
virtual const common::ObPartitionKey& get_partition_key() const;
virtual void set_partition_key(const common::ObPartitionKey& pkey);
virtual int table_scan(const ObStoreCtx& ctx, ObTableScanParam& param, common::ObNewRowIterator*& result);
virtual int revert_scan_iter(common::ObNewRowIterator* iter);
virtual int delete_rows(const ObStoreCtx& ctx, const int64_t timeout, const common::ObIArray<uint64_t>& column_ids,
common::ObNewRowIterator* row_iter, int64_t& affected_rows);
virtual int insert_rows(const ObStoreCtx& ctx, const int64_t timeout, const common::ObIArray<uint64_t>& column_ids,
common::ObNewRowIterator* row_iter, int64_t& affected_rows);
virtual int update_rows(const ObStoreCtx& ctx, const int64_t timeout, const common::ObIArray<uint64_t>& column_ids,
const common::ObIArray<uint64_t>& updated_column_ids, common::ObNewRowIterator* row_iter, int64_t& affected_rows);
virtual int lock_rows(const ObStoreCtx& ctx, const int64_t timeout, common::ObNewRowIterator* row_iter,
const ObLockFlag lock_flag, int64_t& affected_rows);
virtual ObReplayStatus& get_replay_status();
virtual void set_valid(bool valid)
{
UNUSED(valid);
}
virtual bool is_valid() const
{
return true;
}
virtual int rdlock()
{
return common::OB_SUCCESS;
}
virtual int wrlock()
{
return common::OB_SUCCESS;
}
virtual int try_rdlock()
{
return common::OB_SUCCESS;
}
virtual int try_wrlock()
{
return common::OB_SUCCESS;
}
virtual int unlock()
{
return common::OB_SUCCESS;
}
virtual int save_base_storage_info(const common::ObBaseStorageInfo& info);
virtual int save_frozen_storage_info(const common::ObBaseStorageInfo& info);
virtual int get_base_storage_info(common::ObBaseStorageInfo& info);
virtual int get_frozen_storage_info(common::ObBaseStorageInfo& info);
virtual int get_saved_storage_info(ObSavedStorageInfo& info);
virtual int serialize(char* buf, const int64_t buf_len, int64_t& pos) const;
virtual int deserialize(const char* buf, const int64_t buf_len, int64_t& pos);
virtual int64_t get_serialize_size() const;
virtual common::ObRole get_role() const
{
return role_;
}
virtual void set_role(const common::ObRole role)
{
role_ = role;
}
// major freeze related
virtual int set_major_freeze_status(const int64_t frozen_version, const int64_t major_freeze_status);
virtual int get_major_freeze_status(int64_t& frozen_version, int64_t& major_freeze_status) const;
virtual void set_memtable_version(const int64_t memtable_version);
virtual int64_t get_memtable_version() const;
virtual int64_t get_c2r_timestamp()
{
return OB_INVALID_TIMESTAMP;
}
virtual void set_c2r_timestamp(int64_t timestamp)
{
UNUSED(timestamp);
}
virtual int set_replica_type(const ObReplicaType& replica_type, const bool write_redo_log)
{
UNUSED(replica_type);
UNUSED(write_redo_log);
return common::OB_SUCCESS;
}
private:
common::ObPartitionKey partition_key_;
ObFakePartitionStorage storage_;
int64_t frozen_version_;
int64_t major_freeze_status_;
int64_t memtable_version_;
common::ObRole role_;
};
class ObMajorFreezeTask : public share::ObAsyncTask {
public:
ObMajorFreezeTask(ObIMajorFreezeCb* cb, const common::ObPartitionKey& partition_key, const int64_t cmd,
const int64_t frozen_version, const int64_t frozen_timestamp, const int err);
virtual ~ObMajorFreezeTask()
{}
virtual int process();
virtual int64_t get_deep_copy_size() const
{
return sizeof(ObMajorFreezeTask);
}
virtual share::ObAsyncTask* deep_copy(char* buf, const int64_t buf_size) const;
private:
ObIMajorFreezeCb* cb_;
common::ObPartitionKey partition_key_;
int64_t cmd_;
int64_t frozen_version_;
int64_t frozen_timestamp_;
int err_; // if OB_SUCCESS, means do major_freeze op succeed
};
class ObFakePartitionMgr : public ObIPartitionMgr {
public:
class ObFakePartitionIter : public ObIPartitionIter {
public:
ObFakePartitionIter(ObFakePartitionMgr& pt_mgr) : index_(0), pt_mgr_(pt_mgr)
{}
virtual ~ObFakePartitionIter()
{}
virtual int get_next(ObIPartitionGroup*& partition);
void reset()
{
index_ = 0;
}
// make iterator work even delete partition during iteration
void fit_delete(const int64_t index);
private:
int64_t index_;
ObFakePartitionMgr& pt_mgr_;
};
// use be test major freeze
enum MajorFreezeTestMode {
ALL_SUCCEED = 0,
SOME_NOT_LEADER = 1,
SOME_ERROR = 2,
MIX = 3,
};
ObFakePartitionMgr() : partition_iter_(*this), partitions_()
{}
virtual ~ObFakePartitionMgr()
{
queue_.destroy();
}
virtual int init(ObIPartitionComponentFactory* cp_fty);
virtual void destroy();
virtual int get_all_partitions(common::ObIArray<ObIPartitionGroup*>& partition_list);
virtual int get_partition(const common::ObPartitionKey& pkey, ObIPartitionGroup*& partition);
// Add new partition to partition manager
virtual int add_partition(const common::ObPartitionKey& pkey, ObIPartitionGroup** partition = NULL);
virtual int add_partition(ObIPartitionGroup& partition);
// Delete partition from partition manager
virtual int del_partition(const common::ObPartitionKey& pkey, ObIPartitionGroup** partitoin = NULL);
virtual int revert_partition(ObIPartitionGroup* partition);
virtual ObIPartitionIter* alloc_scan_iter();
virtual int revert_scan_iter(ObIPartitionIter* iter);
virtual common::ObArray<ObFakePartition>& get_partitions();
virtual bool is_empty() const
{
return partitions_.size() == 0;
}
int64_t get_partition_count() const
{
return partitions_.size();
}
virtual int set_major_freeze_cb(ObIMajorFreezeCb* cb);
virtual int prepare_major_freeze(
const obrpc::ObPartitionList& partitions, const int64_t frozen_version, const int64_t frozen_timestamp);
virtual int commit_major_freeze(
const obrpc::ObPartitionList& partitions, const int64_t frozen_version, const int64_t frozen_timestamp);
virtual int abort_major_freeze(
const obrpc::ObPartitionList& partitions, const int64_t frozen_version, const int64_t frozen_timestamp);
virtual int set_major_freeze_status(
const common::ObPartitionKey& partition, const int64_t frozen_version, const int64_t major_freeze_status);
virtual int get_major_freeze_status(
const common::ObPartitionKey& partition, int64_t& frozen_version, int64_t& major_freeze_status);
void set_mf_test_mode(MajorFreezeTestMode mode)
{
mf_test_mode_ = mode;
}
private:
int gen_err(const int64_t i);
ObFakePartitionIter partition_iter_;
common::ObArray<ObFakePartition> partitions_;
common::ObArray<ObFakePartitionIter*> partition_iters_;
ObIMajorFreezeCb* major_freeze_cb_;
share::ObAsyncTaskQueue queue_;
MajorFreezeTestMode mf_test_mode_;
};
} // end namespace storage
} // end namespace oceanbase
#endif