Fix bugs when cstore tables with unique index are updated.

This commit is contained in:
syj
2021-06-25 17:27:56 +08:00
parent 28ef7fe121
commit 1ddd0c5115
8 changed files with 232 additions and 41 deletions

View File

@ -261,6 +261,7 @@ void CheckUniqueOnOtherIdx(Relation index, Relation heapRel, Datum* values, bool
if (cudescScan != NULL) {
cudescScan->Destroy();
delete cudescScan;
cudescScan = NULL;
}

View File

@ -144,6 +144,7 @@ bool _bt_doinsert(Relation rel, IndexTuple itup, IndexUniqueCheck checkUnique, R
if (cudescScan != NULL) {
cudescScan->Destroy();
delete cudescScan;
cudescScan = NULL;
}

View File

@ -261,18 +261,23 @@ void CStoreDelete::PutDeleteBatch(_in_ VectorBatch* batch, _in_ JunkFilter* junk
if (IsFull()) {
// execute the delete when the partial sort is full
m_totalDeleteNum += (uint64)(this->*m_ExecDeletePtr)();
// a partial delete needs the command id to be incremented
CommandCounterIncrement();
// reset sort state
ResetSortState();
PartialDelete();
}
return;
}
void CStoreDelete::PartialDelete()
{
m_totalDeleteNum += (uint64)(this->*m_ExecDeletePtr)();
// parital delete need commad id ++
CommandCounterIncrement();
// reset sort state
ResetSortState();
}
uint64 CStoreDelete::ExecDelete()
{
m_totalDeleteNum += (uint64)(this->*m_ExecDeletePtr)();
@ -396,6 +401,17 @@ void CStoreDelete::PutDeleteBatchForUpdate(_in_ VectorBatch* batch, _in_ JunkFil
m_curSortedNum += batch->m_rows;
}
/*
 * Copy the part of `batch` selected by startIdx/endIdx into m_sortBatch and
 * put every copied row into the delete sort state.
 * NOTE(review): how VectorBatch::Copy interprets endIdx (the declaration
 * defaults it to -1) is not visible here - confirm against its definition.
 */
void CStoreDelete::PutDeleteBatchForUpdate(_in_ VectorBatch* batch, _in_ int startIdx, _in_ int endIdx)
{
    Assert(batch && m_sortBatch && m_deleteSortState);

    /* stage the requested rows in the sort batch */
    m_sortBatch->Copy<false, false>(batch, startIdx, endIdx);

    /* feed all staged rows to the delete sorter */
    m_deleteSortState->sort_putbatch(m_deleteSortState, m_sortBatch, 0, m_sortBatch->m_rows);

    /* account for the rows that are now waiting to be deleted */
    m_curSortedNum += m_sortBatch->m_rows;
}
uint64 CStoreDelete::ExecDeleteForTable()
{
Assert(m_relation && m_estate && m_sortBatch && m_deleteSortState && m_rowOffset);
@ -514,6 +530,15 @@ uint64 CStoreDelete::ExecDeleteForPartition()
batchsort_getbatch(m_deleteSortState, true, m_sortBatch);
/*
* ExecDeleteForPartition may be invoked multiple times by a CStore update.
* Close the m_deltaRealtion opened by the previous invocation first.
*/
if (m_deltaRealtion != NULL) {
relation_close(m_deltaRealtion, NoLock);
m_deltaRealtion = NULL;
}
while (!BatchIsNull(m_sortBatch)) {
ScalarValue* partidValues = m_sortBatch->m_arr[m_partidIdx - 1].m_vals;
ScalarValue* tidValues = m_sortBatch->m_arr[m_ctidIdx - 1].m_vals;

View File

@ -1924,6 +1924,20 @@ void CStoreInsert::CUInsert(_in_ BatchCUData* CUData, _in_ int options)
SaveAll(options, CUData->bitmap);
}
/*
 * Sort the rows held by m_sorter, insert them through DoBatchInsert, then
 * reset the sorter and the temporary batch rows.
 */
void CStoreInsert::SortAndInsert(int options)
{
    /* run sort */
    m_sorter->RunSort();

    /* insert the sorted data */
    DoBatchInsert(options);

    /* IsEnd() is evaluated after the insert, exactly where it was before */
    const auto inputEnded = IsEnd();
    m_sorter->Reset(inputEnded);

    /* reset and free all memory blocks */
    m_tmpBatchRows->reset(false);
}
// BatchInsert
// Vector interface for Insert into CStore table
void CStoreInsert::BatchInsert(_in_ VectorBatch* pBatch, _in_ int options)
@ -1945,14 +1959,7 @@ void CStoreInsert::BatchInsert(_in_ VectorBatch* pBatch, _in_ int options)
}
if (m_sorter->IsFull() || IsEnd()) {
m_sorter->RunSort();
/* reset and fetch next batch of values */
DoBatchInsert(options);
m_sorter->Reset(IsEnd());
/* reset and free all memory blocks */
m_tmpBatchRows->reset(false);
SortAndInsert(options);
}
}
@ -2118,16 +2125,7 @@ void CStoreInsert::FlashData(int options)
/* check sorter whether have data which not insert */
if (m_sorter->GetRowNum() > 0) {
/* run sort */
m_sorter->RunSort();
/* reset and fetch next batch of values */
DoBatchInsert(options);
m_sorter->Reset(IsEnd());
/* reset and free all memory blocks */
m_tmpBatchRows->reset(false);
SortAndInsert(options);
}
} else {
/* flash buffered batch rows */
@ -2996,7 +2994,8 @@ int PartitionValueCache::FillBuffer()
}
CUDescScan::CUDescScan(_in_ Relation relation)
: m_cudesc(NULL), m_cudescIndex(NULL), m_snapshot(NULL), m_cuids(NIL), m_deletemasks(NIL), m_valids(NIL)
: m_cudesc(NULL), m_cudescIndex(NULL), m_snapshot(NULL),
m_cuids(NIL), m_deletemasks(NIL), m_needFreeMasks(NIL), m_valids(NIL)
{
m_cudesc = heap_open(relation->rd_rel->relcudescrelid, AccessShareLock);
m_cudescIndex = index_open(m_cudesc->rd_rel->relcudescidx, AccessShareLock);
@ -3016,24 +3015,37 @@ CUDescScan::~CUDescScan()
m_snapshot = NULL;
m_cuids = NIL;
m_deletemasks = NIL;
m_needFreeMasks = NIL;
m_valids = NIL;
}
void CUDescScan::FreeCache()
{
ListCell* needFreeMaskCell = NULL;
ListCell* maskCell = NULL;
forboth (needFreeMaskCell, m_needFreeMasks, maskCell, m_deletemasks) {
if ((bool)lfirst_int(needFreeMaskCell)) {
pfree(lfirst(maskCell));
}
}
list_free_ext(m_cuids);
list_free_ext(m_deletemasks);
list_free_ext(m_needFreeMasks);
list_free_ext(m_valids);
}
/*
 * Close the CUDesc index and heap relation held by this scan and release
 * the cached scan results.
 */
void CUDescScan::Destroy()
{
    index_close(m_cudescIndex, AccessShareLock);
    heap_close(m_cudesc, AccessShareLock);

    /*
     * FreeCache() is the only place that releases the cache lists. Freeing
     * m_cuids/m_deletemasks/m_valids here first would leave FreeCache()
     * without live lists to walk, so the delete masks flagged in
     * m_needFreeMasks would never be pfree'd.
     */
    FreeCache();
}
/*
 * Switch the scan to a new snapshot and drop everything cached so far.
 * @Param[IN] snapshot: snapshot stored in m_snapshot for later scans.
 */
void CUDescScan::ResetSnapshot(Snapshot snapshot)
{
    m_snapshot = snapshot;

    /*
     * FreeCache() is the only place that releases the cache lists. Freeing
     * m_cuids/m_deletemasks/m_valids here first would leave FreeCache()
     * without live lists to walk, so the delete masks flagged in
     * m_needFreeMasks would never be pfree'd.
     */
    FreeCache();
}
bool CUDescScan::CheckAliveInCache(uint32 CUId, uint32 rownum, bool* found)
@ -3049,7 +3061,8 @@ bool CUDescScan::CheckAliveInCache(uint32 CUId, uint32 rownum, bool* found)
*found = true;
bool valid = (bool)lfirst_int(validCell);
if (valid) {
unsigned char* cachedDelMask = (unsigned char*)lfirst(delmaskCell);
int8* bitmap = (int8*)lfirst(delmaskCell);
unsigned char* cachedDelMask = (unsigned char*)VARDATA_ANY(bitmap);
if (cachedDelMask == NULL) {
/* All rows are alive*/
return true;
@ -3089,7 +3102,8 @@ bool CUDescScan::CheckItemIsAlive(ItemPointer tid)
TupleDesc cudescTupdesc = m_cudesc->rd_att;
SysScanDesc scanDesc = systable_beginscan_ordered(m_cudesc, m_cudescIndex, m_snapshot, 2, m_scanKey);
unsigned char* delMask = NULL;
int8* delMask = NULL;
bool needFreeMask = false;
bool valid = false;
if ((tup = systable_getnext_ordered(scanDesc, ForwardScanDirection)) != NULL) {
@ -3105,11 +3119,11 @@ bool CUDescScan::CheckItemIsAlive(ItemPointer tid)
/* Because new memory may be created, so we have to check and free in time. */
if ((Pointer)bitmap != DatumGetPointer(v)) {
pfree_ext(bitmap);
needFreeMask = true;
}
/* Record the mask. */
delMask = cuDelMask;
delMask = bitmap;
}
valid = true;
} else {
@ -3122,6 +3136,7 @@ bool CUDescScan::CheckItemIsAlive(ItemPointer tid)
/* Save the scan result in cache. */
lappend_oid(m_cuids, (Oid)CUId);
lappend(m_deletemasks, delMask);
lappend_int(m_needFreeMasks, (int)needFreeMask);
lappend_int(m_valids, (int)valid);
return isAlive;

View File

@ -61,6 +61,8 @@ CStoreUpdate::CStoreUpdate(_in_ Relation rel, _in_ EState* estate, _in_ Plan* pl
m_partionInsert->SetPartitionCacheStrategy(FLASH_WHEN_SWICH_PARTITION);
m_insert = NULL;
}
m_hasUniqueIdx = CheckHasUniqueIdx();
}
CStoreUpdate::~CStoreUpdate()
@ -212,7 +214,9 @@ uint64 CStoreUpdate::ExecUpdate(_in_ VectorBatch* batch, _in_ int options)
JunkFilter* junkfilter = m_resultRelInfo->ri_junkFilter;
// delete
m_delete->PutDeleteBatch(batch, junkfilter);
if (!m_hasUniqueIdx) {
m_delete->PutDeleteBatch(batch, junkfilter);
}
int oriCols = batch->m_cols;
batch->m_cols = junkfilter->jf_cleanTupType->natts;
@ -222,16 +226,134 @@ uint64 CStoreUpdate::ExecUpdate(_in_ VectorBatch* batch, _in_ int options)
ExecVecConstraints(m_resultRelInfo, batch, m_estate);
// insert then batch
if (m_isPartition)
m_partionInsert->BatchInsert(batch, options);
else
m_insert->BatchInsert(batch, options);
if (!m_hasUniqueIdx) {
/*
* When there is no unique index, the delete threshold (e.g. 4200000)
* is larger than the default insert threshold (e.g. 60000), so the relation
* may insert first and then delete.
*/
if (m_isPartition) {
m_partionInsert->BatchInsert(batch, options);
} else {
m_insert->BatchInsert(batch, options);
}
} else {
/*
* When there is a unique index, we must delete first and then insert;
* otherwise the index keys of the new data and the old data may conflict.
*/
if (m_isPartition) {
PartitionBatchDeleteAndInsert(batch, oriCols, options, junkfilter);
} else {
BatchDeleteAndInsert(batch, oriCols, options, junkfilter);
}
}
batch->m_cols = oriCols;
return (uint64)(uint32)batch->m_rows;
}
/*
 * @Description: Do batch delete and insert. In this function, the number of rows that are deleted
 * strictly equals the number of rows that will be inserted. It is usually used when the cstore
 * table has a unique index.
 * @Param[IN] batch: rows being updated; batch->m_cols is switched to oriBatchCols only while the
 *                   deletes are queued and is restored afterwards.
 * @Param[IN] oriBatchCols: column count of the batch to use while queueing deletes.
 * @Param[IN] options: insert options forwarded to the insert routines.
 * @Param[IN] junkfilter: junk filter forwarded to CStoreDelete::PutDeleteBatch.
 */
void CStoreUpdate::BatchDeleteAndInsert(VectorBatch *batch, int oriBatchCols, int options, JunkFilter *junkfilter)
{
    int currentCols = batch->m_cols;

    /* keep memory space from leaking during bulk-insert */
    MemoryContext insertCnxt = m_insert->GetTmpMemCnxt();
    MemoryContext updateCnxt = MemoryContextSwitchTo(insertCnxt);

    CStorePSort* sorter = m_insert->GetSorter();
    if (sorter != NULL) {
        /* Case 1: relation has partial cluster key */
        Assert(batch->m_cols == m_insert->m_relation->rd_att->natts);

        {
            /* queue the deletes in the update context, with the original column count */
            AutoContextSwitch updateContext(updateCnxt);
            batch->m_cols = oriBatchCols;
            m_delete->PutDeleteBatch(batch, junkfilter);
            batch->m_cols = currentCols;
        }

        sorter->PutVecBatch(m_insert->m_relation, batch);

        if (sorter->IsFull()) {
            {
                /* delete first ... */
                AutoContextSwitch updateContext(updateCnxt);
                m_delete->PartialDelete();
            }
            /* ... then insert */
            m_insert->SortAndInsert(options);
        }
    } else {
        /* Case 2: relation doesn't have partial cluster key */
        bulkload_rows* bufferedBatchRows = m_insert->GetBufferedBatchRows();

        Assert(batch->m_rows <= BatchMaxSize);
        Assert(batch->m_cols && m_insert->m_relation->rd_att->natts);
        Assert(bufferedBatchRows->m_rows_maxnum > 0);
        Assert(bufferedBatchRows->m_rows_maxnum % BatchMaxSize == 0);

        int startIdx = 0;
        int lastStartIdx = startIdx;
        for (;;) {
            /* we need cache data until batchrows is full */
            bool needInsert = bufferedBatchRows->append_one_vector(
                RelationGetDescr(m_relation), batch, &startIdx, m_insert->m_cstorInsertMem);

            /* queue deletes for exactly the rows that were just buffered */
            if (startIdx > lastStartIdx) {
                AutoContextSwitch updateContext(updateCnxt);
                batch->m_cols = oriBatchCols;
                m_delete->PutDeleteBatchForUpdate(batch, lastStartIdx, startIdx);
                batch->m_cols = currentCols;
            }

            if (needInsert) {
                /*
                 * Execute the queued deletes before inserting, as the partial-cluster-key
                 * branch does: with a unique index the old rows must be deleted before the
                 * new index keys are inserted. The delete runs in the update context; the
                 * insert stays in the insert's temporary context, which is reset below.
                 */
                {
                    AutoContextSwitch updateContext(updateCnxt);
                    m_delete->PartialDelete();
                }
                m_insert->BatchInsertCommon(bufferedBatchRows, options);
                bufferedBatchRows->reset(true);
                lastStartIdx = startIdx;
            } else {
                break;
            }
        }
    }

    MemoryContextReset(insertCnxt);
    (void)MemoryContextSwitchTo(updateCnxt);
}
/*
 * @Description: Delete-then-insert for one batch through m_partionInsert: queue the
 * batch's deletes, execute them right away, then insert the batch.
 */
void CStoreUpdate::PartitionBatchDeleteAndInsert(VectorBatch *batch, int oriBatchCols, int options,
                                                 JunkFilter *junkfilter)
{
    /* remember the column count the insert side expects */
    int insertCols = batch->m_cols;

    /* the delete side works with the original column count */
    batch->m_cols = oriBatchCols;
    m_delete->PutDeleteBatch(batch, junkfilter);
    m_delete->PartialDelete();

    /* restore the column count before inserting */
    batch->m_cols = insertCols;
    m_partionInsert->BatchInsert(batch, options);
}
bool CStoreUpdate::CheckHasUniqueIdx()
{
bool hasUniqueIdx = false;
if (m_resultRelInfo && m_resultRelInfo->ri_NumIndices > 0) {
IndexInfo** indexInfos = m_resultRelInfo->ri_IndexRelationInfo;
for (int i = 0; i < m_resultRelInfo->ri_NumIndices; ++i) {
if (indexInfos[i]->ii_Unique) {
hasUniqueIdx = true;
break;
}
}
}
return hasUniqueIdx;
}
void CStoreUpdate::EndUpdate(_in_ int options)
{
Assert(m_delete);

View File

@ -40,6 +40,8 @@ public:
void InitSortState();
void InitSortState(_in_ TupleDesc sortTupDesc, _in_ int partidAttNo, _in_ int ctidAttNo);
void PutDeleteBatch(_in_ VectorBatch *batch, _in_ JunkFilter *junkfilter);
void PutDeleteBatchForUpdate(_in_ VectorBatch *batch, _in_ int startIdx=0, _in_ int endIdx=-1);
void PartialDelete();
uint64 ExecDelete();
void setReportErrorForUpdate(bool isEnable)
{

View File

@ -111,6 +111,23 @@ public:
static void InitIndexInsertArg(Relation heap_rel, const int *keys_map, int nkeys, InsertArg &args);
void InitInsertMemArg(Plan *plan, MemInfoArg *ArgmemInfo);
/* Accessor for m_sorter. */
inline CStorePSort* GetSorter() { return m_sorter; }
/* Accessor for m_tmpMemCnxt. */
inline MemoryContext GetTmpMemCnxt() { return m_tmpMemCnxt; }
/* Accessor for m_bufferedBatchRows. */
inline bulkload_rows* GetBufferedBatchRows() { return m_bufferedBatchRows; }
void SortAndInsert(int options);
Relation m_relation;
CU ***m_aio_cu_PPtr;
AioDispatchCUDesc_t ***m_aio_dispath_cudesc;
@ -430,6 +447,7 @@ public:
bool CheckItemIsAlive(ItemPointer tid);
private:
void FreeCache();
inline bool IsDeadRow(uint32 row, unsigned char* cuDelMask);
bool CheckAliveInCache(uint32 CUId, uint32 rownum, bool* found);
@ -440,6 +458,7 @@ private:
Snapshot m_snapshot;
List* m_cuids;
List* m_deletemasks;
List* m_needFreeMasks;
List* m_valids;
};

View File

@ -43,6 +43,11 @@ public:
MemInfoArg *m_insMemInfo;
private:
void BatchDeleteAndInsert(VectorBatch *batch, int oriBatchCols, int options, JunkFilter *junkfilter);
void PartitionBatchDeleteAndInsert(VectorBatch *batch, int oriBatchCols, int options, JunkFilter *junkfilter);
bool CheckHasUniqueIdx();
Relation m_relation;
CStoreDelete *m_delete;
@ -52,6 +57,7 @@ private:
ResultRelInfo *m_resultRelInfo;
EState *m_estate;
bool m_isPartition;
bool m_hasUniqueIdx;
static int BATCHROW_TIMES;
InsertArg m_insert_args;