From a0294b8f4094a268bc798d5e2cef56708786ebd6 Mon Sep 17 00:00:00 2001 From: ZHAO Chun Date: Mon, 17 Jun 2019 10:18:16 +0800 Subject: [PATCH] Add Env for file operation (#1321) --- LICENSE.txt | 9 + be/CMakeLists.txt | 31 +- be/src/common/status.cpp | 2 +- be/src/common/status.h | 84 +++-- be/src/env/CMakeLists.txt | 26 ++ be/src/env/env.h | 297 +++++++++++++++ be/src/env/env_posix.cpp | 654 +++++++++++++++++++++++++++++++++ be/src/gen_cpp/CMakeLists.txt | 1 + be/src/gutil/macros.h | 22 ++ be/src/util/CMakeLists.txt | 1 + be/src/util/bitmap.h | 14 +- be/src/util/errno.cpp | 50 +++ be/src/util/errno.h | 32 ++ be/test/env/CMakeLists.txt | 21 ++ be/test/env/env_posix_test.cpp | 209 +++++++++++ 15 files changed, 1407 insertions(+), 46 deletions(-) create mode 100644 be/src/env/CMakeLists.txt create mode 100644 be/src/env/env.h create mode 100644 be/src/env/env_posix.cpp create mode 100644 be/src/util/errno.cpp create mode 100644 be/src/util/errno.h create mode 100644 be/test/env/CMakeLists.txt create mode 100644 be/test/env/env_posix_test.cpp diff --git a/LICENSE.txt b/LICENSE.txt index 92f632ce8d..700453f399 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -359,6 +359,15 @@ Parts of be/src/runtime/string_search.hpp: Python Software License V2 -------------------------------------------------------------------------------- +be/src/env (some portions): 3-clause BSD + +Some portions of this module are derived from code from RocksDB +( https://github.com/facebook/rocksdb ). RocksDB is dual-licensed + under both the GPLv2 and Apache 2.0 License. We select Apache 2.0 +License. + +-------------------------------------------------------------------------------- + be/src/util/coding.*, be/src/util/status.*: 3-clause BSD Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 57043772d2..74a7972217 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -447,6 +447,7 @@ set(DORIS_LINK_LIBS Agent CodeGen Common + Env Exec Exprs Gutil @@ -555,23 +556,24 @@ if (${MAKE_TEST} STREQUAL "ON") add_definitions(-DBE_TEST) endif () +add_subdirectory(${SRC_DIR}/agent) add_subdirectory(${SRC_DIR}/codegen) add_subdirectory(${SRC_DIR}/common) -add_subdirectory(${SRC_DIR}/util) -add_subdirectory(${SRC_DIR}/gen_cpp) -add_subdirectory(${SRC_DIR}/gutil) -add_subdirectory(${SRC_DIR}/olap) -add_subdirectory(${SRC_DIR}/agent) -add_subdirectory(${SRC_DIR}/http) -add_subdirectory(${SRC_DIR}/service) +add_subdirectory(${SRC_DIR}/env) add_subdirectory(${SRC_DIR}/exec) add_subdirectory(${SRC_DIR}/exprs) -add_subdirectory(${SRC_DIR}/udf) +add_subdirectory(${SRC_DIR}/gen_cpp) +add_subdirectory(${SRC_DIR}/geo) +add_subdirectory(${SRC_DIR}/gutil) +add_subdirectory(${SRC_DIR}/http) +add_subdirectory(${SRC_DIR}/olap) add_subdirectory(${SRC_DIR}/runtime) +add_subdirectory(${SRC_DIR}/service) add_subdirectory(${SRC_DIR}/testutil) add_subdirectory(${SRC_DIR}/tools) +add_subdirectory(${SRC_DIR}/udf) add_subdirectory(${SRC_DIR}/udf_samples) -add_subdirectory(${SRC_DIR}/geo) +add_subdirectory(${SRC_DIR}/util) # Utility CMake function to make specifying tests and benchmarks less verbose FUNCTION(ADD_BE_TEST TEST_NAME) @@ -592,15 +594,16 @@ ENDFUNCTION() if (${MAKE_TEST} STREQUAL "ON") add_subdirectory(${TEST_DIR}/agent) - add_subdirectory(${TEST_DIR}/olap) add_subdirectory(${TEST_DIR}/common) - add_subdirectory(${TEST_DIR}/util) - add_subdirectory(${TEST_DIR}/udf) + add_subdirectory(${TEST_DIR}/env) add_subdirectory(${TEST_DIR}/exec) add_subdirectory(${TEST_DIR}/exprs) - add_subdirectory(${TEST_DIR}/runtime) - add_subdirectory(${TEST_DIR}/http) add_subdirectory(${TEST_DIR}/geo) + add_subdirectory(${TEST_DIR}/http) + add_subdirectory(${TEST_DIR}/olap) + add_subdirectory(${TEST_DIR}/runtime) + 
add_subdirectory(${TEST_DIR}/udf) + add_subdirectory(${TEST_DIR}/util) endif () # Install be diff --git a/be/src/common/status.cpp b/be/src/common/status.cpp index 15f3b0d4a2..a47e54483d 100644 --- a/be/src/common/status.cpp +++ b/be/src/common/status.cpp @@ -111,7 +111,7 @@ std::string Status::code_as_string() const { case TStatusCode::CANCELLED: return "Cancelled"; case TStatusCode::NOT_IMPLEMENTED_ERROR: - return "Not implemented"; + return "Not supported"; case TStatusCode::RUNTIME_ERROR: return "Runtime error"; case TStatusCode::MEM_LIMIT_EXCEEDED: diff --git a/be/src/common/status.h b/be/src/common/status.h index 54fdbd0891..6bd6d1d1fd 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -71,55 +71,70 @@ public: static Status OK() { return Status(); } - static Status PublishTimeout(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::PUBLISH_TIMEOUT, msg, sub_code, msg2); + static Status PublishTimeout(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { + return Status(TStatusCode::PUBLISH_TIMEOUT, msg, precise_code, msg2); } - static Status MemoryAllocFailed(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::MEM_ALLOC_FAILED, msg, sub_code, msg2); + static Status MemoryAllocFailed(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { + return Status(TStatusCode::MEM_ALLOC_FAILED, msg, precise_code, msg2); } - static Status BufferAllocFailed(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::BUFFER_ALLOCATION_FAILED, msg, sub_code, msg2); + static Status BufferAllocFailed(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { + return Status(TStatusCode::BUFFER_ALLOCATION_FAILED, msg, precise_code, msg2); } - static Status InvalidArgument(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) { - return 
Status(TStatusCode::INVALID_ARGUMENT, msg, sub_code, msg2); + static Status InvalidArgument(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { + return Status(TStatusCode::INVALID_ARGUMENT, msg, precise_code, msg2); } - static Status MinimumReservationUnavailable(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::INVALID_ARGUMENT, msg, sub_code, msg2); + static Status MinimumReservationUnavailable(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { + return Status(TStatusCode::INVALID_ARGUMENT, msg, precise_code, msg2); } - static Status IoError(const Slice& msg, - int16_t sub_code = 1, + static Status IOError(const Slice& msg, + int16_t precise_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::IO_ERROR, msg, sub_code, msg2); + return Status(TStatusCode::IO_ERROR, msg, precise_code, msg2); + } + static Status NotFound(const Slice& msg, + int16_t precise_code = 1, + const Slice& msg2 = Slice()) { + return Status(TStatusCode::NOT_FOUND, msg, precise_code, msg2); + } + static Status AlreadyExist(const Slice& msg, + int16_t precise_code = 1, + const Slice& msg2 = Slice()) { + return Status(TStatusCode::ALREADY_EXIST, msg, precise_code, msg2); + } + static Status NotSupported(const Slice& msg, + int16_t precise_code = 1, + const Slice& msg2 = Slice()) { + return Status(TStatusCode::NOT_IMPLEMENTED_ERROR, msg, precise_code, msg2); } static Status EndOfFile(const Slice& msg, - int16_t sub_code = 1, + int16_t precise_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::END_OF_FILE, msg, sub_code, msg2); + return Status(TStatusCode::END_OF_FILE, msg, precise_code, msg2); } static Status InternalError(const Slice& msg, - int16_t sub_code = 1, + int16_t precise_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::INTERNAL_ERROR, msg, sub_code, msg2); + return Status(TStatusCode::INTERNAL_ERROR, msg, precise_code, msg2); } static 
Status RuntimeError(const Slice& msg, - int16_t sub_code = 1, + int16_t precise_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::RUNTIME_ERROR, msg, sub_code, msg2); + return Status(TStatusCode::RUNTIME_ERROR, msg, precise_code, msg2); } - static Status Cancelled(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::CANCELLED, msg, sub_code, msg2); + static Status Cancelled(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { + return Status(TStatusCode::CANCELLED, msg, precise_code, msg2); } - static Status MemoryLimitExceeded(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::MEM_LIMIT_EXCEEDED, msg, sub_code, msg2); + static Status MemoryLimitExceeded(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { + return Status(TStatusCode::MEM_LIMIT_EXCEEDED, msg, precise_code, msg2); } - static Status ThriftRpcError(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::THRIFT_RPC_ERROR, msg, sub_code, msg2); + static Status ThriftRpcError(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { + return Status(TStatusCode::THRIFT_RPC_ERROR, msg, precise_code, msg2); } - static Status TimedOut(const Slice& msg, int16_t sub_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::TIMEOUT, msg, sub_code, msg2); + static Status TimedOut(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { + return Status(TStatusCode::TIMEOUT, msg, precise_code, msg2); } bool ok() const { return _state == nullptr; } @@ -198,7 +213,7 @@ public: private: const char* copy_state(const char* state); - Status(TStatusCode::type code, const Slice& msg, int16_t sub_code, const Slice& msg2); + Status(TStatusCode::type code, const Slice& msg, int16_t precise_code, const Slice& msg2); private: // OK status has a nullptr _state. 
/// @brief Emit a warning if @c to_call returns a bad status.
///
/// @param to_call         Expression yielding a Status; evaluated exactly once.
/// @param warning_prefix  Streamable value logged before the status text.
///
/// The expansion is a single `do { ... } while (false)` statement WITHOUT a
/// trailing semicolon, so the caller's own `;` terminates it. This keeps the
/// macro safe inside an unbraced `if (...) WARN_IF_ERROR(...); else ...`.
/// UNLIKELY is used (rather than PREDICT_FALSE) to match RETURN_IF_ERROR and
/// EXIT_IF_ERROR in this header.
#define WARN_IF_ERROR(to_call, warning_prefix) \
    do { \
        const Status& _s = (to_call); \
        if (UNLIKELY(!_s.ok())) { \
            LOG(WARNING) << (warning_prefix) << ": " << _s.to_string(); \
        } \
    } while (false)
+ +# where to put generated libraries +set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/env") + +# where to put generated binaries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/env") + +add_library(Env STATIC + env_posix.cpp +) diff --git a/be/src/env/env.h b/be/src/env/env.h new file mode 100644 index 0000000000..338fb08024 --- /dev/null +++ b/be/src/env/env.h @@ -0,0 +1,297 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors + +#pragma once + +#include +#include + +#include "common/status.h" +#include "util/slice.h" + +namespace doris { + +class RandomAccessFile; +class RandomRWFile; +class WritableFile; +class SequentialFile; +class WritableFileOptions; +class RandomAccessFileOptions; +class RandomRWFileOptions; + +class Env { +public: + // Governs if/how the file is created. + // + // enum value | file exists | file does not exist + // --------------------------------+-------------------+-------------------- + // CREATE_IF_NON_EXISTING_TRUNCATE | opens + truncates | creates + // CREATE_NON_EXISTING | fails | creates + // OPEN_EXISTING | opens | fails + enum CreateMode { + CREATE_IF_NON_EXISTING_TRUNCATE, + CREATE_NON_EXISTING, + OPEN_EXISTING + }; + + Env() { } + virtual ~Env() { } + + // Return a default environment suitable for the current operating + // system. Sophisticated users may wish to provide their own Env + // implementation instead of relying on this default environment. + static Env* Default(); + + // Create a brand new sequentially-readable file with the specified name. 
+ // On success, stores a pointer to the new file in *result and returns OK. + // On failure stores NULL in *result and returns non-OK. If the file does + // not exist, returns a non-OK status. + // + // The returned file will only be accessed by one thread at a time. + virtual Status new_sequential_file(const std::string& fname, + std::unique_ptr* result) = 0; + + // Create a brand new random access read-only file with the + // specified name. On success, stores a pointer to the new file in + // *result and returns OK. On failure stores nullptr in *result and + // returns non-OK. If the file does not exist, returns a non-OK + // status. + // + // The returned file may be concurrently accessed by multiple threads. + virtual Status new_random_access_file(const std::string& fname, + std::unique_ptr* result) = 0; + + virtual Status new_random_access_file(const RandomAccessFileOptions& opts, + const std::string& fname, + std::unique_ptr* result) = 0; + + // Create an object that writes to a new file with the specified + // name. Deletes any existing file with the same name and creates a + // new file. On success, stores a pointer to the new file in + // *result and returns OK. On failure stores NULL in *result and + // returns non-OK. + // + // The returned file will only be accessed by one thread at a time. + virtual Status new_writable_file(const std::string& fname, + std::unique_ptr* result) = 0; + + + // Like the previous new_writable_file, but allows options to be + // specified. + virtual Status new_writable_file(const WritableFileOptions& opts, + const std::string& fname, + std::unique_ptr* result) = 0; + + // Creates a new readable and writable file. If a file with the same name + // already exists on disk, it is deleted. + // + // Some of the methods of the new file may be accessed concurrently, + // while others are only safe for access by one thread at a time. 
+ virtual Status new_random_rw_file(const std::string& fname, + std::unique_ptr* result) = 0; + + // Like the previous new_random_rw_file, but allows options to be specified. + virtual Status new_random_rw_file(const RandomRWFileOptions& opts, + const std::string& fname, + std::unique_ptr* result) = 0; + + // Returns OK if the named file exists. + // NotFound if the named file does not exist, + // the calling process does not have permission to determine + // whether this file exists, or if the path is invalid. + // IOError if an IO Error was encountered + virtual Status file_exists(const std::string& fname) = 0; + + // Store in *result the names of the children of the specified directory. + // The names are relative to "dir". + // Original contents of *results are dropped. + // Returns OK if "dir" exists and "*result" contains its children. + // NotFound if "dir" does not exist, the calling process does not have + // permission to access "dir", or if "dir" is invalid. + // IOError if an IO Error was encountered + virtual Status get_children(const std::string& dir, + std::vector* result) = 0; + + // Delete the named file. + virtual Status delete_file(const std::string& fname) = 0; + + // Create the specified directory. Returns error if directory exists. + virtual Status create_dir(const std::string& dirname) = 0; + + // Creates directory if missing. Return Ok if it exists, or successful in + // Creating. + virtual Status create_dir_if_missing(const std::string& dirname) = 0; + + // Delete the specified directory. + virtual Status delete_dir(const std::string& dirname) = 0; + + virtual Status get_file_size(const std::string& fname, uint64_t* size) = 0; + + // Store the last modification time of fname in *file_mtime. + virtual Status get_file_modification_time(const std::string& fname, + uint64_t* file_mtime) = 0; + // Rename file src to target. + virtual Status rename_file(const std::string& src, + const std::string& target) = 0; + + // Hard Link file src to target. 
+ virtual Status link_file(const std::string& /*src*/, + const std::string& /*target*/) { + return Status::NotSupported("link file is not supported for this Env"); + } +}; + +struct RandomAccessFileOptions { + RandomAccessFileOptions() { } +}; + +// Creation-time options for WritableFile +struct WritableFileOptions { + // Call Sync() during Close(). + bool sync_on_close = false; + // See CreateMode for details. + Env::CreateMode mode = Env::CREATE_IF_NON_EXISTING_TRUNCATE; +}; + +// Creation-time options for RWFile +struct RandomRWFileOptions { + // Call Sync() during Close(). + bool sync_on_close = false; + // See CreateMode for details. + Env::CreateMode mode = Env::CREATE_IF_NON_EXISTING_TRUNCATE; +}; + +// A file abstraction for reading sequentially through a file +class SequentialFile { +public: + SequentialFile() { } + virtual ~SequentialFile() { } + + // Read up to "result.size" bytes from the file. + // Sets "result.data" to the data that was read. + // + // If an error was encountered, returns a non-OK status + // and the contents of "result" are invalid. + // + // REQUIRES: External synchronization + virtual Status read(Slice* result) = 0; + + // Skip "n" bytes from the file. This is guaranteed to be no + // slower that reading the same data, but may be faster. + // + // If end of file is reached, skipping will stop at the end of the + // file, and Skip will return OK. + // + // REQUIRES: External synchronization + virtual Status skip(uint64_t n) = 0; + + // Returns the filename provided when the SequentialFile was constructed. + virtual const std::string& filename() const = 0; +}; + +class RandomAccessFile { +public: + RandomAccessFile() { } + virtual ~RandomAccessFile() { } + + // Read "result.size" bytes from the file starting at "offset". + // Copies the resulting data into "result.data". + // + // If an error was encountered, returns a non-OK status. 
+ // + // This method will internally retry on EINTR and "short reads" in order to + // fully read the requested number of bytes. In the event that it is not + // possible to read exactly 'length' bytes, an IOError is returned. + // + // Safe for concurrent use by multiple threads. + virtual Status read_at(uint64_t offset, const Slice& result) const = 0; + + // Reads up to the "results" aggregate size, based on each Slice's "size", + // from the file starting at 'offset'. The Slices must point to already-allocated + // buffers for the data to be written to. + // + // If an error was encountered, returns a non-OK status. + // + // This method will internally retry on EINTR and "short reads" in order to + // fully read the requested number of bytes. In the event that it is not + // possible to read exactly 'length' bytes, an IOError is returned. + // + // Safe for concurrent use by multiple threads. + virtual Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const = 0; + + // Return the size of this file + virtual Status size(uint64_t* size) const = 0; + + // Return name of this file + virtual const std::string& file_name() const = 0; +}; + +// A file abstraction for sequential writing. The implementation +// must provide buffering since callers may append small fragments +// at a time to the file. +class WritableFile { +public: + enum FlushMode { + FLUSH_SYNC, + FLUSH_ASYNC + }; + + WritableFile() { } + virtual ~WritableFile() { } + + // Append data to the end of the file + // Note: A WritableFile object must support either Append or + // PositionedAppend, so the users cannot mix the two. 
+ virtual Status append(const Slice& data) = 0; + + virtual Status appendv(const Slice* data, size_t cnt) = 0; + + virtual Status pre_allocate(uint64_t size) = 0; + + virtual Status close() = 0; + + virtual Status flush(FlushMode mode) = 0; + + virtual Status sync() = 0; // sync data + + virtual uint64_t size() const = 0; + + // Returns the filename provided when the WritableFile was constructed. + virtual const std::string& filename() const = 0; +}; + +// A file abstraction for random reading and writing. +class RandomRWFile { +public: + enum FlushMode { + FLUSH_SYNC, + FLUSH_ASYNC + }; + RandomRWFile() {} + virtual ~RandomRWFile() {} + + virtual Status read_at(uint64_t offset, const Slice& result) const = 0; + + virtual Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const = 0; + + virtual Status write_at(uint64_t offset, const Slice& data) = 0; + + virtual Status writev_at(uint64_t offset, const Slice* data, size_t data_cnt) = 0; + + virtual Status flush(FlushMode mode, uint64_t offset, size_t length) = 0; + + virtual Status sync() = 0; + + virtual Status close() = 0; + + virtual Status size(uint64_t* size) const = 0; + virtual const std::string& filename() const = 0; +}; + +} diff --git a/be/src/env/env_posix.cpp b/be/src/env/env_posix.cpp new file mode 100644 index 0000000000..3bc6c08faf --- /dev/null +++ b/be/src/env/env_posix.cpp @@ -0,0 +1,654 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
See the AUTHORS file for names of contributors + +#include "env/env.h" + +#include +#include +#include +#include +#include +#include + +#include "common/logging.h" +#include "gutil/macros.h" +#include "gutil/port.h" +#include "gutil/strings/substitute.h" +#include "util/errno.h" +#include "util/slice.h" + +namespace doris { + +using std::string; +using strings::Substitute; + +static Status io_error(const std::string& context, int err_number) { + switch (err_number) { + case EACCES: + case ELOOP: + case ENAMETOOLONG: + case ENOENT: + case ENOTDIR: + return Status::NotFound(context, 1, errno_to_string(err_number)); + case EEXIST: + return Status::AlreadyExist(context, 1, errno_to_string(err_number)); + case EOPNOTSUPP: + case EXDEV: // No cross FS links allowed + return Status::NotSupported(context, 1, errno_to_string(err_number)); + case EIO: + LOG(ERROR) << "I/O error, context=" << context; + } + return Status::IOError(context, 1, errno_to_string(err_number)); +} + +Status do_sync(int fd, const string& filename) { + if (fdatasync(fd) < 0) { + return io_error(filename, errno); + } + return Status::OK(); +} + +static Status do_open(const string& filename, Env::CreateMode mode, int* fd) { + int flags = O_RDWR; + switch (mode) { + case Env::CREATE_IF_NON_EXISTING_TRUNCATE: + flags |= O_CREAT | O_TRUNC; + break; + case Env::CREATE_NON_EXISTING: + flags |= O_CREAT | O_EXCL; + break; + case Env::OPEN_EXISTING: + break; + default: + return Status::NotSupported(Substitute("Unknown create mode $0", mode)); + } + int f; + RETRY_ON_EINTR(f, open(filename.c_str(), flags, 0666)); + if (f < 0) { + return io_error(filename, errno); + } + *fd = f; + return Status::OK(); +} + +static Status do_readv_at(int fd, const std::string& filename, uint64_t offset, + const Slice* res, size_t res_cnt) { + // Convert the results into the iovec vector to request + // and calculate the total bytes requested + size_t bytes_req = 0; + struct iovec iov[res_cnt]; + for (size_t i = 0; i < res_cnt; 
i++) { + const Slice& result = res[i]; + bytes_req += result.size; + iov[i] = { result.data, result.size }; + } + + uint64_t cur_offset = offset; + size_t completed_iov = 0; + size_t rem = bytes_req; + while (rem > 0) { + // Never request more than IOV_MAX in one request + size_t iov_count = std::min(res_cnt - completed_iov, static_cast(IOV_MAX)); + ssize_t r; + RETRY_ON_EINTR(r, preadv(fd, iov + completed_iov, iov_count, cur_offset)); + if (PREDICT_FALSE(r < 0)) { + // An error: return a non-ok status. + return io_error(filename, errno); + } + + if (PREDICT_FALSE(r == 0)) { + return Status::EndOfFile( + Substitute("EOF trying to read $0 bytes at offset $1", bytes_req, offset)); + } + + if (PREDICT_TRUE(r == rem)) { + // All requested bytes were read. This is almost always the case. + return Status::OK(); + } + DCHECK_LE(r, rem); + // Adjust iovec vector based on bytes read for the next request + ssize_t bytes_rem = r; + for (size_t i = completed_iov; i < res_cnt; i++) { + if (bytes_rem >= iov[i].iov_len) { + // The full length of this iovec was read + completed_iov++; + bytes_rem -= iov[i].iov_len; + } else { + // Partially read this result. + // Adjust the iov_len and iov_base to request only the missing data. + iov[i].iov_base = static_cast(iov[i].iov_base) + bytes_rem; + iov[i].iov_len -= bytes_rem; + break; // Don't need to adjust remaining iovec's + } + } + cur_offset += r; + rem -= r; + } + DCHECK_EQ(0, rem); + return Status::OK(); +} + +static Status do_writev_at(int fd, const string& filename, uint64_t offset, + const Slice* data, size_t data_cnt, size_t* bytes_written) { + // Convert the results into the iovec vector to request + // and calculate the total bytes requested. 
+ size_t bytes_req = 0; + struct iovec iov[data_cnt]; + for (size_t i = 0; i < data_cnt; i++) { + const Slice& result = data[i]; + bytes_req += result.size; + iov[i] = { result.data, result.size }; + } + + uint64_t cur_offset = offset; + size_t completed_iov = 0; + size_t rem = bytes_req; + while (rem > 0) { + // Never request more than IOV_MAX in one request. + size_t iov_count = std::min(data_cnt - completed_iov, static_cast(IOV_MAX)); + ssize_t w; + RETRY_ON_EINTR(w, pwritev(fd, iov + completed_iov, iov_count, cur_offset)); + if (PREDICT_FALSE(w < 0)) { + // An error: return a non-ok status. + return io_error(filename, errno); + } + + if (PREDICT_TRUE(w == rem)) { + // All requested bytes were read. This is almost always the case. + break; + } + // Adjust iovec vector based on bytes read for the next request. + ssize_t bytes_rem = w; + for (size_t i = completed_iov; i < data_cnt; i++) { + if (bytes_rem >= iov[i].iov_len) { + // The full length of this iovec was written. + completed_iov++; + bytes_rem -= iov[i].iov_len; + } else { + // Partially wrote this result. + // Adjust the iov_len and iov_base to write only the missing data. + iov[i].iov_base = static_cast(iov[i].iov_base) + bytes_rem; + iov[i].iov_len -= bytes_rem; + break; // Don't need to adjust remaining iovec's. 
+ } + } + cur_offset += w; + rem -= w; + } + DCHECK_EQ(0, rem); + *bytes_written = bytes_req; + return Status::OK(); +} + +class PosixSequentialFile: public SequentialFile { +public: + PosixSequentialFile(string fname, FILE* f) + : _filename(std::move(fname)), _file(f) {} + + ~PosixSequentialFile() override { + int err; + RETRY_ON_EINTR(err, fclose(_file)); + if (PREDICT_FALSE(err != 0)) { + LOG(WARNING) << "Failed to close " << _filename + << ", msg=" << errno_to_string(ferror(_file)); + } + } + + Status read(Slice* result) override { + size_t r; + STREAM_RETRY_ON_EINTR(r, _file, fread_unlocked(result->data, 1, + result->size, _file)); + if (r < result->size) { + if (feof(_file)) { + // We leave status as ok if we hit the end of the file. + // We need to adjust the slice size. + result->truncate(r); + } else { + // A partial read with an error: return a non-ok status. + return io_error(_filename, ferror(_file)); + } + } + return Status::OK(); + } + + Status skip(uint64_t n) override { + if (fseek(_file, n, SEEK_CUR)) { + return io_error(_filename, errno); + } + return Status::OK(); + } + + const string& filename() const override { return _filename; } + +private: + const string _filename; + FILE* const _file; +}; + +class PosixRandomAccessFile : public RandomAccessFile { +public: + PosixRandomAccessFile(std::string filename, int fd) : _filename(std::move(filename)), _fd(fd) { + } + ~PosixRandomAccessFile() override { + int res; + RETRY_ON_EINTR(res, close(_fd)); + if (res != 0) { + LOG(WARNING) << "close file failed, name=" << _filename + << ", msg=" << errno_to_string(errno); + } + } + + Status read_at(uint64_t offset, const Slice& result) const override { + return do_readv_at(_fd, _filename, offset, &result, 1); + } + + Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const override { + return do_readv_at(_fd, _filename, offset, res, res_cnt); + } + Status size(uint64_t* size) const override { + struct stat st; + auto res = fstat(_fd, &st); + if 
(res != 0) { + return io_error(_filename, errno); + } + *size = st.st_size; + return Status::OK(); + } + + const std::string& file_name() const override { return _filename; } +private: + std::string _filename; + int _fd; +}; + +class PosixWritableFile : public WritableFile { +public: + PosixWritableFile(std::string filename, int fd, uint64_t filesize, bool sync_on_close) + : _filename(std::move(filename)), _fd(fd), _sync_on_close(sync_on_close), _filesize(filesize) { } + + ~PosixWritableFile() override { + WARN_IF_ERROR(close(), "Failed to close file, file=" + _filename); + } + + Status append(const Slice& data) override { + return appendv(&data, 1); + } + + Status appendv(const Slice* data, size_t cnt) override { + size_t bytes_written = 0; + RETURN_IF_ERROR(do_writev_at(_fd, _filename, _filesize, data, cnt, &bytes_written)); + _filesize += bytes_written; + return Status::OK(); + } + + Status pre_allocate(uint64_t size) override { + uint64_t offset = std::max(_filesize, _pre_allocated_size); + int ret; + RETRY_ON_EINTR(ret, fallocate(_fd, 0, offset, size)); + if (ret != 0) { + if (errno == EOPNOTSUPP) { + LOG(WARNING) << "The filesystem does not support fallocate()."; + } else if (errno == ENOSYS) { + LOG(WARNING) << "The kernel does not implement fallocate()."; + } else { + return io_error(_filename, errno); + } + } + _pre_allocated_size = offset + size; + return Status::OK(); + } + + Status close() override { + if (_closed) { + return Status::OK(); + } + Status s; + + // If we've allocated more space than we used, truncate to the + // actual size of the file and perform Sync(). 
+        if (_filesize < _pre_allocated_size) {
+            int ret;
+            RETRY_ON_EINTR(ret, ftruncate(_fd, _filesize));
+            if (ret != 0) {
+                s = io_error(_filename, errno);
+                _pending_sync = true;
+            }
+        }
+
+        if (_sync_on_close) {
+            Status sync_status = sync();
+            if (!sync_status.ok()) {
+                LOG(ERROR) << "Unable to Sync " << _filename << ": " << sync_status.to_string();
+                if (s.ok()) {
+                    s = sync_status;
+                }
+            }
+        }
+
+        int ret;
+        RETRY_ON_EINTR(ret, ::close(_fd));
+        if (ret < 0) {
+            if (s.ok()) {
+                s = io_error(_filename, errno);
+            }
+        }
+
+        _closed = true;
+        return s;
+    }
+
+    // Starts write-out of the file's dirty pages. On Linux this goes through
+    // sync_file_range() with offset 0 and length 0; FLUSH_SYNC additionally
+    // waits before and after. Elsewhere only FLUSH_SYNC does work (fsync()).
+    Status flush(FlushMode mode) override {
+#if defined(__linux__)
+        int flags = SYNC_FILE_RANGE_WRITE;
+        if (mode == FLUSH_SYNC) {
+            flags |= SYNC_FILE_RANGE_WAIT_BEFORE;
+            flags |= SYNC_FILE_RANGE_WAIT_AFTER;
+        }
+        if (sync_file_range(_fd, 0, 0, flags) < 0) {
+            return io_error(_filename, errno);
+        }
+#else
+        if (mode == FLUSH_SYNC && fsync(_fd) < 0) {
+            return io_error(_filename, errno);
+        }
+#endif
+        return Status::OK();
+    }
+
+    // Calls do_sync() only when _pending_sync is set. The flag is cleared
+    // before the call, so a failed sync is not retried by a later sync().
+    Status sync() override {
+        if (_pending_sync) {
+            _pending_sync = false;
+            RETURN_IF_ERROR(do_sync(_fd, _filename));
+        }
+        return Status::OK();
+    }
+
+    uint64_t size() const override { return _filesize; }
+    const string& filename() const override { return _filename; }
+private:
+    std::string _filename;
+    int _fd;
+    const bool _sync_on_close = false;
+    bool _pending_sync = false;
+    bool _closed = false;
+    uint64_t _filesize = 0;
+    uint64_t _pre_allocated_size = 0;
+};
+
+// RandomRWFile implemented on top of a POSIX file descriptor.
+// The descriptor is closed by close(); the destructor calls close() as well
+// and only logs a warning if that fails.
+class PosixRandomRWFile : public RandomRWFile {
+public:
+    PosixRandomRWFile(string fname, int fd, bool sync_on_close)
+        : _filename(std::move(fname)),
+          _fd(fd),
+          _sync_on_close(sync_on_close),
+          _closed(false) {}
+
+    ~PosixRandomRWFile() {
+        WARN_IF_ERROR(close(), "Failed to close " + _filename);
+    }
+
+    // Reads into the single slice `result`, starting at `offset`.
+    Status read_at(uint64_t offset, const Slice& result) const override {
+        return do_readv_at(_fd, _filename, offset, &result, 1);
+    }
+
+    // Reads into the `res_cnt` slices of `res`, starting at `offset`.
+    Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const override {
+        return do_readv_at(_fd, _filename, offset, res, res_cnt);
+    }
+
+    // Writes the single slice `data` at `offset`.
+    Status write_at(uint64_t offset, const Slice& data) override {
+        return writev_at(offset, &data, 1);
+    }
+
+    // Writes the `data_cnt` slices of `data` at `offset`. The byte count
+    // reported by do_writev_at() is not exposed to the caller.
+    Status writev_at(uint64_t offset, const Slice* data, size_t data_cnt) override {
+        size_t bytes_written = 0;
+        return do_writev_at(_fd, _filename, offset, data, data_cnt, &bytes_written);
+    }
+
+    // Starts write-out of the range [offset, offset + length). On Linux this
+    // uses sync_file_range(); FLUSH_SYNC waits both before and after
+    // initiating write-out, matching PosixWritableFile::flush(). Elsewhere
+    // only FLUSH_SYNC does work, by fsync()ing the whole file.
+    Status flush(FlushMode mode, uint64_t offset, size_t length) override {
+#if defined(__linux__)
+        int flags = SYNC_FILE_RANGE_WRITE;
+        if (mode == FLUSH_SYNC) {
+            flags |= SYNC_FILE_RANGE_WAIT_BEFORE;
+            flags |= SYNC_FILE_RANGE_WAIT_AFTER;
+        }
+        if (sync_file_range(_fd, offset, length, flags) < 0) {
+            return io_error(_filename, errno);
+        }
+#else
+        if (mode == FLUSH_SYNC && fsync(_fd) < 0) {
+            return io_error(_filename, errno);
+        }
+#endif
+        return Status::OK();
+    }
+
+    Status sync() override {
+        return do_sync(_fd, _filename);
+    }
+
+    // Closes the descriptor once; later calls return OK without doing
+    // anything. With sync_on_close the file is synced first, and a sync
+    // failure takes precedence over a close failure in the returned status.
+    Status close() override {
+        if (_closed) {
+            return Status::OK();
+        }
+        Status s;
+        if (_sync_on_close) {
+            s = sync();
+            if (!s.ok()) {
+                LOG(ERROR) << "Unable to Sync " << _filename << ": " << s.to_string();
+            }
+        }
+
+        int ret;
+        RETRY_ON_EINTR(ret, ::close(_fd));
+        if (ret < 0) {
+            if (s.ok()) {
+                s = io_error(_filename, errno);
+            }
+        }
+
+        _closed = true;
+        return s;
+    }
+
+    // Stores the current file size, as reported by fstat(), in `*size`.
+    Status size(uint64_t* size) const override {
+        struct stat st;
+        if (fstat(_fd, &st) == -1) {
+            return io_error(_filename, errno);
+        }
+        *size = st.st_size;
+        return Status::OK();
+    }
+
+    const string& filename() const override {
+        return _filename;
+    }
+
+private:
+    const string _filename;
+    const int _fd;
+    const bool _sync_on_close = false;
+    bool _closed = false;
+};
+
+// Env implementation on top of the POSIX file-system calls. Every failure is
+// reported as the io_error() built from the path and errno.
+class PosixEnv : public Env {
+public:
+    PosixEnv() { }
+    ~PosixEnv() override { }
+
+    // Opens `fname` for buffered sequential reading (fopen mode "r").
+    Status new_sequential_file(
+            const string& fname, std::unique_ptr<SequentialFile>* result) override {
+        FILE* f;
+        POINTER_RETRY_ON_EINTR(f, fopen(fname.c_str(), "r"));
+        if (f == nullptr) {
+            return io_error(fname, errno);
+        }
+        result->reset(new PosixSequentialFile(fname, f));
+        return Status::OK();
+    }
+
+    Status new_random_access_file(const std::string& fname,
+                                  std::unique_ptr<RandomAccessFile>* result) override {
+        return new_random_access_file(RandomAccessFileOptions(), fname, result);
+    }
+
+    // Opens `fname` read-only. `opts` is currently not consulted.
+    Status new_random_access_file(const RandomAccessFileOptions& opts,
+                                  const std::string& fname,
+                                  std::unique_ptr<RandomAccessFile>* result) override {
+        int fd;
+        RETRY_ON_EINTR(fd, open(fname.c_str(), O_RDONLY));
+        if (fd < 0) {
+            return io_error(fname, errno);
+        }
+        result->reset(new PosixRandomAccessFile(fname, fd));
+        return Status::OK();
+    }
+
+    Status new_writable_file(const string& fname,
+                             std::unique_ptr<WritableFile>* result) override {
+        return new_writable_file(WritableFileOptions(), fname, result);
+    }
+
+    // Opens `fname` according to opts.mode. For OPEN_EXISTING the current
+    // file size is looked up and handed to the PosixWritableFile; for every
+    // other mode the size passed is 0.
+    Status new_writable_file(const WritableFileOptions& opts,
+                             const string& fname,
+                             std::unique_ptr<WritableFile>* result) override {
+        int fd;
+        RETURN_IF_ERROR(do_open(fname, opts.mode, &fd));
+
+        uint64_t file_size = 0;
+        if (opts.mode == OPEN_EXISTING) {
+            Status st = get_file_size(fname, &file_size);
+            if (!st.ok()) {
+                // Nothing owns fd yet; close it so the descriptor is not leaked.
+                ::close(fd);
+                return st;
+            }
+        }
+        result->reset(new PosixWritableFile(fname, fd, file_size, opts.sync_on_close));
+        return Status::OK();
+    }
+
+    Status new_random_rw_file(const string& fname,
+                              std::unique_ptr<RandomRWFile>* result) override {
+        return new_random_rw_file(RandomRWFileOptions(), fname, result);
+    }
+
+    // Opens `fname` according to opts.mode for random reads and writes.
+    Status new_random_rw_file(const RandomRWFileOptions& opts,
+                              const string& fname,
+                              std::unique_ptr<RandomRWFile>* result) override {
+        int fd;
+        RETURN_IF_ERROR(do_open(fname, opts.mode, &fd));
+        result->reset(new PosixRandomRWFile(fname, fd, opts.sync_on_close));
+        return Status::OK();
+    }
+
+    // Returns OK when access(F_OK) succeeds for `fname`.
+    Status file_exists(const std::string& fname) override {
+        if (access(fname.c_str(), F_OK) != 0) {
+            return io_error(fname, errno);
+        }
+        return Status::OK();
+    }
+
+    // Replaces `*result` with the name of every entry readdir() yields for
+    // `dir`. Entries are not filtered, and a NULL return from readdir() is
+    // always treated as end-of-directory.
+    Status get_children(const std::string& dir,
+                        std::vector<std::string>* result) override {
+        result->clear();
+        DIR* d = opendir(dir.c_str());
+        if (d == nullptr) {
+            return io_error(dir, errno);
+        }
+        struct dirent* entry;
+        while ((entry = readdir(d)) != nullptr) {
+            result->push_back(entry->d_name);
+        }
+        closedir(d);
+        return Status::OK();
+    }
+
+    Status delete_file(const std::string& fname) override {
+        if (unlink(fname.c_str()) != 0) {
+            return io_error(fname, errno);
+        }
+        return Status::OK();
+    }
+
+    // Create the specified directory. Returns error if directory exists.
+    Status create_dir(const std::string& name) override {
+        if (mkdir(name.c_str(), 0755) != 0) {
+            return io_error(name, errno);
+        }
+        return Status::OK();
+    }
+
+    // Creates the directory if it is missing. Returns OK if it was created
+    // or already exists as a directory; only the last path component is
+    // created (plain mkdir()).
+    Status create_dir_if_missing(const std::string& name) override {
+        if (mkdir(name.c_str(), 0755) != 0) {
+            if (errno != EEXIST) {
+                return io_error(name, errno);
+            } else if (!dir_exists(name)) {
+                // `name` exists but is not a directory.
+                // Message is taken from mkdir
+                return Status::IOError(name + " exists but is not a directory");
+            }
+        }
+        return Status::OK();
+    }
+
+    // Delete the specified directory.
+    Status delete_dir(const std::string& dirname) override {
+        if (rmdir(dirname.c_str()) != 0) {
+            return io_error(dirname, errno);
+        }
+        return Status::OK();
+    }
+
+    // Stores st_size of `fname` in `*size`; `*size` is untouched on failure.
+    Status get_file_size(const string& fname, uint64_t* size) override {
+        struct stat sbuf;
+        if (stat(fname.c_str(), &sbuf) != 0) {
+            return io_error(fname, errno);
+        } else {
+            *size = sbuf.st_size;
+        }
+        return Status::OK();
+    }
+
+    // Stores st_mtime of `fname` in `*file_mtime`.
+    Status get_file_modification_time(const std::string& fname,
+                                      uint64_t* file_mtime) override {
+        struct stat s;
+        if (stat(fname.c_str(), &s) != 0) {
+            return io_error(fname, errno);
+        }
+        *file_mtime = static_cast<uint64_t>(s.st_mtime);
+        return Status::OK();
+    }
+
+    // Errors from rename() are reported against `src`.
+    Status rename_file(const std::string& src, const std::string& target) override {
+        if (rename(src.c_str(), target.c_str()) != 0) {
+            return io_error(src, errno);
+        }
+        return Status::OK();
+    }
+
+    // Creates a hard link `target` for `src`; errors are reported against `src`.
+    Status link_file(const std::string& src, const std::string& target) override {
+        if (link(src.c_str(), target.c_str()) != 0) {
+            return io_error(src, errno);
+        }
+        return Status::OK();
+    }
+
+private:
+    // True only when stat() succeeds and reports a directory.
+    bool dir_exists(const std::string& dname) {
+        struct stat statbuf;
+        if (stat(dname.c_str(), &statbuf) == 0) {
+            return S_ISDIR(statbuf.st_mode);
+        }
+        return false; // stat() failed return false
+    }
+};
+
+// Default Posix Env
+Env* Env::Default() {
+    static PosixEnv default_env;
+    return &default_env;
+}
+
+}
diff --git a/be/src/gen_cpp/CMakeLists.txt b/be/src/gen_cpp/CMakeLists.txt
index 3212d4d2c2..e98fe61301 100644
--- a/be/src/gen_cpp/CMakeLists.txt
+++ b/be/src/gen_cpp/CMakeLists.txt
@@ -82,6 +82,7 @@ set(SRC_FILES
     ${GEN_CPP_DIR}/palo_internal_service.pb.cc
     ${GEN_CPP_DIR}/types.pb.cc
     ${GEN_CPP_DIR}/status.pb.cc
+    ${GEN_CPP_DIR}/segment_v2.pb.cc
     #$${GEN_CPP_DIR}/opcode/functions.cc
     #$${GEN_CPP_DIR}/opcode/vector-functions.cc
     #$${GEN_CPP_DIR}/opcode/opcode-registry-init.cc
diff --git a/be/src/gutil/macros.h b/be/src/gutil/macros.h
index 0318008a48..da7ea13fd7 100644
--- a/be/src/gutil/macros.h
+++ b/be/src/gutil/macros.h
@@ -262,4 +262,26 @@ enum LinkerInitialized { LINKER_INITIALIZED };
 #define FALLTHROUGH_INTENDED do { } while (0)
 #endif
 
+// Retry on EINTR for functions like read() that return -1 on error.
+#define RETRY_ON_EINTR(err, expr) do { \
+  static_assert(std::is_signed<decltype(err)>::value, \
+                #err " must be a signed integer"); \
+  (err) = (expr); \
+} while ((err) == -1 && errno == EINTR)
+
+// Same as above but for stream API calls like fread() and fwrite().
+// NOTE(review): ferror() reports whether the stream's error indicator is set,
+// not an errno value, so comparing it with EINTR looks unintended and this
+// loop probably never retries - confirm before relying on it.
+#define STREAM_RETRY_ON_EINTR(nread, stream, expr) do { \
+  static_assert(std::is_unsigned<decltype(nread)>::value == true, \
+                #nread " must be an unsigned integer"); \
+  (nread) = (expr); \
+} while ((nread) == 0 && ferror(stream) == EINTR)
+
+// Same as above but for functions that return pointer types (like
+// fopen() and freopen()).
+#define POINTER_RETRY_ON_EINTR(ptr, expr) do { \
+  static_assert(std::is_pointer<decltype(ptr)>::value == true, \
+                #ptr " must be a pointer"); \
+  (ptr) = (expr); \
+} while ((ptr) == nullptr && errno == EINTR)
+
 #endif // BASE_MACROS_H_
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index e2c5ac842e..7e9eb0367b 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -33,6 +33,7 @@ set(UTIL_FILES
   debug_util.cpp
   decompress.cpp
   disk_info.cpp
+  errno.cpp
   hash_util.hpp
   json_util.cpp
   doris_metrics.cpp
diff --git a/be/src/util/bitmap.h b/be/src/util/bitmap.h
index 3e3e23e7d6..879038d1e7 100644
--- a/be/src/util/bitmap.h
+++ b/be/src/util/bitmap.h
@@ -120,10 +120,22 @@ inline bool BitmapEquals(const uint8_t* bm1, const uint8_t* bm2, size_t bitmap_s
 // }
 class BitmapIterator {
  public:
-  BitmapIterator(const uint8_t *map, size_t num_bits)
+  BitmapIterator(const uint8_t* map, size_t num_bits)
     : offset_(0), num_bits_(num_bits), map_(map)
   {}
 
+  // Points the iterator at a new bitmap of `num_bits` bits and rewinds it to
+  // offset 0.
+  void Reset(const uint8_t* map, size_t num_bits) {
+    offset_ = 0;
+    // Take the size from the parameter; assigning the member to itself
+    // would keep the previous bitmap's length.
+    num_bits_ = num_bits;
+    map_ = map;
+  }
+
+  // Detaches the iterator from any bitmap: zero bits, null map, so done()
+  // is true afterwards.
+  void Reset() {
+    offset_ = 0;
+    num_bits_ = 0;
+    map_ = nullptr;
+  }
+
   bool done() const {
     return (num_bits_ - offset_) == 0;
   }
diff --git a/be/src/util/errno.cpp b/be/src/util/errno.cpp
new file mode 100644
index 0000000000..bd0e813cd0
--- /dev/null
+++ b/be/src/util/errno.cpp
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/errno.h"
+
+#include <cstring>
+
+#include "gutil/dynamic_annotations.h"
+
+namespace doris {
+
+// Writes a NUL-terminated description of the errno value `err` into `buf`,
+// which holds `buf_len` bytes. Does nothing when there is no buffer space.
+void errno_to_cstring(int err, char *buf, size_t buf_len) {
+    // The fallback paths below write buf[buf_len - 1]; with an empty buffer
+    // that index would wrap around.
+    if (buf == nullptr || buf_len == 0) {
+        return;
+    }
+#if !defined(__GLIBC__) || \
+    ((_POSIX_C_SOURCE >= 200112 || _XOPEN_SOURCE >= 600) && !defined(_GNU_SOURCE))
+    // Using POSIX version 'int strerror_r(...)'.
+    int ret = strerror_r(err, buf, buf_len);
+    if (ret && ret != ERANGE && ret != EINVAL) {
+        strncpy(buf, "unknown error", buf_len);
+        buf[buf_len - 1] = '\0';
+    }
+#else
+    // Using GLIBC version
+
+    // KUDU-1515: TSAN in Clang 3.9 has an incorrect interceptor for strerror_r:
+    // https://github.com/google/sanitizers/issues/696
+    ANNOTATE_IGNORE_WRITES_BEGIN();
+    char* ret = strerror_r(err, buf, buf_len);
+    ANNOTATE_IGNORE_WRITES_END();
+    // The GNU variant may return a pointer to its own string instead of
+    // filling `buf`; copy it over in that case.
+    if (ret != buf) {
+        strncpy(buf, ret, buf_len);
+        buf[buf_len - 1] = '\0';
+    }
+#endif
+}
+
+}
diff --git a/be/src/util/errno.h b/be/src/util/errno.h
new file mode 100644
index 0000000000..82b07582d5
--- /dev/null
+++ b/be/src/util/errno.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Guard against double inclusion: this header defines a function.
+#pragma once
+
+#include <cstddef>
+#include <string>
+
+namespace doris {
+
+// Writes a NUL-terminated description of errno value `err` into `buf`
+// (capacity `buf_len` bytes).
+void errno_to_cstring(int err, char *buf, size_t buf_len);
+
+// Return a string representing an errno.
+inline static std::string errno_to_string(int err) {
+    char buf[512];
+    errno_to_cstring(err, buf, sizeof(buf));
+    return std::string(buf);
+}
+
+}
diff --git a/be/test/env/CMakeLists.txt b/be/test/env/CMakeLists.txt
new file mode 100644
index 0000000000..04d8893543
--- /dev/null
+++ b/be/test/env/CMakeLists.txt
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ +# where to put generated libraries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/test/env") + +ADD_BE_TEST(env_posix_test) diff --git a/be/test/env/env_posix_test.cpp b/be/test/env/env_posix_test.cpp new file mode 100644 index 0000000000..eef4e1b16e --- /dev/null +++ b/be/test/env/env_posix_test.cpp @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "env/env.h"
+
+#include <gtest/gtest.h>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// Fixture for the POSIX Env tests; all test files live under
+// ./ut_dir/env_posix.
+class EnvPosixTest : public testing::Test {
+public:
+    EnvPosixTest() { }
+    virtual ~EnvPosixTest() { }
+    void SetUp() override {
+        // create_dir_if_missing() only creates the last path component, so
+        // make sure the parent exists first.
+        auto st = Env::Default()->create_dir_if_missing("./ut_dir");
+        ASSERT_TRUE(st.ok());
+        st = Env::Default()->create_dir_if_missing("./ut_dir/env_posix");
+        ASSERT_TRUE(st.ok());
+    }
+    void TearDown() override {
+    }
+};
+
+// Appends 9 + 100 + 3 + 3 = 115 bytes through a WritableFile, then reads
+// them back through a RandomAccessFile.
+TEST_F(EnvPosixTest, random_access) {
+    std::string fname = "./ut_dir/env_posix/random_access";
+    std::unique_ptr<WritableFile> wfile;
+    auto env = Env::Default();
+    auto st = env->new_writable_file(fname, &wfile);
+    ASSERT_TRUE(st.ok());
+    st = wfile->pre_allocate(1024);
+    ASSERT_TRUE(st.ok());
+    // write data
+    Slice field1("123456789");
+    st = wfile->append(field1);
+    ASSERT_TRUE(st.ok());
+    std::string buf;
+    for (int i = 0; i < 100; ++i) {
+        buf.push_back(static_cast<char>(i));
+    }
+    st = wfile->append(buf);
+    ASSERT_TRUE(st.ok());
+    Slice abc("abc");
+    Slice bcd("bcd");
+    Slice slices[2]{abc, bcd};
+    st = wfile->appendv(slices, 2);
+    ASSERT_TRUE(st.ok());
+    st = wfile->flush(WritableFile::FLUSH_ASYNC);
+    ASSERT_TRUE(st.ok());
+    st = wfile->sync();
+    ASSERT_TRUE(st.ok());
+    st = wfile->close();
+    ASSERT_TRUE(st.ok());
+
+    ASSERT_EQ(115, wfile->size());
+
+    uint64_t size;
+    st = env->get_file_size(fname, &size);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(115, size);
+    {
+        char mem[1024];
+        std::unique_ptr<RandomAccessFile> rfile;
+        st = env->new_random_access_file(fname, &rfile);
+        ASSERT_TRUE(st.ok());
+
+        Slice slice1(mem, 9);
+        Slice slice2(mem + 9, 100);
+        Slice slice3(mem + 9 + 100, 3);
+
+        Slice read_slices[3] {slice1, slice2, slice3};
+        st = rfile->readv_at(0, read_slices, 3);
+        ASSERT_TRUE(st.ok());
+        ASSERT_STREQ("123456789", std::string(slice1.data, slice1.size).c_str());
+        ASSERT_STREQ("abc", std::string(slice3.data, slice3.size).c_str());
+
+        Slice slice4(mem, 3);
+        st = rfile->read_at(112, slice4);
+        ASSERT_TRUE(st.ok());
+        ASSERT_STREQ("bcd", std::string(slice4.data, slice4.size).c_str());
+
+        // end of file
+        st = rfile->read_at(114, slice4);
+        ASSERT_EQ(TStatusCode::END_OF_FILE, st.code());
+        LOG(INFO) << "st=" << st.to_string();
+    }
+}
+
+// Writes 109 bytes through a RandomRWFile (the first six are overwritten
+// with "abcbcd"), then reads them back through a RandomRWFile and a
+// SequentialFile.
+TEST_F(EnvPosixTest, random_rw) {
+    std::string fname = "./ut_dir/env_posix/random_rw";
+    std::unique_ptr<RandomRWFile> wfile;
+    auto env = Env::Default();
+    auto st = env->new_random_rw_file(fname, &wfile);
+    ASSERT_TRUE(st.ok());
+    // write data
+    Slice field1("123456789");
+    st = wfile->write_at(0, field1);
+    ASSERT_TRUE(st.ok());
+    std::string buf;
+    for (int i = 0; i < 100; ++i) {
+        buf.push_back(static_cast<char>(i));
+    }
+    st = wfile->write_at(9, buf);
+    ASSERT_TRUE(st.ok());
+    Slice abc("abc");
+    Slice bcd("bcd");
+    Slice slices[2]{abc, bcd};
+    st = wfile->writev_at(0, slices, 2);
+    ASSERT_TRUE(st.ok());
+
+    uint64_t size;
+    st = wfile->size(&size);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(109, size);
+
+    st = wfile->flush(RandomRWFile::FLUSH_ASYNC, 0, 0);
+    ASSERT_TRUE(st.ok());
+    st = wfile->sync();
+    ASSERT_TRUE(st.ok());
+    st = wfile->close();
+    ASSERT_TRUE(st.ok());
+
+    st = env->get_file_size(fname, &size);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(109, size);
+    {
+        char mem[1024];
+        std::unique_ptr<RandomRWFile> rfile;
+        RandomRWFileOptions opts;
+        opts.mode = Env::OPEN_EXISTING;
+        st = env->new_random_rw_file(opts, fname, &rfile);
+        ASSERT_TRUE(st.ok());
+
+        Slice slice1(mem, 3);
+        Slice slice2(mem + 3, 3);
+        Slice slice3(mem + 6, 3);
+
+        Slice read_slices[3] {slice1, slice2, slice3};
+        st = rfile->readv_at(0, read_slices, 3);
+        LOG(INFO) << st.to_string();
+        ASSERT_TRUE(st.ok());
+        ASSERT_STREQ("abc", std::string(slice1.data, slice1.size).c_str());
+        ASSERT_STREQ("bcd", std::string(slice2.data, slice2.size).c_str());
+        ASSERT_STREQ("789", std::string(slice3.data, slice3.size).c_str());
+
+        Slice slice4(mem, 100);
+        st = rfile->read_at(9, slice4);
+        ASSERT_TRUE(st.ok());
+
+        // end of file
+        st = rfile->read_at(102, slice4);
+        ASSERT_EQ(TStatusCode::END_OF_FILE, st.code());
+        LOG(INFO) << "st=" << st.to_string();
+    }
+    // SequentialFile
+    {
+        char mem[1024];
+        std::unique_ptr<SequentialFile> rfile;
+        st = env->new_sequential_file(fname, &rfile);
+        ASSERT_TRUE(st.ok());
+
+        Slice slice1(mem, 3);
+        st = rfile->read(&slice1);
+        ASSERT_TRUE(st.ok());
+        ASSERT_STREQ("abc", std::string(slice1.data, slice1.size).c_str());
+
+        st = rfile->skip(3);
+        ASSERT_TRUE(st.ok());
+
+        Slice slice3(mem, 3);
+        st = rfile->read(&slice3);
+        ASSERT_TRUE(st.ok());
+        ASSERT_STREQ("789", std::string(slice3.data, slice3.size).c_str());
+
+        st = rfile->skip(90);
+        ASSERT_TRUE(st.ok());
+
+        // 3 + 3 + 3 + 90 = 99 bytes consumed, so 10 of the 109 bytes remain.
+        Slice slice4(mem, 15);
+        st = rfile->read(&slice4);
+        ASSERT_TRUE(st.ok());
+        ASSERT_EQ(10, slice4.size);
+
+
+        // A read at end of file succeeds with an empty result.
+        st = rfile->read(&slice4);
+        ASSERT_TRUE(st.ok());
+        ASSERT_EQ(0, slice4.size);
+    }
+}
+
+}
+
+int main(int argc, char* argv[]) {
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}