From b6c7f3aeb8947b915076df3eb4f3f86bb0199bdb Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Fri, 5 May 2023 14:28:01 +0800 Subject: [PATCH] [opt](FileCache) Add file cache metrics and management (#19177) Add file cache metrics and management. 1. Get file cache metrics > If the performance of file cache is not efficient, there are currently no metrics to investigate the cause. In practice, hit ratio, disk usage, and segments removed status are very important information. API: `http://be_host:be_webserver_port/metrics` File cache metrics for each base path start with `doris_be_file_cache_` prefix. `hits_ratio` is the hit ratio of the cache since BE startup; `removed_elements` is the num of removed segment files since BE startup; Every cache path has three queues: index, normal and disposable. The capacity ratio of the three queues is 1:17:2. ``` doris_be_file_cache_hits_ratio{path="/mnt/datadisk1/gaoxin/file_cache"} 0.500000 doris_be_file_cache_hits_ratio{path="/mnt/datadisk1/gaoxin/small_file_cache"} 0.500000 doris_be_file_cache_removed_elements{path="/mnt/datadisk1/gaoxin/file_cache"} 0 doris_be_file_cache_removed_elements{path="/mnt/datadisk1/gaoxin/small_file_cache"} 0 doris_be_file_cache_normal_queue_max_size{path="/mnt/datadisk1/gaoxin/file_cache"} 912680550400 doris_be_file_cache_normal_queue_max_size{path="/mnt/datadisk1/gaoxin/small_file_cache"} 8500000000 doris_be_file_cache_normal_queue_max_elements{path="/mnt/datadisk1/gaoxin/file_cache"} 217600 doris_be_file_cache_normal_queue_max_elements{path="/mnt/datadisk1/gaoxin/small_file_cache"} 102400 doris_be_file_cache_normal_queue_curr_size{path="/mnt/datadisk1/gaoxin/file_cache"} 14129846 doris_be_file_cache_normal_queue_curr_size{path="/mnt/datadisk1/gaoxin/small_file_cache"} 14874904 doris_be_file_cache_normal_queue_curr_elements{path="/mnt/datadisk1/gaoxin/file_cache"} 18 doris_be_file_cache_normal_queue_curr_elements{path="/mnt/datadisk1/gaoxin/small_file_cache"} 22 ... ``` 2. Release file cache > Frequent segment files swapping can seriously affect the performance of file cache. Adding a deletion interface helps users clean up the file cache. API: `http://be_host:be_webserver_port/api/file_cache?op=release&base_path=${file_cache_base_path}` Return the number of released segment files. If `base_path` is not provide in url, all cache paths will be released. It's thread-safe to call this api, so only the segment files not been read currently can be released. ``` {"released_elements":22} ``` 3. Specify the base path to store cache data > Currently, regression testing lacks test cases of file cache, which cannot guarantee the stability of file cache. This interface is generally used in regression testing scenarios. Different queries use different paths to verify different usage cases and performance. User can set session variable `file_cache_base_path` to specify the base path to store cache data. `file_cache_base_path="random"` as default, means chosing a random path from cached paths to store cache data. If `file_cache_base_path` is not one of the base paths in BE configuration, a random path is used. --- be/src/http/CMakeLists.txt | 2 +- be/src/http/action/file_cache_action.cpp | 68 ++++++++++++++ be/src/http/action/file_cache_action.h | 40 +++++++++ be/src/io/cache/block/block_file_cache.h | 2 + .../cache/block/block_file_cache_factory.cpp | 26 ++++++ .../io/cache/block/block_file_cache_factory.h | 6 ++ .../io/cache/block/block_lru_file_cache.cpp | 90 +++++++++++++++++++ be/src/io/cache/block/block_lru_file_cache.h | 28 ++++++ .../cache/block/cached_remote_file_reader.cpp | 13 +++ .../cache/block/cached_remote_file_reader.h | 3 + be/src/io/file_factory.cpp | 28 ++++-- be/src/io/file_factory.h | 5 +- be/src/io/fs/buffered_reader.cpp | 6 +- be/src/io/fs/buffered_reader.h | 2 +- be/src/io/fs/file_reader_options.h | 7 ++ be/src/io/fs/remote_file_system.cpp | 8 +- be/src/service/http_service.cpp | 4 + be/src/vec/exec/format/csv/csv_reader.cpp | 8 +- .../vec/exec/format/json/new_json_reader.cpp | 4 +- be/src/vec/exec/format/orc/vorc_reader.cpp | 4 +- .../exec/format/parquet/vparquet_reader.cpp | 4 +- be/src/vec/exec/varrow_scanner.cpp | 4 +- .../org/apache/doris/qe/SessionVariable.java | 16 ++++ gensrc/thrift/PaloInternalService.thrift | 3 + 24 files changed, 352 insertions(+), 29 deletions(-) create mode 100644 be/src/http/action/file_cache_action.cpp create mode 100644 be/src/http/action/file_cache_action.h diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt index 93ce59a986..e563f1dadf 100644 --- a/be/src/http/CMakeLists.txt +++ b/be/src/http/CMakeLists.txt @@ -56,4 +56,4 @@ add_library(Webserver STATIC action/check_tablet_segment_action.cpp action/version_action.cpp action/jeprofile_actions.cpp -) + action/file_cache_action.cpp) diff --git a/be/src/http/action/file_cache_action.cpp b/be/src/http/action/file_cache_action.cpp new file mode 100644 index 0000000000..db432d5598 --- /dev/null +++ b/be/src/http/action/file_cache_action.cpp @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "file_cache_action.h" + +#include +#include +#include +#include + +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "io/cache/block/block_file_cache_factory.h" +#include "olap/olap_define.h" +#include "olap/tablet_meta.h" +#include "util/easy_json.h" + +namespace doris { + +const static std::string HEADER_JSON = "application/json"; +const static std::string OP = "op"; + +Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metrics) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str()); + std::string operation = req->param(OP); + if (operation == "release") { + size_t released = 0; + if (req->param("base_path") != "") { + released = io::FileCacheFactory::instance().try_release(req->param("base_path")); + } else { + released = io::FileCacheFactory::instance().try_release(); + } + EasyJson json; + json["released_elements"] = released; + *json_metrics = json.ToString(); + return Status::OK(); + } + return Status::InternalError("invalid operation: {}", operation); +} + +void FileCacheAction::handle(HttpRequest* req) { + std::string json_metrics; + Status status = _handle_header(req, &json_metrics); + std::string status_result = status.to_json(); + if (status.ok()) { + HttpChannel::send_reply(req, HttpStatus::OK, json_metrics); + } else { + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status_result); + } +} + +} // namespace doris diff --git a/be/src/http/action/file_cache_action.h b/be/src/http/action/file_cache_action.h new file mode 100644 index 0000000000..0460273dbc --- /dev/null +++ b/be/src/http/action/file_cache_action.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "http/http_handler.h" + +namespace doris { + +class HttpRequest; + +class FileCacheAction : public HttpHandler { +public: + FileCacheAction() = default; + + ~FileCacheAction() override = default; + + void handle(HttpRequest* req) override; + +private: + Status _handle_header(HttpRequest* req, std::string* json_metrics); +}; +} // namespace doris diff --git a/be/src/io/cache/block/block_file_cache.h b/be/src/io/cache/block/block_file_cache.h index fc7279b8db..0f1cc347d5 100644 --- a/be/src/io/cache/block/block_file_cache.h +++ b/be/src/io/cache/block/block_file_cache.h @@ -106,6 +106,8 @@ public: static Key hash(const std::string& path); + virtual size_t try_release() = 0; + std::string get_path_in_local_cache(const Key& key, size_t offset, CacheType type) const; std::string get_path_in_local_cache(const Key& key) const; diff --git a/be/src/io/cache/block/block_file_cache_factory.cpp b/be/src/io/cache/block/block_file_cache_factory.cpp index bd933612d1..fc60469ada 100644 --- a/be/src/io/cache/block/block_file_cache_factory.cpp +++ b/be/src/io/cache/block/block_file_cache_factory.cpp @@ -42,6 +42,22 @@ FileCacheFactory& FileCacheFactory::instance() { return ret; } +size_t FileCacheFactory::try_release() { + int elements = 0; + for (auto& cache : _caches) { + elements += cache->try_release(); + } + return elements; +} + +size_t FileCacheFactory::try_release(const std::string& base_path) { + auto iter = _path_to_cache.find(base_path); + if (iter != _path_to_cache.end()) { + return iter->second->try_release(); + } + return 0; +} + Status FileCacheFactory::create_file_cache(const std::string& cache_base_path, const FileCacheSettings& file_cache_settings) { if (config::clear_file_cache) { @@ -56,6 +72,7 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path, std::unique_ptr cache = std::make_unique(cache_base_path, file_cache_settings); RETURN_IF_ERROR(cache->initialize()); + _path_to_cache[cache_base_path] = cache.get(); _caches.push_back(std::move(cache)); LOG(INFO) << "[FileCache] path: " << cache_base_path << " total_size: " << file_cache_settings.total_size; @@ -66,6 +83,15 @@ CloudFileCachePtr FileCacheFactory::get_by_path(const IFileCache::Key& key) { return _caches[KeyHash()(key) % _caches.size()].get(); } +CloudFileCachePtr FileCacheFactory::get_by_path(const std::string& cache_base_path) { + auto iter = _path_to_cache.find(cache_base_path); + if (iter == _path_to_cache.end()) { + return nullptr; + } else { + return iter->second; + } +} + std::vector FileCacheFactory::get_query_context_holders( const TUniqueId& query_id) { std::vector holders; diff --git a/be/src/io/cache/block/block_file_cache_factory.h b/be/src/io/cache/block/block_file_cache_factory.h index aba0402c4e..97bffb7193 100644 --- a/be/src/io/cache/block/block_file_cache_factory.h +++ b/be/src/io/cache/block/block_file_cache_factory.h @@ -42,7 +42,12 @@ public: Status create_file_cache(const std::string& cache_base_path, const FileCacheSettings& file_cache_settings); + size_t try_release(); + + size_t try_release(const std::string& base_path); + CloudFileCachePtr get_by_path(const IFileCache::Key& key); + CloudFileCachePtr get_by_path(const std::string& cache_base_path); std::vector get_query_context_holders( const TUniqueId& query_id); FileCacheFactory() = default; @@ -51,6 +56,7 @@ public: private: std::vector> _caches; + std::unordered_map _path_to_cache; }; } // namespace io diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index c7ffd00319..af614560a9 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -49,6 +49,7 @@ #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "io/fs/path.h" +#include "util/doris_metrics.h" #include "util/slice.h" #include "vec/common/hex.h" @@ -57,6 +58,21 @@ namespace fs = std::filesystem; namespace doris { namespace io { +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_hits_ratio, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_removed_elements, MetricUnit::OPERATIONS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_max_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_curr_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_max_elements, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_curr_elements, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_max_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_curr_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_max_elements, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_curr_elements, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_elements, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_elements, MetricUnit::NOUNIT); + LRUFileCache::LRUFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings) : IFileCache(cache_base_path, cache_settings) { @@ -67,6 +83,28 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path, _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements, 24 * 60 * 60); + _entity = DorisMetrics::instance()->metric_registry()->register_entity( + "lru_file_cache", {{"path", _cache_base_path}}); + _entity->register_hook(_cache_base_path, std::bind(&LRUFileCache::update_cache_metrics, this)); + + INT_DOUBLE_METRIC_REGISTER(_entity, file_cache_hits_ratio); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_removed_elements); + + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_max_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_curr_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_max_elements); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_curr_elements); + + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_max_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_curr_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_max_elements); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_curr_elements); + + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_max_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_curr_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_max_elements); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_curr_elements); + LOG(INFO) << fmt::format( "file cache path={}, disposable queue size={} elements={}, index queue size={} " "elements={}, query queue " @@ -349,6 +387,12 @@ FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t } DCHECK(!file_blocks.empty()); + _num_read_segments += file_blocks.size(); + for (auto& segment : file_blocks) { + if (segment->state() == FileBlock::State::DOWNLOADED) { + _num_hit_segments++; + } + } return FileBlocksHolder(std::move(file_blocks)); } @@ -393,6 +437,25 @@ LRUFileCache::FileBlockCell* LRUFileCache::add_cell(const Key& key, const CacheC return &(it->second); } +size_t LRUFileCache::try_release() { + std::lock_guard l(_mutex); + std::vector trash; + for (auto& [key, segments] : _files) { + for (auto& [offset, cell] : segments) { + if (cell.releasable()) { + trash.emplace_back(&cell); + } + } + } + for (auto& cell : trash) { + FileBlockSPtr file_block = cell->file_block; + std::lock_guard lc(cell->file_block->_mutex); + remove(file_block, l, lc); + } + LOG(INFO) << "Released " << trash.size() << " segments in file cache " << _cache_base_path; + return trash.size(); +} + LRUFileCache::LRUQueue& LRUFileCache::get_queue(CacheType type) { switch (type) { case CacheType::INDEX: @@ -707,6 +770,7 @@ void LRUFileCache::remove(FileBlockSPtr file_block, std::lock_guard& LOG(ERROR) << ec.message(); } } + _num_removed_segments++; if (offsets.empty()) { auto key_path = get_path_in_local_cache(key); _files.erase(key); @@ -1028,5 +1092,31 @@ void LRUFileCache::run_background_operation() { } } +void LRUFileCache::update_cache_metrics() const { + std::lock_guard l(_mutex); + double hit_ratio = 0; + if (_num_read_segments > 0) { + hit_ratio = (double)_num_hit_segments / (double)_num_read_segments; + } + + file_cache_hits_ratio->set_value(hit_ratio); + file_cache_removed_elements->set_value(_num_removed_segments); + + file_cache_index_queue_max_size->set_value(_index_queue.get_max_size()); + file_cache_index_queue_curr_size->set_value(_index_queue.get_total_cache_size(l)); + file_cache_index_queue_max_elements->set_value(_index_queue.get_max_element_size()); + file_cache_index_queue_curr_elements->set_value(_index_queue.get_elements_num(l)); + + file_cache_normal_queue_max_size->set_value(_normal_queue.get_max_size()); + file_cache_normal_queue_curr_size->set_value(_normal_queue.get_total_cache_size(l)); + file_cache_normal_queue_max_elements->set_value(_normal_queue.get_max_element_size()); + file_cache_normal_queue_curr_elements->set_value(_normal_queue.get_elements_num(l)); + + file_cache_disposable_queue_max_size->set_value(_disposable_queue.get_max_size()); + file_cache_disposable_queue_curr_size->set_value(_disposable_queue.get_total_cache_size(l)); + file_cache_disposable_queue_max_elements->set_value(_disposable_queue.get_max_element_size()); + file_cache_disposable_queue_curr_elements->set_value(_disposable_queue.get_elements_num(l)); +} + } // namespace io } // namespace doris diff --git a/be/src/io/cache/block/block_lru_file_cache.h b/be/src/io/cache/block/block_lru_file_cache.h index 0ce4bdf073..ef9546bb5e 100644 --- a/be/src/io/cache/block/block_lru_file_cache.h +++ b/be/src/io/cache/block/block_lru_file_cache.h @@ -34,6 +34,7 @@ #include "common/status.h" #include "io/cache/block/block_file_cache.h" #include "io/cache/block/block_file_segment.h" +#include "util/metrics.h" namespace doris { class TUniqueId; @@ -126,6 +127,8 @@ private: LRUQueue _normal_queue; LRUQueue _disposable_queue; + size_t try_release() override; + LRUFileCache::LRUQueue& get_queue(CacheType type); const LRUFileCache::LRUQueue& get_queue(CacheType type) const; @@ -187,12 +190,37 @@ private: void run_background_operation(); + void update_cache_metrics() const; + public: std::string dump_structure(const Key& key) override; private: std::atomic_bool _close {false}; std::thread _cache_background_thread; + size_t _num_read_segments = 0; + size_t _num_hit_segments = 0; + size_t _num_removed_segments = 0; + + std::shared_ptr _entity = nullptr; + + DoubleGauge* file_cache_hits_ratio = nullptr; + UIntGauge* file_cache_removed_elements = nullptr; + + UIntGauge* file_cache_index_queue_max_size = nullptr; + UIntGauge* file_cache_index_queue_curr_size = nullptr; + UIntGauge* file_cache_index_queue_max_elements = nullptr; + UIntGauge* file_cache_index_queue_curr_elements = nullptr; + + UIntGauge* file_cache_normal_queue_max_size = nullptr; + UIntGauge* file_cache_normal_queue_curr_size = nullptr; + UIntGauge* file_cache_normal_queue_max_elements = nullptr; + UIntGauge* file_cache_normal_queue_curr_elements = nullptr; + + UIntGauge* file_cache_disposable_queue_max_size = nullptr; + UIntGauge* file_cache_disposable_queue_curr_size = nullptr; + UIntGauge* file_cache_disposable_queue_max_elements = nullptr; + UIntGauge* file_cache_disposable_queue_curr_elements = nullptr; }; } // namespace io diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp b/be/src/io/cache/block/cached_remote_file_reader.cpp index 7d9c0ed027..f2c4c80173 100644 --- a/be/src/io/cache/block/cached_remote_file_reader.cpp +++ b/be/src/io/cache/block/cached_remote_file_reader.cpp @@ -48,6 +48,19 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader _cache = FileCacheFactory::instance().get_by_path(_cache_key); } +CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, + const std::string& cache_base_path, + const std::string& cache_path) + : _remote_file_reader(std::move(remote_file_reader)) { + _cache_key = IFileCache::hash(cache_path); + _cache = FileCacheFactory::instance().get_by_path(cache_base_path); + if (_cache == nullptr) { + LOG(WARNING) << "Can't get cache from base path: " << cache_base_path + << ", using random instead."; + _cache = FileCacheFactory::instance().get_by_path(_cache_key); + } +} + CachedRemoteFileReader::~CachedRemoteFileReader() { close(); } diff --git a/be/src/io/cache/block/cached_remote_file_reader.h b/be/src/io/cache/block/cached_remote_file_reader.h index 4e5ed7db0c..68b80e0ce4 100644 --- a/be/src/io/cache/block/cached_remote_file_reader.h +++ b/be/src/io/cache/block/cached_remote_file_reader.h @@ -41,6 +41,9 @@ class CachedRemoteFileReader final : public FileReader { public: CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_path); + CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_base_path, + const std::string& cache_path); + ~CachedRemoteFileReader() override; Status close() override; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index e8fe4b2cec..65bc429d12 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -45,15 +45,27 @@ namespace io { class FileWriter; } // namespace io -io::FileCachePolicy FileFactory::get_cache_policy(RuntimeState* state) { - if (state != nullptr) { - if (config::enable_file_cache && state->query_options().enable_file_cache) { - return io::FileCachePolicy::FILE_BLOCK_CACHE; - } +static io::FileBlockCachePathPolicy BLOCK_CACHE_POLICY; +static std::string RANDOM_CACHE_BASE_PATH = "random"; + +io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state) { + io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE; + if (config::enable_file_cache && state != nullptr && + state->query_options().__isset.enable_file_cache && + state->query_options().enable_file_cache) { + cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE; } - return io::FileCachePolicy::NO_CACHE; + io::FileReaderOptions reader_options(cache_policy, BLOCK_CACHE_POLICY); + if (state != nullptr && state->query_options().__isset.file_cache_base_path && + state->query_options().file_cache_base_path != RANDOM_CACHE_BASE_PATH) { + reader_options.specify_cache_path(state->query_options().file_cache_base_path); + } + return reader_options; } +io::FileReaderOptions FileFactory::NO_CACHE_READER_OPTIONS = + FileFactory::get_reader_options(nullptr); + Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env, const std::vector& broker_addresses, const std::map& properties, @@ -100,10 +112,8 @@ Status FileFactory::create_file_reader(RuntimeProfile* profile, const FileDescription& file_description, std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, - io::FileCachePolicy cache_policy) { + io::FileReaderOptions reader_options) { TFileType::type type = system_properties.system_type; - io::FileBlockCachePathPolicy file_block_cache; - io::FileReaderOptions reader_options(cache_policy, file_block_cache); reader_options.file_size = file_description.file_size; switch (type) { case TFileType::FILE_LOCAL: { diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 56d4aa98e7..2e034b1ab8 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -58,7 +58,8 @@ class FileFactory { ENABLE_FACTORY_CREATOR(FileFactory); public: - static io::FileCachePolicy get_cache_policy(RuntimeState* state); + static io::FileReaderOptions get_reader_options(RuntimeState* state); + static io::FileReaderOptions NO_CACHE_READER_OPTIONS; /// Create FileWriter static Status create_file_writer(TFileType::type type, ExecEnv* env, @@ -72,7 +73,7 @@ public: RuntimeProfile* profile, const FileSystemProperties& system_properties, const FileDescription& file_description, std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, - io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE); + io::FileReaderOptions reader_options = NO_CACHE_READER_OPTIONS); // Create FileReader for stream load pipe static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader); diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index f57aa16f86..5b2c036dca 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -718,11 +718,11 @@ Status DelegateReader::create_file_reader(RuntimeProfile* profile, const FileDescription& file_description, std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, AccessMode access_mode, - io::FileCachePolicy cache_policy, const IOContext* io_ctx, - const PrefetchRange file_range) { + io::FileReaderOptions reader_options, + const IOContext* io_ctx, const PrefetchRange file_range) { io::FileReaderSPtr reader; RETURN_IF_ERROR(FileFactory::create_file_reader(profile, system_properties, file_description, - file_system, &reader, cache_policy)); + file_system, &reader, reader_options)); if (reader->size() < IN_MEMORY_FILE_SIZE) { *file_reader = std::make_shared(reader); } else if (access_mode == AccessMode::SEQUENTIAL) { diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index f22789de8f..990b0cb7a0 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -304,7 +304,7 @@ public: RuntimeProfile* profile, const FileSystemProperties& system_properties, const FileDescription& file_description, std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, AccessMode access_mode = SEQUENTIAL, - io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE, + io::FileReaderOptions reader_options = FileFactory::NO_CACHE_READER_OPTIONS, const IOContext* io_ctx = nullptr, const PrefetchRange file_range = PrefetchRange(0, 0)); }; diff --git a/be/src/io/fs/file_reader_options.h b/be/src/io/fs/file_reader_options.h index ca50bcb1dd..66629f54de 100644 --- a/be/src/io/fs/file_reader_options.h +++ b/be/src/io/fs/file_reader_options.h @@ -78,6 +78,13 @@ public: // -1 means unset. // If the file length is not set, the file length will be fetched from the file system. int64_t file_size = -1; + bool has_cache_base_path = false; + std::string cache_base_path; + + void specify_cache_path(const std::string& base_path) { + has_cache_base_path = true; + cache_base_path = base_path; + } static FileReaderOptions DEFAULT; }; diff --git a/be/src/io/fs/remote_file_system.cpp b/be/src/io/fs/remote_file_system.cpp index 53f7ced5f9..63246a3c87 100644 --- a/be/src/io/fs/remote_file_system.cpp +++ b/be/src/io/fs/remote_file_system.cpp @@ -94,7 +94,13 @@ Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOption case io::FileCachePolicy::FILE_BLOCK_CACHE: { StringPiece str(raw_reader->path().native()); std::string cache_path = reader_options.path_policy.get_cache_path(path.native()); - *reader = std::make_shared(std::move(raw_reader), cache_path); + if (reader_options.has_cache_base_path) { + // from query session variable: file_cache_base_path + *reader = std::make_shared( + std::move(raw_reader), reader_options.cache_base_path, cache_path); + } else { + *reader = std::make_shared(std::move(raw_reader), cache_path); + } break; } default: { diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 7e52f5f3cd..3aad79f709 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -27,6 +27,7 @@ #include "http/action/compaction_action.h" #include "http/action/config_action.h" #include "http/action/download_action.h" +#include "http/action/file_cache_action.h" #include "http/action/health_action.h" #include "http/action/jeprofile_actions.h" #include "http/action/meta_action.h" @@ -141,6 +142,9 @@ Status HttpService::start() { _pool.add(new MetaAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/meta/{op}/{tablet_id}", meta_action); + FileCacheAction* file_cache_action = _pool.add(new FileCacheAction()); + _ev_http_server->register_handler(HttpMethod::GET, "/api/file_cache", file_cache_action); + #ifndef BE_TEST // Register BE checksum action ChecksumAction* checksum_action = diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index a1cf9e2a54..cff15c88bb 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -169,10 +169,10 @@ Status CsvReader::init_reader(bool is_load) { if (_params.file_type == TFileType::FILE_STREAM) { RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); } else { - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::SEQUENTIAL, cache_policy, _io_ctx, + io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); } if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && @@ -656,9 +656,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { } _file_description.start_offset = start_offset; - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description, - &_file_system, &_file_reader, cache_policy)); + &_file_system, &_file_reader, reader_options)); if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && _params.file_type != TFileType::FILE_BROKER) { return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path); diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index a777949ae7..9643e48ab6 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -374,10 +374,10 @@ Status NewJsonReader::_open_file_reader() { if (_params.file_type == TFileType::FILE_STREAM) { RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); } else { - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::SEQUENTIAL, cache_policy, _io_ctx, + io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); } return Status::OK(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 8e24d9a037..7d85b12902 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -199,10 +199,10 @@ void OrcReader::_init_profile() { Status OrcReader::_create_file_reader() { if (_file_input_stream == nullptr) { io::FileReaderSPtr inner_reader; - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &inner_reader, - io::DelegateReader::AccessMode::RANDOM, cache_policy, _io_ctx)); + io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx)); _file_input_stream.reset( new ORCFileInputStream(_scan_range.path, inner_reader, &_statistics, _io_ctx)); } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 24539fc0ec..48332dfff0 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -205,10 +205,10 @@ Status ParquetReader::_open_file() { if (_file_reader == nullptr) { SCOPED_RAW_TIMER(&_statistics.open_file_time); ++_statistics.open_file_num; - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::RANDOM, cache_policy, _io_ctx)); + io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx)); } if (_file_metadata == nullptr) { SCOPED_RAW_TIMER(&_statistics.parse_footer_time); diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp index c705f1fd69..b8ce90dfef 100644 --- a/be/src/vec/exec/varrow_scanner.cpp +++ b/be/src/vec/exec/varrow_scanner.cpp @@ -113,10 +113,10 @@ Status VArrowScanner::_open_next_reader() { io::FileReaderSPtr file_reader; _init_system_properties(range); _init_file_description(range); - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description, &_file_system, - &file_reader, cache_policy)); + &file_reader, reader_options)); if (file_reader->size() == 0) { continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 5609ba01c2..298710ee7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -287,6 +287,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_FILE_CACHE = "enable_file_cache"; + public static final String FILE_CACHE_BASE_PATH = "file_cache_base_path"; + public static final String GROUP_BY_AND_HAVING_USE_ALIAS_FIRST = "group_by_and_having_use_alias_first"; public static final String DROP_TABLE_IF_CTAS_FAILED = "drop_table_if_ctas_failed"; @@ -796,6 +798,10 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_FILE_CACHE, needForward = true) public boolean enableFileCache = true; + // Specify base path for file cache, or chose a random path. + @VariableMgr.VarAttr(name = FILE_CACHE_BASE_PATH, needForward = true) + public String fileCacheBasePath = "random"; + // Whether drop table when create table as select insert data appear error. @VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true) public boolean dropTableIfCtasFailed = true; @@ -1654,6 +1660,14 @@ public class SessionVariable implements Serializable, Writable { this.enableFileCache = enableFileCache; } + public String getFileCacheBasePath() { + return fileCacheBasePath; + } + + public void setFileCacheBasePath(String basePath) { + this.fileCacheBasePath = basePath; + } + public int getMaxTableCountUseCascadesJoinReorder() { return this.maxTableCountUseCascadesJoinReorder; } @@ -1741,6 +1755,8 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableFileCache(enableFileCache); + tResult.setFileCacheBasePath(fileCacheBasePath); + if (dryRunQuery) { tResult.setDryRunQuery(true); } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index aa7ffc24ae..012578f25d 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -212,6 +212,9 @@ struct TQueryOptions { // partition count(1 << external_agg_partition_bits) when spill aggregation data into disk 69: optional i32 external_agg_partition_bits = 4 + + // Specify base path for file cache + 70: optional string file_cache_base_path }