From 11eafe524fea9fa0fc2aa8c8c761fe099c75ed0a Mon Sep 17 00:00:00 2001 From: ZHAO Chun Date: Fri, 13 Sep 2019 08:27:24 +0800 Subject: [PATCH] Add ChunkAllocator to accelerate chunk allocation (#1792) This CL adds ChunkAllocator, which puts unused memory chunks into a chunk pool rather than returning them to the system allocator. For now, only MemPool's chunk allocation and free go through it. Two configuration options are introduced as well. 'chunk_reserved_bytes_limit' is the limit on how many bytes the chunk pool can reserve in total; its default value is 2147483648 (2GB). 'use_mmap_allocate_chunk' controls whether chunks are allocated via mmap; its default value is false. In my test case with the default configuration, running a simple query like "select * from table limit 10", this improves throughput from 280 QPS to 650 QPS. When 'chunk_reserved_bytes_limit' is set to 0, which disables the pool, the throughput is the same as the original's. --- be/src/common/config.h | 13 ++ be/src/common/daemon.cpp | 3 + be/src/runtime/CMakeLists.txt | 2 + be/src/runtime/mem_pool.cpp | 43 ++-- be/src/runtime/mem_pool.h | 24 +-- be/src/runtime/memory/chunk.h | 35 ++++ be/src/runtime/memory/chunk_allocator.cpp | 190 ++++++++++++++++++ be/src/runtime/memory/chunk_allocator.h | 80 ++++++++ be/src/runtime/memory/system_allocator.cpp | 78 +++++++ be/src/runtime/memory/system_allocator.h | 38 ++++ be/src/util/cpu_info.cpp | 3 +- .../olap/rowset/segment_v2/segment_test.cpp | 2 +- be/test/runtime/CMakeLists.txt | 2 + be/test/runtime/mem_pool_test.cpp | 14 +- .../runtime/memory/chunk_allocator_test.cpp | 47 +++++ .../runtime/memory/system_allocator_test.cpp | 53 +++++ run-ut.sh | 2 + 17 files changed, 583 insertions(+), 46 deletions(-) create mode 100644 be/src/runtime/memory/chunk.h create mode 100644 be/src/runtime/memory/chunk_allocator.cpp create mode 100644 be/src/runtime/memory/chunk_allocator.h create mode 100644 be/src/runtime/memory/system_allocator.cpp create mode 100644 be/src/runtime/memory/system_allocator.h 
create mode 100644 be/test/runtime/memory/chunk_allocator_test.cpp create mode 100644 be/test/runtime/memory/system_allocator_test.cpp diff --git a/be/src/common/config.h b/be/src/common/config.h index d094638682..f41ee4e713 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -303,6 +303,19 @@ namespace config { CONF_Bool(disable_mem_pools, "false"); + // Whether to allocate chunks using mmap. If you enable this, you should + // increase vm.max_map_count, whose default value is 65530. + // You can do it as root via "sysctl -w vm.max_map_count=262144" or + // "echo 262144 > /proc/sys/vm/max_map_count". + // NOTE: When this is set to true, you must set chunk_reserved_bytes_limit + // to a relatively large number or the performance will be very bad. + CONF_Bool(use_mmap_allocate_chunk, "false"); + + // ChunkAllocator's reserved bytes limit. + // The default value is 2GB. Increasing this value can improve performance, but + // it will hold more free memory which cannot be used by other modules. + CONF_Int64(chunk_reserved_bytes_limit, "2147483648"); + // The probing algorithm of partitioned hash table. 
// Enable quadratic probing hash table CONF_Bool(enable_quadratic_probing, "false"); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index c68794631f..f6fc98908e 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -31,6 +31,7 @@ #include "util/doris_metrics.h" #include "runtime/bufferpool/buffer_pool.h" #include "runtime/exec_env.h" +#include "runtime/memory/chunk_allocator.h" #include "runtime/mem_tracker.h" #include "runtime/user_function_cache.h" #include "exprs/operators.h" @@ -286,6 +287,8 @@ void init_daemon(int argc, char** argv, const std::vector& paths) { LOG(INFO) << MemInfo::debug_string(); init_doris_metrics(paths); init_signals(); + + ChunkAllocator::init_instance(config::chunk_reserved_bytes_limit); } } diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index bd823f8ec0..e46afeef3e 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -103,6 +103,8 @@ set(RUNTIME_FILES result_queue_mgr.cpp memory_scratch_sink.cpp external_scan_context_mgr.cpp + memory/system_allocator.cpp + memory/chunk_allocator.cpp ) if (WITH_MYSQL) diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 76b2c941ad..95086c1636 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+#include "runtime/memory/chunk_allocator.h" #include "runtime/mem_pool.h" #include "runtime/mem_tracker.h" #include "util/bit_util.h" @@ -37,18 +38,17 @@ const char* MemPool::LLVM_CLASS_NAME = "class.doris::MemPool"; const int MemPool::DEFAULT_ALIGNMENT; uint32_t MemPool::k_zero_length_region_ alignas(std::max_align_t) = MEM_POOL_POISON; -MemPool::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf) - : data(buf), - size(size), +MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) + : chunk(chunk_), allocated_bytes(0) { - DorisMetrics::memory_pool_bytes_total.increment(size); + DorisMetrics::memory_pool_bytes_total.increment(chunk.size); } MemPool::~MemPool() { int64_t total_bytes_released = 0; for (auto& chunk : chunks_) { - total_bytes_released += chunk.size; - free(chunk.data); + total_bytes_released += chunk.chunk.size; + ChunkAllocator::instance()->free(chunk.chunk); } mem_tracker_->release(total_bytes_released); DorisMetrics::memory_pool_bytes_total.increment(-total_bytes_released); @@ -58,7 +58,7 @@ void MemPool::clear() { current_chunk_idx_ = -1; for (auto& chunk: chunks_) { chunk.allocated_bytes = 0; - ASAN_POISON_MEMORY_REGION(chunk.data, chunk.size); + ASAN_POISON_MEMORY_REGION(chunk.chunk.data, chunk.chunk.size); } total_allocated_bytes_ = 0; DCHECK(check_integrity(false)); @@ -67,8 +67,8 @@ void MemPool::clear() { void MemPool::free_all() { int64_t total_bytes_released = 0; for (auto& chunk: chunks_) { - total_bytes_released += chunk.size; - free(chunk.data); + total_bytes_released += chunk.chunk.size; + ChunkAllocator::instance()->free(chunk.chunk); } chunks_.clear(); next_chunk_size_ = INITIAL_CHUNK_SIZE; @@ -96,7 +96,7 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) { for (int idx = current_chunk_idx_ + 1; idx < chunks_.size(); ++idx) { // All chunks after 'current_chunk_idx_' should be free. 
DCHECK_EQ(chunks_[idx].allocated_bytes, 0); - if (chunks_[idx].size >= min_size) { + if (chunks_[idx].chunk.size >= min_size) { // This chunk is big enough. Move it before the other free chunks. if (idx != first_free_idx) std::swap(chunks_[idx], chunks_[first_free_idx]); current_chunk_idx_ = first_free_idx; @@ -118,26 +118,25 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) { chunk_size = max(min_size, next_chunk_size_); } + chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); if (check_limits) { if (!mem_tracker_->try_consume(chunk_size)) return false; } else { mem_tracker_->consume(chunk_size); } - // Allocate a new chunk. Return early if malloc fails. - uint8_t* buf = reinterpret_cast(malloc(chunk_size)); - if (UNLIKELY(buf == nullptr)) { + // Allocate a new chunk. Return early if allocate fails. + Chunk chunk; + if (!ChunkAllocator::instance()->allocate(chunk_size, &chunk)) { mem_tracker_->release(chunk_size); return false; } - - ASAN_POISON_MEMORY_REGION(buf, chunk_size); - + ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size); // Put it before the first free chunk. If no free chunks, it goes at the end. 
if (first_free_idx == static_cast(chunks_.size())) { - chunks_.emplace_back(chunk_size, buf); + chunks_.emplace_back(chunk); } else { - chunks_.insert(chunks_.begin() + first_free_idx, ChunkInfo(chunk_size, buf)); + chunks_.insert(chunks_.begin() + first_free_idx, ChunkInfo(chunk)); } current_chunk_idx_ = first_free_idx; total_reserved_bytes_ += chunk_size; @@ -169,7 +168,7 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) { auto end_chunk = src->chunks_.begin() + num_acquired_chunks; int64_t total_transfered_bytes = 0; for (auto i = src->chunks_.begin(); i != end_chunk; ++i) { - total_transfered_bytes += i->size; + total_transfered_bytes += i->chunk.size; } src->total_reserved_bytes_ -= total_transfered_bytes; total_reserved_bytes_ += total_transfered_bytes; @@ -224,10 +223,10 @@ string MemPool::debug_string() { char str[16]; out << "MemPool(#chunks=" << chunks_.size() << " ["; for (int i = 0; i < chunks_.size(); ++i) { - sprintf(str, "0x%lx=", reinterpret_cast(chunks_[i].data)); + sprintf(str, "0x%lx=", reinterpret_cast(chunks_[i].chunk.data)); out << (i > 0 ? 
" " : "") << str - << chunks_[i].size + << chunks_[i].chunk.size << "/" << chunks_[i].allocated_bytes; } out << "] current_chunk=" << current_chunk_idx_ @@ -246,7 +245,7 @@ bool MemPool::check_integrity(bool check_current_chunk_empty) { // check that current_chunk_idx_ points to the last chunk with allocated data int64_t total_allocated = 0; for (int i = 0; i < chunks_.size(); ++i) { - DCHECK_GT(chunks_[i].size, 0); + DCHECK_GT(chunks_[i].chunk.size, 0); if (i < current_chunk_idx_) { DCHECK_GT(chunks_[i].allocated_bytes, 0); } else if (i == current_chunk_idx_) { diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 0dd91d54f9..80fda97e2e 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -28,6 +28,7 @@ #include "common/logging.h" #include "gutil/dynamic_annotations.h" #include "util/bit_util.h" +#include "runtime/memory/chunk.h" namespace doris { @@ -173,18 +174,11 @@ private: static const int MAX_CHUNK_SIZE = 512 * 1024; struct ChunkInfo { - uint8_t* data; // Owned by the ChunkInfo. - int64_t size; // in bytes - + Chunk chunk; /// bytes allocated via Allocate() in this chunk int64_t allocated_bytes; - - explicit ChunkInfo(int64_t size, uint8_t* buf); - - ChunkInfo() - : data(NULL), - size(0), - allocated_bytes(0) {} + explicit ChunkInfo(const Chunk& chunk); + ChunkInfo() : allocated_bytes(0) { } }; /// A static field used as non-NULL pointer for zero length allocations. NULL is @@ -220,12 +214,12 @@ private: ChunkInfo& info = chunks_[current_chunk_idx_]; int64_t aligned_allocated_bytes = BitUtil::RoundUpToPowerOf2( info.allocated_bytes, alignment); - if (aligned_allocated_bytes + size <= info.size) { + if (aligned_allocated_bytes + size <= info.chunk.size) { // Ensure the requested alignment is respected. 
int64_t padding = aligned_allocated_bytes - info.allocated_bytes; - uint8_t* result = info.data + aligned_allocated_bytes; + uint8_t* result = info.chunk.data + aligned_allocated_bytes; ASAN_UNPOISON_MEMORY_REGION(result, size); - DCHECK_LE(info.allocated_bytes + size, info.size); + DCHECK_LE(info.allocated_bytes + size, info.chunk.size); info.allocated_bytes += padding + size; total_allocated_bytes_ += padding + size; DCHECK_LE(current_chunk_idx_, chunks_.size() - 1); @@ -241,9 +235,9 @@ private: if (UNLIKELY(!find_chunk(size, CHECK_LIMIT_FIRST))) return NULL; ChunkInfo& info = chunks_[current_chunk_idx_]; - uint8_t* result = info.data + info.allocated_bytes; + uint8_t* result = info.chunk.data + info.allocated_bytes; ASAN_UNPOISON_MEMORY_REGION(result, size); - DCHECK_LE(info.allocated_bytes + size, info.size); + DCHECK_LE(info.allocated_bytes + size, info.chunk.size); info.allocated_bytes += size; total_allocated_bytes_ += size; DCHECK_LE(current_chunk_idx_, chunks_.size() - 1); diff --git a/be/src/runtime/memory/chunk.h b/be/src/runtime/memory/chunk.h new file mode 100644 index 0000000000..703489a453 --- /dev/null +++ b/be/src/runtime/memory/chunk.h @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include + +namespace doris { + +// A chunk of continuous memory. +// Almost all files depend on this struct, and each modification +// will result in recompilation of all files. So, we put it in a +// file to keep this file simple and infrequently changed. +struct Chunk { + uint8_t* data; + size_t size; + int core_id; +}; + +} diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp new file mode 100644 index 0000000000..e4facb16cc --- /dev/null +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/memory/chunk_allocator.h" + +#include +#include +#include + +#include "gutil/dynamic_annotations.h" +#include "runtime/memory/chunk.h" +#include "runtime/memory/system_allocator.h" +#include "util/bit_util.h" +#include "util/cpu_info.h" +#include "util/doris_metrics.h" +#include "util/runtime_profile.h" +#include "util/spinlock.h" + +namespace doris { + +ChunkAllocator* ChunkAllocator::_s_instance = nullptr; + +static IntCounter local_core_alloc_count; +static IntCounter other_core_alloc_count; +static IntCounter system_alloc_count; +static IntCounter system_free_count; +static IntCounter system_alloc_cost_ns; +static IntCounter system_free_cost_ns; + +#if BE_TEST +ChunkAllocator* ChunkAllocator::instance() { + if (_s_instance == nullptr) { + DorisMetrics::instance()->initialize("common_ut"); + CpuInfo::init(); + ChunkAllocator::init_instance(4096); + } + return _s_instance; +} +#endif + + +// Keep free chunk's ptr in size separated free list. +// This class is thread-safe. +class ChunkArena { +public: + ChunkArena() : _chunk_lists(64) { } + + ~ChunkArena() { + for (int i = 0; i < 64; ++i) { + if (_chunk_lists[i].empty()) continue; + size_t size = (uint64_t)1 << i; + for (auto ptr : _chunk_lists[i]) { + SystemAllocator::free(ptr, size); + } + } + } + + // Try to pop a free chunk from corresponding free list. 
+ // Return true if success + bool pop_free_chunk(size_t size, uint8_t** ptr) { + int idx = BitUtil::Log2Ceiling64(size); + auto& free_list = _chunk_lists[idx]; + + std::lock_guard l(_lock); + if (free_list.empty()) { + return false; + } + *ptr = free_list.back(); + free_list.pop_back(); + ASAN_UNPOISON_MEMORY_REGION(*ptr, size); + return true; + } + + void push_free_chunk(uint8_t* ptr, size_t size) { + int idx = BitUtil::Log2Ceiling64(size); + // Poison this chunk to make asan can detect invalid access + ASAN_POISON_MEMORY_REGION(ptr, size); + std::lock_guard l(_lock); + _chunk_lists[idx].push_back(ptr); + } +private: + SpinLock _lock; + std::vector> _chunk_lists; +}; + +void ChunkAllocator::init_instance(size_t reserve_limit) { + if (_s_instance != nullptr) return; + _s_instance = new ChunkAllocator(reserve_limit); + +#define REGISTER_METIRC_WITH_NAME(name, metric) \ + DorisMetrics::metrics()->register_metric(#name, &metric) + +#define REGISTER_METIRC_WITH_PREFIX(prefix, name) \ + REGISTER_METIRC_WITH_NAME(prefix##name, name) + +#define REGISTER_METIRC(name) \ + REGISTER_METIRC_WITH_PREFIX(chunk_pool_, name) + + REGISTER_METIRC(local_core_alloc_count); + REGISTER_METIRC(other_core_alloc_count); + REGISTER_METIRC(system_alloc_count); + REGISTER_METIRC(system_free_count); + REGISTER_METIRC(system_alloc_cost_ns); + REGISTER_METIRC(system_free_cost_ns); +} + +ChunkAllocator::ChunkAllocator(size_t reserve_limit) + : _reserve_bytes_limit(reserve_limit), + _reserved_bytes(0), + _arenas(CpuInfo::get_max_num_cores()) { + for (int i = 0; i < _arenas.size(); ++i) { + _arenas[i].reset(new ChunkArena()); + } +} + +bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { + // fast path: allocate from current core arena + int core_id = CpuInfo::get_current_core(); + chunk->size = size; + chunk->core_id = core_id; + + if (_arenas[core_id]->pop_free_chunk(size, &chunk->data)) { + _reserved_bytes.fetch_sub(size); + local_core_alloc_count.increment(1); + return true; + } + if 
(_reserved_bytes > size) { + // try to allocate from other core's arena + ++core_id; + for (int i = 1; i < _arenas.size(); ++i, ++core_id) { + if (_arenas[core_id % _arenas.size()]->pop_free_chunk(size, &chunk->data)) { + _reserved_bytes.fetch_sub(size); + other_core_alloc_count.increment(1); + // reset chunk's core_id to other + chunk->core_id = core_id % _arenas.size(); + return true; + } + } + } + + int64_t cost_ns = 0; + { + SCOPED_RAW_TIMER(&cost_ns); + // allocate from system allocator + chunk->data = SystemAllocator::allocate(size); + } + system_alloc_count.increment(1); + system_alloc_cost_ns.increment(cost_ns); + if (chunk->data == nullptr) { + return false; + } + return true; +} + +void ChunkAllocator::free(const Chunk& chunk) { + int64_t old_reserved_bytes = _reserved_bytes; + int64_t new_reserved_bytes = 0; + do { + new_reserved_bytes = old_reserved_bytes + chunk.size; + if (new_reserved_bytes > _reserve_bytes_limit) { + int64_t cost_ns = 0; + { + SCOPED_RAW_TIMER(&cost_ns); + SystemAllocator::free(chunk.data, chunk.size); + } + system_free_count.increment(1); + system_free_cost_ns.increment(cost_ns); + + return; + } + } while (!_reserved_bytes.compare_exchange_weak(old_reserved_bytes, new_reserved_bytes)); + + _arenas[chunk.core_id]->push_free_chunk(chunk.data, chunk.size); +} + +} diff --git a/be/src/runtime/memory/chunk_allocator.h b/be/src/runtime/memory/chunk_allocator.h new file mode 100644 index 0000000000..1ec7fba9d8 --- /dev/null +++ b/be/src/runtime/memory/chunk_allocator.h @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include + +namespace doris { + +class Chunk; +class ChunkArena; + +// Used to allocate memory with power-of-two length. +// This Allocator allocate memory from system and cache free chunks for +// later use. +// +// ChunkAllocator has one ChunkArena for each CPU core, it will try to allocate +// memory from current core arena firstly. In this way, there will be no lock contention +// between concurrently-running threads. If this fails, ChunkAllocator will try to allocate +// memroy from other core's arena. +// +// Memory Reservation +// ChunkAllocator has a limit about how much free chunk bytes it can reserve, above which +// chunk will released to system memory. For the worst case, when the limits is 0, it will +// act as allocating directly from system. +// +// ChunkArena will keep a separate free list for each chunk size. In common case, chunk will +// be allocated from current core arena. In this case, there is no lock contention. +// +// Must call CpuInfo::init() and DorisMetrics::initialize() to achieve good performance +// before first object is created. And call init_instance() before use instance is called. +class ChunkAllocator { +public: + static void init_instance(size_t reserve_limit); + +#if BE_TEST + static ChunkAllocator* instance(); +#else + static ChunkAllocator* instance() { + return _s_instance; + } +#endif + + ChunkAllocator(size_t reserve_limit); + + // Allocate a Chunk with a power-of-two length "size". 
+ // Return true if success and allocated chunk is saved in "chunk". + // Otherwise return false. + bool allocate(size_t size, Chunk* chunk); + + // Free chunk allocated from this allocator + void free(const Chunk& chunk); +private: + static ChunkAllocator* _s_instance; + + size_t _reserve_bytes_limit; + std::atomic _reserved_bytes; + // each core has a ChunkArena + std::vector> _arenas; +}; + +} diff --git a/be/src/runtime/memory/system_allocator.cpp b/be/src/runtime/memory/system_allocator.cpp new file mode 100644 index 0000000000..8f08ad687e --- /dev/null +++ b/be/src/runtime/memory/system_allocator.cpp @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/memory/system_allocator.h" + +#include +#include + +#include + +#include "common/config.h" +#include "common/logging.h" + +namespace doris { + +#define PAGE_SIZE (4 * 1024) // 4K + +uint8_t* SystemAllocator::allocate(size_t length) { + if (config::use_mmap_allocate_chunk) { + return allocate_via_mmap(length); + } else { + return allocate_via_malloc(length); + } +} + +void SystemAllocator::free(uint8_t* ptr, size_t length) { + if (config::use_mmap_allocate_chunk) { + auto res = munmap(ptr, length); + if (res != 0) { + char buf[64]; + LOG(ERROR) << "fail to free memory via munmap, errno=" << errno + << ", errmsg=" << strerror_r(errno, buf, 64); + } + } else { + ::free(ptr); + } +} + +uint8_t* SystemAllocator::allocate_via_malloc(size_t length) { + void* ptr = nullptr; + // try to use a whole page instead of parts of one page + int res = posix_memalign(&ptr, PAGE_SIZE, length); + if (res != 0) { + char buf[64]; + LOG(ERROR) << "fail to allocate mem via posix_memalign, res=" << res + << ", errmsg=" << strerror_r(res, buf, 64); + return nullptr; + } + return (uint8_t*)ptr; +} + +uint8_t* SystemAllocator::allocate_via_mmap(size_t length) { + auto ptr = (uint8_t*)mmap(nullptr, length, PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); + if (ptr == MAP_FAILED) { + char buf[64]; + LOG(ERROR) << "fail to allocate memory via mmap, errno=" << errno + << ", errmsg=" << strerror_r(errno, buf, 64); + return nullptr; + } + return ptr; +} + +} diff --git a/be/src/runtime/memory/system_allocator.h b/be/src/runtime/memory/system_allocator.h new file mode 100644 index 0000000000..930b9c9caa --- /dev/null +++ b/be/src/runtime/memory/system_allocator.h @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +namespace doris { + +// Allocate memory from system allocator, this allocator can be configured +// to allocate memory via mmap or malloc. +class SystemAllocator { +public: + static uint8_t* allocate(size_t length); + + static void free(uint8_t* ptr, size_t length); +private: + static uint8_t* allocate_via_mmap(size_t length); + static uint8_t* allocate_via_malloc(size_t length); + +}; + +} diff --git a/be/src/util/cpu_info.cpp b/be/src/util/cpu_info.cpp index ba3e3435f5..7bdb120e3d 100755 --- a/be/src/util/cpu_info.cpp +++ b/be/src/util/cpu_info.cpp @@ -75,7 +75,7 @@ int64_t CpuInfo::hardware_flags_ = 0; int64_t CpuInfo::original_hardware_flags_; int64_t CpuInfo::cycles_per_ms_; int CpuInfo::num_cores_ = 1; -int CpuInfo::max_num_cores_; +int CpuInfo::max_num_cores_ = 1; string CpuInfo::model_name_ = "unknown"; int CpuInfo::max_num_numa_nodes_; unique_ptr CpuInfo::core_to_numa_node_; @@ -111,6 +111,7 @@ int64_t ParseCPUFlags(const string& values) { } void CpuInfo::init() { + if (initialized_) return; string line; string name; string value; diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp index e9fa03c359..c9b327432f 100644 --- a/be/test/olap/rowset/segment_v2/segment_test.cpp +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -364,7 +364,7 @@ TEST_F(SegmentReaderWriterTest, 
estimate_segment_size) { uint32_t segment_size = writer.estimate_segment_size(); LOG(INFO) << "estimate segment size is:" << segment_size; - uint32_t file_size = 0; + uint64_t file_size = 0; st = writer.finalize(&file_size); ASSERT_TRUE(st.ok()); diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt index e002e4d171..ccfe08bc4b 100644 --- a/be/test/runtime/CMakeLists.txt +++ b/be/test/runtime/CMakeLists.txt @@ -63,3 +63,5 @@ ADD_BE_TEST(small_file_mgr_test) ADD_BE_TEST(result_queue_mgr_test) ADD_BE_TEST(memory_scratch_sink_test) ADD_BE_TEST(external_scan_context_mgr_test) +ADD_BE_TEST(memory/chunk_allocator_test) +ADD_BE_TEST(memory/system_allocator_test) diff --git a/be/test/runtime/mem_pool_test.cpp b/be/test/runtime/mem_pool_test.cpp index 10aa9c1cd6..ca39f3613c 100644 --- a/be/test/runtime/mem_pool_test.cpp +++ b/be/test/runtime/mem_pool_test.cpp @@ -55,27 +55,27 @@ TEST(MemPoolTest, Basic) { // size of the next allocated chunk (64K) p.allocate(65 * 1024); EXPECT_EQ((12 + 8 + 65) * 1024 - 7, p.total_allocated_bytes()); - EXPECT_EQ((16 + 32 + 65) * 1024, p.total_reserved_bytes()); + EXPECT_EQ((16 + 32 + 128) * 1024, p.total_reserved_bytes()); // Clear() resets allocated data, but doesn't remove any chunks p.clear(); EXPECT_EQ(0, p.total_allocated_bytes()); - EXPECT_EQ((16 + 32 + 65) * 1024, p.total_reserved_bytes()); + EXPECT_EQ((16 + 32 + 128) * 1024, p.total_reserved_bytes()); // next allocation reuses existing chunks p.allocate(1024); EXPECT_EQ(1024, p.total_allocated_bytes()); - EXPECT_EQ((16 + 32 + 65) * 1024, p.total_reserved_bytes()); + EXPECT_EQ((16 + 32 + 128) * 1024, p.total_reserved_bytes()); // ... unless it doesn't fit into any available chunk p.allocate(120 * 1024); EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes()); - EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.total_reserved_bytes()); + EXPECT_EQ((16 + 32 + 128) * 1024, p.total_reserved_bytes()); // ... 
Try another chunk that fits into an existing chunk p.allocate(33 * 1024); EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes()); - EXPECT_EQ((130 + 65 + 16 + 32) * 1024, p.total_reserved_bytes()); + EXPECT_EQ((16 + 32 + 128 + 256) * 1024, p.total_reserved_bytes()); // we're releasing 3 chunks, which get added to p2 p2.acquire_data(&p, false); @@ -84,13 +84,13 @@ TEST(MemPoolTest, Basic) { p3.acquire_data(&p2, true); // we're keeping the 65k chunk EXPECT_EQ(33 * 1024, p2.total_allocated_bytes()); - EXPECT_EQ(65 * 1024, p2.total_reserved_bytes()); + EXPECT_EQ(256 * 1024, p2.total_reserved_bytes()); { MemPool p4(&tracker); p4.exchange_data(&p2); EXPECT_EQ(33 * 1024, p4.total_allocated_bytes()); - EXPECT_EQ(65 * 1024, p4.total_reserved_bytes()); + EXPECT_EQ(256 * 1024, p4.total_reserved_bytes()); } } diff --git a/be/test/runtime/memory/chunk_allocator_test.cpp b/be/test/runtime/memory/chunk_allocator_test.cpp new file mode 100644 index 0000000000..470c3de9dd --- /dev/null +++ b/be/test/runtime/memory/chunk_allocator_test.cpp @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/memory/chunk_allocator.h" + +#include + +#include "common/config.h" +#include "runtime/memory/chunk.h" +#include "util/cpu_info.h" +#include "util/doris_metrics.h" + +namespace doris { + +TEST(ChunkAllocatorTest, Normal) { + config::use_mmap_allocate_chunk = true; + for (size_t size = 4096; size <= 1024 * 1024; size <<= 1) { + Chunk chunk; + ASSERT_TRUE(ChunkAllocator::instance()->allocate(size, &chunk)); + ASSERT_NE(nullptr, chunk.data); + ASSERT_EQ(size, chunk.size); + ChunkAllocator::instance()->free(chunk); + } +} +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + doris::DorisMetrics::instance()->initialize("chunk_allocator_ut"); + doris::CpuInfo::init(); + doris::ChunkAllocator::init_instance(1024 * 1024); + return RUN_ALL_TESTS(); +} diff --git a/be/test/runtime/memory/system_allocator_test.cpp b/be/test/runtime/memory/system_allocator_test.cpp new file mode 100644 index 0000000000..87450643b6 --- /dev/null +++ b/be/test/runtime/memory/system_allocator_test.cpp @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/memory/system_allocator.h" + +#include + +#include "common/config.h" + +namespace doris { + +template +void test_normal() { + config::use_mmap_allocate_chunk = use_mmap; + { + auto ptr = SystemAllocator::allocate(4096); + ASSERT_NE(nullptr, ptr); + ASSERT_EQ(0, (uint64_t)ptr % 4096); + SystemAllocator::free(ptr, 4096); + } + { + auto ptr = SystemAllocator::allocate(100); + ASSERT_NE(nullptr, ptr); + ASSERT_EQ(0, (uint64_t)ptr % 4096); + SystemAllocator::free(ptr, 100); + } +} + +TEST(SystemAllocatorTest, TestNormal) { + test_normal(); + test_normal(); +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/run-ut.sh b/run-ut.sh index 6c5a23944b..8b16d47abb 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -211,6 +211,8 @@ ${DORIS_TEST_BINARY_DIR}/runtime/snapshot_loader_test ${DORIS_TEST_BINARY_DIR}/runtime/user_function_cache_test ${DORIS_TEST_BINARY_DIR}/runtime/small_file_mgr_test ${DORIS_TEST_BINARY_DIR}/runtime/mem_pool_test +${DORIS_TEST_BINARY_DIR}/runtime/memory/chunk_allocator_test +${DORIS_TEST_BINARY_DIR}/runtime/memory/system_allocator_test # Running expr Unittest # Running http