[Bugfix]fix bug in tableapi session pool

This commit is contained in:
WeiXinChan 2023-07-03 09:42:25 +00:00 committed by ob-robot
parent d4b8928662
commit 49ac9ea0a7
6 changed files with 151 additions and 43 deletions

View File

@ -53,24 +53,22 @@ int ObTableCtx::get_tablet_by_rowkey(const ObRowkey &rowkey,
return ret;
}
int ObTableCtx::init_sess_info(uint64_t tenant_id, const ObString &tenant_name, uint64_t user_id)
int ObTableCtx::init_sess_info(ObTableApiCredential &credential)
{
int ret = OB_SUCCESS;
// try get session from session pool
if (OB_FAIL(GCTX.table_service_->get_sess_mgr().get_sess_info(tenant_id,
user_id,
sess_guard_))) {
LOG_WARN("fail to get session info", K(ret), K(tenant_id), K(user_id));
if (OB_FAIL(GCTX.table_service_->get_sess_mgr().get_sess_info(credential, sess_guard_))) {
LOG_WARN("fail to get session info", K(ret), K(credential));
} else if (OB_ISNULL(sess_guard_.get_sess_node_val())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session info is null", K(ret), K(user_id));
LOG_WARN("session info is null", K(ret), K(credential));
}
return ret;
}
int ObTableCtx::init_common(const ObTableApiCredential &credential,
int ObTableCtx::init_common(ObTableApiCredential &credential,
const common::ObTabletID &arg_tablet_id,
const common::ObString &arg_table_name,
const int64_t &timeout_ts)
@ -80,7 +78,6 @@ int ObTableCtx::init_common(const ObTableApiCredential &credential,
const ObTenantSchema *tenant_schema = nullptr;
const uint64_t tenant_id = credential.tenant_id_;
const uint64_t database_id = credential.database_id_;
const uint64_t user_id = credential.user_id_;
ObTabletID tablet_id = arg_tablet_id;
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard_))) {
@ -99,8 +96,8 @@ int ObTableCtx::init_common(const ObTableApiCredential &credential,
} else if (OB_ISNULL(tenant_schema)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("tenant schema is null", K(ret));
} else if (OB_FAIL(init_sess_info(tenant_id, tenant_schema->get_tenant_name_str(), user_id))) {
LOG_WARN("fail to init session info", K(ret), K(tenant_id), K(user_id));
} else if (OB_FAIL(init_sess_info(credential))) {
LOG_WARN("fail to init session info", K(ret), K(credential));
} else if (!arg_tablet_id.is_valid()) {
if (is_scan_) { // 扫描场景使用table_schema上的tablet id,客户端已经做了路由分发
if (table_schema_->is_partitioned_table()) { // 不支持分区表

View File

@ -223,7 +223,7 @@ public:
public:
// 初始化common部分(不包括expr_info_, exec_ctx_, all_exprs_)
int init_common(const ObTableApiCredential &credential,
int init_common(ObTableApiCredential &credential,
const common::ObTabletID &arg_tablet_id,
const common::ObString &arg_table_name,
const int64_t &timeout_ts);
@ -263,7 +263,7 @@ private:
// for common
int get_tablet_by_rowkey(const common::ObRowkey &rowkey,
common::ObTabletID &tablet_id);
int init_sess_info(uint64_t tenant_id, const common::ObString &tenant_name, uint64_t user_id);
int init_sess_info(ObTableApiCredential &credential);
// for scan
int init_index_info(const common::ObString &index_name);
int generate_columns_type(common::ObIArray<sql::ObExprResType> &columns_type);

View File

@ -255,9 +255,7 @@ int ObTableApiProcessorBase::check_user_access(const ObString &credential_str)
const ObTableApiCredential *sess_credetial = nullptr;
if (OB_FAIL(serialization::decode(credential_str.ptr(), credential_str.length(), pos, credential_))) {
LOG_WARN("failed to serialize credential", K(ret), K(pos));
} else if (OB_FAIL(gctx_.table_service_->get_sess_mgr().get_sess_info(credential_.tenant_id_,
credential_.user_id_,
guard))) {
} else if (OB_FAIL(gctx_.table_service_->get_sess_mgr().get_sess_info(credential_, guard))) {
LOG_WARN("fail to get session info", K(ret), K_(credential));
} else if (OB_FAIL(guard.get_credential(sess_credetial))) {
LOG_WARN("fail to get credential", K(ret));

View File

@ -72,16 +72,15 @@ void ObTableApiSessPoolMgr::destroy()
}
}
int ObTableApiSessPoolMgr::get_sess_info(uint64_t tenant_id,
uint64_t user_id,
int ObTableApiSessPoolMgr::get_sess_info(ObTableApiCredential &credential,
ObTableApiSessGuard &guard)
{
int ret = OB_SUCCESS;
ObTableApiSessPoolGuard &pool_guard = guard.get_sess_pool_guard();
if (OB_FAIL(get_session_pool(tenant_id, pool_guard))) {
if (OB_FAIL(get_session_pool(credential.tenant_id_, pool_guard))) {
LOG_WARN("fail to get session pool", K(ret));
} else if (OB_FAIL(pool_guard.get_sess_pool()->get_sess_info(user_id, guard))) {
LOG_WARN("fail to get sess info", K(ret), K(tenant_id), K(user_id));
} else if (OB_FAIL(pool_guard.get_sess_pool()->get_sess_info(credential, guard))) {
LOG_WARN("fail to get sess info", K(ret), K(credential));
}
return ret;
@ -213,7 +212,7 @@ int ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::run_retire_sess_task()
ObTableApiSessPoolGuard pool_guard;
if (OB_FAIL(sess_pool_mgr_->get_session_pool(tenant_id, pool_guard))) {
LOG_WARN("fail to get sess pool", K(ret), K(tenant_id));
} else if (OB_FAIL(pool_guard.get_sess_pool()->move_retired_sess())) {
} else if (OB_FAIL(pool_guard.get_sess_pool()->move_sess_to_retired_list())) {
LOG_WARN("fail to move retired session", K(ret));
}
}
@ -307,7 +306,7 @@ void ObTableApiSessPool::destroy()
}
// 将过期的node从hash map中摘掉,加入retired_nodes_链表中
int ObTableApiSessPool::move_retired_sess()
int ObTableApiSessPool::move_sess_to_retired_list()
{
int ret = OB_SUCCESS;
int64_t cur_time = ObTimeUtility::current_time();
@ -321,7 +320,7 @@ int ObTableApiSessPool::move_retired_sess()
for (int64_t i = 0; OB_SUCC(ret) && i < N; ++i) {
const ObTableApiSessForeachOp::ObTableApiSessKV &kv = arr.at(i);
if (cur_time - kv.node_->get_last_active_ts() >= SESS_RETIRE_TIME) {
if (OB_FAIL(move_retired_sess(kv.key_))) {
if (OB_FAIL(move_sess_to_retired_list(kv.key_))) {
LOG_WARN("fail to move retired session", K(ret), K(kv.key_));
}
}
@ -331,7 +330,7 @@ int ObTableApiSessPool::move_retired_sess()
return ret;
}
int ObTableApiSessPool::move_retired_sess(uint64_t key)
int ObTableApiSessPool::move_sess_to_retired_list(uint64_t key)
{
int ret = OB_SUCCESS;
ObTableApiSessNode *del_node = nullptr;
@ -351,6 +350,19 @@ int ObTableApiSessPool::move_retired_sess(uint64_t key)
return ret;
}
int ObTableApiSessPool::move_sess_to_retired_list(ObTableApiSessNode *node)
{
int ret = OB_SUCCESS;
ObLockGuard<ObSpinLock> guard(lock_);
if (false == (retired_nodes_.add_last(node))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to add retired sess node to retired list", K(ret), K(*node));
}
return ret;
}
// 清理retired_nodes_中没有引用的node
int ObTableApiSessPool::evict_retired_sess()
{
@ -427,16 +439,26 @@ int ObTableApiSessPool::get_sess_node(uint64_t key,
return ret;
}
int ObTableApiSessPool::get_sess_info(uint64_t key, ObTableApiSessGuard &guard)
int ObTableApiSessPool::get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard)
{
int ret = OB_SUCCESS;
ObTableApiSessNode *sess_node = nullptr;
ObTableApiSessNodeVal *sess_val = nullptr;
bool need_extend = false;
if (OB_FAIL(get_sess_node(key, sess_node))) {
LOG_WARN("fail to get sess node", K(ret), K(key));
} else {
if (OB_FAIL(get_sess_node(credential.user_id_, sess_node))) {
LOG_WARN("fail to get sess node", K(ret), K(credential));
}
if (OB_HASH_NOT_EXIST == ret) {
if (OB_FAIL(create_and_add_node(credential))) {
LOG_WARN("fail to create and add session node to session pool", K(ret), K(credential));
} else if (OB_FAIL(get_sess_node(credential.user_id_, sess_node))) { // get again
LOG_WARN("fail to get sess node", K(ret), K(credential));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(sess_node->get_sess_node_val(sess_val))) { // exist
LOG_WARN("fail to get sess node value", K(ret), K(*sess_node));
} else if (OB_ISNULL(sess_val)) {
@ -448,7 +470,7 @@ int ObTableApiSessPool::get_sess_info(uint64_t key, ObTableApiSessGuard &guard)
if (need_extend) {
if (OB_FAIL(sess_node->extend_sess_val(guard))) {
LOG_WARN("fail to extend sess val", K(ret), K(*sess_node), K(key));
LOG_WARN("fail to extend sess val", K(ret), K(*sess_node), K(credential));
}
}
@ -516,17 +538,27 @@ int ObTableApiSessPool::update_sess(ObTableApiCredential &credential)
} else {
LOG_WARN("fail to get session node", K(ret), K(key));
}
} else { // exist, 摘掉原来的,添加新的
if (OB_FAIL(move_retired_sess(key))) {
LOG_WARN("fail to move retired session", K(ret), K(key));
} else if (OB_FAIL(create_and_add_node(credential))) {
LOG_WARN("fail to create and add new node", K(ret), K(credential));
} else { // exist, 替换node,old node移动到淘汰链表等待淘汰
if (OB_FAIL(replace_sess(credential))) {
LOG_WARN("fail to replace session node", K(ret), K(credential));
}
}
return ret;
}
int ObTableApiSessPool::replace_sess(ObTableApiCredential &credential)
{
int ret = OB_SUCCESS;
ObTableApiSessNodeReplaceOp replace_callback(*this, credential);
if (OB_FAIL(key_node_map_.atomic_refactored(credential.user_id_, replace_callback))) {
LOG_WARN("fail to replace session", K(ret), K(credential));
}
return ret;
}
int64_t ObTableApiSessPool::inc_ref_count()
{
return ATOMIC_AAF((uint64_t *)&ref_count_, 1);
@ -692,6 +724,30 @@ int ObTableApiSessNodeAtomicOp::get_value(ObTableApiSessNode *&node)
return ret;
}
int ObTableApiSessNodeReplaceOp::operator()(MapKV &entry)
{
int ret = OB_SUCCESS;
if (nullptr == entry.second) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null session node", K(ret), K(entry.first));
} else {
// 1. 创建新的session node
ObTableApiSessNode *new_node = nullptr;
if (OB_FAIL(pool_.create_node(credential_, new_node))) {
LOG_WARN("fail to create node", K(ret), K_(credential));
} else {
// 2. 替换
ObTableApiSessNode *old_node = entry.second;
entry.second = new_node;
// 3. old node移动到淘汰链表
pool_.move_sess_to_retired_list(old_node); // 添加到链表末尾,不会出错,故不判断返回值
}
}
return ret;
}
int ObTableApiSessForeachOp::operator()(MapKV &entry)
{
int ret = common::OB_SUCCESS;

View File

@ -65,7 +65,7 @@ public:
void wait();
void destroy();
int get_session_pool(uint64_t tenant_id, ObTableApiSessPoolGuard &guard);
int get_sess_info(uint64_t tenant_id, uint64_t user_id, ObTableApiSessGuard &guard);
int get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard);
int update_sess(ObTableApiCredential &credential);
private:
int extend_sess_pool(uint64_t tenant_id, ObTableApiSessPoolGuard &guard);
@ -122,17 +122,19 @@ public:
void set_deleted() { ATOMIC_SET(&is_deleted_, true); }
bool is_deleted() { return ATOMIC_LOAD(&is_deleted_); }
bool is_empty() const { return key_node_map_.empty(); }
int get_sess_info(uint64_t key, ObTableApiSessGuard &guard);
int get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard);
int update_sess(ObTableApiCredential &credential);
// 将过期的node移动到retired_nodes_
int move_retired_sess();
int move_sess_to_retired_list();
int evict_retired_sess();
private:
int create_node(ObTableApiCredential &credential, ObTableApiSessNode *&node);
int move_sess_to_retired_list(ObTableApiSessNode *node);
private:
int replace_sess(ObTableApiCredential &credential);
int create_and_add_node(ObTableApiCredential &credential);
int get_sess_node(uint64_t key, ObTableApiSessNode *&node);
int evict_all_session();
int move_retired_sess(uint64_t key);
int move_sess_to_retired_list(uint64_t key);
private:
common::ObArenaAllocator allocator_;
bool is_inited_;
@ -313,6 +315,24 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObTableApiSessNodeAtomicOp);
};
class ObTableApiSessNodeReplaceOp
{
protected:
typedef common::hash::HashMapPair<uint64_t, ObTableApiSessNode*> MapKV;
public:
ObTableApiSessNodeReplaceOp(ObTableApiSessPool &pool, ObTableApiCredential &credential)
: pool_(pool),
credential_(credential)
{}
virtual ~ObTableApiSessNodeReplaceOp() {}
int operator()(MapKV &entry);
private:
ObTableApiSessPool &pool_;
ObTableApiCredential &credential_;
private:
DISALLOW_COPY_AND_ASSIGN(ObTableApiSessNodeReplaceOp);
};
class ObTableApiSessForeachOp
{
public:

View File

@ -75,8 +75,6 @@ TEST_F(TestTableSessPool, get_session)
ASSERT_EQ(false, pool->is_deleted_);
ASSERT_EQ(tenant_id, pool->tenant_id_);
ObTableApiSessGuard sess_guard;
ASSERT_EQ(OB_HASH_NOT_EXIST, pool->get_sess_info(key, sess_guard));
ASSERT_EQ(OB_HASH_NOT_EXIST, pool->get_sess_info(key, sess_guard));
ObTableApiCredential credential;
credential.tenant_id_ = tenant_id;
credential.user_id_ = user_id;
@ -137,7 +135,7 @@ TEST_F(TestTableSessPool, retire_session)
for (int64_t j = 0; j < kvs.count(); j++) {
if (j % 2 == 0) {
const ObTableApiSessForeachOp::ObTableApiSessKV &kv = kvs.at(j);
ASSERT_EQ(OB_SUCCESS, pool->move_retired_sess(kv.key_));
ASSERT_EQ(OB_SUCCESS, pool->move_sess_to_retired_list(kv.key_));
}
}
}
@ -188,7 +186,7 @@ TEST_F(TestTableSessPool, reference_session)
{
// get
ObTableApiSessGuard guard;
ASSERT_EQ(OB_SUCCESS, pool->get_sess_info(user_id, guard));
ASSERT_EQ(OB_SUCCESS, pool->get_sess_info(credential, guard));
ASSERT_NE(nullptr, guard.get_sess_node_val());
// mark retire
ObTableApiSessForeachOp op;
@ -213,6 +211,45 @@ TEST_F(TestTableSessPool, reference_session)
ASSERT_EQ(1, arr.count());
}
TEST_F(TestTableSessPool, retire_session_then_get_session)
{
uint64_t tenant_id = 1;
uint64_t user_id = 0;
ObTableApiSessPoolMgr mgr;
ASSERT_EQ(OB_SUCCESS, mgr.init());
ObTableApiSessPoolGuard pool_guard;
ASSERT_EQ(OB_SUCCESS, mgr.extend_sess_pool(tenant_id, pool_guard));
ObTableApiCredential credential;
credential.tenant_id_ = tenant_id;
credential.user_id_ = user_id;
ASSERT_EQ(OB_SUCCESS, mgr.update_sess(credential));
// 塞一个ObTableApiSessNodeVal
ObTableApiSessPool *pool = pool_guard.get_sess_pool();
ObTableApiSessNode *node = nullptr;
ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(user_id, node));
void *buf = node->allocator_.alloc(sizeof(ObTableApiSessNodeVal));
ASSERT_NE(nullptr, buf);
ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_id, node);
val->is_inited_ = true;
ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(val));
// 第一次获取了session
ObTableApiSessGuard sess_guard;
ASSERT_EQ(OB_SUCCESS, mgr.get_sess_info(credential, sess_guard));
sess_guard.~ObTableApiSessGuard(); // 模仿访问结束,析构
// 长时间没有访问ob,session被放到淘汰链表,后台定时回收
ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(user_id, node));
ASSERT_EQ(OB_SUCCESS, pool->move_sess_to_retired_list(user_id));
ASSERT_EQ(1, pool->retired_nodes_.size_);
ASSERT_EQ(OB_SUCCESS, mgr.elimination_task_.run_recycle_retired_sess_task());
ASSERT_EQ(0, pool->retired_nodes_.size_);
// 连接隔了很长时间,突然又访问db
ASSERT_EQ(OB_HASH_NOT_EXIST, pool->get_sess_node(user_id, node));
}
int main(int argc, char **argv)
{
OB_LOGGER.set_log_level("INFO");