bugfix: fix rowgroup not unlocked after ctrl-c command bugfix: fix scan error while some tpcc query rollbacked but not sync to delta
This commit is contained in:
@ -1482,6 +1482,13 @@ static void knl_u_datavec_init(knl_u_datavec_context* datavec_cxt)
|
||||
datavec_cxt->ivfflat_probes = 0;
|
||||
}
|
||||
|
||||
#ifdef ENABLE_HTAP
|
||||
static void knl_u_imcstore_init(knl_u_imcstore_context* imcstore_context)
|
||||
{
|
||||
imcstore_context->pinnedRowGroups = NIL;
|
||||
}
|
||||
#endif
|
||||
|
||||
void knl_session_init(knl_session_context* sess_cxt)
|
||||
{
|
||||
Assert (0 != strncmp(CurrentMemoryContext->name, "ErrorContext", sizeof("ErrorContext")));
|
||||
@ -1584,6 +1591,10 @@ void knl_session_init(knl_session_context* sess_cxt)
|
||||
|
||||
knl_u_opfusion_reuse_init(&sess_cxt->opfusion_reuse_ctx);
|
||||
|
||||
#ifdef ENABLE_HTAP
|
||||
knl_u_imcstore_init(&sess_cxt->imcstore_ctx);
|
||||
#endif
|
||||
|
||||
MemoryContextSeal(sess_cxt->top_mem_cxt);
|
||||
}
|
||||
|
||||
|
||||
@ -70,6 +70,7 @@
|
||||
#include "executor/executor.h"
|
||||
#ifdef ENABLE_HTAP
|
||||
#include "access/htap/imcucache_mgr.h"
|
||||
#include "access/htap/imcstore_am.h"
|
||||
#endif
|
||||
|
||||
#ifdef PGXC
|
||||
@ -718,6 +719,9 @@ void CStoreAbortCU()
|
||||
#endif
|
||||
CUListPrefetchAbort();
|
||||
}
|
||||
#ifdef ENABLE_HTAP
|
||||
UnlockRowGroups();
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@ -108,7 +108,8 @@ void IMCStore::InitScan(CStoreScanState* state, Snapshot snapshot)
|
||||
m_relation = state->ss_currentRelation;
|
||||
|
||||
m_imcstoreDesc = IMCU_CACHE->GetImcsDesc(RelationGetRelid(m_relation));
|
||||
m_currentRowGroups = NIL;
|
||||
UnlockRowGroups();
|
||||
u_sess->imcstore_ctx.pinnedRowGroups = NIL;
|
||||
m_ctidCol = m_imcstoreDesc->imcsNatts;
|
||||
m_deltaScanCurr = 0;
|
||||
m_deltaMaskMax = 0;
|
||||
@ -420,7 +421,8 @@ bool IMCStore::LoadCUDesc(
|
||||
if (rowgroup != NULL) {
|
||||
pthread_rwlock_rdlock(&rowgroup->m_mutex);
|
||||
MemoryContext old = MemoryContextSwitchTo(m_scanMemContext);
|
||||
m_currentRowGroups = lappend(m_currentRowGroups, rowgroup);
|
||||
PinnedRowGroup* pinned = New(CurrentMemoryContext)PinnedRowGroup(rowgroup, m_imcstoreDesc);
|
||||
u_sess->imcstore_ctx.pinnedRowGroups = lappend(u_sess->imcstore_ctx.pinnedRowGroups, pinned);
|
||||
MemoryContextSwitchTo(old);
|
||||
}
|
||||
char* valPtr = NULL;
|
||||
@ -532,14 +534,11 @@ void IMCStore::GetCUDeleteMaskIfNeed(_in_ uint32 cuid, _in_ Snapshot snapShot)
|
||||
DeltaOperationType ctidtpye;
|
||||
|
||||
while ((item = deltaIter.GetNext(&ctidtpye, nullptr)) != NULL) {
|
||||
if (ctidtpye == DeltaOperationType::IMCSTORE_INSERT) {
|
||||
BlockNumber blockoffset = ItemPointerGetBlockNumber(item) - cuid * MAX_IMCS_PAGES_ONE_CU;
|
||||
OffsetNumber offset = ItemPointerGetOffsetNumber(item);
|
||||
uint64 idx = blockoffset * MAX_POSSIBLE_ROW_PER_PAGE + offset;
|
||||
m_cuDeltaMask[idx >> 3] |= (1 << (idx % 8));
|
||||
m_deltaMaskMax = Max(idx, m_deltaMaskMax);
|
||||
continue;
|
||||
}
|
||||
BlockNumber blockoffset = ItemPointerGetBlockNumber(item) - cuid * MAX_IMCS_PAGES_ONE_CU;
|
||||
OffsetNumber offset = ItemPointerGetOffsetNumber(item);
|
||||
uint64 idx = blockoffset * MAX_POSSIBLE_ROW_PER_PAGE + offset;
|
||||
m_cuDeltaMask[idx >> 3] |= (1 << (idx % 8));
|
||||
m_deltaMaskMax = Max(idx, m_deltaMaskMax);
|
||||
if (cu == NULL) {
|
||||
continue;
|
||||
}
|
||||
@ -785,14 +784,17 @@ RETRY_LOAD_CU:
|
||||
return cuPtr;
|
||||
}
|
||||
|
||||
void IMCStore::UnlockRowGroups()
|
||||
void UnlockRowGroups()
|
||||
{
|
||||
if (!m_currentRowGroups) return;
|
||||
ListCell *lc = NULL;
|
||||
foreach(lc, m_currentRowGroups) {
|
||||
RowGroup* rowgroup = (RowGroup*)lfirst(lc);
|
||||
pthread_rwlock_unlock(&rowgroup->m_mutex);
|
||||
m_imcstoreDesc->UnReferenceRowGroup();
|
||||
if (!u_sess->imcstore_ctx.pinnedRowGroups) {
|
||||
return;
|
||||
}
|
||||
list_free_ext(m_currentRowGroups);
|
||||
ListCell *lc = NULL;
|
||||
foreach(lc, u_sess->imcstore_ctx.pinnedRowGroups) {
|
||||
PinnedRowGroup* pinned = (PinnedRowGroup*)lfirst(lc);
|
||||
pthread_rwlock_unlock(&pinned->rowgroup->m_mutex);
|
||||
pinned->desc->UnReferenceRowGroup();
|
||||
delete pinned;
|
||||
}
|
||||
list_free_ext(u_sess->imcstore_ctx.pinnedRowGroups);
|
||||
}
|
||||
|
||||
@ -76,7 +76,6 @@ public:
|
||||
void FillPerRowGroupDelta(_in_ IMCStoreScanState* state, _in_ uint32 cuid, _out_ VectorBatch* vecBatchOut);
|
||||
bool InsertDeltaRowToBatch(_in_ IMCStoreScanState* state, ItemPointerData item, _out_ VectorBatch* vecBatchOut);
|
||||
bool ImcstoreFillByDeltaScan(_in_ CStoreScanState* state, _out_ VectorBatch* vecBatchOut) override;
|
||||
void UnlockRowGroups();
|
||||
|
||||
private:
|
||||
IMCUStorage** m_imcuStorage;
|
||||
@ -90,5 +89,12 @@ private:
|
||||
List* m_currentRowGroups;
|
||||
};
|
||||
|
||||
typedef struct PinnedRowGroup : public BaseObject {
|
||||
RowGroup *rowgroup;
|
||||
IMCSDesc *desc;
|
||||
PinnedRowGroup(RowGroup *rowgroup, IMCSDesc *desc) : rowgroup(rowgroup), desc(desc) {};
|
||||
} PinnedRowGroup;
|
||||
void UnlockRowGroups();
|
||||
|
||||
#endif // ENABLE_HTAP
|
||||
#endif // IMCSTORE_AM_H
|
||||
|
||||
@ -3014,6 +3014,12 @@ typedef struct knl_u_datavec_context {
|
||||
int ivfflat_probes;
|
||||
} knl_u_datavec_context;
|
||||
|
||||
#ifdef ENABLE_HTAP
|
||||
typedef struct knl_u_imcstore_context {
|
||||
List* pinnedRowGroups;
|
||||
} knl_u_imcstore_context;
|
||||
#endif
|
||||
|
||||
typedef struct knl_session_context {
|
||||
volatile knl_session_status status;
|
||||
/* used for threadworker, elem in m_readySessionList */
|
||||
@ -3170,6 +3176,10 @@ typedef struct knl_session_context {
|
||||
|
||||
knl_u_datavec_context datavec_ctx;
|
||||
|
||||
#ifdef ENABLE_HTAP
|
||||
knl_u_imcstore_context imcstore_ctx;
|
||||
#endif
|
||||
|
||||
} knl_session_context;
|
||||
|
||||
enum stp_xact_err_type {
|
||||
|
||||
Reference in New Issue
Block a user