fix mv sql resource decons order

This commit is contained in:
haitaoyang 2025-01-03 05:15:30 +00:00 committed by ob-robot
parent d37313497c
commit 319471fa36
2 changed files with 38 additions and 25 deletions

View File

@ -2145,6 +2145,23 @@ int ObPartitionMinorMacroMergeIter::get_curr_macro_block(
return ret;
}
ObPartitionMVRowMergeIter::ObMVSqlResource::ObMVSqlResource()
: read_result_(),
free_session_ctx_(),
session_(nullptr),
conn_(nullptr),
sql_result_(nullptr)
{
}
ObPartitionMVRowMergeIter::ObMVSqlResource::~ObMVSqlResource()
{
read_result_.~ReadResult(); // need decons before session
ObMviewCompactionHelper::release_inner_connection(conn_);
ObMviewCompactionHelper::release_inner_session(free_session_ctx_, session_);
sql_result_ = nullptr;
}
/*
* ObPartitionRowMergeIter used for mv major merge
*/
@ -2155,20 +2172,13 @@ ObPartitionMVRowMergeIter::ObPartitionMVRowMergeIter(common::ObIAllocator &alloc
sql_idx_(-1),
sql_read_col_cnt_(0),
store_col_cnt_(0),
read_result_(),
result_row_(),
free_session_ctx_(),
session_(nullptr),
conn_(nullptr),
sql_result_(nullptr)
mv_sql_resource_()
{
}
ObPartitionMVRowMergeIter::~ObPartitionMVRowMergeIter()
{
read_result_.~ReadResult(); // need decons before session
ObMviewCompactionHelper::release_inner_connection(conn_);
ObMviewCompactionHelper::release_inner_session(free_session_ctx_, session_);
}
int ObPartitionMVRowMergeIter::init(const ObMergeParameter &merge_param,
@ -2237,20 +2247,17 @@ int ObPartitionMVRowMergeIter::inner_init(const ObMergeParameter &merge_param)
const ObSqlString &sql = merge_param.mview_merge_param_->refresh_sqls_[sql_idx_].sql_;
if (OB_FAIL(ObMviewCompactionHelper::create_inner_session(merge_param.get_schema()->is_oracle_mode(),
merge_param.mview_merge_param_->database_id_,
free_session_ctx_, session_))) {
mv_sql_resource_.free_session_ctx_,
mv_sql_resource_.session_))) {
LOG_WARN("Failed to create inner session", K(ret), KPC(merge_param.mview_merge_param_));
} else if (OB_FAIL(ObMviewCompactionHelper::create_inner_connection(session_, conn_))) {
} else if (OB_FAIL(ObMviewCompactionHelper::create_inner_connection(mv_sql_resource_.session_, mv_sql_resource_.conn_))) {
LOG_WARN("Failed to create inner connection", K(ret), K_(sql_idx));
} else if (OB_FAIL(conn_->execute_read(GCONF.cluster_id, MTL_ID(), sql.ptr(), read_result_))) {
} else if (OB_FAIL(mv_sql_resource_.conn_->execute_read(GCONF.cluster_id, MTL_ID(), sql.ptr(), mv_sql_resource_.read_result_))) {
LOG_WARN("Failed to execute", K(ret), K_(sql_idx), K(sql));
} else if (OB_ISNULL(sql_result_ = static_cast<observer::ObInnerSQLResult *>(read_result_.get_result()))) {
} else if (OB_ISNULL(mv_sql_resource_.sql_result_ = static_cast<observer::ObInnerSQLResult *>(mv_sql_resource_.read_result_.get_result()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null sql result", K(ret), K_(sql_idx), K(sql));
}
if (OB_FAIL(ret)) {
ObMviewCompactionHelper::release_inner_connection(conn_);
ObMviewCompactionHelper::release_inner_session(free_session_ctx_, session_);
}
return ret;
}
@ -2264,14 +2271,14 @@ int ObPartitionMVRowMergeIter::next()
} else if (OB_UNLIKELY(iter_end_)) {
ret = OB_ITER_END;
} else if (FALSE_IT(curr_row_ = nullptr)) {
} else if (OB_FAIL(sql_result_->next())) {
} else if (OB_FAIL(mv_sql_resource_.sql_result_->next())) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("Failed to get next row", K(ret));
} else {
iter_end_ = true;
}
} else {
const ObNewRow *new_row = sql_result_->get_row();
const ObNewRow *new_row = mv_sql_resource_.sql_result_->get_row();
if (OB_UNLIKELY(nullptr == new_row || sql_read_col_cnt_ != new_row->get_count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected result row", K(ret), K(schema_rowkey_column_cnt_), K(sql_read_col_cnt_), KPC(new_row));

View File

@ -410,14 +410,24 @@ private:
class ObPartitionMVRowMergeIter final : public ObPartitionMergeIter
{
public:
struct ObMVSqlResource
{
ObMVSqlResource();
~ObMVSqlResource();
TO_STRING_KV(K_(free_session_ctx), KP_(session), KP_(conn), KP_(sql_result));
ObISQLClient::ReadResult read_result_;
sql::ObFreeSessionCtx free_session_ctx_;
sql::ObSQLSessionInfo *session_;
sqlclient::ObISQLConnection *conn_;
observer::ObInnerSQLResult *sql_result_;
};
ObPartitionMVRowMergeIter(common::ObIAllocator &allocator);
virtual ~ObPartitionMVRowMergeIter();
virtual int init(const ObMergeParameter &merge_param,
const int64_t refresh_sql_idx,
const ObITableReadInfo *read_info) override;
virtual int next() override;
TO_STRING_KV(K_(is_delete), K_(is_replace), K_(sql_idx), K_(sql_read_col_cnt), K_(store_col_cnt),
K_(free_session_ctx), KP_(session), KP_(conn), KP_(sql_result));
TO_STRING_KV(K_(is_delete), K_(is_replace), K_(sql_idx), K_(sql_read_col_cnt), K_(store_col_cnt), K_(mv_sql_resource));
protected:
virtual int inner_init(const ObMergeParameter &merge_param) override;
virtual bool inner_check(const ObMergeParameter &merge_param) override;
@ -427,12 +437,8 @@ private:
int64_t sql_idx_;
int64_t sql_read_col_cnt_;
int64_t store_col_cnt_;
ObISQLClient::ReadResult read_result_;
blocksstable::ObDatumRow result_row_;
sql::ObFreeSessionCtx free_session_ctx_;
sql::ObSQLSessionInfo *session_;
sqlclient::ObISQLConnection *conn_;
observer::ObInnerSQLResult *sql_result_;
ObMVSqlResource mv_sql_resource_;
};
static const int64_t DEFAULT_ITER_COUNT = 16;