diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt index 4096d2557b..402a90a800 100644 --- a/be/src/io/CMakeLists.txt +++ b/be/src/io/CMakeLists.txt @@ -41,6 +41,8 @@ set(IO_FILES fs/s3_file_reader.cpp fs/s3_file_system.cpp fs/s3_file_writer.cpp + fs/hdfs_file_system.cpp + fs/hdfs_file_reader.cpp cache/dummy_file_cache.cpp cache/file_cache.cpp cache/file_cache_manager.cpp diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h index e7d4fbbb88..d97148f4c5 100644 --- a/be/src/io/fs/file_system.h +++ b/be/src/io/fs/file_system.h @@ -37,6 +37,7 @@ using ResourceId = std::string; enum class FileSystemType : uint8_t { LOCAL, S3, + HDFS, }; class FileSystem { diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp new file mode 100644 index 0000000000..1b77e64238 --- /dev/null +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/fs/hdfs_file_reader.h" + +#include "io/fs/hdfs_file_system.h" +#include "service/backend_options.h" +#include "util/doris_metrics.h" +namespace doris { +namespace io { +HdfsFileReader::HdfsFileReader(Path path, size_t file_size, const std::string& name_node, + hdfsFile hdfs_file, HdfsFileSystem* fs) + : _path(std::move(path)), + _file_size(file_size), + _name_node(name_node), + _hdfs_file(hdfs_file), + _fs(fs) { + DorisMetrics::instance()->hdfs_file_open_reading->increment(1); + DorisMetrics::instance()->hdfs_file_reader_total->increment(1); +} + +HdfsFileReader::~HdfsFileReader() { + close(); +} + +Status HdfsFileReader::close() { + bool expected = false; + if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { + auto handle = _fs->get_handle(); + auto hdfs_fs = handle->hdfs_fs; + if (_hdfs_file != nullptr && hdfs_fs != nullptr) { + VLOG_NOTICE << "close hdfs file: " << _name_node << _path; + // If the hdfs file was valid, the memory associated with it will + // be freed at the end of this call, even if there was an I/O error + hdfsCloseFile(hdfs_fs, _hdfs_file); + } + + DorisMetrics::instance()->hdfs_file_open_reading->increment(-1); + } + return Status::OK(); +} + +Status HdfsFileReader::read_at(size_t offset, Slice result, const IOContext& /*io_ctx*/, + size_t* bytes_read) { + DCHECK(!closed()); + if (offset > _file_size) { + return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})", + offset, _file_size, _path.native()); + } + size_t bytes_req = result.size; + char* to = result.data; + bytes_req = std::min(bytes_req, _file_size - offset); + if (UNLIKELY(bytes_req == 0)) { + *bytes_read = 0; + return Status::OK(); + } + + auto handle = _fs->get_handle(); + int64_t has_read = 0; + while (has_read < bytes_req) { + int64_t loop_read = + 
hdfsRead(handle->hdfs_fs, _hdfs_file, to + has_read, bytes_req - has_read); + if (loop_read < 0) { + return Status::InternalError( + "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}", + BackendOptions::get_localhost(), _name_node, _path.string(), + hdfsGetLastError()); + } + if (loop_read == 0) { + break; + } + has_read += loop_read; + } + *bytes_read = has_read; + return Status::OK(); +} +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h new file mode 100644 index 0000000000..94823fadd1 --- /dev/null +++ b/be/src/io/fs/hdfs_file_reader.h @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "io/fs/file_reader.h" +#include "io/fs/hdfs_file_system.h" +namespace doris { +namespace io { + +class HdfsFileReader : public FileReader { +public: + HdfsFileReader(Path path, size_t file_size, const std::string& name_node, hdfsFile hdfs_file, + HdfsFileSystem* fs); + + ~HdfsFileReader() override; + + Status close() override; + + Status read_at(size_t offset, Slice result, const IOContext& io_ctx, + size_t* bytes_read) override; + + const Path& path() const override { return _path; } + + size_t size() const override { return _file_size; } + + bool closed() const override { return _closed.load(std::memory_order_acquire); } + +private: + Path _path; + size_t _file_size; + const std::string& _name_node; + hdfsFile _hdfs_file; + HdfsFileSystem* _fs; + std::atomic _closed = false; +}; +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp new file mode 100644 index 0000000000..d219e0393a --- /dev/null +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "io/fs/hdfs_file_system.h" + +#include "gutil/hash/hash.h" +#include "io/fs/hdfs_file_reader.h" +#include "io/hdfs_builder.h" +#include "service/backend_options.h" + +namespace doris { +namespace io { + +#ifndef CHECK_HDFS_HANDLE +#define CHECK_HDFS_HANDLE(handle) \ + if (!handle) { \ + return Status::InternalError("init Hdfs handle error"); \ + } +#endif + +// Cache for HdfsFileSystemHandle +class HdfsFileSystemCache { +public: + static int MAX_CACHE_HANDLE; + + static HdfsFileSystemCache* instance() { + static HdfsFileSystemCache s_instance; + return &s_instance; + } + + HdfsFileSystemCache(const HdfsFileSystemCache&) = delete; + const HdfsFileSystemCache& operator=(const HdfsFileSystemCache&) = delete; + + // This function is thread-safe + Status get_connection(const THdfsParams& hdfs_params, HdfsFileSystemHandle** fs_handle); + +private: + std::mutex _lock; + std::unordered_map<uint64, std::unique_ptr<HdfsFileSystemHandle>> _cache; + + HdfsFileSystemCache() = default; + + uint64 _hdfs_hash_code(const THdfsParams& hdfs_params); + Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs); + void _clean_invalid(); + void _clean_oldest(); +}; + +HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path) + : RemoteFileSystem(path, "", FileSystemType::HDFS), + _hdfs_params(hdfs_params), + _path(path), + _fs_handle(nullptr) { + _namenode = _hdfs_params.fs_name; + // if the format of _path is hdfs://ip:port/path, replace it to /path. + // path like hdfs://ip:port/path can't be used by libhdfs3. + if (_path.find(_namenode) != std::string::npos) { + _path = _path.substr(_namenode.size()); + } +} + +HdfsFileSystem::~HdfsFileSystem() { + if (_fs_handle && _fs_handle->from_cache) { + _fs_handle->dec_ref(); + } +} + +Status HdfsFileSystem::connect() { + RETURN_IF_ERROR(HdfsFileSystemCache::instance()->get_connection(_hdfs_params, &_fs_handle)); + if (!_fs_handle) { + return Status::InternalError("failed to init Hdfs handle, please check hdfs params."); + } + return Status::OK(); +} + +Status HdfsFileSystem::create_file(const Path& /*path*/, FileWriterPtr* /*writer*/) { + // auto handle = get_handle(); + // CHECK_HDFS_HANDLE(handle); + // auto hdfs_file = hdfsOpenFile(handle->hdfs_fs, path.string().c_str(), O_WRONLY, 0, 0, 0); + // if (hdfs_file == nullptr) { + // return Status::InternalError("Failed to create file {}", path.string()); + // } + // hdfsCloseFile(handle->hdfs_fs, hdfs_file); + // return Status::OK(); + return Status::NotSupported("Currently not support to upload file to HDFS"); +} + +Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) { + CHECK_HDFS_HANDLE(_fs_handle); + size_t file_len = -1; + RETURN_IF_ERROR(file_size(path, &file_len)); + auto hdfs_file = hdfsOpenFile(_fs_handle->hdfs_fs, path.string().c_str(), O_RDONLY, 0, 0, 0); + if (hdfs_file == nullptr) { + if (_fs_handle->from_cache) { + // hdfsFS may be disconnected if not used for a long time + _fs_handle->set_invalid(); + _fs_handle->dec_ref(); + // retry + RETURN_IF_ERROR(connect()); + hdfs_file = hdfsOpenFile(_fs_handle->hdfs_fs, path.string().c_str(), O_RDONLY, 0, 0, 0); + if (hdfs_file == nullptr) { + return Status::InternalError( + "open file failed. (BE: {}) namenode:{}, path:{}, err: {}", + BackendOptions::get_localhost(), _namenode, path.string(), + hdfsGetLastError()); + } + } else { + return Status::InternalError("open file failed.
(BE: {}) namenode:{}, path:{}, err: {}", + BackendOptions::get_localhost(), _namenode, path.string(), + hdfsGetLastError()); + } + } + *reader = std::make_shared<HdfsFileReader>(path, file_len, _namenode, hdfs_file, this); + return Status::OK(); +} + +Status HdfsFileSystem::delete_file(const Path& path) { + CHECK_HDFS_HANDLE(_fs_handle); + // The recursive argument `is_recursive` is irrelevant if path is a file. + int is_recursive = 0; + int res = hdfsDelete(_fs_handle->hdfs_fs, path.string().c_str(), is_recursive); + if (res == -1) { + return Status::InternalError("Failed to delete file {}", path.string()); + } + return Status::OK(); +} + +Status HdfsFileSystem::create_directory(const Path& path) { + CHECK_HDFS_HANDLE(_fs_handle); + int res = hdfsCreateDirectory(_fs_handle->hdfs_fs, path.string().c_str()); + if (res == -1) { + return Status::InternalError("Failed to create directory {}", path.string()); + } + return Status::OK(); +} + +Status HdfsFileSystem::delete_directory(const Path& path) { + CHECK_HDFS_HANDLE(_fs_handle); + // delete in recursive mode + int is_recursive = 1; + int res = hdfsDelete(_fs_handle->hdfs_fs, path.string().c_str(), is_recursive); + if (res == -1) { + return Status::InternalError("Failed to delete directory {}", path.string()); + } + return Status::OK(); +} + +Status HdfsFileSystem::exists(const Path& path, bool* res) const { + CHECK_HDFS_HANDLE(_fs_handle); + *res = (hdfsExists(_fs_handle->hdfs_fs, path.string().c_str()) == 0); + return Status::OK(); +} + +Status HdfsFileSystem::file_size(const Path& path, size_t* file_size) const { + CHECK_HDFS_HANDLE(_fs_handle); + hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handle->hdfs_fs, path.string().c_str()); + if (file_info == nullptr) { + return Status::InternalError("Failed to get file size of {}", path.string()); + } + *file_size = file_info->mSize; + hdfsFreeFileInfo(file_info, 1); + return Status::OK(); +} + +Status HdfsFileSystem::list(const Path& path, std::vector<Path>* files) { + CHECK_HDFS_HANDLE(_fs_handle); + int numEntries = 0; + hdfsFileInfo* file_info = + hdfsListDirectory(_fs_handle->hdfs_fs, path.string().c_str(), &numEntries); + if (file_info == nullptr) { + return Status::InternalError("Failed to list files/directories of {}", path.string()); + } + for (int idx = 0; idx < numEntries; ++idx) { + files->emplace_back(file_info[idx].mName); + } + hdfsFreeFileInfo(file_info, numEntries); + return Status::OK(); +} + +HdfsFileSystemHandle* HdfsFileSystem::get_handle() { + return _fs_handle; +} + +// ************* HdfsFileSystemCache ****************** +int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64; + +Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs) { + HDFSCommonBuilder builder = createHDFSBuilder(hdfs_params); + if (builder.is_need_kinit()) { + RETURN_IF_ERROR(builder.run_kinit()); + } + hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get()); + if (hdfs_fs == nullptr) { + return Status::InternalError("connect to hdfs failed.
error: {}", hdfsGetLastError()); + } + *fs = hdfs_fs; + return Status::OK(); +} + +void HdfsFileSystemCache::_clean_invalid() { + std::vector<uint64> removed_handle; + for (auto& item : _cache) { + if (item.second->invalid() && item.second->ref_cnt() == 0) { + removed_handle.emplace_back(item.first); + } + } + for (auto& handle : removed_handle) { + _cache.erase(handle); + } +} + +void HdfsFileSystemCache::_clean_oldest() { + uint64_t oldest_time = ULONG_MAX; + uint64 oldest = 0; + for (auto& item : _cache) { + if (item.second->ref_cnt() == 0 && item.second->last_access_time() < oldest_time) { + oldest_time = item.second->last_access_time(); + oldest = item.first; + } + } + _cache.erase(oldest); +} + +Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params, + HdfsFileSystemHandle** fs_handle) { + uint64 hash_code = _hdfs_hash_code(hdfs_params); + { + std::lock_guard<std::mutex> l(_lock); + auto it = _cache.find(hash_code); + if (it != _cache.end()) { + HdfsFileSystemHandle* handle = it->second.get(); + if (handle->invalid()) { + hdfsFS hdfs_fs = nullptr; + RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs)); + *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false); + } else { + handle->inc_ref(); + *fs_handle = handle; + } + } else { + hdfsFS hdfs_fs = nullptr; + RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs)); + if (_cache.size() >= MAX_CACHE_HANDLE) { + _clean_invalid(); + _clean_oldest(); + } + if (_cache.size() < MAX_CACHE_HANDLE) { + std::unique_ptr<HdfsFileSystemHandle> handle = + std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true); + handle->inc_ref(); + *fs_handle = handle.get(); + _cache[hash_code] = std::move(handle); + } else { + *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false); + } + } + } + return Status::OK(); +} + +uint64 HdfsFileSystemCache::_hdfs_hash_code(const THdfsParams& hdfs_params) { + uint64 hash_code = 0; + if (hdfs_params.__isset.fs_name) { + hash_code += Fingerprint(hdfs_params.fs_name); + } + if (hdfs_params.__isset.user) { + hash_code += Fingerprint(hdfs_params.user); + } + if (hdfs_params.__isset.hdfs_kerberos_principal) { + hash_code += Fingerprint(hdfs_params.hdfs_kerberos_principal); + } + if (hdfs_params.__isset.hdfs_kerberos_keytab) { + hash_code += Fingerprint(hdfs_params.hdfs_kerberos_keytab); + } + if (hdfs_params.__isset.hdfs_conf) { + std::map<std::string, std::string> conf_map; + for (auto& conf : hdfs_params.hdfs_conf) { + conf_map[conf.key] = conf.value; + } + for (auto& conf : conf_map) { + hash_code += Fingerprint(conf.first); + hash_code += Fingerprint(conf.second); + } + } + return hash_code; +} +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h new file mode 100644 index 0000000000..99d442e431 --- /dev/null +++ b/be/src/io/fs/hdfs_file_system.h @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "io/fs/remote_file_system.h" +namespace doris { + +namespace io { + +class HdfsFileSystemHandle { +public: + HdfsFileSystemHandle(hdfsFS fs, bool cached) + : hdfs_fs(fs), from_cache(cached), _ref_cnt(0), _last_access_time(0), _invalid(false) {} + + ~HdfsFileSystemHandle() { + DCHECK(_ref_cnt == 0); + if (hdfs_fs != nullptr) { + // Even if there is an error, the resources associated with the hdfsFS will be freed. + hdfsDisconnect(hdfs_fs); + } + hdfs_fs = nullptr; + } + + int64_t last_access_time() { return _last_access_time; } + + void inc_ref() { + _ref_cnt++; + _last_access_time = _now(); + } + + void dec_ref() { + _ref_cnt--; + _last_access_time = _now(); + } + + int ref_cnt() { return _ref_cnt; } + + bool invalid() { return _invalid; } + + void set_invalid() { _invalid = true; } + + hdfsFS hdfs_fs; + // When cache is full, and all handlers are in use, HdfsFileSystemCache will return an uncached handler. + // Client should delete the handler in such case. + const bool from_cache; + +private: + // the number of referenced client + std::atomic _ref_cnt; + // HdfsFileSystemCache try to remove the oldest handler when the cache is full + std::atomic _last_access_time; + // Client will set invalid if error thrown, and HdfsFileSystemCache will not reuse this handler + std::atomic _invalid; + + uint64_t _now() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + } +}; + +class HdfsFileSystem final : public RemoteFileSystem { +public: + HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path); + ~HdfsFileSystem() override; + + Status create_file(const Path& path, FileWriterPtr* writer) override; + + Status open_file(const Path& path, FileReaderSPtr* reader) override; + + Status delete_file(const Path& path) override; + + Status create_directory(const Path& path) override; + + // Delete all files under path. 
+ Status delete_directory(const Path& path) override; + + Status link_file(const Path& /*src*/, const Path& /*dest*/) override { + return Status::NotSupported("Not supported"); + } + + Status exists(const Path& path, bool* res) const override; + + Status file_size(const Path& path, size_t* file_size) const override; + + Status list(const Path& path, std::vector* files) override; + + Status upload(const Path& /*local_path*/, const Path& /*dest_path*/) override { + return Status::NotSupported("Currently not support to upload file to HDFS"); + } + + Status batch_upload(const std::vector& /*local_paths*/, + const std::vector& /*dest_paths*/) override { + return Status::NotSupported("Currently not support to batch upload file to HDFS"); + } + + Status connect() override; + + HdfsFileSystemHandle* get_handle(); + +private: + const THdfsParams& _hdfs_params; + std::string _namenode; + std::string _path; + // do not use std::shared_ptr or std::unique_ptr + // _fs_handle is managed by HdfsFileSystemCache + HdfsFileSystemHandle* _fs_handle; +}; +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index daddee5706..38b18d71a5 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -49,7 +49,7 @@ Status S3FileReader::close() { return Status::OK(); } -Status S3FileReader::read_at(size_t offset, Slice result, const IOContext& io_ctx, +Status S3FileReader::read_at(size_t offset, Slice result, const IOContext& /*io_ctx*/, size_t* bytes_read) { DCHECK(!closed()); if (offset > _file_size) { diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp index d57c489691..8c0e77e136 100644 --- a/be/src/io/hdfs_file_reader.cpp +++ b/be/src/io/hdfs_file_reader.cpp @@ -66,7 +66,7 @@ Status HdfsFileReader::open() { } // if the format of _path is hdfs://ip:port/path, replace it to /path. // path like hdfs://ip:port/path can't be used by libhdfs3. - if (_path.find(_namenode) != _path.npos) { + if (_path.find(_namenode) != std::string::npos) { _path = _path.substr(_namenode.size()); } @@ -82,7 +82,7 @@ Status HdfsFileReader::open() { RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(_hdfs_params, &_fs_handle)); _hdfs_fs = _fs_handle->hdfs_fs; _hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_RDONLY, 0, 0, 0); - if (_hdfs_fs == nullptr) { + if (_hdfs_file == nullptr) { return Status::InternalError( "open file failed. 
(BE: {}) namenode:{}, path:{}, err: {}", BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError()); diff --git a/be/src/io/hdfs_file_reader.h b/be/src/io/hdfs_file_reader.h index 95c09d8e01..87599841d1 100644 --- a/be/src/io/hdfs_file_reader.h +++ b/be/src/io/hdfs_file_reader.h @@ -88,21 +88,21 @@ public: return &s_instance; } + HdfsFsCache(const HdfsFsCache&) = delete; + const HdfsFsCache& operator=(const HdfsFsCache&) = delete; + // This function is thread-safe Status get_connection(THdfsParams& hdfs_params, HdfsFsHandle** fs_handle); private: - std::mutex _lock; - std::unordered_map> _cache; - HdfsFsCache() = default; - HdfsFsCache(const HdfsFsCache&) = delete; - const HdfsFsCache& operator=(const HdfsFsCache&) = delete; - uint64 _hdfs_hash_code(THdfsParams& hdfs_params); Status _create_fs(THdfsParams& hdfs_params, hdfsFS* fs); void _clean_invalid(); void _clean_oldest(); + + std::mutex _lock; + std::unordered_map> _cache; }; class HdfsFileReader : public FileReader { @@ -110,31 +110,29 @@ public: HdfsFileReader(const THdfsParams& hdfs_params, const std::string& path, int64_t start_offset); HdfsFileReader(const std::map& properties, const std::string& path, int64_t start_offset); - virtual ~HdfsFileReader(); + ~HdfsFileReader() override; - virtual Status open() override; + Status open() override; // Read content to 'buf', 'buf_len' is the max size of this buffer. // Return ok when read success, and 'buf_len' is set to size of read content // If reach to end of file, the eof is set to true. meanwhile 'buf_len' // is set to zero. - virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override; - virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, - void* out) override; - virtual Status read_one_message(std::unique_ptr* buf, int64_t* length) override; - virtual int64_t size() override; - virtual Status seek(int64_t position) override; - virtual Status tell(int64_t* position) override; - virtual void close() override; - virtual bool closed() override; + Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override; + Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; + Status read_one_message(std::unique_ptr* buf, int64_t* length) override; + int64_t size() override; + Status seek(int64_t position) override; + Status tell(int64_t* position) override; + void close() override; + bool closed() override; private: void _parse_properties(const std::map& prop); -private: THdfsParams _hdfs_params; - std::string _namenode = ""; - std::string _path = ""; + std::string _namenode; + std::string _path; int64_t _current_offset; int64_t _file_size; hdfsFS _hdfs_fs; diff --git a/be/src/io/hdfs_reader_writer.cpp b/be/src/io/hdfs_reader_writer.cpp index 9904c7d6ad..3cbe2f4436 100644 --- a/be/src/io/hdfs_reader_writer.cpp +++ b/be/src/io/hdfs_reader_writer.cpp @@ -41,5 +41,4 @@ Status HdfsReaderWriter::create_writer(const std::map& writer.reset(new HDFSWriter(properties, path)); return Status::OK(); } - } // namespace doris diff --git a/be/src/io/hdfs_reader_writer.h b/be/src/io/hdfs_reader_writer.h index 1083bc7ca9..3b27b22656 100644 --- a/be/src/io/hdfs_reader_writer.h +++ b/be/src/io/hdfs_reader_writer.h @@ -23,12 +23,7 @@ namespace doris { -// This class is used to create hdfs readers and writers. 
-// Because libhdfs3 does not support the arm64 environment, -// we use this class to shield the upper layer from the need to deal with the platform environment -// when creating a raeder or writer. -// -// If in the arm64 environment, creating a reader or writer through this class will return an error. +// TODO(ftw): This file should be deleted when new_file_factory.h replace file_factory.h class HdfsReaderWriter { public: static Status create_reader(const THdfsParams& hdfs_params, const std::string& path, @@ -40,5 +35,4 @@ public: static Status create_writer(const std::map& properties, const std::string& path, std::unique_ptr& writer); }; - } // namespace doris diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 51c52d4ab1..d3bebc7eb0 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -170,6 +170,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(upload_fail_count, MetricUnit::ROWSETS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_reader_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_reader_total, MetricUnit::FILESYSTEM); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(hdfs_file_reader_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_writer_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_writer_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_created_total, MetricUnit::FILESYSTEM); @@ -181,6 +182,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_bytes_written_total, MetricUnit::FILESYS DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_reading, MetricUnit::FILESYSTEM); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_reading, MetricUnit::FILESYSTEM); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(hdfs_file_open_reading, MetricUnit::FILESYSTEM); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_writing, MetricUnit::FILESYSTEM); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing, MetricUnit::FILESYSTEM); @@ -298,6 +300,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_reader_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_reader_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, hdfs_file_reader_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_writer_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_writer_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, file_created_total); @@ -308,6 +311,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_bytes_written_total); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_reading); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_reading); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, hdfs_file_open_reading); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_writing); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_writing); } diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index a14922f6e1..5fcb4d8ba3 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -154,6 +154,7 @@ public: // Metrics related with file reader/writer IntCounter* local_file_reader_total; IntCounter* s3_file_reader_total; + IntCounter* hdfs_file_reader_total; IntCounter* local_file_writer_total; IntCounter* s3_file_writer_total; IntCounter* file_created_total; @@ -164,6 +165,7 @@ public: 
IntCounter* s3_bytes_written_total; IntGauge* local_file_open_reading; IntGauge* s3_file_open_reading; + IntGauge* hdfs_file_open_reading; IntGauge* local_file_open_writing; IntGauge* s3_file_open_writing; diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp index f9b67e57d7..d3dbb90ca5 100644 --- a/be/src/util/s3_util.cpp +++ b/be/src/util/s3_util.cpp @@ -23,6 +23,7 @@ #include "common/config.h" #include "common/logging.h" +#include "s3_uri.h" namespace doris { @@ -169,7 +170,42 @@ std::shared_ptr ClientFactory::create(const S3Conf& s3_conf) } return std::make_shared( std::move(aws_cred), std::move(aws_config), - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never); + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + s3_conf.use_virtual_addressing); +} + +Status ClientFactory::convert_properties_to_s3_conf(const std::map& prop, + const S3URI& s3_uri, S3Conf* s3_conf) { + if (!is_s3_conf_valid(prop)) { + return Status::InvalidArgument("S3 properties are incorrect, please check properties."); + } + StringCaseMap properties(prop.begin(), prop.end()); + s3_conf->ak = properties.find(S3_AK)->second; + s3_conf->sk = properties.find(S3_SK)->second; + s3_conf->endpoint = properties.find(S3_ENDPOINT)->second; + s3_conf->region = properties.find(S3_REGION)->second; + + if (properties.find(S3_MAX_CONN_SIZE) != properties.end()) { + s3_conf->max_connections = std::atoi(properties.find(S3_MAX_CONN_SIZE)->second.c_str()); + } + if (properties.find(S3_REQUEST_TIMEOUT_MS) != properties.end()) { + s3_conf->request_timeout_ms = + std::atoi(properties.find(S3_REQUEST_TIMEOUT_MS)->second.c_str()); + } + if (properties.find(S3_CONN_TIMEOUT_MS) != properties.end()) { + s3_conf->connect_timeout_ms = + std::atoi(properties.find(S3_CONN_TIMEOUT_MS)->second.c_str()); + } + s3_conf->bucket = s3_uri.get_bucket(); + s3_conf->prefix = ""; + + // See https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html + s3_conf->use_virtual_addressing = true; + if (properties.find(USE_PATH_STYLE) != properties.end()) { + s3_conf->use_virtual_addressing = + properties.find(USE_PATH_STYLE)->second == "true" ? 
false : true; + } + return Status::OK(); } } // end namespace doris diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h index 74c723bbec..5ef7eb83a6 100644 --- a/be/src/util/s3_util.h +++ b/be/src/util/s3_util.h @@ -23,6 +23,8 @@ #include #include +#include "common/status.h" + namespace Aws { namespace S3 { class S3Client; @@ -31,6 +33,8 @@ class S3Client; namespace doris { +class S3URI; + const static std::string S3_AK = "AWS_ACCESS_KEY"; const static std::string S3_SK = "AWS_SECRET_KEY"; const static std::string S3_ENDPOINT = "AWS_ENDPOINT"; @@ -49,6 +53,7 @@ struct S3Conf { int max_connections = -1; int request_timeout_ms = -1; int connect_timeout_ms = -1; + bool use_virtual_addressing = true; std::string to_string() const; }; @@ -58,7 +63,8 @@ inline std::string S3Conf::to_string() const { ss << "ak: " << ak << ", sk: " << sk << ", endpoint: " << endpoint << ", region: " << region << ", bucket: " << bucket << ", prefix: " << prefix << ", max_connections: " << max_connections << ", request_timeout_ms: " << request_timeout_ms - << ", connect_timeout_ms: " << connect_timeout_ms; + << ", connect_timeout_ms: " << connect_timeout_ms + << ", use_virtual_addressing: " << use_virtual_addressing; return ss.str(); } @@ -76,6 +82,9 @@ public: static bool is_s3_conf_valid(const S3Conf& s3_conf); + static Status convert_properties_to_s3_conf(const std::map& prop, + const S3URI& s3_uri, S3Conf* s3_conf); + private: ClientFactory(); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 0cc83168ac..8dfd11ade4 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -281,7 +281,10 @@ set(VEC_FILES exec/format/orc/vorc_reader.cpp exec/format/json/new_json_reader.cpp exec/format/table/table_format_reader.cpp - exec/format/table/iceberg_reader.cpp) + exec/format/table/iceberg_reader.cpp + exec/format/file_reader/new_file_factory.cpp + exec/format/file_reader/new_plain_text_line_reader.cpp + ) add_library(Vec STATIC ${VEC_FILES} diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.cpp b/be/src/vec/exec/format/file_reader/new_file_factory.cpp new file mode 100644 index 0000000000..23d6fa4eab --- /dev/null +++ b/be/src/vec/exec/format/file_reader/new_file_factory.cpp @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/format/file_reader/new_file_factory.h" + +#include "io/broker_reader.h" +#include "io/broker_writer.h" +#include "io/buffered_reader.h" +#include "io/fs/file_system.h" +#include "io/fs/hdfs_file_system.h" +#include "io/fs/s3_file_system.h" +#include "io/hdfs_file_reader.h" +#include "io/hdfs_writer.h" +#include "io/local_file_reader.h" +#include "io/local_file_writer.h" +#include "io/s3_reader.h" +#include "io/s3_writer.h" +#include "runtime/exec_env.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "util/s3_util.h" + +namespace doris { + +Status NewFileFactory::create_file_writer(TFileType::type type, ExecEnv* env, + const std::vector& broker_addresses, + const std::map& properties, + const std::string& path, int64_t start_offset, + std::unique_ptr& file_writer) { + switch (type) { + case TFileType::FILE_LOCAL: { + file_writer.reset(new LocalFileWriter(path, start_offset)); + break; + } + case TFileType::FILE_BROKER: { + file_writer.reset(new BrokerWriter(env, broker_addresses, properties, path, start_offset)); + break; + } + case TFileType::FILE_S3: { + file_writer.reset(new S3Writer(properties, path, start_offset)); + break; + } + case TFileType::FILE_HDFS: { + RETURN_IF_ERROR(create_hdfs_writer( + const_cast&>(properties), path, file_writer)); + break; + } + default: + return Status::InternalError("unsupported file writer type: {}", std::to_string(type)); + } + + return Status::OK(); +} + +// ============================ +// broker scan node/unique ptr +Status NewFileFactory::create_file_reader(TFileType::type type, ExecEnv* env, + RuntimeProfile* profile, + const std::vector& broker_addresses, + const std::map& properties, + const TBrokerRangeDesc& range, int64_t start_offset, + std::unique_ptr& file_reader) { + FileReader* file_reader_ptr; + switch (type) { + case TFileType::FILE_LOCAL: { + file_reader_ptr = new LocalFileReader(range.path, start_offset); + break; + } + case TFileType::FILE_BROKER: { + file_reader_ptr = new BufferedReader( + profile, + new BrokerReader(env, broker_addresses, properties, range.path, start_offset, + range.__isset.file_size ? 
range.file_size : 0)); + break; + } + case TFileType::FILE_S3: { + file_reader_ptr = + new BufferedReader(profile, new S3Reader(properties, range.path, start_offset)); + break; + } + case TFileType::FILE_HDFS: { + FileReader* hdfs_reader = nullptr; + RETURN_IF_ERROR( + create_hdfs_reader(range.hdfs_params, range.path, start_offset, &hdfs_reader)); + file_reader_ptr = new BufferedReader(profile, hdfs_reader); + break; + } + default: + return Status::InternalError("unsupported file reader type: " + std::to_string(type)); + } + file_reader.reset(file_reader_ptr); + + return Status::OK(); +} + +// ============================ +// file scan node/unique ptr +Status NewFileFactory::create_file_reader(RuntimeProfile* /*profile*/, + const FileSystemProperties& system_properties, + const FileDescription& file_description, + std::unique_ptr<io::FileSystem>* file_system, + io::FileReaderSPtr* file_reader) { + TFileType::type type = system_properties.system_type; + io::FileSystem* file_system_ptr = nullptr; + switch (type) { + case TFileType::FILE_S3: { + RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description.path, + &file_system_ptr, file_reader)); + break; + } + case TFileType::FILE_HDFS: { + RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description.path, + &file_system_ptr, file_reader)); + break; + } + default: + return Status::NotSupported("unsupported file reader type: {}", std::to_string(type)); + } + file_system->reset(file_system_ptr); + return Status::OK(); +} + +// file scan node/stream load pipe +Status NewFileFactory::create_pipe_reader(const TUniqueId& load_id, + std::shared_ptr<FileReader>& file_reader) { + file_reader = ExecEnv::GetInstance()->load_stream_mgr()->get(load_id); + if (!file_reader) { + return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); + } + return Status::OK(); +} + +Status NewFileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, + int64_t start_offset, FileReader** reader) { + *reader = new HdfsFileReader(hdfs_params, path, start_offset); + return Status::OK(); +} + +Status NewFileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, + io::FileSystem** hdfs_file_system, + io::FileReaderSPtr* reader) { + *hdfs_file_system = new io::HdfsFileSystem(hdfs_params, path); + RETURN_IF_ERROR(dynamic_cast<io::HdfsFileSystem*>(*hdfs_file_system)->connect()); + RETURN_IF_ERROR((*hdfs_file_system)->open_file(path, reader)); + return Status::OK(); +} + +Status NewFileFactory::create_hdfs_writer(const std::map<std::string, std::string>& properties, + const std::string& path, + std::unique_ptr<FileWriter>& writer) { + writer.reset(new HDFSWriter(properties, path)); + return Status::OK(); +} + +Status NewFileFactory::create_s3_reader(const std::map<std::string, std::string>& prop, + const std::string& path, io::FileSystem** s3_file_system, + io::FileReaderSPtr* reader) { + S3URI s3_uri(path); + if (!s3_uri.parse()) { + return Status::InvalidArgument("s3 uri is invalid: {}", path); + } + S3Conf s3_conf; + RETURN_IF_ERROR(ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, &s3_conf)); + *s3_file_system = new io::S3FileSystem(s3_conf, ""); + RETURN_IF_ERROR(dynamic_cast<io::S3FileSystem*>(*s3_file_system)->connect()); + RETURN_IF_ERROR((*s3_file_system)->open_file(s3_uri.get_key(), reader)); + return Status::OK(); +} +} // namespace doris diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.h b/be/src/vec/exec/format/file_reader/new_file_factory.h new file mode 100644 index 0000000000..82fc1fe6cf --- /dev/null +++ b/be/src/vec/exec/format/file_reader/new_file_factory.h @@ -0,0 +1,111 @@ +// Licensed to the
Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "io/file_reader.h" +#include "io/file_writer.h" +#include "io/fs/file_reader.h" + +namespace doris { +namespace io { +class FileSystem; +} + +class ExecEnv; +class TNetworkAddress; +class RuntimeProfile; + +struct FileSystemProperties { + TFileType::type system_type; + std::map<std::string, std::string> properties; + THdfsParams hdfs_params; + std::vector<TNetworkAddress> broker_addresses; +}; + +struct FileDescription { + std::string path; + int64_t start_offset; + size_t file_size; + size_t buffer_size; +}; + +class NewFileFactory { +public: + // Create FileWriter + static Status create_file_writer(TFileType::type type, ExecEnv* env, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, + const std::string& path, int64_t start_offset, + std::unique_ptr<FileWriter>& file_writer); + + /** + * Create FileReader for broker scan node related scanners and readers + */ + static Status create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, + const TBrokerRangeDesc& range, int64_t start_offset, + std::unique_ptr<FileReader>& file_reader); + /** + * Create FileReader for file scan node related scanners and readers + * If buffer_size > 0, use BufferedReader to wrap the underlying FileReader; + * Otherwise, return the underlying FileReader directly.
+ */ + static Status create_file_reader(RuntimeProfile* profile, + const FileSystemProperties& system_properties, + const FileDescription& file_description, + std::unique_ptr* file_system, + io::FileReaderSPtr* file_reader); + + // Create FileReader for stream load pipe + static Status create_pipe_reader(const TUniqueId& load_id, + std::shared_ptr& file_reader); + + // TODO(ftw): should be delete after new_hdfs_file_reader ready + static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, + int64_t start_offset, FileReader** reader); + + static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, + io::FileSystem** hdfs_file_system, io::FileReaderSPtr* reader); + + // TODO(ftw): should be delete after new_hdfs_file_writer ready + static Status create_hdfs_writer(const std::map& properties, + const std::string& path, std::unique_ptr& writer); + + static Status create_s3_reader(const std::map& prop, + const std::string& path, io::FileSystem** s3_file_system, + io::FileReaderSPtr* reader); + + static TFileType::type convert_storage_type(TStorageBackendType::type type) { + switch (type) { + case TStorageBackendType::LOCAL: + return TFileType::FILE_LOCAL; + case TStorageBackendType::S3: + return TFileType::FILE_S3; + case TStorageBackendType::BROKER: + return TFileType::FILE_BROKER; + case TStorageBackendType::HDFS: + return TFileType::FILE_HDFS; + default: + LOG(FATAL) << "not match type to convert, from type:" << type; + } + __builtin_unreachable(); + } +}; +} // namespace doris diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp new file mode 100644 index 0000000000..56bc26c648 --- /dev/null +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp @@ -0,0 +1,348 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/format/file_reader/new_plain_text_line_reader.h" + +#include "common/status.h" +#include "exec/decompressor.h" +#include "io/fs/file_reader.h" +#include "olap/iterators.h" + +// INPUT_CHUNK must +// larger than 15B for correct lz4 file decompressing +// larger than 300B for correct lzo header decompressing +#define INPUT_CHUNK (2 * 1024 * 1024) +// #define INPUT_CHUNK (34) +#define OUTPUT_CHUNK (8 * 1024 * 1024) +// #define OUTPUT_CHUNK (32) +// leave these 2 size small for debugging + +namespace doris { + +NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile, io::FileReader* file_reader, + Decompressor* decompressor, size_t length, + const std::string& line_delimiter, + size_t line_delimiter_length, size_t current_offset) + : _profile(profile), + _file_reader(file_reader), + _decompressor(decompressor), + _min_length(length), + _total_read_bytes(0), + _line_delimiter(line_delimiter), + _line_delimiter_length(line_delimiter_length), + _input_buf(new uint8_t[INPUT_CHUNK]), + _input_buf_size(INPUT_CHUNK), + _input_buf_pos(0), + _input_buf_limit(0), + _output_buf(new uint8_t[OUTPUT_CHUNK]), + _output_buf_size(OUTPUT_CHUNK), + _output_buf_pos(0), + _output_buf_limit(0), + _file_eof(false), + _eof(false), + _stream_end(true), + _more_input_bytes(0), + _more_output_bytes(0), + _current_offset(current_offset), + _bytes_read_counter(nullptr), + _read_timer(nullptr), + _bytes_decompress_counter(nullptr), + _decompress_timer(nullptr) { + _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + _bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed", TUnit::BYTES); + _decompress_timer = ADD_TIMER(_profile, "DecompressTime"); +} + +NewPlainTextLineReader::~NewPlainTextLineReader() { + close(); +} + +void NewPlainTextLineReader::close() { + if (_input_buf != nullptr) { + delete[] _input_buf; + _input_buf = nullptr; + } + + if (_output_buf != nullptr) { + delete[] _output_buf; + _output_buf = nullptr; + } +} + +inline bool NewPlainTextLineReader::update_eof() { + if (done()) { + _eof = true; + } else if (_decompressor == nullptr && (_min_length >= 0 && _total_read_bytes >= _min_length)) { + _eof = true; + } + return _eof; +} + +uint8_t* NewPlainTextLineReader::update_field_pos_and_find_line_delimiter(const uint8_t* start, + size_t len) { + // TODO: meanwhile find and save field pos + return (uint8_t*)memmem(start, len, _line_delimiter.c_str(), _line_delimiter_length); +} + +// extend input buf if necessary only when _more_input_bytes > 0 +void NewPlainTextLineReader::extend_input_buf() { + DCHECK(_more_input_bytes > 0); + + // left capacity + size_t capacity = _input_buf_size - _input_buf_limit; + + // we want at least _more_input_bytes capacity left + do { + if (capacity >= _more_input_bytes) { + // enough + break; + } + + capacity = capacity + _input_buf_pos; + if (capacity >= _more_input_bytes) { + // move the read remaining to the beginning of the current input buf, + memmove(_input_buf, _input_buf + _input_buf_pos, input_buf_read_remaining()); + _input_buf_limit -= _input_buf_pos; + _input_buf_pos = 0; + break; + } + + while (_input_buf_size - input_buf_read_remaining() < _more_input_bytes) { + _input_buf_size = _input_buf_size * 2; + } + + uint8_t* new_input_buf = new uint8_t[_input_buf_size]; + memmove(new_input_buf, _input_buf + _input_buf_pos, input_buf_read_remaining()); + delete[] _input_buf; + + _input_buf = new_input_buf; + _input_buf_limit -= _input_buf_pos; + 
_input_buf_pos = 0; + } while (false); + + // LOG(INFO) << "extend input buf." + // << " input_buf_size: " << _input_buf_size + // << " input_buf_pos: " << _input_buf_pos + // << " input_buf_limit: " << _input_buf_limit; +} + +void NewPlainTextLineReader::extend_output_buf() { + // left capacity + size_t capacity = _output_buf_size - _output_buf_limit; + // we want at least 1024 bytes capacity left + size_t target = std::max(1024, capacity + _more_output_bytes); + + do { + // 1. if left capacity is enough, return; + if (capacity >= target) { + break; + } + + // 2. try reuse buf + capacity = capacity + _output_buf_pos; + if (capacity >= target) { + // move the read remaining to the beginning of the current output buf, + memmove(_output_buf, _output_buf + _output_buf_pos, output_buf_read_remaining()); + _output_buf_limit -= _output_buf_pos; + _output_buf_pos = 0; + break; + } + + // 3. extend buf size to meet the target + while (_output_buf_size - output_buf_read_remaining() < target) { + _output_buf_size = _output_buf_size * 2; + } + + uint8_t* new_output_buf = new uint8_t[_output_buf_size]; + memmove(new_output_buf, _output_buf + _output_buf_pos, output_buf_read_remaining()); + delete[] _output_buf; + + _output_buf = new_output_buf; + _output_buf_limit -= _output_buf_pos; + _output_buf_pos = 0; + } while (false); + + // LOG(INFO) << "extend output buf." + // << " output_buf_size: " << _output_buf_size + // << " output_buf_pos: " << _output_buf_pos + // << " output_buf_limit: " << _output_buf_limit; +} + +Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) { + if (_eof || update_eof()) { + *size = 0; + *eof = true; + return Status::OK(); + } + int found_line_delimiter = 0; + size_t offset = 0; + while (!done()) { + // find line delimiter in current decompressed data + uint8_t* cur_ptr = _output_buf + _output_buf_pos; + uint8_t* pos = update_field_pos_and_find_line_delimiter( + cur_ptr + offset, output_buf_read_remaining() - offset); + + if (pos == nullptr) { + // didn't find line delimiter, read more data from decompressor + // for multi bytes delimiter we cannot set offset to avoid incomplete + // delimiter + // read from file reader + offset = output_buf_read_remaining(); + extend_output_buf(); + if ((_input_buf_limit > _input_buf_pos) && _more_input_bytes == 0) { + // we still have data in input which is not decompressed. + // and no more data is required for input + } else { + size_t read_len = 0; + int64_t buffer_len = 0; + uint8_t* file_buf; + + if (_decompressor == nullptr) { + // uncompressed file, read directly into output buf + file_buf = _output_buf + _output_buf_limit; + buffer_len = _output_buf_size - _output_buf_limit; + } else { + // MARK + if (_more_input_bytes > 0) { + // we already extend input buf. + // current data in input buf should remain unchanged + file_buf = _input_buf + _input_buf_limit; + buffer_len = _input_buf_size - _input_buf_limit; + // leave input pos and limit unchanged + } else { + // here we are sure that all data in input buf has been consumed. + // which means input pos and limit should be reset. 
+ file_buf = _input_buf; + buffer_len = _input_buf_size; + // reset input pos and limit + _input_buf_pos = 0; + _input_buf_limit = 0; + } + } + + { + SCOPED_TIMER(_read_timer); + Slice file_slice(file_buf, buffer_len); + IOContext io_ctx; + RETURN_IF_ERROR( + _file_reader->read_at(_current_offset, file_slice, io_ctx, &read_len)); + COUNTER_UPDATE(_bytes_read_counter, read_len); + } + // LOG(INFO) << "after read file: _file_eof: " << _file_eof << " read_len: " << read_len; + if (_file_eof || read_len == 0) { + if (!_stream_end) { + return Status::InternalError( + "Compressed file has been truncated, which is not allowed"); + } else { + // last loop we meet stream end, + // and now we finished reading file, so we are finished + // break this loop to see if there is data in buffer + break; + } + } + + if (_decompressor == nullptr) { + _output_buf_limit += read_len; + _stream_end = true; + } else { + // only update input limit. + // input pos is set at MARK step + _input_buf_limit += read_len; + } + + if (read_len < _more_input_bytes) { + // we failed to read enough data, continue to read from file + _more_input_bytes = _more_input_bytes - read_len; + continue; + } + } + + if (_decompressor != nullptr) { + SCOPED_TIMER(_decompress_timer); + // decompress + size_t input_read_bytes = 0; + size_t decompressed_len = 0; + _more_input_bytes = 0; + _more_output_bytes = 0; + RETURN_IF_ERROR(_decompressor->decompress( + _input_buf + _input_buf_pos, /* input */ + _input_buf_limit - _input_buf_pos, /* input_len */ + &input_read_bytes, _output_buf + _output_buf_limit, /* output */ + _output_buf_size - _output_buf_limit, /* output_max_len */ + &decompressed_len, &_stream_end, &_more_input_bytes, &_more_output_bytes)); + + // LOG(INFO) << "after decompress:" + // << " stream_end: " << _stream_end + // << " input_read_bytes: " << input_read_bytes + // << " decompressed_len: " << decompressed_len + // << " more_input_bytes: " << _more_input_bytes + // << " more_output_bytes: " << _more_output_bytes; + + // update pos and limit + _input_buf_pos += input_read_bytes; + _output_buf_limit += decompressed_len; + COUNTER_UPDATE(_bytes_decompress_counter, decompressed_len); + + // TODO(cmy): watch this case + if ((input_read_bytes == 0 /*decompressed_len == 0*/) && _more_input_bytes == 0 && + _more_output_bytes == 0) { + // decompress made no progress, may be + // A. input data is not enough to decompress data to output + // B. output buf is too small to save decompressed output + // this is very unlikely to happen + // print the log and just go to next loop to read more data or extend output buf. + + // (cmy), for now, return failed to avoid potential endless loop + std::stringstream ss; + ss << "decompress made no progress." 
+ << " input_read_bytes: " << input_read_bytes + << " decompressed_len: " << decompressed_len; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + + if (_more_input_bytes > 0) { + extend_input_buf(); + } + } + } else { + // we found a complete line + // ready to return + offset = pos - cur_ptr; + found_line_delimiter = _line_delimiter_length; + break; + } + } // while (!done()) + + *ptr = _output_buf + _output_buf_pos; + *size = offset; + + // Skip offset and _line_delimiter size; + _output_buf_pos += offset + found_line_delimiter; + if (offset == 0 && found_line_delimiter == 0) { + *eof = true; + } else { + *eof = false; + } + + // update total read bytes + _total_read_bytes += *size + found_line_delimiter; + + return Status::OK(); +} +} // namespace doris diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h new file mode 100644 index 0000000000..a39e578577 --- /dev/null +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/line_reader.h" +#include "util/runtime_profile.h" + +namespace doris { +namespace io { +class FileReader; +} + +class Decompressor; +class Status; + +class NewPlainTextLineReader : public LineReader { +public: + NewPlainTextLineReader(RuntimeProfile* profile, io::FileReader* file_reader, + Decompressor* decompressor, size_t length, + const std::string& line_delimiter, size_t line_delimiter_length, + size_t current_offset); + + ~NewPlainTextLineReader() override; + + Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override; + + void close() override; + +private: + bool update_eof(); + + size_t output_buf_read_remaining() const { return _output_buf_limit - _output_buf_pos; } + + size_t input_buf_read_remaining() const { return _input_buf_limit - _input_buf_pos; } + + bool done() { return _file_eof && output_buf_read_remaining() == 0; } + + // find line delimiter from 'start' to 'start' + len, + // return line delimiter pos if found, otherwise return nullptr. + // TODO: + // save to positions of field separator + uint8_t* update_field_pos_and_find_line_delimiter(const uint8_t* start, size_t len); + + void extend_input_buf(); + void extend_output_buf(); + + RuntimeProfile* _profile; + io::FileReader* _file_reader; + Decompressor* _decompressor; + // the min length that should be read. 
+ // -1 means endless(for stream load) + // and only valid if the content is uncompressed + size_t _min_length; + size_t _total_read_bytes; + std::string _line_delimiter; + size_t _line_delimiter_length; + + // save the data read from file reader + uint8_t* _input_buf; + size_t _input_buf_size; + size_t _input_buf_pos; + size_t _input_buf_limit; + + // save the data decompressed from decompressor. + uint8_t* _output_buf; + size_t _output_buf_size; + size_t _output_buf_pos; + size_t _output_buf_limit; + + bool _file_eof; + bool _eof; + bool _stream_end; + size_t _more_input_bytes; + size_t _more_output_bytes; + + size_t _current_offset; + + // Profile counters + RuntimeProfile::Counter* _bytes_read_counter; + RuntimeProfile::Counter* _read_timer; + RuntimeProfile::Counter* _bytes_decompress_counter; + RuntimeProfile::Counter* _decompress_timer; +}; +} // namespace doris
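
For context, the sketch below (not part of the patch) shows how the new io::HdfsFileSystem and io::HdfsFileReader introduced above are expected to fit together. The helper name read_hdfs_head, the 4 KB probe size, and the way THdfsParams is obtained are illustrative assumptions; the calls themselves (connect, open_file, read_at, close) come straight from the interfaces added in this change.

#include "common/status.h"
#include "io/fs/hdfs_file_system.h"
#include "olap/iterators.h"

namespace doris {

// Hypothetical helper: connect to HDFS, open one file, and read its first bytes.
Status read_hdfs_head(const THdfsParams& params, const std::string& path) {
    io::HdfsFileSystem fs(params, path);
    RETURN_IF_ERROR(fs.connect()); // borrows a (possibly cached) hdfsFS handle

    io::FileReaderSPtr reader;
    RETURN_IF_ERROR(fs.open_file(path, &reader));

    char buf[4096];
    size_t bytes_read = 0;
    IOContext io_ctx;
    RETURN_IF_ERROR(reader->read_at(0, Slice(buf, sizeof(buf)), io_ctx, &bytes_read));
    return reader->close(); // also invoked by the reader's destructor
}

} // namespace doris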