diff --git a/be/src/common/config.h b/be/src/common/config.h index 787c5b5513..7c44d8041d 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -707,6 +707,8 @@ CONF_Int32(send_batch_thread_pool_queue_size, "102400"); CONF_Int32(download_cache_thread_pool_thread_num, "48"); // number of download cache thread pool queue size CONF_Int32(download_cache_thread_pool_queue_size, "102400"); +// download cache buffer size +CONF_Int64(download_cache_buffer_size, "10485760"); // Limit the number of segment of a newly created rowset. // The newly created rowset may to be compacted after loading, diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt index 69a473d14a..5a9fdfafd3 100644 --- a/be/src/io/CMakeLists.txt +++ b/be/src/io/CMakeLists.txt @@ -40,6 +40,7 @@ set(IO_FILES fs/local_file_writer.cpp fs/s3_file_reader.cpp fs/s3_file_system.cpp + cache/file_cache.cpp cache/file_cache_manager.cpp cache/sub_file_cache.cpp cache/whole_file_cache.cpp diff --git a/be/src/io/cache/file_cache.cpp b/be/src/io/cache/file_cache.cpp new file mode 100644 index 0000000000..3be8c30759 --- /dev/null +++ b/be/src/io/cache/file_cache.cpp @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/cache/file_cache.h" + +#include "common/config.h" +#include "io/fs/local_file_system.h" + +namespace doris { +namespace io { + +Status FileCache::download_cache_to_local(const Path& cache_file, const Path& cache_done_file, + io::FileReaderSPtr remote_file_reader, size_t req_size, + size_t offset) { + LOG(INFO) << "Download cache file from remote file: " << remote_file_reader->path().native() + << " -> " << cache_file.native() << ". offset: " << offset + << ", request size: " << req_size; + io::FileWriterPtr file_writer; + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->create_file(cache_file, &file_writer), + fmt::format("Create local cache file failed: {}", cache_file.native())); + auto func = [cache_file, cache_done_file, remote_file_reader, req_size, + offset](io::FileWriter* file_writer) { + char* file_buf = ExecEnv::GetInstance()->get_download_cache_buf( + ExecEnv::GetInstance()->get_serial_download_cache_thread_token()); + size_t count_bytes_read = 0; + size_t need_req_size = config::download_cache_buffer_size; + while (count_bytes_read < req_size) { + memset(file_buf, 0, need_req_size); + if (req_size - count_bytes_read < config::download_cache_buffer_size) { + need_req_size = req_size - count_bytes_read; + } + Slice file_slice(file_buf, need_req_size); + size_t bytes_read = 0; + RETURN_NOT_OK_STATUS_WITH_WARN( + remote_file_reader->read_at(offset + count_bytes_read, file_slice, &bytes_read), + fmt::format("read remote file failed. {}. offset: {}, request size: {}", + remote_file_reader->path().native(), offset + count_bytes_read, + need_req_size)); + if (bytes_read != need_req_size) { + LOG(ERROR) << "read remote file failed: " << remote_file_reader->path().native() + << ", bytes read: " << bytes_read + << " vs need read size: " << need_req_size; + return Status::OLAPInternalError(OLAP_ERR_OS_ERROR); + } + count_bytes_read += bytes_read; + RETURN_NOT_OK_STATUS_WITH_WARN( + file_writer->append(file_slice), + fmt::format("Write local cache file failed: {}", cache_file.native())); + } + return Status::OK(); + }; + auto st = func(file_writer.get()); + if (!st.ok()) { + WARN_IF_ERROR(file_writer->close(), + fmt::format("Close local cache file failed: {}", cache_file.native())); + return st; + } + RETURN_NOT_OK_STATUS_WITH_WARN( + file_writer->close(), + fmt::format("Close local cache file failed: {}", cache_file.native())); + io::FileWriterPtr done_file_writer; + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->create_file(cache_done_file, &done_file_writer), + fmt::format("Create local done file failed: {}", cache_done_file.native())); + RETURN_NOT_OK_STATUS_WITH_WARN( + done_file_writer->close(), + fmt::format("Close local done file failed: {}", cache_done_file.native())); + return Status::OK(); +} + +} // namespace io +} // namespace doris diff --git a/be/src/io/cache/file_cache.h b/be/src/io/cache/file_cache.h index d68c840cc7..979f3c145f 100644 --- a/be/src/io/cache/file_cache.h +++ b/be/src/io/cache/file_cache.h @@ -45,6 +45,10 @@ public: virtual Status clean_timeout_cache() = 0; virtual Status clean_all_cache() = 0; + + Status download_cache_to_local(const Path& cache_file, const Path& cache_done_file, + io::FileReaderSPtr remote_file_reader, size_t req_size, + size_t offset = 0); }; using FileCachePtr = std::shared_ptr; diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp index ad20a4e686..ce1a8722a6 100644 --- a/be/src/io/cache/sub_file_cache.cpp +++ b/be/src/io/cache/sub_file_cache.cpp @@ -70,9 +70,18 @@ Status SubFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) { if (offset_begin + req_size > _remote_file_reader->size()) { req_size = _remote_file_reader->size() - offset_begin; } - RETURN_IF_ERROR(_generate_cache_reader(offset_begin, req_size)); + auto st = _generate_cache_reader(offset_begin, req_size); + if (!st.ok()) { + WARN_IF_ERROR(_remote_file_reader->close(), + fmt::format("Close remote file reader failed: {}", + _remote_file_reader->path().native())); + return st; + } } } + RETURN_NOT_OK_STATUS_WITH_WARN(_remote_file_reader->close(), + fmt::format("Close remote file reader failed: {}", + _remote_file_reader->path().native())); _cache_file_size = _get_cache_file_size(); } { @@ -150,41 +159,10 @@ Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) { fmt::format("Check local cache file exist failed. {}", cache_file.native())); } - LOG(INFO) << "Download cache file from remote file: " - << _remote_file_reader->path().native() << " -> " - << cache_file.native(); - std::unique_ptr file_buf(new char[req_size]); - Slice file_slice(file_buf.get(), req_size); - size_t bytes_read = 0; RETURN_NOT_OK_STATUS_WITH_WARN( - _remote_file_reader->read_at(offset, file_slice, &bytes_read), - fmt::format("read remote file failed. {}. offset: {}, size: {}", - _remote_file_reader->path().native(), offset, req_size)); - if (bytes_read != req_size) { - LOG(ERROR) << "read remote file failed: " - << _remote_file_reader->path().native() - << ", bytes read: " << bytes_read - << " vs file size: " << _remote_file_reader->size(); - return Status::OLAPInternalError(OLAP_ERR_OS_ERROR); - } - io::FileWriterPtr file_writer; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->create_file(cache_file, &file_writer), - fmt::format("Create local cache file failed: {}", cache_file.native())); - RETURN_NOT_OK_STATUS_WITH_WARN( - file_writer->append(file_slice), - fmt::format("Write local cache file failed: {}", cache_file.native())); - RETURN_NOT_OK_STATUS_WITH_WARN( - file_writer->close(), - fmt::format("Close local cache file failed: {}", cache_file.native())); - io::FileWriterPtr done_file_writer; - RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->create_file( - cache_done_file, &done_file_writer), - fmt::format("Create local done file failed: {}", - cache_done_file.native())); - RETURN_NOT_OK_STATUS_WITH_WARN(done_file_writer->close(), - fmt::format("Close local done file failed: {}", - cache_done_file.native())); + download_cache_to_local(cache_file, cache_done_file, + _remote_file_reader, req_size, offset), + "Download cache from remote to local failed."); return Status::OK(); }; download_st.set_value(func()); diff --git a/be/src/io/cache/whole_file_cache.cpp b/be/src/io/cache/whole_file_cache.cpp index 2c450d26e3..5cc67928bc 100644 --- a/be/src/io/cache/whole_file_cache.cpp +++ b/be/src/io/cache/whole_file_cache.cpp @@ -36,7 +36,16 @@ WholeFileCache::~WholeFileCache() {} Status WholeFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) { if (_cache_file_reader == nullptr) { - RETURN_IF_ERROR(_generate_cache_reader(offset, result.size)); + auto st = _generate_cache_reader(offset, result.size); + if (!st.ok()) { + WARN_IF_ERROR(_remote_file_reader->close(), + fmt::format("Close remote file reader failed: {}", + _remote_file_reader->path().native())); + return st; + } + RETURN_NOT_OK_STATUS_WITH_WARN(_remote_file_reader->close(), + fmt::format("Close remote file reader failed: {}", + _remote_file_reader->path().native())); } std::shared_lock rlock(_cache_lock); RETURN_NOT_OK_STATUS_WITH_WARN( @@ -99,41 +108,11 @@ Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) { fmt::format("Check local cache file exist failed. {}", cache_file.native())); } - LOG(INFO) << "Download cache file from remote file: " - << _remote_file_reader->path().native() << " -> " - << cache_file.native(); - std::unique_ptr file_buf(new char[_remote_file_reader->size()]); - Slice file_slice(file_buf.get(), _remote_file_reader->size()); - size_t bytes_read = 0; + size_t req_size = _remote_file_reader->size(); RETURN_NOT_OK_STATUS_WITH_WARN( - _remote_file_reader->read_at(0, file_slice, &bytes_read), - fmt::format("read remote file failed. {}", - _remote_file_reader->path().native())); - if (bytes_read != _remote_file_reader->size()) { - LOG(ERROR) << "read remote file failed: " - << _remote_file_reader->path().native() - << ", bytes read: " << bytes_read - << " vs file size: " << _remote_file_reader->size(); - return Status::OLAPInternalError(OLAP_ERR_OS_ERROR); - } - io::FileWriterPtr file_writer; - RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->create_file(cache_file, &file_writer), - fmt::format("Create local cache file failed: {}", cache_file.native())); - RETURN_NOT_OK_STATUS_WITH_WARN( - file_writer->append(file_slice), - fmt::format("Write local cache file failed: {}", cache_file.native())); - RETURN_NOT_OK_STATUS_WITH_WARN( - file_writer->close(), - fmt::format("Close local cache file failed: {}", cache_file.native())); - io::FileWriterPtr done_file_writer; - RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->create_file( - cache_done_file, &done_file_writer), - fmt::format("Create local done file failed: {}", - cache_done_file.native())); - RETURN_NOT_OK_STATUS_WITH_WARN(done_file_writer->close(), - fmt::format("Close local done file failed: {}", - cache_done_file.native())); + download_cache_to_local(cache_file, cache_done_file, + _remote_file_reader, req_size), + "Download cache from remote to local failed."); return Status::OK(); }; download_st.set_value(func()); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index a86889f35d..8e10db7d52 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -17,6 +17,9 @@ #pragma once +#include + +#include "common/config.h" #include "common/status.h" #include "olap/options.h" #include "util/threadpool.h" @@ -135,6 +138,18 @@ public: ThreadPoolToken* get_serial_download_cache_thread_token() { return _serial_download_cache_thread_token.get(); } + void init_download_cache_buf() { + std::unique_ptr download_cache_buf(new char[config::download_cache_buffer_size]); + memset(download_cache_buf.get(), 0, config::download_cache_buffer_size); + _download_cache_buf_map[_serial_download_cache_thread_token.get()] = + std::move(download_cache_buf); + } + char* get_download_cache_buf(ThreadPoolToken* token) { + if (_download_cache_buf_map.find(token) == _download_cache_buf_map.end()) { + return nullptr; + } + return _download_cache_buf_map[token].get(); + } CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; } FragmentMgr* fragment_mgr() { return _fragment_mgr; } ResultCache* result_cache() { return _result_cache; } @@ -223,7 +238,8 @@ private: std::unique_ptr _download_cache_thread_pool; // A token used to submit download cache task serially std::unique_ptr _serial_download_cache_thread_token; - + // ThreadPoolToken -> buffer + std::unordered_map> _download_cache_buf_map; CgroupsMgr* _cgroups_mgr = nullptr; FragmentMgr* _fragment_mgr = nullptr; ResultCache* _result_cache = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 0aedb39b49..907b7c26f1 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -137,6 +137,7 @@ Status ExecEnv::_init(const std::vector& store_paths) { .set_max_queue_size(config::download_cache_thread_pool_queue_size) .build(&_download_cache_thread_pool); set_serial_download_cache_thread_token(); + init_download_cache_buf(); _scanner_scheduler = new doris::vectorized::ScannerScheduler(); diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 7d9ebc2b28..6120d389b7 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -1119,6 +1119,12 @@ This configuration is used for the context gc thread scheduling cycle. Note: The * Description: The number of threads in the DownloadCache thread pool. In the download cache task of FileCache, the download cache operation will be submitted to the thread pool as a thread task and wait to be scheduled. After the number of submitted tasks exceeds the length of the thread pool queue, subsequent submitted tasks will be blocked until there is a empty slot in the queue. * Default value: 102400 +### `download_cache_buffer_size` + +* Type: int64 +* Description: The size of the buffer used to receive data when downloading the cache. +* Default value: 10485760 + ### `single_replica_load_brpc_port` * Type: int32 diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 540324976e..1861316c76 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -1120,6 +1120,12 @@ routine load任务的线程池大小。 这应该大于 FE 配置 'max_concurren * 描述: DownloadCache线程池线程数目. 在FileCache的缓存下载任务之中, 缓存下载操作会作为一个线程task提交到线程池之中等待被调度,而提交的任务数目超过线程池队列的长度之后,后续提交的任务将阻塞直到队列之中有新的空缺。 * 默认值:102400 +### `download_cache_buffer_size` + +* 类型: int64 +* 描述: 下载缓存时用于接收数据的buffer的大小。 +* 默认值: 10485760 + ### `serialize_batch` 默认值:false