pick #44361
This commit is contained in:
@ -411,9 +411,12 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
// drop bloom filter column
|
||||
Set<String> bfCols = olapTable.getCopiedBfColumns();
|
||||
if (bfCols != null) {
|
||||
Set<String> newBfCols = new HashSet<>();
|
||||
Set<String> newBfCols = null;
|
||||
for (String bfCol : bfCols) {
|
||||
if (!bfCol.equalsIgnoreCase(dropColName)) {
|
||||
if (newBfCols == null) {
|
||||
newBfCols = Sets.newHashSet();
|
||||
}
|
||||
newBfCols.add(bfCol);
|
||||
}
|
||||
}
|
||||
@ -2801,6 +2804,25 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
LOG.info("finished modify table's add or drop or modify columns. table: {}, job: {}, is replay: {}",
|
||||
olapTable.getName(), jobId, isReplay);
|
||||
}
|
||||
// for bloom filter, rebuild bloom filter info by table schema in replay
|
||||
if (isReplay) {
|
||||
Set<String> bfCols = olapTable.getCopiedBfColumns();
|
||||
if (bfCols != null) {
|
||||
List<Column> columns = olapTable.getBaseSchema();
|
||||
Set<String> newBfCols = null;
|
||||
for (String bfCol : bfCols) {
|
||||
for (Column column : columns) {
|
||||
if (column.getName().equalsIgnoreCase(bfCol)) {
|
||||
if (newBfCols == null) {
|
||||
newBfCols = Sets.newHashSet();
|
||||
}
|
||||
newBfCols.add(column.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
olapTable.setBloomFilterInfo(newBfCols, olapTable.getBfFpp());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info) throws MetaNotFoundException {
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !select --
|
||||
1 1
|
||||
1 1 1
|
||||
|
||||
-- !select --
|
||||
1 \N
|
||||
|
||||
@ -21,13 +21,14 @@ suite("test_bloom_filter_drop_column") {
|
||||
|
||||
sql """CREATE TABLE IF NOT EXISTS ${table_name} (
|
||||
`a` varchar(150) NULL,
|
||||
`c1` varchar(10)
|
||||
`c1` varchar(10),
|
||||
`c2` varchar(10)
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`a`)
|
||||
DISTRIBUTED BY HASH(`a`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_allocation" = "tag.location.default: 1",
|
||||
"bloom_filter_columns" = "c1",
|
||||
"bloom_filter_columns" = "c1, c2",
|
||||
"in_memory" = "false",
|
||||
"storage_format" = "V2"
|
||||
)"""
|
||||
@ -51,12 +52,12 @@ suite("test_bloom_filter_drop_column") {
|
||||
assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout")
|
||||
}
|
||||
|
||||
def assertShowCreateTableWithRetry = { tableName, expectedCondition, maxRetries, waitSeconds ->
|
||||
def assertShowCreateTableWithRetry = { tableName, expectedCondition, contains, maxRetries, waitSeconds ->
|
||||
int attempt = 0
|
||||
while (attempt < maxRetries) {
|
||||
def res = sql """SHOW CREATE TABLE ${tableName}"""
|
||||
log.info("Attempt ${attempt + 1}: show table: ${res}")
|
||||
if (res && res.size() > 0 && res[0][1].contains(expectedCondition)) {
|
||||
if (res && res.size() > 0 && ((contains && res[0][1].contains(expectedCondition)) || (!contains && !res[0][1].contains(expectedCondition)))) {
|
||||
logger.info("Attempt ${attempt + 1}: Condition met.")
|
||||
return
|
||||
} else {
|
||||
@ -70,26 +71,43 @@ suite("test_bloom_filter_drop_column") {
|
||||
def finalRes = sql """SHOW CREATE TABLE ${tableName}"""
|
||||
log.info("Final attempt: show table: ${finalRes}")
|
||||
assertTrue(finalRes && finalRes.size() > 0, "SHOW CREATE TABLE return empty or null")
|
||||
assertTrue(finalRes[0][1].contains(expectedCondition), "expected\"${expectedCondition}\",actural: ${finalRes[0][1]}")
|
||||
if (contains) {
|
||||
assertTrue(finalRes[0][1].contains(expectedCondition), "expected to contain \"${expectedCondition}\", actual: ${finalRes[0][1]}")
|
||||
} else {
|
||||
assertTrue(!finalRes[0][1].contains(expectedCondition), "expected not to contain \"${expectedCondition}\", actual: ${finalRes[0][1]}")
|
||||
}
|
||||
}
|
||||
|
||||
sql """INSERT INTO ${table_name} values ('1', '1')"""
|
||||
sql """INSERT INTO ${table_name} values ('1', '1', '1')"""
|
||||
sql "sync"
|
||||
|
||||
qt_select """select * from ${table_name} order by a"""
|
||||
|
||||
assertShowCreateTableWithRetry(table_name, "\"bloom_filter_columns\" = \"c1, c2\"", true, 3, 30)
|
||||
// drop column c1
|
||||
sql """ALTER TABLE ${table_name} DROP COLUMN c1"""
|
||||
wait_for_latest_op_on_table_finish(table_name, timeout)
|
||||
sql "sync"
|
||||
|
||||
// show create table with retry logic
|
||||
assertShowCreateTableWithRetry(table_name, "\"bloom_filter_columns\" = \"\"", 3, 30)
|
||||
assertShowCreateTableWithRetry(table_name, "\"bloom_filter_columns\" = \"c2\"", true, 3, 30)
|
||||
|
||||
// drop column c2
|
||||
sql """ALTER TABLE ${table_name} DROP COLUMN c2"""
|
||||
wait_for_latest_op_on_table_finish(table_name, timeout)
|
||||
sql "sync"
|
||||
|
||||
// show create table with retry logic
|
||||
assertShowCreateTableWithRetry(table_name, "\"bloom_filter_columns\" = \"\"", false, 3, 30)
|
||||
|
||||
// add new column c1
|
||||
sql """ALTER TABLE ${table_name} ADD COLUMN c1 ARRAY<STRING>"""
|
||||
wait_for_latest_op_on_table_finish(table_name, timeout)
|
||||
sql "sync"
|
||||
|
||||
// insert data
|
||||
sql """INSERT INTO ${table_name} values ('2', null)"""
|
||||
sql "sync"
|
||||
// select data
|
||||
qt_select """select * from ${table_name} order by a"""
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user