[optimize](remote) Optimize cache reader use a pre-created buffer when downloading the cache (#12165)

* optimize cache reader

* add description for config

* optimize cache reader

* optimize cache reader
This commit is contained in:
zxealous
2022-08-31 10:15:40 +08:00
committed by GitHub
parent f0cde35ea6
commit 254cb321b9
10 changed files with 151 additions and 71 deletions

View File

@ -707,6 +707,8 @@ CONF_Int32(send_batch_thread_pool_queue_size, "102400");
CONF_Int32(download_cache_thread_pool_thread_num, "48");
// number of download cache thread pool queue size
CONF_Int32(download_cache_thread_pool_queue_size, "102400");
// download cache buffer size
CONF_Int64(download_cache_buffer_size, "10485760");
// Limit the number of segment of a newly created rowset.
// The newly created rowset may to be compacted after loading,

View File

@ -40,6 +40,7 @@ set(IO_FILES
fs/local_file_writer.cpp
fs/s3_file_reader.cpp
fs/s3_file_system.cpp
cache/file_cache.cpp
cache/file_cache_manager.cpp
cache/sub_file_cache.cpp
cache/whole_file_cache.cpp

87
be/src/io/cache/file_cache.cpp vendored Normal file
View File

@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/cache/file_cache.h"
#include "common/config.h"
#include "io/fs/local_file_system.h"
namespace doris {
namespace io {
Status FileCache::download_cache_to_local(const Path& cache_file, const Path& cache_done_file,
io::FileReaderSPtr remote_file_reader, size_t req_size,
size_t offset) {
LOG(INFO) << "Download cache file from remote file: " << remote_file_reader->path().native()
<< " -> " << cache_file.native() << ". offset: " << offset
<< ", request size: " << req_size;
io::FileWriterPtr file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_file(cache_file, &file_writer),
fmt::format("Create local cache file failed: {}", cache_file.native()));
auto func = [cache_file, cache_done_file, remote_file_reader, req_size,
offset](io::FileWriter* file_writer) {
char* file_buf = ExecEnv::GetInstance()->get_download_cache_buf(
ExecEnv::GetInstance()->get_serial_download_cache_thread_token());
size_t count_bytes_read = 0;
size_t need_req_size = config::download_cache_buffer_size;
while (count_bytes_read < req_size) {
memset(file_buf, 0, need_req_size);
if (req_size - count_bytes_read < config::download_cache_buffer_size) {
need_req_size = req_size - count_bytes_read;
}
Slice file_slice(file_buf, need_req_size);
size_t bytes_read = 0;
RETURN_NOT_OK_STATUS_WITH_WARN(
remote_file_reader->read_at(offset + count_bytes_read, file_slice, &bytes_read),
fmt::format("read remote file failed. {}. offset: {}, request size: {}",
remote_file_reader->path().native(), offset + count_bytes_read,
need_req_size));
if (bytes_read != need_req_size) {
LOG(ERROR) << "read remote file failed: " << remote_file_reader->path().native()
<< ", bytes read: " << bytes_read
<< " vs need read size: " << need_req_size;
return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
}
count_bytes_read += bytes_read;
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->append(file_slice),
fmt::format("Write local cache file failed: {}", cache_file.native()));
}
return Status::OK();
};
auto st = func(file_writer.get());
if (!st.ok()) {
WARN_IF_ERROR(file_writer->close(),
fmt::format("Close local cache file failed: {}", cache_file.native()));
return st;
}
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->close(),
fmt::format("Close local cache file failed: {}", cache_file.native()));
io::FileWriterPtr done_file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_file(cache_done_file, &done_file_writer),
fmt::format("Create local done file failed: {}", cache_done_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
done_file_writer->close(),
fmt::format("Close local done file failed: {}", cache_done_file.native()));
return Status::OK();
}
} // namespace io
} // namespace doris

View File

@ -45,6 +45,10 @@ public:
virtual Status clean_timeout_cache() = 0;
virtual Status clean_all_cache() = 0;
Status download_cache_to_local(const Path& cache_file, const Path& cache_done_file,
io::FileReaderSPtr remote_file_reader, size_t req_size,
size_t offset = 0);
};
using FileCachePtr = std::shared_ptr<FileCache>;

View File

@ -70,9 +70,18 @@ Status SubFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) {
if (offset_begin + req_size > _remote_file_reader->size()) {
req_size = _remote_file_reader->size() - offset_begin;
}
RETURN_IF_ERROR(_generate_cache_reader(offset_begin, req_size));
auto st = _generate_cache_reader(offset_begin, req_size);
if (!st.ok()) {
WARN_IF_ERROR(_remote_file_reader->close(),
fmt::format("Close remote file reader failed: {}",
_remote_file_reader->path().native()));
return st;
}
}
}
RETURN_NOT_OK_STATUS_WITH_WARN(_remote_file_reader->close(),
fmt::format("Close remote file reader failed: {}",
_remote_file_reader->path().native()));
_cache_file_size = _get_cache_file_size();
}
{
@ -150,41 +159,10 @@ Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
fmt::format("Check local cache file exist failed. {}",
cache_file.native()));
}
LOG(INFO) << "Download cache file from remote file: "
<< _remote_file_reader->path().native() << " -> "
<< cache_file.native();
std::unique_ptr<char[]> file_buf(new char[req_size]);
Slice file_slice(file_buf.get(), req_size);
size_t bytes_read = 0;
RETURN_NOT_OK_STATUS_WITH_WARN(
_remote_file_reader->read_at(offset, file_slice, &bytes_read),
fmt::format("read remote file failed. {}. offset: {}, size: {}",
_remote_file_reader->path().native(), offset, req_size));
if (bytes_read != req_size) {
LOG(ERROR) << "read remote file failed: "
<< _remote_file_reader->path().native()
<< ", bytes read: " << bytes_read
<< " vs file size: " << _remote_file_reader->size();
return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
}
io::FileWriterPtr file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_file(cache_file, &file_writer),
fmt::format("Create local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->append(file_slice),
fmt::format("Write local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->close(),
fmt::format("Close local cache file failed: {}", cache_file.native()));
io::FileWriterPtr done_file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->create_file(
cache_done_file, &done_file_writer),
fmt::format("Create local done file failed: {}",
cache_done_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(done_file_writer->close(),
fmt::format("Close local done file failed: {}",
cache_done_file.native()));
download_cache_to_local(cache_file, cache_done_file,
_remote_file_reader, req_size, offset),
"Download cache from remote to local failed.");
return Status::OK();
};
download_st.set_value(func());

View File

@ -36,7 +36,16 @@ WholeFileCache::~WholeFileCache() {}
Status WholeFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) {
if (_cache_file_reader == nullptr) {
RETURN_IF_ERROR(_generate_cache_reader(offset, result.size));
auto st = _generate_cache_reader(offset, result.size);
if (!st.ok()) {
WARN_IF_ERROR(_remote_file_reader->close(),
fmt::format("Close remote file reader failed: {}",
_remote_file_reader->path().native()));
return st;
}
RETURN_NOT_OK_STATUS_WITH_WARN(_remote_file_reader->close(),
fmt::format("Close remote file reader failed: {}",
_remote_file_reader->path().native()));
}
std::shared_lock<std::shared_mutex> rlock(_cache_lock);
RETURN_NOT_OK_STATUS_WITH_WARN(
@ -99,41 +108,11 @@ Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
fmt::format("Check local cache file exist failed. {}",
cache_file.native()));
}
LOG(INFO) << "Download cache file from remote file: "
<< _remote_file_reader->path().native() << " -> "
<< cache_file.native();
std::unique_ptr<char[]> file_buf(new char[_remote_file_reader->size()]);
Slice file_slice(file_buf.get(), _remote_file_reader->size());
size_t bytes_read = 0;
size_t req_size = _remote_file_reader->size();
RETURN_NOT_OK_STATUS_WITH_WARN(
_remote_file_reader->read_at(0, file_slice, &bytes_read),
fmt::format("read remote file failed. {}",
_remote_file_reader->path().native()));
if (bytes_read != _remote_file_reader->size()) {
LOG(ERROR) << "read remote file failed: "
<< _remote_file_reader->path().native()
<< ", bytes read: " << bytes_read
<< " vs file size: " << _remote_file_reader->size();
return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
}
io::FileWriterPtr file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->create_file(cache_file, &file_writer),
fmt::format("Create local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->append(file_slice),
fmt::format("Write local cache file failed: {}", cache_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->close(),
fmt::format("Close local cache file failed: {}", cache_file.native()));
io::FileWriterPtr done_file_writer;
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->create_file(
cache_done_file, &done_file_writer),
fmt::format("Create local done file failed: {}",
cache_done_file.native()));
RETURN_NOT_OK_STATUS_WITH_WARN(done_file_writer->close(),
fmt::format("Close local done file failed: {}",
cache_done_file.native()));
download_cache_to_local(cache_file, cache_done_file,
_remote_file_reader, req_size),
"Download cache from remote to local failed.");
return Status::OK();
};
download_st.set_value(func());

View File

@ -17,6 +17,9 @@
#pragma once
#include <unordered_map>
#include "common/config.h"
#include "common/status.h"
#include "olap/options.h"
#include "util/threadpool.h"
@ -135,6 +138,18 @@ public:
ThreadPoolToken* get_serial_download_cache_thread_token() {
return _serial_download_cache_thread_token.get();
}
void init_download_cache_buf() {
std::unique_ptr<char[]> download_cache_buf(new char[config::download_cache_buffer_size]);
memset(download_cache_buf.get(), 0, config::download_cache_buffer_size);
_download_cache_buf_map[_serial_download_cache_thread_token.get()] =
std::move(download_cache_buf);
}
char* get_download_cache_buf(ThreadPoolToken* token) {
if (_download_cache_buf_map.find(token) == _download_cache_buf_map.end()) {
return nullptr;
}
return _download_cache_buf_map[token].get();
}
CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
ResultCache* result_cache() { return _result_cache; }
@ -223,7 +238,8 @@ private:
std::unique_ptr<ThreadPool> _download_cache_thread_pool;
// A token used to submit download cache task serially
std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
// ThreadPoolToken -> buffer
std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>> _download_cache_buf_map;
CgroupsMgr* _cgroups_mgr = nullptr;
FragmentMgr* _fragment_mgr = nullptr;
ResultCache* _result_cache = nullptr;

View File

@ -137,6 +137,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
.set_max_queue_size(config::download_cache_thread_pool_queue_size)
.build(&_download_cache_thread_pool);
set_serial_download_cache_thread_token();
init_download_cache_buf();
_scanner_scheduler = new doris::vectorized::ScannerScheduler();

View File

@ -1119,6 +1119,12 @@ This configuration is used for the context gc thread scheduling cycle. Note: The
* Description: The number of threads in the DownloadCache thread pool. In the download cache task of FileCache, the download cache operation will be submitted to the thread pool as a thread task and wait to be scheduled. After the number of submitted tasks exceeds the length of the thread pool queue, subsequent submitted tasks will be blocked until there is a empty slot in the queue.
* Default value: 102400
### `download_cache_buffer_size`
* Type: int64
* Description: The size of the buffer used to receive data when downloading the cache.
* Default value: 10485760
### `single_replica_load_brpc_port`
* Type: int32

View File

@ -1120,6 +1120,12 @@ routine load任务的线程池大小。 这应该大于 FE 配置 'max_concurren
* 描述: DownloadCache线程池线程数目. 在FileCache的缓存下载任务之中, 缓存下载操作会作为一个线程task提交到线程池之中等待被调度,而提交的任务数目超过线程池队列的长度之后,后续提交的任务将阻塞直到队列之中有新的空缺。
* 默认值:102400
### `download_cache_buffer_size`
* 类型: int64
* 描述: 下载缓存时用于接收数据的buffer的大小。
* 默认值: 10485760
### `serialize_batch`
默认值:false