From 1ddd0c5115aee4305fa4e94605f086949a3bbaa4 Mon Sep 17 00:00:00 2001
From: syj <904673462@qq.com>
Date: Fri, 25 Jun 2021 17:27:56 +0800
Subject: [PATCH] Fix bugs when cstore tables with unique index are updated.

---
 .../storage/access/cbtree/cbtree.cpp     |   1 +
 .../storage/access/nbtree/nbtinsert.cpp  |   1 +
 .../storage/cstore/cstore_delete.cpp     |  39 +++++-
 .../storage/cstore/cstore_insert.cpp     |  73 ++++++----
 .../storage/cstore/cstore_update.cpp     | 132 +++++++++++++++++-
 src/include/access/cstore_delete.h       |   2 +
 src/include/access/cstore_insert.h       |  19 +++
 src/include/access/cstore_update.h       |   6 +
 8 files changed, 232 insertions(+), 41 deletions(-)

diff --git a/src/gausskernel/storage/access/cbtree/cbtree.cpp b/src/gausskernel/storage/access/cbtree/cbtree.cpp
index 7759cd7ff..2bc560f24 100644
--- a/src/gausskernel/storage/access/cbtree/cbtree.cpp
+++ b/src/gausskernel/storage/access/cbtree/cbtree.cpp
@@ -261,6 +261,7 @@ void CheckUniqueOnOtherIdx(Relation index, Relation heapRel, Datum* values, bool
 
     if (cudescScan != NULL) {
         cudescScan->Destroy();
+        delete cudescScan;
         cudescScan = NULL;
     }
 
diff --git a/src/gausskernel/storage/access/nbtree/nbtinsert.cpp b/src/gausskernel/storage/access/nbtree/nbtinsert.cpp
index a528cabe5..52ebf5601 100644
--- a/src/gausskernel/storage/access/nbtree/nbtinsert.cpp
+++ b/src/gausskernel/storage/access/nbtree/nbtinsert.cpp
@@ -144,6 +144,7 @@ bool _bt_doinsert(Relation rel, IndexTuple itup, IndexUniqueCheck checkUnique, R
 
     if (cudescScan != NULL) {
         cudescScan->Destroy();
+        delete cudescScan;
         cudescScan = NULL;
     }
 
diff --git a/src/gausskernel/storage/cstore/cstore_delete.cpp b/src/gausskernel/storage/cstore/cstore_delete.cpp
index 785e967c3..c18d98597 100644
--- a/src/gausskernel/storage/cstore/cstore_delete.cpp
+++ b/src/gausskernel/storage/cstore/cstore_delete.cpp
@@ -261,18 +261,23 @@ void CStoreDelete::PutDeleteBatch(_in_ VectorBatch* batch, _in_ JunkFilter* junk
 
     if (IsFull()) {
         // execute delete if parital sort is full
-        m_totalDeleteNum += (uint64)(this->*m_ExecDeletePtr)();
-
-        // parital delete need commad id ++
-        CommandCounterIncrement();
-
-        // reset sort state
-        ResetSortState();
+        PartialDelete();
     }
 
     return;
 }
 
+void CStoreDelete::PartialDelete()
+{
+    m_totalDeleteNum += (uint64)(this->*m_ExecDeletePtr)();
+
+    // partial delete needs command id ++
+    CommandCounterIncrement();
+
+    // reset sort state
+    ResetSortState();
+}
+
 uint64 CStoreDelete::ExecDelete()
 {
     m_totalDeleteNum += (uint64)(this->*m_ExecDeletePtr)();
@@ -396,6 +401,17 @@ void CStoreDelete::PutDeleteBatchForUpdate(_in_ VectorBatch* batch, _in_ JunkFil
     m_curSortedNum += batch->m_rows;
 }
 
+void CStoreDelete::PutDeleteBatchForUpdate(_in_ VectorBatch* batch, _in_ int startIdx, _in_ int endIdx) {
+    Assert(batch && m_sortBatch && m_deleteSortState);
+
+    /* copy batch data */
+    m_sortBatch->Copy(batch, startIdx, endIdx);
+
+    /* put batch */
+    m_deleteSortState->sort_putbatch(m_deleteSortState, m_sortBatch, 0, m_sortBatch->m_rows);
+    m_curSortedNum += m_sortBatch->m_rows;
+}
+
 uint64 CStoreDelete::ExecDeleteForTable()
 {
     Assert(m_relation && m_estate && m_sortBatch && m_deleteSortState && m_rowOffset);
@@ -514,6 +530,15 @@ uint64 CStoreDelete::ExecDeleteForPartition()
 
     batchsort_getbatch(m_deleteSortState, true, m_sortBatch);
 
+    /*
+     * ExecDeleteForPartition may be invoked by CStore update multiple times.
+     * We should close the m_deltaRelation opened last time.
+     */
+    if (m_deltaRealtion != NULL) {
+        relation_close(m_deltaRealtion, NoLock);
+        m_deltaRealtion = NULL;
+    }
+
     while (!BatchIsNull(m_sortBatch)) {
         ScalarValue* partidValues = m_sortBatch->m_arr[m_partidIdx - 1].m_vals;
         ScalarValue* tidValues = m_sortBatch->m_arr[m_ctidIdx - 1].m_vals;
diff --git a/src/gausskernel/storage/cstore/cstore_insert.cpp b/src/gausskernel/storage/cstore/cstore_insert.cpp
index 3559e8426..c169d0f0b 100644
--- a/src/gausskernel/storage/cstore/cstore_insert.cpp
+++ b/src/gausskernel/storage/cstore/cstore_insert.cpp
@@ -1924,6 +1924,20 @@ void CStoreInsert::CUInsert(_in_ BatchCUData* CUData, _in_ int options)
     SaveAll(options, CUData->bitmap);
 }
 
+void CStoreInsert::SortAndInsert(int options)
+{
+    /* run sort */
+    m_sorter->RunSort();
+
+    /* reset and fetch next batch of values */
+    DoBatchInsert(options);
+
+    m_sorter->Reset(IsEnd());
+
+    /* reset and free all memory blocks */
+    m_tmpBatchRows->reset(false);
+}
+
 // BatchInsert
 // Vertor interface for Insert into CStore table
 void CStoreInsert::BatchInsert(_in_ VectorBatch* pBatch, _in_ int options)
@@ -1945,14 +1959,7 @@ void CStoreInsert::BatchInsert(_in_ VectorBatch* pBatch, _in_ int options)
     }
 
     if (m_sorter->IsFull() || IsEnd()) {
-        m_sorter->RunSort();
-
-        /* reset and fetch next batch of values */
-        DoBatchInsert(options);
-        m_sorter->Reset(IsEnd());
-
-        /* reset and free all memory blocks */
-        m_tmpBatchRows->reset(false);
+        SortAndInsert(options);
     }
 }
 
@@ -2118,16 +2125,7 @@ void CStoreInsert::FlashData(int options)
 
         /* check sorter whether have data which not insert */
         if (m_sorter->GetRowNum() > 0) {
-            /* run sort */
-            m_sorter->RunSort();
-
-            /* reset and fetch next batch of values */
-            DoBatchInsert(options);
-
-            m_sorter->Reset(IsEnd());
-
-            /* reset and free all memory blocks */
-            m_tmpBatchRows->reset(false);
+            SortAndInsert(options);
         }
     } else {
         /* flash buffered batch rows */
@@ -2996,7 +2994,8 @@ int PartitionValueCache::FillBuffer()
 }
 
 CUDescScan::CUDescScan(_in_ Relation relation)
-    : m_cudesc(NULL), m_cudescIndex(NULL), m_snapshot(NULL), m_cuids(NIL), m_deletemasks(NIL), m_valids(NIL)
+    : m_cudesc(NULL), m_cudescIndex(NULL), m_snapshot(NULL),
+      m_cuids(NIL), m_deletemasks(NIL), m_needFreeMasks(NIL), m_valids(NIL)
 {
     m_cudesc = heap_open(relation->rd_rel->relcudescrelid, AccessShareLock);
     m_cudescIndex = index_open(m_cudesc->rd_rel->relcudescidx, AccessShareLock);
@@ -3016,24 +3015,37 @@ CUDescScan::~CUDescScan()
     m_snapshot = NULL;
     m_cuids = NIL;
     m_deletemasks = NIL;
+    m_needFreeMasks = NIL;
     m_valids = NIL;
 }
 
+void CUDescScan::FreeCache()
+{
+    ListCell* needFreeMaskCell = NULL;
+    ListCell* maskCell = NULL;
+    forboth (needFreeMaskCell, m_needFreeMasks, maskCell, m_deletemasks) {
+        if ((bool)lfirst_int(needFreeMaskCell)) {
+            pfree(lfirst(maskCell));
+        }
+    }
+
+    list_free_ext(m_cuids);
+    list_free_ext(m_deletemasks);
+    list_free_ext(m_needFreeMasks);
+    list_free_ext(m_valids);
+}
+
 void CUDescScan::Destroy()
 {
     index_close(m_cudescIndex, AccessShareLock);
     heap_close(m_cudesc, AccessShareLock);
-    list_free_ext(m_cuids);
-    list_free_ext(m_deletemasks);
-    list_free_ext(m_valids);
+    FreeCache();
 }
 
 void CUDescScan::ResetSnapshot(Snapshot snapshot)
 {
     m_snapshot = snapshot;
-    list_free_ext(m_cuids);
-    list_free_ext(m_deletemasks);
-    list_free_ext(m_valids);
+    FreeCache();
 }
 
 bool CUDescScan::CheckAliveInCache(uint32 CUId, uint32 rownum, bool* found)
@@ -3049,7 +3061,8 @@ bool CUDescScan::CheckAliveInCache(uint32 CUId, uint32 rownum, bool* found)
         *found = true;
         bool valid = (bool)lfirst_int(validCell);
         if (valid) {
-            unsigned char* cachedDelMask = (unsigned char*)lfirst(delmaskCell);
+            int8* bitmap = (int8*)lfirst(delmaskCell);
+            unsigned char* cachedDelMask = (unsigned char*)VARDATA_ANY(bitmap);
             if (cachedDelMask == NULL) { /* All rows are alive*/
                 return true;
             }
@@ -3089,7 +3102,8 @@ bool CUDescScan::CheckItemIsAlive(ItemPointer tid)
     TupleDesc cudescTupdesc = m_cudesc->rd_att;
     SysScanDesc scanDesc = systable_beginscan_ordered(m_cudesc, m_cudescIndex, m_snapshot, 2, m_scanKey);
 
-    unsigned char* delMask = NULL;
+    int8* delMask = NULL;
+    bool needFreeMask = false;
     bool valid = false;
 
     if ((tup = systable_getnext_ordered(scanDesc, ForwardScanDirection)) != NULL) {
@@ -3105,11 +3119,11 @@ bool CUDescScan::CheckItemIsAlive(ItemPointer tid)
 
             /* Because new memory may be created, so we have to check and free in time. */
             if ((Pointer)bitmap != DatumGetPointer(v)) {
-                pfree_ext(bitmap);
+                needFreeMask = true;
             }
 
             /* Record the mask. */
-            delMask = cuDelMask;
+            delMask = bitmap;
         }
         valid = true;
     } else {
@@ -3122,6 +3136,7 @@ bool CUDescScan::CheckItemIsAlive(ItemPointer tid)
     /* Save the sacn result in cache. */
     lappend_oid(m_cuids, (Oid)CUId);
     lappend(m_deletemasks, delMask);
+    lappend_int(m_needFreeMasks, (int)needFreeMask);
     lappend_int(m_valids, (int)valid);
 
     return isAlive;
diff --git a/src/gausskernel/storage/cstore/cstore_update.cpp b/src/gausskernel/storage/cstore/cstore_update.cpp
index 7f44883e4..d5db3430c 100644
--- a/src/gausskernel/storage/cstore/cstore_update.cpp
+++ b/src/gausskernel/storage/cstore/cstore_update.cpp
@@ -61,6 +61,8 @@ CStoreUpdate::CStoreUpdate(_in_ Relation rel, _in_ EState* estate, _in_ Plan* pl
         m_partionInsert->SetPartitionCacheStrategy(FLASH_WHEN_SWICH_PARTITION);
         m_insert = NULL;
     }
+
+    m_hasUniqueIdx = CheckHasUniqueIdx();
 }
 
 CStoreUpdate::~CStoreUpdate()
@@ -212,7 +214,9 @@ uint64 CStoreUpdate::ExecUpdate(_in_ VectorBatch* batch, _in_ int options)
     JunkFilter* junkfilter = m_resultRelInfo->ri_junkFilter;
 
     // delete
-    m_delete->PutDeleteBatch(batch, junkfilter);
+    if (!m_hasUniqueIdx) {
+        m_delete->PutDeleteBatch(batch, junkfilter);
+    }
 
     int oriCols = batch->m_cols;
     batch->m_cols = junkfilter->jf_cleanTupType->natts;
@@ -222,16 +226,134 @@ uint64 CStoreUpdate::ExecUpdate(_in_ VectorBatch* batch, _in_ int options)
     ExecVecConstraints(m_resultRelInfo, batch, m_estate);
 
     // insert then batch
-    if (m_isPartition)
-        m_partionInsert->BatchInsert(batch, options);
-    else
-        m_insert->BatchInsert(batch, options);
+    if (!m_hasUniqueIdx) {
+        /*
+         * When there is no unique index, the delete threshold (e.g. 4200000) is
+         * larger than the default insert threshold (e.g. 60000), so the relation
+         * may insert first and then delete.
+         */
+        if (m_isPartition) {
+            m_partionInsert->BatchInsert(batch, options);
+        } else {
+            m_insert->BatchInsert(batch, options);
+        }
+    } else {
+        /*
+         * When there is a unique index, we must delete first and then insert,
+         * or the index keys of the new data and the old data may conflict.
+         */
+        if (m_isPartition) {
+            PartitionBatchDeleteAndInsert(batch, oriCols, options, junkfilter);
+        } else {
+            BatchDeleteAndInsert(batch, oriCols, options, junkfilter);
+        }
+    }
 
     batch->m_cols = oriCols;
 
     return (uint64)(uint32)batch->m_rows;
 }
 
+/*
+ * @Description: Do batch delete and insert. In this function, the number of rows that are deleted
+ * strictly equals the number of rows to be inserted. It is usually used when the cstore table has a unique index.
+ */
+void CStoreUpdate::BatchDeleteAndInsert(VectorBatch *batch, int oriBatchCols, int options, JunkFilter *junkfilter)
+{
+    int currentCols = batch->m_cols;
+    /* keep memory space from leaking during bulk-insert */
+    MemoryContext insertCnxt = m_insert->GetTmpMemCnxt();
+    MemoryContext updateCnxt = MemoryContextSwitchTo(insertCnxt);
+
+    CStorePSort* sorter = m_insert->GetSorter();
+
+    if (sorter != NULL) {
+        /* Step 1: relation has partial cluster key */
+        Assert(batch->m_cols == m_insert->m_relation->rd_att->natts);
+
+        {
+            AutoContextSwitch updateContext(updateCnxt);
+            batch->m_cols = oriBatchCols;
+            m_delete->PutDeleteBatch(batch, junkfilter);
+            batch->m_cols = currentCols;
+        }
+
+        sorter->PutVecBatch(m_insert->m_relation, batch);
+
+        if (sorter->IsFull()) {
+            {
+                AutoContextSwitch updateContext(updateCnxt);
+                m_delete->PartialDelete();
+            }
+            m_insert->SortAndInsert(options);
+        }
+    } else {
+        /* Step 2: relation doesn't have partial cluster key */
+        bulkload_rows* bufferedBatchRows = m_insert->GetBufferedBatchRows();
+
+        Assert(batch->m_rows <= BatchMaxSize);
+        Assert(batch->m_cols && m_insert->m_relation->rd_att->natts);
+        Assert(bufferedBatchRows->m_rows_maxnum > 0);
+        Assert(bufferedBatchRows->m_rows_maxnum % BatchMaxSize == 0);
+
+        int startIdx = 0;
+        int lastStartIdx = startIdx;
+        for (;;) {
+            /* we need to cache data until batchrows is full */
+            bool needInsert = bufferedBatchRows->append_one_vector(
+                RelationGetDescr(m_relation), batch, &startIdx, m_insert->m_cstorInsertMem);
+            if (startIdx > lastStartIdx) {
+                AutoContextSwitch updateContext(updateCnxt);
+                batch->m_cols = oriBatchCols;
+                m_delete->PutDeleteBatchForUpdate(batch, lastStartIdx, startIdx);
+                batch->m_cols = currentCols;
+            }
+
+            if (needInsert) {
+                {
+                    AutoContextSwitch updateContext(updateCnxt);
+                    m_insert->BatchInsertCommon(bufferedBatchRows, options);
+                }
+                bufferedBatchRows->reset(true);
+                lastStartIdx = startIdx;
+            } else {
+                break;
+            }
+        }
+    }
+
+    MemoryContextReset(insertCnxt);
+    (void)MemoryContextSwitchTo(updateCnxt);
+}
+
+void CStoreUpdate::PartitionBatchDeleteAndInsert(VectorBatch *batch, int oriBatchCols, int options, JunkFilter *junkfilter)
+{
+    int currentCols = batch->m_cols;
+    batch->m_cols = oriBatchCols;
+
+    m_delete->PutDeleteBatch(batch, junkfilter);
+    m_delete->PartialDelete();
+
+    batch->m_cols = currentCols;
+
+    m_partionInsert->BatchInsert(batch, options);
+}
+
+bool CStoreUpdate::CheckHasUniqueIdx()
+{
+    bool hasUniqueIdx = false;
+    if (m_resultRelInfo && m_resultRelInfo->ri_NumIndices > 0) {
+        IndexInfo** indexInfos = m_resultRelInfo->ri_IndexRelationInfo;
+        for (int i = 0; i < m_resultRelInfo->ri_NumIndices; ++i) {
+            if (indexInfos[i]->ii_Unique) {
+                hasUniqueIdx = true;
+                break;
+            }
+        }
+    }
+    return hasUniqueIdx;
+}
+
 void CStoreUpdate::EndUpdate(_in_ int options)
 {
     Assert(m_delete);
diff --git a/src/include/access/cstore_delete.h b/src/include/access/cstore_delete.h
index ec2f628da..0d12ff626 100644
--- a/src/include/access/cstore_delete.h
+++ b/src/include/access/cstore_delete.h
@@ -40,6 +40,8 @@ public:
     void InitSortState();
     void InitSortState(_in_ TupleDesc sortTupDesc, _in_ int partidAttNo, _in_ int ctidAttNo);
     void PutDeleteBatch(_in_ VectorBatch *batch, _in_ JunkFilter *junkfilter);
+    void PutDeleteBatchForUpdate(_in_ VectorBatch *batch, _in_ int startIdx=0, _in_ int endIdx=-1);
+    void PartialDelete();
     uint64 ExecDelete();
     void setReportErrorForUpdate(bool isEnable)
     {
diff --git a/src/include/access/cstore_insert.h b/src/include/access/cstore_insert.h
index c63752603..5c2682f17 100644
--- a/src/include/access/cstore_insert.h
+++ b/src/include/access/cstore_insert.h
@@ -111,6 +111,23 @@ public:
     static void InitIndexInsertArg(Relation heap_rel, const int *keys_map, int nkeys, InsertArg &args);
     void InitInsertMemArg(Plan *plan, MemInfoArg *ArgmemInfo);
 
+    inline CStorePSort* GetSorter()
+    {
+        return m_sorter;
+    }
+
+    inline MemoryContext GetTmpMemCnxt()
+    {
+        return m_tmpMemCnxt;
+    }
+
+    inline bulkload_rows* GetBufferedBatchRows()
+    {
+        return m_bufferedBatchRows;
+    }
+
+    void SortAndInsert(int options);
+
     Relation m_relation;
     CU ***m_aio_cu_PPtr;
     AioDispatchCUDesc_t ***m_aio_dispath_cudesc;
@@ -430,6 +447,7 @@ public:
     bool CheckItemIsAlive(ItemPointer tid);
 
 private:
+    void FreeCache();
    inline bool IsDeadRow(uint32 row, unsigned char* cuDelMask);
     bool CheckAliveInCache(uint32 CUId, uint32 rownum, bool* found);
 
@@ -440,6 +458,7 @@ private:
     Snapshot m_snapshot;
     List* m_cuids;
     List* m_deletemasks;
+    List* m_needFreeMasks;
     List* m_valids;
 };
 
diff --git a/src/include/access/cstore_update.h b/src/include/access/cstore_update.h
index ab93a9712..a5b2a9889 100644
--- a/src/include/access/cstore_update.h
+++ b/src/include/access/cstore_update.h
@@ -43,6 +43,11 @@ public:
     MemInfoArg *m_insMemInfo;
 
 private:
+    void BatchDeleteAndInsert(VectorBatch *batch, int oriBatchCols, int options, JunkFilter *junkfilter);
+    void PartitionBatchDeleteAndInsert(VectorBatch *batch, int oriBatchCols, int options, JunkFilter *junkfilter);
+
+    bool CheckHasUniqueIdx();
+
     Relation m_relation;
 
     CStoreDelete *m_delete;
@@ -52,6 +57,7 @@ private:
     ResultRelInfo *m_resultRelInfo;
     EState *m_estate;
     bool m_isPartition;
+    bool m_hasUniqueIdx;
     static int BATCHROW_TIMES;
 
     InsertArg m_insert_args;