[OBCDC] Fix memleak of SeArray; Fix wrong usage of inner table/database schema name

This commit is contained in:
SanmuWangZJU
2024-03-19 08:33:57 +00:00
committed by ob-robot
parent cdba49267b
commit acfd2c02d3
13 changed files with 246 additions and 34 deletions

View File

@ -31,6 +31,7 @@ ob_set_subtarget(obcdc_object_list common
ob_cdc_global_info.cpp
ob_cdc_lob_aux_table_schema_info.cpp
ob_cdc_lob_aux_table_parse.cpp
ob_cdc_malloc_sample_info.cpp
ob_cdc_server_endpoint_access_info.cpp
ob_cdc_tenant_endpoint_provider.cpp
ob_cdc_udt.cpp

View File

@ -0,0 +1,83 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*
* OBCDC malloc sample info
*/
#define USING_LOG_PREFIX OBLOG
#include <algorithm>
#include "ob_cdc_malloc_sample_info.h"
#include "ob_log_utils.h"
namespace oceanbase
{
namespace libobcdc
{
int ObCDCMallocSampleInfo::init(const ObMallocSampleMap &sample_map)
{
int ret = OB_SUCCESS;
lib::ObMallocSampleMap::const_iterator it = sample_map.begin();
for (; OB_SUCC(ret) && it != sample_map.end(); ++it) {
ObCDCMallocSamplePair pair(it->first, it->second);
if (OB_FAIL(samples_.push_back(pair))) {
LOG_ERROR("push_back malloc sample into array failed", KR(ret));
}
}
if (OB_SUCC(ret)) {
std::sort(samples_.begin(), samples_.end(), ObCDCMallocSamplePairCompartor());
}
return ret;
}
void ObCDCMallocSampleInfo::print_topk(int64_t k)
{
if (OB_LIKELY(k > 0)) {
const int64_t sample_cnt = samples_.count();
for (int i = 0; i < std::min(sample_cnt, k); i++) {
char bt_str[lib::MAX_BACKTRACE_LENGTH];
ObCDCMallocSamplePair &pair = samples_.at(i);
IGNORE_RETURN parray(bt_str, sizeof(bt_str), (int64_t*)pair.key_.bt_, lib::AOBJECT_BACKTRACE_COUNT);
_LOG_INFO("[DUMP_MALLOC_SAMPLE][TOP %d/%ld][tenant_id: %ld][label: %s][ctx_id: %ld][alloc_size/alloc_cnt: %s/%ld][bt: %s]",
i, k, pair.key_.tenant_id_, pair.key_.label_, pair.key_.ctx_id_, SIZE_TO_STR(pair.value_.alloc_bytes_), pair.value_.alloc_count_, bt_str);
}
}
}
void ObCDCMallocSampleInfo::print_with_filter(const char *label_str, const int64_t alloc_size)
{
if (OB_UNLIKELY(alloc_size > 0) && OB_NOT_NULL(label_str)) {
const int64_t sample_cnt = samples_.count();
LOG_INFO("print_with_filter", KCSTRING(label_str), K(alloc_size));
for (int i = 0; i < sample_cnt; i++) {
const ObCDCMallocSamplePair &pair = samples_.at(i);
if (pair.value_.alloc_bytes_ > alloc_size && ObLabel(pair.key_.label_) == ObLabel(label_str)) {
char bt_str[lib::MAX_BACKTRACE_LENGTH];
IGNORE_RETURN parray(bt_str, sizeof(bt_str), (int64_t*)pair.key_.bt_, lib::AOBJECT_BACKTRACE_COUNT);
_LOG_INFO("[DUMP_MALLOC_SAMPLE][FILTER][tenant_id: %ld][label: %s][ctx_id: %ld][alloc_size/alloc_cnt: %s/%ld][bt: %s]",
pair.key_.tenant_id_, pair.key_.label_, pair.key_.ctx_id_, SIZE_TO_STR(pair.value_.alloc_bytes_), pair.value_.alloc_count_, bt_str);
}
}
}
}
} // end of namespace libobcdc
} // end of namespace oceanbase

View File

@ -0,0 +1,86 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*
* OBCDC malloc sample info
*/
#ifndef OCEANBASE_LIBOBCDC_MALLOC_SAMPLE_INFO_H__
#define OCEANBASE_LIBOBCDC_MALLOC_SAMPLE_INFO_H__
#include "lib/alloc/ob_malloc_sample_struct.h"
#include "lib/container/ob_se_array.h"
namespace oceanbase
{
using namespace lib;
namespace libobcdc
{
struct ObCDCMallocSamplePair
{
ObCDCMallocSamplePair() : key_(), value_() {}
ObCDCMallocSamplePair(const ObMallocSampleKey &key, const ObMallocSampleValue &value): key_(), value_() { reset(key, value); }
ObCDCMallocSamplePair operator=(const ObCDCMallocSamplePair &other)
{
if (this != &other) {
reset(other.key_, other.value_);
}
return *this;
}
void reset(const ObMallocSampleKey &key, const ObMallocSampleValue &value)
{
key_.tenant_id_ = key.tenant_id_;
key_.ctx_id_ = key.ctx_id_;
STRNCPY(key_.label_, key.label_, sizeof(key.label_));
key_.label_[sizeof(key.label_) - 1] = '\0';
MEMCPY((char*)key_.bt_, key.bt_, AOBJECT_BACKTRACE_SIZE);
value_.alloc_bytes_ = value.alloc_bytes_;
value_.alloc_count_ = value.alloc_count_;
}
ObMallocSampleKey key_;
ObMallocSampleValue value_;
TO_STRING_KV("tenant_id", key_.tenant_id_,
"label", key_.label_,
"ctx_id", key_.ctx_id_,
"alloc_bytes", value_.alloc_bytes_,
"alloc_cnt", value_.alloc_count_);
};
struct ObCDCMallocSamplePairCompartor
{
bool operator()(const ObCDCMallocSamplePair &lhs, const ObCDCMallocSamplePair &rhs) const
{ return lhs.value_.alloc_bytes_ > rhs.value_.alloc_bytes_; }
};
typedef common::ObSEArray<ObCDCMallocSamplePair, 32> MallocSampleArray;
class ObCDCMallocSampleInfo
{
public:
ObCDCMallocSampleInfo() : samples_() {}
~ObCDCMallocSampleInfo() { reset(); }
void reset()
{
samples_.reset();
}
public:
int init(const ObMallocSampleMap &sample_map);
void print_topk(int64_t k);
void print_with_filter(const char *label_str, const int64_t alloc_size);
private:
MallocSampleArray samples_;
};
} // end of namespace libobcdc
} // end of namespace oceanbase
#endif

View File

@ -80,6 +80,27 @@ void MultiDataSourceInfo::reset()
ls_attr_arr_.reset();
tablet_change_info_arr_.reset();
has_ddl_trans_op_ = false;
for (int i = 0; i < dict_tenant_metas_.count(); ++i) {
const ObDictTenantMeta *meta = dict_tenant_metas_.at(i);
if (OB_NOT_NULL(meta)) {
meta->~ObDictTenantMeta();
}
}
for (int i = 0; i < dict_database_metas_.count(); ++i) {
const ObDictDatabaseMeta *meta = dict_database_metas_.at(i);
if (OB_NOT_NULL(meta)) {
meta->~ObDictDatabaseMeta();
}
}
for (int i = 0; i < dict_table_metas_.count(); ++i) {
const ObDictTableMeta *meta = dict_table_metas_.at(i);
if (OB_NOT_NULL(meta)) {
meta->~ObDictTableMeta();
}
}
dict_tenant_metas_.reset();
dict_database_metas_.reset();
dict_table_metas_.reset();

View File

@ -14,9 +14,11 @@
#define OCEANBASE_LIBOBCDC_TENANT_QUERYER_H_
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "share/inner_table/ob_inner_table_schema_constants.h"
#include "ob_log_utils.h"
namespace oceanbase
{
using namespace share;
namespace libobcdc
{
// ResultType = T

View File

@ -588,6 +588,9 @@ public:
DEF_CAP(lob_data_storage_memory_limit, OB_CLUSTER_PARAMETER, "1G", "[128M,]", "lob data storage memory limit");
T_DEF_INT_INFT(lob_data_storage_clean_interval_sec, OB_CLUSTER_PARAMETER, 5, 1,
"lob_data_storage clean task nterval in seconds");
DEF_TIME(print_mod_memory_usage_interval, OB_CLUSTER_PARAMETER, "1m", "[0s, ]", "print mod memory usage threshold");
DEF_CAP(print_mod_memory_usage_threshold, OB_CLUSTER_PARAMETER, "0M", "[0M,]", "print mod memory usage threshold");
DEF_STR(print_mod_memory_usage_label, OB_CLUSTER_PARAMETER, "|", "mod label for print memmory usage");
#undef OB_CLUSTER_PARAMETER

View File

@ -25,6 +25,7 @@
#include "sql/ob_sql_init.h" // init_sql_factories
#include "observer/omt/ob_tenant_timezone_mgr.h" // OTTZ_MGR
#include "common/ob_clock_generator.h"
#include "lib/alloc/ob_malloc_sample_struct.h"
#include "ob_log_common.h"
#include "ob_log_config.h" // ObLogConfig
@ -58,6 +59,7 @@
#include "ob_log_tenant_mgr.h" // IObLogTenantMgr
#include "ob_log_rocksdb_store_service.h" // RocksDbStoreService
#include "ob_cdc_auto_config_mgr.h" // CDC_CFG_MGR
#include "ob_cdc_malloc_sample_info.h" // ObCDCMallocSampleInfo
#include "ob_log_trace_id.h"
#include "share/ob_simple_mem_limit_getter.h"
@ -2177,6 +2179,7 @@ void ObLogInstance::timer_routine()
print_refresh_mode(refresh_mode_),
print_fetching_mode(fetching_mode_));
print_tenant_memory_usage_();
dump_malloc_sample_();
if (is_online_refresh_mode(refresh_mode_)) {
schema_getter_->print_stat_info();
}
@ -2463,6 +2466,32 @@ void ObLogInstance::print_tenant_memory_usage_()
}
}
void ObLogInstance::dump_malloc_sample_()
{
int ret = OB_SUCCESS;
static int64_t last_print_ts = 0;
lib::ObMallocSampleMap malloc_sample_map;
ObCDCMallocSampleInfo sample_info;
const int64_t cur_time = get_timestamp();
const int64_t print_interval = TCONF.print_mod_memory_usage_interval.get();
if (OB_LIKELY(last_print_ts + print_interval > cur_time)) {
} else if (OB_FAIL(malloc_sample_map.create(1000, "MallocInfoMap", "MallocInfoMap"))) {
LOG_WARN("init malloc_sample_map failed", KR(ret));
} else if (OB_FAIL(ObMemoryDump::get_instance().load_malloc_sample_map(malloc_sample_map))) {
LOG_WARN("load_malloc_sample_map failed", KR(ret));
} else if (OB_FAIL(sample_info.init(malloc_sample_map))) {
LOG_ERROR("init ob_cdc_malloc_sample_info failed", KR(ret));
} else {
const static int64_t top_mem_usage_mod_print_num = 5;
const int64_t print_mod_memory_usage_threshold = TCONF.print_mod_memory_usage_threshold.get();
const char *print_mod_memory_usage_label = TCONF.print_mod_memory_usage_label;
sample_info.print_topk(top_mem_usage_mod_print_num);
sample_info.print_with_filter(print_mod_memory_usage_label, print_mod_memory_usage_threshold);
last_print_ts = cur_time;
}
}
/// Global traffic control
/// Principle: 1. Keep the total number of active Partition Transaction Tasks (PartTransTask) under control by referring to the number of
/// Match the production rate with the consumption rate to avoid OOM

View File

@ -287,6 +287,7 @@ private:
int config_data_start_schema_version_(const int64_t global_data_start_schema_version);
int update_data_start_schema_version_on_split_mode_();
int set_all_tenant_compat_mode_();
void dump_malloc_sample_();
private:
static ObLogInstance *instance_;

View File

@ -39,7 +39,7 @@ int TenantLSQueryer::build_sql_statement_(const uint64_t tenant_id, ObSqlString
} else {
uint64_t snapshto_scn_val = snapshot_scn.get_val_for_inner_table_field();
if (OB_FAIL(sql.assign_fmt(QUERY_LS_INFO_SQL_FORMAT, "__ALL_LS", snapshto_scn_val, snapshto_scn_val))) {
if (OB_FAIL(sql.assign_fmt(QUERY_LS_INFO_SQL_FORMAT, OB_ALL_LS_TNAME, snapshto_scn_val, snapshto_scn_val))) {
LOG_ERROR("assign_fmt for TenantLSQuerySQL failed", KR(ret),
K_(snapshot_ts_ns), K(snapshot_scn), K(snapshto_scn_val), K(sql));
}

View File

@ -70,8 +70,6 @@ int MySQLConnConfig::reset(const ObAddr &svr,
}
////////////////////////////////////// ObLogMySQLConnector /////////////////////////////////
const char *ObLogMySQLConnector::DEFAULT_DB_NAME_MYSQL_MODE = "OCEANBASE";
const char *ObLogMySQLConnector::DEFAULT_DB_NAME_ORACLE_MODE = "SYS";
ObLogMySQLConnector::ObLogMySQLConnector() :
inited_(false),
@ -293,7 +291,7 @@ int ObLogMySQLConnector::init_conn_(const MySQLConnConfig &cfg,
else if (OB_FAIL(set_timeout_variable_(query_timeout_us, trx_timeout_us))) {
LOG_WARN("set_timeout_variable_ fail", KR(ret), K(query_timeout_us), K(trx_timeout_us), K(cfg));
} else {
db_name = is_oracle_mode() ? DEFAULT_DB_NAME_ORACLE_MODE: DEFAULT_DB_NAME_MYSQL_MODE;
db_name = is_oracle_mode() ? OB_ORA_SYS_SCHEMA_NAME : OB_SYS_DATABASE_NAME;
// first connect to db with empty db_name to get tenant_mode, then set default db according to tenant_mode
if (OB_ISNULL(db_name)) {

View File

@ -71,9 +71,7 @@ private:
const bool enable_ssl_client_authentication);
void destroy_conn_();
int set_timeout_variable_(const int64_t query_timeout, const int64_t trx_timeout);
public:
const static char *DEFAULT_DB_NAME_MYSQL_MODE;
const static char *DEFAULT_DB_NAME_ORACLE_MODE;
private:
bool inited_;
MYSQL *mysql_;

View File

@ -84,13 +84,13 @@ int ObLogMysqlProxy::init(
const char *db_name = nullptr;
if (is_tenant_server_provider) {
// tenant conn poll will always query via sys tenant
db_name = ObLogMySQLConnector::DEFAULT_DB_NAME_MYSQL_MODE;
db_name = OB_SYS_DATABASE_NAME;
} else if (OB_FAIL(detect_tenant_mode_(server_provider))) {
LOG_ERROR("detect_tenant_mode_ failed", KR(ret), K(cluster_user), K(cluster_password));
} else if (is_oracle_mode_) {
db_name = ObLogMySQLConnector::DEFAULT_DB_NAME_ORACLE_MODE;
db_name = OB_ORA_SYS_SCHEMA_NAME;
} else {
db_name = ObLogMySQLConnector::DEFAULT_DB_NAME_MYSQL_MODE;
db_name = OB_SYS_DATABASE_NAME;
}
ObConnPoolConfigParam conn_pool_config;
conn_pool_config.reset();

View File

@ -55,14 +55,13 @@ int QueryClusterIdStrategy::build_sql_statement(char *sql_buf,
{
int ret = OB_SUCCESS;
pos = 0;
const char *query_sql = "SELECT DISTINCT VALUE FROM GV$OB_PARAMETERS WHERE NAME = 'cluster_id'";
if (OB_ISNULL(sql_buf) || OB_UNLIKELY(mul_statement_buf_len <=0)) {
LOG_ERROR("invalid argument", K(sql_buf), K(mul_statement_buf_len));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(databuff_printf(sql_buf, mul_statement_buf_len, pos,
"%s", query_sql))) {
LOG_ERROR("build sql fail", KR(ret), K(pos), K(query_sql), "buf_size", mul_statement_buf_len, K(sql_buf));
"SELECT DISTINCT VALUE FROM %s WHERE NAME = 'cluster_id'", OB_GV_OB_PARAMETERS_TNAME))) {
LOG_ERROR("build sql fail", KR(ret), K(pos), "buf_size", mul_statement_buf_len, KCSTRING(sql_buf));
} else {
// succ
}
@ -78,18 +77,18 @@ int QueryObserverVersionStrategy::build_sql_statement(char *sql_buf,
int ret = OB_SUCCESS;
pos = 0;
const char *query_sql = nullptr;
if (TCTX.is_tenant_sync_mode()) {
query_sql = "SELECT DISTINCT VALUE FROM GV$OB_PARAMETERS WHERE NAME = 'min_observer_version'";
} else {
query_sql = "SELECT DISTINCT VALUE FROM __ALL_VIRTUAL_SYS_PARAMETER_STAT WHERE NAME = 'MIN_OBSERVER_VERSION'";
}
if (OB_ISNULL(sql_buf) || OB_UNLIKELY(mul_statement_buf_len <=0)) {
LOG_ERROR("invalid argument", K(sql_buf), K(mul_statement_buf_len));
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", KR(ret), K(sql_buf), K(mul_statement_buf_len));
} else if (TCTX.is_tenant_sync_mode()) {
if (OB_FAIL(databuff_printf(sql_buf, mul_statement_buf_len, pos,
"SELECT DISTINCT VALUE FROM %s WHERE NAME = 'min_observer_version'", OB_GV_OB_PARAMETERS_TNAME))) {
LOG_ERROR("build sql failed", KR(ret), K(pos), "buf_size", mul_statement_buf_len, KCSTRING(sql_buf));
}
} else if (OB_FAIL(databuff_printf(sql_buf, mul_statement_buf_len, pos,
"%s", query_sql))) {
LOG_ERROR("build sql fail", KR(ret), K(pos), K(query_sql), "buf_size", mul_statement_buf_len, K(sql_buf));
"SELECT DISTINCT VALUE FROM %s WHERE NAME = 'MIN_OBSERVER_VERSION'", OB_ALL_VIRTUAL_SYS_PARAMETER_STAT_TNAME))) {
LOG_ERROR("build sql fail", KR(ret), K(pos), "buf_size", mul_statement_buf_len, KCSTRING(sql_buf));
} else {
// succ
}
@ -107,8 +106,6 @@ int QueryTimeZoneInfoVersionStrategy::build_sql_statement(char *sql_buf,
const char *query_sql = NULL;
const bool tenant_sync_mode = TCTX.is_tenant_sync_mode();
query_sql = "SELECT VALUE FROM __ALL_VIRTUAL_SYS_STAT WHERE NAME = 'CURRENT_TIMEZONE_VERSION' AND TENANT_ID = ";
if (OB_ISNULL(sql_buf) || OB_UNLIKELY(mul_statement_buf_len <=0)) {
LOG_ERROR("invalid argument", K(sql_buf), K(mul_statement_buf_len));
ret = OB_INVALID_ARGUMENT;
@ -116,8 +113,8 @@ int QueryTimeZoneInfoVersionStrategy::build_sql_statement(char *sql_buf,
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("not support fetch timezone_version in tenant_sync_mode", KR(ret));
} else if (OB_FAIL(databuff_printf(sql_buf, mul_statement_buf_len, pos,
"%s %lu", query_sql, tenant_id_))) {
LOG_ERROR("build sql fail", KR(ret), K(pos), K(query_sql), "buf_size", mul_statement_buf_len, K(sql_buf), K_(tenant_id));
"SELECT VALUE FROM %s WHERE NAME = 'CURRENT_TIMEZONE_VERSION' AND TENANT_ID = %lu", OB_ALL_VIRTUAL_SYS_STAT_TNAME, tenant_id_))) {
LOG_ERROR("build sql fail", KR(ret), K(pos), "buf_size", mul_statement_buf_len, KCSTRING(sql_buf), K_(tenant_id));
}
return ret;
@ -130,20 +127,13 @@ int QueryTenantLSInfoStrategy::build_sql_statement(char *sql_buf,
{
int ret = OB_SUCCESS;
pos = 0;
const char *query_sql = NULL;
query_sql = "SELECT LS_ID FROM CDB_OB_LS WHERE LS_ID != 1 AND TENANT_ID = ";
if (OB_ISNULL(sql_buf) || OB_UNLIKELY(mul_statement_buf_len <=0)) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", KR(ret), K(sql_buf), K(mul_statement_buf_len));
} else if (OB_FAIL(databuff_printf(sql_buf, mul_statement_buf_len, pos,
"%s", query_sql))) {
LOG_ERROR("build sql fail", KR(ret), K(pos), K(query_sql), "buf_size", mul_statement_buf_len, K(sql_buf));
} else {
if (OB_FAIL(databuff_printf(sql_buf, mul_statement_buf_len, pos, "%lu;", tenant_id_))) {
LOG_ERROR("build tenant_id sql fail", KR(ret), K(pos), K(query_sql),
"buf_size", mul_statement_buf_len, K(sql_buf), K(tenant_id_));
}
"SELECT LS_ID FROM %s WHERE LS_ID != 1 AND TENANT_ID = %lu", OB_CDB_OB_LS_TNAME, tenant_id_))) {
LOG_ERROR("build sql fail", KR(ret), K(pos), "buf_size", mul_statement_buf_len, KCSTRING(sql_buf));
}
return ret;