diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt index 93ce59a986..e563f1dadf 100644 --- a/be/src/http/CMakeLists.txt +++ b/be/src/http/CMakeLists.txt @@ -56,4 +56,4 @@ add_library(Webserver STATIC action/check_tablet_segment_action.cpp action/version_action.cpp action/jeprofile_actions.cpp -) + action/file_cache_action.cpp) diff --git a/be/src/http/action/file_cache_action.cpp b/be/src/http/action/file_cache_action.cpp new file mode 100644 index 0000000000..db432d5598 --- /dev/null +++ b/be/src/http/action/file_cache_action.cpp @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "file_cache_action.h" + +#include +#include +#include +#include + +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "io/cache/block/block_file_cache_factory.h" +#include "olap/olap_define.h" +#include "olap/tablet_meta.h" +#include "util/easy_json.h" + +namespace doris { + +const static std::string HEADER_JSON = "application/json"; +const static std::string OP = "op"; + +Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metrics) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str()); + std::string operation = req->param(OP); + if (operation == "release") { + size_t released = 0; + if (!req->param("base_path").empty()) { + released = io::FileCacheFactory::instance().try_release(req->param("base_path")); + } else { + released = io::FileCacheFactory::instance().try_release(); + } + EasyJson json; + json["released_elements"] = released; + *json_metrics = json.ToString(); + return Status::OK(); + } + return Status::InternalError("invalid operation: {}", operation); +} + +void FileCacheAction::handle(HttpRequest* req) { + std::string json_metrics; + Status status = _handle_header(req, &json_metrics); + std::string status_result = status.to_json(); + if (status.ok()) { + HttpChannel::send_reply(req, HttpStatus::OK, json_metrics); + } else { + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status_result); + } +} + +} // namespace doris diff --git a/be/src/http/action/file_cache_action.h b/be/src/http/action/file_cache_action.h new file mode 100644 index 0000000000..0460273dbc --- /dev/null +++ b/be/src/http/action/file_cache_action.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "http/http_handler.h" + +namespace doris { + +class HttpRequest; + +class FileCacheAction : public HttpHandler { +public: + FileCacheAction() = default; + + ~FileCacheAction() override = default; + + void handle(HttpRequest* req) override; + +private: + Status _handle_header(HttpRequest* req, std::string* json_metrics); +}; +} // namespace doris diff --git a/be/src/io/cache/block/block_file_cache.h b/be/src/io/cache/block/block_file_cache.h index fc7279b8db..0f1cc347d5 100644 --- a/be/src/io/cache/block/block_file_cache.h +++ b/be/src/io/cache/block/block_file_cache.h @@ -106,6 +106,8 @@ public: static Key hash(const std::string& path); + virtual size_t try_release() = 0; + std::string get_path_in_local_cache(const Key& key, size_t offset, CacheType type) const; std::string get_path_in_local_cache(const Key& key) const; diff --git a/be/src/io/cache/block/block_file_cache_factory.cpp b/be/src/io/cache/block/block_file_cache_factory.cpp index bd933612d1..fc60469ada 100644 --- a/be/src/io/cache/block/block_file_cache_factory.cpp +++ b/be/src/io/cache/block/block_file_cache_factory.cpp @@ -42,6 +42,22 @@ FileCacheFactory& FileCacheFactory::instance() { return ret; } +size_t FileCacheFactory::try_release() { + size_t elements = 0; + for (auto& cache : _caches) { + elements += cache->try_release(); + } + return elements; +} + 
+size_t FileCacheFactory::try_release(const std::string& base_path) { + auto iter = _path_to_cache.find(base_path); + if (iter != _path_to_cache.end()) { + return iter->second->try_release(); + } + return 0; +} + Status FileCacheFactory::create_file_cache(const std::string& cache_base_path, const FileCacheSettings& file_cache_settings) { if (config::clear_file_cache) { @@ -56,6 +72,7 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path, std::unique_ptr cache = std::make_unique(cache_base_path, file_cache_settings); RETURN_IF_ERROR(cache->initialize()); + _path_to_cache[cache_base_path] = cache.get(); _caches.push_back(std::move(cache)); LOG(INFO) << "[FileCache] path: " << cache_base_path << " total_size: " << file_cache_settings.total_size; @@ -66,6 +83,15 @@ CloudFileCachePtr FileCacheFactory::get_by_path(const IFileCache::Key& key) { return _caches[KeyHash()(key) % _caches.size()].get(); } +CloudFileCachePtr FileCacheFactory::get_by_path(const std::string& cache_base_path) { + auto iter = _path_to_cache.find(cache_base_path); + if (iter == _path_to_cache.end()) { + return nullptr; + } else { + return iter->second; + } +} + std::vector FileCacheFactory::get_query_context_holders( const TUniqueId& query_id) { std::vector holders; diff --git a/be/src/io/cache/block/block_file_cache_factory.h b/be/src/io/cache/block/block_file_cache_factory.h index aba0402c4e..97bffb7193 100644 --- a/be/src/io/cache/block/block_file_cache_factory.h +++ b/be/src/io/cache/block/block_file_cache_factory.h @@ -42,7 +42,12 @@ public: Status create_file_cache(const std::string& cache_base_path, const FileCacheSettings& file_cache_settings); + size_t try_release(); + + size_t try_release(const std::string& base_path); + CloudFileCachePtr get_by_path(const IFileCache::Key& key); + CloudFileCachePtr get_by_path(const std::string& cache_base_path); std::vector get_query_context_holders( const TUniqueId& query_id); FileCacheFactory() = default; @@ -51,6 +56,7 @@ 
public: private: std::vector> _caches; + std::unordered_map _path_to_cache; }; } // namespace io diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index c7ffd00319..af614560a9 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -49,6 +49,7 @@ #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "io/fs/path.h" +#include "util/doris_metrics.h" #include "util/slice.h" #include "vec/common/hex.h" @@ -57,6 +58,21 @@ namespace fs = std::filesystem; namespace doris { namespace io { +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_hits_ratio, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_removed_elements, MetricUnit::OPERATIONS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_max_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_curr_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_max_elements, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_curr_elements, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_max_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_curr_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_max_elements, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_curr_elements, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_size, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_elements, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_elements, MetricUnit::NOUNIT); + LRUFileCache::LRUFileCache(const std::string& cache_base_path, const 
FileCacheSettings& cache_settings) : IFileCache(cache_base_path, cache_settings) { @@ -67,6 +83,28 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path, _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements, 24 * 60 * 60); + _entity = DorisMetrics::instance()->metric_registry()->register_entity( + "lru_file_cache", {{"path", _cache_base_path}}); + _entity->register_hook(_cache_base_path, std::bind(&LRUFileCache::update_cache_metrics, this)); + + INT_DOUBLE_METRIC_REGISTER(_entity, file_cache_hits_ratio); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_removed_elements); + + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_max_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_curr_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_max_elements); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_curr_elements); + + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_max_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_curr_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_max_elements); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_curr_elements); + + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_max_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_curr_size); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_max_elements); + INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_curr_elements); + LOG(INFO) << fmt::format( "file cache path={}, disposable queue size={} elements={}, index queue size={} " "elements={}, query queue " @@ -349,6 +387,12 @@ FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t } DCHECK(!file_blocks.empty()); + _num_read_segments += file_blocks.size(); + for (auto& segment : file_blocks) { + if (segment->state() == FileBlock::State::DOWNLOADED) { + _num_hit_segments++; + } + } 
return FileBlocksHolder(std::move(file_blocks)); } @@ -393,6 +437,25 @@ LRUFileCache::FileBlockCell* LRUFileCache::add_cell(const Key& key, const CacheC return &(it->second); } +size_t LRUFileCache::try_release() { + std::lock_guard l(_mutex); + std::vector trash; + for (auto& [key, segments] : _files) { + for (auto& [offset, cell] : segments) { + if (cell.releasable()) { + trash.emplace_back(&cell); + } + } + } + for (auto& cell : trash) { + FileBlockSPtr file_block = cell->file_block; + std::lock_guard lc(cell->file_block->_mutex); + remove(file_block, l, lc); + } + LOG(INFO) << "Released " << trash.size() << " segments in file cache " << _cache_base_path; + return trash.size(); +} + LRUFileCache::LRUQueue& LRUFileCache::get_queue(CacheType type) { switch (type) { case CacheType::INDEX: @@ -707,6 +770,7 @@ void LRUFileCache::remove(FileBlockSPtr file_block, std::lock_guard& LOG(ERROR) << ec.message(); } } + _num_removed_segments++; if (offsets.empty()) { auto key_path = get_path_in_local_cache(key); _files.erase(key); @@ -1028,5 +1092,31 @@ void LRUFileCache::run_background_operation() { } } +void LRUFileCache::update_cache_metrics() const { + std::lock_guard l(_mutex); + double hit_ratio = 0; + if (_num_read_segments > 0) { + hit_ratio = (double)_num_hit_segments / (double)_num_read_segments; + } + + file_cache_hits_ratio->set_value(hit_ratio); + file_cache_removed_elements->set_value(_num_removed_segments); + + file_cache_index_queue_max_size->set_value(_index_queue.get_max_size()); + file_cache_index_queue_curr_size->set_value(_index_queue.get_total_cache_size(l)); + file_cache_index_queue_max_elements->set_value(_index_queue.get_max_element_size()); + file_cache_index_queue_curr_elements->set_value(_index_queue.get_elements_num(l)); + + file_cache_normal_queue_max_size->set_value(_normal_queue.get_max_size()); + file_cache_normal_queue_curr_size->set_value(_normal_queue.get_total_cache_size(l)); + 
file_cache_normal_queue_max_elements->set_value(_normal_queue.get_max_element_size()); + file_cache_normal_queue_curr_elements->set_value(_normal_queue.get_elements_num(l)); + + file_cache_disposable_queue_max_size->set_value(_disposable_queue.get_max_size()); + file_cache_disposable_queue_curr_size->set_value(_disposable_queue.get_total_cache_size(l)); + file_cache_disposable_queue_max_elements->set_value(_disposable_queue.get_max_element_size()); + file_cache_disposable_queue_curr_elements->set_value(_disposable_queue.get_elements_num(l)); +} + } // namespace io } // namespace doris diff --git a/be/src/io/cache/block/block_lru_file_cache.h b/be/src/io/cache/block/block_lru_file_cache.h index 0ce4bdf073..ef9546bb5e 100644 --- a/be/src/io/cache/block/block_lru_file_cache.h +++ b/be/src/io/cache/block/block_lru_file_cache.h @@ -34,6 +34,7 @@ #include "common/status.h" #include "io/cache/block/block_file_cache.h" #include "io/cache/block/block_file_segment.h" +#include "util/metrics.h" namespace doris { class TUniqueId; @@ -126,6 +127,8 @@ private: LRUQueue _normal_queue; LRUQueue _disposable_queue; + size_t try_release() override; + LRUFileCache::LRUQueue& get_queue(CacheType type); const LRUFileCache::LRUQueue& get_queue(CacheType type) const; @@ -187,12 +190,37 @@ private: void run_background_operation(); + void update_cache_metrics() const; + public: std::string dump_structure(const Key& key) override; private: std::atomic_bool _close {false}; std::thread _cache_background_thread; + size_t _num_read_segments = 0; + size_t _num_hit_segments = 0; + size_t _num_removed_segments = 0; + + std::shared_ptr _entity = nullptr; + + DoubleGauge* file_cache_hits_ratio = nullptr; + UIntGauge* file_cache_removed_elements = nullptr; + + UIntGauge* file_cache_index_queue_max_size = nullptr; + UIntGauge* file_cache_index_queue_curr_size = nullptr; + UIntGauge* file_cache_index_queue_max_elements = nullptr; + UIntGauge* file_cache_index_queue_curr_elements = nullptr; + + 
UIntGauge* file_cache_normal_queue_max_size = nullptr; + UIntGauge* file_cache_normal_queue_curr_size = nullptr; + UIntGauge* file_cache_normal_queue_max_elements = nullptr; + UIntGauge* file_cache_normal_queue_curr_elements = nullptr; + + UIntGauge* file_cache_disposable_queue_max_size = nullptr; + UIntGauge* file_cache_disposable_queue_curr_size = nullptr; + UIntGauge* file_cache_disposable_queue_max_elements = nullptr; + UIntGauge* file_cache_disposable_queue_curr_elements = nullptr; }; } // namespace io diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp b/be/src/io/cache/block/cached_remote_file_reader.cpp index 7d9c0ed027..f2c4c80173 100644 --- a/be/src/io/cache/block/cached_remote_file_reader.cpp +++ b/be/src/io/cache/block/cached_remote_file_reader.cpp @@ -48,6 +48,19 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader _cache = FileCacheFactory::instance().get_by_path(_cache_key); } +CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, + const std::string& cache_base_path, + const std::string& cache_path) + : _remote_file_reader(std::move(remote_file_reader)) { + _cache_key = IFileCache::hash(cache_path); + _cache = FileCacheFactory::instance().get_by_path(cache_base_path); + if (_cache == nullptr) { + LOG(WARNING) << "Can't get cache from base path: " << cache_base_path + << ", using random instead."; + _cache = FileCacheFactory::instance().get_by_path(_cache_key); + } +} + CachedRemoteFileReader::~CachedRemoteFileReader() { close(); } diff --git a/be/src/io/cache/block/cached_remote_file_reader.h b/be/src/io/cache/block/cached_remote_file_reader.h index 4e5ed7db0c..68b80e0ce4 100644 --- a/be/src/io/cache/block/cached_remote_file_reader.h +++ b/be/src/io/cache/block/cached_remote_file_reader.h @@ -41,6 +41,9 @@ class CachedRemoteFileReader final : public FileReader { public: CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_path); + 
CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_base_path, + const std::string& cache_path); + ~CachedRemoteFileReader() override; Status close() override; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index e8fe4b2cec..65bc429d12 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -45,15 +45,27 @@ namespace io { class FileWriter; } // namespace io -io::FileCachePolicy FileFactory::get_cache_policy(RuntimeState* state) { - if (state != nullptr) { - if (config::enable_file_cache && state->query_options().enable_file_cache) { - return io::FileCachePolicy::FILE_BLOCK_CACHE; - } +static io::FileBlockCachePathPolicy BLOCK_CACHE_POLICY; +static const std::string RANDOM_CACHE_BASE_PATH = "random"; + +io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state) { + io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE; + if (config::enable_file_cache && state != nullptr && + state->query_options().__isset.enable_file_cache && + state->query_options().enable_file_cache) { + cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE; } - return io::FileCachePolicy::NO_CACHE; + io::FileReaderOptions reader_options(cache_policy, BLOCK_CACHE_POLICY); + if (state != nullptr && state->query_options().__isset.file_cache_base_path && + !state->query_options().file_cache_base_path.empty() && state->query_options().file_cache_base_path != RANDOM_CACHE_BASE_PATH) { + reader_options.specify_cache_path(state->query_options().file_cache_base_path); + } + return reader_options; } +io::FileReaderOptions FileFactory::NO_CACHE_READER_OPTIONS = + FileFactory::get_reader_options(nullptr); + Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env, const std::vector& broker_addresses, const std::map& properties, @@ -100,10 +112,8 @@ Status FileFactory::create_file_reader(RuntimeProfile* profile, const FileDescription& file_description, std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, - io::FileCachePolicy cache_policy) { + 
io::FileReaderOptions reader_options) { TFileType::type type = system_properties.system_type; - io::FileBlockCachePathPolicy file_block_cache; - io::FileReaderOptions reader_options(cache_policy, file_block_cache); reader_options.file_size = file_description.file_size; switch (type) { case TFileType::FILE_LOCAL: { diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 56d4aa98e7..2e034b1ab8 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -58,7 +58,8 @@ class FileFactory { ENABLE_FACTORY_CREATOR(FileFactory); public: - static io::FileCachePolicy get_cache_policy(RuntimeState* state); + static io::FileReaderOptions get_reader_options(RuntimeState* state); + static io::FileReaderOptions NO_CACHE_READER_OPTIONS; /// Create FileWriter static Status create_file_writer(TFileType::type type, ExecEnv* env, @@ -72,7 +73,7 @@ public: RuntimeProfile* profile, const FileSystemProperties& system_properties, const FileDescription& file_description, std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, - io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE); + io::FileReaderOptions reader_options = NO_CACHE_READER_OPTIONS); // Create FileReader for stream load pipe static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader); diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index f57aa16f86..5b2c036dca 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -718,11 +718,11 @@ Status DelegateReader::create_file_reader(RuntimeProfile* profile, const FileDescription& file_description, std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, AccessMode access_mode, - io::FileCachePolicy cache_policy, const IOContext* io_ctx, - const PrefetchRange file_range) { + io::FileReaderOptions reader_options, + const IOContext* io_ctx, const PrefetchRange file_range) { io::FileReaderSPtr reader; 
RETURN_IF_ERROR(FileFactory::create_file_reader(profile, system_properties, file_description, - file_system, &reader, cache_policy)); + file_system, &reader, reader_options)); if (reader->size() < IN_MEMORY_FILE_SIZE) { *file_reader = std::make_shared(reader); } else if (access_mode == AccessMode::SEQUENTIAL) { diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index f22789de8f..990b0cb7a0 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -304,7 +304,7 @@ public: RuntimeProfile* profile, const FileSystemProperties& system_properties, const FileDescription& file_description, std::shared_ptr* file_system, io::FileReaderSPtr* file_reader, AccessMode access_mode = SEQUENTIAL, - io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE, + io::FileReaderOptions reader_options = FileFactory::NO_CACHE_READER_OPTIONS, const IOContext* io_ctx = nullptr, const PrefetchRange file_range = PrefetchRange(0, 0)); }; diff --git a/be/src/io/fs/file_reader_options.h b/be/src/io/fs/file_reader_options.h index ca50bcb1dd..66629f54de 100644 --- a/be/src/io/fs/file_reader_options.h +++ b/be/src/io/fs/file_reader_options.h @@ -78,6 +78,13 @@ public: // -1 means unset. // If the file length is not set, the file length will be fetched from the file system. 
int64_t file_size = -1; + bool has_cache_base_path = false; + std::string cache_base_path; + + void specify_cache_path(const std::string& base_path) { + has_cache_base_path = true; + cache_base_path = base_path; + } static FileReaderOptions DEFAULT; }; diff --git a/be/src/io/fs/remote_file_system.cpp b/be/src/io/fs/remote_file_system.cpp index 53f7ced5f9..63246a3c87 100644 --- a/be/src/io/fs/remote_file_system.cpp +++ b/be/src/io/fs/remote_file_system.cpp @@ -94,7 +94,13 @@ Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOption case io::FileCachePolicy::FILE_BLOCK_CACHE: { StringPiece str(raw_reader->path().native()); std::string cache_path = reader_options.path_policy.get_cache_path(path.native()); - *reader = std::make_shared(std::move(raw_reader), cache_path); + if (reader_options.has_cache_base_path) { + // from query session variable: file_cache_base_path + *reader = std::make_shared( + std::move(raw_reader), reader_options.cache_base_path, cache_path); + } else { + *reader = std::make_shared(std::move(raw_reader), cache_path); + } break; } default: { diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 7e52f5f3cd..3aad79f709 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -27,6 +27,7 @@ #include "http/action/compaction_action.h" #include "http/action/config_action.h" #include "http/action/download_action.h" +#include "http/action/file_cache_action.h" #include "http/action/health_action.h" #include "http/action/jeprofile_actions.h" #include "http/action/meta_action.h" @@ -141,6 +142,9 @@ Status HttpService::start() { _pool.add(new MetaAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/meta/{op}/{tablet_id}", meta_action); + FileCacheAction* file_cache_action = _pool.add(new FileCacheAction()); + _ev_http_server->register_handler(HttpMethod::GET, "/api/file_cache", file_cache_action); + #ifndef 
BE_TEST // Register BE checksum action ChecksumAction* checksum_action = diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index a1cf9e2a54..cff15c88bb 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -169,10 +169,10 @@ Status CsvReader::init_reader(bool is_load) { if (_params.file_type == TFileType::FILE_STREAM) { RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); } else { - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::SEQUENTIAL, cache_policy, _io_ctx, + io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); } if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && @@ -656,9 +656,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { } _file_description.start_offset = start_offset; - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description, - &_file_system, &_file_reader, cache_policy)); + &_file_system, &_file_reader, reader_options)); if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && _params.file_type != TFileType::FILE_BROKER) { return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path); diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index a777949ae7..9643e48ab6 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ 
b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -374,10 +374,10 @@ Status NewJsonReader::_open_file_reader() { if (_params.file_type == TFileType::FILE_STREAM) { RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); } else { - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::SEQUENTIAL, cache_policy, _io_ctx, + io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); } return Status::OK(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 8e24d9a037..7d85b12902 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -199,10 +199,10 @@ void OrcReader::_init_profile() { Status OrcReader::_create_file_reader() { if (_file_input_stream == nullptr) { io::FileReaderSPtr inner_reader; - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &inner_reader, - io::DelegateReader::AccessMode::RANDOM, cache_policy, _io_ctx)); + io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx)); _file_input_stream.reset( new ORCFileInputStream(_scan_range.path, inner_reader, &_statistics, _io_ctx)); } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 24539fc0ec..48332dfff0 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -205,10 +205,10 @@ 
Status ParquetReader::_open_file() { if (_file_reader == nullptr) { SCOPED_RAW_TIMER(&_statistics.open_file_time); ++_statistics.open_file_num; - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &_file_reader, - io::DelegateReader::AccessMode::RANDOM, cache_policy, _io_ctx)); + io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx)); } if (_file_metadata == nullptr) { SCOPED_RAW_TIMER(&_statistics.parse_footer_time); diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp index c705f1fd69..b8ce90dfef 100644 --- a/be/src/vec/exec/varrow_scanner.cpp +++ b/be/src/vec/exec/varrow_scanner.cpp @@ -113,10 +113,10 @@ Status VArrowScanner::_open_next_reader() { io::FileReaderSPtr file_reader; _init_system_properties(range); _init_file_description(range); - io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state); + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description, &_file_system, - &file_reader, cache_policy)); + &file_reader, reader_options)); if (file_reader->size() == 0) { continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 5609ba01c2..298710ee7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -287,6 +287,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_FILE_CACHE = "enable_file_cache"; + public static final String FILE_CACHE_BASE_PATH = "file_cache_base_path"; + public static final String 
GROUP_BY_AND_HAVING_USE_ALIAS_FIRST = "group_by_and_having_use_alias_first"; public static final String DROP_TABLE_IF_CTAS_FAILED = "drop_table_if_ctas_failed"; @@ -796,6 +798,10 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_FILE_CACHE, needForward = true) public boolean enableFileCache = true; + // Specify base path for file cache, or choose a random path. + @VariableMgr.VarAttr(name = FILE_CACHE_BASE_PATH, needForward = true) + public String fileCacheBasePath = "random"; + // Whether drop table when create table as select insert data appear error. @VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true) public boolean dropTableIfCtasFailed = true; @@ -1654,6 +1660,14 @@ public class SessionVariable implements Serializable, Writable { this.enableFileCache = enableFileCache; } + public String getFileCacheBasePath() { + return fileCacheBasePath; + } + + public void setFileCacheBasePath(String basePath) { + this.fileCacheBasePath = basePath; + } + public int getMaxTableCountUseCascadesJoinReorder() { return this.maxTableCountUseCascadesJoinReorder; } @@ -1741,6 +1755,8 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableFileCache(enableFileCache); + tResult.setFileCacheBasePath(fileCacheBasePath); + if (dryRunQuery) { tResult.setDryRunQuery(true); } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index aa7ffc24ae..012578f25d 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -212,6 +212,9 @@ struct TQueryOptions { // partition count(1 << external_agg_partition_bits) when spill aggregation data into disk 69: optional i32 external_agg_partition_bits = 4 + + // Specify base path for file cache + 70: optional string file_cache_base_path }