831 lines
27 KiB
C++
831 lines
27 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#define USING_LOG_PREFIX TRANS
|
|
#include "tx_node.h"
|
|
// Abort the process right away if the in-scope `ret` holds an error, so a
// failing test stops at the failure site instead of limping on.
// Requires an `int ret` in the calling scope.
// There is deliberately no ';' after while (0): the caller writes
// `FAST_FAIL();`, which then expands to exactly one statement and stays safe
// inside an unbraced if/else.
#define FAST_FAIL()                                                          \
  do {                                                                       \
    if (OB_FAIL(ret)) {                                                      \
      TRANS_LOG(ERROR, "[tx node crash] fast-fail for easy debug", K(ret)); \
      ob_abort();                                                            \
    }                                                                        \
  } while (0)
|
|
|
|
namespace oceanbase {
|
|
namespace common {
|
|
int ObClusterVersion::get_tenant_data_version(const uint64_t tenant_id, uint64_t &data_version)
|
|
{
|
|
data_version = DATA_CURRENT_VERSION;
|
|
return OB_SUCCESS;
|
|
}
|
|
}
|
|
namespace share
|
|
{
|
|
void* ObMemstoreAllocator::alloc(AllocHandle& handle, int64_t size, const int64_t expire_ts)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
int64_t align_size = upper_align(size, sizeof(int64_t));
|
|
uint64_t tenant_id = arena_.get_tenant_id();
|
|
bool is_out_of_mem = false;
|
|
if (!handle.is_id_valid()) {
|
|
COMMON_LOG(TRACE, "MTALLOC.first_alloc", KP(&handle.mt_));
|
|
LockGuard guard(lock_);
|
|
if (!handle.is_id_valid()) {
|
|
handle.set_clock(arena_.retired());
|
|
hlist_.set_active(handle);
|
|
}
|
|
}
|
|
return arena_.alloc(handle.id_, handle.arena_handle_, align_size);
|
|
}
|
|
}
|
|
|
|
namespace storage {
|
|
|
|
int ObTxTable::online()
|
|
{
|
|
ATOMIC_INC(&epoch_);
|
|
ATOMIC_STORE(&state_, TxTableState::ONLINE);
|
|
return OB_SUCCESS;
|
|
}
|
|
|
|
} // namespace storage
|
|
|
|
namespace transaction {
|
|
|
|
// Hand the guarded descriptor back to its node if the guard still owns one.
// An empty guard is left alone: release() would report OB_ERR_UNEXPECTED.
ObTxDescGuard::~ObTxDescGuard()
{
  if (OB_NOT_NULL(tx_desc_)) {
    (void)release();
  }
}
|
|
int ObTxDescGuard::release() {
|
|
int ret = OB_SUCCESS;
|
|
if (tx_desc_) {
|
|
tx_node_->release_tx(*tx_desc_);
|
|
tx_desc_ = NULL;
|
|
} else {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// Render "<addr>_<ls_id>" into buf_ and return a view over the written bytes.
// The local type exists only to reuse the DECLARE_TO_STRING/BUF_PRINTF helpers.
// BUF_PRINTF implicitly uses the names `buf`, `buf_len` and `pos`.
ObString ObTxNode::get_identifer_str()
{
  struct NodeIdentifier {
    ObAddr addr;
    int64_t ls_id;
    DECLARE_TO_STRING {
      int64_t pos = 0;
      int32_t addr_len = 0;
      addr.addr_to_buffer(buf, buf_len, addr_len);
      pos += addr_len;
      BUF_PRINTF("_%ld", ls_id);
      return pos;
    }
  };
  NodeIdentifier ident = { addr_, ls_id_.id() };
  const int64_t written = ident.to_string(buf_, sizeof(buf_));
  return ObString(written, buf_);
}
|
|
// Build a self-contained transaction node for tests.
//
// Wires a fake tenant (id 1001), a fake LS with one lock memtable, fake
// freezers, the transaction service and one data memtable. It also registers
// the node on `msg_bus` under `addr`. The node starts with the Leader role.
// Any failure reported through `ret` aborts the process (FAST_FAIL).
ObTxNode::ObTxNode(const int64_t ls_id,
                   const ObAddr &addr,
                   MsgBus &msg_bus) :
    name_(name_buf_),
    addr_(addr),
    ls_id_(ls_id),
    tenant_id_(1001),
    tenant_(tenant_id_, 0, 10, *GCTX.cgroup_ctrl_),
    fake_part_trans_ctx_pool_(1001, false, false, 4),
    memtable_(NULL),
    msg_consumer_(get_identifer_str(),
                  &msg_queue_,
                  std::bind(&ObTxNode::handle_msg_, this, std::placeholders::_1)),
    t3m_(tenant_id_),
    fake_rpc_(&msg_bus, addr, &get_location_adapter_()),
    lock_memtable_(),
    fake_tx_log_adapter_(nullptr)
{
  int ret = OB_SUCCESS;

  // ---- identity and tenant environment ----
  fake_part_trans_ctx_pool_.init();
  addr.to_string(name_buf_, sizeof(name_buf_));
  msg_consumer_.set_name(name_);
  role_ = Leader;
  tenant_.enable_tenant_ctx_check_ = false;
  tenant_.set(&fake_tenant_freezer_);
  tenant_.set(&fake_part_trans_ctx_pool_);
  fake_shared_mem_alloc_mgr_.init();
  tenant_.set(&fake_shared_mem_alloc_mgr_);
  tenant_.start();
  ObTenantEnv::set_tenant(&tenant_);

  // ---- lock table: one lock memtable registered in the fake LS ----
  auto &lock_mt_mgr = fake_ls_.ls_tablet_svr_.lock_memtable_mgr_;
  ObTableHandleV2 lock_mt_handle;
  lock_mt_handle.set_table(&lock_memtable_, &t3m_, ObITable::LOCK_MEMTABLE);
  lock_memtable_.key_.table_type_ = ObITable::LOCK_MEMTABLE;
  lock_mt_mgr.t3m_ = &t3m_;
  lock_mt_mgr.add_memtable_(lock_mt_handle);
  fake_lock_table_.is_inited_ = true;
  fake_lock_table_.parent_ = &fake_ls_;
  fake_lock_table_.lock_mt_mgr_ = &lock_mt_mgr;

  // ---- freezers: memstore limit INT64_MAX, memtable never reported frozen ----
  fake_tenant_freezer_.is_inited_ = true;
  fake_tenant_freezer_.tenant_info_.is_loaded_ = true;
  fake_tenant_freezer_.tenant_info_.mem_memstore_limit_ = INT64_MAX;
  fake_freezer_.freeze_flag_ = 0;  // memtable freezer: is_freeze() == false

  // ---- transaction service and other tenant-level services ----
  OZ(txs_.init(addr,
               &fake_rpc_,
               &fake_dup_table_rpc_,
               &get_location_adapter_(),
               &get_gti_source_(),
               &get_ts_mgr_(),
               &rpc_proxy_,
               &schema_service_,
               &server_tracer_));
  tenant_.set(&txs_);
  OZ(fake_opt_stat_mgr_.init(tenant_id_));
  tenant_.set(&fake_opt_stat_mgr_);
  OZ(fake_lock_wait_mgr_.init());
  tenant_.set(&fake_lock_wait_mgr_);
  ls_service_.is_inited_ = true;
  OZ(ls_service_.ls_map_.init(tenant_id_, lib::ObMallocAllocator::get_instance()));
  tenant_.set(&ls_service_);
  OZ (create_memtable_(100000, memtable_));

  // ---- table schema: two ASC int columns (write() stores key in col 1, value in col 2) ----
  {
    ObColDesc column;
    column.col_type_.set_type(ObObjType::ObIntType);
    column.col_order_ = common::ObOrderType::ASC;
    column.col_id_ = 1;
    columns_.push_back(column);
    column.col_id_ = 2;
    columns_.push_back(column);
  }

  // ---- messaging ----
  OZ(msg_bus.regist(addr, *this));
  OZ(drop_msg_type_set_.create(16));
  FAST_FAIL();
}
|
|
// Acquire a fresh transaction descriptor from txs_ and wrap it in a guard.
// The guard's destructor hands the descriptor back to this node via release().
// If acquire_tx() fails, the returned guard wraps a NULL descriptor.
ObTxDescGuard ObTxNode::get_tx_guard() {
  int ret = OB_SUCCESS;
  ObTxDesc *tx = NULL;
  OZ(txs_.acquire_tx(tx));
  // Return the prvalue directly instead of building a named guard first.
  // Elision here is guaranteed since C++17, whereas `return x;` relies on NRVO.
  // If a named guard were actually copied, the source's destructor would
  // release the descriptor the returned guard still points to — unless the
  // guard's copy/move constructor transfers ownership (not visible here).
  return ObTxDescGuard(this, tx);
}
|
|
int ObTxNode::start() {
|
|
int ret = OB_SUCCESS;
|
|
ObTenantEnv::set_tenant(&tenant_);
|
|
|
|
if (ObTxNodeRole::Leader == role_) {
|
|
fake_tx_log_adapter_ = new ObFakeTxLogAdapter();
|
|
OZ(fake_tx_log_adapter_->start());
|
|
}
|
|
get_ts_mgr_().reset();
|
|
OZ(msg_consumer_.start());
|
|
OZ(txs_.start());
|
|
OZ(create_ls_(ls_id_));
|
|
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
|
|
OZ(txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(ls_id_, ls_tx_ctx_mgr));
|
|
if (ObTxNodeRole::Leader == role_) {
|
|
OZ(ls_tx_ctx_mgr->switch_to_leader());
|
|
wait_all_redolog_applied();
|
|
CK(ls_tx_ctx_mgr->is_master_());
|
|
}
|
|
if (ls_tx_ctx_mgr) {
|
|
fake_tx_table_.tx_ctx_table_.ls_tx_ctx_mgr_ = ls_tx_ctx_mgr;
|
|
fake_tx_table_.is_inited_ = true;
|
|
fake_tx_table_.ls_ = &fake_ls_;
|
|
fake_tx_table_.online();
|
|
int tx_data_table_offset = offsetof(storage::ObTxTable, tx_data_table_);
|
|
void* ls_tx_data_table_ptr = (void*)((int64_t)&(fake_ls_.tx_table_) + tx_data_table_offset);
|
|
ls_tx_data_table_ptr = &fake_tx_table_.tx_data_table_;
|
|
fake_ls_.tx_table_.is_inited_ = true;
|
|
fake_ls_.tx_table_.online();
|
|
fake_ls_.ls_meta_.clog_checkpoint_scn_ = share::SCN::max_scn();
|
|
} else {
|
|
abort();
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
struct MsgInfo {
|
|
void *msg_ptr_ = NULL;
|
|
int64_t recv_time_ = 0;
|
|
ObAddr sender_;
|
|
bool is_callback_msg_ = false;
|
|
bool is_sync_msg_ = false;
|
|
TxMsgCallbackMsg callback_msg_;
|
|
int16_t msg_type_;
|
|
int64_t size_;
|
|
const char* buf_;
|
|
TO_STRING_KV(K_(msg_ptr),
|
|
K_(recv_time),
|
|
K_(sender),
|
|
K_(is_callback_msg),
|
|
K_(is_sync_msg),
|
|
K_(msg_type),
|
|
K_(callback_msg),
|
|
K_(size));
|
|
};
|
|
|
|
// Decode the header of `pkt` into `msg_info`.
//
// The body starts with a one-byte category:
// - 1: a TxMsgCallbackMsg follows; its type becomes msg_type_.
// - otherwise: an int16 pcode follows, and buf_/size_ point at the rest of the
//   body.
// Packet metadata (sync flag, body pointer, receive time, sender) is copied
// unconditionally, even if decoding fails.
int get_msg_info(ObTxNode::MsgPack *pkt, MsgInfo& msg_info)
{
  int ret = OB_SUCCESS;
  const char *data = pkt->body_.ptr();
  const int64_t data_len = pkt->body_.length();
  int64_t pos = 0;
  char category = 0;
  OZ (serialization::decode(data, data_len, pos, category));
  if (1 != category) {
    int16_t pcode = 0;
    OZ (serialization::decode(data, data_len, pos, pcode));
    msg_info.msg_type_ = pcode;
    msg_info.buf_ = data + pos;
    msg_info.size_ = data_len - pos;
  } else {
    msg_info.is_callback_msg_ = true;
    OZ (msg_info.callback_msg_.deserialize(data, data_len, pos));
    msg_info.msg_type_ = msg_info.callback_msg_.type_;
  }
  msg_info.is_sync_msg_ = pkt->is_sync_msg_;
  msg_info.msg_ptr_ = (void*)pkt->body_.ptr();
  msg_info.recv_time_ = pkt->recv_time_;
  msg_info.sender_ = pkt->sender_;
  return ret;
}
|
|
|
|
void ObTxNode::dump_msg_queue_()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
MsgPack *msg = NULL;
|
|
int i = 0;
|
|
while(OB_SUCC(msg_queue_.pop((ObLink*&)msg))) {
|
|
++i;
|
|
MsgInfo msg_info;
|
|
OZ (get_msg_info(msg, msg_info));
|
|
TRANS_LOG(INFO,"[dump_msg]", K(i), K(msg_info), K(ret), KPC(this));
|
|
}
|
|
}
|
|
|
|
void ObTxNode::wait_all_msg_consumed()
|
|
{
|
|
while (msg_queue_.size() > 0 || !msg_consumer_.is_idle()) {
|
|
if (REACH_TIME_INTERVAL(200_ms)) {
|
|
TRANS_LOG(INFO, "wait msg_queue to be empty", K(msg_queue_.size()), KPC(this));
|
|
}
|
|
usleep(5_ms);
|
|
}
|
|
}
|
|
|
|
void ObTxNode::wait_tx_log_synced()
|
|
{
|
|
while(fake_tx_log_adapter_->get_inflight_cnt() > 0) {
|
|
if (REACH_TIME_INTERVAL(200_ms)) {
|
|
TRANS_LOG(INFO, "wait tx log synced...", K(fake_tx_log_adapter_->get_inflight_cnt()), KPC(this));
|
|
}
|
|
usleep(5_ms);
|
|
}
|
|
}
|
|
|
|
// Tear the node down in dependency order:
// 1. Return the LS tx ctx mgr reference stored in fake_tx_table_.
// 2. Block the LS until it has no active txs (bounded polling).
// 3. Stop txs_ and drop the LS.
// 4. Stop the log adapter (Leader only) and the message consumer.
// 5. Log any undelivered messages, then free the memtable and the adapter.
// Any error left in `ret` aborts the process (FAST_FAIL).
ObTxNode::~ObTxNode() __attribute__((optnone)) {
  int ret = OB_SUCCESS;
  TRANS_LOG(INFO, "destroy TxNode", KPC(this));
  ObTenantEnv::set_tenant(&tenant_);
  OZ(txs_.tx_ctx_mgr_.revert_ls_tx_ctx_mgr(fake_tx_table_.tx_ctx_table_.ls_tx_ctx_mgr_));
  fake_tx_table_.tx_ctx_table_.ls_tx_ctx_mgr_ = nullptr;

  // Poll every 2ms, at most 1000 times, until block_tx() reports the LS clean.
  bool is_tx_clean = false;
  for (int attempt = 1; ; ++attempt) {
    usleep(2000);
    txs_.block_tx(ls_id_, is_tx_clean);
    if (is_tx_clean || attempt >= 1000) {
      break;
    }
  }

  OX(txs_.stop());
  OZ(txs_.wait_());
  OZ(drop_ls_(ls_id_));
  if (role_ == Leader && fake_tx_log_adapter_) {
    fake_tx_log_adapter_->stop();
    fake_tx_log_adapter_->wait();
    fake_tx_log_adapter_->destroy();
  }
  msg_consumer_.stop();
  msg_consumer_.wait();
  dump_msg_queue_();
  if (nullptr != memtable_) {
    delete memtable_;
  }
  if (role_ == Leader && fake_tx_log_adapter_) {
    delete fake_tx_log_adapter_;
  }
  fake_ls_.ls_meta_.ls_id_ = ObLSID(1001);
  FAST_FAIL();
  ObTenantEnv::set_tenant(NULL);
}
|
|
|
|
int ObTxNode::create_memtable_(const int64_t tablet_id, memtable::ObMemtable *&mt) {
|
|
int ret = OB_SUCCESS;
|
|
memtable::ObMemtable *t = new memtable::ObMemtable();
|
|
ObITable::TableKey table_key;
|
|
table_key.table_type_ = ObITable::DATA_MEMTABLE;
|
|
table_key.tablet_id_ = tablet_id;
|
|
table_key.scn_range_.start_scn_.convert_for_gts(100);
|
|
table_key.scn_range_.end_scn_.set_max();
|
|
ObLSHandle ls_handle;
|
|
ls_handle.set_ls(ls_service_.ls_map_, fake_ls_, ObLSGetMod::DATA_MEMTABLE_MOD);
|
|
OZ (t->init(table_key, ls_handle, &fake_freezer_, &fake_memtable_mgr_, 0, 0));
|
|
if (OB_SUCC(ret)) {
|
|
mt = t;
|
|
} else { delete t; }
|
|
return ret;
|
|
}
|
|
|
|
int ObTxNode::create_ls_(const ObLSID ls_id) {
|
|
int ret = OB_SUCCESS;
|
|
OZ(txs_.tx_ctx_mgr_.create_ls(tenant_id_,
|
|
ls_id,
|
|
&fake_tx_table_,
|
|
&fake_lock_table_,
|
|
*fake_ls_.get_tx_svr(),
|
|
(ObITxLogParam*)0x01,
|
|
fake_tx_log_adapter_));
|
|
if (Leader == role_) {
|
|
OZ(get_location_adapter_().fill(ls_id, addr_));
|
|
}
|
|
fake_ls_.ls_meta_.ls_id_ = ls_id;
|
|
fake_ls_.get_tx_svr()->online();
|
|
fake_ls_.get_ref_mgr().inc(ObLSGetMod::TXSTORAGE_MOD);
|
|
MTL(ObLSService*)->ls_map_.add_ls(fake_ls_);
|
|
return ret;
|
|
}
|
|
|
|
int ObTxNode::drop_ls_(const ObLSID ls_id) {
|
|
int ret = OB_SUCCESS;
|
|
OZ(txs_.tx_ctx_mgr_.remove_ls(ls_id, true));
|
|
get_location_adapter_().remove(ls_id);
|
|
OZ(MTL(ObLSService*)->ls_map_.del_ls(ls_id));
|
|
return ret;
|
|
}
|
|
|
|
// Dispatch an RPC-result callback message.
// - NORMAL: forwarded to txs_.handle_trans_msg_callback().
// - SAVEPOINT_ROLLBACK: ignored (that flow now uses an async response message).
// - anything else: OB_ERR_UNEXPECTED.
int ObTxNode::recv_msg_callback_(TxMsgCallbackMsg &msg)
{
  TRANS_LOG(INFO, "recv msg callback", K(msg), KPC(this));
  ObTenantEnv::set_tenant(&tenant_);
  int ret = OB_SUCCESS;
  if (TxMsgCallbackMsg::NORMAL == msg.type_) {
    OZ(txs_.handle_trans_msg_callback(msg.sender_ls_id_,
                                      msg.receiver_ls_id_,
                                      msg.tx_id_,
                                      msg.orig_msg_type_,
                                      msg.tx_rpc_result_.status_,
                                      msg.receiver_addr_,
                                      msg.request_id_,
                                      msg.tx_rpc_result_.private_data_));
  } else if (TxMsgCallbackMsg::SAVEPOINT_ROLLBACK == msg.type_) {
    // ignore, has changed to use async resp msg
  } else {
    ret = OB_ERR_UNEXPECTED;
  }
  TRANS_LOG(INFO, "handle msg callback done", K(ret), K(msg), KPC(this));
  return ret;
}
|
|
|
|
int ObTxNode::recv_msg(const ObAddr &sender, ObString &m)
|
|
{
|
|
TRANS_LOG(INFO, "recv_msg", K(sender), "msg_ptr", OB_P(m.ptr()), KPC(this));
|
|
int ret = OB_SUCCESS;
|
|
auto pkt = new MsgPack(sender, m);
|
|
OZ(msg_queue_.push(pkt));
|
|
msg_consumer_.wakeup();
|
|
return ret;
|
|
}
|
|
|
|
int ObTxNode::sync_recv_msg(const ObAddr &sender, ObString &m, ObString &resp)
|
|
{
|
|
TRANS_LOG(INFO, "sync_recv_msg", K(sender), "msg_ptr", OB_P(m.ptr()), KPC(this));
|
|
int ret = OB_SUCCESS;
|
|
auto pkt = new MsgPack(sender, m, true);
|
|
OZ(msg_queue_.push(pkt));
|
|
msg_consumer_.wakeup();
|
|
pkt->cond_.wait(pkt->cond_.get_key(), 500000);
|
|
if (pkt->resp_ready_) {
|
|
OX(resp = pkt->resp_);
|
|
} else {
|
|
ret = OB_TIMEOUT;
|
|
TRANS_LOG(ERROR, "wait resp timeout",K(ret));
|
|
}
|
|
ob_free((void*)const_cast<char*>(pkt->body_.ptr()));
|
|
delete pkt;
|
|
return ret;
|
|
}
|
|
|
|
// Consumer-thread handler for one queued packet.
//
// - Callback messages go to recv_msg_callback_().
// - Message types in drop_msg_type_set_ are discarded, simulating message loss.
// - Everything else is decoded and dispatched to the matching txs_ handler.
//
// Ownership: an async packet (and its body) belongs to this handler and is
// freed on every exit path. A sync packet belongs to sync_recv_msg(); it is
// only answered (resp_ready_ + signal) after a normal dispatch.
int ObTxNode::handle_msg_(MsgPack *pkt)
{
  ObTenantEnv::set_tenant(&tenant_);
  TRANS_LOG(INFO, "begin to handle_msg", "msg_ptr", OB_P(pkt->body_.ptr()), KPC(this));
  int ret = OB_SUCCESS;
  MsgInfo msg_info;
  OZ (get_msg_info(pkt, msg_info));

  // Frees the packet iff it is async. Earlier, the callback and drop paths
  // returned without freeing, leaking every such packet.
  auto release_async_pkt = [pkt]() {
    if (!pkt->is_sync_msg_) {
      ob_free((void*)const_cast<char*>(pkt->body_.ptr()));
      delete pkt;
    }
  };

  if (OB_SUCC(ret) && msg_info.is_callback_msg_) {
    ret = recv_msg_callback_(msg_info.callback_msg_);
    release_async_pkt();
    return ret;
  }
  int16_t msg_type = msg_info.msg_type_;
  if (OB_HASH_EXIST == drop_msg_type_set_.exist_refactored(msg_type)) {
    TRANS_LOG(WARN, "drop msg", K(msg_type), KPC(this));
    // A dropped sync message is intentionally left unanswered, so its sender
    // times out.
    release_async_pkt();
    return OB_SUCCESS;
  }

  // Payload following the type header. These fields are only set for
  // non-callback messages, hence they are read after the callback exit above.
  auto sender = msg_info.sender_;
  const char* buf = msg_info.buf_;
  int64_t size = msg_info.size_;
  int64_t pos = 0;

  switch (msg_type) {
#define TX_MSG_HANDLER__(t, clz, func)                                  \
    case t:                                                             \
    {                                                                   \
      clz msg;                                                          \
      ObTransRpcResult rslt;                                            \
      OZ(msg.deserialize(buf, size, pos));                              \
      TRANS_LOG(TRACE, "handle_msg::", K(msg), KPC(this));              \
      auto status = txs_.func(msg, rslt);                               \
      rslt.status_ = status;                                            \
      OZ(fake_rpc_.send_msg_callback(sender, msg, rslt));               \
      break;                                                            \
    }
    TX_MSG_HANDLER__(TX_COMMIT, ObTxCommitMsg, handle_trans_commit_request);
    TX_MSG_HANDLER__(TX_COMMIT_RESP, ObTxCommitRespMsg, handle_trans_commit_response);
    TX_MSG_HANDLER__(TX_ABORT, ObTxAbortMsg, handle_trans_abort_request);
    TX_MSG_HANDLER__(KEEPALIVE, ObTxKeepaliveMsg, handle_trans_keepalive);
    TX_MSG_HANDLER__(KEEPALIVE_RESP, ObTxKeepaliveRespMsg, handle_trans_keepalive_response);
    TX_MSG_HANDLER__(ROLLBACK_SAVEPOINT_RESP, ObTxRollbackSPRespMsg, handle_sp_rollback_response);
#undef TX_MSG_HANDLER__
    case TX_FREE_ROUTE_CHECK_ALIVE:
    {
      ObTxFreeRouteCheckAliveMsg msg;
      OZ(msg.deserialize(buf, size, pos));
      TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this));
      OZ(txs_.tx_free_route_handle_check_alive(msg, OB_TRANS_CTX_NOT_EXIST));
      break;
    }
    case TX_FREE_ROUTE_CHECK_ALIVE_RESP:
    {
      ObTxFreeRouteCheckAliveRespMsg msg;
      OZ(msg.deserialize(buf, size, pos));
      TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this));
      // can not handle by tx node, call extra handler
      if (extra_msg_handler_) {
        OZ(extra_msg_handler_(msg_type, &msg));
      }
      break;
    }
    case TX_FREE_ROUTE_PUSH_STATE:
    {
      ObTxFreeRoutePushState msg;
      OZ(msg.deserialize(buf, size, pos));
      TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this));
      OZ(txs_.tx_free_route_handle_push_state(msg));
      // The response carries the handling result (`ret`), so it must be
      // serialized even when `ret` is already an error. The previous OZ()
      // skipped serialization exactly then, and returned an uninitialized
      // buffer.
      ObTxFreeRoutePushStateResp resp;
      resp.ret_ = ret;
      const int64_t resp_len = resp.get_serialize_size();
      char *resp_buf = (char*)ob_malloc(resp_len, ObNewModIds::TEST);
      int64_t resp_pos = 0;
      int tmp_ret = OB_SUCCESS;
      if (OB_ISNULL(resp_buf)) {
        tmp_ret = OB_ALLOCATE_MEMORY_FAILED;
        TRANS_LOG(WARN, "alloc push state resp buffer failed", K(tmp_ret), K(resp_len));
      } else if (OB_TMP_FAIL(resp.serialize(resp_buf, resp_len, resp_pos))) {
        TRANS_LOG(WARN, "serialize push state resp failed", K(tmp_ret));
        ob_free(resp_buf);
      } else {
        pkt->resp_ = ObString(resp_len, resp_buf);
      }
      if (OB_SUCC(ret) && OB_SUCCESS != tmp_ret) {
        ret = tmp_ret;
      }
      break;
    }
    case ROLLBACK_SAVEPOINT:
    {
      ObTxRollbackSPMsg msg;
      obrpc::ObTxRpcRollbackSPResult rslt;
      OZ(msg.deserialize(buf, size, pos));
      TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this));
      OZ(txs_.handle_sp_rollback_request(msg, rslt), msg);
      break;
    }
    case TX_2PC_PREPARE_REQ:
    case TX_2PC_PREPARE_RESP:
    case TX_2PC_PRE_COMMIT_REQ:
    case TX_2PC_PRE_COMMIT_RESP:
    case TX_2PC_COMMIT_REQ:
    case TX_2PC_COMMIT_RESP:
    case TX_2PC_ABORT_REQ:
    case TX_2PC_ABORT_RESP:
    case TX_2PC_CLEAR_REQ:
    case TX_2PC_CLEAR_RESP:
      OZ(txs_.handle_tx_batch_req(msg_type, buf + pos, size - pos, false));
      break;
    default:
      ret = OB_NOT_SUPPORTED;
  }

  if (msg_info.is_sync_msg_) {
    // After signal() the waiting sender may free pkt; do not touch it again.
    pkt->resp_ready_ = true;
    pkt->cond_.signal();
  } else {
    release_async_pkt();
  }
  TRANS_LOG(INFO, "handle_msg done", K(ret), KPC(this));
  return ret;
}
|
|
|
|
// Read `key` inside `tx`.
// First obtains a read snapshot at isolation `iso` (expiring 50ms from now),
// then delegates to the snapshot-based read().
int ObTxNode::read(ObTxDesc &tx, const int64_t key, int64_t &value, const ObTxIsolationLevel iso)
{
  int ret = OB_SUCCESS;
  ObTxReadSnapshot read_snapshot;
  OZ(get_read_snapshot(tx, iso, ts_after_ms(50), read_snapshot));
  OZ(read(read_snapshot, key, value));
  return ret;
}
|
|
int ObTxNode::read(const ObTxReadSnapshot &snapshot,
|
|
const int64_t key,
|
|
int64_t &value)
|
|
{
|
|
TRANS_LOG(INFO, "read", K(key), K(snapshot), KPC(this));
|
|
int ret = OB_SUCCESS;
|
|
ObTenantEnv::set_tenant(&tenant_);
|
|
ObStoreCtx read_store_ctx;
|
|
read_store_ctx.ls_ = &fake_ls_;
|
|
read_store_ctx.ls_id_ = ls_id_;
|
|
OZ(txs_.get_read_store_ctx(snapshot, false, 5000ll * 1000, read_store_ctx));
|
|
// HACK, refine: mock LS's each member in some way
|
|
read_store_ctx.mvcc_acc_ctx_.tx_table_guards_.tx_table_guard_.init(&fake_tx_table_);
|
|
read_store_ctx.mvcc_acc_ctx_.abs_lock_timeout_ts_ = ObTimeUtility::current_time() + 5000ll * 1000;
|
|
blocksstable::ObDatumRow row;
|
|
{
|
|
ObTableIterParam iter_param;
|
|
ObArenaAllocator allocator;
|
|
ObTableReadInfo read_info;
|
|
const int64_t schema_version = 100;
|
|
read_info.init(allocator, schema_version, 1, false, columns_, nullptr/*storage_cols_index*/);
|
|
iter_param.table_id_ = 1;
|
|
iter_param.tablet_id_ = 100;
|
|
iter_param.read_info_ = &read_info;
|
|
iter_param.out_cols_project_ = NULL;
|
|
iter_param.agg_cols_project_ = NULL;
|
|
iter_param.is_multi_version_minor_merge_ = false;
|
|
iter_param.need_scn_ = true;
|
|
iter_param.pushdown_filter_ = NULL;
|
|
iter_param.vectorized_enabled_ = false;
|
|
|
|
storage::ObTableAccessContext access_context;
|
|
{
|
|
access_context.is_inited_ = true;
|
|
access_context.use_fuse_row_cache_ = false;
|
|
access_context.need_scn_ = true;
|
|
access_context.store_ctx_ = &read_store_ctx;
|
|
access_context.query_flag_.read_latest_ = true;
|
|
access_context.stmt_allocator_ = &allocator;
|
|
access_context.allocator_ = &allocator;
|
|
}
|
|
ObDatumRowkey row_key;
|
|
ObStorageDatum row_key_obj;
|
|
ObObj key_obj;
|
|
row_key_obj.set_int(key);
|
|
key_obj.set_int(key);
|
|
row_key.assign(&row_key_obj, 1);
|
|
row_key.store_rowkey_.assign(&key_obj, 1);
|
|
OZ(memtable_->get(iter_param, access_context, row_key, row));
|
|
STORAGE_LOG(INFO, "read_result", K(row), KPC(this));
|
|
}
|
|
OZ(txs_.revert_store_ctx(read_store_ctx));
|
|
if (OB_SUCC(ret)) {
|
|
if (row.row_flag_.is_exist()) {
|
|
ObArenaAllocator allocator;
|
|
blocksstable::ObNewRowBuilder new_row_builder;
|
|
storage::ObStoreRow store_row;
|
|
OZ(new_row_builder.init(columns_, allocator));
|
|
OZ(new_row_builder.build_store_row(row, store_row));
|
|
OX(value = store_row.row_val_.cells_[1].get_int());
|
|
} else if (row.row_flag_.is_not_exist()) {
|
|
ret = OB_ENTRY_NOT_EXIST;
|
|
} else {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
TRANS_LOG(WARN, "unexpected row result", K(ret), K(row.row_flag_), K(row), KPC(this));
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// Write `key`=`value` in `tx` as one statement.
// An implicit savepoint is created first. If the write fails and the savepoint
// is valid, `tx` is rolled back to it (deadline `expire_ts`).
int ObTxNode::atomic_write(ObTxDesc &tx, const int64_t key, const int64_t value,
                           const int64_t expire_ts, const ObTxParam &tx_param)
{
  int ret = OB_SUCCESS;
  ObTxSEQ savepoint;
  OZ(create_implicit_savepoint(tx, tx_param, savepoint, true));
  OZ(write(tx, key, value));
  const bool need_rollback = (OB_SUCCESS != ret) && savepoint.is_valid();
  if (need_rollback) {
    OZ(rollback_to_implicit_savepoint(tx, savepoint, expire_ts, nullptr));
  }
  return ret;
}
|
|
// Write `key`=`value` in `tx` (on `branch`).
// Uses a fresh snapshot at the tx's own isolation level, expiring 50ms from
// now.
int ObTxNode::write(ObTxDesc &tx, const int64_t key, const int64_t value, const int16_t branch)
{
  int ret = OB_SUCCESS;
  ObTxReadSnapshot write_snapshot;
  OZ(get_read_snapshot(tx, tx.isolation_, ts_after_ms(50), write_snapshot));
  OZ(write(tx, write_snapshot, key, value, branch));
  return ret;
}
|
|
// Write the row (key, value) into the node's data memtable as an UPDATE.
// The write happens inside `tx`, under `snapshot`, on `branch`.
// The store ctx is always reverted. A revert failure is only logged; it does
// not change the returned code.
int ObTxNode::write(ObTxDesc &tx,
                    const ObTxReadSnapshot &snapshot,
                    const int64_t key,
                    const int64_t value,
                    const int16_t branch)
{
  TRANS_LOG(INFO, "write", K(key), K(value), K(snapshot), K(tx), KPC(this));
  int ret = OB_SUCCESS;
  const transaction::ObSerializeEncryptMeta *encrypt_meta = NULL;
  ObTenantEnv::set_tenant(&tenant_);
  ObStoreCtx write_store_ctx;
  // The store ctx borrows this iterator; it is deleted after the ctx is reverted.
  auto iter = new ObTableStoreIterator();
  iter->reset();
  ObITable *mtb = memtable_;
  iter->add_table(mtb);
  write_store_ctx.ls_ = &fake_ls_;
  write_store_ctx.ls_id_ = ls_id_;
  write_store_ctx.table_iter_ = iter;
  write_store_ctx.branch_ = branch;
  concurrent_control::ObWriteFlag write_flag;
  OZ(txs_.get_write_store_ctx(tx,
                              snapshot,
                              write_flag,
                              write_store_ctx));
  write_store_ctx.mvcc_acc_ctx_.tx_table_guards_.tx_table_guard_.init(&fake_tx_table_);

  // Row image: column 1 = key, column 2 = value.
  ObArenaAllocator allocator;
  ObDatumRow row;
  ObStorageDatum cols[2] = {ObStorageDatum(), ObStorageDatum()};
  cols[0].set_int(key);
  cols[1].set_int(value);
  row.count_ = 2;
  row.storage_datums_ = cols;
  row.row_flag_ = blocksstable::ObDmlFlag::DF_UPDATE;
  row.trans_id_.reset();

  ObTableIterParam param;
  ObTableAccessContext context;
  ObVersionRange trans_version_range;
  const bool read_latest = true;
  ObQueryFlag query_flag;
  ObTableReadInfo read_info;
  // (The unused local `schema_version` was removed; 2 is passed directly, as before.)
  read_info.init(allocator, 2, 1, false, columns_, nullptr/*storage_cols_index*/);

  trans_version_range.base_version_ = 0;
  trans_version_range.multi_version_start_ = 0;
  trans_version_range.snapshot_version_ = EXIST_READ_SNAPSHOT_VERSION;
  query_flag.use_row_cache_ = ObQueryFlag::DoNotUseCache;
  query_flag.read_latest_ = read_latest & ObQueryFlag::OBSF_MASK_READ_LATEST;

  param.table_id_ = 1;
  param.tablet_id_ = 1;
  param.read_info_ = &read_info;

  // Checked like in write_one_row(): the memtable must not be written through
  // an access context that failed to initialize.
  OZ(context.init(query_flag, write_store_ctx, allocator, trans_version_range));
  const ObMemtableSetArg arg(&row,
                             &columns_,
                             NULL, /*update_idx*/
                             NULL, /*old_row*/
                             1, /*row_count*/
                             false /*check_exist*/,
                             encrypt_meta);
  OZ(memtable_->set(param, context, arg));

  int tmp_ret = OB_SUCCESS;
  if (OB_TMP_FAIL(txs_.revert_store_ctx(write_store_ctx))) {
    TRANS_LOG(WARN, "revert store ctx failed", KR(tmp_ret), K(write_store_ctx));
  }
  delete iter;
  return ret;
}
|
|
|
|
// Prepare `write_store_ctx` for a sequence of write_one_row() calls in `tx`.
// It gets a heap-allocated table iterator over the data memtable, which
// write_end() deletes. The ctx is then acquired from txs_.
int ObTxNode::write_begin(ObTxDesc &tx,
                          const ObTxReadSnapshot &snapshot,
                          ObStoreCtx& write_store_ctx)
{
  int ret = OB_SUCCESS;
  ObTenantEnv::set_tenant(&tenant_);

  auto *table_iter = new ObTableStoreIterator();
  table_iter->reset();
  ObITable *data_table = memtable_;
  table_iter->add_table(data_table);

  write_store_ctx.ls_ = &fake_ls_;
  write_store_ctx.ls_id_ = ls_id_;
  write_store_ctx.table_iter_ = table_iter;

  concurrent_control::ObWriteFlag flag;
  OZ(txs_.get_write_store_ctx(tx, snapshot, flag, write_store_ctx));
  return ret;
}
|
|
|
|
// Write the row (key, value) as an UPDATE into the data memtable.
// Uses a store ctx previously prepared by write_begin().
int ObTxNode::write_one_row(ObStoreCtx& write_store_ctx, const int64_t key, const int64_t value)
{
  int ret = OB_SUCCESS;
  ObTenantEnv::set_tenant(&tenant_);

  ObArenaAllocator allocator;
  ObTableReadInfo read_info;
  const transaction::ObSerializeEncryptMeta *encrypt_meta = NULL;
  // (The unused local `schema_version` was removed; 2 is passed directly, as before.)
  read_info.init(allocator, 2, 1, false, columns_, nullptr/*storage_cols_index*/);

  // Row image: column 1 = key, column 2 = value.
  ObDatumRow row;
  ObStorageDatum cols[2] = {ObStorageDatum(), ObStorageDatum()};
  cols[0].set_int(key);
  cols[1].set_int(value);
  row.row_flag_ = blocksstable::ObDmlFlag::DF_UPDATE;
  row.storage_datums_ = cols;
  row.count_ = 2;

  ObTableIterParam param;
  ObTableAccessContext context;
  ObVersionRange trans_version_range;
  const bool read_latest = true;
  ObQueryFlag query_flag;

  trans_version_range.base_version_ = 0;
  trans_version_range.multi_version_start_ = 0;
  trans_version_range.snapshot_version_ = EXIST_READ_SNAPSHOT_VERSION;
  query_flag.use_row_cache_ = ObQueryFlag::DoNotUseCache;
  query_flag.read_latest_ = read_latest & ObQueryFlag::OBSF_MASK_READ_LATEST;

  param.table_id_ = 1;
  param.tablet_id_ = 1;
  param.read_info_ = &read_info;

  OZ(context.init(query_flag, write_store_ctx, allocator, trans_version_range));

  const ObMemtableSetArg arg(&row,
                             &columns_,
                             NULL, /*update_idx*/
                             NULL, /*old_row*/
                             1, /*row_count*/
                             false /*check_exist*/,
                             encrypt_meta);

  OZ(memtable_->set(param, context, arg));

  return ret;
}
|
|
|
|
// Finish a write_begin()/write_one_row() sequence.
// Deletes the table iterator allocated by write_begin(), then gives the store
// ctx back to txs_.
int ObTxNode::write_end(ObStoreCtx& write_store_ctx)
{
  int ret = OB_SUCCESS;
  ObTenantEnv::set_tenant(&tenant_);

  auto *owned_iter = write_store_ctx.table_iter_;
  write_store_ctx.table_iter_ = nullptr;
  delete owned_iter;

  OZ(txs_.revert_store_ctx(write_store_ctx));
  return ret;
}
|
|
|
|
int ObTxNode::replay(const void *buffer,
|
|
const int64_t nbytes,
|
|
const palf::LSN &lsn,
|
|
const int64_t ts_ns)
|
|
{
|
|
ObTenantEnv::set_tenant(&tenant_);
|
|
int ret = OB_SUCCESS;
|
|
logservice::ObLogBaseHeader base_header;
|
|
int64_t tmp_pos = 0;
|
|
const char *log_buf = static_cast<const char *>(buffer);
|
|
if (OB_FAIL(base_header.deserialize(log_buf, nbytes, tmp_pos))) {
|
|
LOG_WARN("log base header deserialize error", K(ret));
|
|
} else {
|
|
share::SCN log_scn;
|
|
log_scn.convert_for_tx(ts_ns);
|
|
ObFakeTxReplayExecutor executor(&fake_ls_,
|
|
ls_id_,
|
|
tenant_id_,
|
|
fake_ls_.get_tx_svr(),
|
|
lsn,
|
|
log_scn,
|
|
base_header);
|
|
executor.set_memtable(memtable_);
|
|
if (OB_FAIL(executor.execute(log_buf, nbytes, tmp_pos))) {
|
|
LOG_WARN("replay tx log error", K(ret), K(lsn), K(ts_ns));
|
|
} else {
|
|
LOG_INFO("replay tx log succ", K(ret), K(lsn), K(ts_ns));
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
} // transaction
|
|
} // oceanbase
|