/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_TRANSACTION_TEST_BASIC_FAKE_DEFINE_
#define OCEANBASE_TRANSACTION_TEST_BASIC_FAKE_DEFINE_

#define protected public
#define private public

#include "storage/tx/ob_trans_define.h"
|
|
#include "storage/tx_table/ob_tx_table.h"
|
|
#include "lib/utility/ob_defer.h"
|
|
#include "storage/tx/ob_location_adapter.h"
|
|
#include "storage/tx/ob_tx_log_adapter.h"
|
|
#include "storage/tx/ob_dup_table_rpc.h"
|
|
#include "storage/tx/ob_ts_mgr.h"
|
|
#include "storage/tx/ob_gti_source.h"
|
|
#include "storage/tx/ob_tx_replay_executor.h"
|
|
#include "storage/tx/ob_trans_part_ctx.h"
|
|
|
|
namespace oceanbase {
using namespace share;
using namespace memtable;


namespace transaction {

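// In-memory stand-in for ObTxDataTable: tx data is carved out of ob_malloc / a local
// slice allocator and indexed by tx_id in an ObTxDataHashMap, so tests can insert and
// look up tx data without an LS or real tx data storage underneath.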
class ObFakeTxDataTable : public ObTxDataTable {
public:
  ObSliceAlloc slice_allocator_;
  ObTenantTxDataAllocator __FAKE_ALLOCATOR_OBJ;
  ObTenantTxDataAllocator *FAKE_ALLOCATOR = &__FAKE_ALLOCATOR_OBJ;
  ObTenantTxDataOpAllocator __FAKE_ALLOCATOR_OBJ2;
  ObTenantTxDataOpAllocator *FAKE_ALLOCATOR2 = &__FAKE_ALLOCATOR_OBJ2;

public:
  ObFakeTxDataTable() : arena_allocator_(), map_(arena_allocator_, 1 << 20 /*1048576*/)
  {
    IGNORE_RETURN map_.init();
    ObMemAttr mem_attr;
    mem_attr.label_ = "TX_DATA_TABLE";
    mem_attr.tenant_id_ = 1;
    mem_attr.ctx_id_ = ObCtxIds::DEFAULT_CTX_ID;
    ObMemtableMgrHandle memtable_mgr_handle;
    OB_ASSERT(OB_SUCCESS == slice_allocator_.init(
        sizeof(ObTxData), OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, mem_attr));
    slice_allocator_.set_nway(32);
    FAKE_ALLOCATOR->init("FAKE_A");
    FAKE_ALLOCATOR2->init();
    tx_data_allocator_ = FAKE_ALLOCATOR;
    is_inited_ = true;
  }
  virtual int init(ObLS *ls, ObTxCtxTable *tx_ctx_table) override
  {
    return OB_SUCCESS;
  }
  virtual int start() override { return OB_SUCCESS; }
  virtual void stop() override {}
  virtual void reset() override {}
  virtual void destroy() override {}
  virtual int alloc_tx_data(ObTxDataGuard &tx_data_guard,
                            const bool enable_throttle,
                            const int64_t abs_expire_time)
  {
    ObMemAttr attr;
    void *ptr = ob_malloc(TX_DATA_SLICE_SIZE, attr);
    if (OB_ISNULL(ptr)) {
      return OB_ALLOCATE_MEMORY_FAILED;
    }
    ObTxData *tx_data = new (ptr) ObTxData();
    tx_data->ref_cnt_ = 100;
    tx_data->tx_data_allocator_ = FAKE_ALLOCATOR;
    tx_data->op_allocator_ = FAKE_ALLOCATOR2;
    tx_data_guard.init(tx_data);
    return OB_SUCCESS;
  }
  virtual int deep_copy_tx_data(const ObTxDataGuard &from_guard, ObTxDataGuard &to_guard) override
  {
    int ret = OB_SUCCESS;
    void *ptr = slice_allocator_.alloc();
    ObTxData *to = new (ptr) ObTxData();
    ObTxData *from = (ObTxData *)from_guard.tx_data();
    to->ref_cnt_ = 100;
    to->tx_data_allocator_ = FAKE_ALLOCATOR;
    to_guard.init(to);
    OX (*to = *from);
    return ret;
  }
  virtual void free_tx_data(ObTxData *tx_data)
  {
  }
  virtual int alloc_undo_status_node(ObUndoStatusNode *&undo_status_node) override
  {
    void *ptr = ob_malloc(TX_DATA_SLICE_SIZE, ObNewModIds::TEST);
    undo_status_node = new (ptr) ObUndoStatusNode();
    return OB_SUCCESS;
  }
  virtual int free_undo_status_node(ObUndoStatusNode *&undo_status_node) override
  {
    return OB_SUCCESS;
  }
  virtual int insert(ObTxData *&tx_data) override
  {
    int ret = OB_SUCCESS;
    OZ (map_.insert(tx_data->tx_id_, tx_data));
    return ret;
  }
  virtual int check_with_tx_data(const ObTransID tx_id,
                                 ObITxDataCheckFunctor &fn,
                                 ObTxDataGuard &tx_data_guard,
                                 share::SCN &recycled_scn) override
  {
    int ret = OB_SUCCESS;
    OZ (map_.get(tx_id, tx_data_guard));
    OZ (fn(*tx_data_guard.tx_data()));
    if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_TRANS_CTX_NOT_EXIST; }
    return ret;
  }
  ObArenaAllocator arena_allocator_;
  ObTxDataHashMap map_;
};

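// ObTxTable wired to the fake tx data table above, so transaction code under test can
// be handed a tx table without bringing up tablet storage.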
class ObFakeTxTable : public ObTxTable {
public:
  ObFakeTxTable() : ObTxTable(tx_data_table_) {}
public:
  ObFakeTxDataTable tx_data_table_;
};

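// Duplicate-table RPC stub: every post_* call reports success without sending anything,
// for tests that do not exercise duplicate-table replication.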
class ObFakeDupTableRpc : public ObIDupTableRpc
{
public:
  int start() { return OB_SUCCESS; }
  int stop() { return OB_SUCCESS; }
  int wait() { return OB_SUCCESS; }
  void destroy() {}
  int post_dup_table_lease_request(const uint64_t tenant_id,
                                   const common::ObAddr &server,
                                   const ObDupTableLeaseRequestMsg &msg) { return OB_SUCCESS; }
  int post_dup_table_lease_response(const uint64_t tenant_id,
                                    const common::ObAddr &server,
                                    const ObDupTableLeaseResponseMsg &msg) { return OB_SUCCESS; }
  int post_redo_log_sync_request(const uint64_t tenant_id,
                                 const common::ObAddr &server,
                                 const ObRedoLogSyncRequestMsg &msg) { return OB_SUCCESS; }
  int post_redo_log_sync_response(const uint64_t tenant_id,
                                  const common::ObAddr &server,
                                  const ObRedoLogSyncResponseMsg &msg) { return OB_SUCCESS; }
  int post_pre_commit_request(const uint64_t tenant_id,
                              const common::ObAddr &server,
                              const ObPreCommitRequestMsg &msg) { return OB_SUCCESS; }
  int post_pre_commit_response(const uint64_t tenant_id,
                               const common::ObAddr &addr,
                               const ObPreCommitResponseMsg &msg) { return OB_SUCCESS; }
};

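// Location adapter backed by a local hash map from LS id to leader address; tests fill
// the map instead of querying the real location service. Rough usage sketch
// (variable names are illustrative only):
//   fake_location_adapter.fill(ls_id, leader_addr);  // get_leader(ls_id) now returns leader_addr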
class ObFakeLocationAdapter : public ObILocationAdapter
{
public:
  ObFakeLocationAdapter() {
    ls_addr_table_.create(16, ObModIds::TEST);
  }
  int init(share::schema::ObMultiVersionSchemaService *schema_service,
           share::ObLocationService *location_service) { return OB_SUCCESS; }
  void destroy() { }
  void reset() { ls_addr_table_.reuse(); }
  int get_leader(const int64_t cluster_id,
                 const int64_t tenant_id,
                 const share::ObLSID &ls_id,
                 common::ObAddr &leader)
  {
    int ret = OB_SUCCESS;
    const common::ObAddr *a = ls_addr_table_.get(ls_id);
    if (a == NULL) { ret = OB_ENTRY_NOT_EXIST; }
    else { leader = *a; }
    return ret;
  }
  int nonblock_get_leader(const int64_t cluster_id,
                          const int64_t tenant_id,
                          const share::ObLSID &ls_id,
                          common::ObAddr &leader)
  { return get_leader(cluster_id, tenant_id, ls_id, leader); }
  int nonblock_renew(const int64_t cluster_id,
                     const int64_t tenant_id,
                     const share::ObLSID &ls_id)
  { return OB_SUCCESS; }
  int nonblock_get(const int64_t cluster_id,
                   const int64_t tenant_id,
                   const share::ObLSID &ls_id,
                   share::ObLSLocation &location)
  {
    int ret = OB_SUCCESS;
    common::ObAddr leader;
    OZ(get_leader(cluster_id, tenant_id, ls_id, leader));
    ObLSReplicaLocation rep_loc;
    ObLSRestoreStatus restore_status(ObLSRestoreStatus::Status::NONE);
    auto p = ObReplicaProperty::create_property(100);
    OZ(rep_loc.init(leader, ObRole::LEADER, 10000, ObReplicaType::REPLICA_TYPE_FULL, p, restore_status, 1));
    OZ(location.add_replica_location(rep_loc));
    return ret;
  }
public:
  // test maintenance helpers
  int fill(const share::ObLSID ls_id, const common::ObAddr &addr)
  {
    return ls_addr_table_.set_refactored(ls_id, addr);
  }
  int remove(const share::ObLSID ls_id)
  {
    return ls_addr_table_.erase_refactored(ls_id);
  }
  int update_localtion(const share::ObLSID ls_id, const common::ObAddr &addr)
  {
    ls_addr_table_.erase_refactored(ls_id);
    return ls_addr_table_.set_refactored(ls_id, addr);
  }

private:
  common::hash::ObHashMap<share::ObLSID, common::ObAddr> ls_addr_table_;
};

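// Trans-id source that hands out ids from a local atomic counter (starting at 66)
// instead of fetching them from the global trans-id service.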
class ObFakeGtiSource : public ObIGtiSource
{
  int get_trans_id(int64_t &trans_id) {
    trans_id = ATOMIC_AAF(&tx_id, 1);
    return OB_SUCCESS;
  }
  int64_t tx_id = 66;
};

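// GTS manager stub driven by the local counter gts_. Tests can inject get_gts errors or
// switch the get_gts / wait_gts_elapse paths into waiting mode, in which case callback
// tasks are queued and only fired when the test calls get_gts_callback() /
// elapse_callback(). Rough usage sketch (fixture names are illustrative only):
//   fake_ts_mgr.set_get_gts_waiting_mode();  // get_gts(..., task, ...) now queues and returns OB_EAGAIN
//   fake_ts_mgr.update_fake_gts(200);        // advance the fake clock
//   fake_ts_mgr.get_gts_callback();          // fire the queued ObTsCbTask callbacks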
class ObFakeTsMgr : public ObITsMgr
{
private:
  int get_gts_error_ = OB_SUCCESS;
public:
  void inject_get_gts_error(int error) {
    get_gts_error_ = error;
  }
  void repair_get_gts_error() {
    get_gts_error_ = OB_SUCCESS;
  }
public:
  void reset() {
    get_gts_error_ = OB_SUCCESS;
    elapse_waiting_mode_ = false;
    get_gts_waiting_mode_ = false;
  }
  int update_gts(const uint64_t tenant_id, const int64_t gts, bool &update) { return OB_SUCCESS; }
  int get_gts(const uint64_t tenant_id,
              const MonotonicTs stc,
              ObTsCbTask *task,
              share::SCN &scn,
              MonotonicTs &receive_gts_ts)
  {
    int ret = OB_SUCCESS;
    int gts = 0;
    TRANS_LOG(INFO, "get gts begin", K(gts_), K(&gts_));
    if (get_gts_error_) {
      ret = get_gts_error_;
    } else {
      gts = ATOMIC_AAF(&gts_, 1);
      scn.convert_for_gts(gts);
      if (task != nullptr && ATOMIC_LOAD(&get_gts_waiting_mode_)) {
        get_gts_waiting_queue_.push(task);
        ret = OB_EAGAIN;
      }
    }
    TRANS_LOG(INFO, "get gts end", K(ret), K(gts_), K(gts), K(get_gts_waiting_mode_));
    return ret;
  }

  int get_gts_sync(const uint64_t tenant_id,
                   const MonotonicTs stc,
                   const int64_t timeout_us,
                   share::SCN &scn,
                   MonotonicTs &receive_gts_ts)
  {
    int ret = OB_SUCCESS;
    const int64_t expire_ts = ObClockGenerator::getClock() + timeout_us;

    do {
      int64_t n = ObClockGenerator::getClock();
      if (n >= expire_ts) {
        ret = OB_TIMEOUT;
      } else if (OB_FAIL(get_gts(tenant_id, stc, NULL, scn, receive_gts_ts))) {
        if (OB_EAGAIN == ret) {
          ob_usleep(500);
        }
      }
    } while (OB_EAGAIN == ret);

    return ret;
  }

  int get_gts(const uint64_t tenant_id, ObTsCbTask *task, share::SCN &scn) {
    if (get_gts_error_) { return get_gts_error_; }
    return OB_SUCCESS;
  }
  int get_ts_sync(const uint64_t tenant_id, const int64_t timeout_ts,
                  share::SCN &scn, bool &is_external_consistent) { return OB_SUCCESS; }
  int get_ts_sync(const uint64_t tenant_id, const int64_t timeout_ts,
                  share::SCN &scn) { return OB_SUCCESS; }
  int wait_gts_elapse(const uint64_t tenant_id, const share::SCN &scn, ObTsCbTask *task,
                      bool &need_wait)
  {
    TRANS_LOG(INFO, "wait_gts_elapse begin", K(gts_), K(scn));
    int ret = OB_SUCCESS;
    if (task != nullptr && ATOMIC_LOAD(&elapse_waiting_mode_)) {
      elapse_queue_.push(task);
      callback_gts_ = scn.get_val_for_gts() + 1;
      need_wait = true;
    } else {
      update_fake_gts(scn.get_val_for_gts());
    }
    TRANS_LOG(INFO, "wait_gts_elapse end", K(gts_), K(scn));
    return ret;
  }

  int wait_gts_elapse(const uint64_t tenant_id, const share::SCN &scn)
  {
    int ret = OB_SUCCESS;
    if (scn.get_val_for_gts() > gts_) {
      ret = OB_EAGAIN;
    }
    return ret;
  }

  int remove_dropped_tenant(const uint64_t tenant_id) {
    UNUSED(tenant_id);
    return OB_SUCCESS;
  }

  int interrupt_gts_callback_for_ls_offline(const uint64_t tenant_id, const share::ObLSID ls_id) {
    UNUSED(tenant_id);
    UNUSED(ls_id);
    return OB_SUCCESS;
  }

  int update_base_ts(const int64_t base_ts) { return OB_SUCCESS; }
  int get_base_ts(int64_t &base_ts) { return OB_SUCCESS; }
  bool is_external_consistent(const uint64_t tenant_id) { return true; }
  int get_gts_and_type(const uint64_t tenant_id, const MonotonicTs stc, int64_t &gts,
                       int64_t &ts_type) { return OB_SUCCESS; }
  int64_t gts_ = 100;
  int64_t callback_gts_ = 100;

public:
  void update_fake_gts(int64_t gts)
  {
    TRANS_LOG(INFO, "update fake gts", K(gts_), K(gts), K(&gts_));
    gts_ = gts;
  }

  void set_elapse_waiting_mode() { ATOMIC_SET(&elapse_waiting_mode_, true); }
  void clear_elapse_waiting_mode() { ATOMIC_SET(&elapse_waiting_mode_, false); }

  void set_get_gts_waiting_mode() { ATOMIC_SET(&get_gts_waiting_mode_, true); }
  void clear_get_gts_waiting_mode() { ATOMIC_SET(&get_gts_waiting_mode_, false); }

  void elapse_callback()
  {
    if (callback_gts_ > gts_) {
      update_fake_gts(callback_gts_);
    }
    while (true) {
      ObLink *task = elapse_queue_.pop();
      if (task) {
        const MonotonicTs srr(MonotonicTs::current_time());
        share::SCN ts;
        ts.convert_for_gts(gts_);
        const MonotonicTs receive_gts_ts(gts_);
        const ObGTSCacheTaskType task_type = WAIT_GTS_ELAPSING;
        static_cast<ObTsCbTask*>(task)->gts_elapse_callback(srr, ts);
      } else {
        break;
      }
    }
  }

  void get_gts_callback()
  {
    while (true) {
      ObLink *task = get_gts_waiting_queue_.pop();
      if (task) {
        const MonotonicTs srr(MonotonicTs::current_time());
        share::SCN ts;
        ts.convert_for_gts(gts_);
        const MonotonicTs receive_gts_ts(gts_);
        const ObGTSCacheTaskType task_type = GET_GTS;
        static_cast<ObTsCbTask*>(task)->get_gts_callback(srr, ts, receive_gts_ts);
      } else {
        break;
      }
    }
  }

public:
  ObSpScLinkQueue elapse_queue_;
  ObSpScLinkQueue get_gts_waiting_queue_;
  bool elapse_waiting_mode_ = false;
  bool get_gts_waiting_mode_ = false;
};

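// Log adapter replacing palf for tests: submitted logs are acknowledged asynchronously by
// the thread-pool worker (apply queues) and buffered per replay_hint for later replay via
// replay_all(); set_pause() / set_log_drop() let tests delay or drop log callbacks.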
class ObFakeTxLogAdapter : public ObITxLogAdapter, public share::ObThreadPool
{
public:
  virtual int start() {
    int ret = OB_SUCCESS;
    ObThreadPool::set_run_wrapper(MTL_CTX());
    ret = ObThreadPool::start();
    stop_ = false;
    TRANS_LOG(INFO, "start.FakeTxLogAdapter", KP(this));
    return ret;
  }

  void stop() {
    stop_ = true;
    cond_.signal();
    TRANS_LOG(INFO, "stop.FakeTxLogAdapter", KP(this));
    ObThreadPool::stop();
  }

  struct ApplyCbTask : public common::ObLink
  {
    int64_t replay_hint_;
    logservice::AppendCb *cb_;
  };

  struct ReplayCbTask : public common::ObLink
  {
    int64_t replay_hint_;
    char *log_buf_;
    int64_t log_size_;
    palf::LSN lsn_;
    int64_t log_ts_;
  };

  const static int64_t TASK_QUEUE_CNT = 128;
  ObSpScLinkQueue apply_task_queue_arr[TASK_QUEUE_CNT];
  ObSpScLinkQueue replay_task_queue_arr[TASK_QUEUE_CNT];
  share::SCN max_submit_scn_ = share::SCN::invalid_scn();
  share::SCN max_committed_scn_ = share::SCN::invalid_scn();

  void run1() {
    while (true) {
      int64_t process_cnt = 0;
      bool stop = stop_;
      if (!ATOMIC_LOAD(&pause_)) {
        for (int64_t i = 0; i < TASK_QUEUE_CNT; ++i) {
          while (true) {
            ObLink *task = apply_task_queue_arr[i].pop();
            if (task) {
              ++process_cnt;
              max_committed_scn_ = static_cast<ApplyCbTask*>(task)->cb_->__get_scn();
              static_cast<ApplyCbTask*>(task)->cb_->on_success();
              delete task;
              ATOMIC_DEC(&inflight_cnt_);
            } else {
              break;
            }
          }
        }
      }
      if (!pause_ && 0 == process_cnt && stop) {
        break;
      } else if (0 == process_cnt && ATOMIC_BCAS(&is_sleeping_, false, true)) {
        auto key = cond_.get_key();
        cond_.wait(key, 10 * 1000);
      }
    }
  }
  void wakeup() { if (ATOMIC_BCAS(&is_sleeping_, true, false)) { cond_.signal(); } }
  template <typename Function> int replay_all(Function &fn)
  {
    LOG_INFO("replay all begin");
    int ret = OB_SUCCESS;

    for (int64_t i = 0; i < TASK_QUEUE_CNT; ++i) {
      while (OB_SUCC(ret)) {
        ReplayCbTask *task = static_cast<ReplayCbTask*>(replay_task_queue_arr[i].pop());
        if (task) {
          ret = fn(task->log_buf_, task->log_size_, task->lsn_, task->log_ts_);
          delete [] task->log_buf_;
          delete task;
          ATOMIC_DEC(&unreplay_cnt_);
        } else {
          break;
        }
      }
    }
    LOG_INFO("replay all end", K(ret));
    return ret;
  }

  int submit_log(const char *buf,
                 const int64_t size,
                 const share::SCN &base_ts,
                 ObTxBaseLogCb *cb,
                 const bool need_nonblock,
                 const int64_t retry_timeout_us)
  {
    int ret = OB_SUCCESS;
    logservice::ObLogBaseHeader base_header;
    int64_t tmp_pos = 0;
    if (OB_FAIL(base_header.deserialize(buf, size, tmp_pos))) {
      LOG_WARN("log base header deserialize error", K(ret));
    } else {
      const int64_t replay_hint = base_header.get_replay_hint();
      const int64_t ts = MAX(base_ts.get_val_for_gts(), ts_) + 1;
      share::SCN scn;
      scn.convert_for_gts(ts);
      int64_t queue_idx = replay_hint % TASK_QUEUE_CNT;
      if (!ATOMIC_LOAD(&log_drop_)) {
        const palf::LSN lsn = palf::LSN(++lsn_);
        cb->set_log_ts(scn);
        cb->set_lsn(lsn);
        cb->set_submit_ts(ObTimeUtility::current_time());
        ts_ = ts;
        ApplyCbTask *apply_task = new ApplyCbTask();
        apply_task->replay_hint_ = replay_hint;
        apply_task->cb_ = cb;
        max_submit_scn_ = share::SCN::max(max_submit_scn_, scn);

        apply_task_queue_arr[queue_idx].push(apply_task);
        ATOMIC_INC(&inflight_cnt_);
        wakeup();

        ReplayCbTask *replay_task = new ReplayCbTask();
        replay_task->replay_hint_ = replay_hint;
        replay_task->log_buf_ = new char[size];
        memcpy(replay_task->log_buf_, buf, size);
        replay_task->log_size_ = size;
        replay_task->lsn_ = lsn;
        replay_task->log_ts_ = ts;
        replay_task_queue_arr[queue_idx].push(replay_task);
        ATOMIC_INC(&unreplay_cnt_);

        TRANS_LOG(INFO, "submit_log",
                  K(replay_hint), K(inflight_cnt_), K(unreplay_cnt_), K(queue_idx));
        hex_dump(buf, size, true, OB_LOG_LEVEL_INFO);
      } else {
        TRANS_LOG(INFO, "drop_log",
                  K(replay_hint), K(inflight_cnt_), K(unreplay_cnt_), K(queue_idx));
        hex_dump(buf, size, true, OB_LOG_LEVEL_INFO);
      }
    }
    return ret;
  }

  int get_role(bool &is_leader, int64_t &epoch) {
    is_leader = true;
    epoch = 1;
    return OB_SUCCESS;
  }

  int get_max_decided_scn(share::SCN &scn) {
    int ret = OB_SUCCESS;
    share::SCN min_unreplayed_scn = share::SCN::invalid_scn();
    share::SCN min_unapplyed_scn = share::SCN::invalid_scn();

    for (int64_t i = 0; i < TASK_QUEUE_CNT; ++i) {
      if (!replay_task_queue_arr[i].empty()) {
        share::SCN tmp_scn;
        tmp_scn.convert_for_gts(
            static_cast<ReplayCbTask *>(replay_task_queue_arr[i].top())->log_ts_);
        if (min_unreplayed_scn.is_valid() && tmp_scn.is_valid()) {
          min_unreplayed_scn = share::SCN::min(tmp_scn, min_unreplayed_scn);
        } else if (tmp_scn.is_valid()) {
          min_unreplayed_scn = tmp_scn;
        }
      }
    }

    for (int64_t i = 0; i < TASK_QUEUE_CNT; ++i) {
      if (!apply_task_queue_arr[i].empty()) {
        share::SCN tmp_scn;
        tmp_scn = (static_cast<ObTxBaseLogCb *>(
                       (static_cast<ApplyCbTask *>(apply_task_queue_arr[i].top()))
                           ->cb_))
                      ->get_log_ts();
        if (min_unapplyed_scn.is_valid() && tmp_scn.is_valid()) {
          min_unapplyed_scn = share::SCN::min(tmp_scn, min_unapplyed_scn);
        } else if (tmp_scn.is_valid()) {
          min_unapplyed_scn = tmp_scn;
        }
      }
    }

    // take the smaller pending scn when both queues still have work,
    // otherwise fall back to the last submitted scn
    if (min_unreplayed_scn.is_valid() && min_unapplyed_scn.is_valid()) {
      scn = share::SCN::min(min_unreplayed_scn, min_unapplyed_scn);
    } else {
      scn = max_submit_scn_;
    }
    if (scn.is_valid()) {
      scn = share::SCN::minus(scn, 1);
    }
    return OB_SUCCESS;
  }
  int get_palf_committed_max_scn(share::SCN &scn) const {
    scn = max_committed_scn_;
    return OB_SUCCESS;
  }
  int get_append_mode_initial_scn(share::SCN &ref_scn) {
    int ret = OB_SUCCESS;
    ref_scn = share::SCN::invalid_scn();
    return ret;
  }

  int get_inflight_cnt() {
    return ATOMIC_LOAD(&inflight_cnt_);
  }

  int get_unreplay_cnt() {
    return ATOMIC_LOAD(&unreplay_cnt_);
  }

  void set_pause() {
    ATOMIC_SET(&pause_, true);
  }

  void clear_pause() {
    ATOMIC_SET(&pause_, false);
  }

  void set_log_drop() {
    ATOMIC_SET(&log_drop_, true);
  }

  void clear_log_drop() {
    ATOMIC_SET(&log_drop_, false);
  }

  bool pause_ = false;
  bool log_drop_ = false;
  int64_t ts_ = 1;
  int64_t lsn_ = 1;
  int64_t inflight_cnt_ = 0;
  int64_t unreplay_cnt_ = 0;
  bool is_sleeping_ = false;
  common::SimpleCond cond_;
};

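// Replay executor that redirects row replay into a single memtable supplied through
// set_memtable(), bypassing the tablet lookup done by the real ObTxReplayExecutor.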
class ObFakeTxReplayExecutor : public ObTxReplayExecutor
{
public:
  ObFakeTxReplayExecutor(storage::ObLS *ls,
                         const share::ObLSID &ls_id,
                         const uint64_t tenant_id,
                         storage::ObLSTxService *ls_tx_srv,
                         const palf::LSN &lsn,
                         const share::SCN &log_timestamp,
                         const logservice::ObLogBaseHeader &base_header)
      : ObTxReplayExecutor(ls, ls_id, tenant_id, ls_tx_srv, lsn, log_timestamp, base_header)
  { memtable_ = nullptr; }
  ~ObFakeTxReplayExecutor() { }
  int set_memtable(memtable::ObMemtable *memtable)
  {
    memtable_ = memtable;
    return OB_SUCCESS;
  }
  int execute(const char *buf,
              const int64_t size,
              const int skip_pos)
  {
    return do_replay_(buf, size, skip_pos);
  }
  int replay_one_row_in_memtable_(memtable::ObMutatorRowHeader &row_head,
                                  memtable::ObMemtableMutatorIterator *mmi_ptr) override
  {
    int ret = OB_SUCCESS;
    storage::ObStoreCtx storeCtx;
    storeCtx.ls_id_ = ctx_->get_ls_id();
    storeCtx.mvcc_acc_ctx_.tx_ctx_ = ctx_;
    storeCtx.mvcc_acc_ctx_.mem_ctx_ = mt_ctx_;
    storeCtx.tablet_id_ = row_head.tablet_id_;
    storeCtx.mvcc_acc_ctx_.tx_id_ = ctx_->get_trans_id();

    switch (row_head.mutator_type_) {
      case memtable::MutatorType::MUTATOR_ROW: {
        if (OB_FAIL(memtable_->replay_row(storeCtx, log_ts_ns_, mmi_ptr_))) {
          TRANS_LOG(WARN, "[Replay Tx] replay row error", K(ret));
        } else {
          TRANS_LOG(INFO, "[Replay Tx] replay row in memtable success");
        }
        break;
      }
      default: {
        ret = OB_ERR_UNEXPECTED;
        TRANS_LOG(ERROR, "[Replay Tx] Unknown mutator_type", K(row_head.mutator_type_));
      } // default
    } // switch

    return ret;
  }

public:
  memtable::ObMemtable *memtable_;
};

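// ObLSTxService stub whose weak read timestamp is pinned to share::SCN::min_scn().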
class ObFakeLSTxService : public ObLSTxService
{
public:
  ObFakeLSTxService(ObLS *parent) : ObLSTxService(parent) { }
  ~ObFakeLSTxService() {}
  virtual share::SCN get_ls_weak_read_ts() override {
    return share::SCN::min_scn();
  }
};

} // transaction
} // oceanbase

#endif // OCEANBASE_TRANSACTION_TEST_BASIC_FAKE_DEFINE_