From 2bd01b23c73d33c38e4ecadda6d2adc7cfa2fb55 Mon Sep 17 00:00:00 2001 From: ZHAO Chun Date: Mon, 12 Aug 2019 10:42:00 +0800 Subject: [PATCH] Add page cache for column page in BetaRowset (#1607) --- be/src/common/config.h | 3 + be/src/olap/CMakeLists.txt | 1 + be/src/olap/lru_cache.cpp | 5 + be/src/olap/lru_cache.h | 6 + be/src/olap/page_cache.cpp | 53 ++++++++ be/src/olap/page_cache.h | 121 ++++++++++++++++++ .../olap/rowset/segment_v2/column_reader.cpp | 13 +- be/src/olap/rowset/segment_v2/page_handle.h | 54 +++++--- be/src/runtime/exec_env_init.cpp | 14 ++ be/test/olap/CMakeLists.txt | 1 + be/test/olap/page_cache_test.cpp | 78 +++++++++++ run-ut.sh | 1 + 12 files changed, 334 insertions(+), 16 deletions(-) create mode 100644 be/src/olap/page_cache.cpp create mode 100644 be/src/olap/page_cache.h create mode 100644 be/test/olap/page_cache_test.cpp diff --git a/be/src/common/config.h b/be/src/common/config.h index a047eebb76..80e74a3daa 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -225,6 +225,9 @@ namespace config { CONF_Int64(index_stream_cache_capacity, "10737418240"); CONF_Int64(max_packed_row_block_size, "20971520"); + // Capacity of the storage page cache + CONF_String(storage_page_cache_limit, "20G"); + // be policy CONF_Int64(base_compaction_start_hour, "20"); CONF_Int64(base_compaction_end_hour, "7"); diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 1790f9effa..8eb8f71472 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -53,6 +53,7 @@ add_library(Olap STATIC olap_server.cpp options.cpp out_stream.cpp + page_cache.cpp push_handler.cpp reader.cpp row_block.cpp diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp index 5d810e7fd0..9090db6306 100644 --- a/be/src/olap/lru_cache.cpp +++ b/be/src/olap/lru_cache.cpp @@ -353,6 +353,11 @@ void* ShardedLRUCache::value(Handle* handle) { return reinterpret_cast(handle)->value; } +Slice ShardedLRUCache::value_slice(Handle* handle) { + auto 
lru_handle = reinterpret_cast(handle); + return Slice((char*)lru_handle->value, lru_handle->charge); +} + uint64_t ShardedLRUCache::new_id() { MutexLock l(&_id_mutex); return ++(_last_id); diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h index cc251ccfb5..fa738902a0 100644 --- a/be/src/olap/lru_cache.h +++ b/be/src/olap/lru_cache.h @@ -14,6 +14,7 @@ #include "olap/olap_common.h" #include "olap/utils.h" +#include "util/slice.h" namespace doris { @@ -190,6 +191,10 @@ namespace doris { // REQUIRES: handle must have been returned by a method on *this. virtual void* value(Handle* handle) = 0; + // Return the value in Slice format encapsulated in the given handle + // returned by a successful lookup() + virtual Slice value_slice(Handle* handle) = 0; + // If the cache contains entry for key, erase it. Note that the // underlying entry will be kept around until all existing handles // to it have been released. @@ -366,6 +371,7 @@ namespace doris { virtual void release(Handle* handle); virtual void erase(const CacheKey& key); virtual void* value(Handle* handle); + Slice value_slice(Handle* handle) override; virtual uint64_t new_id(); virtual void prune(); virtual size_t get_memory_usage(); diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp new file mode 100644 index 0000000000..a96a7deb08 --- /dev/null +++ b/be/src/olap/page_cache.cpp @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/page_cache.h" + +namespace doris { + +// This should only be used in unit test. 1GB +static StoragePageCache s_ut_cache(1073741824); + +StoragePageCache* StoragePageCache::_s_instance = &s_ut_cache; + +void StoragePageCache::create_global_cache(size_t capacity) { + if (_s_instance == &s_ut_cache) { + _s_instance = new StoragePageCache(capacity); + } +} + +StoragePageCache::StoragePageCache(size_t capacity) : _cache(new_lru_cache(capacity)) { +} + +bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle) { + auto lru_handle = _cache->lookup(key.encode()); + if (lru_handle == nullptr) { + return false; + } + *handle = PageCacheHandle(_cache.get(), lru_handle); + return true; +} + +void StoragePageCache::insert(const CacheKey& key, const Slice& data, PageCacheHandle* handle) { + auto deleter = [](const doris::CacheKey& key, void* value) { + delete[] (uint8_t*)value; + }; + auto lru_handle = _cache->insert(key.encode(), data.data, data.size, deleter); + *handle = PageCacheHandle(_cache.get(), lru_handle); +} + +} diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h new file mode 100644 index 0000000000..52174f61c7 --- /dev/null +++ b/be/src/olap/page_cache.h @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <string> +#include <utility> + +#include "gutil/macros.h" // for DISALLOW_COPY_AND_ASSIGN +#include "olap/lru_cache.h" + +namespace doris { + +class PageCacheHandle; + +// Wrapper around Cache, used to cache pages of column data +// in Segment. +// TODO(zc): We should add some metric to see cache hit/miss rate. +class StoragePageCache { +public: + // The unique key identifying entries in the page cache. + // Each cached page corresponds to a specific offset within + // a file. + // + // TODO(zc): Now we use file name(std::string) as a part of + // key, which is not efficient. We should make it better later + struct CacheKey { + CacheKey(std::string fname_, int64_t offset_) : fname(std::move(fname_)), offset(offset_) { } + std::string fname; + int64_t offset; + + // Encode to a flat binary which can be used as LRUCache's key + std::string encode() const { + std::string key_buf(fname); + key_buf.append((char*)&offset, sizeof(offset)); + return key_buf; + } + }; + + // Create global instance of this class + static void create_global_cache(size_t capacity); + + // Return global instance. + // Client should call create_global_cache before. + static StoragePageCache* instance() { return _s_instance; } + + StoragePageCache(size_t capacity); + + // Lookup the given page in the cache. + // + // If the page is found, the cache entry will be written into handle. 
+ // PageCacheHandle will release cache entry to cache when it + // destructs. + // + // Return true if entry is found, otherwise return false. + bool lookup(const CacheKey& key, PageCacheHandle* handle); + + // Insert a page with key into this cache. + // The given handle will be set to a valid reference. + // This function is thread-safe, and when two clients insert the same key + // concurrently, this function can assure that only one page is cached. + void insert(const CacheKey& key, const Slice& data, PageCacheHandle* handle); +private: + StoragePageCache(); + static StoragePageCache* _s_instance; + + std::unique_ptr<Cache> _cache = nullptr; +}; + +// A handle for StoragePageCache entry. This class makes it easy to handle +// Cache entry. Users don't need to release the obtained cache entry. This +// class will release the cache entry when it is destroyed. +class PageCacheHandle { +public: + PageCacheHandle() { } + PageCacheHandle(Cache* cache, Cache::Handle* handle) : _cache(cache), _handle(handle) { } + ~PageCacheHandle() { + if (_handle != nullptr) { + _cache->release(_handle); + } + } + + PageCacheHandle(PageCacheHandle&& other) noexcept { + // we can use std::exchange if we switch c++14 on + std::swap(_cache, other._cache); + std::swap(_handle, other._handle); + } + + PageCacheHandle& operator=(PageCacheHandle&& other) noexcept { + std::swap(_cache, other._cache); + std::swap(_handle, other._handle); + return *this; + } + + Cache* cache() const { return _cache; } + Slice data() const { return _cache->value_slice(_handle); } + +private: + Cache* _cache = nullptr; + Cache::Handle* _handle = nullptr; + + // Don't allow copy and assign + DISALLOW_COPY_AND_ASSIGN(PageCacheHandle); +}; + +} diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index c0432bf515..d1614c955f 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -26,6 +26,7 @@ #include 
"olap/rowset/segment_v2/options.h" // for PageDecoderOptions #include "olap/types.h" // for TypeInfo #include "olap/column_block.h" // for ColumnBlockView +#include "olap/page_cache.h" #include "util/coding.h" // for get_varint32 #include "util/rle_encoding.h" // for RleDecoder @@ -96,6 +97,14 @@ Status ColumnReader::new_iterator(ColumnIterator** iterator) { } Status ColumnReader::read_page(const PagePointer& pp, PageHandle* handle) { + auto cache = StoragePageCache::instance(); + PageCacheHandle cache_handle; + StoragePageCache::CacheKey cache_key(_file->file_name(), pp.offset); + if (cache->lookup(cache_key, &cache_handle)) { + // we find page in cache, use it + *handle = PageHandle(std::move(cache_handle)); + return Status::OK(); + } // Now we read this from file. we size_t data_size = pp.size; if (has_checksum() && data_size < sizeof(uint32_t)) { @@ -119,7 +128,9 @@ Status ColumnReader::read_page(const PagePointer& pp, PageHandle* handle) { // TODO(zc): compress - *handle = PageHandle::create_from_slice(data); + // insert this into cache and return the cache handle + cache->insert(cache_key, data, &cache_handle); + *handle = PageHandle(std::move(cache_handle)); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/page_handle.h b/be/src/olap/rowset/segment_v2/page_handle.h index 54193c018e..890ec56d90 100644 --- a/be/src/olap/rowset/segment_v2/page_handle.h +++ b/be/src/olap/rowset/segment_v2/page_handle.h @@ -19,46 +19,70 @@ #include "gutil/macros.h" // for DISALLOW_COPY_AND_ASSIGN #include "util/slice.h" // for Slice +#include "olap/page_cache.h" namespace doris { namespace segment_v2 { -// when page is read into memory, we use this to store it -// This class should delete memory +// When a column page is read into memory, we use this to store it. +// A page's data may be in cache, or may not in cache. We use this +// class to unify these two cases. 
+// If a client uses this class to wrap data that is not in cache, this class +// will free the data's memory when it is destroyed. class PageHandle { public: - static PageHandle create_from_slice(const Slice& slice) { - return PageHandle(slice); + PageHandle() : _is_data_owner(false) { } + + // This class will take the ownership of input data's memory. It will + // free it on destruction. + PageHandle(const Slice& data) : _is_data_owner(true), _data(data) { } + + // This class will take the content of cache data, and will leave the input + // cache_data as an invalid cache handle. + PageHandle(PageCacheHandle cache_data) + : _is_data_owner(false), _cache_data(std::move(cache_data)) { } - PageHandle() : _data((const uint8_t*)nullptr, 0) { } - // Move constructor - PageHandle(PageHandle&& other) noexcept : _data(other._data) { - other._data = Slice(); + PageHandle(PageHandle&& other) noexcept + : _is_data_owner(false), + _data(std::move(other._data)), + _cache_data(std::move(other._cache_data)) { + // we can use std::exchange if we switch c++14 on + std::swap(_is_data_owner, other._is_data_owner); } PageHandle& operator=(PageHandle&& other) noexcept { - std::swap(_data, other._data); + std::swap(_is_data_owner, other._is_data_owner); + std::swap(_data, other._data); // buffer must travel with its ownership flag, else leak/double free + _cache_data = std::move(other._cache_data); return *this; } ~PageHandle() { - delete[] _data.data; - _data = Slice(); + if (_is_data_owner) { + delete[] _data.data; + } } + // This function is only valid after valid data has been assigned, either in cache or not Slice data() const { - return _data; + if (_is_data_owner) { + return _data; + } else { + return _cache_data.data(); + } } private: - PageHandle(const Slice& data) : _data(data) { - } + // When this is true, this struct owns the data and _data is valid. + // Otherwise _cache_data is valid, and the data belongs to the cache. 
+ bool _is_data_owner = false; Slice _data; + PageCacheHandle _cache_data; - // cause we + // Don't allow copy and assign DISALLOW_COPY_AND_ASSIGN(PageHandle); }; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 22e125b335..87184a04b6 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -41,6 +41,7 @@ #include "util/mem_info.h" #include "util/debug_util.h" #include "olap/storage_engine.h" +#include "olap/page_cache.h" #include "util/network_util.h" #include "util/bfd_parser.h" #include "runtime/etl_job_mgr.h" @@ -126,6 +127,7 @@ Status ExecEnv::_init(const std::vector& store_paths) { _small_file_mgr->init(); _init_mem_tracker(); RETURN_IF_ERROR(_tablet_writer_mgr->start_bg_worker()); + return Status::OK(); } @@ -181,6 +183,18 @@ Status ExecEnv::_init_mem_tracker() { LOG(INFO) << "Using global memory limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES); RETURN_IF_ERROR(_disk_io_mgr->init(_mem_tracker)); RETURN_IF_ERROR(_tmp_file_mgr->init(DorisMetrics::metrics())); + + int64_t storage_cache_limit = ParseUtil::parse_mem_spec( + config::storage_page_cache_limit, &is_percent); + if (storage_cache_limit > MemInfo::physical_mem()) { + LOG(WARNING) << "Config storage_page_cache_limit is greater than memory size, config=" + << config::storage_page_cache_limit + << ", memory=" << MemInfo::physical_mem(); + } + StoragePageCache::create_global_cache(storage_cache_limit); + + // TODO(zc): The current memory usage configuration is a bit confusing, + // we need to sort out the use of memory return Status::OK(); } diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index a258e0f73e..0cf4aa772f 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -64,3 +64,4 @@ ADD_BE_TEST(txn_manager_test) ADD_BE_TEST(generic_iterators_test) ADD_BE_TEST(key_coder_test) ADD_BE_TEST(short_key_index_test) +ADD_BE_TEST(page_cache_test) diff --git 
a/be/test/olap/page_cache_test.cpp b/be/test/olap/page_cache_test.cpp new file mode 100644 index 0000000000..64172b386b --- /dev/null +++ b/be/test/olap/page_cache_test.cpp @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/page_cache.h" + +#include + +namespace doris { + +class StoragePageCacheTest : public testing::Test { +public: + StoragePageCacheTest() { } + virtual ~StoragePageCacheTest() { + } +}; + +TEST(StoragePageCacheTest, normal) { + StoragePageCache cache(10 * 1024); + + StoragePageCache::CacheKey key("abc", 0); + + char* buf = new char[1024]; + { + PageCacheHandle handle; + Slice data(buf, 1024); + cache.insert(key, data, &handle); + + ASSERT_EQ(handle.data().data, buf); + } + // cache hit + { + PageCacheHandle handle; + auto found = cache.lookup(key, &handle); + ASSERT_TRUE(found); + ASSERT_EQ(buf, handle.data().data); + } + // cache miss + { + PageCacheHandle handle; + StoragePageCache::CacheKey miss_key("abc", 1); + auto found = cache.lookup(miss_key, &handle); + ASSERT_FALSE(found); + } + // put too many page to eliminate first page + for (int i = 0; i < 10; ++i) { + StoragePageCache::CacheKey key("bcd", i); + PageCacheHandle handle; + Slice data(new char[1024], 
1024); + cache.insert(key, data, &handle); + } + // cache miss for eliminated key + { + PageCacheHandle handle; + auto found = cache.lookup(key, &handle); + ASSERT_FALSE(found); + } +} + +} // namespace doris + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/run-ut.sh b/run-ut.sh index 1a5c99ec9e..646b5556b0 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -258,6 +258,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/generic_iterators_test ${DORIS_TEST_BINARY_DIR}/olap/aggregate_func_test ${DORIS_TEST_BINARY_DIR}/olap/short_key_index_test ${DORIS_TEST_BINARY_DIR}/olap/key_coder_test +${DORIS_TEST_BINARY_DIR}/olap/page_cache_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test