From 7297b275f13b021daf9be00e7bcdabc452b6cb4b Mon Sep 17 00:00:00 2001 From: Zhengguo Yang Date: Sun, 3 Oct 2021 12:14:35 +0800 Subject: [PATCH] [Optimize] Optimize cpu consumption when importing parquet files (#6782) Remove part of dynamic_cast, reduce the overhead caused by type conversion, and probably reduce the cpu consumption of parquet file import by about 10% --- be/src/exec/es/es_predicate.cpp | 2 +- be/src/exec/es_scan_node.cpp | 5 +- be/src/exec/olap_scan_node.cpp | 20 ++++--- be/src/exec/parquet_reader.cpp | 56 +++++++++---------- be/src/exec/parquet_reader.h | 2 +- be/src/olap/collect_iterator.cpp | 13 ++--- be/src/olap/rowset/alpha_rowset_reader.h | 2 + be/src/olap/rowset/beta_rowset_reader.h | 2 + be/src/olap/rowset/rowset_reader.h | 3 + .../olap/rowset/segment_v2/column_reader.cpp | 18 +++--- 10 files changed, 64 insertions(+), 59 deletions(-) diff --git a/be/src/exec/es/es_predicate.cpp b/be/src/exec/es/es_predicate.cpp index 72eb6faa3f..170c0e9cc1 100644 --- a/be/src/exec/es/es_predicate.cpp +++ b/be/src/exec/es/es_predicate.cpp @@ -363,7 +363,7 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) { } std::vector in_pred_values; - const InPredicate* pred = dynamic_cast(conjunct); + const InPredicate* pred = static_cast(conjunct); const Expr* expr = Expr::expr_without_cast(pred->get_child(0)); if (expr->node_type() != TExprNodeType::SLOT_REF) { return Status::InternalError("build disjuncts failed: node type is not slot ref"); diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp index 563bb25b07..b40ad099f6 100644 --- a/be/src/exec/es_scan_node.cpp +++ b/be/src/exec/es_scan_node.cpp @@ -471,7 +471,7 @@ bool EsScanNode::get_disjuncts(ExprContext* context, Expr* conjunct, } TExtInPredicate ext_in_predicate; std::vector in_pred_values; - InPredicate* pred = dynamic_cast(conjunct); + InPredicate* pred = static_cast(conjunct); ext_in_predicate.__set_is_not_in(pred->is_not_in()); if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { return false; @@ -612,7 +612,8 @@ bool EsScanNode::to_ext_literal(PrimitiveType slot_type, void* value, TExtLitera case TYPE_LARGEINT: { node_type = (TExprNodeType::LARGE_INT_LITERAL); TLargeIntLiteral large_int_literal; - large_int_literal.__set_value(LargeIntValue::to_string(*reinterpret_cast<__int128*>(value))); + large_int_literal.__set_value( + LargeIntValue::to_string(*reinterpret_cast<__int128*>(value))); literal->__set_large_int_literal(large_int_literal); break; } diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 2871b29dee..3dc69024be 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -372,11 +372,11 @@ Status OlapScanNode::close(RuntimeState* state) { _row_batch_added_cv.notify_all(); _scan_batch_added_cv.notify_all(); - // _transfer_thread - // _transfer_thread may not be initialized. So need to check it - if (_transfer_thread != nullptr) { - _transfer_thread->join(); - } + // _transfer_thread + // _transfer_thread may not be initialized. So need to check it + if (_transfer_thread != nullptr) { + _transfer_thread->join(); + } // clear some row batch in queue for (auto row_batch : _materialized_row_batches) { @@ -621,7 +621,7 @@ Status OlapScanNode::normalize_conjuncts() { Status OlapScanNode::build_olap_filters() { for (auto& iter : _column_value_ranges) { std::vector filters; - std::visit([&](auto &&range) { range.to_olap_filter(filters); }, iter.second); + std::visit([&](auto&& range) { range.to_olap_filter(filters); }, iter.second); for (const auto& filter : filters) { _olap_filter.push_back(std::move(filter)); @@ -646,7 +646,9 @@ Status OlapScanNode::build_scan_key() { break; } - RETURN_IF_ERROR(std::visit([&](auto &&range) { return _scan_keys.extend_scan_key(range, _max_scan_key_num); }, iter->second)); + RETURN_IF_ERROR(std::visit( + [&](auto&& range) { return _scan_keys.extend_scan_key(range, _max_scan_key_num); }, + iter->second)); } VLOG_CRITICAL << _scan_keys.debug_string(); @@ -978,7 +980,7 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' if (TExprOpcode::FILTER_IN == _conjunct_ctxs[conj_idx]->root()->op()) { - InPredicate* pred = dynamic_cast(_conjunct_ctxs[conj_idx]->root()); + InPredicate* pred = static_cast(_conjunct_ctxs[conj_idx]->root()); if (!should_push_down_in_predicate(slot, pred)) { continue; } @@ -1061,7 +1063,7 @@ Status OlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot, for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { // 1. Normalize in conjuncts like 'where col not in (v1, v2, v3)' if (TExprOpcode::FILTER_NOT_IN == _conjunct_ctxs[conj_idx]->root()->op()) { - InPredicate* pred = dynamic_cast(_conjunct_ctxs[conj_idx]->root()); + InPredicate* pred = static_cast(_conjunct_ctxs[conj_idx]->root()); if (!should_push_down_in_predicate(slot, pred)) { continue; } diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index e84b833a52..f994c343f6 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -174,9 +174,9 @@ Status ParquetReaderWrap::read_record_batch(const std::vector& bool* eof) { if (_current_line_of_group >= _rows_of_group) { // read next row group VLOG_DEBUG << "read_record_batch, current group id:" << _current_group - << " current line of group:" << _current_line_of_group - << " is larger than rows group size:" << _rows_of_group - << ". start to read next row group"; + << " current line of group:" << _current_line_of_group + << " is larger than rows group size:" << _rows_of_group + << ". start to read next row group"; _current_group++; if (_current_group >= _total_groups) { // read completed. _parquet_column_ids.clear(); @@ -199,9 +199,9 @@ Status ParquetReaderWrap::read_record_batch(const std::vector& _current_line_of_batch = 0; } else if (_current_line_of_batch >= _batch->num_rows()) { VLOG_DEBUG << "read_record_batch, current group id:" << _current_group - << " current line of batch:" << _current_line_of_batch - << " is larger than batch size:" << _batch->num_rows() - << ". start to read next batch"; + << " current line of batch:" << _current_line_of_batch + << " is larger than batch size:" << _batch->num_rows() + << ". start to read next batch"; arrow::Status status = _rb_batch->ReadNext(&_batch); if (!status.ok()) { return Status::InternalError("Read Batch Error With Libarrow."); @@ -213,7 +213,7 @@ Status ParquetReaderWrap::read_record_batch(const std::vector& Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr& ts_array, uint8_t* buf, int32_t* wbytes) { - const auto type = std::dynamic_pointer_cast(ts_array->type()); + const auto type = std::static_pointer_cast(ts_array->type()); // Doris only supports seconds int64_t timestamp = 0; switch (type->unit()) { @@ -264,7 +264,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& switch (_parquet_column_type[i]) { case arrow::Type::type::STRING: { auto str_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (str_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -275,7 +275,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::INT32: { auto int32_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (int32_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -287,7 +287,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::INT64: { auto int64_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (int64_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -299,7 +299,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::UINT32: { auto uint32_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (uint32_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -311,7 +311,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::UINT64: { auto uint64_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (uint64_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -323,7 +323,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::BINARY: { auto str_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (str_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -333,7 +333,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& break; } case arrow::Type::type::FIXED_SIZE_BINARY: { - auto fixed_array = std::dynamic_pointer_cast( + auto fixed_array = std::static_pointer_cast( _batch->column(column_index)); if (fixed_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); @@ -344,8 +344,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& break; } case arrow::Type::type::BOOL: { - auto boolean_array = std::dynamic_pointer_cast( - _batch->column(column_index)); + auto boolean_array = + std::static_pointer_cast(_batch->column(column_index)); if (boolean_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -360,7 +360,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::UINT8: { auto uint8_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (uint8_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -372,7 +372,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::INT8: { auto int8_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (int8_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -384,7 +384,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::UINT16: { auto uint16_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (uint16_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -396,7 +396,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::INT16: { auto int16_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (int16_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -407,7 +407,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& break; } case arrow::Type::type::HALF_FLOAT: { - auto half_float_array = std::dynamic_pointer_cast( + auto half_float_array = std::static_pointer_cast( _batch->column(column_index)); if (half_float_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); @@ -420,7 +420,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::FLOAT: { auto float_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (float_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -435,7 +435,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::DOUBLE: { auto double_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (double_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -446,7 +446,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& break; } case arrow::Type::type::TIMESTAMP: { - auto ts_array = std::dynamic_pointer_cast( + auto ts_array = std::static_pointer_cast( _batch->column(column_index)); if (ts_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); @@ -458,8 +458,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& break; } case arrow::Type::type::DECIMAL: { - auto decimal_array = std::dynamic_pointer_cast( - _batch->column(column_index)); + auto decimal_array = + std::static_pointer_cast(_batch->column(column_index)); if (decimal_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -471,7 +471,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::DATE32: { auto ts_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (ts_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { @@ -487,7 +487,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } case arrow::Type::type::DATE64: { auto ts_array = - std::dynamic_pointer_cast(_batch->column(column_index)); + std::static_pointer_cast(_batch->column(column_index)); if (ts_array->IsNull(_current_line_of_batch)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index e5366e00da..7f253a0b9e 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -95,7 +95,7 @@ private: std::shared_ptr _parquet; // parquet file reader object - std::shared_ptr<::arrow::RecordBatchReader> _rb_batch; + std::unique_ptr<::arrow::RecordBatchReader> _rb_batch; std::shared_ptr _batch; std::unique_ptr _reader; std::shared_ptr _file_metadata; diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp index 9e4b799eee..4eb6b1cb79 100644 --- a/be/src/olap/collect_iterator.cpp +++ b/be/src/olap/collect_iterator.cpp @@ -138,8 +138,7 @@ OLAPStatus CollectIterator::next(const RowCursor** row, bool* delete_flag) { CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader, Reader* reader) : _rs_reader(rs_reader), _is_delete(rs_reader->delete_flag()), _reader(reader) { - auto* ans = dynamic_cast(rs_reader.get()); - if (LIKELY(ans != nullptr)) { + if (LIKELY(rs_reader->type() == RowsetReader::BETA)) { _refresh_current_row = &Level0Iterator::_refresh_current_row_v2; } else { _refresh_current_row = &Level0Iterator::_refresh_current_row_v1; @@ -149,13 +148,9 @@ CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader, CollectIterator::Level0Iterator::~Level0Iterator() {} OLAPStatus CollectIterator::Level0Iterator::init() { - auto res = _row_cursor.init(_reader->_tablet->tablet_schema(), _reader->_seek_columns); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to init row cursor, res=" << res; - return res; - } - RETURN_NOT_OK((this->*_refresh_current_row)()); - return OLAP_SUCCESS; + RETURN_NOT_OK_LOG(_row_cursor.init(_reader->_tablet->tablet_schema(), _reader->_seek_columns), + "failed to init row cursor"); + return (this->*_refresh_current_row)(); } const RowCursor* CollectIterator::Level0Iterator::current_row(bool* delete_flag) const { diff --git a/be/src/olap/rowset/alpha_rowset_reader.h b/be/src/olap/rowset/alpha_rowset_reader.h index 1667449dcf..959a90bc88 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.h +++ b/be/src/olap/rowset/alpha_rowset_reader.h @@ -75,6 +75,8 @@ public: int64_t filtered_rows() override; + RowsetReaderType type() const override { return RowsetReaderType::ALPHA; } + private: OLAPStatus _init_merge_ctxs(RowsetReaderContext* read_context); diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index 2e3f157ccd..f073a6540e 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -53,6 +53,8 @@ public: return _stats->rows_del_filtered + _stats->rows_conditions_filtered; } + RowsetReaderType type() const override { return RowsetReaderType::BETA; } + private: RowsetReaderContext* _context; BetaRowsetSharedPtr _rowset; diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index 94e0c03d1c..b9e46d1118 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -32,6 +32,7 @@ using RowsetReaderSharedPtr = std::shared_ptr; class RowsetReader { public: + enum RowsetReaderType { ALPHA, BETA }; virtual ~RowsetReader() {} // reader init @@ -53,6 +54,8 @@ public: virtual RowsetSharedPtr rowset() = 0; virtual int64_t filtered_rows() = 0; + + virtual RowsetReaderType type() const = 0; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index 4c26e88a28..b00c443d47 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -154,8 +154,9 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag return PageIO::read_and_decompress_page(opts, handle, page_body, footer); } -Status ColumnReader::get_row_ranges_by_zone_map( - CondColumn* cond_column, CondColumn* delete_condition, RowRanges* row_ranges) { +Status ColumnReader::get_row_ranges_by_zone_map(CondColumn* cond_column, + CondColumn* delete_condition, + RowRanges* row_ranges) { RETURN_IF_ERROR(_ensure_index_loaded()); std::vector page_indexes; @@ -211,9 +212,8 @@ bool ColumnReader::_zone_map_match_condition(const ZoneMapPB& zone_map, return cond->eval({min_value_container, max_value_container}); } -Status ColumnReader::_get_filtered_pages( - CondColumn* cond_column, CondColumn* delete_condition, - std::vector* page_indexes) { +Status ColumnReader::_get_filtered_pages(CondColumn* cond_column, CondColumn* delete_condition, + std::vector* page_indexes) { FieldType type = _type_info->type(); const std::vector& zone_maps = _zone_map_index->page_zone_maps(); int32_t page_size = _zone_map_index->num_pages(); @@ -392,7 +392,7 @@ Status ArrayFileColumnIterator::init(const ColumnIteratorOptions& opts) { Status ArrayFileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) { ColumnBlock* array_block = dst->column_block(); - auto* array_batch = dynamic_cast(array_block->vector_batch()); + auto* array_batch = static_cast(array_block->vector_batch()); // 1. read n offsets ColumnBlock offset_block(array_batch->offsets(), nullptr); @@ -630,8 +630,8 @@ Status FileColumnIterator::get_row_ranges_by_zone_map(CondColumn* cond_column, CondColumn* delete_condition, RowRanges* row_ranges) { if (_reader->has_zone_map()) { - RETURN_IF_ERROR(_reader->get_row_ranges_by_zone_map( - cond_column, delete_condition, row_ranges)); + RETURN_IF_ERROR( + _reader->get_row_ranges_by_zone_map(cond_column, delete_condition, row_ranges)); } return Status::OK(); } @@ -668,7 +668,7 @@ Status DefaultValueColumnIterator::init(const ColumnIteratorOptions& opts) { } else if (_type_info->type() == OLAP_FIELD_TYPE_VARCHAR || _type_info->type() == OLAP_FIELD_TYPE_HLL || _type_info->type() == OLAP_FIELD_TYPE_OBJECT || - _type_info->type() == OLAP_FIELD_TYPE_STRING) { + _type_info->type() == OLAP_FIELD_TYPE_STRING) { int32_t length = _default_value.length(); char* string_buffer = reinterpret_cast(_pool->allocate(length)); memory_copy(string_buffer, _default_value.c_str(), length);