[FIX](array/map)fix array map batch append data with right next_array_item_rowid (#23779)
This commit is contained in:
@ -221,7 +221,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
|
||||
length_column.set_index_length(-1); // no short key index
|
||||
std::unique_ptr<Field> bigint_field(FieldFactory::create(length_column));
|
||||
auto* length_writer =
|
||||
new ScalarColumnWriter(length_options, std::move(bigint_field), file_writer);
|
||||
new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer);
|
||||
|
||||
// if nullable, create null writer
|
||||
ScalarColumnWriter* null_writer = nullptr;
|
||||
@ -314,7 +314,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
|
||||
length_column.set_index_length(-1); // no short key index
|
||||
std::unique_ptr<Field> bigint_field(FieldFactory::create(length_column));
|
||||
auto* length_writer =
|
||||
new ScalarColumnWriter(length_options, std::move(bigint_field), file_writer);
|
||||
new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer);
|
||||
|
||||
// create null writer
|
||||
if (opts.meta->is_nullable()) {
|
||||
@ -731,6 +731,48 @@ Status ScalarColumnWriter::finish_current_page() {
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// offset column writer
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts,
|
||||
std::unique_ptr<Field> field, io::FileWriter* file_writer)
|
||||
: ScalarColumnWriter(opts, std::move(field), file_writer) {
|
||||
// now we only explain data in offset column as uint64
|
||||
DCHECK(get_field()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT);
|
||||
}
|
||||
|
||||
OffsetColumnWriter::~OffsetColumnWriter() = default;
|
||||
|
||||
Status OffsetColumnWriter::init() {
|
||||
RETURN_IF_ERROR(ScalarColumnWriter::init());
|
||||
register_flush_page_callback(this);
|
||||
_next_offset = 0;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
|
||||
size_t remaining = num_rows;
|
||||
while (remaining > 0) {
|
||||
size_t num_written = remaining;
|
||||
RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
|
||||
// _next_offset after append_data_in_current_page is the offset of next data, which will used in finish_current_page() to set next_array_item_ordinal
|
||||
_next_offset = *(const uint64_t*)(*ptr);
|
||||
remaining -= num_written;
|
||||
|
||||
if (_page_builder->is_page_full()) {
|
||||
// get next data for next array_item_rowid
|
||||
RETURN_IF_ERROR(finish_current_page());
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
|
||||
footer->set_next_array_item_ordinal(_next_offset);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
StructColumnWriter::StructColumnWriter(
|
||||
const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
|
||||
ScalarColumnWriter* null_writer,
|
||||
@ -853,7 +895,7 @@ Status StructColumnWriter::finish_current_page() {
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
|
||||
ScalarColumnWriter* offset_writer,
|
||||
OffsetColumnWriter* offset_writer,
|
||||
ScalarColumnWriter* null_writer,
|
||||
std::unique_ptr<ColumnWriter> item_writer)
|
||||
: ColumnWriter(std::move(field), opts.meta->is_nullable()),
|
||||
@ -871,7 +913,6 @@ Status ArrayColumnWriter::init() {
|
||||
RETURN_IF_ERROR(_null_writer->init());
|
||||
}
|
||||
RETURN_IF_ERROR(_item_writer->init());
|
||||
_offset_writer->register_flush_page_callback(this);
|
||||
if (_opts.inverted_index) {
|
||||
auto writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
|
||||
if (writer != nullptr) {
|
||||
@ -885,11 +926,6 @@ Status ArrayColumnWriter::init() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status ArrayColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
|
||||
footer->set_next_array_item_ordinal(_item_writer->get_next_rowid());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status ArrayColumnWriter::write_inverted_index() {
|
||||
if (_opts.inverted_index) {
|
||||
return _inverted_index_builder->finish();
|
||||
@ -1008,7 +1044,7 @@ Status ArrayColumnWriter::finish_current_page() {
|
||||
|
||||
/// ============================= MapColumnWriter =====================////
|
||||
MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
|
||||
ScalarColumnWriter* null_writer, ScalarColumnWriter* offset_writer,
|
||||
ScalarColumnWriter* null_writer, OffsetColumnWriter* offset_writer,
|
||||
std::vector<std::unique_ptr<ColumnWriter>>& kv_writers)
|
||||
: ColumnWriter(std::move(field), opts.meta->is_nullable()), _opts(opts) {
|
||||
CHECK_EQ(kv_writers.size(), 2);
|
||||
@ -1028,7 +1064,6 @@ Status MapColumnWriter::init() {
|
||||
}
|
||||
// here register_flush_page_callback to call this.put_extra_info_in_page()
|
||||
// when finish cur data page
|
||||
_offsets_writer->register_flush_page_callback(this);
|
||||
for (auto& sub_writer : _kv_writers) {
|
||||
RETURN_IF_ERROR(sub_writer->init());
|
||||
}
|
||||
@ -1138,12 +1173,6 @@ Status MapColumnWriter::finish_current_page() {
|
||||
return Status::NotSupported("map writer has no data, can not finish_current_page");
|
||||
}
|
||||
|
||||
// write this value for column reader to read according offsets
|
||||
Status MapColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
|
||||
footer->set_next_array_item_ordinal(_kv_writers[0]->get_next_rowid());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MapColumnWriter::write_inverted_index() {
|
||||
if (_opts.inverted_index) {
|
||||
return _inverted_index_builder->finish();
|
||||
|
||||
@ -171,7 +171,7 @@ public:
|
||||
// Because some columns would be stored in a file, we should wait
|
||||
// until all columns has been finished, and then data can be written
|
||||
// to file
|
||||
class ScalarColumnWriter final : public ColumnWriter {
|
||||
class ScalarColumnWriter : public ColumnWriter {
|
||||
public:
|
||||
ScalarColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
|
||||
io::FileWriter* file_writer);
|
||||
@ -208,6 +208,7 @@ public:
|
||||
|
||||
Status append_data_in_current_page(const uint8_t* ptr, size_t* num_written);
|
||||
friend class ArrayColumnWriter;
|
||||
friend class OffsetColumnWriter;
|
||||
|
||||
private:
|
||||
std::unique_ptr<PageBuilder> _page_builder;
|
||||
@ -276,6 +277,26 @@ private:
|
||||
FlushPageCallback* _new_page_callback = nullptr;
|
||||
};
|
||||
|
||||
// offsetColumnWriter is used column which has offset column, like array, map.
|
||||
// column type is only uint64 and should response for whole column value [start, end], end will set
|
||||
// in footer.next_array_item_ordinal which in finish_cur_page() callback put_extra_info_in_page()
|
||||
class OffsetColumnWriter final : public ScalarColumnWriter, FlushPageCallback {
|
||||
public:
|
||||
OffsetColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
|
||||
io::FileWriter* file_writer);
|
||||
|
||||
~OffsetColumnWriter() override;
|
||||
|
||||
Status init() override;
|
||||
|
||||
Status append_data(const uint8_t** ptr, size_t num_rows) override;
|
||||
|
||||
private:
|
||||
Status put_extra_info_in_page(DataPageFooterPB* footer) override;
|
||||
|
||||
uint64_t _next_offset;
|
||||
};
|
||||
|
||||
class StructColumnWriter final : public ColumnWriter {
|
||||
public:
|
||||
explicit StructColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
|
||||
@ -328,10 +349,10 @@ private:
|
||||
ColumnWriterOptions _opts;
|
||||
};
|
||||
|
||||
class ArrayColumnWriter final : public ColumnWriter, public FlushPageCallback {
|
||||
class ArrayColumnWriter final : public ColumnWriter {
|
||||
public:
|
||||
explicit ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
|
||||
ScalarColumnWriter* offset_writer, ScalarColumnWriter* null_writer,
|
||||
OffsetColumnWriter* offset_writer, ScalarColumnWriter* null_writer,
|
||||
std::unique_ptr<ColumnWriter> item_writer);
|
||||
~ArrayColumnWriter() override = default;
|
||||
|
||||
@ -373,22 +394,21 @@ public:
|
||||
ordinal_t get_next_rowid() const override { return _offset_writer->get_next_rowid(); }
|
||||
|
||||
private:
|
||||
Status put_extra_info_in_page(DataPageFooterPB* header) override;
|
||||
Status write_null_column(size_t num_rows, bool is_null); // 写入num_rows个null标记
|
||||
bool has_empty_items() const { return _item_writer->get_next_rowid() == 0; }
|
||||
|
||||
private:
|
||||
std::unique_ptr<ScalarColumnWriter> _offset_writer;
|
||||
std::unique_ptr<OffsetColumnWriter> _offset_writer;
|
||||
std::unique_ptr<ScalarColumnWriter> _null_writer;
|
||||
std::unique_ptr<ColumnWriter> _item_writer;
|
||||
std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder;
|
||||
ColumnWriterOptions _opts;
|
||||
};
|
||||
|
||||
class MapColumnWriter final : public ColumnWriter, public FlushPageCallback {
|
||||
class MapColumnWriter final : public ColumnWriter {
|
||||
public:
|
||||
explicit MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
|
||||
ScalarColumnWriter* null_writer, ScalarColumnWriter* offsets_writer,
|
||||
ScalarColumnWriter* null_writer, OffsetColumnWriter* offsets_writer,
|
||||
std::vector<std::unique_ptr<ColumnWriter>>& _kv_writers);
|
||||
|
||||
~MapColumnWriter() override = default;
|
||||
@ -432,12 +452,10 @@ public:
|
||||
ordinal_t get_next_rowid() const override { return _offsets_writer->get_next_rowid(); }
|
||||
|
||||
private:
|
||||
Status put_extra_info_in_page(DataPageFooterPB* header) override;
|
||||
|
||||
std::vector<std::unique_ptr<ColumnWriter>> _kv_writers;
|
||||
// we need null writer to make sure a row is null or not
|
||||
std::unique_ptr<ScalarColumnWriter> _null_writer;
|
||||
std::unique_ptr<ScalarColumnWriter> _offsets_writer;
|
||||
std::unique_ptr<OffsetColumnWriter> _offsets_writer;
|
||||
std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder;
|
||||
ColumnWriterOptions _opts;
|
||||
};
|
||||
|
||||
@ -943,6 +943,7 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorArray::convert_to_olap(
|
||||
auto elem_size = end_offset - start_offset;
|
||||
|
||||
_offsets.clear();
|
||||
// we need all offsets, so reserve num_rows + 1 to make sure last offset can be got in offset column, instead of according to nested item column
|
||||
_offsets.reserve(_num_rows + 1);
|
||||
for (int i = 0; i <= _num_rows; ++i) {
|
||||
_offsets.push_back(column_array->offset_at(i + _row_pos) - start_offset + _base_offset);
|
||||
@ -1011,8 +1012,9 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
|
||||
auto elem_size = end_offset - start_offset;
|
||||
|
||||
_offsets.clear();
|
||||
_offsets.reserve(_num_rows);
|
||||
for (int i = 0; i < _num_rows; ++i) {
|
||||
// we need all offsets, so reserve num_rows + 1 to make sure last offset can be got in offset column, instead of according to nested item column
|
||||
_offsets.reserve(_num_rows + 1);
|
||||
for (int i = 0; i <= _num_rows; ++i) {
|
||||
_offsets.push_back(column_map->offset_at(i + _row_pos) - start_offset + _base_offset);
|
||||
}
|
||||
_base_offset += elem_size;
|
||||
|
||||
Reference in New Issue
Block a user