[Improvement](inverted index) Enhance compaction performance through direct inverted index merging (#19207)

This commit is contained in:
airborne12
2023-05-08 14:07:32 +08:00
committed by GitHub
parent c7a04fa05a
commit f199860dea
7 changed files with 215 additions and 7 deletions

View File

@ -934,6 +934,8 @@ CONF_String(inverted_index_dict_path, "${DORIS_HOME}/dict");
CONF_Int32(inverted_index_read_buffer_size, "4096");
// tree depth for bkd index
CONF_Int32(max_depth_in_bkd_tree, "32");
// index compaction
CONF_Bool(inverted_index_compaction_enable, "false");
// use num_broadcast_buffer blocks as buffer to do broadcast
CONF_Int32(num_broadcast_buffer, "32");
// semi-structure configs

View File

@ -78,6 +78,7 @@ add_library(Olap STATIC
rowset/segment_v2/inverted_index_desc.cpp
rowset/segment_v2/inverted_index_compound_directory.cpp
rowset/segment_v2/inverted_index_compound_reader.cpp
rowset/segment_v2/inverted_index_compaction.cpp
rowset/segment_v2/bitshuffle_page.cpp
rowset/segment_v2/bitshuffle_wrapper.cpp
rowset/segment_v2/column_reader.cpp

View File

@ -43,6 +43,7 @@
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/inverted_index_compaction.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy.h"
#include "olap/tablet.h"
@ -163,7 +164,8 @@ bool Compaction::is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr&
Status Compaction::do_compact_ordered_rowsets() {
build_basic_info();
RETURN_NOT_OK(construct_output_rowset_writer());
RowsetWriterContext ctx;
RETURN_NOT_OK(construct_output_rowset_writer(ctx));
LOG(INFO) << "start to do ordered data compaction, tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version;
@ -292,8 +294,9 @@ Status Compaction::do_compaction_impl(int64_t permits) {
LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version << ", permits: " << permits;
bool vertical_compaction = should_vertical_compaction();
RowsetWriterContext ctx;
RETURN_NOT_OK(construct_input_rowset_readers());
RETURN_NOT_OK(construct_output_rowset_writer(vertical_compaction));
RETURN_NOT_OK(construct_output_rowset_writer(ctx, vertical_compaction));
if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
Tablet::add_pending_remote_rowset(_output_rs_writer->rowset_id().to_string());
}
@ -302,8 +305,10 @@ Status Compaction::do_compaction_impl(int64_t permits) {
// 2. write merged rows to output rowset
// The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
Merger::Statistics stats;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
// if ctx.skip_inverted_index.size() > 0, it means we need to do inverted index compaction.
// the row ID conversion matrix needs to be used for inverted index compaction.
if (ctx.skip_inverted_index.size() > 0 || (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
stats.rowid_conversion = &_rowid_conversion;
}
@ -343,6 +348,70 @@ Status Compaction::do_compaction_impl(int64_t permits) {
RETURN_NOT_OK(check_correctness(stats));
TRACE("check correctness finished");
if (_input_row_num > 0 && stats.rowid_conversion && config::inverted_index_compaction_enable) {
OlapStopWatch inverted_watch;
// translation vec
// <<dest_idx_num, desc_docId>>
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec =
stats.rowid_conversion->get_rowid_conversion_map();
// source rowset,segment -> index_id
std::map<std::pair<RowsetId, uint32_t>, uint32_t> src_seg_to_id_map =
stats.rowid_conversion->get_src_segment_to_id_map();
// dest rowset id
RowsetId dest_rowset_id = stats.rowid_conversion->get_dst_rowset_id();
// dest segment id -> num rows
std::vector<uint32_t> dest_segment_num_rows;
RETURN_IF_ERROR(_output_rs_writer->get_segment_num_rows(&dest_segment_num_rows));
auto src_segment_num = src_seg_to_id_map.size();
auto dest_segment_num = dest_segment_num_rows.size();
// src index files
// format: rowsetId_segmentId
std::vector<std::string> src_index_files(src_segment_num);
for (auto m : src_seg_to_id_map) {
std::pair<RowsetId, uint32_t> p = m.first;
src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second);
}
// dest index files
// format: rowsetId_segmentId
std::vector<std::string> dest_index_files(dest_segment_num);
for (int i = 0; i < dest_segment_num; ++i) {
auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i);
dest_index_files[i] = prefix;
}
// create index_writer to compaction indexes
auto& fs = _output_rowset->rowset_meta()->fs();
auto tablet_path = _output_rowset->tablet_path();
DCHECK(dest_index_files.size() > 0);
// we choose the first destination segment name as the temporary index writer path
// Used to distinguish between different index compaction
auto index_writer_path = tablet_path + "/" + dest_index_files[0];
LOG(INFO) << "start index compaction"
<< ". tablet=" << _tablet->full_name()
<< ", source index size=" << src_segment_num
<< ", destination index size=" << dest_segment_num << ".";
std::for_each(
ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(),
[&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files,
&dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows,
this](int32_t column_uniq_id) {
compact_column(
_cur_tablet_schema->get_inverted_index(column_uniq_id)->index_id(),
src_segment_num, dest_segment_num, src_index_files, dest_index_files,
fs, index_writer_path, tablet_path, trans_vec, dest_segment_num_rows);
});
LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->full_name() << ", input row number=" << _input_row_num
<< ", output row number=" << _output_rowset->num_rows()
<< ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";
}
// 4. modify rowsets in memory
RETURN_NOT_OK(modify_rowsets(&stats));
TRACE("modify rowsets finished");
@ -382,13 +451,24 @@ Status Compaction::do_compaction_impl(int64_t permits) {
return Status::OK();
}
Status Compaction::construct_output_rowset_writer(bool is_vertical) {
RowsetWriterContext ctx;
Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool is_vertical) {
ctx.version = _output_version;
ctx.rowset_state = VISIBLE;
ctx.segments_overlap = NONOVERLAPPING;
ctx.tablet_schema = _cur_tablet_schema;
ctx.newest_write_timestamp = _newest_write_timestamp;
if (config::inverted_index_compaction_enable &&
((_tablet->keys_type() == KeysType::UNIQUE_KEYS ||
_tablet->keys_type() == KeysType::DUP_KEYS))) {
for (auto& index : _cur_tablet_schema->indexes()) {
if (index.index_type() == IndexType::INVERTED) {
auto unique_id = index.col_unique_ids()[0];
if (field_is_slice_type(_cur_tablet_schema->column_by_uid(unique_id).type())) {
ctx.skip_inverted_index.insert(unique_id);
}
}
}
}
if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
// write output rowset to storage policy resource
auto storage_policy = get_storage_policy(_tablet->storage_policy_id());

View File

@ -73,7 +73,7 @@ protected:
virtual Status modify_rowsets(const Merger::Statistics* stats = nullptr);
void gc_output_rowset();
Status construct_output_rowset_writer(bool is_vertical = false);
Status construct_output_rowset_writer(RowsetWriterContext& ctx, bool is_vertical = false);
Status construct_input_rowset_readers();
Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);

View File

@ -48,6 +48,7 @@ public:
// set dst rowset id
void set_dst_rowset_id(const RowsetId& dst_rowset_id) { _dst_rowst_id = dst_rowset_id; }
const RowsetId get_dst_rowset_id() { return _dst_rowst_id; }
// add row id to the map
void add(const std::vector<RowLocation>& rss_row_ids,
@ -92,6 +93,10 @@ public:
return _segments_rowid_map;
}
const std::map<std::pair<RowsetId, uint32_t>, uint32_t>& get_src_segment_to_id_map() {
return _segment_to_id_map;
}
std::pair<RowsetId, uint32_t> get_segment_by_id(uint32_t id) const {
DCHECK_GT(_id_to_segment_map.size(), id);
return _id_to_segment_map.at(id);

View File

@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "inverted_index_compaction.h"
#include <CLucene.h>
#include "inverted_index_compound_directory.h"
#include "inverted_index_compound_reader.h"
namespace doris {
namespace segment_v2 {
void compact_column(int32_t index_id, int src_segment_num, int dest_segment_num,
std::vector<std::string> src_index_files,
std::vector<std::string> dest_index_files, const io::FileSystemSPtr& fs,
std::string index_writer_path, std::string tablet_path,
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec,
std::vector<uint32_t> dest_segment_num_rows) {
lucene::store::Directory* dir =
DorisCompoundDirectory::getDirectory(fs, index_writer_path.c_str(), false);
auto index_writer = _CLNEW lucene::index::IndexWriter(dir, nullptr, true /* create */,
true /* closeDirOnShutdown */);
// get compound directory src_index_dirs
std::vector<lucene::store::Directory*> src_index_dirs(src_segment_num);
for (int i = 0; i < src_segment_num; ++i) {
// format: rowsetId_segmentId_indexId.idx
std::string src_idx_full_name =
src_index_files[i] + "_" + std::to_string(index_id) + ".idx";
DorisCompoundReader* reader = new DorisCompoundReader(
DorisCompoundDirectory::getDirectory(fs, tablet_path.c_str()),
src_idx_full_name.c_str());
src_index_dirs[i] = reader;
}
// get dest idx file paths
std::vector<lucene::store::Directory*> dest_index_dirs(dest_segment_num);
for (int i = 0; i < dest_segment_num; ++i) {
// format: rowsetId_segmentId_columnId
auto path = tablet_path + "/" + dest_index_files[i] + "_" + std::to_string(index_id);
dest_index_dirs[i] = DorisCompoundDirectory::getDirectory(fs, path.c_str(), true);
}
index_writer->indexCompaction(src_index_dirs, dest_index_dirs, trans_vec,
dest_segment_num_rows);
index_writer->close();
_CLDELETE(index_writer);
// NOTE: need to ref_cnt-- for dir,
// when index_writer is destroyed, if closeDir is set, dir will be close
// _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir will be destroyed.
_CLDECDELETE(dir)
for (auto d : src_index_dirs) {
if (d != nullptr) {
d->close();
_CLDELETE(d);
}
}
for (auto d : dest_index_dirs) {
if (d != nullptr) {
// NOTE: DO NOT close dest dir here, because it will be closed when dest index writer finalize.
//d->close();
_CLDELETE(d);
}
}
// delete temporary index_writer_path
fs->delete_directory(index_writer_path.c_str());
}
} // namespace segment_v2
} // namespace doris

View File

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <string>
#include <vector>
#include "io/fs/file_system.h"
namespace doris {
namespace segment_v2 {
void compact_column(int32_t index_id, int src_segment_num, int dest_segment_num,
std::vector<std::string> src_index_files,
std::vector<std::string> dest_index_files, const io::FileSystemSPtr& fs,
std::string index_writer_path, std::string tablet_path,
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec,
std::vector<uint32_t> dest_segment_num_rows);
} // namespace segment_v2
} // namespace doris