diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index da08250ed..4b1d651a9 100755 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -1482,6 +1482,13 @@ static void knl_u_datavec_init(knl_u_datavec_context* datavec_cxt) datavec_cxt->ivfflat_probes = 0; } +#ifdef ENABLE_HTAP +static void knl_u_imcstore_init(knl_u_imcstore_context* imcstore_context) +{ + imcstore_context->pinnedRowGroups = NIL; +} +#endif + void knl_session_init(knl_session_context* sess_cxt) { Assert (0 != strncmp(CurrentMemoryContext->name, "ErrorContext", sizeof("ErrorContext"))); @@ -1584,6 +1591,10 @@ void knl_session_init(knl_session_context* sess_cxt) knl_u_opfusion_reuse_init(&sess_cxt->opfusion_reuse_ctx); +#ifdef ENABLE_HTAP + knl_u_imcstore_init(&sess_cxt->imcstore_ctx); +#endif + MemoryContextSeal(sess_cxt->top_mem_cxt); } diff --git a/src/gausskernel/storage/cstore/cstore_am.cpp b/src/gausskernel/storage/cstore/cstore_am.cpp index 5c814f1c9..5a8f44953 100644 --- a/src/gausskernel/storage/cstore/cstore_am.cpp +++ b/src/gausskernel/storage/cstore/cstore_am.cpp @@ -70,6 +70,7 @@ #include "executor/executor.h" #ifdef ENABLE_HTAP #include "access/htap/imcucache_mgr.h" +#include "access/htap/imcstore_am.h" #endif #ifdef PGXC @@ -718,6 +719,9 @@ void CStoreAbortCU() #endif CUListPrefetchAbort(); } +#ifdef ENABLE_HTAP + UnlockRowGroups(); +#endif } /* diff --git a/src/gausskernel/storage/htap/imcstore_am.cpp b/src/gausskernel/storage/htap/imcstore_am.cpp index 34a32d8f5..481c773df 100644 --- a/src/gausskernel/storage/htap/imcstore_am.cpp +++ b/src/gausskernel/storage/htap/imcstore_am.cpp @@ -108,7 +108,8 @@ void IMCStore::InitScan(CStoreScanState* state, Snapshot snapshot) m_relation = state->ss_currentRelation; m_imcstoreDesc = IMCU_CACHE->GetImcsDesc(RelationGetRelid(m_relation)); - m_currentRowGroups = NIL; + UnlockRowGroups(); + u_sess->imcstore_ctx.pinnedRowGroups = NIL; m_ctidCol = m_imcstoreDesc->imcsNatts; m_deltaScanCurr = 0; m_deltaMaskMax = 0; @@ -420,7 +421,8 @@ bool IMCStore::LoadCUDesc( if (rowgroup != NULL) { pthread_rwlock_rdlock(&rowgroup->m_mutex); MemoryContext old = MemoryContextSwitchTo(m_scanMemContext); - m_currentRowGroups = lappend(m_currentRowGroups, rowgroup); + PinnedRowGroup* pinned = New(CurrentMemoryContext)PinnedRowGroup(rowgroup, m_imcstoreDesc); + u_sess->imcstore_ctx.pinnedRowGroups = lappend(u_sess->imcstore_ctx.pinnedRowGroups, pinned); MemoryContextSwitchTo(old); } char* valPtr = NULL; @@ -532,14 +534,11 @@ void IMCStore::GetCUDeleteMaskIfNeed(_in_ uint32 cuid, _in_ Snapshot snapShot) DeltaOperationType ctidtpye; while ((item = deltaIter.GetNext(&ctidtpye, nullptr)) != NULL) { - if (ctidtpye == DeltaOperationType::IMCSTORE_INSERT) { - BlockNumber blockoffset = ItemPointerGetBlockNumber(item) - cuid * MAX_IMCS_PAGES_ONE_CU; - OffsetNumber offset = ItemPointerGetOffsetNumber(item); - uint64 idx = blockoffset * MAX_POSSIBLE_ROW_PER_PAGE + offset; - m_cuDeltaMask[idx >> 3] |= (1 << (idx % 8)); - m_deltaMaskMax = Max(idx, m_deltaMaskMax); - continue; - } + BlockNumber blockoffset = ItemPointerGetBlockNumber(item) - cuid * MAX_IMCS_PAGES_ONE_CU; + OffsetNumber offset = ItemPointerGetOffsetNumber(item); + uint64 idx = blockoffset * MAX_POSSIBLE_ROW_PER_PAGE + offset; + m_cuDeltaMask[idx >> 3] |= (1 << (idx % 8)); + m_deltaMaskMax = Max(idx, m_deltaMaskMax); if (cu == NULL) { continue; } @@ -785,14 +784,17 @@ RETRY_LOAD_CU: return cuPtr; } -void IMCStore::UnlockRowGroups() +void UnlockRowGroups() { - if (!m_currentRowGroups) return; - ListCell *lc = NULL; - foreach(lc, m_currentRowGroups) { - RowGroup* rowgroup = (RowGroup*)lfirst(lc); - pthread_rwlock_unlock(&rowgroup->m_mutex); - m_imcstoreDesc->UnReferenceRowGroup(); + if (!u_sess->imcstore_ctx.pinnedRowGroups) { + return; } - list_free_ext(m_currentRowGroups); + ListCell *lc = NULL; + foreach(lc, u_sess->imcstore_ctx.pinnedRowGroups) { + PinnedRowGroup* pinned = (PinnedRowGroup*)lfirst(lc); + pthread_rwlock_unlock(&pinned->rowgroup->m_mutex); + pinned->desc->UnReferenceRowGroup(); + delete pinned; + } + list_free_ext(u_sess->imcstore_ctx.pinnedRowGroups); } diff --git a/src/include/access/htap/imcstore_am.h b/src/include/access/htap/imcstore_am.h index fd1e40e57..b1eb99baa 100644 --- a/src/include/access/htap/imcstore_am.h +++ b/src/include/access/htap/imcstore_am.h @@ -76,7 +76,6 @@ public: void FillPerRowGroupDelta(_in_ IMCStoreScanState* state, _in_ uint32 cuid, _out_ VectorBatch* vecBatchOut); bool InsertDeltaRowToBatch(_in_ IMCStoreScanState* state, ItemPointerData item, _out_ VectorBatch* vecBatchOut); bool ImcstoreFillByDeltaScan(_in_ CStoreScanState* state, _out_ VectorBatch* vecBatchOut) override; - void UnlockRowGroups(); private: IMCUStorage** m_imcuStorage; @@ -90,5 +89,12 @@ private: List* m_currentRowGroups; }; +typedef struct PinnedRowGroup : public BaseObject { + RowGroup *rowgroup; + IMCSDesc *desc; + PinnedRowGroup(RowGroup *rowgroup, IMCSDesc *desc) : rowgroup(rowgroup), desc(desc) {}; +} PinnedRowGroup; +void UnlockRowGroups(); + #endif // ENABLE_HTAP #endif // IMCSTORE_AM_H diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index 119494015..93fd313b3 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -3014,6 +3014,12 @@ typedef struct knl_u_datavec_context { int ivfflat_probes; } knl_u_datavec_context; +#ifdef ENABLE_HTAP +typedef struct knl_u_imcstore_context { + List* pinnedRowGroups; +} knl_u_imcstore_context; +#endif + typedef struct knl_session_context { volatile knl_session_status status; /* used for threadworker, elem in m_readySessionList */ @@ -3170,6 +3176,10 @@ typedef struct knl_session_context { knl_u_datavec_context datavec_ctx; +#ifdef ENABLE_HTAP + knl_u_imcstore_context imcstore_ctx; +#endif + } knl_session_context; enum stp_xact_err_type {