[fix](multi-catalog) fix page index thrift deserialization (#15001)
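Fix the error raised when parsing the Parquet page index: "Couldn't deserialize thrift msg". Previously the column index and the offset index were read with a single read into one shared buffer, and the offset index was located at an offset of `_column_index_size` inside that buffer. Now two separate buffers are used, one for the column index messages and one for the offset index messages, each read from its own start position, so the two are no longer parsed out of the same buffer.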
This commit is contained in:
@ -94,11 +94,11 @@ Status PageIndex::parse_column_index(const tparquet::ColumnChunk& chunk, const u
|
||||
}
|
||||
|
||||
Status PageIndex::parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
|
||||
int64_t buffer_size, tparquet::OffsetIndex* offset_index) {
|
||||
int64_t buffer_offset = chunk.offset_index_offset - _offset_index_start + _column_index_size;
|
||||
tparquet::OffsetIndex* offset_index) {
|
||||
int64_t buffer_offset = chunk.offset_index_offset - _offset_index_start;
|
||||
uint32_t length = chunk.offset_index_length;
|
||||
DCHECK_GE(buffer_offset, 0);
|
||||
DCHECK_LE(buffer_offset + length, buffer_size);
|
||||
DCHECK_LE(buffer_offset + length, _offset_index_size);
|
||||
RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, offset_index));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ public:
|
||||
Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
|
||||
tparquet::ColumnIndex* _column_index);
|
||||
Status parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
|
||||
int64_t buffer_size, tparquet::OffsetIndex* _offset_index);
|
||||
tparquet::OffsetIndex* _offset_index);
|
||||
|
||||
private:
|
||||
friend class ParquetReader;
|
||||
|
||||
@ -545,14 +545,18 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
|
||||
if (!_has_page_index(row_group.columns, page_index)) {
|
||||
return Status::OK();
|
||||
}
|
||||
int64_t buffer_size = page_index._column_index_size + page_index._offset_index_size;
|
||||
uint8_t buff[buffer_size];
|
||||
// int64_t buffer_size = page_index._column_index_size;
|
||||
uint8_t col_index_buff[page_index._column_index_size];
|
||||
int64_t bytes_read = 0;
|
||||
RETURN_IF_ERROR(
|
||||
_file_reader->readat(page_index._column_index_start, buffer_size, &bytes_read, buff));
|
||||
|
||||
RETURN_IF_ERROR(_file_reader->readat(page_index._column_index_start,
|
||||
page_index._column_index_size, &bytes_read,
|
||||
col_index_buff));
|
||||
auto& schema_desc = _file_metadata->schema();
|
||||
std::vector<RowRange> skipped_row_ranges;
|
||||
uint8_t off_index_buff[page_index._offset_index_size];
|
||||
RETURN_IF_ERROR(_file_reader->readat(page_index._offset_index_start,
|
||||
page_index._offset_index_size, &bytes_read,
|
||||
off_index_buff));
|
||||
for (auto& read_col : _read_columns) {
|
||||
auto conjunct_iter = _colname_to_value_range->find(read_col._file_slot_name);
|
||||
if (_colname_to_value_range->end() == conjunct_iter) {
|
||||
@ -563,7 +567,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
|
||||
if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
RETURN_IF_ERROR(page_index.parse_column_index(chunk, buff, &column_index));
|
||||
RETURN_IF_ERROR(page_index.parse_column_index(chunk, col_index_buff, &column_index));
|
||||
const int num_of_pages = column_index.null_pages.size();
|
||||
if (num_of_pages <= 0) {
|
||||
break;
|
||||
@ -577,7 +581,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
|
||||
continue;
|
||||
}
|
||||
tparquet::OffsetIndex offset_index;
|
||||
RETURN_IF_ERROR(page_index.parse_offset_index(chunk, buff, buffer_size, &offset_index));
|
||||
RETURN_IF_ERROR(page_index.parse_offset_index(chunk, off_index_buff, &offset_index));
|
||||
for (int page_id : skipped_page_range) {
|
||||
RowRange skipped_row_range;
|
||||
page_index.create_skipped_row_range(offset_index, row_group.num_rows, page_id,
|
||||
|
||||
Reference in New Issue
Block a user