Parquet io optimization

This commit is contained in:
obdev 2025-01-01 14:15:41 +00:00 committed by ob-robot
parent e77ab8b65a
commit e32f5fc6d3
9 changed files with 217 additions and 23 deletions

View File

@ -2379,3 +2379,6 @@ DEF_INT(query_memory_limit_percentage, OB_TENANT_PARAMETER, "50", "[0,100]",
DEF_INT(package_state_sync_max_size, OB_TENANT_PARAMETER, "8192", "[0, 16777216]",
"the max sync size of single package state that can sync package var value. If over it, package state will not sync package var value. Range: [0, 16777216] in integer",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_CAP(_parquet_row_group_prebuffer_size, OB_CLUSTER_PARAMETER, "0M", "[0M,)",
"the parquet prefetch maximum row group size. Range: [0, +∞)",
ObParameterAttr(Section::SSTABLE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -997,6 +997,7 @@ ob_set_subtarget(ob_sql engine_table
engine/table/ob_orc_table_row_iter.cpp
engine/table/ob_parquet_table_row_iter.cpp
engine/table/ob_odps_table_row_iter.cpp
engine/table/ob_file_prefetch_buffer.cpp
)
ob_set_subtarget(ob_sql executor

View File

@ -121,7 +121,6 @@ int64_t ObArrowMemPool::bytes_allocated() const {
return total_alloc_size_;
}
/* ObArrowFile */
int ObArrowFile::open()
{
@ -138,9 +137,14 @@ arrow::Result<int64_t> ObArrowFile::Read(int64_t nbytes, void *out)
int ret = OB_SUCCESS;
arrow::Result<int64_t> ret_code;
int64_t read_size = -1;
if (OB_FAIL(file_reader_.pread(out, nbytes, position_, read_size))) {
if (file_prefetch_buffer_.in_prebuffer_range(position_, nbytes)) {
file_prefetch_buffer_.fetch(position_, nbytes, out);
position_ += nbytes;
ret_code = nbytes;
} else if (OB_FAIL(file_reader_.pread(out, nbytes, position_, read_size))) {
LOG_WARN("fail to read file", K(ret), K(nbytes));
ret_code = arrow::Result<int64_t>(arrow::Status(arrow::StatusCode::IOError, "read file failed"));
ret_code =
arrow::Result<int64_t>(arrow::Status(arrow::StatusCode::IOError, "read file failed"));
} else {
position_ += read_size;
ret_code = read_size;
@ -166,10 +170,14 @@ arrow::Result<int64_t> ObArrowFile::ReadAt(int64_t position, int64_t nbytes, voi
int ret = OB_SUCCESS;
arrow::Result<int64_t> ret_code;
int64_t read_size = -1;
if (OB_FAIL(file_reader_.pread(out, nbytes, position, read_size))) {
if (file_prefetch_buffer_.in_prebuffer_range(position, nbytes)) {
file_prefetch_buffer_.fetch(position, nbytes, out);
position_ = position + nbytes;
ret_code = nbytes;
} else if (OB_FAIL(file_reader_.pread(out, nbytes, position, read_size))) {
LOG_WARN("fail to read file", K(ret), K(position), K(nbytes));
ret_code = arrow::Result<int64_t>(arrow::Status(arrow::StatusCode::IOError, "read at file failed"));
ret_code =
arrow::Result<int64_t>(arrow::Status(arrow::StatusCode::IOError, "read at file failed"));
} else {
position_ = position + read_size;
ret_code = read_size;

View File

@ -23,6 +23,7 @@
#include "share/ob_device_manager.h"
#include "sql/engine/table/ob_external_table_access_service.h"
#include "sql/engine/basic/ob_select_into_basic.h"
#include "sql/engine/table/ob_file_prefetch_buffer.h"
namespace oceanbase
{
@ -101,8 +102,10 @@ private:
class ObArrowFile : public arrow::io::RandomAccessFile {
public:
ObArrowFile(ObExternalDataAccessDriver &file_reader, const char*file_name, arrow::MemoryPool *pool)
: file_reader_(file_reader), file_name_(file_name), pool_(pool)
ObArrowFile(ObExternalDataAccessDriver &file_reader, const char *file_name,
arrow::MemoryPool *pool, ObFilePrefetchBuffer &file_prefetch_buffer) :
file_reader_(file_reader),
file_name_(file_name), pool_(pool), file_prefetch_buffer_(file_prefetch_buffer)
{}
~ObArrowFile() override {
file_reader_.close();
@ -127,6 +130,7 @@ private:
const char* file_name_;
arrow::MemoryPool *pool_;
int64_t position_;
ObFilePrefetchBuffer &file_prefetch_buffer_;
};
class ObParquetOutputStream : public arrow::io::OutputStream

View File

@ -0,0 +1,100 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "ob_file_prefetch_buffer.h"
#include "sql/engine/ob_exec_context.h"
namespace oceanbase
{
using namespace share::schema;
using namespace common;
using namespace share;
namespace sql {
void ObFilePrefetchBuffer::destroy()
{
if (nullptr != buffer_) {
alloc_.free(buffer_);
buffer_ = nullptr;
}
offset_ = 0;
length_ = 0;
buffer_size_ = 0;
}
void ObFilePrefetchBuffer::clear()
{
offset_ = 0;
length_ = 0;
}
int ObFilePrefetchBuffer::prefetch(const int64_t file_offset, const int64_t size)
{
int ret = OB_SUCCESS;
int64_t max_prebuffer_size = GCONF._parquet_row_group_prebuffer_size;
offset_ = 0;
length_ = 0;
if (size > max_prebuffer_size) {
// do nothing
LOG_TRACE("exceeding the maximum prefetch size", K(size), K(max_prebuffer_size));
} else {
void *buffer = nullptr;
int64_t read_size = 0;
if (size > buffer_size_) {
alloc_.free(buffer_);
buffer_ = nullptr;
buffer_size_ = 0;
}
if (nullptr == buffer_) {
if (OB_ISNULL(buffer = alloc_.alloc(size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc prefetch buffer", K(ret), K(size));
} else {
buffer_ = buffer;
buffer_size_ = size;
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(file_reader_.pread(buffer_, size, file_offset, read_size))) {
LOG_WARN("fail to read file", K(ret), K(size));
} else if (size != read_size) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected read size", K(size), K(read_size));
} else {
offset_ = file_offset;
length_ = size;
}
LOG_INFO("success prefetch", K(ret), K(file_offset), K(size));
}
return ret;
}
bool ObFilePrefetchBuffer::in_prebuffer_range(const int64_t position, const int64_t nbytes)
{
bool in_range = true;
if (OB_UNLIKELY(nullptr == buffer_) || position < offset_
|| position + nbytes > offset_ + length_) {
in_range = false;
LOG_TRACE("out of prebuffer range", K(position), K(nbytes), K(offset_), K(length_));
}
return in_range;
}
void ObFilePrefetchBuffer::fetch(const int64_t position, const int64_t nbytes, void *out)
{
MEMCPY(out, (char *)buffer_ + (position - offset_), nbytes);
}
}
}

View File

@ -0,0 +1,51 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_FILE_PREFETCH_BUFFER_H
#define OB_FILE_PREFETCH_BUFFER_H
#include "share/ob_i_tablet_scan.h"
#include "lib/file/ob_file.h"
#include "sql/engine/table/ob_external_table_access_service.h"
namespace oceanbase {
namespace sql {
class ObFilePrefetchBuffer
{
public:
ObFilePrefetchBuffer(ObExternalDataAccessDriver &file_reader) :
offset_(0), length_(0), buffer_size_(0), buffer_(nullptr), file_reader_(file_reader),
alloc_(common::ObMemAttr(MTL_ID(), "PrefetchBuffer"))
{}
~ObFilePrefetchBuffer()
{
destroy();
}
void clear();
void destroy();
int prefetch(const int64_t file_offset, const int64_t size);
bool in_prebuffer_range(const int64_t position, const int64_t nbytes);
// NOTE: before calling, make sure it is within the buffer range.
void fetch(const int64_t position, const int64_t nbytes, void *out);
private:
int64_t offset_;
int64_t length_;
int64_t buffer_size_;
void *buffer_;
ObExternalDataAccessDriver &file_reader_;
common::ObMalloc alloc_;
};
}
}
#endif // OB_FILE_PREFETCH_BUFFER_H

View File

@ -42,6 +42,7 @@ ObParquetTableRowIterator::~ObParquetTableRowIterator()
for (int i = 0; i < column_readers_.count(); i++) {
column_readers_.at(i) = NULL;
}
file_prefetch_buffer_.destroy();
}
int ObParquetTableRowIterator::init(const storage::ObTableScanParam *scan_param)
{
@ -168,8 +169,8 @@ int ObParquetTableRowIterator::next_file()
try {
file_meta_.reset();
file_reader_.reset();
std::shared_ptr<ObArrowFile> cur_file =
std::make_shared<ObArrowFile>(data_access_driver_, url_.ptr(), &arrow_alloc_);
std::shared_ptr<ObArrowFile> cur_file = std::make_shared<ObArrowFile>(
data_access_driver_, url_.ptr(), &arrow_alloc_, file_prefetch_buffer_);
OZ (cur_file.get()->open());
if (OB_SUCC(ret)) {
file_reader_ = parquet::ParquetFileReader::Open(cur_file, read_props_);
@ -264,19 +265,23 @@ int ObParquetTableRowIterator::next_row_group()
}
if (OB_SUCC(ret)) {
int64_t cur_row_group = (state_.cur_row_group_idx_++) - 1;
try {
std::shared_ptr<parquet::RowGroupReader> rg_reader = file_reader_->RowGroup(cur_row_group);
state_.cur_row_group_read_row_count_ = 0;
state_.cur_row_group_row_count_ = file_meta_->RowGroup(cur_row_group)->num_rows();
for (int i = 0; OB_SUCC(ret) && i < column_indexs_.count(); i++) {
column_readers_.at(i) = rg_reader->Column(column_indexs_.at(i));
if (OB_FAIL(prefetch_parquet_row_group(file_meta_->RowGroup(cur_row_group)))) {
LOG_WARN("failed to prefetch parquet row group", K(ret));
} else {
try {
std::shared_ptr<parquet::RowGroupReader> rg_reader = file_reader_->RowGroup(cur_row_group);
state_.cur_row_group_read_row_count_ = 0;
state_.cur_row_group_row_count_ = file_meta_->RowGroup(cur_row_group)->num_rows();
for (int i = 0; OB_SUCC(ret) && i < column_indexs_.count(); i++) {
column_readers_.at(i) = rg_reader->Column(column_indexs_.at(i));
}
} catch(const std::exception& e) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected index", K(ret), "Info", e.what(), K(cur_row_group), K(column_indexs_));
} catch(...) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected index", K(ret), K(cur_row_group), K(column_indexs_));
}
} catch(const std::exception& e) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected index", K(ret), "Info", e.what(), K(cur_row_group), K(column_indexs_));
} catch(...) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected index", K(ret), K(cur_row_group), K(column_indexs_));
}
}
return ret;
@ -1452,6 +1457,24 @@ int ObParquetTableRowIterator::get_next_row()
void ObParquetTableRowIterator::reset() {
// reset state_ to initial values for rescan
state_.reuse();
file_prefetch_buffer_.destroy();
}
int ObParquetTableRowIterator::prefetch_parquet_row_group(
std::unique_ptr<parquet::RowGroupMetaData> row_group_meta)
{
int ret = OB_SUCCESS;
int64_t select_col_cnt = column_exprs_.count();
const double MIN_SELECTION_RATE_THRESHOLD = 0.8;
file_prefetch_buffer_.clear();
if (select_col_cnt / row_group_meta->num_columns() >= MIN_SELECTION_RATE_THRESHOLD) {
if (OB_FAIL(file_prefetch_buffer_.prefetch(row_group_meta->file_offset(),
row_group_meta->total_compressed_size()))) {
LOG_WARN("failed to prefetch from parquet file", K(row_group_meta->file_offset()),
K(row_group_meta->total_compressed_size()));
}
}
return ret;
}
DEF_TO_STRING(ObParquetIteratorState)

View File

@ -63,7 +63,8 @@ public:
read_props_(&arrow_alloc_),
file_column_exprs_(allocator_),
file_meta_column_exprs_(allocator_),
bit_vector_cache_(NULL) {}
bit_vector_cache_(NULL),
file_prefetch_buffer_(data_access_driver_) {}
virtual ~ObParquetTableRowIterator();
int init(const storage::ObTableScanParam *scan_param) override;
@ -147,6 +148,7 @@ private:
int next_file();
int next_row_group();
int calc_pseudo_exprs(const int64_t read_count);
int prefetch_parquet_row_group(std::unique_ptr<parquet::RowGroupMetaData> row_group_meta);
private:
ObParquetIteratorState state_;
lib::ObMemAttr mem_attr_;
@ -168,6 +170,7 @@ private:
common::ObArrayWrap<int16_t> rep_levels_buf_;
common::ObArrayWrap<char *> file_url_ptrs_; //for file url expr
common::ObArrayWrap<ObLength> file_url_lens_; //for file url expr
ObFilePrefetchBuffer file_prefetch_buffer_;
};
}

View File

@ -474,6 +474,7 @@ _parallel_max_active_sessions
_parallel_min_message_pool
_parallel_redo_logging_trigger
_parallel_server_sleep_time
_parquet_row_group_prebuffer_size
_partition_wise_plan_enabled
_pdml_thread_cache_size
_pipelined_table_function_memory_limit