[Fix](Variant) use uinque id to access column reader (#39841) (#40269)

#39841
#40295
This commit is contained in:
lihangyu
2024-09-09 18:01:12 +08:00
committed by GitHub
parent 8f37eccbf2
commit f69063ea87
10 changed files with 174 additions and 53 deletions

View File

@ -17,6 +17,8 @@
#include "olap/rowset/segment_v2/hierarchical_data_reader.h"
#include <memory>
#include "common/status.h"
#include "io/io_common.h"
#include "olap/rowset/segment_v2/column_reader.h"
@ -41,7 +43,7 @@ Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
vectorized::PathsInData leaves_paths;
SubcolumnColumnReaders::get_leaves_of_node(node, leaves, leaves_paths);
for (size_t i = 0; i < leaves_paths.size(); ++i) {
if (leaves_paths[i] == root->path) {
if (leaves_paths[i].empty()) {
// use set_root to share instead
continue;
}

View File

@ -127,7 +127,7 @@ private:
auto& container_variant = assert_cast<vectorized::ColumnObject&>(*container);
// add root first
if (_path.get_parts().size() == 1) {
if (_path.get_parts().empty() && _root_reader) {
auto& root_var =
_root_reader->column->is_nullable()
? assert_cast<vectorized::ColumnObject&>(

View File

@ -158,7 +158,9 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o
const TabletColumn& col = read_options.tablet_schema->column(column_id);
ColumnReader* reader = nullptr;
if (col.is_extracted_column()) {
const auto* node = _sub_column_tree.find_exact(*col.path_info_ptr());
auto relative_path = col.path_info_ptr()->copy_pop_front();
int32_t unique_id = col.unique_id() > 0 ? col.unique_id() : col.parent_unique_id();
const auto* node = _sub_column_tree[unique_id].find_exact(relative_path);
reader = node != nullptr ? node->data.reader.get() : nullptr;
} else {
reader = _column_readers.contains(col.unique_id())
@ -381,19 +383,27 @@ Status Segment::_load_index_impl() {
// Return the storage datatype of related column to field.
// Return nullptr meaning no such storage infomation for this column
vectorized::DataTypePtr Segment::get_data_type_of(vectorized::PathInDataPtr path, bool is_nullable,
bool ignore_children) const {
vectorized::DataTypePtr Segment::get_data_type_of(const ColumnIdentifier& identifier,
bool read_flat_leaves) const {
// Path has higher priority
if (path != nullptr && !path->empty()) {
const auto* node = _sub_column_tree.find_leaf(*path);
const auto* sparse_node = _sparse_column_tree.find_exact(*path);
if (identifier.path != nullptr && !identifier.path->empty()) {
auto relative_path = identifier.path->copy_pop_front();
int32_t unique_id =
identifier.unique_id > 0 ? identifier.unique_id : identifier.parent_unique_id;
const auto* node = _sub_column_tree.contains(unique_id)
? _sub_column_tree.at(unique_id).find_leaf(relative_path)
: nullptr;
const auto* sparse_node =
_sparse_column_tree.contains(unique_id)
? _sparse_column_tree.at(unique_id).find_exact(relative_path)
: nullptr;
if (node) {
if (ignore_children || (node->children.empty() && sparse_node == nullptr)) {
if (read_flat_leaves || (node->children.empty() && sparse_node == nullptr)) {
return node->data.file_column_type;
}
}
// it contains children or column missing in storage, so treat it as variant
return is_nullable
return identifier.is_nullable
? vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>())
: std::make_shared<vectorized::DataTypeObject>();
}
@ -450,7 +460,9 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
if (!column.has_path_info()) {
continue;
}
auto iter = column_path_to_footer_ordinal.find(*column.path_info_ptr());
auto path = column.has_path_info() ? *column.path_info_ptr()
: vectorized::PathInData(column.name_lower_case());
auto iter = column_path_to_footer_ordinal.find(path);
if (iter == column_path_to_footer_ordinal.end()) {
continue;
}
@ -460,11 +472,25 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
std::unique_ptr<ColumnReader> reader;
RETURN_IF_ERROR(
ColumnReader::create(opts, column_pb, footer.num_rows(), _file_reader, &reader));
_sub_column_tree.add(
iter->first,
SubcolumnReader {
std::move(reader),
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
// root column use unique id, leaf column use parent_unique_id
int32_t unique_id =
column.parent_unique_id() > 0 ? column.parent_unique_id() : column.unique_id();
auto relative_path = path.copy_pop_front();
if (relative_path.empty()) {
// root column
_sub_column_tree[unique_id].create_root(SubcolumnReader {
std::move(reader),
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
} else {
// check the root is already a leaf node
DCHECK(_sub_column_tree[unique_id].get_leaves()[0]->path.empty());
_sub_column_tree[unique_id].add(
relative_path,
SubcolumnReader {
std::move(reader),
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
}
// init sparse columns paths and type info
for (uint32_t ordinal = 0; ordinal < column_pb.sparse_columns().size(); ++ordinal) {
const auto& spase_column_pb = column_pb.sparse_columns(ordinal);
@ -472,8 +498,8 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
vectorized::PathInData path;
path.from_protobuf(spase_column_pb.column_path_info());
// Read from root column, so reader is nullptr
_sparse_column_tree.add(
path,
_sparse_column_tree[unique_id].add(
path.copy_pop_front(),
SubcolumnReader {nullptr,
vectorized::DataTypeFactory::instance().create_data_type(
spase_column_pb)});
@ -523,22 +549,23 @@ Status Segment::_new_iterator_with_variant_root(const TabletColumn& tablet_colum
Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
const StorageReadOptions* opt) {
vectorized::PathInData root_path;
if (!tablet_column.has_path_info()) {
// Missing path info, but need read the whole variant column
root_path = vectorized::PathInData(tablet_column.name_lower_case());
} else {
root_path = vectorized::PathInData({tablet_column.path_info_ptr()->get_parts()[0]});
// root column use unique id, leaf column use parent_unique_id
int32_t unique_id = tablet_column.unique_id() > 0 ? tablet_column.unique_id()
: tablet_column.parent_unique_id();
if (!_sub_column_tree.contains(unique_id)) {
// No such variant column in this segment, get a default one
RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
return Status::OK();
}
const auto* root = _sub_column_tree.find_leaf(root_path);
auto relative_path = tablet_column.path_info_ptr()->copy_pop_front();
const auto* root = _sub_column_tree[unique_id].get_root();
const auto* node = tablet_column.has_path_info()
? _sub_column_tree.find_exact(*tablet_column.path_info_ptr())
? _sub_column_tree[unique_id].find_exact(relative_path)
: nullptr;
const auto* sparse_node =
tablet_column.has_path_info()
? _sparse_column_tree.find_exact(*tablet_column.path_info_ptr())
tablet_column.has_path_info() && _sparse_column_tree.contains(unique_id)
? _sparse_column_tree[unique_id].find_exact(relative_path)
: nullptr;
// Currently only compaction and checksum need to read flat leaves
// They both use tablet_schema_with_merged_max_schema_version as read schema
auto type_to_read_flat_leaves = [](ReaderType type) {
@ -552,7 +579,7 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
if (opt != nullptr && type_to_read_flat_leaves(opt->io_ctx.reader_type)) {
// compaction need to read flat leaves nodes data to prevent from amplification
const auto* node = tablet_column.has_path_info()
? _sub_column_tree.find_leaf(*tablet_column.path_info_ptr())
? _sub_column_tree[unique_id].find_leaf(relative_path)
: nullptr;
if (!node) {
// sparse_columns have this path, read from root
@ -574,15 +601,14 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
if (node->is_leaf_node() && sparse_node == nullptr) {
// Node contains column without any child sub columns and no corresponding sparse columns
// Direct read extracted columns
const auto* node = _sub_column_tree.find_leaf(*tablet_column.path_info_ptr());
const auto* node = _sub_column_tree[unique_id].find_leaf(relative_path);
ColumnIterator* it;
RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
iter->reset(it);
} else {
// Node contains column with children columns or has correspoding sparse columns
// Create reader with hirachical data
RETURN_IF_ERROR(HierarchicalDataReader::create(iter, *tablet_column.path_info_ptr(),
node, root));
RETURN_IF_ERROR(HierarchicalDataReader::create(iter, relative_path, node, root));
}
} else {
// No such node, read from either sparse column or default column
@ -648,8 +674,11 @@ Status Segment::new_column_iterator(int32_t unique_id, std::unique_ptr<ColumnIte
ColumnReader* Segment::_get_column_reader(const TabletColumn& col) {
// init column iterator by path info
if (col.has_path_info() || col.is_variant_type()) {
const auto* node =
col.has_path_info() ? _sub_column_tree.find_exact(*col.path_info_ptr()) : nullptr;
auto relative_path = col.path_info_ptr()->copy_pop_front();
int32_t unique_id = col.unique_id() > 0 ? col.unique_id() : col.parent_unique_id();
const auto* node = col.has_path_info()
? _sub_column_tree[unique_id].find_exact(relative_path)
: nullptr;
if (node != nullptr) {
return node->data.reader.get();
}
@ -810,14 +839,19 @@ Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) {
}
bool Segment::same_with_storage_type(int32_t cid, const Schema& schema,
bool ignore_children) const {
auto file_column_type = get_data_type_of(schema.column(cid)->path(),
schema.column(cid)->is_nullable(), ignore_children);
auto expected_type = Schema::get_data_type_ptr(*schema.column(cid));
bool read_flat_leaves) const {
const auto* col = schema.column(cid);
auto file_column_type =
get_data_type_of(ColumnIdentifier {.unique_id = col->unique_id(),
.parent_unique_id = col->parent_unique_id(),
.path = col->path(),
.is_nullable = col->is_nullable()},
read_flat_leaves);
auto expected_type = Schema::get_data_type_ptr(*col);
#ifndef NDEBUG
if (file_column_type && !file_column_type->equals(*expected_type)) {
VLOG_DEBUG << fmt::format("Get column {}, file column type {}, exepected type {}",
schema.column(cid)->name(), file_column_type->get_name(),
col->name(), file_column_type->get_name(),
expected_type->get_name());
}
#endif
@ -843,7 +877,10 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto
vectorized::PathInDataPtr path = std::make_shared<vectorized::PathInData>(
schema.column_by_uid(slot->col_unique_id()).name_lower_case(),
slot->column_paths());
auto storage_type = get_data_type_of(path, slot->is_nullable(), false);
auto storage_type = get_data_type_of(ColumnIdentifier {.unique_id = slot->col_unique_id(),
.path = path,
.is_nullable = slot->is_nullable()},
false);
vectorized::MutableColumnPtr file_storage_column = storage_type->create_column();
DCHECK(storage_type != nullptr);
TabletColumn column = TabletColumn::create_materialized_variant_column(

View File

@ -150,14 +150,19 @@ public:
void remove_from_segment_cache() const;
// Identify the column by unique id or path info
struct ColumnIdentifier {
int32_t unique_id = -1;
int32_t parent_unique_id = -1;
vectorized::PathInDataPtr path;
bool is_nullable = false;
};
// Get the inner file column's data type
// ignore_chidren set to false will treat field as variant
// when it contains children with field paths.
// nullptr will returned if storage type does not contains such column
std::shared_ptr<const vectorized::IDataType> get_data_type_of(vectorized::PathInDataPtr path,
bool is_nullable,
bool ignore_children) const;
std::shared_ptr<const vectorized::IDataType> get_data_type_of(
const ColumnIdentifier& identifier, bool read_flat_leaves) const;
// Check is schema read type equals storage column type
bool same_with_storage_type(int32_t cid, const Schema& schema, bool ignore_children) const;
@ -166,8 +171,12 @@ public:
bool can_apply_predicate_safely(int cid, Predicate* pred, const Schema& schema,
ReaderType read_type) const {
const Field* col = schema.column(cid);
vectorized::DataTypePtr storage_column_type = get_data_type_of(
col->path(), col->is_nullable(), read_type != ReaderType::READER_QUERY);
vectorized::DataTypePtr storage_column_type =
get_data_type_of(ColumnIdentifier {.unique_id = col->unique_id(),
.parent_unique_id = col->parent_unique_id(),
.path = col->path(),
.is_nullable = col->is_nullable()},
read_type != ReaderType::READER_QUERY);
if (storage_column_type == nullptr) {
// Default column iterator
return true;
@ -239,10 +248,12 @@ private:
// Each node in the tree represents the sub column reader and type
// for variants.
SubcolumnColumnReaders _sub_column_tree;
// map column unique id --> it's sub column readers
std::map<int32_t, SubcolumnColumnReaders> _sub_column_tree;
// each sprase column's path and types info
SubcolumnColumnReaders _sparse_column_tree;
// map column unique id --> it's sparse sub column readers
std::map<int32_t, SubcolumnColumnReaders> _sparse_column_tree;
// used to guarantee that short key index will be loaded at most once in a thread-safe way
DorisCallOnce<Status> _load_index_once;

View File

@ -333,7 +333,12 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
const Field* col = _schema->column(i);
if (col) {
auto storage_type = _segment->get_data_type_of(
col->path(), col->is_nullable(),
Segment::ColumnIdentifier {
col->unique_id(),
col->parent_unique_id(),
col->path(),
col->is_nullable(),
},
_opts.io_ctx.reader_type != ReaderType::READER_QUERY);
if (storage_type == nullptr) {
storage_type = vectorized::DataTypeFactory::instance().create_data_type(*col);

View File

@ -269,7 +269,12 @@ private:
continue;
}
vectorized::DataTypePtr storage_type = _segment->get_data_type_of(
_schema->column(cid)->path(), _schema->column(cid)->is_nullable(), false);
Segment::ColumnIdentifier {
.unique_id = _schema->column(cid)->unique_id(),
.parent_unique_id = _schema->column(cid)->parent_unique_id(),
.path = _schema->column(cid)->path(),
.is_nullable = _schema->column(cid)->is_nullable()},
false);
if (storage_type && !storage_type->equals(*block->get_by_position(block_cid).type)) {
// Do additional cast
vectorized::MutableColumnPtr tmp = storage_type->create_column();

View File

@ -137,9 +137,15 @@ public:
/// flag, which is true if node already exists.
using NodeCreator = std::function<NodePtr(NodeKind, bool)>;
// create root as SCALAR node
void create_root(NodeData&& leaf_data) {
root = std::make_shared<Node>(Node::SCALAR, std::move(leaf_data));
leaves.push_back(root);
}
// create root as SCALAR node
void create_root(const NodeData& leaf_data) {
root = std::make_shared<Node>(Node::SCALAR, leaf_data);
root = std::make_shared<Node>(Node::SCALAR, std::move(leaf_data));
leaves.push_back(root);
}

View File

@ -37,7 +37,7 @@ UPPER CASE lower case
\N
\N
\N
\N
""
""
1234566
16

View File

@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
0 {"k1":1,"k2":"hello world","k3":[1234],"k4":1.1,"k5":[[123]]}
-- !sql --
0 {"k1":1,"k2":"hello world","k3":[1234],"k4":1.1,"k5":[[123]]} \N
2 {"xxxx":1234} {"yyyy":1.1111}
-- !sql --
0 {"k1":1,"k2":"hello world","k3":[1234],"k4":1.1,"k5":[[123]]} \N
2 {"xxxx":1234} \N
2 {"xxxx":1234} \N

View File

@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("regression_test_variant_column_rename", "variant_type"){
sql "DROP TABLE IF EXISTS variant_renam"
sql """
CREATE TABLE IF NOT EXISTS variant_renam(
k bigint not null,
v variant not null
)
DUPLICATE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 4
properties("replication_num" = "1");
"""
sql """INSERT INTO variant_renam SELECT *, '{"k1":1, "k2": "hello world", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}' FROM numbers("number" = "1")"""
sql """alter table variant_renam rename column v va""";
qt_sql """select * from variant_renam"""
// drop column and add the same name column
sql """alter table variant_renam add column v2 variant default null"""
sql """insert into variant_renam values (2, '{"xxxx" : 1234}', '{"yyyy" : 1.1111}')"""
qt_sql "select * from variant_renam order by k"
sql """alter table variant_renam drop column v2"""
sql """insert into variant_renam values (2, '{"xxxx" : 1234}')"""
sql """alter table variant_renam add column v2 variant default null"""
qt_sql "select * from variant_renam order by k"
}