diff --git a/be/src/common/config.h b/be/src/common/config.h index 96ff2f7ea6..8d655185c2 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -245,7 +245,7 @@ CONF_Bool(enable_low_cardinality_optimize, "true"); CONF_mBool(disable_auto_compaction, "false"); // whether enable vectorized compaction CONF_Bool(enable_vectorized_compaction, "true"); -// whether enable vectorized schema change +// whether enable vectorized schema change, material-view or rollup task will fail if this config open. CONF_Bool(enable_vectorized_alter_table, "false"); // check the configuration of auto compaction in seconds when auto compaction disabled diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h index de82705a91..a3ad7721d1 100644 --- a/be/src/olap/column_mapping.h +++ b/be/src/olap/column_mapping.h @@ -20,6 +20,8 @@ #include #include + +#include "olap/tablet_schema.h" namespace doris { class WrapperField; @@ -36,8 +38,9 @@ struct ColumnMapping { // materialize view transform function used in schema change std::string materialized_function; std::shared_ptr expr; + const TabletColumn* new_column; }; using SchemaMapping = std::vector; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index d790c8aee3..7243736281 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -148,7 +148,7 @@ Status ColumnReader::new_bitmap_index_iterator(BitmapIndexIterator** iterator) { Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp, PageHandle* handle, Slice* page_body, PageFooterPB* footer, - BlockCompressionCodec* codec) { + BlockCompressionCodec* codec) const { iter_opts.sanity_check(); PageReadOptions opts; opts.rblock = iter_opts.rblock; @@ -847,74 +847,70 @@ Status DefaultValueColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, b return Status::OK(); } -void DefaultValueColumnIterator::insert_default_data(vectorized::MutableColumnPtr& dst, size_t n) { +void DefaultValueColumnIterator::insert_default_data(const TypeInfo* type_info, size_t type_size, + void* mem_value, + vectorized::MutableColumnPtr& dst, size_t n) { vectorized::Int128 int128; char* data_ptr = (char*)&int128; size_t data_len = sizeof(int128); - auto insert_column_data = [&]() { - for (size_t i = 0; i < n; ++i) { - dst->insert_data(data_ptr, data_len); - } - }; - - switch (_type_info->type()) { + switch (type_info->type()) { case OLAP_FIELD_TYPE_OBJECT: case OLAP_FIELD_TYPE_HLL: { dst->insert_many_defaults(n); break; } case OLAP_FIELD_TYPE_DATE: { - assert(_type_size == sizeof(FieldTypeTraits::CppType)); //uint24_t - std::string str = FieldTypeTraits::to_string(_mem_value); + assert(type_size == sizeof(FieldTypeTraits::CppType)); //uint24_t + std::string str = FieldTypeTraits::to_string(mem_value); vectorized::VecDateTimeValue value; value.from_date_str(str.c_str(), str.length()); value.cast_to_date(); //TODO: here is int128 = int64, here rely on the logic of little endian int128 = binary_cast(value); - insert_column_data(); + dst->insert_many_data(data_ptr, data_len, n); break; } case OLAP_FIELD_TYPE_DATETIME: { - assert(_type_size == sizeof(FieldTypeTraits::CppType)); //int64_t - std::string str = FieldTypeTraits::to_string(_mem_value); + assert(type_size == sizeof(FieldTypeTraits::CppType)); //int64_t + std::string str = FieldTypeTraits::to_string(mem_value); vectorized::VecDateTimeValue value; value.from_date_str(str.c_str(), str.length()); value.to_datetime(); int128 = binary_cast(value); - insert_column_data(); + dst->insert_many_data(data_ptr, data_len, n); break; } case OLAP_FIELD_TYPE_DATEV2: { - assert(_type_size == sizeof(FieldTypeTraits::CppType)); //uint32_t + assert(type_size == sizeof(FieldTypeTraits::CppType)); //uint32_t - int128 = *((FieldTypeTraits::CppType*)_mem_value); - insert_column_data(); + int128 = *((FieldTypeTraits::CppType*)mem_value); + dst->insert_many_data(data_ptr, data_len, n); break; } case OLAP_FIELD_TYPE_DECIMAL: { - assert(_type_size == + assert(type_size == sizeof(FieldTypeTraits::CppType)); //decimal12_t - decimal12_t* d = (decimal12_t*)_mem_value; + decimal12_t* d = (decimal12_t*)mem_value; int128 = DecimalV2Value(d->integer, d->fraction).value(); - insert_column_data(); + dst->insert_many_data(data_ptr, data_len, n); break; } case OLAP_FIELD_TYPE_STRING: case OLAP_FIELD_TYPE_VARCHAR: case OLAP_FIELD_TYPE_CHAR: { - data_ptr = ((Slice*)_mem_value)->data; - data_len = ((Slice*)_mem_value)->size; - insert_column_data(); + data_ptr = ((Slice*)mem_value)->data; + data_len = ((Slice*)mem_value)->size; + dst->insert_many_data(data_ptr, data_len, n); break; } default: { - data_ptr = (char*)_mem_value; - data_len = _type_size; - insert_column_data(); + data_ptr = (char*)mem_value; + data_len = type_size; + dst->insert_many_data(data_ptr, data_len, n); } } } @@ -926,7 +922,7 @@ Status DefaultValueColumnIterator::next_batch(size_t* n, vectorized::MutableColu dst->insert_many_defaults(*n); } else { *has_null = false; - insert_default_data(dst, *n); + insert_default_data(_type_info.get(), _type_size, _mem_value, dst, *n); } return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index b82dbabb01..eb1536d69c 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -106,7 +106,7 @@ public: // read a page from file into a page handle Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp, PageHandle* handle, Slice* page_body, PageFooterPB* footer, - BlockCompressionCodec* codec); + BlockCompressionCodec* codec) const; bool is_nullable() const { return _meta.is_nullable(); } @@ -449,9 +449,10 @@ public: ordinal_t get_current_ordinal() const override { return _current_rowid; } -private: - void insert_default_data(vectorized::MutableColumnPtr& dst, size_t n); + static void insert_default_data(const TypeInfo* type_info, size_t type_size, void* mem_value, + vectorized::MutableColumnPtr& dst, size_t n); +private: bool _has_default_value; std::string _default_value; bool _is_nullable; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 94f20da9d5..83b48c0d5c 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -26,9 +26,10 @@ #include "olap/row.h" #include "olap/row_block.h" #include "olap/row_cursor.h" -#include "olap/rowset/rowset_id_generator.h" +#include "olap/rowset/segment_v2/column_reader.h" #include "olap/storage_engine.h" #include "olap/tablet.h" +#include "olap/types.h" #include "olap/wrapper_field.h" #include "runtime/mem_tracker.h" #include "util/defer_op.h" @@ -91,6 +92,154 @@ private: std::priority_queue _heap; }; +class MultiBlockMerger { +public: + MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {} + + Status merge(const std::vector>& blocks, + RowsetWriter* rowset_writer, uint64_t* merged_rows) { + int rows = 0; + for (auto& block : blocks) { + rows += block->rows(); + } + if (!rows) { + return Status::OK(); + } + + std::vector row_refs; + row_refs.reserve(rows); + for (auto& block : blocks) { + for (uint16_t i = 0; i < block->rows(); i++) { + row_refs.emplace_back(block.get(), i); + } + } + // TODO: try to use pdqsort to replace std::sort + // The block version is incremental. + std::stable_sort(row_refs.begin(), row_refs.end(), _cmp); + + auto finalized_block = _tablet->tablet_schema().create_block(); + int columns = finalized_block.columns(); + *merged_rows += rows; + + if (_tablet->keys_type() == KeysType::AGG_KEYS) { + auto tablet_schema = _tablet->tablet_schema(); + int key_number = _tablet->num_key_columns(); + + std::vector agg_functions; + std::vector agg_places; + + for (int i = key_number; i < columns; i++) { + vectorized::AggregateFunctionPtr function = + tablet_schema.column(i).get_aggregate_function( + {finalized_block.get_data_type(i)}, vectorized::AGG_LOAD_SUFFIX); + agg_functions.push_back(function); + // create aggregate data + vectorized::AggregateDataPtr place = new char[function->size_of_data()]; + function->create(place); + agg_places.push_back(place); + } + + for (int i = 0; i < rows; i++) { + auto row_ref = row_refs[i]; + + for (int j = key_number; j < columns; j++) { + auto column_ptr = row_ref.get_column(j).get(); + agg_functions[j - key_number]->add( + agg_places[j - key_number], + const_cast(&column_ptr), row_ref.position, + nullptr); + } + + if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) { + for (int j = 0; j < key_number; j++) { + finalized_block.get_by_position(j).column->assume_mutable()->insert_from( + *row_ref.get_column(j), row_ref.position); + } + + for (int j = key_number; j < columns; j++) { + agg_functions[j - key_number]->insert_result_into( + agg_places[j - key_number], + finalized_block.get_by_position(j).column->assume_mutable_ref()); + agg_functions[j - key_number]->create(agg_places[j - key_number]); + } + + if (i == rows - 1 || finalized_block.rows() == ALTER_TABLE_BATCH_SIZE) { + *merged_rows -= finalized_block.rows(); + rowset_writer->add_block(&finalized_block); + finalized_block.clear_column_data(); + } + } + } + + for (int i = 0; i < columns - key_number; i++) { + agg_functions[i]->destroy(agg_places[i]); + delete[] agg_places[i]; + } + } else { + std::vector pushed_row_refs; + if (_tablet->keys_type() == KeysType::DUP_KEYS) { + std::swap(pushed_row_refs, row_refs); + } else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) { + for (int i = 0; i < rows; i++) { + if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) { + pushed_row_refs.push_back(row_refs[i]); + } + } + } + + // update real inserted row number + rows = pushed_row_refs.size(); + *merged_rows -= rows; + + for (int i = 0; i < rows; i += ALTER_TABLE_BATCH_SIZE) { + int limit = std::min(ALTER_TABLE_BATCH_SIZE, rows - i); + + for (int idx = 0; idx < columns; idx++) { + auto column = finalized_block.get_by_position(idx).column->assume_mutable(); + + for (int j = 0; j < limit; j++) { + auto row_ref = pushed_row_refs[i + j]; + column->insert_from(*row_ref.get_column(idx), row_ref.position); + } + } + rowset_writer->add_block(&finalized_block); + finalized_block.clear_column_data(); + } + } + + RETURN_IF_ERROR(rowset_writer->flush()); + return Status::OK(); + } + +private: + struct RowRef { + RowRef(vectorized::Block* block_, uint16_t position_) + : block(block_), position(position_) {} + vectorized::ColumnPtr get_column(int index) const { + return block->get_by_position(index).column; + } + const vectorized::Block* block; + uint16_t position; + }; + + struct RowRefComparator { + RowRefComparator(TabletSharedPtr tablet) : _num_columns(tablet->num_key_columns()) {} + + int compare(const RowRef& lhs, const RowRef& rhs) const { + return lhs.block->compare_at(lhs.position, rhs.position, _num_columns, *rhs.block, -1); + } + + bool operator()(const RowRef& lhs, const RowRef& rhs) const { + return compare(lhs, rhs) < 0; + } + + const size_t _num_columns; + }; + + TabletSharedPtr _tablet; + RowRefComparator _cmp; +}; + RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, DescriptorTbl desc_tbl) : _desc_tbl(desc_tbl) { _schema_mapping.resize(tablet_schema.num_columns()); @@ -665,6 +814,128 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data #undef TYPE_REINTERPRET_CAST #undef ASSIGN_DEFAULT_VALUE +Status RowBlockChanger::change_block(vectorized::Block* ref_block, + vectorized::Block* new_block) const { + if (new_block->columns() != _schema_mapping.size()) { + LOG(WARNING) << "block does not match with schema mapping rules. " + << "block_schema_size=" << new_block->columns() + << ", mapping_schema_size=" << _schema_mapping.size(); + return Status::OLAPInternalError(OLAP_ERR_NOT_INITED); + } + + // material-view or rollup task will fail now + if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) { + return Status::NotSupported( + "_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup " + "not supported now. "); + } + + std::vector nullable_tuples; + for (int i = 0; i < ref_block->columns(); i++) { + nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable()); + } + + ObjectPool pool; + RuntimeState* state = pool.add(new RuntimeState()); + state->set_desc_tbl(&_desc_tbl); + RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples); + + const int row_size = ref_block->rows(); + const int column_size = new_block->columns(); + + // swap ref_block[key] and new_block[value] + std::map swap_idx_map; + + for (int idx = 0; idx < column_size; idx++) { + int ref_idx = _schema_mapping[idx].ref_column; + + if (!_schema_mapping[idx].materialized_function.empty()) { + return Status::NotSupported("Materialized function not supported now. "); + } + + if (ref_idx < 0) { + // new column, write default value + auto value = _schema_mapping[idx].default_value; + auto column = new_block->get_by_position(idx).column->assume_mutable(); + if (value->is_null()) { + DCHECK(column->is_nullable()); + column->insert_many_defaults(row_size); + } else { + auto type_info = get_type_info(_schema_mapping[idx].new_column); + DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(), + value->ptr(), column, row_size); + } + } else if (_schema_mapping[idx].expr != nullptr) { + // calculate special materialized function, to_bitmap/hll_hash/count_field or cast expr + vectorized::VExprContext* ctx = nullptr; + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_tree(&pool, *_schema_mapping[idx].expr, &ctx)); + + RETURN_IF_ERROR(ctx->prepare(state, row_desc)); + RETURN_IF_ERROR(ctx->open(state)); + + int result_column_id = -1; + RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id)); + DCHECK(ref_block->get_by_position(result_column_id).column->size() == row_size) + << new_block->get_by_position(idx).name << " size invalid" + << ", expect=" << row_size + << ", real=" << ref_block->get_by_position(result_column_id).column->size(); + + if (_schema_mapping[idx].expr->nodes[0].node_type == TExprNodeType::CAST_EXPR) { + RETURN_IF_ERROR( + _check_cast_valid(ref_block->get_by_position(ref_idx).column, + ref_block->get_by_position(result_column_id).column)); + } + swap_idx_map[result_column_id] = idx; + + ctx->close(state); + } else { + // same type, just swap column + swap_idx_map[ref_idx] = idx; + } + } + + for (auto it : swap_idx_map) { + new_block->get_by_position(it.second).column.swap( + ref_block->get_by_position(it.first).column); + } + + return Status::OK(); +} + +Status RowBlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column, + vectorized::ColumnPtr new_column) const { + // TODO: rethink this check + // This check is to prevent schema-change from causing data loss, + // But it is possible to generate null data in material-view or rollup. + + if (ref_column->is_nullable() != new_column->is_nullable()) { + return Status::DataQualityError("column.is_nullable() is changed!"); + } + + if (ref_column->is_nullable()) { + auto* ref_null_map = + vectorized::check_and_get_column(ref_column) + ->get_null_map_column() + .get_data() + .data(); + auto* new_null_map = + vectorized::check_and_get_column(new_column) + ->get_null_map_column() + .get_data() + .data(); + + bool is_changed = false; + for (size_t i = 0; i < ref_column->size(); i++) { + is_changed |= (ref_null_map[i] != new_null_map[i]); + } + if (is_changed) { + return Status::DataQualityError("is_null of data is changed!"); + } + } + return Status::OK(); +} + RowBlockSorter::RowBlockSorter(RowBlockAllocator* row_block_allocator) : _row_block_allocator(row_block_allocator), _swap_row_block(nullptr) {} @@ -1050,6 +1321,34 @@ Status SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader, return res; } +Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader, + RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, + TabletSharedPtr base_tablet) { + auto new_block = + std::make_unique(new_tablet->tablet_schema().create_block()); + auto ref_block = + std::make_unique(base_tablet->tablet_schema().create_block()); + + int origin_columns_size = ref_block->columns(); + + rowset_reader->next_block(ref_block.get()); + while (ref_block->rows()) { + RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get())); + RETURN_IF_ERROR(rowset_writer->add_block(new_block.get())); + + new_block->clear_column_data(); + ref_block->clear_column_data(origin_columns_size); + rowset_reader->next_block(ref_block.get()); + } + + if (!rowset_writer->flush()) { + return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR); + } + + return Status::OK(); +} + SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer, size_t memory_limitation) : _row_block_changer(row_block_changer), @@ -1062,6 +1361,18 @@ SchemaChangeWithSorting::~SchemaChangeWithSorting() { SAFE_DELETE(_row_block_allocator); } +VSchemaChangeWithSorting::VSchemaChangeWithSorting(const RowBlockChanger& row_block_changer, + size_t memory_limitation) + : _changer(row_block_changer), + _memory_limitation(memory_limitation), + _temp_delta_versions(Version::mock()) { + _mem_tracker = MemTracker::create_tracker( + config::memory_limitation_per_thread_for_schema_change_bytes, + fmt::format("VSchemaChangeWithSorting:changer={}", + std::to_string(int64(&row_block_changer))), + StorageEngine::instance()->schema_change_mem_tracker(), MemTrackerLevel::TASK); +} + Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, TabletSharedPtr new_tablet, @@ -1219,6 +1530,89 @@ Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_read return res; } +Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader, + RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, + TabletSharedPtr base_tablet) { + // for internal sorting + std::vector> blocks; + + // for external sorting + // src_rowsets to store the rowset generated by internal sorting + std::vector src_rowsets; + + Defer defer {[&]() { + // remove the intermediate rowsets generated by internal sorting + for (auto& row_set : src_rowsets) { + StorageEngine::instance()->add_unused_rowset(row_set); + } + }}; + + RowsetSharedPtr rowset = rowset_reader->rowset(); + SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap(); + _temp_delta_versions.first = _temp_delta_versions.second; + + auto new_block = + std::make_unique(new_tablet->tablet_schema().create_block()); + auto ref_block = + std::make_unique(base_tablet->tablet_schema().create_block()); + + int origin_columns_size = ref_block->columns(); + + auto create_rowset = [&]() -> Status { + if (blocks.empty()) { + return Status::OK(); + } + + RowsetSharedPtr rowset; + RETURN_IF_ERROR(_internal_sorting( + blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second), + new_tablet, BETA_ROWSET, segments_overlap, &rowset)); + src_rowsets.push_back(rowset); + + for (auto& block : blocks) { + _mem_tracker->release(block->allocated_bytes()); + } + blocks.clear(); + + // increase temp version + _temp_delta_versions.second++; + return Status::OK(); + }; + + rowset_reader->next_block(ref_block.get()); + while (ref_block->rows()) { + RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get())); + if (!_mem_tracker->try_consume(new_block->allocated_bytes())) { + RETURN_IF_ERROR(create_rowset()); + + if (!_mem_tracker->try_consume(new_block->allocated_bytes())) { + LOG(WARNING) << "Memory limitation is too small for Schema Change." + << "memory_limitation=" << _memory_limitation; + return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR); + } + } + + // move unique ptr + blocks.push_back( + std::make_unique(new_tablet->tablet_schema().create_block())); + swap(blocks.back(), new_block); + + ref_block->clear_column_data(origin_columns_size); + rowset_reader->next_block(ref_block.get()); + } + + RETURN_IF_ERROR(create_rowset()); + + if (src_rowsets.empty()) { + RETURN_IF_ERROR(rowset_writer->flush()); + } else { + RETURN_IF_ERROR(_external_sorting(src_rowsets, rowset_writer, new_tablet)); + } + + return Status::OK(); +} + bool SchemaChangeWithSorting::_internal_sorting(const std::vector& row_block_arr, const Version& version, TabletSharedPtr new_tablet, SegmentsOverlapPB segments_overlap, @@ -1247,6 +1641,29 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector& ro return true; } +Status VSchemaChangeWithSorting::_internal_sorting( + const std::vector>& blocks, const Version& version, + TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type, + SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) { + uint64_t merged_rows = 0; + MultiBlockMerger merger(new_tablet); + + std::unique_ptr rowset_writer; + RETURN_IF_ERROR( + new_tablet->create_rowset_writer(version, VISIBLE, segments_overlap, &rowset_writer)); + + Defer defer {[&]() { + new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + + rowset_writer->rowset_id().to_string()); + }}; + + RETURN_IF_ERROR(merger.merge(blocks, rowset_writer.get(), &merged_rows)); + + _add_merged_rows(merged_rows); + *rowset = rowset_writer->build(); + return Status::OK(); +} + bool SchemaChangeWithSorting::_external_sorting(vector& src_rowsets, RowsetWriter* rowset_writer, TabletSharedPtr new_tablet) { @@ -1275,6 +1692,25 @@ bool SchemaChangeWithSorting::_external_sorting(vector& src_row return true; } +Status VSchemaChangeWithSorting::_external_sorting(vector& src_rowsets, + RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet) { + std::vector rs_readers; + for (auto& rowset : src_rowsets) { + RowsetReaderSharedPtr rs_reader; + RETURN_IF_ERROR(rowset->create_reader(&rs_reader)); + rs_readers.push_back(rs_reader); + } + + Merger::Statistics stats; + RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, READER_ALTER_TABLE, rs_readers, + rowset_writer, &stats)); + + _add_merged_rows(stats.merged_rows); + _add_filtered_rows(stats.filtered_rows); + return Status::OK(); +} + Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) { LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id << ", new_tablet_id=" << request.new_tablet_id diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 2f820ae79e..71507fd7fb 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -52,7 +52,12 @@ public: Status change_row_block(const RowBlock* ref_block, int32_t data_version, RowBlock* mutable_block, uint64_t* filtered_rows) const; + Status change_block(vectorized::Block* ref_block, vectorized::Block* new_block) const; + private: + Status _check_cast_valid(vectorized::ColumnPtr ref_column, + vectorized::ColumnPtr new_column) const; + // @brief column-mapping specification of new schema SchemaMapping _schema_mapping; @@ -180,6 +185,17 @@ private: DISALLOW_COPY_AND_ASSIGN(SchemaChangeDirectly); }; +class VSchemaChangeDirectly : public SchemaChange { +public: + VSchemaChangeDirectly(const RowBlockChanger& row_block_changer) : _changer(row_block_changer) {} + +private: + Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override; + + const RowBlockChanger& _changer; +}; + // @breif schema change with sorting class SchemaChangeWithSorting : public SchemaChange { public: @@ -206,6 +222,29 @@ private: DISALLOW_COPY_AND_ASSIGN(SchemaChangeWithSorting); }; +class VSchemaChangeWithSorting : public SchemaChange { +public: + VSchemaChangeWithSorting(const RowBlockChanger& row_block_changer, size_t memory_limitation); + ~VSchemaChangeWithSorting() override = default; + +private: + Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override; + + Status _internal_sorting(const std::vector>& blocks, + const Version& temp_delta_versions, TabletSharedPtr new_tablet, + RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap, + RowsetSharedPtr* rowset); + + Status _external_sorting(std::vector& src_rowsets, RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet); + + const RowBlockChanger& _changer; + size_t _memory_limitation; + Version _temp_delta_versions; + std::shared_ptr _mem_tracker; +}; + class SchemaChangeHandler { public: static Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, @@ -218,12 +257,23 @@ public: static std::unique_ptr get_sc_procedure(const RowBlockChanger& rb_changer, bool sc_sorting, bool sc_directly) { if (sc_sorting) { - return std::make_unique( - rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes); + if (config::enable_vectorized_alter_table) { + return std::make_unique( + rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes); + } else { + return std::make_unique( + rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes); + } } + if (sc_directly) { - return std::make_unique(rb_changer); + if (config::enable_vectorized_alter_table) { + return std::make_unique(rb_changer); + } else { + return std::make_unique(rb_changer); + } } + return std::make_unique(rb_changer); }