diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index af32c81982..adc02a0aa2 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -97,7 +97,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env, case TFileType::FILE_HDFS: { THdfsParams hdfs_params = parse_properties(properties); std::shared_ptr fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); RETURN_IF_ERROR(fs->create_file(path, &file_writer)); break; } @@ -108,33 +108,32 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env, return Status::OK(); } -Status FileFactory::create_file_reader(const FileSystemProperties& system_properties, - const FileDescription& file_description, +Status FileFactory::create_file_reader(const io::FileSystemProperties& system_properties, + const io::FileDescription& file_description, + const io::FileReaderOptions& reader_options, std::shared_ptr* file_system, - io::FileReaderSPtr* file_reader, - io::FileReaderOptions reader_options) { + io::FileReaderSPtr* file_reader, RuntimeProfile* profile) { TFileType::type type = system_properties.system_type; - reader_options.file_size = file_description.file_size; switch (type) { case TFileType::FILE_LOCAL: { - RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_description.path, - reader_options, file_reader)); + RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_description, reader_options, + file_reader)); break; } case TFileType::FILE_S3: { - RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description.path, - file_system, file_reader, reader_options)); + RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description, + reader_options, file_system, file_reader)); break; } case TFileType::FILE_HDFS: { - RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description.path, - file_system, file_reader, reader_options)); + RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description, + reader_options, file_system, file_reader, profile)); break; } case TFileType::FILE_BROKER: { RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0], system_properties.properties, file_description, - file_system, file_reader, reader_options)); + reader_options, file_system, file_reader)); break; } default: @@ -165,42 +164,43 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS return Status::OK(); } -Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, +Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, + const io::FileDescription& fd, + const io::FileReaderOptions& reader_options, std::shared_ptr* hdfs_file_system, - io::FileReaderSPtr* reader, - const io::FileReaderOptions& reader_options) { + io::FileReaderSPtr* reader, RuntimeProfile* profile) { std::shared_ptr fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); - RETURN_IF_ERROR(fs->open_file(path, reader_options, reader)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", profile, &fs)); + RETURN_IF_ERROR(fs->open_file(fd, reader_options, reader)); *hdfs_file_system = std::move(fs); return Status::OK(); } Status FileFactory::create_s3_reader(const std::map& prop, - const std::string& path, + const io::FileDescription& fd, + const io::FileReaderOptions& reader_options, std::shared_ptr* s3_file_system, - io::FileReaderSPtr* reader, - const io::FileReaderOptions& reader_options) { - S3URI s3_uri(path); + io::FileReaderSPtr* reader) { + S3URI s3_uri(fd.path); RETURN_IF_ERROR(s3_uri.parse()); S3Conf s3_conf; RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, &s3_conf)); std::shared_ptr fs; RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", &fs)); - RETURN_IF_ERROR(fs->open_file(path, reader_options, reader)); + RETURN_IF_ERROR(fs->open_file(fd, reader_options, reader)); *s3_file_system = std::move(fs); return Status::OK(); } Status FileFactory::create_broker_reader(const TNetworkAddress& broker_addr, const std::map& prop, - const FileDescription& file_description, + const io::FileDescription& fd, + const io::FileReaderOptions& reader_options, std::shared_ptr* broker_file_system, - io::FileReaderSPtr* reader, - const io::FileReaderOptions& reader_options) { + io::FileReaderSPtr* reader) { std::shared_ptr fs; RETURN_IF_ERROR(io::BrokerFileSystem::create(broker_addr, prop, &fs)); - RETURN_IF_ERROR(fs->open_file(file_description.path, reader_options, reader)); + RETURN_IF_ERROR(fs->open_file(fd, reader_options, reader)); *broker_file_system = std::move(fs); return Status::OK(); } diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index b5cbcdfc7c..42589df531 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -31,6 +31,7 @@ #include "common/status.h" #include "io/fs/file_reader_options.h" #include "io/fs/file_reader_writer_fwd.h" +#include "io/fs/fs_utils.h" namespace doris { namespace io { @@ -41,19 +42,6 @@ class ExecEnv; class RuntimeProfile; class RuntimeState; -struct FileSystemProperties { - TFileType::type system_type; - std::map properties; - THdfsParams hdfs_params; - std::vector broker_addresses; -}; - -struct FileDescription { - std::string path; - int64_t start_offset; - int64_t file_size; -}; - class FileFactory { ENABLE_FACTORY_CREATOR(FileFactory); @@ -69,32 +57,34 @@ public: std::unique_ptr& file_writer); /// Create FileReader - static Status create_file_reader( - const FileSystemProperties& system_properties, const FileDescription& file_description, - std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, - io::FileReaderOptions reader_options = NO_CACHE_READER_OPTIONS); + static Status create_file_reader(const io::FileSystemProperties& system_properties, + const io::FileDescription& file_description, + const io::FileReaderOptions& reader_options, + std::shared_ptr* file_system, + io::FileReaderSPtr* file_reader, + RuntimeProfile* profile = nullptr); // Create FileReader for stream load pipe static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader, const TUniqueId& fragment_instance_id); - static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, + static Status create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd, + const io::FileReaderOptions& reader_options, std::shared_ptr* hdfs_file_system, - io::FileReaderSPtr* reader, - const io::FileReaderOptions& reader_options); + io::FileReaderSPtr* reader, RuntimeProfile* profile); static Status create_s3_reader(const std::map& prop, - const std::string& path, + const io::FileDescription& fd, + const io::FileReaderOptions& reader_options, std::shared_ptr* s3_file_system, - io::FileReaderSPtr* reader, - const io::FileReaderOptions& reader_options); + io::FileReaderSPtr* reader); static Status create_broker_reader(const TNetworkAddress& broker_addr, const std::map& prop, - const FileDescription& file_description, + const io::FileDescription& fd, + const io::FileReaderOptions& reader_options, std::shared_ptr* hdfs_file_system, - io::FileReaderSPtr* reader, - const io::FileReaderOptions& reader_options); + io::FileReaderSPtr* reader); static TFileType::type convert_storage_type(TStorageBackendType::type type) { switch (type) { diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp b/be/src/io/fs/benchmark/hdfs_benchmark.hpp index b508e14a24..2a6f97d5e8 100644 --- a/be/src/io/fs/benchmark/hdfs_benchmark.hpp +++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp @@ -50,8 +50,10 @@ public: io::FileReaderSPtr reader; io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr); THdfsParams hdfs_params = parse_properties(_conf_map); - RETURN_IF_ERROR( - FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts)); + io::FileDescription fd; + fd.path = file_path; + RETURN_IF_ERROR(FileFactory::create_hdfs_reader(hdfs_params, fd, reader_opts, &fs, &reader, + nullptr)); auto end = std::chrono::high_resolution_clock::now(); auto elapsed_seconds = std::chrono::duration_cast>(end - start); @@ -94,7 +96,7 @@ public: std::shared_ptr fs; io::FileWriterPtr writer; THdfsParams hdfs_params = parse_properties(_conf_map); - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); RETURN_IF_ERROR(fs->create_file(file_path, &writer)); return write(state, writer.get()); } @@ -115,7 +117,7 @@ public: auto new_file_path = file_path + "_new"; THdfsParams hdfs_params = parse_properties(_conf_map); std::shared_ptr fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); auto start = std::chrono::high_resolution_clock::now(); RETURN_IF_ERROR(fs->rename(file_path, new_file_path)); @@ -142,7 +144,7 @@ public: std::shared_ptr fs; THdfsParams hdfs_params = parse_properties(_conf_map); - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); auto start = std::chrono::high_resolution_clock::now(); bool res = false; diff --git a/be/src/io/fs/benchmark/s3_benchmark.hpp b/be/src/io/fs/benchmark/s3_benchmark.hpp index f97976e5ba..698129d546 100644 --- a/be/src/io/fs/benchmark/s3_benchmark.hpp +++ b/be/src/io/fs/benchmark/s3_benchmark.hpp @@ -66,9 +66,11 @@ public: io::FileReaderSPtr reader; io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr); + io::FileDescription fd; + fd.path = file_path; RETURN_IF_ERROR(FileFactory::create_s3_reader( - _conf_map, file_path, reinterpret_cast*>(&fs), - &reader, reader_opts)); + _conf_map, fd, reader_opts, reinterpret_cast*>(&fs), + &reader)); return read(state, reader); } }; @@ -118,8 +120,8 @@ public: io::FileReaderOptions reader_options = FileFactory::get_reader_options(nullptr); IOContext io_ctx; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( - nullptr, fs_props, fd, &fs, &reader, io::DelegateReader::AccessMode::SEQUENTIAL, - reader_options, &io_ctx)); + nullptr, fs_props, fd, reader_options, &fs, &reader, + io::DelegateReader::AccessMode::SEQUENTIAL, &io_ctx)); return read(state, reader); } }; diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index cdef7eef38..5a4342027b 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ b/be/src/io/fs/broker_file_system.cpp @@ -101,17 +101,17 @@ Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* write return Status::OK(); } -Status BrokerFileSystem::open_file_internal(const Path& file, int64_t file_size, +Status BrokerFileSystem::open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) { - int64_t fsize = file_size; - if (fsize < 0) { - RETURN_IF_ERROR(file_size_impl(file, &fsize)); + int64_t fsize = fd.file_size; + if (fsize <= 0) { + RETURN_IF_ERROR(file_size_impl(abs_path, &fsize)); } CHECK_BROKER_CLIENT(_client); TBrokerOpenReaderRequest request; request.__set_version(TBrokerVersion::VERSION_ONE); - request.__set_path(file); + request.__set_path(abs_path); request.__set_startOffset(0); request.__set_clientId(client_id(_broker_addr)); request.__set_properties(_broker_prop); @@ -127,15 +127,16 @@ Status BrokerFileSystem::open_file_internal(const Path& file, int64_t file_size, (*_client)->openReader(*response, request); } } catch (apache::thrift::TException& e) { - return Status::RpcError("failed to open file {}: {}", file.native(), error_msg(e.what())); + return Status::RpcError("failed to open file {}: {}", abs_path.native(), + error_msg(e.what())); } if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) { - return Status::IOError("failed to open file {}: {}", file.native(), + return Status::IOError("failed to open file {}: {}", abs_path.native(), error_msg(response->opStatus.message)); } *reader = std::make_shared( - _broker_addr, file, fsize, response->fd, + _broker_addr, abs_path, fsize, response->fd, std::static_pointer_cast(shared_from_this())); return Status::OK(); } @@ -406,7 +407,9 @@ Status BrokerFileSystem::upload_with_checksum_impl(const Path& local_file, const Status BrokerFileSystem::download_impl(const Path& remote_file, const Path& local_file) { // 1. open remote file for read FileReaderSPtr broker_reader = nullptr; - RETURN_IF_ERROR(open_file_internal(remote_file, -1, &broker_reader)); + FileDescription fd; + fd.path = remote_file.native(); + RETURN_IF_ERROR(open_file_internal(fd, remote_file, &broker_reader)); // 2. remove the existing local file if exist if (std::filesystem::remove(local_file)) { @@ -440,10 +443,12 @@ Status BrokerFileSystem::download_impl(const Path& remote_file, const Path& loca return Status::OK(); } -Status BrokerFileSystem::direct_download_impl(const Path& remote_impl, std::string* content) { +Status BrokerFileSystem::direct_download_impl(const Path& remote_file, std::string* content) { // 1. open remote file for read FileReaderSPtr broker_reader = nullptr; - RETURN_IF_ERROR(open_file_internal(remote_impl, -1, &broker_reader)); + FileDescription fd; + fd.path = remote_file.native(); + RETURN_IF_ERROR(open_file_internal(fd, remote_file, &broker_reader)); constexpr size_t buf_sz = 1024 * 1024; std::unique_ptr read_buf(new char[buf_sz]); diff --git a/be/src/io/fs/broker_file_system.h b/be/src/io/fs/broker_file_system.h index 6b04bbc61c..a015f5c1f5 100644 --- a/be/src/io/fs/broker_file_system.h +++ b/be/src/io/fs/broker_file_system.h @@ -49,7 +49,8 @@ public: protected: Status connect_impl() override; Status create_file_impl(const Path& file, FileWriterPtr* writer) override; - Status open_file_internal(const Path& file, int64_t file_size, FileReaderSPtr* reader) override; + Status open_file_internal(const FileDescription& fd, const Path& abs_path, + FileReaderSPtr* reader) override; Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; Status delete_file_impl(const Path& file) override; Status delete_directory_impl(const Path& dir) override; diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index d9af61d3e8..be64439128 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -760,13 +760,13 @@ Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset, Status DelegateReader::create_file_reader(RuntimeProfile* profile, const FileSystemProperties& system_properties, const FileDescription& file_description, + const io::FileReaderOptions& reader_options, std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, AccessMode access_mode, - io::FileReaderOptions reader_options, const IOContext* io_ctx, const PrefetchRange file_range) { io::FileReaderSPtr reader; RETURN_IF_ERROR(FileFactory::create_file_reader(system_properties, file_description, - file_system, &reader, reader_options)); + reader_options, file_system, &reader, profile)); if (reader->size() < IN_MEMORY_FILE_SIZE) { *file_reader = std::make_shared(reader); } else if (access_mode == AccessMode::SEQUENTIAL) { diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index a23112027b..b0728e6af1 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -250,10 +250,9 @@ public: static Status create_file_reader( RuntimeProfile* profile, const FileSystemProperties& system_properties, - const FileDescription& file_description, std::shared_ptr* file_system, - io::FileReaderSPtr* file_reader, AccessMode access_mode = SEQUENTIAL, - io::FileReaderOptions reader_options = FileFactory::NO_CACHE_READER_OPTIONS, - const IOContext* io_ctx = nullptr, + const FileDescription& file_description, const io::FileReaderOptions& reader_options, + std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, + AccessMode access_mode = SEQUENTIAL, const IOContext* io_ctx = nullptr, const PrefetchRange file_range = PrefetchRange(0, 0)); }; diff --git a/be/src/io/fs/file_reader_options.h b/be/src/io/fs/file_reader_options.h index c3e4f74ad1..7477816f8f 100644 --- a/be/src/io/fs/file_reader_options.h +++ b/be/src/io/fs/file_reader_options.h @@ -71,10 +71,6 @@ public: FileCachePolicy cache_type; const CachePathPolicy& path_policy; - // length of the file in bytes. - // -1 means unset. - // If the file length is not set, the file length will be fetched from the file system. - int64_t file_size = -1; bool has_cache_base_path = false; std::string cache_base_path; // Use modification time to determine whether the file is changed diff --git a/be/src/io/fs/file_system.cpp b/be/src/io/fs/file_system.cpp index d399fbbc57..989a68884a 100644 --- a/be/src/io/fs/file_system.cpp +++ b/be/src/io/fs/file_system.cpp @@ -27,10 +27,10 @@ Status FileSystem::create_file(const Path& file, FileWriterPtr* writer) { FILESYSTEM_M(create_file_impl(path, writer)); } -Status FileSystem::open_file(const Path& file, const FileReaderOptions& reader_options, +Status FileSystem::open_file(const FileDescription& fd, const FileReaderOptions& reader_options, FileReaderSPtr* reader) { - auto path = absolute_path(file); - FILESYSTEM_M(open_file_impl(path, reader_options, reader)); + auto path = absolute_path(fd.path); + FILESYSTEM_M(open_file_impl(fd, path, reader_options, reader)); } Status FileSystem::create_directory(const Path& dir, bool failed_if_exists) { diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h index 30ac5f5a4a..04e2615fb0 100644 --- a/be/src/io/fs/file_system.h +++ b/be/src/io/fs/file_system.h @@ -32,6 +32,7 @@ #include "common/status.h" #include "io/fs/file_reader_options.h" #include "io/fs/file_reader_writer_fwd.h" +#include "io/fs/fs_utils.h" #include "io/fs/path.h" namespace doris { @@ -75,9 +76,14 @@ public: // And derived classes should implement all xxx_impl methods. Status create_file(const Path& file, FileWriterPtr* writer); Status open_file(const Path& file, FileReaderSPtr* reader) { - return open_file(file, FileReaderOptions::DEFAULT, reader); + FileDescription fd; + fd.path = file.native(); + return open_file(fd, FileReaderOptions::DEFAULT, reader); } - Status open_file(const Path& file, const FileReaderOptions& reader_options, + Status open_file(const FileDescription& fd, FileReaderSPtr* reader) { + return open_file(fd, FileReaderOptions::DEFAULT, reader); + } + Status open_file(const FileDescription& fd, const FileReaderOptions& reader_options, FileReaderSPtr* reader); Status create_directory(const Path& dir, bool failed_if_exists = false); Status delete_file(const Path& file); @@ -112,7 +118,8 @@ protected: virtual Status create_file_impl(const Path& file, FileWriterPtr* writer) = 0; /// open file and return a FileReader - virtual Status open_file_impl(const Path& file, const FileReaderOptions& reader_options, + virtual Status open_file_impl(const FileDescription& fd, const Path& abs_file, + const FileReaderOptions& reader_options, FileReaderSPtr* reader) = 0; /// create directory recursively diff --git a/be/src/io/fs/fs_utils.cpp b/be/src/io/fs/fs_utils.cpp deleted file mode 100644 index 7f12c6767e..0000000000 --- a/be/src/io/fs/fs_utils.cpp +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "io/fs/fs_utils.h" - -#include - -#include - -#include "io/fs/file_reader.h" -#include "io/fs/file_reader_writer_fwd.h" -#include "io/fs/file_system.h" - -namespace doris { -namespace io { - -Status read_file_to_string(FileSystemSPtr fs, const Path& file, std::string* content) { - FileReaderSPtr file_reader; - RETURN_IF_ERROR(fs->open_file(file, &file_reader)); - size_t file_size = file_reader->size(); - content->resize(file_size); - size_t bytes_read = 0; - RETURN_IF_ERROR(file_reader->read_at(0, {*content}, &bytes_read)); - if (bytes_read != file_size) { - return Status::IOError("failed to read file {} to string. bytes read: {}, file size: {}", - file.native(), bytes_read, file_size); - } - return file_reader->close(); -} - -} // namespace io -} // namespace doris diff --git a/be/src/io/fs/fs_utils.h b/be/src/io/fs/fs_utils.h index 39744d25a0..64ad2c6ea8 100644 --- a/be/src/io/fs/fs_utils.h +++ b/be/src/io/fs/fs_utils.h @@ -17,110 +17,36 @@ #pragma once +#include #include #include -#include #include #include "common/status.h" -#include "io/fs/file_system.h" #include "io/fs/path.h" namespace doris { namespace io { -struct FilePathDesc { - FilePathDesc(const std::string& path) { filepath = path; } - FilePathDesc() = default; - TStorageMedium::type storage_medium = TStorageMedium::HDD; - std::string filepath; - std::string remote_path; - std::string storage_name; - io::FileSystem* file_system; - - std::string debug_string() const { - std::stringstream ss; - ss << "storage_medium: " << to_string(storage_medium) << ", local_path: " << filepath; - if (!remote_path.empty()) { - ss << ", storage_name: " << storage_name << ", remote_path: " << remote_path; - } - return ss.str(); - } - // REMOTE_CACHE is the local cache path for remote path, if a data_dir is REMOTE_CACHE, - // it means the tablet in it will be set as a remote path. - static bool is_remote(TStorageMedium::type checked_storage_medium) { - return checked_storage_medium == TStorageMedium::S3 || - checked_storage_medium == TStorageMedium::REMOTE_CACHE; - } - bool is_remote() const { return is_remote(storage_medium); } +struct FileSystemProperties { + TFileType::type system_type; + std::map properties; + THdfsParams hdfs_params; + std::vector broker_addresses; }; -class FilePathDescStream { -public: - FilePathDescStream& operator<<(const FilePathDesc& val) { - _filepath_stream << val.filepath; - _storage_medium = val.storage_medium; - _storage_name = val.storage_name; - if (FilePathDesc::is_remote(_storage_medium)) { - _remote_path_stream << val.remote_path; - } - return *this; - } - FilePathDescStream& operator<<(const std::string& val) { - _filepath_stream << val; - if (FilePathDesc::is_remote(_storage_medium)) { - _remote_path_stream << val; - } - return *this; - } - FilePathDescStream& operator<<(uint64_t val) { - _filepath_stream << val; - if (FilePathDesc::is_remote(_storage_medium)) { - _remote_path_stream << val; - } - return *this; - } - FilePathDescStream& operator<<(int64_t val) { - _filepath_stream << val; - if (FilePathDesc::is_remote(_storage_medium)) { - _remote_path_stream << val; - } - return *this; - } - FilePathDescStream& operator<<(uint32_t val) { - _filepath_stream << val; - if (FilePathDesc::is_remote(_storage_medium)) { - _remote_path_stream << val; - } - return *this; - } - FilePathDescStream& operator<<(int32_t val) { - _filepath_stream << val; - if (FilePathDesc::is_remote(_storage_medium)) { - _remote_path_stream << val; - } - return *this; - } - FilePathDesc path_desc() { - FilePathDesc path_desc(_filepath_stream.str()); - path_desc.storage_medium = _storage_medium; - if (FilePathDesc::is_remote(_storage_medium)) { - path_desc.remote_path = _remote_path_stream.str(); - } - path_desc.storage_name = _storage_name; - return path_desc; - } - -private: - TStorageMedium::type _storage_medium = TStorageMedium::HDD; - std::stringstream _filepath_stream; - std::stringstream _remote_path_stream; - std::string _storage_name; +struct FileDescription { + std::string path; + int64_t start_offset; + // length of the file in bytes. + // -1 means unset. + // If the file length is not set, the file length will be fetched from the file system. + int64_t file_size = -1; + // modification time of this file. + // 0 means unset. + int64_t mtime = 0; }; -// read all data from file to string -Status read_file_to_string(FileSystemSPtr fs, const Path& file, std::string* content); - } // namespace io } // namespace doris diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 402cdb3faf..cf3a2b6563 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -36,12 +36,29 @@ namespace doris { namespace io { HdfsFileReader::HdfsFileReader(Path path, const std::string& name_node, - FileHandleCache::Accessor accessor) - : _path(std::move(path)), _name_node(name_node), _accessor(std::move(accessor)) { + FileHandleCache::Accessor accessor, RuntimeProfile* profile) + : _path(std::move(path)), + _name_node(name_node), + _accessor(std::move(accessor)), + _profile(profile) { _handle = _accessor.get(); DorisMetrics::instance()->hdfs_file_open_reading->increment(1); DorisMetrics::instance()->hdfs_file_reader_total->increment(1); + if (_profile != nullptr) { +#ifdef USE_HADOOP_HDFS + const char* hdfs_profile_name = "HdfsIO"; + ADD_TIMER(_profile, hdfs_profile_name); + _hdfs_profile.total_bytes_read = + ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, hdfs_profile_name); + _hdfs_profile.total_local_bytes_read = + ADD_CHILD_COUNTER(_profile, "TotalLocalBytesRead", TUnit::BYTES, hdfs_profile_name); + _hdfs_profile.total_short_circuit_bytes_read = ADD_CHILD_COUNTER( + _profile, "TotalShortCircuitBytesRead", TUnit::BYTES, hdfs_profile_name); + _hdfs_profile.total_total_zero_copy_bytes_read = ADD_CHILD_COUNTER( + _profile, "TotalZeroCopyBytesRead", TUnit::BYTES, hdfs_profile_name); +#endif + } } HdfsFileReader::~HdfsFileReader() { @@ -52,6 +69,25 @@ Status HdfsFileReader::close() { bool expected = false; if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { DorisMetrics::instance()->hdfs_file_open_reading->increment(-1); + if (_profile != nullptr) { +#ifdef USE_HADOOP_HDFS + struct hdfsReadStatistics* hdfs_statistics = nullptr; + auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics); + if (r != 0) { + return Status::InternalError( + fmt::format("Failed to run hdfsFileGetReadStatistics(): {}", r)); + } + COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead); + COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, + hdfs_statistics->totalLocalBytesRead); + COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read, + hdfs_statistics->totalShortCircuitBytesRead); + COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read, + hdfs_statistics->totalZeroCopyBytesRead); + hdfsFileFreeReadStatistics(hdfs_statistics); + hdfsFileClearReadStatistics(_handle->file()); +#endif + } } return Status::OK(); } diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h index efff1bfcd6..864a55bc41 100644 --- a/be/src/io/fs/hdfs_file_reader.h +++ b/be/src/io/fs/hdfs_file_reader.h @@ -38,7 +38,8 @@ class IOContext; class HdfsFileReader : public FileReader { public: - HdfsFileReader(Path path, const std::string& name_node, FileHandleCache::Accessor accessor); + HdfsFileReader(Path path, const std::string& name_node, FileHandleCache::Accessor accessor, + RuntimeProfile* profile); ~HdfsFileReader() override; @@ -57,11 +58,24 @@ protected: const IOContext* io_ctx) override; private: +#ifdef USE_HADOOP_HDFS + struct HDFSProfile { + RuntimeProfile::Counter* total_bytes_read; + RuntimeProfile::Counter* total_local_bytes_read; + RuntimeProfile::Counter* total_short_circuit_bytes_read; + RuntimeProfile::Counter* total_total_zero_copy_bytes_read; + }; +#endif + Path _path; const std::string& _name_node; FileHandleCache::Accessor _accessor; CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle std::atomic _closed = false; + RuntimeProfile* _profile; +#ifdef USE_HADOOP_HDFS + HDFSProfile _hdfs_profile; +#endif }; } // namespace io } // namespace doris diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 775754bd4d..4473ea2a3a 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -112,18 +112,13 @@ Status HdfsFileHandleCache::get_file(const std::shared_ptr& fs, std::string fname = file.string(); RETURN_IF_ERROR(HdfsFileHandleCache::instance()->cache().get_file_handle( fs->_fs_handle->hdfs_fs, fname, mtime, file_size, false, accessor, &cache_hit)); - // if (cache_hit) { - // LOG(INFO) << "yy debug get from file handle cache: " << file.native(); - // } else { - // LOG(INFO) << "yy debug get from file handle new: " << file.native(); - // } accessor->set_fs(fs); return Status::OK(); } Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string& path, - std::shared_ptr* fs) { + RuntimeProfile* profile, std::shared_ptr* fs) { #ifdef USE_HADOOP_HDFS if (!config::enable_java_support) { return Status::InternalError( @@ -131,14 +126,16 @@ Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string& "true."); } #endif - (*fs).reset(new HdfsFileSystem(hdfs_params, path)); + (*fs).reset(new HdfsFileSystem(hdfs_params, path, profile)); return (*fs)->connect(); } -HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path) +HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path, + RuntimeProfile* profile) : RemoteFileSystem(path, "", FileSystemType::HDFS), _hdfs_params(hdfs_params), - _fs_handle(nullptr) { + _fs_handle(nullptr), + _profile(profile) { _namenode = _hdfs_params.fs_name; } @@ -165,17 +162,17 @@ Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer) return Status::OK(); } -Status HdfsFileSystem::open_file_internal(const Path& file, int64_t file_size, +Status HdfsFileSystem::open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) { CHECK_HDFS_HANDLE(_fs_handle); - Path real_path = convert_path(file, _namenode); + Path real_path = convert_path(abs_path, _namenode); FileHandleCache::Accessor accessor; RETURN_IF_ERROR(HdfsFileHandleCache::instance()->get_file( - std::static_pointer_cast(shared_from_this()), real_path, 0, file_size, - &accessor)); + std::static_pointer_cast(shared_from_this()), real_path, fd.mtime, + fd.file_size, &accessor)); - *reader = std::make_shared(file, _namenode, std::move(accessor)); + *reader = std::make_shared(abs_path, _namenode, std::move(accessor), _profile); return Status::OK(); } @@ -358,7 +355,9 @@ Status HdfsFileSystem::upload_with_checksum_impl(const Path& local, const Path& Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_file) { // 1. open remote file for read FileReaderSPtr hdfs_reader = nullptr; - RETURN_IF_ERROR(open_file_internal(remote_file, -1, &hdfs_reader)); + FileDescription fd; + fd.path = remote_file; + RETURN_IF_ERROR(open_file_internal(fd, remote_file, &hdfs_reader)); // 2. remove the existing local file if exist if (std::filesystem::remove(local_file)) { @@ -395,7 +394,9 @@ Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_ Status HdfsFileSystem::direct_download_impl(const Path& remote_file, std::string* content) { // 1. open remote file for read FileReaderSPtr hdfs_reader = nullptr; - RETURN_IF_ERROR(open_file_internal(remote_file, -1, &hdfs_reader)); + FileDescription fd; + fd.path = remote_file; + RETURN_IF_ERROR(open_file_internal(fd, remote_file, &hdfs_reader)); constexpr size_t buf_sz = 1024 * 1024; std::unique_ptr read_buf(new char[buf_sz]); diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index 7720af17cf..f365891aa9 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -33,6 +33,7 @@ #include "io/fs/hdfs.h" #include "io/fs/path.h" #include "io/fs/remote_file_system.h" +#include "util/runtime_profile.h" namespace doris { class THdfsParams; @@ -111,7 +112,7 @@ class HdfsFileHandleCache; class HdfsFileSystem final : public RemoteFileSystem { public: static Status create(const THdfsParams& hdfs_params, const std::string& path, - std::shared_ptr* fs); + RuntimeProfile* profile, std::shared_ptr* fs); ~HdfsFileSystem() override; @@ -122,7 +123,8 @@ public: protected: Status connect_impl() override; Status create_file_impl(const Path& file, FileWriterPtr* writer) override; - Status open_file_internal(const Path& file, int64_t file_size, FileReaderSPtr* reader) override; + Status open_file_internal(const FileDescription& fd, const Path& abs_path, + FileReaderSPtr* reader) override; Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; Status delete_file_impl(const Path& file) override; Status delete_directory_impl(const Path& dir) override; @@ -148,12 +150,14 @@ private: private: friend class HdfsFileWriter; - HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path); + HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path, + RuntimeProfile* profile); const THdfsParams& _hdfs_params; std::string _namenode; // do not use std::shared_ptr or std::unique_ptr // _fs_handle is managed by HdfsFileSystemCache HdfsFileSystemHandle* _fs_handle; + RuntimeProfile* _profile; }; } // namespace io } // namespace doris diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp index 9b5c8eec85..01cd8829dd 100644 --- a/be/src/io/fs/local_file_system.cpp +++ b/be/src/io/fs/local_file_system.cpp @@ -64,18 +64,20 @@ Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer return Status::OK(); } -Status LocalFileSystem::open_file_impl(const Path& file, +Status LocalFileSystem::open_file_impl(const FileDescription& file_desc, const Path& abs_path, const FileReaderOptions& /*reader_options*/, FileReaderSPtr* reader) { - int64_t fsize = 0; - RETURN_IF_ERROR(file_size_impl(file, &fsize)); + int64_t fsize = file_desc.file_size; + if (fsize <= 0) { + RETURN_IF_ERROR(file_size_impl(abs_path, &fsize)); + } int fd = -1; - RETRY_ON_EINTR(fd, open(file.c_str(), O_RDONLY)); + RETRY_ON_EINTR(fd, open(abs_path.c_str(), O_RDONLY)); if (fd < 0) { - return Status::IOError("failed to open {}: {}", file.native(), errno_to_str()); + return Status::IOError("failed to open {}: {}", abs_path.native(), errno_to_str()); } *reader = std::make_shared( - std::move(file), fsize, fd, + std::move(abs_path), fsize, fd, std::static_pointer_cast(shared_from_this())); return Status::OK(); } @@ -404,6 +406,22 @@ bool LocalFileSystem::contain_path(const Path& parent_, const Path& sub_) { return true; } +Status LocalFileSystem::read_file_to_string(const Path& file, std::string* content) { + FileReaderSPtr file_reader; + FileDescription fd; + fd.path = file.native(); + RETURN_IF_ERROR(open_file(fd, &file_reader)); + size_t file_size = file_reader->size(); + content->resize(file_size); + size_t bytes_read = 0; + RETURN_IF_ERROR(file_reader->read_at(0, {*content}, &bytes_read)); + if (bytes_read != file_size) { + return Status::IOError("failed to read file {} to string. bytes read: {}, file size: {}", + file.native(), bytes_read, file_size); + } + return file_reader->close(); +} + static std::shared_ptr local_fs = io::LocalFileSystem::create(""); const std::shared_ptr& global_local_filesystem() { diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h index bcebd48a65..d9c0ec96c8 100644 --- a/be/src/io/fs/local_file_system.h +++ b/be/src/io/fs/local_file_system.h @@ -69,10 +69,13 @@ public: // delete dir or file Status delete_directory_or_file(const Path& path); + // read local file and save content to "content" + Status read_file_to_string(const Path& file, std::string* content); + protected: Status create_file_impl(const Path& file, FileWriterPtr* writer) override; - Status open_file_impl(const Path& file, const FileReaderOptions& reader_options, - FileReaderSPtr* reader) override; + Status open_file_impl(const FileDescription& file_desc, const Path& abs_path, + const FileReaderOptions& reader_options, FileReaderSPtr* reader) override; Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; Status delete_file_impl(const Path& file) override; Status delete_directory_impl(const Path& dir) override; diff --git a/be/src/io/fs/remote_file_system.cpp b/be/src/io/fs/remote_file_system.cpp index 611f7cf894..c169777653 100644 --- a/be/src/io/fs/remote_file_system.cpp +++ b/be/src/io/fs/remote_file_system.cpp @@ -72,10 +72,11 @@ Status RemoteFileSystem::connect() { FILESYSTEM_M(connect_impl()); } -Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOptions& reader_options, +Status RemoteFileSystem::open_file_impl(const FileDescription& fd, const Path& abs_path, + const FileReaderOptions& reader_options, FileReaderSPtr* reader) { FileReaderSPtr raw_reader; - RETURN_IF_ERROR(open_file_internal(path, reader_options.file_size, &raw_reader)); + RETURN_IF_ERROR(open_file_internal(fd, abs_path, &raw_reader)); switch (reader_options.cache_type) { case io::FileCachePolicy::NO_CACHE: { *reader = raw_reader; @@ -83,7 +84,7 @@ Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOption } case io::FileCachePolicy::SUB_FILE_CACHE: case io::FileCachePolicy::WHOLE_FILE_CACHE: { - std::string cache_path = reader_options.path_policy.get_cache_path(path.native()); + std::string cache_path = reader_options.path_policy.get_cache_path(abs_path.native()); io::FileCachePtr cache_reader = FileCacheManager::instance()->new_file_cache( cache_path, config::file_cache_alive_time_sec, raw_reader, reader_options.cache_type); @@ -93,7 +94,7 @@ Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOption } case io::FileCachePolicy::FILE_BLOCK_CACHE: { StringPiece str(raw_reader->path().native()); - std::string cache_path = reader_options.path_policy.get_cache_path(path.native()); + std::string cache_path = reader_options.path_policy.get_cache_path(abs_path.native()); if (reader_options.has_cache_base_path) { // from query session variable: file_cache_base_path *reader = std::make_shared( @@ -101,7 +102,7 @@ Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOption reader_options.modification_time); } else { *reader = std::make_shared(std::move(raw_reader), cache_path, - reader_options.modification_time); + fd.mtime); } break; } diff --git a/be/src/io/fs/remote_file_system.h b/be/src/io/fs/remote_file_system.h index 44ecde4041..559890d5ee 100644 --- a/be/src/io/fs/remote_file_system.h +++ b/be/src/io/fs/remote_file_system.h @@ -54,7 +54,8 @@ protected: /// connect to remote file system virtual Status connect_impl() = 0; - virtual Status open_file_impl(const Path& file, const FileReaderOptions& reader_options, + virtual Status open_file_impl(const FileDescription& fd, const Path& abs_path, + const FileReaderOptions& reader_options, FileReaderSPtr* reader) override; /// upload load_file to remote remote_file /// local_file should be an absolute path on local filesystem. @@ -83,7 +84,7 @@ protected: // The derived class should implement this method. // if file_size < 0, the file size should be fetched from file system - virtual Status open_file_internal(const Path& file, int64_t file_size, + virtual Status open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) = 0; }; diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index 312799d140..ca4fd0bda8 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -134,13 +134,13 @@ Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer) { return Status::OK(); } -Status S3FileSystem::open_file_internal(const Path& file, int64_t file_size, +Status S3FileSystem::open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) { - int64_t fsize = file_size; + int64_t fsize = fd.file_size; if (fsize < 0) { - RETURN_IF_ERROR(file_size_impl(file, &fsize)); + RETURN_IF_ERROR(file_size_impl(abs_path, &fsize)); } - GET_KEY(key, file); + GET_KEY(key, abs_path); auto fs_path = Path(_s3_conf.endpoint) / _s3_conf.bucket / key; *reader = std::make_shared( std::move(fs_path), fsize, std::move(key), _s3_conf.bucket, diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index 0c7e3d17e1..d2570a1058 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -69,7 +69,8 @@ public: protected: Status connect_impl() override; Status create_file_impl(const Path& file, FileWriterPtr* writer) override; - Status open_file_internal(const Path& file, int64_t file_size, FileReaderSPtr* reader) override; + Status open_file_internal(const FileDescription& fd, const Path& abs_path, + FileReaderSPtr* reader) override; Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; Status delete_file_impl(const Path& file) override; Status delete_directory_impl(const Path& dir) override; diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 970138cd0e..d156a2cbf5 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -39,7 +39,6 @@ #include "gutil/strings/substitute.h" #include "io/fs/file_reader_writer_fwd.h" #include "io/fs/file_writer.h" -#include "io/fs/fs_utils.h" #include "io/fs/local_file_system.h" #include "io/fs/path.h" #include "io/fs/remote_file_system.h" @@ -149,7 +148,7 @@ Status DataDir::read_cluster_id(const std::string& cluster_id_path, int32_t* clu if (exists) { std::string content; RETURN_IF_ERROR( - io::read_file_to_string(io::global_local_filesystem(), cluster_id_path, &content)); + io::global_local_filesystem()->read_file_to_string(cluster_id_path, &content)); if (content.size() > 0) { *cluster_id = std::stoi(content); } else { diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index 9c6adab9c5..1443a7f846 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -88,8 +88,6 @@ public: bool is_ssd_disk() const { return _storage_medium == TStorageMedium::SSD; } - bool is_remote() const { return io::FilePathDesc::is_remote(_storage_medium); } - TStorageMedium::type storage_medium() const { return _storage_medium; } void register_tablet(Tablet* tablet); diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 598a14f1b0..c6d472d035 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -78,15 +78,16 @@ Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t se const io::FileReaderOptions& reader_options, std::shared_ptr* output) { io::FileReaderSPtr file_reader; + io::FileDescription fd; + fd.path = path; #ifndef BE_TEST - RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader)); + RETURN_IF_ERROR(fs->open_file(fd, reader_options, &file_reader)); #else // be ut use local file reader instead of remote file reader while use remote cache if (!config::file_cache_type.empty()) { - RETURN_IF_ERROR( - io::global_local_filesystem()->open_file(path, reader_options, &file_reader)); + RETURN_IF_ERROR(io::global_local_filesystem()->open_file(fd, reader_options, &file_reader)); } else { - RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader)); + RETURN_IF_ERROR(fs->open_file(fd, reader_options, &file_reader)); } #endif diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index f1b58fa454..3ff8229bc3 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -86,7 +86,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l } else if (TStorageBackendType::type::HDFS == type) { THdfsParams hdfs_params = parse_properties(_prop); std::shared_ptr fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); _remote_fs = std::move(fs); } else if (TStorageBackendType::type::BROKER == type) { std::shared_ptr fs; diff --git a/be/src/util/os_util.cpp b/be/src/util/os_util.cpp index 0b942c5f04..84cc364aba 100644 --- a/be/src/util/os_util.cpp +++ b/be/src/util/os_util.cpp @@ -33,7 +33,6 @@ #include "gutil/strings/numbers.h" #include "gutil/strings/split.h" #include "gutil/strings/substitute.h" -#include "io/fs/fs_utils.h" #include "io/fs/local_file_system.h" using std::string; @@ -104,9 +103,8 @@ Status get_thread_stats(int64_t tid, ThreadStats* stats) { return Status::NotSupported("ThreadStats not supported"); } std::string buf; - RETURN_IF_ERROR(io::read_file_to_string(io::global_local_filesystem(), - strings::Substitute("/proc/self/task/$0/stat", tid), - &buf)); + RETURN_IF_ERROR(io::global_local_filesystem()->read_file_to_string( + strings::Substitute("/proc/self/task/$0/stat", tid), &buf)); return parse_stat(buf, nullptr, stats); } void disable_core_dumps() { diff --git a/be/src/util/path_util.cpp b/be/src/util/path_util.cpp index d3e860044f..f1c96a0fc0 100644 --- a/be/src/util/path_util.cpp +++ b/be/src/util/path_util.cpp @@ -44,13 +44,6 @@ std::string join_path_segments(const string& a, const string& b) { } } -FilePathDesc join_path_desc_segments(const FilePathDesc& path_desc, const string& b) { - FilePathDesc seg_path_desc = path_desc; - seg_path_desc.filepath = join_path_segments(path_desc.filepath, b); - seg_path_desc.remote_path = join_path_segments(path_desc.remote_path, b); - return seg_path_desc; -} - std::vector join_path_segments_v(const std::vector& v, const string& s) { std::vector out; for (const string& path : v) { diff --git a/be/src/util/path_util.h b/be/src/util/path_util.h index 1a1049fcbe..1376d2c32f 100644 --- a/be/src/util/path_util.h +++ b/be/src/util/path_util.h @@ -21,13 +21,9 @@ #include #include -#include "io/fs/fs_utils.h" - namespace doris { namespace path_util { -using doris::io::FilePathDesc; - // NOTE: The methods here are only related to path processing, do not involve // any file and IO operations. @@ -40,8 +36,6 @@ std::string join_path_segments(const std::string& a, const std::string& b); std::vector join_path_segments_v(const std::vector& v, const std::string& s); -FilePathDesc join_path_desc_segments(const FilePathDesc& path_desc, const std::string& b); - // Split a path into segments with the appropriate path separator. std::vector split_path(const std::string& path); diff --git a/be/src/vec/core/block_spill_reader.cpp b/be/src/vec/core/block_spill_reader.cpp index cea20aee0b..8d2d481229 100644 --- a/be/src/vec/core/block_spill_reader.cpp +++ b/be/src/vec/core/block_spill_reader.cpp @@ -47,14 +47,14 @@ void BlockSpillReader::_init_profile() { Status BlockSpillReader::open() { std::shared_ptr file_system; - FileSystemProperties system_properties; + io::FileSystemProperties system_properties; system_properties.system_type = TFileType::FILE_LOCAL; - FileDescription file_description; + io::FileDescription file_description; file_description.path = file_path_; - + io::FileReaderOptions reader_options = io::FileReaderOptions::DEFAULT; RETURN_IF_ERROR(FileFactory::create_file_reader(system_properties, file_description, - &file_system, &file_reader_)); + reader_options, &file_system, &file_reader_)); size_t file_size = file_reader_->size(); diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index ad34dee80b..8db8579402 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -170,11 +170,10 @@ Status CsvReader::init_reader(bool is_load) { _state->fragment_instance_id())); } else { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); - reader_options.modification_time = - _range.__isset.modification_time ? _range.modification_time : 0; + _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( - _profile, _system_properties, _file_description, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx, + _profile, _system_properties, _file_description, reader_options, &_file_system, + &_file_reader, io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); } if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && @@ -659,10 +658,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { _file_description.start_offset = start_offset; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); - reader_options.modification_time = - _range.__isset.modification_time ? _range.modification_time : 0; + _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description, - &_file_system, &_file_reader, reader_options)); + reader_options, &_file_system, &_file_reader)); if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && _params.file_type != TFileType::FILE_BROKER) { return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path); diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 783239a3b8..83010af83b 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -108,8 +108,8 @@ private: ScannerCounter* _counter; const TFileScanRangeParams& _params; const TFileRangeDesc& _range; - FileSystemProperties _system_properties; - FileDescription _file_description; + io::FileSystemProperties _system_properties; + io::FileDescription _file_description; const std::vector& _file_slot_descs; // Only for query task, save the file slot to columns in block map. // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3 diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 5c3fbd8c70..157b8a63e9 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -382,11 +382,10 @@ Status NewJsonReader::_open_file_reader() { _state->fragment_instance_id())); } else { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); - reader_options.modification_time = - _range.__isset.modification_time ? _range.modification_time : 0; + _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( - _profile, _system_properties, _file_description, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx, + _profile, _system_properties, _file_description, reader_options, &_file_system, + &_file_reader, io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); } return Status::OK(); diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index 720da50397..5dbd5b5c28 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -190,8 +190,8 @@ private: ScannerCounter* _counter; const TFileScanRangeParams& _params; const TFileRangeDesc& _range; - FileSystemProperties _system_properties; - FileDescription _file_description; + io::FileSystemProperties _system_properties; + io::FileDescription _file_description; const std::vector& _file_slot_descs; std::shared_ptr _file_system; diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 140ca30987..7f87f77590 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -211,11 +211,11 @@ Status OrcReader::_create_file_reader() { if (_file_input_stream == nullptr) { io::FileReaderSPtr inner_reader; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); - reader_options.modification_time = + _file_description.mtime = _scan_range.__isset.modification_time ? _scan_range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( - _profile, _system_properties, _file_description, &_file_system, &inner_reader, - io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx)); + _profile, _system_properties, _file_description, reader_options, &_file_system, + &inner_reader, io::DelegateReader::AccessMode::RANDOM, _io_ctx)); _file_input_stream.reset(new ORCFileInputStream(_scan_range.path, inner_reader, &_statistics, _io_ctx, _profile)); } diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 3068d55681..85a73e0e21 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -484,8 +484,8 @@ private: RuntimeState* _state = nullptr; const TFileScanRangeParams& _scan_params; const TFileRangeDesc& _scan_range; - FileSystemProperties _system_properties; - FileDescription _file_description; + io::FileSystemProperties _system_properties; + io::FileDescription _file_description; size_t _batch_size; int64_t _range_start_offset; int64_t _range_size; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 7a8c1c81f5..9b179384e2 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -213,11 +213,11 @@ Status ParquetReader::_open_file() { SCOPED_RAW_TIMER(&_statistics.open_file_time); ++_statistics.open_file_num; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); - reader_options.modification_time = + _file_description.mtime = _scan_range.__isset.modification_time ? _scan_range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( - _profile, _system_properties, _file_description, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx)); + _profile, _system_properties, _file_description, reader_options, &_file_system, + &_file_reader, io::DelegateReader::AccessMode::RANDOM, _io_ctx)); } if (_file_metadata == nullptr) { SCOPED_RAW_TIMER(&_statistics.parse_footer_time); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index d3584b2993..63f760abcd 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -212,8 +212,8 @@ private: RuntimeProfile* _profile; const TFileScanRangeParams& _scan_params; const TFileRangeDesc& _scan_range; - FileSystemProperties _system_properties; - FileDescription _file_description; + io::FileSystemProperties _system_properties; + io::FileDescription _file_description; std::shared_ptr _file_system = nullptr; io::FileReaderSPtr _file_reader = nullptr; ObjLRUCache::CacheHandle _cache_handle; diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp index ed408e5f7a..8977cd0c47 100644 --- a/be/src/vec/runtime/vfile_result_writer.cpp +++ b/be/src/vec/runtime/vfile_result_writer.cpp @@ -595,7 +595,7 @@ Status VFileResultWriter::_delete_dir() { case TStorageBackendType::HDFS: { THdfsParams hdfs_params = parse_properties(_file_opts->broker_properties); std::shared_ptr hdfs_fs = nullptr; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &hdfs_fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &hdfs_fs)); file_system = hdfs_fs; break; } diff --git a/be/test/olap/primary_key_index_test.cpp b/be/test/olap/primary_key_index_test.cpp index ac277fecec..837d258700 100644 --- a/be/test/olap/primary_key_index_test.cpp +++ b/be/test/olap/primary_key_index_test.cpp @@ -75,7 +75,6 @@ TEST_F(PrimaryKeyIndexTest, builder) { EXPECT_TRUE(file_writer->close().ok()); EXPECT_EQ(num_rows, builder.num_rows()); - io::FilePathDesc path_desc(filename); PrimaryKeyIndexReader index_reader; io::FileReaderSPtr file_reader; EXPECT_TRUE(fs->open_file(filename, &file_reader).ok()); diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index ba7bcbb8b1..f17c8cca43 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -206,9 +206,11 @@ protected: return Status::OK(); } - Status open_file_internal(const Path& file, int64_t file_size, + Status open_file_internal(const io::FileDescription& fd, const Path& abs_path, io::FileReaderSPtr* reader) override { - return _local_fs->open_file(get_remote_path(file), io::FileReaderOptions::DEFAULT, reader); + io::FileDescription tmp_fd; + tmp_fd.path = get_remote_path(abs_path); + return _local_fs->open_file(tmp_fd, io::FileReaderOptions::DEFAULT, reader); } Status connect_impl() override { return Status::OK(); }