[Fix](variant) handle StorageReadOptions to avoid crash in new_column_iterator_with_path (#27936)

In partial update, read variant without `opt` will lead to crash
This commit is contained in:
lihangyu
2023-12-04 17:02:35 +08:00
committed by GitHub
parent 2022a8ab32
commit a7d1e92fc2
6 changed files with 27 additions and 9 deletions

View File

@ -424,7 +424,7 @@ static Status new_default_iterator(const TabletColumn& tablet_column,
Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
StorageReadOptions* opt) {
const StorageReadOptions* opt) {
vectorized::PathInData root_path;
if (tablet_column.path_info().empty()) {
// Missing path info, but need read the whole variant column
@ -434,7 +434,7 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
}
auto root = _sub_column_tree.find_leaf(root_path);
auto node = _sub_column_tree.find_exact(tablet_column.path_info());
if (opt->io_ctx.reader_type == ReaderType::READER_ALTER_TABLE) {
if (opt != nullptr && opt->io_ctx.reader_type == ReaderType::READER_ALTER_TABLE) {
CHECK(tablet_column.is_variant_type());
if (node == nullptr) {
// No such variant column in this segment, get a default one
@ -449,7 +449,7 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
return Status::OK();
}
if (opt->io_ctx.reader_type != ReaderType::READER_QUERY) {
if (opt == nullptr || opt->io_ctx.reader_type != ReaderType::READER_QUERY) {
// Could be compaction ..etc and read flat leaves nodes data
auto node = _sub_column_tree.find_leaf(tablet_column.path_info());
if (!node) {
@ -504,7 +504,7 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
// but they are not the same column
Status Segment::new_column_iterator(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
StorageReadOptions* opt) {
const StorageReadOptions* opt) {
// init column iterator by path info
if (!tablet_column.path_info().empty() || tablet_column.is_variant_type()) {
return new_column_iterator_with_path(tablet_column, iter, opt);

View File

@ -92,11 +92,11 @@ public:
Status new_column_iterator(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
StorageReadOptions* opt = nullptr);
const StorageReadOptions* opt);
Status new_column_iterator_with_path(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
StorageReadOptions* opt = nullptr);
const StorageReadOptions* opt);
Status new_column_iterator(int32_t unique_id, std::unique_ptr<ColumnIterator>* iter);

View File

@ -466,7 +466,7 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra
for (auto cid : _seek_schema->column_ids()) {
if (_column_iterators[cid] == nullptr) {
RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
&_column_iterators[cid]));
&_column_iterators[cid], &_opts));
ColumnIteratorOptions iter_opts {
.use_page_cache = _opts.use_page_cache,
.file_reader = _file_reader.get(),

View File

@ -2680,7 +2680,7 @@ Status Tablet::_get_segment_column_iterator(
rowset->rowset_id().to_string(), segid));
}
segment_v2::SegmentSharedPtr segment = *it;
RETURN_IF_ERROR(segment->new_column_iterator(target_column, column_iterator));
RETURN_IF_ERROR(segment->new_column_iterator(target_column, column_iterator, nullptr));
segment_v2::ColumnIteratorOptions opt {
.use_page_cache = !config::disable_storage_page_cache,
.file_reader = segment->file_reader().get(),

View File

@ -51,7 +51,8 @@ Status VStatisticsIterator::init(const StorageReadOptions& opts) {
auto unique_id = _schema.column(cid)->unique_id();
if (_column_iterators_map.count(unique_id) < 1) {
RETURN_IF_ERROR(_segment->new_column_iterator(opts.tablet_schema->column(cid),
&_column_iterators_map[unique_id]));
&_column_iterators_map[unique_id],
nullptr));
}
_column_iterators.push_back(_column_iterators_map[unique_id].get());
}

View File

@ -59,4 +59,21 @@ suite("regression_test_variant_delete_and_update", "variant_type"){
// sql """update ${table_name} set vs = '{"updated_value" : 123}' where k = 2"""
// sql """update ${table_name} set v = '{"updated_value" : 123}' where k = 2"""
qt_sql "select * from ${table_name} order by k"
// delete & insert concurrently
t1 = Thread.startDaemon {
for (int k = 1; k <= 60; k++) {
int x = k % 10;
sql """insert into ${table_name} values(${x}, '${x}', '{"k${x}" : ${x}}')"""
}
}
t2 = Thread.startDaemon {
for (int k = 1; k <= 60; k++) {
int x = k % 10;
sql """delete from ${table_name} where k = ${x} """
}
}
t1.join()
t2.join()
}