diff --git a/be/src/env/env.h b/be/src/env/env.h index 1c1a992999..aeafc1e226 100644 --- a/be/src/env/env.h +++ b/be/src/env/env.h @@ -5,6 +5,7 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors + #pragma once #include @@ -22,7 +23,6 @@ namespace doris { class RandomAccessFile; class RandomRWFile; class WritableFile; -class SequentialFile; class PosixEnv; class StorageBackend; struct FilePathDesc; @@ -50,15 +50,6 @@ public: // implementation instead of relying on this default environment. static Env* Default(); - // Create a brand new sequentially-readable file with the specified name. - // On success, stores a pointer to the new file in *result and returns OK. - // On failure stores nullptr in *result and returns non-OK. If the file does - // not exist, returns a non-OK status. - // - // The returned file will only be accessed by one thread at a time. - virtual Status new_sequential_file(const std::string& fname, - std::unique_ptr* result) = 0; - // Create a brand new random access read-only file with the // specified name. On success, stores a pointer to the new file in // *result and returns OK. On failure stores nullptr in *result and @@ -301,34 +292,6 @@ struct RandomRWFileOptions { Env::OpenMode mode = Env::CREATE_OR_OPEN_WITH_TRUNCATE; }; -// A file abstraction for reading sequentially through a file -class SequentialFile { -public: - SequentialFile() {} - virtual ~SequentialFile() {} - - // Read up to "result.size" bytes from the file. - // Sets "result.data" to the data that was read. - // - // If an error was encountered, returns a non-OK status - // and the contents of "result" are invalid. - // - // REQUIRES: External synchronization - virtual Status read(Slice* result) = 0; - - // Skip "n" bytes from the file. This is guaranteed to be no - // slower that reading the same data, but may be faster. - // - // If end of file is reached, skipping will stop at the end of the - // file, and Skip will return OK. - // - // REQUIRES: External synchronization - virtual Status skip(uint64_t n) = 0; - - // Returns the filename provided when the SequentialFile was constructed. - virtual const std::string& filename() const = 0; -}; - class RandomAccessFile { public: RandomAccessFile() {} diff --git a/be/src/env/env_posix.cpp b/be/src/env/env_posix.cpp index 2317f790f1..5ec462f153 100644 --- a/be/src/env/env_posix.cpp +++ b/be/src/env/env_posix.cpp @@ -223,49 +223,6 @@ static Status do_writev_at(int fd, const string& filename, uint64_t offset, cons return Status::OK(); } -class PosixSequentialFile : public SequentialFile { -public: - PosixSequentialFile(string fname, FILE* f) : _filename(std::move(fname)), _file(f) {} - - ~PosixSequentialFile() override { - int err; - RETRY_ON_EINTR(err, fclose(_file)); - if (PREDICT_FALSE(err != 0)) { - LOG(WARNING) << "Failed to close " << _filename - << ", msg=" << errno_to_string(ferror(_file)); - } - } - - Status read(Slice* result) override { - size_t r; - STREAM_RETRY_ON_EINTR(r, _file, fread_unlocked(result->data, 1, result->size, _file)); - if (r < result->size) { - if (feof(_file)) { - // We leave status as ok if we hit the end of the file. - // We need to adjust the slice size. - result->truncate(r); - } else { - // A partial read with an error: return a non-ok status. - return io_error(_filename, ferror(_file)); - } - } - return Status::OK(); - } - - Status skip(uint64_t n) override { - if (fseek(_file, n, SEEK_CUR)) { - return io_error(_filename, errno); - } - return Status::OK(); - } - - const string& filename() const override { return _filename; } - -private: - const std::string _filename; - FILE* const _file; -}; - class PosixRandomAccessFile : public RandomAccessFile { public: PosixRandomAccessFile(std::string filename, int fd) : _filename(std::move(filename)), _fd(fd) {} @@ -528,16 +485,6 @@ private: bool _closed = false; }; -Status PosixEnv::new_sequential_file(const string& fname, std::unique_ptr* result) { - FILE* f; - POINTER_RETRY_ON_EINTR(f, fopen(fname.c_str(), "r")); - if (f == nullptr) { - return io_error(fname, errno); - } - result->reset(new PosixSequentialFile(fname, f)); - return Status::OK(); -} - // get a RandomAccessFile pointer without file cache Status PosixEnv::new_random_access_file(const std::string& fname, std::unique_ptr* result) { diff --git a/be/src/env/env_posix.h b/be/src/env/env_posix.h index f8555d9ab1..3b2681d478 100644 --- a/be/src/env/env_posix.h +++ b/be/src/env/env_posix.h @@ -24,7 +24,6 @@ namespace doris { class RandomAccessFile; class RandomRWFile; class WritableFile; -class SequentialFile; struct WritableFileOptions; struct RandomAccessFileOptions; struct RandomRWFileOptions; @@ -33,9 +32,6 @@ class PosixEnv : public Env { public: ~PosixEnv() override {} - Status new_sequential_file(const std::string& fname, - std::unique_ptr* result) override; - // get a RandomAccessFile pointer without file cache Status new_random_access_file(const std::string& fname, std::unique_ptr* result) override; diff --git a/be/src/env/env_util.cpp b/be/src/env/env_util.cpp index 3494b2ed10..c6527392dc 100644 --- a/be/src/env/env_util.cpp +++ b/be/src/env/env_util.cpp @@ -75,26 +75,14 @@ Status write_string_to_file_sync(Env* env, const Slice& data, const std::string& return do_write_string_to_file(env, data, fname, true); } -Status read_file_to_string(Env* env, const std::string& fname, faststring* data) { +Status read_file_to_string(Env* env, const std::string& fname, std::string* data) { data->clear(); - std::unique_ptr file; - Status s = env->new_sequential_file(fname, &file); + std::unique_ptr file; + Status s = env->new_random_access_file(fname, &file); if (!s.ok()) { return s; } - static const int kBufferSize = 8192; - std::unique_ptr scratch(new uint8_t[kBufferSize]); - while (true) { - Slice fragment(scratch.get(), kBufferSize); - s = file->read(&fragment); - if (!s.ok()) { - break; - } - data->append(fragment.get_data(), fragment.get_size()); - if (fragment.empty()) { - break; - } - } + s = file->read_all(data); return s; } diff --git a/be/src/env/env_util.h b/be/src/env/env_util.h index 70ea526145..fde092c99b 100644 --- a/be/src/env/env_util.h +++ b/be/src/env/env_util.h @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + #pragma once #include @@ -45,7 +46,7 @@ Status write_string_to_file(Env* env, const Slice& data, const std::string& fnam Status write_string_to_file_sync(Env* env, const Slice& data, const std::string& fname); // A utility routine: read contents of named file into *data -Status read_file_to_string(Env* env, const std::string& fname, faststring* data); +Status read_file_to_string(Env* env, const std::string& fname, std::string* data); } // namespace env_util } // namespace doris diff --git a/be/src/util/os_util.cpp b/be/src/util/os_util.cpp index e44ed166ad..79b7f1e90f 100644 --- a/be/src/util/os_util.cpp +++ b/be/src/util/os_util.cpp @@ -104,11 +104,11 @@ Status get_thread_stats(int64_t tid, ThreadStats* stats) { if (kTicksPerSec <= 0) { return Status::NotSupported("ThreadStats not supported"); } - faststring buf; + std::string buf; RETURN_IF_ERROR(env_util::read_file_to_string( Env::Default(), strings::Substitute("/proc/self/task/$0/stat", tid), &buf)); - return parse_stat(buf.ToString(), nullptr, stats); + return parse_stat(buf, nullptr, stats); } void disable_core_dumps() { struct rlimit lim; @@ -138,13 +138,13 @@ bool is_being_debugged() { // Look for the TracerPid line in /proc/self/status. // If this is non-zero, we are being ptraced, which is indicative of gdb or strace // being attached. - faststring buf; + std::string buf; Status s = env_util::read_file_to_string(Env::Default(), "/proc/self/status", &buf); if (!s.ok()) { LOG(WARNING) << "could not read /proc/self/status: " << s.to_string(); return false; } - StringPiece buf_sp(reinterpret_cast(buf.data()), buf.size()); + StringPiece buf_sp(buf.data(), buf.size()); std::vector lines = Split(buf_sp, "\n"); for (const auto& l : lines) { if (!HasPrefixString(l, "TracerPid:")) continue; diff --git a/be/src/util/storage_backend_mgr.cpp b/be/src/util/storage_backend_mgr.cpp index 3e5540b5c5..ab907e114f 100644 --- a/be/src/util/storage_backend_mgr.cpp +++ b/be/src/util/storage_backend_mgr.cpp @@ -15,17 +15,17 @@ // specific language governing permissions and limitations // under the License. +#include "util/storage_backend_mgr.h" + #include "common/config.h" #include "common/status.h" -#include "env/env_util.h" #include "env/env.h" +#include "env/env_util.h" #include "gutil/strings/substitute.h" -#include "util/faststring.h" #include "util/file_utils.h" -#include "util/storage_backend.h" -#include "util/storage_backend_mgr.h" -#include "util/s3_util.h" #include "util/s3_storage_backend.h" +#include "util/s3_util.h" +#include "util/storage_backend.h" namespace doris { @@ -36,20 +36,22 @@ Status StorageBackendMgr::init(const std::string& storage_param_dir) { Status exist_status = Env::Default()->path_exists(storage_param_dir); if (!exist_status.ok() && (!exist_status.is_not_found() || !Env::Default()->create_dirs(storage_param_dir).ok())) { - RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError(strings::Substitute( - "failed to create remote storage_param root path $0", storage_param_dir)), - "create_dirs failed"); + RETURN_NOT_OK_STATUS_WITH_WARN( + Status::IOError(strings::Substitute( + "failed to create remote storage_param root path $0", storage_param_dir)), + "create_dirs failed"); } std::vector file_names; RETURN_IF_ERROR(FileUtils::list_files(Env::Default(), storage_param_dir, &file_names)); for (auto& file_name : file_names) { - faststring buf; - RETURN_NOT_OK_STATUS_WITH_WARN(env_util::read_file_to_string( - Env::Default(), storage_param_dir + "/" + file_name, &buf), - strings::Substitute("load storage_name failed. $0", file_name)); + std::string buf; + RETURN_NOT_OK_STATUS_WITH_WARN( + env_util::read_file_to_string(Env::Default(), storage_param_dir + "/" + file_name, + &buf), + strings::Substitute("load storage_name failed. $0", file_name)); StorageParamPB storage_param_pb; - RETURN_IF_ERROR(_deserialize_param(buf.ToString(), &storage_param_pb)); + RETURN_IF_ERROR(_deserialize_param(buf, &storage_param_pb)); RETURN_IF_ERROR(_create_remote_storage_internal(storage_param_pb)); LOG(INFO) << "init remote_storage_param successfully. storage_name: " << file_name; } @@ -67,18 +69,20 @@ Status StorageBackendMgr::create_remote_storage(const StorageParamPB& storage_pa std::string storage_name = storage_param_pb.storage_name(); string storage_param_path = _storage_param_dir + "/" + storage_name; - RETURN_NOT_OK_STATUS_WITH_WARN(FileUtils::remove(storage_param_path), - strings::Substitute("rm storage_param_pb file failed: $0", storage_param_path)); + RETURN_NOT_OK_STATUS_WITH_WARN( + FileUtils::remove(storage_param_path), + strings::Substitute("rm storage_param_pb file failed: $0", storage_param_path)); std::string param_binary; RETURN_NOT_OK_STATUS_WITH_WARN(_serialize_param(storage_param_pb, ¶m_binary), "_serialize_param storage_param_pb failed."); RETURN_NOT_OK_STATUS_WITH_WARN( env_util::write_string_to_file(Env::Default(), Slice(param_binary), storage_param_path), strings::Substitute("write_string_to_file failed: $0", storage_param_path)); - faststring buf; - RETURN_NOT_OK_STATUS_WITH_WARN(env_util::read_file_to_string(Env::Default(), storage_param_path, &buf), - strings::Substitute("read storage_name failed. $0", storage_param_path)); - if (buf.ToString() != param_binary) { + std::string buf; + RETURN_NOT_OK_STATUS_WITH_WARN( + env_util::read_file_to_string(Env::Default(), storage_param_path, &buf), + strings::Substitute("read storage_name failed. $0", storage_param_path)); + if (buf != param_binary) { LOG(ERROR) << "storage_param written failed. storage_name: (" << storage_param_pb.storage_name() << "<->" << storage_name << ")"; return Status::InternalError("storage_param written failed"); @@ -89,7 +93,7 @@ Status StorageBackendMgr::create_remote_storage(const StorageParamPB& storage_pa Status StorageBackendMgr::_create_remote_storage_internal(const StorageParamPB& storage_param_pb) { std::string storage_name = storage_param_pb.storage_name(); - WriteLock wrlock(_storage_backend_lock); + std::unique_lock wrlock(_storage_backend_lock); if (_storage_backend_map.size() >= doris::config::max_remote_storage_count) { std::map::iterator itr = _storage_backend_active_time.begin(); std::string timeout_key = itr->first; @@ -107,25 +111,25 @@ Status StorageBackendMgr::_create_remote_storage_internal(const StorageParamPB& } std::map storage_prop; switch (storage_param_pb.storage_medium()) { - case StorageMediumPB::S3: - default: - S3StorageParamPB s3_storage_param = storage_param_pb.s3_storage_param(); - if (s3_storage_param.s3_ak().empty() || s3_storage_param.s3_sk().empty() - || s3_storage_param.s3_endpoint().empty() || s3_storage_param.s3_region().empty()) { - return Status::InternalError("s3_storage_param param is invalid"); - } - storage_prop[S3_AK] = s3_storage_param.s3_ak(); - storage_prop[S3_SK] = s3_storage_param.s3_sk(); - storage_prop[S3_ENDPOINT] = s3_storage_param.s3_endpoint(); - storage_prop[S3_REGION] = s3_storage_param.s3_region(); - storage_prop[S3_MAX_CONN_SIZE] = s3_storage_param.s3_max_conn(); - storage_prop[S3_REQUEST_TIMEOUT_MS] = s3_storage_param.s3_request_timeout_ms(); - storage_prop[S3_CONN_TIMEOUT_MS] = s3_storage_param.s3_conn_timeout_ms(); + case StorageMediumPB::S3: + default: + S3StorageParamPB s3_storage_param = storage_param_pb.s3_storage_param(); + if (s3_storage_param.s3_ak().empty() || s3_storage_param.s3_sk().empty() || + s3_storage_param.s3_endpoint().empty() || s3_storage_param.s3_region().empty()) { + return Status::InternalError("s3_storage_param param is invalid"); + } + storage_prop[S3_AK] = s3_storage_param.s3_ak(); + storage_prop[S3_SK] = s3_storage_param.s3_sk(); + storage_prop[S3_ENDPOINT] = s3_storage_param.s3_endpoint(); + storage_prop[S3_REGION] = s3_storage_param.s3_region(); + storage_prop[S3_MAX_CONN_SIZE] = s3_storage_param.s3_max_conn(); + storage_prop[S3_REQUEST_TIMEOUT_MS] = s3_storage_param.s3_request_timeout_ms(); + storage_prop[S3_CONN_TIMEOUT_MS] = s3_storage_param.s3_conn_timeout_ms(); - if (!ClientFactory::is_s3_conf_valid(storage_prop)) { - return Status::InternalError("s3_storage_param is invalid"); - } - _storage_backend_map[storage_name] = std::make_shared(storage_prop); + if (!ClientFactory::is_s3_conf_valid(storage_prop)) { + return Status::InternalError("s3_storage_param is invalid"); + } + _storage_backend_map[storage_name] = std::make_shared(storage_prop); } _storage_param_map[storage_name] = storage_param_pb; _storage_backend_active_time[storage_name] = time(nullptr); @@ -133,8 +137,9 @@ Status StorageBackendMgr::_create_remote_storage_internal(const StorageParamPB& return Status::OK(); } -std::shared_ptr StorageBackendMgr::get_storage_backend(const std::string& storage_name) { - ReadLock rdlock(_storage_backend_lock); +std::shared_ptr StorageBackendMgr::get_storage_backend( + const std::string& storage_name) { + std::shared_lock rdlock(_storage_backend_lock); if (_storage_backend_map.find(storage_name) == _storage_backend_map.end()) { return nullptr; } @@ -142,8 +147,9 @@ std::shared_ptr StorageBackendMgr::get_storage_backend(const std return _storage_backend_map[storage_name]; } -Status StorageBackendMgr::get_storage_param(const std::string& storage_name, StorageParamPB* storage_param) { - ReadLock rdlock(_storage_backend_lock); +Status StorageBackendMgr::get_storage_param(const std::string& storage_name, + StorageParamPB* storage_param) { + std::shared_lock rdlock(_storage_backend_lock); if (_storage_backend_map.find(storage_name) == _storage_backend_map.end()) { return Status::InternalError("storage_name not exist: " + storage_name); } @@ -152,7 +158,7 @@ Status StorageBackendMgr::get_storage_param(const std::string& storage_name, Sto } Status StorageBackendMgr::get_root_path(const std::string& storage_name, std::string* root_path) { - ReadLock rdlock(_storage_backend_lock); + std::shared_lock rdlock(_storage_backend_lock); if (_storage_backend_map.find(storage_name) == _storage_backend_map.end()) { return Status::InternalError("storage_name not exist: " + storage_name); } @@ -162,18 +168,17 @@ Status StorageBackendMgr::get_root_path(const std::string& storage_name, std::st std::string StorageBackendMgr::get_root_path_from_param(const StorageParamPB& storage_param) { switch (storage_param.storage_medium()) { - case StorageMediumPB::S3: - default: - { - return storage_param.s3_storage_param().root_path(); - } + case StorageMediumPB::S3: + default: { + return storage_param.s3_storage_param().root_path(); + } } } Status StorageBackendMgr::_check_exist(const StorageParamPB& storage_param_pb) { StorageParamPB old_storage_param; RETURN_IF_ERROR(get_storage_param(storage_param_pb.storage_name(), &old_storage_param)); - ReadLock rdlock(_storage_backend_lock); + std::shared_lock rdlock(_storage_backend_lock); std::string old_param_binary; RETURN_NOT_OK_STATUS_WITH_WARN(_serialize_param(old_storage_param, &old_param_binary), "_serialize_param old_storage_param_pb failed."); @@ -187,16 +192,19 @@ Status StorageBackendMgr::_check_exist(const StorageParamPB& storage_param_pb) { return Status::OK(); } -Status StorageBackendMgr::_serialize_param(const StorageParamPB& storage_param_pb, std::string* param_binary) { +Status StorageBackendMgr::_serialize_param(const StorageParamPB& storage_param_pb, + std::string* param_binary) { bool serialize_success = storage_param_pb.SerializeToString(param_binary); if (!serialize_success) { LOG(WARNING) << "failed to serialize storage_param " << storage_param_pb.storage_name(); - return Status::InternalError("failed to serialize storage_param: " + storage_param_pb.storage_name()); + return Status::InternalError("failed to serialize storage_param: " + + storage_param_pb.storage_name()); } return Status::OK(); } -Status StorageBackendMgr::_deserialize_param(const std::string& param_binary, StorageParamPB* storage_param_pb) { +Status StorageBackendMgr::_deserialize_param(const std::string& param_binary, + StorageParamPB* storage_param_pb) { bool parsed = storage_param_pb->ParseFromString(param_binary); if (!parsed) { LOG(WARNING) << "parse storage_param failed"; diff --git a/be/src/util/storage_backend_mgr.h b/be/src/util/storage_backend_mgr.h index c969c42a4a..3fd19dcb99 100644 --- a/be/src/util/storage_backend_mgr.h +++ b/be/src/util/storage_backend_mgr.h @@ -17,15 +17,18 @@ #pragma once -#include #include +#include +#include +#include #include -#include "util/mutex.h" +#include "common/status.h" namespace doris { class StorageBackend; +class StorageParamPB; // StorageBackendMgr is used to manage StorageBackend, it has (key -> StorageBackend) map used to connect remote storage class StorageBackendMgr { @@ -55,6 +58,7 @@ public: // get root_path of remote storage from storage_param static std::string get_root_path_from_param(const StorageParamPB& storage_param); + private: Status _create_remote_storage_internal(const StorageParamPB& storage_param); Status _check_exist(const StorageParamPB& storage_param_pb); diff --git a/be/test/env/env_posix_test.cpp b/be/test/env/env_posix_test.cpp index 0503b01502..3570dc027c 100644 --- a/be/test/env/env_posix_test.cpp +++ b/be/test/env/env_posix_test.cpp @@ -178,37 +178,6 @@ TEST_F(EnvPosixTest, random_rw) { ASSERT_EQ(TStatusCode::END_OF_FILE, st.code()); LOG(INFO) << "st=" << st.to_string(); } - // SequentialFile - { - char mem[1024]; - std::unique_ptr rfile; - st = env->new_sequential_file(fname, &rfile); - ASSERT_TRUE(st.ok()); - - Slice slice1(mem, 3); - st = rfile->read(&slice1); - ASSERT_TRUE(st.ok()); - ASSERT_STREQ("abc", std::string(slice1.data, slice1.size).c_str()); - - st = rfile->skip(3); - ASSERT_TRUE(st.ok()); - - Slice slice3(mem, 3); - st = rfile->read(&slice3); - ASSERT_STREQ("789", std::string(slice3.data, slice3.size).c_str()); - - st = rfile->skip(90); - ASSERT_TRUE(st.ok()); - - Slice slice4(mem, 15); - st = rfile->read(&slice4); - ASSERT_TRUE(st.ok()); - ASSERT_EQ(10, slice4.size); - - st = rfile->read(&slice4); - ASSERT_TRUE(st.ok()); - ASSERT_EQ(0, slice4.size); - } } TEST_F(EnvPosixTest, iterate_dir) { diff --git a/be/test/plugin/plugin_zip_test.cpp b/be/test/plugin/plugin_zip_test.cpp index 2ee1eba3c7..e564734e94 100644 --- a/be/test/plugin/plugin_zip_test.cpp +++ b/be/test/plugin/plugin_zip_test.cpp @@ -40,8 +40,6 @@ public: std::string path = GetCurrentRunningDir(); ASSERT_FALSE(path.empty()); - std::unique_ptr file; - auto& file_name = req->param("FILE"); FILE* fp = fopen((path + "/plugin_test/source/" + file_name).c_str(), "r");