/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX TRANS #include "tx_node.h" #define FAST_FAIL() \ do { \ if (OB_FAIL(ret)) { \ TRANS_LOG(ERROR, "[tx node crash] fast-fail for easy debug", K(ret)); \ ob_abort(); \ } \ } while(0); namespace oceanbase { namespace common { int ObClusterVersion::get_tenant_data_version(const uint64_t tenant_id, uint64_t &data_version) { data_version = DATA_CURRENT_VERSION; return OB_SUCCESS; } } namespace share { void* ObMemstoreAllocator::alloc(AllocHandle& handle, int64_t size, const int64_t expire_ts) { int ret = OB_SUCCESS; int64_t align_size = upper_align(size, sizeof(int64_t)); uint64_t tenant_id = arena_.get_tenant_id(); bool is_out_of_mem = false; if (!handle.is_id_valid()) { COMMON_LOG(TRACE, "MTALLOC.first_alloc", KP(&handle.mt_)); LockGuard guard(lock_); if (!handle.is_id_valid()) { handle.set_clock(arena_.retired()); hlist_.set_active(handle); } } return arena_.alloc(handle.id_, handle.arena_handle_, align_size); } } namespace storage { int ObTxTable::online() { ATOMIC_INC(&epoch_); ATOMIC_STORE(&state_, TxTableState::ONLINE); return OB_SUCCESS; } } // namespace storage namespace transaction { ObTxDescGuard::~ObTxDescGuard() { if (tx_desc_) { release(); } } int ObTxDescGuard::release() { int ret = OB_SUCCESS; if (tx_desc_) { tx_node_->release_tx(*tx_desc_); tx_desc_ = NULL; } else { ret = OB_ERR_UNEXPECTED; } return ret; } ObString ObTxNode::get_identifer_str() { struct ID { ObAddr addr; int64_t ls_id; DECLARE_TO_STRING { int64_t pos = 0; int32_t pos0 = 0; addr.addr_to_buffer(buf, buf_len, pos0); pos += pos0; BUF_PRINTF("_%ld", ls_id); return pos; } } identifer = { .addr = addr_, .ls_id = ls_id_.id() }; int64_t pos = identifer.to_string(buf_, sizeof(buf_)); return ObString(pos, buf_); } ObTxNode::ObTxNode(const int64_t ls_id, const ObAddr &addr, MsgBus &msg_bus) : name_(name_buf_), addr_(addr), ls_id_(ls_id), tenant_id_(1001), tenant_(tenant_id_, 0, 10, *GCTX.cgroup_ctrl_), fake_part_trans_ctx_pool_(1001, false, false, 4), memtable_(NULL), msg_consumer_(get_identifer_str(), &msg_queue_, std::bind(&ObTxNode::handle_msg_, this, std::placeholders::_1)), t3m_(tenant_id_), fake_rpc_(&msg_bus, addr, &get_location_adapter_()), lock_memtable_(), fake_tx_log_adapter_(nullptr) { fake_part_trans_ctx_pool_.init(); addr.to_string(name_buf_, sizeof(name_buf_)); msg_consumer_.set_name(name_); role_ = Leader; tenant_.enable_tenant_ctx_check_ = false; tenant_.set(&fake_tenant_freezer_); tenant_.set(&fake_part_trans_ctx_pool_); fake_shared_mem_alloc_mgr_.init(); tenant_.set(&fake_shared_mem_alloc_mgr_); tenant_.start(); ObTenantEnv::set_tenant(&tenant_); ObTableHandleV2 lock_memtable_handle; lock_memtable_handle.set_table(&lock_memtable_, &t3m_, ObITable::LOCK_MEMTABLE); lock_memtable_.key_.table_type_ = ObITable::LOCK_MEMTABLE; fake_ls_.ls_tablet_svr_.lock_memtable_mgr_.t3m_ = &t3m_; fake_ls_.ls_tablet_svr_.lock_memtable_mgr_.add_memtable_(lock_memtable_handle); fake_lock_table_.is_inited_ = true; fake_lock_table_.parent_ = &fake_ls_; fake_lock_table_.lock_mt_mgr_ = &(fake_ls_.ls_tablet_svr_.lock_memtable_mgr_); fake_tenant_freezer_.is_inited_ = true; fake_tenant_freezer_.tenant_info_.is_loaded_ = true; fake_tenant_freezer_.tenant_info_.mem_memstore_limit_ = INT64_MAX; // memtable.freezer fake_freezer_.freeze_flag_ = 0;// is_freeze() = false; // txn service int ret = OB_SUCCESS; OZ(txs_.init(addr, &fake_rpc_, &fake_dup_table_rpc_, &get_location_adapter_(), &get_gti_source_(), &get_ts_mgr_(), &rpc_proxy_, &schema_service_, &server_tracer_)); tenant_.set(&txs_); OZ(fake_opt_stat_mgr_.init(tenant_id_)); tenant_.set(&fake_opt_stat_mgr_); OZ(fake_lock_wait_mgr_.init()); tenant_.set(&fake_lock_wait_mgr_); ls_service_.is_inited_ = true; OZ(ls_service_.ls_map_.init(tenant_id_, lib::ObMallocAllocator::get_instance())); tenant_.set(&ls_service_); OZ (create_memtable_(100000, memtable_)); { ObColDesc col_desc; col_desc.col_id_ = 1; col_desc.col_type_.set_type(ObObjType::ObIntType); col_desc.col_order_ = common::ObOrderType::ASC; columns_.push_back(col_desc); col_desc.col_id_ = 2; columns_.push_back(col_desc); } OZ(msg_bus.regist(addr, *this)); OZ(drop_msg_type_set_.create(16)); FAST_FAIL(); } ObTxDescGuard ObTxNode::get_tx_guard() { int ret = OB_SUCCESS; ObTxDesc *tx = NULL; OZ(txs_.acquire_tx(tx)); ObTxDescGuard x = ObTxDescGuard(this, tx); return x; } int ObTxNode::start() { int ret = OB_SUCCESS; ObTenantEnv::set_tenant(&tenant_); if (ObTxNodeRole::Leader == role_) { fake_tx_log_adapter_ = new ObFakeTxLogAdapter(); OZ(fake_tx_log_adapter_->start()); } get_ts_mgr_().reset(); OZ(msg_consumer_.start()); OZ(txs_.start()); OZ(create_ls_(ls_id_)); ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; OZ(txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(ls_id_, ls_tx_ctx_mgr)); if (ObTxNodeRole::Leader == role_) { OZ(ls_tx_ctx_mgr->switch_to_leader()); wait_all_redolog_applied(); CK(ls_tx_ctx_mgr->is_master_()); } if (ls_tx_ctx_mgr) { fake_tx_table_.tx_ctx_table_.ls_tx_ctx_mgr_ = ls_tx_ctx_mgr; fake_tx_table_.is_inited_ = true; fake_tx_table_.ls_ = &fake_ls_; fake_tx_table_.online(); int tx_data_table_offset = offsetof(storage::ObTxTable, tx_data_table_); void* ls_tx_data_table_ptr = (void*)((int64_t)&(fake_ls_.tx_table_) + tx_data_table_offset); ls_tx_data_table_ptr = &fake_tx_table_.tx_data_table_; fake_ls_.tx_table_.is_inited_ = true; fake_ls_.tx_table_.online(); fake_ls_.ls_meta_.clog_checkpoint_scn_ = share::SCN::max_scn(); } else { abort(); } return ret; } struct MsgInfo { void *msg_ptr_ = NULL; int64_t recv_time_ = 0; ObAddr sender_; bool is_callback_msg_ = false; bool is_sync_msg_ = false; TxMsgCallbackMsg callback_msg_; int16_t msg_type_; int64_t size_; const char* buf_; TO_STRING_KV(K_(msg_ptr), K_(recv_time), K_(sender), K_(is_callback_msg), K_(is_sync_msg), K_(msg_type), K_(callback_msg), K_(size)); }; int get_msg_info(ObTxNode::MsgPack *pkt, MsgInfo& msg_info) { int ret = OB_SUCCESS; const char* buf = pkt->body_.ptr(); int64_t size = pkt->body_.length(); int64_t pos = 0; char cat = 0; int16_t pcode = 0; OZ (serialization::decode(buf, size, pos, cat)); if (cat == 1) { msg_info.is_callback_msg_ = true; OZ (msg_info.callback_msg_.deserialize(buf, size, pos)); msg_info.msg_type_ = msg_info.callback_msg_.type_; } else { OZ (serialization::decode(buf, size, pos, pcode)); msg_info.msg_type_ = pcode; msg_info.buf_ = buf + pos; msg_info.size_ = size - pos; } msg_info.is_sync_msg_ = pkt->is_sync_msg_; msg_info.msg_ptr_ = (void*)pkt->body_.ptr(); msg_info.recv_time_ = pkt->recv_time_; msg_info.sender_ = pkt->sender_; return ret; } void ObTxNode::dump_msg_queue_() { int ret = OB_SUCCESS; MsgPack *msg = NULL; int i = 0; while(OB_SUCC(msg_queue_.pop((ObLink*&)msg))) { ++i; MsgInfo msg_info; OZ (get_msg_info(msg, msg_info)); TRANS_LOG(INFO,"[dump_msg]", K(i), K(msg_info), K(ret), KPC(this)); } } void ObTxNode::wait_all_msg_consumed() { while (msg_queue_.size() > 0 || !msg_consumer_.is_idle()) { if (REACH_TIME_INTERVAL(200_ms)) { TRANS_LOG(INFO, "wait msg_queue to be empty", K(msg_queue_.size()), KPC(this)); } usleep(5_ms); } } void ObTxNode::wait_tx_log_synced() { while(fake_tx_log_adapter_->get_inflight_cnt() > 0) { if (REACH_TIME_INTERVAL(200_ms)) { TRANS_LOG(INFO, "wait tx log synced...", K(fake_tx_log_adapter_->get_inflight_cnt()), KPC(this)); } usleep(5_ms); } } ObTxNode::~ObTxNode() __attribute__((optnone)) { int ret = OB_SUCCESS; TRANS_LOG(INFO, "destroy TxNode", KPC(this)); ObTenantEnv::set_tenant(&tenant_); OZ(txs_.tx_ctx_mgr_.revert_ls_tx_ctx_mgr(fake_tx_table_.tx_ctx_table_.ls_tx_ctx_mgr_)); fake_tx_table_.tx_ctx_table_.ls_tx_ctx_mgr_ = nullptr; bool is_tx_clean = false; int retry_cnt = 0; do { usleep(2000); txs_.block_tx(ls_id_, is_tx_clean); } while(!is_tx_clean && ++retry_cnt < 1000); OX(txs_.stop()); OZ(txs_.wait_()); OZ(drop_ls_(ls_id_)); if (role_ == Leader && fake_tx_log_adapter_) { fake_tx_log_adapter_->stop(); fake_tx_log_adapter_->wait(); fake_tx_log_adapter_->destroy(); } msg_consumer_.stop(); msg_consumer_.wait(); dump_msg_queue_(); //fake_tx_table_.is_inited_ = false; if (memtable_) { delete memtable_; } if (role_ == Leader && fake_tx_log_adapter_) { delete fake_tx_log_adapter_; } fake_ls_.ls_meta_.ls_id_ = ObLSID(1001); FAST_FAIL(); ObTenantEnv::set_tenant(NULL); } int ObTxNode::create_memtable_(const int64_t tablet_id, memtable::ObMemtable *&mt) { int ret = OB_SUCCESS; memtable::ObMemtable *t = new memtable::ObMemtable(); ObITable::TableKey table_key; table_key.table_type_ = ObITable::DATA_MEMTABLE; table_key.tablet_id_ = tablet_id; table_key.scn_range_.start_scn_.convert_for_gts(100); table_key.scn_range_.end_scn_.set_max(); ObLSHandle ls_handle; ls_handle.set_ls(ls_service_.ls_map_, fake_ls_, ObLSGetMod::DATA_MEMTABLE_MOD); OZ (t->init(table_key, ls_handle, &fake_freezer_, &fake_memtable_mgr_, 0, 0)); if (OB_SUCC(ret)) { mt = t; } else { delete t; } return ret; } int ObTxNode::create_ls_(const ObLSID ls_id) { int ret = OB_SUCCESS; OZ(txs_.tx_ctx_mgr_.create_ls(tenant_id_, ls_id, &fake_tx_table_, &fake_lock_table_, *fake_ls_.get_tx_svr(), (ObITxLogParam*)0x01, fake_tx_log_adapter_)); if (Leader == role_) { OZ(get_location_adapter_().fill(ls_id, addr_)); } fake_ls_.ls_meta_.ls_id_ = ls_id; fake_ls_.get_tx_svr()->online(); fake_ls_.get_ref_mgr().inc(ObLSGetMod::TXSTORAGE_MOD); MTL(ObLSService*)->ls_map_.add_ls(fake_ls_); return ret; } int ObTxNode::drop_ls_(const ObLSID ls_id) { int ret = OB_SUCCESS; OZ(txs_.tx_ctx_mgr_.remove_ls(ls_id, true)); get_location_adapter_().remove(ls_id); OZ(MTL(ObLSService*)->ls_map_.del_ls(ls_id)); return ret; } int ObTxNode::recv_msg_callback_(TxMsgCallbackMsg &msg) { TRANS_LOG(INFO, "recv msg callback", K(msg), KPC(this)); ObTenantEnv::set_tenant(&tenant_); int ret = OB_SUCCESS; switch(msg.type_) { case TxMsgCallbackMsg::SAVEPOINT_ROLLBACK: // ignore, has changed to use async resp msg break; case TxMsgCallbackMsg::NORMAL: OZ(txs_.handle_trans_msg_callback(msg.sender_ls_id_, msg.receiver_ls_id_, msg.tx_id_, msg.orig_msg_type_, msg.tx_rpc_result_.status_, msg.receiver_addr_, msg.request_id_, msg.tx_rpc_result_.private_data_)); break; default: ret = OB_ERR_UNEXPECTED; } TRANS_LOG(INFO, "handle msg callback done", K(ret), K(msg), KPC(this)); return ret; } int ObTxNode::recv_msg(const ObAddr &sender, ObString &m) { TRANS_LOG(INFO, "recv_msg", K(sender), "msg_ptr", OB_P(m.ptr()), KPC(this)); int ret = OB_SUCCESS; auto pkt = new MsgPack(sender, m); OZ(msg_queue_.push(pkt)); msg_consumer_.wakeup(); return ret; } int ObTxNode::sync_recv_msg(const ObAddr &sender, ObString &m, ObString &resp) { TRANS_LOG(INFO, "sync_recv_msg", K(sender), "msg_ptr", OB_P(m.ptr()), KPC(this)); int ret = OB_SUCCESS; auto pkt = new MsgPack(sender, m, true); OZ(msg_queue_.push(pkt)); msg_consumer_.wakeup(); pkt->cond_.wait(pkt->cond_.get_key(), 500000); if (pkt->resp_ready_) { OX(resp = pkt->resp_); } else { ret = OB_TIMEOUT; TRANS_LOG(ERROR, "wait resp timeout",K(ret)); } ob_free((void*)const_cast(pkt->body_.ptr())); delete pkt; return ret; } int ObTxNode::handle_msg_(MsgPack *pkt) { ObTenantEnv::set_tenant(&tenant_); TRANS_LOG(INFO, "begin to handle_msg", "msg_ptr", OB_P(pkt->body_.ptr()), KPC(this)); int ret = OB_SUCCESS; MsgInfo msg_info; OZ (get_msg_info(pkt, msg_info)); auto sender = msg_info.sender_; const char* buf = msg_info.buf_; int64_t size = msg_info.size_; int64_t pos = 0; if (OB_SUCC(ret) && msg_info.is_callback_msg_) { return recv_msg_callback_(msg_info.callback_msg_); } int16_t msg_type = msg_info.msg_type_; if (OB_HASH_EXIST == drop_msg_type_set_.exist_refactored(msg_type)) { TRANS_LOG(WARN, "drop msg", K(msg_type), KPC(this)); return OB_SUCCESS; } switch (msg_type) { #define TX_MSG_HANDLER__(t, clz, func) \ case t: \ { \ clz msg; \ ObTransRpcResult rslt; \ OZ(msg.deserialize(buf, size, pos)); \ TRANS_LOG(TRACE, "handle_msg::", K(msg), KPC(this)); \ auto status = txs_.func(msg, rslt); \ rslt.status_ = status; \ OZ(fake_rpc_.send_msg_callback(sender, msg, rslt)); \ break; \ } TX_MSG_HANDLER__(TX_COMMIT, ObTxCommitMsg, handle_trans_commit_request); TX_MSG_HANDLER__(TX_COMMIT_RESP, ObTxCommitRespMsg, handle_trans_commit_response); TX_MSG_HANDLER__(TX_ABORT, ObTxAbortMsg, handle_trans_abort_request); TX_MSG_HANDLER__(KEEPALIVE, ObTxKeepaliveMsg, handle_trans_keepalive); TX_MSG_HANDLER__(KEEPALIVE_RESP, ObTxKeepaliveRespMsg, handle_trans_keepalive_response); TX_MSG_HANDLER__(ROLLBACK_SAVEPOINT_RESP, ObTxRollbackSPRespMsg, handle_sp_rollback_response); #undef TX_MSG_HANDLER__ case TX_FREE_ROUTE_CHECK_ALIVE: { ObTxFreeRouteCheckAliveMsg msg; OZ(msg.deserialize(buf, size, pos)); TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this)); OZ(txs_.tx_free_route_handle_check_alive(msg, OB_TRANS_CTX_NOT_EXIST)); break; } case TX_FREE_ROUTE_CHECK_ALIVE_RESP: { ObTxFreeRouteCheckAliveRespMsg msg; OZ(msg.deserialize(buf, size, pos)); TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this)); // can not handle by tx node, call extra handler if (extra_msg_handler_) { OZ(extra_msg_handler_(msg_type, &msg)); } break; } case TX_FREE_ROUTE_PUSH_STATE: { ObTxFreeRoutePushState msg; OZ(msg.deserialize(buf, size, pos)); TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this)); OZ(txs_.tx_free_route_handle_push_state(msg)); ObTxFreeRoutePushStateResp resp; resp.ret_ = ret; int64_t buf_len = resp.get_serialize_size(); char *buf = (char*)ob_malloc(buf_len, ObNewModIds::TEST); int64_t pos = 0; OZ(resp.serialize(buf, buf_len, pos)); pkt->resp_ = ObString(buf_len, buf); break; } case ROLLBACK_SAVEPOINT: { ObTxRollbackSPMsg msg; obrpc::ObTxRpcRollbackSPResult rslt; OZ(msg.deserialize(buf, size, pos)); TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this)); OZ(txs_.handle_sp_rollback_request(msg, rslt), msg); break; } case TX_2PC_PREPARE_REQ: case TX_2PC_PREPARE_RESP: case TX_2PC_PRE_COMMIT_REQ: case TX_2PC_PRE_COMMIT_RESP: case TX_2PC_COMMIT_REQ: case TX_2PC_COMMIT_RESP: case TX_2PC_ABORT_REQ: case TX_2PC_ABORT_RESP: case TX_2PC_CLEAR_REQ: case TX_2PC_CLEAR_RESP: OZ(txs_.handle_tx_batch_req(msg_type, buf + pos, size - pos, false)); break; default: ret = OB_NOT_SUPPORTED; } if (msg_info.is_sync_msg_) { pkt->resp_ready_ = true; pkt->cond_.signal(); } else { ob_free((void*)const_cast(pkt->body_.ptr())); delete pkt; } TRANS_LOG(INFO, "handle_msg done", K(ret), KPC(this)); return ret; } int ObTxNode::read(ObTxDesc &tx, const int64_t key, int64_t &value, const ObTxIsolationLevel iso) { int ret = OB_SUCCESS; ObTxReadSnapshot snapshot; OZ(get_read_snapshot(tx, iso, ts_after_ms(50), snapshot)); OZ(read(snapshot, key, value)); return ret; } int ObTxNode::read(const ObTxReadSnapshot &snapshot, const int64_t key, int64_t &value) { TRANS_LOG(INFO, "read", K(key), K(snapshot), KPC(this)); int ret = OB_SUCCESS; ObTenantEnv::set_tenant(&tenant_); ObStoreCtx read_store_ctx; read_store_ctx.ls_ = &fake_ls_; read_store_ctx.ls_id_ = ls_id_; OZ(txs_.get_read_store_ctx(snapshot, false, 5000ll * 1000, read_store_ctx)); // HACK, refine: mock LS's each member in some way read_store_ctx.mvcc_acc_ctx_.tx_table_guards_.tx_table_guard_.init(&fake_tx_table_); read_store_ctx.mvcc_acc_ctx_.abs_lock_timeout_ts_ = ObTimeUtility::current_time() + 5000ll * 1000; blocksstable::ObDatumRow row; { ObTableIterParam iter_param; ObArenaAllocator allocator; ObTableReadInfo read_info; const int64_t schema_version = 100; read_info.init(allocator, schema_version, 1, false, columns_, nullptr/*storage_cols_index*/); iter_param.table_id_ = 1; iter_param.tablet_id_ = 100; iter_param.read_info_ = &read_info; iter_param.out_cols_project_ = NULL; iter_param.agg_cols_project_ = NULL; iter_param.is_multi_version_minor_merge_ = false; iter_param.need_scn_ = true; iter_param.pushdown_filter_ = NULL; iter_param.vectorized_enabled_ = false; storage::ObTableAccessContext access_context; { access_context.is_inited_ = true; access_context.use_fuse_row_cache_ = false; access_context.need_scn_ = true; access_context.store_ctx_ = &read_store_ctx; access_context.query_flag_.read_latest_ = true; access_context.stmt_allocator_ = &allocator; access_context.allocator_ = &allocator; } ObDatumRowkey row_key; ObStorageDatum row_key_obj; ObObj key_obj; row_key_obj.set_int(key); key_obj.set_int(key); row_key.assign(&row_key_obj, 1); row_key.store_rowkey_.assign(&key_obj, 1); OZ(memtable_->get(iter_param, access_context, row_key, row)); STORAGE_LOG(INFO, "read_result", K(row), KPC(this)); } OZ(txs_.revert_store_ctx(read_store_ctx)); if (OB_SUCC(ret)) { if (row.row_flag_.is_exist()) { ObArenaAllocator allocator; blocksstable::ObNewRowBuilder new_row_builder; storage::ObStoreRow store_row; OZ(new_row_builder.init(columns_, allocator)); OZ(new_row_builder.build_store_row(row, store_row)); OX(value = store_row.row_val_.cells_[1].get_int()); } else if (row.row_flag_.is_not_exist()) { ret = OB_ENTRY_NOT_EXIST; } else { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected row result", K(ret), K(row.row_flag_), K(row), KPC(this)); } } return ret; } int ObTxNode::atomic_write(ObTxDesc &tx, const int64_t key, const int64_t value, const int64_t expire_ts, const ObTxParam &tx_param) { int ret = OB_SUCCESS; ObTxSEQ sp; OZ(create_implicit_savepoint(tx, tx_param, sp, true)); OZ(write(tx, key, value)); if (sp.is_valid() && OB_FAIL(ret)) { OZ(rollback_to_implicit_savepoint(tx, sp, expire_ts, nullptr)); } return ret; } int ObTxNode::write(ObTxDesc &tx, const int64_t key, const int64_t value, const int16_t branch) { int ret = OB_SUCCESS; ObTxReadSnapshot snapshot; OZ(get_read_snapshot(tx, tx.isolation_, ts_after_ms(50), snapshot)); OZ(write(tx, snapshot, key, value, branch)); return ret; } int ObTxNode::write(ObTxDesc &tx, const ObTxReadSnapshot &snapshot, const int64_t key, const int64_t value, const int16_t branch) { TRANS_LOG(INFO, "write", K(key), K(value), K(snapshot), K(tx), KPC(this)); int ret = OB_SUCCESS; const transaction::ObSerializeEncryptMeta *encrypt_meta = NULL; ObTenantEnv::set_tenant(&tenant_); ObStoreCtx write_store_ctx; auto iter = new ObTableStoreIterator(); iter->reset(); ObITable *mtb = memtable_; iter->add_table(mtb); write_store_ctx.ls_ = &fake_ls_; write_store_ctx.ls_id_ = ls_id_; write_store_ctx.table_iter_ = iter; write_store_ctx.branch_ = branch; concurrent_control::ObWriteFlag write_flag; OZ(txs_.get_write_store_ctx(tx, snapshot, write_flag, write_store_ctx)); write_store_ctx.mvcc_acc_ctx_.tx_table_guards_.tx_table_guard_.init(&fake_tx_table_); ObArenaAllocator allocator; ObDatumRow row; ObStorageDatum cols[2] = {ObStorageDatum(), ObStorageDatum()}; cols[0].set_int(key); cols[1].set_int(value); row.count_ = 2; row.storage_datums_ = cols; row.row_flag_ = blocksstable::ObDmlFlag::DF_UPDATE; row.trans_id_.reset(); ObTableIterParam param; ObTableAccessContext context; ObVersionRange trans_version_range; const bool read_latest = true; ObQueryFlag query_flag; ObTableReadInfo read_info; const int64_t schema_version = 100; read_info.init(allocator, 2, 1, false, columns_, nullptr/*storage_cols_index*/); trans_version_range.base_version_ = 0; trans_version_range.multi_version_start_ = 0; trans_version_range.snapshot_version_ = EXIST_READ_SNAPSHOT_VERSION; query_flag.use_row_cache_ = ObQueryFlag::DoNotUseCache; query_flag.read_latest_ = read_latest & ObQueryFlag::OBSF_MASK_READ_LATEST; param.table_id_ = 1; param.tablet_id_ = 1; param.read_info_ = &read_info; context.init(query_flag, write_store_ctx, allocator, trans_version_range); const ObMemtableSetArg arg(&row, &columns_, NULL, /*update_idx*/ NULL, /*old_row*/ 1, /*row_count*/ false /*check_exist*/, encrypt_meta); OZ(memtable_->set(param, context, arg)); int tmp_ret = OB_SUCCESS; if (OB_TMP_FAIL(txs_.revert_store_ctx(write_store_ctx))) { TRANS_LOG(WARN, "revert store ctx failed", KR(tmp_ret), K(write_store_ctx)); } delete iter; return ret; } int ObTxNode::write_begin(ObTxDesc &tx, const ObTxReadSnapshot &snapshot, ObStoreCtx& write_store_ctx) { int ret = OB_SUCCESS; ObTenantEnv::set_tenant(&tenant_); auto iter = new ObTableStoreIterator(); iter->reset(); ObITable *mtb = memtable_; iter->add_table(mtb); write_store_ctx.ls_id_ = ls_id_; write_store_ctx.ls_ = &fake_ls_; write_store_ctx.table_iter_ = iter; concurrent_control::ObWriteFlag write_flag; OZ(txs_.get_write_store_ctx(tx, snapshot, write_flag, write_store_ctx)); return ret; } int ObTxNode::write_one_row(ObStoreCtx& write_store_ctx, const int64_t key, const int64_t value) { int ret = OB_SUCCESS; ObTenantEnv::set_tenant(&tenant_); ObArenaAllocator allocator; ObTableReadInfo read_info; const transaction::ObSerializeEncryptMeta *encrypt_meta = NULL; const int64_t schema_version = 100; read_info.init(allocator, 2, 1, false, columns_, nullptr/*storage_cols_index*/); ObDatumRow row; ObStorageDatum cols[2] = {ObStorageDatum(), ObStorageDatum()}; cols[0].set_int(key); cols[1].set_int(value); row.row_flag_ = blocksstable::ObDmlFlag::DF_UPDATE; row.storage_datums_ = cols; row.count_ = 2; ObTableIterParam param; ObTableAccessContext context; ObVersionRange trans_version_range; const bool read_latest = true; ObQueryFlag query_flag; trans_version_range.base_version_ = 0; trans_version_range.multi_version_start_ = 0; trans_version_range.snapshot_version_ = EXIST_READ_SNAPSHOT_VERSION; query_flag.use_row_cache_ = ObQueryFlag::DoNotUseCache; query_flag.read_latest_ = read_latest & ObQueryFlag::OBSF_MASK_READ_LATEST; param.table_id_ = 1; param.tablet_id_ = 1; param.read_info_ = &read_info; OZ(context.init(query_flag, write_store_ctx, allocator, trans_version_range)); const ObMemtableSetArg arg(&row, &columns_, NULL, /*update_idx*/ NULL, /*old_row*/ 1, /*row_count*/ false /*check_exist*/, encrypt_meta); OZ(memtable_->set(param, context, arg)); return ret; } int ObTxNode::write_end(ObStoreCtx& write_store_ctx) { int ret = OB_SUCCESS; ObTenantEnv::set_tenant(&tenant_); delete write_store_ctx.table_iter_; write_store_ctx.table_iter_ = nullptr; OZ(txs_.revert_store_ctx(write_store_ctx)); return ret; } int ObTxNode::replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const int64_t ts_ns) { ObTenantEnv::set_tenant(&tenant_); int ret = OB_SUCCESS; logservice::ObLogBaseHeader base_header; int64_t tmp_pos = 0; const char *log_buf = static_cast(buffer); if (OB_FAIL(base_header.deserialize(log_buf, nbytes, tmp_pos))) { LOG_WARN("log base header deserialize error", K(ret)); } else { share::SCN log_scn; log_scn.convert_for_tx(ts_ns); ObFakeTxReplayExecutor executor(&fake_ls_, ls_id_, tenant_id_, fake_ls_.get_tx_svr(), lsn, log_scn, base_header); executor.set_memtable(memtable_); if (OB_FAIL(executor.execute(log_buf, nbytes, tmp_pos))) { LOG_WARN("replay tx log error", K(ret), K(lsn), K(ts_ns)); } else { LOG_INFO("replay tx log succ", K(ret), K(lsn), K(ts_ns)); } } return ret; } } // transaction } // oceanbase