[4.1] support distributed txn free route

This commit is contained in:
chinaxing
2023-01-28 16:07:17 +08:00
committed by ob-robot
parent 1cdf1dff1c
commit 30d0adec61
89 changed files with 3978 additions and 518 deletions

View File

@ -8,6 +8,7 @@ endfunction()
tx_it_test(test_tx)
tx_it_test(test_tx_free_route)
#tx_it_test(test_tx_perf)

View File

@ -42,45 +42,6 @@ int check_sequence_set_violation(const concurrent_control::ObWriteFlag ,
}
}
namespace common
{
void* ObGMemstoreAllocator::alloc(AllocHandle& handle, int64_t size)
{
int ret = OB_SUCCESS;
int64_t align_size = upper_align(size, sizeof(int64_t));
uint64_t tenant_id = arena_.get_tenant_id();
bool is_out_of_mem = false;
if (!handle.is_id_valid()) {
COMMON_LOG(TRACE, "MTALLOC.first_alloc", KP(&handle.mt_));
LockGuard guard(lock_);
if (!handle.is_id_valid()) {
handle.set_clock(arena_.retired());
hlist_.set_active(handle);
}
}
MTL_SWITCH(tenant_id) {
storage::ObTenantFreezer *freezer = nullptr;
if (handle.mt_.is_inner_tablet()) {
// inner table memory not limited by memstore
} else if (is_virtual_tenant_id(tenant_id)) {
// virtual tenant should not have memstore.
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "virtual tenant should not have memstore", K(ret), K(tenant_id));
} else if (FALSE_IT(freezer = MTL(storage::ObTenantFreezer*))) {
} else if (OB_FAIL(freezer->check_tenant_out_of_memstore_limit(is_out_of_mem))) {
COMMON_LOG(ERROR, "fail to check tenant out of mem limit", K(ret), K(tenant_id));
}
}
if (OB_FAIL(ret)) {
is_out_of_mem = true;
}
if (is_out_of_mem && REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
STORAGE_LOG(WARN, "this tenant is already out of memstore limit or some thing wrong.", K(tenant_id));
}
return is_out_of_mem ? nullptr : arena_.alloc(handle.id_, handle.arena_handle_, align_size);
}
}
class ObTestTx : public ::testing::Test
{
public:

File diff suppressed because it is too large Load Diff

View File

@ -22,6 +22,25 @@
} while(0);
namespace oceanbase {
namespace common
{
void* ObGMemstoreAllocator::alloc(AllocHandle& handle, int64_t size)
{
int ret = OB_SUCCESS;
int64_t align_size = upper_align(size, sizeof(int64_t));
uint64_t tenant_id = arena_.get_tenant_id();
bool is_out_of_mem = false;
if (!handle.is_id_valid()) {
COMMON_LOG(TRACE, "MTALLOC.first_alloc", KP(&handle.mt_));
LockGuard guard(lock_);
if (!handle.is_id_valid()) {
handle.set_clock(arena_.retired());
hlist_.set_active(handle);
}
}
return arena_.alloc(handle.id_, handle.arena_handle_, align_size);
}
}
namespace storage {
@ -58,13 +77,14 @@ ObTxNode::ObTxNode(const int64_t ls_id,
name_(name_buf_),
addr_(addr),
ls_id_(ls_id),
tenant_(1001),
tenant_id_(1001),
tenant_(tenant_id_),
memtable_(NULL),
msg_consumer_(ObString("TxNode"),
&msg_queue_,
std::bind(&ObTxNode::handle_msg_,
this, std::placeholders::_1)),
t3m_(1001),
t3m_(tenant_id_),
fake_rpc_(&msg_bus, addr, &get_location_adapter_()),
lock_memtable_(),
fake_tx_log_adapter_(nullptr)
@ -98,7 +118,8 @@ ObTxNode::ObTxNode(const int64_t ls_id,
&rpc_proxy_,
&schema_service_,
&server_tracer_));
memtable_ = create_memtable_(1001);
tenant_.set(&txs_);
OZ (create_memtable_(100000, memtable_));
{
ObColDesc col_desc;
col_desc.col_id_ = 1;
@ -157,6 +178,7 @@ struct MsgInfo {
int64_t recv_time_ = 0;
ObAddr sender_;
bool is_callback_msg_ = false;
bool is_sync_msg_ = false;
TxMsgCallbackMsg callback_msg_;
int16_t msg_type_;
int64_t size_;
@ -165,6 +187,7 @@ struct MsgInfo {
K_(recv_time),
K_(sender),
K_(is_callback_msg),
K_(is_sync_msg),
K_(msg_type),
K_(callback_msg),
K_(size));
@ -188,6 +211,7 @@ int get_msg_info(ObTxNode::MsgPack *pkt, MsgInfo& msg_info)
msg_info.buf_ = buf + pos;
msg_info.size_ = size - pos;
}
msg_info.is_sync_msg_ = pkt->is_sync_msg_;
msg_info.msg_ptr_ = (void*)pkt->body_.ptr();
msg_info.recv_time_ = pkt->recv_time_;
msg_info.sender_ = pkt->sender_;
@ -234,7 +258,8 @@ ObTxNode::~ObTxNode() __attribute__((optnone)) {
FAST_FAIL();
}
memtable::ObMemtable *ObTxNode::create_memtable_(const int64_t tablet_id) {
int ObTxNode::create_memtable_(const int64_t tablet_id, memtable::ObMemtable *&mt) {
int ret = OB_SUCCESS;
memtable::ObMemtable *t = new memtable::ObMemtable();
ObITable::TableKey table_key;
table_key.table_type_ = ObITable::DATA_MEMTABLE;
@ -243,13 +268,16 @@ memtable::ObMemtable *ObTxNode::create_memtable_(const int64_t tablet_id) {
table_key.scn_range_.end_scn_.set_max();
ObLSHandle ls_handle;
ls_handle.set_ls(fake_ls_map_, fake_ls_, ObLSGetMod::DATA_MEMTABLE_MOD);
t->init(table_key, ls_handle, &fake_freezer_, &fake_memtable_mgr_, 0, 0);
return t;
OZ (t->init(table_key, ls_handle, &fake_freezer_, &fake_memtable_mgr_, 0, 0));
if (OB_SUCC(ret)) {
mt = t;
} else { delete t; }
return ret;
}
int ObTxNode::create_ls_(const ObLSID ls_id) {
int ret = OB_SUCCESS;
OZ(txs_.tx_ctx_mgr_.create_ls(txs_.tenant_id_,
OZ(txs_.tx_ctx_mgr_.create_ls(tenant_id_,
ls_id,
&fake_tx_table_,
&fake_lock_table_,
@ -311,6 +339,25 @@ int ObTxNode::recv_msg(const ObAddr &sender, ObString &m)
return ret;
}
int ObTxNode::sync_recv_msg(const ObAddr &sender, ObString &m, ObString &resp)
{
TRANS_LOG(INFO, "sync_recv_msg", K(sender), "msg_ptr", OB_P(m.ptr()), KPC(this));
int ret = OB_SUCCESS;
auto pkt = new MsgPack(sender, m, true);
msg_queue_.push(pkt);
msg_consumer_.wakeup();
pkt->cond_.wait(pkt->cond_.get_key(), 500000);
if (pkt->resp_ready_) {
OX(resp = pkt->resp_);
} else {
ret = OB_TIMEOUT;
TRANS_LOG(ERROR, "wait resp timeout",K(ret));
}
ob_free((void*)const_cast<char*>(pkt->body_.ptr()));
delete pkt;
return ret;
}
int ObTxNode::handle_msg_(MsgPack *pkt)
{
ObTenantEnv::set_tenant(&tenant_);
@ -328,7 +375,7 @@ int ObTxNode::handle_msg_(MsgPack *pkt)
int16_t msg_type = msg_info.msg_type_;
if (OB_HASH_EXIST == drop_msg_type_set_.exist_refactored(msg_type)) {
TRANS_LOG(WARN, "drop msg", K(msg_type), KPC(this));
return 0;
return OB_SUCCESS;
}
switch (msg_type) {
#define TX_MSG_HANDLER__(t, clz, func) \
@ -337,7 +384,7 @@ int ObTxNode::handle_msg_(MsgPack *pkt)
clz msg; \
ObTransRpcResult rslt; \
OZ(msg.deserialize(buf, size, pos)); \
TRANS_LOG(TRACE, "handle_msg::", K(msg), KPC(this)); \
TRANS_LOG(TRACE, "handle_msg::", K(msg), KPC(this)); \
auto status = txs_.func(msg, rslt); \
rslt.status_ = status; \
OZ(fake_rpc_.send_msg_callback(sender, msg, rslt)); \
@ -349,6 +396,40 @@ int ObTxNode::handle_msg_(MsgPack *pkt)
TX_MSG_HANDLER__(KEEPALIVE, ObTxKeepaliveMsg, handle_trans_keepalive);
TX_MSG_HANDLER__(KEEPALIVE_RESP, ObTxKeepaliveRespMsg, handle_trans_keepalive_response);
#undef TX_MSG_HANDLER__
case TX_FREE_ROUTE_CHECK_ALIVE:
{
ObTxFreeRouteCheckAliveMsg msg;
OZ(msg.deserialize(buf, size, pos));
TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this));
OZ(txs_.tx_free_route_handle_check_alive(msg));
break;
}
case TX_FREE_ROUTE_CHECK_ALIVE_RESP:
{
ObTxFreeRouteCheckAliveRespMsg msg;
OZ(msg.deserialize(buf, size, pos));
TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this));
// can not handle by tx node, call extra handler
if (extra_msg_handler_) {
OZ(extra_msg_handler_(msg_type, &msg));
}
break;
}
case TX_FREE_ROUTE_PUSH_STATE:
{
ObTxFreeRoutePushState msg;
OZ(msg.deserialize(buf, size, pos));
TRANS_LOG(TRACE, "handle_msg", K(msg), KPC(this));
OZ(txs_.tx_free_route_handle_push_state(msg));
ObTxFreeRoutePushStateResp resp;
resp.ret_ = ret;
int64_t buf_len = resp.get_serialize_size();
char *buf = (char*)ob_malloc(buf_len);
int64_t pos = 0;
OZ(resp.serialize(buf, buf_len, pos));
pkt->resp_ = ObString(buf_len, buf);
break;
}
case ROLLBACK_SAVEPOINT:
{
ObTxRollbackSPMsg msg;
@ -374,18 +455,23 @@ int ObTxNode::handle_msg_(MsgPack *pkt)
default:
ret = OB_NOT_SUPPORTED;
}
delete pkt;
ob_free((void*)const_cast<char*>(pkt->body_.ptr()));
if (msg_info.is_sync_msg_) {
pkt->resp_ready_ = true;
pkt->cond_.signal();
} else {
ob_free((void*)const_cast<char*>(pkt->body_.ptr()));
delete pkt;
}
TRANS_LOG(INFO, "handle_msg done", K(ret), KPC(this));
return ret;
}
int ObTxNode::read(ObTxDesc &tx, const int64_t key, int64_t &value)
int ObTxNode::read(ObTxDesc &tx, const int64_t key, int64_t &value, const ObTxIsolationLevel iso)
{
int ret = OB_SUCCESS;
ObTxReadSnapshot snapshot;
OZ(get_read_snapshot(tx,
tx.isolation_,
iso,
ts_after_ms(5),
snapshot));
return read(snapshot, key, value);
@ -457,6 +543,19 @@ int ObTxNode::read(const ObTxReadSnapshot &snapshot,
}
return ret;
}
int ObTxNode::atomic_write(ObTxDesc &tx, const int64_t key, const int64_t value,
const int64_t expire_ts, const ObTxParam &tx_param)
{
int ret = OB_SUCCESS;
int64_t sp = -1;
OZ(create_implicit_savepoint(tx, tx_param, sp, true));
OZ(write(tx, key, value));
if (sp != -1 && OB_FAIL(ret)) {
OZ(rollback_to_implicit_savepoint(tx, sp, expire_ts, nullptr));
}
return ret;
}
int ObTxNode::write(ObTxDesc &tx, const int64_t key, const int64_t value)
{
int ret = OB_SUCCESS;
@ -574,13 +673,12 @@ int ObTxNode::replay(const void *buffer,
LOG_WARN("log base header deserialize error", K(ret));
} else if (OB_FAIL(ObFakeTxReplayExecutor::execute(&mock_ls_, mock_ls_.get_tx_svr(), log_buf, nbytes,
tmp_pos, lsn, ts_ns, base_header.get_replay_hint(),
ls_id_, 1001, memtable_))) {
ls_id_, tenant_id_, memtable_))) {
LOG_WARN("replay tx log error", K(ret), K(lsn), K(ts_ns));
} else {
LOG_INFO("replay tx log succ", K(ret), K(lsn), K(ts_ns));
}
return ret;
}
} // transaction
} // oceanbase

View File

@ -130,7 +130,7 @@ public:
TO_STRING_KV(KP(this), K(addr_), K_(ls_id));
ObTxDescGuard get_tx_guard();
// the simple r/w interface
int read(ObTxDesc &tx, const int64_t key, int64_t &value);
int read(ObTxDesc &tx, const int64_t key, int64_t &value, const ObTxIsolationLevel iso = ObTxIsolationLevel::RC);
int read(const ObTxReadSnapshot &snapshot,
const int64_t key,
int64_t &value);
@ -139,7 +139,8 @@ public:
const ObTxReadSnapshot &snapshot,
const int64_t key,
const int64_t value);
int atomic_write(ObTxDesc &tx, const int64_t key, const int64_t value,
const int64_t expire_ts, const ObTxParam &tx_param);
int replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const int64_t ts_ns);
int write_begin(ObTxDesc &tx, const ObTxReadSnapshot &snapshot, ObStoreCtx& write_store_ctx);
@ -165,8 +166,20 @@ public:
DELEGATE_TENANT_WITH_RET(txs_, create_implicit_savepoint, int);
DELEGATE_TENANT_WITH_RET(txs_, create_explicit_savepoint, int);
DELEGATE_TENANT_WITH_RET(txs_, rollback_to_explicit_savepoint, int);
DELEGATE_TENANT_WITH_RET(txs_, release_explicit_savepoint, int);
DELEGATE_TENANT_WITH_RET(txs_, rollback_to_implicit_savepoint, int);
DELEGATE_TENANT_WITH_RET(txs_, interrupt, int);
// tx free route
DELEGATE_TENANT_WITH_RET(txs_, calc_txn_free_route, int);
#define DELEGATE_X__(t) \
DELEGATE_TENANT_WITH_RET(txs_, txn_free_route__update_##t##_state, int); \
DELEGATE_TENANT_WITH_RET(txs_, txn_free_route__serialize_##t##_state, int); \
DELEGATE_TENANT_WITH_RET(txs_, txn_free_route__get_##t##_state_serialize_size, int64_t);
#define DELEGATE_X_(t) DELEGATE_X__(t)
LST_DO(DELEGATE_X_, (), static, dynamic, parts, extra);
DELEGATE_TENANT_WITH_RET(txs_, tx_free_route_check_alive, int);
#undef DELEGATE_X_
#undef DELEGATE_X__
#undef DELEGATE_TENANT_WITH_RET
int get_tx_ctx(const share::ObLSID &ls_id, const ObTransID &tx_id, ObPartTransCtx *&ctx) {
return txs_.tx_ctx_mgr_.get_tx_ctx(ls_id, tx_id, false, ctx);
@ -174,16 +187,22 @@ public:
int revert_tx_ctx(ObPartTransCtx *ctx) { return txs_.tx_ctx_mgr_.revert_tx_ctx(ctx); }
public:
struct MsgPack : ObLink {
MsgPack(const ObAddr &addr, ObString &body)
: recv_time_(ObTimeUtility::current_time()), sender_(addr), body_(body){}
MsgPack(const ObAddr &addr, ObString &body, bool is_sync_msg = false)
: recv_time_(ObTimeUtility::current_time()), sender_(addr),
body_(body), is_sync_msg_(is_sync_msg), resp_ready_(false), resp_(), cond_() {}
int64_t recv_time_;
ObAddr sender_;
ObString body_;
bool is_sync_msg_;
bool resp_ready_;
ObString resp_;
common::SimpleCond cond_; //used to synchronize process-thread and io-thread
};
int recv_msg(const ObAddr &sender, ObString &msg);
int sync_recv_msg(const ObAddr &sender, ObString &msg, ObString &resp);
int handle_msg_(MsgPack *pkt);
private:
memtable::ObMemtable *create_memtable_(const int64_t tablet_id);
int create_memtable_(const int64_t tablet_id, memtable::ObMemtable *& mt);
int create_ls_(const ObLSID ls_id);
int drop_ls_(const ObLSID ls_id);
int recv_msg_callback_(TxMsgCallbackMsg &msg);
@ -244,6 +263,7 @@ public:
ObString name_; char name_buf_[32];
ObAddr addr_;
ObLSID ls_id_;
int64_t tenant_id_;
ObTenantBase tenant_;
ObTransService txs_;
memtable::ObMemtable *memtable_;
@ -272,8 +292,10 @@ public:
storage::ObLS mock_ls_; // TODO mock required member on LS
common::hash::ObHashSet<int16_t> drop_msg_type_set_;
ObLSMap fake_ls_map_;
std::function<int(int,void *)> extra_msg_handler_;
};
} // transaction
} // oceanbase
#endif //OCEANBASE_TRANSACTION_TEST_TX_NODE_DEFINE_

View File

@ -20,6 +20,7 @@ namespace common {
class MsgEndPoint {
public:
virtual int recv_msg(const ObAddr &sender, ObString &msg) = 0;
virtual int sync_recv_msg(const ObAddr &sender, ObString &msg, ObString &resp) = 0;
};
class MsgBus {
struct ObAddrPair {
@ -61,13 +62,28 @@ public:
if (OB_SUCC(ret)) {
ObAddrPair pair(sender, recv);
if (OB_HASH_EXIST == fail_links_.exist_refactored(pair)) {
TRANS_LOG(WARN, "link failure, msg lost", K(sender), K(recv));
TRANS_LOG(WARN, "[msg bus] link failure, msg lost", K(sender), K(recv));
} else {
OZ(e->recv_msg(sender, msg));
}
}
return ret;
}
int sync_send_msg(ObString &msg, const ObAddr &sender, const ObAddr &recv, ObString &resp)
{
int ret = OB_SUCCESS;
MsgEndPoint *e = NULL;
OZ(route_table_.get_refactored(recv, e));
if (OB_SUCC(ret)) {
ObAddrPair pair(sender, recv);
if (OB_HASH_EXIST == fail_links_.exist_refactored(pair)) {
TRANS_LOG(WARN, "[msg bus] link failure, msg lost", K(sender), K(recv));
} else {
OZ(e->sync_recv_msg(sender, msg, resp));
}
}
return ret;
}
int inject_link_failure(const ObAddr &from, const ObAddr &to)
{
int ret = OB_SUCCESS;

View File

@ -17,6 +17,7 @@
#include "storage/tx/ob_trans_rpc.h"
#include "storage/tx/ob_location_adapter.h"
#include "storage/tx/ob_tx_log_adapter.h"
#include "storage/tx/ob_tx_free_route.h"
namespace oceanbase {
using namespace share;
namespace transaction {
@ -92,6 +93,7 @@ public:
obrpc::ObTxRpcRollbackSPResult sp_rollback_result_;
ObTransRpcResult tx_rpc_result_;
};
class ObFakeTransRpc : public ObITransRpc {
public:
ObFakeTransRpc(MsgBus *msg_bus,
@ -107,7 +109,7 @@ public:
int ret = OB_SUCCESS;
int64_t size = msg.get_serialize_size() + 1 /*for msg category*/ + sizeof(int16_t) /* for tx_msg.type_ */;
char *buf = (char*)ob_malloc(size);
buf[0] = 0; // 1 : ObTxMsg
buf[0] = 0; // 0 not callback msg
int64_t pos = 1;
int16_t msg_type = msg.type_;
OZ(serialization::encode(buf, size, pos, msg_type));
@ -125,6 +127,41 @@ public:
OZ(post_msg(leader, msg));
return ret;
}
int post_msg(const ObAddr &server, const ObTxFreeRouteMsg &msg) override
{
int ret = OB_SUCCESS;
int64_t size = msg.get_serialize_size() + 1 /*for msg category*/ + sizeof(int16_t) /* for tx_msg.type_ */;
char *buf = (char*)ob_malloc(size);
buf[0] = 0; // not callback msg
int64_t pos = 1;
int16_t msg_type = msg.type_;
OZ(serialization::encode(buf, size, pos, msg_type));
OZ(msg.serialize(buf, size, pos));
ObString msg_str(size, buf);
TRANS_LOG(INFO, "post_tx_free_route_msg", "msg_ptr", OB_P(buf), K(msg), "receiver", server);
OZ(msg_bus_->send_msg(msg_str, addr_, server));
return ret;
}
int sync_access(const ObAddr &server,
const ObTxFreeRoutePushState &msg,
ObTxFreeRoutePushStateResp &result) override
{
int ret = OB_SUCCESS;
int64_t size = msg.get_serialize_size() + 1 + sizeof(int16_t);
char *buf = (char*)ob_malloc(size);
buf[0] = 0; // not callback msg
int64_t pos = 1;
int16_t msg_type = TX_MSG_TYPE::TX_FREE_ROUTE_PUSH_STATE;
OZ(serialization::encode(buf, size, pos, msg_type));
OZ(msg.serialize(buf, size, pos));
ObString msg_str(size, buf);
TRANS_LOG(INFO, "sync send tx push_state", "msg_ptr", OB_P(buf), K(msg), "receiver", server);
ObString resp;
OZ(msg_bus_->sync_send_msg(msg_str, addr_, server, resp));
pos = 0;
OZ(result.deserialize(resp.ptr(), resp.length(), pos));
return ret;
}
template<class MSG_RESULT_T>
int send_msg_callback(const ObAddr &recv,
const ObTxMsg &msg,