[Enhancement](multi-catalog) Add hdfs read statistics profile. (#21442)
Add hdfs read statistics profile.
```
- HdfsIO: 0ns
- TotalBytesRead: 133.47 MB
- TotalLocalBytesRead: 133.47 MB
- TotalShortCircuitBytesRead: 133.47 MB
- TotalZeroCopyBytesRead: 0.00
```
This commit is contained in:
@ -97,7 +97,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
|
||||
case TFileType::FILE_HDFS: {
|
||||
THdfsParams hdfs_params = parse_properties(properties);
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
|
||||
RETURN_IF_ERROR(fs->create_file(path, &file_writer));
|
||||
break;
|
||||
}
|
||||
@ -128,7 +128,7 @@ Status FileFactory::create_file_reader(const FileSystemProperties& system_proper
|
||||
}
|
||||
case TFileType::FILE_HDFS: {
|
||||
RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description.path,
|
||||
file_system, file_reader, reader_options));
|
||||
file_system, file_reader, reader_options, profile));
|
||||
break;
|
||||
}
|
||||
case TFileType::FILE_BROKER: {
|
||||
@ -168,9 +168,10 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
|
||||
Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
|
||||
std::shared_ptr<io::FileSystem>* hdfs_file_system,
|
||||
io::FileReaderSPtr* reader,
|
||||
const io::FileReaderOptions& reader_options) {
|
||||
const io::FileReaderOptions& reader_options,
|
||||
RuntimeProfile* profile) {
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", profile, &fs));
|
||||
RETURN_IF_ERROR(fs->open_file(path, reader_options, reader));
|
||||
*hdfs_file_system = std::move(fs);
|
||||
return Status::OK();
|
||||
|
||||
@ -81,7 +81,8 @@ public:
|
||||
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
|
||||
std::shared_ptr<io::FileSystem>* hdfs_file_system,
|
||||
io::FileReaderSPtr* reader,
|
||||
const io::FileReaderOptions& reader_options);
|
||||
const io::FileReaderOptions& reader_options,
|
||||
RuntimeProfile* profile);
|
||||
|
||||
static Status create_s3_reader(const std::map<std::string, std::string>& prop,
|
||||
const std::string& path,
|
||||
|
||||
@ -50,8 +50,8 @@ public:
|
||||
io::FileReaderSPtr reader;
|
||||
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
RETURN_IF_ERROR(
|
||||
FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts));
|
||||
RETURN_IF_ERROR(FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader,
|
||||
reader_opts, nullptr));
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
auto elapsed_seconds =
|
||||
std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
|
||||
@ -94,7 +94,7 @@ public:
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
io::FileWriterPtr writer;
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
|
||||
RETURN_IF_ERROR(fs->create_file(file_path, &writer));
|
||||
return write(state, writer.get());
|
||||
}
|
||||
@ -115,7 +115,7 @@ public:
|
||||
auto new_file_path = file_path + "_new";
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
RETURN_IF_ERROR(fs->rename(file_path, new_file_path));
|
||||
@ -142,7 +142,7 @@ public:
|
||||
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
THdfsParams hdfs_params = parse_properties(_conf_map);
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
bool res = false;
|
||||
|
||||
@ -36,12 +36,29 @@ namespace doris {
|
||||
namespace io {
|
||||
|
||||
HdfsFileReader::HdfsFileReader(Path path, const std::string& name_node,
|
||||
FileHandleCache::Accessor accessor)
|
||||
: _path(std::move(path)), _name_node(name_node), _accessor(std::move(accessor)) {
|
||||
FileHandleCache::Accessor accessor, RuntimeProfile* profile)
|
||||
: _path(std::move(path)),
|
||||
_name_node(name_node),
|
||||
_accessor(std::move(accessor)),
|
||||
_profile(profile) {
|
||||
_handle = _accessor.get();
|
||||
|
||||
DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
|
||||
DorisMetrics::instance()->hdfs_file_reader_total->increment(1);
|
||||
if (_profile != nullptr) {
|
||||
#ifdef USE_HADOOP_HDFS
|
||||
const char* hdfs_profile_name = "HdfsIO";
|
||||
ADD_TIMER(_profile, hdfs_profile_name);
|
||||
_hdfs_profile.total_bytes_read =
|
||||
ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, hdfs_profile_name);
|
||||
_hdfs_profile.total_local_bytes_read =
|
||||
ADD_CHILD_COUNTER(_profile, "TotalLocalBytesRead", TUnit::BYTES, hdfs_profile_name);
|
||||
_hdfs_profile.total_short_circuit_bytes_read = ADD_CHILD_COUNTER(
|
||||
_profile, "TotalShortCircuitBytesRead", TUnit::BYTES, hdfs_profile_name);
|
||||
_hdfs_profile.total_total_zero_copy_bytes_read = ADD_CHILD_COUNTER(
|
||||
_profile, "TotalZeroCopyBytesRead", TUnit::BYTES, hdfs_profile_name);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
HdfsFileReader::~HdfsFileReader() {
|
||||
@ -52,6 +69,25 @@ Status HdfsFileReader::close() {
|
||||
bool expected = false;
|
||||
if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
|
||||
DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
|
||||
if (_profile != nullptr) {
|
||||
#ifdef USE_HADOOP_HDFS
|
||||
struct hdfsReadStatistics* hdfs_statistics = nullptr;
|
||||
auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics);
|
||||
if (r != 0) {
|
||||
return Status::InternalError(
|
||||
fmt::format("Failed to run hdfsFileGetReadStatistics(): {}", r));
|
||||
}
|
||||
COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead);
|
||||
COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read,
|
||||
hdfs_statistics->totalLocalBytesRead);
|
||||
COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
|
||||
hdfs_statistics->totalShortCircuitBytesRead);
|
||||
COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
|
||||
hdfs_statistics->totalZeroCopyBytesRead);
|
||||
hdfsFileFreeReadStatistics(hdfs_statistics);
|
||||
hdfsFileClearReadStatistics(_handle->file());
|
||||
#endif
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -38,7 +38,8 @@ class IOContext;
|
||||
|
||||
class HdfsFileReader : public FileReader {
|
||||
public:
|
||||
HdfsFileReader(Path path, const std::string& name_node, FileHandleCache::Accessor accessor);
|
||||
HdfsFileReader(Path path, const std::string& name_node, FileHandleCache::Accessor accessor,
|
||||
RuntimeProfile* profile);
|
||||
|
||||
~HdfsFileReader() override;
|
||||
|
||||
@ -57,11 +58,24 @@ protected:
|
||||
const IOContext* io_ctx) override;
|
||||
|
||||
private:
|
||||
#ifdef USE_HADOOP_HDFS
|
||||
struct HDFSProfile {
|
||||
RuntimeProfile::Counter* total_bytes_read;
|
||||
RuntimeProfile::Counter* total_local_bytes_read;
|
||||
RuntimeProfile::Counter* total_short_circuit_bytes_read;
|
||||
RuntimeProfile::Counter* total_total_zero_copy_bytes_read;
|
||||
};
|
||||
#endif
|
||||
|
||||
Path _path;
|
||||
const std::string& _name_node;
|
||||
FileHandleCache::Accessor _accessor;
|
||||
CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle
|
||||
std::atomic<bool> _closed = false;
|
||||
RuntimeProfile* _profile;
|
||||
#ifdef USE_HADOOP_HDFS
|
||||
HDFSProfile _hdfs_profile;
|
||||
#endif
|
||||
};
|
||||
} // namespace io
|
||||
} // namespace doris
|
||||
|
||||
@ -123,7 +123,7 @@ Status HdfsFileHandleCache::get_file(const std::shared_ptr<HdfsFileSystem>& fs,
|
||||
}
|
||||
|
||||
Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string& path,
|
||||
std::shared_ptr<HdfsFileSystem>* fs) {
|
||||
RuntimeProfile* profile, std::shared_ptr<HdfsFileSystem>* fs) {
|
||||
#ifdef USE_HADOOP_HDFS
|
||||
if (!config::enable_java_support) {
|
||||
return Status::InternalError(
|
||||
@ -131,14 +131,16 @@ Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string&
|
||||
"true.");
|
||||
}
|
||||
#endif
|
||||
(*fs).reset(new HdfsFileSystem(hdfs_params, path));
|
||||
(*fs).reset(new HdfsFileSystem(hdfs_params, path, profile));
|
||||
return (*fs)->connect();
|
||||
}
|
||||
|
||||
HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path)
|
||||
HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path,
|
||||
RuntimeProfile* profile)
|
||||
: RemoteFileSystem(path, "", FileSystemType::HDFS),
|
||||
_hdfs_params(hdfs_params),
|
||||
_fs_handle(nullptr) {
|
||||
_fs_handle(nullptr),
|
||||
_profile(profile) {
|
||||
_namenode = _hdfs_params.fs_name;
|
||||
}
|
||||
|
||||
@ -175,7 +177,7 @@ Status HdfsFileSystem::open_file_internal(const Path& file, int64_t file_size,
|
||||
std::static_pointer_cast<HdfsFileSystem>(shared_from_this()), real_path, 0, file_size,
|
||||
&accessor));
|
||||
|
||||
*reader = std::make_shared<HdfsFileReader>(file, _namenode, std::move(accessor));
|
||||
*reader = std::make_shared<HdfsFileReader>(file, _namenode, std::move(accessor), _profile);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -33,6 +33,7 @@
|
||||
#include "io/fs/hdfs.h"
|
||||
#include "io/fs/path.h"
|
||||
#include "io/fs/remote_file_system.h"
|
||||
#include "util/runtime_profile.h"
|
||||
|
||||
namespace doris {
|
||||
class THdfsParams;
|
||||
@ -111,7 +112,7 @@ class HdfsFileHandleCache;
|
||||
class HdfsFileSystem final : public RemoteFileSystem {
|
||||
public:
|
||||
static Status create(const THdfsParams& hdfs_params, const std::string& path,
|
||||
std::shared_ptr<HdfsFileSystem>* fs);
|
||||
RuntimeProfile* profile, std::shared_ptr<HdfsFileSystem>* fs);
|
||||
|
||||
~HdfsFileSystem() override;
|
||||
|
||||
@ -148,12 +149,14 @@ private:
|
||||
|
||||
private:
|
||||
friend class HdfsFileWriter;
|
||||
HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path);
|
||||
HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path,
|
||||
RuntimeProfile* profile);
|
||||
const THdfsParams& _hdfs_params;
|
||||
std::string _namenode;
|
||||
// do not use std::shared_ptr or std::unique_ptr
|
||||
// _fs_handle is managed by HdfsFileSystemCache
|
||||
HdfsFileSystemHandle* _fs_handle;
|
||||
RuntimeProfile* _profile;
|
||||
};
|
||||
} // namespace io
|
||||
} // namespace doris
|
||||
|
||||
@ -86,7 +86,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l
|
||||
} else if (TStorageBackendType::type::HDFS == type) {
|
||||
THdfsParams hdfs_params = parse_properties(_prop);
|
||||
std::shared_ptr<io::HdfsFileSystem> fs;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs));
|
||||
_remote_fs = std::move(fs);
|
||||
} else if (TStorageBackendType::type::BROKER == type) {
|
||||
std::shared_ptr<io::BrokerFileSystem> fs;
|
||||
|
||||
@ -595,7 +595,7 @@ Status VFileResultWriter::_delete_dir() {
|
||||
case TStorageBackendType::HDFS: {
|
||||
THdfsParams hdfs_params = parse_properties(_file_opts->broker_properties);
|
||||
std::shared_ptr<io::HdfsFileSystem> hdfs_fs = nullptr;
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &hdfs_fs));
|
||||
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &hdfs_fs));
|
||||
file_system = hdfs_fs;
|
||||
break;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user