Add sharedfileset.
This commit is contained in:
@ -9,6 +9,6 @@ ifneq "$(MAKECMDGOALS)" "clean"
|
||||
endif
|
||||
endif
|
||||
endif
|
||||
OBJS = fd.o buffile.o copydir.o reinit.o lz4_file.o
|
||||
OBJS = fd.o buffile.o copydir.o reinit.o lz4_file.o sharedfileset.o
|
||||
|
||||
include $(top_srcdir)/src/gausskernel/common.mk
|
||||
|
||||
@ -32,12 +32,18 @@
|
||||
* BufFile also supports temporary files that exceed the OS file size limit
|
||||
* (by opening multiple fd.c temporary files). This is an essential feature
|
||||
* for sorts and hashjoins on large amounts of data.
|
||||
*
|
||||
* BufFile supports temporary files that can be made read-only and shared with
|
||||
* other backends, as infrastructure for parallel execution. Such files need
|
||||
* to be created as a member of a SharedFileSet that all participants are
|
||||
* attached to.
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
#include "knl/knl_variable.h"
|
||||
|
||||
#include "executor/instrument.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/buffile.h"
|
||||
@ -71,6 +77,10 @@ struct BufFile {
|
||||
bool isTemp; /* can only add files if this is TRUE */
|
||||
bool isInterXact; /* keep open over transactions? */
|
||||
bool dirty; /* does buffer need to be written? */
|
||||
bool readOnly; /* has the file been set to read only? */
|
||||
|
||||
SharedFileSet *fileset; /* space for segment files if shared */
|
||||
const char *name; /* name of this BufFile if shared */
|
||||
|
||||
/*
|
||||
* resowner is the ResourceOwner to use for underlying temp files. (We
|
||||
@ -92,19 +102,20 @@ struct BufFile {
|
||||
char pad; /* extra 1 byte, just a workaround for the memory issue of pread */
|
||||
};
|
||||
|
||||
static BufFile *makeBufFileCommon(int nfiles);
|
||||
static BufFile* makeBufFile(File firstfile);
|
||||
static void extendBufFile(BufFile* file);
|
||||
static void BufFileLoadBuffer(BufFile* file);
|
||||
static void BufFileDumpBuffer(BufFile* file);
|
||||
static int BufFileFlush(BufFile* file);
|
||||
static File MakeNewSharedSegment(const BufFile *file, int segment);
|
||||
|
||||
/*
|
||||
* Create a BufFile given the first underlying physical file.
|
||||
* NOTE: caller must set isTemp and isInterXact if appropriate.
|
||||
* Create BufFile and perform the common initialization.
|
||||
*/
|
||||
static BufFile* makeBufFile(File firstfile)
|
||||
static BufFile *makeBufFileCommon(int nfiles)
|
||||
{
|
||||
BufFile* file = NULL;
|
||||
BufFile *file = NULL;
|
||||
/*
|
||||
* In ADIO scene, the pointer file->buffer must BLCKSZ byte align, so we need to palloc another BLOCK.
|
||||
* AlignMemoryContext will be reset when the transaction aborts, we should alloc the buffile in
|
||||
@ -112,21 +123,19 @@ static BufFile* makeBufFile(File firstfile)
|
||||
*/
|
||||
ADIO_RUN()
|
||||
{
|
||||
file = (BufFile*)palloc0(sizeof(BufFile) + BLCKSZ + BLCKSZ);
|
||||
file->buffer = ((char*)file) + sizeof(BufFile);
|
||||
file->buffer = (char*)TYPEALIGN(BLCKSZ, file->buffer);
|
||||
file = (BufFile *)palloc0(sizeof(BufFile) + BLCKSZ + BLCKSZ);
|
||||
file->buffer = ((char *)file) + sizeof(BufFile);
|
||||
file->buffer = (char *)TYPEALIGN(BLCKSZ, file->buffer);
|
||||
}
|
||||
ADIO_ELSE()
|
||||
{
|
||||
file = (BufFile*)palloc0(sizeof(BufFile) + BLCKSZ);
|
||||
file->buffer = ((char*)file) + sizeof(BufFile);
|
||||
file = (BufFile *)palloc0(sizeof(BufFile) + BLCKSZ);
|
||||
file->buffer = ((char *)file) + sizeof(BufFile);
|
||||
}
|
||||
ADIO_END();
|
||||
|
||||
file->numFiles = 1;
|
||||
file->files = (File*)palloc(sizeof(File));
|
||||
file->files[0] = firstfile;
|
||||
file->offsets = (off_t*)palloc(sizeof(off_t));
|
||||
file->numFiles = nfiles;
|
||||
file->offsets = (off_t *)palloc(sizeof(off_t));
|
||||
file->offsets[0] = 0L;
|
||||
file->isTemp = false;
|
||||
file->isInterXact = false;
|
||||
@ -141,6 +150,23 @@ static BufFile* makeBufFile(File firstfile)
|
||||
return file;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a BufFile given the first underlying physical file.
|
||||
* NOTE: caller must set isTemp and isInterXact if appropriate.
|
||||
*/
|
||||
static BufFile* makeBufFile(File firstfile)
|
||||
{
|
||||
BufFile* file = makeBufFileCommon(1);
|
||||
|
||||
file->files = (File*)palloc(sizeof(File));
|
||||
file->files[0] = firstfile;
|
||||
file->readOnly = false;
|
||||
file->fileset = NULL;
|
||||
file->name = NULL;
|
||||
|
||||
return file;
|
||||
}
|
||||
|
||||
/*
|
||||
* Add another component temp file.
|
||||
*/
|
||||
@ -154,7 +180,11 @@ static void extendBufFile(BufFile* file)
|
||||
t_thrd.utils_cxt.CurrentResourceOwner = file->resowner;
|
||||
|
||||
Assert(file->isTemp);
|
||||
pfile = OpenTemporaryFile(file->isInterXact);
|
||||
if (file->fileset == NULL) {
|
||||
pfile = OpenTemporaryFile(file->isInterXact);
|
||||
} else {
|
||||
pfile = MakeNewSharedSegment(file, file->numFiles);
|
||||
}
|
||||
|
||||
t_thrd.utils_cxt.CurrentResourceOwner = oldowner;
|
||||
|
||||
@ -200,6 +230,171 @@ BufFile* BufFileCreateTemp(bool inter_xact)
|
||||
return main_buf_file;
|
||||
}
|
||||
|
||||
/*
|
||||
* Build the name for a given segment of a given BufFile.
|
||||
*/
|
||||
static void SharedSegmentName(char *name, Size name_len, const char *buffile_name, int segment)
|
||||
{
|
||||
int rc = sprintf_s(name, name_len, "%s.%d", buffile_name, segment);
|
||||
securec_check_ss(rc, "", "");
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a new segment file backing a shared BufFile.
|
||||
*/
|
||||
static File MakeNewSharedSegment(const BufFile *buffile, int segment)
|
||||
{
|
||||
char name[MAXPGPATH];
|
||||
|
||||
/*
|
||||
* It is possible that there are files left over from before a crash
|
||||
* restart with the same name. In order for BufFileOpenShared() not to
|
||||
* get confused about how many segments there are, we'll unlink the next
|
||||
* segment number if it already exists.
|
||||
*/
|
||||
SharedSegmentName(name, MAXPGPATH, buffile->name, segment + 1);
|
||||
(void)SharedFileSetDelete(buffile->fileset, name, true);
|
||||
|
||||
/* Create the new segment. */
|
||||
SharedSegmentName(name, MAXPGPATH, buffile->name, segment);
|
||||
File file = SharedFileSetCreate(buffile->fileset, name);
|
||||
|
||||
/* SharedFileSetCreate would've errored out */
|
||||
Assert(file > 0);
|
||||
|
||||
return file;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a BufFile that can be discovered and opened read-only by other
|
||||
* backends that are attached to the same SharedFileSet using the same name.
|
||||
*
|
||||
* The naming scheme for shared BufFiles is left up to the calling code. The
|
||||
* name will appear as part of one or more filenames on disk, and might
|
||||
* provide clues to administrators about which subsystem is generating
|
||||
* temporary file data. Since each SharedFileSet object is backed by one or
|
||||
* more uniquely named temporary directory, names don't conflict with
|
||||
* unrelated SharedFileSet objects.
|
||||
*/
|
||||
BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name)
|
||||
{
|
||||
BufFile *file = makeBufFileCommon(1);
|
||||
file->fileset = fileset;
|
||||
file->name = pstrdup(name);
|
||||
file->files = (File *)palloc(sizeof(File));
|
||||
file->files[0] = MakeNewSharedSegment(file, 0);
|
||||
file->readOnly = false;
|
||||
|
||||
return file;
|
||||
}
|
||||
|
||||
/*
|
||||
* Open a file that was previously created in another backend (or this one)
|
||||
* with BufFileCreateShared in the same SharedFileSet using the same name.
|
||||
* The backend that created the file must have called BufFileClose() or
|
||||
* BufFileExportShared() to make sure that it is ready to be opened by other
|
||||
* backends and render it read-only.
|
||||
*/
|
||||
BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name)
|
||||
{
|
||||
char segment_name[MAXPGPATH];
|
||||
int64 capacity = 16;
|
||||
int nfiles = 0;
|
||||
|
||||
File *files = (File*)palloc(sizeof(File) * capacity);
|
||||
|
||||
/*
|
||||
* We don't know how many segments there are, so we'll probe the
|
||||
* filesystem to find out.
|
||||
*/
|
||||
for (;;) {
|
||||
/* See if we need to expand our file segment array. */
|
||||
if ((uint32)nfiles + 1 > capacity) {
|
||||
capacity *= 2;
|
||||
files = (File*)repalloc(files, sizeof(File) * capacity);
|
||||
}
|
||||
/* Try to load a segment. */
|
||||
SharedSegmentName(segment_name, MAXPGPATH, name, nfiles);
|
||||
files[nfiles] = SharedFileSetOpen(fileset, segment_name);
|
||||
if (files[nfiles] <= 0) {
|
||||
break;
|
||||
}
|
||||
++nfiles;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
|
||||
/*
|
||||
* If we didn't find any files at all, then no BufFile exists with this
|
||||
* name.
|
||||
*/
|
||||
if (nfiles == 0) {
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m", segment_name, name)));
|
||||
}
|
||||
|
||||
BufFile *file = makeBufFileCommon(nfiles);
|
||||
file->files = files;
|
||||
file->readOnly = true; /* Can't write to files opened this way */
|
||||
file->fileset = fileset;
|
||||
file->name = pstrdup(name);
|
||||
|
||||
return file;
|
||||
}
|
||||
|
||||
/*
|
||||
* Delete a BufFile that was created by BufFileCreateShared in the given
|
||||
* SharedFileSet using the given name.
|
||||
*
|
||||
* It is not necessary to delete files explicitly with this function. It is
|
||||
* provided only as a way to delete files proactively, rather than waiting for
|
||||
* the SharedFileSet to be cleaned up.
|
||||
*
|
||||
* Only one backend should attempt to delete a given name, and should know
|
||||
* that it exists and has been exported or closed.
|
||||
*/
|
||||
void BufFileDeleteShared(const SharedFileSet *fileset, const char *name)
|
||||
{
|
||||
char segment_name[MAXPGPATH];
|
||||
int segment = 0;
|
||||
bool found = false;
|
||||
|
||||
/*
|
||||
* We don't know how many segments the file has. We'll keep deleting
|
||||
* until we run out. If we don't manage to find even an initial segment,
|
||||
* raise an error.
|
||||
*/
|
||||
for (;;) {
|
||||
SharedSegmentName(segment_name, MAXPGPATH, name, segment);
|
||||
if (!SharedFileSetDelete(fileset, segment_name, true)) {
|
||||
break;
|
||||
}
|
||||
found = true;
|
||||
++segment;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
|
||||
if (!found) {
|
||||
ereport(ERROR, (errmsg("could not delete unknown shared BufFile \"%s\"", name)));
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* BufFileExportShared --- flush and make read-only, in preparation for sharing.
|
||||
*/
|
||||
void BufFileExportShared(BufFile *file)
|
||||
{
|
||||
/* Must be a file belonging to a SharedFileSet. */
|
||||
Assert(file->fileset != NULL);
|
||||
|
||||
/* It's probably a bug if someone calls this twice. */
|
||||
Assert(!file->readOnly);
|
||||
|
||||
(void)BufFileFlush(file);
|
||||
file->readOnly = true;
|
||||
}
|
||||
|
||||
#ifdef NOT_USED
|
||||
/*
|
||||
* Create a BufFile and attach it to an already-opened virtual File.
|
||||
@ -431,6 +626,7 @@ size_t BufFileRead(BufFile* file, void* ptr, size_t size)
|
||||
*/
|
||||
size_t BufFileWrite(BufFile* file, void* ptr, size_t size)
|
||||
{
|
||||
Assert(!file->readOnly);
|
||||
size_t nwritten = 0;
|
||||
size_t nthistime;
|
||||
errno_t rc = EOK;
|
||||
|
||||
@ -39,6 +39,14 @@
|
||||
* for a long time, like relation files. It is the caller's responsibility
|
||||
* to close them, there is no automatic mechanism in fd.c for that.
|
||||
*
|
||||
* PathName(Create|Open|Delete)Temporary(File|Dir) are used to manage
|
||||
* temporary files that have names so that they can be shared between
|
||||
* backends. Such files are automatically closed and count against the
|
||||
* temporary file limit of the backend that creates them, but unlike anonymous
|
||||
* files they are not automatically deleted. See sharedfileset.c for a shared
|
||||
* ownership mechanism that provides automatic cleanup for shared files when
|
||||
* the last of a group of backends detaches.
|
||||
*
|
||||
* AllocateFile, AllocateDir and OpenTransientFile are wrappers around
|
||||
* fopen(3), opendir(3), and open(2), respectively. They behave like the
|
||||
* corresponding native functions, except that the handle is registered with
|
||||
@ -145,10 +153,11 @@
|
||||
#define FileUnknownPos ((off_t)-1)
|
||||
|
||||
/* these are the assigned bits in fdstate below: */
|
||||
#define FD_TEMPORARY (1 << 0) /* T = delete when closed */
|
||||
#define FD_XACT_TEMPORARY (1 << 1) /* T = delete at eoXact */
|
||||
#define FD_DELETE_AT_CLOSE (1 << 0) /* T = delete when closed */
|
||||
#define FD_CLOSE_AT_EOXACT (1 << 1) /* T = close at eoXact */
|
||||
#define FD_ERRTBL_LOG (1 << 2) /* T = caching log file for error table */
|
||||
#define FD_ERRTBL_LOG_OWNER (1 << 3) /* T = owner of caching log file for error table */
|
||||
#define FD_TEMP_FILE_LIMIT (1 << 4) /* T = respect temp_file_limit */
|
||||
|
||||
#define RETRY_LIMIT 3 /* alloc socket retry limit */
|
||||
|
||||
@ -219,7 +228,7 @@ static void FreeVfd(File file);
|
||||
static int FileAccess(File file);
|
||||
static File OpenTemporaryFileInTablespace(Oid tblspcOid, bool rejectError);
|
||||
static void CleanupTempFiles(bool isProcExit);
|
||||
static void RemovePgTempFilesInDir(const char* tmpdirname);
|
||||
static void RemovePgTempFilesInDir(const char *tmpdirname, bool unlink_all);
|
||||
static void RemovePgTempRelationFiles(const char* tsdirname);
|
||||
static void RemovePgTempRelationFilesInDbspace(const char* dbspacedirname);
|
||||
static bool looks_like_temp_rel_name(const char* name);
|
||||
@ -227,6 +236,10 @@ static void DataFileIdCloseFile(Vfd* vfdP);
|
||||
static File PathNameOpenFile_internal(FileName fileName, int fileFlags, int fileMode, bool useFileCache,
|
||||
const RelFileNodeForkNum& fileNode, File file = FILE_INVALID);
|
||||
static void ReleaseLruFiles(void);
|
||||
static void walkdir(const char *path, void (*action)(const char *fname, bool isdir, int elevel), bool process_symlinks,
|
||||
int elevel);
|
||||
static struct dirent *ReadDirExtended(DIR *dir, const char *dirname, int elevel);
|
||||
static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel);
|
||||
void AtProcExit_Files(int code, Datum arg);
|
||||
|
||||
void ReportAlarmInsuffDataInstFileDesc()
|
||||
@ -1063,6 +1076,35 @@ static int FileAccess(File file)
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Called whenever a temporary file is deleted to report its size.
|
||||
*/
|
||||
static void ReportTemporaryFileUsage(const char *path, off_t size)
|
||||
{
|
||||
pgstat_report_tempfile(size);
|
||||
|
||||
if (u_sess->attr.attr_common.log_temp_files >= 0) {
|
||||
if ((size / 1024) >= u_sess->attr.attr_common.log_temp_files) {
|
||||
ereport(LOG, (errmsg("temporary file: path \"%s\", size %lu", path, (unsigned long)size)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Called to register a temporary file for automatic close.
|
||||
* ResourceOwnerEnlargeFiles(CurrentResourceOwner) must have been called
|
||||
* before the file was opened.
|
||||
*/
|
||||
static void RegisterTemporaryFile(File file)
|
||||
{
|
||||
u_sess->storage_cxt.VfdCache[file].fdstate |= FD_CLOSE_AT_EOXACT;
|
||||
|
||||
ResourceOwnerRememberFile(t_thrd.utils_cxt.CurrentResourceOwner, file);
|
||||
u_sess->storage_cxt.VfdCache[file].resowner = t_thrd.utils_cxt.CurrentResourceOwner;
|
||||
|
||||
/* ensure cleanup happens at eoxact */
|
||||
u_sess->storage_cxt.have_xact_temporary_files = true;
|
||||
}
|
||||
|
||||
#ifdef NOT_USED
|
||||
/* Called when we get a shared invalidation message on some relation. */
|
||||
@ -1098,7 +1140,7 @@ File DataFileIdOpenFile(FileName fileName, const RelFileNodeForkNum& fileNode, i
|
||||
* otherwise, use ERROR log level.
|
||||
*/
|
||||
#define LogLevelOfCloseFileFailed(_vfdP_) \
|
||||
((t_thrd.xact_cxt.bInAbortTransaction && (((_vfdP_)->fdstate & (FD_TEMPORARY | FD_XACT_TEMPORARY)) != 0)) \
|
||||
((t_thrd.xact_cxt.bInAbortTransaction && (((_vfdP_)->fdstate & (FD_DELETE_AT_CLOSE | FD_CLOSE_AT_EOXACT)) != 0)) \
|
||||
? WARNING \
|
||||
: data_sync_elevel(ERROR))
|
||||
|
||||
@ -1282,6 +1324,64 @@ static File PathNameOpenFile_internal(
|
||||
return file;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create directory 'directory'. If necessary, create 'basedir', which must
|
||||
* be the directory above it. This is designed for creating the top-level
|
||||
* temporary directory on demand before creating a directory underneath it.
|
||||
* Do nothing if the directory already exists.
|
||||
*
|
||||
* Directories created within the top-level temporary directory should begin
|
||||
* with PG_TEMP_FILE_PREFIX, so that they can be identified as temporary and
|
||||
* deleted at startup by RemovePgTempFiles(). Further subdirectories below
|
||||
* that do not need any particular prefix.
|
||||
*/
|
||||
void PathNameCreateTemporaryDir(const char *basedir, const char *directory)
|
||||
{
|
||||
if (mkdir(directory, S_IRWXU) < 0) {
|
||||
if (errno == EEXIST) {
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Failed. Try to create basedir first in case it's missing. Tolerate
|
||||
* EEXIST to close a race against another process following the same
|
||||
* algorithm.
|
||||
*/
|
||||
if (mkdir(directory, S_IRWXU) < 0 && errno != EEXIST) {
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(), errmsg("cannot create temporary directory \"%s\": %m", basedir)));
|
||||
}
|
||||
|
||||
/* Try again. */
|
||||
if (mkdir(directory, S_IRWXU) < 0 && errno != EEXIST) {
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(), errmsg("cannot create temporary subdirectory \"%s\": %m", directory)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Delete a directory and everything in it, if it exists.
|
||||
*/
|
||||
void PathNameDeleteTemporaryDir(const char *dirname)
|
||||
{
|
||||
struct stat statbuf;
|
||||
|
||||
/* Silently ignore missing directory. */
|
||||
if (stat(dirname, &statbuf) != 0 && errno == ENOENT) {
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Currently, walkdir doesn't offer a way for our passed in function to
|
||||
* maintain state. Perhaps it should, so that we could tell the caller
|
||||
* whether this operation succeeded or failed. Since this operation is
|
||||
* used in a cleanup path, we wouldn't actually behave differently: we'll
|
||||
* just log failures.
|
||||
*/
|
||||
walkdir(dirname, unlink_if_exists_fname, false, LOG);
|
||||
}
|
||||
|
||||
/*
|
||||
* Open a temporary file that will disappear when we close it.
|
||||
*
|
||||
@ -1331,18 +1431,12 @@ File OpenTemporaryFile(bool interXact)
|
||||
u_sess->proc_cxt.MyDatabaseTableSpace ? u_sess->proc_cxt.MyDatabaseTableSpace : DEFAULTTABLESPACE_OID,
|
||||
true);
|
||||
|
||||
/* Mark it for deletion at close */
|
||||
u_sess->storage_cxt.VfdCache[file].fdstate |= FD_TEMPORARY;
|
||||
/* Mark it for deletion at close and temporary file size limit */
|
||||
u_sess->storage_cxt.VfdCache[file].fdstate |= FD_DELETE_AT_CLOSE | FD_TEMP_FILE_LIMIT;
|
||||
|
||||
/* Register it with the current resource owner */
|
||||
if (!interXact) {
|
||||
u_sess->storage_cxt.VfdCache[file].fdstate |= FD_XACT_TEMPORARY;
|
||||
|
||||
ResourceOwnerRememberFile(t_thrd.utils_cxt.CurrentResourceOwner, file);
|
||||
u_sess->storage_cxt.VfdCache[file].resowner = t_thrd.utils_cxt.CurrentResourceOwner;
|
||||
|
||||
/* ensure cleanup happens at eoxact */
|
||||
u_sess->storage_cxt.have_xact_temporary_files = true;
|
||||
RegisterTemporaryFile(file);
|
||||
}
|
||||
|
||||
return file;
|
||||
@ -1410,6 +1504,35 @@ void UnlinkCacheFile(const char* pathname)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Return the path of the temp directory in a given tablespace.
|
||||
*/
|
||||
void TempTablespacePath(char *tempdirpath, Size pathSize, Oid tblspcOid)
|
||||
{
|
||||
/*
|
||||
* Identify the tempfile directory for this tablespace.
|
||||
*
|
||||
* If someone tries to specify pg_global, use pg_default instead.
|
||||
*/
|
||||
int rc;
|
||||
if (tblspcOid == InvalidOid || tblspcOid == DEFAULTTABLESPACE_OID || tblspcOid == GLOBALTABLESPACE_OID) {
|
||||
/* The default tablespace is {datadir}/base */
|
||||
rc = snprintf_s(tempdirpath, pathSize, pathSize - 1, "base/%s", PG_TEMP_FILES_DIR);
|
||||
securec_check_ss(rc, "", "");
|
||||
} else {
|
||||
/* All other tablespaces are accessed via symlinks */
|
||||
#ifdef PGXC
|
||||
/* Postgres-XC tablespaces include node name in path */
|
||||
rc = snprintf_s(tempdirpath, pathSize, pathSize - 1, "pg_tblspc/%u/%s_%s/%s", tblspcOid,
|
||||
TABLESPACE_VERSION_DIRECTORY, g_instance.attr.attr_common.PGXCNodeName, PG_TEMP_FILES_DIR);
|
||||
#else
|
||||
rc = snprintf_s(tempdirpath, pathSize, pathSize - 1, "pg_tblspc/%u/%s/%s", tblspcOid,
|
||||
TABLESPACE_VERSION_DIRECTORY, PG_TEMP_FILES_DIR);
|
||||
#endif
|
||||
securec_check_ss(rc, "", "");
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Open a temporary file in a specific tablespace.
|
||||
* Subroutine for OpenTemporaryFile, which see for details.
|
||||
@ -1418,47 +1541,14 @@ static File OpenTemporaryFileInTablespace(Oid tblspcOid, bool rejectError)
|
||||
{
|
||||
char tempdirpath[MAXPGPATH];
|
||||
char tempfilepath[MAXPGPATH];
|
||||
File file;
|
||||
int rc = EOK;
|
||||
|
||||
/*
|
||||
* Identify the tempfile directory for this tablespace.
|
||||
*
|
||||
* If someone tries to specify pg_global, use pg_default instead.
|
||||
*/
|
||||
if (tblspcOid == DEFAULTTABLESPACE_OID || tblspcOid == GLOBALTABLESPACE_OID) {
|
||||
/* The default tablespace is {datadir}/base */
|
||||
rc = snprintf_s(tempdirpath, sizeof(tempdirpath), sizeof(tempdirpath) - 1, "base/%s", PG_TEMP_FILES_DIR);
|
||||
securec_check_ss(rc, "", "");
|
||||
} else {
|
||||
/* All other tablespaces are accessed via symlinks */
|
||||
#ifdef PGXC
|
||||
/* Postgres-XC tablespaces include node name in path */
|
||||
rc = snprintf_s(tempdirpath,
|
||||
sizeof(tempdirpath),
|
||||
sizeof(tempdirpath) - 1,
|
||||
"pg_tblspc/%u/%s_%s/%s",
|
||||
tblspcOid,
|
||||
TABLESPACE_VERSION_DIRECTORY,
|
||||
g_instance.attr.attr_common.PGXCNodeName,
|
||||
PG_TEMP_FILES_DIR);
|
||||
#else
|
||||
rc = snprintf_s(tempdirpath,
|
||||
sizeof(tempdirpath),
|
||||
sizeof(tempdirpath) - 1,
|
||||
"pg_tblspc/%u/%s/%s",
|
||||
tblspcOid,
|
||||
TABLESPACE_VERSION_DIRECTORY,
|
||||
PG_TEMP_FILES_DIR);
|
||||
#endif
|
||||
securec_check_ss(rc, "", "");
|
||||
}
|
||||
TempTablespacePath(tempdirpath, MAXPGPATH, tblspcOid);
|
||||
|
||||
/*
|
||||
* Generate a tempfile name that should be unique within the current
|
||||
* database instance.
|
||||
*/
|
||||
rc = snprintf_s(tempfilepath,
|
||||
int rc = snprintf_s(tempfilepath,
|
||||
sizeof(tempfilepath),
|
||||
sizeof(tempfilepath) - 1,
|
||||
"%s/%s%lu.%ld",
|
||||
@ -1472,7 +1562,7 @@ static File OpenTemporaryFileInTablespace(Oid tblspcOid, bool rejectError)
|
||||
* Open the file. Note: we don't use O_EXCL, in case there is an orphaned
|
||||
* temp file that can be reused.
|
||||
*/
|
||||
file = PathNameOpenFile(tempfilepath, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY, 0600);
|
||||
File file = PathNameOpenFile(tempfilepath, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY, 0600);
|
||||
if (file <= 0) {
|
||||
/*
|
||||
* We might need to create the tablespace's tempfile directory, if no
|
||||
@ -1510,13 +1600,119 @@ void FileCloseWithThief(File file)
|
||||
vfdP->fd = VFD_CLOSED;
|
||||
}
|
||||
|
||||
Assert(!(vfdP->fdstate & FD_TEMPORARY));
|
||||
Assert(!(vfdP->fdstate & FD_DELETE_AT_CLOSE));
|
||||
if (vfdP->resowner) {
|
||||
ResourceOwnerForgetFile(vfdP->resowner, file);
|
||||
}
|
||||
FreeVfd(file);
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a new file. The directory containing it must already exist. Files
|
||||
* created this way are subject to temp_file_limit and are automatically
|
||||
* closed at end of transaction, but are not automatically deleted on close
|
||||
* because they are intended to be shared between cooperating backends.
|
||||
*
|
||||
* If the file is inside the top-level temporary directory, its name should
|
||||
* begin with PG_TEMP_FILE_PREFIX so that it can be identified as temporary
|
||||
* and deleted at startup by RemovePgTempFiles(). Alternatively, it can be
|
||||
* inside a directory created with PathNameCreateTemporaryDir(), in which case
|
||||
* the prefix isn't needed.
|
||||
*/
|
||||
File PathNameCreateTemporaryFile(char *path, bool error_on_failure)
|
||||
{
|
||||
ResourceOwnerEnlargeFiles(t_thrd.utils_cxt.CurrentResourceOwner);
|
||||
|
||||
/*
|
||||
* Open the file. Note: we don't use O_EXCL, in case there is an orphaned
|
||||
* temp file that can be reused.
|
||||
*/
|
||||
File file = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY, S_IRUSR | S_IWUSR);
|
||||
if (file <= 0) {
|
||||
if (error_on_failure) {
|
||||
ereport(ERROR, (errcode_for_file_access(), errmsg("could not create temporary file \"%s\": %m", path)));
|
||||
} else {
|
||||
return file;
|
||||
}
|
||||
}
|
||||
|
||||
/* Mark it for temp_file_limit accounting. */
|
||||
u_sess->storage_cxt.VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;
|
||||
|
||||
/* Register it for automatic close. */
|
||||
RegisterTemporaryFile(file);
|
||||
|
||||
return file;
|
||||
}
|
||||
|
||||
/*
|
||||
* Open a file that was created with PathNameCreateTemporaryFile, possibly in
|
||||
* another backend. Files opened this way don't count against the
|
||||
* temp_file_limit of the caller, are read-only and are automatically closed
|
||||
* at the end of the transaction but are not deleted on close.
|
||||
*/
|
||||
File PathNameOpenTemporaryFile(char *path)
|
||||
{
|
||||
ResourceOwnerEnlargeFiles(t_thrd.utils_cxt.CurrentResourceOwner);
|
||||
|
||||
/* We open the file read-only. */
|
||||
File file = PathNameOpenFile(path, O_RDONLY | PG_BINARY, S_IRUSR | S_IWUSR);
|
||||
/* If no such file, then we don't raise an error. */
|
||||
if (file <= 0 && errno != ENOENT) {
|
||||
ereport(ERROR, (errcode_for_file_access(), errmsg("could not open temporary file \"%s\": %m", path)));
|
||||
}
|
||||
|
||||
if (file > 0) {
|
||||
/* Register it for automatic close. */
|
||||
RegisterTemporaryFile(file);
|
||||
}
|
||||
|
||||
return file;
|
||||
}
|
||||
|
||||
/*
|
||||
* Delete a file by pathname. Return true if the file existed, false if
|
||||
* didn't.
|
||||
*/
|
||||
bool PathNameDeleteTemporaryFile(const char *path, bool error_on_failure)
|
||||
{
|
||||
struct stat filestats;
|
||||
int stat_errno;
|
||||
|
||||
/* Get the final size for pgstat reporting. */
|
||||
if (stat(path, &filestats) != 0) {
|
||||
stat_errno = errno;
|
||||
} else {
|
||||
stat_errno = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Unlike FileClose's automatic file deletion code, we tolerate
|
||||
* non-existence to support BufFileDeleteShared which doesn't know how
|
||||
* many segments it has to delete until it runs out.
|
||||
*/
|
||||
if (stat_errno == ENOENT) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (unlink(path) < 0) {
|
||||
if (errno != ENOENT) {
|
||||
ereport(error_on_failure ? ERROR : LOG,
|
||||
(errcode_for_file_access(), errmsg("could not unlink temporary file \"%s\": %m", path)));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
if (stat_errno == 0) {
|
||||
ReportTemporaryFileUsage(path, filestats.st_size);
|
||||
} else {
|
||||
errno = stat_errno;
|
||||
ereport(LOG, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", path)));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* close a file when done with it
|
||||
*/
|
||||
@ -1549,10 +1745,17 @@ void FileClose(File file)
|
||||
RESUME_INTERRUPTS();
|
||||
}
|
||||
|
||||
if (vfdP->fdstate & FD_TEMP_FILE_LIMIT) {
|
||||
/* Subtract its size from current usage (do first in case of error) */
|
||||
u_sess->storage_cxt.temporary_files_size -= vfdP->fileSize;
|
||||
perm_space_decrease(GetUserId(), (uint64)vfdP->fileSize, SP_SPILL);
|
||||
vfdP->fileSize = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Delete the file if it was temporary, and make a log entry if wanted
|
||||
*/
|
||||
if (vfdP->fdstate & FD_TEMPORARY) {
|
||||
if (vfdP->fdstate & FD_DELETE_AT_CLOSE) {
|
||||
struct stat filestats;
|
||||
int stat_errno;
|
||||
|
||||
@ -1563,12 +1766,7 @@ void FileClose(File file)
|
||||
* is arranged to ensure that the worst-case consequence is failing to
|
||||
* emit log message(s), not failing to attempt the unlink.
|
||||
*/
|
||||
vfdP->fdstate &= ~FD_TEMPORARY;
|
||||
|
||||
/* Subtract its size from current usage (do first in case of error) */
|
||||
u_sess->storage_cxt.temporary_files_size -= vfdP->fileSize;
|
||||
perm_space_decrease(GetUserId(), (uint64)vfdP->fileSize, SP_SPILL);
|
||||
vfdP->fileSize = 0;
|
||||
vfdP->fdstate &= ~FD_DELETE_AT_CLOSE;
|
||||
|
||||
/* first try the stat() */
|
||||
if (stat(vfdP->fileName, &filestats))
|
||||
@ -1582,15 +1780,7 @@ void FileClose(File file)
|
||||
|
||||
/* and last report the stat results */
|
||||
if (stat_errno == 0) {
|
||||
pgstat_report_tempfile((size_t)filestats.st_size);
|
||||
|
||||
if (u_sess->attr.attr_common.log_temp_files >= 0) {
|
||||
if ((filestats.st_size / 1024) >= u_sess->attr.attr_common.log_temp_files)
|
||||
ereport(LOG,
|
||||
(errmsg("temporary file: path \"%s\", size %lu",
|
||||
vfdP->fileName,
|
||||
(unsigned long)filestats.st_size)));
|
||||
}
|
||||
ReportTemporaryFileUsage(vfdP->fileName, filestats.st_size);
|
||||
} else {
|
||||
errno = stat_errno;
|
||||
ereport(LOG, (errmsg("could not stat file \"%s\": %m", vfdP->fileName)));
|
||||
@ -1831,7 +2021,7 @@ int FilePWrite(File file, const char* buffer, int amount, off_t offset, uint32 w
|
||||
* message if we do that. All current callers would just throw error
|
||||
* immediately anyway, so this is safe at present.
|
||||
*/
|
||||
if (u_sess->storage_cxt.VfdCache[file].fdstate & FD_TEMPORARY) {
|
||||
if (u_sess->storage_cxt.VfdCache[file].fdstate & FD_TEMP_FILE_LIMIT) {
|
||||
off_t newPos = u_sess->storage_cxt.VfdCache[file].seekPos + amount;
|
||||
|
||||
if (newPos > u_sess->storage_cxt.VfdCache[file].fileSize) {
|
||||
@ -1869,7 +2059,7 @@ retry:
|
||||
u_sess->storage_cxt.VfdCache[file].seekPos += returnCode;
|
||||
|
||||
/* maintain fileSize and temporary_files_size if it's a temp file */
|
||||
if (u_sess->storage_cxt.VfdCache[file].fdstate & FD_TEMPORARY) {
|
||||
if (u_sess->storage_cxt.VfdCache[file].fdstate & FD_TEMP_FILE_LIMIT) {
|
||||
off_t newPos = u_sess->storage_cxt.VfdCache[file].seekPos;
|
||||
|
||||
if (newPos > u_sess->storage_cxt.VfdCache[file].fileSize) {
|
||||
@ -2272,7 +2462,7 @@ int FileTruncate(File file, off_t offset, uint32 wait_event_info)
|
||||
|
||||
if (returnCode == 0 && u_sess->storage_cxt.VfdCache[file].fileSize > offset) {
|
||||
/* adjust our state for truncation of a temp file */
|
||||
Assert(u_sess->storage_cxt.VfdCache[file].fdstate & FD_TEMPORARY);
|
||||
Assert(u_sess->storage_cxt.VfdCache[file].fdstate & FD_TEMP_FILE_LIMIT);
|
||||
uint64 descSize = (uint64)(u_sess->storage_cxt.VfdCache[file].fileSize - offset);
|
||||
perm_space_decrease(GetUserId(), descSize, SP_SPILL);
|
||||
u_sess->storage_cxt.temporary_files_size -= descSize;
|
||||
@ -2823,14 +3013,29 @@ TryAgain:
|
||||
*/
|
||||
struct dirent* ReadDir(DIR* dir, const char* dirname)
|
||||
{
|
||||
struct dirent* dent = NULL;
|
||||
return ReadDirExtended(dir, dirname, ERROR);
|
||||
}
|
||||
|
||||
/*
|
||||
* Alternate version of ReadDir that allows caller to specify the elevel
|
||||
* for any error report (whether it's reporting an initial failure of
|
||||
* AllocateDir or a subsequent directory read failure).
|
||||
*
|
||||
* If elevel < ERROR, returns NULL after any error. With the normal coding
|
||||
* pattern, this will result in falling out of the loop immediately as
|
||||
* though the directory contained no (more) entries.
|
||||
*/
|
||||
static struct dirent *ReadDirExtended(DIR *dir, const char *dirname, int elevel)
|
||||
{
|
||||
struct dirent *dent = NULL;
|
||||
|
||||
/* Give a generic message for AllocateDir failure, if caller didn't */
|
||||
if (dir == NULL) {
|
||||
char realDir[PATH_MAX] = {0};
|
||||
char* ret = realpath(dirname, realDir);
|
||||
ereport(ERROR, (errcode_for_file_access(), errmsg("could not open directory \"%s\": %m",
|
||||
ret != nullptr ? realDir : dirname)));
|
||||
char *ret = realpath(dirname, realDir);
|
||||
ereport(elevel, (errcode_for_file_access(),
|
||||
errmsg("could not open directory \"%s\": %m", ret != nullptr ? realDir : dirname)));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
errno = 0;
|
||||
@ -2849,9 +3054,10 @@ struct dirent* ReadDir(DIR* dir, const char* dirname)
|
||||
|
||||
if (errno) {
|
||||
char realDir[PATH_MAX] = {0};
|
||||
char* ret = realpath(dirname, realDir);
|
||||
ereport(ERROR, (errcode_for_file_access(), errmsg("could not read directory \"%s\": %m",
|
||||
ret != nullptr ? realDir : dirname)));
|
||||
char *ret = realpath(dirname, realDir);
|
||||
ereport(elevel, (errcode_for_file_access(),
|
||||
errmsg("could not read directory \"%s\": %m", ret != nullptr ? realDir : dirname)));
|
||||
return NULL;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
@ -2969,6 +3175,24 @@ bool TempTablespacesAreSet(void)
|
||||
return (u_sess->storage_cxt.numTempTableSpaces >= 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* GetTempTablespaces
|
||||
*
|
||||
* Populate an array with the OIDs of the tablespaces that should be used for
|
||||
* temporary files. Return the number that were copied into the output array.
|
||||
*/
|
||||
int GetTempTablespaces(Oid *tableSpaces, int numSpaces)
|
||||
{
|
||||
int i;
|
||||
|
||||
Assert(TempTablespacesAreSet());
|
||||
for (i = 0; i < u_sess->storage_cxt.numTempTableSpaces && i < numSpaces; ++i) {
|
||||
tableSpaces[i] = u_sess->storage_cxt.tempTableSpaces[i];
|
||||
}
|
||||
|
||||
return i;
|
||||
}
|
||||
|
||||
/*
|
||||
* GetNextTempTableSpace
|
||||
*
|
||||
@ -3059,7 +3283,8 @@ static void CleanupTempFiles(bool isProcExit)
|
||||
for (i = 1; i < u_sess->storage_cxt.SizeVfdCache; i++) {
|
||||
unsigned short fdstate = u_sess->storage_cxt.VfdCache[i].fdstate;
|
||||
|
||||
if ((fdstate & FD_TEMPORARY) && u_sess->storage_cxt.VfdCache[i].fileName != NULL) {
|
||||
if (((fdstate & FD_DELETE_AT_CLOSE) || (fdstate & FD_CLOSE_AT_EOXACT)) &&
|
||||
u_sess->storage_cxt.VfdCache[i].fileName != NULL) {
|
||||
/*
|
||||
* If we're in the process of exiting a backend process, close
|
||||
* all temporary files. Otherwise, only close temporary files
|
||||
@ -3069,10 +3294,9 @@ static void CleanupTempFiles(bool isProcExit)
|
||||
*/
|
||||
if (isProcExit)
|
||||
FileClose((File)i);
|
||||
else if (fdstate & FD_XACT_TEMPORARY) {
|
||||
ereport(WARNING,
|
||||
(errmsg("temporary file %s not closed at end-of-transaction",
|
||||
u_sess->storage_cxt.VfdCache[i].fileName)));
|
||||
else if (fdstate & FD_CLOSE_AT_EOXACT) {
|
||||
ereport(WARNING, (errmsg("temporary file %s not closed at end-of-transaction",
|
||||
u_sess->storage_cxt.VfdCache[i].fileName)));
|
||||
FileClose((File)i);
|
||||
}
|
||||
} else if ((fdstate & FD_ERRTBL_LOG) && u_sess->storage_cxt.VfdCache[i].fileName != NULL) {
|
||||
@ -3120,7 +3344,7 @@ void RemovePgTempFiles(void)
|
||||
*/
|
||||
rc = snprintf_s(temp_path, sizeof(temp_path), sizeof(temp_path) - 1, "base/%s", PG_TEMP_FILES_DIR);
|
||||
securec_check_ss(rc, "", "");
|
||||
RemovePgTempFilesInDir(temp_path);
|
||||
RemovePgTempFilesInDir(temp_path, false);
|
||||
RemovePgTempRelationFiles("base");
|
||||
|
||||
/*
|
||||
@ -3163,7 +3387,7 @@ void RemovePgTempFiles(void)
|
||||
PG_TEMP_FILES_DIR);
|
||||
securec_check_ss(rc, "", "");
|
||||
#endif
|
||||
RemovePgTempFilesInDir(temp_path);
|
||||
RemovePgTempFilesInDir(temp_path, false);
|
||||
|
||||
#ifdef PGXC
|
||||
/* Postgres-XC tablespaces include node name in path */
|
||||
@ -3194,12 +3418,26 @@ void RemovePgTempFiles(void)
|
||||
* t_thrd.proc_cxt.DataDir as well.
|
||||
*/
|
||||
#ifdef EXEC_BACKEND
|
||||
RemovePgTempFilesInDir(PG_TEMP_FILES_DIR);
|
||||
RemovePgTempFilesInDir(PG_TEMP_FILES_DIR, false);
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Process one pgsql_tmp directory for RemovePgTempFiles */
|
||||
static void RemovePgTempFilesInDir(const char* tmpdirname)
|
||||
/*
|
||||
* Process one pgsql_tmp directory for RemovePgTempFiles.
|
||||
*
|
||||
* If missing_ok is true, it's all right for the named directory to not exist.
|
||||
* Any other problem results in a LOG message. (missing_ok should be true at
|
||||
* the top level, since pgsql_tmp directories are not created until needed.)
|
||||
*
|
||||
* At the top level, this should be called with unlink_all = false, so that
|
||||
* only files matching the temporary name prefix will be unlinked. When
|
||||
* recursing it will be called with unlink_all = true to unlink everything
|
||||
* under a top-level temporary directory.
|
||||
*
|
||||
* (These two flags could be replaced by one, but it seems clearer to keep
|
||||
* them separate.)
|
||||
*/
|
||||
static void RemovePgTempFilesInDir(const char *tmpdirname, bool unlink_all)
|
||||
{
|
||||
DIR* temp_dir = NULL;
|
||||
struct dirent* temp_de = NULL;
|
||||
@ -3221,10 +3459,27 @@ static void RemovePgTempFilesInDir(const char* tmpdirname)
|
||||
rc = snprintf_s(rm_path, sizeof(rm_path), sizeof(rm_path) - 1, "%s/%s", tmpdirname, temp_de->d_name);
|
||||
securec_check_ss(rc, "", "");
|
||||
|
||||
if (strncmp(temp_de->d_name, PG_TEMP_FILE_PREFIX, strlen(PG_TEMP_FILE_PREFIX)) == 0)
|
||||
(void)unlink(rm_path); /* note we ignore any error */
|
||||
else
|
||||
if (unlink_all || strncmp(temp_de->d_name, PG_TEMP_FILE_PREFIX, strlen(PG_TEMP_FILE_PREFIX)) == 0) {
|
||||
struct stat statbuf;
|
||||
|
||||
if (lstat(rm_path, &statbuf) < 0) {
|
||||
ereport(LOG, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", rm_path)));
|
||||
continue;
|
||||
}
|
||||
|
||||
if (S_ISDIR(statbuf.st_mode)) {
|
||||
/* recursively remove contents, then directory itself */
|
||||
RemovePgTempFilesInDir(rm_path, true);
|
||||
|
||||
if (rmdir(rm_path) < 0) {
|
||||
ereport(LOG, (errcode_for_file_access(), errmsg("could not remove directory \"%s\": %m", rm_path)));
|
||||
}
|
||||
} else {
|
||||
(void)unlink(rm_path); /* note we ignore any error */
|
||||
}
|
||||
} else {
|
||||
ereport(LOG, (errmsg("unexpected file found in temporary-files directory: \"%s\"", rm_path)));
|
||||
}
|
||||
}
|
||||
|
||||
(void)FreeDir(temp_dir);
|
||||
@ -3349,6 +3604,72 @@ static bool looks_like_temp_rel_name(const char *name)
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* walkdir: recursively walk a directory, applying the action to each
|
||||
* regular file and directory (including the named directory itself).
|
||||
*
|
||||
* If process_symlinks is true, the action and recursion are also applied
|
||||
* to regular files and directories that are pointed to by symlinks in the
|
||||
* given directory; otherwise symlinks are ignored. Symlinks are always
|
||||
* ignored in subdirectories, ie we intentionally don't pass down the
|
||||
* process_symlinks flag to recursive calls.
|
||||
*
|
||||
* Errors are reported at level elevel, which might be ERROR or less.
|
||||
*
|
||||
* See also walkdir in initdb.c, which is a frontend version of this logic.
|
||||
*/
|
||||
static void walkdir(const char *path, void (*action)(const char *fname, bool isdir, int elevel), bool process_symlinks,
|
||||
int elevel)
|
||||
{
|
||||
DIR *dir;
|
||||
struct dirent *de;
|
||||
|
||||
dir = AllocateDir(path);
|
||||
|
||||
while ((de = ReadDirExtended(dir, path, elevel)) != NULL) {
|
||||
char subpath[MAXPGPATH * 2];
|
||||
struct stat fst;
|
||||
int sret;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int rc = sprintf_s(subpath, sizeof(subpath), "%s/%s", path, de->d_name);
|
||||
securec_check_ss(rc, "", "");
|
||||
|
||||
if (process_symlinks) {
|
||||
sret = stat(subpath, &fst);
|
||||
} else {
|
||||
sret = lstat(subpath, &fst);
|
||||
}
|
||||
|
||||
if (sret < 0) {
|
||||
ereport(elevel, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", subpath)));
|
||||
continue;
|
||||
}
|
||||
|
||||
if (S_ISREG(fst.st_mode)) {
|
||||
(*action)(subpath, false, elevel);
|
||||
} else if (S_ISDIR(fst.st_mode)) {
|
||||
walkdir(subpath, action, false, elevel);
|
||||
}
|
||||
}
|
||||
|
||||
(void)FreeDir(dir); /* we ignore any error here */
|
||||
|
||||
/*
|
||||
* It's important to fsync the destination directory itself as individual
|
||||
* file fsyncs don't guarantee that the directory entry for the file is
|
||||
* synced. However, skip this if AllocateDir failed; the action function
|
||||
* might not be robust against that.
|
||||
*/
|
||||
if (dir)
|
||||
(*action)(path, true, elevel);
|
||||
}
|
||||
|
||||
void RemoveErrorCacheFiles()
|
||||
{
|
||||
DIR* temp_dir = NULL;
|
||||
@ -3484,3 +3805,14 @@ FileExistStatus CheckFileExists(const char* path)
|
||||
}
|
||||
}
|
||||
|
||||
static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel)
|
||||
{
|
||||
if (isdir) {
|
||||
if (rmdir(fname) != 0 && errno != ENOENT)
|
||||
ereport(elevel, (errcode_for_file_access(), errmsg("could not rmdir directory \"%s\": %m", fname)));
|
||||
} else {
|
||||
/* Use PathNameDeleteTemporaryFile to report filesize */
|
||||
(void)PathNameDeleteTemporaryFile(fname, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
230
src/gausskernel/storage/file/sharedfileset.cpp
Normal file
230
src/gausskernel/storage/file/sharedfileset.cpp
Normal file
@ -0,0 +1,230 @@
|
||||
/* -------------------------------------------------------------------------
|
||||
*
|
||||
* sharedfileset.cpp
|
||||
* Shared temporary file management.
|
||||
*
|
||||
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/gausskernel/storage/file/sharedfileset.cpp
|
||||
*
|
||||
* SharedFileSets provide a temporary namespace (think directory) so that
|
||||
* files can be discovered by name, and a shared ownership semantics so that
|
||||
* shared files survive until the last user detaches.
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include <limits.h>
|
||||
|
||||
#include "access/hash.h"
|
||||
#include "catalog/pg_tablespace.h"
|
||||
#include "commands/tablespace.h"
|
||||
#include "miscadmin.h"
|
||||
#include "storage/dsm.h"
|
||||
#include "storage/sharedfileset.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/hashutils.h"
|
||||
|
||||
static void SharedFileSetOnDetach(void *segment, Datum datum);
|
||||
static void SharedFileSetPath(char *path, Size pathSize, const SharedFileSet *fileset, Oid tablespace);
|
||||
static void SharedFilePath(char *path, Size pathSize, const SharedFileSet *fileset, const char *name);
|
||||
static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name);
|
||||
|
||||
/*
|
||||
* Initialize a space for temporary files that can be opened for read-only
|
||||
* access by other backends. Other backends must attach to it before
|
||||
* accessing it. Associate this SharedFileSet with 'seg'. Any contained
|
||||
* files will be deleted when the last backend detaches.
|
||||
*
|
||||
* Files will be distributed over the tablespaces configured in
|
||||
* temp_tablespaces.
|
||||
*
|
||||
* Under the covers the set is one or more directories which will eventually
|
||||
* be deleted when there are no backends attached.
|
||||
*/
|
||||
void SharedFileSetInit(SharedFileSet *fileset, void *seg)
|
||||
{
|
||||
static THR_LOCAL uint32 counter = 0;
|
||||
|
||||
SpinLockInit(&fileset->mutex);
|
||||
fileset->refcnt = 1;
|
||||
fileset->creator_pid = t_thrd.proc_cxt.MyProcPid;
|
||||
fileset->number = counter;
|
||||
counter = (counter + 1) % INT_MAX;
|
||||
|
||||
/* Capture the tablespace OIDs so that all backends agree on them. */
|
||||
PrepareTempTablespaces();
|
||||
fileset->ntablespaces = GetTempTablespaces(&fileset->tablespaces[0], lengthof(fileset->tablespaces));
|
||||
if (fileset->ntablespaces == 0) {
|
||||
fileset->tablespaces[0] = DEFAULTTABLESPACE_OID;
|
||||
fileset->ntablespaces = 1;
|
||||
}
|
||||
|
||||
/* Register our cleanup callback. */
|
||||
on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset));
|
||||
}
|
||||
|
||||
/*
|
||||
* Attach to a set of directories that was created with SharedFileSetInit.
|
||||
*/
|
||||
void SharedFileSetAttach(SharedFileSet *fileset, void *seg)
|
||||
{
|
||||
bool success = false;
|
||||
|
||||
SpinLockAcquire(&fileset->mutex);
|
||||
if (fileset->refcnt == 0) {
|
||||
success = false;
|
||||
} else {
|
||||
++fileset->refcnt;
|
||||
success = true;
|
||||
}
|
||||
SpinLockRelease(&fileset->mutex);
|
||||
|
||||
if (!success) {
|
||||
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("could not attach to a SharedFileSet that is already destroyed")));
|
||||
}
|
||||
|
||||
/* Register our cleanup callback. */
|
||||
on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset));
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a new file in the given set.
|
||||
*/
|
||||
File SharedFileSetCreate(const SharedFileSet *fileset, const char *name)
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
|
||||
SharedFilePath(path, MAXPGPATH, fileset, name);
|
||||
File file = PathNameCreateTemporaryFile(path, false);
|
||||
/* If we failed, see if we need to create the directory on demand. */
|
||||
if (file <= 0) {
|
||||
char tempdirpath[MAXPGPATH];
|
||||
char filesetpath[MAXPGPATH];
|
||||
Oid tablespace = ChooseTablespace(fileset, name);
|
||||
|
||||
TempTablespacePath(tempdirpath, MAXPGPATH, tablespace);
|
||||
SharedFileSetPath(filesetpath, MAXPGPATH, fileset, tablespace);
|
||||
PathNameCreateTemporaryDir(tempdirpath, filesetpath);
|
||||
file = PathNameCreateTemporaryFile(path, true);
|
||||
}
|
||||
|
||||
return file;
|
||||
}
|
||||
|
||||
/*
|
||||
* Open a file that was created with SharedFileSetCreate(), possibly in
|
||||
* another backend.
|
||||
*/
|
||||
File SharedFileSetOpen(const SharedFileSet *fileset, const char *name)
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
File file;
|
||||
|
||||
SharedFilePath(path, MAXPGPATH, fileset, name);
|
||||
file = PathNameOpenTemporaryFile(path);
|
||||
|
||||
return file;
|
||||
}
|
||||
|
||||
/*
|
||||
* Delete a file that was created with SharedFileSetCreate().
|
||||
* Return true if the file existed, false if didn't.
|
||||
*/
|
||||
bool SharedFileSetDelete(const SharedFileSet *fileset, const char *name, bool error_on_failure)
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
|
||||
SharedFilePath(path, MAXPGPATH, fileset, name);
|
||||
|
||||
return PathNameDeleteTemporaryFile(path, error_on_failure);
|
||||
}
|
||||
|
||||
/*
|
||||
* Delete all files in the set.
|
||||
*/
|
||||
void SharedFileSetDeleteAll(const SharedFileSet *fileset)
|
||||
{
|
||||
char dirpath[MAXPGPATH];
|
||||
int i;
|
||||
|
||||
/*
|
||||
* Delete the directory we created in each tablespace. Doesn't fail
|
||||
* because we use this in error cleanup paths, but can generate LOG
|
||||
* message on IO error.
|
||||
*/
|
||||
for (i = 0; i < fileset->ntablespaces; ++i) {
|
||||
SharedFileSetPath(dirpath, MAXPGPATH, fileset, fileset->tablespaces[i]);
|
||||
PathNameDeleteTemporaryDir(dirpath);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Callback function that will be invoked when this backend detaches from a
|
||||
* DSM segment holding a SharedFileSet that it has created or attached to. If
|
||||
* we are the last to detach, then try to remove the directories and
|
||||
* everything in them. We can't raise an error on failures, because this runs
|
||||
* in error cleanup paths.
|
||||
*/
|
||||
static void SharedFileSetOnDetach(void *segment, Datum datum)
|
||||
{
|
||||
bool unlink_all = false;
|
||||
SharedFileSet *fileset = (SharedFileSet *)DatumGetPointer(datum);
|
||||
|
||||
SpinLockAcquire(&fileset->mutex);
|
||||
Assert(fileset->refcnt > 0);
|
||||
if (--fileset->refcnt == 0)
|
||||
unlink_all = true;
|
||||
SpinLockRelease(&fileset->mutex);
|
||||
|
||||
/*
|
||||
* If we are the last to detach, we delete the directory in all
|
||||
* tablespaces. Note that we are still actually attached for the rest of
|
||||
* this function so we can safely access its data.
|
||||
*/
|
||||
if (unlink_all)
|
||||
SharedFileSetDeleteAll(fileset);
|
||||
}
|
||||
|
||||
/*
|
||||
* Build the path for the directory holding the files backing a SharedFileSet
|
||||
* in a given tablespace.
|
||||
*/
|
||||
static void SharedFileSetPath(char *path, Size pathSize, const SharedFileSet *fileset, Oid tablespace)
|
||||
{
|
||||
char tempdirpath[MAXPGPATH];
|
||||
|
||||
TempTablespacePath(tempdirpath, MAXPGPATH, tablespace);
|
||||
int rc = sprintf_s(path, pathSize, "%s/%s%lu.%u.sharedfileset", tempdirpath, PG_TEMP_FILE_PREFIX,
|
||||
(unsigned long)fileset->creator_pid, fileset->number);
|
||||
securec_check_ss(rc, "", "");
|
||||
}
|
||||
|
||||
/*
|
||||
* Sorting hat to determine which tablespace a given shared temporary file
|
||||
* belongs in.
|
||||
*/
|
||||
static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name)
|
||||
{
|
||||
uint32 hash = hash_any((const unsigned char *)name, strlen(name));
|
||||
|
||||
return fileset->tablespaces[hash % fileset->ntablespaces];
|
||||
}
|
||||
|
||||
/*
|
||||
* Compute the full path of a file in a SharedFileSet.
|
||||
*/
|
||||
static void SharedFilePath(char *path, Size pathSize, const SharedFileSet *fileset, const char *name)
|
||||
{
|
||||
char dirpath[MAXPGPATH];
|
||||
|
||||
SharedFileSetPath(dirpath, MAXPGPATH, fileset, ChooseTablespace(fileset, name));
|
||||
int rc = sprintf_s(path, pathSize, "%s/%s", dirpath, name);
|
||||
securec_check_ss(rc, "", "");
|
||||
}
|
||||
|
||||
@ -26,6 +26,8 @@
|
||||
#ifndef BUFFILE_H
|
||||
#define BUFFILE_H
|
||||
|
||||
#include "storage/sharedfileset.h"
|
||||
|
||||
/* BufFile is an opaque type whose details are not known outside buffile.c. */
|
||||
|
||||
typedef struct BufFile BufFile;
|
||||
@ -42,4 +44,9 @@ extern int BufFileSeek(BufFile* file, int fileno, off_t offset, int whence);
|
||||
extern void BufFileTell(BufFile* file, int* fileno, off_t* offset);
|
||||
extern int BufFileSeekBlock(BufFile* file, long blknum);
|
||||
|
||||
extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name);
|
||||
extern void BufFileExportShared(BufFile *file);
|
||||
extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name);
|
||||
extern void BufFileDeleteShared(const SharedFileSet *fileset, const char *name);
|
||||
|
||||
#endif /* BUFFILE_H */
|
||||
|
||||
@ -80,6 +80,14 @@ extern int FileTruncate(File file, off_t offset, uint32 wait_event_info = 0);
|
||||
extern void FileWriteback(File file, off_t offset, off_t nbytes);
|
||||
extern char* FilePathName(File file);
|
||||
|
||||
/* Operations used for sharing named temporary files */
|
||||
extern File PathNameCreateTemporaryFile(char *name, bool error_on_failure);
|
||||
extern File PathNameOpenTemporaryFile(char *name);
|
||||
extern bool PathNameDeleteTemporaryFile(const char *name, bool error_on_failure);
|
||||
extern void PathNameCreateTemporaryDir(const char *base, const char *name);
|
||||
extern void PathNameDeleteTemporaryDir(const char *name);
|
||||
extern void TempTablespacePath(char *path, Size pathSize, Oid tablespace);
|
||||
|
||||
extern void FileAsyncCUClose(File* vfdList, int32 vfdnum);
|
||||
extern int FileAsyncRead(AioDispatchDesc_t** dList, int32 dn);
|
||||
extern int FileAsyncWrite(AioDispatchDesc_t** dList, int32 dn);
|
||||
@ -121,6 +129,7 @@ extern void CloseGaussPidDir(void);
|
||||
extern void closeAllVfds(void);
|
||||
extern void SetTempTablespaces(Oid* tableSpaces, int numSpaces);
|
||||
extern bool TempTablespacesAreSet(void);
|
||||
extern int GetTempTablespaces(Oid *tableSpaces, int numSpaces);
|
||||
extern Oid GetNextTempTableSpace(void);
|
||||
extern void AtEOXact_Files(void);
|
||||
extern void AtEOSubXact_Files(bool isCommit, SubTransactionId mySubid, SubTransactionId parentSubid);
|
||||
|
||||
40
src/include/storage/sharedfileset.h
Normal file
40
src/include/storage/sharedfileset.h
Normal file
@ -0,0 +1,40 @@
|
||||
/* -------------------------------------------------------------------------
|
||||
*
|
||||
* sharedfileset.h
|
||||
* Shared temporary file management.
|
||||
*
|
||||
*
|
||||
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* src/include/storage/sharedfileset.h
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef SHAREDFILESET_H
|
||||
#define SHAREDFILESET_H
|
||||
|
||||
#include "storage/fd.h"
|
||||
#include "storage/spin.h"
|
||||
|
||||
/*
|
||||
* A set of temporary files that can be shared by multiple backends.
|
||||
*/
|
||||
typedef struct SharedFileSet {
|
||||
ThreadId creator_pid; /* ID of the creator (set from MyProcPid); part of the set's directory name */
|
||||
uint32 number; /* per-creator sequence number; part of the set's directory name */
|
||||
slock_t mutex; /* mutex protecting the reference count */
|
||||
int refcnt; /* number of attached backends; the set's files are deleted when it reaches 0 */
|
||||
int ntablespaces; /* number of valid entries in tablespaces[] */
|
||||
Oid tablespaces[8]; /* OIDs of tablespaces to use. Assumes that it is rare to have more than 8 temp tablespaces; only the first 8 are captured */
|
||||
} SharedFileSet;
|
||||
|
||||
extern void SharedFileSetInit(SharedFileSet *fileset, void *dsm_seg);
|
||||
extern void SharedFileSetAttach(SharedFileSet *fileset, void *dsm_seg);
|
||||
extern File SharedFileSetCreate(const SharedFileSet *fileset, const char *name);
|
||||
extern File SharedFileSetOpen(const SharedFileSet *fileset, const char *name);
|
||||
extern bool SharedFileSetDelete(const SharedFileSet *fileset, const char *name, bool error_on_failure);
|
||||
extern void SharedFileSetDeleteAll(const SharedFileSet *fileset);
|
||||
|
||||
#endif
|
||||
Reference in New Issue
Block a user