Add sync_dir interface to Env (#2884)
when we need to ensure that **a newly-created file** is fully synchronized back to disk, we should call `fsync()` on the parent directory—that is, the directory containing the newly-created file. That is to say, In this situation, we should call `fsync()` on both the newly-created file and its parent directory. Unfortunately, currently in Doris, in any scenario, directories are not fsynced. This patch adds `sync_dir()` interface first, laying the groundwork for future fixes. This patch also removes unneeded private method `dir_exists()`.
This commit is contained in:
@ -128,15 +128,15 @@ public:
|
||||
}
|
||||
|
||||
bool ok() const { return _state == nullptr; }
|
||||
|
||||
bool is_cancelled() const { return code() == TStatusCode::CANCELLED; }
|
||||
bool is_mem_limit_exceeded() const { return code() == TStatusCode::MEM_LIMIT_EXCEEDED; }
|
||||
bool is_thrift_rpc_error() const { return code() == TStatusCode::THRIFT_RPC_ERROR; }
|
||||
|
||||
bool is_end_of_file() const { return code() == TStatusCode::END_OF_FILE; }
|
||||
|
||||
bool is_not_found() const { return code() == TStatusCode::NOT_FOUND; }
|
||||
|
||||
bool is_already_exist() const { return code() == TStatusCode::ALREADY_EXIST; }
|
||||
bool is_io_error() const {return code() == TStatusCode::IO_ERROR; }
|
||||
|
||||
// Convert into TStatus. Call this if 'status_container' contains an optional
|
||||
// TStatus field named 'status'. This also sets __isset.status.
|
||||
template <typename T>
|
||||
|
||||
39
be/src/env/env.h
vendored
39
be/src/env/env.h
vendored
@ -83,7 +83,6 @@ public:
|
||||
virtual Status new_writable_file(const std::string& fname,
|
||||
std::unique_ptr<WritableFile>* result) = 0;
|
||||
|
||||
|
||||
// Like the previous new_writable_file, but allows options to be
|
||||
// specified.
|
||||
virtual Status new_writable_file(const WritableFileOptions& opts,
|
||||
@ -139,9 +138,21 @@ public:
|
||||
// Delete the named file.
|
||||
virtual Status delete_file(const std::string& fname) = 0;
|
||||
|
||||
// Create the specified directory. Returns error if directory exists.
|
||||
// Create the specified directory.
|
||||
// NOTE: It will return error if the path already exist(not necessarily as a directory)
|
||||
virtual Status create_dir(const std::string& dirname) = 0;
|
||||
|
||||
// Creates directory if missing.
|
||||
// Return OK if it exists, or successful in Creating.
|
||||
virtual Status create_dir_if_missing(const std::string& dirname, bool* created = nullptr) = 0;
|
||||
|
||||
// Delete the specified directory.
|
||||
// NOTE: The dir must be empty.
|
||||
virtual Status delete_dir(const std::string& dirname) = 0;
|
||||
|
||||
// Synchronize the entry for a specific directory.
|
||||
virtual Status sync_dir(const std::string& dirname) = 0;
|
||||
|
||||
// Checks if the file is a directory. Returns an error if it doesn't
|
||||
// exist, otherwise writes true or false into 'is_dir' appropriately.
|
||||
virtual Status is_directory(const std::string& path, bool* is_dir) = 0;
|
||||
@ -154,13 +165,6 @@ public:
|
||||
// All directory entries in 'path' must exist on the filesystem.
|
||||
virtual Status canonicalize(const std::string& path, std::string* result) = 0;
|
||||
|
||||
// Creates directory if missing. Return Ok if it exists, or successful in
|
||||
// Creating.
|
||||
virtual Status create_dir_if_missing(const std::string& dirname) = 0;
|
||||
|
||||
// Delete the specified directory.
|
||||
virtual Status delete_dir(const std::string& dirname) = 0;
|
||||
|
||||
virtual Status get_file_size(const std::string& fname, uint64_t* size) = 0;
|
||||
|
||||
// Store the last modification time of fname in *file_mtime.
|
||||
@ -170,11 +174,9 @@ public:
|
||||
virtual Status rename_file(const std::string& src,
|
||||
const std::string& target) = 0;
|
||||
|
||||
// Hard Link file src to target.
|
||||
virtual Status link_file(const std::string& /*src*/,
|
||||
const std::string& /*target*/) {
|
||||
return Status::NotSupported("link file is not supported for this Env");
|
||||
}
|
||||
// create a hard-link
|
||||
virtual Status link_file(const std::string& /*old_path*/,
|
||||
const std::string& /*new_path*/) = 0;
|
||||
};
|
||||
|
||||
struct RandomAccessFileOptions {
|
||||
@ -265,6 +267,8 @@ public:
|
||||
// A file abstraction for sequential writing. The implementation
|
||||
// must provide buffering since callers may append small fragments
|
||||
// at a time to the file.
|
||||
// Note: To avoid user misuse, WritableFile's API should support only
|
||||
// one of Append or PositionedAppend. We support only Append here.
|
||||
class WritableFile {
|
||||
public:
|
||||
enum FlushMode {
|
||||
@ -276,10 +280,13 @@ public:
|
||||
virtual ~WritableFile() { }
|
||||
|
||||
// Append data to the end of the file
|
||||
// Note: A WritableFile object must support either Append or
|
||||
// PositionedAppend, so the users cannot mix the two.
|
||||
virtual Status append(const Slice& data) = 0;
|
||||
|
||||
// If possible, uses scatter-gather I/O to efficiently append
|
||||
// multiple buffers to a file. Otherwise, falls back to regular I/O.
|
||||
//
|
||||
// For implementation specific quirks and details, see comments in
|
||||
// implementation source code (e.g., env_posix.cc)
|
||||
virtual Status appendv(const Slice* data, size_t cnt) = 0;
|
||||
|
||||
// Pre-allocates 'size' bytes for the file in the underlying filesystem.
|
||||
|
||||
109
be/src/env/env_posix.cpp
vendored
109
be/src/env/env_posix.cpp
vendored
@ -32,6 +32,23 @@ namespace doris {
|
||||
using std::string;
|
||||
using strings::Substitute;
|
||||
|
||||
// Close file descriptor when object goes out of scope.
|
||||
class ScopedFdCloser {
|
||||
public:
|
||||
explicit ScopedFdCloser(int fd) : fd_(fd) {}
|
||||
|
||||
~ScopedFdCloser() {
|
||||
int err;
|
||||
RETRY_ON_EINTR(err, ::close(fd_));
|
||||
if (PREDICT_FALSE(err != 0)) {
|
||||
LOG(WARNING) << "Failed to close fd " << fd_;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
const int fd_;
|
||||
};
|
||||
|
||||
static Status io_error(const std::string& context, int err_number) {
|
||||
switch (err_number) {
|
||||
case EACCES:
|
||||
@ -51,7 +68,7 @@ static Status io_error(const std::string& context, int err_number) {
|
||||
return Status::IOError(context, err_number, errno_to_string(err_number));
|
||||
}
|
||||
|
||||
Status do_sync(int fd, const string& filename) {
|
||||
static Status do_sync(int fd, const string& filename) {
|
||||
if (fdatasync(fd) < 0) {
|
||||
return io_error(filename, errno);
|
||||
}
|
||||
@ -595,7 +612,6 @@ public:
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Create the specified directory. Returns error if directory exists.
|
||||
Status create_dir(const std::string& name) override {
|
||||
if (mkdir(name.c_str(), 0755) != 0) {
|
||||
return io_error(name, errno);
|
||||
@ -603,6 +619,46 @@ public:
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status create_dir_if_missing(const string& dirname, bool* created = nullptr) override {
|
||||
Status s = create_dir(dirname);
|
||||
if (created != nullptr) {
|
||||
*created = s.ok();
|
||||
}
|
||||
|
||||
// Check that dirname is actually a directory.
|
||||
if (s.is_already_exist()) {
|
||||
bool is_dir = false;
|
||||
RETURN_IF_ERROR(is_directory(dirname, &is_dir));
|
||||
if (is_dir) {
|
||||
return Status::OK();
|
||||
} else {
|
||||
return s.clone_and_append("path already exists but not a dir");
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
// Delete the specified directory.
|
||||
Status delete_dir(const std::string& dirname) override {
|
||||
if (rmdir(dirname.c_str()) != 0) {
|
||||
return io_error(dirname, errno);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status sync_dir(const string& dirname) override {
|
||||
int dir_fd;
|
||||
RETRY_ON_EINTR(dir_fd, open(dirname.c_str(), O_DIRECTORY|O_RDONLY));
|
||||
if (dir_fd < 0) {
|
||||
return io_error(dirname, errno);
|
||||
}
|
||||
ScopedFdCloser fd_closer(dir_fd);
|
||||
if (fsync(dir_fd) != 0) {
|
||||
return io_error(dirname, errno);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status is_directory(const std::string& path, bool* is_dir) override {
|
||||
struct stat path_stat;
|
||||
if (stat(path.c_str(), &path_stat) != 0) {
|
||||
@ -615,6 +671,8 @@ public:
|
||||
}
|
||||
|
||||
Status canonicalize(const std::string& path, std::string* result) override {
|
||||
// NOTE: we must use free() to release the buffer retruned by realpath(),
|
||||
// because the buffer is allocated by malloc(), see `man 3 realpath`.
|
||||
std::unique_ptr<char[], FreeDeleter> r(realpath(path.c_str(), nullptr));
|
||||
if (r == nullptr) {
|
||||
return io_error(Substitute("Unable to canonicalize $0", path), errno);
|
||||
@ -623,29 +681,6 @@ public:
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Creates directory if missing. Return Ok if it exists, or successful in
|
||||
// Creating.
|
||||
Status create_dir_if_missing(const std::string& name) override {
|
||||
if (mkdir(name.c_str(), 0755) != 0) {
|
||||
if (errno != EEXIST) {
|
||||
return io_error(name, errno);
|
||||
} else if (!dir_exists(name)) { // Check that name is actually a
|
||||
// directory.
|
||||
// Message is taken from mkdir
|
||||
return Status::IOError(name + " exists but is not a directory");
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Delete the specified directory.
|
||||
Status delete_dir(const std::string& dirname) override {
|
||||
if (rmdir(dirname.c_str()) != 0) {
|
||||
return io_error(dirname, errno);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status get_file_size(const string& fname, uint64_t* size) override {
|
||||
struct stat sbuf;
|
||||
if (stat(fname.c_str(), &sbuf) != 0) {
|
||||
@ -656,8 +691,7 @@ public:
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status get_file_modified_time(const std::string& fname,
|
||||
uint64_t* file_mtime) override {
|
||||
Status get_file_modified_time(const std::string& fname, uint64_t* file_mtime) override {
|
||||
struct stat s;
|
||||
if (stat(fname.c_str(), &s) !=0) {
|
||||
return io_error(fname, errno);
|
||||
@ -673,27 +707,18 @@ public:
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status link_file(const std::string& src, const std::string& target) override {
|
||||
if (link(src.c_str(), target.c_str()) != 0) {
|
||||
return io_error(src, errno);
|
||||
Status link_file(const std::string& old_path, const std::string& new_path) override {
|
||||
if (link(old_path.c_str(), new_path.c_str()) != 0) {
|
||||
return io_error(old_path, errno);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
bool dir_exists(const std::string& dname) {
|
||||
struct stat statbuf;
|
||||
if (stat(dname.c_str(), &statbuf) == 0) {
|
||||
return S_ISDIR(statbuf.st_mode);
|
||||
}
|
||||
return false; // stat() failed return false
|
||||
}
|
||||
};
|
||||
|
||||
// Default Posix Env
|
||||
Env* Env::Default() {
|
||||
static PosixEnv default_env;
|
||||
return &default_env;
|
||||
static PosixEnv default_env;
|
||||
return &default_env;
|
||||
}
|
||||
|
||||
}
|
||||
} // end namespace doris
|
||||
|
||||
Reference in New Issue
Block a user