diff --git a/src/observer/table/ob_table_context.cpp b/src/observer/table/ob_table_context.cpp index 39f786e6a5..60a8c71fb8 100644 --- a/src/observer/table/ob_table_context.cpp +++ b/src/observer/table/ob_table_context.cpp @@ -53,24 +53,22 @@ int ObTableCtx::get_tablet_by_rowkey(const ObRowkey &rowkey, return ret; } -int ObTableCtx::init_sess_info(uint64_t tenant_id, const ObString &tenant_name, uint64_t user_id) +int ObTableCtx::init_sess_info(ObTableApiCredential &credential) { int ret = OB_SUCCESS; // try get session from session pool - if (OB_FAIL(GCTX.table_service_->get_sess_mgr().get_sess_info(tenant_id, - user_id, - sess_guard_))) { - LOG_WARN("fail to get session info", K(ret), K(tenant_id), K(user_id)); + if (OB_FAIL(GCTX.table_service_->get_sess_mgr().get_sess_info(credential, sess_guard_))) { + LOG_WARN("fail to get session info", K(ret), K(credential)); } else if (OB_ISNULL(sess_guard_.get_sess_node_val())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("session info is null", K(ret), K(user_id)); + LOG_WARN("session info is null", K(ret), K(credential)); } return ret; } -int ObTableCtx::init_common(const ObTableApiCredential &credential, +int ObTableCtx::init_common(ObTableApiCredential &credential, const common::ObTabletID &arg_tablet_id, const common::ObString &arg_table_name, const int64_t &timeout_ts) @@ -80,7 +78,6 @@ int ObTableCtx::init_common(const ObTableApiCredential &credential, const ObTenantSchema *tenant_schema = nullptr; const uint64_t tenant_id = credential.tenant_id_; const uint64_t database_id = credential.database_id_; - const uint64_t user_id = credential.user_id_; ObTabletID tablet_id = arg_tablet_id; if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard_))) { @@ -99,8 +96,8 @@ int ObTableCtx::init_common(const ObTableApiCredential &credential, } else if (OB_ISNULL(tenant_schema)) { ret = OB_SCHEMA_ERROR; LOG_WARN("tenant schema is null", K(ret)); - } else if (OB_FAIL(init_sess_info(tenant_id, tenant_schema->get_tenant_name_str(), user_id))) { - LOG_WARN("fail to init session info", K(ret), K(tenant_id), K(user_id)); + } else if (OB_FAIL(init_sess_info(credential))) { + LOG_WARN("fail to init session info", K(ret), K(credential)); } else if (!arg_tablet_id.is_valid()) { if (is_scan_) { // 扫描场景使用table_schema上的tablet id,客户端已经做了路由分发 if (table_schema_->is_partitioned_table()) { // 不支持分区表 diff --git a/src/observer/table/ob_table_context.h b/src/observer/table/ob_table_context.h index 78828fcdc2..b17f09b342 100644 --- a/src/observer/table/ob_table_context.h +++ b/src/observer/table/ob_table_context.h @@ -223,7 +223,7 @@ public: public: // 初始化common部分(不包括expr_info_, exec_ctx_, all_exprs_) - int init_common(const ObTableApiCredential &credential, + int init_common(ObTableApiCredential &credential, const common::ObTabletID &arg_tablet_id, const common::ObString &arg_table_name, const int64_t &timeout_ts); @@ -263,7 +263,7 @@ private: // for common int get_tablet_by_rowkey(const common::ObRowkey &rowkey, common::ObTabletID &tablet_id); - int init_sess_info(uint64_t tenant_id, const common::ObString &tenant_name, uint64_t user_id); + int init_sess_info(ObTableApiCredential &credential); // for scan int init_index_info(const common::ObString &index_name); int generate_columns_type(common::ObIArray &columns_type); diff --git a/src/observer/table/ob_table_rpc_processor.cpp b/src/observer/table/ob_table_rpc_processor.cpp index 57db7c5e53..24744761a5 100644 --- a/src/observer/table/ob_table_rpc_processor.cpp +++ b/src/observer/table/ob_table_rpc_processor.cpp @@ -255,9 +255,7 @@ int ObTableApiProcessorBase::check_user_access(const ObString &credential_str) const ObTableApiCredential *sess_credetial = nullptr; if (OB_FAIL(serialization::decode(credential_str.ptr(), credential_str.length(), pos, credential_))) { LOG_WARN("failed to serialize credential", K(ret), K(pos)); - } else if (OB_FAIL(gctx_.table_service_->get_sess_mgr().get_sess_info(credential_.tenant_id_, - credential_.user_id_, - guard))) { + } else if (OB_FAIL(gctx_.table_service_->get_sess_mgr().get_sess_info(credential_, guard))) { LOG_WARN("fail to get session info", K(ret), K_(credential)); } else if (OB_FAIL(guard.get_credential(sess_credetial))) { LOG_WARN("fail to get credential", K(ret)); diff --git a/src/observer/table/ob_table_session_pool.cpp b/src/observer/table/ob_table_session_pool.cpp index 70f98fba19..96cc07e52f 100644 --- a/src/observer/table/ob_table_session_pool.cpp +++ b/src/observer/table/ob_table_session_pool.cpp @@ -72,16 +72,15 @@ void ObTableApiSessPoolMgr::destroy() } } -int ObTableApiSessPoolMgr::get_sess_info(uint64_t tenant_id, - uint64_t user_id, +int ObTableApiSessPoolMgr::get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard) { int ret = OB_SUCCESS; ObTableApiSessPoolGuard &pool_guard = guard.get_sess_pool_guard(); - if (OB_FAIL(get_session_pool(tenant_id, pool_guard))) { + if (OB_FAIL(get_session_pool(credential.tenant_id_, pool_guard))) { LOG_WARN("fail to get session pool", K(ret)); - } else if (OB_FAIL(pool_guard.get_sess_pool()->get_sess_info(user_id, guard))) { - LOG_WARN("fail to get sess info", K(ret), K(tenant_id), K(user_id)); + } else if (OB_FAIL(pool_guard.get_sess_pool()->get_sess_info(credential, guard))) { + LOG_WARN("fail to get sess info", K(ret), K(credential)); } return ret; @@ -213,7 +212,7 @@ int ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::run_retire_sess_task() ObTableApiSessPoolGuard pool_guard; if (OB_FAIL(sess_pool_mgr_->get_session_pool(tenant_id, pool_guard))) { LOG_WARN("fail to get sess pool", K(ret), K(tenant_id)); - } else if (OB_FAIL(pool_guard.get_sess_pool()->move_retired_sess())) { + } else if (OB_FAIL(pool_guard.get_sess_pool()->move_sess_to_retired_list())) { LOG_WARN("fail to move retired session", K(ret)); } } @@ -307,7 +306,7 @@ void ObTableApiSessPool::destroy() } // 将过期的node从hash map中摘掉,加入retired_nodes_链表中 -int ObTableApiSessPool::move_retired_sess() +int ObTableApiSessPool::move_sess_to_retired_list() { int ret = OB_SUCCESS; int64_t cur_time = ObTimeUtility::current_time(); @@ -321,7 +320,7 @@ int ObTableApiSessPool::move_retired_sess() for (int64_t i = 0; OB_SUCC(ret) && i < N; ++i) { const ObTableApiSessForeachOp::ObTableApiSessKV &kv = arr.at(i); if (cur_time - kv.node_->get_last_active_ts() >= SESS_RETIRE_TIME) { - if (OB_FAIL(move_retired_sess(kv.key_))) { + if (OB_FAIL(move_sess_to_retired_list(kv.key_))) { LOG_WARN("fail to move retired session", K(ret), K(kv.key_)); } } @@ -331,7 +330,7 @@ int ObTableApiSessPool::move_retired_sess() return ret; } -int ObTableApiSessPool::move_retired_sess(uint64_t key) +int ObTableApiSessPool::move_sess_to_retired_list(uint64_t key) { int ret = OB_SUCCESS; ObTableApiSessNode *del_node = nullptr; @@ -351,6 +350,19 @@ int ObTableApiSessPool::move_retired_sess(uint64_t key) return ret; } +int ObTableApiSessPool::move_sess_to_retired_list(ObTableApiSessNode *node) +{ + int ret = OB_SUCCESS; + + ObLockGuard guard(lock_); + if (false == (retired_nodes_.add_last(node))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to add retired sess node to retired list", K(ret), K(*node)); + } + + return ret; +} + // 清理retired_nodes_中没有引用的node int ObTableApiSessPool::evict_retired_sess() { @@ -427,16 +439,26 @@ int ObTableApiSessPool::get_sess_node(uint64_t key, return ret; } -int ObTableApiSessPool::get_sess_info(uint64_t key, ObTableApiSessGuard &guard) +int ObTableApiSessPool::get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard) { int ret = OB_SUCCESS; ObTableApiSessNode *sess_node = nullptr; ObTableApiSessNodeVal *sess_val = nullptr; bool need_extend = false; - if (OB_FAIL(get_sess_node(key, sess_node))) { - LOG_WARN("fail to get sess node", K(ret), K(key)); - } else { + if (OB_FAIL(get_sess_node(credential.user_id_, sess_node))) { + LOG_WARN("fail to get sess node", K(ret), K(credential)); + } + + if (OB_HASH_NOT_EXIST == ret) { + if (OB_FAIL(create_and_add_node(credential))) { + LOG_WARN("fail to create and add session node to session pool", K(ret), K(credential)); + } else if (OB_FAIL(get_sess_node(credential.user_id_, sess_node))) { // get again + LOG_WARN("fail to get sess node", K(ret), K(credential)); + } + } + + if (OB_SUCC(ret)) { if (OB_FAIL(sess_node->get_sess_node_val(sess_val))) { // exist LOG_WARN("fail to get sess node value", K(ret), K(*sess_node)); } else if (OB_ISNULL(sess_val)) { @@ -448,7 +470,7 @@ int ObTableApiSessPool::get_sess_info(uint64_t key, ObTableApiSessGuard &guard) if (need_extend) { if (OB_FAIL(sess_node->extend_sess_val(guard))) { - LOG_WARN("fail to extend sess val", K(ret), K(*sess_node), K(key)); + LOG_WARN("fail to extend sess val", K(ret), K(*sess_node), K(credential)); } } @@ -516,17 +538,27 @@ int ObTableApiSessPool::update_sess(ObTableApiCredential &credential) } else { LOG_WARN("fail to get session node", K(ret), K(key)); } - } else { // exist, 摘掉原来的,添加新的 - if (OB_FAIL(move_retired_sess(key))) { - LOG_WARN("fail to move retired session", K(ret), K(key)); - } else if (OB_FAIL(create_and_add_node(credential))) { - LOG_WARN("fail to create and add new node", K(ret), K(credential)); + } else { // exist, 替换node,old node移动到淘汰链表等待淘汰 + if (OB_FAIL(replace_sess(credential))) { + LOG_WARN("fail to replace session node", K(ret), K(credential)); } } return ret; } +int ObTableApiSessPool::replace_sess(ObTableApiCredential &credential) +{ + int ret = OB_SUCCESS; + + ObTableApiSessNodeReplaceOp replace_callback(*this, credential); + if (OB_FAIL(key_node_map_.atomic_refactored(credential.user_id_, replace_callback))) { + LOG_WARN("fail to replace session", K(ret), K(credential)); + } + + return ret; +} + int64_t ObTableApiSessPool::inc_ref_count() { return ATOMIC_AAF((uint64_t *)&ref_count_, 1); @@ -692,6 +724,30 @@ int ObTableApiSessNodeAtomicOp::get_value(ObTableApiSessNode *&node) return ret; } +int ObTableApiSessNodeReplaceOp::operator()(MapKV &entry) +{ + int ret = OB_SUCCESS; + + if (nullptr == entry.second) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null session node", K(ret), K(entry.first)); + } else { + // 1. 创建新的session node + ObTableApiSessNode *new_node = nullptr; + if (OB_FAIL(pool_.create_node(credential_, new_node))) { + LOG_WARN("fail to create node", K(ret), K_(credential)); + } else { + // 2. 替换 + ObTableApiSessNode *old_node = entry.second; + entry.second = new_node; + // 3. old node移动到淘汰链表 + pool_.move_sess_to_retired_list(old_node); // 添加到链表末尾,不会出错,故不判断返回值 + } + } + + return ret; +} + int ObTableApiSessForeachOp::operator()(MapKV &entry) { int ret = common::OB_SUCCESS; diff --git a/src/observer/table/ob_table_session_pool.h b/src/observer/table/ob_table_session_pool.h index 187a8d31d3..c4e7cd817a 100644 --- a/src/observer/table/ob_table_session_pool.h +++ b/src/observer/table/ob_table_session_pool.h @@ -65,7 +65,7 @@ public: void wait(); void destroy(); int get_session_pool(uint64_t tenant_id, ObTableApiSessPoolGuard &guard); - int get_sess_info(uint64_t tenant_id, uint64_t user_id, ObTableApiSessGuard &guard); + int get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard); int update_sess(ObTableApiCredential &credential); private: int extend_sess_pool(uint64_t tenant_id, ObTableApiSessPoolGuard &guard); @@ -122,17 +122,19 @@ public: void set_deleted() { ATOMIC_SET(&is_deleted_, true); } bool is_deleted() { return ATOMIC_LOAD(&is_deleted_); } bool is_empty() const { return key_node_map_.empty(); } - int get_sess_info(uint64_t key, ObTableApiSessGuard &guard); + int get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard); int update_sess(ObTableApiCredential &credential); // 将过期的node移动到retired_nodes_ - int move_retired_sess(); + int move_sess_to_retired_list(); int evict_retired_sess(); -private: int create_node(ObTableApiCredential &credential, ObTableApiSessNode *&node); + int move_sess_to_retired_list(ObTableApiSessNode *node); +private: + int replace_sess(ObTableApiCredential &credential); int create_and_add_node(ObTableApiCredential &credential); int get_sess_node(uint64_t key, ObTableApiSessNode *&node); int evict_all_session(); - int move_retired_sess(uint64_t key); + int move_sess_to_retired_list(uint64_t key); private: common::ObArenaAllocator allocator_; bool is_inited_; @@ -313,6 +315,24 @@ private: DISALLOW_COPY_AND_ASSIGN(ObTableApiSessNodeAtomicOp); }; +class ObTableApiSessNodeReplaceOp +{ +protected: + typedef common::hash::HashMapPair MapKV; +public: + ObTableApiSessNodeReplaceOp(ObTableApiSessPool &pool, ObTableApiCredential &credential) + : pool_(pool), + credential_(credential) + {} + virtual ~ObTableApiSessNodeReplaceOp() {} + int operator()(MapKV &entry); +private: + ObTableApiSessPool &pool_; + ObTableApiCredential &credential_; +private: + DISALLOW_COPY_AND_ASSIGN(ObTableApiSessNodeReplaceOp); +}; + class ObTableApiSessForeachOp { public: diff --git a/unittest/observer/table/test_table_sess_pool.cpp b/unittest/observer/table/test_table_sess_pool.cpp index c041a12ef8..0d9a26a780 100644 --- a/unittest/observer/table/test_table_sess_pool.cpp +++ b/unittest/observer/table/test_table_sess_pool.cpp @@ -75,8 +75,6 @@ TEST_F(TestTableSessPool, get_session) ASSERT_EQ(false, pool->is_deleted_); ASSERT_EQ(tenant_id, pool->tenant_id_); ObTableApiSessGuard sess_guard; - ASSERT_EQ(OB_HASH_NOT_EXIST, pool->get_sess_info(key, sess_guard)); - ASSERT_EQ(OB_HASH_NOT_EXIST, pool->get_sess_info(key, sess_guard)); ObTableApiCredential credential; credential.tenant_id_ = tenant_id; credential.user_id_ = user_id; @@ -137,7 +135,7 @@ TEST_F(TestTableSessPool, retire_session) for (int64_t j = 0; j < kvs.count(); j++) { if (j % 2 == 0) { const ObTableApiSessForeachOp::ObTableApiSessKV &kv = kvs.at(j); - ASSERT_EQ(OB_SUCCESS, pool->move_retired_sess(kv.key_)); + ASSERT_EQ(OB_SUCCESS, pool->move_sess_to_retired_list(kv.key_)); } } } @@ -188,7 +186,7 @@ TEST_F(TestTableSessPool, reference_session) { // get ObTableApiSessGuard guard; - ASSERT_EQ(OB_SUCCESS, pool->get_sess_info(user_id, guard)); + ASSERT_EQ(OB_SUCCESS, pool->get_sess_info(credential, guard)); ASSERT_NE(nullptr, guard.get_sess_node_val()); // mark retire ObTableApiSessForeachOp op; @@ -213,6 +211,45 @@ TEST_F(TestTableSessPool, reference_session) ASSERT_EQ(1, arr.count()); } +TEST_F(TestTableSessPool, retire_session_then_get_session) +{ + uint64_t tenant_id = 1; + uint64_t user_id = 0; + ObTableApiSessPoolMgr mgr; + ASSERT_EQ(OB_SUCCESS, mgr.init()); + ObTableApiSessPoolGuard pool_guard; + ASSERT_EQ(OB_SUCCESS, mgr.extend_sess_pool(tenant_id, pool_guard)); + ObTableApiCredential credential; + credential.tenant_id_ = tenant_id; + credential.user_id_ = user_id; + ASSERT_EQ(OB_SUCCESS, mgr.update_sess(credential)); + + // 塞一个ObTableApiSessNodeVal + ObTableApiSessPool *pool = pool_guard.get_sess_pool(); + ObTableApiSessNode *node = nullptr; + ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(user_id, node)); + void *buf = node->allocator_.alloc(sizeof(ObTableApiSessNodeVal)); + ASSERT_NE(nullptr, buf); + ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_id, node); + val->is_inited_ = true; + ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(val)); + + // 第一次获取了session + ObTableApiSessGuard sess_guard; + ASSERT_EQ(OB_SUCCESS, mgr.get_sess_info(credential, sess_guard)); + sess_guard.~ObTableApiSessGuard(); // 模仿访问结束,析构 + + // 长时间没有访问ob,session被放到淘汰链表,后台定时回收 + ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(user_id, node)); + ASSERT_EQ(OB_SUCCESS, pool->move_sess_to_retired_list(user_id)); + ASSERT_EQ(1, pool->retired_nodes_.size_); + ASSERT_EQ(OB_SUCCESS, mgr.elimination_task_.run_recycle_retired_sess_task()); + ASSERT_EQ(0, pool->retired_nodes_.size_); + + // 连接隔了很长时间,突然又访问db + ASSERT_EQ(OB_HASH_NOT_EXIST, pool->get_sess_node(user_id, node)); +} + int main(int argc, char **argv) { OB_LOGGER.set_log_level("INFO");