From f199860dea116a89f9820e033991b6dfbfbb90e3 Mon Sep 17 00:00:00 2001 From: airborne12 Date: Mon, 8 May 2023 14:07:32 +0800 Subject: [PATCH] [Improvement](inverted index) Enhance compaction performance through direct inverted index merging (#19207) --- be/src/common/config.h | 2 + be/src/olap/CMakeLists.txt | 1 + be/src/olap/compaction.cpp | 92 +++++++++++++++++-- be/src/olap/compaction.h | 2 +- be/src/olap/rowid_conversion.h | 5 + .../segment_v2/inverted_index_compaction.cpp | 85 +++++++++++++++++ .../segment_v2/inverted_index_compaction.h | 35 +++++++ 7 files changed, 215 insertions(+), 7 deletions(-) create mode 100644 be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp create mode 100644 be/src/olap/rowset/segment_v2/inverted_index_compaction.h diff --git a/be/src/common/config.h b/be/src/common/config.h index e3b7d33edb..ffdfe3bbce 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -934,6 +934,8 @@ CONF_String(inverted_index_dict_path, "${DORIS_HOME}/dict"); CONF_Int32(inverted_index_read_buffer_size, "4096"); // tree depth for bkd index CONF_Int32(max_depth_in_bkd_tree, "32"); +// index compaction +CONF_Bool(inverted_index_compaction_enable, "false"); // use num_broadcast_buffer blocks as buffer to do broadcast CONF_Int32(num_broadcast_buffer, "32"); // semi-structure configs diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 067cab7bc9..91ae9293c2 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -78,6 +78,7 @@ add_library(Olap STATIC rowset/segment_v2/inverted_index_desc.cpp rowset/segment_v2/inverted_index_compound_directory.cpp rowset/segment_v2/inverted_index_compound_reader.cpp + rowset/segment_v2/inverted_index_compaction.cpp rowset/segment_v2/bitshuffle_page.cpp rowset/segment_v2/bitshuffle_wrapper.cpp rowset/segment_v2/column_reader.cpp diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 71470fc60e..af556a5392 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -43,6 +43,7 @@ #include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_writer.h" #include "olap/rowset/rowset_writer_context.h" +#include "olap/rowset/segment_v2/inverted_index_compaction.h" #include "olap/storage_engine.h" #include "olap/storage_policy.h" #include "olap/tablet.h" @@ -163,7 +164,8 @@ bool Compaction::is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& Status Compaction::do_compact_ordered_rowsets() { build_basic_info(); - RETURN_NOT_OK(construct_output_rowset_writer()); + RowsetWriterContext ctx; + RETURN_NOT_OK(construct_output_rowset_writer(ctx)); LOG(INFO) << "start to do ordered data compaction, tablet=" << _tablet->full_name() << ", output_version=" << _output_version; @@ -292,8 +294,9 @@ Status Compaction::do_compaction_impl(int64_t permits) { LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version << ", permits: " << permits; bool vertical_compaction = should_vertical_compaction(); + RowsetWriterContext ctx; RETURN_NOT_OK(construct_input_rowset_readers()); - RETURN_NOT_OK(construct_output_rowset_writer(vertical_compaction)); + RETURN_NOT_OK(construct_output_rowset_writer(ctx, vertical_compaction)); if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { Tablet::add_pending_remote_rowset(_output_rs_writer->rowset_id().to_string()); } @@ -302,8 +305,10 @@ Status Compaction::do_compaction_impl(int64_t permits) { // 2. write merged rows to output rowset // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool Merger::Statistics stats; - if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && - _tablet->enable_unique_key_merge_on_write()) { + // if ctx.skip_inverted_index.size() > 0, it means we need to do inverted index compaction. + // the row ID conversion matrix needs to be used for inverted index compaction. + if (ctx.skip_inverted_index.size() > 0 || (_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->enable_unique_key_merge_on_write())) { stats.rowid_conversion = &_rowid_conversion; } @@ -343,6 +348,70 @@ Status Compaction::do_compaction_impl(int64_t permits) { RETURN_NOT_OK(check_correctness(stats)); TRACE("check correctness finished"); + if (_input_row_num > 0 && stats.rowid_conversion && config::inverted_index_compaction_enable) { + OlapStopWatch inverted_watch; + // translation vec + // <> + std::vector>> trans_vec = + stats.rowid_conversion->get_rowid_conversion_map(); + + // source rowset,segment -> index_id + std::map, uint32_t> src_seg_to_id_map = + stats.rowid_conversion->get_src_segment_to_id_map(); + // dest rowset id + RowsetId dest_rowset_id = stats.rowid_conversion->get_dst_rowset_id(); + // dest segment id -> num rows + std::vector dest_segment_num_rows; + RETURN_IF_ERROR(_output_rs_writer->get_segment_num_rows(&dest_segment_num_rows)); + + auto src_segment_num = src_seg_to_id_map.size(); + auto dest_segment_num = dest_segment_num_rows.size(); + + // src index files + // format: rowsetId_segmentId + std::vector src_index_files(src_segment_num); + for (auto m : src_seg_to_id_map) { + std::pair p = m.first; + src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second); + } + + // dest index files + // format: rowsetId_segmentId + std::vector dest_index_files(dest_segment_num); + for (int i = 0; i < dest_segment_num; ++i) { + auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i); + dest_index_files[i] = prefix; + } + + // create index_writer to compaction indexes + auto& fs = _output_rowset->rowset_meta()->fs(); + auto tablet_path = _output_rowset->tablet_path(); + + DCHECK(dest_index_files.size() > 0); + // we choose the first destination segment name as the temporary index writer path + // Used to distinguish between different index compaction + auto index_writer_path = tablet_path + "/" + dest_index_files[0]; + LOG(INFO) << "start index compaction" + << ". tablet=" << _tablet->full_name() + << ", source index size=" << src_segment_num + << ", destination index size=" << dest_segment_num << "."; + std::for_each( + ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(), + [&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files, + &dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows, + this](int32_t column_uniq_id) { + compact_column( + _cur_tablet_schema->get_inverted_index(column_uniq_id)->index_id(), + src_segment_num, dest_segment_num, src_index_files, dest_index_files, + fs, index_writer_path, tablet_path, trans_vec, dest_segment_num_rows); + }); + + LOG(INFO) << "succeed to do index compaction" + << ". tablet=" << _tablet->full_name() << ", input row number=" << _input_row_num + << ", output row number=" << _output_rowset->num_rows() + << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; + } + // 4. modify rowsets in memory RETURN_NOT_OK(modify_rowsets(&stats)); TRACE("modify rowsets finished"); @@ -382,13 +451,24 @@ Status Compaction::do_compaction_impl(int64_t permits) { return Status::OK(); } -Status Compaction::construct_output_rowset_writer(bool is_vertical) { - RowsetWriterContext ctx; +Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool is_vertical) { ctx.version = _output_version; ctx.rowset_state = VISIBLE; ctx.segments_overlap = NONOVERLAPPING; ctx.tablet_schema = _cur_tablet_schema; ctx.newest_write_timestamp = _newest_write_timestamp; + if (config::inverted_index_compaction_enable && + ((_tablet->keys_type() == KeysType::UNIQUE_KEYS || + _tablet->keys_type() == KeysType::DUP_KEYS))) { + for (auto& index : _cur_tablet_schema->indexes()) { + if (index.index_type() == IndexType::INVERTED) { + auto unique_id = index.col_unique_ids()[0]; + if (field_is_slice_type(_cur_tablet_schema->column_by_uid(unique_id).type())) { + ctx.skip_inverted_index.insert(unique_id); + } + } + } + } if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { // write output rowset to storage policy resource auto storage_policy = get_storage_policy(_tablet->storage_policy_id()); diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 17acb45710..737f4bf01c 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -73,7 +73,7 @@ protected: virtual Status modify_rowsets(const Merger::Statistics* stats = nullptr); void gc_output_rowset(); - Status construct_output_rowset_writer(bool is_vertical = false); + Status construct_output_rowset_writer(RowsetWriterContext& ctx, bool is_vertical = false); Status construct_input_rowset_readers(); Status check_version_continuity(const std::vector& rowsets); diff --git a/be/src/olap/rowid_conversion.h b/be/src/olap/rowid_conversion.h index 543a3f759b..940d6a106c 100644 --- a/be/src/olap/rowid_conversion.h +++ b/be/src/olap/rowid_conversion.h @@ -48,6 +48,7 @@ public: // set dst rowset id void set_dst_rowset_id(const RowsetId& dst_rowset_id) { _dst_rowst_id = dst_rowset_id; } + const RowsetId get_dst_rowset_id() { return _dst_rowst_id; } // add row id to the map void add(const std::vector& rss_row_ids, @@ -92,6 +93,10 @@ public: return _segments_rowid_map; } + const std::map, uint32_t>& get_src_segment_to_id_map() { + return _segment_to_id_map; + } + std::pair get_segment_by_id(uint32_t id) const { DCHECK_GT(_id_to_segment_map.size(), id); return _id_to_segment_map.at(id); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp new file mode 100644 index 0000000000..03badd0807 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "inverted_index_compaction.h" + +#include + +#include "inverted_index_compound_directory.h" +#include "inverted_index_compound_reader.h" + +namespace doris { +namespace segment_v2 { +void compact_column(int32_t index_id, int src_segment_num, int dest_segment_num, + std::vector src_index_files, + std::vector dest_index_files, const io::FileSystemSPtr& fs, + std::string index_writer_path, std::string tablet_path, + std::vector>> trans_vec, + std::vector dest_segment_num_rows) { + lucene::store::Directory* dir = + DorisCompoundDirectory::getDirectory(fs, index_writer_path.c_str(), false); + auto index_writer = _CLNEW lucene::index::IndexWriter(dir, nullptr, true /* create */, + true /* closeDirOnShutdown */); + + // get compound directory src_index_dirs + std::vector src_index_dirs(src_segment_num); + for (int i = 0; i < src_segment_num; ++i) { + // format: rowsetId_segmentId_indexId.idx + std::string src_idx_full_name = + src_index_files[i] + "_" + std::to_string(index_id) + ".idx"; + DorisCompoundReader* reader = new DorisCompoundReader( + DorisCompoundDirectory::getDirectory(fs, tablet_path.c_str()), + src_idx_full_name.c_str()); + src_index_dirs[i] = reader; + } + + // get dest idx file paths + std::vector dest_index_dirs(dest_segment_num); + for (int i = 0; i < dest_segment_num; ++i) { + // format: rowsetId_segmentId_columnId + auto path = tablet_path + "/" + dest_index_files[i] + "_" + std::to_string(index_id); + dest_index_dirs[i] = DorisCompoundDirectory::getDirectory(fs, path.c_str(), true); + } + + index_writer->indexCompaction(src_index_dirs, dest_index_dirs, trans_vec, + dest_segment_num_rows); + + index_writer->close(); + _CLDELETE(index_writer); + // NOTE: need to ref_cnt-- for dir, + // when index_writer is destroyed, if closeDir is set, dir will be close + // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir will be destroyed. + _CLDECDELETE(dir) + for (auto d : src_index_dirs) { + if (d != nullptr) { + d->close(); + _CLDELETE(d); + } + } + for (auto d : dest_index_dirs) { + if (d != nullptr) { + // NOTE: DO NOT close dest dir here, because it will be closed when dest index writer finalize. + //d->close(); + _CLDELETE(d); + } + } + + // delete temporary index_writer_path + fs->delete_directory(index_writer_path.c_str()); +} +} // namespace segment_v2 +} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.h b/be/src/olap/rowset/segment_v2/inverted_index_compaction.h new file mode 100644 index 0000000000..a682b6111f --- /dev/null +++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.h @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include +#include + +#include "io/fs/file_system.h" + +namespace doris { + +namespace segment_v2 { +void compact_column(int32_t index_id, int src_segment_num, int dest_segment_num, + std::vector src_index_files, + std::vector dest_index_files, const io::FileSystemSPtr& fs, + std::string index_writer_path, std::string tablet_path, + std::vector>> trans_vec, + std::vector dest_segment_num_rows); +} // namespace segment_v2 +} // namespace doris