321 lines
7.8 KiB
C++
321 lines
7.8 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#include "ob_mock_tx_log_adapter.h"
|
|
|
|
namespace oceanbase
|
|
{
|
|
|
|
using namespace palf;
|
|
using namespace logservice;
|
|
namespace transaction
|
|
{
|
|
int MockTxLogAdapter::init(ObITxLogParam *param)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
ObSpinLockGuard file_guard(log_file_lock_);
|
|
ObSpinLockGuard cbs_guard(cbs_lock_);
|
|
|
|
// NULL pointer will core in unittest
|
|
MockTxLogParam *mock_param = static_cast<MockTxLogParam *>(param);
|
|
submit_config_ = *mock_param;
|
|
max_allocated_log_ts_ = 0;
|
|
// max_success_log_ts = 0;
|
|
mock_log_file_.clear();
|
|
waiting_cbs_.clear();
|
|
is_running_ = false;
|
|
|
|
ObSimpleThreadPool::init(submit_config_.cb_thread_cnt_, 8000000);
|
|
timer_.init("TransTimeWheel");
|
|
return ret;
|
|
}
|
|
|
|
int MockTxLogAdapter::start()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
ATOMIC_STORE(&is_running_, true);
|
|
|
|
timer_.start();
|
|
task_ptr_ =(ObITimeoutTask*)malloc(sizeof(MockCbTimeoutTask));
|
|
new (task_ptr_) MockCbTimeoutTask();
|
|
((MockCbTimeoutTask*)task_ptr_)->init(this);
|
|
timer_.register_timeout_task(*task_ptr_, submit_config_.cb_time_);
|
|
|
|
|
|
CB_CNT_ = 0;
|
|
return ret;
|
|
}
|
|
|
|
void MockTxLogAdapter::stop()
|
|
{
|
|
if (submit_config_.wait_all_cb_) {
|
|
while (is_cbs_finish_()) {
|
|
usleep(10000); // 10 ms
|
|
TRANS_LOG(INFO, "Wait cbs finish");
|
|
}
|
|
}
|
|
ATOMIC_STORE(&is_running_, false);
|
|
timer_.stop();
|
|
ObSimpleThreadPool::stop();
|
|
}
|
|
|
|
void MockTxLogAdapter::wait()
|
|
{
|
|
timer_.wait();
|
|
ObSimpleThreadPool::wait();
|
|
}
|
|
|
|
void MockTxLogAdapter::destroy()
|
|
{
|
|
// do nothing
|
|
timer_.destroy();
|
|
ObSimpleThreadPool::destroy();
|
|
}
|
|
|
|
int MockTxLogAdapter::push(void *task) { return ObSimpleThreadPool::push(task); }
|
|
|
|
void MockTxLogAdapter::handle(void *task)
|
|
{
|
|
AppendCb *cb = static_cast<AppendCb *>(task);
|
|
|
|
TRANS_LOG(INFO, "Start handle cb", K(CB_CNT_));
|
|
// nullptr of cb will core
|
|
if (nullptr != cb) {
|
|
if (ATOMIC_LOAD(&is_running_)) {
|
|
cb->on_success();
|
|
} else {
|
|
cb->on_failure();
|
|
}
|
|
ATOMIC_INC(&CB_CNT_);
|
|
}
|
|
TRANS_LOG(INFO, "End handle cb", K(CB_CNT_));
|
|
}
|
|
|
|
int MockTxLogAdapter::submit_log(const char *buf,
|
|
const int64_t size,
|
|
const share::SCN &base_ts,
|
|
ObTxBaseLogCb *cb,
|
|
const bool need_nonblock,
|
|
const int64_t retry_timeout_us)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
int64_t ts = 0;
|
|
palf::LSN lsn;
|
|
share::SCN scn;
|
|
UNUSED(need_nonblock);
|
|
if (ATOMIC_LOAD(&is_running_)) {
|
|
|
|
{
|
|
ObSpinLockGuard file_guard(log_file_lock_);
|
|
std::string log_buf(buf, size);
|
|
if (base_ts.get_val_for_gts() > max_allocated_log_ts_) {
|
|
max_allocated_log_ts_ = base_ts.get_val_for_gts();
|
|
}
|
|
ts = ++max_allocated_log_ts_;
|
|
scn.convert_for_gts(ts);
|
|
lsn = palf::LSN(++lsn_);
|
|
cb->set_log_ts(scn);
|
|
cb->set_lsn(lsn);
|
|
mock_log_file_[ts] = log_buf;
|
|
}
|
|
|
|
{
|
|
ObSpinLockGuard cbs_guard(cbs_lock_);
|
|
waiting_cbs_.push_back(cb);
|
|
}
|
|
} else {
|
|
ret = OB_NOT_MASTER;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int MockTxLogAdapter::get_role(bool &is_leader, int64_t &epoch)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
if (ATOMIC_LOAD(&is_running_)) {
|
|
is_leader = true;
|
|
epoch = 1;
|
|
} else {
|
|
is_leader = false;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
bool MockTxLogAdapter::is_cbs_finish_()
|
|
{
|
|
ObSpinLockGuard cbs_guard(cbs_lock_);
|
|
|
|
return waiting_cbs_.empty();
|
|
}
|
|
|
|
void MockTxLogAdapter::invoke_all_cbs()
|
|
{
|
|
ObSpinLockGuard cbs_guard(cbs_lock_);
|
|
|
|
TRANS_LOG(INFO,"Start push all cbs",K(waiting_cbs_.size()));
|
|
AppendCb *tmp_cb = nullptr;
|
|
while (!waiting_cbs_.empty()) {
|
|
if (submit_config_.is_asc_cbs_) {
|
|
tmp_cb = waiting_cbs_.front();
|
|
push(static_cast<void *>(tmp_cb));
|
|
waiting_cbs_.pop_front();
|
|
} else {
|
|
tmp_cb = waiting_cbs_.back();
|
|
push(static_cast<void *>(tmp_cb));
|
|
waiting_cbs_.pop_back();
|
|
}
|
|
}
|
|
TRANS_LOG(INFO,"End push all cbs",K(waiting_cbs_.size()));
|
|
|
|
timer_.unregister_timeout_task(*task_ptr_);
|
|
timer_.register_timeout_task(*task_ptr_, submit_config_.cb_time_);
|
|
}
|
|
|
|
bool MockTxLogAdapter::get_log(int64_t log_ts, std::string &log_string)
|
|
{
|
|
bool has_log = false;
|
|
ObSpinLockGuard file_guard(log_file_lock_);
|
|
if(mock_log_file_.find(log_ts) != mock_log_file_.end())
|
|
{
|
|
log_string = mock_log_file_[log_ts];
|
|
has_log = true;
|
|
}
|
|
return has_log;
|
|
}
|
|
|
|
int MockTxLogAdapter::get_next_log(int64_t log_ts, std::string &log_string, int64_t &next_log_ts)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
ObSpinLockGuard file_guard(log_file_lock_);
|
|
|
|
std::map<int64_t, std::string>::iterator file_iter = mock_log_file_.find(log_ts);
|
|
|
|
if (file_iter == mock_log_file_.end()) {
|
|
file_iter = mock_log_file_.begin();
|
|
}
|
|
|
|
file_iter++;
|
|
|
|
if (file_iter == mock_log_file_.end()) {
|
|
ret = OB_HASH_NOT_EXIST;
|
|
} else {
|
|
log_string = file_iter->second;
|
|
next_log_ts = file_iter->first;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
int64_t MockTxLogAdapter::get_cb_cnt()
|
|
{
|
|
return ATOMIC_LOAD(&CB_CNT_);
|
|
}
|
|
|
|
|
|
// int MockReplayMgr::init(MockTxLogAdapter * log_adapter)
|
|
// {
|
|
// int ret = OB_SUCCESS;
|
|
//
|
|
// log_adapter_ptr_ = log_adapter;
|
|
//
|
|
// return ret;
|
|
// }
|
|
//
|
|
// int MockReplayMgr::start()
|
|
// {
|
|
// int ret = OB_SUCCESS;
|
|
//
|
|
// ret = lib::ThreadPool::start();
|
|
//
|
|
// return ret;
|
|
// }
|
|
//
|
|
// void MockReplayMgr::stop()
|
|
// {
|
|
// lib::ThreadPool::stop();
|
|
// }
|
|
//
|
|
// void MockReplayMgr::wait()
|
|
// {
|
|
// lib::ThreadPool::wait();
|
|
// }
|
|
//
|
|
// void MockReplayMgr::destroy()
|
|
// {
|
|
// lib::ThreadPool::destroy();
|
|
// }
|
|
//
|
|
// void MockReplayMgr::run1()
|
|
// {
|
|
// int ret = OB_SUCCESS;
|
|
// int64_t cur_time = 0;
|
|
// int64_t time_used = 0;
|
|
// std::string tmp_log_string;
|
|
//
|
|
// palf::LSN tmp_lsn;
|
|
// while (!has_set_stop()) {
|
|
// cur_time = ObTimeUtility::current_time();
|
|
//
|
|
// for (auto iter = replay_target_list_.begin(); iter != replay_target_list_.end(); iter++) {
|
|
//
|
|
// while (OB_SUCC(log_adapter_ptr_->get_next_log(iter->replay_success_ts_, iter->replaying_log_,
|
|
// iter->replaying_ts_))) {
|
|
//
|
|
// if (OB_FAIL(iter->replay_target_->replay(tmp_log_string.c_str(), tmp_log_string.size(),
|
|
// tmp_lsn, iter->replaying_ts_))) {
|
|
// TRANS_LOG(WARN, "replay one log error", KP(iter->replay_target_), K(iter->replaying_ts_));
|
|
// } else {
|
|
// iter->replay_success_ts_ = iter->replaying_ts_;
|
|
// }
|
|
// }
|
|
//
|
|
// if (ret != OB_HASH_NOT_EXIST) {
|
|
// TRANS_LOG(WARN, "replay error", K(ret), K(iter->replay_success_ts_),
|
|
// K(iter->replaying_ts_));
|
|
// }
|
|
// }
|
|
// time_used = ObTimeUtility::current_time() - cur_time;
|
|
// if (time_used < log_adapter_ptr_->get_cb_time()) {
|
|
// usleep(log_adapter_ptr_->get_cb_time() - time_used);
|
|
// }
|
|
// }
|
|
// }
|
|
//
|
|
// void MockReplayMgr::register_replay_target(logservice::ObIReplaySubHandler *replay_target)
|
|
// {
|
|
// MockReplayInfo tmp_replay_info;
|
|
// tmp_replay_info.replay_target_ = replay_target;
|
|
//
|
|
// replay_target_list_.push_back(tmp_replay_info);
|
|
//
|
|
// }
|
|
//
|
|
// void MockReplayMgr::unregister_replay_target(logservice::ObIReplaySubHandler *replay_target)
|
|
// {
|
|
// auto iter = replay_target_list_.begin();
|
|
// for (; iter != replay_target_list_.end(); iter++) {
|
|
//
|
|
// if (iter->replay_target_ == replay_target) {
|
|
// break;
|
|
// }
|
|
// }
|
|
//
|
|
// if (iter != replay_target_list_.end()) {
|
|
// replay_target_list_.erase(iter);
|
|
// }
|
|
// }
|
|
|
|
} // namespace transaction
|
|
} // namespace oceanbase
|