[feature-wip](file-reader) Merge hdfs reader to the new file reader (#14875)

Author: Tiewei Fang (committed by GitHub)
Date: 2022-12-09 13:21:59 +08:00
Parent: 20f2abb3d4
Commit: 00f44257e2
20 changed files with 1396 additions and 34 deletions

View File

@ -41,6 +41,8 @@ set(IO_FILES
fs/s3_file_reader.cpp
fs/s3_file_system.cpp
fs/s3_file_writer.cpp
fs/hdfs_file_system.cpp
fs/hdfs_file_reader.cpp
cache/dummy_file_cache.cpp
cache/file_cache.cpp
cache/file_cache_manager.cpp

View File

@ -37,6 +37,7 @@ using ResourceId = std::string;
enum class FileSystemType : uint8_t {
LOCAL,
S3,
HDFS,
};
class FileSystem {

View File

@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/hdfs_file_reader.h"
#include "io/fs/hdfs_file_system.h"
#include "service/backend_options.h"
#include "util/doris_metrics.h"
namespace doris {
namespace io {
HdfsFileReader::HdfsFileReader(Path path, size_t file_size, const std::string& name_node,
hdfsFile hdfs_file, HdfsFileSystem* fs)
: _path(std::move(path)),
_file_size(file_size),
_name_node(name_node),
_hdfs_file(hdfs_file),
_fs(fs) {
DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
DorisMetrics::instance()->hdfs_file_reader_total->increment(1);
}
HdfsFileReader::~HdfsFileReader() {
close();
}
Status HdfsFileReader::close() {
bool expected = false;
if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
auto handle = _fs->get_handle();
auto hdfs_fs = handle->hdfs_fs;
if (_hdfs_file != nullptr && hdfs_fs != nullptr) {
VLOG_NOTICE << "close hdfs file: " << _name_node << _path;
// If the hdfs file was valid, the memory associated with it will
// be freed at the end of this call, even if there was an I/O error
hdfsCloseFile(hdfs_fs, _hdfs_file);
}
DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
}
return Status::OK();
}
Status HdfsFileReader::read_at(size_t offset, Slice result, const IOContext& /*io_ctx*/,
size_t* bytes_read) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
offset, _file_size, _path.native());
}
size_t bytes_req = result.size;
char* to = result.data;
bytes_req = std::min(bytes_req, _file_size - offset);
if (UNLIKELY(bytes_req == 0)) {
*bytes_read = 0;
return Status::OK();
}
auto handle = _fs->get_handle();
int64_t has_read = 0;
while (has_read < bytes_req) {
int64_t loop_read =
hdfsRead(handle->hdfs_fs, _hdfs_file, to + has_read, bytes_req - has_read);
if (loop_read < 0) {
return Status::InternalError(
"Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(), _name_node, _path.string(),
hdfsGetLastError());
}
if (loop_read == 0) {
break;
}
has_read += loop_read;
}
*bytes_read = has_read;
return Status::OK();
}
} // namespace io
} // namespace doris
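
For context, a minimal usage sketch of the reader added above (illustrative only, not part of the commit): it assumes a reachable namenode, a populated THdfsParams, code placed in namespace doris, and the headers the diff itself uses (IOContext comes from olap/iterators.h). The helper name, URI, and buffer size are placeholders.

#include "io/fs/hdfs_file_system.h"
#include "olap/iterators.h" // IOContext

// Hypothetical helper: open an HDFS file through the new FileSystem API and read its first bytes.
Status read_head(const THdfsParams& params, const std::string& uri) {
    io::HdfsFileSystem fs(params, uri);
    RETURN_IF_ERROR(fs.connect());

    io::FileReaderSPtr reader;
    RETURN_IF_ERROR(fs.open_file(uri, &reader));

    char buf[4096];
    size_t bytes_read = 0;
    IOContext io_ctx;
    RETURN_IF_ERROR(reader->read_at(0, Slice(buf, sizeof(buf)), io_ctx, &bytes_read));
    LOG(INFO) << "read " << bytes_read << " bytes from " << reader->path().native();
    return reader->close();
}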

View File

@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "io/fs/file_reader.h"
#include "io/fs/hdfs_file_system.h"
namespace doris {
namespace io {
class HdfsFileReader : public FileReader {
public:
HdfsFileReader(Path path, size_t file_size, const std::string& name_node, hdfsFile hdfs_file,
HdfsFileSystem* fs);
~HdfsFileReader() override;
Status close() override;
Status read_at(size_t offset, Slice result, const IOContext& io_ctx,
size_t* bytes_read) override;
const Path& path() const override { return _path; }
size_t size() const override { return _file_size; }
bool closed() const override { return _closed.load(std::memory_order_acquire); }
private:
Path _path;
size_t _file_size;
const std::string& _name_node;
hdfsFile _hdfs_file;
HdfsFileSystem* _fs;
std::atomic<bool> _closed = false;
};
} // namespace io
} // namespace doris

View File

@ -0,0 +1,302 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/hdfs_file_system.h"
#include "gutil/hash/hash.h"
#include "io/fs/hdfs_file_reader.h"
#include "io/hdfs_builder.h"
#include "service/backend_options.h"
namespace doris {
namespace io {
#ifndef CHECK_HDFS_HANDLE
#define CHECK_HDFS_HANDLE(handle) \
if (!handle) { \
return Status::InternalError("init Hdfs handle error"); \
}
#endif
// Cache for HdfsFileSystemHandle
class HdfsFileSystemCache {
public:
static int MAX_CACHE_HANDLE;
static HdfsFileSystemCache* instance() {
static HdfsFileSystemCache s_instance;
return &s_instance;
}
HdfsFileSystemCache(const HdfsFileSystemCache&) = delete;
const HdfsFileSystemCache& operator=(const HdfsFileSystemCache&) = delete;
// This function is thread-safe
Status get_connection(const THdfsParams& hdfs_params, HdfsFileSystemHandle** fs_handle);
private:
std::mutex _lock;
std::unordered_map<uint64, std::unique_ptr<HdfsFileSystemHandle>> _cache;
HdfsFileSystemCache() = default;
uint64 _hdfs_hash_code(const THdfsParams& hdfs_params);
Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs);
void _clean_invalid();
void _clean_oldest();
};
HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path)
: RemoteFileSystem(path, "", FileSystemType::HDFS),
_hdfs_params(hdfs_params),
_path(path),
_fs_handle(nullptr) {
_namenode = _hdfs_params.fs_name;
// If the format of _path is hdfs://ip:port/path, replace it with /path,
// because a path like hdfs://ip:port/path can't be used by libhdfs3.
if (_path.find(_namenode) != std::string::npos) {
_path = _path.substr(_namenode.size());
}
}
HdfsFileSystem::~HdfsFileSystem() {
if (_fs_handle && _fs_handle->from_cache) {
_fs_handle->dec_ref();
}
}
Status HdfsFileSystem::connect() {
RETURN_IF_ERROR(HdfsFileSystemCache::instance()->get_connection(_hdfs_params, &_fs_handle));
if (!_fs_handle) {
return Status::InternalError("failed to init Hdfs handle with, please check hdfs params.");
}
return Status::OK();
}
Status HdfsFileSystem::create_file(const Path& /*path*/, FileWriterPtr* /*writer*/) {
// auto handle = get_handle();
// CHECK_HDFS_HANDLE(handle);
// auto hdfs_file = hdfsOpenFile(handle->hdfs_fs, path.string().c_str(), O_WRONLY, 0, 0, 0);
// if (hdfs_file == nullptr) {
// return Status::InternalError("Failed to create file {}", path.string());
// }
// hdfsCloseFile(handle->hdfs_fs, hdfs_file);
// return Status::OK();
return Status::NotSupported("Currently not support to upload file to HDFS");
}
Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
CHECK_HDFS_HANDLE(_fs_handle);
size_t file_len = -1;
RETURN_IF_ERROR(file_size(path, &file_len));
auto hdfs_file = hdfsOpenFile(_fs_handle->hdfs_fs, path.string().c_str(), O_RDONLY, 0, 0, 0);
if (hdfs_file == nullptr) {
if (_fs_handle->from_cache) {
// hdfsFS may be disconnected if not used for a long time
_fs_handle->set_invalid();
_fs_handle->dec_ref();
// retry
RETURN_IF_ERROR(connect());
hdfs_file = hdfsOpenFile(_fs_handle->hdfs_fs, path.string().c_str(), O_RDONLY, 0, 0, 0);
if (hdfs_file == nullptr) {
return Status::InternalError(
"open file failed. (BE: {}) namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(), _namenode, path.string(),
hdfsGetLastError());
}
} else {
return Status::InternalError("open file failed. (BE: {}) namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(), _namenode, path.string(),
hdfsGetLastError());
}
}
*reader = std::make_shared<HdfsFileReader>(path, file_len, _namenode, hdfs_file, this);
return Status::OK();
}
Status HdfsFileSystem::delete_file(const Path& path) {
CHECK_HDFS_HANDLE(_fs_handle);
// The recursive argument `is_recursive` is irrelevant if path is a file.
int is_recursive = 0;
int res = hdfsDelete(_fs_handle->hdfs_fs, path.string().c_str(), is_recursive);
if (res == -1) {
return Status::InternalError("Failed to delete file {}", path.string());
}
return Status::OK();
}
Status HdfsFileSystem::create_directory(const Path& path) {
CHECK_HDFS_HANDLE(_fs_handle);
int res = hdfsCreateDirectory(_fs_handle->hdfs_fs, path.string().c_str());
if (res == -1) {
return Status::InternalError("Failed to create directory {}", path.string());
}
return Status::OK();
}
Status HdfsFileSystem::delete_directory(const Path& path) {
CHECK_HDFS_HANDLE(_fs_handle);
// delete in recursive mode
int is_recursive = 1;
int res = hdfsDelete(_fs_handle->hdfs_fs, path.string().c_str(), is_recursive);
if (res == -1) {
return Status::InternalError("Failed to delete directory {}", path.string());
}
return Status::OK();
}
Status HdfsFileSystem::exists(const Path& path, bool* res) const {
CHECK_HDFS_HANDLE(_fs_handle);
// hdfsExists returns 0 if the path exists and -1 otherwise
*res = (hdfsExists(_fs_handle->hdfs_fs, path.string().c_str()) == 0);
return Status::OK();
}
Status HdfsFileSystem::file_size(const Path& path, size_t* file_size) const {
CHECK_HDFS_HANDLE(_fs_handle);
hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handle->hdfs_fs, path.string().c_str());
if (file_info == nullptr) {
return Status::InternalError("Failed to get file size of {}", path.string());
}
*file_size = file_info->mSize;
hdfsFreeFileInfo(file_info, 1);
return Status::OK();
}
Status HdfsFileSystem::list(const Path& path, std::vector<Path>* files) {
CHECK_HDFS_HANDLE(_fs_handle);
int numEntries = 0;
hdfsFileInfo* file_info =
hdfsListDirectory(_fs_handle->hdfs_fs, path.string().c_str(), &numEntries);
if (file_info == nullptr) {
return Status::InternalError("Failed to list files/directors of {}", path.string());
}
for (int idx = 0; idx < numEntries; ++idx) {
files->emplace_back(file_info[idx].mName);
}
hdfsFreeFileInfo(file_info, numEntries);
return Status::OK();
}
HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
return _fs_handle;
}
// ************* HdfsFileSystemCache ******************
int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;
Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs) {
HDFSCommonBuilder builder = createHDFSBuilder(hdfs_params);
if (builder.is_need_kinit()) {
RETURN_IF_ERROR(builder.run_kinit());
}
hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
if (hdfs_fs == nullptr) {
return Status::InternalError("connect to hdfs failed. error: {}", hdfsGetLastError());
}
*fs = hdfs_fs;
return Status::OK();
}
void HdfsFileSystemCache::_clean_invalid() {
std::vector<uint64> removed_handle;
for (auto& item : _cache) {
if (item.second->invalid() && item.second->ref_cnt() == 0) {
removed_handle.emplace_back(item.first);
}
}
for (auto& handle : removed_handle) {
_cache.erase(handle);
}
}
void HdfsFileSystemCache::_clean_oldest() {
uint64_t oldest_time = ULONG_MAX;
uint64 oldest = 0;
for (auto& item : _cache) {
if (item.second->ref_cnt() == 0 && item.second->last_access_time() < oldest_time) {
oldest_time = item.second->last_access_time();
oldest = item.first;
}
}
_cache.erase(oldest);
}
Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params,
HdfsFileSystemHandle** fs_handle) {
uint64 hash_code = _hdfs_hash_code(hdfs_params);
{
std::lock_guard<std::mutex> l(_lock);
auto it = _cache.find(hash_code);
if (it != _cache.end()) {
HdfsFileSystemHandle* handle = it->second.get();
if (handle->invalid()) {
hdfsFS hdfs_fs = nullptr;
RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
*fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
} else {
handle->inc_ref();
*fs_handle = handle;
}
} else {
hdfsFS hdfs_fs = nullptr;
RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
if (_cache.size() >= MAX_CACHE_HANDLE) {
_clean_invalid();
_clean_oldest();
}
if (_cache.size() < MAX_CACHE_HANDLE) {
std::unique_ptr<HdfsFileSystemHandle> handle =
std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true);
handle->inc_ref();
*fs_handle = handle.get();
_cache[hash_code] = std::move(handle);
} else {
*fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
}
}
}
return Status::OK();
}
uint64 HdfsFileSystemCache::_hdfs_hash_code(const THdfsParams& hdfs_params) {
uint64 hash_code = 0;
if (hdfs_params.__isset.fs_name) {
hash_code += Fingerprint(hdfs_params.fs_name);
}
if (hdfs_params.__isset.user) {
hash_code += Fingerprint(hdfs_params.user);
}
if (hdfs_params.__isset.hdfs_kerberos_principal) {
hash_code += Fingerprint(hdfs_params.hdfs_kerberos_principal);
}
if (hdfs_params.__isset.hdfs_kerberos_keytab) {
hash_code += Fingerprint(hdfs_params.hdfs_kerberos_keytab);
}
if (hdfs_params.__isset.hdfs_conf) {
std::map<std::string, std::string> conf_map;
for (auto& conf : hdfs_params.hdfs_conf) {
conf_map[conf.key] = conf.value;
}
for (auto& conf : conf_map) {
hash_code += Fingerprint(conf.first);
hash_code += Fingerprint(conf.second);
}
}
return hash_code;
}
} // namespace io
} // namespace doris
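
The cache above keys connections on a fingerprint of the HDFS params, so file systems built from identical params share a single hdfsFS. A hedged sketch inside namespace doris (the namenode URI and user are placeholders; the __set_* calls are the standard thrift-generated setters):

Status shared_connection_demo() {
    THdfsParams params;
    params.__set_fs_name("hdfs://namenode:8020");
    params.__set_user("doris");

    io::HdfsFileSystem fs_a(params, "hdfs://namenode:8020/warehouse/a.csv");
    io::HdfsFileSystem fs_b(params, "hdfs://namenode:8020/warehouse/b.csv");
    RETURN_IF_ERROR(fs_a.connect());
    RETURN_IF_ERROR(fs_b.connect());
    // Both connect() calls hash the same params, so HdfsFileSystemCache hands out the
    // same HdfsFileSystemHandle and only one hdfsFS connection is created.
    DCHECK(fs_a.get_handle() == fs_b.get_handle());
    return Status::OK();
}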

View File

@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/PlanNodes_types.h>
#include <hdfs/hdfs.h>
#include "io/fs/remote_file_system.h"
namespace doris {
namespace io {
class HdfsFileSystemHandle {
public:
HdfsFileSystemHandle(hdfsFS fs, bool cached)
: hdfs_fs(fs), from_cache(cached), _ref_cnt(0), _last_access_time(0), _invalid(false) {}
~HdfsFileSystemHandle() {
DCHECK(_ref_cnt == 0);
if (hdfs_fs != nullptr) {
// Even if there is an error, the resources associated with the hdfsFS will be freed.
hdfsDisconnect(hdfs_fs);
}
hdfs_fs = nullptr;
}
int64_t last_access_time() { return _last_access_time; }
void inc_ref() {
_ref_cnt++;
_last_access_time = _now();
}
void dec_ref() {
_ref_cnt--;
_last_access_time = _now();
}
int ref_cnt() { return _ref_cnt; }
bool invalid() { return _invalid; }
void set_invalid() { _invalid = true; }
hdfsFS hdfs_fs;
// When the cache is full and all handles are in use, HdfsFileSystemCache will return an uncached handle.
// The client should delete such a handle itself.
const bool from_cache;
private:
// the number of clients referencing this handle
std::atomic<int> _ref_cnt;
// HdfsFileSystemCache tries to remove the oldest handle when the cache is full
std::atomic<uint64_t> _last_access_time;
// The client sets this flag when an error is thrown, and HdfsFileSystemCache will not reuse the handle
std::atomic<bool> _invalid;
uint64_t _now() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
}
};
class HdfsFileSystem final : public RemoteFileSystem {
public:
HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path);
~HdfsFileSystem() override;
Status create_file(const Path& path, FileWriterPtr* writer) override;
Status open_file(const Path& path, FileReaderSPtr* reader) override;
Status delete_file(const Path& path) override;
Status create_directory(const Path& path) override;
// Delete all files under path.
Status delete_directory(const Path& path) override;
Status link_file(const Path& /*src*/, const Path& /*dest*/) override {
return Status::NotSupported("Not supported");
}
Status exists(const Path& path, bool* res) const override;
Status file_size(const Path& path, size_t* file_size) const override;
Status list(const Path& path, std::vector<Path>* files) override;
Status upload(const Path& /*local_path*/, const Path& /*dest_path*/) override {
return Status::NotSupported("Currently not support to upload file to HDFS");
}
Status batch_upload(const std::vector<Path>& /*local_paths*/,
const std::vector<Path>& /*dest_paths*/) override {
return Status::NotSupported("Currently not support to batch upload file to HDFS");
}
Status connect() override;
HdfsFileSystemHandle* get_handle();
private:
const THdfsParams& _hdfs_params;
std::string _namenode;
std::string _path;
// do not use std::shared_ptr or std::unique_ptr
// _fs_handle is managed by HdfsFileSystemCache
HdfsFileSystemHandle* _fs_handle;
};
} // namespace io
} // namespace doris
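
Beyond open_file, the header above also exposes metadata operations. A brief sketch of how a caller might walk a directory (illustrative names, inside namespace doris):

Status log_dir_sizes(io::HdfsFileSystem* fs, const io::Path& dir) {
    bool dir_exists = false;
    RETURN_IF_ERROR(fs->exists(dir, &dir_exists));
    if (!dir_exists) {
        return Status::InternalError("path does not exist: {}", dir.string());
    }
    std::vector<io::Path> files;
    RETURN_IF_ERROR(fs->list(dir, &files));
    for (const auto& file : files) {
        size_t size = 0;
        RETURN_IF_ERROR(fs->file_size(file, &size));
        LOG(INFO) << file.string() << ": " << size << " bytes";
    }
    return Status::OK();
}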

View File

@ -49,7 +49,7 @@ Status S3FileReader::close() {
return Status::OK();
}
Status S3FileReader::read_at(size_t offset, Slice result, const IOContext& io_ctx,
Status S3FileReader::read_at(size_t offset, Slice result, const IOContext& /*io_ctx*/,
size_t* bytes_read) {
DCHECK(!closed());
if (offset > _file_size) {

View File

@ -66,7 +66,7 @@ Status HdfsFileReader::open() {
}
// If the format of _path is hdfs://ip:port/path, replace it with /path,
// because a path like hdfs://ip:port/path can't be used by libhdfs3.
if (_path.find(_namenode) != _path.npos) {
if (_path.find(_namenode) != std::string::npos) {
_path = _path.substr(_namenode.size());
}
@ -82,7 +82,7 @@ Status HdfsFileReader::open() {
RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(_hdfs_params, &_fs_handle));
_hdfs_fs = _fs_handle->hdfs_fs;
_hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_RDONLY, 0, 0, 0);
if (_hdfs_fs == nullptr) {
if (_hdfs_file == nullptr) {
return Status::InternalError(
"open file failed. (BE: {}) namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError());

View File

@ -88,21 +88,21 @@ public:
return &s_instance;
}
HdfsFsCache(const HdfsFsCache&) = delete;
const HdfsFsCache& operator=(const HdfsFsCache&) = delete;
// This function is thread-safe
Status get_connection(THdfsParams& hdfs_params, HdfsFsHandle** fs_handle);
private:
std::mutex _lock;
std::unordered_map<uint64, std::unique_ptr<HdfsFsHandle>> _cache;
HdfsFsCache() = default;
HdfsFsCache(const HdfsFsCache&) = delete;
const HdfsFsCache& operator=(const HdfsFsCache&) = delete;
uint64 _hdfs_hash_code(THdfsParams& hdfs_params);
Status _create_fs(THdfsParams& hdfs_params, hdfsFS* fs);
void _clean_invalid();
void _clean_oldest();
std::mutex _lock;
std::unordered_map<uint64, std::unique_ptr<HdfsFsHandle>> _cache;
};
class HdfsFileReader : public FileReader {
@ -110,31 +110,29 @@ public:
HdfsFileReader(const THdfsParams& hdfs_params, const std::string& path, int64_t start_offset);
HdfsFileReader(const std::map<std::string, std::string>& properties, const std::string& path,
int64_t start_offset);
virtual ~HdfsFileReader();
~HdfsFileReader() override;
virtual Status open() override;
Status open() override;
// Read content to 'buf', 'buf_len' is the max size of this buffer.
// Return ok when read success, and 'buf_len' is set to size of read content
// If reach to end of file, the eof is set to true. meanwhile 'buf_len'
// is set to zero.
virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
virtual void close() override;
virtual bool closed() override;
Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override;
Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) override;
int64_t size() override;
Status seek(int64_t position) override;
Status tell(int64_t* position) override;
void close() override;
bool closed() override;
private:
void _parse_properties(const std::map<std::string, std::string>& prop);
private:
THdfsParams _hdfs_params;
std::string _namenode = "";
std::string _path = "";
std::string _namenode;
std::string _path;
int64_t _current_offset;
int64_t _file_size;
hdfsFS _hdfs_fs;

View File

@ -41,5 +41,4 @@ Status HdfsReaderWriter::create_writer(const std::map<std::string, std::string>&
writer.reset(new HDFSWriter(properties, path));
return Status::OK();
}
} // namespace doris

View File

@ -23,12 +23,7 @@
namespace doris {
// This class is used to create hdfs readers and writers.
// Because libhdfs3 does not support the arm64 environment,
// we use this class to shield the upper layer from the need to deal with the platform environment
// when creating a reader or writer.
//
// If in the arm64 environment, creating a reader or writer through this class will return an error.
// TODO(ftw): This file should be deleted when new_file_factory.h replaces file_factory.h
class HdfsReaderWriter {
public:
static Status create_reader(const THdfsParams& hdfs_params, const std::string& path,
@ -40,5 +35,4 @@ public:
static Status create_writer(const std::map<std::string, std::string>& properties,
const std::string& path, std::unique_ptr<FileWriter>& writer);
};
} // namespace doris

View File

@ -170,6 +170,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(upload_fail_count, MetricUnit::ROWSETS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_reader_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_reader_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(hdfs_file_reader_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_writer_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_writer_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_created_total, MetricUnit::FILESYSTEM);
@ -181,6 +182,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_bytes_written_total, MetricUnit::FILESYS
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_reading, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_reading, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(hdfs_file_open_reading, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_writing, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing, MetricUnit::FILESYSTEM);
@ -298,6 +300,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_reader_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_reader_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, hdfs_file_reader_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_writer_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_writer_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, file_created_total);
@ -308,6 +311,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_bytes_written_total);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_reading);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_reading);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, hdfs_file_open_reading);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_writing);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_writing);
}

View File

@ -154,6 +154,7 @@ public:
// Metrics related with file reader/writer
IntCounter* local_file_reader_total;
IntCounter* s3_file_reader_total;
IntCounter* hdfs_file_reader_total;
IntCounter* local_file_writer_total;
IntCounter* s3_file_writer_total;
IntCounter* file_created_total;
@ -164,6 +165,7 @@ public:
IntCounter* s3_bytes_written_total;
IntGauge* local_file_open_reading;
IntGauge* s3_file_open_reading;
IntGauge* hdfs_file_open_reading;
IntGauge* local_file_open_writing;
IntGauge* s3_file_open_writing;

View File

@ -23,6 +23,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "s3_uri.h"
namespace doris {
@ -169,7 +170,42 @@ std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(const S3Conf& s3_conf)
}
return std::make_shared<Aws::S3::S3Client>(
std::move(aws_cred), std::move(aws_config),
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never);
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
s3_conf.use_virtual_addressing);
}
Status ClientFactory::convert_properties_to_s3_conf(const std::map<std::string, std::string>& prop,
const S3URI& s3_uri, S3Conf* s3_conf) {
if (!is_s3_conf_valid(prop)) {
return Status::InvalidArgument("S3 properties are incorrect, please check properties.");
}
StringCaseMap<std::string> properties(prop.begin(), prop.end());
s3_conf->ak = properties.find(S3_AK)->second;
s3_conf->sk = properties.find(S3_SK)->second;
s3_conf->endpoint = properties.find(S3_ENDPOINT)->second;
s3_conf->region = properties.find(S3_REGION)->second;
if (properties.find(S3_MAX_CONN_SIZE) != properties.end()) {
s3_conf->max_connections = std::atoi(properties.find(S3_MAX_CONN_SIZE)->second.c_str());
}
if (properties.find(S3_REQUEST_TIMEOUT_MS) != properties.end()) {
s3_conf->request_timeout_ms =
std::atoi(properties.find(S3_REQUEST_TIMEOUT_MS)->second.c_str());
}
if (properties.find(S3_CONN_TIMEOUT_MS) != properties.end()) {
s3_conf->connect_timeout_ms =
std::atoi(properties.find(S3_CONN_TIMEOUT_MS)->second.c_str());
}
s3_conf->bucket = s3_uri.get_bucket();
s3_conf->prefix = "";
// See https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html
s3_conf->use_virtual_addressing = true;
if (properties.find(USE_PATH_STYLE) != properties.end()) {
s3_conf->use_virtual_addressing =
properties.find(USE_PATH_STYLE)->second == "true" ? false : true;
}
return Status::OK();
}
} // end namespace doris
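
As a hedged illustration of the new use_virtual_addressing flag (inside namespace doris; credentials, endpoint, and URI are placeholders): setting the USE_PATH_STYLE property to "true" turns virtual-hosted addressing off before the S3 client is built.

Status path_style_demo() {
    std::map<std::string, std::string> prop = {
            {S3_AK, "my_ak"},
            {S3_SK, "my_sk"},
            {S3_ENDPOINT, "http://127.0.0.1:9000"},
            {S3_REGION, "us-east-1"},
            {USE_PATH_STYLE, "true"}, // keys are matched case-insensitively via StringCaseMap
    };
    S3URI s3_uri("s3://my-bucket/dir/file.parquet");
    CHECK(s3_uri.parse());
    S3Conf s3_conf;
    RETURN_IF_ERROR(ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, &s3_conf));
    // USE_PATH_STYLE=true disables virtual-hosted addressing, so ClientFactory::create()
    // builds a path-style S3 client; without the property it defaults to true.
    DCHECK(!s3_conf.use_virtual_addressing);
    return Status::OK();
}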

View File

@ -23,6 +23,8 @@
#include <memory>
#include <string>
#include "common/status.h"
namespace Aws {
namespace S3 {
class S3Client;
@ -31,6 +33,8 @@ class S3Client;
namespace doris {
class S3URI;
const static std::string S3_AK = "AWS_ACCESS_KEY";
const static std::string S3_SK = "AWS_SECRET_KEY";
const static std::string S3_ENDPOINT = "AWS_ENDPOINT";
@ -49,6 +53,7 @@ struct S3Conf {
int max_connections = -1;
int request_timeout_ms = -1;
int connect_timeout_ms = -1;
bool use_virtual_addressing = true;
std::string to_string() const;
};
@ -58,7 +63,8 @@ inline std::string S3Conf::to_string() const {
ss << "ak: " << ak << ", sk: " << sk << ", endpoint: " << endpoint << ", region: " << region
<< ", bucket: " << bucket << ", prefix: " << prefix
<< ", max_connections: " << max_connections << ", request_timeout_ms: " << request_timeout_ms
<< ", connect_timeout_ms: " << connect_timeout_ms;
<< ", connect_timeout_ms: " << connect_timeout_ms
<< ", use_virtual_addressing: " << use_virtual_addressing;
return ss.str();
}
@ -76,6 +82,9 @@ public:
static bool is_s3_conf_valid(const S3Conf& s3_conf);
static Status convert_properties_to_s3_conf(const std::map<std::string, std::string>& prop,
const S3URI& s3_uri, S3Conf* s3_conf);
private:
ClientFactory();

View File

@ -281,7 +281,10 @@ set(VEC_FILES
exec/format/orc/vorc_reader.cpp
exec/format/json/new_json_reader.cpp
exec/format/table/table_format_reader.cpp
exec/format/table/iceberg_reader.cpp)
exec/format/table/iceberg_reader.cpp
exec/format/file_reader/new_file_factory.cpp
exec/format/file_reader/new_plain_text_line_reader.cpp
)
add_library(Vec STATIC
${VEC_FILES}

View File

@ -0,0 +1,182 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/format/file_reader/new_file_factory.h"
#include "io/broker_reader.h"
#include "io/broker_writer.h"
#include "io/buffered_reader.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/s3_file_system.h"
#include "io/hdfs_file_reader.h"
#include "io/hdfs_writer.h"
#include "io/local_file_reader.h"
#include "io/local_file_writer.h"
#include "io/s3_reader.h"
#include "io/s3_writer.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "util/s3_util.h"
namespace doris {
Status NewFileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
const std::vector<TNetworkAddress>& broker_addresses,
const std::map<std::string, std::string>& properties,
const std::string& path, int64_t start_offset,
std::unique_ptr<FileWriter>& file_writer) {
switch (type) {
case TFileType::FILE_LOCAL: {
file_writer.reset(new LocalFileWriter(path, start_offset));
break;
}
case TFileType::FILE_BROKER: {
file_writer.reset(new BrokerWriter(env, broker_addresses, properties, path, start_offset));
break;
}
case TFileType::FILE_S3: {
file_writer.reset(new S3Writer(properties, path, start_offset));
break;
}
case TFileType::FILE_HDFS: {
RETURN_IF_ERROR(create_hdfs_writer(
const_cast<std::map<std::string, std::string>&>(properties), path, file_writer));
break;
}
default:
return Status::InternalError("unsupported file writer type: {}", std::to_string(type));
}
return Status::OK();
}
// ============================
// broker scan node/unique ptr
Status NewFileFactory::create_file_reader(TFileType::type type, ExecEnv* env,
RuntimeProfile* profile,
const std::vector<TNetworkAddress>& broker_addresses,
const std::map<std::string, std::string>& properties,
const TBrokerRangeDesc& range, int64_t start_offset,
std::unique_ptr<FileReader>& file_reader) {
FileReader* file_reader_ptr;
switch (type) {
case TFileType::FILE_LOCAL: {
file_reader_ptr = new LocalFileReader(range.path, start_offset);
break;
}
case TFileType::FILE_BROKER: {
file_reader_ptr = new BufferedReader(
profile,
new BrokerReader(env, broker_addresses, properties, range.path, start_offset,
range.__isset.file_size ? range.file_size : 0));
break;
}
case TFileType::FILE_S3: {
file_reader_ptr =
new BufferedReader(profile, new S3Reader(properties, range.path, start_offset));
break;
}
case TFileType::FILE_HDFS: {
FileReader* hdfs_reader = nullptr;
RETURN_IF_ERROR(
create_hdfs_reader(range.hdfs_params, range.path, start_offset, &hdfs_reader));
file_reader_ptr = new BufferedReader(profile, hdfs_reader);
break;
}
default:
return Status::InternalError("unsupported file reader type: " + std::to_string(type));
}
file_reader.reset(file_reader_ptr);
return Status::OK();
}
// ============================
// file scan node/unique ptr
Status NewFileFactory::create_file_reader(RuntimeProfile* /*profile*/,
const FileSystemProperties& system_properties,
const FileDescription& file_description,
std::unique_ptr<io::FileSystem>* file_system,
io::FileReaderSPtr* file_reader) {
TFileType::type type = system_properties.system_type;
io::FileSystem* file_system_ptr = nullptr;
switch (type) {
case TFileType::FILE_S3: {
RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description.path,
&file_system_ptr, file_reader));
break;
}
case TFileType::FILE_HDFS: {
RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params, file_description.path,
&file_system_ptr, file_reader));
break;
}
default:
return Status::NotSupported("unsupported file reader type: {}", std::to_string(type));
}
file_system->reset(file_system_ptr);
return Status::OK();
}
// file scan node/stream load pipe
Status NewFileFactory::create_pipe_reader(const TUniqueId& load_id,
std::shared_ptr<FileReader>& file_reader) {
file_reader = ExecEnv::GetInstance()->load_stream_mgr()->get(load_id);
if (!file_reader) {
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}
return Status::OK();
}
Status NewFileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
int64_t start_offset, FileReader** reader) {
*reader = new HdfsFileReader(hdfs_params, path, start_offset);
return Status::OK();
}
Status NewFileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
io::FileSystem** hdfs_file_system,
io::FileReaderSPtr* reader) {
*hdfs_file_system = new io::HdfsFileSystem(hdfs_params, path);
RETURN_IF_ERROR((dynamic_cast<io::HdfsFileSystem*>(*hdfs_file_system))->connect());
RETURN_IF_ERROR((*hdfs_file_system)->open_file(path, reader));
return Status::OK();
}
Status NewFileFactory::create_hdfs_writer(const std::map<std::string, std::string>& properties,
const std::string& path,
std::unique_ptr<FileWriter>& writer) {
writer.reset(new HDFSWriter(properties, path));
return Status::OK();
}
Status NewFileFactory::create_s3_reader(const std::map<std::string, std::string>& prop,
const std::string& path, io::FileSystem** s3_file_system,
io::FileReaderSPtr* reader) {
S3URI s3_uri(path);
if (!s3_uri.parse()) {
return Status::InvalidArgument("s3 uri is invalid: {}", path);
}
S3Conf s3_conf;
RETURN_IF_ERROR(ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, &s3_conf));
*s3_file_system = new io::S3FileSystem(s3_conf, "");
RETURN_IF_ERROR((dynamic_cast<io::S3FileSystem*>(*s3_file_system))->connect());
RETURN_IF_ERROR((*s3_file_system)->open_file(s3_uri.get_key(), reader));
return Status::OK();
}
} // namespace doris

View File

@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
#include "io/file_reader.h"
#include "io/file_writer.h"
#include "io/fs/file_reader.h"
namespace doris {
namespace io {
class FileSystem;
}
class ExecEnv;
class TNetworkAddress;
class RuntimeProfile;
struct FileSystemProperties {
TFileType::type system_type;
std::map<std::string, std::string> properties;
THdfsParams hdfs_params;
std::vector<TNetworkAddress> broker_addresses;
};
struct FileDescription {
std::string path;
int64_t start_offset;
size_t file_size;
size_t buffer_size;
};
class NewFileFactory {
public:
// Create FileWriter
static Status create_file_writer(TFileType::type type, ExecEnv* env,
const std::vector<TNetworkAddress>& broker_addresses,
const std::map<std::string, std::string>& properties,
const std::string& path, int64_t start_offset,
std::unique_ptr<FileWriter>& file_writer);
/**
* Create FileReader for broker scan node related scanners and readers
*/
static Status create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile,
const std::vector<TNetworkAddress>& broker_addresses,
const std::map<std::string, std::string>& properties,
const TBrokerRangeDesc& range, int64_t start_offset,
std::unique_ptr<FileReader>& file_reader);
/**
* Create FileReader for file scan node related scanners and readers
* If buffer_size > 0, use BufferedReader to wrap the underlying FileReader;
* Otherwise, return the underlying FileReader directly.
*/
static Status create_file_reader(RuntimeProfile* profile,
const FileSystemProperties& system_properties,
const FileDescription& file_description,
std::unique_ptr<io::FileSystem>* file_system,
io::FileReaderSPtr* file_reader);
// Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id,
std::shared_ptr<FileReader>& file_reader);
// TODO(ftw): should be deleted after new_hdfs_file_reader is ready
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
int64_t start_offset, FileReader** reader);
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
io::FileSystem** hdfs_file_system, io::FileReaderSPtr* reader);
// TODO(ftw): should be deleted after new_hdfs_file_writer is ready
static Status create_hdfs_writer(const std::map<std::string, std::string>& properties,
const std::string& path, std::unique_ptr<FileWriter>& writer);
static Status create_s3_reader(const std::map<std::string, std::string>& prop,
const std::string& path, io::FileSystem** s3_file_system,
io::FileReaderSPtr* reader);
static TFileType::type convert_storage_type(TStorageBackendType::type type) {
switch (type) {
case TStorageBackendType::LOCAL:
return TFileType::FILE_LOCAL;
case TStorageBackendType::S3:
return TFileType::FILE_S3;
case TStorageBackendType::BROKER:
return TFileType::FILE_BROKER;
case TStorageBackendType::HDFS:
return TFileType::FILE_HDFS;
default:
LOG(FATAL) << "not match type to convert, from type:" << type;
}
__builtin_unreachable();
}
};
} // namespace doris
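
To tie the pieces together, a hedged sketch of the file-scan-node path through the factory declared above (inside namespace doris; parameter values are placeholders):

Status open_hdfs_range(RuntimeProfile* profile, const THdfsParams& hdfs_params,
                       const std::string& path) {
    FileSystemProperties system_properties;
    system_properties.system_type = TFileType::FILE_HDFS;
    system_properties.hdfs_params = hdfs_params;

    FileDescription file_description;
    file_description.path = path;
    file_description.start_offset = 0;
    file_description.file_size = 0;
    file_description.buffer_size = 0;

    std::unique_ptr<io::FileSystem> file_system;
    io::FileReaderSPtr file_reader;
    RETURN_IF_ERROR(NewFileFactory::create_file_reader(profile, system_properties,
                                                       file_description, &file_system,
                                                       &file_reader));
    // file_system owns the HDFS connection; file_reader wraps the opened hdfsFile.
    return Status::OK();
}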

View File

@ -0,0 +1,348 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/format/file_reader/new_plain_text_line_reader.h"
#include "common/status.h"
#include "exec/decompressor.h"
#include "io/fs/file_reader.h"
#include "olap/iterators.h"
// INPUT_CHUNK must be
// larger than 15B for correct lz4 file decompressing
// larger than 300B for correct lzo header decompressing
#define INPUT_CHUNK (2 * 1024 * 1024)
// #define INPUT_CHUNK (34)
#define OUTPUT_CHUNK (8 * 1024 * 1024)
// #define OUTPUT_CHUNK (32)
// leave these 2 sizes small for debugging
namespace doris {
NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile, io::FileReader* file_reader,
Decompressor* decompressor, size_t length,
const std::string& line_delimiter,
size_t line_delimiter_length, size_t current_offset)
: _profile(profile),
_file_reader(file_reader),
_decompressor(decompressor),
_min_length(length),
_total_read_bytes(0),
_line_delimiter(line_delimiter),
_line_delimiter_length(line_delimiter_length),
_input_buf(new uint8_t[INPUT_CHUNK]),
_input_buf_size(INPUT_CHUNK),
_input_buf_pos(0),
_input_buf_limit(0),
_output_buf(new uint8_t[OUTPUT_CHUNK]),
_output_buf_size(OUTPUT_CHUNK),
_output_buf_pos(0),
_output_buf_limit(0),
_file_eof(false),
_eof(false),
_stream_end(true),
_more_input_bytes(0),
_more_output_bytes(0),
_current_offset(current_offset),
_bytes_read_counter(nullptr),
_read_timer(nullptr),
_bytes_decompress_counter(nullptr),
_decompress_timer(nullptr) {
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
_read_timer = ADD_TIMER(_profile, "FileReadTime");
_bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed", TUnit::BYTES);
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
}
NewPlainTextLineReader::~NewPlainTextLineReader() {
close();
}
void NewPlainTextLineReader::close() {
if (_input_buf != nullptr) {
delete[] _input_buf;
_input_buf = nullptr;
}
if (_output_buf != nullptr) {
delete[] _output_buf;
_output_buf = nullptr;
}
}
inline bool NewPlainTextLineReader::update_eof() {
if (done()) {
_eof = true;
} else if (_decompressor == nullptr && (_min_length >= 0 && _total_read_bytes >= _min_length)) {
_eof = true;
}
return _eof;
}
uint8_t* NewPlainTextLineReader::update_field_pos_and_find_line_delimiter(const uint8_t* start,
size_t len) {
// TODO: meanwhile find and save field pos
return (uint8_t*)memmem(start, len, _line_delimiter.c_str(), _line_delimiter_length);
}
// extend input buf if necessary only when _more_input_bytes > 0
void NewPlainTextLineReader::extend_input_buf() {
DCHECK(_more_input_bytes > 0);
// left capacity
size_t capacity = _input_buf_size - _input_buf_limit;
// we want at least _more_input_bytes capacity left
do {
if (capacity >= _more_input_bytes) {
// enough
break;
}
capacity = capacity + _input_buf_pos;
if (capacity >= _more_input_bytes) {
// move the unread remainder to the beginning of the current input buf
memmove(_input_buf, _input_buf + _input_buf_pos, input_buf_read_remaining());
_input_buf_limit -= _input_buf_pos;
_input_buf_pos = 0;
break;
}
while (_input_buf_size - input_buf_read_remaining() < _more_input_bytes) {
_input_buf_size = _input_buf_size * 2;
}
uint8_t* new_input_buf = new uint8_t[_input_buf_size];
memmove(new_input_buf, _input_buf + _input_buf_pos, input_buf_read_remaining());
delete[] _input_buf;
_input_buf = new_input_buf;
_input_buf_limit -= _input_buf_pos;
_input_buf_pos = 0;
} while (false);
// LOG(INFO) << "extend input buf."
// << " input_buf_size: " << _input_buf_size
// << " input_buf_pos: " << _input_buf_pos
// << " input_buf_limit: " << _input_buf_limit;
}
void NewPlainTextLineReader::extend_output_buf() {
// left capacity
size_t capacity = _output_buf_size - _output_buf_limit;
// we want at least 1024 bytes capacity left
size_t target = std::max<size_t>(1024, capacity + _more_output_bytes);
do {
// 1. if left capacity is enough, return;
if (capacity >= target) {
break;
}
// 2. try reuse buf
capacity = capacity + _output_buf_pos;
if (capacity >= target) {
// move the unread remainder to the beginning of the current output buf
memmove(_output_buf, _output_buf + _output_buf_pos, output_buf_read_remaining());
_output_buf_limit -= _output_buf_pos;
_output_buf_pos = 0;
break;
}
// 3. extend buf size to meet the target
while (_output_buf_size - output_buf_read_remaining() < target) {
_output_buf_size = _output_buf_size * 2;
}
uint8_t* new_output_buf = new uint8_t[_output_buf_size];
memmove(new_output_buf, _output_buf + _output_buf_pos, output_buf_read_remaining());
delete[] _output_buf;
_output_buf = new_output_buf;
_output_buf_limit -= _output_buf_pos;
_output_buf_pos = 0;
} while (false);
// LOG(INFO) << "extend output buf."
// << " output_buf_size: " << _output_buf_size
// << " output_buf_pos: " << _output_buf_pos
// << " output_buf_limit: " << _output_buf_limit;
}
Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* eof) {
if (_eof || update_eof()) {
*size = 0;
*eof = true;
return Status::OK();
}
int found_line_delimiter = 0;
size_t offset = 0;
while (!done()) {
// find line delimiter in current decompressed data
uint8_t* cur_ptr = _output_buf + _output_buf_pos;
uint8_t* pos = update_field_pos_and_find_line_delimiter(
cur_ptr + offset, output_buf_read_remaining() - offset);
if (pos == nullptr) {
// didn't find line delimiter, read more data from decompressor
// for multi bytes delimiter we cannot set offset to avoid incomplete
// delimiter
// read from file reader
offset = output_buf_read_remaining();
extend_output_buf();
if ((_input_buf_limit > _input_buf_pos) && _more_input_bytes == 0) {
// we still have data in input which is not decompressed.
// and no more data is required for input
} else {
size_t read_len = 0;
int64_t buffer_len = 0;
uint8_t* file_buf;
if (_decompressor == nullptr) {
// uncompressed file, read directly into output buf
file_buf = _output_buf + _output_buf_limit;
buffer_len = _output_buf_size - _output_buf_limit;
} else {
// MARK
if (_more_input_bytes > 0) {
// we already extend input buf.
// current data in input buf should remain unchanged
file_buf = _input_buf + _input_buf_limit;
buffer_len = _input_buf_size - _input_buf_limit;
// leave input pos and limit unchanged
} else {
// here we are sure that all data in input buf has been consumed.
// which means input pos and limit should be reset.
file_buf = _input_buf;
buffer_len = _input_buf_size;
// reset input pos and limit
_input_buf_pos = 0;
_input_buf_limit = 0;
}
}
{
SCOPED_TIMER(_read_timer);
Slice file_slice(file_buf, buffer_len);
IOContext io_ctx;
RETURN_IF_ERROR(
_file_reader->read_at(_current_offset, file_slice, io_ctx, &read_len));
COUNTER_UPDATE(_bytes_read_counter, read_len);
}
// LOG(INFO) << "after read file: _file_eof: " << _file_eof << " read_len: " << read_len;
if (_file_eof || read_len == 0) {
if (!_stream_end) {
return Status::InternalError(
"Compressed file has been truncated, which is not allowed");
} else {
// last loop we meet stream end,
// and now we finished reading file, so we are finished
// break this loop to see if there is data in buffer
break;
}
}
if (_decompressor == nullptr) {
_output_buf_limit += read_len;
_stream_end = true;
} else {
// only update input limit.
// input pos is set at MARK step
_input_buf_limit += read_len;
}
if (read_len < _more_input_bytes) {
// we failed to read enough data, continue to read from file
_more_input_bytes = _more_input_bytes - read_len;
continue;
}
}
if (_decompressor != nullptr) {
SCOPED_TIMER(_decompress_timer);
// decompress
size_t input_read_bytes = 0;
size_t decompressed_len = 0;
_more_input_bytes = 0;
_more_output_bytes = 0;
RETURN_IF_ERROR(_decompressor->decompress(
_input_buf + _input_buf_pos, /* input */
_input_buf_limit - _input_buf_pos, /* input_len */
&input_read_bytes, _output_buf + _output_buf_limit, /* output */
_output_buf_size - _output_buf_limit, /* output_max_len */
&decompressed_len, &_stream_end, &_more_input_bytes, &_more_output_bytes));
// LOG(INFO) << "after decompress:"
// << " stream_end: " << _stream_end
// << " input_read_bytes: " << input_read_bytes
// << " decompressed_len: " << decompressed_len
// << " more_input_bytes: " << _more_input_bytes
// << " more_output_bytes: " << _more_output_bytes;
// update pos and limit
_input_buf_pos += input_read_bytes;
_output_buf_limit += decompressed_len;
COUNTER_UPDATE(_bytes_decompress_counter, decompressed_len);
// TODO(cmy): watch this case
if ((input_read_bytes == 0 /*decompressed_len == 0*/) && _more_input_bytes == 0 &&
_more_output_bytes == 0) {
// decompress made no progress, may be
// A. input data is not enough to decompress data to output
// B. output buf is too small to save decompressed output
// this is very unlikely to happen
// print the log and just go to next loop to read more data or extend output buf.
// (cmy), for now, return failed to avoid potential endless loop
std::stringstream ss;
ss << "decompress made no progress."
<< " input_read_bytes: " << input_read_bytes
<< " decompressed_len: " << decompressed_len;
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
if (_more_input_bytes > 0) {
extend_input_buf();
}
}
} else {
// we found a complete line
// ready to return
offset = pos - cur_ptr;
found_line_delimiter = _line_delimiter_length;
break;
}
} // while (!done())
*ptr = _output_buf + _output_buf_pos;
*size = offset;
// Skip offset and _line_delimiter size;
_output_buf_pos += offset + found_line_delimiter;
if (offset == 0 && found_line_delimiter == 0) {
*eof = true;
} else {
*eof = false;
}
// update total read bytes
_total_read_bytes += *size + found_line_delimiter;
return Status::OK();
}
} // namespace doris

View File

@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "exec/line_reader.h"
#include "util/runtime_profile.h"
namespace doris {
namespace io {
class FileReader;
}
class Decompressor;
class Status;
class NewPlainTextLineReader : public LineReader {
public:
NewPlainTextLineReader(RuntimeProfile* profile, io::FileReader* file_reader,
Decompressor* decompressor, size_t length,
const std::string& line_delimiter, size_t line_delimiter_length,
size_t current_offset);
~NewPlainTextLineReader() override;
Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override;
void close() override;
private:
bool update_eof();
size_t output_buf_read_remaining() const { return _output_buf_limit - _output_buf_pos; }
size_t input_buf_read_remaining() const { return _input_buf_limit - _input_buf_pos; }
bool done() { return _file_eof && output_buf_read_remaining() == 0; }
// find line delimiter from 'start' to 'start' + len,
// return line delimiter pos if found, otherwise return nullptr.
// TODO:
// save the positions of the field separators
uint8_t* update_field_pos_and_find_line_delimiter(const uint8_t* start, size_t len);
void extend_input_buf();
void extend_output_buf();
RuntimeProfile* _profile;
io::FileReader* _file_reader;
Decompressor* _decompressor;
// the min length that should be read.
// -1 means endless (for stream load)
// and only valid if the content is uncompressed
size_t _min_length;
size_t _total_read_bytes;
std::string _line_delimiter;
size_t _line_delimiter_length;
// save the data read from file reader
uint8_t* _input_buf;
size_t _input_buf_size;
size_t _input_buf_pos;
size_t _input_buf_limit;
// save the data decompressed from decompressor.
uint8_t* _output_buf;
size_t _output_buf_size;
size_t _output_buf_pos;
size_t _output_buf_limit;
bool _file_eof;
bool _eof;
bool _stream_end;
size_t _more_input_bytes;
size_t _more_output_bytes;
size_t _current_offset;
// Profile counters
RuntimeProfile::Counter* _bytes_read_counter;
RuntimeProfile::Counter* _read_timer;
RuntimeProfile::Counter* _bytes_decompress_counter;
RuntimeProfile::Counter* _decompress_timer;
};
} // namespace doris
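
As a closing example, a minimal read loop over the line reader declared above, assuming an already-opened io::FileReader, uncompressed input (decompressor == nullptr), and code in namespace doris; the helper name and delimiter are placeholders:

Status dump_lines(RuntimeProfile* profile, io::FileReader* file_reader, size_t file_size) {
    NewPlainTextLineReader line_reader(profile, file_reader, /*decompressor*/ nullptr,
                                       /*length*/ file_size, /*line_delimiter*/ "\n",
                                       /*line_delimiter_length*/ 1, /*current_offset*/ 0);
    bool eof = false;
    while (!eof) {
        const uint8_t* line = nullptr;
        size_t len = 0;
        RETURN_IF_ERROR(line_reader.read_line(&line, &len, &eof));
        if (!eof) {
            LOG(INFO) << std::string(reinterpret_cast<const char*>(line), len);
        }
    }
    return Status::OK();
}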