misc refinement
This commit is contained in:
@ -12,6 +12,7 @@
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <thread>
|
||||
#include <sys/resource.h>
|
||||
#include <functional>
|
||||
#define private public
|
||||
#define protected public
|
||||
@ -205,7 +206,8 @@ class MockObServer {
|
||||
public:
|
||||
MockObServer(const int64_t ls_id, const char*addr, const int32_t port, MsgBus &msg_bus)
|
||||
: addr_(ObAddr(ObAddr::VER::IPV4, addr, port)),
|
||||
tx_node_(ls_id, addr_, msg_bus),
|
||||
tx_node_ptr_(new ObTxNode(ls_id, addr_, msg_bus)),
|
||||
tx_node_(*tx_node_ptr_),
|
||||
allocator_(), session_()
|
||||
{
|
||||
session_.test_init(1, 1111, 2222, &allocator_);
|
||||
@ -218,6 +220,7 @@ public:
|
||||
tx_node_.release_tx(*session_.get_tx_desc());
|
||||
session_.get_tx_desc() = NULL;
|
||||
}
|
||||
delete tx_node_ptr_;
|
||||
}
|
||||
int start() { return tx_node_.start(); }
|
||||
public:
|
||||
@ -261,7 +264,8 @@ private:
|
||||
int handle_msg_(int type, void *msg);
|
||||
int handle_msg_check_alive_resp__(ObTxFreeRouteCheckAliveRespMsg *msg);
|
||||
ObAddr addr_;
|
||||
ObTxNode tx_node_;
|
||||
ObTxNode *tx_node_ptr_;
|
||||
ObTxNode &tx_node_;
|
||||
ObMalloc allocator_;
|
||||
sql::ObSQLSessionInfo session_;
|
||||
TO_STRING_KV(K_(tx_node), K_(session));
|
||||
@ -657,6 +661,9 @@ public:
|
||||
const testing::TestInfo* const test_info =
|
||||
testing::UnitTest::GetInstance()->current_test_info();
|
||||
MTL_MEM_ALLOC_MGR.init();
|
||||
struct rlimit rl;
|
||||
getrlimit(RLIMIT_STACK, &rl);
|
||||
setrlimit(20*1024*1024, &rl);
|
||||
auto test_name = test_info->name();
|
||||
_TRANS_LOG(INFO, ">>>> starting test : %s", test_name);
|
||||
}
|
||||
|
||||
@ -78,7 +78,25 @@ int ObTxDescGuard::release() {
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
ObString ObTxNode::get_identifer_str() const
|
||||
{
|
||||
struct ID {
|
||||
ObAddr addr;
|
||||
int64_t ls_id;
|
||||
DECLARE_TO_STRING {
|
||||
int64_t pos = 0;
|
||||
int32_t pos0 = 0;
|
||||
addr.addr_to_buffer(buf, buf_len, pos0);
|
||||
pos += pos0;
|
||||
BUF_PRINTF("_%ld", ls_id);
|
||||
return pos;
|
||||
}
|
||||
} identifer = {
|
||||
.addr = addr_,
|
||||
.ls_id = ls_id_.id()
|
||||
};
|
||||
return ObString(to_cstring(&identifer));
|
||||
}
|
||||
ObTxNode::ObTxNode(const int64_t ls_id,
|
||||
const ObAddr &addr,
|
||||
MsgBus &msg_bus) :
|
||||
@ -86,10 +104,10 @@ ObTxNode::ObTxNode(const int64_t ls_id,
|
||||
addr_(addr),
|
||||
ls_id_(ls_id),
|
||||
tenant_id_(1001),
|
||||
tenant_(tenant_id_),
|
||||
tenant_(tenant_id_, 10, *GCTX.cgroup_ctrl_),
|
||||
fake_part_trans_ctx_pool_(1001, false, false, 4),
|
||||
memtable_(NULL),
|
||||
msg_consumer_(ObString("TxNode"),
|
||||
msg_consumer_(get_identifer_str(),
|
||||
&msg_queue_,
|
||||
std::bind(&ObTxNode::handle_msg_,
|
||||
this, std::placeholders::_1)),
|
||||
@ -102,8 +120,10 @@ ObTxNode::ObTxNode(const int64_t ls_id,
|
||||
addr.to_string(name_buf_, sizeof(name_buf_));
|
||||
msg_consumer_.set_name(name_);
|
||||
role_ = Leader;
|
||||
tenant_.enable_tenant_ctx_check_ = false;
|
||||
tenant_.set(&fake_tenant_freezer_);
|
||||
tenant_.set(&fake_part_trans_ctx_pool_);
|
||||
tenant_.start();
|
||||
ObTenantEnv::set_tenant(&tenant_);
|
||||
ObTableHandleV2 lock_memtable_handle;
|
||||
lock_memtable_handle.set_table(&lock_memtable_, &t3m_, ObITable::LOCK_MEMTABLE);
|
||||
@ -237,7 +257,7 @@ void ObTxNode::dump_msg_queue_()
|
||||
int ret = OB_SUCCESS;
|
||||
MsgPack *msg = NULL;
|
||||
int i = 0;
|
||||
while(OB_NOT_NULL((msg = (MsgPack*)msg_queue_.pop()))) {
|
||||
while(OB_SUCC(msg_queue_.pop((ObLink*&)msg))) {
|
||||
++i;
|
||||
MsgInfo msg_info;
|
||||
OZ (get_msg_info(msg, msg_info));
|
||||
@ -350,7 +370,7 @@ int ObTxNode::recv_msg(const ObAddr &sender, ObString &m)
|
||||
TRANS_LOG(INFO, "recv_msg", K(sender), "msg_ptr", OB_P(m.ptr()), KPC(this));
|
||||
int ret = OB_SUCCESS;
|
||||
auto pkt = new MsgPack(sender, m);
|
||||
msg_queue_.push(pkt);
|
||||
OZ(msg_queue_.push(pkt));
|
||||
msg_consumer_.wakeup();
|
||||
return ret;
|
||||
}
|
||||
@ -360,7 +380,7 @@ int ObTxNode::sync_recv_msg(const ObAddr &sender, ObString &m, ObString &resp)
|
||||
TRANS_LOG(INFO, "sync_recv_msg", K(sender), "msg_ptr", OB_P(m.ptr()), KPC(this));
|
||||
int ret = OB_SUCCESS;
|
||||
auto pkt = new MsgPack(sender, m, true);
|
||||
msg_queue_.push(pkt);
|
||||
OZ(msg_queue_.push(pkt));
|
||||
msg_consumer_.wakeup();
|
||||
pkt->cond_.wait(pkt->cond_.get_key(), 500000);
|
||||
if (pkt->resp_ready_) {
|
||||
|
||||
@ -19,6 +19,7 @@
|
||||
#include "storage/tx/ob_trans_service.h"
|
||||
#include "storage/tx/ob_trans_part_ctx.h"
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "observer/omt/ob_tenant.h"
|
||||
#include "share/ob_alive_server_tracer.h"
|
||||
#include "storage/tablelock/ob_lock_memtable.h"
|
||||
#include "storage/ls/ob_ls_tx_service.h"
|
||||
@ -47,7 +48,7 @@ class QueueConsumer : public share::ObThreadPool
|
||||
{
|
||||
public:
|
||||
QueueConsumer(ObString name,
|
||||
ObSpScLinkQueue *q,
|
||||
ObLinkQueue *q,
|
||||
std::function<int(T*)> func):
|
||||
name_(name), queue_(q), func_(func), cond_() {}
|
||||
virtual int start() {
|
||||
@ -65,8 +66,14 @@ public:
|
||||
ObThreadPool::stop();
|
||||
}
|
||||
void run1() {
|
||||
{
|
||||
char name_buf[128];
|
||||
snprintf(name_buf, 128, "QConsumer:%s", name_.ptr());
|
||||
set_thread_name(name_buf);
|
||||
}
|
||||
while(!stop_) {
|
||||
ObLink *e = queue_->pop();
|
||||
ObLink *e = NULL;
|
||||
queue_->pop(e);
|
||||
if (e) {
|
||||
T *t = static_cast<T*>(e);
|
||||
func_(t);
|
||||
@ -80,11 +87,11 @@ public:
|
||||
}
|
||||
void wakeup() { if (ATOMIC_BCAS(&is_sleeping_, true, false)) { cond_.signal(); } }
|
||||
void set_name(ObString &name) { name_ = name; }
|
||||
TO_STRING_KV(KP(this), K_(name), KP_(queue), K(queue_->empty()), K_(stop));
|
||||
TO_STRING_KV(KP(this), K_(name), KP_(queue), K(queue_->size()), K_(stop));
|
||||
private:
|
||||
ObString name_;
|
||||
bool stop_;
|
||||
ObSpScLinkQueue *queue_;
|
||||
ObLinkQueue *queue_;
|
||||
std::function<int(T*)> func_;
|
||||
common::SimpleCond cond_;
|
||||
bool is_sleeping_ = false;
|
||||
@ -131,7 +138,8 @@ public:
|
||||
}
|
||||
|
||||
public:
|
||||
TO_STRING_KV(KP(this), K(addr_), K_(ls_id));
|
||||
TO_STRING_KV(KP(this), K(addr_), K_(ls_id), K(msg_queue_.size()));
|
||||
ObString get_identifer_str() const;
|
||||
ObTxDescGuard get_tx_guard();
|
||||
// the simple r/w interface
|
||||
int read(ObTxDesc &tx, const int64_t key, int64_t &value, const ObTxIsolationLevel iso = ObTxIsolationLevel::RC);
|
||||
@ -271,13 +279,13 @@ public:
|
||||
ObAddr addr_;
|
||||
ObLSID ls_id_;
|
||||
int64_t tenant_id_;
|
||||
ObTenantBase tenant_;
|
||||
omt::ObTenant tenant_;
|
||||
common::ObServerObjectPool<ObPartTransCtx> fake_part_trans_ctx_pool_;
|
||||
ObTransService txs_;
|
||||
memtable::ObMemtable *memtable_;
|
||||
ObSEArray<ObColDesc, 2> columns_;
|
||||
// msg_handler
|
||||
ObSpScLinkQueue msg_queue_;
|
||||
ObLinkQueue msg_queue_;
|
||||
QueueConsumer<MsgPack> msg_consumer_;
|
||||
// fake objects
|
||||
storage::ObTenantMetaMemMgr t3m_;
|
||||
|
||||
Reference in New Issue
Block a user