Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Mijamind
2022-09-04 16:28:08 +08:00
2829 changed files with 532516 additions and 366767 deletions

View File

@ -293,6 +293,8 @@ void InvalidateBuffer(BufferDesc *buf);
void LockTwoLWLock(LWLock *new_partition_lock, LWLock *old_partition_lock);
extern void InitBufferPool(void);
extern void pca_buf_init_ctx();
extern void InitBufferPoolAccess(void);
extern void InitBufferPoolBackend(void);
extern void AtEOXact_Buffers(bool isCommit);

View File

@ -180,8 +180,6 @@ typedef HeapPageHeaderData* HeapPageHeader;
#define GetPageHeaderSize(page) (PageIs8BXidHeapVersion(page) ? SizeOfHeapPageHeaderData : SizeOfPageHeaderData)
#define SizeOfHeapPageUpgradeData MAXALIGN(offsetof(HeapPageHeaderData, pd_linp) - offsetof(PageHeaderData, pd_linp))
#define GET_ITEMID_BY_IDX(buf, i) ((ItemIdData *)(buf + GetPageHeaderSize(buf) + (i) * sizeof(ItemIdData)))
#define PageXLogRecPtrGet(val) \
((uint64) (val).xlogid << 32 | (val).xrecoff)

View File

@ -0,0 +1,67 @@
//
// Created by cfs on 2/10/22.
//
#ifndef OPENGAUSS_CFS_H
#define OPENGAUSS_CFS_H
#include <sys/mman.h>
#include "utils/atomic.h"
#include "storage/buf/block.h"
#include "storage/smgr/relfilenode.h"
#include "datatype/timestamp.h"
struct CfsExtentAddress {
uint32 checksum;
volatile uint8 nchunks; /* number of chunks for this block */
volatile uint8 allocated_chunks; /* number of allocated chunks for this block */
/* variable-length fields, 1 based chunk no array for this block, size of the array must be 2, 4 or 8 */
uint16 chunknos[FLEXIBLE_ARRAY_MEMBER]; // valid value from 1
};
struct CfsCompressOption {
uint16 chunk_size; /* size of each chunk, must be 1/2 1/4 or 1/8 of BLCKSZ */
uint8 algorithm; /* compress algorithm, 1=pglz, 2=lz4 */
};
struct CfsExtentHeader {
pg_atomic_uint32 nblocks; /* number of total blocks in this segment */
pg_atomic_uint32 allocated_chunks; /* number of total allocated chunks in data area */
uint16 chunk_size; /* size of each chunk, must be 1/2 1/4 or 1/8 of BLCKSZ */
uint8 algorithm : 7; /* compress algorithm, 1=pglz, 2=lz4 */
uint8 recycleInOrder : 1; /* show if pca is recycled */
uint8 recv; /* for aligin */
CfsExtentAddress cfsExtentAddress[FLEXIBLE_ARRAY_MEMBER];
};
struct CfsExtInfo {
RelFileNode rnode;
ForkNumber forknum;
BlockNumber extentNumber;
uint32 assistFlag;
};
inline size_t SizeOfExtentAddress(uint16 chunkSize) {
if (chunkSize == 0) {
return -1;
}
return offsetof(CfsExtentAddress, chunknos) + sizeof(uint16) * BLCKSZ / chunkSize;
}
inline off_t OffsetOfPageCompressChunk(uint16 chunkSize, int chunkNo) {
return chunkSize * (chunkNo - 1);
}
inline size_t SizeOfExtentAddressByChunks(uint8 nChunks) {
return offsetof(CfsExtentAddress, chunknos) + sizeof(uint16) * nChunks;
}
inline CfsExtentAddress *GetExtentAddress(CfsExtentHeader *header, uint16 blockOffset) {
auto chunkSize = header->chunk_size;
auto headerOffset = offsetof(CfsExtentHeader, cfsExtentAddress);
auto sizeOfExtentAddress = SizeOfExtentAddress(chunkSize);
return (CfsExtentAddress *) (((char *) header) + headerOffset + blockOffset * sizeOfExtentAddress);
}
#endif // OPENGAUSS_CFS_H

View File

@ -0,0 +1,177 @@
#ifndef OPENGAUSS_CFS_BUFFERS_H
#define OPENGAUSS_CFS_BUFFERS_H
#include "storage/cfs/cfs.h"
#include "storage/cfs/cfs_converter.h"
#include "storage/cfs/cfs_repair.h"
struct CfsBufferKey {
RelFileNodeV2 relFileNode;
uint32 ExtentCount;
};
/*====================================*/
/*=========SINGLE CTRL=========*/
/*====================================*/
typedef enum en_ctrl_state {
CTRL_STATE_ISOLATION = 0, // The temporary free state when taken away from the main chain does not belong to any bucket.
CTRL_STATE_MAIN = 1, // On the main chain
CTRL_STATE_FREE = 2, // Idle link
} ctrl_state_e;
typedef enum en_ctrl_load_status {
CTRL_PAGE_IS_NO_LOAD, // not read from disk
CTRL_PAGE_IS_LOADED, // normal read from disk with no error
CTRL_PAGE_LOADED_ERROR // page on the disk is error
} ctrl_load_status_e;
#ifdef WIN32
typedef struct st_pca_page_ctrl
#else
typedef struct __attribute__((aligned(128))) st_pca_page_ctrl
#endif
{
uint32 ctrl_id; // Specifies the CTRL ID, which is also the subscript of the CTRL array. The subscript starts from 1. 0 is an invalid value
ctrl_state_e state; // On that chain or free. This is controlled by the chain lock.
volatile uint32 lru_prev, lru_next; // Chains before and after. This is controlled by the chain lock.
volatile uint32 bck_prev, bck_next, bck_id; // bck_id indicates the bucket to which a user belongs. This is controlled by the lock of the bucket.
/* The popularity increases by a bit each time a normal access is accessed, decreases by a bit each time a recycle traverse is traversed,
until it reaches the freezing point. This value can be fuzzy. Concurrency control is not required.
*/
pg_atomic_uint32 touch_nr;
/* Access count, that's accurate. Add ctrl lock + +, release lock --, this ++ must be preceded by lock, -- must be preceded by lock release. */
pg_atomic_uint32 ref_num;
/* The preceding variables are not controlled by the content_lock lock. The preceding variables are not modified by the CTRL lock holder. */
LWLock *content_lock; // This command is used to control the concurrency of the actual content on the PCA page.
ctrl_load_status_e load_status;
CfsBufferKey pca_key; // for real pageid on disk
CfsExtentHeader *pca_page; // Authentic PCA Page
} pca_page_ctrl_t;
/*====================================*/
/*=========HASH BUCKETS=========*/
/*====================================*/
// hash key is pageid。
typedef struct st_pca_hash_bucket {
// This lock must be added to the add, remove, and find commands of the ctrl
LWLock *lock;
uint32 bck_id; // ID of the bucket, which is also the subscript of the bucket array. The subscript starts from 1. 0 is an invalid value.
uint32 ctrl_count; // total ctrl count in this bucket.
uint32 first_ctrl_id; // first pca_ctrl id in this bucket.
} pca_hash_bucket_t;
typedef struct st_pca_page_hashtbl {
HashValueFunc hash; /* hash function */
HashCompareFunc match; /* key comparison function */
uint32 bucket_num;
pca_hash_bucket_t *buckets; // for many buckets array, max count is [bucket_num * 3][FLEXIBLE_ARRAY_MEMBER]
} pca_page_hashtbl_t;
/*====================================*/
/*=========LRU LIST=========*/
/*====================================*/
typedef struct st_pca_lru_list {
LWLock *lock; // protect this two link, each ctrl_t must is on one of them.
volatile uint32 count;
volatile uint32 first;
volatile uint32 last;
} pca_lru_list_t;
/*====================================*/
/*=========PCA BUFFER CONTEXT=========*/
/*====================================*/
/*
buffer map is:
--------------------------------------------------------------------------------------------------------------
| |ctrl_t * N .... | pca_page_t * N .... | hash_bucket_t * 3N .... |
--------------------------------------------------------------------------------------------------------------
| | | |
ctx ctrl_buf page_buf hashtbl->buckets
each ctrl->pca is pointing a real page in page_buf area.
*/
typedef struct st_pca_stat_info {
uint64 recycle_cnt;
} pca_stat_info_t;
#define PCA_PART_LIST_NUM (8)
#define PCA_LRU_LIST_NUM (PCA_PART_LIST_NUM * 2)
#ifdef WIN32
typedef struct st_pca_page_buff_ctx
#else
typedef struct __attribute__((aligned(128))) st_pca_page_buff_ctx
#endif
{
pca_page_ctrl_t *ctrl_buf; // ctrl start
char *page_buf; // pca page start
pca_page_hashtbl_t hashtbl; // hash table, internal using
uint32 max_count; // Maximum page count
/*
These two chains are mainly maintained.
For lru for used pca page, if the free link is empty, when recycling is enabled, the tail of 1/6 LRUs is added to the free link at a time to forcibly recycle the free link.
Each ctrl on this chain, the main chain, belongs to a bucket
Eliminate from the tail and add from the head.
*/
pca_lru_list_t main_lru[PCA_PART_LIST_NUM];
/*
lru for unused pca page. During initialization, all CTRLs are set and placed on this link. When applying for CTRLs, all CTRLs are allocated from this link.
All the ctrls on this chain, the free chain, do not belong to any bucket.
When initialized, all the ctrls are stringed together, similar to the high water mark.
Take it from the tail and add it from the tail.
*/
pca_lru_list_t free_lru[PCA_PART_LIST_NUM];
// statistic info
pca_stat_info_t stat;
} pca_page_buff_ctx_t;
#define PCA_INVALID_ID (0)
#define PCA_BUF_TCH_AGE (3) // consider buffer is hot if its touch_number >= BUF_TCH_AGE
#define PCA_BUF_MAX_RECYLE_STEPS (1024)
#define PCA_GET_CTRL_BY_ID(ctx, ctrl_id) ((pca_page_ctrl_t *)(&(ctx->ctrl_buf[(ctrl_id) - 1])))
#define PCA_GET_BUCKET_BY_ID(ctx, bck_id) ((pca_hash_bucket_t *)(&(ctx->hashtbl.buckets[(bck_id) - 1])))
#define PCA_GET_BUCKET_BY_HASH(ctx, hashcode) (PCA_GET_BUCKET_BY_ID(ctx, (((hashcode) % ctx->hashtbl.bucket_num) + 1)))
#define PCA_SET_NO_READ(ctrl) (ctrl->load_status = CTRL_PAGE_IS_NO_LOAD)
typedef enum PcaBufferReadMode {
PCA_BUF_NORMAL_READ,
PCA_BUF_NO_READ
} PcaBufferReadMode;
// for another file
extern pca_page_buff_ctx_t *g_pca_buf_ctx;
/* functions */
/* init the pca buffer */
void pca_buf_init(char *buf, uint32 size);
/* read pca buffer, and read data from disk if not exists in buffer */
pca_page_ctrl_t *pca_buf_read_page(const ExtentLocation& location, LWLockMode lockMode, PcaBufferReadMode readMode);
/* release pca buffer and write data into disk */
void pca_buf_free_page(pca_page_ctrl_t *ctrl, const ExtentLocation& location, bool need_write);
Size pca_buffer_size();
uint32 pca_lock_count();
CfsHeaderPagerCheckStatus CheckAndRepairCompressAddress(CfsExtentHeader *pcMap, uint16 chunk_size, uint8 algorithm,
const ExtentLocation& location);
#endif

View File

@ -0,0 +1,50 @@
//
// Created by cfs on 2/11/22.
//
#ifndef OPENGAUSS_CFS_CONVERTER_H
#define OPENGAUSS_CFS_CONVERTER_H
#include "utils/atomic.h"
#include "storage/buf/block.h"
#include "storage/smgr/relfilenode.h"
#include "storage/smgr/smgr.h"
/* 129 blocks per extent */
constexpr int CFS_EXTENT_SIZE = 129;
constexpr int CFS_EXTENT_COUNT_PER_FILE = RELSEG_SIZE / CFS_EXTENT_SIZE;
constexpr int CFS_MAX_BLOCK_PER_FILE = CFS_EXTENT_COUNT_PER_FILE * CFS_EXTENT_SIZE;
constexpr int CFS_LOGIC_BLOCKS_PER_EXTENT = CFS_EXTENT_SIZE - 1;
constexpr int CFS_LOGIC_BLOCKS_PER_FILE = CFS_LOGIC_BLOCKS_PER_EXTENT * CFS_EXTENT_COUNT_PER_FILE;
#define CFS_MAX_LOGIC_CHRUNKS_NUMBER(chrunk_size) (CFS_LOGIC_BLOCKS_PER_EXTENT * (BLCKSZ / chrunk_size))
struct ExtentLocation {
int fd;
RelFileNode relFileNode;
BlockNumber extentNumber;
BlockNumber extentStart;
BlockNumber extentOffset;
BlockNumber headerNum;
uint16 chrunk_size;
uint8 algorithm;
};
typedef size_t CFS_STORAGE_TYPE;
constexpr CFS_STORAGE_TYPE COMMON_STORAGE = 0;
extern ExtentLocation StorageConvert(SMgrRelation sRel, ForkNumber forcknum, BlockNumber logicBlockNumber, bool skipSync, int type);
extern MdfdVec *CfsMdOpenReln(SMgrRelation reln, ForkNumber forknum, ExtensionBehavior behavior);
extern BlockNumber CfsGetBlocks(SMgrRelation reln, ForkNumber forknum, const MdfdVec *seg);
typedef ExtentLocation (*CfsLocationConvert)(SMgrRelation sRel, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, int type);
extern CfsLocationConvert cfsLocationConverts[2];
constexpr int EXTENT_OPEN_FILE = 0;
constexpr int WRITE_BACK_OPEN_FILE = 1;
constexpr int EXTENT_CREATE_FILE = 2;
#endif //OPENGAUSS_CFS_CONVERTER_H

View File

@ -0,0 +1,17 @@
#ifndef OPENGAUSS_CFS_MD_H
#define OPENGAUSS_CFS_MD_H
#include "storage/smgr/smgr.h"
#define MIN_FALLOCATE_SIZE (4096)
/* cfs interface */
extern void CfsWriteBack(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks, CFS_STORAGE_TYPE type);
extern size_t CfsWritePage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, const char *buffer, bool skipSync, CFS_STORAGE_TYPE type);
void CfsExtendExtent(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, const char *buffer, CFS_STORAGE_TYPE type);
extern BlockNumber CfsNBlock(const RelFileNode &relFileNode, int fd, BlockNumber segNo, off_t len);
extern int CfsReadPage(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, char *buffer, CFS_STORAGE_TYPE type);
void CfsMdPrefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, CFS_STORAGE_TYPE type);
off_t CfsMdTruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber logicBlockNumber, bool skipSync, CFS_STORAGE_TYPE type);
#endif

View File

@ -0,0 +1,19 @@
#ifndef OPENGAUSS_CFS_REPAIR_H
#define OPENGAUSS_CFS_REPAIR_H
#include "storage/smgr/smgr.h"
extern int WriteRepairFile_Compress_extent(SMgrRelation reln, BlockNumber logicBlockNumber,
char* reapirpath, char *buf, BlockNumber offset, uint32 blocks);
extern int WriteRepairFile_Compress(const RelFileNode &rd_node, int fd, char* path, char *buf, BlockNumber blkno_segno_offset, uint32 blk_cnt);
extern void CfsHeaderPageCheckAndRepair(SMgrRelation reln, BlockNumber logicBlockNumber,
char *pca_page_res, uint32 strLen, bool *need_repair_pca);
extern int CfsGetPhysicsFD(const RelFileNode &relnode, BlockNumber logicBlockNumber);
typedef enum CfsHeaderPagerCheckStatus {
CFS_HEADER_CHECK_STATUS_OK, // cfs header page checked ok without any error
CFS_HEADER_CHECK_STATUS_REPAIRED, // cfs header page checked error but has been repaired
CFS_HEADER_CHECK_STATUS_ERROR // can not be repaired.
} CfsHeaderPagerCheckStatus;
#endif

View File

@ -0,0 +1,52 @@
//
// Created by cfs on 4/8/22.
//
#ifndef OPENGAUSS_CFS_TOOLS_H
#define OPENGAUSS_CFS_TOOLS_H
#include "c.h"
#include "storage/cfs/cfs.h"
#include "storage/buf/block.h"
#define COMPRESS_FSEEK_ERROR SIZE_MAX
#define COMPRESS_FREAD_ERROR SIZE_MAX - 1
#define COMPRESS_CHECKSUM_ERROR SIZE_MAX - 2
#define COMPRESS_BLOCK_ERROR SIZE_MAX - 3
#define MIN_COMPRESS_ERROR_RT SIZE_MAX / 2
struct CfsHeaderMap {
CfsExtentHeader *header;
BlockNumber extentCount;
void *pointer;
size_t mmapLen;
};
struct CfsReadStruct {
FILE *fd;
CfsExtentHeader *header;
BlockNumber extentCount;
};
constexpr int MAX_RETRY_LIMIT = 60;
constexpr long RETRY_SLEEP_TIME = 1000000L;
bool CompressedChecksum(const char *compressedData);
size_t CfsReadCompressedPage(char *dst, size_t destLen, BlockNumber extent_offset_blkno, CfsReadStruct *cfsReadStruct,
BlockNumber blockNum);
/**
*
* @param blockNumber block number
* @param pageCompressAddr addr of block
* @return checksum uint32
*/
uint32 AddrChecksum32(const CfsExtentAddress *cfsExtentAddress, const int needChunks);
CfsHeaderMap MMapHeader(FILE* fd, BlockNumber extentIndex, bool readOnly = false);
void MmapFree(CfsHeaderMap *cfsHeaderMap);
size_t ReadBlockNumberOfCFile(FILE* compressFd);
#endif //OPENGAUSS_CFS_TOOLS_H

View File

@ -1,287 +0,0 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* dfs_connector.h
*
*
*
* IDENTIFICATION
* src/include/storage/dfs/dfs_connector.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef DFS_CONNECTOR_H
#define DFS_CONNECTOR_H
#include <memory>
#include "dfs_config.h"
#include "postgres.h"
#include "knl/knl_variable.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
#define DEFAULT_PERM_MOD (384)
/*
* DfsSrvOptions holds the option values to be used
* when connecting external server.
*/
typedef struct DfsSrvOptions {
char* filesystem;
char* address;
char* cfgPath;
char* storePath;
} DfsSrvOptions;
/*
* conn
*/
enum ConnectorType { HDFS_CONNECTOR = 0, OBS_CONNECTOR = 1, UNKNOWN_CONNECTOR };
namespace dfs {
/* Store the block information of the file of dfs system. */
class DFSBlockInfo : public BaseObject {
public:
virtual ~DFSBlockInfo()
{}
/* Get the number of replications of the current file. */
virtual int getNumOfReplica() const = 0;
/*
* Get the string including IP:xferPort for accessing the block in the node.
* @_in_param blockIdx: The index of the block to search of the file.
* @_in_param nodeIdx: The index of the node on which to seatch the block.
* @return the string including IP:xferPort.
*/
virtual const char* getNames(int blockIdx, int nodeIdx) const = 0;
/*
* get whether the location is cached
* @_in_param blockIdx: The index of the block to search of the file.
* @_in_param nodeIdx: The index of the node on which to search the block.
* @return cached-true uncached - false.
*/
virtual bool isCached(int blockIdx, int nodeIdx) const = 0;
};
class DFSConnector : public BaseObject {
public:
virtual ~DFSConnector()
{}
/*
* Check if the path in hdfs is a file not directory, log error if the path does not
* exist.
* @_in param filePath: the path of the hdfs file/directory.
* @return Return true: the path is a file; false: the path is not a file but a directory.
*/
virtual bool isDfsFile(const char* filePath) = 0;
virtual bool isDfsFile(const char* filePath, bool throw_error) = 0;
/* Check if the path is a empty file, log error if the path does not exist. */
virtual bool isDfsEmptyFile(const char* filePath) = 0;
/*
* Get the file size of the path. Return -1 if the path does not exist.
* @_in_param filePath: the path of the hdfs file/directory
* @return Return the size.
*/
virtual int64_t getFileSize(const char* filePath) = 0;
/* Get the handler to connect the DFS system. */
virtual void* getHandler() const = 0;
/*
* Get list of files/directories for a given directory-path.
* hdfsFreeFileInfo is called internally to deallocate memory.
* Log error if the path does not exist.
* @_in_param folderPath: The path of the directory.
* @return Return a list of filepath. Return NULL on error.
*/
virtual List* listDirectory(char* folderPath) = 0;
virtual List* listDirectory(char* folderPath, bool throw_error) = 0;
virtual List* listObjectsStat(char* searchPath, const char* prefix = NULL) = 0;
/*
* Get the block information of the file path.
* Log error if the path does not exist.
* @_in_param filePath: The path of the file.
* @return Return a pointer to DFSBlockInfo.
*/
virtual DFSBlockInfo* getBlockLocations(char* filePath) = 0;
/*
* Drop directory. Return 0 if the path does not exist.
* @_in_param path The path of the directory.
* @_in_param recursive if path is a directory and set to
* non-zero, the directory is deleted else throws an exception. In
* case of a file the recursive argument is irrelevant.
* @return Returns 0 on success, -1 on error.
*/
virtual int dropDirectory(const char* path, int recursive) = 0;
/*
* Make a directory using the given path.
* @_in_param path The path of the directory.
* @return Returns 0 on success, -1 on error.
*/
virtual int createDirectory(const char* path) = 0;
/*
* Make a file using the given path.
* @_in_param path The path of the file to make.
* @_in_param flags - an | of bits/fcntl.h file flags - supported flags
* are O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT),
* O_WRONLY|O_APPEND and O_SYNC. Other flags are generally ignored other than
* (O_RDWR || (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP.
* return Returns 0 on success, -1 on error.
*/
virtual int openFile(const char* path, int flag) = 0;
/* Delete the file using the given file path. Before calling this
* function, pathExists must be checked.
* @_in_param path The path of the file.
* @_in_param recursive if path is a directory and set to
* non-zero, the directory is deleted else throws an exception. In
* case of a file the recursive argument is irrelevant.
* @return Returns 0 on success, -1 on error.
*/
virtual int deleteFile(const char* path, int recursive) = 0;
/*
* pathExists - Checks if a given path exsits on the filesystem
* @param path The path to look for
* @return Returns 0 on success, -1 on error.
*/
virtual bool pathExists(const char* filePath) = 0;
virtual bool existsFile(const char* path) = 0;
/*
* check if the current connector has a valid hdfs file handler.
* @return true if the file handler is valid, false on invalid.
*/
virtual bool hasValidFile() const = 0;
/*
* Write the buffer into the current file according to the length.
* @_in_param buffer: The content to be write to file.
* @_in_param length: The length of the byte to write.
* @return Return the bytes actually write, -1 on error.
*/
virtual int writeCurrentFile(const char* buffer, int length) = 0;
/*
* Read fixed size from the offset of the file into the buffer.
* @_out_param buffer: The buffer to be filled.
* @_in_param length: The size of bytes expected.
* @_in_param offset: The offset at which the reading starts.
* @return the bytes actually read, -1 on error.
*/
virtual int readCurrentFileFully(char* buffer, int length, int64 offset) = 0;
/*
* Flush out the data in client's user buffer. After the return of this call,
* new readers will see the data.
* @return 0 on success, -1 on error and sets errno
*/
virtual int flushCurrentFile() = 0;
/*
* Close the current file.
*/
virtual void closeCurrentFile() = 0;
/*
* change the file's authority.
* @_in_param filePath: The absolute path of the file to be set.
* @_in_param mode: The mode of the authority like 600 or 755.
* Return 0 if succeed. Return 1 if fail.
*/
virtual int chmod(const char* filePath, short mode) = 0;
/*
* Set the label expression on dfs file.
* @_in_param filePath: The absolute path of the file to be set.
* @_in_param expression: The label string like "labelA,labelB".
* Return 0 if succeed. Return 1 if fail.
*/
virtual int setLabelExpression(const char* filePath, const char* expression) = 0;
/*
* Get the timestamp of the last modification.
*/
virtual int64 getLastModifyTime(const char* filePath) = 0;
/*
* Fetch the configure value from the config file.
*/
virtual const char* getValue(const char* key, const char* defValue) const = 0;
/*
* Get connection type
*/
virtual int getType() = 0;
};
/*
* Construct a connector of DFS which wrappers all the approaches to the DFS.
* @_in_param ctx: The memory context on which to create the connector(not used for now).
* @_in_param foreignTableId: The oid of the relation for which we create the connector.
* @return the constructed connector.
*/
DFSConnector* createConnector(MemoryContext ctx, Oid foreignTableId);
DFSConnector* createTempConnector(MemoryContext ctx, Oid foreignTableId);
DFSConnector* createConnector(MemoryContext ctx, ServerTypeOption srvType, void* options);
/*
* Construct a connector of DFS which wrappers all the approaches to the DFS.
* @_in_param ctx: The memory context on which to create the connector(not used for now).
* @_in_param srvOptions: information of the server option.
* @_in_param tablespaceOid: tablespace oid.
* @return the constructed connector.
*/
DFSConnector* createConnector(MemoryContext ctx, DfsSrvOptions* srvOptions, Oid tablespaceOid);
DFSConnector* createTempConnector(MemoryContext ctx, DfsSrvOptions* srvOptions, Oid tablespaceOid);
/* Initialize the global hdfs connector cache hash table. */
void InitHDFSConnectorCacheLock();
/* Remove the connector entry of the hdfs cache according to the server oid. */
void InvalidHDFSConnectorCache(Oid serverOid);
/* Initialize the global obs connector cache hash table. */
void InitOBSConnectorCacheLock();
/*
* Remove the connector entry of the obs cache according to the server oid.
* return true if clean it successfully, otherwise return false.
*/
bool InvalidOBSConnectorCache(Oid serverOid);
/* Clean the thread local variables in kerberos. */
void clearKerberosObjs();
} // namespace dfs
/* check file path should skip */
bool checkFileShouldSkip(char* fileName);
/* check file path should skip */
bool checkPathShouldSkip(char* pathName);
#endif

View File

@ -1,178 +0,0 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* dfscache_mgr.h
* routines to support dfs
*
*
* IDENTIFICATION
* src/include/storage/dfs/dfscache_mgr.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef METACACHEMGR_H
#define METACACHEMGR_H
#include "pg_config.h"
#include <string>
#ifndef ENABLE_LITE_MODE
#include "orc_proto.pb.h"
#endif
#include "storage/cache_mgr.h"
#include "storage/smgr/relfilenode.h"
#define MetaCache (MetaCacheMgr::GetInstance())
typedef struct OrcMetadataTag {
RelFileNodeOld fileNode;
int32 fileID;
uint32 stripeID;
uint32 columnID;
uint64 padding;
} OrcMetadataTag;
typedef struct OrcMetadataValue {
/*
* The value of element pointer could be de-referenced into 'DfsInsert *' or
* 'PartitionStagingFile *'
*/
/* type 1 metadeta, file level, only need relID,fileID */
uint64 footerStart;
std::string* postScript;
std::string* fileFooter;
/* type 2 metadata, column and stripe level */
std::string* stripeFooter;
std::string* rowIndex;
int size;
/* used in OBS file */
char* fileName;
/* data DNA, used in OBS foreign table file */
char* dataDNA;
} OrcMetadataValue;
typedef struct CarbonMetadataTag {
RelFileNodeOld fileNode;
int32 fileID;
uint32 BlockletID;
uint32 columnID;
uint64 padding;
} CarbonMetadataTag;
typedef struct CarbonMetadataValue {
/* type 1 metadeta, file level, only need relID,fileID */
uint64 headerSize;
uint64 footerSize;
unsigned char* fileHeader;
unsigned char* fileFooter;
uint64 size;
/* used in OBS file */
char* fileName;
/* data DNA, used in OBS foreign table file */
char* dataDNA;
} CarbonMetadataValue;
typedef union MetadataTag {
OrcMetadataTag OrcMetaTag;
CarbonMetadataTag CarbonMetaTag;
} MetadataTag;
typedef struct MetadataTagKey {
CacheType type;
MetadataTag key;
} MetadataTagKey;
/* Partition search cache support */
typedef struct OrcMetadataEntry {
/* key of cache entry */
OrcMetadataTag key;
int32 slot_id;
} OrcMetadataEntry;
int MetaCacheMgrNumLocks();
void ReleaseMetaBlock(CacheSlotId_t slotId);
bool MetaCacheRenewBlock(CacheSlotId_t slotId);
OrcMetadataValue* OrcMetaCacheGetBlock(CacheSlotId_t slotId);
CarbonMetadataValue* CarbonMetaCacheGetBlock(CacheSlotId_t slotId);
int OrcMetaCacheGetBlockSize(CacheSlotId_t slotId);
int CarbonMetaCacheGetBlockSize(CacheSlotId_t slotId);
void MetaCacheSetBlockWithFileName(CacheSlotId_t slotId, const char* fileName);
CacheSlotId_t MetaCacheAllocBlock(
RelFileNodeOld* fileNode, int32 fileID, uint32 stripeOrBlocketID, uint32 columnID, bool& found, int type);
#ifndef ENABLE_LITE_MODE
void OrcMetaCacheSetBlock(CacheSlotId_t slotId, uint64 footerStart, const orc::proto::PostScript* postScript,
const orc::proto::Footer* fileFooter, const orc::proto::StripeFooter* stripeFooter,
const orc::proto::RowIndex* rowIndex, const char* fileName, const char* dataDNA);
#endif
void CarbonMetaCacheSetBlock(CacheSlotId_t slotId, uint64 headerSize, uint64 footerSize, unsigned char* fileHeader,
unsigned char* fileFooter, const char* fileName, const char* dataDNA);
class MetaCacheMgr : public BaseObject {
public:
virtual ~MetaCacheMgr()
{}
static MetaCacheMgr* GetInstance(void);
static void NewSingletonInstance(void);
bool ReserveMetaBlockWithSlotId(CacheSlotId_t slotId);
void UnPinMetaBlock(CacheSlotId_t slotId);
int64 GetCurrentMemSize();
void AbortMetaBlock(CacheSlotId_t slotId);
/* Manage I/O busy CUs */
bool MetaBlockWaitIO(int slotId);
void MetaBlockCompleteIO(int slotId);
int GetOrcMetaBlockSize(CacheSlotId_t slotId);
int GetCarbonMetaBlockSize(CacheSlotId_t slotId);
#ifndef ENABLE_LITE_MODE
void SetOrcMetaBlockValue(CacheSlotId_t slotId, uint64 footerStart, const orc::proto::PostScript* postScript,
const orc::proto::Footer* fileFooter, const orc::proto::StripeFooter* stripeFooter,
const orc::proto::RowIndex* rowIndex, const char* fileName, const char* dataDNA);
#endif
void SetCarbonMetaBlockValue(CacheSlotId_t slotId, uint64 headerSize, uint64 footerSize, unsigned char* fileHeader,
unsigned char* fileFooter, const char* fileName, const char* dataDNA);
CacheSlotId_t FindMetaBlock(CacheTag cacheTag);
OrcMetadataValue* GetOrcMetaBlock(CacheSlotId_t slotId);
CarbonMetadataValue* GetCarbonMetaBlock(CacheSlotId_t slotId);
CacheSlotId_t ReserveMetaBlock(CacheTag cacheTag, bool& hasFound);
void PrintMetaCacheSlotLeakWarning(CacheSlotId_t slotId);
void ReleaseMetadataValue(CarbonMetadataValue* nvalue);
OrcMetadataTag InitOrcMetadataTag(RelFileNodeOld* fileNode, int32 fileID, uint32 stripeID, uint32 columnID);
CarbonMetadataTag InitCarbonMetadataTag(RelFileNodeOld* fileNode, int32 fileID, uint32 stripeID, uint32 columnID);
int64 m_cstoreMaxSize;
private:
MetaCacheMgr()
{}
int CalcOrcMetaBlockSize(OrcMetadataValue* nvalue) const;
int CalcCarbonMetaBlockSize(CarbonMetadataValue* nvalue) const;
void SetOrcMetaBlockSize(CacheSlotId_t slotId, int size);
void SetCarbonMetaBlockSize(CacheSlotId_t slotId, int size);
void ReleaseOrcMetadataValue(OrcMetadataValue* nvalue) const;
void ReleaseCarbonMetadataValue(CarbonMetadataValue* nvalue) const;
static MetaCacheMgr* m_meta_cache;
CacheMgr* m_cache_mgr;
};
#endif /* define */

View File

@ -142,8 +142,8 @@ const struct LWLOCK_PARTITION_DESC LWLockPartInfo[] = {
#define NUM_GPRC_PARTITIONS 2
#endif
/*
* WARNING---Please keep the order of LWLockTrunkOffset and BuiltinTrancheIds consistent!!!
/*
* WARNING---Please keep the order of LWLockTrunkOffset and BuiltinTrancheIds consistent!!!
*/
/* Offsets for various chunks of preallocated lwlocks in main array. */
@ -214,7 +214,7 @@ enum BuiltinTrancheIds
LWTRANCHE_USPACE_TRANSGRP_MAPPING,
LWTRANCHE_PROC_XACT_MAPPING,
LWTRANCHE_ASP_MAPPING,
LWTRANCHE_GlobalSeq,
LWTRANCHE_GlobalSeq,
LWTRANCHE_GWC_MAPPING,
LWTRANCHE_NORMALIZED_SQL,
LWTRANCHE_START_BLOCK_MAPPING,
@ -233,6 +233,7 @@ enum BuiltinTrancheIds
LWTRANCHE_MULTIXACTMEMBER_CTL,
LWTRANCHE_OLDSERXID_SLRU_CTL,
LWTRANCHE_WAL_INSERT,
LWTRANCHE_IO_BLOCKED,
LWTRANCHE_DOUBLE_WRITE,
LWTRANCHE_DW_SINGLE_FIRST, /* single flush dw file, first version pos lock */
LWTRANCHE_DW_SINGLE_SECOND, /* single flush dw file, second version pos lock */
@ -246,7 +247,7 @@ enum BuiltinTrancheIds
LWTRANCHE_MPFL,
LWTRANCHE_GTT_CTL, // For GTT
LWTRANCHE_PLDEBUG, // For Pldebugger
LWTRANCHE_NGROUP_MAPPING,
LWTRANCHE_NGROUP_MAPPING,
LWTRANCHE_MATVIEW_SEQNO,
LWTRANCHE_IO_STAT,
LWTRANCHE_WAL_FLUSH_WAIT,
@ -262,6 +263,7 @@ enum BuiltinTrancheIds
LWTRANCHE_FILE_REPAIR,
LWTRANCHE_REPLICATION_ORIGIN,
LWTRANCHE_AUDIT_INDEX_WAIT,
LWTRANCHE_PCA_BUFFER_CONTENT,
/*
* Each trancheId above should have a corresponding item in BuiltinTrancheNames;
*/

View File

@ -13,10 +13,10 @@
#include <sys/mman.h>
#include "c.h"
#include "storage/buf/bufpage.h"
#include "datatype/timestamp.h"
#include "catalog/pg_class.h"
#include "catalog/pg_am.h"
#include "utils/atomic.h"
/* The page compression feature relies on native atomic operation support.
@ -37,25 +37,19 @@
/* COMPRESS_ALGORITHM_XXX must be the same as COMPRESS_TYPE_XXX */
#define COMPRESS_ALGORITHM_PGLZ 1
#define COMPRESS_ALGORITHM_ZSTD 2
#define COMPRESS_ALGORITHM_PGZSTD 3
constexpr uint32 COMPRESS_ADDRESS_FLUSH_CHUNKS = 5000;
#define SUPPORT_COMPRESSED(relKind, relam) \
((relKind) == RELKIND_RELATION || \
(((relKind) == RELKIND_INDEX || (relKind == RELKIND_GLOBAL_INDEX)) && (relam) == BTREE_AM_OID))
(((relKind) == RELKIND_INDEX || (relKind == RELKIND_GLOBAL_INDEX)) && \
((relam) == BTREE_AM_OID || (relam) == UBTREE_AM_OID)))
#define REL_SUPPORT_COMPRESSED(relation) SUPPORT_COMPRESSED((relation)->rd_rel->relkind, (relation)->rd_rel->relam)
typedef uint32 pc_chunk_number_t;
const uint32 PAGE_COMPRESSION_VERSION = 92603;
enum CompressedFileType {
COMPRESSED_TYPE_UNKNOWN,
COMPRESSED_TABLE_FILE,
COMPRESSED_TABLE_PCA_FILE,
COMPRESSED_TABLE_PCD_FILE
};
const uint32 PAGE_COMPRESSION_VERSION = 92765;
/*
* layout of files for Page Compress:
*
@ -99,18 +93,19 @@ typedef struct PageCompressData {
uint32 size : 16; /* size of compressed data */
uint32 byte_convert : 1;
uint32 diff_convert : 1;
uint32 unused : 14;
uint32 algorithm : 4;
uint32 unused : 10;
char data[FLEXIBLE_ARRAY_MEMBER]; /* compressed page, except for the page header */
} PageCompressData;
typedef struct HeapPageCompressData {
char page_header[SizeOfHeapPageHeaderData]; /* page header */
uint32 crc32;
uint32 size : 16; /* size of compressed data */
uint32 byte_convert : 1;
uint32 diff_convert : 1;
uint32 unused : 14;
uint32 algorithm : 4;
uint32 unused : 10;
char data[FLEXIBLE_ARRAY_MEMBER]; /* compressed page, except for the page header */
} HeapPageCompressData;
@ -120,13 +115,12 @@ constexpr uint4 INDEX_OF_QUARTER_BLCKSZ = 1;
constexpr uint4 INDEX_OF_EIGHTH_BRICK_BLCKSZ = 2;
constexpr uint4 INDEX_OF_SIXTEENTHS_BLCKSZ = 3;
#define MAX_PREALLOC_CHUNKS 7
#define PCA_SUFFIX "%s_pca"
#define PCD_SUFFIX "%s_pcd"
#define COMPRESS_STR "_compress"
#define COMPRESS_SUFFIX "%s" COMPRESS_STR
#define SIZE_OF_PAGE_COMPRESS_HEADER_DATA sizeof(PageCompressHeader)
#define SIZE_OF_PAGE_COMPRESS_ADDR_HEADER_DATA offsetof(PageCompressAddr, chunknos)
#define SIZE_OF_PAGE_COMPRESS_DATA_HEADER_DATA(heapData) \
((heapData) ? offsetof(HeapPageCompressData, data) : offsetof(PageCompressData, data))
#define SIZE_OF_PAGE_COMPRESS_ADDR(chunk_size) \
(SIZE_OF_PAGE_COMPRESS_ADDR_HEADER_DATA + sizeof(pc_chunk_number_t) * (BLCKSZ / (chunk_size)))
@ -167,13 +161,13 @@ constexpr unsigned CMP_LEVEL_INDEX = 4;
constexpr unsigned CMP_ALGORITHM_INDEX = 5;
constexpr unsigned CMP_CHUNK_SIZE_INDEX = 6;
struct CmpBitStruct {
struct CmpBitStuct {
unsigned int bitLen;
unsigned int mask;
unsigned int moveBit;
};
constexpr CmpBitStruct g_cmpBitStruct[] = {{CMP_BYTE_CONVERT_LEN, 0x01, 15},
constexpr CmpBitStuct g_cmpBitStruct[] = {{CMP_BYTE_CONVERT_LEN, 0x01, 15},
{CMP_DIFF_CONVERT_LEN, 0x01, 14},
{CMP_PRE_CHUNK_LEN, 0x07, 11},
{CMP_LEVEL_SYMBOL_LEN, 0x01, 10},
@ -237,49 +231,30 @@ inline void TransCompressOptions(const RelFileNode& node, RelFileCompressOption*
(((opt) >> g_cmpBitStruct[CMP_DIFF_CONVERT_INDEX].moveBit) & g_cmpBitStruct[CMP_DIFF_CONVERT_INDEX].mask)
#define GET_COMPRESS_PRE_CHUNKS(opt) \
(((opt) >> g_cmpBitStruct[CMP_PRE_CHUNK_INDEX].moveBit) & g_cmpBitStruct[CMP_PRE_CHUNK_INDEX].mask)
#define GET_COMPRESS_LEVEL_SYMBOL(opt) \
(((opt) >> g_cmpBitStruct[CMP_COMPRESS_LEVEL_SYMBOL].moveBit) & g_cmpBitStruct[CMP_COMPRESS_LEVEL_SYMBOL].mask)
#define GET_COMPRESS_LEVEL(opt) \
(((opt) >> g_cmpBitStruct[CMP_LEVEL_INDEX].moveBit) & g_cmpBitStruct[CMP_LEVEL_INDEX].mask)
#define GET_COMPRESS_ALGORITHM(opt) \
(((opt) >> g_cmpBitStruct[CMP_ALGORITHM_INDEX].moveBit) & g_cmpBitStruct[CMP_ALGORITHM_INDEX].mask)
#define GET_COMPRESS_CHUNK_SIZE(opt) \
(((opt) >> g_cmpBitStruct[CMP_CHUNK_SIZE_INDEX].moveBit) & g_cmpBitStruct[CMP_CHUNK_SIZE_INDEX].mask)
#define IS_COMPRESSED_MAINFORK(reln, forkNum) ((reln)->smgr_rnode.node.opt != 0 && (forkNum) == MAIN_FORKNUM)
#define IS_COMPRESS_DELETE_FORK(forkNum) ((forkNum) == COMPRESS_FORKNUM)
#define IS_COMPRESSED_RNODE(rnode, forkNum) ((rnode).opt != 0 && (forkNum) == MAIN_FORKNUM)
/* Compress function */
template <bool heapPageData>
template <uint8 pagetype>
extern int TemplateCompressPage(const char* src, char* dst, int dst_size, RelFileCompressOption option);
template <bool heapPageData>
extern int TemplateDecompressPage(const char* src, char* dst, uint8 algorithm);
template <uint8 pagetype>
extern int TemplateDecompressPage(const char* src, char* dst);
int CompressPageBufferBound(const char* page, uint8 algorithm);
int CompressPage(const char* src, char* dst, int dst_size, RelFileCompressOption option);
int DecompressPage(const char* src, char* dst, uint8 algorithm);
#define SET_OPT_BY_NEGATIVE_FORK(rnode, forkNumber) \
do { \
SET_COMPRESS_OPTION((rnode), 0, 0, 0, 0, 0, COMPRESS_ALGORITHM_ZSTD, 0); \
} while (0)
/* Memory mapping function */
extern PageCompressHeader* pc_mmap(int fd, int chunk_size, bool readonly);
extern PageCompressHeader* pc_mmap_real_size(int fd, int size, bool readonly);
extern int pc_munmap(PageCompressHeader * map);
extern int pc_msync(PageCompressHeader * map);
int DecompressPage(const char* src, char* dst);
/**
* format mainfork path name to compressed path
* @param dst destination buffer
* @param pathName uncompressed table name
* @param compressFileType pca or pcd
*/
extern void CopyCompressedPath(char dst[MAXPGPATH], const char* pathName, CompressedFileType compressFileType);
extern void CopyCompressedPath(char *dst, const char* pathName);
/**
* @param pathName mainFork File path name
@ -287,33 +262,20 @@ extern void CopyCompressedPath(char dst[MAXPGPATH], const char* pathName, Compre
* @param forkNumber for validation
* @return size of mainFork
*/
#define FILE_BLOCK_SIZE_512 (512)
extern int64 CalculateMainForkSize(char* pathName, RelFileNode* relFileNode, ForkNumber forkNumber);
extern int64 CalculateCompressMainForkSize(char* pathName, bool suppressedENOENT = false);
extern uint16 ReadChunkSize(FILE *pcaFile, char* pcaFilePath, size_t len);
/**
* read compressed chunks into dst, and decompressed page into pageBuffer
* @param dst destination
* @param destLen destination length
* @param blockNumber blockNumber
* @param ReadBlockChunksStruct other data needed
*/
size_t ReadAllChunkOfBlock(char *dst, size_t destLen, BlockNumber blockNumber, ReadBlockChunksStruct& rbStruct);
/**
* check if fileName is end with pca or pcd
* @param fileName fileName
* @return filetype
*/
CompressedFileType IsCompressedFile(char *fileName, size_t fileNameLen);
bool IsCompressedFile(const char *fileName, size_t fileNameLen);
int64 CalculateFileSize(char* pathName, size_t size, bool suppressedENOENT = false);
/**
* release mmap. print warning log if failed
* @param map mmap pointer
* @param fileName mmap filename, for loggging
*/
void ReleaseMap(PageCompressHeader* map, const char* fileName);
int64 CalculateFileSize(char* pathName, bool suppressedENOENT = false);
int64 CalculateFilePhyRealSize(char* pathName, bool suppressedENOENT = false);
/**
* convert chunk size to the index of CHUNK_SIZE_LIST
@ -323,19 +285,6 @@ void ReleaseMap(PageCompressHeader* map, const char* fileName);
*/
extern uint1 ConvertChunkSize(uint32 compressedChunkSize, bool* success);
/**
*
* @param blockNumber block number
* @param pageCompressAddr addr of block
* @return checksum uint32
*/
extern uint32 AddrChecksum32(BlockNumber blockNumber, const PageCompressAddr* pageCompressAddr, uint16 chunkSize);
#ifndef FRONTEND
extern void CheckAndRepairCompressAddress(PageCompressHeader *pcMap, uint16 chunk_size, uint8 algorithm, const char *path);
PageCompressHeader* GetPageCompressHeader(void* vfd, uint16 chunkSize, const RelFileNodeForkNum &relFileNodeForkNum);
void UnReferenceAddrFile(void* vfd);
void RealInitialMMapLockArray();
#endif
#endif /* PAGE_COMPRESSION_H */

File diff suppressed because it is too large Load Diff

View File

@ -27,6 +27,7 @@
#include "gtm/gtm_c.h"
#include "alarm/alarm.h"
#include "utils/atomic.h"
#include "utils/snapshot.h"
#include "access/multi_redo_settings.h"
@ -197,18 +198,31 @@ struct PGPROC {
char myProgName[64];
pg_time_t myStartTime;
syscalllock deleMemContextMutex;
int64* usedMemory;
/* Support for group XID clearing. */
/* true, if member of ProcArray group waiting for XID clear */
bool procArrayGroupMember;
/* next ProcArray group member waiting for XID clear */
pg_atomic_uint32 procArrayGroupNext;
/*
* latest transaction id among the transaction's main XID and
* subtransactions
*/
TransactionId procArrayGroupMemberXid;
/* Support for group snapshot getting. */
bool snapshotGroupMember;
/* next ProcArray group member waiting for snapshot getting */
pg_atomic_uint32 snapshotGroupNext;
Snapshot snapshotGroup;
TransactionId xminGroup;
TransactionId xmaxGroup;
TransactionId globalxminGroup;
volatile TransactionId replicationSlotXminGroup;
volatile TransactionId replicationSlotCatalogXminGroup;
/* commit sequence number send down */
CommitSeqNo commitCSN;
@ -360,6 +374,8 @@ typedef struct PROC_HDR {
PGPROC* bgworkerFreeProcs;
/* First pgproc waiting for group XID clear */
pg_atomic_uint32 procArrayGroupFirst;
/* First pgproc waiting for group snapshot getting */
pg_atomic_uint32 snapshotGroupFirst;
/* First pgproc waiting for group transaction status update */
pg_atomic_uint32 clogGroupFirst;
/* WALWriter process's latch */
@ -396,7 +412,7 @@ typedef struct PROC_HDR {
*
* PGXC needs another slot for the pool manager process
*/
const int MAX_PAGE_WRITER_THREAD_NUM = 16;
const int MAX_PAGE_WRITER_THREAD_NUM = 17;
#ifndef ENABLE_LITE_MODE
const int MAX_COMPACTION_THREAD_NUM = 100;
@ -453,6 +469,9 @@ extern void InitProcess(void);
extern void InitProcessPhase2(void);
extern void InitAuxiliaryProcess(void);
extern void ProcBaseLockAcquire(pthread_mutex_t *procBaseLock);
extern void ProcBaseLockRelease(pthread_mutex_t *procBaseLock);
extern int GetAuxProcEntryIndex(int baseIdx);
extern void PublishStartupProcessInformation(void);
@ -477,7 +496,9 @@ extern TimestampTz GetStatementFinTime();
extern bool enable_sig_alarm(int delayms, bool is_statement_timeout);
extern bool enable_lockwait_sig_alarm(int delayms);
extern bool enable_session_sig_alarm(int delayms);
extern bool enable_idle_in_transaction_session_sig_alarm(int delayms);
extern bool disable_session_sig_alarm(void);
extern bool disable_idle_in_transaction_session_sig_alarm(void);
extern bool disable_sig_alarm(bool is_statement_timeout);
extern bool pause_sig_alarm(bool is_statement_timeout);
@ -501,12 +522,12 @@ extern void BecomeLockGroupMember(PGPROC *leader);
static inline bool TransactionIdOlderThanAllUndo(TransactionId xid)
{
uint64 cutoff = pg_atomic_read_u64(&g_instance.undo_cxt.oldestXidInUndo);
uint64 cutoff = pg_atomic_read_u64(&g_instance.undo_cxt.globalRecycleXid);
return xid < cutoff;
}
static inline bool TransactionIdOlderThanFrozenXid(TransactionId xid)
{
uint64 cutoff = pg_atomic_read_u64(&g_instance.undo_cxt.oldestFrozenXid);
uint64 cutoff = pg_atomic_read_u64(&g_instance.undo_cxt.globalFrozenXid);
return xid < cutoff;
}

View File

@ -28,6 +28,23 @@ typedef struct RoleIdHashEntry {
int64 roleNum;
} RoleIdHashEntry;
/* Our shared memory area */
typedef struct ProcArrayStruct {
int numProcs; /* number of valid procs entries */
int maxProcs; /* allocated size of procs array */
/* oldest xmin of any replication slot */
TransactionId replication_slot_xmin;
/* oldest catalog xmin of any replication slot */
TransactionId replication_slot_catalog_xmin;
/*
* We declare pgprocnos[] as 1 entry because C wants a fixed-size array,
* but actually it is maxProcs entries long.
*/
int pgprocnos[1]; /* VARIABLE LENGTH ARRAY */
} ProcArrayStruct;
extern void InitRoleIdHashTable();
extern int GetRoleIdCount(Oid roleoid);
extern int IncreaseUserCount(Oid roleoid);
@ -94,7 +111,9 @@ extern bool IsBackendPid(ThreadId pid);
extern VirtualTransactionId* GetCurrentVirtualXIDs(
TransactionId limitXmin, bool excludeXmin0, bool allDbs, int excludeVacuum, int* nvxids);
extern VirtualTransactionId* GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid, XLogRecPtr lsn = 0);
extern VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid, XLogRecPtr lsn = 0,
CommitSeqNo limitXminCSN = InvalidCommitSeqNo,
TransactionId* xminArray = NULL);
extern ThreadId CancelVirtualTransaction(const VirtualTransactionId& vxid, ProcSignalReason sigmode);
extern bool MinimumActiveBackends(int min);

View File

@ -106,7 +106,7 @@ typedef struct {
typedef struct {
/* note: field layout chosen to pack into 16 bytes */
int8 id; /* type field --- must be first */
int8 id; /* type field --- must be first */
int16 bucketId; /* bucketId */
RelFileNodeOld rnode; /* spcNode, dbNode, relNode */
} SharedInvalHbktSmgrMsg;
@ -114,7 +114,7 @@ typedef struct {
#define SHAREDINVALFUNC_ID (-7)
typedef struct {
int8 id; /* type field --- must be first */
int8 id; /* type field --- must be first */
Oid dbId; /* database ID */
int cacheId;
Oid objId; /* function ID or package ID */

View File

@ -94,7 +94,8 @@ extern off_t FileSeek(File file, off_t offset, int whence);
extern int FileTruncate(File file, off_t offset, uint32 wait_event_info = 0);
extern void FileWriteback(File file, off_t offset, off_t nbytes);
extern char* FilePathName(File file);
extern void FileAllocate(File file, uint32 offset, uint32 size);
extern void FileAllocateDirectly(int fd, char* path, uint32 offset, uint32 size);
extern void FileAsyncCUClose(File* vfdList, int32 vfdnum);
extern int FileAsyncRead(AioDispatchDesc_t** dList, int32 dn);
extern int FileAsyncWrite(AioDispatchDesc_t** dList, int32 dn);
@ -104,6 +105,9 @@ extern void FileFastExtendFile(File file, uint32 offset, uint32 size, bool keep_
extern int FileRead(File file, char* buffer, int amount);
extern int FileWrite(File file, const char* buffer, int amount, off_t offset, int fastExtendSize = 0);
/* todo delete */
extern void* MmapExtentAddress(File fd, int pc_memory_map_size, off_t offset);
// Threading virtual files IO interface, using pread() / pwrite()
//
extern int FilePRead(File file, char* buffer, int amount, off_t offset, uint32 wait_event_info = 0);
@ -132,6 +136,7 @@ extern void UnlinkCacheFile(const char* pathname);
/* Operations to allow use of the <dirent.h> library routines */
extern DIR* AllocateDir(const char* dirname);
extern struct dirent* ReadDir(DIR* dir, const char* dirname);
extern struct dirent *ReadDirExtended(DIR *dir, const char *dirname, int elevel);
extern int FreeDir(DIR* dir);
/* Operations to allow use of a plain kernel FD, with automatic cleanup */
extern int OpenTransientFile(FileName fileName, int fileFlags, int fileMode);
@ -186,8 +191,6 @@ extern FileExistStatus CheckFileExists(const char* path);
extern bool repair_deleted_file_check(RelFileNodeForkNum fileNode, int fd);
/* Page compression support routines */
extern void SetupPageCompressMemoryMap(File file, RelFileNode node, const RelFileNodeForkNum& relFileNodeForkNum);
extern PageCompressHeader *GetPageCompressMemoryMap(File file, uint32 chunk_size);
/* Filename components for OpenTemporaryFile */
// Note that this macro must be the same to macro in initdb.cpp

View File

@ -31,9 +31,6 @@ typedef enum {
*/
typedef int ForkNumber;
/* used for delete forknum */
#define COMPRESS_FORKNUM -9
#define SEGMENT_EXT_8192_FORKNUM -8
#define SEGMENT_EXT_1024_FORKNUM -7
#define SEGMENT_EXT_128_FORKNUM -6
@ -81,6 +78,10 @@ typedef int ForkNumber;
* relation with heap disk storage type , 2) 0~BUCKETDATALEN-1 means bucketid
* of a bucket relation, 3) BUCKETDATALEN means a non-bucket segment storage
* relation. Both 2) and 3) representing segment storage type.
*
* opt identifies compressed relation type, reference struct <RelFileCompressOption>.
* 1) zero means normal relation(non-compression),
* 2) otherwise means compressed relation.
*
* Note: spcNode must be GLOBALTABLESPACE_OID if and only if dbNode is
* zero. We support shared relations only in the "global" tablespace.
@ -159,6 +160,7 @@ typedef struct RelFileNodeKey {
RelFileNode relfilenode; /*relfilenode*/
int columnid; /*column for CU store*/
} RelFileNodeKey;
typedef struct RelFileNodeKeyEntry {
RelFileNodeKey key;
int number; /*Times the relfilenode occurence*/
@ -257,6 +259,8 @@ typedef struct {
Oid ownerid;
} ColFileNodeRel;
#define SIZE_OF_COLFILENODE(compress) ((compress) ? sizeof(ColFileNode) : sizeof(ColFileNodeRel))
/*
* 1) ForkNumber type must be 32-bit;
* 2) forknum value occupies the lower 16-bit;
@ -301,9 +305,20 @@ static inline StorageType forknum_get_storage_type(const ForkNumber& forknum)
(colFileNode)->filenode.dbNode = (colFileNodeRel)->filenode.dbNode; \
(colFileNode)->filenode.relNode = (colFileNodeRel)->filenode.relNode; \
(colFileNode)->filenode.opt = 0; \
(colFileNode)->filenode.bucketNode = forknum_get_bucketid((colFileNodeRel)->forknum); \
(colFileNode)->filenode.bucketNode = (int2)forknum_get_bucketid((colFileNodeRel)->forknum); \
(colFileNode)->forknum = forknum_get_forknum((colFileNodeRel)->forknum); \
(colFileNode)->ownerid = (colFileNodeRel)->ownerid; \
} while (0)
#define ColFileNodeFullCopy(colFileNode, colFileNode2) \
do { \
(colFileNode)->filenode.spcNode = (colFileNode2)->filenode.spcNode; \
(colFileNode)->filenode.dbNode = (colFileNode2)->filenode.dbNode; \
(colFileNode)->filenode.relNode = (colFileNode2)->filenode.relNode; \
(colFileNode)->filenode.bucketNode = (int2)forknum_get_bucketid((colFileNode2)->forknum); \
(colFileNode)->forknum = forknum_get_forknum((colFileNode2)->forknum); \
(colFileNode)->filenode.opt = (colFileNode2)->filenode.opt; \
(colFileNode)->ownerid = (colFileNode2)->ownerid; \
} while (0)
#endif /* RELFILENODE_H */

View File

@ -188,5 +188,7 @@ extern Oid get_tablespace_oid_by_name(const char *tablespacename);
extern void redo_xlog_deal_alloc_seg(uint8 opCode, Buffer buffer, const char* data, int data_len,
TransactionId xid);
extern StorageType PartitionGetStorageType(Partition partition, Oid parentOid);
extern bool repair_check_physical_type(uint32 spcNode, uint32 dbNode, int32 forkNum, uint32 *relNode, uint32 *blockNum);
extern RelFileNode get_segment_logic_rnode(SegSpace *spc, BlockNumber head_blocknum, int aim_fork);
#endif

View File

@ -340,6 +340,7 @@ typedef struct IpBlockLocation {
void SetInversePointer(SegExtentGroup *eg, BlockNumber extent, ExtentInversePointer iptr);
ExtentInversePointer GetInversePointer(SegExtentGroup *eg, BlockNumber extent, Buffer *buf);
extern ExtentInversePointer RepairGetInversePointer(SegExtentGroup *seg, BlockNumber extent);
void GetAllInversePointer(SegExtentGroup *seg, uint32 *cnt, ExtentInversePointer **iptrs, BlockNumber **extents);
const char *GetExtentUsageName(ExtentInversePointer iptr);
@ -392,6 +393,7 @@ SegSpace *spc_open(Oid tablespace_id, Oid database_id, bool create, bool isRedo
SegSpace *spc_init_space_node(Oid spcNode, Oid dbNode);
SpaceDataFileStatus spc_status(SegSpace *spc);
SegSpace *spc_drop(Oid tablespace_id, Oid database_id, bool redo);
void spc_drop_space_node(Oid spcNode, Oid dbNode);
void spc_lock(SegSpace *spc);
void spc_unlock(SegSpace *spc);
@ -447,7 +449,8 @@ typedef struct SegPageLocation {
BlockNumber blocknum;
} SegPageLocation;
SegPageLocation seg_get_physical_location(RelFileNode rnode, ForkNumber forknum, BlockNumber blocknum);
SegPageLocation seg_get_physical_location(RelFileNode rnode, ForkNumber forknum, BlockNumber blocknum,
bool check_standby = true);
void seg_record_new_extent_on_level0_page(SegSpace *spc, Buffer seg_head_buffer, uint32 new_extent_id,
BlockNumber new_extent_first_pageno);
void seg_head_update_xlog(Buffer head_buffer, SegmentHead *seg_head, int level0_slot,

View File

@ -16,14 +16,14 @@
#include "fmgr.h"
#include "lib/ilist.h"
#include "storage/smgr/knl_usync.h"
#include "storage/buf/block.h"
#include "storage/smgr/knl_usync.h"
#include "storage/smgr/relfilenode.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
#include "vecexecutor/vectorbatch.h"
#include "nodes/bitmapset.h"
typedef int File;
/*
* smgr.c maintains a table of SMgrRelation objects, which are essentially
@ -49,6 +49,7 @@ typedef struct SMgrRelationData {
/* pointer to owning pointer, or NULL if none */
struct SMgrRelationData** smgr_owner;
uint64 xact_seqno;
/*
* These next three fields are not actually used or manipulated by smgr,
@ -102,6 +103,12 @@ typedef enum { /* behavior for open file */
typedef SMgrRelationData* SMgrRelation;
typedef struct _MdfdVec {
File mdfd_vfd; /* fd number in fd.c's pool */
BlockNumber mdfd_segno; /* segment number, from 0 */
struct _MdfdVec *mdfd_chain; /* next segment, or NULL */
} MdfdVec;
#define IsSegmentSmgrRelation(smgr) (IsSegmentFileNode((smgr)->smgr_rnode.node))
#define SmgrIsTemp(smgr) RelFileNodeBackendIsTemp((smgr)->smgr_rnode)
@ -143,7 +150,6 @@ extern void smgrsetowner(SMgrRelation* owner, SMgrRelation reln);
extern void smgrclearowner(SMgrRelation* owner, SMgrRelation reln);
extern void smgrclose(SMgrRelation reln, BlockNumber blockNum = InvalidBlockNumber);
extern void smgrcloseall(void);
extern void smgrcleanblocknumall(void);
extern void smgrclosenode(const RelFileNodeBackend& rnode);
extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
extern void smgrdounlink(SMgrRelation reln, bool isRedo, BlockNumber blockNum = InvalidBlockNumber);
@ -161,6 +167,11 @@ extern void smgrtruncatefunc(SMgrRelation reln, ForkNumber forknum, BlockNumber
extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks);
extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
extern void smgrmovebuckets(SMgrRelation reln1, SMgrRelation reln2, List *bList);
extern void SmgrRecoveryPca(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, bool isPcaCheckNeed, bool skipFsync);
extern void SmgrAssistFileProcess(const char *assistInfo, int assistFd);
extern void SmgrChunkFragmentsRestore(const RelFileNode& rnode, ForkNumber forknum, char parttype, bool nowait);
extern void SmgrChunkFragmentsRestoreRecord(const RelFileNode &rnode, ForkNumber forknum);
extern void CfsShrinkerShmemListPush(const RelFileNode &rnode, ForkNumber forknum, char parttype);
extern void AtEOXact_SMgr(void);
@ -186,6 +197,13 @@ extern void md_register_forget_request(RelFileNode rnode, ForkNumber forknum, Bl
/* md sync callbacks */
extern void mdForgetDatabaseFsyncRequests(Oid dbid);
/* chunk compression api */
extern void MdRecoveryPcaPage(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, bool skipFsync);
extern void MdAssistFileProcess(SMgrRelation relation, const char *assistInfo, int assistFd);
extern void CfsRecycleChunk(SMgrRelation reln, ForkNumber forknum);
extern void CfsShrinkRecord(const RelFileNode &node, ForkNumber forknum);
extern void CfsShrinkImpl(void);
/* md sync requests */
extern void ForgetDatabaseSyncRequests(Oid dbid);
extern void CheckPointSyncWithAbsorption(void);

View File

@ -26,7 +26,7 @@ extern void ShutdownRecoveryTransactionEnvironment(void);
extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
const RelFileNode& node, XLogRecPtr lsn = 0);
void ResolveRecoveryConflictWithSnapshotOid(TransactionId latestRemovedXid, Oid dbid);
void ResolveRecoveryConflictWithSnapshotOid(TransactionId latestRemovedXid, Oid dbid, XLogRecPtr lsn);
extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
@ -45,7 +45,7 @@ extern void StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid
extern void StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId* subxids);
extern void StandbyReleaseAllLocks(void);
extern void StandbyReleaseOldLocks(TransactionId oldestRunningXid);
extern bool HasStandbyLocks();
extern void InitRecoveryLockHash();
extern bool standbyWillTouchStandbyLocks(XLogReaderState* record);
@ -66,6 +66,16 @@ typedef struct xl_standby_locks {
xl_standby_lock locks[FLEXIBLE_ARRAY_MEMBER]; /* VARIABLE LENGTH ARRAY */
} xl_standby_locks;
/*
* Keep track of all the locks owned by a given transaction.
*/
typedef struct RecoveryLockListsEntry
{
TransactionId xid;
List *locks;
} RecoveryLockListsEntry;
#define MinSizeOfXactStandbyLocks offsetof(xl_standby_locks, locks)
/*

View File

@ -68,7 +68,7 @@ extern void TvUheapDeleteDelta(Oid relid, Snapshot snap);
extern void TvUheapInsertLost(Oid relid, Snapshot snap);
extern void TvRestoreVersion(TimeCapsuleStmt *stmt);
extern TransactionId TvFetchSnpxminRecycle(TimestampTz tz);
extern TransactionId TvFetchSnpxminRecycle();
/*
* Interfaces for Timecapsule `Recyclebin-based query, restore`

View File

@ -32,7 +32,7 @@
void SharedStorageXlogCopyBackendMain(void);
void WakeUpXLogCopyerBackend();
void CheckShareStorageCtlInfo(XLogRecPtr localEnd);
bool XLogOverwriteFromLocal(bool force = false);
bool XLogOverwriteFromLocal(bool force = false, XLogRecPtr setStart = InvalidXLogRecPtr);
bool XLogOverwriteFromShare();
Size CalShareStorageCtlSize();
ShareStorageXLogCtl *AlignAllocShareStorageCtl();