[regression-test](Variant) Add more cases related to schema changes (#27958)

* [regression-test](Variant) Add more cases related to schema changes

Also fixes schema-change bugs for variant:
fixes a crash that occurred when a schema change ran against a tablet schema containing extracted columns
Authored by lihangyu on 2023-12-08 10:15:12 +08:00, committed by GitHub
parent 1d345877ce, commit e75d91c91b
13 changed files with 318 additions and 48 deletions

View File

@ -53,6 +53,7 @@
#include "olap/segment_loader.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
@ -424,7 +425,8 @@ Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
TabletSchemaSPtr base_tablet_schema) {
TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) {
// In some cases, there may be more than one type of rowset in a tablet,
// in which case the conversion cannot be done directly by linked schema change,
// but requires direct schema change to rewrite the data.
@ -433,8 +435,8 @@ Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWr
<< " in base tablet is not same as type " << rowset_writer->type()
<< ", use direct schema change.";
return SchemaChangeHandler::get_sc_procedure(_changer, false, true)
->process(rowset_reader, rowset_writer, new_tablet, base_tablet,
base_tablet_schema);
->process(rowset_reader, rowset_writer, new_tablet, base_tablet, base_tablet_schema,
new_tablet_schema);
} else {
Status status = rowset_writer->add_rowset_for_linked_schema_change(rowset_reader->rowset());
if (!status) {
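For readers new to this path: linked schema change reuses the source rowset's files via hard links, and the hunk above now forwards the new (extracted-column-free) schema when it must fall back to a direct rewrite because the reader's rowset type differs from the writer's. A minimal sketch of that dispatch, with stand-in types rather than the real Doris API:

// Stand-in sketch of the linked-vs-direct dispatch (hypothetical types).
#include <cstdio>

enum class RowsetType { ALPHA, BETA };

struct Ctx {
    RowsetType reader_type;
    RowsetType writer_type;
};

void direct_convert(const Ctx&) { std::puts("rewrite data (direct schema change)"); }
void link_files(const Ctx&)     { std::puts("hard-link rowset files (no rewrite)"); }

void process(const Ctx& ctx) {
    // Mixed rowset types cannot be linked; fall back to rewriting the data,
    // passing along the same base/new schema pair the linked path received.
    if (ctx.reader_type != ctx.writer_type) {
        direct_convert(ctx);
    } else {
        link_files(ctx);
    }
}

int main() {
    process({RowsetType::ALPHA, RowsetType::BETA}); // mixed types -> rewrite
    process({RowsetType::BETA, RowsetType::BETA});  // same type   -> hard link
}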
@ -467,10 +469,10 @@ Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWr
Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet,
TabletSchemaSPtr base_tablet_schema) {
TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) {
do {
auto new_block =
vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block());
auto new_block = vectorized::Block::create_unique(new_tablet_schema->create_block());
auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block());
auto st = rowset_reader->next_block(ref_block.get());
@ -501,7 +503,8 @@ VSchemaChangeWithSorting::VSchemaChangeWithSorting(const BlockChanger& changer,
Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet,
TabletSchemaSPtr base_tablet_schema) {
TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) {
// for internal sorting
std::vector<std::unique_ptr<vectorized::Block>> blocks;
@ -529,7 +532,8 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
auto [rowset, guard] = DORIS_TRY(_internal_sorting(
blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second),
newest_write_timestamp, new_tablet, BETA_ROWSET, segments_overlap));
newest_write_timestamp, new_tablet, BETA_ROWSET, segments_overlap,
new_tablet_schema));
src_rowsets.push_back(std::move(rowset));
pending_rs_guards.push_back(std::move(guard));
for (auto& block : blocks) {
@ -542,7 +546,7 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
return Status::OK();
};
auto new_block = vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block());
auto new_block = vectorized::Block::create_unique(new_tablet_schema->create_block());
do {
auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block());
@ -573,8 +577,7 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
_mem_tracker->consume(new_block->allocated_bytes());
// move unique ptr
blocks.push_back(
vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block()));
blocks.push_back(vectorized::Block::create_unique(new_tablet_schema->create_block()));
swap(blocks.back(), new_block);
} while (true);
@ -583,7 +586,8 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
if (src_rowsets.empty()) {
RETURN_IF_ERROR(rowset_writer->flush());
} else {
RETURN_IF_ERROR(_external_sorting(src_rowsets, rowset_writer, new_tablet));
RETURN_IF_ERROR(
_external_sorting(src_rowsets, rowset_writer, new_tablet, new_tablet_schema));
}
return Status::OK();
@ -592,15 +596,15 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
Result<std::pair<RowsetSharedPtr, PendingRowsetGuard>> VSchemaChangeWithSorting::_internal_sorting(
const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
int64_t newest_write_timestamp, TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type,
SegmentsOverlapPB segments_overlap) {
SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) {
uint64_t merged_rows = 0;
MultiBlockMerger merger(new_tablet);
RowsetWriterContext context;
context.version = version;
context.rowset_state = VISIBLE;
context.segments_overlap = segments_overlap;
context.tablet_schema = new_tablet->tablet_schema();
context.original_tablet_schema = new_tablet->tablet_schema();
context.tablet_schema = new_tablet_schema;
context.original_tablet_schema = new_tablet_schema;
context.newest_write_timestamp = newest_write_timestamp;
context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
std::unique_ptr<RowsetWriter> rowset_writer;
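The recurring change throughout this file is visible in the hunk above: instead of each stage re-deriving the schema via new_tablet->tablet_schema() (which still contains the extracted variant subcolumns), one new_tablet_schema snapshot is threaded through block creation, internal/external sorting, and the rowset-writer context. A minimal sketch of the before/after shape, with hypothetical stand-in types:

// Sketch of the refactor pattern: thread one schema snapshot through all
// stages instead of re-reading tablet->tablet_schema() at each step.
// Types are simplified stand-ins, not the Doris classes.
#include <memory>

struct TabletSchema { /* column metadata */ };
using TabletSchemaSPtr = std::shared_ptr<TabletSchema>;

struct Tablet {
    TabletSchemaSPtr schema; // still contains the extracted variant subcolumns
    TabletSchemaSPtr tablet_schema() const { return schema; }
};

// Before: every stage re-derived the schema from the tablet.
void stage_before(const Tablet& new_tablet) {
    TabletSchemaSPtr s = new_tablet.tablet_schema(); // includes extracted columns
    // ... create blocks / writer context from s ...
    (void)s;
}

// After: the caller builds one snapshot (without extracted columns) and every
// stage (block creation, sorting, writer context) consumes exactly that one.
void stage_after(TabletSchemaSPtr new_tablet_schema) {
    // ... create blocks / writer context from new_tablet_schema ...
    (void)new_tablet_schema;
}

int main() {
    Tablet t;
    t.schema = std::make_shared<TabletSchema>();
    stage_before(t);
    stage_after(t.schema); // in the commit, this is the extracted-column-free copy
}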
@ -621,7 +625,8 @@ Result<std::pair<RowsetSharedPtr, PendingRowsetGuard>> VSchemaChangeWithSorting:
Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet) {
TabletSharedPtr new_tablet,
TabletSchemaSPtr new_tablet_schema) {
std::vector<RowsetReaderSharedPtr> rs_readers;
for (auto& rowset : src_rowsets) {
RowsetReaderSharedPtr rs_reader;
@ -631,8 +636,7 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
Merger::Statistics stats;
RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, ReaderType::READER_ALTER_TABLE,
new_tablet->tablet_schema(), rs_readers, rowset_writer,
&stats));
new_tablet_schema, rs_readers, rowset_writer, &stats));
_add_merged_rows(stats.merged_rows);
_add_filtered_rows(stats.filtered_rows);
return Status::OK();
@ -871,6 +875,11 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
sc_params.base_tablet = base_tablet;
sc_params.new_tablet = new_tablet;
// During a schema change, the extracted columns of a variant must not be included in the
// tablet schema: the set and types of extracted columns can differ between rowsets, so
// keeping them would make the conversion schema inconsistent. Instead, the complete
// variant is reconstructed at conversion time by reading all of its sub-columns.
sc_params.new_tablet_schema = new_tablet->tablet_schema()->copy_without_extracted_columns();
sc_params.ref_rowset_readers.reserve(rs_splits.size());
for (RowSetSplits& split : rs_splits) {
sc_params.ref_rowset_readers.emplace_back(split.rs_reader);
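The comment above is the heart of the fix. Extracted columns are subcolumns materialized out of a variant (v.a, v.b, ...), and which ones exist depends on the JSON each rowset happened to ingest, so no single extracted set can describe every rowset being converted. A stand-in illustration (hypothetical data, not Doris types):

#include <iostream>
#include <set>
#include <string>

int main() {
    // Extracted subcolumns of variant "v" depend on the JSON each rowset ingested:
    std::set<std::string> rowset1 = {"v.a", "v.b", "v.c"}; // from '{"a":1,"b":2,"c":3}'
    std::set<std::string> rowset2 = {"v.g", "v.h", "v.i"}; // from '{"g":7,"h":8,"i":9}'

    // No single extracted set describes both rowsets, so the conversion schema
    // keeps only the root "v"; subcolumns are re-read and merged per rowset.
    std::cout << (rowset1 == rowset2 ? "consistent" : "inconsistent") << '\n';
}

Stripping the schema down to the declared columns plus the variant root gives a conversion target that is valid for every source rowset; the full variant is rebuilt during conversion from each rowset's own subcolumns.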
@ -1045,7 +1054,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
// Add filter information in change, and filter column information will be set in _parse_request
// And filter some data every time the row block changes
BlockChanger changer(sc_params.new_tablet->tablet_schema(), *sc_params.desc_tbl);
BlockChanger changer(sc_params.new_tablet_schema, *sc_params.desc_tbl);
bool sc_sorting = false;
bool sc_directly = false;
@ -1104,8 +1113,8 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
context.version = rs_reader->version();
context.rowset_state = VISIBLE;
context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
context.tablet_schema = new_tablet->tablet_schema();
context.original_tablet_schema = new_tablet->tablet_schema();
context.tablet_schema = sc_params.new_tablet_schema;
context.original_tablet_schema = sc_params.new_tablet_schema;
context.newest_write_timestamp = rs_reader->newest_write_timestamp();
context.fs = rs_reader->rowset()->rowset_meta()->fs();
context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
@ -1119,7 +1128,8 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
auto pending_rs_guard = StorageEngine::instance()->add_pending_rowset(context);
if (res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet,
sc_params.base_tablet, sc_params.base_tablet_schema);
sc_params.base_tablet, sc_params.base_tablet_schema,
sc_params.new_tablet_schema);
!res) {
LOG(WARNING) << "failed to process the version."
<< " version=" << rs_reader->version().first << "-"
@ -1184,10 +1194,11 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
*sc_directly = true;
}
TabletSchemaSPtr new_tablet_schema = sc_params.new_tablet_schema;
// set column mapping
for (int i = 0, new_schema_size = new_tablet->tablet_schema()->num_columns();
i < new_schema_size; ++i) {
const TabletColumn& new_column = new_tablet->tablet_schema()->column(i);
for (int i = 0, new_schema_size = new_tablet_schema->num_columns(); i < new_schema_size; ++i) {
const TabletColumn& new_column = new_tablet_schema->column(i);
const std::string& column_name_lower = to_lower(new_column.name());
ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
column_mapping->new_column = &new_column;
@ -1264,7 +1275,6 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
}
}
TabletSchemaSPtr new_tablet_schema = new_tablet->tablet_schema();
if (base_tablet_schema->keys_type() != new_tablet_schema->keys_type()) {
// only when base table is dup and mv is agg
// the rollup job must be reagg.

View File

@ -106,7 +106,8 @@ public:
virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
TabletSchemaSPtr base_tablet_schema) {
TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) {
if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) {
RETURN_IF_ERROR(rowset_writer->flush());
return Status::OK();
@ -115,8 +116,8 @@ public:
_filtered_rows = 0;
_merged_rows = 0;
RETURN_IF_ERROR(
_inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet_schema));
RETURN_IF_ERROR(_inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet_schema,
new_tablet_schema));
_add_filtered_rows(rowset_reader->filtered_rows());
// Check row num changes
@ -140,7 +141,8 @@ protected:
void _add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; }
virtual Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) {
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) {
return Status::NotSupported("inner process unsupported.");
}
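As the hunks above show, the new parameter must be added in two places because SchemaChange follows a template-method shape: the public process() does the shared bookkeeping (empty-rowset short-circuit, row accounting, row-count checks) and delegates the actual conversion to the virtual _inner_process(), which each strategy (linked, direct, sorting) overrides. A condensed sketch of that shape, with a simplified stand-in Status:

#include <iostream>
#include <string>
#include <utility>

struct Status {
    std::string msg;
    bool ok() const { return msg.empty(); }
    static Status OK() { return {}; }
    static Status NotSupported(std::string m) { return {std::move(m)}; }
};

class SchemaChange {
public:
    virtual ~SchemaChange() = default;
    Status process() {
        // shared bookkeeping: reset counters, convert, then verify row counts
        Status st = _inner_process();
        if (!st.ok()) return st;
        // ... _check_row_nums(reader, writer) ...
        return Status::OK();
    }
protected:
    virtual Status _inner_process() { return Status::NotSupported("inner process unsupported."); }
};

class VSchemaChangeDirectly : public SchemaChange {
protected:
    Status _inner_process() override { /* convert block by block */ return Status::OK(); }
};

int main() {
    VSchemaChangeDirectly sc;
    std::cout << (sc.process().ok() ? "ok" : "unsupported") << '\n'; // ok
}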
@ -169,7 +171,8 @@ public:
Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
TabletSchemaSPtr base_tablet_schema) override;
TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) override;
private:
const BlockChanger& _changer;
@ -182,7 +185,8 @@ public:
private:
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) override;
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) override;
bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override {
return _changer.has_where() || SchemaChange::_check_row_nums(reader, writer);
@ -199,16 +203,17 @@ public:
private:
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) override;
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema,
TabletSchemaSPtr new_tablet_schema) override;
Result<std::pair<RowsetSharedPtr, PendingRowsetGuard>> _internal_sorting(
const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
const Version& temp_delta_versions, int64_t newest_write_timestamp,
TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type,
SegmentsOverlapPB segments_overlap);
SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema);
Status _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet);
TabletSharedPtr new_tablet, TabletSchemaSPtr new_tablet_schema);
bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const override {
return _changer.has_where() || SchemaChange::_check_row_nums(reader, writer);
@ -257,6 +262,7 @@ private:
TabletSharedPtr base_tablet;
TabletSharedPtr new_tablet;
TabletSchemaSPtr base_tablet_schema = nullptr;
TabletSchemaSPtr new_tablet_schema = nullptr;
std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
DeleteHandler* delete_handler = nullptr;
std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;

View File

@ -25,6 +25,7 @@
#include <cctype>
// IWYU pragma: no_include <bits/std_abs.h>
#include <cmath> // IWYU pragma: keep
#include <memory>
#include <ostream>
#include "common/compiler_util.h" // IWYU pragma: keep
@ -840,7 +841,7 @@ void TabletSchema::clear_columns() {
_cols.clear();
}
void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns) {
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
_keys_type = schema.keys_type();
_num_columns = 0;
@ -858,6 +859,9 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
for (auto& column_pb : schema.column()) {
TabletColumn column;
column.init_from_pb(column_pb);
if (ignore_extracted_columns && column.is_extracted_column()) {
continue;
}
if (column.is_key()) {
_num_key_columns++;
}
@ -1009,6 +1013,14 @@ void TabletSchema::merge_dropped_columns(const TabletSchema& src_schema) {
}
}
TabletSchemaSPtr TabletSchema::copy_without_extracted_columns() {
TabletSchemaSPtr copy = std::make_shared<TabletSchema>();
TabletSchemaPB tablet_schema_pb;
this->to_schema_pb(&tablet_schema_pb);
copy->init_from_pb(tablet_schema_pb, true /*ignore extracted_columns*/);
return copy;
}
// Dropped column is in _field_id_to_index but not in _field_name_to_index
// Could refer to append_column method
bool TabletSchema::is_dropped_column(const TabletColumn& col) const {
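copy_without_extracted_columns copies the schema by serializing it to a TabletSchemaPB and re-running init_from_pb with ignore_extracted_columns=true, so all derived state (key counts, name-to-index maps) is rebuilt consistently with the filtered column list rather than patched by hand. A stand-in sketch of the same copy-by-reinitialization idea (hypothetical types, not the Doris classes):

#include <iostream>
#include <map>
#include <string>
#include <vector>

struct Column {
    std::string name;
    bool is_key = false;
    bool is_extracted = false;
};

struct Schema {
    std::vector<Column> cols;
    std::map<std::string, int> name_to_index;
    int num_key_columns = 0;

    // Analogous to init_from_pb(schema_pb, ignore_extracted_columns):
    // rebuilding from scratch keeps all derived state consistent.
    void init_from(const std::vector<Column>& src, bool ignore_extracted) {
        cols.clear();
        name_to_index.clear();
        num_key_columns = 0;
        for (const auto& c : src) {
            if (ignore_extracted && c.is_extracted) continue;
            if (c.is_key) ++num_key_columns;
            name_to_index[c.name] = static_cast<int>(cols.size());
            cols.push_back(c);
        }
    }

    Schema copy_without_extracted_columns() const {
        Schema out;
        out.init_from(cols, /*ignore_extracted=*/true);
        return out;
    }
};

int main() {
    Schema s;
    s.init_from({{"k1", true}, {"v"}, {"v.a", false, true}, {"v.b", false, true}}, false);
    Schema copy = s.copy_without_extracted_columns();
    std::cout << copy.cols.size() << '\n'; // 2 (k1, v)
}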

View File

@ -233,7 +233,7 @@ public:
// manually init members incorrectly, and define a new function like
// void create_from_pb(const TabletSchemaPB& schema, TabletSchema* tablet_schema).
TabletSchema() = default;
void init_from_pb(const TabletSchemaPB& schema);
void init_from_pb(const TabletSchemaPB& schema, bool ignore_extracted_columns = false);
void to_schema_pb(TabletSchemaPB* tablet_meta_pb) const;
void append_column(TabletColumn column, ColumnType col_type = ColumnType::NORMAL);
void append_index(TabletIndex index);
@ -365,6 +365,8 @@ public:
vectorized::Block create_block_by_cids(const std::vector<uint32_t>& cids);
std::shared_ptr<TabletSchema> copy_without_extracted_columns();
private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
friend bool operator!=(const TabletSchema& a, const TabletSchema& b);

View File

@ -404,7 +404,7 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos) {
try {
// Parse each variant column from raw string column
vectorized::schema_util::parse_variant_columns(block, variant_pos);
RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_pos));
vectorized::schema_util::finalize_variant_columns(block, variant_pos,
false /*not ignore sparse*/);
vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_pos);
@ -416,7 +416,7 @@ Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& va
return Status::OK();
}
void parse_variant_columns(Block& block, const std::vector<int>& variant_pos) {
Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos) {
for (int i = 0; i < variant_pos.size(); ++i) {
const auto& column_ref = block.get_by_position(variant_pos[i]).column;
bool is_nullable = column_ref->is_nullable();
@ -426,14 +426,23 @@ void parse_variant_columns(Block& block, const std::vector<int>& variant_pos) {
// already parsed
continue;
}
const auto& root = *var.get_root();
const auto& raw_json_column =
root.is_nullable()
? static_cast<const ColumnString&>(
static_cast<const ColumnNullable&>(root).get_nested_column())
: static_cast<const ColumnString&>(root);
ColumnPtr raw_json_column;
if (WhichDataType(remove_nullable(var.get_root_type())).is_json()) {
// TODO more efficient way to parse jsonb type, currently we just convert jsonb to
// json str and parse them into variant
RETURN_IF_ERROR(cast_column({var.get_root(), var.get_root_type(), ""},
std::make_shared<DataTypeString>(), &raw_json_column));
} else {
const auto& root = *var.get_root();
raw_json_column =
root.is_nullable()
? static_cast<const ColumnNullable&>(root).get_nested_column_ptr()
: var.get_root();
}
MutableColumnPtr variant_column = ColumnObject::create(true);
parse_json_to_variant(*variant_column.get(), raw_json_column);
parse_json_to_variant(*variant_column.get(),
assert_cast<const ColumnString&>(*raw_json_column));
// Wrap variant with nullmap if it is nullable
ColumnPtr result = variant_column->get_ptr();
if (is_nullable) {
@ -444,6 +453,7 @@ void parse_variant_columns(Block& block, const std::vector<int>& variant_pos) {
block.get_by_position(variant_pos[i]).column = result;
// block.get_by_position(variant_pos[i]).type = std::make_shared<DataTypeObject>("json", true);
}
return Status::OK();
}
void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos,
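Two things change in parse_variant_columns above: it now returns Status so failures propagate to the caller (note the new RETURN_IF_ERROR at its call site), and a variant root persisted as jsonb is first cast back to a string column, since parse_json_to_variant consumes a ColumnString. A stand-in sketch of the new control flow (simplified types, not the Doris API):

#include <string>
#include <utility>

struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
};
// local stand-in for Doris's RETURN_IF_ERROR macro
#define RETURN_IF_ERROR(stmt) \
    do { Status _st = (stmt); if (!_st.ok) return _st; } while (0)

enum class RootType { String, Jsonb };

struct ColumnRef { RootType type; bool nullable; };

// jsonb -> JSON string; can fail, which is why the function now returns Status.
Status cast_to_string(const ColumnRef& in, ColumnRef* out) {
    (void)in;
    *out = {RootType::String, false};
    return Status::OK();
}

Status parse_variant_column(const ColumnRef& root) {
    ColumnRef raw_json{RootType::String, false};
    if (root.type == RootType::Jsonb) {
        // roots persisted as jsonb are first cast back to a string column
        RETURN_IF_ERROR(cast_to_string(root, &raw_json));
    } else if (root.nullable) {
        // unwrap the Nullable wrapper to reach the underlying string column
        raw_json = {RootType::String, false};
    }
    // ... parse_json_to_variant(raw_json) ...
    return Status::OK();
}

int main() { return parse_variant_column({RootType::Jsonb, true}).ok ? 0 : 1; }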

View File

@ -87,7 +87,7 @@ TabletColumn get_least_type_column(const TabletColumn& original, const DataTypeP
// 2. finalize variant columns to each subcolumn's least common type, ignoring sparse sub columns by default
// 3. encode sparse sub columns
Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos);
void parse_variant_columns(Block& block, const std::vector<int>& variant_pos);
Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos);
void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos,
bool ignore_sparse = true);
void encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& variant_pos);

View File

@ -2074,7 +2074,7 @@ private:
if (variant.is_scalar_variant()) {
ColumnPtr nested = variant.get_root();
auto nested_from_type = variant.get_root_type();
DCHECK(nested_from_type->is_nullable());
// DCHECK(nested_from_type->is_nullable());
DCHECK(!data_type_to->is_nullable());
auto new_context = context->clone();
new_context->set_jsonb_string_as_string(true);

View File

@ -0,0 +1,12 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 {"c":3,"b":2,"a":1} 1.0
3 {"g":7,"h":8,"i":9} 3.0
4 {"k":11,"l":12,"j":10} 4.0
-- !sql --
1 {"c":3,"b":2,"a":1} 1
2 {"y":5,"x":4,"f":3} 2
3 {"g":7,"h":8,"i":9} 3
4 {"k":11,"l":12,"j":10} 4

View File

@ -0,0 +1,37 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
PushEvent 2489368070 2015-01-01T00:00:00Z
PushEvent 2489395767 2015-01-01T01:00:00Z
PushEvent 2489418154 2015-01-01T02:00:00Z
PullRequestEvent 2489436569 2015-01-01T03:00:01Z
PushEvent 2489368072 2015-01-01T00:00:00Z
PushEvent 2489395768 2015-01-01T01:00:00Z
PushEvent 2489418155 2015-01-01T02:00:00Z
GollumEvent 2489436580 2015-01-01T03:00:02Z
CreateEvent 2489368089 2015-01-01T00:00:01Z
PullRequestEvent 2489395770 2015-01-01T01:00:02Z
-- !sql --
PushEvent 2489368070 2015-01-01T00:00:00Z
PushEvent 2489395767 2015-01-01T01:00:00Z
PushEvent 2489418154 2015-01-01T02:00:00Z
PullRequestEvent 2489436569 2015-01-01T03:00:01Z
PushEvent 2489368072 2015-01-01T00:00:00Z
PushEvent 2489395768 2015-01-01T01:00:00Z
PushEvent 2489418155 2015-01-01T02:00:00Z
GollumEvent 2489436580 2015-01-01T03:00:02Z
CreateEvent 2489368089 2015-01-01T00:00:01Z
PullRequestEvent 2489395770 2015-01-01T01:00:02Z
-- !sql --
PushEvent 2489368070 2015-01-01T00:00:00Z
PushEvent 2489395767 2015-01-01T01:00:00Z
PushEvent 2489418154 2015-01-01T02:00:00Z
PullRequestEvent 2489436569 2015-01-01T03:00:01Z
PushEvent 2489368072 2015-01-01T00:00:00Z
PushEvent 2489395768 2015-01-01T01:00:00Z
PushEvent 2489418155 2015-01-01T02:00:00Z
GollumEvent 2489436580 2015-01-01T03:00:02Z
CreateEvent 2489368089 2015-01-01T00:00:01Z
PullRequestEvent 2489395770 2015-01-01T01:00:02Z

View File

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_alter_variant_table_column_with_delete") {
def tbName1 = "alter_table_column_dup_with_delete"
def getJobState = { tableName ->
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
return jobStateResult[0][9]
}
sql "DROP TABLE IF EXISTS ${tbName1}"
sql """
CREATE TABLE IF NOT EXISTS ${tbName1} (
k1 INT,
v variant,
vv double
)
UNIQUE KEY (k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1 properties("replication_num" = "1", "enable_unique_key_merge_on_write" = "true");
"""
sql """insert into ${tbName1} values(1,'{"a":1, "b":2, "c":3}',1);"""
sql """insert into ${tbName1} values(2,'{"d":4, "e":5, "f":6}',2);"""
sql """delete from ${tbName1} where k1 = 2;"""
sql """insert into ${tbName1} values(3,'{"g":7, "h":8, "i":9}',3);"""
sql """insert into ${tbName1} values(4,'{"j":10, "k":11, "l":12}',4);"""
qt_sql """select * from ${tbName1} order by k1;"""
sql """
ALTER TABLE ${tbName1}
MODIFY COLUMN vv text;
"""
int max_try_secs = 120
while (max_try_secs--) {
String res = getJobState(tbName1)
if (res == "FINISHED") {
sleep(3000)
break
} else {
Thread.sleep(500)
if (max_try_secs < 1) {
println "test timeout," + "state:" + res
assertEquals("FINISHED",res)
}
}
}
sql """insert into ${tbName1} values(2,'{"x":4, "y":5, "f":3}',2);"""
qt_sql "select * from ${tbName1} order by k1;"
//sql "DROP TABLE ${tbName1} FORCE;"
}

View File

@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("double_write_schema_change_with_variant") {
def set_be_config = { key, value ->
String backend_id;
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
}
def load_json_data = {table_name, file_name ->
// load the json data
streamLoad {
table "${table_name}"
// set http request header params
set 'read_json_by_line', 'true'
set 'format', 'json'
set 'max_filter_ratio', '0.1'
file file_name // import json file
time 10000 // limit inflight 10s
// if a check callback is declared, the default check conditions are ignored,
// so you must verify every condition yourself
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
logger.info("Stream load ${file_name} result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
// assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}
def table_name = "github_events"
sql """DROP TABLE IF EXISTS ${table_name}"""
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
k bigint,
v variant,
change_column double,
INDEX idx_var(v) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
)
DUPLICATE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 2
properties("replication_num" = "1", "disable_auto_compaction" = "false");
"""
set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "6294967296")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-2.json'}""")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-3.json'}""")
def getJobState = { indexName ->
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${indexName}' ORDER BY createtime DESC LIMIT 1 """
return jobStateResult[0][9]
}
def insert_sql = """ insert into ${table_name} values(1, '{"id":"25061216922","type":"PushEvent","actor":{"id":100067519,"login":"savorfamily","display_login":"savorfamily","gravatar_id":"123","url":"https://api.github.com/users/savorfamily","avatar_url":"https://avatars.githubusercontent.com/u/100067519?"},"repo":{"id":461434218,"name":"savorfamily/upptime","url":"https://api.github.com/repos/savorfamily/upptime"},"payload":{"push_id":11572320522,"size":1,"distinct_size":1,"ref":"refs/heads/master","head":"81106d369f763cb729d9d77610ace252c9db53f0","before":"2bf823a4febcf809da126828ecef7617c8cc48ea","commits":[{"sha":"81106d369f763cb729d9d77610ace252c9db53f0","author":{"email":"73812536+upptime-bot@users.noreply.github.com","name":"Upptime Bot"},"message":":bento: Update graphs [skip ci]","distinct":true,"url":"https://api.github.com/repos/savorfamily/upptime/commits/81106d369f763cb729d9d77610ace252c9db53f0"}]},"public":true,"created_at":"2022-11-07T02:00:00Z"}', "123111.0") """
def double_write = { ->
int max_try_time = 3000
while (max_try_time--){
String result = getJobState(table_name)
if (result == "FINISHED") {
sleep(3000)
break
} else {
if (result == "RUNNING") {
sql insert_sql
}
sleep(200)
if (max_try_time < 1){
assertEquals(1,2)
}
}
}
}
qt_sql "select v:type, v:id, v:created_at from ${table_name} where cast(v:id as bigint) != 25061216922 order by k, cast(v:id as bigint) limit 10"
sql """ ALTER TABLE ${table_name} modify COLUMN change_column text"""
double_write.call()
sql """ALTER TABLE ${table_name} drop index idx_var"""
double_write.call()
qt_sql "select v:type, v:id, v:created_at from ${table_name} where cast(v:id as bigint) != 25061216922 order by k, cast(v:id as bigint) limit 10"
createMV("create materialized view xxx as select k, sum(k) from ${table_name} group by k order by k;")
qt_sql "select v:type, v:id, v:created_at from ${table_name} where cast(v:id as bigint) != 25061216922 order by k, cast(v:id as bigint) limit 10"
set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "2147483648")
}