[opt](FileCache) Add file cache metrics and management (#19177)
Add file cache metrics and management. 1. Get file cache metrics > If the file cache performs poorly, there are currently no metrics available to investigate the cause. In practice, the hit ratio, disk usage, and the number of removed segments are very important information. API: `http://be_host:be_webserver_port/metrics` File cache metrics for each base path start with the `doris_be_file_cache_` prefix. `hits_ratio` is the hit ratio of the cache since BE startup; `removed_elements` is the number of segment files removed since BE startup. Every cache path has three queues: index, normal and disposable. The capacity ratio of the three queues is 1:17:2. ``` doris_be_file_cache_hits_ratio{path="/mnt/datadisk1/gaoxin/file_cache"} 0.500000 doris_be_file_cache_hits_ratio{path="/mnt/datadisk1/gaoxin/small_file_cache"} 0.500000 doris_be_file_cache_removed_elements{path="/mnt/datadisk1/gaoxin/file_cache"} 0 doris_be_file_cache_removed_elements{path="/mnt/datadisk1/gaoxin/small_file_cache"} 0 doris_be_file_cache_normal_queue_max_size{path="/mnt/datadisk1/gaoxin/file_cache"} 912680550400 doris_be_file_cache_normal_queue_max_size{path="/mnt/datadisk1/gaoxin/small_file_cache"} 8500000000 doris_be_file_cache_normal_queue_max_elements{path="/mnt/datadisk1/gaoxin/file_cache"} 217600 doris_be_file_cache_normal_queue_max_elements{path="/mnt/datadisk1/gaoxin/small_file_cache"} 102400 doris_be_file_cache_normal_queue_curr_size{path="/mnt/datadisk1/gaoxin/file_cache"} 14129846 doris_be_file_cache_normal_queue_curr_size{path="/mnt/datadisk1/gaoxin/small_file_cache"} 14874904 doris_be_file_cache_normal_queue_curr_elements{path="/mnt/datadisk1/gaoxin/file_cache"} 18 doris_be_file_cache_normal_queue_curr_elements{path="/mnt/datadisk1/gaoxin/small_file_cache"} 22 ... ``` 2. Release file cache > Frequent swapping of segment files can seriously degrade the performance of the file cache. Adding a deletion interface helps users clean up the file cache. 
API: `http://be_host:be_webserver_port/api/file_cache?op=release&base_path=${file_cache_base_path}` Returns the number of released segment files. If `base_path` is not provided in the URL, all cache paths will be released. This API is thread-safe to call: only segment files that are not currently being read can be released. ``` {"released_elements":22} ``` 3. Specify the base path to store cache data > Currently, regression testing lacks test cases for the file cache, so the stability of the file cache cannot be guaranteed. This interface is generally used in regression testing scenarios. Different queries use different paths to verify different usage cases and performance. Users can set the session variable `file_cache_base_path` to specify the base path used to store cache data. The default, `file_cache_base_path="random"`, means choosing a random path from the configured cache paths to store cache data. If `file_cache_base_path` is not one of the base paths in the BE configuration, a random path is used.
This commit is contained in:
@ -56,4 +56,4 @@ add_library(Webserver STATIC
|
||||
action/check_tablet_segment_action.cpp
|
||||
action/version_action.cpp
|
||||
action/jeprofile_actions.cpp
|
||||
)
|
||||
action/file_cache_action.cpp)
|
||||
|
||||
68
be/src/http/action/file_cache_action.cpp
Normal file
68
be/src/http/action/file_cache_action.cpp
Normal file
@ -0,0 +1,68 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "file_cache_action.h"
|
||||
|
||||
#include <memory>
|
||||
#include <shared_mutex>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
#include "http/http_channel.h"
|
||||
#include "http/http_headers.h"
|
||||
#include "http/http_request.h"
|
||||
#include "http/http_status.h"
|
||||
#include "io/cache/block/block_file_cache_factory.h"
|
||||
#include "olap/olap_define.h"
|
||||
#include "olap/tablet_meta.h"
|
||||
#include "util/easy_json.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
// Content type sent in the Content-Type header of every reply from this action.
static const std::string HEADER_JSON = "application/json";
// Name of the query parameter that selects which operation to run.
static const std::string OP = "op";
|
||||
|
||||
// Reads the `op` query parameter and runs the requested file-cache operation.
//
// The only operation handled is "release": FileCacheFactory is asked to release
// segments, either for the cache selected by the `base_path` query parameter or,
// when that parameter is empty, for all caches. The count returned by the factory
// is written to `*json_metrics` as a JSON object with the key "released_elements".
//
// The Content-Type response header is always set to JSON. Returns Status::OK() for
// "release"; any other `op` value yields Status::InternalError and leaves
// `*json_metrics` unmodified.
Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metrics) {
    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());

    std::string requested_op = req->param(OP);
    if (requested_op != "release") {
        return Status::InternalError("invalid operation: {}", requested_op);
    }

    // An empty `base_path` means "release every cache".
    const std::string& base_path = req->param("base_path");
    auto& factory = io::FileCacheFactory::instance();
    size_t released_count = 0;
    if (base_path.empty()) {
        released_count = factory.try_release();
    } else {
        released_count = factory.try_release(base_path);
    }

    EasyJson reply;
    reply["released_elements"] = released_count;
    *json_metrics = reply.ToString();
    return Status::OK();
}
|
||||
|
||||
// Entry point for the file-cache HTTP action.
//
// Delegates the work to _handle_header(). On success the JSON produced by the
// operation is sent with HTTP 200; on failure the status, serialized as JSON, is sent
// with HTTP 500.
void FileCacheAction::handle(HttpRequest* req) {
    std::string json_metrics;
    Status status = _handle_header(req, &json_metrics);
    if (status.ok()) {
        HttpChannel::send_reply(req, HttpStatus::OK, json_metrics);
        return;
    }
    // Serialize the status only on the error path; the success reply never uses it.
    HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_json());
}
|
||||
|
||||
} // namespace doris
|
||||
40
be/src/http/action/file_cache_action.h
Normal file
40
be/src/http/action/file_cache_action.h
Normal file
@ -0,0 +1,40 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "http/http_handler.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class HttpRequest;
|
||||
|
||||
class FileCacheAction : public HttpHandler {
|
||||
public:
|
||||
FileCacheAction() = default;
|
||||
|
||||
~FileCacheAction() override = default;
|
||||
|
||||
void handle(HttpRequest* req) override;
|
||||
|
||||
private:
|
||||
Status _handle_header(HttpRequest* req, std::string* json_metrics);
|
||||
};
|
||||
} // namespace doris
|
||||
2
be/src/io/cache/block/block_file_cache.h
vendored
2
be/src/io/cache/block/block_file_cache.h
vendored
@ -106,6 +106,8 @@ public:
|
||||
|
||||
static Key hash(const std::string& path);
|
||||
|
||||
virtual size_t try_release() = 0;
|
||||
|
||||
std::string get_path_in_local_cache(const Key& key, size_t offset, CacheType type) const;
|
||||
|
||||
std::string get_path_in_local_cache(const Key& key) const;
|
||||
|
||||
@ -42,6 +42,22 @@ FileCacheFactory& FileCacheFactory::instance() {
|
||||
return ret;
|
||||
}
|
||||
|
||||
size_t FileCacheFactory::try_release() {
|
||||
int elements = 0;
|
||||
for (auto& cache : _caches) {
|
||||
elements += cache->try_release();
|
||||
}
|
||||
return elements;
|
||||
}
|
||||
|
||||
size_t FileCacheFactory::try_release(const std::string& base_path) {
|
||||
auto iter = _path_to_cache.find(base_path);
|
||||
if (iter != _path_to_cache.end()) {
|
||||
return iter->second->try_release();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
|
||||
const FileCacheSettings& file_cache_settings) {
|
||||
if (config::clear_file_cache) {
|
||||
@ -56,6 +72,7 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
|
||||
std::unique_ptr<IFileCache> cache =
|
||||
std::make_unique<LRUFileCache>(cache_base_path, file_cache_settings);
|
||||
RETURN_IF_ERROR(cache->initialize());
|
||||
_path_to_cache[cache_base_path] = cache.get();
|
||||
_caches.push_back(std::move(cache));
|
||||
LOG(INFO) << "[FileCache] path: " << cache_base_path
|
||||
<< " total_size: " << file_cache_settings.total_size;
|
||||
@ -66,6 +83,15 @@ CloudFileCachePtr FileCacheFactory::get_by_path(const IFileCache::Key& key) {
|
||||
return _caches[KeyHash()(key) % _caches.size()].get();
|
||||
}
|
||||
|
||||
// Looks up the cache registered under exactly `cache_base_path`.
// Returns nullptr when no cache was created for that path.
CloudFileCachePtr FileCacheFactory::get_by_path(const std::string& cache_base_path) {
    if (auto found = _path_to_cache.find(cache_base_path); found != _path_to_cache.end()) {
        return found->second;
    }
    return nullptr;
}
|
||||
|
||||
std::vector<IFileCache::QueryFileCacheContextHolderPtr> FileCacheFactory::get_query_context_holders(
|
||||
const TUniqueId& query_id) {
|
||||
std::vector<IFileCache::QueryFileCacheContextHolderPtr> holders;
|
||||
|
||||
@ -42,7 +42,12 @@ public:
|
||||
Status create_file_cache(const std::string& cache_base_path,
|
||||
const FileCacheSettings& file_cache_settings);
|
||||
|
||||
size_t try_release();
|
||||
|
||||
size_t try_release(const std::string& base_path);
|
||||
|
||||
CloudFileCachePtr get_by_path(const IFileCache::Key& key);
|
||||
CloudFileCachePtr get_by_path(const std::string& cache_base_path);
|
||||
std::vector<IFileCache::QueryFileCacheContextHolderPtr> get_query_context_holders(
|
||||
const TUniqueId& query_id);
|
||||
FileCacheFactory() = default;
|
||||
@ -51,6 +56,7 @@ public:
|
||||
|
||||
private:
|
||||
std::vector<std::unique_ptr<IFileCache>> _caches;
|
||||
std::unordered_map<std::string, CloudFileCachePtr> _path_to_cache;
|
||||
};
|
||||
|
||||
} // namespace io
|
||||
|
||||
90
be/src/io/cache/block/block_lru_file_cache.cpp
vendored
90
be/src/io/cache/block/block_lru_file_cache.cpp
vendored
@ -49,6 +49,7 @@
|
||||
#include "io/fs/file_writer.h"
|
||||
#include "io/fs/local_file_system.h"
|
||||
#include "io/fs/path.h"
|
||||
#include "util/doris_metrics.h"
|
||||
#include "util/slice.h"
|
||||
#include "vec/common/hex.h"
|
||||
|
||||
@ -57,6 +58,21 @@ namespace fs = std::filesystem;
|
||||
namespace doris {
|
||||
namespace io {
|
||||
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_hits_ratio, MetricUnit::NOUNIT);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_removed_elements, MetricUnit::OPERATIONS);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_max_size, MetricUnit::BYTES);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_curr_size, MetricUnit::BYTES);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_max_elements, MetricUnit::NOUNIT);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_curr_elements, MetricUnit::NOUNIT);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_max_size, MetricUnit::BYTES);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_curr_size, MetricUnit::BYTES);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_max_elements, MetricUnit::NOUNIT);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_normal_queue_curr_elements, MetricUnit::NOUNIT);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_size, MetricUnit::BYTES);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_size, MetricUnit::BYTES);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_elements, MetricUnit::NOUNIT);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_elements, MetricUnit::NOUNIT);
|
||||
|
||||
LRUFileCache::LRUFileCache(const std::string& cache_base_path,
|
||||
const FileCacheSettings& cache_settings)
|
||||
: IFileCache(cache_base_path, cache_settings) {
|
||||
@ -67,6 +83,28 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path,
|
||||
_normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
|
||||
24 * 60 * 60);
|
||||
|
||||
_entity = DorisMetrics::instance()->metric_registry()->register_entity(
|
||||
"lru_file_cache", {{"path", _cache_base_path}});
|
||||
_entity->register_hook(_cache_base_path, std::bind(&LRUFileCache::update_cache_metrics, this));
|
||||
|
||||
INT_DOUBLE_METRIC_REGISTER(_entity, file_cache_hits_ratio);
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_removed_elements);
|
||||
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_max_size);
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_curr_size);
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_max_elements);
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_curr_elements);
|
||||
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_max_size);
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_curr_size);
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_max_elements);
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_normal_queue_curr_elements);
|
||||
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_max_size);
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_curr_size);
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_max_elements);
|
||||
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_disposable_queue_curr_elements);
|
||||
|
||||
LOG(INFO) << fmt::format(
|
||||
"file cache path={}, disposable queue size={} elements={}, index queue size={} "
|
||||
"elements={}, query queue "
|
||||
@ -349,6 +387,12 @@ FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t
|
||||
}
|
||||
|
||||
DCHECK(!file_blocks.empty());
|
||||
_num_read_segments += file_blocks.size();
|
||||
for (auto& segment : file_blocks) {
|
||||
if (segment->state() == FileBlock::State::DOWNLOADED) {
|
||||
_num_hit_segments++;
|
||||
}
|
||||
}
|
||||
return FileBlocksHolder(std::move(file_blocks));
|
||||
}
|
||||
|
||||
@ -393,6 +437,25 @@ LRUFileCache::FileBlockCell* LRUFileCache::add_cell(const Key& key, const CacheC
|
||||
return &(it->second);
|
||||
}
|
||||
|
||||
size_t LRUFileCache::try_release() {
|
||||
std::lock_guard<std::mutex> l(_mutex);
|
||||
std::vector<FileBlockCell*> trash;
|
||||
for (auto& [key, segments] : _files) {
|
||||
for (auto& [offset, cell] : segments) {
|
||||
if (cell.releasable()) {
|
||||
trash.emplace_back(&cell);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto& cell : trash) {
|
||||
FileBlockSPtr file_block = cell->file_block;
|
||||
std::lock_guard<std::mutex> lc(cell->file_block->_mutex);
|
||||
remove(file_block, l, lc);
|
||||
}
|
||||
LOG(INFO) << "Released " << trash.size() << " segments in file cache " << _cache_base_path;
|
||||
return trash.size();
|
||||
}
|
||||
|
||||
LRUFileCache::LRUQueue& LRUFileCache::get_queue(CacheType type) {
|
||||
switch (type) {
|
||||
case CacheType::INDEX:
|
||||
@ -707,6 +770,7 @@ void LRUFileCache::remove(FileBlockSPtr file_block, std::lock_guard<std::mutex>&
|
||||
LOG(ERROR) << ec.message();
|
||||
}
|
||||
}
|
||||
_num_removed_segments++;
|
||||
if (offsets.empty()) {
|
||||
auto key_path = get_path_in_local_cache(key);
|
||||
_files.erase(key);
|
||||
@ -1028,5 +1092,31 @@ void LRUFileCache::run_background_operation() {
|
||||
}
|
||||
}
|
||||
|
||||
void LRUFileCache::update_cache_metrics() const {
|
||||
std::lock_guard<std::mutex> l(_mutex);
|
||||
double hit_ratio = 0;
|
||||
if (_num_read_segments > 0) {
|
||||
hit_ratio = (double)_num_hit_segments / (double)_num_read_segments;
|
||||
}
|
||||
|
||||
file_cache_hits_ratio->set_value(hit_ratio);
|
||||
file_cache_removed_elements->set_value(_num_removed_segments);
|
||||
|
||||
file_cache_index_queue_max_size->set_value(_index_queue.get_max_size());
|
||||
file_cache_index_queue_curr_size->set_value(_index_queue.get_total_cache_size(l));
|
||||
file_cache_index_queue_max_elements->set_value(_index_queue.get_max_element_size());
|
||||
file_cache_index_queue_curr_elements->set_value(_index_queue.get_elements_num(l));
|
||||
|
||||
file_cache_normal_queue_max_size->set_value(_normal_queue.get_max_size());
|
||||
file_cache_normal_queue_curr_size->set_value(_normal_queue.get_total_cache_size(l));
|
||||
file_cache_normal_queue_max_elements->set_value(_normal_queue.get_max_element_size());
|
||||
file_cache_normal_queue_curr_elements->set_value(_normal_queue.get_elements_num(l));
|
||||
|
||||
file_cache_disposable_queue_max_size->set_value(_disposable_queue.get_max_size());
|
||||
file_cache_disposable_queue_curr_size->set_value(_disposable_queue.get_total_cache_size(l));
|
||||
file_cache_disposable_queue_max_elements->set_value(_disposable_queue.get_max_element_size());
|
||||
file_cache_disposable_queue_curr_elements->set_value(_disposable_queue.get_elements_num(l));
|
||||
}
|
||||
|
||||
} // namespace io
|
||||
} // namespace doris
|
||||
|
||||
28
be/src/io/cache/block/block_lru_file_cache.h
vendored
28
be/src/io/cache/block/block_lru_file_cache.h
vendored
@ -34,6 +34,7 @@
|
||||
#include "common/status.h"
|
||||
#include "io/cache/block/block_file_cache.h"
|
||||
#include "io/cache/block/block_file_segment.h"
|
||||
#include "util/metrics.h"
|
||||
|
||||
namespace doris {
|
||||
class TUniqueId;
|
||||
@ -126,6 +127,8 @@ private:
|
||||
LRUQueue _normal_queue;
|
||||
LRUQueue _disposable_queue;
|
||||
|
||||
size_t try_release() override;
|
||||
|
||||
LRUFileCache::LRUQueue& get_queue(CacheType type);
|
||||
const LRUFileCache::LRUQueue& get_queue(CacheType type) const;
|
||||
|
||||
@ -187,12 +190,37 @@ private:
|
||||
|
||||
void run_background_operation();
|
||||
|
||||
void update_cache_metrics() const;
|
||||
|
||||
public:
|
||||
std::string dump_structure(const Key& key) override;
|
||||
|
||||
private:
|
||||
std::atomic_bool _close {false};
|
||||
std::thread _cache_background_thread;
|
||||
size_t _num_read_segments = 0;
|
||||
size_t _num_hit_segments = 0;
|
||||
size_t _num_removed_segments = 0;
|
||||
|
||||
std::shared_ptr<MetricEntity> _entity = nullptr;
|
||||
|
||||
DoubleGauge* file_cache_hits_ratio = nullptr;
|
||||
UIntGauge* file_cache_removed_elements = nullptr;
|
||||
|
||||
UIntGauge* file_cache_index_queue_max_size = nullptr;
|
||||
UIntGauge* file_cache_index_queue_curr_size = nullptr;
|
||||
UIntGauge* file_cache_index_queue_max_elements = nullptr;
|
||||
UIntGauge* file_cache_index_queue_curr_elements = nullptr;
|
||||
|
||||
UIntGauge* file_cache_normal_queue_max_size = nullptr;
|
||||
UIntGauge* file_cache_normal_queue_curr_size = nullptr;
|
||||
UIntGauge* file_cache_normal_queue_max_elements = nullptr;
|
||||
UIntGauge* file_cache_normal_queue_curr_elements = nullptr;
|
||||
|
||||
UIntGauge* file_cache_disposable_queue_max_size = nullptr;
|
||||
UIntGauge* file_cache_disposable_queue_curr_size = nullptr;
|
||||
UIntGauge* file_cache_disposable_queue_max_elements = nullptr;
|
||||
UIntGauge* file_cache_disposable_queue_curr_elements = nullptr;
|
||||
};
|
||||
|
||||
} // namespace io
|
||||
|
||||
@ -48,6 +48,19 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader
|
||||
_cache = FileCacheFactory::instance().get_by_path(_cache_key);
|
||||
}
|
||||
|
||||
// Builds a cached reader that prefers the cache registered under `cache_base_path`.
//
// The cache key is the hash of `cache_path`. If the factory has no cache for
// `cache_base_path`, a warning is logged and the cache is selected from the key
// instead, as the two-argument constructor does.
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader,
                                               const std::string& cache_base_path,
                                               const std::string& cache_path)
        : _remote_file_reader(std::move(remote_file_reader)) {
    _cache_key = IFileCache::hash(cache_path);

    auto& factory = FileCacheFactory::instance();
    _cache = factory.get_by_path(cache_base_path);
    if (_cache != nullptr) {
        return;
    }

    // Unknown base path: fall back to key-based selection.
    LOG(WARNING) << "Can't get cache from base path: " << cache_base_path
                 << ", using random instead.";
    _cache = factory.get_by_path(_cache_key);
}
|
||||
|
||||
// Closes the reader on destruction. The Status returned by close() is deliberately
// discarded, since a destructor has no way to report it.
CachedRemoteFileReader::~CachedRemoteFileReader() {
    static_cast<void>(close());
}
|
||||
|
||||
@ -41,6 +41,9 @@ class CachedRemoteFileReader final : public FileReader {
|
||||
public:
|
||||
CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_path);
|
||||
|
||||
CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_base_path,
|
||||
const std::string& cache_path);
|
||||
|
||||
~CachedRemoteFileReader() override;
|
||||
|
||||
Status close() override;
|
||||
|
||||
@ -45,15 +45,27 @@ namespace io {
|
||||
class FileWriter;
|
||||
} // namespace io
|
||||
|
||||
io::FileCachePolicy FileFactory::get_cache_policy(RuntimeState* state) {
|
||||
if (state != nullptr) {
|
||||
if (config::enable_file_cache && state->query_options().enable_file_cache) {
|
||||
return io::FileCachePolicy::FILE_BLOCK_CACHE;
|
||||
}
|
||||
static io::FileBlockCachePathPolicy BLOCK_CACHE_POLICY;
|
||||
static std::string RANDOM_CACHE_BASE_PATH = "random";
|
||||
|
||||
io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state) {
|
||||
io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE;
|
||||
if (config::enable_file_cache && state != nullptr &&
|
||||
state->query_options().__isset.enable_file_cache &&
|
||||
state->query_options().enable_file_cache) {
|
||||
cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE;
|
||||
}
|
||||
return io::FileCachePolicy::NO_CACHE;
|
||||
io::FileReaderOptions reader_options(cache_policy, BLOCK_CACHE_POLICY);
|
||||
if (state != nullptr && state->query_options().__isset.file_cache_base_path &&
|
||||
state->query_options().file_cache_base_path != RANDOM_CACHE_BASE_PATH) {
|
||||
reader_options.specify_cache_path(state->query_options().file_cache_base_path);
|
||||
}
|
||||
return reader_options;
|
||||
}
|
||||
|
||||
io::FileReaderOptions FileFactory::NO_CACHE_READER_OPTIONS =
|
||||
FileFactory::get_reader_options(nullptr);
|
||||
|
||||
Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
|
||||
const std::vector<TNetworkAddress>& broker_addresses,
|
||||
const std::map<std::string, std::string>& properties,
|
||||
@ -100,10 +112,8 @@ Status FileFactory::create_file_reader(RuntimeProfile* profile,
|
||||
const FileDescription& file_description,
|
||||
std::shared_ptr<io::FileSystem>* file_system,
|
||||
io::FileReaderSPtr* file_reader,
|
||||
io::FileCachePolicy cache_policy) {
|
||||
io::FileReaderOptions reader_options) {
|
||||
TFileType::type type = system_properties.system_type;
|
||||
io::FileBlockCachePathPolicy file_block_cache;
|
||||
io::FileReaderOptions reader_options(cache_policy, file_block_cache);
|
||||
reader_options.file_size = file_description.file_size;
|
||||
switch (type) {
|
||||
case TFileType::FILE_LOCAL: {
|
||||
|
||||
@ -58,7 +58,8 @@ class FileFactory {
|
||||
ENABLE_FACTORY_CREATOR(FileFactory);
|
||||
|
||||
public:
|
||||
static io::FileCachePolicy get_cache_policy(RuntimeState* state);
|
||||
static io::FileReaderOptions get_reader_options(RuntimeState* state);
|
||||
static io::FileReaderOptions NO_CACHE_READER_OPTIONS;
|
||||
|
||||
/// Create FileWriter
|
||||
static Status create_file_writer(TFileType::type type, ExecEnv* env,
|
||||
@ -72,7 +73,7 @@ public:
|
||||
RuntimeProfile* profile, const FileSystemProperties& system_properties,
|
||||
const FileDescription& file_description, std::shared_ptr<io::FileSystem>* file_system,
|
||||
io::FileReaderSPtr* file_reader,
|
||||
io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE);
|
||||
io::FileReaderOptions reader_options = NO_CACHE_READER_OPTIONS);
|
||||
|
||||
// Create FileReader for stream load pipe
|
||||
static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader);
|
||||
|
||||
@ -718,11 +718,11 @@ Status DelegateReader::create_file_reader(RuntimeProfile* profile,
|
||||
const FileDescription& file_description,
|
||||
std::shared_ptr<io::FileSystem>* file_system,
|
||||
io::FileReaderSPtr* file_reader, AccessMode access_mode,
|
||||
io::FileCachePolicy cache_policy, const IOContext* io_ctx,
|
||||
const PrefetchRange file_range) {
|
||||
io::FileReaderOptions reader_options,
|
||||
const IOContext* io_ctx, const PrefetchRange file_range) {
|
||||
io::FileReaderSPtr reader;
|
||||
RETURN_IF_ERROR(FileFactory::create_file_reader(profile, system_properties, file_description,
|
||||
file_system, &reader, cache_policy));
|
||||
file_system, &reader, reader_options));
|
||||
if (reader->size() < IN_MEMORY_FILE_SIZE) {
|
||||
*file_reader = std::make_shared<InMemoryFileReader>(reader);
|
||||
} else if (access_mode == AccessMode::SEQUENTIAL) {
|
||||
|
||||
@ -304,7 +304,7 @@ public:
|
||||
RuntimeProfile* profile, const FileSystemProperties& system_properties,
|
||||
const FileDescription& file_description, std::shared_ptr<io::FileSystem>* file_system,
|
||||
io::FileReaderSPtr* file_reader, AccessMode access_mode = SEQUENTIAL,
|
||||
io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE,
|
||||
io::FileReaderOptions reader_options = FileFactory::NO_CACHE_READER_OPTIONS,
|
||||
const IOContext* io_ctx = nullptr,
|
||||
const PrefetchRange file_range = PrefetchRange(0, 0));
|
||||
};
|
||||
|
||||
@ -78,6 +78,13 @@ public:
|
||||
// -1 means unset.
|
||||
// If the file length is not set, the file length will be fetched from the file system.
|
||||
int64_t file_size = -1;
|
||||
bool has_cache_base_path = false;
|
||||
std::string cache_base_path;
|
||||
|
||||
// Records `base_path` as the explicitly requested cache base path and marks it as
// present via `has_cache_base_path`.
void specify_cache_path(const std::string& base_path) {
    cache_base_path = base_path;
    has_cache_base_path = true;
}
|
||||
|
||||
static FileReaderOptions DEFAULT;
|
||||
};
|
||||
|
||||
@ -94,7 +94,13 @@ Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOption
|
||||
case io::FileCachePolicy::FILE_BLOCK_CACHE: {
|
||||
StringPiece str(raw_reader->path().native());
|
||||
std::string cache_path = reader_options.path_policy.get_cache_path(path.native());
|
||||
*reader = std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), cache_path);
|
||||
if (reader_options.has_cache_base_path) {
|
||||
// from query session variable: file_cache_base_path
|
||||
*reader = std::make_shared<CachedRemoteFileReader>(
|
||||
std::move(raw_reader), reader_options.cache_base_path, cache_path);
|
||||
} else {
|
||||
*reader = std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), cache_path);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
|
||||
@ -27,6 +27,7 @@
|
||||
#include "http/action/compaction_action.h"
|
||||
#include "http/action/config_action.h"
|
||||
#include "http/action/download_action.h"
|
||||
#include "http/action/file_cache_action.h"
|
||||
#include "http/action/health_action.h"
|
||||
#include "http/action/jeprofile_actions.h"
|
||||
#include "http/action/meta_action.h"
|
||||
@ -141,6 +142,9 @@ Status HttpService::start() {
|
||||
_pool.add(new MetaAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
|
||||
_ev_http_server->register_handler(HttpMethod::GET, "/api/meta/{op}/{tablet_id}", meta_action);
|
||||
|
||||
FileCacheAction* file_cache_action = _pool.add(new FileCacheAction());
|
||||
_ev_http_server->register_handler(HttpMethod::GET, "/api/file_cache", file_cache_action);
|
||||
|
||||
#ifndef BE_TEST
|
||||
// Register BE checksum action
|
||||
ChecksumAction* checksum_action =
|
||||
|
||||
@ -169,10 +169,10 @@ Status CsvReader::init_reader(bool is_load) {
|
||||
if (_params.file_type == TFileType::FILE_STREAM) {
|
||||
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader));
|
||||
} else {
|
||||
io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
|
||||
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
|
||||
_profile, _system_properties, _file_description, &_file_system, &_file_reader,
|
||||
io::DelegateReader::AccessMode::SEQUENTIAL, cache_policy, _io_ctx,
|
||||
io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx,
|
||||
io::PrefetchRange(_range.start_offset, _range.size)));
|
||||
}
|
||||
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
|
||||
@ -656,9 +656,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
|
||||
}
|
||||
|
||||
_file_description.start_offset = start_offset;
|
||||
io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
|
||||
RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description,
|
||||
&_file_system, &_file_reader, cache_policy));
|
||||
&_file_system, &_file_reader, reader_options));
|
||||
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
|
||||
_params.file_type != TFileType::FILE_BROKER) {
|
||||
return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path);
|
||||
|
||||
@ -374,10 +374,10 @@ Status NewJsonReader::_open_file_reader() {
|
||||
if (_params.file_type == TFileType::FILE_STREAM) {
|
||||
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader));
|
||||
} else {
|
||||
io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
|
||||
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
|
||||
_profile, _system_properties, _file_description, &_file_system, &_file_reader,
|
||||
io::DelegateReader::AccessMode::SEQUENTIAL, cache_policy, _io_ctx,
|
||||
io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx,
|
||||
io::PrefetchRange(_range.start_offset, _range.size)));
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
@ -199,10 +199,10 @@ void OrcReader::_init_profile() {
|
||||
Status OrcReader::_create_file_reader() {
|
||||
if (_file_input_stream == nullptr) {
|
||||
io::FileReaderSPtr inner_reader;
|
||||
io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
|
||||
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
|
||||
_profile, _system_properties, _file_description, &_file_system, &inner_reader,
|
||||
io::DelegateReader::AccessMode::RANDOM, cache_policy, _io_ctx));
|
||||
io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx));
|
||||
_file_input_stream.reset(
|
||||
new ORCFileInputStream(_scan_range.path, inner_reader, &_statistics, _io_ctx));
|
||||
}
|
||||
|
||||
@ -205,10 +205,10 @@ Status ParquetReader::_open_file() {
|
||||
if (_file_reader == nullptr) {
|
||||
SCOPED_RAW_TIMER(&_statistics.open_file_time);
|
||||
++_statistics.open_file_num;
|
||||
io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
|
||||
RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
|
||||
_profile, _system_properties, _file_description, &_file_system, &_file_reader,
|
||||
io::DelegateReader::AccessMode::RANDOM, cache_policy, _io_ctx));
|
||||
io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx));
|
||||
}
|
||||
if (_file_metadata == nullptr) {
|
||||
SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
|
||||
|
||||
@ -113,10 +113,10 @@ Status VArrowScanner::_open_next_reader() {
|
||||
io::FileReaderSPtr file_reader;
|
||||
_init_system_properties(range);
|
||||
_init_file_description(range);
|
||||
io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
|
||||
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
|
||||
RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
|
||||
_file_description, &_file_system,
|
||||
&file_reader, cache_policy));
|
||||
&file_reader, reader_options));
|
||||
|
||||
if (file_reader->size() == 0) {
|
||||
continue;
|
||||
|
||||
@ -287,6 +287,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
|
||||
public static final String ENABLE_FILE_CACHE = "enable_file_cache";
|
||||
|
||||
public static final String FILE_CACHE_BASE_PATH = "file_cache_base_path";
|
||||
|
||||
public static final String GROUP_BY_AND_HAVING_USE_ALIAS_FIRST = "group_by_and_having_use_alias_first";
|
||||
public static final String DROP_TABLE_IF_CTAS_FAILED = "drop_table_if_ctas_failed";
|
||||
|
||||
@ -796,6 +798,10 @@ public class SessionVariable implements Serializable, Writable {
|
||||
@VariableMgr.VarAttr(name = ENABLE_FILE_CACHE, needForward = true)
|
||||
public boolean enableFileCache = true;
|
||||
|
||||
// Specify the base path for the file cache, or choose a random path.
|
||||
@VariableMgr.VarAttr(name = FILE_CACHE_BASE_PATH, needForward = true)
|
||||
public String fileCacheBasePath = "random";
|
||||
|
||||
// Whether drop table when create table as select insert data appear error.
|
||||
@VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
|
||||
public boolean dropTableIfCtasFailed = true;
|
||||
@ -1654,6 +1660,14 @@ public class SessionVariable implements Serializable, Writable {
|
||||
this.enableFileCache = enableFileCache;
|
||||
}
|
||||
|
||||
public String getFileCacheBasePath() {
|
||||
return fileCacheBasePath;
|
||||
}
|
||||
|
||||
public void setFileCacheBasePath(String basePath) {
|
||||
this.fileCacheBasePath = basePath;
|
||||
}
|
||||
|
||||
public int getMaxTableCountUseCascadesJoinReorder() {
|
||||
return this.maxTableCountUseCascadesJoinReorder;
|
||||
}
|
||||
@ -1741,6 +1755,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
|
||||
tResult.setEnableFileCache(enableFileCache);
|
||||
|
||||
tResult.setFileCacheBasePath(fileCacheBasePath);
|
||||
|
||||
if (dryRunQuery) {
|
||||
tResult.setDryRunQuery(true);
|
||||
}
|
||||
|
||||
@ -212,6 +212,9 @@ struct TQueryOptions {
|
||||
|
||||
// partition count(1 << external_agg_partition_bits) when spill aggregation data into disk
|
||||
69: optional i32 external_agg_partition_bits = 4
|
||||
|
||||
// Specify base path for file cache
|
||||
70: optional string file_cache_base_path
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user