[feature](multi-catalog) support map&struct type in parquet&orc reader (#17087)

Support parsing `map` and `struct` types in the Parquet and ORC readers.

## Remaining Problems
1. Doris uses the array type to build the key and value columns of a `map` (sketched below), but it never fills the offsets of the value column, so the value column's offsets are wasted.
2. Parquet supports reading only the key column or only the value column of a `map`; this PR does not support that yet.
3. Parquet supports reading a subset of the columns in a `struct`; this PR does not support that yet.
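
For readers unfamiliar with the layout, here is a minimal standalone sketch of problem 1 (not Doris code; the names and types are invented for illustration): the key and value columns of a `map` are both modeled as arrays, so the value side carries a second, always-identical offsets vector.

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical illustration: a map column decomposed into a key array and a
// value array. Both carry an offsets vector, but the two vectors are always
// identical, so the value-side offsets are redundant ("wasted").
struct ArrayColumn {
    std::vector<int64_t> data;     // flattened elements
    std::vector<uint64_t> offsets; // cumulative end offset of each row
};

struct MapColumn {
    ArrayColumn keys;
    ArrayColumn values; // values.offsets duplicates keys.offsets
};

int main() {
    // Two rows: {1:10, 2:20} and {3:30}
    MapColumn m;
    m.keys.data = {1, 2, 3};
    m.values.data = {10, 20, 30};
    m.keys.offsets = {2, 3};
    m.values.offsets = m.keys.offsets; // the redundant copy
    for (size_t row = 0, begin = 0; row < m.keys.offsets.size(); ++row) {
        size_t end = m.keys.offsets[row];
        for (size_t i = begin; i < end; ++i) {
            std::cout << m.keys.data[i] << ":" << m.values.data[i] << " ";
        }
        std::cout << "\n";
        begin = end;
    }
    return 0;
}
```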
Author: Ashin Gau
Date: 2023-02-26 08:55:39 +08:00
Committed by: GitHub
Parent: e42465ae59
Commit: c43e521d29
6 changed files with 367 additions and 49 deletions


```diff
@@ -27,8 +27,12 @@
 #include "olap/iterators.h"
 #include "util/slice.h"
 #include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
+#include "vec/columns/column_struct.h"
 #include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_struct.h"
 namespace doris::vectorized {
@@ -698,23 +702,23 @@ Status OrcReader::_decode_string_column(const std::string& col_name,
 }
 Status OrcReader::_fill_doris_array_offsets(const std::string& col_name,
-                                            const MutableColumnPtr& data_column,
-                                            orc::ListVectorBatch* lvb, size_t num_values,
-                                            size_t* element_size) {
+                                            ColumnArray::Offsets64& doris_offsets,
+                                            orc::DataBuffer<int64_t>& orc_offsets,
+                                            size_t num_values, size_t* element_size) {
     SCOPED_RAW_TIMER(&_statistics.decode_value_time);
     if (num_values > 0) {
-        auto& offsets_data = static_cast<ColumnArray&>(*data_column).get_offsets();
-        auto& orc_offsets = lvb->offsets;
         if (orc_offsets.size() < num_values + 1) {
            return Status::InternalError("Wrong array offsets in orc file for column '{}'",
                                         col_name);
         }
-        auto prev_offset = offsets_data.back();
+        auto prev_offset = doris_offsets.back();
         auto base_offset = orc_offsets[0];
         for (int i = 1; i < num_values + 1; ++i) {
-            offsets_data.emplace_back(prev_offset + orc_offsets[i] - base_offset);
+            doris_offsets.emplace_back(prev_offset + orc_offsets[i] - base_offset);
         }
         *element_size = orc_offsets[num_values] - base_offset;
     } else {
         *element_size = 0;
     }
     return Status::OK();
 }
```
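
A worked standalone example of the offset arithmetic in `_fill_doris_array_offsets` (a sketch with plain `std::vector`s, not the Doris/ORC types): ORC offsets are absolute within the batch, so each one is rebased against `orc_offsets[0]` and appended after the Doris column's running last offset.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Minimal sketch of the offset translation, assuming the batch's ORC offsets
// may start at an arbitrary base (e.g. a read that begins mid-stripe).
void fill_offsets(std::vector<uint64_t>& doris_offsets,
                  const std::vector<int64_t>& orc_offsets, size_t num_values,
                  size_t* element_size) {
    assert(orc_offsets.size() >= num_values + 1);
    uint64_t prev_offset = doris_offsets.back(); // running total so far
    int64_t base_offset = orc_offsets[0];        // rebase this batch to zero
    for (size_t i = 1; i < num_values + 1; ++i) {
        doris_offsets.push_back(prev_offset + orc_offsets[i] - base_offset);
    }
    *element_size = orc_offsets[num_values] - base_offset;
}

int main() {
    // Doris already holds 5 elements; the ORC batch says two rows end at
    // offsets 12 and 14 relative to base 10 -> two rows of length 2.
    std::vector<uint64_t> doris = {2, 5};
    std::vector<int64_t> orc = {10, 12, 14};
    size_t element_size = 0;
    fill_offsets(doris, orc, 2, &element_size);
    assert((doris == std::vector<uint64_t>{2, 5, 7, 9}));
    assert(element_size == 4);
    return 0;
}
```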
```diff
@@ -792,17 +796,67 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name,
             return Status::InternalError("Wrong data type for colum '{}'", col_name);
         }
         auto* orc_list = down_cast<orc::ListVectorBatch*>(cvb);
-        size_t element_size;
-        RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, data_column, orc_list, num_values,
+        auto& doris_offsets = static_cast<ColumnArray&>(*data_column).get_offsets();
+        auto& orc_offsets = orc_list->offsets;
+        size_t element_size = 0;
+        RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, doris_offsets, orc_offsets, num_values,
                                                   &element_size));
         DataTypePtr& nested_type = const_cast<DataTypePtr&>(
-                (reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get()))
+                reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get())
                         ->get_nested_type());
         const orc::Type* nested_orc_type = orc_column_type->getSubtype(0);
         return _orc_column_to_doris_column(
                 col_name, static_cast<ColumnArray&>(*data_column).get_data_ptr(), nested_type,
                 nested_orc_type, orc_list->elements.get(), element_size);
     }
+    case TypeIndex::Map: {
+        if (orc_column_type->getKind() != orc::TypeKind::MAP) {
+            return Status::InternalError("Wrong data type for colum '{}'", col_name);
+        }
+        auto* orc_map = down_cast<orc::MapVectorBatch*>(cvb);
+        auto& doris_map = static_cast<ColumnMap&>(*data_column);
+        size_t element_size = 0;
+        RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, doris_map.get_offsets(),
+                                                  orc_map->offsets, num_values, &element_size));
+        DataTypePtr& doris_key_type = const_cast<DataTypePtr&>(
+                reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())
+                        ->get_key_type());
+        DataTypePtr& doris_value_type = const_cast<DataTypePtr&>(
+                reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())
+                        ->get_value_type());
+        const orc::Type* orc_key_type = orc_column_type->getSubtype(0);
+        const orc::Type* orc_value_type = orc_column_type->getSubtype(1);
+        const ColumnPtr& doris_key_column =
+                typeid_cast<const ColumnArray*>(doris_map.get_keys_ptr().get())->get_data_ptr();
+        const ColumnPtr& doris_value_column =
+                typeid_cast<const ColumnArray*>(doris_map.get_values_ptr().get())->get_data_ptr();
+        RETURN_IF_ERROR(_orc_column_to_doris_column(col_name, doris_key_column, doris_key_type,
+                                                    orc_key_type, orc_map->keys.get(),
+                                                    element_size));
+        return _orc_column_to_doris_column(col_name, doris_value_column, doris_value_type,
+                                           orc_value_type, orc_map->elements.get(), element_size);
+    }
+    case TypeIndex::Struct: {
+        if (orc_column_type->getKind() != orc::TypeKind::STRUCT) {
+            return Status::InternalError("Wrong data type for colum '{}'", col_name);
+        }
+        auto* orc_struct = down_cast<orc::StructVectorBatch*>(cvb);
+        auto& doris_struct = static_cast<ColumnStruct&>(*data_column);
+        if (orc_struct->fields.size() != doris_struct.tuple_size()) {
+            return Status::InternalError("Wrong number of struct fields for column '{}'", col_name);
+        }
+        const DataTypeStruct* doris_struct_type =
+                reinterpret_cast<const DataTypeStruct*>(remove_nullable(data_type).get());
+        for (int i = 0; i < doris_struct.tuple_size(); ++i) {
+            orc::ColumnVectorBatch* orc_field = orc_struct->fields[i];
+            const orc::Type* orc_type = orc_column_type->getSubtype(i);
+            const ColumnPtr& doris_field = doris_struct.get_column_ptr(i);
+            const DataTypePtr& doris_type = doris_struct_type->get_element(i);
+            RETURN_IF_ERROR(_orc_column_to_doris_column(col_name, doris_field, doris_type, orc_type,
+                                                        orc_field, num_values));
+        }
+        return Status::OK();
+    }
     default:
         break;
     }
```
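
The new `Map` and `Struct` cases follow the same recursive shape as `Array`: resolve the matching ORC subtype(s), then re-enter `_orc_column_to_doris_column` for each child. A schematic sketch of that descent (all types here are simplified stand-ins, not the Doris or ORC classes):

```cpp
#include <iostream>
#include <string>
#include <vector>

// Schematic recursion over a nested type tree, mirroring the dispatch shape
// of _orc_column_to_doris_column.
struct TypeNode {
    std::string kind; // "scalar", "array", "map", or "struct"
    std::vector<TypeNode> children;
};

void convert_column(const TypeNode& t, int depth) {
    std::cout << std::string(depth * 2, ' ') << t.kind << "\n";
    if (t.kind == "array") {
        convert_column(t.children[0], depth + 1); // one element child
    } else if (t.kind == "map") {
        // one offsets fill shared by both children, then recurse twice
        convert_column(t.children[0], depth + 1); // key
        convert_column(t.children[1], depth + 1); // value
    } else if (t.kind == "struct") {
        // no offsets: every field is converted with the same num_values
        for (const TypeNode& child : t.children) {
            convert_column(child, depth + 1);
        }
    }
    // "scalar" decodes directly and ends the recursion
}

int main() {
    TypeNode map_type{"map", {TypeNode{"scalar", {}},
                              TypeNode{"array", {TypeNode{"scalar", {}}}}}};
    TypeNode schema{"struct", {TypeNode{"scalar", {}}, map_type}};
    convert_column(schema, 0);
    return 0;
}
```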


```diff
@@ -22,6 +22,7 @@
 #include "common/config.h"
 #include "exec/olap_common.h"
 #include "io/fs/file_reader.h"
+#include "vec/columns/column_array.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/exec/format/format_common.h"
@@ -251,8 +252,9 @@ private:
                                 size_t num_values);
     Status _fill_doris_array_offsets(const std::string& col_name,
-                                     const MutableColumnPtr& data_column, orc::ListVectorBatch* lvb,
-                                     size_t num_values, size_t* element_size);
+                                     ColumnArray::Offsets64& doris_offsets,
+                                     orc::DataBuffer<int64_t>& orc_offsets, size_t num_values,
+                                     size_t* element_size);
     std::string _get_field_name_lower_case(const orc::Type* orc_type, int pos);
```


```diff
@@ -43,6 +43,14 @@ public:
         return _rle_decoder.GetNextRun(val, max_run);
     }
+    inline level_t get_next() {
+        level_t next = -1;
+        _rle_decoder.Get(&next);
+        return next;
+    }
+    inline void rewind_one() { _rle_decoder.RewindOne(); }
 private:
     tparquet::Encoding::type _encoding;
     level_t _bit_width = 0;
```
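
These two additions give the level decoder a read-one/push-one-back contract. A toy model of that contract (an assumption for illustration, buffering decoded levels in a vector; the real `RleDecoder` works on bit-packed runs):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using level_t = int16_t; // narrow level type, as in the reader

// Toy stand-in for the level decoder: get_next() pops one level and
// rewind_one() pushes the last one back, which is the part the new
// LevelDecoder methods expose.
class ToyLevelDecoder {
public:
    explicit ToyLevelDecoder(std::vector<level_t> levels) : _levels(std::move(levels)) {}
    level_t get_next() { return _levels[_pos++]; }
    void rewind_one() { --_pos; }
    size_t remaining() const { return _levels.size() - _pos; }

private:
    std::vector<level_t> _levels;
    size_t _pos = 0;
};

int main() {
    ToyLevelDecoder dec({0, 1, 1, 0, 1});
    assert(dec.get_next() == 0); // first row starts
    assert(dec.get_next() == 1);
    assert(dec.get_next() == 1);
    level_t next = dec.get_next(); // rep level 0: the next row would start
    assert(next == 0);
    dec.rewind_one();              // batch is full: give the level back
    assert(dec.remaining() == 2);
    return 0;
}
```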


```diff
@@ -22,13 +22,40 @@
 #include "schema_desc.h"
 #include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/columns/column_struct.h"
 #include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_struct.h"
 #include "vparquet_column_chunk_reader.h"
 namespace doris::vectorized {
+static void fill_struct_null_map(FieldSchema* field, NullMap& null_map,
+                                 const std::vector<level_t>& rep_levels,
+                                 const std::vector<level_t>& def_levels) {
+    size_t num_levels = def_levels.size();
+    DCHECK_EQ(num_levels, rep_levels.size());
+    size_t origin_size = null_map.size();
+    null_map.resize(origin_size + num_levels);
+    size_t pos = 0;
+    for (size_t i = 0; i < num_levels; ++i) {
+        // skip the levels affect its ancestor or its descendants
+        if (def_levels[i] < field->repeated_parent_def_level ||
+            rep_levels[i] > field->repetition_level) {
+            continue;
+        }
+        if (def_levels[i] >= field->definition_level) {
+            null_map[pos++] = 0;
+        } else {
+            null_map[pos++] = 1;
+        }
+    }
+    null_map.resize(origin_size + pos);
+}
 static void fill_array_offset(FieldSchema* field, ColumnArray::Offsets64& offsets_data,
                               NullMap* null_map_ptr, const std::vector<level_t>& rep_levels,
                               const std::vector<level_t>& def_levels) {
```
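
A hedged illustration of the level filtering in `fill_struct_null_map`: entries whose definition level falls below `repeated_parent_def_level` belong to a null ancestor, entries whose repetition level exceeds the struct's own `repetition_level` are extra elements of a repeated descendant, and everything else maps to exactly one row of the struct, null iff its definition level is below the struct's. The field constants below are invented for the example, not taken from a real schema.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

using level_t = int16_t;

// Invented level thresholds for one struct field.
struct FieldLevels {
    level_t repeated_parent_def_level;
    level_t definition_level;
    level_t repetition_level;
};

std::vector<uint8_t> build_struct_null_map(const FieldLevels& field,
                                           const std::vector<level_t>& rep_levels,
                                           const std::vector<level_t>& def_levels) {
    std::vector<uint8_t> null_map;
    for (size_t i = 0; i < def_levels.size(); ++i) {
        // level belongs to a null ancestor or to a repeated descendant: skip
        if (def_levels[i] < field.repeated_parent_def_level ||
            rep_levels[i] > field.repetition_level) {
            continue;
        }
        null_map.push_back(def_levels[i] >= field.definition_level ? 0 : 1);
    }
    return null_map;
}

int main() {
    // struct nested in a nullable parent: parent defined at >=1, the struct
    // itself defined at >=2, struct sits at repetition level 0
    FieldLevels field{1, 2, 0};
    // rows: defined struct, null struct, null ancestor (skipped),
    //       plus one extra level from a repeated child (skipped)
    std::vector<level_t> rep = {0, 0, 0, 1};
    std::vector<level_t> def = {2, 1, 0, 2};
    auto nm = build_struct_null_map(field, rep, def);
    assert((nm == std::vector<uint8_t>{0, 1}));
    return 0;
}
```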
```diff
@@ -72,9 +99,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                                    const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
                                    std::unique_ptr<ParquetColumnReader>& reader,
                                    size_t max_buf_size) {
-    if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
-        return Status::Corruption("not supported type");
-    }
     if (field->type.type == TYPE_ARRAY) {
         std::unique_ptr<ParquetColumnReader> element_reader;
         RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz,
@@ -83,6 +107,30 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
         ArrayColumnReader* array_reader = new ArrayColumnReader(row_ranges, ctz);
         RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
         reader.reset(array_reader);
+    } else if (field->type.type == TYPE_MAP) {
+        std::unique_ptr<ParquetColumnReader> key_reader;
+        std::unique_ptr<ParquetColumnReader> value_reader;
+        RETURN_IF_ERROR(create(file, &field->children[0].children[0], row_group, row_ranges, ctz,
+                               key_reader, max_buf_size));
+        RETURN_IF_ERROR(create(file, &field->children[0].children[1], row_group, row_ranges, ctz,
+                               value_reader, max_buf_size));
+        key_reader->set_nested_column();
+        value_reader->set_nested_column();
+        MapColumnReader* map_reader = new MapColumnReader(row_ranges, ctz);
+        RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
+        reader.reset(map_reader);
+    } else if (field->type.type == TYPE_STRUCT) {
+        std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
+        for (int i = 0; i < field->children.size(); ++i) {
+            std::unique_ptr<ParquetColumnReader> child_reader;
+            RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz,
+                                   child_reader, max_buf_size));
+            child_reader->set_nested_column();
+            child_readers.emplace_back(std::move(child_reader));
+        }
+        StructColumnReader* struct_reader = new StructColumnReader(row_ranges, ctz);
+        RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
+        reader.reset(struct_reader);
     } else {
         const tparquet::ColumnChunk& chunk = row_group.columns[field->physical_column_index];
         ScalarColumnReader* scalar_reader = new ScalarColumnReader(row_ranges, chunk, ctz);
@@ -141,7 +189,7 @@ Status ScalarColumnReader::_skip_values(size_t num_values) {
     size_t null_size = 0;
     size_t nonnull_size = 0;
     while (skipped < num_values) {
-        level_t def_level;
+        level_t def_level = -1;
         size_t loop_skip = def_decoder.get_next_run(&def_level, num_values - skipped);
         if (loop_skip == 0) {
             continue;
@@ -229,35 +277,44 @@ Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_colu
 Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataTypePtr& type,
                                                ColumnSelectVector& select_vector, size_t batch_size,
                                                size_t* read_rows, bool* eof) {
     // prepare repetition and definition levels
     _rep_levels.resize(0);
     _def_levels.resize(0);
     size_t parsed_rows = 0;
-    if (_nested_first_read) {
-        _nested_first_read = false;
-    } else {
-        // we have read one more repetition leve in last loop
-        parsed_rows = 1;
-        _rep_levels.emplace_back(0);
-    }
-    size_t remaining_values = _chunk_reader->remaining_num_values() - parsed_rows;
-    LevelDecoder& rep_decoder = _chunk_reader->rep_level_decoder();
-    while (parsed_rows <= batch_size && remaining_values > 0) {
-        level_t rep_level;
-        // TODO(gaoxin): It's better to read repetition levels in a batch.
-        size_t loop_parsed = rep_decoder.get_next_run(&rep_level, remaining_values);
-        for (size_t i = 0; i < loop_parsed; ++i) {
+    size_t remaining_values = _chunk_reader->remaining_num_values();
+    bool has_rep_level = _chunk_reader->max_rep_level() > 0;
+    bool has_def_level = _chunk_reader->max_def_level() > 0;
+    if (has_rep_level) {
+        LevelDecoder& rep_decoder = _chunk_reader->rep_level_decoder();
+        while (parsed_rows <= batch_size && remaining_values > 0) {
+            level_t rep_level = rep_decoder.get_next();
+            if (rep_level == 0) {
+                if (parsed_rows == batch_size) {
+                    rep_decoder.rewind_one();
+                    break;
+                }
+                parsed_rows++;
+            }
             _rep_levels.emplace_back(rep_level);
+            remaining_values--;
         }
-        remaining_values -= loop_parsed;
-        // when repetition level == 1, it's a new row in doris_column.
-        if (rep_level == 0) {
-            parsed_rows += loop_parsed;
+    } else {
+        parsed_rows = std::min(remaining_values, batch_size);
+        remaining_values -= parsed_rows;
+        _rep_levels.resize(parsed_rows);
+        for (size_t i = 0; i < parsed_rows; ++i) {
+            _rep_levels[i] = 0;
         }
     }
     size_t parsed_values = _chunk_reader->remaining_num_values() - remaining_values;
     _def_levels.resize(parsed_values);
-    _chunk_reader->def_level_decoder().get_levels(&_def_levels[0], parsed_values);
+    if (has_def_level) {
+        _chunk_reader->def_level_decoder().get_levels(&_def_levels[0], parsed_values);
+    } else {
+        for (size_t i = 0; i < parsed_values; ++i) {
+            _def_levels[i] = 0;
+        }
+    }
     MutableColumnPtr data_column;
     std::vector<uint16_t> null_map;
```
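
The rewritten loop replaces the `_nested_first_read` bookkeeping: a repetition level of 0 marks the start of a new top-level row, so when the batch is already full, the decoder simply rewinds that one level instead of remembering across calls that it over-read. A minimal standalone sketch of that boundary logic (not the Doris code; the decoder is modeled as a plain vector with a cursor):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

using level_t = int16_t;

// rep_level == 0 starts a new row; once batch_size rows are parsed,
// "rewind" by leaving the over-read level for the next call.
size_t collect_rep_levels(const std::vector<level_t>& stream, size_t batch_size,
                          std::vector<level_t>& rep_levels, size_t* pos) {
    size_t parsed_rows = 0;
    while (*pos < stream.size()) {
        level_t rep_level = stream[*pos];
        if (rep_level == 0) {
            if (parsed_rows == batch_size) {
                break; // the level stays in the stream, like rewind_one()
            }
            parsed_rows++;
        }
        rep_levels.push_back(rep_level);
        ++(*pos);
    }
    return parsed_rows;
}

int main() {
    // three rows: [a b] [c] [d e]; rep 0 starts a row, rep 1 continues it
    std::vector<level_t> stream = {0, 1, 0, 0, 1};
    std::vector<level_t> out;
    size_t pos = 0;
    assert(collect_rep_levels(stream, 2, out, &pos) == 2); // rows 1-2
    assert(pos == 3);                                      // row 3 untouched
    out.clear();
    assert(collect_rep_levels(stream, 2, out, &pos) == 1); // last row
    return 0;
}
```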
```diff
@@ -276,7 +333,8 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
     }
     size_t has_read = 0;
     size_t ancestor_nulls = 0;
-    bool prev_is_null = true;
+    null_map.emplace_back(0);
+    bool prev_is_null = false;
     while (has_read < parsed_values) {
         level_t def_level = _def_levels[has_read++];
         size_t loop_read = 1;
@@ -287,21 +345,27 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
         if (def_level < _field_schema->repeated_parent_def_level) {
             // when def_level is less than repeated_parent_def_level, it means that level
             // will affect its ancestor.
-            ancestor_nulls++;
+            ancestor_nulls += loop_read;
             continue;
         }
         bool is_null = def_level < _field_schema->definition_level;
-        if (!(prev_is_null ^ is_null)) {
-            null_map.emplace_back(0);
+        if (prev_is_null == is_null) {
+            if (USHRT_MAX - null_map.back() >= loop_read) {
+                null_map.back() += loop_read;
+            }
+        } else {
+            if (!(prev_is_null ^ is_null)) {
+                null_map.emplace_back(0);
+            }
+            size_t remaining = loop_read;
+            while (remaining > USHRT_MAX) {
+                null_map.emplace_back(USHRT_MAX);
+                null_map.emplace_back(0);
+                remaining -= USHRT_MAX;
+            }
+            null_map.emplace_back((u_short)remaining);
+            prev_is_null = is_null;
         }
-        size_t remaining = loop_read;
-        while (remaining > USHRT_MAX) {
-            null_map.emplace_back(USHRT_MAX);
-            null_map.emplace_back(0);
-            remaining -= USHRT_MAX;
-        }
-        null_map.emplace_back((u_short)remaining);
-        prev_is_null = is_null;
     }
     size_t num_values = parsed_values - ancestor_nulls;
```
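
The `null_map` filled here is run-length encoded as `uint16_t` counts of alternating non-null/null runs, hence the `null_map.emplace_back(0)` seed (an empty non-null run) and the `USHRT_MAX` splitting. A hedged standalone sketch of that encoding (slightly restructured so the same-kind overflow case is handled explicitly; not the committed code):

```cpp
#include <cassert>
#include <climits>
#include <cstdint>
#include <vector>

// Run-length null map: entries are alternating run lengths, starting with a
// non-null run (possibly empty). Runs longer than USHRT_MAX are split by a
// zero-length run of the opposite kind to keep the alternation valid.
void append_run(std::vector<uint16_t>& null_map, bool& prev_is_null,
                bool is_null, size_t loop_read) {
    if (prev_is_null == is_null && USHRT_MAX - null_map.back() >= loop_read) {
        null_map.back() += loop_read; // extend the current run
        return;
    }
    if (prev_is_null == is_null) {
        null_map.push_back(0); // same kind, but the current run would overflow
    }
    size_t remaining = loop_read;
    while (remaining > USHRT_MAX) {
        null_map.push_back(USHRT_MAX);
        null_map.push_back(0);
        remaining -= USHRT_MAX;
    }
    null_map.push_back(static_cast<uint16_t>(remaining));
    prev_is_null = is_null;
}

int main() {
    std::vector<uint16_t> null_map = {0}; // seed: empty non-null run
    bool prev_is_null = false;
    append_run(null_map, prev_is_null, false, 3); // 3 non-null values
    append_run(null_map, prev_is_null, true, 2);  // 2 nulls
    append_run(null_map, prev_is_null, true, 1);  // 1 more null, merged
    assert((null_map == std::vector<uint16_t>{3, 3}));
    return 0;
}
```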
```diff
@@ -448,4 +512,119 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr&
     return Status::OK();
 }
+Status MapColumnReader::init(std::unique_ptr<ParquetColumnReader> key_reader,
+                             std::unique_ptr<ParquetColumnReader> value_reader,
+                             FieldSchema* field) {
+    _field_schema = field;
+    _key_reader = std::move(key_reader);
+    _value_reader = std::move(value_reader);
+    return Status::OK();
+}
+Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
+                                         ColumnSelectVector& select_vector, size_t batch_size,
+                                         size_t* read_rows, bool* eof) {
+    MutableColumnPtr data_column;
+    NullMap* null_map_ptr = nullptr;
+    if (doris_column->is_nullable()) {
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                (*std::move(doris_column)).mutate().get());
+        null_map_ptr = &nullable_column->get_null_map_data();
+        data_column = nullable_column->get_nested_column_ptr();
+    } else {
+        if (_field_schema->is_nullable) {
+            return Status::Corruption("Not nullable column has null values in parquet file");
+        }
+        data_column = doris_column->assume_mutable();
+    }
+    auto& map = static_cast<ColumnMap&>(*data_column);
+    DataTypePtr& key_type = const_cast<DataTypePtr&>(
+            reinterpret_cast<const DataTypeMap*>(remove_nullable(type).get())->get_key_type());
+    DataTypePtr& value_type = const_cast<DataTypePtr&>(
+            reinterpret_cast<const DataTypeMap*>(remove_nullable(type).get())->get_value_type());
+    ColumnPtr& key_column =
+            typeid_cast<ColumnArray*>(map.get_keys_ptr()->assume_mutable().get())->get_data_ptr();
+    ColumnPtr& value_column =
+            typeid_cast<ColumnArray*>(map.get_values_ptr()->assume_mutable().get())->get_data_ptr();
+    size_t key_rows = 0;
+    size_t value_rows = 0;
+    bool key_eof = false;
+    bool value_eof = false;
+    RETURN_IF_ERROR(_key_reader->read_column_data(key_column, key_type, select_vector, batch_size,
+                                                  &key_rows, &key_eof));
+    select_vector.reset();
+    RETURN_IF_ERROR(_value_reader->read_column_data(value_column, value_type, select_vector,
+                                                    batch_size, &value_rows, &value_eof));
+    DCHECK_EQ(key_rows, value_rows);
+    DCHECK_EQ(key_eof, value_eof);
+    *read_rows = key_rows;
+    *eof = key_eof;
+    if (*read_rows == 0) {
+        return Status::OK();
+    }
+    // fill offset and null map
+    fill_array_offset(_field_schema, map.get_offsets(), null_map_ptr, _key_reader->get_rep_level(),
+                      _key_reader->get_def_level());
+    return Status::OK();
+}
+Status StructColumnReader::init(std::vector<std::unique_ptr<ParquetColumnReader>>&& child_readers,
+                                FieldSchema* field) {
+    _field_schema = field;
+    _child_readers = std::move(child_readers);
+    return Status::OK();
+}
+Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
+                                            ColumnSelectVector& select_vector, size_t batch_size,
+                                            size_t* read_rows, bool* eof) {
+    MutableColumnPtr data_column;
+    NullMap* null_map_ptr = nullptr;
+    if (doris_column->is_nullable()) {
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                (*std::move(doris_column)).mutate().get());
+        null_map_ptr = &nullable_column->get_null_map_data();
+        data_column = nullable_column->get_nested_column_ptr();
+    } else {
+        if (_field_schema->is_nullable) {
+            return Status::Corruption("Not nullable column has null values in parquet file");
+        }
+        data_column = doris_column->assume_mutable();
+    }
+    auto& doris_struct = static_cast<ColumnStruct&>(*data_column);
+    if (_child_readers.size() != doris_struct.tuple_size()) {
+        return Status::InternalError("Wrong number of struct fields");
+    }
+    const DataTypeStruct* doris_struct_type =
+            reinterpret_cast<const DataTypeStruct*>(remove_nullable(type).get());
+    for (int i = 0; i < doris_struct.tuple_size(); ++i) {
+        ColumnPtr& doris_field = doris_struct.get_column_ptr(i);
+        DataTypePtr& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
+        select_vector.reset();
+        size_t loop_rows = 0;
+        bool loop_eof = false;
+        _child_readers[i]->read_column_data(doris_field, doris_type, select_vector, batch_size,
+                                            &loop_rows, &loop_eof);
+        if (i != 0) {
+            DCHECK_EQ(*read_rows, loop_rows);
+            DCHECK_EQ(*eof, loop_eof);
+        } else {
+            *read_rows = loop_rows;
+            *eof = loop_eof;
+        }
+    }
+    if (null_map_ptr != nullptr) {
+        fill_struct_null_map(_field_schema, *null_map_ptr, _child_readers[0]->get_rep_level(),
+                             _child_readers[0]->get_def_level());
+    }
+    return Status::OK();
+}
 }; // namespace doris::vectorized
```


```diff
@@ -101,7 +101,6 @@ protected:
     FieldSchema* _field_schema;
     // When scalar column is the child of nested column, we should turn off the filtering by page index and lazy read.
     bool _nested_column = false;
-    bool _nested_first_read = true;
     const std::vector<RowRange>& _row_ranges;
     cctz::time_zone* _ctz;
     tparquet::OffsetIndex* _offset_index;
@@ -164,4 +163,72 @@ public:
 private:
     std::unique_ptr<ParquetColumnReader> _element_reader = nullptr;
 };
+class MapColumnReader : public ParquetColumnReader {
+public:
+    MapColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
+            : ParquetColumnReader(row_ranges, ctz) {}
+    ~MapColumnReader() override { close(); }
+    Status init(std::unique_ptr<ParquetColumnReader> key_reader,
+                std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field);
+    Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
+                            ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
+                            bool* eof) override;
+    const std::vector<level_t>& get_rep_level() const override {
+        return _key_reader->get_rep_level();
+    }
+    const std::vector<level_t>& get_def_level() const override {
+        return _key_reader->get_def_level();
+    }
+    Statistics statistics() override {
+        Statistics kst = _key_reader->statistics();
+        Statistics vst = _value_reader->statistics();
+        kst.merge(vst);
+        return kst;
+    }
+    void close() override {}
+private:
+    std::unique_ptr<ParquetColumnReader> _key_reader = nullptr;
+    std::unique_ptr<ParquetColumnReader> _value_reader = nullptr;
+};
+class StructColumnReader : public ParquetColumnReader {
+public:
+    StructColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
+            : ParquetColumnReader(row_ranges, ctz) {}
+    ~StructColumnReader() override { close(); }
+    Status init(std::vector<std::unique_ptr<ParquetColumnReader>>&& child_readers,
+                FieldSchema* field);
+    Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
+                            ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
+                            bool* eof) override;
+    const std::vector<level_t>& get_rep_level() const override {
+        return _child_readers[0]->get_rep_level();
+    }
+    const std::vector<level_t>& get_def_level() const override {
+        return _child_readers[0]->get_def_level();
+    }
+    Statistics statistics() override {
+        Statistics st;
+        for (const auto& reader : _child_readers) {
+            Statistics cst = reader->statistics();
+            st.merge(cst);
+        }
+        return st;
+    }
+    void close() override {}
+private:
+    std::vector<std::unique_ptr<ParquetColumnReader>> _child_readers;
+};
 }; // namespace doris::vectorized
```


```diff
@@ -669,6 +669,11 @@ public class HiveMetaStoreClientHelper {
         }
     }
+    /**
+     * The nested column has inner columns, and each column is separated a comma. The inner column maybe a nested
+     * column too, so we cannot simply split by the comma. We need to match the angle brackets,
+     * and deal with the inner column recursively.
+     */
     private static int findNextNestedField(String commaSplitFields) {
         int numLess = 0;
         for (int i = 0; i < commaSplitFields.length(); i++) {
@@ -714,12 +719,14 @@
             default:
                 break;
         }
+        // resolve schema like array<int>
        if (lowerCaseType.startsWith("array")) {
            if (lowerCaseType.indexOf("<") == 5 && lowerCaseType.lastIndexOf(">") == lowerCaseType.length() - 1) {
                Type innerType = hiveTypeToDorisType(lowerCaseType.substring(6, lowerCaseType.length() - 1));
                return ArrayType.create(innerType, true);
            }
        }
+        // resolve schema like map<text, int>
        if (lowerCaseType.startsWith("map")) {
            if (lowerCaseType.indexOf("<") == 3 && lowerCaseType.lastIndexOf(">") == lowerCaseType.length() - 1) {
                String keyValue = lowerCaseType.substring(4, lowerCaseType.length() - 1);
@@ -730,6 +737,7 @@
            }
        }
    }
+        // resolve schema like struct<col1: text, col2: int>
        if (lowerCaseType.startsWith("struct")) {
            if (lowerCaseType.indexOf("<") == 6 && lowerCaseType.lastIndexOf(">") == lowerCaseType.length() - 1) {
                String listFields = lowerCaseType.substring(7, lowerCaseType.length() - 1);
```
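
The bracket matching described in the new comment generalizes to all three nested kinds: scan the field list, track `<`/`>` depth, and split only at top-level commas. A hedged sketch of that split (C++ for consistency with the other examples here; the logic mirrors `findNextNestedField` but is not that code):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Split a Hive nested-type field list at top-level commas only,
// tracking angle-bracket depth so inner commas are left alone.
std::vector<std::string> split_top_level(const std::string& fields) {
    std::vector<std::string> out;
    int depth = 0;
    size_t start = 0;
    for (size_t i = 0; i < fields.size(); ++i) {
        char c = fields[i];
        if (c == '<') {
            depth++;
        } else if (c == '>') {
            depth--;
        } else if (c == ',' && depth == 0) {
            out.push_back(fields.substr(start, i - start));
            start = i + 1;
        }
    }
    out.push_back(fields.substr(start));
    return out;
}

int main() {
    // the inner struct's comma must not split the outer field list
    auto fields = split_top_level("col1: text, col2: struct<a: int, b: int>");
    assert(fields.size() == 2);
    assert(fields[1] == " col2: struct<a: int, b: int>");
    return 0;
}
```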