From 056e47167d77b12fd4236a8a9ca86751ba828be0 Mon Sep 17 00:00:00 2001 From: chinaxing Date: Thu, 11 Jan 2024 10:14:47 +0000 Subject: [PATCH] misc refinement --- deps/oblib/src/lib/ob_name_def.h | 1 + deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp | 2 +- .../src/rpc/obmysql/ob_sql_sock_handler.cpp | 6 +- src/objit/include/objit/common/ob_item_type.h | 1 + src/sql/CMakeLists.txt | 1 + .../engine/expr/ob_expr_eval_functions.cpp | 4 +- .../engine/expr/ob_expr_operator_factory.cpp | 3 + .../engine/expr/ob_expr_transaction_id.cpp | 91 +++++++++++++++++++ src/sql/engine/expr/ob_expr_transaction_id.h | 38 ++++++++ src/sql/ob_sql_trans_control.cpp | 6 +- src/sql/resolver/expr/ob_raw_expr.cpp | 3 +- src/storage/tx/ob_trans_part_ctx.cpp | 2 +- src/storage/tx/ob_trans_rpc.cpp | 2 +- src/storage/tx/ob_trans_rpc.h | 7 +- src/storage/tx/ob_trans_service_v4.cpp | 2 +- src/storage/tx/ob_tx_api.cpp | 11 ++- unittest/storage/tx/it/test_tx_free_route.cpp | 11 ++- unittest/storage/tx/it/tx_node.cpp | 32 +++++-- unittest/storage/tx/it/tx_node.h | 22 +++-- 19 files changed, 216 insertions(+), 29 deletions(-) create mode 100644 src/sql/engine/expr/ob_expr_transaction_id.cpp create mode 100644 src/sql/engine/expr/ob_expr_transaction_id.h diff --git a/deps/oblib/src/lib/ob_name_def.h b/deps/oblib/src/lib/ob_name_def.h index 78ec55a4f..a37967757 100644 --- a/deps/oblib/src/lib/ob_name_def.h +++ b/deps/oblib/src/lib/ob_name_def.h @@ -625,6 +625,7 @@ #define N_JOIN_INFO "join_info" #define N_ID_SET "id_set" #define N_CURRENT_SCN "current_scn" +#define N_TRANSACTION_ID "ob_transaction_id" #define N_EQUAL_SET "equal_set" #define N_LEFT_ID "left_id" #define N_RIGHT_ID "right_id" diff --git a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp index 085c1d763..7a0cf5574 100644 --- a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp @@ -844,7 +844,7 @@ public: LOG_TRACE("revert_sock: sock still readable", K(*s)); int ret = OB_SUCCESS; if (OB_FAIL(handler_.on_readable(s->sess_))) { - LOG_TRACE("push to omt queue fail", K(ret), K(*s)); + LOG_WARN("push to omt queue fail, will close socket", K(ret), K(*s)); push_close_req(s); s->disable_may_handling_flag(); } diff --git a/deps/oblib/src/rpc/obmysql/ob_sql_sock_handler.cpp b/deps/oblib/src/rpc/obmysql/ob_sql_sock_handler.cpp index 4523d1c76..8b9eee21e 100644 --- a/deps/oblib/src/rpc/obmysql/ob_sql_sock_handler.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_sql_sock_handler.cpp @@ -131,17 +131,17 @@ int ObSqlSockHandler::on_readable(void* udata) ret = OB_INVALID_ARGUMENT; LOG_ERROR("sess is null!", K(ret)); } else if (OB_FAIL(sock_processor_.decode_sql_packet(sess->pool_, *sess, NULL, pkt))) { - LOG_WARN("decode sql req fail", K(ret)); + LOG_WARN("decode sql req fail", K(ret), K(sess->sql_session_id_)); } else if (NULL == pkt) { sess->revert_sock(); } else if (OB_FAIL(sock_processor_.build_sql_req(*sess, pkt, sql_req))) { - LOG_WARN("build sql req fail", K(ret)); + LOG_WARN("build sql req fail", K(ret), K(sess->sql_session_id_)); } if (OB_SUCCESS != ret || NULL == sql_req) { } else if (FALSE_IT(sess->set_last_decode_succ_and_deliver_time(ObTimeUtility::current_time()))) { } else if (OB_FAIL(deliver_->deliver(*sql_req))) { - LOG_WARN("deliver sql request fail", K(ret)); + LOG_WARN("deliver sql request fail", K(ret), K(sess->sql_session_id_)); } return ret; diff --git a/src/objit/include/objit/common/ob_item_type.h b/src/objit/include/objit/common/ob_item_type.h index b0bd2184a..11089dd70 100755 --- a/src/objit/include/objit/common/ob_item_type.h +++ b/src/objit/include/objit/common/ob_item_type.h @@ -861,6 +861,7 @@ typedef enum ObItemType T_FUN_SYS_DOC_ID = 1810, T_FUN_SYS_WORD_COUNT = 1811, T_FUN_SYS_BM25 = 1812, + T_FUN_SYS_TRANSACTION_ID = 1813, T_FUN_SYS_END = 2000, T_FUN_SYS_ALIGN_DATE4CMP = 2010, diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt index e12535e54..fb4f09891 100644 --- a/src/sql/CMakeLists.txt +++ b/src/sql/CMakeLists.txt @@ -688,6 +688,7 @@ ob_set_subtarget(ob_sql engine_expr engine/expr/ob_expr_temp_table_ssid.cpp engine/expr/vector_cast/vector_cast.cpp engine/expr/ob_expr_extract_cert_expired_time.cpp + engine/expr/ob_expr_transaction_id.cpp ) ob_set_subtarget(ob_sql engine_join diff --git a/src/sql/engine/expr/ob_expr_eval_functions.cpp b/src/sql/engine/expr/ob_expr_eval_functions.cpp index 7c012ca7e..36183192f 100644 --- a/src/sql/engine/expr/ob_expr_eval_functions.cpp +++ b/src/sql/engine/expr/ob_expr_eval_functions.cpp @@ -338,6 +338,7 @@ #include "ob_expr_between.h" #include "ob_expr_align_date4cmp.h" #include "ob_expr_extract_cert_expired_time.h" +#include "ob_expr_transaction_id.h" namespace oceanbase { @@ -1109,7 +1110,8 @@ static ObExpr::EvalFunc g_expr_eval_functions[] = { NULL, // ObExprDocID::generate_doc_id, /* 667 */ NULL, // ObExprWordSegment::generate_fulltext_column, /* 668 */ NULL, // ObExprWordCount::generate_word_count, /* 669 */ - NULL, // ObExprBM25::eval_bm25_relevance_expr, /* 690 */ + NULL, // ObExprBM25::eval_bm25_relevance_expr, /* 670 */ + ObExprTransactionId::eval_transaction_id, /* 671 */ }; static ObExpr::EvalBatchFunc g_expr_eval_batch_functions[] = { diff --git a/src/sql/engine/expr/ob_expr_operator_factory.cpp b/src/sql/engine/expr/ob_expr_operator_factory.cpp index 626be545b..2ce4e998d 100644 --- a/src/sql/engine/expr/ob_expr_operator_factory.cpp +++ b/src/sql/engine/expr/ob_expr_operator_factory.cpp @@ -404,6 +404,7 @@ #include "sql/engine/expr/ob_expr_temp_table_ssid.h" #include "sql/engine/expr/ob_expr_align_date4cmp.h" #include "sql/engine/expr/ob_expr_extract_cert_expired_time.h" +#include "sql/engine/expr/ob_expr_transaction_id.h" using namespace oceanbase::common; namespace oceanbase @@ -1001,6 +1002,7 @@ void ObExprOperatorFactory::register_expr_operators() REG_OP(ObExprPrefixPattern); REG_OP(ObExprAlignDate4Cmp); REG_OP(ObExprExtractExpiredTime); + REG_OP(ObExprTransactionId); }(); // 注册oracle系统函数 REG_OP_ORCL(ObExprSysConnectByPath); @@ -1312,6 +1314,7 @@ void ObExprOperatorFactory::register_expr_operators() REG_OP_ORCL(ObExprUpdateXml); REG_OP_ORCL(ObExprTempTableSSID); REG_OP_ORCL(ObExprJsonObjectStar); + REG_OP_ORCL(ObExprTransactionId); } bool ObExprOperatorFactory::is_expr_op_type_valid(ObExprOperatorType type) diff --git a/src/sql/engine/expr/ob_expr_transaction_id.cpp b/src/sql/engine/expr/ob_expr_transaction_id.cpp new file mode 100644 index 000000000..f9f9d7024 --- /dev/null +++ b/src/sql/engine/expr/ob_expr_transaction_id.cpp @@ -0,0 +1,91 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SQL_ENG +#include "sql/engine/expr/ob_expr_transaction_id.h" +#include "sql/engine/ob_exec_context.h" +#include "sql/session/ob_sql_session_info.h" +#include "sql/engine/ob_exec_context.h" + +using namespace oceanbase::common; +namespace oceanbase +{ +namespace sql +{ + + +ObExprTransactionId::ObExprTransactionId(ObIAllocator &alloc) + : ObFuncExprOperator(alloc, T_FUN_SYS_TRANSACTION_ID, N_TRANSACTION_ID, 0, NOT_VALID_FOR_GENERATED_COL, NOT_ROW_DIMENSION) +{ +} + +ObExprTransactionId::~ObExprTransactionId() +{ +} + +int ObExprTransactionId::calc_result_type0(ObExprResType &type, ObExprTypeCtx &type_ctx) const +{ + UNUSED(type_ctx); + if (is_oracle_mode()) { + const ObAccuracy &acc = ObAccuracy::DDL_DEFAULT_ACCURACY2[common::ORACLE_MODE][common::ObNumberType]; + type.set_number(); + type.set_scale(acc.get_scale()); + type.set_precision(acc.get_precision()); + } else { + const ObAccuracy &acc = common::ObAccuracy::DDL_DEFAULT_ACCURACY[common::ObUInt64Type]; + type.set_uint64(); + type.set_scale(acc.get_scale()); + type.set_precision(acc.get_precision()); + type.set_result_flag(NOT_NULL_FLAG); + } + return OB_SUCCESS; +} + +int ObExprTransactionId::eval_transaction_id(const ObExpr &expr, ObEvalCtx &ctx, + ObDatum &expr_datum) +{ + int ret = OB_SUCCESS; + UNUSED(expr); + const ObSQLSessionInfo *session_info = NULL; + if (OB_ISNULL(session_info = ctx.exec_ctx_.get_my_session())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session info is null", K(ret)); + } else { + const transaction::ObTxDesc *txdesc = session_info->get_tx_desc(); + const int64_t tx_id = txdesc ? txdesc->get_tx_id().get_id() : 0; + if (ObUInt64Type == expr.datum_meta_.type_) { // mysql mode + expr_datum.set_uint(tx_id); + } else { // oracle mode + ObNumStackOnceAlloc tmp_alloc; + number::ObNumber num; + if (OB_FAIL(num.from(tx_id, tmp_alloc))) { + LOG_WARN("copy number fail", K(ret)); + } else { + expr_datum.set_number(num); + } + } + } + return ret; +} + +int ObExprTransactionId::cg_expr(ObExprCGCtx &op_cg_ctx, const ObRawExpr &raw_expr, + ObExpr &rt_expr) const +{ + UNUSED(raw_expr); + UNUSED(op_cg_ctx); + rt_expr.eval_func_ = ObExprTransactionId::eval_transaction_id; + return OB_SUCCESS; +} + + +}/* ns sql*/ +}/* ns oceanbase */ diff --git a/src/sql/engine/expr/ob_expr_transaction_id.h b/src/sql/engine/expr/ob_expr_transaction_id.h new file mode 100644 index 000000000..9f9fbce62 --- /dev/null +++ b/src/sql/engine/expr/ob_expr_transaction_id.h @@ -0,0 +1,38 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_SQL_ENGINE_EXPR_OB_EXPR_TRANSACTION_ID_ +#define OCEANBASE_SQL_ENGINE_EXPR_OB_EXPR_TRANSACTION_ID_ + +#include "sql/engine/expr/ob_expr_operator.h" + +namespace oceanbase +{ +namespace sql +{ +class ObExprTransactionId : public ObFuncExprOperator +{ +public: + explicit ObExprTransactionId(common::ObIAllocator &alloc); + virtual ~ObExprTransactionId(); + virtual int calc_result_type0(ObExprResType &type, common::ObExprTypeCtx &type_ctx) const; + static int eval_transaction_id(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &expr_datum); + virtual int cg_expr(ObExprCGCtx &op_cg_ctx, + const ObRawExpr &raw_expr, + ObExpr &rt_expr) const override; + +private: + DISALLOW_COPY_AND_ASSIGN(ObExprTransactionId); +}; +}//sql +}//oceanbase +#endif /* OCEANBASE_SQL_ENGINE_EXPR_OB_EXPR_TRANSACTION_ID_ */ diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index 9869188bc..6616eb1a1 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -347,7 +347,11 @@ int ObSqlTransControl::kill_tx(ObSQLSessionInfo *session, int cause) int ret = OB_SUCCESS; if (!session->get_is_deserialized() && session->is_in_transaction()) { auto session_id = session->get_sessid(); - LOG_INFO("begin to kill tx", K(cause), K(session_id), KPC(session)); + if (cause >= 0) { + LOG_INFO("begin to kill tx", "caused_by", transaction::ObTxAbortCauseNames::of(cause), K(cause), K(session_id), KPC(session)); + } else { + LOG_INFO("begin to kill tx", "caused_by", common::ob_error_name(cause), K(cause), K(session_id), KPC(session)); + } transaction::ObTxDesc *tx_desc = session->get_tx_desc(); auto tx_tenant_id = tx_desc->get_tenant_id(); const ObTransID tx_id = tx_desc->get_tx_id(); diff --git a/src/sql/resolver/expr/ob_raw_expr.cpp b/src/sql/resolver/expr/ob_raw_expr.cpp index 2f2075de2..9068aab4d 100644 --- a/src/sql/resolver/expr/ob_raw_expr.cpp +++ b/src/sql/resolver/expr/ob_raw_expr.cpp @@ -887,7 +887,8 @@ int ObRawExpr::is_non_pure_sys_func_expr(bool &is_non_pure) const || T_FUN_SYS_LAST_INSERT_ID == type_ || T_FUN_SYS_ROW_COUNT == type_ || T_FUN_SYS_FOUND_ROWS == type_ - || T_FUN_SYS_CURRENT_USER_PRIV == type_) { + || T_FUN_SYS_CURRENT_USER_PRIV == type_ + || T_FUN_SYS_TRANSACTION_ID == type_) { is_non_pure = true; } } diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 1ebb25336..9fc5b8df5 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -895,7 +895,7 @@ int ObPartTransCtx::commit(const ObTxCommitParts &parts, REC_TRANS_TRACE_EXT2(tlog_, commit, OB_ID(ret), ret, OB_ID(tid), GETTID(), OB_ID(ref), get_ref()); - if (OB_FAIL(ret)) { + if (OB_FAIL(ret) && OB_EAGAIN != ret && OB_TRANS_COMMITED != ret) { TRANS_LOG(WARN, "trx commit failed", KR(ret), KPC(this)); } return ret; diff --git a/src/storage/tx/ob_trans_rpc.cpp b/src/storage/tx/ob_trans_rpc.cpp index 913f5a4af..9692cf67e 100644 --- a/src/storage/tx/ob_trans_rpc.cpp +++ b/src/storage/tx/ob_trans_rpc.cpp @@ -124,7 +124,7 @@ int ObTx##name##P::process() \ const int64_t cur_ts = ObTimeUtility::current_time(); \ total_rt = total_rt + (cur_ts - run_ts); \ total_process++; \ - if (OB_FAIL(ret)) { \ + if (OB_FAIL(ret) && OB_TRANS_COMMITED != ret) { \ TRANS_LOG(WARN, "handle txn message fail", KR(ret), "msg", arg_); \ } \ } \ diff --git a/src/storage/tx/ob_trans_rpc.h b/src/storage/tx/ob_trans_rpc.h index 6306d42b4..359354420 100644 --- a/src/storage/tx/ob_trans_rpc.h +++ b/src/storage/tx/ob_trans_rpc.h @@ -263,7 +263,10 @@ int ObTxRPCCB::process() } else { status = result.get_status(); } - if (need_refresh_location_cache_(status)) { + if (status != OB_SUCCESS + && !receiver_ls_id_.is_scheduler_ls() + && receiver_ls_id_.is_valid() + && need_refresh_location_cache_(status)) { if (OB_FAIL(refresh_location_cache(receiver_ls_id_))) { TRANS_LOG(WARN, "refresh location cache error", KR(ret), K_(trans_id), "ls", receiver_ls_id_, K(result), K(dst), K(status), K_(msg_type)); @@ -282,7 +285,7 @@ int ObTxRPCCB::process() } } } - if (OB_SUCCESS != ret || (OB_SUCCESS != status && status != -1 && status != OB_NEED_RETRY)) { + if (OB_SUCCESS != ret || (OB_SUCCESS != status && status != -1 && status != OB_NEED_RETRY && status != OB_EAGAIN)) { TRANS_LOG(WARN, "trx rpc callback", K(ret), K(status), K(dst), K(result)); } return ret; diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index d496e6671..02b6c340f 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -1924,7 +1924,7 @@ int ObTransService::handle_trans_commit_request(ObTxCommitMsg &msg, #ifndef NDEBUG TRANS_LOG(INFO, "handle trans commit request", K(ret), K(msg)); #else - if (OB_FAIL(ret)) { + if (OB_FAIL(ret) && OB_TRANS_COMMITED != ret) { TRANS_LOG(WARN, "handle trans commit request failed", K(ret), K(msg)); } #endif diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index 8c27265a7..c9476dd3c 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -477,6 +477,7 @@ int ObTransService::submit_commit_tx(ObTxDesc &tx, if (tx.expire_ts_ <= ObClockGenerator::getClock()) { TX_STAT_TIMEOUT_INC TRANS_LOG(WARN, "tx has timeout, it has rollbacked internally", K_(tx.expire_ts), K(tx)); + tx.print_trace_(); ret = OB_TRANS_ROLLBACKED; handle_tx_commit_result_(tx, OB_TRANS_ROLLBACKED); } else if (tx.flags_.PARTS_INCOMPLETE_) { @@ -1837,7 +1838,8 @@ inline int ObTransService::sync_rollback_savepoint__(ObTxDesc &tx, // interrupted, fail fastly if (tx.flags_.INTERRUPTED_) { ret = OB_ERR_INTERRUPTED; - TRANS_LOG(WARN, "rollback was interrupted", K_(tx.tx_id), + TRANS_LOG(WARN, "rollback was interrupted", "caused_by", ObTxAbortCauseNames::of(tx.abort_cause_), + "trans_id", tx.tx_id_, K(remain_cnt), K(waittime), K(retries)); } } @@ -2010,7 +2012,12 @@ OB_INLINE int ObTransService::tx_sanity_check_(ObTxDesc &tx, const bool in_stmt) break; } case ObTxDesc::State::ABORTED: - ret = tx.abort_cause_ < 0 ? tx.abort_cause_ : OB_TRANS_NEED_ROLLBACK; + { + const int cause = tx.abort_cause_; + ret = cause < 0 ? cause : OB_TRANS_NEED_ROLLBACK; + const char *err_name = cause < 0 ? common::ob_error_name(cause) : ObTxAbortCauseNames::of(cause); + TRANS_LOG(WARN, "trans has been aborted", "caused_by", err_name, K(ret), "txid", tx.tx_id_); + } break; case ObTxDesc::State::COMMITTED: ret = OB_TRANS_COMMITED; diff --git a/unittest/storage/tx/it/test_tx_free_route.cpp b/unittest/storage/tx/it/test_tx_free_route.cpp index bc79fe56d..10696ff89 100644 --- a/unittest/storage/tx/it/test_tx_free_route.cpp +++ b/unittest/storage/tx/it/test_tx_free_route.cpp @@ -12,6 +12,7 @@ #include #include +#include #include #define private public #define protected public @@ -205,7 +206,8 @@ class MockObServer { public: MockObServer(const int64_t ls_id, const char*addr, const int32_t port, MsgBus &msg_bus) : addr_(ObAddr(ObAddr::VER::IPV4, addr, port)), - tx_node_(ls_id, addr_, msg_bus), + tx_node_ptr_(new ObTxNode(ls_id, addr_, msg_bus)), + tx_node_(*tx_node_ptr_), allocator_(), session_() { session_.test_init(1, 1111, 2222, &allocator_); @@ -218,6 +220,7 @@ public: tx_node_.release_tx(*session_.get_tx_desc()); session_.get_tx_desc() = NULL; } + delete tx_node_ptr_; } int start() { return tx_node_.start(); } public: @@ -261,7 +264,8 @@ private: int handle_msg_(int type, void *msg); int handle_msg_check_alive_resp__(ObTxFreeRouteCheckAliveRespMsg *msg); ObAddr addr_; - ObTxNode tx_node_; + ObTxNode *tx_node_ptr_; + ObTxNode &tx_node_; ObMalloc allocator_; sql::ObSQLSessionInfo session_; TO_STRING_KV(K_(tx_node), K_(session)); @@ -657,6 +661,9 @@ public: const testing::TestInfo* const test_info = testing::UnitTest::GetInstance()->current_test_info(); MTL_MEM_ALLOC_MGR.init(); + struct rlimit rl; + getrlimit(RLIMIT_STACK, &rl); + setrlimit(20*1024*1024, &rl); auto test_name = test_info->name(); _TRANS_LOG(INFO, ">>>> starting test : %s", test_name); } diff --git a/unittest/storage/tx/it/tx_node.cpp b/unittest/storage/tx/it/tx_node.cpp index 0777815db..c8170fd16 100644 --- a/unittest/storage/tx/it/tx_node.cpp +++ b/unittest/storage/tx/it/tx_node.cpp @@ -78,7 +78,25 @@ int ObTxDescGuard::release() { return ret; } - +ObString ObTxNode::get_identifer_str() const +{ + struct ID { + ObAddr addr; + int64_t ls_id; + DECLARE_TO_STRING { + int64_t pos = 0; + int32_t pos0 = 0; + addr.addr_to_buffer(buf, buf_len, pos0); + pos += pos0; + BUF_PRINTF("_%ld", ls_id); + return pos; + } + } identifer = { + .addr = addr_, + .ls_id = ls_id_.id() + }; + return ObString(to_cstring(&identifer)); +} ObTxNode::ObTxNode(const int64_t ls_id, const ObAddr &addr, MsgBus &msg_bus) : @@ -86,10 +104,10 @@ ObTxNode::ObTxNode(const int64_t ls_id, addr_(addr), ls_id_(ls_id), tenant_id_(1001), - tenant_(tenant_id_), + tenant_(tenant_id_, 10, *GCTX.cgroup_ctrl_), fake_part_trans_ctx_pool_(1001, false, false, 4), memtable_(NULL), - msg_consumer_(ObString("TxNode"), + msg_consumer_(get_identifer_str(), &msg_queue_, std::bind(&ObTxNode::handle_msg_, this, std::placeholders::_1)), @@ -102,8 +120,10 @@ ObTxNode::ObTxNode(const int64_t ls_id, addr.to_string(name_buf_, sizeof(name_buf_)); msg_consumer_.set_name(name_); role_ = Leader; + tenant_.enable_tenant_ctx_check_ = false; tenant_.set(&fake_tenant_freezer_); tenant_.set(&fake_part_trans_ctx_pool_); + tenant_.start(); ObTenantEnv::set_tenant(&tenant_); ObTableHandleV2 lock_memtable_handle; lock_memtable_handle.set_table(&lock_memtable_, &t3m_, ObITable::LOCK_MEMTABLE); @@ -237,7 +257,7 @@ void ObTxNode::dump_msg_queue_() int ret = OB_SUCCESS; MsgPack *msg = NULL; int i = 0; - while(OB_NOT_NULL((msg = (MsgPack*)msg_queue_.pop()))) { + while(OB_SUCC(msg_queue_.pop((ObLink*&)msg))) { ++i; MsgInfo msg_info; OZ (get_msg_info(msg, msg_info)); @@ -350,7 +370,7 @@ int ObTxNode::recv_msg(const ObAddr &sender, ObString &m) TRANS_LOG(INFO, "recv_msg", K(sender), "msg_ptr", OB_P(m.ptr()), KPC(this)); int ret = OB_SUCCESS; auto pkt = new MsgPack(sender, m); - msg_queue_.push(pkt); + OZ(msg_queue_.push(pkt)); msg_consumer_.wakeup(); return ret; } @@ -360,7 +380,7 @@ int ObTxNode::sync_recv_msg(const ObAddr &sender, ObString &m, ObString &resp) TRANS_LOG(INFO, "sync_recv_msg", K(sender), "msg_ptr", OB_P(m.ptr()), KPC(this)); int ret = OB_SUCCESS; auto pkt = new MsgPack(sender, m, true); - msg_queue_.push(pkt); + OZ(msg_queue_.push(pkt)); msg_consumer_.wakeup(); pkt->cond_.wait(pkt->cond_.get_key(), 500000); if (pkt->resp_ready_) { diff --git a/unittest/storage/tx/it/tx_node.h b/unittest/storage/tx/it/tx_node.h index 51ae21b7b..a5db261d2 100644 --- a/unittest/storage/tx/it/tx_node.h +++ b/unittest/storage/tx/it/tx_node.h @@ -19,6 +19,7 @@ #include "storage/tx/ob_trans_service.h" #include "storage/tx/ob_trans_part_ctx.h" #include "share/rc/ob_tenant_base.h" +#include "observer/omt/ob_tenant.h" #include "share/ob_alive_server_tracer.h" #include "storage/tablelock/ob_lock_memtable.h" #include "storage/ls/ob_ls_tx_service.h" @@ -47,7 +48,7 @@ class QueueConsumer : public share::ObThreadPool { public: QueueConsumer(ObString name, - ObSpScLinkQueue *q, + ObLinkQueue *q, std::function func): name_(name), queue_(q), func_(func), cond_() {} virtual int start() { @@ -65,8 +66,14 @@ public: ObThreadPool::stop(); } void run1() { + { + char name_buf[128]; + snprintf(name_buf, 128, "QConsumer:%s", name_.ptr()); + set_thread_name(name_buf); + } while(!stop_) { - ObLink *e = queue_->pop(); + ObLink *e = NULL; + queue_->pop(e); if (e) { T *t = static_cast(e); func_(t); @@ -80,11 +87,11 @@ public: } void wakeup() { if (ATOMIC_BCAS(&is_sleeping_, true, false)) { cond_.signal(); } } void set_name(ObString &name) { name_ = name; } - TO_STRING_KV(KP(this), K_(name), KP_(queue), K(queue_->empty()), K_(stop)); + TO_STRING_KV(KP(this), K_(name), KP_(queue), K(queue_->size()), K_(stop)); private: ObString name_; bool stop_; - ObSpScLinkQueue *queue_; + ObLinkQueue *queue_; std::function func_; common::SimpleCond cond_; bool is_sleeping_ = false; @@ -131,7 +138,8 @@ public: } public: - TO_STRING_KV(KP(this), K(addr_), K_(ls_id)); + TO_STRING_KV(KP(this), K(addr_), K_(ls_id), K(msg_queue_.size())); + ObString get_identifer_str() const; ObTxDescGuard get_tx_guard(); // the simple r/w interface int read(ObTxDesc &tx, const int64_t key, int64_t &value, const ObTxIsolationLevel iso = ObTxIsolationLevel::RC); @@ -271,13 +279,13 @@ public: ObAddr addr_; ObLSID ls_id_; int64_t tenant_id_; - ObTenantBase tenant_; + omt::ObTenant tenant_; common::ObServerObjectPool fake_part_trans_ctx_pool_; ObTransService txs_; memtable::ObMemtable *memtable_; ObSEArray columns_; // msg_handler - ObSpScLinkQueue msg_queue_; + ObLinkQueue msg_queue_; QueueConsumer msg_consumer_; // fake objects storage::ObTenantMetaMemMgr t3m_;