diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 5af6d1bbd..ba4e298ce 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -64,6 +64,7 @@ if(EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/chparser) endif() if(EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/age) add_subdirectory(age) +endif() if(EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/jieba_tokenizer) add_subdirectory(jieba_tokenizer) endif() diff --git a/contrib/jieba_tokenizer/tokenizer.cpp b/contrib/jieba_tokenizer/tokenizer.cpp index dfc8cc081..746a16f28 100644 --- a/contrib/jieba_tokenizer/tokenizer.cpp +++ b/contrib/jieba_tokenizer/tokenizer.cpp @@ -30,7 +30,7 @@ #include "tokenizer.h" const size_t MAX_LENGTH_CRC = 100; -const size_t MAX_KEYWORD_NUM = 100; +const size_t MAX_KEYWORD_NUM = 100000; const size_t MAX_PATH_LEN = 1024; const char* const DICT_PATH = "lib/jieba_dict/jieba.dict.utf8"; @@ -66,22 +66,31 @@ inline static uint32_t HashString2Uint32(const std::string& srcStr) return crc; } -inline static void ConvertEmbeddingMap(std::unordered_map tempMap, EmbeddingMap *embeddingMap) +inline static void ConvertEmbeddingMap(std::unordered_map> tokensMap, + EmbeddingMap *embeddingMap) { - embeddingMap->size = tempMap.size(); + embeddingMap->size = tokensMap.size(); if (embeddingMap->size == 0) { return; } - embeddingMap->pairs = (EmbeddingPair *)malloc(embeddingMap->size * sizeof(EmbeddingPair)); - if (embeddingMap->pairs == nullptr) { + embeddingMap->tokens = (EmbeddingTokenInfo *)malloc(embeddingMap->size * sizeof(EmbeddingTokenInfo)); + if (embeddingMap->tokens == nullptr) { + embeddingMap->size = 0; return; } - size_t index = 0; - for (const auto& pair : tempMap) { - embeddingMap->pairs[index].key = pair.first; - embeddingMap->pairs[index].value = pair.second; - index++; + size_t idx = 0; + for (const auto& token : tokensMap) { + embeddingMap->tokens[idx].key = token.second.first; + embeddingMap->tokens[idx].value = token.second.second; + errno_t rc = strncpy_s(embeddingMap->tokens[idx].token, MAX_TOKEN_LEN, token.first.c_str(), MAX_TOKEN_LEN - 1); + if (rc != EOK) { + free(embeddingMap->tokens); + embeddingMap->tokens = nullptr; + embeddingMap->size = 0; + return; + } + idx++; } } @@ -148,31 +157,36 @@ bool ConvertString2Embedding(const char* srcStr, EmbeddingMap *embeddingMap, boo if (jiebaTokenizer == nullptr || srcStr == nullptr || embeddingMap == nullptr) { return false; } - std::string srcStrInput(srcStr); - std::unordered_map tempMap; + std::string sentence(srcStr); + std::unordered_map> tokensMap; if (isKeywordExtractor) { std::vector keywords; - jiebaTokenizer->extractor.Extract(srcStrInput, keywords, MAX_KEYWORD_NUM); + jiebaTokenizer->extractor.Extract(sentence, keywords, MAX_KEYWORD_NUM); for (const auto& keyword : keywords) { uint32_t hashValue = HashString2Uint32(Convert2LowerCase(keyword.word)); - tempMap[hashValue] += keyword.weight; + tokensMap[keyword.word] = std::make_pair(hashValue, keyword.weight); } - if (!tempMap.empty()) { - ConvertEmbeddingMap(tempMap, embeddingMap); + if (!tokensMap.empty()) { + ConvertEmbeddingMap(tokensMap, embeddingMap); return true; } } + // if the keywords extracted by 'Extract' are empty, then use 'Cut' for tokenization. std::vector tokens; - jiebaTokenizer->Cut(srcStrInput, tokens, true); + jiebaTokenizer->Cut(sentence, tokens, true); for (const auto& token : tokens) { if (IsWhitespace(token)) { continue; } uint32_t hashValue = HashString2Uint32(Convert2LowerCase(token)); - tempMap[hashValue] += 1.0f; + if (tokensMap.find(token) == tokensMap.end()) { + tokensMap[token] = std::make_pair(hashValue, 1.0f); + } else { + tokensMap[token].second += 1.0f; + } } - ConvertEmbeddingMap(tempMap, embeddingMap); + ConvertEmbeddingMap(tokensMap, embeddingMap); return true; } diff --git a/contrib/jieba_tokenizer/tokenizer.h b/contrib/jieba_tokenizer/tokenizer.h index dd5203c5e..6da957dd7 100644 --- a/contrib/jieba_tokenizer/tokenizer.h +++ b/contrib/jieba_tokenizer/tokenizer.h @@ -27,17 +27,20 @@ #include +#define MAX_TOKEN_LEN 100 + #ifdef __cplusplus extern "C" { #endif -typedef struct EmbeddingPair { +typedef struct EmbeddingTokenInfo { uint32_t key; float value; -} EmbeddingPair; + char token[MAX_TOKEN_LEN]; +} EmbeddingTokenInfo; typedef struct { - EmbeddingPair *pairs; + EmbeddingTokenInfo *tokens; size_t size; } EmbeddingMap; diff --git a/src/gausskernel/storage/access/datavec/bm25build.cpp b/src/gausskernel/storage/access/datavec/bm25build.cpp index 995f8b25e..fa280d077 100644 --- a/src/gausskernel/storage/access/datavec/bm25build.cpp +++ b/src/gausskernel/storage/access/datavec/bm25build.cpp @@ -26,22 +26,27 @@ #include "utils/rel_gs.h" #include "storage/buf/block.h" #include "utils/memutils.h" +#include "postmaster/bgworker.h" #include "access/genam.h" #include "access/heapam.h" #include "access/tableam.h" #include "utils/builtins.h" +#include "access/datavec/vector.h" #include "access/datavec/bm25.h" #define CALLBACK_ITEM_POINTER HeapTuple hup +extern slock_t newBufferMutex; + /* * Initialize the build state */ -static void InitBM25BuildState(BM25BuildState *buildstate, Relation heap, Relation index, IndexInfo *indexInfo) +static void InitBM25BuildState(BM25BuildState *buildstate, Relation heap, Relation index, IndexInfo *indexInfo, ForkNumber forkNum) { buildstate->heap = heap; buildstate->index = index; buildstate->indexInfo = indexInfo; + buildstate->forkNum = forkNum; buildstate->reltuples = 0; buildstate->indtuples = 0; @@ -50,45 +55,330 @@ static void InitBM25BuildState(BM25BuildState *buildstate, Relation heap, Relati buildstate->procinfo = nullptr; buildstate->collation = index->rd_indcollation[0]; + buildstate->bm25leader = nullptr; + buildstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext, "bm25 build temporary context", ALLOCSET_DEFAULT_SIZES); } +static BlockNumber CreateBM25CommonPage(Relation index, ForkNumber forkNum) +{ + Buffer buf; + Page page; + BlockNumber blkno = InvalidBlockNumber; + + buf = BM25NewBuffer(index, forkNum); + page = BufferGetPage(buf); + BM25InitPage(buf, page); + blkno = BufferGetBlockNumber(buf); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); + return blkno; +} + +static void InsertItemToHashBucket(Relation index, BM25EntryPages &bm25EntryPages, uint32 bucketId, + BM25PageLocationInfo &bucketLocation, ForkNumber forkNum) +{ + BlockNumber firstBucketBlkno = bm25EntryPages.hashBucketsPage; + BlockNumber nextblkno = firstBucketBlkno; + BlockNumber curblkno = firstBucketBlkno; + + /*lock hashBuckets*/ + Buffer firstBuf; + Page firstPage; + firstBuf = ReadBuffer(index, firstBucketBlkno); + LockBuffer(firstBuf, BUFFER_LOCK_EXCLUSIVE); + firstPage = BufferGetPage(firstBuf); + if (FindHashBucket(bucketId, bucketLocation, firstBuf, firstPage)) { + UnlockReleaseBuffer(firstBuf); + return; + } + nextblkno = BM25PageGetOpaque(firstPage)->nextblkno; + + Buffer cbuf; + Page cpage; + while (BlockNumberIsValid(nextblkno)) { + OffsetNumber maxoffno; + cbuf = ReadBuffer(index, nextblkno); + LockBuffer(cbuf, BUFFER_LOCK_EXCLUSIVE); + cpage = BufferGetPage(cbuf); + + if (FindHashBucket(bucketId, bucketLocation, cbuf, cpage)) { + UnlockReleaseBuffer(cbuf); + UnlockReleaseBuffer(firstBuf); + return; + } + + curblkno = nextblkno; + nextblkno = BM25PageGetOpaque(cpage)->nextblkno; + UnlockReleaseBuffer(cbuf); + } + + /* hashBucket is not found, add new one */ + uint32 itemSize = MAXALIGN(sizeof(BM25HashBucketItem)); + if (curblkno == firstBucketBlkno) { + cbuf = firstBuf; + cpage = firstPage; + } else { + cbuf = ReadBuffer(index, curblkno); + LockBuffer(cbuf, BUFFER_LOCK_EXCLUSIVE); + cpage = BufferGetPage(cbuf); + } + + /* Ensure free space */ + if (PageGetFreeSpace(cpage) < itemSize) { + BM25AppendPage(index, &cbuf, &cpage, forkNum, (curblkno == firstBucketBlkno) ? false : true); + curblkno = BufferGetBlockNumber(cbuf); + } + + BM25HashBucketPage hashBucket = (BM25HashBucketPage)palloc0(itemSize); + hashBucket->bucketId = bucketId; + hashBucket->bucketBlkno = InvalidBlockNumber; + OffsetNumber offno = PageAddItem(cpage, (Item)hashBucket, itemSize, InvalidOffsetNumber, false, false); + if (offno == InvalidOffsetNumber) { + pfree(hashBucket); + if (curblkno != firstBucketBlkno) + UnlockReleaseBuffer(cbuf); + UnlockReleaseBuffer(firstBuf); + elog(ERROR, "failed to add index item [BM25HashBucket] to \"%s\"", RelationGetRelationName(index)); + } + bm25EntryPages.hashBucketCount++; + bucketLocation.blkno = BufferGetBlockNumber(cbuf); + bucketLocation.offno = offno; + MarkBufferDirty(cbuf); + if (curblkno != firstBucketBlkno) + UnlockReleaseBuffer(cbuf); + UnlockReleaseBuffer(firstBuf); + pfree(hashBucket); + return; +} + +static void InsertItemToTokenMetaList(Relation index, BM25PageLocationInfo &bucketLocation, + BM25TokenData &tokenData, BM25PageLocationInfo &tokenMetaLocation, ForkNumber forkNum) +{ + Page cpageBucket; + Buffer cbufBucket = ReadBuffer(index, bucketLocation.blkno); + LockBuffer(cbufBucket, BUFFER_LOCK_EXCLUSIVE); + cpageBucket = BufferGetPage(cbufBucket); + BM25HashBucketPage hashBucket = (BM25HashBucketPage)PageGetItem(cpageBucket, PageGetItemId(cpageBucket, bucketLocation.offno)); + if (hashBucket->bucketBlkno == InvalidBlockNumber) + hashBucket->bucketBlkno = CreateBM25CommonPage(index, forkNum); + BlockNumber firstTokenMetasBlkno = hashBucket->bucketBlkno; + MarkBufferDirty(cbufBucket); + UnlockReleaseBuffer(cbufBucket); + + /* lock tokenMeta list */ + BlockNumber nextblkno = firstTokenMetasBlkno; + BlockNumber curblkno = firstTokenMetasBlkno; + Buffer firstBuf; + Page firstPage; + firstBuf = ReadBuffer(index, firstTokenMetasBlkno); + LockBuffer(firstBuf, BUFFER_LOCK_EXCLUSIVE); + firstPage = BufferGetPage(firstBuf); + if (FindTokenMeta(tokenData, tokenMetaLocation, firstBuf, firstPage)) { + UnlockReleaseBuffer(firstBuf); + return; + } + nextblkno = BM25PageGetOpaque(firstPage)->nextblkno; + + Buffer cbuf; + Page cpage; + while (BlockNumberIsValid(nextblkno)) { + OffsetNumber maxoffno; + cbuf = ReadBuffer(index, nextblkno); + LockBuffer(cbuf, BUFFER_LOCK_EXCLUSIVE); + cpage = BufferGetPage(cbuf); + if (FindTokenMeta(tokenData, tokenMetaLocation, cbuf, cpage)) { + UnlockReleaseBuffer(cbuf); + UnlockReleaseBuffer(firstBuf); + return; + } + curblkno = nextblkno; + nextblkno = BM25PageGetOpaque(cpage)->nextblkno; + UnlockReleaseBuffer(cbuf); + } + + /* tokenMetaItem is not found, add new one */ + uint32 itemSize = MAXALIGN(sizeof(BM25TokenMetaItem)); + if (curblkno == firstTokenMetasBlkno) { + cbuf = firstBuf; + cpage = firstPage; + } else { + cbuf = ReadBuffer(index, curblkno); + LockBuffer(cbuf, BUFFER_LOCK_EXCLUSIVE); + cpage = BufferGetPage(cbuf); + } + /* Ensure free space */ + if (PageGetFreeSpace(cpage) < itemSize) { + BM25AppendPage(index, &cbuf, &cpage, forkNum, (curblkno == firstTokenMetasBlkno) ? false : true); + curblkno = BufferGetBlockNumber(cbuf); + } + + BM25TokenMetaPage tokenMeta = (BM25TokenMetaPage)palloc0(itemSize); + errno_t rc = strncpy_s(tokenMeta->token, BM25_MAX_TOKEN_LEN, tokenData.tokenValue, BM25_MAX_TOKEN_LEN - 1); + securec_check_c(rc, "\0", "\0"); + tokenMeta->tokenId = BM25AllocateTokenId(index); + tokenData.tokenId = tokenMeta->tokenId; + tokenMeta->maxScore = 0; + tokenMeta->postingBlkno = InvalidBlockNumber; + OffsetNumber offno = PageAddItem(cpage, (Item)tokenMeta, itemSize, InvalidOffsetNumber, false, false); + if (offno == InvalidOffsetNumber) { + pfree(tokenMeta); + if (curblkno != firstTokenMetasBlkno) + UnlockReleaseBuffer(cbuf); + UnlockReleaseBuffer(firstBuf); + elog(ERROR, "failed to add index item [BM25TokenMeta] to \"%s\"", RelationGetRelationName(index)); + } + tokenMetaLocation.blkno = BufferGetBlockNumber(cbuf); + tokenMetaLocation.offno = offno; + MarkBufferDirty(cbuf); + if (curblkno != firstTokenMetasBlkno) + UnlockReleaseBuffer(cbuf); + UnlockReleaseBuffer(firstBuf); + pfree(tokenMeta); + return; +} + +static void InsertItemToPostingList(Relation index, BM25PageLocationInfo &tokenMetaLocation, + BM25TokenData &tokenData, uint32 docLength, uint32 docId, float score, ForkNumber forkNum) +{ + Page cpageTokenMeta; + Buffer cbufTokenMeta = ReadBuffer(index, tokenMetaLocation.blkno); + LockBuffer(cbufTokenMeta, BUFFER_LOCK_EXCLUSIVE); + cpageTokenMeta = BufferGetPage(cbufTokenMeta); + BM25TokenMetaPage tokenMeta = (BM25TokenMetaPage)PageGetItem(cpageTokenMeta, PageGetItemId(cpageTokenMeta, tokenMetaLocation.offno)); + if (tokenMeta->postingBlkno == InvalidBlockNumber) { + tokenMeta->postingBlkno = CreateBM25CommonPage(index, forkNum); + } + tokenMeta->maxScore = (score > tokenMeta->maxScore) ? score : tokenMeta->maxScore; + (tokenMeta->docCount)++; + BlockNumber firstPostingsBlkno = tokenMeta->postingBlkno; + MarkBufferDirty(cbufTokenMeta); + UnlockReleaseBuffer(cbufTokenMeta); + + /* lock posting list */ + Buffer firstBuf; + Page firstPage; + BlockNumber insertPage = firstPostingsBlkno; + firstBuf = ReadBuffer(index, firstPostingsBlkno); + LockBuffer(firstBuf, BUFFER_LOCK_EXCLUSIVE); + firstPage = BufferGetPage(firstBuf); + + Buffer cbuf; + Page cpage; + bool isFirstPage = true; + for (;;) { + OffsetNumber maxoffno; + if (insertPage == firstPostingsBlkno) { + cbuf = firstBuf; + cpage = firstPage; + } else { + cbuf = ReadBuffer(index, insertPage); + LockBuffer(cbuf, BUFFER_LOCK_EXCLUSIVE); + cpage = BufferGetPage(cbuf); + isFirstPage = false; + } + if (PageGetFreeSpace(cpage) >= MAXALIGN(sizeof(BM25TokenPostingItem))) { + break; + } + insertPage = BM25PageGetOpaque(cpage)->nextblkno; + if (BlockNumberIsValid(insertPage)) { + if (!isFirstPage) + UnlockReleaseBuffer(cbuf); + } else { + Buffer newbuf; + Page newpage; + newbuf = BM25NewBuffer(index, forkNum); + newpage = BufferGetPage(newbuf); + BM25InitPage(newbuf, newpage); + insertPage = BufferGetBlockNumber(newbuf); + BM25PageGetOpaque(cpage)->nextblkno = insertPage; + MarkBufferDirty(cbuf); + if (!isFirstPage) + UnlockReleaseBuffer(cbuf); + cbuf = newbuf; + cpage = BufferGetPage(cbuf); + isFirstPage = false; + break; + } + } + + BM25TokenPostingPage postingItem = (BM25TokenPostingPage)palloc0(MAXALIGN(sizeof(BM25TokenPostingItem))); + postingItem->docId = docId; + postingItem->docLength = (uint16)(docLength > PG_UINT16_MAX ? PG_UINT16_MAX : docLength); + postingItem->freq = (uint16)(tokenData.tokenFreq > PG_UINT16_MAX ? PG_UINT16_MAX : tokenData.tokenFreq); + OffsetNumber offno = PageAddItem(cpage, (Item)postingItem, MAXALIGN(sizeof(BM25TokenPostingItem)), InvalidOffsetNumber, false, false); + if (offno == InvalidOffsetNumber) { + pfree(postingItem); + if (!isFirstPage) + UnlockReleaseBuffer(cbuf); + UnlockReleaseBuffer(firstBuf); + elog(ERROR, "failed to add index item [BM25TokenPostingItem] to \"%s\"", RelationGetRelationName(index)); + } + MarkBufferDirty(cbuf); + if (!isFirstPage) + UnlockReleaseBuffer(cbuf); + UnlockReleaseBuffer(firstBuf); + return; +} + +static void InsertToIvertedList(Relation index, uint32 docId, BM25TokenizedDocData &tokenizedDoc, float avgdl, + BM25EntryPages &bm25EntryPages, ForkNumber forkNum) +{ + BM25Scorer scorer = BM25Scorer(u_sess->attr.attr_sql.bm25_k1, u_sess->attr.attr_sql.bm25_b, avgdl); + float docLen = 0; + for (uint32 tokenIdx = 0; tokenIdx < tokenizedDoc.tokenCount; tokenIdx++) { + float freqVal = tokenizedDoc.tokenDatas[tokenIdx].tokenFreq; + docLen += freqVal; + float score = scorer.GetDocBM25Score(freqVal, docLen); + uint32 bucketId = tokenizedDoc.tokenDatas[tokenIdx].hashValue % BM25_BUCKET_MAX_NUM; + BM25PageLocationInfo bucketLocation{0}; + BM25PageLocationInfo tokenMetaLocation{0}; + InsertItemToHashBucket(index, bm25EntryPages, bucketId, bucketLocation, forkNum); + InsertItemToTokenMetaList(index, bucketLocation, tokenizedDoc.tokenDatas[tokenIdx], tokenMetaLocation, forkNum); + InsertItemToPostingList(index, tokenMetaLocation, tokenizedDoc.tokenDatas[tokenIdx], tokenizedDoc.docLength, + docId, score, forkNum); + } + return; +} + static void FreeBuildState(BM25BuildState *buildstate) { MemoryContextDelete(buildstate->tmpCtx); } -static void AllocateForwardIdxForToken(Relation index, uint32 tokenCount, uint64 *start, uint64 *end, BM25DocForwardMetaPage metaPage) +static void AllocateForwardIdxForToken(Relation index, uint32 tokenCount, uint64 *start, uint64 *end, + BM25DocForwardMetaPage metaPage, ForkNumber forkNum) { Buffer buf; Page page; - GenericXLogState *state; /* first page */ if (metaPage->capacity == 0) { - buf = BM25NewBuffer(index, MAIN_FORKNUM); - BM25InitRegisterPage(index, &buf, &page, &state); + buf = BM25NewBuffer(index, forkNum); + page = BufferGetPage(buf); + BM25InitPage(buf, page); BlockNumber newblk = BufferGetBlockNumber(buf); metaPage->startPage = newblk; metaPage->lastPage = newblk; metaPage->size = 0; metaPage->capacity = BM25_DOC_FORWARD_MAX_COUNT_IN_PAGE; - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); } /* need expand new page */ if (metaPage->capacity - metaPage->size < tokenCount) { buf = ReadBuffer(index, metaPage->lastPage); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - state = GenericXLogStart(index); - page = GenericXLogRegisterBuffer(state, buf, 0); + page = BufferGetPage(buf); /* start append */ while (metaPage->capacity - metaPage->size < tokenCount) { - BM25AppendPage(index, &buf, &page, &state, MAIN_FORKNUM); + BM25AppendPage(index, &buf, &page, MAIN_FORKNUM); BlockNumber newblk = BufferGetBlockNumber(buf); metaPage->lastPage = newblk; metaPage->capacity += BM25_DOC_FORWARD_MAX_COUNT_IN_PAGE; } - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); } *start = metaPage->size; *end = metaPage->size + tokenCount - 1; @@ -110,13 +400,12 @@ static BlockNumber SeekForwardBlknoForToken(Relation index, BlockNumber startBlk return curBlkno; } -static void InsertDocForwardItem(Relation index, uint32 docId, BM25TokenizedDocData &tokenizedDoc, BM25EntryPages &bm25EntryPages - , uint64 *forwardStart, uint64 *forwardEnd) +static void InsertDocForwardItem(Relation index, uint32 docId, BM25TokenizedDocData &tokenizedDoc, BM25EntryPages &bm25EntryPages, + uint64 *forwardStart, uint64 *forwardEnd, ForkNumber forkNum) { Buffer buf; Page page; BlockNumber forwardStartBlkno; - GenericXLogState *state; Buffer metabuf; Page metapage; BM25DocForwardMetaPage metaForwardPage; @@ -124,12 +413,11 @@ static void InsertDocForwardItem(Relation index, uint32 docId, BM25TokenizedDocD /* open forward list */ metabuf = ReadBuffer(index, bm25EntryPages.docForwardPage); LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE); - state = GenericXLogStart(index); - metapage = GenericXLogRegisterBuffer(state, metabuf, 0); + metapage = BufferGetPage(metabuf); metaForwardPage = BM25PageGetDocForwardMeta(metapage); - AllocateForwardIdxForToken(index, tokenizedDoc.tokenCount, forwardStart, forwardEnd, metaForwardPage); + AllocateForwardIdxForToken(index, tokenizedDoc.tokenCount, forwardStart, forwardEnd, metaForwardPage, forkNum); forwardStartBlkno = metaForwardPage->startPage; - BM25CommitBuffer(metabuf, state); + MarkBufferDirty(metabuf); uint64 tokenIdx = *forwardStart; BlockNumber curStep = tokenIdx / BM25_DOC_FORWARD_MAX_COUNT_IN_PAGE; @@ -139,19 +427,19 @@ static void InsertDocForwardItem(Relation index, uint32 docId, BM25TokenizedDocD buf = ReadBuffer(index, curBlkno); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - state = GenericXLogStart(index); - page = GenericXLogRegisterBuffer(state, buf, GENERIC_XLOG_FULL_IMAGE); + page = BufferGetPage(buf); + for (int i = 0; i < tokenizedDoc.tokenCount; i++) { curStep = tokenIdx / BM25_DOC_FORWARD_MAX_COUNT_IN_PAGE; offset = tokenIdx % BM25_DOC_FORWARD_MAX_COUNT_IN_PAGE; if (curStep != preStep) { curBlkno = BM25PageGetOpaque(page)->nextblkno; - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); buf = ReadBuffer(index, curBlkno); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - state = GenericXLogStart(index); - page = GenericXLogRegisterBuffer(state, buf, GENERIC_XLOG_FULL_IMAGE); + page = BufferGetPage(buf); preStep = curStep; } BM25DocForwardItem *forwardItem = @@ -160,47 +448,50 @@ static void InsertDocForwardItem(Relation index, uint32 docId, BM25TokenizedDocD forwardItem->tokenHash = tokenizedDoc.tokenDatas[i].tokenId; tokenIdx++; } - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); } -static bool ExpandDocumentListCapacityIfNeed(Relation index, BM25DocMetaPage docMetaPage, uint32 docId) +static bool ExpandDocumentListCapacityIfNeed(Relation index, BM25DocMetaPage docMetaPage, uint32 docId, + ForkNumber forkNum) { Buffer buf; Page page; - GenericXLogState *state; bool expanded = false; /* start doc page */ if (docMetaPage->docCapacity == 0) { - buf = BM25NewBuffer(index, MAIN_FORKNUM); - BM25InitRegisterPage(index, &buf, &page, &state); + buf = BM25NewBuffer(index, forkNum); + page = BufferGetPage(buf); + BM25InitPage(buf, page); BlockNumber newblk = BufferGetBlockNumber(buf); docMetaPage->startDocPage = newblk; docMetaPage->lastDocPage = newblk; docMetaPage->docCapacity = BM25_DOCUMENT_MAX_COUNT_IN_PAGE; - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); expanded = true; } /* need expand new page */ if (docMetaPage->docCapacity <= docId) { buf = ReadBuffer(index, docMetaPage->lastDocPage); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - state = GenericXLogStart(index); - page = GenericXLogRegisterBuffer(state, buf, 0); + page = BufferGetPage(buf); /* start append */ while (docMetaPage->docCapacity <= docId) { - BM25AppendPage(index, &buf, &page, &state, MAIN_FORKNUM); + BM25AppendPage(index, &buf, &page, MAIN_FORKNUM); BlockNumber newblk = BufferGetBlockNumber(buf); docMetaPage->lastDocPage = newblk; docMetaPage->docCapacity += BM25_DOCUMENT_MAX_COUNT_IN_PAGE; expanded = true; } - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); } return expanded; } static void InsertDocumentItem(Relation index, uint32 docId, BM25TokenizedDocData &tokenizedDoc, ItemPointerData &ctid, - BM25EntryPages &bm25EntryPages) + BM25EntryPages &bm25EntryPages, ForkNumber forkNum) { Buffer buf; Page page; @@ -208,7 +499,6 @@ static void InsertDocumentItem(Relation index, uint32 docId, BM25TokenizedDocDat BlockNumber docRealBlkno; BlockNumber startDocPage; uint16 offset; - GenericXLogState *state; Buffer metabuf; Page metapage; BM25DocMetaPage docMetaPage; @@ -218,14 +508,14 @@ static void InsertDocumentItem(Relation index, uint32 docId, BM25TokenizedDocDat /* open document list */ metabuf = ReadBuffer(index, bm25EntryPages.documentMetaPage); LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE); - state = GenericXLogStart(index); - metapage = GenericXLogRegisterBuffer(state, metabuf, 0); + metapage = BufferGetPage(metabuf); docMetaPage = BM25PageGetDocMeta(metapage); - ExpandDocumentListCapacityIfNeed(index, docMetaPage, docId); + ExpandDocumentListCapacityIfNeed(index, docMetaPage, docId, forkNum); startDocPage = docMetaPage->startDocPage; - BM25CommitBuffer(metabuf, state); + MarkBufferDirty(metabuf); + UnlockReleaseBuffer(metabuf); - InsertDocForwardItem(index, docId, tokenizedDoc, bm25EntryPages, &forwardStart, &forwardEnd); + InsertDocForwardItem(index, docId, tokenizedDoc, bm25EntryPages, &forwardStart, &forwardEnd, forkNum); /* write doc info into target blk */ step = docId / BM25_DOCUMENT_MAX_COUNT_IN_PAGE; @@ -233,8 +523,7 @@ static void InsertDocumentItem(Relation index, uint32 docId, BM25TokenizedDocDat docRealBlkno = SeekBlocknoForDoc(index, docId, startDocPage, step); buf = ReadBuffer(index, docRealBlkno); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - state = GenericXLogStart(index); - page = GenericXLogRegisterBuffer(state, buf, GENERIC_XLOG_FULL_IMAGE); + page = BufferGetPage(buf); BM25DocumentItem *docItem = (BM25DocumentItem*)((char *)page + sizeof(PageHeaderData) + offset * BM25_DOCUMENT_ITEM_SIZE); unsigned short infomask = 0 | BM25_DOCUMENT_ITEM_SIZE; @@ -245,7 +534,37 @@ static void InsertDocumentItem(Relation index, uint32 docId, BM25TokenizedDocDat docItem->isActived = true; docItem->tokenStartIdx = forwardStart; docItem->tokenEndIdx = forwardEnd; - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); +} + +static bool BM25InsertDocument(Relation index, Datum *values, ItemPointerData &ctid, BM25EntryPages &bm25EntryPages, ForkNumber forkNum) +{ + MemoryContext tempCtx = AllocSetContextCreate(CurrentMemoryContext, + "temp bm25 index context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContext oldCtx = MemoryContextSwitchTo(tempCtx); + /* new */ + BM25TokenizedDocData tokenizedDoc = BM25DocumentTokenize(TextDatumGetCString(values[0])); + if (tokenizedDoc.tokenCount == 0) { + MemoryContextSwitchTo(oldCtx); + MemoryContextDelete(tempCtx); + ereport(ERROR, (errmsg("No tokens in document for index: %u, please check.", RelationGetRelationName(index)))); + return false; + } + uint32 docId = BM25AllocateDocId(index); + float avgdl = 1.f; + BM25IncreaseDocAndTokenCount(index, tokenizedDoc.docLength, avgdl); + InsertToIvertedList(index, docId, tokenizedDoc, avgdl, bm25EntryPages, forkNum); + InsertDocumentItem(index, docId, tokenizedDoc, ctid, bm25EntryPages, forkNum); + if (tokenizedDoc.tokenDatas != nullptr) { + pfree(tokenizedDoc.tokenDatas); + } + MemoryContextSwitchTo(oldCtx); + MemoryContextDelete(tempCtx); + return true; } /* @@ -266,7 +585,7 @@ static void BM25BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *valu oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); /* insert document */ - BM25InsertDocument(index, values, hup->t_self, buildstate->bm25EntryPages); + BM25InsertDocument(index, values, hup->t_self, buildstate->bm25EntryPages, buildstate->forkNum); buildstate->indtuples++; /* Reset memory context */ @@ -274,30 +593,45 @@ static void BM25BuildCallback(Relation index, CALLBACK_ITEM_POINTER, Datum *valu MemoryContextReset(buildstate->tmpCtx); } -static bool BM25InsertDocument(Relation index, Datum *values, ItemPointerData &ctid, BM25EntryPages &bm25EntryPages) +static BlockNumber CreateDocMetaPage(Relation index, ForkNumber forkNum) { - MemoryContext tempCtx = AllocSetContextCreate(CurrentMemoryContext, - "temp bm25 index context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContext oldCtx = MemoryContextSwitchTo(tempCtx); - /* todo tokenizer and insert doc */ - MemoryContextSwitchTo(oldCtx); - MemoryContextDelete(tempCtx); - return true; + Page page; + GenericXLogState *state; + BlockNumber metaBlkbo; + BM25DocMetaPage docMetaPage; + + // create matepage + Buffer buf = BM25NewBuffer(index, forkNum); + page = BufferGetPage(buf); + BM25InitPage(buf, page); + + docMetaPage = BM25PageGetDocMeta(page); + docMetaPage->startDocPage = InvalidBlockNumber; + docMetaPage->lastDocPage = InvalidBlockNumber; + docMetaPage->docCapacity = 0; + + BM25PageGetOpaque(page)->nextblkno = InvalidBlockNumber; + BM25PageGetOpaque(page)->page_id = BM25_PAGE_ID; + BM25PageGetOpaque(page)->unused = 0; + + ((PageHeader)page)->pd_lower = ((char *)docMetaPage + sizeof(BM25DocumentMetaPageData)) - (char *)page; + + metaBlkbo = BufferGetBlockNumber(buf); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); + return metaBlkbo; } static BlockNumber CreateDocForwardMetaPage(Relation index, ForkNumber forkNum) { Page page; - GenericXLogState *state; BlockNumber metaBlkbo; BM25DocForwardMetaPage forwardMetaPage; // create matepage Buffer buf = BM25NewBuffer(index, forkNum); - BM25InitRegisterPage(index, &buf, &page, &state); + page = BufferGetPage(buf); + BM25InitPage(buf, page); forwardMetaPage = BM25PageGetDocForwardMeta(page); forwardMetaPage->startPage = InvalidBlockNumber; @@ -312,7 +646,8 @@ static BlockNumber CreateDocForwardMetaPage(Relation index, ForkNumber forkNum) ((PageHeader)page)->pd_lower = ((char *)forwardMetaPage + sizeof(BM25DocForwardMetaPageData)) - (char *)page; metaBlkbo = BufferGetBlockNumber(buf); - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); return metaBlkbo; } @@ -321,6 +656,8 @@ static BM25EntryPages CreateEntryPages(Relation index, ForkNumber forkNum) BM25EntryPages entryPages; entryPages.documentMetaPage = CreateDocMetaPage(index, forkNum); entryPages.docForwardPage = CreateDocForwardMetaPage(index, forkNum); + entryPages.hashBucketsPage = CreateBM25CommonPage(index, forkNum); + entryPages.hashBucketCount = 0; return entryPages; } @@ -331,11 +668,11 @@ static void CreateMetaPage(Relation index, BM25BuildState *buildstate, ForkNumbe { Buffer buf; Page page; - GenericXLogState *state; BM25MetaPage metap; buf = BM25NewBuffer(index, forkNum); - BM25InitRegisterPage(index, &buf, &page, &state); + page = BufferGetPage(buf); + BM25InitPage(buf, page); /* Set metapage data */ metap = BM25PageGetMeta(page); @@ -350,7 +687,353 @@ static void CreateMetaPage(Relation index, BM25BuildState *buildstate, ForkNumbe ((PageHeader)page)->pd_lower = ((char *)metap + sizeof(BM25MetaPageData)) - (char *)page; - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); +} + +static BM25Shared *BM25ParallelInitshared(BM25BuildState *buildstate) +{ + BM25Shared *bm25shared = nullptr; + + /* Store shared build state, for which we reserved space */ + bm25shared = + (BM25Shared *)MemoryContextAllocZero(INSTANCE_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE), sizeof(BM25Shared)); + + /* Initialize immutable state */ + bm25shared->heaprelid = RelationGetRelid(buildstate->heap); + bm25shared->indexrelid = RelationGetRelid(buildstate->index); + + bm25shared->bm25EntryPages = buildstate->bm25EntryPages; + SpinLockInit(&bm25shared->mutex); + /* Initialize mutable state */ + bm25shared->nparticipantsdone = 0; + bm25shared->reltuples = 0; + HeapParallelscanInitialize(&bm25shared->heapdesc, buildstate->heap); + return bm25shared; +} + +static void BM25ParallelScanAndInsert(Relation heapRel, Relation indexRel, BM25Shared *bm25shared) +{ + BM25BuildState buildstate; + TableScanDesc scan; + double reltuples; + IndexInfo *indexInfo; + + /* Join parallel scan */ + indexInfo = BuildIndexInfo(indexRel); + InitBM25BuildState(&buildstate, heapRel, indexRel, indexInfo, MAIN_FORKNUM); + buildstate.bm25EntryPages = bm25shared->bm25EntryPages; + + scan = tableam_scan_begin_parallel(heapRel, &bm25shared->heapdesc); + reltuples = tableam_index_build_scan(heapRel, indexRel, indexInfo, true, BM25BuildCallback, (void *)&buildstate, scan); + + /* Record statistics */ + SpinLockAcquire(&bm25shared->mutex); + bm25shared->nparticipantsdone++; + bm25shared->reltuples += reltuples; + bm25shared->bm25EntryPages.hashBucketCount += buildstate.bm25EntryPages.hashBucketCount; + SpinLockRelease(&bm25shared->mutex); + + FreeBuildState(&buildstate); +} + +void BM25ParallelBuildMain(const BgWorkerContext *bwc) +{ + BM25Shared *bm25shared = nullptr; + Relation heapRel; + Relation indexRel; + + /* Look up shared state */ + bm25shared = (BM25Shared *)bwc->bgshared; + + /* Open relations within worker */ + heapRel = heap_open(bm25shared->heaprelid, NoLock); + indexRel = index_open(bm25shared->indexrelid, NoLock); + + /* Perform inserts */ + BM25ParallelScanAndInsert(heapRel, indexRel, bm25shared); + + /* Close relations within worker */ + index_close(indexRel, NoLock); + heap_close(heapRel, NoLock); +} + +static void BM25EndParallel(BM25Leader *bm25leader) +{ + pfree_ext(bm25leader); + BgworkerListSyncQuit(); +} + +/* + * Begin parallel build + */ +static void BM25BeginParallel(BM25BuildState *buildstate, int request) +{ + BM25Shared *bm25shared = nullptr; + BM25Leader *bm25leader = (BM25Leader *)palloc0(sizeof(BM25Leader)); + + Assert(request > 0); + + SpinLockInit(&newBufferMutex); + + bm25shared = BM25ParallelInitshared(buildstate); + /* Launch workers, saving status for leader/caller */ + bm25leader->nparticipanttuplesorts = LaunchBackgroundWorkers(request, bm25shared, BM25ParallelBuildMain, NULL); + bm25leader->bm25shared = bm25shared; + + /* If no workers were successfully launched, back out (do serial build) */ + if (bm25leader->nparticipanttuplesorts == 0) { + BM25EndParallel(bm25leader); + return; + } + + /* Log participants */ + ereport(DEBUG1, (errmsg("using %d parallel workers", bm25leader->nparticipanttuplesorts))); + + /* Save leader state now that it's clear build will be parallel */ + buildstate->bm25leader = bm25leader; +} + +static double ParallelHeapScan(BM25BuildState *buildstate, int *nparticipanttuplesorts) +{ + BM25Shared *bm25shared = buildstate->bm25leader->bm25shared; + double reltuples; + + BgworkerListWaitFinish(&buildstate->bm25leader->nparticipanttuplesorts); + pg_memory_barrier(); + + *nparticipanttuplesorts = buildstate->bm25leader->nparticipanttuplesorts; + reltuples = bm25shared->reltuples; + + return reltuples; +} + +static FORCE_INLINE int ComparePostingFunc(const void *left, const void *right) +{ + BM25TokenPostingPage leftToken = (BM25TokenPostingPage)left; + BM25TokenPostingPage rightToken = (BM25TokenPostingPage)right; + return leftToken->docId - rightToken->docId; +} + +void ReorderPosting(Relation index, BlockNumber postingBlkno, uint32 docCount) +{ + BM25TokenPostingPage postings = (BM25TokenPostingPage)palloc0(sizeof(BM25TokenPostingItem) * docCount); + BlockNumber nextblkno = postingBlkno; + uint32 docIdx = 0; + Buffer cbuf; + Page cpage; + while (BlockNumberIsValid(nextblkno)) { + OffsetNumber maxoffno; + cbuf = ReadBuffer(index, nextblkno); + LockBuffer(cbuf, BUFFER_LOCK_SHARE); + cpage = BufferGetPage(cbuf); + maxoffno = PageGetMaxOffsetNumber(cpage); + for (OffsetNumber offno = FirstOffsetNumber; offno <= maxoffno; offno = OffsetNumberNext(offno)) { + BM25TokenPostingPage item = (BM25TokenPostingPage)PageGetItem(cpage, PageGetItemId(cpage, offno)); + if (docIdx < docCount) { + postings[docIdx] = *item; + docIdx++; + } + + } + nextblkno = BM25PageGetOpaque(cpage)->nextblkno; + UnlockReleaseBuffer(cbuf); + } + qsort(postings, (size_t)docCount, sizeof(BM25TokenPostingItem), ComparePostingFunc); + + // rewrite to posting list + docIdx = 0; + nextblkno = postingBlkno; + while (BlockNumberIsValid(nextblkno)) { + OffsetNumber maxoffno; + cbuf = ReadBuffer(index, nextblkno); + LockBuffer(cbuf, BUFFER_LOCK_EXCLUSIVE); + cpage = BufferGetPage(cbuf); + maxoffno = PageGetMaxOffsetNumber(cpage); + for (OffsetNumber offno = FirstOffsetNumber; offno <= maxoffno; offno = OffsetNumberNext(offno)) { + BM25TokenPostingPage item = (BM25TokenPostingPage)PageGetItem(cpage, PageGetItemId(cpage, offno)); + if (docIdx < docCount) { + *item = postings[docIdx]; + docIdx++; + } + } + nextblkno = BM25PageGetOpaque(cpage)->nextblkno; + MarkBufferDirty(cbuf); + UnlockReleaseBuffer(cbuf); + } + pfree_ext(postings); + return; +} + +void ReorderBucket(Relation index, BlockNumber bucketBlkno) +{ + // loop buckets + BlockNumber nextblkno = bucketBlkno; + Buffer cbuf; + Page cpage; + while (BlockNumberIsValid(nextblkno)) { + OffsetNumber maxoffno; + cbuf = ReadBuffer(index, nextblkno); + LockBuffer(cbuf, BUFFER_LOCK_SHARE); + cpage = BufferGetPage(cbuf); + maxoffno = PageGetMaxOffsetNumber(cpage); + for (OffsetNumber offno = FirstOffsetNumber; offno <= maxoffno; offno = OffsetNumberNext(offno)) { + BM25TokenMetaPage item = (BM25TokenMetaPage)PageGetItem(cpage, PageGetItemId(cpage, offno)); + ReorderPosting(index, item->postingBlkno, item->docCount); + } + nextblkno = BM25PageGetOpaque(cpage)->nextblkno; + UnlockReleaseBuffer(cbuf); + } + return; +} + +void ParallelReorderMain(const BgWorkerContext *bwc) +{ + Relation heapRel; + Relation indexRel; + + /* Look up shared state */ + BM25ReorderShared *reorderShared = (BM25ReorderShared *)bwc->bgshared; + pg_atomic_add_fetch_u32(&reorderShared->curThreadId, 1); + + /* Open relations within worker */ + heapRel = heap_open(reorderShared->heaprelid, NoLock); + indexRel = index_open(reorderShared->indexrelid, NoLock); + + BM25PageLocationInfo startLocation = reorderShared->startPageLocation[reorderShared->curThreadId - 1]; + ereport(LOG, (errmsg("launch reorder background threadId: %d.", reorderShared->curThreadId))); + + // loop buckets + BlockNumber nextblkno = startLocation.blkno; + OffsetNumber startoffno = startLocation.offno; + bool isStartPage = true; + bool isEnd = false; + uint32 scanBucketCount = 0; + Buffer cbuf; + Page cpage; + while (BlockNumberIsValid(nextblkno)) { + OffsetNumber maxoffno; + cbuf = ReadBuffer(indexRel, nextblkno); + LockBuffer(cbuf, BUFFER_LOCK_SHARE); + cpage = BufferGetPage(cbuf); + maxoffno = PageGetMaxOffsetNumber(cpage); + for (OffsetNumber offno = (isStartPage ? startoffno : FirstOffsetNumber); offno <= maxoffno; offno = OffsetNumberNext(offno)) { + scanBucketCount++; + BM25HashBucketItem *item = (BM25HashBucketItem *)PageGetItem(cpage, PageGetItemId(cpage, offno)); + ReorderBucket(indexRel, item->bucketBlkno); + if (scanBucketCount >= reorderShared->batchCount) { + isEnd = true; + break; + } + } + isStartPage = false; + nextblkno = BM25PageGetOpaque(cpage)->nextblkno; + UnlockReleaseBuffer(cbuf); + if (isEnd) { + break; + } + } + + /* Close relations within worker */ + index_close(indexRel, NoLock); + heap_close(heapRel, NoLock); +} + +static void BM25InitReorderShared(BM25ReorderShared *reorderShared, BM25BuildState *buildstate, BlockNumber hashBucketsPage, uint32 reorderParallelNum, uint32 batchHashBucketCount) +{ + + reorderShared->startPageLocation = (BM25PageLocationInfo*)palloc0(sizeof(BM25PageLocationInfo) * reorderParallelNum); + reorderShared->batchCount = batchHashBucketCount; + reorderShared->heaprelid = RelationGetRelid(buildstate->heap); + reorderShared->indexrelid = RelationGetRelid(buildstate->index); + pg_atomic_init_u32(&reorderShared->curThreadId, 0); + + BlockNumber nextblkno = hashBucketsPage; + BlockNumber curblkno = nextblkno; + Buffer cbuf; + Page cpage; + uint32 curHashBucketCount = 0; + uint32 curBatchIdx = 0; + while (BlockNumberIsValid(nextblkno)) { + OffsetNumber maxoffno; + cbuf = ReadBuffer(buildstate->index, nextblkno); + LockBuffer(cbuf, BUFFER_LOCK_SHARE); + cpage = BufferGetPage(cbuf); + maxoffno = PageGetMaxOffsetNumber(cpage); + for (OffsetNumber offno = FirstOffsetNumber; offno <= maxoffno; offno = OffsetNumberNext(offno)) { + if (curHashBucketCount == 0 && curBatchIdx < reorderParallelNum) { + reorderShared->startPageLocation[curBatchIdx].blkno = nextblkno; + reorderShared->startPageLocation[curBatchIdx].offno = offno; + curBatchIdx++; + } + curHashBucketCount++; + if (curHashBucketCount >= batchHashBucketCount) { + curHashBucketCount = 0; + } + } + curblkno = nextblkno; + nextblkno = BM25PageGetOpaque(cpage)->nextblkno; + UnlockReleaseBuffer(cbuf); + } + return; +} + +static void BuildBM25Index(BM25BuildState *buildstate, ForkNumber forkNum) +{ + int parallel_workers = 0; + + /* Calculate parallel workers */ + if (buildstate->heap != NULL) { + parallel_workers = PlanCreateIndexWorkers(buildstate->heap, buildstate->indexInfo); + } + + /* Attempt to launch parallel worker scan when required */ + if (parallel_workers > 0) { + BM25BeginParallel(buildstate, parallel_workers); + } + + if (buildstate->heap != NULL) { + if (!buildstate->bm25leader) { + serial_build: + buildstate->reltuples = tableam_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo, false, + BM25BuildCallback, (void *)buildstate, NULL); + } else { + int nruns; + buildstate->reltuples = ParallelHeapScan(buildstate, &nruns); + if (nruns == 0) { + /* failed to startup any bgworker, retry to do serial build */ + goto serial_build; + } + } + } + + if (buildstate->bm25leader) { + uint32 hashBucketCount = buildstate->bm25leader->bm25shared->bm25EntryPages.hashBucketCount; + BlockNumber hashBucketsPage = buildstate->bm25leader->bm25shared->bm25EntryPages.hashBucketsPage; + BM25EndParallel(buildstate->bm25leader); + + /* reorder posting list*/ + uint32 reorderParallelNum = hashBucketCount < parallel_workers ? hashBucketCount : parallel_workers; + if (reorderParallelNum == 0) { + return; + } + uint32 batchHashBucketCount = (hashBucketCount / reorderParallelNum) + 1; + BM25ReorderShared *reorderShared = + (BM25ReorderShared *)MemoryContextAllocZero(INSTANCE_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE), sizeof(BM25ReorderShared)); + BM25InitReorderShared(reorderShared, buildstate, hashBucketsPage, reorderParallelNum, batchHashBucketCount); + + int successWorkers = LaunchBackgroundWorkers(reorderParallelNum, reorderShared, ParallelReorderMain, NULL); + ereport(LOG, (errmsg("launch reorder background workers: %d.", successWorkers))); + if (successWorkers == 0) { + pfree_ext(reorderShared->startPageLocation); + pfree_ext(reorderShared); + ereport(ERROR, (errmsg("Failed to launch background workers: ParallelReorderMain"))); + } + BgworkerListWaitFinish(&successWorkers); + pfree_ext(reorderShared->startPageLocation); + BgworkerListSyncQuit(); + } + return; } /* @@ -359,12 +1042,14 @@ static void CreateMetaPage(Relation index, BM25BuildState *buildstate, ForkNumbe static void BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo, BM25BuildState *buildstate, ForkNumber forkNum) { - InitBM25BuildState(buildstate, heap, index, indexInfo); + InitBM25BuildState(buildstate, heap, index, indexInfo, forkNum); CreateMetaPage(index, buildstate, forkNum); - if (heap != NULL) { - buildstate->reltuples = tableam_index_build_scan(buildstate->heap, buildstate->index, buildstate->indexInfo, false, - BM25BuildCallback, (void *)buildstate, NULL); - } + + BuildBM25Index(buildstate, forkNum); + + if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM) + LogNewpageRange(index, forkNum, 0, RelationGetNumberOfBlocksInFork(index, forkNum), false); + FreeBuildState(buildstate); } @@ -394,6 +1079,6 @@ bool bm25insert_internal(Relation index, Datum *values, ItemPointer heapCtid) BM25MetaPageData meta; BM25GetMetaPageInfo(index, &meta); - BM25InsertDocument(index, values, *heapCtid, meta.entryPageList); + BM25InsertDocument(index, values, *heapCtid, meta.entryPageList, MAIN_FORKNUM); return true; } \ No newline at end of file diff --git a/src/gausskernel/storage/access/datavec/bm25scan.cpp b/src/gausskernel/storage/access/datavec/bm25scan.cpp index 0924d7bae..8aabc2390 100644 --- a/src/gausskernel/storage/access/datavec/bm25scan.cpp +++ b/src/gausskernel/storage/access/datavec/bm25scan.cpp @@ -32,11 +32,133 @@ #include "access/heapam.h" #include "catalog/index.h" #include "access/tableam.h" +#include "db4ai/bayesnet.h" #include "access/datavec/bm25.h" +typedef struct BM25QueryToken { + BlockNumber tokenPostingBlock; + float qTokenMaxScore; + float qTokenIDFVal; +} BM25QueryToken; + +typedef struct BM25QueryTokensInfo { + BM25QueryToken *queryTokens; + uint32 size; +} BM25QueryTokensInfo; + +static void FindBucketsLocation(Page page, BM25TokenizedDocData &tokenizedQuery, BlockNumber *bucketsLocation, uint32 &bucketFoundCount) +{ + OffsetNumber maxoffno = PageGetMaxOffsetNumber(page); + for (OffsetNumber offnoBucket = FirstOffsetNumber; offnoBucket <= maxoffno; offnoBucket++) { + BM25HashBucketPage bucket = (BM25HashBucketPage)PageGetItem(page, PageGetItemId(page, offnoBucket)); + for (size_t tokenIdx = 0; tokenIdx < tokenizedQuery.tokenCount; tokenIdx++) { + if (bucketsLocation[tokenIdx] == InvalidBlockNumber && + bucket->bucketId == (tokenizedQuery.tokenDatas[tokenIdx].hashValue % BM25_BUCKET_MAX_NUM)) { + bucketsLocation[tokenIdx] = bucket->bucketBlkno; + bucketFoundCount++; + if (bucketFoundCount >= tokenizedQuery.tokenCount) + return; + } + } + } + return; +} + +static void FindTokenInfo(BM25MetaPageData &meta, Page page, BM25TokenizedDocData &tokenizedQuery, + BM25QueryToken *queryTokens, size_t tokenIdx, uint32 &tokenFoundCount) +{ + OffsetNumber maxoffno = PageGetMaxOffsetNumber(page); + for (OffsetNumber offnoTokenMeta = FirstOffsetNumber; offnoTokenMeta <= maxoffno; offnoTokenMeta++) { + BM25TokenMetaPage tokenMeta = (BM25TokenMetaPage)PageGetItem(page, PageGetItemId(page, offnoTokenMeta)); + if (strncmp(tokenMeta->token, tokenizedQuery.tokenDatas[tokenIdx].tokenValue, BM25_MAX_TOKEN_LEN - 1) == 0) { + queryTokens[tokenIdx].qTokenMaxScore = tokenMeta->maxScore; + queryTokens[tokenIdx].tokenPostingBlock = tokenMeta->postingBlkno; + queryTokens[tokenIdx].qTokenIDFVal = tokenizedQuery.tokenDatas[tokenIdx].tokenFreq * + std::log((1 + ((float)meta.documentCount - (float)tokenMeta->docCount + 0.5) / ((float)tokenMeta->docCount + 0.5))); + tokenFoundCount++; + if (tokenFoundCount >= tokenizedQuery.tokenCount) + return; + } + } + return; +} + +static BM25QueryTokensInfo GetQueryTokens(Relation index, const char* sentence) +{ + BM25TokenizedDocData tokenizedQuery = BM25DocumentTokenize(sentence); + if (tokenizedQuery.tokenCount == 0) { + BM25QueryTokensInfo emptyTokensInfo{0}; + emptyTokensInfo.queryTokens = nullptr; + emptyTokensInfo.size = 0; + return emptyTokensInfo; + } + BM25QueryToken *queryTokens = (BM25QueryToken*)palloc0(sizeof(BM25QueryToken) * tokenizedQuery.tokenCount); + BlockNumber *bucketsLocation = (BlockNumber*)palloc0(sizeof(BlockNumber) * tokenizedQuery.tokenCount); + for (size_t tokenIdx = 0; tokenIdx < tokenizedQuery.tokenCount; tokenIdx++) { + queryTokens[tokenIdx].tokenPostingBlock = InvalidBlockNumber; + bucketsLocation[tokenIdx] = InvalidBlockNumber; + } + + /* scan index for queryToken info */ + uint32 bucketFoundCount = 0; + BM25MetaPageData meta; + BM25GetMetaPageInfo(index, &meta); + BlockNumber hashBucketsBlkno = meta.entryPageList.hashBucketsPage; + BlockNumber nextHashBucketsBlkno = hashBucketsBlkno; + Buffer cHashBucketsbuf; + Page cHashBucketspage; + + while (bucketFoundCount < tokenizedQuery.tokenCount && BlockNumberIsValid(nextHashBucketsBlkno)) { + OffsetNumber maxoffno; + cHashBucketsbuf = ReadBuffer(index, nextHashBucketsBlkno); + LockBuffer(cHashBucketsbuf, BUFFER_LOCK_SHARE); + cHashBucketspage = BufferGetPage(cHashBucketsbuf); + FindBucketsLocation(cHashBucketspage, tokenizedQuery, bucketsLocation, bucketFoundCount); + nextHashBucketsBlkno = BM25PageGetOpaque(cHashBucketspage)->nextblkno; + UnlockReleaseBuffer(cHashBucketsbuf); + } + + uint32 tokenFoundCount = 0; + for (size_t tokenIdx = 0; tokenIdx < tokenizedQuery.tokenCount; tokenIdx++) { + if (!BlockNumberIsValid(bucketsLocation[tokenIdx])) { + continue; + } + Buffer cTokenMetasbuf; + Page cTokenMetaspage; + BlockNumber nextTokenMetasBlkno = bucketsLocation[tokenIdx]; + while (tokenFoundCount < tokenizedQuery.tokenCount && BlockNumberIsValid(nextTokenMetasBlkno)) { + OffsetNumber maxoffno; + cTokenMetasbuf = ReadBuffer(index, nextTokenMetasBlkno); + LockBuffer(cTokenMetasbuf, BUFFER_LOCK_SHARE); + cTokenMetaspage = BufferGetPage(cTokenMetasbuf); + FindTokenInfo(meta, cTokenMetaspage, tokenizedQuery, queryTokens, tokenIdx, tokenFoundCount); + nextTokenMetasBlkno = BM25PageGetOpaque(cTokenMetaspage)->nextblkno; + UnlockReleaseBuffer(cTokenMetasbuf); + } + } + BM25QueryToken *resQueryTokens = (BM25QueryToken*)palloc0(sizeof(BM25QueryToken) * tokenFoundCount); + uint32 tokenFillIdx = 0; + for (size_t tokenIdx = 0; tokenIdx < tokenizedQuery.tokenCount; tokenIdx++) { + if (!BlockNumberIsValid(queryTokens[tokenIdx].tokenPostingBlock)) { + continue; + } + resQueryTokens[tokenFillIdx] = queryTokens[tokenIdx]; + tokenFillIdx++; + if (tokenFillIdx >= tokenFoundCount) { + break; + } + } + pfree(queryTokens); + pfree(bucketsLocation); + BM25QueryTokensInfo tokensInfo{0}; + tokensInfo.queryTokens = resQueryTokens; + tokensInfo.size = tokenFoundCount; + return tokensInfo; +} + IndexScanDesc bm25beginscan_internal(Relation index, int nkeys, int norderbys) { - IndexScanDesc scan; + IndexScanDesc scan = RelationGetIndexScan(index, nkeys, norderbys); return scan; } diff --git a/src/gausskernel/storage/access/datavec/bm25utils.cpp b/src/gausskernel/storage/access/datavec/bm25utils.cpp index cd264f4c7..06361f79a 100644 --- a/src/gausskernel/storage/access/datavec/bm25utils.cpp +++ b/src/gausskernel/storage/access/datavec/bm25utils.cpp @@ -31,6 +31,109 @@ #include "access/datavec/bm25.h" #include "access/datavec/utils.h" #include "storage/buf/bufmgr.h" +#include "tokenizer.h" + +slock_t newBufferMutex; + +BM25TokenizedDocData BM25DocumentTokenize(const char* doc) +{ + uint32 docLength = 0; + EmbeddingMap embeddingMap{0}; + ConvertString2Embedding(doc, &embeddingMap, true); + BM25TokenizedDocData tokenizedData = {}; + BM25TokenData* tokenDatas = (BM25TokenData*)palloc0(sizeof(BM25TokenData) * embeddingMap.size); + for (size_t idx = 0; idx < embeddingMap.size; idx++) { + tokenDatas[idx].hashValue = embeddingMap.tokens[idx].key; + tokenDatas[idx].tokenFreq = embeddingMap.tokens[idx].value; + errno_t rc = strncpy_s(tokenDatas[idx].tokenValue, BM25_MAX_TOKEN_LEN, embeddingMap.tokens[idx].token, + BM25_MAX_TOKEN_LEN - 1); + if (rc != EOK) { + pfree(tokenDatas); + tokenDatas = nullptr; + docLength = 0; + embeddingMap.size = 0; + break; + } + tokenDatas[idx].tokenId = 0; + docLength += embeddingMap.tokens[idx].value; + } + tokenizedData.tokenDatas = tokenDatas; + tokenizedData.tokenCount = embeddingMap.size; + tokenizedData.docLength = docLength; + if (embeddingMap.tokens != nullptr) { + free(embeddingMap.tokens); + embeddingMap.tokens = nullptr; + } + return tokenizedData; +} + +/* + * New buffer + */ +Buffer BM25NewBuffer(Relation index, ForkNumber forkNum) +{ + SpinLockAcquire(&newBufferMutex); + Buffer buf = ReadBufferExtended(index, forkNum, P_NEW, RBM_NORMAL, NULL); + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); + SpinLockRelease(&newBufferMutex); + return buf; +} + +/* + * Init page + */ +void BM25InitPage(Buffer buf, Page page) +{ + PageInit(page, BufferGetPageSize(buf), sizeof(BM25PageOpaqueData)); + BM25PageGetOpaque(page)->nextblkno = InvalidBlockNumber; + BM25PageGetOpaque(page)->page_id = BM25_PAGE_ID; +} + +/* + * Init and register page + */ +void BM25InitRegisterPage(Relation index, Buffer *buf, Page *page, GenericXLogState **state) +{ + *state = GenericXLogStart(index); + *page = GenericXLogRegisterBuffer(*state, *buf, GENERIC_XLOG_FULL_IMAGE); + BM25InitPage(*buf, *page); +} + +/* + * Commit buffer + */ +void BM25CommitBuffer(Buffer buf, GenericXLogState *state) +{ + GenericXLogFinish(state); + UnlockReleaseBuffer(buf); +} + +/* + * Add a new page + * + * The order is very important!! + */ +void BM25AppendPage(Relation index, Buffer *buf, Page *page, ForkNumber forkNum, bool unlockOldBuf) +{ + /* Get new buffer */ + Buffer newbuf = BM25NewBuffer(index, forkNum); + Page newpage = BufferGetPage(newbuf); + + /* Update the previous buffer */ + BM25PageGetOpaque(*page)->nextblkno = BufferGetBlockNumber(newbuf); + + /* Init new page */ + BM25InitPage(newbuf, newpage); + + MarkBufferDirty(*buf); + if (unlockOldBuf) { + /* Unlock */ + UnlockReleaseBuffer(*buf); + } + + *page = BufferGetPage(newbuf); + *buf = newbuf; +} /* * Get the metapage info @@ -58,18 +161,17 @@ uint32 BM25AllocateDocId(Relation index) Page page; BM25MetaPage metapBuf; uint32 docId; - GenericXLogState *state; buf = ReadBuffer(index, BM25_METAPAGE_BLKNO); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - state = GenericXLogStart(index); - page = GenericXLogRegisterBuffer(state, buf, 0); + page = BufferGetPage(buf); metapBuf = BM25PageGetMeta(page); if (unlikely(metapBuf->magicNumber != BM25_MAGIC_NUMBER)) elog(ERROR, "bm25 index is not valid"); docId = metapBuf->nextDocId; metapBuf->nextDocId++; - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); return docId; } @@ -79,18 +181,17 @@ uint32 BM25AllocateTokenId(Relation index) Page page; BM25MetaPage metapBuf; uint32 tokenId; - GenericXLogState *state; buf = ReadBuffer(index, BM25_METAPAGE_BLKNO); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - state = GenericXLogStart(index); - page = GenericXLogRegisterBuffer(state, buf, 0); + page = BufferGetPage(buf); metapBuf = BM25PageGetMeta(page); if (unlikely(metapBuf->magicNumber != BM25_MAGIC_NUMBER)) elog(ERROR, "bm25 index is not valid"); tokenId = metapBuf->nextTokenId; metapBuf->nextTokenId++; - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); return tokenId; } @@ -99,19 +200,18 @@ void BM25IncreaseDocAndTokenCount(Relation index, uint32 tokenCount, float &avgd Buffer buf; Page page; BM25MetaPage metapBuf; - GenericXLogState *state; buf = ReadBuffer(index, BM25_METAPAGE_BLKNO); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - state = GenericXLogStart(index); - page = GenericXLogRegisterBuffer(state, buf, 0); + page = BufferGetPage(buf); metapBuf = BM25PageGetMeta(page); if (unlikely(metapBuf->magicNumber != BM25_MAGIC_NUMBER)) elog(ERROR, "bm25 index is not valid"); metapBuf->documentCount++; metapBuf->tokenCount += tokenCount; avgdl = metapBuf->tokenCount / metapBuf->documentCount; - BM25CommitBuffer(buf, state); + MarkBufferDirty(buf); + UnlockReleaseBuffer(buf); } BlockNumber SeekBlocknoForDoc(Relation index, uint32 docId, BlockNumber startBlkno, BlockNumber step) @@ -120,7 +220,7 @@ BlockNumber SeekBlocknoForDoc(Relation index, uint32 docId, BlockNumber startBlk Page page; BlockNumber docBlkno = startBlkno; for (int i = 0; i < step; ++i) { - if (unlikely(BlockNumberIsValid(docBlkno)) { + if (unlikely(BlockNumberIsValid(docBlkno))) { elog(ERROR, "SeekBlocknoForDoc: Invalid Block Number."); } buf = ReadBuffer(index, docBlkno); @@ -130,4 +230,32 @@ BlockNumber SeekBlocknoForDoc(Relation index, uint32 docId, BlockNumber startBlk UnlockReleaseBuffer(buf); } return docBlkno; -} \ No newline at end of file +} + +bool FindHashBucket(uint32 bucketId, BM25PageLocationInfo &bucketLocation, Buffer buf, Page page) +{ + OffsetNumber maxoffno = PageGetMaxOffsetNumber(page); + for (OffsetNumber offno = FirstOffsetNumber; offno <= maxoffno; offno = OffsetNumberNext(offno)) { + BM25HashBucketPage bucket = (BM25HashBucketPage)PageGetItem(page, PageGetItemId(page, offno)); + if (bucket->bucketId == bucketId) { + bucketLocation.blkno = BufferGetBlockNumber(buf); + bucketLocation.offno = offno; + return true; + } + } + return false; +} + +bool FindTokenMeta(BM25TokenData &tokenData, BM25PageLocationInfo &tokenMetaLocation, Buffer buf, Page page) +{ + OffsetNumber maxoffno = PageGetMaxOffsetNumber(page); + for (OffsetNumber offno = FirstOffsetNumber; offno <= maxoffno; offno = OffsetNumberNext(offno)) { + BM25TokenMetaPage tokenMeta = (BM25TokenMetaPage)PageGetItem(page, PageGetItemId(page, offno)); + if (strncmp(tokenMeta->token, tokenData.tokenValue, BM25_MAX_TOKEN_LEN - 1) == 0) { + tokenMetaLocation.blkno = BufferGetBlockNumber(buf); + tokenMetaLocation.offno = offno; + return true; + } + } + return false; +} diff --git a/src/include/access/datavec/bm25.h b/src/include/access/datavec/bm25.h index 0a8a46213..78b039a75 100644 --- a/src/include/access/datavec/bm25.h +++ b/src/include/access/datavec/bm25.h @@ -32,7 +32,6 @@ #include "access/generic_xlog.h" #include "catalog/pg_operator.h" #include "port.h" /* for random() */ -#include "db4ai/bayesnet.h" #include "access/datavec/vecindex.h" #define BM25_VERSION 1 @@ -52,6 +51,8 @@ #define BM25_DOCUMENT_MAX_COUNT_IN_PAGE (BM25_PAGE_DATASIZE / BM25_DOCUMENT_ITEM_SIZE) #define BM25_DOCUMENT_FORWARD_ITEM_SIZE (MAXALIGN(sizeof(BM25DocForwardItem))) #define BM25_DOC_FORWARD_MAX_COUNT_IN_PAGE (BM25_PAGE_DATASIZE / BM25_DOCUMENT_FORWARD_ITEM_SIZE) +#define BM25_MAX_TOKEN_LEN 100 +#define BM25_BUCKET_MAX_NUM 1000 typedef struct BM25ScanData { uint32 docId; @@ -59,6 +60,19 @@ typedef struct BM25ScanData { ItemPointerData heapCtid; } BM25ScanData; +typedef struct BM25TokenData { + uint32 hashValue; + uint32 tokenId; + uint32 tokenFreq; + char tokenValue[BM25_MAX_TOKEN_LEN]; +} BM25TokenData; + +typedef struct BM25TokenizedDocData { + BM25TokenData* tokenDatas; + uint32 tokenCount; + uint32 docLength; +} BM25TokenizedDocData; + typedef struct BM25ScanOpaqueData { uint32 cursor; BM25ScanData* candDocs; @@ -74,6 +88,8 @@ typedef BM25ScanOpaqueData *BM25ScanOpaque; typedef struct BM25EntryPages { BlockNumber documentMetaPage; BlockNumber docForwardPage; + BlockNumber hashBucketsPage; + uint32 hashBucketCount; } BM25EntryPages; typedef struct BM25MetaPageData { @@ -129,11 +145,72 @@ typedef struct BM25DocumentItem { uint64 tokenEndIdx; } BM25DocumentItem; +typedef struct BM25HashBucketItem { + uint32 bucketId; + BlockNumber bucketBlkno; +} BM25HashBucketItem; + +typedef BM25HashBucketItem *BM25HashBucketPage; + +typedef struct BM25TokenMetaItem { + uint32 tokenId; + uint32 docCount; + BlockNumber postingBlkno; + float maxScore; + char token[BM25_MAX_TOKEN_LEN]; +} BM25TokenMetaItem; + +typedef BM25TokenMetaItem *BM25TokenMetaPage; + +typedef struct BM25TokenPostingItem { + uint32 docId; + uint16 docLength; + uint16 freq; +} BM25TokenPostingItem; + +typedef BM25TokenPostingItem *BM25TokenPostingPage; + +typedef struct BM25PageLocationInfo { + BlockNumber blkno; + OffsetNumber offno; +} BM25PageLocationInfo; + +typedef struct BM25Shared { + /* Immutable state */ + Oid heaprelid; + Oid indexrelid; + + /* Mutex for mutable state */ + slock_t mutex; + + /* Mutable state */ + int nparticipantsdone; + double reltuples; + + BM25EntryPages bm25EntryPages; + + ParallelHeapScanDescData heapdesc; +} BM25Shared; + +typedef struct BM25Leader { + int nparticipanttuplesorts; + BM25Shared *bm25shared; +} BM25Leader; + +typedef struct BM25ReorderShared { + BM25PageLocationInfo *startPageLocation; + uint32 batchCount; + uint32 curThreadId; + Oid heaprelid; + Oid indexrelid; +} BM25ReorderShared; + typedef struct BM25BuildState { /* Info */ Relation heap; Relation index; IndexInfo *indexInfo; + ForkNumber forkNum; BM25EntryPages bm25EntryPages; @@ -147,13 +224,40 @@ typedef struct BM25BuildState { /* Memory */ MemoryContext tmpCtx; + + BM25Leader *bm25leader; } BM25BuildState; + +struct BM25Scorer : public BaseObject { +public: + float m_k1; + float m_b; + float m_avgdl; + + float GetDocBM25Score(float tf, float docLen) { + return tf * (m_k1 + 1) / (tf + m_k1 * (1 - m_b + m_b * (docLen / m_avgdl))); + } + + BM25Scorer(float k1, float b, float avgdl) + : m_k1(k1), m_b(b), m_avgdl(avgdl) { + } +}; // struct BM25Scorer + +/* Methods */ +Buffer BM25NewBuffer(Relation index, ForkNumber forkNum); +void BM25InitPage(Buffer buf, Page page); +void BM25InitRegisterPage(Relation index, Buffer *buf, Page *page, GenericXLogState **state); +void BM25CommitBuffer(Buffer buf, GenericXLogState *state); +void BM25AppendPage(Relation index, Buffer *buf, Page *page, ForkNumber forkNum, bool unlockOldBuf = true); void BM25GetMetaPageInfo(Relation index, BM25MetaPage metap); uint32 BM25AllocateDocId(Relation index); uint32 BM25AllocateTokenId(Relation index); void BM25IncreaseDocAndTokenCount(Relation index, uint32 tokenCount, float &avgdl); BlockNumber SeekBlocknoForDoc(Relation index, uint32 docId, BlockNumber startBlkno, BlockNumber step); +bool FindHashBucket(uint32 bucketId, BM25PageLocationInfo &bucketLocation, Buffer buf, Page page); +bool FindTokenMeta(BM25TokenData &tokenData, BM25PageLocationInfo &tokenMetaLocation, Buffer buf, Page page); +BM25TokenizedDocData BM25DocumentTokenize(const char* doc); Datum bm25build(PG_FUNCTION_ARGS); Datum bm25buildempty(PG_FUNCTION_ARGS);