[fix][improvement](fs) add HdfsIO profile and modification time (#21638)
Refactor the interface of create_file_reader the file_size and mtime are merged into FileDescription, not in FileReaderOptions anymore. Now the file handle cache can get correct file's modification time from FileDescription. Add HdfsIO for hdfs file reader pick from [Enhancement](multi-catalog) Add hdfs read statistics profile. #21442
This commit is contained in:
@ -97,7 +97,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
|
||||
case TFileType::FILE_HDFS: {
|
||||
THdfsParams hdfs_params = parse_properties(properties);
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
|
||||
RETURN_IF_ERROR(fs->create_file(path, &file_writer));
|
||||
break;
|
||||
}
|
||||
@ -108,33 +108,32 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status FileFactory::create_file_reader(const FileSystemProperties& system_properties,
|
||||
const FileDescription& file_description,
|
||||
Status FileFactory::create_file_reader(const io::FileSystemProperties& system_properties,
|
||||
const io::FileDescription& file_description,
|
||||
const io::FileReaderOptions& reader_options,
|
||||
std::shared_ptr<io::FileSystem>* file_system,
|
||||
io::FileReaderSPtr* file_reader,
|
||||
io::FileReaderOptions reader_options) {
|
||||
io::FileReaderSPtr* file_reader, RuntimeProfile* profile) {
|
||||
TFileType::type type = system_properties.system_type;
|
||||
reader_options.file_size = file_description.file_size;
|
||||
switch (type) {
|
||||
case TFileType::FILE_LOCAL: {
|
||||
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_description.path,
|
||||
reader_options, file_reader));
|
||||
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_description, reader_options,
|
||||
file_reader));
|
||||
break;
|
||||
}
|
||||
case TFileType::FILE_S3: {
|
||||
RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description.path,
|
||||
file_system, file_reader, reader_options));
|
||||
RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description,
|
||||
reader_options, file_system, file_reader));
|
||||
break;
|
||||
}
|
||||
case TFileType::FILE_HDFS: {
|
||||
RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description.path,
|
||||
file_system, file_reader, reader_options));
|
||||
RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description,
|
||||
reader_options, file_system, file_reader, profile));
|
||||
break;
|
||||
}
|
||||
case TFileType::FILE_BROKER: {
|
||||
RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0],
|
||||
system_properties.properties, file_description,
|
||||
file_system, file_reader, reader_options));
|
||||
reader_options, file_system, file_reader));
|
||||
break;
|
||||
}
|
||||
default:
|
||||
@ -165,42 +164,43 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
|
||||
Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params,
|
||||
const io::FileDescription& fd,
|
||||
const io::FileReaderOptions& reader_options,
|
||||
std::shared_ptr<io::FileSystem>* hdfs_file_system,
|
||||
io::FileReaderSPtr* reader,
|
||||
const io::FileReaderOptions& reader_options) {
|
||||
io::FileReaderSPtr* reader, RuntimeProfile* profile) {
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(fs->open_file(path, reader_options, reader));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", profile, &fs));
|
||||
RETURN_IF_ERROR(fs->open_file(fd, reader_options, reader));
|
||||
*hdfs_file_system = std::move(fs);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status FileFactory::create_s3_reader(const std::map<std::string, std::string>& prop,
|
||||
const std::string& path,
|
||||
const io::FileDescription& fd,
|
||||
const io::FileReaderOptions& reader_options,
|
||||
std::shared_ptr<io::FileSystem>* s3_file_system,
|
||||
io::FileReaderSPtr* reader,
|
||||
const io::FileReaderOptions& reader_options) {
|
||||
S3URI s3_uri(path);
|
||||
io::FileReaderSPtr* reader) {
|
||||
S3URI s3_uri(fd.path);
|
||||
RETURN_IF_ERROR(s3_uri.parse());
|
||||
S3Conf s3_conf;
|
||||
RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, &s3_conf));
|
||||
std::shared_ptr<io::S3FileSystem> fs;
|
||||
RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", &fs));
|
||||
RETURN_IF_ERROR(fs->open_file(path, reader_options, reader));
|
||||
RETURN_IF_ERROR(fs->open_file(fd, reader_options, reader));
|
||||
*s3_file_system = std::move(fs);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status FileFactory::create_broker_reader(const TNetworkAddress& broker_addr,
|
||||
const std::map<std::string, std::string>& prop,
|
||||
const FileDescription& file_description,
|
||||
const io::FileDescription& fd,
|
||||
const io::FileReaderOptions& reader_options,
|
||||
std::shared_ptr<io::FileSystem>* broker_file_system,
|
||||
io::FileReaderSPtr* reader,
|
||||
const io::FileReaderOptions& reader_options) {
|
||||
io::FileReaderSPtr* reader) {
|
||||
std::shared_ptr<io::BrokerFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::BrokerFileSystem::create(broker_addr, prop, &fs));
|
||||
RETURN_IF_ERROR(fs->open_file(file_description.path, reader_options, reader));
|
||||
RETURN_IF_ERROR(fs->open_file(fd, reader_options, reader));
|
||||
*broker_file_system = std::move(fs);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -31,6 +31,7 @@
|
||||
#include "common/status.h"
|
||||
#include "io/fs/file_reader_options.h"
|
||||
#include "io/fs/file_reader_writer_fwd.h"
|
||||
#include "io/fs/fs_utils.h"
|
||||
|
||||
namespace doris {
|
||||
namespace io {
|
||||
@ -41,19 +42,6 @@ class ExecEnv;
|
||||
class RuntimeProfile;
|
||||
class RuntimeState;
|
||||
|
||||
struct FileSystemProperties {
|
||||
TFileType::type system_type;
|
||||
std::map<std::string, std::string> properties;
|
||||
THdfsParams hdfs_params;
|
||||
std::vector<TNetworkAddress> broker_addresses;
|
||||
};
|
||||
|
||||
struct FileDescription {
|
||||
std::string path;
|
||||
int64_t start_offset;
|
||||
int64_t file_size;
|
||||
};
|
||||
|
||||
class FileFactory {
|
||||
ENABLE_FACTORY_CREATOR(FileFactory);
|
||||
|
||||
@ -69,32 +57,34 @@ public:
|
||||
std::unique_ptr<io::FileWriter>& file_writer);
|
||||
|
||||
/// Create FileReader
|
||||
static Status create_file_reader(
|
||||
const FileSystemProperties& system_properties, const FileDescription& file_description,
|
||||
std::shared_ptr<io::FileSystem>* file_system, io::FileReaderSPtr* file_reader,
|
||||
io::FileReaderOptions reader_options = NO_CACHE_READER_OPTIONS);
|
||||
static Status create_file_reader(const io::FileSystemProperties& system_properties,
|
||||
const io::FileDescription& file_description,
|
||||
const io::FileReaderOptions& reader_options,
|
||||
std::shared_ptr<io::FileSystem>* file_system,
|
||||
io::FileReaderSPtr* file_reader,
|
||||
RuntimeProfile* profile = nullptr);
|
||||
|
||||
// Create FileReader for stream load pipe
|
||||
static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
|
||||
const TUniqueId& fragment_instance_id);
|
||||
|
||||
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
|
||||
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd,
|
||||
const io::FileReaderOptions& reader_options,
|
||||
std::shared_ptr<io::FileSystem>* hdfs_file_system,
|
||||
io::FileReaderSPtr* reader,
|
||||
const io::FileReaderOptions& reader_options);
|
||||
io::FileReaderSPtr* reader, RuntimeProfile* profile);
|
||||
|
||||
static Status create_s3_reader(const std::map<std::string, std::string>& prop,
|
||||
const std::string& path,
|
||||
const io::FileDescription& fd,
|
||||
const io::FileReaderOptions& reader_options,
|
||||
std::shared_ptr<io::FileSystem>* s3_file_system,
|
||||
io::FileReaderSPtr* reader,
|
||||
const io::FileReaderOptions& reader_options);
|
||||
io::FileReaderSPtr* reader);
|
||||
|
||||
static Status create_broker_reader(const TNetworkAddress& broker_addr,
|
||||
const std::map<std::string, std::string>& prop,
|
||||
const FileDescription& file_description,
|
||||
const io::FileDescription& fd,
|
||||
const io::FileReaderOptions& reader_options,
|
||||
std::shared_ptr<io::FileSystem>* hdfs_file_system,
|
||||
io::FileReaderSPtr* reader,
|
||||
const io::FileReaderOptions& reader_options);
|
||||
io::FileReaderSPtr* reader);
|
||||
|
||||
static TFileType::type convert_storage_type(TStorageBackendType::type type) {
|
||||
switch (type) {
|
||||
|
||||
@ -50,8 +50,10 @@ public:
|
||||
io::FileReaderSPtr reader;
|
||||
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
RETURN_IF_ERROR(
|
||||
FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts));
|
||||
io::FileDescription fd;
|
||||
fd.path = file_path;
|
||||
RETURN_IF_ERROR(FileFactory::create_hdfs_reader(hdfs_params, fd, reader_opts, &fs, &reader,
|
||||
nullptr));
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
auto elapsed_seconds =
|
||||
std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
|
||||
@ -94,7 +96,7 @@ public:
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
io::FileWriterPtr writer;
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
|
||||
RETURN_IF_ERROR(fs->create_file(file_path, &writer));
|
||||
return write(state, writer.get());
|
||||
}
|
||||
@ -115,7 +117,7 @@ public:
|
||||
auto new_file_path = file_path + "_new";
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
RETURN_IF_ERROR(fs->rename(file_path, new_file_path));
|
||||
@ -142,7 +144,7 @@ public:
|
||||
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
bool res = false;
|
||||
|
||||
@ -66,9 +66,11 @@ public:
|
||||
|
||||
io::FileReaderSPtr reader;
|
||||
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
|
||||
io::FileDescription fd;
|
||||
fd.path = file_path;
|
||||
RETURN_IF_ERROR(FileFactory::create_s3_reader(
|
||||
_conf_map, file_path, reinterpret_cast<std::shared_ptr<io::FileSystem>*>(&fs),
|
||||
&reader, reader_opts));
|
||||
_conf_map, fd, reader_opts, reinterpret_cast<std::shared_ptr<io::FileSystem>*>(&fs),
|
||||
&reader));
|
||||
return read(state, reader);
|
||||
}
|
||||
};
|
||||
@ -118,8 +120,8 @@ public:
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(nullptr);
|
||||
IOContext io_ctx;
|
||||
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
|
||||
nullptr, fs_props, fd, &fs, &reader, io::DelegateReader::AccessMode::SEQUENTIAL,
|
||||
reader_options, &io_ctx));
|
||||
nullptr, fs_props, fd, reader_options, &fs, &reader,
|
||||
io::DelegateReader::AccessMode::SEQUENTIAL, &io_ctx));
|
||||
return read(state, reader);
|
||||
}
|
||||
};
|
||||
|
||||
@ -101,17 +101,17 @@ Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* write
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BrokerFileSystem::open_file_internal(const Path& file, int64_t file_size,
|
||||
Status BrokerFileSystem::open_file_internal(const FileDescription& fd, const Path& abs_path,
|
||||
FileReaderSPtr* reader) {
|
||||
int64_t fsize = file_size;
|
||||
if (fsize < 0) {
|
||||
RETURN_IF_ERROR(file_size_impl(file, &fsize));
|
||||
int64_t fsize = fd.file_size;
|
||||
if (fsize <= 0) {
|
||||
RETURN_IF_ERROR(file_size_impl(abs_path, &fsize));
|
||||
}
|
||||
|
||||
CHECK_BROKER_CLIENT(_client);
|
||||
TBrokerOpenReaderRequest request;
|
||||
request.__set_version(TBrokerVersion::VERSION_ONE);
|
||||
request.__set_path(file);
|
||||
request.__set_path(abs_path);
|
||||
request.__set_startOffset(0);
|
||||
request.__set_clientId(client_id(_broker_addr));
|
||||
request.__set_properties(_broker_prop);
|
||||
@ -127,15 +127,16 @@ Status BrokerFileSystem::open_file_internal(const Path& file, int64_t file_size,
|
||||
(*_client)->openReader(*response, request);
|
||||
}
|
||||
} catch (apache::thrift::TException& e) {
|
||||
return Status::RpcError("failed to open file {}: {}", file.native(), error_msg(e.what()));
|
||||
return Status::RpcError("failed to open file {}: {}", abs_path.native(),
|
||||
error_msg(e.what()));
|
||||
}
|
||||
|
||||
if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) {
|
||||
return Status::IOError("failed to open file {}: {}", file.native(),
|
||||
return Status::IOError("failed to open file {}: {}", abs_path.native(),
|
||||
error_msg(response->opStatus.message));
|
||||
}
|
||||
*reader = std::make_shared<BrokerFileReader>(
|
||||
_broker_addr, file, fsize, response->fd,
|
||||
_broker_addr, abs_path, fsize, response->fd,
|
||||
std::static_pointer_cast<BrokerFileSystem>(shared_from_this()));
|
||||
return Status::OK();
|
||||
}
|
||||
@ -406,7 +407,9 @@ Status BrokerFileSystem::upload_with_checksum_impl(const Path& local_file, const
|
||||
Status BrokerFileSystem::download_impl(const Path& remote_file, const Path& local_file) {
|
||||
// 1. open remote file for read
|
||||
FileReaderSPtr broker_reader = nullptr;
|
||||
RETURN_IF_ERROR(open_file_internal(remote_file, -1, &broker_reader));
|
||||
FileDescription fd;
|
||||
fd.path = remote_file.native();
|
||||
RETURN_IF_ERROR(open_file_internal(fd, remote_file, &broker_reader));
|
||||
|
||||
// 2. remove the existing local file if exist
|
||||
if (std::filesystem::remove(local_file)) {
|
||||
@ -440,10 +443,12 @@ Status BrokerFileSystem::download_impl(const Path& remote_file, const Path& loca
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BrokerFileSystem::direct_download_impl(const Path& remote_impl, std::string* content) {
|
||||
Status BrokerFileSystem::direct_download_impl(const Path& remote_file, std::string* content) {
|
||||
// 1. open remote file for read
|
||||
FileReaderSPtr broker_reader = nullptr;
|
||||
RETURN_IF_ERROR(open_file_internal(remote_impl, -1, &broker_reader));
|
||||
FileDescription fd;
|
||||
fd.path = remote_file.native();
|
||||
RETURN_IF_ERROR(open_file_internal(fd, remote_file, &broker_reader));
|
||||
|
||||
constexpr size_t buf_sz = 1024 * 1024;
|
||||
std::unique_ptr<char[]> read_buf(new char[buf_sz]);
|
||||
|
||||
@ -49,7 +49,8 @@ public:
|
||||
protected:
|
||||
Status connect_impl() override;
|
||||
Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
|
||||
Status open_file_internal(const Path& file, int64_t file_size, FileReaderSPtr* reader) override;
|
||||
Status open_file_internal(const FileDescription& fd, const Path& abs_path,
|
||||
FileReaderSPtr* reader) override;
|
||||
Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override;
|
||||
Status delete_file_impl(const Path& file) override;
|
||||
Status delete_directory_impl(const Path& dir) override;
|
||||
|
||||
@ -760,13 +760,13 @@ Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset,
|
||||
Status DelegateReader::create_file_reader(RuntimeProfile* profile,
|
||||
const FileSystemProperties& system_properties,
|
||||
const FileDescription& file_description,
|
||||
const io::FileReaderOptions& reader_options,
|
||||
std::shared_ptr<io::FileSystem>* file_system,
|
||||
io::FileReaderSPtr* file_reader, AccessMode access_mode,
|
||||
io::FileReaderOptions reader_options,
|
||||
const IOContext* io_ctx, const PrefetchRange file_range) {
|
||||
io::FileReaderSPtr reader;
|
||||
RETURN_IF_ERROR(FileFactory::create_file_reader(system_properties, file_description,
|
||||
file_system, &reader, reader_options));
|
||||
reader_options, file_system, &reader, profile));
|
||||
if (reader->size() < IN_MEMORY_FILE_SIZE) {
|
||||
*file_reader = std::make_shared<InMemoryFileReader>(reader);
|
||||
} else if (access_mode == AccessMode::SEQUENTIAL) {
|
||||
|
||||
@ -250,10 +250,9 @@ public:
|
||||
|
||||
static Status create_file_reader(
|
||||
RuntimeProfile* profile, const FileSystemProperties& system_properties,
|
||||
const FileDescription& file_description, std::shared_ptr<io::FileSystem>* file_system,
|
||||
io::FileReaderSPtr* file_reader, AccessMode access_mode = SEQUENTIAL,
|
||||
io::FileReaderOptions reader_options = FileFactory::NO_CACHE_READER_OPTIONS,
|
||||
const IOContext* io_ctx = nullptr,
|
||||
const FileDescription& file_description, const io::FileReaderOptions& reader_options,
|
||||
std::shared_ptr<io::FileSystem>* file_system, io::FileReaderSPtr* file_reader,
|
||||
AccessMode access_mode = SEQUENTIAL, const IOContext* io_ctx = nullptr,
|
||||
const PrefetchRange file_range = PrefetchRange(0, 0));
|
||||
};
|
||||
|
||||
|
||||
@ -71,10 +71,6 @@ public:
|
||||
|
||||
FileCachePolicy cache_type;
|
||||
const CachePathPolicy& path_policy;
|
||||
// length of the file in bytes.
|
||||
// -1 means unset.
|
||||
// If the file length is not set, the file length will be fetched from the file system.
|
||||
int64_t file_size = -1;
|
||||
bool has_cache_base_path = false;
|
||||
std::string cache_base_path;
|
||||
// Use modification time to determine whether the file is changed
|
||||
|
||||
@ -27,10 +27,10 @@ Status FileSystem::create_file(const Path& file, FileWriterPtr* writer) {
|
||||
FILESYSTEM_M(create_file_impl(path, writer));
|
||||
}
|
||||
|
||||
Status FileSystem::open_file(const Path& file, const FileReaderOptions& reader_options,
|
||||
Status FileSystem::open_file(const FileDescription& fd, const FileReaderOptions& reader_options,
|
||||
FileReaderSPtr* reader) {
|
||||
auto path = absolute_path(file);
|
||||
FILESYSTEM_M(open_file_impl(path, reader_options, reader));
|
||||
auto path = absolute_path(fd.path);
|
||||
FILESYSTEM_M(open_file_impl(fd, path, reader_options, reader));
|
||||
}
|
||||
|
||||
Status FileSystem::create_directory(const Path& dir, bool failed_if_exists) {
|
||||
|
||||
@ -32,6 +32,7 @@
|
||||
#include "common/status.h"
|
||||
#include "io/fs/file_reader_options.h"
|
||||
#include "io/fs/file_reader_writer_fwd.h"
|
||||
#include "io/fs/fs_utils.h"
|
||||
#include "io/fs/path.h"
|
||||
|
||||
namespace doris {
|
||||
@ -75,9 +76,14 @@ public:
|
||||
// And derived classes should implement all xxx_impl methods.
|
||||
Status create_file(const Path& file, FileWriterPtr* writer);
|
||||
Status open_file(const Path& file, FileReaderSPtr* reader) {
|
||||
return open_file(file, FileReaderOptions::DEFAULT, reader);
|
||||
FileDescription fd;
|
||||
fd.path = file.native();
|
||||
return open_file(fd, FileReaderOptions::DEFAULT, reader);
|
||||
}
|
||||
Status open_file(const Path& file, const FileReaderOptions& reader_options,
|
||||
Status open_file(const FileDescription& fd, FileReaderSPtr* reader) {
|
||||
return open_file(fd, FileReaderOptions::DEFAULT, reader);
|
||||
}
|
||||
Status open_file(const FileDescription& fd, const FileReaderOptions& reader_options,
|
||||
FileReaderSPtr* reader);
|
||||
Status create_directory(const Path& dir, bool failed_if_exists = false);
|
||||
Status delete_file(const Path& file);
|
||||
@ -112,7 +118,8 @@ protected:
|
||||
virtual Status create_file_impl(const Path& file, FileWriterPtr* writer) = 0;
|
||||
|
||||
/// open file and return a FileReader
|
||||
virtual Status open_file_impl(const Path& file, const FileReaderOptions& reader_options,
|
||||
virtual Status open_file_impl(const FileDescription& fd, const Path& abs_file,
|
||||
const FileReaderOptions& reader_options,
|
||||
FileReaderSPtr* reader) = 0;
|
||||
|
||||
/// create directory recursively
|
||||
|
||||
@ -1,46 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "io/fs/fs_utils.h"
|
||||
|
||||
#include <stddef.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "io/fs/file_reader.h"
|
||||
#include "io/fs/file_reader_writer_fwd.h"
|
||||
#include "io/fs/file_system.h"
|
||||
|
||||
namespace doris {
|
||||
namespace io {
|
||||
|
||||
Status read_file_to_string(FileSystemSPtr fs, const Path& file, std::string* content) {
|
||||
FileReaderSPtr file_reader;
|
||||
RETURN_IF_ERROR(fs->open_file(file, &file_reader));
|
||||
size_t file_size = file_reader->size();
|
||||
content->resize(file_size);
|
||||
size_t bytes_read = 0;
|
||||
RETURN_IF_ERROR(file_reader->read_at(0, {*content}, &bytes_read));
|
||||
if (bytes_read != file_size) {
|
||||
return Status::IOError("failed to read file {} to string. bytes read: {}, file size: {}",
|
||||
file.native(), bytes_read, file_size);
|
||||
}
|
||||
return file_reader->close();
|
||||
}
|
||||
|
||||
} // namespace io
|
||||
} // namespace doris
|
||||
@ -17,110 +17,36 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <gen_cpp/PlanNodes_types.h>
|
||||
#include <gen_cpp/Types_types.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include <ostream>
|
||||
#include <string>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "io/fs/file_system.h"
|
||||
#include "io/fs/path.h"
|
||||
|
||||
namespace doris {
|
||||
namespace io {
|
||||
|
||||
struct FilePathDesc {
|
||||
FilePathDesc(const std::string& path) { filepath = path; }
|
||||
FilePathDesc() = default;
|
||||
TStorageMedium::type storage_medium = TStorageMedium::HDD;
|
||||
std::string filepath;
|
||||
std::string remote_path;
|
||||
std::string storage_name;
|
||||
io::FileSystem* file_system;
|
||||
|
||||
std::string debug_string() const {
|
||||
std::stringstream ss;
|
||||
ss << "storage_medium: " << to_string(storage_medium) << ", local_path: " << filepath;
|
||||
if (!remote_path.empty()) {
|
||||
ss << ", storage_name: " << storage_name << ", remote_path: " << remote_path;
|
||||
}
|
||||
return ss.str();
|
||||
}
|
||||
// REMOTE_CACHE is the local cache path for remote path, if a data_dir is REMOTE_CACHE,
|
||||
// it means the tablet in it will be set as a remote path.
|
||||
static bool is_remote(TStorageMedium::type checked_storage_medium) {
|
||||
return checked_storage_medium == TStorageMedium::S3 ||
|
||||
checked_storage_medium == TStorageMedium::REMOTE_CACHE;
|
||||
}
|
||||
bool is_remote() const { return is_remote(storage_medium); }
|
||||
struct FileSystemProperties {
|
||||
TFileType::type system_type;
|
||||
std::map<std::string, std::string> properties;
|
||||
THdfsParams hdfs_params;
|
||||
std::vector<TNetworkAddress> broker_addresses;
|
||||
};
|
||||
|
||||
class FilePathDescStream {
|
||||
public:
|
||||
FilePathDescStream& operator<<(const FilePathDesc& val) {
|
||||
_filepath_stream << val.filepath;
|
||||
_storage_medium = val.storage_medium;
|
||||
_storage_name = val.storage_name;
|
||||
if (FilePathDesc::is_remote(_storage_medium)) {
|
||||
_remote_path_stream << val.remote_path;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
FilePathDescStream& operator<<(const std::string& val) {
|
||||
_filepath_stream << val;
|
||||
if (FilePathDesc::is_remote(_storage_medium)) {
|
||||
_remote_path_stream << val;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
FilePathDescStream& operator<<(uint64_t val) {
|
||||
_filepath_stream << val;
|
||||
if (FilePathDesc::is_remote(_storage_medium)) {
|
||||
_remote_path_stream << val;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
FilePathDescStream& operator<<(int64_t val) {
|
||||
_filepath_stream << val;
|
||||
if (FilePathDesc::is_remote(_storage_medium)) {
|
||||
_remote_path_stream << val;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
FilePathDescStream& operator<<(uint32_t val) {
|
||||
_filepath_stream << val;
|
||||
if (FilePathDesc::is_remote(_storage_medium)) {
|
||||
_remote_path_stream << val;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
FilePathDescStream& operator<<(int32_t val) {
|
||||
_filepath_stream << val;
|
||||
if (FilePathDesc::is_remote(_storage_medium)) {
|
||||
_remote_path_stream << val;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
FilePathDesc path_desc() {
|
||||
FilePathDesc path_desc(_filepath_stream.str());
|
||||
path_desc.storage_medium = _storage_medium;
|
||||
if (FilePathDesc::is_remote(_storage_medium)) {
|
||||
path_desc.remote_path = _remote_path_stream.str();
|
||||
}
|
||||
path_desc.storage_name = _storage_name;
|
||||
return path_desc;
|
||||
}
|
||||
|
||||
private:
|
||||
TStorageMedium::type _storage_medium = TStorageMedium::HDD;
|
||||
std::stringstream _filepath_stream;
|
||||
std::stringstream _remote_path_stream;
|
||||
std::string _storage_name;
|
||||
struct FileDescription {
|
||||
std::string path;
|
||||
int64_t start_offset;
|
||||
// length of the file in bytes.
|
||||
// -1 means unset.
|
||||
// If the file length is not set, the file length will be fetched from the file system.
|
||||
int64_t file_size = -1;
|
||||
// modification time of this file.
|
||||
// 0 means unset.
|
||||
int64_t mtime = 0;
|
||||
};
|
||||
|
||||
// read all data from file to string
|
||||
Status read_file_to_string(FileSystemSPtr fs, const Path& file, std::string* content);
|
||||
|
||||
} // namespace io
|
||||
} // namespace doris
|
||||
|
||||
@ -36,12 +36,29 @@ namespace doris {
|
||||
namespace io {
|
||||
|
||||
HdfsFileReader::HdfsFileReader(Path path, const std::string& name_node,
|
||||
FileHandleCache::Accessor accessor)
|
||||
: _path(std::move(path)), _name_node(name_node), _accessor(std::move(accessor)) {
|
||||
FileHandleCache::Accessor accessor, RuntimeProfile* profile)
|
||||
: _path(std::move(path)),
|
||||
_name_node(name_node),
|
||||
_accessor(std::move(accessor)),
|
||||
_profile(profile) {
|
||||
_handle = _accessor.get();
|
||||
|
||||
DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
|
||||
DorisMetrics::instance()->hdfs_file_reader_total->increment(1);
|
||||
if (_profile != nullptr) {
|
||||
#ifdef USE_HADOOP_HDFS
|
||||
const char* hdfs_profile_name = "HdfsIO";
|
||||
ADD_TIMER(_profile, hdfs_profile_name);
|
||||
_hdfs_profile.total_bytes_read =
|
||||
ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, hdfs_profile_name);
|
||||
_hdfs_profile.total_local_bytes_read =
|
||||
ADD_CHILD_COUNTER(_profile, "TotalLocalBytesRead", TUnit::BYTES, hdfs_profile_name);
|
||||
_hdfs_profile.total_short_circuit_bytes_read = ADD_CHILD_COUNTER(
|
||||
_profile, "TotalShortCircuitBytesRead", TUnit::BYTES, hdfs_profile_name);
|
||||
_hdfs_profile.total_total_zero_copy_bytes_read = ADD_CHILD_COUNTER(
|
||||
_profile, "TotalZeroCopyBytesRead", TUnit::BYTES, hdfs_profile_name);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
HdfsFileReader::~HdfsFileReader() {
|
||||
@ -52,6 +69,25 @@ Status HdfsFileReader::close() {
|
||||
bool expected = false;
|
||||
if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
|
||||
DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
|
||||
if (_profile != nullptr) {
|
||||
#ifdef USE_HADOOP_HDFS
|
||||
struct hdfsReadStatistics* hdfs_statistics = nullptr;
|
||||
auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics);
|
||||
if (r != 0) {
|
||||
return Status::InternalError(
|
||||
fmt::format("Failed to run hdfsFileGetReadStatistics(): {}", r));
|
||||
}
|
||||
COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead);
|
||||
COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read,
|
||||
hdfs_statistics->totalLocalBytesRead);
|
||||
COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
|
||||
hdfs_statistics->totalShortCircuitBytesRead);
|
||||
COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
|
||||
hdfs_statistics->totalZeroCopyBytesRead);
|
||||
hdfsFileFreeReadStatistics(hdfs_statistics);
|
||||
hdfsFileClearReadStatistics(_handle->file());
|
||||
#endif
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -38,7 +38,8 @@ class IOContext;
|
||||
|
||||
class HdfsFileReader : public FileReader {
|
||||
public:
|
||||
HdfsFileReader(Path path, const std::string& name_node, FileHandleCache::Accessor accessor);
|
||||
HdfsFileReader(Path path, const std::string& name_node, FileHandleCache::Accessor accessor,
|
||||
RuntimeProfile* profile);
|
||||
|
||||
~HdfsFileReader() override;
|
||||
|
||||
@ -57,11 +58,24 @@ protected:
|
||||
const IOContext* io_ctx) override;
|
||||
|
||||
private:
|
||||
#ifdef USE_HADOOP_HDFS
|
||||
struct HDFSProfile {
|
||||
RuntimeProfile::Counter* total_bytes_read;
|
||||
RuntimeProfile::Counter* total_local_bytes_read;
|
||||
RuntimeProfile::Counter* total_short_circuit_bytes_read;
|
||||
RuntimeProfile::Counter* total_total_zero_copy_bytes_read;
|
||||
};
|
||||
#endif
|
||||
|
||||
Path _path;
|
||||
const std::string& _name_node;
|
||||
FileHandleCache::Accessor _accessor;
|
||||
CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle
|
||||
std::atomic<bool> _closed = false;
|
||||
RuntimeProfile* _profile;
|
||||
#ifdef USE_HADOOP_HDFS
|
||||
HDFSProfile _hdfs_profile;
|
||||
#endif
|
||||
};
|
||||
} // namespace io
|
||||
} // namespace doris
|
||||
|
||||
@ -112,18 +112,13 @@ Status HdfsFileHandleCache::get_file(const std::shared_ptr<HdfsFileSystem>& fs,
|
||||
std::string fname = file.string();
|
||||
RETURN_IF_ERROR(HdfsFileHandleCache::instance()->cache().get_file_handle(
|
||||
fs->_fs_handle->hdfs_fs, fname, mtime, file_size, false, accessor, &cache_hit));
|
||||
// if (cache_hit) {
|
||||
// LOG(INFO) << "yy debug get from file handle cache: " << file.native();
|
||||
// } else {
|
||||
// LOG(INFO) << "yy debug get from file handle new: " << file.native();
|
||||
// }
|
||||
accessor->set_fs(fs);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string& path,
|
||||
std::shared_ptr<HdfsFileSystem>* fs) {
|
||||
RuntimeProfile* profile, std::shared_ptr<HdfsFileSystem>* fs) {
|
||||
#ifdef USE_HADOOP_HDFS
|
||||
if (!config::enable_java_support) {
|
||||
return Status::InternalError(
|
||||
@ -131,14 +126,16 @@ Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string&
|
||||
"true.");
|
||||
}
|
||||
#endif
|
||||
(*fs).reset(new HdfsFileSystem(hdfs_params, path));
|
||||
(*fs).reset(new HdfsFileSystem(hdfs_params, path, profile));
|
||||
return (*fs)->connect();
|
||||
}
|
||||
|
||||
HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path)
|
||||
HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path,
|
||||
RuntimeProfile* profile)
|
||||
: RemoteFileSystem(path, "", FileSystemType::HDFS),
|
||||
_hdfs_params(hdfs_params),
|
||||
_fs_handle(nullptr) {
|
||||
_fs_handle(nullptr),
|
||||
_profile(profile) {
|
||||
_namenode = _hdfs_params.fs_name;
|
||||
}
|
||||
|
||||
@ -165,17 +162,17 @@ Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer)
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status HdfsFileSystem::open_file_internal(const Path& file, int64_t file_size,
|
||||
Status HdfsFileSystem::open_file_internal(const FileDescription& fd, const Path& abs_path,
|
||||
FileReaderSPtr* reader) {
|
||||
CHECK_HDFS_HANDLE(_fs_handle);
|
||||
Path real_path = convert_path(file, _namenode);
|
||||
Path real_path = convert_path(abs_path, _namenode);
|
||||
|
||||
FileHandleCache::Accessor accessor;
|
||||
RETURN_IF_ERROR(HdfsFileHandleCache::instance()->get_file(
|
||||
std::static_pointer_cast<HdfsFileSystem>(shared_from_this()), real_path, 0, file_size,
|
||||
&accessor));
|
||||
std::static_pointer_cast<HdfsFileSystem>(shared_from_this()), real_path, fd.mtime,
|
||||
fd.file_size, &accessor));
|
||||
|
||||
*reader = std::make_shared<HdfsFileReader>(file, _namenode, std::move(accessor));
|
||||
*reader = std::make_shared<HdfsFileReader>(abs_path, _namenode, std::move(accessor), _profile);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -358,7 +355,9 @@ Status HdfsFileSystem::upload_with_checksum_impl(const Path& local, const Path&
|
||||
Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_file) {
|
||||
// 1. open remote file for read
|
||||
FileReaderSPtr hdfs_reader = nullptr;
|
||||
RETURN_IF_ERROR(open_file_internal(remote_file, -1, &hdfs_reader));
|
||||
FileDescription fd;
|
||||
fd.path = remote_file;
|
||||
RETURN_IF_ERROR(open_file_internal(fd, remote_file, &hdfs_reader));
|
||||
|
||||
// 2. remove the existing local file if exist
|
||||
if (std::filesystem::remove(local_file)) {
|
||||
@ -395,7 +394,9 @@ Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_
|
||||
Status HdfsFileSystem::direct_download_impl(const Path& remote_file, std::string* content) {
|
||||
// 1. open remote file for read
|
||||
FileReaderSPtr hdfs_reader = nullptr;
|
||||
RETURN_IF_ERROR(open_file_internal(remote_file, -1, &hdfs_reader));
|
||||
FileDescription fd;
|
||||
fd.path = remote_file;
|
||||
RETURN_IF_ERROR(open_file_internal(fd, remote_file, &hdfs_reader));
|
||||
|
||||
constexpr size_t buf_sz = 1024 * 1024;
|
||||
std::unique_ptr<char[]> read_buf(new char[buf_sz]);
|
||||
|
||||
@ -33,6 +33,7 @@
|
||||
#include "io/fs/hdfs.h"
|
||||
#include "io/fs/path.h"
|
||||
#include "io/fs/remote_file_system.h"
|
||||
#include "util/runtime_profile.h"
|
||||
|
||||
namespace doris {
|
||||
class THdfsParams;
|
||||
@ -111,7 +112,7 @@ class HdfsFileHandleCache;
|
||||
class HdfsFileSystem final : public RemoteFileSystem {
|
||||
public:
|
||||
static Status create(const THdfsParams& hdfs_params, const std::string& path,
|
||||
std::shared_ptr<HdfsFileSystem>* fs);
|
||||
RuntimeProfile* profile, std::shared_ptr<HdfsFileSystem>* fs);
|
||||
|
||||
~HdfsFileSystem() override;
|
||||
|
||||
@ -122,7 +123,8 @@ public:
|
||||
protected:
|
||||
Status connect_impl() override;
|
||||
Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
|
||||
Status open_file_internal(const Path& file, int64_t file_size, FileReaderSPtr* reader) override;
|
||||
Status open_file_internal(const FileDescription& fd, const Path& abs_path,
|
||||
FileReaderSPtr* reader) override;
|
||||
Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override;
|
||||
Status delete_file_impl(const Path& file) override;
|
||||
Status delete_directory_impl(const Path& dir) override;
|
||||
@ -148,12 +150,14 @@ private:
|
||||
|
||||
private:
|
||||
friend class HdfsFileWriter;
|
||||
HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path);
|
||||
HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path,
|
||||
RuntimeProfile* profile);
|
||||
const THdfsParams& _hdfs_params;
|
||||
std::string _namenode;
|
||||
// do not use std::shared_ptr or std::unique_ptr
|
||||
// _fs_handle is managed by HdfsFileSystemCache
|
||||
HdfsFileSystemHandle* _fs_handle;
|
||||
RuntimeProfile* _profile;
|
||||
};
|
||||
} // namespace io
|
||||
} // namespace doris
|
||||
|
||||
@ -64,18 +64,20 @@ Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalFileSystem::open_file_impl(const Path& file,
|
||||
Status LocalFileSystem::open_file_impl(const FileDescription& file_desc, const Path& abs_path,
|
||||
const FileReaderOptions& /*reader_options*/,
|
||||
FileReaderSPtr* reader) {
|
||||
int64_t fsize = 0;
|
||||
RETURN_IF_ERROR(file_size_impl(file, &fsize));
|
||||
int64_t fsize = file_desc.file_size;
|
||||
if (fsize <= 0) {
|
||||
RETURN_IF_ERROR(file_size_impl(abs_path, &fsize));
|
||||
}
|
||||
int fd = -1;
|
||||
RETRY_ON_EINTR(fd, open(file.c_str(), O_RDONLY));
|
||||
RETRY_ON_EINTR(fd, open(abs_path.c_str(), O_RDONLY));
|
||||
if (fd < 0) {
|
||||
return Status::IOError("failed to open {}: {}", file.native(), errno_to_str());
|
||||
return Status::IOError("failed to open {}: {}", abs_path.native(), errno_to_str());
|
||||
}
|
||||
*reader = std::make_shared<LocalFileReader>(
|
||||
std::move(file), fsize, fd,
|
||||
std::move(abs_path), fsize, fd,
|
||||
std::static_pointer_cast<LocalFileSystem>(shared_from_this()));
|
||||
return Status::OK();
|
||||
}
|
||||
@ -404,6 +406,22 @@ bool LocalFileSystem::contain_path(const Path& parent_, const Path& sub_) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Status LocalFileSystem::read_file_to_string(const Path& file, std::string* content) {
|
||||
FileReaderSPtr file_reader;
|
||||
FileDescription fd;
|
||||
fd.path = file.native();
|
||||
RETURN_IF_ERROR(open_file(fd, &file_reader));
|
||||
size_t file_size = file_reader->size();
|
||||
content->resize(file_size);
|
||||
size_t bytes_read = 0;
|
||||
RETURN_IF_ERROR(file_reader->read_at(0, {*content}, &bytes_read));
|
||||
if (bytes_read != file_size) {
|
||||
return Status::IOError("failed to read file {} to string. bytes read: {}, file size: {}",
|
||||
file.native(), bytes_read, file_size);
|
||||
}
|
||||
return file_reader->close();
|
||||
}
|
||||
|
||||
static std::shared_ptr<LocalFileSystem> local_fs = io::LocalFileSystem::create("");
|
||||
|
||||
const std::shared_ptr<LocalFileSystem>& global_local_filesystem() {
|
||||
|
||||
@ -69,10 +69,13 @@ public:
|
||||
// delete dir or file
|
||||
Status delete_directory_or_file(const Path& path);
|
||||
|
||||
// read local file and save content to "content"
|
||||
Status read_file_to_string(const Path& file, std::string* content);
|
||||
|
||||
protected:
|
||||
Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
|
||||
Status open_file_impl(const Path& file, const FileReaderOptions& reader_options,
|
||||
FileReaderSPtr* reader) override;
|
||||
Status open_file_impl(const FileDescription& file_desc, const Path& abs_path,
|
||||
const FileReaderOptions& reader_options, FileReaderSPtr* reader) override;
|
||||
Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override;
|
||||
Status delete_file_impl(const Path& file) override;
|
||||
Status delete_directory_impl(const Path& dir) override;
|
||||
|
||||
@ -72,10 +72,11 @@ Status RemoteFileSystem::connect() {
|
||||
FILESYSTEM_M(connect_impl());
|
||||
}
|
||||
|
||||
Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOptions& reader_options,
|
||||
Status RemoteFileSystem::open_file_impl(const FileDescription& fd, const Path& abs_path,
|
||||
const FileReaderOptions& reader_options,
|
||||
FileReaderSPtr* reader) {
|
||||
FileReaderSPtr raw_reader;
|
||||
RETURN_IF_ERROR(open_file_internal(path, reader_options.file_size, &raw_reader));
|
||||
RETURN_IF_ERROR(open_file_internal(fd, abs_path, &raw_reader));
|
||||
switch (reader_options.cache_type) {
|
||||
case io::FileCachePolicy::NO_CACHE: {
|
||||
*reader = raw_reader;
|
||||
@ -83,7 +84,7 @@ Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOption
|
||||
}
|
||||
case io::FileCachePolicy::SUB_FILE_CACHE:
|
||||
case io::FileCachePolicy::WHOLE_FILE_CACHE: {
|
||||
std::string cache_path = reader_options.path_policy.get_cache_path(path.native());
|
||||
std::string cache_path = reader_options.path_policy.get_cache_path(abs_path.native());
|
||||
io::FileCachePtr cache_reader = FileCacheManager::instance()->new_file_cache(
|
||||
cache_path, config::file_cache_alive_time_sec, raw_reader,
|
||||
reader_options.cache_type);
|
||||
@ -93,7 +94,7 @@ Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOption
|
||||
}
|
||||
case io::FileCachePolicy::FILE_BLOCK_CACHE: {
|
||||
StringPiece str(raw_reader->path().native());
|
||||
std::string cache_path = reader_options.path_policy.get_cache_path(path.native());
|
||||
std::string cache_path = reader_options.path_policy.get_cache_path(abs_path.native());
|
||||
if (reader_options.has_cache_base_path) {
|
||||
// from query session variable: file_cache_base_path
|
||||
*reader = std::make_shared<CachedRemoteFileReader>(
|
||||
@ -101,7 +102,7 @@ Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOption
|
||||
reader_options.modification_time);
|
||||
} else {
|
||||
*reader = std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), cache_path,
|
||||
reader_options.modification_time);
|
||||
fd.mtime);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
@ -54,7 +54,8 @@ protected:
|
||||
/// connect to remote file system
|
||||
virtual Status connect_impl() = 0;
|
||||
|
||||
virtual Status open_file_impl(const Path& file, const FileReaderOptions& reader_options,
|
||||
virtual Status open_file_impl(const FileDescription& fd, const Path& abs_path,
|
||||
const FileReaderOptions& reader_options,
|
||||
FileReaderSPtr* reader) override;
|
||||
/// upload load_file to remote remote_file
|
||||
/// local_file should be an absolute path on local filesystem.
|
||||
@ -83,7 +84,7 @@ protected:
|
||||
|
||||
// The derived class should implement this method.
|
||||
// if file_size < 0, the file size should be fetched from file system
|
||||
virtual Status open_file_internal(const Path& file, int64_t file_size,
|
||||
virtual Status open_file_internal(const FileDescription& fd, const Path& abs_path,
|
||||
FileReaderSPtr* reader) = 0;
|
||||
};
|
||||
|
||||
|
||||
@ -134,13 +134,13 @@ Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status S3FileSystem::open_file_internal(const Path& file, int64_t file_size,
|
||||
Status S3FileSystem::open_file_internal(const FileDescription& fd, const Path& abs_path,
|
||||
FileReaderSPtr* reader) {
|
||||
int64_t fsize = file_size;
|
||||
int64_t fsize = fd.file_size;
|
||||
if (fsize < 0) {
|
||||
RETURN_IF_ERROR(file_size_impl(file, &fsize));
|
||||
RETURN_IF_ERROR(file_size_impl(abs_path, &fsize));
|
||||
}
|
||||
GET_KEY(key, file);
|
||||
GET_KEY(key, abs_path);
|
||||
auto fs_path = Path(_s3_conf.endpoint) / _s3_conf.bucket / key;
|
||||
*reader = std::make_shared<S3FileReader>(
|
||||
std::move(fs_path), fsize, std::move(key), _s3_conf.bucket,
|
||||
|
||||
@ -69,7 +69,8 @@ public:
|
||||
protected:
|
||||
Status connect_impl() override;
|
||||
Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
|
||||
Status open_file_internal(const Path& file, int64_t file_size, FileReaderSPtr* reader) override;
|
||||
Status open_file_internal(const FileDescription& fd, const Path& abs_path,
|
||||
FileReaderSPtr* reader) override;
|
||||
Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override;
|
||||
Status delete_file_impl(const Path& file) override;
|
||||
Status delete_directory_impl(const Path& dir) override;
|
||||
|
||||
@ -39,7 +39,6 @@
|
||||
#include "gutil/strings/substitute.h"
|
||||
#include "io/fs/file_reader_writer_fwd.h"
|
||||
#include "io/fs/file_writer.h"
|
||||
#include "io/fs/fs_utils.h"
|
||||
#include "io/fs/local_file_system.h"
|
||||
#include "io/fs/path.h"
|
||||
#include "io/fs/remote_file_system.h"
|
||||
@ -149,7 +148,7 @@ Status DataDir::read_cluster_id(const std::string& cluster_id_path, int32_t* clu
|
||||
if (exists) {
|
||||
std::string content;
|
||||
RETURN_IF_ERROR(
|
||||
io::read_file_to_string(io::global_local_filesystem(), cluster_id_path, &content));
|
||||
io::global_local_filesystem()->read_file_to_string(cluster_id_path, &content));
|
||||
if (content.size() > 0) {
|
||||
*cluster_id = std::stoi(content);
|
||||
} else {
|
||||
|
||||
@ -88,8 +88,6 @@ public:
|
||||
|
||||
bool is_ssd_disk() const { return _storage_medium == TStorageMedium::SSD; }
|
||||
|
||||
bool is_remote() const { return io::FilePathDesc::is_remote(_storage_medium); }
|
||||
|
||||
TStorageMedium::type storage_medium() const { return _storage_medium; }
|
||||
|
||||
void register_tablet(Tablet* tablet);
|
||||
|
||||
@ -78,15 +78,16 @@ Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t se
|
||||
const io::FileReaderOptions& reader_options,
|
||||
std::shared_ptr<Segment>* output) {
|
||||
io::FileReaderSPtr file_reader;
|
||||
io::FileDescription fd;
|
||||
fd.path = path;
|
||||
#ifndef BE_TEST
|
||||
RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader));
|
||||
RETURN_IF_ERROR(fs->open_file(fd, reader_options, &file_reader));
|
||||
#else
|
||||
// be ut use local file reader instead of remote file reader while use remote cache
|
||||
if (!config::file_cache_type.empty()) {
|
||||
RETURN_IF_ERROR(
|
||||
io::global_local_filesystem()->open_file(path, reader_options, &file_reader));
|
||||
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(fd, reader_options, &file_reader));
|
||||
} else {
|
||||
RETURN_IF_ERROR(fs->open_file(path, reader_options, &file_reader));
|
||||
RETURN_IF_ERROR(fs->open_file(fd, reader_options, &file_reader));
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
@ -86,7 +86,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l
|
||||
} else if (TStorageBackendType::type::HDFS == type) {
|
||||
THdfsParams hdfs_params = parse_properties(_prop);
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
|
||||
_remote_fs = std::move(fs);
|
||||
} else if (TStorageBackendType::type::BROKER == type) {
|
||||
std::shared_ptr<io::BrokerFileSystem> fs;
|
||||
|
||||
@ -33,7 +33,6 @@
|
||||
#include "gutil/strings/numbers.h"
|
||||
#include "gutil/strings/split.h"
|
||||
#include "gutil/strings/substitute.h"
|
||||
#include "io/fs/fs_utils.h"
|
||||
#include "io/fs/local_file_system.h"
|
||||
|
||||
using std::string;
|
||||
@ -104,9 +103,8 @@ Status get_thread_stats(int64_t tid, ThreadStats* stats) {
|
||||
return Status::NotSupported("ThreadStats not supported");
|
||||
}
|
||||
std::string buf;
|
||||
RETURN_IF_ERROR(io::read_file_to_string(io::global_local_filesystem(),
|
||||
strings::Substitute("/proc/self/task/$0/stat", tid),
|
||||
&buf));
|
||||
RETURN_IF_ERROR(io::global_local_filesystem()->read_file_to_string(
|
||||
strings::Substitute("/proc/self/task/$0/stat", tid), &buf));
|
||||
return parse_stat(buf, nullptr, stats);
|
||||
}
|
||||
void disable_core_dumps() {
|
||||
|
||||
@ -44,13 +44,6 @@ std::string join_path_segments(const string& a, const string& b) {
|
||||
}
|
||||
}
|
||||
|
||||
FilePathDesc join_path_desc_segments(const FilePathDesc& path_desc, const string& b) {
|
||||
FilePathDesc seg_path_desc = path_desc;
|
||||
seg_path_desc.filepath = join_path_segments(path_desc.filepath, b);
|
||||
seg_path_desc.remote_path = join_path_segments(path_desc.remote_path, b);
|
||||
return seg_path_desc;
|
||||
}
|
||||
|
||||
std::vector<string> join_path_segments_v(const std::vector<string>& v, const string& s) {
|
||||
std::vector<string> out;
|
||||
for (const string& path : v) {
|
||||
|
||||
@ -21,13 +21,9 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "io/fs/fs_utils.h"
|
||||
|
||||
namespace doris {
|
||||
namespace path_util {
|
||||
|
||||
using doris::io::FilePathDesc;
|
||||
|
||||
// NOTE: The methods here are only related to path processing, do not involve
|
||||
// any file and IO operations.
|
||||
|
||||
@ -40,8 +36,6 @@ std::string join_path_segments(const std::string& a, const std::string& b);
|
||||
std::vector<std::string> join_path_segments_v(const std::vector<std::string>& v,
|
||||
const std::string& s);
|
||||
|
||||
FilePathDesc join_path_desc_segments(const FilePathDesc& path_desc, const std::string& b);
|
||||
|
||||
// Split a path into segments with the appropriate path separator.
|
||||
std::vector<std::string> split_path(const std::string& path);
|
||||
|
||||
|
||||
@ -47,14 +47,14 @@ void BlockSpillReader::_init_profile() {
|
||||
|
||||
Status BlockSpillReader::open() {
|
||||
std::shared_ptr<io::FileSystem> file_system;
|
||||
FileSystemProperties system_properties;
|
||||
io::FileSystemProperties system_properties;
|
||||
system_properties.system_type = TFileType::FILE_LOCAL;
|
||||
|
||||
FileDescription file_description;
|
||||
io::FileDescription file_description;
|
||||
file_description.path = file_path_;
|
||||
|
||||
io::FileReaderOptions reader_options = io::FileReaderOptions::DEFAULT;
|
||||
RETURN_IF_ERROR(FileFactory::create_file_reader(system_properties, file_description,
|
||||
&file_system, &file_reader_));
|
||||
reader_options, &file_system, &file_reader_));
|
||||
|
||||
size_t file_size = file_reader_->size();
|
||||
|
||||
|
||||
@ -170,11 +170,10 @@ Status CsvReader::init_reader(bool is_load) {
|
||||
_state->fragment_instance_id()));
|
||||
} else {
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
|
||||
reader_options.modification_time =
|
||||
_range.__isset.modification_time ? _range.modification_time : 0;
|
||||
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
|
||||
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
|
||||
_profile, _system_properties, _file_description, &_file_system, &_file_reader,
|
||||
io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx,
|
||||
_profile, _system_properties, _file_description, reader_options, &_file_system,
|
||||
&_file_reader, io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
|
||||
io::PrefetchRange(_range.start_offset, _range.size)));
|
||||
}
|
||||
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
|
||||
@ -659,10 +658,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
|
||||
|
||||
_file_description.start_offset = start_offset;
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
|
||||
reader_options.modification_time =
|
||||
_range.__isset.modification_time ? _range.modification_time : 0;
|
||||
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
|
||||
RETURN_IF_ERROR(FileFactory::create_file_reader(_system_properties, _file_description,
|
||||
&_file_system, &_file_reader, reader_options));
|
||||
reader_options, &_file_system, &_file_reader));
|
||||
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
|
||||
_params.file_type != TFileType::FILE_BROKER) {
|
||||
return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path);
|
||||
|
||||
@ -108,8 +108,8 @@ private:
|
||||
ScannerCounter* _counter;
|
||||
const TFileScanRangeParams& _params;
|
||||
const TFileRangeDesc& _range;
|
||||
FileSystemProperties _system_properties;
|
||||
FileDescription _file_description;
|
||||
io::FileSystemProperties _system_properties;
|
||||
io::FileDescription _file_description;
|
||||
const std::vector<SlotDescriptor*>& _file_slot_descs;
|
||||
// Only for query task, save the file slot to columns in block map.
|
||||
// eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
|
||||
|
||||
@ -382,11 +382,10 @@ Status NewJsonReader::_open_file_reader() {
|
||||
_state->fragment_instance_id()));
|
||||
} else {
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
|
||||
reader_options.modification_time =
|
||||
_range.__isset.modification_time ? _range.modification_time : 0;
|
||||
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
|
||||
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
|
||||
_profile, _system_properties, _file_description, &_file_system, &_file_reader,
|
||||
io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx,
|
||||
_profile, _system_properties, _file_description, reader_options, &_file_system,
|
||||
&_file_reader, io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
|
||||
io::PrefetchRange(_range.start_offset, _range.size)));
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
@ -190,8 +190,8 @@ private:
|
||||
ScannerCounter* _counter;
|
||||
const TFileScanRangeParams& _params;
|
||||
const TFileRangeDesc& _range;
|
||||
FileSystemProperties _system_properties;
|
||||
FileDescription _file_description;
|
||||
io::FileSystemProperties _system_properties;
|
||||
io::FileDescription _file_description;
|
||||
const std::vector<SlotDescriptor*>& _file_slot_descs;
|
||||
|
||||
std::shared_ptr<io::FileSystem> _file_system;
|
||||
|
||||
@ -211,11 +211,11 @@ Status OrcReader::_create_file_reader() {
|
||||
if (_file_input_stream == nullptr) {
|
||||
io::FileReaderSPtr inner_reader;
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
|
||||
reader_options.modification_time =
|
||||
_file_description.mtime =
|
||||
_scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
|
||||
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
|
||||
_profile, _system_properties, _file_description, &_file_system, &inner_reader,
|
||||
io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx));
|
||||
_profile, _system_properties, _file_description, reader_options, &_file_system,
|
||||
&inner_reader, io::DelegateReader::AccessMode::RANDOM, _io_ctx));
|
||||
_file_input_stream.reset(new ORCFileInputStream(_scan_range.path, inner_reader,
|
||||
&_statistics, _io_ctx, _profile));
|
||||
}
|
||||
|
||||
@ -484,8 +484,8 @@ private:
|
||||
RuntimeState* _state = nullptr;
|
||||
const TFileScanRangeParams& _scan_params;
|
||||
const TFileRangeDesc& _scan_range;
|
||||
FileSystemProperties _system_properties;
|
||||
FileDescription _file_description;
|
||||
io::FileSystemProperties _system_properties;
|
||||
io::FileDescription _file_description;
|
||||
size_t _batch_size;
|
||||
int64_t _range_start_offset;
|
||||
int64_t _range_size;
|
||||
|
||||
@ -213,11 +213,11 @@ Status ParquetReader::_open_file() {
|
||||
SCOPED_RAW_TIMER(&_statistics.open_file_time);
|
||||
++_statistics.open_file_num;
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
|
||||
reader_options.modification_time =
|
||||
_file_description.mtime =
|
||||
_scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
|
||||
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
|
||||
_profile, _system_properties, _file_description, &_file_system, &_file_reader,
|
||||
io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx));
|
||||
_profile, _system_properties, _file_description, reader_options, &_file_system,
|
||||
&_file_reader, io::DelegateReader::AccessMode::RANDOM, _io_ctx));
|
||||
}
|
||||
if (_file_metadata == nullptr) {
|
||||
SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
|
||||
|
||||
@ -212,8 +212,8 @@ private:
|
||||
RuntimeProfile* _profile;
|
||||
const TFileScanRangeParams& _scan_params;
|
||||
const TFileRangeDesc& _scan_range;
|
||||
FileSystemProperties _system_properties;
|
||||
FileDescription _file_description;
|
||||
io::FileSystemProperties _system_properties;
|
||||
io::FileDescription _file_description;
|
||||
std::shared_ptr<io::FileSystem> _file_system = nullptr;
|
||||
io::FileReaderSPtr _file_reader = nullptr;
|
||||
ObjLRUCache::CacheHandle _cache_handle;
|
||||
|
||||
@ -595,7 +595,7 @@ Status VFileResultWriter::_delete_dir() {
|
||||
case TStorageBackendType::HDFS: {
|
||||
THdfsParams hdfs_params = parse_properties(_file_opts->broker_properties);
|
||||
std::shared_ptr<io::HdfsFileSystem> hdfs_fs = nullptr;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &hdfs_fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &hdfs_fs));
|
||||
file_system = hdfs_fs;
|
||||
break;
|
||||
}
|
||||
|
||||
@ -75,7 +75,6 @@ TEST_F(PrimaryKeyIndexTest, builder) {
|
||||
EXPECT_TRUE(file_writer->close().ok());
|
||||
EXPECT_EQ(num_rows, builder.num_rows());
|
||||
|
||||
io::FilePathDesc path_desc(filename);
|
||||
PrimaryKeyIndexReader index_reader;
|
||||
io::FileReaderSPtr file_reader;
|
||||
EXPECT_TRUE(fs->open_file(filename, &file_reader).ok());
|
||||
|
||||
@ -206,9 +206,11 @@ protected:
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status open_file_internal(const Path& file, int64_t file_size,
|
||||
Status open_file_internal(const io::FileDescription& fd, const Path& abs_path,
|
||||
io::FileReaderSPtr* reader) override {
|
||||
return _local_fs->open_file(get_remote_path(file), io::FileReaderOptions::DEFAULT, reader);
|
||||
io::FileDescription tmp_fd;
|
||||
tmp_fd.path = get_remote_path(abs_path);
|
||||
return _local_fs->open_file(tmp_fd, io::FileReaderOptions::DEFAULT, reader);
|
||||
}
|
||||
|
||||
Status connect_impl() override { return Status::OK(); }
|
||||
|
||||
Reference in New Issue
Block a user