From d789bc656ebfeadd24ad5858f923763de685a424 Mon Sep 17 00:00:00 2001 From: Mijamind Date: Sun, 5 Nov 2023 15:55:52 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E8=B5=84=E6=BA=90=E6=B1=A0=E5=8C=96?= =?UTF-8?q?=E3=80=91spq=E6=94=AF=E6=8C=81=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?=E6=9C=BA=E5=B9=B6=E8=A1=8C=E5=88=9B=E5=BB=BA=E7=B4=A2=E5=BC=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bin/pg_dump/pg_dump.cpp | 4 + src/common/backend/catalog/heap.cpp | 65 +++++++++++++ src/common/backend/catalog/index.cpp | 91 +++++++++++++++++-- src/common/backend/utils/adt/selfuncs.cpp | 5 + src/common/backend/utils/cache/catcache.cpp | 35 ++++++- .../utils/cache/knl_globalsystupcache.cpp | 27 ++++++ src/common/backend/utils/cache/plancache.cpp | 2 +- .../runtime/executor/execExprInterp.cpp | 48 ++++++++-- .../runtime/executor/execUtils.cpp | 5 +- src/gausskernel/runtime/executor/spi.cpp | 7 ++ .../storage/access/common/heaptuple.cpp | 8 ++ .../storage/access/nbtree/spq_btbuild.cpp | 41 ++++++--- src/include/access/spq_btbuild.h | 1 + src/include/access/sysattr.h | 5 + src/include/catalog/heap.h | 4 + src/include/catalog/index.h | 3 + src/include/executor/spi.h | 3 + .../knl/knl_guc/knl_session_attr_spq.h | 1 + src/include/nodes/execnodes.h | 4 + src/include/nodes/parsenodes.h | 5 + 20 files changed, 328 insertions(+), 36 deletions(-) diff --git a/src/bin/pg_dump/pg_dump.cpp b/src/bin/pg_dump/pg_dump.cpp index e799a8a4f..5fb30a083 100644 --- a/src/bin/pg_dump/pg_dump.cpp +++ b/src/bin/pg_dump/pg_dump.cpp @@ -20717,6 +20717,10 @@ static const char* getAttrName(int attrnum, TableInfo* tblInfo) return "tablebucketid"; case UidAttributeNumber: return "gs_tuple_uid"; +#endif +#ifdef USE_SPQ + case RootSelfItemPointerAttributeNumber: + return "_root_ctid"; #endif default: break; diff --git a/src/common/backend/catalog/heap.cpp b/src/common/backend/catalog/heap.cpp index 688138536..f9e45d37e 100644 --- a/src/common/backend/catalog/heap.cpp +++ b/src/common/backend/catalog/heap.cpp @@ -411,7 +411,28 @@ static FormData_pg_attribute a10 = {0, true, 0}; +#ifdef USE_SPQ +static FormData_pg_attribute a11 = {0, + {"_root_ctid"}, + TIDOID, + 0, + sizeof(ItemPointerData), + RootSelfItemPointerAttributeNumber, + 0, + -1, + -1, + false, + 'p', + 's', + true, + false, + false, + true, + 0}; +static const Form_pg_attribute SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6, &a7, &a8, &a9, &a10, &a11}; +#else static const Form_pg_attribute SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6, &a7, &a8, &a9, &a10}; +#endif #else static const Form_pg_attribute SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6, &a7}; #endif @@ -1125,6 +1146,10 @@ static void AddNewAttributeTuples(Oid new_rel_oid, TupleDesc tupdesc, char relki /* skip OID where appropriate */ if (!tupdesc->tdhasoid && SysAtt[i]->attnum == ObjectIdAttributeNumber) continue; +#ifdef USE_SPQ + if (SysAtt[i]->attnum == RootSelfItemPointerAttributeNumber) + continue; +#endif if (!hasbucket && SysAtt[i]->attnum == BucketIdAttributeNumber) continue; if (!hasuids && SysAtt[i]->attnum == UidAttributeNumber) @@ -8322,3 +8347,43 @@ char* heap_serialize_row_attr(Oid rel_oid, bool* depend_undefined) FreeStringInfo(&concat_name); return ret; } + +#ifdef USE_SPQ +HeapTuple heaptuple_from_pg_attribute(Relation pg_attribute_rel, + Form_pg_attribute new_attribute) +{ + Datum values[Natts_pg_attribute] = { 0 }; + bool nulls[Natts_pg_attribute] = { false }; + + values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_attribute->attrelid); + values[Anum_pg_attribute_attname - 1] = NameGetDatum(&new_attribute->attname); + values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(new_attribute->atttypid); + values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(new_attribute->attstattarget); + values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(new_attribute->attlen); + values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(new_attribute->attnum); + values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(new_attribute->attndims); + values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(new_attribute->attcacheoff); + values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(new_attribute->atttypmod); + values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(new_attribute->attbyval); + values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(new_attribute->attstorage); + values[Anum_pg_attribute_attalign - 1] = CharGetDatum(new_attribute->attalign); + values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(new_attribute->attnotnull); + values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(new_attribute->atthasdef); + values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped); + values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal); + values[Anum_pg_attribute_attcmprmode - 1] = Int8GetDatum(new_attribute->attcmprmode); + values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount); + values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation); + values[Anum_pg_attribute_attkvtype - 1] = Int8GetDatum(new_attribute->attkvtype); + + /* start out with empty permissions and empty options */ + nulls[Anum_pg_attribute_attacl - 1] = true; + nulls[Anum_pg_attribute_attoptions - 1] = true; + nulls[Anum_pg_attribute_attfdwoptions - 1] = true; + + /* at default, new fileld attinitdefval of pg_attribute is null. */ + nulls[Anum_pg_attribute_attinitdefval - 1] = true; + + return heap_form_tuple(RelationGetDescr(pg_attribute_rel), values, nulls); +} +#endif diff --git a/src/common/backend/catalog/index.cpp b/src/common/backend/catalog/index.cpp index ecde047aa..a0c236857 100644 --- a/src/common/backend/catalog/index.cpp +++ b/src/common/backend/catalog/index.cpp @@ -7522,7 +7522,6 @@ void spq_validate_index_heapscan( Relation heapRelation, Relation indexRelation, IndexInfo* indexInfo, Snapshot snapshot, v_i_state* state) { bool in_index[MaxHeapTuplesPerPage]; - OffsetNumber root_offsets[MaxHeapTuplesPerPage]; /* state variables for the merge */ ItemPointer indexcursor = NULL; @@ -7534,7 +7533,7 @@ void spq_validate_index_heapscan( bool old_enable_spq = u_sess->attr.attr_spq.gauss_enable_spq; bool old_spq_enable_index_scan = u_sess->attr.attr_spq.spq_optimizer_enable_indexscan; bool old_spq_enable_indexonly_scan = u_sess->attr.attr_spq.spq_optimizer_enable_indexonlyscan; - + bool old_spq_tx = u_sess->attr.attr_spq.spq_enable_transaction; /* * sanity checks */ @@ -7555,17 +7554,18 @@ void spq_validate_index_heapscan( appendStringInfo(attrs, "%s, ", NameStr(att->attname)); } appendStringInfo(attrs, "%s", NameStr(lastattr->attname)); - appendStringInfo(sql, "select ctid %s from %s order by ctid", attrs->data, + appendStringInfo(sql, "select _root_ctid %s from %s order by _root_ctid", attrs->data, RelationGetRelationName(heapRelation)); } u_sess->attr.attr_spq.gauss_enable_spq = true; u_sess->attr.attr_spq.spq_optimizer_enable_indexscan = false; u_sess->attr.attr_spq.spq_optimizer_enable_indexonlyscan = false; + u_sess->attr.attr_spq.spq_enable_transaction = true; SPI_connect(); - if ((plan = SPI_prepare(sql->data, 0, NULL)) == NULL) + if ((plan = SPI_prepare_spq(sql->data, 0, NULL)) == NULL) ereport(ERROR, (errcode(ERRCODE_SPI_PREPARE_FAILURE), errmsg("SPI_prepare(\"%s\") failed: %s", sql->data, SPI_result_code_string(SPI_result)))); @@ -7578,6 +7578,7 @@ void spq_validate_index_heapscan( u_sess->attr.attr_spq.gauss_enable_spq = old_enable_spq; u_sess->attr.attr_spq.spq_optimizer_enable_indexscan = old_spq_enable_index_scan; u_sess->attr.attr_spq.spq_optimizer_enable_indexonlyscan = old_spq_enable_indexonly_scan; + u_sess->attr.attr_spq.spq_enable_transaction = old_spq_tx; SPI_cursor_fetch(portal, true, SPQ_BATCH_SIZE); while (SPI_processed > 0) { @@ -7598,11 +7599,6 @@ void spq_validate_index_heapscan( rootTuple = *heapcursor; root_blkno = ItemPointerGetBlockNumber(heapcursor); root_offnum = ItemPointerGetOffsetNumber(heapcursor); - if (HeapTupleIsHeapOnly(tup)) { - root_offnum = root_offsets[root_offnum - 1]; - Assert(OffsetNumberIsValid(root_offnum)); - ItemPointerSetOffsetNumber(&rootTuple, root_offnum); - } CHECK_FOR_INTERRUPTS(); /* @@ -7660,4 +7656,81 @@ void spq_validate_index_heapscan( indexInfo->ii_ExpressionsState = NIL; indexInfo->ii_PredicateState = NIL; } + +/* + * Input a ctid of tuple, and return its root ctid. + */ +Datum spq_get_root_ctid(HeapTuple tuple, Buffer buffer, ExprContext *econtext) +{ + BlockNumber blkno; + OffsetNumber offnum; + OffsetNumber root_offnum; + char *root_ctid = NULL; + Datum result; + OffsetNumber *cached_root_offsets = NULL; + bool is_memory_alloc = false; + + Assert(tuple); + blkno = ItemPointerGetBlockNumberNoCheck(&(tuple->t_self)); + offnum = ItemPointerGetOffsetNumberNoCheck(&(tuple->t_self)); + + /* + * If the cached_blkno is invalid or the current blkno is a new BlockNumber, + * we should get the new cached_root_offsets. + */ + if (buffer == InvalidBuffer || econtext == NULL || + econtext->cached_blkno == InvalidBlockNumber || blkno != econtext->cached_blkno) { + Page page; + Relation rel; + if (buffer == InvalidBuffer || econtext == NULL) { + cached_root_offsets = (OffsetNumber *) palloc0(sizeof(OffsetNumber) * MaxHeapTuplesPerPage); + is_memory_alloc = true; + rel = heap_open(tuple->t_tableOid, AccessShareLock); + buffer = ReadBuffer(rel, blkno); + page = BufferGetPage(buffer); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + heap_get_root_tuples(page, cached_root_offsets); + UnlockReleaseBuffer(buffer); + heap_close(rel, AccessShareLock); + } else { + Assert(buffer != InvalidBuffer && econtext != NULL); + cached_root_offsets = econtext->cached_root_offsets; + /* Buffer has already be locked by caller. */ + page = BufferGetPage(buffer); + heap_get_root_tuples(page, cached_root_offsets); + econtext->cached_blkno = blkno; + } + } + + if (HeapTupleIsHeapOnly(tuple)) { + /* + * For a heap-only tuple, pretend its TID is that of the root + */ + if (econtext) { + root_offnum = econtext->cached_root_offsets[offnum - 1]; + } else { + root_offnum = cached_root_offsets[offnum - 1]; + } + + if (!OffsetNumberIsValid(root_offnum)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("failed to find parent tuple for heap-only tuple at (%u,%u) in table \"%s\"", + ItemPointerGetBlockNumber(&tuple->t_self), + offnum, + get_rel_name(tuple->t_tableOid)))); + + } else { + root_offnum = offnum; + } + + root_ctid = psprintf("(%u, %u)", blkno, root_offnum); + result = DirectFunctionCall1(tidin, CStringGetDatum(root_ctid)); + + if (is_memory_alloc) + pfree(cached_root_offsets); + + pfree(root_ctid); + PG_RETURN_DATUM(result); +} #endif diff --git a/src/common/backend/utils/adt/selfuncs.cpp b/src/common/backend/utils/adt/selfuncs.cpp index a624f7b2a..79617c74f 100755 --- a/src/common/backend/utils/adt/selfuncs.cpp +++ b/src/common/backend/utils/adt/selfuncs.cpp @@ -5096,6 +5096,11 @@ double get_variable_numdistinct(VariableStatData* vardata, bool* isdefault, bool case UidAttributeNumber: stadistinct = 1.0; /* only 1 value */ break; +#endif +#ifdef USE_SPQ + case RootSelfItemPointerAttributeNumber: + stadistinct = 0.0; /* means "unknown" */ + break; #endif default: stadistinct = 0.0; /* means "unknown" */ diff --git a/src/common/backend/utils/cache/catcache.cpp b/src/common/backend/utils/cache/catcache.cpp index 492e4668f..744abd1eb 100644 --- a/src/common/backend/utils/cache/catcache.cpp +++ b/src/common/backend/utils/cache/catcache.cpp @@ -1697,7 +1697,18 @@ static HeapTuple SearchCatCacheMiss( if (ct == NULL) { if (IsBootstrapProcessingMode()) return NULL; +#ifdef USE_SPQ + if ((cache->id == ATTNUM && DatumGetInt16(arguments[1]) == RootSelfItemPointerAttributeNumber) || + (cache->id == ATTNAME && (strcmp(DatumGetCString(arguments[1]), "_root_ctid") == 0))) { + ct = CatalogCacheCreateEntry(cache, NULL, arguments, hashValue, hashIndex, false); + /* immediately set the refcount to 1 */ + ResourceOwnerEnlargeCatCacheRefs(t_thrd.utils_cxt.CurrentResourceOwner); + ct->refcount++; + ResourceOwnerRememberCatCacheRef(t_thrd.utils_cxt.CurrentResourceOwner, &ct->tuple); + } else +#endif + { ct = CatalogCacheCreateEntry(cache, NULL, arguments, hashValue, hashIndex, true); CACHE4_elog(DEBUG2, @@ -1713,6 +1724,7 @@ static HeapTuple SearchCatCacheMiss( */ return NULL; + } } CACHE4_elog(DEBUG2, @@ -2886,10 +2898,26 @@ static CatCTup* CatalogCacheCreateEntry( MemoryContext oldcxt; /* negative entries have no tuple associated */ +#ifdef USE_SPQ + if (ntp || !negative) { +#else if (ntp) { +#endif int i; errno_t rc; - +#ifdef USE_SPQ + if (!ntp) { + Form_pg_attribute tmp_attribute; + Relation relation; + int attno = DatumGetInt16(arguments[1]); + Assert(attno > FirstLowInvalidHeapAttributeNumber && attno < 0); + tmp_attribute = SystemAttributeDefinition(attno, false, false, false); + relation = heap_open(cache->cc_reloid, AccessShareLock);; + dtp = heaptuple_from_pg_attribute(relation, tmp_attribute); + heap_close(relation, AccessShareLock); + } else +#endif + { Assert(!negative); /* @@ -2903,6 +2931,7 @@ static CatCTup* CatalogCacheCreateEntry( dtp = toast_flatten_tuple(ntp, cache->cc_tupdesc); else dtp = ntp; + } /* Allocate memory for CatCTup and the cached tuple in one go */ oldcxt = MemoryContextSwitchTo(u_sess->cache_mem_cxt); @@ -2924,7 +2953,11 @@ static CatCTup* CatalogCacheCreateEntry( securec_check(rc, "", ""); MemoryContextSwitchTo(oldcxt); +#ifdef USE_SPQ + if ((ntp && dtp != ntp) || !ntp) +#else if (dtp != ntp) +#endif heap_freetuple_ext(dtp); /* extract keys - they'll point into the tuple if not by-value */ diff --git a/src/common/backend/utils/cache/knl_globalsystupcache.cpp b/src/common/backend/utils/cache/knl_globalsystupcache.cpp index a89d163d6..7bec4deef 100644 --- a/src/common/backend/utils/cache/knl_globalsystupcache.cpp +++ b/src/common/backend/utils/cache/knl_globalsystupcache.cpp @@ -828,6 +828,33 @@ GlobalCatCTup *GlobalSysTupCache::SearchTupleMiss(InsertCatTupInfo *tup_info) systable_endscan(scandesc); heap_close(relation, AccessShareLock); +#ifdef USE_SPQ + if ((cc_id == ATTNUM && DatumGetInt16(arguments[1]) == RootSelfItemPointerAttributeNumber) || + (cc_id == ATTNAME && (strcmp(DatumGetCString(arguments[1]), "_root_ctid") == 0))) { + + Form_pg_attribute tmp_attribute; + int attno = DatumGetInt16(arguments[1]); + Assert(attno > FirstLowInvalidHeapAttributeNumber && attno < 0); + tmp_attribute = SystemAttributeDefinition(attno, false, false, false); + relation = heap_open(m_relinfo.cc_reloid, AccessShareLock); + ntp = heaptuple_from_pg_attribute(relation, tmp_attribute); + heap_close(relation, AccessShareLock); + tup_info->ntp = ntp; + if (!tup_info->has_concurrent_lock) { + tup_info->canInsertGSC = false; + } else { + tup_info->canInsertGSC = CanTupleInsertGSC(ntp); + if (!tup_info->canInsertGSC) { + ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock); + } + } + ct = InsertHeapTupleIntoCatCacheInSingle(tup_info); + if (tup_info->has_concurrent_lock) { + ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock); + } + } +#endif + /* * global catcache match disk , not need negative tuple */ diff --git a/src/common/backend/utils/cache/plancache.cpp b/src/common/backend/utils/cache/plancache.cpp index 9c9d98993..1d4c9704a 100644 --- a/src/common/backend/utils/cache/plancache.cpp +++ b/src/common/backend/utils/cache/plancache.cpp @@ -2058,7 +2058,7 @@ CachedPlan* GetCachedPlan(CachedPlanSource* plansource, ParamListInfo boundParam ListCell* qlc = NULL; foreach (qlc, plansource->query_list) { Query* query = castNode(Query, lfirst(qlc)); - query->is_support_spq = false; + query->is_support_spq = plansource->cursor_options & CURSOR_OPT_SPQ_OK; } #endif diff --git a/src/gausskernel/runtime/executor/execExprInterp.cpp b/src/gausskernel/runtime/executor/execExprInterp.cpp index 94d901509..b7ea3dae1 100644 --- a/src/gausskernel/runtime/executor/execExprInterp.cpp +++ b/src/gausskernel/runtime/executor/execExprInterp.cpp @@ -79,7 +79,9 @@ #include "access/tupconvert.h" #include "executor/node/nodeCtescan.h" #include "rewrite/rewriteHandler.h" - +#ifdef USE_SPQ +#include "access/sysattr.h" +#endif /* * Use computed-goto-based opcode dispatch when computed gotos are available. * But use a separate symbol so that it's easy to adjust locally in this file @@ -729,9 +731,19 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull, ExprDoneCo Assert(innerslot->tts_tuple != NULL); Assert(innerslot->tts_tuple != &(innerslot->tts_minhdr)); +#ifdef USE_SPQ + if (attnum == RootSelfItemPointerAttributeNumber) { + Assert(innerslot->tts_tuple); + *op->resnull = false; + d = spq_get_root_ctid((HeapTuple)innerslot->tts_tuple, innerslot->tts_buffer, econtext); + *op->resvalue = d; + } else +#endif + { /* heap_getsysattr has sufficient defenses against bad attnums */ d = tableam_tslot_getattr(innerslot, attnum, op->resnull); *op->resvalue = d; + } EEO_NEXT(); } @@ -744,10 +756,19 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull, ExprDoneCo /* these asserts must match defenses in slot_getattr */ Assert(outerslot->tts_tuple != NULL); Assert(outerslot->tts_tuple != &(outerslot->tts_minhdr)); - - /* heap_getsysattr has sufficient defenses against bad attnums */ - d = tableam_tslot_getattr(outerslot, attnum, op->resnull); - *op->resvalue = d; +#ifdef USE_SPQ + if (attnum == RootSelfItemPointerAttributeNumber) { + Assert(outerslot->tts_tuple); + *op->resnull = false; + d = spq_get_root_ctid((HeapTuple)outerslot->tts_tuple, outerslot->tts_buffer, econtext); + *op->resvalue = d; + } else +#endif + { + /* heap_getsysattr has sufficient defenses against bad attnums */ + d = tableam_tslot_getattr(outerslot, attnum, op->resnull); + *op->resvalue = d; + } EEO_NEXT(); } @@ -760,10 +781,19 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull, ExprDoneCo /* these asserts must match defenses in slot_getattr */ Assert(scanslot->tts_tuple != NULL); Assert(scanslot->tts_tuple != &(scanslot->tts_minhdr)); - - /* heap_getsysattr has sufficient defenses against bad attnums */ - d = tableam_tslot_getattr(scanslot, attnum, op->resnull); - *op->resvalue = d; +#ifdef USE_SPQ + if (attnum == RootSelfItemPointerAttributeNumber) { + Assert(scanslot->tts_tuple); + *op->resnull = false; + d = spq_get_root_ctid((HeapTuple)scanslot->tts_tuple, scanslot->tts_buffer, econtext); + *op->resvalue = d; + } else +#endif + { + /* heap_getsysattr has sufficient defenses against bad attnums */ + d = tableam_tslot_getattr(scanslot, attnum, op->resnull); + *op->resvalue = d; + } EEO_NEXT(); diff --git a/src/gausskernel/runtime/executor/execUtils.cpp b/src/gausskernel/runtime/executor/execUtils.cpp index 7b59a2e76..b20c9f27a 100644 --- a/src/gausskernel/runtime/executor/execUtils.cpp +++ b/src/gausskernel/runtime/executor/execUtils.cpp @@ -307,7 +307,10 @@ ExprContext* CreateExprContext(EState* estate) econtext->ecxt_callbacks = NULL; econtext->plpgsql_estate = NULL; econtext->hasSetResultStore = false; - +#ifdef USE_SPQ + memset(econtext->cached_root_offsets, 0, sizeof(econtext->cached_root_offsets)); + econtext->cached_blkno = InvalidBlockNumber; +#endif /* * Link the ExprContext into the EState to ensure it is shut down when the * EState is freed. Because we use lcons(), shutdowns will occur in diff --git a/src/gausskernel/runtime/executor/spi.cpp b/src/gausskernel/runtime/executor/spi.cpp index 76c7f4111..c7f07fced 100644 --- a/src/gausskernel/runtime/executor/spi.cpp +++ b/src/gausskernel/runtime/executor/spi.cpp @@ -969,6 +969,13 @@ SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes, parse_query_fu return SPI_prepare_cursor(src, nargs, argtypes, 0, parser); } +#ifdef USE_SPQ +SPIPlanPtr SPI_prepare_spq(const char *src, int nargs, Oid *argtypes, parse_query_func parser) +{ + return SPI_prepare_cursor(src, nargs, argtypes, CURSOR_OPT_SPQ_OK | CURSOR_OPT_SPQ_FORCE, parser); +} +#endif + SPIPlanPtr SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes, int cursorOptions, parse_query_func parser) { _SPI_plan plan; diff --git a/src/gausskernel/storage/access/common/heaptuple.cpp b/src/gausskernel/storage/access/common/heaptuple.cpp index 276a895b4..0509a42ac 100644 --- a/src/gausskernel/storage/access/common/heaptuple.cpp +++ b/src/gausskernel/storage/access/common/heaptuple.cpp @@ -281,6 +281,9 @@ bool heap_attisnull_impl(HeapTuple tup, int attnum, TupleDesc tupDesc) case XC_NodeIdAttributeNumber: case BucketIdAttributeNumber: case UidAttributeNumber: +#endif +#ifdef USE_SPQ + case RootSelfItemPointerAttributeNumber: #endif /* these are never null */ break; @@ -602,6 +605,11 @@ static FORCE_INLINE Datum heap_getsysattr_impl(HeapTuple tup, int attnum, TupleD case UidAttributeNumber: result = UInt64GetDatum(HeapTupleGetUid(tup)); break; +#endif +#ifdef USE_SPQ + case RootSelfItemPointerAttributeNumber: + result = spq_get_root_ctid(tup, InvalidBuffer, NULL); + break; #endif default: ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("invalid attnum: %d", attnum))); diff --git a/src/gausskernel/storage/access/nbtree/spq_btbuild.cpp b/src/gausskernel/storage/access/nbtree/spq_btbuild.cpp index 968e03776..cbca6f4ba 100644 --- a/src/gausskernel/storage/access/nbtree/spq_btbuild.cpp +++ b/src/gausskernel/storage/access/nbtree/spq_btbuild.cpp @@ -29,7 +29,7 @@ void spq_build_main(const BgWorkerContext *bwc); SPQSharedContext *spq_init_shared(Relation heap, Relation index, bool isconcurrent); -SPQWorkerState *spq_worker_init(Relation heap, Relation index); +SPQWorkerState *spq_worker_init(Relation heap, Relation index, SPQSharedContext *shared); void spq_worker_produce(SPQWorkerState *workstate); @@ -270,7 +270,6 @@ void spq_leader_finish(SPQLeaderState *spqleader) BgworkerListSyncQuit(); /* Free last reference to MVCC snapshot, if one was used */ if (IsMVCCSnapshot(spqleader->snapshot)) { - PopActiveSnapshot(); UnregisterSnapshot(spqleader->snapshot); } } @@ -282,6 +281,13 @@ void spq_scansort(BTBuildState *buildstate, Relation heap, Relation index, bool SPQSharedContext *shared; + Snapshot snapshot; + + if (!isconcurrent) + snapshot = SnapshotAny; + else + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + shared = spq_init_shared(heap, index, isconcurrent); /* Launch workers */ @@ -294,21 +300,15 @@ void spq_scansort(BTBuildState *buildstate, Relation heap, Relation index, bool spqleader->shared = shared; spqleader->buffer = NULL; spqleader->processed = 0; - spqleader->snapshot = shared->snapshot; + spqleader->snapshot = snapshot; } /* SPQSharedContext initialization */ SPQSharedContext *spq_init_shared(Relation heap, Relation index, bool isconcurrent) { Size sharedsize; - Snapshot snapshot; SPQSharedContext *shared; - if (!isconcurrent) - snapshot = SnapshotAny; - else - snapshot = RegisterSnapshot(GetTransactionSnapshot()); - /* Calculate shared size */ sharedsize = sizeof(SPQSharedContext); @@ -330,7 +330,8 @@ SPQSharedContext *spq_init_shared(Relation heap, Relation index, bool isconcurre shared->done = false; shared->consumer = -1; shared->producer = -1; - shared->snapshot = snapshot; + shared->snapshot = GetActiveSnapshot(); + shared->num_nodes = t_thrd.spq_ctx.num_nodes; return shared; } @@ -357,8 +358,7 @@ void spq_build_main(const BgWorkerContext *bwc) /* add snapshot for spi */ PushActiveSnapshot(shared->snapshot); - u_sess->opt_cxt.query_dop = shared->dop; - worker = spq_worker_init(heap, index); + worker = spq_worker_init(heap, index, shared); worker->shared = shared; spq_worker_produce(worker); spq_worker_finish(worker); @@ -374,17 +374,21 @@ void spq_build_main(const BgWorkerContext *bwc) return; } -SPQWorkerState *spq_worker_init(Relation heap, Relation index) +SPQWorkerState *spq_worker_init(Relation heap, Relation index, SPQSharedContext* shared) { SPQWorkerState *worker = (SPQWorkerState *)palloc0(sizeof(SPQWorkerState)); worker->all_fetched = true; worker->heap = heap; worker->index = index; worker->sql = makeStringInfo(); + worker->shared = shared; bool old_enable_spq = u_sess->attr.attr_spq.gauss_enable_spq; bool old_spq_enable_index_scan = u_sess->attr.attr_spq.spq_optimizer_enable_indexscan; bool old_spq_enable_indexonly_scan = u_sess->attr.attr_spq.spq_optimizer_enable_indexonlyscan; + bool old_spq_tx = u_sess->attr.attr_spq.spq_enable_transaction; + bool old_spq_dop = u_sess->opt_cxt.query_dop; + int old_num_nodes = t_thrd.spq_ctx.num_nodes; /* generate sql */ { @@ -405,7 +409,7 @@ SPQWorkerState *spq_worker_init(Relation heap, Relation index) if (i != natts - 1) appendStringInfo(sortattrs, ", "); } - appendStringInfo(worker->sql, "select ctid %s from %s order by %s, ctid", attrs->data, RelationGetRelationName(heap), + appendStringInfo(worker->sql, "select _root_ctid %s from %s order by %s, _root_ctid", attrs->data, RelationGetRelationName(heap), sortattrs->data); elog(INFO, "sql: %s", worker->sql->data); @@ -414,10 +418,13 @@ SPQWorkerState *spq_worker_init(Relation heap, Relation index) u_sess->attr.attr_spq.gauss_enable_spq = true; u_sess->attr.attr_spq.spq_optimizer_enable_indexscan = false; u_sess->attr.attr_spq.spq_optimizer_enable_indexonlyscan = false; + u_sess->attr.attr_spq.spq_enable_transaction = true; + u_sess->opt_cxt.query_dop = shared->dop; + t_thrd.spq_ctx.num_nodes = shared->num_nodes; SPI_connect(); - if ((worker->plan = SPI_prepare(worker->sql->data, 0, NULL)) == NULL) + if ((worker->plan = SPI_prepare_spq(worker->sql->data, 0, NULL)) == NULL) ereport(ERROR, (errcode(ERRCODE_SPI_PREPARE_FAILURE), errmsg("SPI_prepare(\"%s\") failed: %s", worker->sql->data, SPI_result_code_string(SPI_result)))); @@ -430,6 +437,9 @@ SPQWorkerState *spq_worker_init(Relation heap, Relation index) u_sess->attr.attr_spq.gauss_enable_spq = old_enable_spq; u_sess->attr.attr_spq.spq_optimizer_enable_indexscan = old_spq_enable_index_scan; u_sess->attr.attr_spq.spq_optimizer_enable_indexonlyscan = old_spq_enable_indexonly_scan; + u_sess->attr.attr_spq.spq_enable_transaction = old_spq_tx; + u_sess->opt_cxt.query_dop = old_spq_dop; + t_thrd.spq_ctx.num_nodes = old_num_nodes; return worker; } @@ -545,6 +555,7 @@ void spq_worker_finish(SPQWorkerState *worker) SPI_cursor_close(worker->portal); SPI_freeplan(worker->plan); SPI_finish(); + PopActiveSnapshot(); } void spq_leafbuild(SPQLeaderState *spqleader) diff --git a/src/include/access/spq_btbuild.h b/src/include/access/spq_btbuild.h index 99ddf3fc0..e5bd3f2a9 100644 --- a/src/include/access/spq_btbuild.h +++ b/src/include/access/spq_btbuild.h @@ -61,6 +61,7 @@ typedef struct SPQSharedContext { char addr[0]; /* varlen */ Snapshot snapshot; int dop; + int num_nodes; } SPQSharedContext; typedef struct IndexTupleBuffer { diff --git a/src/include/access/sysattr.h b/src/include/access/sysattr.h index 7d8d13932..0b4dbf4a2 100644 --- a/src/include/access/sysattr.h +++ b/src/include/access/sysattr.h @@ -29,7 +29,12 @@ #define XC_NodeIdAttributeNumber (-8) #define BucketIdAttributeNumber (-9) #define UidAttributeNumber (-10) +#ifdef USE_SPQ +#define RootSelfItemPointerAttributeNumber (-11) +#define FirstLowInvalidHeapAttributeNumber (-12) +#else #define FirstLowInvalidHeapAttributeNumber (-11) +#endif #else #define FirstLowInvalidHeapAttributeNumber (-8) diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h index 55f4c9c65..8d82a6e9f 100644 --- a/src/include/catalog/heap.h +++ b/src/include/catalog/heap.h @@ -27,6 +27,10 @@ #define PSORT_RESERVE_COLUMN "tid" #define CHCHK_PSORT_RESERVE_COLUMN(attname) (strcmp(PSORT_RESERVE_COLUMN, (attname)) == 0) +#ifdef USE_SPQ +extern HeapTuple heaptuple_from_pg_attribute(Relation pg_attribute_rel, Form_pg_attribute new_attribute); +#endif + typedef struct RawColumnDefault { AttrNumber attnum; /* attribute to attach default to */ Node *raw_default; /* default value (untransformed parse tree) */ diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index fd946bed1..5e028fc9c 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -269,4 +269,7 @@ void ScanBucketsInsertIndex(Relation rel, const List* idxRelList, const List* id extern void ScanPartitionInsertIndex(Relation partTableRel, Relation partRel, const List* indexRelList, const List* indexInfoList); void ScanHeapInsertCBI(Relation parentRel, Relation heapRel, Relation idxRel, Oid tmpPartOid); +#ifdef USE_SPQ +extern Datum spq_get_root_ctid(HeapTuple tuple, Buffer buffer, ExprContext *econtext); +#endif #endif /* INDEX_H */ diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h index d1fa9e8c4..fba0762c3 100644 --- a/src/include/executor/spi.h +++ b/src/include/executor/spi.h @@ -107,6 +107,9 @@ extern int SPI_execute_snapshot(SPIPlanPtr plan, Datum* Values, const char* Null extern int SPI_execute_with_args(const char* src, int nargs, Oid* argtypes, Datum* Values, const char* Nulls, bool read_only, long tcount, Cursor_Data* cursor_data, parse_query_func parser = GetRawParser()); extern SPIPlanPtr SPI_prepare(const char* src, int nargs, Oid* argtypes, parse_query_func parser = GetRawParser()); +#ifdef USE_SPQ +extern SPIPlanPtr SPI_prepare_spq(const char* src, int nargs, Oid* argtypes, parse_query_func parser = GetRawParser()); +#endif extern SPIPlanPtr SPI_prepare_cursor(const char* src, int nargs, Oid* argtypes, int cursorOptions, parse_query_func parser = GetRawParser()); extern SPIPlanPtr SPI_prepare_params(const char* src, ParserSetupHook parserSetup, void* parserSetupArg, int cursorOptions, parse_query_func parser = GetRawParser()); extern int SPI_keepplan(SPIPlanPtr plan); diff --git a/src/include/knl/knl_guc/knl_session_attr_spq.h b/src/include/knl/knl_guc/knl_session_attr_spq.h index 3228f14f0..a7ca4e8a5 100644 --- a/src/include/knl/knl_guc/knl_session_attr_spq.h +++ b/src/include/knl/knl_guc/knl_session_attr_spq.h @@ -194,6 +194,7 @@ typedef struct knl_session_attr_spq { /* enable spq btbuild */ bool spq_enable_btbuild; bool spq_enable_btbuild_cic; + bool spq_enable_transaction; int spq_batch_size; int spq_mem_size; int spq_queue_size; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 19a2a1784..a0b10baa1 100755 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -221,6 +221,10 @@ typedef struct ExprContext { bool can_ignore; // indicates whether ERROR can be ignored when type casting RightRefState* rightRefState; +#ifdef USE_SPQ + OffsetNumber cached_root_offsets[MaxHeapTuplesPerPage]; + BlockNumber cached_blkno; +#endif } ExprContext; /* diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e2a8e1faa..752f1fd7d 100755 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -34,6 +34,11 @@ #include "tcop/dest.h" #include "nodes/parsenodes_common.h" +#ifdef USE_SPQ +#define CURSOR_OPT_SPQ_OK 0x0200 /* SPQ Execution */ +#define CURSOR_OPT_SPQ_FORCE 0x0400 /* Force to generate a SPQ plan */ +#endif + /* * Relids * Set of relation identifiers (indexes into the rangetable).