FEATURE: for readonly tx debug
This commit is contained in:
parent
d0837ae589
commit
31d65288ff
@ -275,6 +275,12 @@ int ObAllVirtualHADiagnose::insert_stat_(storage::DiagnoseInfo &diagnose_info)
|
||||
cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation(
|
||||
ObCharset::get_default_charset()));
|
||||
break;
|
||||
case READ_TX:
|
||||
cur_row_.cells_[i].set_varchar(diagnose_info.read_only_tx_info_);
|
||||
cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation(
|
||||
ObCharset::get_default_charset()));
|
||||
|
||||
break;
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
SERVER_LOG(WARN, "unkown column");
|
||||
|
@ -56,6 +56,7 @@ enum IOStatColumn
|
||||
ENABLE_VOTE,
|
||||
ARB_SRV_INFO,
|
||||
PARENT,
|
||||
READ_TX,
|
||||
};
|
||||
|
||||
class ObAllVirtualHADiagnose : public common::ObVirtualTableScannerIterator
|
||||
@ -82,4 +83,4 @@ private:
|
||||
};
|
||||
} // namespace observer
|
||||
} // namespace oceanbase
|
||||
#endif /* OCEANBASE_OBSERVER_OB_ALL_VIRTUAL_HA_DIAGNOSE_H_ */
|
||||
#endif /* OCEANBASE_OBSERVER_OB_ALL_VIRTUAL_HA_DIAGNOSE_H_ */
|
||||
|
@ -10627,6 +10627,21 @@ int ObInnerTableSchema::all_virtual_ha_diagnose_schema(ObTableSchema &table_sche
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ADD_COLUMN_SCHEMA("readonly_tx", //column_name
|
||||
++column_id, //column_id
|
||||
0, //rowkey_id
|
||||
0, //index_id
|
||||
0, //part_key_pos
|
||||
ObVarcharType, //column_type
|
||||
CS_TYPE_INVALID, //column_collation_type
|
||||
1024, //column_length
|
||||
-1, //column_precision
|
||||
-1, //column_scale
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
table_schema.get_part_option().set_part_num(1);
|
||||
table_schema.set_part_level(PARTITION_LEVEL_ONE);
|
||||
|
@ -13322,7 +13322,8 @@ def_table_schema(
|
||||
('enable_sync', 'bool'),
|
||||
('enable_vote', 'bool'),
|
||||
('arb_srv_info', 'varchar:1024'),
|
||||
('parent', 'varchar:1024')
|
||||
('parent', 'varchar:1024'),
|
||||
('readonly_tx', 'varchar:1024'),
|
||||
],
|
||||
|
||||
partition_columns = ['svr_ip', 'svr_port'],
|
||||
|
130
src/share/leak_checker/ob_leak_checker.h
Normal file
130
src/share/leak_checker/ob_leak_checker.h
Normal file
@ -0,0 +1,130 @@
|
||||
/**
|
||||
* Copyright (c) 2024 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
#ifndef OCEANBASE_SHARE_OB_LEAK_CHECKER_H_
|
||||
#define OCEANBASE_SHARE_OB_LEAK_CHECKER_H_
|
||||
#include "lib/hash/ob_hashmap.h"
|
||||
#include "lib/oblog/ob_log_module.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace share
|
||||
{
|
||||
|
||||
template <typename Key, typename Value>
|
||||
class ObBaseLeakChecker
|
||||
{
|
||||
typedef common::ObLinearHashMap<Key, Value> tenant_leak_checker_t;
|
||||
struct Printer
|
||||
{
|
||||
bool operator()(const Key &k, const Value &v)
|
||||
{
|
||||
bool ret = true;
|
||||
COMMON_LOG(INFO, "LEAK_CHECKER ",
|
||||
"key:", k,
|
||||
"value:", v);
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
public:
|
||||
ObBaseLeakChecker();
|
||||
~ObBaseLeakChecker();
|
||||
int init(const uint64_t tenant_id);
|
||||
void reset();
|
||||
void record(const Key &k, const Value &v, const int64_t max_cnt=INT64_MAX);
|
||||
void release(const Key &k, Value &value);
|
||||
template <typename Function> int for_each(Function &fn);
|
||||
void print();
|
||||
bool is_empty()
|
||||
{ return (ATOMIC_LOAD(&total_size_) == 0); }
|
||||
private:
|
||||
static constexpr int MEMORY_LIMIT = 128L << 20;
|
||||
static constexpr int MAP_SIZE_LIMIT = MEMORY_LIMIT / sizeof(Value);
|
||||
tenant_leak_checker_t checker_info_;
|
||||
int64_t total_size_;
|
||||
};
|
||||
|
||||
template<typename Key, typename Value>
|
||||
ObBaseLeakChecker<Key, Value>::ObBaseLeakChecker()
|
||||
{
|
||||
total_size_ = 0;
|
||||
}
|
||||
|
||||
template<typename Key, typename Value>
|
||||
ObBaseLeakChecker<Key, Value>::~ObBaseLeakChecker()
|
||||
{
|
||||
reset();
|
||||
}
|
||||
|
||||
template<typename Key, typename Value>
|
||||
int ObBaseLeakChecker<Key, Value>::init(const uint64_t tenant_id)
|
||||
{
|
||||
ObMemAttr attr(tenant_id, "leakChecker", ObCtxIds::DEFAULT_CTX_ID,
|
||||
lib::OB_HIGH_ALLOC);
|
||||
int ret = checker_info_.init(attr);
|
||||
if (OB_FAIL(ret)) {
|
||||
COMMON_LOG(ERROR, "failed to create hashmap", K(ret));
|
||||
} else {
|
||||
COMMON_LOG(INFO, "leak checker init succ");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template<typename Key, typename Value>
|
||||
void ObBaseLeakChecker<Key, Value>::reset()
|
||||
{
|
||||
checker_info_.reset();
|
||||
total_size_ = 0;
|
||||
}
|
||||
|
||||
template<typename Key, typename Value>
|
||||
void ObBaseLeakChecker<Key, Value>::record(const Key &k, const Value &v, const int64_t max_cnt)
|
||||
{
|
||||
INIT_SUCC(ret);
|
||||
if (total_size_ < OB_MIN(MAP_SIZE_LIMIT, max_cnt)) {
|
||||
if (OB_FAIL(checker_info_.insert(k, v))) {
|
||||
COMMON_LOG(WARN, "Fail to register leak info", K(ret), K(k), K(v));
|
||||
} else {
|
||||
ATOMIC_INC(&total_size_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<typename Key, typename Value>
|
||||
void ObBaseLeakChecker<Key, Value>::release(const Key &k, Value &value)
|
||||
{
|
||||
INIT_SUCC(ret);
|
||||
if (OB_FAIL(checker_info_.erase(k, value))) {
|
||||
COMMON_LOG(WARN, "Fail to unregister leak info", K(ret), K(k));
|
||||
} else {
|
||||
ATOMIC_DEC(&total_size_);
|
||||
}
|
||||
}
|
||||
|
||||
template<typename Key, typename Value>
|
||||
void ObBaseLeakChecker<Key, Value>::print()
|
||||
{
|
||||
Printer fn;
|
||||
for_each(fn);
|
||||
}
|
||||
|
||||
template <typename Key, typename Value>
|
||||
template <typename Function>
|
||||
int ObBaseLeakChecker<Key, Value>::for_each(Function &fn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ret = checker_info_.for_each(fn);
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // storage
|
||||
} // oceanbase
|
||||
#endif // OCEANBASE_SHARE_OB_LEAK_CHECKER_H_
|
@ -863,6 +863,9 @@ DEF_CAP(_parallel_redo_logging_trigger, OB_CLUSTER_PARAMETER, "16M", "[0B,)",
|
||||
DEF_TIME(_ob_get_gts_ahead_interval, OB_CLUSTER_PARAMETER, "0s", "[0s, 1s]",
|
||||
"get gts ahead interval. Range: [0s, 1s]",
|
||||
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_INT(_tx_debug_level, OB_TENANT_PARAMETER, "0", "[0, 10]",
|
||||
"the debug level of transaction module. Range: [0, 10] in integer.",
|
||||
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
|
||||
//// rpc config
|
||||
DEF_TIME(rpc_timeout, OB_CLUSTER_PARAMETER, "2s",
|
||||
|
@ -2182,6 +2182,7 @@ int ObBasicSessionInfo::set_cur_phy_plan(ObPhysicalPlan *cur_phy_plan)
|
||||
} else {
|
||||
cur_phy_plan_ = cur_phy_plan;
|
||||
plan_id_ = cur_phy_plan->get_plan_id();
|
||||
ash_stat_.plan_id_ = plan_id_;
|
||||
int64_t len = cur_phy_plan->stat_.sql_id_.length();
|
||||
MEMCPY(sql_id_, cur_phy_plan->stat_.sql_id_.ptr(), len);
|
||||
sql_id_[len] = '\0';
|
||||
|
@ -78,6 +78,7 @@
|
||||
#ifdef OB_BUILD_SHARED_STORAGE
|
||||
#include "close_modules/shared_storage/storage/shared_storage/ob_public_block_gc_service.h"
|
||||
#endif
|
||||
#include "storage/tx_storage/ob_tx_leak_checker.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -720,6 +721,7 @@ bool ObLS::safe_to_destroy()
|
||||
K(ret), KP(this), KPC(this));
|
||||
ref_mgr_.print();
|
||||
PRINT_OBJ_LEAK(MTL_ID(), share::LEAK_CHECK_OBJ_LS_HANDLE);
|
||||
READ_CHECKER_PRINT(ls_meta_.ls_id_);
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("this ls is safe to destroy", KP(this), KPC(this));
|
||||
@ -2506,6 +2508,8 @@ int ObLS::diagnose(DiagnoseInfo &info) const
|
||||
// election, palf, log handler角色不统一时可能出现无主
|
||||
STORAGE_LOG(WARN, "diagnose rc service failed", K(ret), K(ls_id));
|
||||
}
|
||||
DiagnoseFunctor fn(MTL_ID(), ls_id, info.read_only_tx_info_, 0, sizeof(info.read_only_tx_info_));
|
||||
READ_CHECKER_FOR_EACH(fn);
|
||||
STORAGE_LOG(INFO, "diagnose finish", K(ret), K(info), K(ls_id));
|
||||
return ret;
|
||||
}
|
||||
|
@ -146,6 +146,7 @@ struct DiagnoseInfo
|
||||
#ifdef OB_BUILD_ARBITRATION
|
||||
logservice::LogArbSrvDiagnoseInfo arb_srv_diagnose_info_;
|
||||
#endif
|
||||
char read_only_tx_info_[1024];
|
||||
TO_STRING_KV(K(ls_id_),
|
||||
K(log_handler_diagnose_info_),
|
||||
K(palf_diagnose_info_),
|
||||
@ -158,7 +159,7 @@ struct DiagnoseInfo
|
||||
#ifdef OB_BUILD_ARBITRATION
|
||||
,K(arb_srv_diagnose_info_)
|
||||
#endif
|
||||
);
|
||||
,K(read_only_tx_info_));
|
||||
void reset() {
|
||||
ls_id_ = -1;
|
||||
log_handler_diagnose_info_.reset();
|
||||
@ -172,6 +173,7 @@ struct DiagnoseInfo
|
||||
#ifdef OB_BUILD_ARBITRATION
|
||||
arb_srv_diagnose_info_.reset();
|
||||
#endif
|
||||
read_only_tx_info_[0] = '\0';
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -26,6 +26,7 @@
|
||||
#include "logservice/ob_log_base_header.h"
|
||||
#include "share/scn.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
#include "storage/tx_storage/ob_tx_leak_checker.h"
|
||||
#include "storage/checkpoint/ob_checkpoint_diagnose.h"
|
||||
|
||||
namespace oceanbase
|
||||
@ -51,7 +52,6 @@ int ObLSTxService::init(const ObLSID &ls_id,
|
||||
ls_id_ = ls_id;
|
||||
mgr_ = mgr;
|
||||
trans_service_ = trans_service;
|
||||
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -186,6 +186,8 @@ int ObLSTxService::get_read_store_ctx(const ObTxReadSnapshot &snapshot,
|
||||
ret = trans_service_->get_read_store_ctx(snapshot, read_latest, lock_timeout, store_ctx);
|
||||
if (OB_FAIL(ret)) {
|
||||
mgr_->end_readonly_request();
|
||||
} else {
|
||||
READ_CHECKER_RECORD(store_ctx);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -228,9 +230,12 @@ int ObLSTxService::get_read_store_ctx(const SCN &snapshot,
|
||||
} else {
|
||||
store_ctx.ls_id_ = ls_id_;
|
||||
store_ctx.is_read_store_ctx_ = true;
|
||||
|
||||
ret = trans_service_->get_read_store_ctx(snapshot, lock_timeout, store_ctx);
|
||||
if (OB_FAIL(ret)) {
|
||||
mgr_->end_readonly_request();
|
||||
} else {
|
||||
READ_CHECKER_RECORD(store_ctx);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -290,6 +295,7 @@ int ObLSTxService::revert_store_ctx(storage::ObStoreCtx &store_ctx) const
|
||||
tmp_ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "mgr is null", K(tmp_ret), KP(this));
|
||||
} else {
|
||||
READ_CHECKER_RELEASE(store_ctx);
|
||||
(void)mgr_->end_readonly_request();
|
||||
}
|
||||
}
|
||||
@ -344,6 +350,7 @@ int ObLSTxService::check_all_readonly_tx_clean_up() const
|
||||
if (REACH_TIME_INTERVAL(5000000)) {
|
||||
TRANS_LOG(INFO, "readonly requests are active", K(active_readonly_request_count));
|
||||
mgr_->dump_readonly_request(3);
|
||||
READ_CHECKER_PRINT(ls_id_);
|
||||
}
|
||||
ret = OB_EAGAIN;
|
||||
} else if ((total_request_by_transfer_dest = mgr_->get_total_request_by_transfer_dest()) > 0) {
|
||||
|
@ -77,6 +77,7 @@ void ObStoreCtx::reset()
|
||||
mvcc_acc_ctx_.reset();
|
||||
tablet_stat_.reset();
|
||||
is_read_store_ctx_ = false;
|
||||
check_seq_ = 0;
|
||||
}
|
||||
|
||||
int ObStoreCtx::init_for_read(const ObLSID &ls_id,
|
||||
|
@ -496,6 +496,7 @@ struct ObStoreCtx
|
||||
memtable::ObMvccAccessCtx mvcc_acc_ctx_; // all txn relative context
|
||||
storage::ObTabletStat tablet_stat_; // used for collecting query statistics
|
||||
bool is_read_store_ctx_;
|
||||
int64_t check_seq_;
|
||||
};
|
||||
|
||||
|
||||
|
@ -65,7 +65,9 @@ ObTransService::ObTransService()
|
||||
#ifdef ENABLE_DEBUG_LOG
|
||||
defensive_check_mgr_(NULL),
|
||||
#endif
|
||||
tx_desc_mgr_(*this)
|
||||
tx_desc_mgr_(*this),
|
||||
tx_debug_seq_(0),
|
||||
read_only_checker_()
|
||||
{
|
||||
check_env_();
|
||||
}
|
||||
@ -165,6 +167,8 @@ int ObTransService::init(const ObAddr &self,
|
||||
TRANS_LOG(WARN, "init rollback msg map failed", KR(ret));
|
||||
} else if (OB_FAIL(tablet_to_ls_cache_.init(tenant_id, &tx_ctx_mgr_))) {
|
||||
TRANS_LOG(WARN, "init tablet to ls cache failed", K(ret));
|
||||
} else if (OB_FAIL(read_only_checker_.init(tenant_id))) {
|
||||
TRANS_LOG(WARN, "read only checker init failed", K(ret));
|
||||
} else {
|
||||
self_ = self;
|
||||
tenant_id_ = tenant_id;
|
||||
|
@ -47,6 +47,7 @@
|
||||
#include "ob_tx_free_route.h"
|
||||
#include "ob_tx_free_route_msg.h"
|
||||
#include "ob_tablet_to_ls_cache.h"
|
||||
#include "src/storage/tx_storage/ob_tx_leak_checker.h"
|
||||
|
||||
#define MAX_REDO_SYNC_TASK_COUNT 10
|
||||
|
||||
@ -195,6 +196,9 @@ public:
|
||||
int push(void *task);
|
||||
virtual void handle(void *task) override;
|
||||
public:
|
||||
ObReadOnlyTxChecker &get_read_tx_checker() { return read_only_checker_; }
|
||||
int64_t get_unique_seq()
|
||||
{ return ATOMIC_AAF(&tx_debug_seq_, 1); }
|
||||
int check_trans_partition_leader_unsafe(const share::ObLSID &ls_id, bool &is_leader);
|
||||
int get_weak_read_snapshot(const uint64_t tenant_id, share::SCN &snapshot_version);
|
||||
int calculate_trans_cost(const ObTransID &tid, uint64_t &cost);
|
||||
@ -354,6 +358,10 @@ private:
|
||||
int64_t rollback_sp_msg_sequence_;
|
||||
// for rollback-savepoint msg resp callback to find tx_desc
|
||||
share::ObLightHashMap<ObCommonID, ObRollbackSPMsgGuard, ObRollbackSPMsgGuardAlloc, common::SpinRWLock, 1 << 16 /*bucket_num*/> rollback_sp_msg_mgr_;
|
||||
|
||||
// tenant level atomic inc seq, just for debug
|
||||
int64_t tx_debug_seq_;
|
||||
ObReadOnlyTxChecker read_only_checker_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTransService);
|
||||
};
|
||||
|
313
src/storage/tx_storage/ob_tx_leak_checker.h
Normal file
313
src/storage/tx_storage/ob_tx_leak_checker.h
Normal file
@ -0,0 +1,313 @@
|
||||
/**
|
||||
* Copyright (c) 2024 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
#ifndef OCEANBASE_STORAGE_OB_TX_LEAK_CHECKER_H_
|
||||
#define OCEANBASE_STORAGE_OB_TX_LEAK_CHECKER_H_
|
||||
#include "share/leak_checker/ob_leak_checker.h"
|
||||
#include "common/ob_tablet_id.h"
|
||||
#include "share/ob_ls_id.h"
|
||||
#include "lib/profile/ob_trace_id.h"
|
||||
#include "storage/ob_common_id_utils.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace storage
|
||||
{
|
||||
|
||||
struct ObReadOnlyTxCheckerKey
|
||||
{
|
||||
public:
|
||||
ObReadOnlyTxCheckerKey()
|
||||
: tenant_id_(0), seq_(0)
|
||||
{}
|
||||
~ObReadOnlyTxCheckerKey() = default;
|
||||
int hash(uint64_t &hash_value) const
|
||||
{
|
||||
hash_value = seq_;
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
OB_INLINE bool is_valid() const
|
||||
{
|
||||
return (seq_ != 0);
|
||||
}
|
||||
bool operator== (const ObReadOnlyTxCheckerKey &other) const
|
||||
{
|
||||
return (tenant_id_ == other.tenant_id_
|
||||
&& seq_ == other.seq_);
|
||||
}
|
||||
TO_STRING_KV(K_(tenant_id), K_(seq));
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
int64_t seq_;
|
||||
};
|
||||
|
||||
struct ObReadExInfo
|
||||
{
|
||||
public:
|
||||
enum {
|
||||
INVALID_TYPE = 0,
|
||||
WITH_PLAN = 1,
|
||||
WITH_TRACE = 2,
|
||||
WITH_LBT = 3
|
||||
};
|
||||
ObReadExInfo() : type_(INVALID_TYPE) {}
|
||||
~ObReadExInfo() {}
|
||||
TO_STRING_KV(K_(type));
|
||||
public:
|
||||
int64_t type_;
|
||||
};
|
||||
|
||||
struct ObReadExInfoPlan : public ObReadExInfo
|
||||
{
|
||||
public:
|
||||
ObReadExInfoPlan()
|
||||
: plan_id_(0)
|
||||
{ type_ = WITH_PLAN; }
|
||||
~ObReadExInfoPlan() {}
|
||||
INHERIT_TO_STRING_KV("ObReadExInfo", ObReadExInfo, K_(plan_id));
|
||||
public:
|
||||
int64_t plan_id_;
|
||||
};
|
||||
|
||||
struct ObReadExInfoTrace : public ObReadExInfoPlan
|
||||
{
|
||||
public:
|
||||
ObReadExInfoTrace()
|
||||
: trace_id_()
|
||||
{ type_ = WITH_TRACE; }
|
||||
~ObReadExInfoTrace() {}
|
||||
INHERIT_TO_STRING_KV("ObReadExInfoPlan", ObReadExInfoPlan, K_(trace_id));
|
||||
public:
|
||||
common::ObCurTraceId::TraceId trace_id_;
|
||||
};
|
||||
|
||||
struct ObReadExInfoBT : public ObReadExInfoTrace
|
||||
{
|
||||
public:
|
||||
ObReadExInfoBT() { type_ = WITH_LBT; bt_[0] = '\0'; }
|
||||
~ObReadExInfoBT() {}
|
||||
INHERIT_TO_STRING_KV("ObReadExInfoTrace", ObReadExInfoTrace, K_(bt));
|
||||
public:
|
||||
char bt_[512];
|
||||
};
|
||||
|
||||
struct ObReadOnlyTxCheckerValue
|
||||
{
|
||||
public:
|
||||
ObReadOnlyTxCheckerValue()
|
||||
: tenant_id_(0),
|
||||
timestamp_(0),
|
||||
ls_id_(),
|
||||
tablet_id_(),
|
||||
extra_(nullptr)
|
||||
{
|
||||
}
|
||||
~ObReadOnlyTxCheckerValue() = default;
|
||||
TO_STRING_KV(K_(tenant_id), K_(timestamp), K_(ls_id),
|
||||
K_(tablet_id), KPC_(extra));
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
int64_t timestamp_;
|
||||
share::ObLSID ls_id_;
|
||||
common::ObTabletID tablet_id_;
|
||||
ObReadExInfo *extra_;
|
||||
};
|
||||
|
||||
struct ObReadOnlyTxPrinter
|
||||
{
|
||||
bool operator()(const ObReadOnlyTxCheckerKey &k, const ObReadOnlyTxCheckerValue &v)
|
||||
{
|
||||
bool ret = true;
|
||||
if (v.tenant_id_ == tenant_id_ && v.ls_id_ == ls_id_) {
|
||||
if (OB_ISNULL(v.extra_)) {
|
||||
COMMON_LOG(INFO, "LEAK_CHECKER ",
|
||||
"key", k,
|
||||
"value", v);
|
||||
} else if (OB_NOT_NULL(v.extra_)) {
|
||||
if (v.extra_->type_ >= ObReadExInfo::WITH_LBT) {
|
||||
ObReadExInfoBT *extra_info = static_cast<ObReadExInfoBT *>(v.extra_);
|
||||
COMMON_LOG(INFO, "LEAK_CHECKER ",
|
||||
"key", k,
|
||||
"value", v,
|
||||
KPC(extra_info));
|
||||
} else if (v.extra_->type_ >= ObReadExInfo::WITH_TRACE) {
|
||||
ObReadExInfoTrace *extra_info = static_cast<ObReadExInfoTrace *>(v.extra_);
|
||||
COMMON_LOG(INFO, "LEAK_CHECKER ",
|
||||
"key", k,
|
||||
"value", v,
|
||||
KPC(extra_info));
|
||||
} else if (v.extra_->type_ >= ObReadExInfo::WITH_PLAN) {
|
||||
ObReadExInfoPlan *extra_info = static_cast<ObReadExInfoPlan *>(v.extra_);
|
||||
COMMON_LOG(INFO, "LEAK_CHECKER ",
|
||||
"key", k,
|
||||
"value", v,
|
||||
KPC(extra_info));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
share::ObLSID ls_id_;
|
||||
};
|
||||
|
||||
struct DiagnoseFunctor
|
||||
{
|
||||
DiagnoseFunctor(const uint64_t tenant_id,
|
||||
const share::ObLSID ls_id,
|
||||
char *buf,
|
||||
const int64_t pos,
|
||||
const int64_t len)
|
||||
: tenant_id_(tenant_id), ls_id_(ls_id),
|
||||
buf_(buf), pos_(pos), len_(len)
|
||||
{}
|
||||
~DiagnoseFunctor() {}
|
||||
bool operator()(const ObReadOnlyTxCheckerKey &k, const ObReadOnlyTxCheckerValue &v)
|
||||
{
|
||||
bool bool_ret = true;
|
||||
int ret = OB_SUCCESS;
|
||||
if (v.tenant_id_ == tenant_id_ && v.ls_id_ == ls_id_) {
|
||||
if (OB_FAIL(databuff_printf(buf_, len_, pos_, "{tablet_id:%ld",
|
||||
v.tablet_id_.id()))){
|
||||
// break the print
|
||||
bool_ret = false;
|
||||
}
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(v.extra_)) {
|
||||
if (OB_SUCC(ret) && v.extra_->type_ >= ObReadExInfo::WITH_PLAN) {
|
||||
ObReadExInfoPlan *info = static_cast<ObReadExInfoPlan *>(v.extra_);
|
||||
if (OB_FAIL(databuff_printf(buf_, len_, pos_, ", plan_id:%ld",
|
||||
info->plan_id_))){
|
||||
// break the print
|
||||
bool_ret = false;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && v.extra_->type_ >= ObReadExInfo::WITH_TRACE) {
|
||||
ObReadExInfoTrace *info = static_cast<ObReadExInfoTrace *>(v.extra_);
|
||||
if (OB_FAIL(databuff_printf(buf_, len_, pos_, ", trace:%s",
|
||||
to_cstring(info->trace_id_)))){
|
||||
// break the print
|
||||
bool_ret = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(databuff_printf(buf_, len_, pos_, "}, "))){
|
||||
// break the print
|
||||
bool_ret = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return bool_ret;
|
||||
}
|
||||
public:
|
||||
uint64_t tenant_id_;
|
||||
share::ObLSID ls_id_;
|
||||
char *buf_;
|
||||
int64_t pos_;
|
||||
int64_t len_;
|
||||
};
|
||||
|
||||
const static int64_t TX_DEBUG_LEVEL_CACHE_REFRESH_INTERVAL = 1_s;
|
||||
static int64_t get_tx_debug_level()
|
||||
{
|
||||
RLOCAL_INIT(int64_t, last_check_timestamp, 0);
|
||||
RLOCAL_INIT(int64_t, last_result, 0);
|
||||
int64_t current_time = ObClockGenerator::getClock();
|
||||
if (current_time - last_check_timestamp < TX_DEBUG_LEVEL_CACHE_REFRESH_INTERVAL) {
|
||||
// Check once when the last memory burst or tenant_id does not match or the interval reaches the threshold
|
||||
} else {
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
|
||||
if (OB_LIKELY(tenant_config.is_valid())) {
|
||||
last_result = tenant_config->_tx_debug_level;
|
||||
}
|
||||
last_check_timestamp = current_time;
|
||||
}
|
||||
|
||||
return last_result;
|
||||
}
|
||||
|
||||
typedef share::ObBaseLeakChecker<ObReadOnlyTxCheckerKey, ObReadOnlyTxCheckerValue> ObReadOnlyTxChecker;
|
||||
#define READ_CHECKER_RECORD(ctx) \
|
||||
do { \
|
||||
const static int64_t MAX_RECORD_CNT = 100000; /* 10W * 0.5K = 50MB */ \
|
||||
int64_t tx_debug_level = get_tx_debug_level(); \
|
||||
if (OB_LIKELY(tx_debug_level <= 0)) { \
|
||||
} else { \
|
||||
ObReadOnlyTxCheckerKey key; \
|
||||
ObReadOnlyTxCheckerValue value; \
|
||||
key.seq_ = MTL(ObTransService *)->get_unique_seq(); \
|
||||
key.tenant_id_ = MTL_ID(); \
|
||||
value.tenant_id_ = MTL_ID(); \
|
||||
value.timestamp_ = ObClockGenerator::getClock(); \
|
||||
value.ls_id_ = ctx.ls_id_; \
|
||||
value.tablet_id_ = ctx.tablet_id_; \
|
||||
ctx.check_seq_ = key.seq_; \
|
||||
if (OB_UNLIKELY(tx_debug_level >= 4)) { \
|
||||
void* buf = ob_malloc(sizeof(ObReadExInfoBT), ObMemAttr(MTL_ID(), "readleakchecker")); \
|
||||
if (OB_NOT_NULL(buf)) { \
|
||||
ObReadExInfoBT *extra_info = new(buf) ObReadExInfoBT(); \
|
||||
extra_info->trace_id_ = *(ObCurTraceId::get_trace_id()); \
|
||||
extra_info->plan_id_ = ObActiveSessionGuard::get_stat().plan_id_; \
|
||||
lbt(extra_info->bt_, sizeof(extra_info->bt_)); \
|
||||
value.extra_ = extra_info; \
|
||||
} \
|
||||
} else if (OB_UNLIKELY(tx_debug_level >= 3)) { \
|
||||
void* buf = ob_malloc(sizeof(ObReadExInfoTrace), ObMemAttr(MTL_ID(), "readleakchecker")); \
|
||||
if (OB_NOT_NULL(buf)) { \
|
||||
ObReadExInfoTrace *extra_info = new(buf) ObReadExInfoTrace(); \
|
||||
extra_info->trace_id_ = *(ObCurTraceId::get_trace_id()); \
|
||||
extra_info->plan_id_ = ObActiveSessionGuard::get_stat().plan_id_; \
|
||||
value.extra_ = extra_info; \
|
||||
} \
|
||||
} else if (OB_UNLIKELY(tx_debug_level >= 2)) { \
|
||||
void* buf = ob_malloc(sizeof(ObReadExInfoPlan), ObMemAttr(MTL_ID(), "readleakchecker")); \
|
||||
if (OB_NOT_NULL(buf)) { \
|
||||
ObReadExInfoPlan *extra_info = new(buf) ObReadExInfoPlan(); \
|
||||
extra_info->plan_id_ = ObActiveSessionGuard::get_stat().plan_id_; \
|
||||
value.extra_ = extra_info; \
|
||||
} \
|
||||
} else if (OB_LIKELY(tx_debug_level >= 1)) { \
|
||||
} \
|
||||
MTL(ObTransService *)->get_read_tx_checker().record(key, value, MAX_RECORD_CNT); \
|
||||
} \
|
||||
} while(0)
|
||||
|
||||
#define READ_CHECKER_RELEASE(ctx) \
|
||||
do { \
|
||||
if (OB_UNLIKELY(!MTL(ObTransService *)->get_read_tx_checker().is_empty())) { \
|
||||
ObReadOnlyTxCheckerKey key; \
|
||||
ObReadOnlyTxCheckerValue value; \
|
||||
key.seq_ = ctx.check_seq_; \
|
||||
key.tenant_id_ = MTL_ID(); \
|
||||
MTL(ObTransService *)->get_read_tx_checker().release(key, value); \
|
||||
if (OB_NOT_NULL(value.extra_)) { \
|
||||
ob_free(value.extra_); \
|
||||
} \
|
||||
} \
|
||||
} while(0)
|
||||
|
||||
#define READ_CHECKER_PRINT(ls_id) \
|
||||
do { \
|
||||
ObReadOnlyTxPrinter fn; \
|
||||
fn.tenant_id_ = MTL_ID(); \
|
||||
fn.ls_id_ = ls_id; \
|
||||
MTL(ObTransService *)->get_read_tx_checker().for_each(fn); \
|
||||
} while(0)
|
||||
|
||||
#define READ_CHECKER_FOR_EACH(fn) \
|
||||
do { \
|
||||
MTL(ObTransService *)->get_read_tx_checker().for_each(fn); \
|
||||
} while(0)
|
||||
|
||||
} // storage
|
||||
} // oceanbase
|
||||
|
||||
#endif // OCEANBASE_STORAGE_OB_TX_LEAK_CHECKER_H_
|
@ -490,6 +490,7 @@ _transfer_start_trans_timeout
|
||||
_transfer_task_retry_interval
|
||||
_transfer_task_tablet_count_threshold
|
||||
_tx_data_memory_limit_percentage
|
||||
_tx_debug_level
|
||||
_tx_result_retention
|
||||
_tx_share_memory_limit_percentage
|
||||
_upgrade_stage
|
||||
|
@ -7468,6 +7468,7 @@ enable_sync tinyint(4) NO NULL
|
||||
enable_vote tinyint(4) NO NULL
|
||||
arb_srv_info varchar(1024) NO NULL
|
||||
parent varchar(1024) NO NULL
|
||||
readonly_tx varchar(1024) NO NULL
|
||||
select /*+QUERY_TIMEOUT(60000000)*/ IF(count(*) >= 0, 1, 0) from oceanbase.__all_virtual_ha_diagnose;
|
||||
IF(count(*) >= 0, 1, 0)
|
||||
1
|
||||
|
Loading…
x
Reference in New Issue
Block a user