[Feature-WIP](inverted) add inverted index writer api for be (#14207)

This commit is contained in:
airborne12
2022-12-26 15:02:12 +08:00
committed by GitHub
parent 1400a89065
commit 24a994eb9f
11 changed files with 168 additions and 2 deletions

View File

@ -26,6 +26,7 @@
namespace doris {
namespace io {
class FileSystem;
class FileWriter {
public:
@ -52,6 +53,8 @@ public:
virtual size_t bytes_appended() const = 0;
virtual FileSystem* fs() const = 0;
const Path& path() const { return _path; }
protected:

View File

@ -42,7 +42,7 @@ Status LocalFileSystem::create_file(const Path& path, FileWriterPtr* writer) {
if (-1 == fd) {
return Status::IOError("cannot open {}: {}", fs_path.native(), std::strerror(errno));
}
*writer = std::make_unique<LocalFileWriter>(std::move(fs_path), fd);
*writer = std::make_unique<LocalFileWriter>(std::move(fs_path), fd, this);
return Status::OK();
}

View File

@ -56,6 +56,12 @@ Status sync_dir(const io::Path& dirname) {
namespace io {
// Wraps an already-open descriptor `fd` for the file at `path` and remembers
// `fs`, which is what fs() hands back. Every construction bumps both the
// local_file_open_writing and local_file_writer_total metrics by one.
LocalFileWriter::LocalFileWriter(Path path, int fd, FileSystem* fs)
        : FileWriter(std::move(path)), _fd(fd), _fs(fs) {
    auto&& metrics = DorisMetrics::instance();
    metrics->local_file_open_writing->increment(1);
    metrics->local_file_writer_total->increment(1);
}
LocalFileWriter::LocalFileWriter(Path path, int fd) : FileWriter(std::move(path)), _fd(fd) {
DorisMetrics::instance()->local_file_open_writing->increment(1);
DorisMetrics::instance()->local_file_writer_total->increment(1);

View File

@ -19,6 +19,7 @@
#include <cstddef>
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
namespace doris {
@ -26,7 +27,10 @@ namespace io {
class LocalFileWriter final : public FileWriter {
public:
LocalFileWriter(Path path, int fd, FileSystem* fs);
LocalFileWriter(Path path, int fd);
~LocalFileWriter() override;
Status close() override;
@ -43,11 +47,14 @@ public:
size_t bytes_appended() const override { return _bytes_appended; }
FileSystem* fs() const override { return _fs; }
private:
Status _close(bool sync);
private:
int _fd; // owned
FileSystem* _fs;
size_t _bytes_appended = 0;
bool _dirty = false;

View File

@ -21,6 +21,7 @@
#include <list>
#include "io/fs/file_writer.h"
#include "io/fs/s3_file_system.h"
#include "util/s3_util.h"
namespace Aws::S3 {
@ -52,6 +53,11 @@ public:
size_t bytes_appended() const override { return _bytes_appended; }
FileSystem* fs() const override { return _fs; }
private:
S3FileSystem* _fs;
private:
Status _close();

View File

@ -99,6 +99,7 @@ struct RowsetWriterContext {
int64_t oldest_write_timestamp;
int64_t newest_write_timestamp;
bool enable_unique_key_merge_on_write = false;
std::set<int32_t> skip_inverted_index;
};
} // namespace doris

View File

@ -22,10 +22,12 @@
#include "common/logging.h"
#include "env/env.h"
#include "gutil/strings/substitute.h"
#include "io/fs/file_writer.h"
#include "olap/rowset/segment_v2/bitmap_index_writer.h"
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/rowset/segment_v2/bloom_filter_index_writer.h"
#include "olap/rowset/segment_v2/encoding_info.h"
#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "olap/rowset/segment_v2/options.h"
#include "olap/rowset/segment_v2/ordinal_page_index.h"
#include "olap/rowset/segment_v2/page_builder.h"
@ -96,6 +98,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
item_options.need_zone_map = false;
item_options.need_bloom_filter = item_column.is_bf_column();
item_options.need_bitmap_index = item_column.has_bitmap_index();
item_options.inverted_index = nullptr;
if (item_column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
if (item_options.need_bloom_filter) {
return Status::NotSupported("Do not support bloom filter for array type");
@ -296,6 +299,13 @@ Status ScalarColumnWriter::init() {
RETURN_IF_ERROR(
BitmapIndexWriter::create(get_field()->type_info(), &_bitmap_index_builder));
}
if (_opts.inverted_index) {
RETURN_IF_ERROR(InvertedIndexColumnWriter::create(
get_field(), &_inverted_index_builder, _opts.meta->unique_id(),
_file_writer->path().filename().native(),
_file_writer->path().parent_path().native(), _opts.inverted_index,
_file_writer->fs()));
}
if (_opts.need_bloom_filter) {
RETURN_IF_ERROR(BloomFilterIndexWriter::create(
BloomFilterOptions(), get_field()->type_info(), &_bloom_filter_index_builder));
@ -312,6 +322,9 @@ Status ScalarColumnWriter::append_nulls(size_t num_rows) {
if (_opts.need_bitmap_index) {
_bitmap_index_builder->add_nulls(num_rows);
}
if (_opts.inverted_index) {
_inverted_index_builder->add_nulls(num_rows);
}
if (_opts.need_bloom_filter) {
_bloom_filter_index_builder->add_nulls(num_rows);
}
@ -344,6 +357,9 @@ Status ScalarColumnWriter::append_data_in_current_page(const uint8_t* data, size
if (_opts.need_bitmap_index) {
_bitmap_index_builder->add_values(data, *num_written);
}
if (_opts.inverted_index) {
_inverted_index_builder->add_values(get_field()->name(), data, *num_written);
}
if (_opts.need_bloom_filter) {
_bloom_filter_index_builder->add_values(data, *num_written);
}
@ -432,6 +448,13 @@ Status ScalarColumnWriter::write_bitmap_index() {
return Status::OK();
}
// Finalizes this column's inverted index, if one is being built.
//
// Returns the status of the builder's finish() call, or OK when there is
// nothing to finish. The builder pointer is checked in addition to the option
// so that a builder which was never created (init() not run, or creation
// produced no writer) results in a no-op rather than a dereference of an empty
// unique_ptr.
Status ScalarColumnWriter::write_inverted_index() {
    if (_opts.inverted_index == nullptr || _inverted_index_builder == nullptr) {
        return Status::OK();
    }
    return _inverted_index_builder->finish();
}
Status ScalarColumnWriter::write_bloom_filter_index() {
if (_opts.need_bloom_filter) {
return _bloom_filter_index_builder->finish(_file_writer, _opts.meta->add_indexes());
@ -532,7 +555,16 @@ Status ArrayColumnWriter::init() {
}
RETURN_IF_ERROR(_item_writer->init());
_offset_writer->register_flush_page_callback(this);
if (_opts.inverted_index) {
auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
if (writer != nullptr) {
RETURN_IF_ERROR(InvertedIndexColumnWriter::create(
get_field(), &_inverted_index_builder, _opts.meta->unique_id(),
writer->_file_writer->path().filename().native(),
writer->_file_writer->path().parent_path().native(), _opts.inverted_index,
writer->_file_writer->fs()));
}
}
return Status::OK();
}
@ -541,6 +573,13 @@ Status ArrayColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
return Status::OK();
}
// Finalizes the array column's inverted index, if one is being built.
//
// Returns the status of the builder's finish() call, or OK when there is
// nothing to finish. init() only creates the builder when the item writer is a
// ScalarColumnWriter, so the builder may be empty even though
// _opts.inverted_index is set; checking the pointer avoids dereferencing an
// empty unique_ptr in that case.
Status ArrayColumnWriter::write_inverted_index() {
    if (_opts.inverted_index == nullptr || _inverted_index_builder == nullptr) {
        return Status::OK();
    }
    return _inverted_index_builder->finish();
}
// Now we can only write data one by one.
Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
size_t remaining = num_rows;
@ -567,6 +606,14 @@ Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
RETURN_IF_ERROR(_item_writer->append_data(reinterpret_cast<const uint8_t**>(&data),
col_cursor->length()));
}
if (_opts.inverted_index) {
auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
if (writer != nullptr) {
//NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr
_inverted_index_builder->add_array_values(_item_writer->get_field()->size(),
col_cursor, 1);
}
}
}
remaining -= num_written;
col_cursor += num_written;

View File

@ -21,6 +21,7 @@
#include "common/status.h" // for Status
#include "gen_cpp/segment_v2.pb.h" // for EncodingTypePB
#include "olap/inverted_index_parser.h"
#include "olap/rowset/segment_v2/common.h"
#include "olap/rowset/segment_v2/page_pointer.h" // for PagePointer
#include "olap/tablet_schema.h" // for TabletColumn
@ -50,6 +51,8 @@ struct ColumnWriterOptions {
bool need_zone_map = false;
bool need_bitmap_index = false;
bool need_bloom_filter = false;
std::vector<const TabletIndex*> indexes;
const TabletIndex* inverted_index = nullptr;
std::string to_string() const {
std::stringstream ss;
ss << std::boolalpha << "meta=" << meta->DebugString()
@ -62,6 +65,7 @@ struct ColumnWriterOptions {
};
class BitmapIndexWriter;
class InvertedIndexColumnWriter;
class EncodingInfo;
class NullBitmapBuilder;
class OrdinalIndexWriter;
@ -126,6 +130,8 @@ public:
virtual Status write_bitmap_index() = 0;
virtual Status write_inverted_index() = 0;
virtual Status write_bloom_filter_index() = 0;
virtual ordinal_t get_next_rowid() const = 0;
@ -174,6 +180,7 @@ public:
Status write_ordinal_index() override;
Status write_zone_map() override;
Status write_bitmap_index() override;
Status write_inverted_index() override;
Status write_bloom_filter_index() override;
ordinal_t get_next_rowid() const override { return _next_rowid; }
@ -186,6 +193,7 @@ public:
Status append_data_in_current_page(const uint8_t** ptr, size_t* num_written);
Status append_data_in_current_page(const uint8_t* ptr, size_t* num_written);
friend class ArrayColumnWriter;
private:
std::unique_ptr<PageBuilder> _page_builder;
@ -247,6 +255,7 @@ private:
std::unique_ptr<OrdinalIndexWriter> _ordinal_index_builder;
std::unique_ptr<ZoneMapIndexWriter> _zone_map_index_builder;
std::unique_ptr<BitmapIndexWriter> _bitmap_index_builder;
std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder;
std::unique_ptr<BloomFilterIndexWriter> _bloom_filter_index_builder;
// call before flush data page.
@ -286,6 +295,7 @@ public:
}
return Status::OK();
}
Status write_inverted_index() override;
Status write_bloom_filter_index() override {
if (_opts.need_bloom_filter) {
return Status::NotSupported("array not support bloom filter index");
@ -303,6 +313,7 @@ private:
std::unique_ptr<ScalarColumnWriter> _offset_writer;
std::unique_ptr<ScalarColumnWriter> _null_writer;
std::unique_ptr<ColumnWriter> _item_writer;
std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder;
ColumnWriterOptions _opts;
};

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "common/status.h"
#include "olap/inverted_index_parser.h"
#include "olap/olap_common.h"
#include "olap/tablet_schema.h"
namespace doris {
class CollectionValue;
namespace segment_v2 {
// Interface of the per-column inverted index builder driven by the segment
// column writers. Concrete writers implement the pure virtual methods below.
class InvertedIndexColumnWriter {
public:
    // Factory for a column's inverted index writer.
    //
    // Callers pass the column's field, the out-parameter `res` that should
    // receive the writer, the column unique id (`uuid`), the segment file name
    // and its parent directory, the index definition and the file system of
    // the segment's file writer.
    //
    // No concrete writer is available behind this API yet, so the call fails
    // with NotSupported and leaves `*res` untouched. Reporting OK without
    // assigning `*res` would let callers go on to dereference an empty
    // unique_ptr in add_values(), add_nulls() and finish().
    static Status create(const Field* field, std::unique_ptr<InvertedIndexColumnWriter>* res,
                         uint32_t uuid, const std::string& segment_file_name,
                         const std::string& dir, const TabletIndex* inverted_index,
                         io::FileSystem* fs) {
        return Status::NotSupported("inverted index writer is not implemented yet");
    }

    // Implementation-defined setup step; not called by the code visible here.
    virtual Status init() = 0;

    InvertedIndexColumnWriter() = default;
    virtual ~InvertedIndexColumnWriter() = default;

    // Adds `count` values starting at `values`; the scalar column writer passes
    // its field name as `name`.
    // NOTE(review): `name` is taken by value; switching to a const reference
    // would change the virtual signature, so it is left as is.
    virtual Status add_values(const std::string name, const void* values, size_t count) = 0;

    // Adds `count` array values starting at `values`; the array column writer
    // passes the item field's size as `field_size`.
    virtual Status add_array_values(size_t field_size, const CollectionValue* values,
                                    size_t count) = 0;

    // Records `count` null rows.
    virtual Status add_nulls(uint32_t count) = 0;

    // Completes the index; invoked from the column writers'
    // write_inverted_index().
    virtual Status finish() = 0;

    // NOTE(review): presumably the size of the index built so far; the unit is
    // not established by the visible code — confirm with implementations.
    virtual uint64_t size() const = 0;

private:
    DISALLOW_COPY_AND_ASSIGN(InvertedIndexColumnWriter);
};
} // namespace segment_v2
} // namespace doris

View File

@ -24,6 +24,7 @@
#include "olap/primary_key_index.h"
#include "olap/row.h" // ContiguousRow
#include "olap/row_cursor.h" // RowCursor
#include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext
#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter
#include "olap/rowset/segment_v2/page_io.h"
#include "olap/schema.h"
@ -112,6 +113,20 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {
opts.need_zone_map = column.is_key() || _tablet_schema->keys_type() != KeysType::AGG_KEYS;
opts.need_bloom_filter = column.is_bf_column();
opts.need_bitmap_index = column.has_bitmap_index();
bool skip_inverted_index = false;
if (_opts.rowset_ctx != nullptr) {
skip_inverted_index =
_opts.rowset_ctx->skip_inverted_index.count(column.unique_id()) > 0;
}
// indexes for this column
opts.indexes = _tablet_schema->get_indexes_for_column(column.unique_id());
for (auto index : opts.indexes) {
if (!skip_inverted_index && index && index->index_type() == IndexType::INVERTED) {
opts.inverted_index = index;
// TODO support multiple inverted index
break;
}
}
if (column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
opts.need_zone_map = false;
if (opts.need_bloom_filter) {
@ -362,6 +377,7 @@ Status SegmentWriter::finalize_columns(uint64_t* index_size) {
RETURN_IF_ERROR(_write_ordinal_index());
RETURN_IF_ERROR(_write_zone_map());
RETURN_IF_ERROR(_write_bitmap_index());
RETURN_IF_ERROR(_write_inverted_index());
RETURN_IF_ERROR(_write_bloom_filter_index());
*index_size = _file_writer->bytes_appended() - index_offset;
@ -437,6 +453,13 @@ Status SegmentWriter::_write_bitmap_index() {
return Status::OK();
}
// Walks the column writers in order and asks each one to write its inverted
// index. Errors are propagated through RETURN_IF_ERROR; OK is returned once
// every writer has been visited.
Status SegmentWriter::_write_inverted_index() {
    const auto last = _column_writers.end();
    for (auto it = _column_writers.begin(); it != last; ++it) {
        RETURN_IF_ERROR((*it)->write_inverted_index());
    }
    return Status::OK();
}
Status SegmentWriter::_write_bloom_filter_index() {
for (auto& column_writer : _column_writers) {
RETURN_IF_ERROR(column_writer->write_bloom_filter_index());

View File

@ -43,6 +43,7 @@ class TabletColumn;
class ShortKeyIndexBuilder;
class PrimaryKeyIndexBuilder;
class KeyCoder;
struct RowsetWriterContext;
namespace io {
class FileWriter;
@ -58,6 +59,8 @@ extern const uint32_t k_segment_magic_length;
// Options handed to a SegmentWriter.
struct SegmentWriterOptions {
// NOTE(review): presumably the number of rows grouped into one block; its use
// is not visible here — confirm against SegmentWriter.
uint32_t num_rows_per_block = 1024;
// NOTE(review): looks like a switch for unique-key merge-on-write handling;
// its consumer is not visible here — confirm.
bool enable_unique_key_merge_on_write = false;
// Optional rowset context; may be null. When set, SegmentWriter::init() looks
// up each column's unique id in rowset_ctx->skip_inverted_index to decide
// whether that column's inverted index is skipped.
RowsetWriterContext* rowset_ctx = nullptr;
};
class SegmentWriter {
@ -105,6 +108,7 @@ private:
Status _write_ordinal_index();
Status _write_zone_map();
Status _write_bitmap_index();
Status _write_inverted_index();
Status _write_bloom_filter_index();
Status _write_short_key_index();
Status _write_primary_key_index();