diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h index 654b9e8964..f169fea434 100644 --- a/be/src/olap/lru_cache.h +++ b/be/src/olap/lru_cache.h @@ -239,12 +239,12 @@ private: // An entry is a variable length heap-allocated structure. Entries // are kept in a circular doubly linked list ordered by access time. -typedef struct LRUHandle { +struct LRUHandle { void* value; void (*deleter)(const CacheKey&, void* value); - LRUHandle* next_hash = nullptr; // next entry in hash table - LRUHandle* next = nullptr; // next entry in lru list - LRUHandle* prev = nullptr; // previous entry in lru list + struct LRUHandle* next_hash = nullptr; // next entry in hash table + struct LRUHandle* next = nullptr; // next entry in lru list + struct LRUHandle* prev = nullptr; // previous entry in lru list size_t charge; size_t key_length; size_t total_size; // including key length @@ -273,8 +273,7 @@ typedef struct LRUHandle { DorisMetrics::instance()->lru_cache_memory_bytes->increment(-bytes); ::free(this); } - -} LRUHandle; +}; // We provide our own simple hash tablet since it removes a whole bunch // of porting hacks and is also faster than some of the built-in hash diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp index 53f9378d17..644162bd2e 100644 --- a/be/src/olap/page_cache.cpp +++ b/be/src/olap/page_cache.cpp @@ -64,9 +64,12 @@ bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle, return true; } -void StoragePageCache::insert(const CacheKey& key, const Slice& data, PageCacheHandle* handle, +void StoragePageCache::insert(const CacheKey& key, DataPage* data, PageCacheHandle* handle, segment_v2::PageTypePB page_type, bool in_memory) { - auto deleter = [](const doris::CacheKey& key, void* value) { delete[] (uint8_t*)value; }; + auto deleter = [](const doris::CacheKey& key, void* value) { + DataPage* cache_value = (DataPage*)value; + delete cache_value; + }; CachePriority priority = CachePriority::NORMAL; if (in_memory) { @@ -74,7 +77,7 @@ void StoragePageCache::insert(const CacheKey& key, const Slice& data, PageCacheH } auto cache = _get_page_cache(page_type); - auto lru_handle = cache->insert(key.encode(), data.data, data.size, deleter, priority); + auto lru_handle = cache->insert(key.encode(), data, data->capacity(), deleter, priority); *handle = PageCacheHandle(cache, lru_handle); } diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h index faeff28d43..8cc41c4385 100644 --- a/be/src/olap/page_cache.h +++ b/be/src/olap/page_cache.h @@ -28,11 +28,52 @@ #include "olap/lru_cache.h" #include "util/slice.h" +#include "vec/common/allocator.h" +#include "vec/common/allocator_fwd.h" namespace doris { class PageCacheHandle; +template +class PageBase : private TAllocator { +public: + PageBase() : _data(nullptr), _size(0), _capacity(0) {} + + PageBase(size_t b) : _size(b), _capacity(b) { + _data = reinterpret_cast(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16)); + ExecEnv::GetInstance()->page_no_cache_mem_tracker()->consume(_capacity); + } + + PageBase(const PageBase&) = delete; + PageBase& operator=(const PageBase&) = delete; + + ~PageBase() { + if (_data != nullptr) { + DCHECK(_capacity != 0 && _size != 0); + TAllocator::free(_data, _capacity); + ExecEnv::GetInstance()->page_no_cache_mem_tracker()->release(_capacity); + } + } + + char* data() { return _data; } + size_t size() { return _size; } + size_t capacity() { return _capacity; } + + void reset_size(size_t n) { + DCHECK(n <= _capacity); + _size = n; + } + +private: + char* _data; + // Effective size, smaller than capacity, such as data page remove checksum suffix. + size_t _size; + size_t _capacity = 0; +}; + +using DataPage = PageBase>; + // Wrapper around Cache, and used for cache page of column data // in Segment. // TODO(zc): We should add some metric to see cache hit/miss rate. @@ -88,7 +129,7 @@ public: // This function is thread-safe, and when two clients insert two same key // concurrently, this function can assure that only one page is cached. // The in_memory page will have higher priority. - void insert(const CacheKey& key, const Slice& data, PageCacheHandle* handle, + void insert(const CacheKey& key, DataPage* data, PageCacheHandle* handle, segment_v2::PageTypePB page_type, bool in_memory = false); // Page cache available check. @@ -150,7 +191,10 @@ public: } Cache* cache() const { return _cache; } - Slice data() const { return _cache->value_slice(_handle); } + Slice data() const { + DataPage* cache_value = (DataPage*)_cache->value(_handle); + return Slice(cache_value->data(), cache_value->size()); + } private: Cache* _cache = nullptr; diff --git a/be/src/olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h b/be/src/olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h index e762f90d39..7ec7bed223 100644 --- a/be/src/olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h +++ b/be/src/olap/rowset/segment_v2/bitshuffle_page_pre_decoder.h @@ -17,6 +17,7 @@ #pragma once +#include "olap/page_cache.h" #include "olap/rowset/segment_v2/binary_dict_page.h" #include "olap/rowset/segment_v2/bitshuffle_page.h" #include "olap/rowset/segment_v2/encoding_info.h" @@ -37,7 +38,7 @@ struct BitShufflePagePreDecoder : public DataPagePreDecoder { * @param size_of_tail including size of footer and null map * @return Status */ - virtual Status decode(std::unique_ptr* page, Slice* page_slice, + virtual Status decode(std::unique_ptr* page, Slice* page_slice, size_t size_of_tail) override { size_t num_elements, compressed_size, num_element_after_padding; int size_of_element; @@ -65,8 +66,8 @@ struct BitShufflePagePreDecoder : public DataPagePreDecoder { Slice decoded_slice; decoded_slice.size = size_of_dict_header + BITSHUFFLE_PAGE_HEADER_SIZE + num_element_after_padding * size_of_element + size_of_tail; - std::unique_ptr decoded_page(new char[decoded_slice.size]); - decoded_slice.data = decoded_page.get(); + std::unique_ptr decoded_page = std::make_unique(decoded_slice.size); + decoded_slice.data = decoded_page->data(); if constexpr (USED_IN_DICT_ENCODING) { memcpy(decoded_slice.data, page_slice->data, size_of_dict_header); diff --git a/be/src/olap/rowset/segment_v2/encoding_info.h b/be/src/olap/rowset/segment_v2/encoding_info.h index 4086f78294..a706683ff5 100644 --- a/be/src/olap/rowset/segment_v2/encoding_info.h +++ b/be/src/olap/rowset/segment_v2/encoding_info.h @@ -23,6 +23,7 @@ #include #include "common/status.h" +#include "olap/page_cache.h" #include "util/slice.h" namespace doris { @@ -41,7 +42,7 @@ enum EncodingTypePB : int; // For better performance, some encodings (like BitShuffle) need to be decoded before being added to the PageCache. class DataPagePreDecoder { public: - virtual Status decode(std::unique_ptr* page, Slice* page_slice, + virtual Status decode(std::unique_ptr* page, Slice* page_slice, size_t size_of_tail) = 0; virtual ~DataPagePreDecoder() = default; }; diff --git a/be/src/olap/rowset/segment_v2/page_handle.h b/be/src/olap/rowset/segment_v2/page_handle.h index c2f82573c2..7e4f766524 100644 --- a/be/src/olap/rowset/segment_v2/page_handle.h +++ b/be/src/olap/rowset/segment_v2/page_handle.h @@ -36,9 +36,7 @@ public: // This class will take the ownership of input data's memory. It will // free it when deconstructs. - PageHandle(const Slice& data) : _is_data_owner(true), _data(data) { - ExecEnv::GetInstance()->page_no_cache_mem_tracker()->consume(_data.size); - } + PageHandle(DataPage* data) : _is_data_owner(true), _data(data) {} // This class will take the content of cache data, and will make input // cache_data to a invalid cache handle. @@ -46,32 +44,31 @@ public: : _is_data_owner(false), _cache_data(std::move(cache_data)) {} // Move constructor - PageHandle(PageHandle&& other) noexcept - : _is_data_owner(false), - _data(std::move(other._data)), - _cache_data(std::move(other._cache_data)) { + PageHandle(PageHandle&& other) noexcept : _cache_data(std::move(other._cache_data)) { // we can use std::exchange if we switch c++14 on std::swap(_is_data_owner, other._is_data_owner); + std::swap(_data, other._data); } PageHandle& operator=(PageHandle&& other) noexcept { std::swap(_is_data_owner, other._is_data_owner); - _data = std::move(other._data); + std::swap(_data, other._data); _cache_data = std::move(other._cache_data); return *this; } ~PageHandle() { - if (_is_data_owner && _data.size > 0) { - delete[] _data.data; - ExecEnv::GetInstance()->page_no_cache_mem_tracker()->consume(-_data.size); + if (_is_data_owner) { + delete _data; + } else { + DCHECK(_data == nullptr); } } // the return slice contains uncompressed page body, page footer, and footer size Slice data() const { if (_is_data_owner) { - return _data; + return Slice(_data->data(), _data->size()); } else { return _cache_data.data(); } @@ -81,7 +78,7 @@ private: // when this is true, it means this struct own data and _data is valid. // otherwise _cache_data is valid, and data is belong to cache. bool _is_data_owner = false; - Slice _data; + DataPage* _data = nullptr; PageCacheHandle _cache_data; // Don't allow copy and assign diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index f9b1fb3408..48dce71f36 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -143,8 +143,8 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* } // hold compressed page at first, reset to decompressed page later - std::unique_ptr page(new char[page_size]); - Slice page_slice(page.get(), page_size); + std::unique_ptr page = std::make_unique(page_size); + Slice page_slice(page->data(), page_size); { SCOPED_RAW_TIMER(&opts.stats->io_ns); size_t bytes_read = 0; @@ -177,12 +177,12 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* return Status::Corruption("Bad page: page is compressed but codec is NO_COMPRESSION"); } SCOPED_RAW_TIMER(&opts.stats->decompress_ns); - std::unique_ptr decompressed_page( - new char[footer->uncompressed_size() + footer_size + 4]); + std::unique_ptr decompressed_page = + std::make_unique(footer->uncompressed_size() + footer_size + 4); // decompress page body Slice compressed_body(page_slice.data, body_size); - Slice decompressed_body(decompressed_page.get(), footer->uncompressed_size()); + Slice decompressed_body(decompressed_page->data(), footer->uncompressed_size()); RETURN_IF_ERROR(opts.codec->decompress(compressed_body, &decompressed_body)); if (decompressed_body.size != footer->uncompressed_size()) { return Status::Corruption( @@ -194,7 +194,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* footer_size + 4); // free memory of compressed page page = std::move(decompressed_page); - page_slice = Slice(page.get(), footer->uncompressed_size() + footer_size + 4); + page_slice = Slice(page->data(), footer->uncompressed_size() + footer_size + 4); opts.stats->uncompressed_bytes_read += page_slice.size; } else { opts.stats->uncompressed_bytes_read += body_size; @@ -210,12 +210,13 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* } *body = Slice(page_slice.data, page_slice.size - 4 - footer_size); + page->reset_size(page_slice.size); if (opts.use_page_cache && cache->is_cache_available(opts.type)) { // insert this page into cache and return the cache handle - cache->insert(cache_key, page_slice, &cache_handle, opts.type, opts.kept_in_memory); + cache->insert(cache_key, page.get(), &cache_handle, opts.type, opts.kept_in_memory); *handle = PageHandle(std::move(cache_handle)); } else { - *handle = PageHandle(page_slice); + *handle = PageHandle(page.get()); } page.release(); // memory now managed by handle return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index 5ccccc2a3d..e969615202 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -53,7 +53,7 @@ struct PageReadOptions { // whether to verify page checksum bool verify_checksum = true; // whether to use page cache in read path - bool use_page_cache = true; + bool use_page_cache = false; // if true, use DURABLE CachePriority in page cache // currently used for in memory olap table bool kept_in_memory = false; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 3434634479..c8a1429492 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -249,6 +249,7 @@ Status Segment::load_index() { opts.page_pointer = PagePointer(_footer.short_key_index_page()); opts.codec = nullptr; // short key index page uses NO_COMPRESSION for now OlapReaderStatistics tmp_stats; + opts.use_page_cache = true; opts.stats = &tmp_stats; opts.type = INDEX_PAGE; Slice body; diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 64f54d40ff..711bcc9dd2 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -77,6 +77,11 @@ static constexpr size_t CHUNK_THRESHOLD = 4096; static constexpr size_t MMAP_MIN_ALIGNMENT = 4096; static constexpr size_t MALLOC_MIN_ALIGNMENT = 8; +// The memory for __int128 should be aligned to 16 bytes. +// By the way, in 64-bit system, the address of a block returned by malloc or realloc in GNU systems +// is always a multiple of sixteen. (https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html) +static constexpr int ALLOCATOR_ALIGNMENT_16 = 16; + /** Responsible for allocating / freeing memory. Used, for example, in PODArray, Arena. * Also used in hash tables. * The interface is different from std::allocator diff --git a/be/test/olap/page_cache_test.cpp b/be/test/olap/page_cache_test.cpp index 8c9a37183e..6f9724fe0c 100644 --- a/be/test/olap/page_cache_test.cpp +++ b/be/test/olap/page_cache_test.cpp @@ -43,26 +43,24 @@ TEST(StoragePageCacheTest, data_page_only) { { // insert normal page - char* buf = new char[1024]; PageCacheHandle handle; - Slice data(buf, 1024); + DataPage* data = new DataPage(1024); cache.insert(key, data, &handle, page_type, false); - EXPECT_EQ(handle.data().data, buf); + EXPECT_EQ(handle.data().data, data->data()); auto found = cache.lookup(key, &handle, page_type); EXPECT_TRUE(found); - EXPECT_EQ(buf, handle.data().data); + EXPECT_EQ(data->data(), handle.data().data); } { // insert in_memory page - char* buf = new char[1024]; PageCacheHandle handle; - Slice data(buf, 1024); + DataPage* data = new DataPage(1024); cache.insert(memory_key, data, &handle, page_type, true); - EXPECT_EQ(handle.data().data, buf); + EXPECT_EQ(handle.data().data, data->data()); auto found = cache.lookup(memory_key, &handle, page_type); EXPECT_TRUE(found); @@ -72,7 +70,7 @@ TEST(StoragePageCacheTest, data_page_only) { for (int i = 0; i < 10 * kNumShards; ++i) { StoragePageCache::CacheKey key("bcd", 0, i); PageCacheHandle handle; - Slice data(new char[1024], 1024); + DataPage* data = new DataPage(1024); cache.insert(key, data, &handle, page_type, false); } @@ -111,26 +109,24 @@ TEST(StoragePageCacheTest, index_page_only) { { // insert normal page - char* buf = new char[1024]; PageCacheHandle handle; - Slice data(buf, 1024); + DataPage* data = new DataPage(1024); cache.insert(key, data, &handle, page_type, false); - EXPECT_EQ(handle.data().data, buf); + EXPECT_EQ(handle.data().data, data->data()); auto found = cache.lookup(key, &handle, page_type); EXPECT_TRUE(found); - EXPECT_EQ(buf, handle.data().data); + EXPECT_EQ(data->data(), handle.data().data); } { // insert in_memory page - char* buf = new char[1024]; PageCacheHandle handle; - Slice data(buf, 1024); + DataPage* data = new DataPage(1024); cache.insert(memory_key, data, &handle, page_type, true); - EXPECT_EQ(handle.data().data, buf); + EXPECT_EQ(handle.data().data, data->data()); auto found = cache.lookup(memory_key, &handle, page_type); EXPECT_TRUE(found); @@ -140,7 +136,7 @@ TEST(StoragePageCacheTest, index_page_only) { for (int i = 0; i < 10 * kNumShards; ++i) { StoragePageCache::CacheKey key("bcd", 0, i); PageCacheHandle handle; - Slice data(new char[1024], 1024); + DataPage* data = new DataPage(1024); cache.insert(key, data, &handle, page_type, false); } @@ -182,35 +178,33 @@ TEST(StoragePageCacheTest, mixed_pages) { { // insert both normal pages - char* buf_data = new char[1024]; - char* buf_index = new char[1024]; PageCacheHandle data_handle, index_handle; - Slice data(buf_data, 1024), index(buf_index, 1024); + DataPage* data = new DataPage(1024); + DataPage* index = new DataPage(1024); cache.insert(data_key, data, &data_handle, page_type_data, false); cache.insert(index_key, index, &index_handle, page_type_index, false); - EXPECT_EQ(data_handle.data().data, buf_data); - EXPECT_EQ(index_handle.data().data, buf_index); + EXPECT_EQ(data_handle.data().data, data->data()); + EXPECT_EQ(index_handle.data().data, index->data()); auto found_data = cache.lookup(data_key, &data_handle, page_type_data); auto found_index = cache.lookup(index_key, &index_handle, page_type_index); EXPECT_TRUE(found_data); EXPECT_TRUE(found_index); - EXPECT_EQ(buf_data, data_handle.data().data); - EXPECT_EQ(buf_index, index_handle.data().data); + EXPECT_EQ(data->data(), data_handle.data().data); + EXPECT_EQ(index->data(), index_handle.data().data); } { // insert both in_memory pages - char* buf_data = new char[1024]; - char* buf_index = new char[1024]; PageCacheHandle data_handle, index_handle; - Slice data(buf_data, 1024), index(buf_index, 1024); + DataPage* data = new DataPage(1024); + DataPage* index = new DataPage(1024); cache.insert(data_key_mem, data, &data_handle, page_type_data, true); cache.insert(index_key_mem, index, &index_handle, page_type_index, true); - EXPECT_EQ(data_handle.data().data, buf_data); - EXPECT_EQ(index_handle.data().data, buf_index); + EXPECT_EQ(data_handle.data().data, data->data()); + EXPECT_EQ(index_handle.data().data, index->data()); auto found_data = cache.lookup(data_key_mem, &data_handle, page_type_data); auto found_index = cache.lookup(index_key_mem, &index_handle, page_type_index); @@ -222,9 +216,10 @@ TEST(StoragePageCacheTest, mixed_pages) { for (int i = 0; i < 10 * kNumShards; ++i) { StoragePageCache::CacheKey key("bcd", 0, i); PageCacheHandle handle; - Slice data(new char[1024], 1024), index(new char[1024], 1024); - cache.insert(key, data, &handle, page_type_data, false); - cache.insert(key, index, &handle, page_type_index, false); + std::unique_ptr data = std::make_unique(1024); + std::unique_ptr index = std::make_unique(1024); + cache.insert(key, data.release(), &handle, page_type_data, false); + cache.insert(key, index.release(), &handle, page_type_index, false); } // cache miss by key @@ -242,11 +237,10 @@ TEST(StoragePageCacheTest, mixed_pages) { PageCacheHandle data_handle, index_handle; StoragePageCache::CacheKey miss_key_data("data_miss", 0, 1); StoragePageCache::CacheKey miss_key_index("index_miss", 0, 1); - char* buf_data = new char[1024]; - char* buf_index = new char[1024]; - Slice data(buf_data, 1024), index(buf_index, 1024); - cache.insert(miss_key_data, data, &data_handle, page_type_data, false); - cache.insert(miss_key_index, index, &index_handle, page_type_index, false); + std::unique_ptr data = std::make_unique(1024); + std::unique_ptr index = std::make_unique(1024); + cache.insert(miss_key_data, data.release(), &data_handle, page_type_data, false); + cache.insert(miss_key_index, index.release(), &index_handle, page_type_index, false); auto found_data = cache.lookup(miss_key_data, &data_handle, page_type_index); auto found_index = cache.lookup(miss_key_index, &index_handle, page_type_data);