diff --git a/deps/oblib/src/lib/allocator/ob_mod_define.h b/deps/oblib/src/lib/allocator/ob_mod_define.h index e11ebbdcdf..e9fa3f2a6a 100644 --- a/deps/oblib/src/lib/allocator/ob_mod_define.h +++ b/deps/oblib/src/lib/allocator/ob_mod_define.h @@ -719,7 +719,7 @@ LABEL_ITEM_DEF(OB_SQL_PS_CACHE, SqlPsCache) LABEL_ITEM_DEF(OB_SQL_PS_SQL_META, SqlPsSqlMeta) LABEL_ITEM_DEF(OB_SQL_PX_BLOOM_FILTER, SqlPxBloomFilte) LABEL_ITEM_DEF(OB_PS_SESSION_INFO_ARRAY, PsSessiInfoArra) -LABEL_ITEM_DEF(OB_CONNECT_BY_PUMP, ConnecByPump) +LABEL_ITEM_DEF(OB_CONNECT_BY_PUMP, ConnectByPump) // mpimodules LABEL_ITEM_DEF(OB_MPI_INTERM_RESULT, MpiIntermResult) diff --git a/src/sql/engine/connect_by/ob_cnnt_by_pump.cpp b/src/sql/engine/connect_by/ob_cnnt_by_pump.cpp index bd70534437..98225ca28e 100644 --- a/src/sql/engine/connect_by/ob_cnnt_by_pump.cpp +++ b/src/sql/engine/connect_by/ob_cnnt_by_pump.cpp @@ -230,9 +230,14 @@ int ObConnectByOpPump::init(const ObNLConnectBySpec& connect_by, ObNLConnectByOp if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); + } else if (OB_ISNULL(eval_ctx.exec_ctx_.get_my_session())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is null", K(ret)); } else if (OB_FAIL(hash_filter_rows_.create(CONNECT_BY_TREE_HEIGHT))) { LOG_WARN("create hash set failed", K(ret)); } else { + uint64_t tenant_id = eval_ctx.exec_ctx_.get_my_session()->get_effective_tenant_id(); + allocator_.set_tenant_id(tenant_id); connect_by_prior_exprs_ = &connect_by.connect_by_prior_exprs_; eval_ctx_ = &eval_ctx; connect_by_ = &connect_by_op; diff --git a/src/sql/engine/connect_by/ob_cnnt_by_pump.h b/src/sql/engine/connect_by/ob_cnnt_by_pump.h index 1e03551e24..be12228757 100644 --- a/src/sql/engine/connect_by/ob_cnnt_by_pump.h +++ b/src/sql/engine/connect_by/ob_cnnt_by_pump.h @@ -16,6 +16,7 @@ #include "lib/allocator/ob_malloc.h" #include "lib/container/ob_se_array.h" #include "lib/container/ob_fixed_array.h" +#include "sql/engine/ob_exec_context.h" namespace oceanbase { namespace sql { @@ -48,6 +49,10 @@ private: --alloc_cnt_; allocator_.free(ptr); } + void set_tenant_id(int64_t tenant_id) + { + allocator_.set_tenant_id(tenant_id); + } private: common::ObArenaAllocator allocator_; diff --git a/src/sql/engine/connect_by/ob_cnnt_by_pump_bfs.cpp b/src/sql/engine/connect_by/ob_cnnt_by_pump_bfs.cpp index 293637d8e9..51487ecfde 100644 --- a/src/sql/engine/connect_by/ob_cnnt_by_pump_bfs.cpp +++ b/src/sql/engine/connect_by/ob_cnnt_by_pump_bfs.cpp @@ -260,7 +260,12 @@ int ObConnectByOpBFSPump::init( if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); + } else if (OB_ISNULL(eval_ctx.exec_ctx_.get_my_session())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is null", K(ret)); } else { + uint64_t tenant_id = eval_ctx.exec_ctx_.get_my_session()->get_effective_tenant_id(); + allocator_.set_tenant_id(tenant_id); connect_by_ = &connect_by_op; connect_by_prior_exprs_ = &connect_by.connect_by_prior_exprs_; left_prior_exprs_ = &connect_by.left_prior_exprs_; diff --git a/src/sql/engine/connect_by/ob_connect_by_utility.cpp b/src/sql/engine/connect_by/ob_connect_by_utility.cpp index b463f0fb64..bbf7e64b50 100644 --- a/src/sql/engine/connect_by/ob_connect_by_utility.cpp +++ b/src/sql/engine/connect_by/ob_connect_by_utility.cpp @@ -382,6 +382,9 @@ int ObConnectByPump::init(const ObConnectBy& connect_by, common::ObExprCtx* expr if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); + } else if (OB_ISNULL(expr_ctx) || OB_ISNULL(expr_ctx->my_session_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("expr_ctx or session is null", K(ret)); } else if (OB_UNLIKELY(2 != connect_by.get_child_num())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid child num", K(ret)); @@ -397,6 +400,8 @@ int ObConnectByPump::init(const ObConnectBy& connect_by, common::ObExprCtx* expr } else if (OB_FAIL(hash_filter_rows_.create(CONNECT_BY_TREE_HEIGHT))) { LOG_WARN("create hash set failed", K(ret)); } else { + uint64_t tenant_id = expr_ctx->my_session_->get_effective_tenant_id(); + allocator_.set_tenant_id(tenant_id); pump_row_desc_ = connect_by.get_pump_row_desc(); pseudo_column_row_desc_ = connect_by.get_pseudo_column_row_desc(); connect_by_prior_exprs_ = connect_by.get_connect_by_prior_exprs(); diff --git a/src/sql/engine/connect_by/ob_connect_by_utility.h b/src/sql/engine/connect_by/ob_connect_by_utility.h index 1322e7a1ab..24b797d8df 100644 --- a/src/sql/engine/connect_by/ob_connect_by_utility.h +++ b/src/sql/engine/connect_by/ob_connect_by_utility.h @@ -21,6 +21,7 @@ #include "sql/engine/sort/ob_base_sort.h" #include "sql/engine/basic/ob_chunk_row_store.h" #include "sql/engine/ob_phy_operator.h" +#include "sql/engine/ob_exec_context.h" namespace oceanbase { namespace sql { @@ -62,6 +63,10 @@ private: --alloc_cnt_; allocator_.free(ptr); } + void set_tenant_id(int64_t tenant_id) + { + allocator_.set_tenant_id(tenant_id); + } private: common::ObArenaAllocator allocator_; diff --git a/src/sql/engine/connect_by/ob_connect_by_utility_bfs.cpp b/src/sql/engine/connect_by/ob_connect_by_utility_bfs.cpp index f05ec87c78..5250a9b814 100644 --- a/src/sql/engine/connect_by/ob_connect_by_utility_bfs.cpp +++ b/src/sql/engine/connect_by/ob_connect_by_utility_bfs.cpp @@ -278,6 +278,9 @@ int ObConnectByPumpBFS::init(const ObConnectByWithIndex& connect_by, common::ObE if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); + } else if (OB_ISNULL(expr_ctx) || OB_ISNULL(expr_ctx->my_session_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("expr_ctx or session is null", K(ret)); } else if (OB_UNLIKELY(2 != connect_by.get_child_num())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid child num", K(ret)); @@ -289,6 +292,8 @@ int ObConnectByPumpBFS::init(const ObConnectByWithIndex& connect_by, common::ObE } else if (OB_FAIL(alloc_shallow_row_projector(*left_op))) { LOG_WARN("fail to alloc shallow row projector", K(ret)); } else { + uint64_t tenant_id = expr_ctx->my_session_->get_effective_tenant_id(); + allocator_.set_tenant_id(tenant_id); pump_row_desc_ = connect_by.get_pump_row_desc(); pseudo_column_row_desc_ = connect_by.get_pseudo_column_row_desc(); connect_by_prior_exprs_ = connect_by.get_connect_by_prior_exprs();