diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 8d96f68bb6..a11a7a2af9 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -53,6 +53,7 @@
 #include "olap/segment_loader.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
+#include "olap/tablet_fwd.h"
 #include "olap/tablet_manager.h"
 #include "olap/tablet_meta.h"
 #include "olap/tablet_schema.h"
@@ -424,7 +425,8 @@ Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
 
 Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
                                    TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
-                                   TabletSchemaSPtr base_tablet_schema) {
+                                   TabletSchemaSPtr base_tablet_schema,
+                                   TabletSchemaSPtr new_tablet_schema) {
     // In some cases, there may be more than one type of rowset in a tablet,
     // in which case the conversion cannot be done directly by linked schema change,
     // but requires direct schema change to rewrite the data.
@@ -433,8 +435,8 @@ Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWr
                   << " in base tablet is not same as type " << rowset_writer->type()
                   << ", use direct schema change.";
         return SchemaChangeHandler::get_sc_procedure(_changer, false, true)
-                ->process(rowset_reader, rowset_writer, new_tablet, base_tablet,
-                          base_tablet_schema);
+                ->process(rowset_reader, rowset_writer, new_tablet, base_tablet, base_tablet_schema,
+                          new_tablet_schema);
     } else {
         Status status = rowset_writer->add_rowset_for_linked_schema_change(rowset_reader->rowset());
         if (!status) {
@@ -467,10 +469,10 @@ Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWr
 
 Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                              RowsetWriter* rowset_writer, TabletSharedPtr new_tablet,
-                                             TabletSchemaSPtr base_tablet_schema) {
+                                             TabletSchemaSPtr base_tablet_schema,
+                                             TabletSchemaSPtr new_tablet_schema) {
     do {
-        auto new_block =
-                vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block());
+        auto new_block = vectorized::Block::create_unique(new_tablet_schema->create_block());
         auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block());
 
         auto st = rowset_reader->next_block(ref_block.get());
@@ -501,7 +503,8 @@ VSchemaChangeWithSorting::VSchemaChangeWithSorting(const BlockChanger& changer,
 
 Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                                 RowsetWriter* rowset_writer, TabletSharedPtr new_tablet,
-                                                TabletSchemaSPtr base_tablet_schema) {
+                                                TabletSchemaSPtr base_tablet_schema,
+                                                TabletSchemaSPtr new_tablet_schema) {
     // for internal sorting
     std::vector<std::unique_ptr<vectorized::Block>> blocks;
 
@@ -529,7 +532,8 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
 
         auto [rowset, guard] = DORIS_TRY(_internal_sorting(
                 blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second),
-                newest_write_timestamp, new_tablet, BETA_ROWSET, segments_overlap));
+                newest_write_timestamp, new_tablet, BETA_ROWSET, segments_overlap,
+                new_tablet_schema));
         src_rowsets.push_back(std::move(rowset));
         pending_rs_guards.push_back(std::move(guard));
         for (auto& block : blocks) {
@@ -542,7 +546,7 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
         return Status::OK();
    };
 
-    auto new_block = vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block());
+    auto new_block = vectorized::Block::create_unique(new_tablet_schema->create_block());
     do {
         auto ref_block =
                 vectorized::Block::create_unique(base_tablet_schema->create_block());
@@ -573,8 +577,7 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
         _mem_tracker->consume(new_block->allocated_bytes());
 
         // move unique ptr
-        blocks.push_back(
-                vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block()));
+        blocks.push_back(vectorized::Block::create_unique(new_tablet_schema->create_block()));
         swap(blocks.back(), new_block);
     } while (true);
 
@@ -583,7 +586,8 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
     if (src_rowsets.empty()) {
         RETURN_IF_ERROR(rowset_writer->flush());
     } else {
-        RETURN_IF_ERROR(_external_sorting(src_rowsets, rowset_writer, new_tablet));
+        RETURN_IF_ERROR(
+                _external_sorting(src_rowsets, rowset_writer, new_tablet, new_tablet_schema));
     }
 
     return Status::OK();
@@ -592,15 +596,15 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
 Result<std::pair<RowsetSharedPtr, PendingRowsetGuard>> VSchemaChangeWithSorting::_internal_sorting(
         const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
         int64_t newest_write_timestamp, TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type,
-        SegmentsOverlapPB segments_overlap) {
+        SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) {
     uint64_t merged_rows = 0;
     MultiBlockMerger merger(new_tablet);
     RowsetWriterContext context;
     context.version = version;
     context.rowset_state = VISIBLE;
     context.segments_overlap = segments_overlap;
-    context.tablet_schema = new_tablet->tablet_schema();
-    context.original_tablet_schema = new_tablet->tablet_schema();
+    context.tablet_schema = new_tablet_schema;
+    context.original_tablet_schema = new_tablet_schema;
     context.newest_write_timestamp = newest_write_timestamp;
     context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
     std::unique_ptr<RowsetWriter> rowset_writer;
@@ -621,7 +625,8 @@ Result<std::pair<RowsetSharedPtr, PendingRowsetGuard>> VSchemaChangeWithSorting:
 
 Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
                                                    RowsetWriter* rowset_writer,
-                                                   TabletSharedPtr new_tablet) {
+                                                   TabletSharedPtr new_tablet,
+                                                   TabletSchemaSPtr new_tablet_schema) {
     std::vector<RowsetReaderSharedPtr> rs_readers;
     for (auto& rowset : src_rowsets) {
         RowsetReaderSharedPtr rs_reader;
@@ -631,8 +636,7 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
 
     Merger::Statistics stats;
     RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, ReaderType::READER_ALTER_TABLE,
-                                           new_tablet->tablet_schema(), rs_readers, rowset_writer,
-                                           &stats));
+                                           new_tablet_schema, rs_readers, rowset_writer, &stats));
     _add_merged_rows(stats.merged_rows);
     _add_filtered_rows(stats.filtered_rows);
     return Status::OK();
@@ -871,6 +875,11 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
     sc_params.base_tablet = base_tablet;
     sc_params.new_tablet = new_tablet;
+    // During a schema change, the variant's extracted columns are excluded from the tablet
+    // schema that drives the conversion; otherwise the subcolumn types recorded in different
+    // rowsets could become inconsistent. The conversion rebuilds the complete variant by
+    // reading all of its subcolumns, so only the top-level variant column is needed here.
+    sc_params.new_tablet_schema = new_tablet->tablet_schema()->copy_without_extracted_columns();
     sc_params.ref_rowset_readers.reserve(rs_splits.size());
     for (RowSetSplits& split : rs_splits) {
         sc_params.ref_rowset_readers.emplace_back(split.rs_reader);
@@ -1045,7 +1054,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
 
     // Add filter information in change, and filter column information will be set in _parse_request
     // And filter some data every time the row block changes
-    BlockChanger changer(sc_params.new_tablet->tablet_schema(), *sc_params.desc_tbl);
+    BlockChanger changer(sc_params.new_tablet_schema, *sc_params.desc_tbl);
 
     bool sc_sorting = false;
     bool sc_directly = false;
@@ -1104,8 +1113,8 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
         context.version = rs_reader->version();
         context.rowset_state = VISIBLE;
         context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
-        context.tablet_schema = new_tablet->tablet_schema();
-        context.original_tablet_schema = new_tablet->tablet_schema();
+        context.tablet_schema = sc_params.new_tablet_schema;
+        context.original_tablet_schema = sc_params.new_tablet_schema;
         context.newest_write_timestamp = rs_reader->newest_write_timestamp();
         context.fs = rs_reader->rowset()->rowset_meta()->fs();
         context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
@@ -1119,7 +1128,8 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
         auto pending_rs_guard = StorageEngine::instance()->add_pending_rowset(context);
 
         if (res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet,
-                                        sc_params.base_tablet, sc_params.base_tablet_schema);
+                                        sc_params.base_tablet, sc_params.base_tablet_schema,
+                                        sc_params.new_tablet_schema);
             !res) {
             LOG(WARNING) << "failed to process the version."
                          << " version=" << rs_reader->version().first << "-"
@@ -1184,10 +1194,11 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
         *sc_directly = true;
     }
 
+    TabletSchemaSPtr new_tablet_schema = sc_params.new_tablet_schema;
+
     // set column mapping
-    for (int i = 0, new_schema_size = new_tablet->tablet_schema()->num_columns();
-         i < new_schema_size; ++i) {
-        const TabletColumn& new_column = new_tablet->tablet_schema()->column(i);
+    for (int i = 0, new_schema_size = new_tablet_schema->num_columns(); i < new_schema_size; ++i) {
+        const TabletColumn& new_column = new_tablet_schema->column(i);
         const std::string& column_name_lower = to_lower(new_column.name());
         ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
         column_mapping->new_column = &new_column;
@@ -1264,7 +1275,6 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
         }
     }
 
-    TabletSchemaSPtr new_tablet_schema = new_tablet->tablet_schema();
     if (base_tablet_schema->keys_type() != new_tablet_schema->keys_type()) {
         // only when base table is dup and mv is agg
         // the rollup job must be reagg.
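Note on the hunks above: the handler now builds one pruned `TabletSchemaSPtr` up front and threads it through `BlockChanger`, every `RowsetWriterContext`, and every `process()` call, instead of re-fetching `new_tablet->tablet_schema()` at each site. The following stand-alone sketch illustrates only the pruning idea; `Column` and `Schema` are simplified stand-ins for illustration, not the real `TabletColumn`/`TabletSchema` API:

```cpp
// Minimal sketch of the pruning idea behind copy_without_extracted_columns().
// All types here are simplified stand-ins, not Doris's real classes.
#include <algorithm>
#include <iostream>
#include <iterator>
#include <memory>
#include <string>
#include <vector>

struct Column {
    std::string name;
    bool is_extracted; // true for materialized variant subcolumns such as v.a, v.b
};

struct Schema {
    std::vector<Column> columns;

    // Drop extracted variant subcolumns so every rowset written during the
    // schema change shares one consistent column set.
    std::shared_ptr<Schema> copy_without_extracted_columns() const {
        auto copy = std::make_shared<Schema>();
        std::copy_if(columns.begin(), columns.end(), std::back_inserter(copy->columns),
                     [](const Column& c) { return !c.is_extracted; });
        return copy;
    }
};

int main() {
    Schema schema{{{"k1", false}, {"v", false}, {"v.a", true}, {"v.b", true}}};
    auto pruned = schema.copy_without_extracted_columns();
    for (const auto& c : pruned->columns) {
        std::cout << c.name << '\n'; // prints: k1, v
    }
    return 0;
}
```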
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index f11f6d2d81..925b77ba51 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -106,7 +106,8 @@ public:
 
     virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
                            TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
-                           TabletSchemaSPtr base_tablet_schema) {
+                           TabletSchemaSPtr base_tablet_schema,
+                           TabletSchemaSPtr new_tablet_schema) {
         if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) {
             RETURN_IF_ERROR(rowset_writer->flush());
             return Status::OK();
@@ -115,8 +116,8 @@ public:
         _filtered_rows = 0;
         _merged_rows = 0;
 
-        RETURN_IF_ERROR(
-                _inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet_schema));
+        RETURN_IF_ERROR(_inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet_schema,
+                                       new_tablet_schema));
 
         _add_filtered_rows(rowset_reader->filtered_rows());
 
         // Check row num changes
@@ -140,7 +141,8 @@ protected:
     void _add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; }
 
     virtual Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
-                                  TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) {
+                                  TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
+                                  TabletSchemaSPtr new_tablet_schema) {
         return Status::NotSupported("inner process unsupported.");
     }
 
@@ -169,7 +171,8 @@ public:
 
     Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
                    TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
-                   TabletSchemaSPtr base_tablet_schema) override;
+                   TabletSchemaSPtr base_tablet_schema,
+                   TabletSchemaSPtr new_tablet_schema) override;
 
 private:
     const BlockChanger& _changer;
@@ -182,7 +185,8 @@ public:
 
 private:
     Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
-                          TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) override;
+                          TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
+                          TabletSchemaSPtr new_tablet_schema) override;
 
     bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override {
         return _changer.has_where() || SchemaChange::_check_row_nums(reader, writer);
@@ -199,16 +203,17 @@ public:
 
 private:
     Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
-                          TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) override;
+                          TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
+                          TabletSchemaSPtr new_tablet_schema) override;
 
     Result<std::pair<RowsetSharedPtr, PendingRowsetGuard>> _internal_sorting(
             const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
             const Version& temp_delta_versions, int64_t newest_write_timestamp,
             TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type,
-            SegmentsOverlapPB segments_overlap);
+            SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema);
 
     Status _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer,
-                             TabletSharedPtr new_tablet);
+                             TabletSharedPtr new_tablet, TabletSchemaSPtr new_tablet_schema);
 
     bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override {
         return _changer.has_where() || SchemaChange::_check_row_nums(reader, writer);
@@ -257,6 +262,7 @@ private:
     TabletSharedPtr base_tablet;
     TabletSharedPtr new_tablet;
     TabletSchemaSPtr base_tablet_schema = nullptr;
+    TabletSchemaSPtr new_tablet_schema = nullptr;
     std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
     DeleteHandler* delete_handler = nullptr;
     std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
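The widened interface above keeps the existing template-method shape: the public `process()` does the shared bookkeeping and hands both schemas to the protected `_inner_process()` hook that each variant overrides. A condensed stand-alone model of that contract, with stand-in types rather than the real Doris classes:

```cpp
// Stand-alone model of the widened SchemaChange interface; all types are
// simplified stand-ins for illustration.
#include <memory>

struct Schema {};
using SchemaSPtr = std::shared_ptr<Schema>;

struct Status {
    bool ok = true;
    static Status OK() { return {}; }
    static Status NotSupported() { return {false}; }
};

class SchemaChange {
public:
    virtual ~SchemaChange() = default;

    // Both the source and the (pruned) target schema are now passed in
    // explicitly instead of being re-fetched from the new tablet.
    Status process(SchemaSPtr base_schema, SchemaSPtr new_schema) {
        // ... empty-rowset fast path and row-count bookkeeping elided ...
        return _inner_process(base_schema, new_schema);
    }

protected:
    virtual Status _inner_process(SchemaSPtr /*base*/, SchemaSPtr /*target*/) {
        return Status::NotSupported(); // "inner process unsupported"
    }
};

class DirectSchemaChange : public SchemaChange {
protected:
    Status _inner_process(SchemaSPtr /*base*/, SchemaSPtr /*target*/) override {
        // target drives output block creation, not new_tablet->tablet_schema()
        return Status::OK();
    }
};

int main() {
    DirectSchemaChange sc;
    return sc.process(std::make_shared<Schema>(), std::make_shared<Schema>()).ok ? 0 : 1;
}
```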
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 9346c3573a..e8f9d5d52b 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -25,6 +25,7 @@
 #include <algorithm>
 // IWYU pragma: no_include <bits/std_abs.h>
 #include <cmath> // IWYU pragma: keep
+#include <memory>
 #include <ostream>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
@@ -840,7 +841,7 @@ void TabletSchema::clear_columns() {
     _cols.clear();
 }
 
-void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
+void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns) {
     SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
     _keys_type = schema.keys_type();
     _num_columns = 0;
@@ -858,6 +859,9 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
     for (auto& column_pb : schema.column()) {
         TabletColumn column;
         column.init_from_pb(column_pb);
+        if (ignore_extracted_columns && column.is_extracted_column()) {
+            continue;
+        }
         if (column.is_key()) {
             _num_key_columns++;
         }
@@ -1009,6 +1013,14 @@ void TabletSchema::merge_dropped_columns(const TabletSchema& src_schema) {
     }
 }
 
+TabletSchemaSPtr TabletSchema::copy_without_extracted_columns() {
+    TabletSchemaSPtr copy = std::make_shared<TabletSchema>();
+    TabletSchemaPB tablet_schema_pb;
+    this->to_schema_pb(&tablet_schema_pb);
+    copy->init_from_pb(tablet_schema_pb, true /*ignore extracted_columns*/);
+    return copy;
+}
+
 // Dropped column is in _field_id_to_index but not in _field_name_to_index
 // Could refer to append_column method
 bool TabletSchema::is_dropped_column(const TabletColumn& col) const {
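A note on the design choice in `copy_without_extracted_columns()` above: rather than copying fields by hand, it round-trips through `TabletSchemaPB`, so the pruned copy is produced by the same `init_from_pb()` path used for ordinary deserialization, and the new early `continue` runs before any of the derived counters are bumped. A self-contained sketch of that filter-on-deserialize pattern (the types are stand-ins, not the generated protobuf classes):

```cpp
// Sketch of filter-on-deserialize: the skip happens before derived counters
// are updated, so a dropped column leaves no trace in them. Stand-in types.
#include <cstdint>
#include <string>
#include <vector>

struct ColumnPB {
    std::string name;
    bool is_key = false;
    bool is_nullable = false;
    bool is_extracted = false;
};

struct Schema {
    std::vector<ColumnPB> cols;
    uint32_t num_key_columns = 0;
    uint32_t num_null_columns = 0;

    void init_from_pb(const std::vector<ColumnPB>& pb, bool ignore_extracted_columns) {
        for (const auto& c : pb) {
            // Mirrors the early-continue added to TabletSchema::init_from_pb().
            if (ignore_extracted_columns && c.is_extracted) {
                continue;
            }
            if (c.is_key) ++num_key_columns;
            if (c.is_nullable) ++num_null_columns;
            cols.push_back(c);
        }
    }
};

int main() {
    std::vector<ColumnPB> pb = {{"k1", true, false, false},
                                {"v", false, true, false},
                                {"v.a", false, true, true}};
    Schema pruned;
    pruned.init_from_pb(pb, /*ignore_extracted_columns=*/true);
    return pruned.cols.size() == 2 ? 0 : 1; // k1 and v survive, v.a is dropped
}
```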
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 059e38154f..072bebd95a 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -233,7 +233,7 @@ public:
     // manually init members incorrectly, and define a new function like
     // void create_from_pb(const TabletSchemaPB& schema, TabletSchema* tablet_schema).
     TabletSchema() = default;
-    void init_from_pb(const TabletSchemaPB& schema);
+    void init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns = false);
     void to_schema_pb(TabletSchemaPB* tablet_meta_pb) const;
     void append_column(TabletColumn column, ColumnType col_type = ColumnType::NORMAL);
     void append_index(TabletIndex index);
@@ -365,6 +365,8 @@ public:
 
     vectorized::Block create_block_by_cids(const std::vector<uint32_t>& cids);
 
+    std::shared_ptr<TabletSchema> copy_without_extracted_columns();
+
 private:
     friend bool operator==(const TabletSchema& a, const TabletSchema& b);
     friend bool operator!=(const TabletSchema& a, const TabletSchema& b);
diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp
index d0fbb287a1..1290ddb237 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -404,7 +404,7 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
 Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos) {
     try {
         // Parse each variant column from raw string column
-        vectorized::schema_util::parse_variant_columns(block, variant_pos);
+        RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_pos));
         vectorized::schema_util::finalize_variant_columns(block, variant_pos,
                                                           false /*not ingore sparse*/);
         vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_pos);
@@ -416,7 +416,7 @@ Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& va
     return Status::OK();
 }
 
-void parse_variant_columns(Block& block, const std::vector<int>& variant_pos) {
+Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos) {
     for (int i = 0; i < variant_pos.size(); ++i) {
         const auto& column_ref = block.get_by_position(variant_pos[i]).column;
         bool is_nullable = column_ref->is_nullable();
@@ -426,14 +426,23 @@ void parse_variant_columns(Block& block, const std::vector<int>& variant_pos) {
             // already parsed
             continue;
         }
-        const auto& root = *var.get_root();
-        const auto& raw_json_column =
-                root.is_nullable()
-                        ? static_cast<const ColumnString&>(
-                                  static_cast<const ColumnNullable&>(root).get_nested_column())
-                        : static_cast<const ColumnString&>(root);
+        ColumnPtr raw_json_column;
+        if (WhichDataType(remove_nullable(var.get_root_type())).is_json()) {
+            // TODO more efficient way to parse jsonb type, currently we just convert jsonb to
+            // json str and parse them into variant
+            RETURN_IF_ERROR(cast_column({var.get_root(), var.get_root_type(), ""},
+                                        std::make_shared<DataTypeString>(), &raw_json_column));
+        } else {
+            const auto& root = *var.get_root();
+            raw_json_column =
+                    root.is_nullable()
+                            ? static_cast<const ColumnNullable&>(root).get_nested_column_ptr()
+                            : var.get_root();
+        }
+
         MutableColumnPtr variant_column = ColumnObject::create(true);
-        parse_json_to_variant(*variant_column.get(), raw_json_column);
+        parse_json_to_variant(*variant_column.get(),
+                              assert_cast<const ColumnString&>(*raw_json_column));
 
         // Wrap variant with nullmap if it is nullable
         ColumnPtr result = variant_column->get_ptr();
         if (is_nullable) {
@@ -444,6 +453,7 @@ Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos)
         block.get_by_position(variant_pos[i]).column = result;
         // block.get_by_position(variant_pos[i]).type = std::make_shared<DataTypeObject>("json", true);
     }
+    return Status::OK();
 }
 
 void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos,
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index de5778157d..43972e0788 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -87,7 +87,7 @@ TabletColumn get_least_type_column(const TabletColumn& original, const DataTypeP
 // 2. finalize variant column to each subcolumn least commn types, default ignore sparse sub columns
 // 2. encode sparse sub columns
 Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos);
-void parse_variant_columns(Block& block, const std::vector<int>& variant_pos);
+Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos);
 void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos,
                               bool ignore_sparse = true);
 void encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& variant_pos);
diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h
index f80cf30d0d..301c5d3725 100644
--- a/be/src/vec/functions/function_cast.h
+++ b/be/src/vec/functions/function_cast.h
@@ -2074,7 +2074,7 @@ private:
         if (variant.is_scalar_variant()) {
             ColumnPtr nested = variant.get_root();
             auto nested_from_type = variant.get_root_type();
-            DCHECK(nested_from_type->is_nullable());
+            // DCHECK(nested_from_type->is_nullable());
             DCHECK(!data_type_to->is_nullable());
             auto new_context = context->clone();
             new_context->set_jsonb_string_as_string(true);
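Two things change in `parse_variant_columns()` above: it now returns `Status` so the caller's `RETURN_IF_ERROR` can propagate failures, and a JSONB-typed root is first cast to a JSON string column before being parsed into the variant. Below is a stand-alone sketch of that dispatch; the types and the decode stub are illustrative stand-ins (the real code uses `cast_column` and `parse_json_to_variant`):

```cpp
// Stand-alone sketch of the new root dispatch in parse_variant_columns().
// Status / StringColumn / the decode stub are stand-ins for illustration.
#include <string>
#include <utility>
#include <vector>

struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
};

enum class RootType { JsonString, Jsonb };
using StringColumn = std::vector<std::string>;

// Stand-in for the cast_column(..., DataTypeString, ...) step; real binary
// JSONB decoding is elided, and a failure would surface as a non-OK Status.
Status jsonb_to_json_string(const StringColumn& jsonb, StringColumn* out) {
    *out = jsonb;
    return Status::OK();
}

Status extract_raw_json(RootType type, const StringColumn& root, StringColumn* raw_json) {
    if (type == RootType::Jsonb) {
        // TODO in the original code: parse JSONB directly instead of
        // round-tripping through a JSON string.
        return jsonb_to_json_string(root, raw_json);
    }
    *raw_json = root; // string roots already hold raw JSON text
    return Status::OK();
}

int main() {
    StringColumn root = {R"({"a":1})"};
    StringColumn raw;
    Status st = extract_raw_json(RootType::JsonString, root, &raw);
    return st.ok ? 0 : 1;
}
```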
diff --git a/regression-test/data/variant_p0/schema_change.out b/regression-test/data/variant_p0/schema_change/schema_change.out
similarity index 100%
rename from regression-test/data/variant_p0/schema_change.out
rename to regression-test/data/variant_p0/schema_change/schema_change.out
diff --git a/regression-test/data/variant_p0/schema_change/test_alter_table_column_with_delete.out b/regression-test/data/variant_p0/schema_change/test_alter_table_column_with_delete.out
new file mode 100644
index 0000000000..a23f0b96c4
--- /dev/null
+++ b/regression-test/data/variant_p0/schema_change/test_alter_table_column_with_delete.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1	{"c":3,"b":2,"a":1}	1.0
+3	{"g":7,"h":8,"i":9}	3.0
+4	{"k":11,"l":12,"j":10}	4.0
+
+-- !sql --
+1	{"c":3,"b":2,"a":1}	1
+2	{"y":5,"x":4,"f":3}	2
+3	{"g":7,"h":8,"i":9}	3
+4	{"k":11,"l":12,"j":10}	4
+
diff --git a/regression-test/data/variant_p0/schema_change/test_double_write_when_schema_change.out b/regression-test/data/variant_p0/schema_change/test_double_write_when_schema_change.out
new file mode 100644
index 0000000000..1a504fcc8c
--- /dev/null
+++ b/regression-test/data/variant_p0/schema_change/test_double_write_when_schema_change.out
@@ -0,0 +1,37 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+PushEvent	2489368070	2015-01-01T00:00:00Z
+PushEvent	2489395767	2015-01-01T01:00:00Z
+PushEvent	2489418154	2015-01-01T02:00:00Z
+PullRequestEvent	2489436569	2015-01-01T03:00:01Z
+PushEvent	2489368072	2015-01-01T00:00:00Z
+PushEvent	2489395768	2015-01-01T01:00:00Z
+PushEvent	2489418155	2015-01-01T02:00:00Z
+GollumEvent	2489436580	2015-01-01T03:00:02Z
+CreateEvent	2489368089	2015-01-01T00:00:01Z
+PullRequestEvent	2489395770	2015-01-01T01:00:02Z
+
+-- !sql --
+PushEvent	2489368070	2015-01-01T00:00:00Z
+PushEvent	2489395767	2015-01-01T01:00:00Z
+PushEvent	2489418154	2015-01-01T02:00:00Z
+PullRequestEvent	2489436569	2015-01-01T03:00:01Z
+PushEvent	2489368072	2015-01-01T00:00:00Z
+PushEvent	2489395768	2015-01-01T01:00:00Z
+PushEvent	2489418155	2015-01-01T02:00:00Z
+GollumEvent	2489436580	2015-01-01T03:00:02Z
+CreateEvent	2489368089	2015-01-01T00:00:01Z
+PullRequestEvent	2489395770	2015-01-01T01:00:02Z
+
+-- !sql --
+PushEvent	2489368070	2015-01-01T00:00:00Z
+PushEvent	2489395767	2015-01-01T01:00:00Z
+PushEvent	2489418154	2015-01-01T02:00:00Z
+PullRequestEvent	2489436569	2015-01-01T03:00:01Z
+PushEvent	2489368072	2015-01-01T00:00:00Z
+PushEvent	2489395768	2015-01-01T01:00:00Z
+PushEvent	2489418155	2015-01-01T02:00:00Z
+GollumEvent	2489436580	2015-01-01T03:00:02Z
+CreateEvent	2489368089	2015-01-01T00:00:01Z
+PullRequestEvent	2489395770	2015-01-01T01:00:02Z
+
diff --git a/regression-test/suites/variant_p0/schema_change.groovy b/regression-test/suites/variant_p0/schema_change/schema_change.groovy
similarity index 100%
rename from regression-test/suites/variant_p0/schema_change.groovy
rename to regression-test/suites/variant_p0/schema_change/schema_change.groovy
diff --git a/regression-test/suites/variant_p0/schema_change/test_alter_table_column_with_delete.groovy b/regression-test/suites/variant_p0/schema_change/test_alter_table_column_with_delete.groovy
new file mode 100644
index 0000000000..a090c9118a
--- /dev/null
+++ b/regression-test/suites/variant_p0/schema_change/test_alter_table_column_with_delete.groovy
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +suite("test_alter_variant_table_column_with_delete") { + def tbName1 = "alter_table_column_dup_with_delete" + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + sql "DROP TABLE IF EXISTS ${tbName1}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbName1} ( + k1 INT, + v variant, + vv double + ) + UNIQUE KEY (k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); + """ + + sql """insert into ${tbName1} values(1,'{"a":1, "b":2, "c":3}',1);""" + sql """insert into ${tbName1} values(2,'{"d":4, "e":5, "f":6}',2);""" + sql """delete from ${tbName1} where k1 = 2;""" + sql """insert into ${tbName1} values(3,'{"g":7, "h":8, "i":9}',3);""" + sql """insert into ${tbName1} values(4,'{"j":10, "k":11, "l":12}',4);""" + qt_sql """select * from ${tbName1} order by k1;""" + + + sql """ + ALTER TABLE ${tbName1} + MODIFY COLUMN vv text; + """ + int max_try_secs = 120 + while (max_try_secs--) { + String res = getJobState(tbName1) + if (res == "FINISHED") { + sleep(3000) + break + } else { + Thread.sleep(500) + if (max_try_secs < 1) { + println "test timeout," + "state:" + res + assertEquals("FINISHED",res) + } + } + } + + sql """insert into ${tbName1} values(2,'{"x":4, "y":5, "f":3}',2);""" + qt_sql "select * from ${tbName1} order by k1;" + //sql "DROP TABLE ${tbName1} FORCE;" +} diff --git a/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy b/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy new file mode 100644 index 0000000000..335d4f9901 --- /dev/null +++ b/regression-test/suites/variant_p0/schema_change/test_double_write_when_schema_change.groovy @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("double_write_schema_change_with_variant") { + def set_be_config = { key, value -> + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def table_name = "github_events" + sql """DROP TABLE IF EXISTS ${table_name}""" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant, + change_column double, + INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 2 + properties("replication_num" = "1", "disable_auto_compaction" = "false"); + """ + + set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "6294967296") + load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") + load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""") + load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""") + load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-3.json'}""") + + def getJobState = { indexName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${indexName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def insert_sql = """ insert into ${table_name} values(1, '{"id":"25061216922","type":"PushEvent","actor":{"id":100067519,"login":"savorfamily","display_login":"savorfamily","gravatar_id":"123","url":"https://api.github.com/users/savorfamily","avatar_url":"https://avatars.githubusercontent.com/u/100067519?"},"repo":{"id":461434218,"name":"savorfamily/upptime","url":"https://api.github.com/repos/savorfamily/upptime"},"payload":{"push_id":11572320522,"size":1,"distinct_size":1,"ref":"refs/heads/master","head":"81106d369f763cb729d9d77610ace252c9db53f0","before":"2bf823a4febcf809da126828ecef7617c8cc48ea","commits":[{"sha":"81106d369f763cb729d9d77610ace252c9db53f0","author":{"email":"73812536+upptime-bot@users.noreply.github.com","name":"Upptime Bot"},"message":":bento: Update graphs [skip ci]","distinct":true,"url":"https://api.github.com/repos/savorfamily/upptime/commits/81106d369f763cb729d9d77610ace252c9db53f0"}]},"public":true,"created_at":"2022-11-07T02:00:00Z"}', "123111.0") """ + + def double_write = { -> + int max_try_time = 3000 + while 
 (max_try_time--) {
+            String result = getJobState(table_name)
+            if (result == "FINISHED") {
+                sleep(3000)
+                break
+            } else {
+                if (result == "RUNNING") {
+                    sql insert_sql
+                }
+                sleep(200)
+                if (max_try_time < 1) {
+                    assertEquals(1, 2)
+                }
+            }
+        }
+    }
+
+    qt_sql "select v:type, v:id, v:created_at from ${table_name} where cast(v:id as bigint) != 25061216922 order by k, cast(v:id as bigint) limit 10"
+
+    sql """ ALTER TABLE ${table_name} modify COLUMN change_column text"""
+    double_write.call()
+
+    sql """ALTER TABLE ${table_name} drop index idx_var"""
+    double_write.call()
+    qt_sql "select v:type, v:id, v:created_at from ${table_name} where cast(v:id as bigint) != 25061216922 order by k, cast(v:id as bigint) limit 10"
+
+    createMV("create materialized view xxx as select k, sum(k) from ${table_name} group by k order by k;")
+    qt_sql "select v:type, v:id, v:created_at from ${table_name} where cast(v:id as bigint) != 25061216922 order by k, cast(v:id as bigint) limit 10"
+    set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "2147483648")
+}