From 573e5476ddcdaab943371529f2a3279082d746b2 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Wed, 31 Aug 2022 16:23:36 +0800 Subject: [PATCH] [Opt](load) Speed up the vectorized load (#12146) * [Opt](load) Speed up the vectorized load --- be/src/exec/base_scanner.cpp | 33 +++++++++++------ be/src/exec/base_scanner.h | 1 + be/src/exec/text_converter.hpp | 22 ++++++------ be/src/vec/core/block.cpp | 5 ++- be/src/vec/core/block.h | 2 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 4 +-- be/src/vec/exec/vbroker_scanner.cpp | 15 ++------ be/src/vec/functions/function_cast.h | 40 +++++++++++---------- be/src/vec/functions/function_helpers.cpp | 33 +++++++++++++++++ be/src/vec/functions/function_helpers.h | 7 ++++ be/test/vec/exec/vbroker_scan_node_test.cpp | 8 ++--- be/test/vec/exec/vbroker_scanner_test.cpp | 6 ++-- 12 files changed, 112 insertions(+), 64 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 5808d35021..b6e64e57b9 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -304,6 +304,7 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { size_t rows = _src_block.rows(); auto filter_column = vectorized::ColumnUInt8::create(rows, 1); auto& filter_map = filter_column->get_data(); + auto origin_column_num = _src_block.columns(); for (auto slot_desc : _dest_tuple_desc->slots()) { if (!slot_desc->is_materialized()) { @@ -315,7 +316,12 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { int result_column_id = -1; // PT1 => dest primitive type RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id)); - auto column_ptr = _src_block.get_by_position(result_column_id).column; + bool is_origin_column = result_column_id < origin_column_num; + auto column_ptr = + is_origin_column && _src_block_mem_reuse + ? _src_block.get_by_position(result_column_id).column->clone_resized(rows) + : _src_block.get_by_position(result_column_id).column; + DCHECK(column_ptr != nullptr); // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr @@ -373,7 +379,11 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { } // after do the dest block insert operation, clear _src_block to remove the reference of origin column - _src_block.clear(); + if (_src_block_mem_reuse) { + _src_block.clear_column_data(origin_column_num); + } else { + _src_block.clear(); + } size_t dest_size = dest_block->columns(); // do filter @@ -389,15 +399,18 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) { // TODO: opt the reuse of src_block or dest_block column. some case we have to // shallow copy the column of src_block to dest block Status BaseScanner::_init_src_block() { - DCHECK(_src_block.columns() == 0); - for (auto i = 0; i < _num_of_columns_from_file; ++i) { - SlotDescriptor* slot_desc = _src_slot_descs[i]; - if (slot_desc == nullptr) { - continue; + if (_src_block.is_empty_column()) { + for (auto i = 0; i < _num_of_columns_from_file; ++i) { + SlotDescriptor* slot_desc = _src_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto data_type = slot_desc->get_data_type_ptr(); + auto column_ptr = data_type->create_column(); + column_ptr->reserve(_state->batch_size()); + _src_block.insert(vectorized::ColumnWithTypeAndName(std::move(column_ptr), data_type, + slot_desc->col_name())); } - auto data_type = slot_desc->get_data_type_ptr(); - _src_block.insert(vectorized::ColumnWithTypeAndName( - data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name())); } return Status::OK(); diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index b59fa113a2..37406cd440 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -142,6 +142,7 @@ protected: std::vector _dest_vexpr_ctx; std::unique_ptr _vpre_filter_ctx_ptr; vectorized::Block _src_block; + bool _src_block_mem_reuse = false; int _num_of_columns_from_file; // slot_ids for parquet predicate push down are in tuple desc diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp index 8baa90b6d5..5db043dc48 100644 --- a/be/src/exec/text_converter.hpp +++ b/be/src/exec/text_converter.hpp @@ -169,19 +169,17 @@ inline bool TextConverter::write_slot(const SlotDescriptor* slot_desc, Tuple* tu inline void TextConverter::write_string_column(const SlotDescriptor* slot_desc, vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len) { - vectorized::IColumn* col_ptr = column_ptr->get(); - // \N means it's NULL - if (LIKELY(slot_desc->is_nullable())) { - auto* nullable_column = reinterpret_cast(column_ptr->get()); - if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) { - nullable_column->insert_data(nullptr, 0); - return; - } else { - nullable_column->get_null_map_data().push_back(0); - col_ptr = &nullable_column->get_nested_column(); - } + DCHECK(column_ptr->get()->is_nullable()); + auto* nullable_column = reinterpret_cast(column_ptr->get()); + if (len == 2 && data[0] == '\\' && data[1] == 'N') { + nullable_column->get_null_map_data().push_back(1); + reinterpret_cast(nullable_column->get_nested_column()) + .insert_default(); + } else { + nullable_column->get_null_map_data().push_back(0); + reinterpret_cast(nullable_column->get_nested_column()) + .insert_data(data, len); } - reinterpret_cast(col_ptr)->insert_data(data, len); } inline bool TextConverter::write_column(const SlotDescriptor* slot_desc, diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 5c30a78a3a..18b63a0975 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -918,11 +918,14 @@ void Block::deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor } } -MutableBlock::MutableBlock(const std::vector& tuple_descs) { +MutableBlock::MutableBlock(const std::vector& tuple_descs, int reserve_size) { for (auto tuple_desc : tuple_descs) { for (auto slot_desc : tuple_desc->slots()) { _data_types.emplace_back(slot_desc->get_data_type_ptr()); _columns.emplace_back(_data_types.back()->create_column()); + if (reserve_size != 0) { + _columns.back()->reserve(reserve_size); + } } } } diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 1d7ffb6a22..5fe90fdf6d 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -375,7 +375,7 @@ public: MutableBlock() = default; ~MutableBlock() = default; - MutableBlock(const std::vector& tuple_descs); + MutableBlock(const std::vector& tuple_descs, int reserve_size = 0); MutableBlock(Block* block) : _columns(block->mutate_columns()), _data_types(block->get_data_types()) {} diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 0b25890a50..7b080d18af 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -175,8 +175,8 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.delete_predicates.begin())); // Merge the columns in delete predicate that not in latest schema in to current tablet schema - for (auto& del_pred_rs : _tablet_reader_params.delete_predicates) { - _tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_rs->version())); + for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) { + _tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_pb->version())); } // Range diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp index 41d0dcac1d..01a9f3255e 100644 --- a/be/src/vec/exec/vbroker_scanner.cpp +++ b/be/src/vec/exec/vbroker_scanner.cpp @@ -42,6 +42,7 @@ VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile, : BrokerScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter) { _text_converter.reset(new (std::nothrow) TextConverter('\\')); + _src_block_mem_reuse = true; } VBrokerScanner::~VBrokerScanner() = default; @@ -81,6 +82,7 @@ Status VBrokerScanner::get_next(Block* output_block, bool* eof) { } } } + columns.clear(); return _fill_dest_block(output_block, eof); } @@ -88,7 +90,7 @@ Status VBrokerScanner::get_next(Block* output_block, bool* eof) { Status VBrokerScanner::_fill_dest_columns(const Slice& line, std::vector& columns) { RETURN_IF_ERROR(_line_split_to_values(line)); - if (!_success) { + if (UNLIKELY(!_success)) { // If not success, which means we met an invalid row, return. return Status::OK(); } @@ -98,19 +100,8 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line, int dest_index = idx++; auto src_slot_desc = _src_slot_descs[i]; - if (!src_slot_desc->is_materialized()) { - continue; - } const Slice& value = _split_values[i]; - if (is_null(value)) { - // nullable - auto* nullable_column = - reinterpret_cast(columns[dest_index].get()); - nullable_column->insert_default(); - continue; - } - _text_converter->write_string_column(src_slot_desc, &columns[dest_index], value.data, value.size); } diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h index 0ed17714b4..9a34a7b1b5 100644 --- a/be/src/vec/functions/function_cast.h +++ b/be/src/vec/functions/function_cast.h @@ -1280,31 +1280,33 @@ private: const auto& nested_type = nullable_type.get_nested_type(); Block tmp_block; - if (source_is_nullable) - tmp_block = create_block_with_nested_columns(block, arguments); - else + if (source_is_nullable) { + tmp_block = create_block_with_nested_columns_only_args(block, arguments); + size_t tmp_res_index = tmp_block.columns(); + tmp_block.insert({nullptr, nested_type, ""}); + + /// Perform the requested conversion. + RETURN_IF_ERROR( + wrapper(context, tmp_block, {0}, tmp_res_index, input_rows_count)); + + const auto& tmp_res = tmp_block.get_by_position(tmp_res_index); + + res.column = wrap_in_nullable( + tmp_res.column, Block({block.get_by_position(arguments[0]), tmp_res}), + {0}, 1, input_rows_count); + } else { tmp_block = block; - size_t tmp_res_index = block.columns(); - tmp_block.insert({nullptr, nested_type, ""}); + size_t tmp_res_index = block.columns(); + tmp_block.insert({nullptr, nested_type, ""}); - /// Perform the requested conversion. - RETURN_IF_ERROR( - wrapper(context, tmp_block, arguments, tmp_res_index, input_rows_count)); + /// Perform the requested conversion. + RETURN_IF_ERROR(wrapper(context, tmp_block, arguments, tmp_res_index, + input_rows_count)); - const auto& tmp_res = tmp_block.get_by_position(tmp_res_index); - - /// May happen in fuzzy tests. For debug purpose. - if (!tmp_res.column.get()) { - return Status::RuntimeError( - "Couldn't convert {} to {} in prepare_remove_nullable wrapper.", - block.get_by_position(arguments[0]).type->get_name(), - nested_type->get_name()); + res.column = tmp_block.get_by_position(tmp_res_index).column; } - res.column = wrap_in_nullable(tmp_res.column, - Block({block.get_by_position(arguments[0]), tmp_res}), - {0}, 1, input_rows_count); return Status::OK(); }; } else if (source_is_nullable) { diff --git a/be/src/vec/functions/function_helpers.cpp b/be/src/vec/functions/function_helpers.cpp index 0e98d88ee0..fcfdd4b3a2 100644 --- a/be/src/vec/functions/function_helpers.cpp +++ b/be/src/vec/functions/function_helpers.cpp @@ -26,6 +26,39 @@ namespace doris::vectorized { +Block create_block_with_nested_columns_only_args(const Block& block, const ColumnNumbers& args) { + std::set args_set(args.begin(), args.end()); + Block res; + + for (auto i : args_set) { + const auto& col = block.get_by_position(i); + + if (col.type->is_nullable()) { + const DataTypePtr& nested_type = + static_cast(*col.type).get_nested_type(); + + if (!col.column) { + res.insert({nullptr, nested_type, col.name}); + } else if (auto* nullable = check_and_get_column(*col.column)) { + const auto& nested_col = nullable->get_nested_column_ptr(); + res.insert({nested_col, nested_type, col.name}); + } else if (auto* const_column = check_and_get_column(*col.column)) { + const auto& nested_col = + check_and_get_column(const_column->get_data_column()) + ->get_nested_column_ptr(); + res.insert({ColumnConst::create(nested_col, col.column->size()), nested_type, + col.name}); + } else { + LOG(FATAL) << "Illegal column for DataTypeNullable"; + } + } else { + res.insert(col); + } + } + + return res; +} + static Block create_block_with_nested_columns_impl(const Block& block, const std::unordered_set& args) { Block res; diff --git a/be/src/vec/functions/function_helpers.h b/be/src/vec/functions/function_helpers.h index ac6601b06e..e1c1b192bb 100644 --- a/be/src/vec/functions/function_helpers.h +++ b/be/src/vec/functions/function_helpers.h @@ -95,6 +95,13 @@ Block create_block_with_nested_columns(const Block& block, const ColumnNumbers& Block create_block_with_nested_columns(const Block& block, const ColumnNumbers& args, size_t result); +/// Returns the copy of a given block in only args column specified in +/// the "arguments" parameter is replaced with its respective nested +/// column if it is nullable. +/// TODO: the old funciton `create_block_with_nested_columns` have perfermance problem, replace all +/// by the function and delete old one. +Block create_block_with_nested_columns_only_args(const Block& block, const ColumnNumbers& args); + /// Checks argument type at specified index with predicate. /// throws if there is no argument at specified index or if predicate returns false. void validate_argument_type(const IFunction& func, const DataTypes& arguments, diff --git a/be/test/vec/exec/vbroker_scan_node_test.cpp b/be/test/vec/exec/vbroker_scan_node_test.cpp index 6cff164b10..195f468841 100644 --- a/be/test/vec/exec/vbroker_scan_node_test.cpp +++ b/be/test/vec/exec/vbroker_scan_node_test.cpp @@ -227,7 +227,7 @@ void VBrokerScanNodeTest::init_desc_table() { slot_desc.columnPos = 0; slot_desc.byteOffset = 0; slot_desc.nullIndicatorByte = 0; - slot_desc.nullIndicatorBit = -1; + slot_desc.nullIndicatorBit = 0; slot_desc.colName = "k1"; slot_desc.slotIdx = 1; slot_desc.isMaterialized = true; @@ -254,7 +254,7 @@ void VBrokerScanNodeTest::init_desc_table() { slot_desc.columnPos = 1; slot_desc.byteOffset = 16; slot_desc.nullIndicatorByte = 0; - slot_desc.nullIndicatorBit = -1; + slot_desc.nullIndicatorBit = 1; slot_desc.colName = "k2"; slot_desc.slotIdx = 2; slot_desc.isMaterialized = true; @@ -281,7 +281,7 @@ void VBrokerScanNodeTest::init_desc_table() { slot_desc.columnPos = 2; slot_desc.byteOffset = 32; slot_desc.nullIndicatorByte = 0; - slot_desc.nullIndicatorBit = -1; + slot_desc.nullIndicatorBit = 2; slot_desc.colName = "k3"; slot_desc.slotIdx = 3; slot_desc.isMaterialized = true; @@ -308,7 +308,7 @@ void VBrokerScanNodeTest::init_desc_table() { slot_desc.columnPos = 3; slot_desc.byteOffset = 48; slot_desc.nullIndicatorByte = 0; - slot_desc.nullIndicatorBit = -1; + slot_desc.nullIndicatorBit = 3; slot_desc.colName = "k4"; slot_desc.slotIdx = 4; slot_desc.isMaterialized = true; diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp index 7824f10c30..5cb9afc4b2 100644 --- a/be/test/vec/exec/vbroker_scanner_test.cpp +++ b/be/test/vec/exec/vbroker_scanner_test.cpp @@ -209,7 +209,7 @@ void VBrokerScannerTest::init_desc_table() { slot_desc.columnPos = 0; slot_desc.byteOffset = 0; slot_desc.nullIndicatorByte = 0; - slot_desc.nullIndicatorBit = -1; + slot_desc.nullIndicatorBit = 0; slot_desc.colName = "k1"; slot_desc.slotIdx = 1; slot_desc.isMaterialized = true; @@ -236,7 +236,7 @@ void VBrokerScannerTest::init_desc_table() { slot_desc.columnPos = 1; slot_desc.byteOffset = 16; slot_desc.nullIndicatorByte = 0; - slot_desc.nullIndicatorBit = -1; + slot_desc.nullIndicatorBit = 1; slot_desc.colName = "k2"; slot_desc.slotIdx = 2; slot_desc.isMaterialized = true; @@ -263,7 +263,7 @@ void VBrokerScannerTest::init_desc_table() { slot_desc.columnPos = 2; slot_desc.byteOffset = 32; slot_desc.nullIndicatorByte = 0; - slot_desc.nullIndicatorBit = -1; + slot_desc.nullIndicatorBit = 2; slot_desc.colName = "k3"; slot_desc.slotIdx = 3; slot_desc.isMaterialized = true;