misc refinement
This commit is contained in:
parent
a7409a1184
commit
056e47167d
1
deps/oblib/src/lib/ob_name_def.h
vendored
1
deps/oblib/src/lib/ob_name_def.h
vendored
@ -625,6 +625,7 @@
|
||||
#define N_JOIN_INFO "join_info"
|
||||
#define N_ID_SET "id_set"
|
||||
#define N_CURRENT_SCN "current_scn"
|
||||
#define N_TRANSACTION_ID "ob_transaction_id"
|
||||
#define N_EQUAL_SET "equal_set"
|
||||
#define N_LEFT_ID "left_id"
|
||||
#define N_RIGHT_ID "right_id"
|
||||
|
2
deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp
vendored
2
deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp
vendored
@ -844,7 +844,7 @@ public:
|
||||
LOG_TRACE("revert_sock: sock still readable", K(*s));
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(handler_.on_readable(s->sess_))) {
|
||||
LOG_TRACE("push to omt queue fail", K(ret), K(*s));
|
||||
LOG_WARN("push to omt queue fail, will close socket", K(ret), K(*s));
|
||||
push_close_req(s);
|
||||
s->disable_may_handling_flag();
|
||||
}
|
||||
|
@ -131,17 +131,17 @@ int ObSqlSockHandler::on_readable(void* udata)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_ERROR("sess is null!", K(ret));
|
||||
} else if (OB_FAIL(sock_processor_.decode_sql_packet(sess->pool_, *sess, NULL, pkt))) {
|
||||
LOG_WARN("decode sql req fail", K(ret));
|
||||
LOG_WARN("decode sql req fail", K(ret), K(sess->sql_session_id_));
|
||||
} else if (NULL == pkt) {
|
||||
sess->revert_sock();
|
||||
} else if (OB_FAIL(sock_processor_.build_sql_req(*sess, pkt, sql_req))) {
|
||||
LOG_WARN("build sql req fail", K(ret));
|
||||
LOG_WARN("build sql req fail", K(ret), K(sess->sql_session_id_));
|
||||
}
|
||||
|
||||
if (OB_SUCCESS != ret || NULL == sql_req) {
|
||||
} else if (FALSE_IT(sess->set_last_decode_succ_and_deliver_time(ObTimeUtility::current_time()))) {
|
||||
} else if (OB_FAIL(deliver_->deliver(*sql_req))) {
|
||||
LOG_WARN("deliver sql request fail", K(ret));
|
||||
LOG_WARN("deliver sql request fail", K(ret), K(sess->sql_session_id_));
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -861,6 +861,7 @@ typedef enum ObItemType
|
||||
T_FUN_SYS_DOC_ID = 1810,
|
||||
T_FUN_SYS_WORD_COUNT = 1811,
|
||||
T_FUN_SYS_BM25 = 1812,
|
||||
T_FUN_SYS_TRANSACTION_ID = 1813,
|
||||
T_FUN_SYS_END = 2000,
|
||||
T_FUN_SYS_ALIGN_DATE4CMP = 2010,
|
||||
|
||||
|
@ -688,6 +688,7 @@ ob_set_subtarget(ob_sql engine_expr
|
||||
engine/expr/ob_expr_temp_table_ssid.cpp
|
||||
engine/expr/vector_cast/vector_cast.cpp
|
||||
engine/expr/ob_expr_extract_cert_expired_time.cpp
|
||||
engine/expr/ob_expr_transaction_id.cpp
|
||||
)
|
||||
|
||||
ob_set_subtarget(ob_sql engine_join
|
||||
|
@ -338,6 +338,7 @@
|
||||
#include "ob_expr_between.h"
|
||||
#include "ob_expr_align_date4cmp.h"
|
||||
#include "ob_expr_extract_cert_expired_time.h"
|
||||
#include "ob_expr_transaction_id.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -1109,7 +1110,8 @@ static ObExpr::EvalFunc g_expr_eval_functions[] = {
|
||||
NULL, // ObExprDocID::generate_doc_id, /* 667 */
|
||||
NULL, // ObExprWordSegment::generate_fulltext_column, /* 668 */
|
||||
NULL, // ObExprWordCount::generate_word_count, /* 669 */
|
||||
NULL, // ObExprBM25::eval_bm25_relevance_expr, /* 690 */
|
||||
NULL, // ObExprBM25::eval_bm25_relevance_expr, /* 670 */
|
||||
ObExprTransactionId::eval_transaction_id, /* 671 */
|
||||
};
|
||||
|
||||
static ObExpr::EvalBatchFunc g_expr_eval_batch_functions[] = {
|
||||
|
@ -404,6 +404,7 @@
|
||||
#include "sql/engine/expr/ob_expr_temp_table_ssid.h"
|
||||
#include "sql/engine/expr/ob_expr_align_date4cmp.h"
|
||||
#include "sql/engine/expr/ob_expr_extract_cert_expired_time.h"
|
||||
#include "sql/engine/expr/ob_expr_transaction_id.h"
|
||||
|
||||
using namespace oceanbase::common;
|
||||
namespace oceanbase
|
||||
@ -1001,6 +1002,7 @@ void ObExprOperatorFactory::register_expr_operators()
|
||||
REG_OP(ObExprPrefixPattern);
|
||||
REG_OP(ObExprAlignDate4Cmp);
|
||||
REG_OP(ObExprExtractExpiredTime);
|
||||
REG_OP(ObExprTransactionId);
|
||||
}();
|
||||
// 注册oracle系统函数
|
||||
REG_OP_ORCL(ObExprSysConnectByPath);
|
||||
@ -1312,6 +1314,7 @@ void ObExprOperatorFactory::register_expr_operators()
|
||||
REG_OP_ORCL(ObExprUpdateXml);
|
||||
REG_OP_ORCL(ObExprTempTableSSID);
|
||||
REG_OP_ORCL(ObExprJsonObjectStar);
|
||||
REG_OP_ORCL(ObExprTransactionId);
|
||||
}
|
||||
|
||||
bool ObExprOperatorFactory::is_expr_op_type_valid(ObExprOperatorType type)
|
||||
|
91
src/sql/engine/expr/ob_expr_transaction_id.cpp
Normal file
91
src/sql/engine/expr/ob_expr_transaction_id.cpp
Normal file
@ -0,0 +1,91 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#define USING_LOG_PREFIX SQL_ENG
|
||||
#include "sql/engine/expr/ob_expr_transaction_id.h"
|
||||
#include "sql/engine/ob_exec_context.h"
|
||||
#include "sql/session/ob_sql_session_info.h"
|
||||
#include "sql/engine/ob_exec_context.h"
|
||||
|
||||
using namespace oceanbase::common;
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace sql
|
||||
{
|
||||
|
||||
|
||||
ObExprTransactionId::ObExprTransactionId(ObIAllocator &alloc)
|
||||
: ObFuncExprOperator(alloc, T_FUN_SYS_TRANSACTION_ID, N_TRANSACTION_ID, 0, NOT_VALID_FOR_GENERATED_COL, NOT_ROW_DIMENSION)
|
||||
{
|
||||
}
|
||||
|
||||
ObExprTransactionId::~ObExprTransactionId()
|
||||
{
|
||||
}
|
||||
|
||||
int ObExprTransactionId::calc_result_type0(ObExprResType &type, ObExprTypeCtx &type_ctx) const
|
||||
{
|
||||
UNUSED(type_ctx);
|
||||
if (is_oracle_mode()) {
|
||||
const ObAccuracy &acc = ObAccuracy::DDL_DEFAULT_ACCURACY2[common::ORACLE_MODE][common::ObNumberType];
|
||||
type.set_number();
|
||||
type.set_scale(acc.get_scale());
|
||||
type.set_precision(acc.get_precision());
|
||||
} else {
|
||||
const ObAccuracy &acc = common::ObAccuracy::DDL_DEFAULT_ACCURACY[common::ObUInt64Type];
|
||||
type.set_uint64();
|
||||
type.set_scale(acc.get_scale());
|
||||
type.set_precision(acc.get_precision());
|
||||
type.set_result_flag(NOT_NULL_FLAG);
|
||||
}
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
|
||||
int ObExprTransactionId::eval_transaction_id(const ObExpr &expr, ObEvalCtx &ctx,
|
||||
ObDatum &expr_datum)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
UNUSED(expr);
|
||||
const ObSQLSessionInfo *session_info = NULL;
|
||||
if (OB_ISNULL(session_info = ctx.exec_ctx_.get_my_session())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("session info is null", K(ret));
|
||||
} else {
|
||||
const transaction::ObTxDesc *txdesc = session_info->get_tx_desc();
|
||||
const int64_t tx_id = txdesc ? txdesc->get_tx_id().get_id() : 0;
|
||||
if (ObUInt64Type == expr.datum_meta_.type_) { // mysql mode
|
||||
expr_datum.set_uint(tx_id);
|
||||
} else { // oracle mode
|
||||
ObNumStackOnceAlloc tmp_alloc;
|
||||
number::ObNumber num;
|
||||
if (OB_FAIL(num.from(tx_id, tmp_alloc))) {
|
||||
LOG_WARN("copy number fail", K(ret));
|
||||
} else {
|
||||
expr_datum.set_number(num);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExprTransactionId::cg_expr(ObExprCGCtx &op_cg_ctx, const ObRawExpr &raw_expr,
|
||||
ObExpr &rt_expr) const
|
||||
{
|
||||
UNUSED(raw_expr);
|
||||
UNUSED(op_cg_ctx);
|
||||
rt_expr.eval_func_ = ObExprTransactionId::eval_transaction_id;
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
}/* ns sql*/
|
||||
}/* ns oceanbase */
|
38
src/sql/engine/expr/ob_expr_transaction_id.h
Normal file
38
src/sql/engine/expr/ob_expr_transaction_id.h
Normal file
@ -0,0 +1,38 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef OCEANBASE_SQL_ENGINE_EXPR_OB_EXPR_TRANSACTION_ID_
|
||||
#define OCEANBASE_SQL_ENGINE_EXPR_OB_EXPR_TRANSACTION_ID_
|
||||
|
||||
#include "sql/engine/expr/ob_expr_operator.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace sql
|
||||
{
|
||||
class ObExprTransactionId : public ObFuncExprOperator
|
||||
{
|
||||
public:
|
||||
explicit ObExprTransactionId(common::ObIAllocator &alloc);
|
||||
virtual ~ObExprTransactionId();
|
||||
virtual int calc_result_type0(ObExprResType &type, common::ObExprTypeCtx &type_ctx) const;
|
||||
static int eval_transaction_id(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &expr_datum);
|
||||
virtual int cg_expr(ObExprCGCtx &op_cg_ctx,
|
||||
const ObRawExpr &raw_expr,
|
||||
ObExpr &rt_expr) const override;
|
||||
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObExprTransactionId);
|
||||
};
|
||||
}//sql
|
||||
}//oceanbase
|
||||
#endif /* OCEANBASE_SQL_ENGINE_EXPR_OB_EXPR_TRANSACTION_ID_ */
|
@ -347,7 +347,11 @@ int ObSqlTransControl::kill_tx(ObSQLSessionInfo *session, int cause)
|
||||
int ret = OB_SUCCESS;
|
||||
if (!session->get_is_deserialized() && session->is_in_transaction()) {
|
||||
auto session_id = session->get_sessid();
|
||||
LOG_INFO("begin to kill tx", K(cause), K(session_id), KPC(session));
|
||||
if (cause >= 0) {
|
||||
LOG_INFO("begin to kill tx", "caused_by", transaction::ObTxAbortCauseNames::of(cause), K(cause), K(session_id), KPC(session));
|
||||
} else {
|
||||
LOG_INFO("begin to kill tx", "caused_by", common::ob_error_name(cause), K(cause), K(session_id), KPC(session));
|
||||
}
|
||||
transaction::ObTxDesc *tx_desc = session->get_tx_desc();
|
||||
auto tx_tenant_id = tx_desc->get_tenant_id();
|
||||
const ObTransID tx_id = tx_desc->get_tx_id();
|
||||
|
@ -887,7 +887,8 @@ int ObRawExpr::is_non_pure_sys_func_expr(bool &is_non_pure) const
|
||||
|| T_FUN_SYS_LAST_INSERT_ID == type_
|
||||
|| T_FUN_SYS_ROW_COUNT == type_
|
||||
|| T_FUN_SYS_FOUND_ROWS == type_
|
||||
|| T_FUN_SYS_CURRENT_USER_PRIV == type_) {
|
||||
|| T_FUN_SYS_CURRENT_USER_PRIV == type_
|
||||
|| T_FUN_SYS_TRANSACTION_ID == type_) {
|
||||
is_non_pure = true;
|
||||
}
|
||||
}
|
||||
|
@ -895,7 +895,7 @@ int ObPartTransCtx::commit(const ObTxCommitParts &parts,
|
||||
REC_TRANS_TRACE_EXT2(tlog_, commit, OB_ID(ret), ret,
|
||||
OB_ID(tid), GETTID(),
|
||||
OB_ID(ref), get_ref());
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_FAIL(ret) && OB_EAGAIN != ret && OB_TRANS_COMMITED != ret) {
|
||||
TRANS_LOG(WARN, "trx commit failed", KR(ret), KPC(this));
|
||||
}
|
||||
return ret;
|
||||
|
@ -124,7 +124,7 @@ int ObTx##name##P::process() \
|
||||
const int64_t cur_ts = ObTimeUtility::current_time(); \
|
||||
total_rt = total_rt + (cur_ts - run_ts); \
|
||||
total_process++; \
|
||||
if (OB_FAIL(ret)) { \
|
||||
if (OB_FAIL(ret) && OB_TRANS_COMMITED != ret) { \
|
||||
TRANS_LOG(WARN, "handle txn message fail", KR(ret), "msg", arg_); \
|
||||
} \
|
||||
} \
|
||||
|
@ -263,7 +263,10 @@ int ObTxRPCCB<PC>::process()
|
||||
} else {
|
||||
status = result.get_status();
|
||||
}
|
||||
if (need_refresh_location_cache_(status)) {
|
||||
if (status != OB_SUCCESS
|
||||
&& !receiver_ls_id_.is_scheduler_ls()
|
||||
&& receiver_ls_id_.is_valid()
|
||||
&& need_refresh_location_cache_(status)) {
|
||||
if (OB_FAIL(refresh_location_cache(receiver_ls_id_))) {
|
||||
TRANS_LOG(WARN, "refresh location cache error", KR(ret),
|
||||
K_(trans_id), "ls", receiver_ls_id_, K(result), K(dst), K(status), K_(msg_type));
|
||||
@ -282,7 +285,7 @@ int ObTxRPCCB<PC>::process()
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCCESS != ret || (OB_SUCCESS != status && status != -1 && status != OB_NEED_RETRY)) {
|
||||
if (OB_SUCCESS != ret || (OB_SUCCESS != status && status != -1 && status != OB_NEED_RETRY && status != OB_EAGAIN)) {
|
||||
TRANS_LOG(WARN, "trx rpc callback", K(ret), K(status), K(dst), K(result));
|
||||
}
|
||||
return ret;
|
||||
|
@ -1924,7 +1924,7 @@ int ObTransService::handle_trans_commit_request(ObTxCommitMsg &msg,
|
||||
#ifndef NDEBUG
|
||||
TRANS_LOG(INFO, "handle trans commit request", K(ret), K(msg));
|
||||
#else
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_FAIL(ret) && OB_TRANS_COMMITED != ret) {
|
||||
TRANS_LOG(WARN, "handle trans commit request failed", K(ret), K(msg));
|
||||
}
|
||||
#endif
|
||||
|
@ -477,6 +477,7 @@ int ObTransService::submit_commit_tx(ObTxDesc &tx,
|
||||
if (tx.expire_ts_ <= ObClockGenerator::getClock()) {
|
||||
TX_STAT_TIMEOUT_INC
|
||||
TRANS_LOG(WARN, "tx has timeout, it has rollbacked internally", K_(tx.expire_ts), K(tx));
|
||||
tx.print_trace_();
|
||||
ret = OB_TRANS_ROLLBACKED;
|
||||
handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED);
|
||||
} else if (tx.flags_.PARTS_INCOMPLETE_) {
|
||||
@ -1837,7 +1838,8 @@ inline int ObTransService::sync_rollback_savepoint__(ObTxDesc &tx,
|
||||
// interrupted, fail fastly
|
||||
if (tx.flags_.INTERRUPTED_) {
|
||||
ret = OB_ERR_INTERRUPTED;
|
||||
TRANS_LOG(WARN, "rollback was interrupted", K_(tx.tx_id),
|
||||
TRANS_LOG(WARN, "rollback was interrupted", "caused_by", ObTxAbortCauseNames::of(tx.abort_cause_),
|
||||
"trans_id", tx.tx_id_,
|
||||
K(remain_cnt), K(waittime), K(retries));
|
||||
}
|
||||
}
|
||||
@ -2010,7 +2012,12 @@ OB_INLINE int ObTransService::tx_sanity_check_(ObTxDesc &tx, const bool in_stmt)
|
||||
break;
|
||||
}
|
||||
case ObTxDesc::State::ABORTED:
|
||||
ret = tx.abort_cause_ < 0 ? tx.abort_cause_ : OB_TRANS_NEED_ROLLBACK;
|
||||
{
|
||||
const int cause = tx.abort_cause_;
|
||||
ret = cause < 0 ? cause : OB_TRANS_NEED_ROLLBACK;
|
||||
const char *err_name = cause < 0 ? common::ob_error_name(cause) : ObTxAbortCauseNames::of(cause);
|
||||
TRANS_LOG(WARN, "trans has been aborted", "caused_by", err_name, K(ret), "txid", tx.tx_id_);
|
||||
}
|
||||
break;
|
||||
case ObTxDesc::State::COMMITTED:
|
||||
ret = OB_TRANS_COMMITED;
|
||||
|
@ -12,6 +12,7 @@
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <thread>
|
||||
#include <sys/resource.h>
|
||||
#include <functional>
|
||||
#define private public
|
||||
#define protected public
|
||||
@ -205,7 +206,8 @@ class MockObServer {
|
||||
public:
|
||||
MockObServer(const int64_t ls_id, const char*addr, const int32_t port, MsgBus &msg_bus)
|
||||
: addr_(ObAddr(ObAddr::VER::IPV4, addr, port)),
|
||||
tx_node_(ls_id, addr_, msg_bus),
|
||||
tx_node_ptr_(new ObTxNode(ls_id, addr_, msg_bus)),
|
||||
tx_node_(*tx_node_ptr_),
|
||||
allocator_(), session_()
|
||||
{
|
||||
session_.test_init(1, 1111, 2222, &allocator_);
|
||||
@ -218,6 +220,7 @@ public:
|
||||
tx_node_.release_tx(*session_.get_tx_desc());
|
||||
session_.get_tx_desc() = NULL;
|
||||
}
|
||||
delete tx_node_ptr_;
|
||||
}
|
||||
int start() { return tx_node_.start(); }
|
||||
public:
|
||||
@ -261,7 +264,8 @@ private:
|
||||
int handle_msg_(int type, void *msg);
|
||||
int handle_msg_check_alive_resp__(ObTxFreeRouteCheckAliveRespMsg *msg);
|
||||
ObAddr addr_;
|
||||
ObTxNode tx_node_;
|
||||
ObTxNode *tx_node_ptr_;
|
||||
ObTxNode &tx_node_;
|
||||
ObMalloc allocator_;
|
||||
sql::ObSQLSessionInfo session_;
|
||||
TO_STRING_KV(K_(tx_node), K_(session));
|
||||
@ -657,6 +661,9 @@ public:
|
||||
const testing::TestInfo* const test_info =
|
||||
testing::UnitTest::GetInstance()->current_test_info();
|
||||
MTL_MEM_ALLOC_MGR.init();
|
||||
struct rlimit rl;
|
||||
getrlimit(RLIMIT_STACK, &rl);
|
||||
setrlimit(20*1024*1024, &rl);
|
||||
auto test_name = test_info->name();
|
||||
_TRANS_LOG(INFO, ">>>> starting test : %s", test_name);
|
||||
}
|
||||
|
@ -78,7 +78,25 @@ int ObTxDescGuard::release() {
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
ObString ObTxNode::get_identifer_str() const
|
||||
{
|
||||
struct ID {
|
||||
ObAddr addr;
|
||||
int64_t ls_id;
|
||||
DECLARE_TO_STRING {
|
||||
int64_t pos = 0;
|
||||
int32_t pos0 = 0;
|
||||
addr.addr_to_buffer(buf, buf_len, pos0);
|
||||
pos += pos0;
|
||||
BUF_PRINTF("_%ld", ls_id);
|
||||
return pos;
|
||||
}
|
||||
} identifer = {
|
||||
.addr = addr_,
|
||||
.ls_id = ls_id_.id()
|
||||
};
|
||||
return ObString(to_cstring(&identifer));
|
||||
}
|
||||
ObTxNode::ObTxNode(const int64_t ls_id,
|
||||
const ObAddr &addr,
|
||||
MsgBus &msg_bus) :
|
||||
@ -86,10 +104,10 @@ ObTxNode::ObTxNode(const int64_t ls_id,
|
||||
addr_(addr),
|
||||
ls_id_(ls_id),
|
||||
tenant_id_(1001),
|
||||
tenant_(tenant_id_),
|
||||
tenant_(tenant_id_, 10, *GCTX.cgroup_ctrl_),
|
||||
fake_part_trans_ctx_pool_(1001, false, false, 4),
|
||||
memtable_(NULL),
|
||||
msg_consumer_(ObString("TxNode"),
|
||||
msg_consumer_(get_identifer_str(),
|
||||
&msg_queue_,
|
||||
std::bind(&ObTxNode::handle_msg_,
|
||||
this, std::placeholders::_1)),
|
||||
@ -102,8 +120,10 @@ ObTxNode::ObTxNode(const int64_t ls_id,
|
||||
addr.to_string(name_buf_, sizeof(name_buf_));
|
||||
msg_consumer_.set_name(name_);
|
||||
role_ = Leader;
|
||||
tenant_.enable_tenant_ctx_check_ = false;
|
||||
tenant_.set(&fake_tenant_freezer_);
|
||||
tenant_.set(&fake_part_trans_ctx_pool_);
|
||||
tenant_.start();
|
||||
ObTenantEnv::set_tenant(&tenant_);
|
||||
ObTableHandleV2 lock_memtable_handle;
|
||||
lock_memtable_handle.set_table(&lock_memtable_, &t3m_, ObITable::LOCK_MEMTABLE);
|
||||
@ -237,7 +257,7 @@ void ObTxNode::dump_msg_queue_()
|
||||
int ret = OB_SUCCESS;
|
||||
MsgPack *msg = NULL;
|
||||
int i = 0;
|
||||
while(OB_NOT_NULL((msg = (MsgPack*)msg_queue_.pop()))) {
|
||||
while(OB_SUCC(msg_queue_.pop((ObLink*&)msg))) {
|
||||
++i;
|
||||
MsgInfo msg_info;
|
||||
OZ (get_msg_info(msg, msg_info));
|
||||
@ -350,7 +370,7 @@ int ObTxNode::recv_msg(const ObAddr &sender, ObString &m)
|
||||
TRANS_LOG(INFO, "recv_msg", K(sender), "msg_ptr", OB_P(m.ptr()), KPC(this));
|
||||
int ret = OB_SUCCESS;
|
||||
auto pkt = new MsgPack(sender, m);
|
||||
msg_queue_.push(pkt);
|
||||
OZ(msg_queue_.push(pkt));
|
||||
msg_consumer_.wakeup();
|
||||
return ret;
|
||||
}
|
||||
@ -360,7 +380,7 @@ int ObTxNode::sync_recv_msg(const ObAddr &sender, ObString &m, ObString &resp)
|
||||
TRANS_LOG(INFO, "sync_recv_msg", K(sender), "msg_ptr", OB_P(m.ptr()), KPC(this));
|
||||
int ret = OB_SUCCESS;
|
||||
auto pkt = new MsgPack(sender, m, true);
|
||||
msg_queue_.push(pkt);
|
||||
OZ(msg_queue_.push(pkt));
|
||||
msg_consumer_.wakeup();
|
||||
pkt->cond_.wait(pkt->cond_.get_key(), 500000);
|
||||
if (pkt->resp_ready_) {
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "storage/tx/ob_trans_service.h"
|
||||
#include "storage/tx/ob_trans_part_ctx.h"
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "observer/omt/ob_tenant.h"
|
||||
#include "share/ob_alive_server_tracer.h"
|
||||
#include "storage/tablelock/ob_lock_memtable.h"
|
||||
#include "storage/ls/ob_ls_tx_service.h"
|
||||
@ -47,7 +48,7 @@ class QueueConsumer : public share::ObThreadPool
|
||||
{
|
||||
public:
|
||||
QueueConsumer(ObString name,
|
||||
ObSpScLinkQueue *q,
|
||||
ObLinkQueue *q,
|
||||
std::function<int(T*)> func):
|
||||
name_(name), queue_(q), func_(func), cond_() {}
|
||||
virtual int start() {
|
||||
@ -65,8 +66,14 @@ public:
|
||||
ObThreadPool::stop();
|
||||
}
|
||||
void run1() {
|
||||
{
|
||||
char name_buf[128];
|
||||
snprintf(name_buf, 128, "QConsumer:%s", name_.ptr());
|
||||
set_thread_name(name_buf);
|
||||
}
|
||||
while(!stop_) {
|
||||
ObLink *e = queue_->pop();
|
||||
ObLink *e = NULL;
|
||||
queue_->pop(e);
|
||||
if (e) {
|
||||
T *t = static_cast<T*>(e);
|
||||
func_(t);
|
||||
@ -80,11 +87,11 @@ public:
|
||||
}
|
||||
void wakeup() { if (ATOMIC_BCAS(&is_sleeping_, true, false)) { cond_.signal(); } }
|
||||
void set_name(ObString &name) { name_ = name; }
|
||||
TO_STRING_KV(KP(this), K_(name), KP_(queue), K(queue_->empty()), K_(stop));
|
||||
TO_STRING_KV(KP(this), K_(name), KP_(queue), K(queue_->size()), K_(stop));
|
||||
private:
|
||||
ObString name_;
|
||||
bool stop_;
|
||||
ObSpScLinkQueue *queue_;
|
||||
ObLinkQueue *queue_;
|
||||
std::function<int(T*)> func_;
|
||||
common::SimpleCond cond_;
|
||||
bool is_sleeping_ = false;
|
||||
@ -131,7 +138,8 @@ public:
|
||||
}
|
||||
|
||||
public:
|
||||
TO_STRING_KV(KP(this), K(addr_), K_(ls_id));
|
||||
TO_STRING_KV(KP(this), K(addr_), K_(ls_id), K(msg_queue_.size()));
|
||||
ObString get_identifer_str() const;
|
||||
ObTxDescGuard get_tx_guard();
|
||||
// the simple r/w interface
|
||||
int read(ObTxDesc &tx, const int64_t key, int64_t &value, const ObTxIsolationLevel iso = ObTxIsolationLevel::RC);
|
||||
@ -271,13 +279,13 @@ public:
|
||||
ObAddr addr_;
|
||||
ObLSID ls_id_;
|
||||
int64_t tenant_id_;
|
||||
ObTenantBase tenant_;
|
||||
omt::ObTenant tenant_;
|
||||
common::ObServerObjectPool<ObPartTransCtx> fake_part_trans_ctx_pool_;
|
||||
ObTransService txs_;
|
||||
memtable::ObMemtable *memtable_;
|
||||
ObSEArray<ObColDesc, 2> columns_;
|
||||
// msg_handler
|
||||
ObSpScLinkQueue msg_queue_;
|
||||
ObLinkQueue msg_queue_;
|
||||
QueueConsumer<MsgPack> msg_consumer_;
|
||||
// fake objects
|
||||
storage::ObTenantMetaMemMgr t3m_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user