@ -81,6 +81,8 @@ Compaction::Compaction(const TabletSharedPtr& tablet, const std::string& label)
|
||||
_state(CompactionState::INITED) {
|
||||
_mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, label);
|
||||
init_profile(label);
|
||||
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
|
||||
_rowid_conversion = std::make_unique<RowIdConversion>();
|
||||
}
|
||||
|
||||
Compaction::~Compaction() {
|
||||
@ -90,6 +92,7 @@ Compaction::~Compaction() {
|
||||
_input_rowsets.clear();
|
||||
_output_rowset.reset();
|
||||
_cur_tablet_schema.reset();
|
||||
_rowid_conversion.reset();
|
||||
}
|
||||
|
||||
void Compaction::init_profile(const std::string& label) {
|
||||
@ -378,7 +381,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
|
||||
if (!ctx.columns_to_do_index_compaction.empty() ||
|
||||
(_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
|
||||
_tablet->enable_unique_key_merge_on_write())) {
|
||||
stats.rowid_conversion = &_rowid_conversion;
|
||||
stats.rowid_conversion = _rowid_conversion.get();
|
||||
}
|
||||
int64_t way_num = merge_way_num();
|
||||
|
||||
@ -964,7 +967,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
|
||||
// TODO(LiaoXin): check if there are duplicate keys
|
||||
std::size_t missed_rows_size = 0;
|
||||
_tablet->calc_compaction_output_rowset_delete_bitmap(
|
||||
_input_rowsets, _rowid_conversion, 0, version.second + 1, missed_rows.get(),
|
||||
_input_rowsets, *_rowid_conversion, 0, version.second + 1, missed_rows.get(),
|
||||
location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
|
||||
&output_rowset_delete_bitmap);
|
||||
if (missed_rows) {
|
||||
@ -1024,7 +1027,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
|
||||
}
|
||||
DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id());
|
||||
_tablet->calc_compaction_output_rowset_delete_bitmap(
|
||||
_input_rowsets, _rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
|
||||
_input_rowsets, *_rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
|
||||
location_map.get(), *it.delete_bitmap.get(), &txn_output_delete_bitmap);
|
||||
if (config::enable_merge_on_write_correctness_check) {
|
||||
RowsetIdUnorderedSet rowsetids;
|
||||
@ -1044,7 +1047,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
|
||||
// Convert the delete bitmap of the input rowsets to output rowset for
|
||||
// incremental data.
|
||||
_tablet->calc_compaction_output_rowset_delete_bitmap(
|
||||
_input_rowsets, _rowid_conversion, version.second, UINT64_MAX,
|
||||
_input_rowsets, *_rowid_conversion, version.second, UINT64_MAX,
|
||||
missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
|
||||
&output_rowset_delete_bitmap);
|
||||
|
||||
|
||||
@ -130,7 +130,7 @@ protected:
|
||||
Version _output_version;
|
||||
|
||||
int64_t _newest_write_timestamp;
|
||||
RowIdConversion _rowid_conversion;
|
||||
std::unique_ptr<RowIdConversion> _rowid_conversion = nullptr;
|
||||
TabletSchemaSPtr _cur_tablet_schema;
|
||||
|
||||
std::unique_ptr<RuntimeProfile> _profile;
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
|
||||
#include "olap/olap_common.h"
|
||||
#include "olap/utils.h"
|
||||
#include "runtime/thread_context.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -33,17 +34,24 @@ namespace doris {
|
||||
class RowIdConversion {
|
||||
public:
|
||||
RowIdConversion() = default;
|
||||
~RowIdConversion() = default;
|
||||
~RowIdConversion() { RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used); }
|
||||
|
||||
// resize segment rowid map to its rows num
|
||||
void init_segment_map(const RowsetId& src_rowset_id, const std::vector<uint32_t>& num_rows) {
|
||||
size_t delta_std_pair_cap = 0;
|
||||
for (size_t i = 0; i < num_rows.size(); i++) {
|
||||
uint32_t id = _segments_rowid_map.size();
|
||||
_segment_to_id_map.emplace(std::pair<RowsetId, uint32_t> {src_rowset_id, i}, id);
|
||||
_id_to_segment_map.emplace_back(src_rowset_id, i);
|
||||
_segments_rowid_map.emplace_back(std::vector<std::pair<uint32_t, uint32_t>>(
|
||||
num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX)));
|
||||
std::vector<std::pair<uint32_t, uint32_t>> vec(
|
||||
num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX));
|
||||
delta_std_pair_cap += vec.capacity();
|
||||
_segments_rowid_map.emplace_back(std::move(vec));
|
||||
}
|
||||
//NOTE: manually count _segments_rowid_map's memory here, because _segments_rowid_map could be used by indexCompaction.
|
||||
// indexCompaction is a thridparty code, it's too complex to modify it.
|
||||
// refer compact_column.
|
||||
track_mem_usage(delta_std_pair_cap);
|
||||
}
|
||||
|
||||
// set dst rowset id
|
||||
@ -109,12 +117,27 @@ public:
|
||||
return _segment_to_id_map.at(segment);
|
||||
}
|
||||
|
||||
private:
|
||||
void track_mem_usage(size_t delta_std_pair_cap) {
|
||||
_std_pair_cap += delta_std_pair_cap;
|
||||
|
||||
size_t new_size =
|
||||
_std_pair_cap * sizeof(std::pair<uint32_t, uint32_t>) +
|
||||
_segments_rowid_map.capacity() * sizeof(std::vector<std::pair<uint32_t, uint32_t>>);
|
||||
|
||||
RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used);
|
||||
CONSUME_THREAD_MEM_TRACKER(new_size);
|
||||
_seg_rowid_map_mem_used = new_size;
|
||||
}
|
||||
|
||||
private:
|
||||
// the first level vector: index indicates src segment.
|
||||
// the second level vector: index indicates row id of source segment,
|
||||
// value indicates row id of destination segment.
|
||||
// <UINT32_MAX, UINT32_MAX> indicates current row not exist.
|
||||
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> _segments_rowid_map;
|
||||
size_t _seg_rowid_map_mem_used {0};
|
||||
size_t _std_pair_cap {0};
|
||||
|
||||
// Map source segment to 0 to n
|
||||
std::map<std::pair<RowsetId, uint32_t>, uint32_t> _segment_to_id_map;
|
||||
|
||||
@ -197,6 +197,14 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Runs BaseBetaRowsetWriter::init and, if that succeeds and a segcompaction worker is
// present, asks the worker to create its memory tracker for the context's txn_id.
// Returns the base initialisation error unchanged, otherwise Status::OK().
Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
    RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context));

    if (_segcompaction_worker != nullptr) {
        const auto txn_id = rowset_writer_context.txn_id;
        _segcompaction_worker->init_mem_tracker(txn_id);
    }

    return Status::OK();
}
|
||||
|
||||
Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
|
||||
int32_t segment_id) {
|
||||
DCHECK(_rowset_meta->is_local());
|
||||
|
||||
@ -199,6 +199,8 @@ public:
|
||||
|
||||
Status build(RowsetSharedPtr& rowset) override;
|
||||
|
||||
Status init(const RowsetWriterContext& rowset_writer_context) override;
|
||||
|
||||
Status flush_segment_writer_for_segcompaction(
|
||||
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
|
||||
KeyBoundsPB& key_bounds);
|
||||
@ -231,7 +233,7 @@ private:
|
||||
// already been segment compacted
|
||||
std::atomic<int32_t> _num_segcompacted {0}; // index for segment compaction
|
||||
|
||||
std::shared_ptr<SegcompactionWorker> _segcompaction_worker;
|
||||
std::shared_ptr<SegcompactionWorker> _segcompaction_worker = nullptr;
|
||||
|
||||
// ensure only one inflight segcompaction task for each rowset
|
||||
std::atomic<bool> _is_doing_segcompaction {false};
|
||||
|
||||
@ -69,6 +69,11 @@ using namespace ErrorCode;
|
||||
|
||||
SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {}
|
||||
|
||||
// Creates this worker's COMPACTION-type memory tracker, labelled
// "segcompaction-<txn_id>", and stores it in _seg_compact_mem_tracker.
void SegcompactionWorker::init_mem_tracker(int64_t txn_id) {
    const std::string tracker_label = "segcompaction-" + std::to_string(txn_id);
    _seg_compact_mem_tracker =
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, tracker_label);
}
|
||||
|
||||
Status SegcompactionWorker::_get_segcompaction_reader(
|
||||
SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet,
|
||||
std::shared_ptr<Schema> schema, OlapReaderStatistics* stat,
|
||||
@ -220,7 +225,8 @@ Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
|
||||
}
|
||||
|
||||
Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) {
|
||||
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->segcompaction_mem_tracker());
|
||||
DCHECK(_seg_compact_mem_tracker != nullptr);
|
||||
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker);
|
||||
/* throttle segcompaction task if memory depleted */
|
||||
if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
|
||||
return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to memory shortage");
|
||||
|
||||
@ -50,6 +50,14 @@ class SegcompactionWorker {
|
||||
public:
|
||||
explicit SegcompactionWorker(BetaRowsetWriter* writer);
|
||||
|
||||
// Destroys the row-id conversion table inside a scope that has switched to this
// worker's memory tracker (presumably so the release is attributed to that tracker —
// confirm against SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER).
~SegcompactionWorker() {
    DCHECK(_seg_compact_mem_tracker != nullptr);
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker);
    // reset() on an empty unique_ptr is a no-op, so no explicit null check is needed.
    _rowid_conversion.reset();
}
|
||||
|
||||
void compact_segments(SegCompactionCandidatesSharedPtr segments);
|
||||
|
||||
bool need_convert_delete_bitmap();
|
||||
@ -65,6 +73,8 @@ public:
|
||||
// set the cancel flag, tasks already started will not be cancelled.
|
||||
bool cancel();
|
||||
|
||||
void init_mem_tracker(int64_t txn_id);
|
||||
|
||||
private:
|
||||
Status _create_segment_writer_for_segcompaction(
|
||||
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end);
|
||||
@ -88,8 +98,9 @@ private:
|
||||
io::FileWriterPtr _file_writer;
|
||||
|
||||
// for unique key mow table
|
||||
std::unique_ptr<SimpleRowIdConversion> _rowid_conversion;
|
||||
std::unique_ptr<SimpleRowIdConversion> _rowid_conversion = nullptr;
|
||||
DeleteBitmapPtr _converted_delete_bitmap;
|
||||
std::shared_ptr<MemTrackerLimiter> _seg_compact_mem_tracker = nullptr;
|
||||
|
||||
// the state is not mutable when 1)actual compaction operation started or 2) cancelled
|
||||
std::atomic<bool> _is_compacting_state_mutable = true;
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
|
||||
#include "olap/olap_common.h"
|
||||
#include "olap/utils.h"
|
||||
#include "vec/common/custom_allocator.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -37,7 +38,7 @@ public:
|
||||
_cur_dst_segment_rowid = 0;
|
||||
for (auto seg_rows : num_rows) {
|
||||
_segments_rowid_map.emplace(seg_rows.first,
|
||||
std::vector<uint32_t>(seg_rows.second, UINT32_MAX));
|
||||
DorisVector<uint32_t>(seg_rows.second, UINT32_MAX));
|
||||
}
|
||||
}
|
||||
|
||||
@ -72,7 +73,7 @@ private:
|
||||
// key: index indicates src segment.
|
||||
// value: index indicates row id of source segment, value indicates row id of destination
|
||||
// segment. UINT32_MAX indicates current row not exist.
|
||||
std::map<uint32_t, std::vector<uint32_t>> _segments_rowid_map;
|
||||
DorisMap<uint32_t, DorisVector<uint32_t>> _segments_rowid_map;
|
||||
|
||||
// dst rowset id
|
||||
RowsetId _rowst_id;
|
||||
|
||||
@ -225,8 +225,8 @@ public:
|
||||
// to nullptr, but the object it points to is not initialized. At this time, when the memory
|
||||
// is released somewhere, the hook is triggered to cause the crash.
|
||||
std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr;
|
||||
[[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const {
|
||||
return thread_mem_tracker_mgr->limiter_mem_tracker_raw();
|
||||
[[nodiscard]] std::shared_ptr<MemTrackerLimiter> thread_mem_tracker() const {
|
||||
return thread_mem_tracker_mgr->limiter_mem_tracker();
|
||||
}
|
||||
|
||||
QueryThreadContext query_thread_context();
|
||||
|
||||
82
be/src/vec/common/custom_allocator.h
Normal file
82
be/src/vec/common/custom_allocator.h
Normal file
@ -0,0 +1,82 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <new>
#include <utility>
#include <vector>

#include "vec/common/allocator.h"
#include "vec/common/allocator_fwd.h"
|
||||
|
||||
template <class T, typename MemoryAllocator = Allocator<true>>
|
||||
class CustomStdAllocator;
|
||||
|
||||
template <typename T>
|
||||
using DorisVector = std::vector<T, CustomStdAllocator<T>>;
|
||||
|
||||
template <class Key, class T, class Compare = std::less<Key>,
|
||||
class Allocator = CustomStdAllocator<std::pair<const Key, T>>>
|
||||
using DorisMap = std::map<Key, T, Compare, Allocator>;
|
||||
|
||||
// Standard-library compatible allocator that forwards raw storage requests to
// MemoryAllocator::alloc / MemoryAllocator::free.
//
// NOTE: Even though CustomStdAllocator's allocate/deallocate could modify a memory
// tracker, it is still stateless, because the thread context owns the memory tracker,
// not CustomStdAllocator. Any two instances backed by the same MemoryAllocator type are
// therefore interchangeable and compare equal.
template <class T, typename MemoryAllocator>
class CustomStdAllocator : private MemoryAllocator {
public:
    using value_type = T;
    using pointer = T*;
    using const_pointer = const T*;
    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    CustomStdAllocator() noexcept = default;

    // Rebinding keeps the same backing MemoryAllocator. Node-based containers such as
    // std::map allocate through the rebound type, so dropping the argument here would
    // silently route their allocations to the default backing allocator.
    template <class U>
    struct rebind {
        using other = CustomStdAllocator<U, MemoryAllocator>;
    };

    // Converting constructor required by the Allocator requirements; there is no state
    // to copy.
    template <class Up>
    CustomStdAllocator(const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {}

    // Returns uninitialised storage for `n` objects of type T.
    // Throws std::bad_array_new_length when `n * sizeof(T)` would overflow size_t,
    // instead of requesting a wrapped-around (too small) byte count.
    T* allocate(size_t n) {
        if (n > max_size()) {
            throw std::bad_array_new_length();
        }
        return static_cast<T*>(MemoryAllocator::alloc(n * sizeof(T)));
    }

    // Returns storage obtained from allocate(n); `n` must match the original request
    // because the byte size is passed back to MemoryAllocator::free.
    void deallocate(T* ptr, size_t n) noexcept {
        MemoryAllocator::free(static_cast<void*>(ptr), n * sizeof(T));
    }

    // Largest element count whose byte size still fits in size_t.
    size_t max_size() const noexcept { return static_cast<size_t>(-1) / sizeof(T); }

    // Legacy overload with a locality hint; the hint is ignored.
    T* allocate(size_t n, const void*) { return allocate(n); }

    template <class Up, class... Args>
    void construct(Up* p, Args&&... args) {
        ::new (static_cast<void*>(p)) Up(std::forward<Args>(args)...);
    }

    void destroy(T* p) { p->~T(); }

    T* address(T& t) const noexcept { return std::addressof(t); }

    // Returns a pointer-to-const: std::addressof on a const reference yields const T*,
    // which cannot be converted to T*.
    const T* address(const T& t) const noexcept { return std::addressof(t); }
};

// Allocators backed by the same MemoryAllocator type always compare equal: storage
// obtained from one instance may be released through any other.
template <class T, class Up, typename MemoryAllocator>
bool operator==(const CustomStdAllocator<T, MemoryAllocator>&,
                const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {
    return true;
}

template <class T, class Up, typename MemoryAllocator>
bool operator!=(const CustomStdAllocator<T, MemoryAllocator>&,
                const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {
    return false;
}
|
||||
@ -244,7 +244,7 @@ static RowsetSharedPtr do_compaction(std::vector<RowsetSharedPtr> rowsets,
|
||||
}
|
||||
|
||||
Merger::Statistics stats;
|
||||
stats.rowid_conversion = &compaction._rowid_conversion;
|
||||
stats.rowid_conversion = compaction._rowid_conversion.get();
|
||||
Status st = Merger::vertical_merge_rowsets(
|
||||
tablet, compaction.compaction_type(), compaction._cur_tablet_schema, input_rs_readers,
|
||||
compaction._output_rs_writer.get(), 100000, 5, &stats);
|
||||
|
||||
Reference in New Issue
Block a user