diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 64a5f5aa96..f5f6bb516d 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -2379,3 +2379,6 @@ DEF_INT(query_memory_limit_percentage, OB_TENANT_PARAMETER, "50", "[0,100]", DEF_INT(package_state_sync_max_size, OB_TENANT_PARAMETER, "8192", "[0, 16777216]", "the max sync size of single package state that can sync package var value. If over it, package state will not sync package var value. Range: [0, 16777216] in integer", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_CAP(_parquet_row_group_prebuffer_size, OB_CLUSTER_PARAMETER, "0M", "[0M,)", + "the parquet prefetch maximum row group size. Range: [0, +∞)", + ObParameterAttr(Section::SSTABLE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt index 6ce2a4d6f2..9a84dd9023 100644 --- a/src/sql/CMakeLists.txt +++ b/src/sql/CMakeLists.txt @@ -997,6 +997,7 @@ ob_set_subtarget(ob_sql engine_table engine/table/ob_orc_table_row_iter.cpp engine/table/ob_parquet_table_row_iter.cpp engine/table/ob_odps_table_row_iter.cpp + engine/table/ob_file_prefetch_buffer.cpp ) ob_set_subtarget(ob_sql executor diff --git a/src/sql/engine/basic/ob_arrow_basic.cpp b/src/sql/engine/basic/ob_arrow_basic.cpp index 1831426e77..e1149cc542 100644 --- a/src/sql/engine/basic/ob_arrow_basic.cpp +++ b/src/sql/engine/basic/ob_arrow_basic.cpp @@ -121,7 +121,6 @@ int64_t ObArrowMemPool::bytes_allocated() const { return total_alloc_size_; } - /* ObArrowFile */ int ObArrowFile::open() { @@ -138,9 +137,14 @@ arrow::Result ObArrowFile::Read(int64_t nbytes, void *out) int ret = OB_SUCCESS; arrow::Result ret_code; int64_t read_size = -1; - if (OB_FAIL(file_reader_.pread(out, nbytes, position_, read_size))) { + if (file_prefetch_buffer_.in_prebuffer_range(position_, nbytes)) { + file_prefetch_buffer_.fetch(position_, nbytes, out); + position_ += nbytes; + ret_code = nbytes; + } else if (OB_FAIL(file_reader_.pread(out, nbytes, position_, read_size))) { LOG_WARN("fail to read file", K(ret), K(nbytes)); - ret_code = arrow::Result(arrow::Status(arrow::StatusCode::IOError, "read file failed")); + ret_code = + arrow::Result(arrow::Status(arrow::StatusCode::IOError, "read file failed")); } else { position_ += read_size; ret_code = read_size; @@ -166,10 +170,14 @@ arrow::Result ObArrowFile::ReadAt(int64_t position, int64_t nbytes, voi int ret = OB_SUCCESS; arrow::Result ret_code; int64_t read_size = -1; - - if (OB_FAIL(file_reader_.pread(out, nbytes, position, read_size))) { + if (file_prefetch_buffer_.in_prebuffer_range(position, nbytes)) { + file_prefetch_buffer_.fetch(position, nbytes, out); + position_ = position + nbytes; + ret_code = nbytes; + } else if (OB_FAIL(file_reader_.pread(out, nbytes, position, read_size))) { LOG_WARN("fail to read file", K(ret), K(position), K(nbytes)); - ret_code = arrow::Result(arrow::Status(arrow::StatusCode::IOError, "read at file failed")); + ret_code = + arrow::Result(arrow::Status(arrow::StatusCode::IOError, "read at file failed")); } else { position_ = position + read_size; ret_code = read_size; diff --git a/src/sql/engine/basic/ob_arrow_basic.h b/src/sql/engine/basic/ob_arrow_basic.h index e605529df8..c838380e5e 100644 --- a/src/sql/engine/basic/ob_arrow_basic.h +++ b/src/sql/engine/basic/ob_arrow_basic.h @@ -23,6 +23,7 @@ #include "share/ob_device_manager.h" #include "sql/engine/table/ob_external_table_access_service.h" #include "sql/engine/basic/ob_select_into_basic.h" +#include "sql/engine/table/ob_file_prefetch_buffer.h" namespace oceanbase { @@ -101,8 +102,10 @@ private: class ObArrowFile : public arrow::io::RandomAccessFile { public: - ObArrowFile(ObExternalDataAccessDriver &file_reader, const char*file_name, arrow::MemoryPool *pool) - : file_reader_(file_reader), file_name_(file_name), pool_(pool) + ObArrowFile(ObExternalDataAccessDriver &file_reader, const char *file_name, + arrow::MemoryPool *pool, ObFilePrefetchBuffer &file_prefetch_buffer) : + file_reader_(file_reader), + file_name_(file_name), pool_(pool), file_prefetch_buffer_(file_prefetch_buffer) {} ~ObArrowFile() override { file_reader_.close(); @@ -127,6 +130,7 @@ private: const char* file_name_; arrow::MemoryPool *pool_; int64_t position_; + ObFilePrefetchBuffer &file_prefetch_buffer_; }; class ObParquetOutputStream : public arrow::io::OutputStream diff --git a/src/sql/engine/table/ob_file_prefetch_buffer.cpp b/src/sql/engine/table/ob_file_prefetch_buffer.cpp new file mode 100644 index 0000000000..188bca08fa --- /dev/null +++ b/src/sql/engine/table/ob_file_prefetch_buffer.cpp @@ -0,0 +1,100 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SQL_ENG +#include "ob_file_prefetch_buffer.h" +#include "sql/engine/ob_exec_context.h" + +namespace oceanbase +{ +using namespace share::schema; +using namespace common; +using namespace share; +namespace sql { + +void ObFilePrefetchBuffer::destroy() +{ + if (nullptr != buffer_) { + alloc_.free(buffer_); + buffer_ = nullptr; + } + offset_ = 0; + length_ = 0; + buffer_size_ = 0; +} + +void ObFilePrefetchBuffer::clear() +{ + offset_ = 0; + length_ = 0; +} + +int ObFilePrefetchBuffer::prefetch(const int64_t file_offset, const int64_t size) +{ + int ret = OB_SUCCESS; + int64_t max_prebuffer_size = GCONF._parquet_row_group_prebuffer_size; + offset_ = 0; + length_ = 0; + if (size > max_prebuffer_size) { + // do nothing + LOG_TRACE("exceeding the maximum prefetch size", K(size), K(max_prebuffer_size)); + } else { + void *buffer = nullptr; + int64_t read_size = 0; + if (size > buffer_size_) { + alloc_.free(buffer_); + buffer_ = nullptr; + buffer_size_ = 0; + } + if (nullptr == buffer_) { + if (OB_ISNULL(buffer = alloc_.alloc(size))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to alloc prefetch buffer", K(ret), K(size)); + } else { + buffer_ = buffer; + buffer_size_ = size; + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(file_reader_.pread(buffer_, size, file_offset, read_size))) { + LOG_WARN("fail to read file", K(ret), K(size)); + } else if (size != read_size) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected read size", K(size), K(read_size)); + } else { + offset_ = file_offset; + length_ = size; + } + LOG_INFO("success prefetch", K(ret), K(file_offset), K(size)); + } + return ret; +} + +bool ObFilePrefetchBuffer::in_prebuffer_range(const int64_t position, const int64_t nbytes) +{ + bool in_range = true; + if (OB_UNLIKELY(nullptr == buffer_) || position < offset_ + || position + nbytes > offset_ + length_) { + in_range = false; + LOG_TRACE("out of prebuffer range", K(position), K(nbytes), K(offset_), K(length_)); + } + return in_range; +} + + +void ObFilePrefetchBuffer::fetch(const int64_t position, const int64_t nbytes, void *out) +{ + MEMCPY(out, (char *)buffer_ + (position - offset_), nbytes); +} + +} +} diff --git a/src/sql/engine/table/ob_file_prefetch_buffer.h b/src/sql/engine/table/ob_file_prefetch_buffer.h new file mode 100644 index 0000000000..4ea9343a11 --- /dev/null +++ b/src/sql/engine/table/ob_file_prefetch_buffer.h @@ -0,0 +1,51 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OB_FILE_PREFETCH_BUFFER_H +#define OB_FILE_PREFETCH_BUFFER_H + +#include "share/ob_i_tablet_scan.h" +#include "lib/file/ob_file.h" +#include "sql/engine/table/ob_external_table_access_service.h" + +namespace oceanbase { +namespace sql { +class ObFilePrefetchBuffer +{ +public: + ObFilePrefetchBuffer(ObExternalDataAccessDriver &file_reader) : + offset_(0), length_(0), buffer_size_(0), buffer_(nullptr), file_reader_(file_reader), + alloc_(common::ObMemAttr(MTL_ID(), "PrefetchBuffer")) + {} + ~ObFilePrefetchBuffer() + { + destroy(); + } + void clear(); + void destroy(); + int prefetch(const int64_t file_offset, const int64_t size); + bool in_prebuffer_range(const int64_t position, const int64_t nbytes); + // NOTE: before calling, make sure it is within the buffer range. + void fetch(const int64_t position, const int64_t nbytes, void *out); + +private: + int64_t offset_; + int64_t length_; + int64_t buffer_size_; + void *buffer_; + ObExternalDataAccessDriver &file_reader_; + common::ObMalloc alloc_; +}; +} +} + +#endif // OB_FILE_PREFETCH_BUFFER_H diff --git a/src/sql/engine/table/ob_parquet_table_row_iter.cpp b/src/sql/engine/table/ob_parquet_table_row_iter.cpp index b61a751097..3c04ff3dc5 100644 --- a/src/sql/engine/table/ob_parquet_table_row_iter.cpp +++ b/src/sql/engine/table/ob_parquet_table_row_iter.cpp @@ -42,6 +42,7 @@ ObParquetTableRowIterator::~ObParquetTableRowIterator() for (int i = 0; i < column_readers_.count(); i++) { column_readers_.at(i) = NULL; } + file_prefetch_buffer_.destroy(); } int ObParquetTableRowIterator::init(const storage::ObTableScanParam *scan_param) { @@ -168,8 +169,8 @@ int ObParquetTableRowIterator::next_file() try { file_meta_.reset(); file_reader_.reset(); - std::shared_ptr cur_file = - std::make_shared(data_access_driver_, url_.ptr(), &arrow_alloc_); + std::shared_ptr cur_file = std::make_shared( + data_access_driver_, url_.ptr(), &arrow_alloc_, file_prefetch_buffer_); OZ (cur_file.get()->open()); if (OB_SUCC(ret)) { file_reader_ = parquet::ParquetFileReader::Open(cur_file, read_props_); @@ -264,19 +265,23 @@ int ObParquetTableRowIterator::next_row_group() } if (OB_SUCC(ret)) { int64_t cur_row_group = (state_.cur_row_group_idx_++) - 1; - try { - std::shared_ptr rg_reader = file_reader_->RowGroup(cur_row_group); - state_.cur_row_group_read_row_count_ = 0; - state_.cur_row_group_row_count_ = file_meta_->RowGroup(cur_row_group)->num_rows(); - for (int i = 0; OB_SUCC(ret) && i < column_indexs_.count(); i++) { - column_readers_.at(i) = rg_reader->Column(column_indexs_.at(i)); + if (OB_FAIL(prefetch_parquet_row_group(file_meta_->RowGroup(cur_row_group)))) { + LOG_WARN("failed to prefetch parquet row group", K(ret)); + } else { + try { + std::shared_ptr rg_reader = file_reader_->RowGroup(cur_row_group); + state_.cur_row_group_read_row_count_ = 0; + state_.cur_row_group_row_count_ = file_meta_->RowGroup(cur_row_group)->num_rows(); + for (int i = 0; OB_SUCC(ret) && i < column_indexs_.count(); i++) { + column_readers_.at(i) = rg_reader->Column(column_indexs_.at(i)); + } + } catch(const std::exception& e) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected index", K(ret), "Info", e.what(), K(cur_row_group), K(column_indexs_)); + } catch(...) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected index", K(ret), K(cur_row_group), K(column_indexs_)); } - } catch(const std::exception& e) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected index", K(ret), "Info", e.what(), K(cur_row_group), K(column_indexs_)); - } catch(...) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected index", K(ret), K(cur_row_group), K(column_indexs_)); } } return ret; @@ -1452,6 +1457,24 @@ int ObParquetTableRowIterator::get_next_row() void ObParquetTableRowIterator::reset() { // reset state_ to initial values for rescan state_.reuse(); + file_prefetch_buffer_.destroy(); +} + +int ObParquetTableRowIterator::prefetch_parquet_row_group( + std::unique_ptr row_group_meta) +{ + int ret = OB_SUCCESS; + int64_t select_col_cnt = column_exprs_.count(); + const double MIN_SELECTION_RATE_THRESHOLD = 0.8; + file_prefetch_buffer_.clear(); + if (select_col_cnt / row_group_meta->num_columns() >= MIN_SELECTION_RATE_THRESHOLD) { + if (OB_FAIL(file_prefetch_buffer_.prefetch(row_group_meta->file_offset(), + row_group_meta->total_compressed_size()))) { + LOG_WARN("failed to prefetch from parquet file", K(row_group_meta->file_offset()), + K(row_group_meta->total_compressed_size())); + } + } + return ret; } DEF_TO_STRING(ObParquetIteratorState) diff --git a/src/sql/engine/table/ob_parquet_table_row_iter.h b/src/sql/engine/table/ob_parquet_table_row_iter.h index 609c85fed9..c5cbf4d1d4 100644 --- a/src/sql/engine/table/ob_parquet_table_row_iter.h +++ b/src/sql/engine/table/ob_parquet_table_row_iter.h @@ -63,7 +63,8 @@ public: read_props_(&arrow_alloc_), file_column_exprs_(allocator_), file_meta_column_exprs_(allocator_), - bit_vector_cache_(NULL) {} + bit_vector_cache_(NULL), + file_prefetch_buffer_(data_access_driver_) {} virtual ~ObParquetTableRowIterator(); int init(const storage::ObTableScanParam *scan_param) override; @@ -147,6 +148,7 @@ private: int next_file(); int next_row_group(); int calc_pseudo_exprs(const int64_t read_count); + int prefetch_parquet_row_group(std::unique_ptr row_group_meta); private: ObParquetIteratorState state_; lib::ObMemAttr mem_attr_; @@ -168,6 +170,7 @@ private: common::ObArrayWrap rep_levels_buf_; common::ObArrayWrap file_url_ptrs_; //for file url expr common::ObArrayWrap file_url_lens_; //for file url expr + ObFilePrefetchBuffer file_prefetch_buffer_; }; } diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index 2bd89c96bc..cf654c018b 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -474,6 +474,7 @@ _parallel_max_active_sessions _parallel_min_message_pool _parallel_redo_logging_trigger _parallel_server_sleep_time +_parquet_row_group_prebuffer_size _partition_wise_plan_enabled _pdml_thread_cache_size _pipelined_table_function_memory_limit