[Optimize] Optimize cpu consumption when importing parquet files (#6782)

Remove part of dynamic_cast, reduce the overhead caused by type conversion,
and probably reduce the cpu consumption of parquet file import by about 10%
This commit is contained in:
Zhengguo Yang
2021-10-03 12:14:35 +08:00
committed by GitHub
parent fb7fc27a0a
commit 7297b275f1
10 changed files with 64 additions and 59 deletions

View File

@ -363,7 +363,7 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) {
}
std::vector<ExtLiteral> in_pred_values;
const InPredicate* pred = dynamic_cast<const InPredicate*>(conjunct);
const InPredicate* pred = static_cast<const InPredicate*>(conjunct);
const Expr* expr = Expr::expr_without_cast(pred->get_child(0));
if (expr->node_type() != TExprNodeType::SLOT_REF) {
return Status::InternalError("build disjuncts failed: node type is not slot ref");

View File

@ -471,7 +471,7 @@ bool EsScanNode::get_disjuncts(ExprContext* context, Expr* conjunct,
}
TExtInPredicate ext_in_predicate;
std::vector<TExtLiteral> in_pred_values;
InPredicate* pred = dynamic_cast<InPredicate*>(conjunct);
InPredicate* pred = static_cast<InPredicate*>(conjunct);
ext_in_predicate.__set_is_not_in(pred->is_not_in());
if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
return false;
@ -612,7 +612,8 @@ bool EsScanNode::to_ext_literal(PrimitiveType slot_type, void* value, TExtLitera
case TYPE_LARGEINT: {
node_type = (TExprNodeType::LARGE_INT_LITERAL);
TLargeIntLiteral large_int_literal;
large_int_literal.__set_value(LargeIntValue::to_string(*reinterpret_cast<__int128*>(value)));
large_int_literal.__set_value(
LargeIntValue::to_string(*reinterpret_cast<__int128*>(value)));
literal->__set_large_int_literal(large_int_literal);
break;
}

View File

@ -372,11 +372,11 @@ Status OlapScanNode::close(RuntimeState* state) {
_row_batch_added_cv.notify_all();
_scan_batch_added_cv.notify_all();
// _transfer_thread
// _transfer_thread may not be initialized. So need to check it
if (_transfer_thread != nullptr) {
_transfer_thread->join();
}
// _transfer_thread
// _transfer_thread may not be initialized. So need to check it
if (_transfer_thread != nullptr) {
_transfer_thread->join();
}
// clear some row batch in queue
for (auto row_batch : _materialized_row_batches) {
@ -621,7 +621,7 @@ Status OlapScanNode::normalize_conjuncts() {
Status OlapScanNode::build_olap_filters() {
for (auto& iter : _column_value_ranges) {
std::vector<TCondition> filters;
std::visit([&](auto &&range) { range.to_olap_filter(filters); }, iter.second);
std::visit([&](auto&& range) { range.to_olap_filter(filters); }, iter.second);
for (const auto& filter : filters) {
_olap_filter.push_back(std::move(filter));
@ -646,7 +646,9 @@ Status OlapScanNode::build_scan_key() {
break;
}
RETURN_IF_ERROR(std::visit([&](auto &&range) { return _scan_keys.extend_scan_key(range, _max_scan_key_num); }, iter->second));
RETURN_IF_ERROR(std::visit(
[&](auto&& range) { return _scan_keys.extend_scan_key(range, _max_scan_key_num); },
iter->second));
}
VLOG_CRITICAL << _scan_keys.debug_string();
@ -978,7 +980,7 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot,
// 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
if (TExprOpcode::FILTER_IN == _conjunct_ctxs[conj_idx]->root()->op()) {
InPredicate* pred = dynamic_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
InPredicate* pred = static_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
if (!should_push_down_in_predicate(slot, pred)) {
continue;
}
@ -1061,7 +1063,7 @@ Status OlapScanNode::normalize_not_in_and_not_eq_predicate(SlotDescriptor* slot,
for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
// 1. Normalize in conjuncts like 'where col not in (v1, v2, v3)'
if (TExprOpcode::FILTER_NOT_IN == _conjunct_ctxs[conj_idx]->root()->op()) {
InPredicate* pred = dynamic_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
InPredicate* pred = static_cast<InPredicate*>(_conjunct_ctxs[conj_idx]->root());
if (!should_push_down_in_predicate(slot, pred)) {
continue;
}

View File

@ -174,9 +174,9 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
bool* eof) {
if (_current_line_of_group >= _rows_of_group) { // read next row group
VLOG_DEBUG << "read_record_batch, current group id:" << _current_group
<< " current line of group:" << _current_line_of_group
<< " is larger than rows group size:" << _rows_of_group
<< ". start to read next row group";
<< " current line of group:" << _current_line_of_group
<< " is larger than rows group size:" << _rows_of_group
<< ". start to read next row group";
_current_group++;
if (_current_group >= _total_groups) { // read completed.
_parquet_column_ids.clear();
@ -199,9 +199,9 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
_current_line_of_batch = 0;
} else if (_current_line_of_batch >= _batch->num_rows()) {
VLOG_DEBUG << "read_record_batch, current group id:" << _current_group
<< " current line of batch:" << _current_line_of_batch
<< " is larger than batch size:" << _batch->num_rows()
<< ". start to read next batch";
<< " current line of batch:" << _current_line_of_batch
<< " is larger than batch size:" << _batch->num_rows()
<< ". start to read next batch";
arrow::Status status = _rb_batch->ReadNext(&_batch);
if (!status.ok()) {
return Status::InternalError("Read Batch Error With Libarrow.");
@ -213,7 +213,7 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array,
uint8_t* buf, int32_t* wbytes) {
const auto type = std::dynamic_pointer_cast<arrow::TimestampType>(ts_array->type());
const auto type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type());
// Doris only supports seconds
int64_t timestamp = 0;
switch (type->unit()) {
@ -264,7 +264,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
switch (_parquet_column_type[i]) {
case arrow::Type::type::STRING: {
auto str_array =
std::dynamic_pointer_cast<arrow::StringArray>(_batch->column(column_index));
std::static_pointer_cast<arrow::StringArray>(_batch->column(column_index));
if (str_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -275,7 +275,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::INT32: {
auto int32_array =
std::dynamic_pointer_cast<arrow::Int32Array>(_batch->column(column_index));
std::static_pointer_cast<arrow::Int32Array>(_batch->column(column_index));
if (int32_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -287,7 +287,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::INT64: {
auto int64_array =
std::dynamic_pointer_cast<arrow::Int64Array>(_batch->column(column_index));
std::static_pointer_cast<arrow::Int64Array>(_batch->column(column_index));
if (int64_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -299,7 +299,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::UINT32: {
auto uint32_array =
std::dynamic_pointer_cast<arrow::UInt32Array>(_batch->column(column_index));
std::static_pointer_cast<arrow::UInt32Array>(_batch->column(column_index));
if (uint32_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -311,7 +311,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::UINT64: {
auto uint64_array =
std::dynamic_pointer_cast<arrow::UInt64Array>(_batch->column(column_index));
std::static_pointer_cast<arrow::UInt64Array>(_batch->column(column_index));
if (uint64_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -323,7 +323,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::BINARY: {
auto str_array =
std::dynamic_pointer_cast<arrow::BinaryArray>(_batch->column(column_index));
std::static_pointer_cast<arrow::BinaryArray>(_batch->column(column_index));
if (str_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -333,7 +333,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
break;
}
case arrow::Type::type::FIXED_SIZE_BINARY: {
auto fixed_array = std::dynamic_pointer_cast<arrow::FixedSizeBinaryArray>(
auto fixed_array = std::static_pointer_cast<arrow::FixedSizeBinaryArray>(
_batch->column(column_index));
if (fixed_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
@ -344,8 +344,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
break;
}
case arrow::Type::type::BOOL: {
auto boolean_array = std::dynamic_pointer_cast<arrow::BooleanArray>(
_batch->column(column_index));
auto boolean_array =
std::static_pointer_cast<arrow::BooleanArray>(_batch->column(column_index));
if (boolean_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -360,7 +360,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::UINT8: {
auto uint8_array =
std::dynamic_pointer_cast<arrow::UInt8Array>(_batch->column(column_index));
std::static_pointer_cast<arrow::UInt8Array>(_batch->column(column_index));
if (uint8_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -372,7 +372,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::INT8: {
auto int8_array =
std::dynamic_pointer_cast<arrow::Int8Array>(_batch->column(column_index));
std::static_pointer_cast<arrow::Int8Array>(_batch->column(column_index));
if (int8_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -384,7 +384,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::UINT16: {
auto uint16_array =
std::dynamic_pointer_cast<arrow::UInt16Array>(_batch->column(column_index));
std::static_pointer_cast<arrow::UInt16Array>(_batch->column(column_index));
if (uint16_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -396,7 +396,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::INT16: {
auto int16_array =
std::dynamic_pointer_cast<arrow::Int16Array>(_batch->column(column_index));
std::static_pointer_cast<arrow::Int16Array>(_batch->column(column_index));
if (int16_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -407,7 +407,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
break;
}
case arrow::Type::type::HALF_FLOAT: {
auto half_float_array = std::dynamic_pointer_cast<arrow::HalfFloatArray>(
auto half_float_array = std::static_pointer_cast<arrow::HalfFloatArray>(
_batch->column(column_index));
if (half_float_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
@ -420,7 +420,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::FLOAT: {
auto float_array =
std::dynamic_pointer_cast<arrow::FloatArray>(_batch->column(column_index));
std::static_pointer_cast<arrow::FloatArray>(_batch->column(column_index));
if (float_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -435,7 +435,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::DOUBLE: {
auto double_array =
std::dynamic_pointer_cast<arrow::DoubleArray>(_batch->column(column_index));
std::static_pointer_cast<arrow::DoubleArray>(_batch->column(column_index));
if (double_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -446,7 +446,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
break;
}
case arrow::Type::type::TIMESTAMP: {
auto ts_array = std::dynamic_pointer_cast<arrow::TimestampArray>(
auto ts_array = std::static_pointer_cast<arrow::TimestampArray>(
_batch->column(column_index));
if (ts_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
@ -458,8 +458,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
break;
}
case arrow::Type::type::DECIMAL: {
auto decimal_array = std::dynamic_pointer_cast<arrow::DecimalArray>(
_batch->column(column_index));
auto decimal_array =
std::static_pointer_cast<arrow::DecimalArray>(_batch->column(column_index));
if (decimal_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -471,7 +471,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::DATE32: {
auto ts_array =
std::dynamic_pointer_cast<arrow::Date32Array>(_batch->column(column_index));
std::static_pointer_cast<arrow::Date32Array>(_batch->column(column_index));
if (ts_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
@ -487,7 +487,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
case arrow::Type::type::DATE64: {
auto ts_array =
std::dynamic_pointer_cast<arrow::Date64Array>(_batch->column(column_index));
std::static_pointer_cast<arrow::Date64Array>(_batch->column(column_index));
if (ts_array->IsNull(_current_line_of_batch)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {

View File

@ -95,7 +95,7 @@ private:
std::shared_ptr<ParquetFile> _parquet;
// parquet file reader object
std::shared_ptr<::arrow::RecordBatchReader> _rb_batch;
std::unique_ptr<::arrow::RecordBatchReader> _rb_batch;
std::shared_ptr<arrow::RecordBatch> _batch;
std::unique_ptr<parquet::arrow::FileReader> _reader;
std::shared_ptr<parquet::FileMetaData> _file_metadata;

View File

@ -138,8 +138,7 @@ OLAPStatus CollectIterator::next(const RowCursor** row, bool* delete_flag) {
CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader, Reader* reader)
: _rs_reader(rs_reader), _is_delete(rs_reader->delete_flag()), _reader(reader) {
auto* ans = dynamic_cast<BetaRowsetReader*>(rs_reader.get());
if (LIKELY(ans != nullptr)) {
if (LIKELY(rs_reader->type() == RowsetReader::BETA)) {
_refresh_current_row = &Level0Iterator::_refresh_current_row_v2;
} else {
_refresh_current_row = &Level0Iterator::_refresh_current_row_v1;
@ -149,13 +148,9 @@ CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader,
CollectIterator::Level0Iterator::~Level0Iterator() {}
OLAPStatus CollectIterator::Level0Iterator::init() {
auto res = _row_cursor.init(_reader->_tablet->tablet_schema(), _reader->_seek_columns);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to init row cursor, res=" << res;
return res;
}
RETURN_NOT_OK((this->*_refresh_current_row)());
return OLAP_SUCCESS;
RETURN_NOT_OK_LOG(_row_cursor.init(_reader->_tablet->tablet_schema(), _reader->_seek_columns),
"failed to init row cursor");
return (this->*_refresh_current_row)();
}
const RowCursor* CollectIterator::Level0Iterator::current_row(bool* delete_flag) const {

View File

@ -75,6 +75,8 @@ public:
int64_t filtered_rows() override;
RowsetReaderType type() const override { return RowsetReaderType::ALPHA; }
private:
OLAPStatus _init_merge_ctxs(RowsetReaderContext* read_context);

View File

@ -53,6 +53,8 @@ public:
return _stats->rows_del_filtered + _stats->rows_conditions_filtered;
}
RowsetReaderType type() const override { return RowsetReaderType::BETA; }
private:
RowsetReaderContext* _context;
BetaRowsetSharedPtr _rowset;

View File

@ -32,6 +32,7 @@ using RowsetReaderSharedPtr = std::shared_ptr<RowsetReader>;
class RowsetReader {
public:
enum RowsetReaderType { ALPHA, BETA };
virtual ~RowsetReader() {}
// reader init
@ -53,6 +54,8 @@ public:
virtual RowsetSharedPtr rowset() = 0;
virtual int64_t filtered_rows() = 0;
virtual RowsetReaderType type() const = 0;
};
} // namespace doris

View File

@ -154,8 +154,9 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
return PageIO::read_and_decompress_page(opts, handle, page_body, footer);
}
Status ColumnReader::get_row_ranges_by_zone_map(
CondColumn* cond_column, CondColumn* delete_condition, RowRanges* row_ranges) {
Status ColumnReader::get_row_ranges_by_zone_map(CondColumn* cond_column,
CondColumn* delete_condition,
RowRanges* row_ranges) {
RETURN_IF_ERROR(_ensure_index_loaded());
std::vector<uint32_t> page_indexes;
@ -211,9 +212,8 @@ bool ColumnReader::_zone_map_match_condition(const ZoneMapPB& zone_map,
return cond->eval({min_value_container, max_value_container});
}
Status ColumnReader::_get_filtered_pages(
CondColumn* cond_column, CondColumn* delete_condition,
std::vector<uint32_t>* page_indexes) {
Status ColumnReader::_get_filtered_pages(CondColumn* cond_column, CondColumn* delete_condition,
std::vector<uint32_t>* page_indexes) {
FieldType type = _type_info->type();
const std::vector<ZoneMapPB>& zone_maps = _zone_map_index->page_zone_maps();
int32_t page_size = _zone_map_index->num_pages();
@ -392,7 +392,7 @@ Status ArrayFileColumnIterator::init(const ColumnIteratorOptions& opts) {
Status ArrayFileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) {
ColumnBlock* array_block = dst->column_block();
auto* array_batch = dynamic_cast<ArrayColumnVectorBatch*>(array_block->vector_batch());
auto* array_batch = static_cast<ArrayColumnVectorBatch*>(array_block->vector_batch());
// 1. read n offsets
ColumnBlock offset_block(array_batch->offsets(), nullptr);
@ -630,8 +630,8 @@ Status FileColumnIterator::get_row_ranges_by_zone_map(CondColumn* cond_column,
CondColumn* delete_condition,
RowRanges* row_ranges) {
if (_reader->has_zone_map()) {
RETURN_IF_ERROR(_reader->get_row_ranges_by_zone_map(
cond_column, delete_condition, row_ranges));
RETURN_IF_ERROR(
_reader->get_row_ranges_by_zone_map(cond_column, delete_condition, row_ranges));
}
return Status::OK();
}
@ -668,7 +668,7 @@ Status DefaultValueColumnIterator::init(const ColumnIteratorOptions& opts) {
} else if (_type_info->type() == OLAP_FIELD_TYPE_VARCHAR ||
_type_info->type() == OLAP_FIELD_TYPE_HLL ||
_type_info->type() == OLAP_FIELD_TYPE_OBJECT ||
_type_info->type() == OLAP_FIELD_TYPE_STRING) {
_type_info->type() == OLAP_FIELD_TYPE_STRING) {
int32_t length = _default_value.length();
char* string_buffer = reinterpret_cast<char*>(_pool->allocate(length));
memory_copy(string_buffer, _default_value.c_str(), length);