diff --git a/src/storage/compaction/ob_partition_merge_iter.cpp b/src/storage/compaction/ob_partition_merge_iter.cpp index edac84ebb..514157094 100644 --- a/src/storage/compaction/ob_partition_merge_iter.cpp +++ b/src/storage/compaction/ob_partition_merge_iter.cpp @@ -2145,6 +2145,23 @@ int ObPartitionMinorMacroMergeIter::get_curr_macro_block( return ret; } +ObPartitionMVRowMergeIter::ObMVSqlResource::ObMVSqlResource() + : read_result_(), + free_session_ctx_(), + session_(nullptr), + conn_(nullptr), + sql_result_(nullptr) +{ +} + +ObPartitionMVRowMergeIter::ObMVSqlResource::~ObMVSqlResource() +{ + read_result_.~ReadResult(); // need decons before session + ObMviewCompactionHelper::release_inner_connection(conn_); + ObMviewCompactionHelper::release_inner_session(free_session_ctx_, session_); + sql_result_ = nullptr; +} + /* * ObPartitionRowMergeIter used for mv major merge */ @@ -2155,20 +2172,13 @@ ObPartitionMVRowMergeIter::ObPartitionMVRowMergeIter(common::ObIAllocator &alloc sql_idx_(-1), sql_read_col_cnt_(0), store_col_cnt_(0), - read_result_(), result_row_(), - free_session_ctx_(), - session_(nullptr), - conn_(nullptr), - sql_result_(nullptr) + mv_sql_resource_() { } ObPartitionMVRowMergeIter::~ObPartitionMVRowMergeIter() { - read_result_.~ReadResult(); // need decons before session - ObMviewCompactionHelper::release_inner_connection(conn_); - ObMviewCompactionHelper::release_inner_session(free_session_ctx_, session_); } int ObPartitionMVRowMergeIter::init(const ObMergeParameter &merge_param, @@ -2237,20 +2247,17 @@ int ObPartitionMVRowMergeIter::inner_init(const ObMergeParameter &merge_param) const ObSqlString &sql = merge_param.mview_merge_param_->refresh_sqls_[sql_idx_].sql_; if (OB_FAIL(ObMviewCompactionHelper::create_inner_session(merge_param.get_schema()->is_oracle_mode(), merge_param.mview_merge_param_->database_id_, - free_session_ctx_, session_))) { + mv_sql_resource_.free_session_ctx_, + mv_sql_resource_.session_))) { LOG_WARN("Failed to create inner session", K(ret), KPC(merge_param.mview_merge_param_)); - } else if (OB_FAIL(ObMviewCompactionHelper::create_inner_connection(session_, conn_))) { + } else if (OB_FAIL(ObMviewCompactionHelper::create_inner_connection(mv_sql_resource_.session_, mv_sql_resource_.conn_))) { LOG_WARN("Failed to create inner connection", K(ret), K_(sql_idx)); - } else if (OB_FAIL(conn_->execute_read(GCONF.cluster_id, MTL_ID(), sql.ptr(), read_result_))) { + } else if (OB_FAIL(mv_sql_resource_.conn_->execute_read(GCONF.cluster_id, MTL_ID(), sql.ptr(), mv_sql_resource_.read_result_))) { LOG_WARN("Failed to execute", K(ret), K_(sql_idx), K(sql)); - } else if (OB_ISNULL(sql_result_ = static_cast(read_result_.get_result()))) { + } else if (OB_ISNULL(mv_sql_resource_.sql_result_ = static_cast(mv_sql_resource_.read_result_.get_result()))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpected null sql result", K(ret), K_(sql_idx), K(sql)); } - if (OB_FAIL(ret)) { - ObMviewCompactionHelper::release_inner_connection(conn_); - ObMviewCompactionHelper::release_inner_session(free_session_ctx_, session_); - } return ret; } @@ -2264,14 +2271,14 @@ int ObPartitionMVRowMergeIter::next() } else if (OB_UNLIKELY(iter_end_)) { ret = OB_ITER_END; } else if (FALSE_IT(curr_row_ = nullptr)) { - } else if (OB_FAIL(sql_result_->next())) { + } else if (OB_FAIL(mv_sql_resource_.sql_result_->next())) { if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("Failed to get next row", K(ret)); } else { iter_end_ = true; } } else { - const ObNewRow *new_row = sql_result_->get_row(); + const ObNewRow *new_row = mv_sql_resource_.sql_result_->get_row(); if (OB_UNLIKELY(nullptr == new_row || sql_read_col_cnt_ != new_row->get_count())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpected result row", K(ret), K(schema_rowkey_column_cnt_), K(sql_read_col_cnt_), KPC(new_row)); diff --git a/src/storage/compaction/ob_partition_merge_iter.h b/src/storage/compaction/ob_partition_merge_iter.h index 7e7a4e602..884118973 100644 --- a/src/storage/compaction/ob_partition_merge_iter.h +++ b/src/storage/compaction/ob_partition_merge_iter.h @@ -410,14 +410,24 @@ private: class ObPartitionMVRowMergeIter final : public ObPartitionMergeIter { public: + struct ObMVSqlResource + { + ObMVSqlResource(); + ~ObMVSqlResource(); + TO_STRING_KV(K_(free_session_ctx), KP_(session), KP_(conn), KP_(sql_result)); + ObISQLClient::ReadResult read_result_; + sql::ObFreeSessionCtx free_session_ctx_; + sql::ObSQLSessionInfo *session_; + sqlclient::ObISQLConnection *conn_; + observer::ObInnerSQLResult *sql_result_; + }; ObPartitionMVRowMergeIter(common::ObIAllocator &allocator); virtual ~ObPartitionMVRowMergeIter(); virtual int init(const ObMergeParameter &merge_param, const int64_t refresh_sql_idx, const ObITableReadInfo *read_info) override; virtual int next() override; - TO_STRING_KV(K_(is_delete), K_(is_replace), K_(sql_idx), K_(sql_read_col_cnt), K_(store_col_cnt), - K_(free_session_ctx), KP_(session), KP_(conn), KP_(sql_result)); + TO_STRING_KV(K_(is_delete), K_(is_replace), K_(sql_idx), K_(sql_read_col_cnt), K_(store_col_cnt), K_(mv_sql_resource)); protected: virtual int inner_init(const ObMergeParameter &merge_param) override; virtual bool inner_check(const ObMergeParameter &merge_param) override; @@ -427,12 +437,8 @@ private: int64_t sql_idx_; int64_t sql_read_col_cnt_; int64_t store_col_cnt_; - ObISQLClient::ReadResult read_result_; blocksstable::ObDatumRow result_row_; - sql::ObFreeSessionCtx free_session_ctx_; - sql::ObSQLSessionInfo *session_; - sqlclient::ObISQLConnection *conn_; - observer::ObInnerSQLResult *sql_result_; + ObMVSqlResource mv_sql_resource_; }; static const int64_t DEFAULT_ITER_COUNT = 16;