Add Env for file operation (#1321)

This commit is contained in:
ZHAO Chun
2019-06-17 10:18:16 +08:00
committed by Mingyu Chen
parent ba44249f80
commit a0294b8f40
15 changed files with 1407 additions and 46 deletions

View File

@ -359,6 +359,15 @@ Parts of be/src/runtime/string_search.hpp: Python Software License V2
--------------------------------------------------------------------------------
be/src/env (some portions): 3-clause BSD
Some portions of this module are derived from code from RocksDB
( https://github.com/facebook/rocksdb ). RocksDB is dual-licensed
under both the GPLv2 and Apache 2.0 License. We select Apache 2.0
License.
--------------------------------------------------------------------------------
be/src/util/coding.*, be/src/util/status.*: 3-clause BSD
Copyright (c) 2011 The LevelDB Authors. All rights reserved.

View File

@ -447,6 +447,7 @@ set(DORIS_LINK_LIBS
Agent
CodeGen
Common
Env
Exec
Exprs
Gutil
@ -555,23 +556,24 @@ if (${MAKE_TEST} STREQUAL "ON")
add_definitions(-DBE_TEST)
endif ()
add_subdirectory(${SRC_DIR}/agent)
add_subdirectory(${SRC_DIR}/codegen)
add_subdirectory(${SRC_DIR}/common)
add_subdirectory(${SRC_DIR}/util)
add_subdirectory(${SRC_DIR}/gen_cpp)
add_subdirectory(${SRC_DIR}/gutil)
add_subdirectory(${SRC_DIR}/olap)
add_subdirectory(${SRC_DIR}/agent)
add_subdirectory(${SRC_DIR}/http)
add_subdirectory(${SRC_DIR}/service)
add_subdirectory(${SRC_DIR}/env)
add_subdirectory(${SRC_DIR}/exec)
add_subdirectory(${SRC_DIR}/exprs)
add_subdirectory(${SRC_DIR}/udf)
add_subdirectory(${SRC_DIR}/gen_cpp)
add_subdirectory(${SRC_DIR}/geo)
add_subdirectory(${SRC_DIR}/gutil)
add_subdirectory(${SRC_DIR}/http)
add_subdirectory(${SRC_DIR}/olap)
add_subdirectory(${SRC_DIR}/runtime)
add_subdirectory(${SRC_DIR}/service)
add_subdirectory(${SRC_DIR}/testutil)
add_subdirectory(${SRC_DIR}/tools)
add_subdirectory(${SRC_DIR}/udf)
add_subdirectory(${SRC_DIR}/udf_samples)
add_subdirectory(${SRC_DIR}/geo)
add_subdirectory(${SRC_DIR}/util)
# Utility CMake function to make specifying tests and benchmarks less verbose
FUNCTION(ADD_BE_TEST TEST_NAME)
@ -592,15 +594,16 @@ ENDFUNCTION()
if (${MAKE_TEST} STREQUAL "ON")
add_subdirectory(${TEST_DIR}/agent)
add_subdirectory(${TEST_DIR}/olap)
add_subdirectory(${TEST_DIR}/common)
add_subdirectory(${TEST_DIR}/util)
add_subdirectory(${TEST_DIR}/udf)
add_subdirectory(${TEST_DIR}/env)
add_subdirectory(${TEST_DIR}/exec)
add_subdirectory(${TEST_DIR}/exprs)
add_subdirectory(${TEST_DIR}/runtime)
add_subdirectory(${TEST_DIR}/http)
add_subdirectory(${TEST_DIR}/geo)
add_subdirectory(${TEST_DIR}/http)
add_subdirectory(${TEST_DIR}/olap)
add_subdirectory(${TEST_DIR}/runtime)
add_subdirectory(${TEST_DIR}/udf)
add_subdirectory(${TEST_DIR}/util)
endif ()
# Install be

View File

@ -111,7 +111,7 @@ std::string Status::code_as_string() const {
case TStatusCode::CANCELLED:
return "Cancelled";
case TStatusCode::NOT_IMPLEMENTED_ERROR:
return "Not implemented";
return "Not supported";
case TStatusCode::RUNTIME_ERROR:
return "Runtime error";
case TStatusCode::MEM_LIMIT_EXCEEDED:

View File

@ -71,55 +71,70 @@ public:
static Status OK() { return Status(); }
static Status PublishTimeout(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::PUBLISH_TIMEOUT, msg, sub_code, msg2);
static Status PublishTimeout(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::PUBLISH_TIMEOUT, msg, precise_code, msg2);
}
static Status MemoryAllocFailed(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::MEM_ALLOC_FAILED, msg, sub_code, msg2);
static Status MemoryAllocFailed(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::MEM_ALLOC_FAILED, msg, precise_code, msg2);
}
static Status BufferAllocFailed(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::BUFFER_ALLOCATION_FAILED, msg, sub_code, msg2);
static Status BufferAllocFailed(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::BUFFER_ALLOCATION_FAILED, msg, precise_code, msg2);
}
static Status InvalidArgument(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::INVALID_ARGUMENT, msg, sub_code, msg2);
static Status InvalidArgument(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::INVALID_ARGUMENT, msg, precise_code, msg2);
}
static Status MinimumReservationUnavailable(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::INVALID_ARGUMENT, msg, sub_code, msg2);
static Status MinimumReservationUnavailable(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::INVALID_ARGUMENT, msg, precise_code, msg2);
}
static Status IoError(const Slice& msg,
int16_t sub_code = 1,
static Status IOError(const Slice& msg,
int16_t precise_code = 1,
const Slice& msg2 = Slice()) {
return Status(TStatusCode::IO_ERROR, msg, sub_code, msg2);
return Status(TStatusCode::IO_ERROR, msg, precise_code, msg2);
}
static Status NotFound(const Slice& msg,
int16_t precise_code = 1,
const Slice& msg2 = Slice()) {
return Status(TStatusCode::NOT_FOUND, msg, precise_code, msg2);
}
static Status AlreadyExist(const Slice& msg,
int16_t precise_code = 1,
const Slice& msg2 = Slice()) {
return Status(TStatusCode::ALREADY_EXIST, msg, precise_code, msg2);
}
static Status NotSupported(const Slice& msg,
int16_t precise_code = 1,
const Slice& msg2 = Slice()) {
return Status(TStatusCode::NOT_IMPLEMENTED_ERROR, msg, precise_code, msg2);
}
static Status EndOfFile(const Slice& msg,
int16_t sub_code = 1,
int16_t precise_code = 1,
const Slice& msg2 = Slice()) {
return Status(TStatusCode::END_OF_FILE, msg, sub_code, msg2);
return Status(TStatusCode::END_OF_FILE, msg, precise_code, msg2);
}
static Status InternalError(const Slice& msg,
int16_t sub_code = 1,
int16_t precise_code = 1,
const Slice& msg2 = Slice()) {
return Status(TStatusCode::INTERNAL_ERROR, msg, sub_code, msg2);
return Status(TStatusCode::INTERNAL_ERROR, msg, precise_code, msg2);
}
static Status RuntimeError(const Slice& msg,
int16_t sub_code = 1,
int16_t precise_code = 1,
const Slice& msg2 = Slice()) {
return Status(TStatusCode::RUNTIME_ERROR, msg, sub_code, msg2);
return Status(TStatusCode::RUNTIME_ERROR, msg, precise_code, msg2);
}
static Status Cancelled(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::CANCELLED, msg, sub_code, msg2);
static Status Cancelled(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::CANCELLED, msg, precise_code, msg2);
}
static Status MemoryLimitExceeded(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::MEM_LIMIT_EXCEEDED, msg, sub_code, msg2);
static Status MemoryLimitExceeded(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::MEM_LIMIT_EXCEEDED, msg, precise_code, msg2);
}
static Status ThriftRpcError(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::THRIFT_RPC_ERROR, msg, sub_code, msg2);
static Status ThriftRpcError(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::THRIFT_RPC_ERROR, msg, precise_code, msg2);
}
static Status TimedOut(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::TIMEOUT, msg, sub_code, msg2);
static Status TimedOut(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) {
return Status(TStatusCode::TIMEOUT, msg, precise_code, msg2);
}
bool ok() const { return _state == nullptr; }
@ -198,7 +213,7 @@ public:
private:
const char* copy_state(const char* state);
Status(TStatusCode::type code, const Slice& msg, int16_t sub_code, const Slice& msg2);
Status(TStatusCode::type code, const Slice& msg, int16_t precise_code, const Slice& msg2);
private:
// OK status has a nullptr _state. Otherwise, _state is a new[] array
@ -213,7 +228,7 @@ private:
// some generally useful macros
#define RETURN_IF_ERROR(stmt) \
do { \
Status _status_ = (stmt); \
const Status& _status_ = (stmt); \
if (UNLIKELY(!_status_.ok())) { \
return _status_; \
} \
@ -229,7 +244,7 @@ private:
#define EXIT_IF_ERROR(stmt) \
do { \
Status _status_ = (stmt); \
const Status& _status_ = (stmt); \
if (UNLIKELY(!_status_.ok())) { \
string msg = _status_.get_error_msg(); \
LOG(ERROR) << msg; \
@ -237,6 +252,15 @@ private:
} \
} while (false)
/// @brief Emit a warning if @c to_call returns a bad status.
#define WARN_IF_ERROR(to_call, warning_prefix) \
do { \
const Status& _s = (to_call); \
if (PREDICT_FALSE(!_s.ok())) { \
LOG(WARNING) << (warning_prefix) << ": " << _s.to_string(); \
} \
} while (0);
}
#define WARN_UNUSED_RESULT __attribute__((warn_unused_result))

26
be/src/env/CMakeLists.txt vendored Normal file
View File

@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# where to put generated libraries
set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/env")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/env")
add_library(Env STATIC
env_posix.cpp
)

297
be/src/env/env.h vendored Normal file
View File

@ -0,0 +1,297 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors
#pragma once
#include <string>
#include <memory>
#include "common/status.h"
#include "util/slice.h"
namespace doris {
class RandomAccessFile;
class RandomRWFile;
class WritableFile;
class SequentialFile;
class WritableFileOptions;
class RandomAccessFileOptions;
class RandomRWFileOptions;
class Env {
public:
// Governs if/how the file is created.
//
// enum value | file exists | file does not exist
// --------------------------------+-------------------+--------------------
// CREATE_IF_NON_EXISTING_TRUNCATE | opens + truncates | creates
// CREATE_NON_EXISTING | fails | creates
// OPEN_EXISTING | opens | fails
enum CreateMode {
CREATE_IF_NON_EXISTING_TRUNCATE,
CREATE_NON_EXISTING,
OPEN_EXISTING
};
Env() { }
virtual ~Env() { }
// Return a default environment suitable for the current operating
// system. Sophisticated users may wish to provide their own Env
// implementation instead of relying on this default environment.
static Env* Default();
// Create a brand new sequentially-readable file with the specified name.
// On success, stores a pointer to the new file in *result and returns OK.
// On failure stores NULL in *result and returns non-OK. If the file does
// not exist, returns a non-OK status.
//
// The returned file will only be accessed by one thread at a time.
virtual Status new_sequential_file(const std::string& fname,
std::unique_ptr<SequentialFile>* result) = 0;
// Create a brand new random access read-only file with the
// specified name. On success, stores a pointer to the new file in
// *result and returns OK. On failure stores nullptr in *result and
// returns non-OK. If the file does not exist, returns a non-OK
// status.
//
// The returned file may be concurrently accessed by multiple threads.
virtual Status new_random_access_file(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result) = 0;
virtual Status new_random_access_file(const RandomAccessFileOptions& opts,
const std::string& fname,
std::unique_ptr<RandomAccessFile>* result) = 0;
// Create an object that writes to a new file with the specified
// name. Deletes any existing file with the same name and creates a
// new file. On success, stores a pointer to the new file in
// *result and returns OK. On failure stores NULL in *result and
// returns non-OK.
//
// The returned file will only be accessed by one thread at a time.
virtual Status new_writable_file(const std::string& fname,
std::unique_ptr<WritableFile>* result) = 0;
// Like the previous new_writable_file, but allows options to be
// specified.
virtual Status new_writable_file(const WritableFileOptions& opts,
const std::string& fname,
std::unique_ptr<WritableFile>* result) = 0;
// Creates a new readable and writable file. If a file with the same name
// already exists on disk, it is deleted.
//
// Some of the methods of the new file may be accessed concurrently,
// while others are only safe for access by one thread at a time.
virtual Status new_random_rw_file(const std::string& fname,
std::unique_ptr<RandomRWFile>* result) = 0;
// Like the previous new_random_rw_file, but allows options to be specified.
virtual Status new_random_rw_file(const RandomRWFileOptions& opts,
const std::string& fname,
std::unique_ptr<RandomRWFile>* result) = 0;
// Returns OK if the named file exists.
// NotFound if the named file does not exist,
// the calling process does not have permission to determine
// whether this file exists, or if the path is invalid.
// IOError if an IO Error was encountered
virtual Status file_exists(const std::string& fname) = 0;
// Store in *result the names of the children of the specified directory.
// The names are relative to "dir".
// Original contents of *results are dropped.
// Returns OK if "dir" exists and "*result" contains its children.
// NotFound if "dir" does not exist, the calling process does not have
// permission to access "dir", or if "dir" is invalid.
// IOError if an IO Error was encountered
virtual Status get_children(const std::string& dir,
std::vector<std::string>* result) = 0;
// Delete the named file.
virtual Status delete_file(const std::string& fname) = 0;
// Create the specified directory. Returns error if directory exists.
virtual Status create_dir(const std::string& dirname) = 0;
// Creates directory if missing. Return Ok if it exists, or successful in
// Creating.
virtual Status create_dir_if_missing(const std::string& dirname) = 0;
// Delete the specified directory.
virtual Status delete_dir(const std::string& dirname) = 0;
virtual Status get_file_size(const std::string& fname, uint64_t* size) = 0;
// Store the last modification time of fname in *file_mtime.
virtual Status get_file_modification_time(const std::string& fname,
uint64_t* file_mtime) = 0;
// Rename file src to target.
virtual Status rename_file(const std::string& src,
const std::string& target) = 0;
// Hard Link file src to target.
virtual Status link_file(const std::string& /*src*/,
const std::string& /*target*/) {
return Status::NotSupported("link file is not supported for this Env");
}
};
struct RandomAccessFileOptions {
RandomAccessFileOptions() { }
};
// Creation-time options for WritableFile
struct WritableFileOptions {
// Call Sync() during Close().
bool sync_on_close = false;
// See CreateMode for details.
Env::CreateMode mode = Env::CREATE_IF_NON_EXISTING_TRUNCATE;
};
// Creation-time options for RWFile
struct RandomRWFileOptions {
// Call Sync() during Close().
bool sync_on_close = false;
// See CreateMode for details.
Env::CreateMode mode = Env::CREATE_IF_NON_EXISTING_TRUNCATE;
};
// A file abstraction for reading sequentially through a file
class SequentialFile {
public:
SequentialFile() { }
virtual ~SequentialFile() { }
// Read up to "result.size" bytes from the file.
// Sets "result.data" to the data that was read.
//
// If an error was encountered, returns a non-OK status
// and the contents of "result" are invalid.
//
// REQUIRES: External synchronization
virtual Status read(Slice* result) = 0;
// Skip "n" bytes from the file. This is guaranteed to be no
// slower that reading the same data, but may be faster.
//
// If end of file is reached, skipping will stop at the end of the
// file, and Skip will return OK.
//
// REQUIRES: External synchronization
virtual Status skip(uint64_t n) = 0;
// Returns the filename provided when the SequentialFile was constructed.
virtual const std::string& filename() const = 0;
};
class RandomAccessFile {
public:
RandomAccessFile() { }
virtual ~RandomAccessFile() { }
// Read "result.size" bytes from the file starting at "offset".
// Copies the resulting data into "result.data".
//
// If an error was encountered, returns a non-OK status.
//
// This method will internally retry on EINTR and "short reads" in order to
// fully read the requested number of bytes. In the event that it is not
// possible to read exactly 'length' bytes, an IOError is returned.
//
// Safe for concurrent use by multiple threads.
virtual Status read_at(uint64_t offset, const Slice& result) const = 0;
// Reads up to the "results" aggregate size, based on each Slice's "size",
// from the file starting at 'offset'. The Slices must point to already-allocated
// buffers for the data to be written to.
//
// If an error was encountered, returns a non-OK status.
//
// This method will internally retry on EINTR and "short reads" in order to
// fully read the requested number of bytes. In the event that it is not
// possible to read exactly 'length' bytes, an IOError is returned.
//
// Safe for concurrent use by multiple threads.
virtual Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const = 0;
// Return the size of this file
virtual Status size(uint64_t* size) const = 0;
// Return name of this file
virtual const std::string& file_name() const = 0;
};
// A file abstraction for sequential writing. The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class WritableFile {
public:
enum FlushMode {
FLUSH_SYNC,
FLUSH_ASYNC
};
WritableFile() { }
virtual ~WritableFile() { }
// Append data to the end of the file
// Note: A WritableFile object must support either Append or
// PositionedAppend, so the users cannot mix the two.
virtual Status append(const Slice& data) = 0;
virtual Status appendv(const Slice* data, size_t cnt) = 0;
virtual Status pre_allocate(uint64_t size) = 0;
virtual Status close() = 0;
virtual Status flush(FlushMode mode) = 0;
virtual Status sync() = 0; // sync data
virtual uint64_t size() const = 0;
// Returns the filename provided when the WritableFile was constructed.
virtual const std::string& filename() const = 0;
};
// A file abstraction for random reading and writing.
class RandomRWFile {
public:
enum FlushMode {
FLUSH_SYNC,
FLUSH_ASYNC
};
RandomRWFile() {}
virtual ~RandomRWFile() {}
virtual Status read_at(uint64_t offset, const Slice& result) const = 0;
virtual Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const = 0;
virtual Status write_at(uint64_t offset, const Slice& data) = 0;
virtual Status writev_at(uint64_t offset, const Slice* data, size_t data_cnt) = 0;
virtual Status flush(FlushMode mode, uint64_t offset, size_t length) = 0;
virtual Status sync() = 0;
virtual Status close() = 0;
virtual Status size(uint64_t* size) const = 0;
virtual const std::string& filename() const = 0;
};
}

654
be/src/env/env_posix.cpp vendored Normal file
View File

@ -0,0 +1,654 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors
#include "env/env.h"
#include <dirent.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "common/logging.h"
#include "gutil/macros.h"
#include "gutil/port.h"
#include "gutil/strings/substitute.h"
#include "util/errno.h"
#include "util/slice.h"
namespace doris {
using std::string;
using strings::Substitute;
static Status io_error(const std::string& context, int err_number) {
switch (err_number) {
case EACCES:
case ELOOP:
case ENAMETOOLONG:
case ENOENT:
case ENOTDIR:
return Status::NotFound(context, 1, errno_to_string(err_number));
case EEXIST:
return Status::AlreadyExist(context, 1, errno_to_string(err_number));
case EOPNOTSUPP:
case EXDEV: // No cross FS links allowed
return Status::NotSupported(context, 1, errno_to_string(err_number));
case EIO:
LOG(ERROR) << "I/O error, context=" << context;
}
return Status::IOError(context, 1, errno_to_string(err_number));
}
Status do_sync(int fd, const string& filename) {
if (fdatasync(fd) < 0) {
return io_error(filename, errno);
}
return Status::OK();
}
static Status do_open(const string& filename, Env::CreateMode mode, int* fd) {
int flags = O_RDWR;
switch (mode) {
case Env::CREATE_IF_NON_EXISTING_TRUNCATE:
flags |= O_CREAT | O_TRUNC;
break;
case Env::CREATE_NON_EXISTING:
flags |= O_CREAT | O_EXCL;
break;
case Env::OPEN_EXISTING:
break;
default:
return Status::NotSupported(Substitute("Unknown create mode $0", mode));
}
int f;
RETRY_ON_EINTR(f, open(filename.c_str(), flags, 0666));
if (f < 0) {
return io_error(filename, errno);
}
*fd = f;
return Status::OK();
}
static Status do_readv_at(int fd, const std::string& filename, uint64_t offset,
const Slice* res, size_t res_cnt) {
// Convert the results into the iovec vector to request
// and calculate the total bytes requested
size_t bytes_req = 0;
struct iovec iov[res_cnt];
for (size_t i = 0; i < res_cnt; i++) {
const Slice& result = res[i];
bytes_req += result.size;
iov[i] = { result.data, result.size };
}
uint64_t cur_offset = offset;
size_t completed_iov = 0;
size_t rem = bytes_req;
while (rem > 0) {
// Never request more than IOV_MAX in one request
size_t iov_count = std::min(res_cnt - completed_iov, static_cast<size_t>(IOV_MAX));
ssize_t r;
RETRY_ON_EINTR(r, preadv(fd, iov + completed_iov, iov_count, cur_offset));
if (PREDICT_FALSE(r < 0)) {
// An error: return a non-ok status.
return io_error(filename, errno);
}
if (PREDICT_FALSE(r == 0)) {
return Status::EndOfFile(
Substitute("EOF trying to read $0 bytes at offset $1", bytes_req, offset));
}
if (PREDICT_TRUE(r == rem)) {
// All requested bytes were read. This is almost always the case.
return Status::OK();
}
DCHECK_LE(r, rem);
// Adjust iovec vector based on bytes read for the next request
ssize_t bytes_rem = r;
for (size_t i = completed_iov; i < res_cnt; i++) {
if (bytes_rem >= iov[i].iov_len) {
// The full length of this iovec was read
completed_iov++;
bytes_rem -= iov[i].iov_len;
} else {
// Partially read this result.
// Adjust the iov_len and iov_base to request only the missing data.
iov[i].iov_base = static_cast<uint8_t *>(iov[i].iov_base) + bytes_rem;
iov[i].iov_len -= bytes_rem;
break; // Don't need to adjust remaining iovec's
}
}
cur_offset += r;
rem -= r;
}
DCHECK_EQ(0, rem);
return Status::OK();
}
static Status do_writev_at(int fd, const string& filename, uint64_t offset,
const Slice* data, size_t data_cnt, size_t* bytes_written) {
// Convert the results into the iovec vector to request
// and calculate the total bytes requested.
size_t bytes_req = 0;
struct iovec iov[data_cnt];
for (size_t i = 0; i < data_cnt; i++) {
const Slice& result = data[i];
bytes_req += result.size;
iov[i] = { result.data, result.size };
}
uint64_t cur_offset = offset;
size_t completed_iov = 0;
size_t rem = bytes_req;
while (rem > 0) {
// Never request more than IOV_MAX in one request.
size_t iov_count = std::min(data_cnt - completed_iov, static_cast<size_t>(IOV_MAX));
ssize_t w;
RETRY_ON_EINTR(w, pwritev(fd, iov + completed_iov, iov_count, cur_offset));
if (PREDICT_FALSE(w < 0)) {
// An error: return a non-ok status.
return io_error(filename, errno);
}
if (PREDICT_TRUE(w == rem)) {
// All requested bytes were read. This is almost always the case.
break;
}
// Adjust iovec vector based on bytes read for the next request.
ssize_t bytes_rem = w;
for (size_t i = completed_iov; i < data_cnt; i++) {
if (bytes_rem >= iov[i].iov_len) {
// The full length of this iovec was written.
completed_iov++;
bytes_rem -= iov[i].iov_len;
} else {
// Partially wrote this result.
// Adjust the iov_len and iov_base to write only the missing data.
iov[i].iov_base = static_cast<uint8_t *>(iov[i].iov_base) + bytes_rem;
iov[i].iov_len -= bytes_rem;
break; // Don't need to adjust remaining iovec's.
}
}
cur_offset += w;
rem -= w;
}
DCHECK_EQ(0, rem);
*bytes_written = bytes_req;
return Status::OK();
}
class PosixSequentialFile: public SequentialFile {
public:
PosixSequentialFile(string fname, FILE* f)
: _filename(std::move(fname)), _file(f) {}
~PosixSequentialFile() override {
int err;
RETRY_ON_EINTR(err, fclose(_file));
if (PREDICT_FALSE(err != 0)) {
LOG(WARNING) << "Failed to close " << _filename
<< ", msg=" << errno_to_string(ferror(_file));
}
}
Status read(Slice* result) override {
size_t r;
STREAM_RETRY_ON_EINTR(r, _file, fread_unlocked(result->data, 1,
result->size, _file));
if (r < result->size) {
if (feof(_file)) {
// We leave status as ok if we hit the end of the file.
// We need to adjust the slice size.
result->truncate(r);
} else {
// A partial read with an error: return a non-ok status.
return io_error(_filename, ferror(_file));
}
}
return Status::OK();
}
Status skip(uint64_t n) override {
if (fseek(_file, n, SEEK_CUR)) {
return io_error(_filename, errno);
}
return Status::OK();
}
const string& filename() const override { return _filename; }
private:
const string _filename;
FILE* const _file;
};
class PosixRandomAccessFile : public RandomAccessFile {
public:
PosixRandomAccessFile(std::string filename, int fd) : _filename(std::move(filename)), _fd(fd) {
}
~PosixRandomAccessFile() override {
int res;
RETRY_ON_EINTR(res, close(_fd));
if (res != 0) {
LOG(WARNING) << "close file failed, name=" << _filename
<< ", msg=" << errno_to_string(errno);
}
}
Status read_at(uint64_t offset, const Slice& result) const override {
return do_readv_at(_fd, _filename, offset, &result, 1);
}
Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const override {
return do_readv_at(_fd, _filename, offset, res, res_cnt);
}
Status size(uint64_t* size) const override {
struct stat st;
auto res = fstat(_fd, &st);
if (res != 0) {
return io_error(_filename, errno);
}
*size = st.st_size;
return Status::OK();
}
const std::string& file_name() const override { return _filename; }
private:
std::string _filename;
int _fd;
};
class PosixWritableFile : public WritableFile {
public:
PosixWritableFile(std::string filename, int fd, uint64_t filesize, bool sync_on_close)
: _filename(std::move(filename)), _fd(fd), _sync_on_close(sync_on_close), _filesize(filesize) { }
~PosixWritableFile() override {
WARN_IF_ERROR(close(), "Failed to close file, file=" + _filename);
}
Status append(const Slice& data) override {
return appendv(&data, 1);
}
Status appendv(const Slice* data, size_t cnt) override {
size_t bytes_written = 0;
RETURN_IF_ERROR(do_writev_at(_fd, _filename, _filesize, data, cnt, &bytes_written));
_filesize += bytes_written;
return Status::OK();
}
Status pre_allocate(uint64_t size) override {
uint64_t offset = std::max(_filesize, _pre_allocated_size);
int ret;
RETRY_ON_EINTR(ret, fallocate(_fd, 0, offset, size));
if (ret != 0) {
if (errno == EOPNOTSUPP) {
LOG(WARNING) << "The filesystem does not support fallocate().";
} else if (errno == ENOSYS) {
LOG(WARNING) << "The kernel does not implement fallocate().";
} else {
return io_error(_filename, errno);
}
}
_pre_allocated_size = offset + size;
return Status::OK();
}
Status close() override {
if (_closed) {
return Status::OK();
}
Status s;
// If we've allocated more space than we used, truncate to the
// actual size of the file and perform Sync().
if (_filesize < _pre_allocated_size) {
int ret;
RETRY_ON_EINTR(ret, ftruncate(_fd, _filesize));
if (ret != 0) {
s = io_error(_filename, errno);
_pending_sync = true;
}
}
if (_sync_on_close) {
Status sync_status = sync();
if (!sync_status.ok()) {
LOG(ERROR) << "Unable to Sync " << _filename << ": " << sync_status.to_string();
if (s.ok()) {
s = sync_status;
}
}
}
int ret;
RETRY_ON_EINTR(ret, ::close(_fd));
if (ret < 0) {
if (s.ok()) {
s = io_error(_filename, errno);
}
}
_closed = true;
return s;
}
Status flush(FlushMode mode) override {
#if defined(__linux__)
int flags = SYNC_FILE_RANGE_WRITE;
if (mode == FLUSH_SYNC) {
flags |= SYNC_FILE_RANGE_WAIT_BEFORE;
flags |= SYNC_FILE_RANGE_WAIT_AFTER;
}
if (sync_file_range(_fd, 0, 0, flags) < 0) {
return io_error(_filename, errno);
}
#else
if (mode == FLUSH_SYNC && fsync(_fd) < 0) {
return io_error(_filename, errno);
}
#endif
return Status::OK();
}
Status sync() override {
if (_pending_sync) {
_pending_sync = false;
RETURN_IF_ERROR(do_sync(_fd, _filename));
}
return Status::OK();
}
uint64_t size() const override { return _filesize; }
const string& filename() const override { return _filename; }
private:
std::string _filename;
int _fd;
const bool _sync_on_close = false;
bool _pending_sync = false;
bool _closed = false;
uint64_t _filesize = 0;
uint64_t _pre_allocated_size = 0;
};
class PosixRandomRWFile : public RandomRWFile {
public:
PosixRandomRWFile(string fname, int fd, bool sync_on_close)
: _filename(std::move(fname)),
_fd(fd),
_sync_on_close(sync_on_close),
_closed(false) {}
~PosixRandomRWFile() {
WARN_IF_ERROR(close(), "Failed to close " + _filename);
}
virtual Status read_at(uint64_t offset, const Slice& result) const override {
return do_readv_at(_fd, _filename, offset, &result, 1);
}
Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const override {
return do_readv_at(_fd, _filename, offset, res, res_cnt);
}
Status write_at(uint64_t offset, const Slice& data) override {
return writev_at(offset, &data, 1);
}
Status writev_at(uint64_t offset, const Slice* data, size_t data_cnt) override {
size_t bytes_written = 0;
return do_writev_at(_fd, _filename, offset, data, data_cnt, &bytes_written);
}
Status flush(FlushMode mode, uint64_t offset, size_t length) override {
#if defined(__linux__)
int flags = SYNC_FILE_RANGE_WRITE;
if (mode == FLUSH_SYNC) {
flags |= SYNC_FILE_RANGE_WAIT_AFTER;
}
if (sync_file_range(_fd, offset, length, flags) < 0) {
return io_error(_filename, errno);
}
#else
if (mode == FLUSH_SYNC && fsync(_fd) < 0) {
return io_error(_filename, errno);
}
#endif
return Status::OK();
}
Status sync() override {
return do_sync(_fd, _filename);
}
Status close() override {
if (_closed) {
return Status::OK();
}
Status s;
if (_sync_on_close) {
s = sync();
if (!s.ok()) {
LOG(ERROR) << "Unable to Sync " << _filename << ": " << s.to_string();
}
}
int ret;
RETRY_ON_EINTR(ret, ::close(_fd));
if (ret < 0) {
if (s.ok()) {
s = io_error(_filename, errno);
}
}
_closed = true;
return s;
}
Status size(uint64_t* size) const override {
struct stat st;
if (fstat(_fd, &st) == -1) {
return io_error(_filename, errno);
}
*size = st.st_size;
return Status::OK();
}
const string& filename() const override {
return _filename;
}
private:
const string _filename;
const int _fd;
const bool _sync_on_close = false;
bool _closed = false;
};
class PosixEnv : public Env {
public:
PosixEnv() { }
~PosixEnv() override { }
Status new_sequential_file(
const string& fname, std::unique_ptr<SequentialFile>* result) override {
FILE* f;
POINTER_RETRY_ON_EINTR(f, fopen(fname.c_str(), "r"));
if (f == nullptr) {
return io_error(fname, errno);
}
result->reset(new PosixSequentialFile(fname, f));
return Status::OK();
}
Status new_random_access_file(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result) override {
return new_random_access_file(RandomAccessFileOptions(), fname, result);
}
Status new_random_access_file(const RandomAccessFileOptions& opts,
const std::string& fname,
std::unique_ptr<RandomAccessFile>* result) override {
int fd;
RETRY_ON_EINTR(fd, open(fname.c_str(), O_RDONLY));
if (fd < 0) {
return io_error(fname, errno);
}
result->reset(new PosixRandomAccessFile(fname, fd));
return Status::OK();
}
Status new_writable_file(const string& fname,
std::unique_ptr<WritableFile>* result) override {
return new_writable_file(WritableFileOptions(), fname, result);
}
Status new_writable_file(const WritableFileOptions& opts,
const string& fname,
std::unique_ptr<WritableFile>* result) override {
int fd;
RETURN_IF_ERROR(do_open(fname, opts.mode, &fd));
uint64_t file_size = 0;
if (opts.mode == OPEN_EXISTING) {
RETURN_IF_ERROR(get_file_size(fname, &file_size));
}
result->reset(new PosixWritableFile(fname, fd, file_size, opts.sync_on_close));
return Status::OK();
}
Status new_random_rw_file(const string& fname,
std::unique_ptr<RandomRWFile>* result) override {
return new_random_rw_file(RandomRWFileOptions(), fname, result);
}
Status new_random_rw_file(const RandomRWFileOptions& opts,
const string& fname,
std::unique_ptr<RandomRWFile>* result) override {
int fd;
RETURN_IF_ERROR(do_open(fname, opts.mode, &fd));
result->reset(new PosixRandomRWFile(fname, fd, opts.sync_on_close));
return Status::OK();
}
Status file_exists(const std::string& fname) override {
if (access(fname.c_str(), F_OK) != 0) {
return io_error(fname, errno);
}
return Status::OK();
}
Status get_children(const std::string& dir,
std::vector<std::string>* result) override {
result->clear();
DIR* d = opendir(dir.c_str());
if (d == nullptr) {
return io_error(dir, errno);
}
struct dirent* entry;
while ((entry = readdir(d)) != nullptr) {
result->push_back(entry->d_name);
}
closedir(d);
return Status::OK();
}
Status delete_file(const std::string& fname) override {
if (unlink(fname.c_str()) != 0) {
return io_error(fname, errno);
}
return Status::OK();
}
// Create the specified directory. Returns error if directory exists.
Status create_dir(const std::string& name) override {
if (mkdir(name.c_str(), 0755) != 0) {
return io_error(name, errno);
}
return Status::OK();
}
// Creates directory if missing. Return Ok if it exists, or successful in
// Creating.
Status create_dir_if_missing(const std::string& name) override {
if (mkdir(name.c_str(), 0755) != 0) {
if (errno != EEXIST) {
return io_error(name, errno);
} else if (!dir_exists(name)) { // Check that name is actually a
// directory.
// Message is taken from mkdir
return Status::IOError(name + " exists but is not a directory");
}
}
return Status::OK();
}
// Delete the specified directory.
Status delete_dir(const std::string& dirname) override {
if (rmdir(dirname.c_str()) != 0) {
return io_error(dirname, errno);
}
return Status::OK();
}
Status get_file_size(const string& fname, uint64_t* size) override {
struct stat sbuf;
if (stat(fname.c_str(), &sbuf) != 0) {
return io_error(fname, errno);
} else {
*size = sbuf.st_size;
}
return Status::OK();
}
Status get_file_modification_time(const std::string& fname,
uint64_t* file_mtime) override {
struct stat s;
if (stat(fname.c_str(), &s) !=0) {
return io_error(fname, errno);
}
*file_mtime = static_cast<uint64_t>(s.st_mtime);
return Status::OK();
}
Status rename_file(const std::string& src, const std::string& target) override {
if (rename(src.c_str(), target.c_str()) != 0) {
return io_error(src, errno);
}
return Status::OK();
}
Status link_file(const std::string& src, const std::string& target) override {
if (link(src.c_str(), target.c_str()) != 0) {
return io_error(src, errno);
}
return Status::OK();
}
private:
bool dir_exists(const std::string& dname) {
struct stat statbuf;
if (stat(dname.c_str(), &statbuf) == 0) {
return S_ISDIR(statbuf.st_mode);
}
return false; // stat() failed return false
}
};
// Default Posix Env
Env* Env::Default() {
static PosixEnv default_env;
return &default_env;
}
}

View File

@ -82,6 +82,7 @@ set(SRC_FILES
${GEN_CPP_DIR}/palo_internal_service.pb.cc
${GEN_CPP_DIR}/types.pb.cc
${GEN_CPP_DIR}/status.pb.cc
${GEN_CPP_DIR}/segment_v2.pb.cc
#$${GEN_CPP_DIR}/opcode/functions.cc
#$${GEN_CPP_DIR}/opcode/vector-functions.cc
#$${GEN_CPP_DIR}/opcode/opcode-registry-init.cc

View File

@ -262,4 +262,26 @@ enum LinkerInitialized { LINKER_INITIALIZED };
#define FALLTHROUGH_INTENDED do { } while (0)
#endif
// Retry on EINTR for functions like read() that return -1 on error.
#define RETRY_ON_EINTR(err, expr) do { \
static_assert(std::is_signed<decltype(err)>::value, \
#err " must be a signed integer"); \
(err) = (expr); \
} while ((err) == -1 && errno == EINTR)
// Same as above but for stream API calls like fread() and fwrite().
#define STREAM_RETRY_ON_EINTR(nread, stream, expr) do { \
static_assert(std::is_unsigned<decltype(nread)>::value == true, \
#nread " must be an unsigned integer"); \
(nread) = (expr); \
} while ((nread) == 0 && ferror(stream) == EINTR)
// Same as above but for functions that return pointer types (like
// fopen() and freopen()).
#define POINTER_RETRY_ON_EINTR(ptr, expr) do { \
static_assert(std::is_pointer<decltype(ptr)>::value == true, \
#ptr " must be a pointer"); \
(ptr) = (expr); \
} while ((ptr) == nullptr && errno == EINTR)
#endif // BASE_MACROS_H_

View File

@ -33,6 +33,7 @@ set(UTIL_FILES
debug_util.cpp
decompress.cpp
disk_info.cpp
errno.cpp
hash_util.hpp
json_util.cpp
doris_metrics.cpp

View File

@ -120,10 +120,22 @@ inline bool BitmapEquals(const uint8_t* bm1, const uint8_t* bm2, size_t bitmap_s
// }
class BitmapIterator {
public:
BitmapIterator(const uint8_t *map, size_t num_bits)
BitmapIterator(const uint8_t* map, size_t num_bits)
: offset_(0), num_bits_(num_bits), map_(map)
{}
void Reset(const uint8_t* map, size_t num_bits) {
offset_ = 0;
num_bits_ = num_bits_;
map_ = map;
}
void Reset() {
offset_ = 0;
num_bits_ = 0;
map_ = nullptr;
}
bool done() const {
return (num_bits_ - offset_) == 0;
}

50
be/src/util/errno.cpp Normal file
View File

@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/errno.h"
#include <cstring>
#include "gutil/dynamic_annotations.h"
namespace doris {
void errno_to_cstring(int err, char *buf, size_t buf_len) {
#if !defined(__GLIBC__) || \
((_POSIX_C_SOURCE >= 200112 || _XOPEN_SOURCE >= 600) && !defined(_GNU_SOURCE))
// Using POSIX version 'int strerror_r(...)'.
int ret = strerror_r(err, buf, buf_len);
if (ret && ret != ERANGE && ret != EINVAL) {
strncpy(buf, "unknown error", buf_len);
buf[buf_len - 1] = '\0';
}
#else
// Using GLIBC version
// KUDU-1515: TSAN in Clang 3.9 has an incorrect interceptor for strerror_r:
// https://github.com/google/sanitizers/issues/696
ANNOTATE_IGNORE_WRITES_BEGIN();
char* ret = strerror_r(err, buf, buf_len);
ANNOTATE_IGNORE_WRITES_END();
if (ret != buf) {
strncpy(buf, ret, buf_len);
buf[buf_len - 1] = '\0';
}
#endif
}
}

32
be/src/util/errno.h Normal file
View File

@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstddef>
#include <string>
namespace doris {
void errno_to_cstring(int err, char *buf, size_t buf_len);
// Return a string representing an errno.
inline static std::string errno_to_string(int err) {
char buf[512];
errno_to_cstring(err, buf, sizeof(buf));
return std::string(buf);
}
}

21
be/test/env/CMakeLists.txt vendored Normal file
View File

@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# where to put generated libraries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/test/env")
ADD_BE_TEST(env_posix_test)

209
be/test/env/env_posix_test.cpp vendored Normal file
View File

@ -0,0 +1,209 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "env/env.h"
#include <gtest/gtest.h>
#include "common/logging.h"
namespace doris {
class EnvPosixTest : public testing::Test {
public:
EnvPosixTest() { }
virtual ~EnvPosixTest() { }
void SetUp() override {
auto st = Env::Default()->create_dir_if_missing("./ut_dir/env_posix");
ASSERT_TRUE(st.ok());
}
void TearDown() override {
}
};
TEST_F(EnvPosixTest, random_access) {
std::string fname = "./ut_dir/env_posix/random_access";
WritableFileOptions ops;
std::unique_ptr<WritableFile> wfile;
auto env = Env::Default();
auto st = env->new_writable_file(fname, &wfile);
ASSERT_TRUE(st.ok());
st = wfile->pre_allocate(1024);
ASSERT_TRUE(st.ok());
// wirte data
Slice field1("123456789");
st = wfile->append(field1);
ASSERT_TRUE(st.ok());
std::string buf;
for (int i = 0; i < 100; ++i) {
buf.push_back((char)i);
}
st = wfile->append(buf);
ASSERT_TRUE(st.ok());
Slice abc("abc");
Slice bcd("bcd");
Slice slices[2]{abc, bcd};
st = wfile->appendv(slices, 2);
ASSERT_TRUE(st.ok());
st = wfile->flush(WritableFile::FLUSH_ASYNC);
ASSERT_TRUE(st.ok());
st = wfile->sync();
ASSERT_TRUE(st.ok());
st = wfile->close();
ASSERT_TRUE(st.ok());
ASSERT_EQ(115, wfile->size());
uint64_t size;
st = env->get_file_size(fname, &size);
ASSERT_TRUE(st.ok());
ASSERT_EQ(115, size);
{
char mem[1024];
std::unique_ptr<RandomAccessFile> rfile;
st = env->new_random_access_file(fname, &rfile);
ASSERT_TRUE(st.ok());
Slice slice1(mem, 9);
Slice slice2(mem + 9, 100);
Slice slice3(mem + 9 + 100, 3);
Slice read_slices[3] {slice1, slice2, slice3};
st = rfile->readv_at(0, read_slices, 3);
ASSERT_TRUE(st.ok());
ASSERT_STREQ("123456789", std::string(slice1.data, slice1.size).c_str());
ASSERT_STREQ("abc", std::string(slice3.data, slice3.size).c_str());
Slice slice4(mem, 3);
st = rfile->read_at(112, slice4);
ASSERT_TRUE(st.ok());
ASSERT_STREQ("bcd", std::string(slice4.data, slice4.size).c_str());
// end of file
st = rfile->read_at(114, slice4);
ASSERT_EQ(TStatusCode::END_OF_FILE, st.code());
LOG(INFO) << "st=" << st.to_string();
}
}
TEST_F(EnvPosixTest, random_rw) {
std::string fname = "./ut_dir/env_posix/random_rw";
WritableFileOptions ops;
std::unique_ptr<RandomRWFile> wfile;
auto env = Env::Default();
auto st = env->new_random_rw_file(fname, &wfile);
ASSERT_TRUE(st.ok());
// wirte data
Slice field1("123456789");
st = wfile->write_at(0, field1);
ASSERT_TRUE(st.ok());
std::string buf;
for (int i = 0; i < 100; ++i) {
buf.push_back((char)i);
}
st = wfile->write_at(9, buf);
ASSERT_TRUE(st.ok());
Slice abc("abc");
Slice bcd("bcd");
Slice slices[2]{abc, bcd};
st = wfile->writev_at(0, slices, 2);
ASSERT_TRUE(st.ok());
uint64_t size;
st = wfile->size(&size);
ASSERT_TRUE(st.ok());
ASSERT_EQ(109, size);
st = wfile->flush(RandomRWFile::FLUSH_ASYNC, 0, 0);
ASSERT_TRUE(st.ok());
st = wfile->sync();
ASSERT_TRUE(st.ok());
st = wfile->close();
ASSERT_TRUE(st.ok());
st = env->get_file_size(fname, &size);
ASSERT_TRUE(st.ok());
ASSERT_EQ(109, size);
{
char mem[1024];
std::unique_ptr<RandomRWFile> rfile;
RandomRWFileOptions opts;
opts.mode = Env::OPEN_EXISTING;
st = env->new_random_rw_file(opts, fname, &rfile);
ASSERT_TRUE(st.ok());
Slice slice1(mem, 3);
Slice slice2(mem + 3, 3);
Slice slice3(mem + 6, 3);
Slice read_slices[3] {slice1, slice2, slice3};
st = rfile->readv_at(0, read_slices, 3);
LOG(INFO) << st.to_string();
ASSERT_TRUE(st.ok());
ASSERT_STREQ("abc", std::string(slice1.data, slice1.size).c_str());
ASSERT_STREQ("bcd", std::string(slice2.data, slice2.size).c_str());
ASSERT_STREQ("789", std::string(slice3.data, slice3.size).c_str());
Slice slice4(mem, 100);
st = rfile->read_at(9, slice4);
ASSERT_TRUE(st.ok());
// end of file
st = rfile->read_at(102, slice4);
ASSERT_EQ(TStatusCode::END_OF_FILE, st.code());
LOG(INFO) << "st=" << st.to_string();
}
// SequentialFile
{
char mem[1024];
std::unique_ptr<SequentialFile> rfile;
st = env->new_sequential_file(fname, &rfile);
ASSERT_TRUE(st.ok());
Slice slice1(mem, 3);
st = rfile->read(&slice1);
ASSERT_TRUE(st.ok());
ASSERT_STREQ("abc", std::string(slice1.data, slice1.size).c_str());
st = rfile->skip(3);
ASSERT_TRUE(st.ok());
Slice slice3(mem, 3);
st = rfile->read(&slice3);
ASSERT_STREQ("789", std::string(slice3.data, slice3.size).c_str());
st = rfile->skip(90);
ASSERT_TRUE(st.ok());
Slice slice4(mem, 15);
st = rfile->read(&slice4);
ASSERT_TRUE(st.ok());
ASSERT_EQ(10, slice4.size);
st = rfile->read(&slice4);
ASSERT_TRUE(st.ok());
ASSERT_EQ(0, slice4.size);
}
}
}
int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}