【part 1】Split session memory to tenants

This commit is contained in:
obdev 2023-06-15 09:48:00 +00:00 committed by ob-robot
parent ac9fb77cff
commit 20dd11d7a7
18 changed files with 286 additions and 250 deletions

View File

@ -29,8 +29,11 @@ template <typename PageAllocatorT = ModulePageAllocator, typename PageArenaT = P
class ObStringBufT
{
public:
explicit ObStringBufT(const lib::ObLabel &label = ObModIds::OB_STRING_BUF,
explicit ObStringBufT(const lib::ObMemAttr &attr,
const int64_t block_size = DEF_MEM_BLOCK_SIZE);
explicit ObStringBufT(const lib::ObLabel &label = ObModIds::OB_STRING_BUF,
const int64_t block_size = DEF_MEM_BLOCK_SIZE)
: ObStringBufT(lib::ObMemAttr(OB_SERVER_TENANT_ID, label), block_size) {}
explicit ObStringBufT(PageArenaT &arena);
~ObStringBufT();
int reset();

View File

@ -25,10 +25,10 @@ const int64_t ObStringBufT<PageAllocatorT, PageArenaT>::MIN_DEF_MEM_BLOCK_SIZE =
OB_MALLOC_NORMAL_BLOCK_SIZE;
template <typename PageAllocatorT, typename PageArenaT>
ObStringBufT<PageAllocatorT, PageArenaT>::ObStringBufT(const lib::ObLabel &label /*=nullptr*/,
ObStringBufT<PageAllocatorT, PageArenaT>::ObStringBufT(const lib::ObMemAttr &attr,
const int64_t block_size /*= DEF_MEM_BLOCK_SIZE*/)
: local_arena_(block_size < MIN_DEF_MEM_BLOCK_SIZE ? MIN_DEF_MEM_BLOCK_SIZE : block_size,
PageAllocatorT(label)),
PageAllocatorT(attr)),
arena_(local_arena_)
{
}

View File

@ -24,11 +24,30 @@ namespace common
template <class T>
using __is_default_constructible__ = std::is_default_constructible<T>;
class ObClassMeta
{
constexpr static int64_t MAGIC_CODE = 0xbebe23311332bbcc;
public:
ObClassMeta(void *instance)
: magic_code_(MAGIC_CODE),
instance_(instance)
{}
bool check_magic_code() { return MAGIC_CODE == magic_code_; }
void *instance() { return instance_; }
private:
const int64_t magic_code_;
void *instance_;
};
template<class T>
class ObFixedClassAllocator
{
public:
ObFixedClassAllocator(const int obj_size, const ObMemAttr &attr, int blk_size, int64_t nway) : allocator_(obj_size, attr, blk_size)
using object_type = T;
public:
ObFixedClassAllocator(const ObMemAttr &attr, int64_t nway)
: allocator_(sizeof(T) + sizeof(ObClassMeta), attr,
choose_blk_size(sizeof(T) + sizeof(ObClassMeta)))
{
allocator_.set_nway(static_cast<int32_t>(nway));
}
@ -53,9 +72,7 @@ public:
static ObFixedClassAllocator<T> *get(const char* label = "ConcurObjPool")
{
static ObFixedClassAllocator<T> instance(sizeof(T),
SET_USE_500(ObMemAttr(common::OB_SERVER_TENANT_ID, label)),
choose_blk_size(sizeof(T)),
static ObFixedClassAllocator<T> instance(SET_USE_500(ObMemAttr(common::OB_SERVER_TENANT_ID, label)),
common::get_cpu_count());
return &instance;
}
@ -65,10 +82,9 @@ public:
return allocator_.alloc();
}
void free(T *ptr)
void free(void *ptr)
{
if (OB_LIKELY(NULL != ptr)) {
ptr->~T();
allocator_.free(ptr);
ptr = NULL;
}
@ -78,41 +94,53 @@ private:
ObSliceAlloc allocator_;
};
#define op_alloc_args(type, args...) \
template<typename instance_type, typename object_type>
inline void type_checker(instance_type*, object_type*)
{
OLD_STATIC_ASSERT((std::is_same<typename instance_type::object_type, object_type>::value),
"unmatched type");
}
template<typename T>
inline void free_helper(T *ptr)
{
auto *meta = reinterpret_cast<common::ObClassMeta*>((char*)ptr - sizeof(common::ObClassMeta));
abort_unless(meta->check_magic_code());
ptr->~T();
((ObFixedClassAllocator<T>*)meta->instance())->free(meta);
}
#define op_instance_alloc_args(instance, type, args...) \
({ \
type *ret = NULL; \
common::ObFixedClassAllocator<type> *instance = \
common::ObFixedClassAllocator<type>::get(#type); \
type_checker(instance, ret); \
if (OB_LIKELY(NULL != instance)) { \
void *tmp = instance->alloc(); \
if (OB_LIKELY(NULL != tmp)) { \
ret = new (tmp) type(args); \
void *ptr = (instance)->alloc(); \
if (OB_LIKELY(NULL != ptr)) { \
auto *meta = new (ptr) common::ObClassMeta(instance); \
ret = new (meta + 1) type(args); \
} \
} \
ret; \
})
#define op_alloc_args(type, args...) \
({ \
common::ObFixedClassAllocator<type> *instance = \
common::ObFixedClassAllocator<type>::get(#type); \
op_instance_alloc_args(instance, type, args); \
})
#define op_alloc(type) \
({ \
OLD_STATIC_ASSERT((std::is_default_constructible<type>::value), "type is not default constructible"); \
type *ret = NULL; \
common::ObFixedClassAllocator<type> *instance = \
common::ObFixedClassAllocator<type>::get(#type); \
if (OB_LIKELY(NULL != instance)) { \
void *tmp = instance->alloc(); \
if (OB_LIKELY(NULL != tmp)) { \
ret = new (tmp) type(); \
} \
} \
ret; \
op_alloc_args(type); \
})
#define op_free(ptr) \
({ \
common::ObFixedClassAllocator<__typeof__(*ptr)> *instance = \
common::ObFixedClassAllocator<__typeof__(*ptr)>::get(); \
if (OB_LIKELY(NULL != instance)) { \
instance->free(ptr); \
if (OB_LIKELY(NULL != ptr)) { \
free_helper(ptr); \
} \
})

View File

@ -676,6 +676,7 @@ int MockTenantModuleEnv::init()
MTL_BIND2(mtl_new_default, ObTableLockService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(server_obj_pool_mtl_new<transaction::ObPartTransCtx>, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy<transaction::ObPartTransCtx>);
MTL_BIND2(server_obj_pool_mtl_new<ObTableScanIterator>, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy<ObTableScanIterator>);
MTL_BIND(ObTenantSQLSessionMgr::mtl_init, ObTenantSQLSessionMgr::mtl_destroy);
}
if (OB_FAIL(ret)) {

View File

@ -459,6 +459,7 @@ int ObMultiTenant::init(ObAddr myaddr,
MTL_BIND2(server_obj_pool_mtl_new<ObPartTransCtx>, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy<ObPartTransCtx>);
MTL_BIND2(server_obj_pool_mtl_new<ObTableScanIterator>, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy<ObTableScanIterator>);
MTL_BIND(ObDetectManager::mtl_init, ObDetectManager::mtl_destroy);
MTL_BIND(ObTenantSQLSessionMgr::mtl_init, ObTenantSQLSessionMgr::mtl_destroy);
if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) {
MTL_BIND2(nullptr, nullptr, start_mysql_queue, mtl_stop_default,
mtl_wait_default, mtl_destroy_default);
@ -1456,12 +1457,14 @@ int ObMultiTenant::remove_tenant(const uint64_t tenant_id, bool &remove_tenant_s
SpinWLockGuard guard(lock_); //add a lock when set tenant stop, omt will check tenant has stop before calling timeup()
removed_tenant->stop();
}
LOG_INFO("removed_tenant begin to kill tenant session", K(tenant_id));
if (OB_FAIL(GCTX.session_mgr_->kill_tenant(tenant_id))) {
LOG_ERROR("fail to kill tenant session", K(ret), K(tenant_id));
{
SpinWLockGuard guard(lock_);
removed_tenant->start();
if (!is_virtual_tenant_id(tenant_id)) {
LOG_INFO("removed_tenant begin to kill tenant session", K(tenant_id));
if (OB_FAIL(GCTX.session_mgr_->kill_tenant(tenant_id))) {
LOG_ERROR("fail to kill tenant session", K(ret), K(tenant_id));
{
SpinWLockGuard guard(lock_);
removed_tenant->start();
}
}
}
}

View File

@ -41,6 +41,7 @@ namespace obmysql {
}
namespace sql {
namespace dtl { class ObTenantDfc; }
class ObTenantSQLSessionMgr;
class ObTenantSqlMemoryManager;
class ObPlanMonitorNodeList;
class ObPlanBaselineMgr;
@ -254,7 +255,8 @@ using ObTableScanIteratorObjPool = common::ObServerObjectPool<oceanbase::storage
ObTestModule*, \
oceanbase::common::sqlclient::ObTenantOciEnvs*, \
rootserver::ObHeartbeatService*, \
oceanbase::common::ObDetectManager* \
oceanbase::common::ObDetectManager*, \
oceanbase::sql::ObTenantSQLSessionMgr* \
)

View File

@ -452,7 +452,7 @@ int ObSchemaCache::init()
} else if (OB_FAIL(tablet_cache_.init(OB_TABLET_TABLE_CACHE_NAME, priority))) {
LOG_WARN("init tablet-table cache failed", KR(ret));
} else if (OB_FAIL(sys_cache_.create(OB_SCHEMA_CACHE_SYS_CACHE_MAP_BUCKET_NUM,
ObModIds::OB_SCHEMA_CACHE_SYS_CACHE_MAP))) {
SET_USE_500(ObModIds::OB_SCHEMA_CACHE_SYS_CACHE_MAP)))) {
LOG_WARN("init sys cache failed", K(ret));
} else if (OB_FAIL(init_all_core_table())) {
LOG_WARN("init all_core_table cache failed", K(ret));

View File

@ -939,8 +939,9 @@ const ObString ObSysVarFactory::get_sys_var_name_by_id(ObSysVarClassType sys_var
return sys_var_name;
}
ObSysVarFactory::ObSysVarFactory()
: allocator_(ObModIds::OB_COMMON_SYS_VAR_FAC), all_sys_vars_created_(false)
ObSysVarFactory::ObSysVarFactory(const int64_t tenant_id)
: allocator_(ObMemAttr(tenant_id, ObModIds::OB_COMMON_SYS_VAR_FAC)),
all_sys_vars_created_(false)
{
MEMSET(store_, 0, sizeof(store_));
MEMSET(store_buf_, 0, sizeof(store_buf_));

View File

@ -1682,7 +1682,7 @@ public:
class ObSysVarFactory
{
public:
ObSysVarFactory();
ObSysVarFactory(const int64_t tenant_id = OB_SERVER_TENANT_ID);
virtual ~ObSysVarFactory();
void destroy();
int create_sys_var(ObSysVarClassType sys_var_id, ObBasicSysVar *&sys_var);
@ -1719,4 +1719,4 @@ private:
}
}
#endif //OCEANBASE_SHARE_SYSTEM_VARIABLE_OB_SYSTEM_VARIABLE_FACTORY_
#endif //OCEANBASE_SHARE_SYSTEM_VARIABLE_OB_SYSTEM_VARIABLE_FACTORY_

View File

@ -51,8 +51,10 @@ namespace sql
ObBasicSessionInfo::SysVarsCacheData ObBasicSessionInfo::SysVarsCache::base_data_;
ObBasicSessionInfo::ObBasicSessionInfo()
: query_mutex_(common::ObLatchIds::SESSION_QUERY_LOCK),
ObBasicSessionInfo::ObBasicSessionInfo(const uint64_t tenant_id)
: orig_tenant_id_(tenant_id),
tenant_session_mgr_(NULL),
query_mutex_(common::ObLatchIds::SESSION_QUERY_LOCK),
thread_data_mutex_(common::ObLatchIds::SESSION_THREAD_DATA_LOCK),
is_valid_(true),
is_deserialized_(false),
@ -91,16 +93,16 @@ ObBasicSessionInfo::ObBasicSessionInfo()
trans_flags_(),
sql_scope_flags_(),
need_reset_package_(false),
base_sys_var_alloc_(ObModIds::OB_SQL_SESSION, OB_MALLOC_NORMAL_BLOCK_SIZE),
inc_sys_var_alloc1_(ObModIds::OB_SQL_SESSION, OB_MALLOC_NORMAL_BLOCK_SIZE),
inc_sys_var_alloc2_(ObModIds::OB_SQL_SESSION, OB_MALLOC_NORMAL_BLOCK_SIZE),
base_sys_var_alloc_(ObMemAttr(orig_tenant_id_, ObModIds::OB_SQL_SESSION), OB_MALLOC_NORMAL_BLOCK_SIZE),
inc_sys_var_alloc1_(ObMemAttr(orig_tenant_id_, ObModIds::OB_SQL_SESSION), OB_MALLOC_NORMAL_BLOCK_SIZE),
inc_sys_var_alloc2_(ObMemAttr(orig_tenant_id_, ObModIds::OB_SQL_SESSION), OB_MALLOC_NORMAL_BLOCK_SIZE),
current_buf_index_(0),
bucket_allocator_wrapper_(&block_allocator_),
user_var_val_map_(SMALL_BLOCK_SIZE, ObWrapperAllocator(&block_allocator_)),
user_var_val_map_(SMALL_BLOCK_SIZE, ObWrapperAllocator(&block_allocator_), orig_tenant_id_),
influence_plan_var_indexs_(),
is_first_gen_(true),
is_first_gen_config_(true),
sys_var_fac_(),
sys_var_fac_(orig_tenant_id_),
next_frag_mem_point_(OB_MALLOC_NORMAL_BLOCK_SIZE), // 8KB
sys_vars_encode_max_size_(0),
consistency_level_(INVALID_CONSISTENCY),
@ -158,6 +160,7 @@ ObBasicSessionInfo::ObBasicSessionInfo()
sess_bt_buff_[0] = '\0';
inc_sys_var_alloc_[0] = &inc_sys_var_alloc1_;
inc_sys_var_alloc_[1] = &inc_sys_var_alloc2_;
influence_plan_var_indexs_.set_attr(ObMemAttr(orig_tenant_id_, "PlanVaIdx"));
}
ObBasicSessionInfo::~ObBasicSessionInfo()
@ -5396,7 +5399,8 @@ int ObBasicSessionInfo::store_query_string_(const ObString &stmt)
thread_data_.cur_query_buf_len_ = 0;
}
int64_t len = MAX(MIN_CUR_QUERY_LEN, truncated_len + 1);
char *buf = reinterpret_cast<char*>(ob_malloc(len, ObModIds::OB_SQL_SESSION_QUERY_SQL));
char *buf = reinterpret_cast<char*>(ob_malloc(len, ObMemAttr(orig_tenant_id_,
ObModIds::OB_SQL_SESSION_QUERY_SQL)));
if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));

View File

@ -385,7 +385,7 @@ public:
};
public:
ObBasicSessionInfo();
ObBasicSessionInfo(const uint64_t tenant_id);
virtual ~ObBasicSessionInfo();
virtual int init(uint32_t sessid, uint64_t proxy_sessid,
@ -396,6 +396,11 @@ public:
virtual void destroy();
//called before put session to freelist: unlock/set invalid
virtual void reset(bool skip_sys_var = false);
void set_tenant_session_mgr(ObTenantSQLSessionMgr *tenant_session_mgr)
{
tenant_session_mgr_ = tenant_session_mgr;
}
ObTenantSQLSessionMgr *get_tenant_session_mgr() { return tenant_session_mgr_; }
virtual void clean_status();
//setters
int reset_timezone();
@ -1964,10 +1969,13 @@ private:
};
};
};
protected:
const uint64_t orig_tenant_id_; // which tenant new me
private:
static const int64_t CACHED_SYS_VAR_VERSION = 721;// a magic num
static const int MAX_SESS_BT_BUFF_SIZE = 1024;
ObTenantSQLSessionMgr *tenant_session_mgr_;
// data structure related:
common::ObRecursiveMutex query_mutex_;//互斥同一个session上的多次query请求
common::ObRecursiveMutex thread_data_mutex_;//互斥多个线程对同一session成员的并发读写, 保护thread_data_的一致性
@ -2195,6 +2203,7 @@ private:
int64_t process_query_time_;
};
inline const common::ObString ObBasicSessionInfo::get_current_query_string() const
{
common::ObString str_ret;

View File

@ -38,14 +38,15 @@ ObSessionValMap::ObSessionValMap()
}
ObSessionValMap::ObSessionValMap(const int64_t block_size,
const ObWrapperAllocator &block_allocator)
const ObWrapperAllocator &block_allocator,
const int64_t tenant_id)
: block_allocator_(SMALL_BLOCK_SIZE, common::OB_MALLOC_NORMAL_BLOCK_SIZE,
ObMalloc(ObModIds::OB_SQL_SESSION_VAR_MAP)),
ObMalloc(ObMemAttr(tenant_id, ObModIds::OB_SQL_SESSION_VAR_MAP))),
var_name_val_map_allocer_(block_size, block_allocator),
str_buf1_(ObModIds::OB_SQL_SESSION_VAR_MAP),
str_buf2_(ObModIds::OB_SQL_SESSION_VAR_MAP),
current_buf_index_(0),
bucket_allocator_(ObModIds::OB_SQL_SESSION_VAR_MAP),
bucket_allocator_(ObMemAttr(tenant_id, ObModIds::OB_SQL_SESSION_VAR_MAP)),
bucket_allocator_wrapper_(&bucket_allocator_),
str_buf_free_threshold_(0),
next_free_mem_point_(str_buf_free_threshold_)

View File

@ -49,7 +49,8 @@ public:
> VarNameValMap;
public:
ObSessionValMap();
ObSessionValMap(const int64_t block_size, const common::ObWrapperAllocator &block_allocator);
ObSessionValMap(const int64_t block_size, const common::ObWrapperAllocator &block_allocator,
const int64_t tenant_id=OB_SERVER_TENANT_ID);
virtual ~ObSessionValMap();
// clear all user variable, keep hash table inited
void reuse();

View File

@ -109,9 +109,9 @@ void ObTenantCachedSchemaGuardInfo::try_revert_schema_guard()
}
}
ObSQLSessionInfo::ObSQLSessionInfo() :
ObSQLSessionInfo::ObSQLSessionInfo(const uint64_t tenant_id) :
ObVersionProvider(),
ObBasicSessionInfo(),
ObBasicSessionInfo(tenant_id),
is_inited_(false),
warnings_buf_(),
show_warnings_buf_(),
@ -204,18 +204,15 @@ int ObSQLSessionInfo::init(uint32_t sessid, uint64_t proxy_sessid,
} else if (FALSE_IT(txn_free_route_ctx_.set_sessid(sessid))) {
} else if (!is_acquire_from_pool() &&
OB_FAIL(package_state_map_.create(hash::cal_next_prime(4),
"PackStateMap",
"PackStateMap"))) {
ObMemAttr(orig_tenant_id_, "PackStateMap")))) {
LOG_WARN("create package state map failed", K(ret));
} else if (!is_acquire_from_pool() &&
OB_FAIL(sequence_currval_map_.create(hash::cal_next_prime(32),
"SequenceMap",
"SequenceMap"))) {
ObMemAttr(orig_tenant_id_, "SequenceMap")))) {
LOG_WARN("create sequence current value map failed", K(ret));
} else if (!is_acquire_from_pool() &&
OB_FAIL(contexts_map_.create(hash::cal_next_prime(32),
"ContextsMap",
"ContextsMap"))) {
ObMemAttr(orig_tenant_id_, "ContextsMap")))) {
LOG_WARN("create contexts map failed", K(ret));
} else {
curr_session_context_size_ = 0;

View File

@ -655,7 +655,7 @@ public:
public:
ObSQLSessionInfo();
ObSQLSessionInfo(const uint64_t tenant_id=OB_SERVER_TENANT_ID);
virtual ~ObSQLSessionInfo();
int init(uint32_t sessid, uint64_t proxy_sessid,
@ -981,7 +981,7 @@ public:
int clear_context(const common::ObString &context_name,
const common::ObString &attribute);
int64_t get_curr_session_context_size() const { return curr_session_context_size_; }
void reuse_context_map()
void reuse_context_map()
{
for (auto it = contexts_map_.begin(); it != contexts_map_.end(); ++it) {
if (OB_NOT_NULL(it->second)) {

View File

@ -35,49 +35,53 @@ using namespace oceanbase::sql;
using namespace oceanbase::share;
using namespace oceanbase::observer;
ObSQLSessionMgr::SessionPool::SessionPool()
ObTenantSQLSessionMgr::SessionPool::SessionPool()
: session_pool_()
{
MEMSET(session_array, 0, POOL_CAPACIPY * sizeof(ObSQLSessionInfo *));
MEMSET(session_array_, 0, POOL_CAPACIPY * sizeof(ObSQLSessionInfo *));
}
int ObSQLSessionMgr::SessionPool::init()
int ObTenantSQLSessionMgr::SessionPool::init(const int64_t capacity)
{
int ret = OB_SUCCESS;
char *session_buf = reinterpret_cast<char *>(session_array);
OZ (session_pool_.init(POOL_CAPACIPY, session_buf));
int64_t real_cap = capacity;
if (real_cap > POOL_CAPACIPY) {
real_cap = POOL_CAPACIPY;
}
char *session_buf = reinterpret_cast<char *>(session_array_);
OZ (session_pool_.init(real_cap, session_buf));
return ret;
}
int ObSQLSessionMgr::SessionPool::pop_session(uint64_t tenant_id, ObSQLSessionInfo *&session)
int ObTenantSQLSessionMgr::SessionPool::pop_session(ObSQLSessionInfo *&session)
{
int ret = OB_SUCCESS;
session = NULL;
if (OB_FAIL(session_pool_.pop(session))) {
if (ret != OB_ENTRY_NOT_EXIST) {
LOG_WARN("failed to pop session", K(ret),
K(tenant_id), K(session_pool_.get_total()), K(session_pool_.get_free()));
K(session_pool_.get_total()), K(session_pool_.get_free()));
} else {
ret = OB_SUCCESS;
LOG_DEBUG("session pool is empty",
K(tenant_id), K(session_pool_.get_total()), K(session_pool_.get_free()));
K(session_pool_.get_total()), K(session_pool_.get_free()));
}
}
return ret;
}
int ObSQLSessionMgr::SessionPool::push_session(uint64_t tenant_id, ObSQLSessionInfo *&session)
int ObTenantSQLSessionMgr::SessionPool::push_session(ObSQLSessionInfo *&session)
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(session)) {
if (OB_FAIL(session_pool_.push(session))) {
if (ret != OB_SIZE_OVERFLOW) {
LOG_WARN("failed to push session", K(ret),
K(tenant_id), K(session_pool_.get_total()), K(session_pool_.get_free()));
K(session_pool_.get_total()), K(session_pool_.get_free()));
} else {
ret = OB_SUCCESS;
LOG_DEBUG("session pool is full",
K(tenant_id), K(session_pool_.get_total()), K(session_pool_.get_free()));
K(session_pool_.get_total()), K(session_pool_.get_free()));
}
} else {
session = NULL;
@ -86,96 +90,127 @@ int ObSQLSessionMgr::SessionPool::push_session(uint64_t tenant_id, ObSQLSessionI
return ret;
}
int64_t ObSQLSessionMgr::SessionPool::count() const
int64_t ObTenantSQLSessionMgr::SessionPool::count() const
{
return session_pool_.get_total();
}
int ObSQLSessionMgr::SessionPoolMap::get_session_pool(uint64_t tenant_id,
SessionPool *&session_pool)
ObTenantSQLSessionMgr::ObTenantSQLSessionMgr(const int64_t tenant_id)
: tenant_id_(tenant_id),
session_allocator_(lib::ObMemAttr(tenant_id, "SQLSessionInfo"), MTL_CPU_COUNT())
{}
ObTenantSQLSessionMgr::~ObTenantSQLSessionMgr()
{}
int ObTenantSQLSessionMgr::init()
{
int ret = OB_SUCCESS;
uint64_t block_id = get_block_id(tenant_id);
uint64_t slot_id = get_slot_id(tenant_id);
SessionPoolBlock *pool_block = NULL;
if (block_id < pool_blocks_.count()) {
OX (pool_block = pool_blocks_.at(block_id));
} else {
OZ (create_pool_block(block_id, pool_block));
}
OV (OB_NOT_NULL(pool_block), OB_ERR_UNEXPECTED, tenant_id, block_id, slot_id);
OX (session_pool = ATOMIC_LOAD(&((*pool_block)[slot_id])));
if (OB_ISNULL(session_pool)) {
OZ (create_session_pool(*pool_block, slot_id, session_pool));
if (OB_FAIL(session_pool_.init(MTL_IS_MINI_MODE() ? 32 : SessionPool::POOL_CAPACIPY))) {
LOG_WARN("fail to init session pool", K(tenant_id_), K(ret));
}
return ret;
}
int ObSQLSessionMgr::SessionPoolMap::create_pool_block(uint64_t block_id,
SessionPoolBlock *&pool_block)
void ObTenantSQLSessionMgr::destroy()
{
int ret = OB_SUCCESS;
void *buf = NULL;
ObLockGuard<ObSpinLock> lock_guard(lock_);
while (OB_SUCC(ret) && block_id >= pool_blocks_.count()) {
OV (OB_NOT_NULL(buf = alloc_.alloc(sizeof(SessionPoolBlock))),
OB_ALLOCATE_MEMORY_FAILED, block_id);
OX (MEMSET(buf, 0, sizeof(SessionPoolBlock)));
OZ (pool_blocks_.push_back(static_cast<SessionPoolBlock *>(buf)), block_id);
}
OX (pool_block = pool_blocks_.at(block_id));
return ret;
}
int ObSQLSessionMgr::SessionPoolMap::create_session_pool(SessionPoolBlock &pool_block,
uint64_t slot_id,
SessionPool *&session_pool)
int ObTenantSQLSessionMgr::mtl_init(ObTenantSQLSessionMgr *&t_session_mgr)
{
int ret = OB_SUCCESS;
void *buf = NULL;
OV (slot_id < SLOT_PER_BLOCK);
ObLockGuard<ObSpinLock> lock_guard(lock_);
if (OB_ISNULL(session_pool = ATOMIC_LOAD(&(pool_block[slot_id])))) {
OV (OB_NOT_NULL(buf = alloc_.alloc(sizeof(SessionPool))), OB_ALLOCATE_MEMORY_FAILED);
OV (OB_NOT_NULL(session_pool = new (buf) SessionPool()));
OZ (session_pool->init());
OX (ATOMIC_STORE(&(pool_block[slot_id]), session_pool));
t_session_mgr = OB_NEW(ObTenantSQLSessionMgr, ObMemAttr(MTL_ID(), "TSQLSessionMgr"),
MTL_ID());
if (OB_ISNULL(t_session_mgr)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc tenant session manager", K(ret));
} else if (OB_FAIL(t_session_mgr->init())) {
LOG_WARN("failed to init tenant session manager", K(ret));
}
return ret;
}
uint64_t ObSQLSessionMgr::SessionPoolMap::get_block_id(uint64_t tenant_id) const
void ObTenantSQLSessionMgr::mtl_destroy(ObTenantSQLSessionMgr *&t_session_mgr)
{
return tenant_id >> BLOCK_ID_SHIFT;
if (nullptr != t_session_mgr) {
t_session_mgr->destroy();
OB_DELETE(ObTenantSQLSessionMgr, "unused", t_session_mgr);
t_session_mgr = nullptr;
}
}
uint64_t ObSQLSessionMgr::SessionPoolMap::get_slot_id(uint64_t tenant_id) const
ObSQLSessionInfo *ObTenantSQLSessionMgr::alloc_session()
{
return tenant_id & SLOT_ID_MASK;
int ret = OB_SUCCESS;
ObSQLSessionInfo *session = NULL;
OX (session_pool_.pop_session(session));
if (OB_ISNULL(session)) {
OX (session = op_instance_alloc_args(&session_allocator_,
ObSQLSessionInfo,
tenant_id_));
}
OV (OB_NOT_NULL(session));
OX (session->set_tenant_session_mgr(this));
OX (session->set_valid(true));
OX (session->set_shadow(true));
return session;
}
void ObTenantSQLSessionMgr::free_session(ObSQLSessionInfo *session)
{
int ret = OB_SUCCESS;
SessionPool *session_pool = NULL;
if (ObTenantSQLSessionMgr::is_valid_tenant_id(session->get_login_tenant_id()) &&
session->can_release_to_pool()) {
if (session->is_use_inner_allocator() && !session->is_tenant_killed()) {
session_pool = &session_pool_;
}
}
if (OB_NOT_NULL(session_pool)) {
OX (session->destroy(true));
OX (session->set_acquire_from_pool(true));
OX (session_pool->push_session(session));
}
if (OB_NOT_NULL(session)) {
OX (op_free(session));
OX (session = NULL);
}
}
void ObTenantSQLSessionMgr::clean_session_pool()
{
int ret = OB_SUCCESS;
ObSQLSessionInfo *session = NULL;
// 注意,这里并没有从设计上保证session池一定被彻底清空,有极低概率会有少量遗留session:
// 1. 在生产系统中,删除租户是非常少见的操作。
// 2. 在生产系统中,删除租户前一定会先保证业务层不再访问该租户。
// 以上前提下,理论上该session池已经没有任何操作。
// 3. 正常停掉某个observer,但即使有少数session遗漏,也会随着进程结束而释放。
// 4. unit迁移等情况,可能会有少数session未被释放,但将来unit再迁回后可复用。
while (session_pool_.count() > 0) {
OX (session_pool_.pop_session(session));
if (OB_NOT_NULL(session)) {
OX (op_free(session));
OX (session = NULL);
}
}
}
bool ObTenantSQLSessionMgr::is_valid_tenant_id(uint64_t tenant_id) const
{
return ::is_valid_tenant_id(tenant_id) &&
tenant_id != OB_INVALID_ID &&
tenant_id != OB_SYS_TENANT_ID;
}
int ObSQLSessionMgr::ValueAlloc::clean_tenant(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
SessionPool *session_pool = NULL;
ObSQLSessionInfo *session = NULL;
if (is_valid_tenant_id(tenant_id)) {
OX (session_pool_map_.get_session_pool(tenant_id, session_pool));
}
if (OB_NOT_NULL(session_pool)) {
// 注意,这里并没有从设计上保证session池一定被彻底清空,有极低概率会有少量遗留session:
// 1. 在生产系统中,删除租户是非常少见的操作。
// 2. 在生产系统中,删除租户前一定会先保证业务层不再访问该租户。
// 以上前提下,理论上该session池已经没有任何操作。
// 3. 正常停掉某个observer,但即使有少数session遗漏,也会随着进程结束而释放。
// 4. unit迁移等情况,可能会有少数session未被释放,但将来unit再迁回后可复用。
while (session_pool->count() > 0) {
OX (session_pool->pop_session(tenant_id, session));
if (OB_NOT_NULL(session)) {
OX (op_reclaim_free(session));
OX (session = NULL);
}
}
MTL_SWITCH(tenant_id) {
auto *t_session_mgr = MTL(ObTenantSQLSessionMgr*);
t_session_mgr->clean_session_pool();
} else {
LOG_ERROR("switch tenant failed", K(ret), K(tenant_id));
}
return ret;
}
@ -183,41 +218,28 @@ int ObSQLSessionMgr::ValueAlloc::clean_tenant(uint64_t tenant_id)
ObSQLSessionInfo *ObSQLSessionMgr::ValueAlloc::alloc_value(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
SessionPool *session_pool = NULL;
ObSQLSessionInfo *session = NULL;
int64_t alloc_total_count = 0;
ObFLTControlInfoManager mgr(tenant_id);
// we use OX instead of OZ in operation of upper session pool, because we need acquire
// from lower session pool when not success, no matter which errno we get here.
if (is_valid_tenant_id(tenant_id)) {
OX (session_pool_map_.get_session_pool(tenant_id, session_pool));
MTL_SWITCH(tenant_id) {
auto *t_session_mgr = MTL(ObTenantSQLSessionMgr*);
if (OB_ISNULL(session = t_session_mgr->alloc_session())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc session", K(ret));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(GCTX.session_mgr_->get_sess_hold_map()
.set_refactored(reinterpret_cast<uint64_t>(session), session))) {
LOG_WARN("fail to set session", K(ret), KP(session));
}
}
OX (alloc_total_count = ATOMIC_FAA(&alloc_total_count_, 1));
}
if (OB_NOT_NULL(session_pool)) {
OX (session_pool->pop_session(tenant_id, session));
}
if (OB_ISNULL(session)) {
OX (session = op_reclaim_alloc(ObSQLSessionInfo));
} else if (OB_FAIL(mgr.init())) {
LOG_WARN("failed to init full link control info", K(ret));
if (alloc_total_count > 0 && alloc_total_count % 10000 == 0) {
LOG_INFO("alloc_session_count", K(alloc_total_count));
}
} else {
if (mgr.is_valid_tenant_config()) {
session->set_flt_control_info(mgr.get_control_info());
}
OX (ATOMIC_FAA(&alloc_from_pool_count_, 1));
}
OV (OB_NOT_NULL(session));
OX (session->set_valid(true));
OX (session->set_shadow(true));
if (OB_SUCC(ret)) {
if (OB_FAIL(GCTX.session_mgr_->get_sess_hold_map()
.set_refactored(reinterpret_cast<uint64_t>(session), session))) {
LOG_WARN("fail to set session", K(ret), KP(session));
}
}
if (alloc_total_count > 0 && alloc_total_count % 10000 == 0) {
LOG_INFO("alloc_session_count", K(alloc_total_count), K_(alloc_from_pool_count));
LOG_ERROR("switch tenant failed", K(ret), K(tenant_id));
}
return session;
}
@ -227,7 +249,6 @@ void ObSQLSessionMgr::ValueAlloc::free_value(ObSQLSessionInfo *session)
if (OB_NOT_NULL(session)) {
int ret = OB_SUCCESS;
uint64_t tenant_id = session->get_login_tenant_id();
SessionPool *session_pool = NULL;
int64_t free_total_count = 0;
// delete from hold map, ingore error
int tmp_ret = OB_SUCCESS;
@ -235,37 +256,18 @@ void ObSQLSessionMgr::ValueAlloc::free_value(ObSQLSessionInfo *session)
reinterpret_cast<uint64_t>(session)))) {
LOG_WARN("fail to erase session", K(session->get_sessid()), K(tmp_ret), KP(session));
}
if (is_valid_tenant_id(tenant_id) && session->can_release_to_pool()) {
if (session->is_use_inner_allocator() && !session->is_tenant_killed()) {
OX (session_pool_map_.get_session_pool(tenant_id, session_pool));
}
OX (free_total_count = ATOMIC_FAA(&free_total_count_, 1));
}
if (OB_NOT_NULL(session_pool)) {
OX (session->destroy(true));
OX (session->set_acquire_from_pool(true));
OX (session_pool->push_session(tenant_id, session));
}
if (OB_NOT_NULL(session)) {
OX (op_reclaim_free(session));
OX (session = NULL);
} else {
OX (ATOMIC_FAA(&free_to_pool_count_, 1));
auto *t_session_mgr = session->get_tenant_session_mgr();
if (t_session_mgr != NULL) {
t_session_mgr->free_session(session);
}
OX (free_total_count = ATOMIC_FAA(&free_total_count_, 1));
if (free_total_count > 0 && free_total_count % 10000 == 0) {
LOG_INFO("free_session_count", K(free_total_count), K_(free_to_pool_count));
LOG_INFO("free_session_count", K(free_total_count));
}
ObActiveSessionGuard::setup_default_ash();
}
}
bool ObSQLSessionMgr::ValueAlloc::is_valid_tenant_id(uint64_t tenant_id) const
{
return ::is_valid_tenant_id(tenant_id) &&
tenant_id != OB_INVALID_ID &&
tenant_id != OB_SYS_TENANT_ID;
}
int ObSQLSessionMgr::init()
{
int ret = OB_SUCCESS;

View File

@ -145,61 +145,12 @@ private:
int get_avaiable_local_seq(uint32_t &local_seq);
int set_first_seq(int64_t first_seq);
class SessionPool
{
public:
SessionPool();
public:
int init();
int pop_session(uint64_t tenant_id, ObSQLSessionInfo *&session);
int push_session(uint64_t tenant_id, ObSQLSessionInfo *&session);
int64_t count() const;
TO_STRING_KV(K(session_pool_.capacity()),
K(session_pool_.get_total()),
K(session_pool_.get_free()));
private:
static const int64_t POOL_CAPACIPY = 512;
static const uint64_t POOL_INITED = 0;
common::ObFixedQueue<ObSQLSessionInfo> session_pool_;
ObSQLSessionInfo *session_array[POOL_CAPACIPY];
};
class SessionPoolMap
{
public:
SessionPoolMap(ObIAllocator &alloc)
: pool_blocks_(ObWrapperAllocator(alloc)),
alloc_(alloc),
lock_(common::ObLatchIds::SESSION_POOL_MAP_LOCK)
{}
int get_session_pool(uint64_t tenant_id, SessionPool *&session_pool);
private:
static const uint64_t BLOCK_ID_SHIFT = 10;
static const uint64_t SLOT_ID_MASK = 0x3FF;
static const uint64_t SLOT_PER_BLOCK = 1ULL << BLOCK_ID_SHIFT;
typedef SessionPool *SessionPoolBlock[SLOT_PER_BLOCK];
typedef Ob2DArray<SessionPoolBlock *, 1024, ObWrapperAllocator> SessionPoolBlocks;
private:
int create_pool_block(uint64_t block_id, SessionPoolBlock *&pool_block);
int create_session_pool(SessionPoolBlock &pool_block, uint64_t slot_id, SessionPool *&session_pool);
uint64_t get_block_id(uint64_t tenant_id) const;
uint64_t get_slot_id(uint64_t tenant_id) const;
private:
SessionPoolBlocks pool_blocks_;
ObIAllocator &alloc_;
ObSpinLock lock_;
};
class ValueAlloc
{
public:
ValueAlloc()
: allocator_(SET_USE_500("SessMap")),
session_pool_map_(allocator_),
alloc_total_count_(0),
alloc_from_pool_count_(0),
free_total_count_(0),
free_to_pool_count_(0)
: alloc_total_count_(0),
free_total_count_(0)
{}
~ValueAlloc() {}
int clean_tenant(uint64_t tenant_id);
@ -218,14 +169,8 @@ private:
}
}
private:
bool is_valid_tenant_id(uint64_t tenant_id) const;
private:
ObArenaAllocator allocator_;
SessionPoolMap session_pool_map_;
volatile int64_t alloc_total_count_;
volatile int64_t alloc_from_pool_count_;
volatile int64_t free_total_count_;
volatile int64_t free_to_pool_count_;
static const int64_t MAX_REUSE_COUNT = 10000;
static const int64_t MAX_SYS_VAR_MEM = 256 * 1024;
};
@ -346,6 +291,45 @@ private:
ObSQLSessionInfo *session_;
};
class ObTenantSQLSessionMgr
{
public:
explicit ObTenantSQLSessionMgr(const int64_t tenant_id);
~ObTenantSQLSessionMgr();
int init();
void destroy();
static int mtl_init(ObTenantSQLSessionMgr *&tenant_session_mgr);
static void mtl_destroy(ObTenantSQLSessionMgr *&tenant_session_mgr);
ObSQLSessionInfo *alloc_session();
void free_session(ObSQLSessionInfo *session);
void clean_session_pool();
private:
class SessionPool
{
public:
static const int64_t POOL_CAPACIPY = 512;
public:
SessionPool();
int init(const int64_t capacity);
int pop_session(ObSQLSessionInfo *&session);
int push_session(ObSQLSessionInfo *&session);
int64_t count() const;
TO_STRING_KV(K(session_pool_.capacity()),
K(session_pool_.get_total()),
K(session_pool_.get_free()));
private:
ObSQLSessionInfo *session_array_[POOL_CAPACIPY];
common::ObFixedQueue<ObSQLSessionInfo> session_pool_;
};
bool is_valid_tenant_id(uint64_t tenant_id) const;
private:
const int64_t tenant_id_;
SessionPool session_pool_;
ObFixedClassAllocator<ObSQLSessionInfo> session_allocator_;
DISALLOW_COPY_AND_ASSIGN(ObTenantSQLSessionMgr);
}; // end of class ObSQLSessionMgr
} // end of namespace sql
} // end of namespace oceanbase

View File

@ -34,7 +34,7 @@ TEST(test_basic_session_info, init_set_get)
OBSERVER.init_schema();
OBSERVER.init_tz_info_mgr();
common::ObArenaAllocator allocator(ObModIds::OB_SQL_SESSION);
ObBasicSessionInfo session_info;
ObBasicSessionInfo session_info(OB_SERVER_TENANT_ID);
easy_connection_t conn;
bool autocommit = false;
bool is_valid = false;
@ -79,7 +79,7 @@ TEST(test_basic_session_info, load_variables)
OBSERVER.init_schema();
OBSERVER.init_tz_info_mgr();
common::ObArenaAllocator allocator(ObModIds::OB_SQL_SESSION);
ObBasicSessionInfo session_info;
ObBasicSessionInfo session_info(OB_SERVER_TENANT_ID);
ObBasicSessionInfo::LockGuard lock_guard(session_info.get_query_lock());
ASSERT_EQ(OB_SUCCESS, ObPreProcessSysVars::init_sys_var());
ASSERT_EQ(OB_SUCCESS, session_info.test_init(0, 0, &allocator));