fixde memory leak after remove tenant on arbserver.

This commit is contained in:
HaHaJeff
2023-02-07 16:52:48 +08:00
committed by ob-robot
parent a06c0fb826
commit a8767462ad
14 changed files with 37 additions and 25 deletions

View File

@ -2368,11 +2368,18 @@ OB_INLINE int64_t &ob_get_cluster_id()
return cluster_id; return cluster_id;
} }
OB_INLINE int64_t &ob_get_arb_tenant_id()
{
RLOCAL(int64_t, arb_tenant_id);
return arb_tenant_id;
}
#define GETTID() ob_gettid() #define GETTID() ob_gettid()
#define GETTNAME() ob_get_tname() #define GETTNAME() ob_get_tname()
#define GET_TENANT_ID() ob_get_tenant_id() #define GET_TENANT_ID() ob_get_tenant_id()
#define gettid GETTID #define gettid GETTID
#define GET_CLUSTER_ID() ob_get_cluster_id() #define GET_CLUSTER_ID() ob_get_cluster_id()
#define GET_ARB_TENANT_ID() ob_get_arb_tenant_id()
//for explain //for explain
#define LEFT_BRACKET "(" #define LEFT_BRACKET "("

View File

@ -677,7 +677,7 @@ int ObLogger::log_head(const char *mod_name,
tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min,
tm.tm_sec, tv.tv_usec, errstr_[level], mod_name, function, tm.tm_sec, tv.tv_usec, errstr_[level], mod_name, function,
base_file_name, line, GETTID(), GETTNAME(), is_arb_replica_ ? cluster_id_buf : "", base_file_name, line, GETTID(), GETTNAME(), is_arb_replica_ ? cluster_id_buf : "",
GET_TENANT_ID(), TRACE_ID_FORMAT_PARAM(trace_id), is_arb_replica_ ? GET_ARB_TENANT_ID() : GET_TENANT_ID(), TRACE_ID_FORMAT_PARAM(trace_id),
last_logging_cost_time_us_, errcode_buf); last_logging_cost_time_us_, errcode_buf);
} }
} }

View File

@ -34,7 +34,6 @@ class SCN;
} }
namespace palf namespace palf
{ {
#define PALF_ENV_ID (OB_INVALID_TENANT_ID == MTL_ID() ? OB_SERVER_TENANT_ID : MTL_ID())
#define FLASHBACK_SUFFIX ".flashback" #define FLASHBACK_SUFFIX ".flashback"
#define TMP_SUFFIX ".tmp" #define TMP_SUFFIX ".tmp"

View File

@ -54,7 +54,7 @@ int LogGroupBuffer::init(const LSN &start_lsn)
// // TODO: add tenant config // // TODO: add tenant config
// // group_buffer_size = tenant_config->_log_groupgation_buffer_size; // // group_buffer_size = tenant_config->_log_groupgation_buffer_size;
//} //}
ObMemAttr mem_attr(PALF_ENV_ID, "LogGroupBuffer"); ObMemAttr mem_attr(MTL_ID(), "LogGroupBuffer");
if (NULL == (data_buf_ = static_cast<char *>(mtl_malloc(group_buffer_size, mem_attr)))) { if (NULL == (data_buf_ = static_cast<char *>(mtl_malloc(group_buffer_size, mem_attr)))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
PALF_LOG(ERROR, "alloc memory failed", K(ret)); PALF_LOG(ERROR, "alloc memory failed", K(ret));

View File

@ -44,6 +44,7 @@ LogIOWorker::~LogIOWorker()
} }
int LogIOWorker::init(const LogIOWorkerConfig &config, int LogIOWorker::init(const LogIOWorkerConfig &config,
const int64_t tenant_id,
int cb_thread_pool_tg_id, int cb_thread_pool_tg_id,
ObIAllocator *allocator, ObIAllocator *allocator,
IPalfEnvImpl *palf_env_impl) IPalfEnvImpl *palf_env_impl)
@ -57,7 +58,7 @@ int LogIOWorker::init(const LogIOWorkerConfig &config,
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "invalid argument!!!", K(ret), K(config), K(cb_thread_pool_tg_id), KP(allocator), PALF_LOG(ERROR, "invalid argument!!!", K(ret), K(config), K(cb_thread_pool_tg_id), KP(allocator),
KP(palf_env_impl)); KP(palf_env_impl));
} else if (OB_FAIL(queue_.init(config.io_queue_capcity_, "IOWorkerLQ", PALF_ENV_ID))) { } else if (OB_FAIL(queue_.init(config.io_queue_capcity_, "IOWorkerLQ", tenant_id))) {
PALF_LOG(ERROR, "io task queue init failed", K(ret), K(config)); PALF_LOG(ERROR, "io task queue init failed", K(ret), K(config));
} else if (OB_FAIL(batch_io_task_mgr_.init(config.batch_width_, } else if (OB_FAIL(batch_io_task_mgr_.init(config.batch_width_,
config.batch_depth_, config.batch_depth_,

View File

@ -68,6 +68,7 @@ public:
LogIOWorker(); LogIOWorker();
~LogIOWorker(); ~LogIOWorker();
int init(const LogIOWorkerConfig &config, int init(const LogIOWorkerConfig &config,
const int64_t tenant_id,
int cb_thread_pool_tg_id, int cb_thread_pool_tg_id,
ObIAllocator *allocaotr, ObIAllocator *allocaotr,
IPalfEnvImpl *palf_env_impl); IPalfEnvImpl *palf_env_impl);

View File

@ -15,7 +15,6 @@
#include "log_rpc_proxy.h" // LogRpcProxyV2 #include "log_rpc_proxy.h" // LogRpcProxyV2
#include "log_rpc_packet.h" // LogRpcPaket #include "log_rpc_packet.h" // LogRpcPaket
#include "log_req.h" // LogPushReq... #include "log_req.h" // LogPushReq...
#include "log_define.h" // PALF_ENV_ID
namespace oceanbase namespace oceanbase
{ {
using namespace common; using namespace common;
@ -35,12 +34,6 @@ LogRpc::~LogRpc()
destroy(); destroy();
} }
int LogRpc::init(const ObAddr &self,
rpc::frame::ObReqTransport *transport)
{
return init(self, PALF_ENV_ID, transport);
}
int LogRpc::init(const ObAddr &self, int LogRpc::init(const ObAddr &self,
const int64_t tenant_id, const int64_t tenant_id,
rpc::frame::ObReqTransport *transport) rpc::frame::ObReqTransport *transport)

View File

@ -72,8 +72,6 @@ public:
int init(const common::ObAddr &self, int init(const common::ObAddr &self,
const int64_t tenant_id, const int64_t tenant_id,
rpc::frame::ObReqTransport *transport); rpc::frame::ObReqTransport *transport);
int init(const common::ObAddr &self,
rpc::frame::ObReqTransport *transport);
void destroy(); void destroy();
int update_transport_compress_options(const PalfTransportCompressOptions &compress_opt); int update_transport_compress_options(const PalfTransportCompressOptions &compress_opt);
const PalfTransportCompressOptions& get_compress_opts() const; const PalfTransportCompressOptions& get_compress_opts() const;

View File

@ -22,7 +22,7 @@
const ObAddr server = rpc_packet.src_; \ const ObAddr server = rpc_packet.src_; \
int64_t palf_id = rpc_packet.palf_id_; \ int64_t palf_id = rpc_packet.palf_id_; \
if (OB_ISNULL(palf_env_impl_) \ if (OB_ISNULL(palf_env_impl_) \
&& OB_FAIL(__get_palf_env_impl(rpc_pkt_->get_tenant_id(), palf_env_impl_, filter_ == NULL))) { \ && OB_FAIL(__get_palf_env_impl(rpc_pkt_->get_tenant_id(), palf_env_impl_, filter_ == NULL))) { \
} else if (OB_UNLIKELY(NULL != filter_ && true == (*filter_)(server))) { \ } else if (OB_UNLIKELY(NULL != filter_ && true == (*filter_)(server))) { \
PALF_LOG(INFO, "need filter this packet", K(rpc_packet)); \ PALF_LOG(INFO, "need filter this packet", K(rpc_packet)); \
} else { \ } else { \

View File

@ -25,10 +25,7 @@ int __get_palf_env_impl(uint64_t tenant_id, IPalfEnvImpl *&palf_env_impl, const
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
logservice::ObLogService *log_service = nullptr; logservice::ObLogService *log_service = nullptr;
PalfEnv *palf_env = nullptr; PalfEnv *palf_env = nullptr;
if (need_check_tenant_id && tenant_id != PALF_ENV_ID) { if (OB_ISNULL(log_service = MTL(logservice::ObLogService*))) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "mtl id not match", K(tenant_id), K(PALF_ENV_ID), K(ret));
} else if (OB_ISNULL(log_service = MTL(logservice::ObLogService*))) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "get_log_service failed", K(ret)); COMMON_LOG(WARN, "get_log_service failed", K(ret));
} else if (OB_ISNULL(palf_env = log_service->get_palf_env())) { } else if (OB_ISNULL(palf_env = log_service->get_palf_env())) {
@ -37,6 +34,9 @@ int __get_palf_env_impl(uint64_t tenant_id, IPalfEnvImpl *&palf_env_impl, const
} else if (OB_ISNULL(palf_env_impl = palf_env->get_palf_env_impl())) { } else if (OB_ISNULL(palf_env_impl = palf_env->get_palf_env_impl())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "get_palf_env_impl failed", K(ret), KP(log_service), KP(palf_env_impl)); COMMON_LOG(WARN, "get_palf_env_impl failed", K(ret), KP(log_service), KP(palf_env_impl));
} else if (need_check_tenant_id && tenant_id != palf_env_impl->get_tenant_id()) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "tenant_id is not same as palf_env", K(ret), K(tenant_id), "tenant_id_in_palf", palf_env_impl->get_tenant_id());
} else { } else {
// do nothing // do nothing
} }

View File

@ -52,7 +52,7 @@ int PalfEnv::create_palf_env(
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_FAIL(FileDirectoryUtils::delete_tmp_file_or_directory_at(base_dir))) { } else if (OB_FAIL(FileDirectoryUtils::delete_tmp_file_or_directory_at(base_dir))) {
CLOG_LOG(WARN, "delete_tmp_file_or_directory_at failed", K(ret), K(base_dir)); CLOG_LOG(WARN, "delete_tmp_file_or_directory_at failed", K(ret), K(base_dir));
} else if (OB_FAIL(palf_env->palf_env_impl_.init(options, base_dir, self, transport, } else if (OB_FAIL(palf_env->palf_env_impl_.init(options, base_dir, self, MTL_ID(), transport,
log_alloc_mgr, log_block_pool))) { log_alloc_mgr, log_block_pool))) {
PALF_LOG(WARN, "PalfEnvImpl init failed", K(ret), K(base_dir)); PALF_LOG(WARN, "PalfEnvImpl init failed", K(ret), K(base_dir));
} else if (OB_FAIL(palf_env->start_())) { } else if (OB_FAIL(palf_env->start_())) {

View File

@ -146,7 +146,9 @@ PalfEnvImpl::PalfEnvImpl() : palf_meta_lock_(common::ObLatchIds::PALF_ENV_LOCK),
palf_handle_impl_map_(64), // 指定min_size=64 palf_handle_impl_map_(64), // 指定min_size=64
last_palf_epoch_(0), last_palf_epoch_(0),
diskspace_enough_(true), diskspace_enough_(true),
is_inited_(false) tenant_id_(0),
is_inited_(false),
is_running_(false)
{ {
log_dir_[0] = '\0'; log_dir_[0] = '\0';
tmp_log_dir_[0] = '\0'; tmp_log_dir_[0] = '\0';
@ -160,6 +162,7 @@ PalfEnvImpl::~PalfEnvImpl()
int PalfEnvImpl::init( int PalfEnvImpl::init(
const PalfOptions &options, const PalfOptions &options,
const char *base_dir, const ObAddr &self, const char *base_dir, const ObAddr &self,
const int64_t tenant_id,
rpc::frame::ObReqTransport *transport, rpc::frame::ObReqTransport *transport,
common::ObILogAllocator *log_alloc_mgr, common::ObILogAllocator *log_alloc_mgr,
ILogBlockPool *log_block_pool) ILogBlockPool *log_block_pool)
@ -182,11 +185,12 @@ int PalfEnvImpl::init(
KP(log_alloc_mgr), KP(log_block_pool)); KP(log_alloc_mgr), KP(log_block_pool));
} else if (OB_FAIL(fetch_log_engine_.init(this, log_alloc_mgr))) { } else if (OB_FAIL(fetch_log_engine_.init(this, log_alloc_mgr))) {
PALF_LOG(ERROR, "FetchLogEngine init failed", K(ret)); PALF_LOG(ERROR, "FetchLogEngine init failed", K(ret));
} else if (OB_FAIL(log_rpc_.init(self, transport))) { } else if (OB_FAIL(log_rpc_.init(self, tenant_id, transport))) {
PALF_LOG(ERROR, "LogRpc init failed", K(ret)); PALF_LOG(ERROR, "LogRpc init failed", K(ret));
} else if (OB_FAIL(cb_thread_pool_.init(io_cb_num, this))) { } else if (OB_FAIL(cb_thread_pool_.init(io_cb_num, this))) {
PALF_LOG(ERROR, "LogIOTaskThreadPool init failed", K(ret)); PALF_LOG(ERROR, "LogIOTaskThreadPool init failed", K(ret));
} else if (OB_FAIL(log_io_worker_.init(log_io_worker_config_, } else if (OB_FAIL(log_io_worker_.init(log_io_worker_config_,
tenant_id,
cb_thread_pool_.get_tg_id(), cb_thread_pool_.get_tg_id(),
log_alloc_mgr, this))) { log_alloc_mgr, this))) {
PALF_LOG(ERROR, "LogIOWorker init failed", K(ret)); PALF_LOG(ERROR, "LogIOWorker init failed", K(ret));
@ -200,7 +204,7 @@ int PalfEnvImpl::init(
} else if (pret < 0 || pret >= MAX_PATH_SIZE) { } else if (pret < 0 || pret >= MAX_PATH_SIZE) {
ret = OB_BUF_NOT_ENOUGH; ret = OB_BUF_NOT_ENOUGH;
PALF_LOG(ERROR, "construct log path failed", K(ret), K(pret)); PALF_LOG(ERROR, "construct log path failed", K(ret), K(pret));
} else if (OB_FAIL(palf_handle_impl_map_.init("LOG_HASH_MAP", PALF_ENV_ID))) { } else if (OB_FAIL(palf_handle_impl_map_.init("LOG_HASH_MAP", tenant_id))) {
PALF_LOG(ERROR, "palf_handle_impl_map_ init failed", K(ret)); PALF_LOG(ERROR, "palf_handle_impl_map_ init failed", K(ret));
} else if (OB_FAIL(log_loop_thread_.init(this))) { } else if (OB_FAIL(log_loop_thread_.init(this))) {
PALF_LOG(ERROR, "log_loop_thread_ init failed", K(ret)); PALF_LOG(ERROR, "log_loop_thread_ init failed", K(ret));
@ -217,6 +221,7 @@ int PalfEnvImpl::init(
log_alloc_mgr_ = log_alloc_mgr; log_alloc_mgr_ = log_alloc_mgr;
log_block_pool_ = log_block_pool; log_block_pool_ = log_block_pool;
self_ = self; self_ = self;
tenant_id_ = tenant_id;
is_inited_ = true; is_inited_ = true;
is_running_ = true; is_running_ = true;
PALF_LOG(INFO, "PalfEnvImpl init success", K(ret), K(self_), KPC(this)); PALF_LOG(INFO, "PalfEnvImpl init success", K(ret), K(self_), KPC(this));
@ -248,7 +253,7 @@ int PalfEnvImpl::start()
PALF_LOG(ERROR, "LogUpdater start failed", K(ret)); PALF_LOG(ERROR, "LogUpdater start failed", K(ret));
} else { } else {
is_running_ = true; is_running_ = true;
PALF_LOG(INFO, "PalfEnv start success", K(ret), K(PALF_ENV_ID)); PALF_LOG(INFO, "PalfEnv start success", K(ret));
} }
return ret; return ret;
} }
@ -1139,5 +1144,10 @@ int PalfEnvImpl::get_io_start_time(int64_t &last_working_time)
return ret; return ret;
} }
int64_t PalfEnvImpl::get_tenant_id()
{
return tenant_id_;
}
} // end namespace palf } // end namespace palf
} // end namespace oceanbase } // end namespace oceanbase

View File

@ -161,7 +161,9 @@ public:
virtual int remove_directory(const char *base_dir) = 0; virtual int remove_directory(const char *base_dir) = 0;
virtual bool check_disk_space_enough() = 0; virtual bool check_disk_space_enough() = 0;
virtual int get_io_start_time(int64_t &last_working_time) = 0; virtual int get_io_start_time(int64_t &last_working_time) = 0;
virtual int64_t get_tenant_id() = 0;
VIRTUAL_TO_STRING_KV("IPalfEnvImpl", "Dummy"); VIRTUAL_TO_STRING_KV("IPalfEnvImpl", "Dummy");
}; };
// 日志服务的容器类,同时管理logservice对象的生命周期 // 日志服务的容器类,同时管理logservice对象的生命周期
@ -174,6 +176,7 @@ public:
int init(const PalfOptions &options, int init(const PalfOptions &options,
const char *base_dir, const char *base_dir,
const common::ObAddr &self, const common::ObAddr &self,
const int64_t tenant_id,
rpc::frame::ObReqTransport *transport, rpc::frame::ObReqTransport *transport,
common::ObILogAllocator *alloc_mgr, common::ObILogAllocator *alloc_mgr,
ILogBlockPool *log_block_pool); ILogBlockPool *log_block_pool);
@ -220,6 +223,7 @@ public:
int for_each(const common::ObFunction<int(IPalfHandleImpl *ipalf_handle_impl)> &func) override final; int for_each(const common::ObFunction<int(IPalfHandleImpl *ipalf_handle_impl)> &func) override final;
common::ObILogAllocator* get_log_allocator() override final; common::ObILogAllocator* get_log_allocator() override final;
int get_io_start_time(int64_t &last_working_time) override final; int get_io_start_time(int64_t &last_working_time) override final;
int64_t get_tenant_id() override final;
INHERIT_TO_STRING_KV("IPalfEnvImpl", IPalfEnvImpl, K_(self), K_(log_dir), K_(disk_options_wrapper), INHERIT_TO_STRING_KV("IPalfEnvImpl", IPalfEnvImpl, K_(self), K_(log_dir), K_(disk_options_wrapper),
KPC(log_alloc_mgr_)); KPC(log_alloc_mgr_));
// =================== disk space management ================== // =================== disk space management ==================
@ -323,6 +327,7 @@ private:
LogIOWorkerConfig log_io_worker_config_; LogIOWorkerConfig log_io_worker_config_;
bool diskspace_enough_; bool diskspace_enough_;
int64_t tenant_id_;
bool is_inited_; bool is_inited_;
bool is_running_; bool is_running_;
private: private:

View File

@ -1467,8 +1467,6 @@ int ObServer::init_config()
// update gctx_.startup_mode_ // update gctx_.startup_mode_
if (FAILEDx(parse_mode())) { if (FAILEDx(parse_mode())) {
LOG_ERROR("parse_mode failed", KR(ret)); LOG_ERROR("parse_mode failed", KR(ret));
} else if (is_arbitration_mode()) {
ObMallocAllocator::get_instance()->make_allocator_create_on_demand();
} }
config_.print(); config_.print();