backport: https://github.com/apache/doris/pull/43244/
This commit is contained in:
@ -46,6 +46,10 @@ public class ModifyColumnClause extends AlterTableClause {
|
||||
return column;
|
||||
}
|
||||
|
||||
public void setColumn(Column column) {
|
||||
this.column = column;
|
||||
}
|
||||
|
||||
public ColumnPosition getColPos() {
|
||||
return colPos;
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.AlterClause;
|
||||
import org.apache.doris.analysis.AlterTableStmt;
|
||||
import org.apache.doris.analysis.ColumnDef;
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.analysis.DbName;
|
||||
@ -26,10 +27,13 @@ import org.apache.doris.analysis.DistributionDesc;
|
||||
import org.apache.doris.analysis.DropTableStmt;
|
||||
import org.apache.doris.analysis.HashDistributionDesc;
|
||||
import org.apache.doris.analysis.KeysDesc;
|
||||
import org.apache.doris.analysis.ModifyColumnClause;
|
||||
import org.apache.doris.analysis.ModifyPartitionClause;
|
||||
import org.apache.doris.analysis.PartitionDesc;
|
||||
import org.apache.doris.analysis.RangePartitionDesc;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.analysis.TypeDef;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
@ -43,6 +47,7 @@ import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -52,6 +57,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
|
||||
public class InternalSchemaInitializer extends Thread {
|
||||
|
||||
public static final int TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 5;
|
||||
@ -66,6 +72,7 @@ public class InternalSchemaInitializer extends Thread {
|
||||
if (!FeConstants.enableInternalSchemaDb) {
|
||||
return;
|
||||
}
|
||||
modifyColumnStatsTblSchema();
|
||||
while (!created()) {
|
||||
try {
|
||||
FrontendNodeType feType = Env.getCurrentEnv().getFeType();
|
||||
@ -95,6 +102,77 @@ public class InternalSchemaInitializer extends Thread {
|
||||
modifyTblReplicaCount(database, AuditLoader.AUDIT_LOG_TABLE);
|
||||
}
|
||||
|
||||
public void modifyColumnStatsTblSchema() {
|
||||
while (true) {
|
||||
try {
|
||||
Table table = findStatsTable();
|
||||
if (table == null) {
|
||||
break;
|
||||
}
|
||||
table.writeLock();
|
||||
try {
|
||||
doSchemaChange(table);
|
||||
break;
|
||||
} finally {
|
||||
table.writeUnlock();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Failed to do schema change for stats table. Try again later.", t);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 1000);
|
||||
} catch (InterruptedException t) {
|
||||
// IGNORE
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Table findStatsTable() {
|
||||
// 1. check database exist
|
||||
Optional<Database> dbOpt = Env.getCurrentEnv().getInternalCatalog().getDb(FeConstants.INTERNAL_DB_NAME);
|
||||
if (!dbOpt.isPresent()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 2. check table exist
|
||||
Database db = dbOpt.get();
|
||||
Optional<Table> tableOp = db.getTable(StatisticConstants.STATISTIC_TBL_NAME);
|
||||
return tableOp.orElse(null);
|
||||
}
|
||||
|
||||
public void doSchemaChange(Table table) throws UserException {
|
||||
List<AlterClause> clauses = getModifyColumnClauses(table);
|
||||
if (!clauses.isEmpty()) {
|
||||
TableName tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME,
|
||||
StatisticConstants.DB_NAME, table.getName());
|
||||
AlterTableStmt alter = new AlterTableStmt(tableName, clauses);
|
||||
Env.getCurrentEnv().alterTable(alter);
|
||||
}
|
||||
}
|
||||
|
||||
public List<AlterClause> getModifyColumnClauses(Table table) {
|
||||
List<AlterClause> clauses = Lists.newArrayList();
|
||||
for (Column col : table.fullSchema) {
|
||||
if (col.isKey() && col.getType().isVarchar()
|
||||
&& col.getType().getLength() < StatisticConstants.MAX_NAME_LEN) {
|
||||
TypeDef typeDef = new TypeDef(
|
||||
ScalarType.createVarchar(StatisticConstants.MAX_NAME_LEN), col.isAllowNull());
|
||||
ColumnDef columnDef = new ColumnDef(col.getName(), typeDef, true, null,
|
||||
col.isAllowNull(), -1, new ColumnDef.DefaultValue(false, null), "");
|
||||
try {
|
||||
columnDef.analyze(true);
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("Failed to analyze column {}", col.getName());
|
||||
continue;
|
||||
}
|
||||
ModifyColumnClause clause = new ModifyColumnClause(columnDef, null, null, Maps.newHashMap());
|
||||
clause.setColumn(columnDef.toColumn());
|
||||
clauses.add(clause);
|
||||
}
|
||||
}
|
||||
return clauses;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static void modifyTblReplicaCount(Database database, String tblName) {
|
||||
if (!(Config.min_replication_num_per_tablet < StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM
|
||||
|
||||
@ -34,7 +34,7 @@ public class StatisticConstants {
|
||||
public static final String STATISTIC_TBL_NAME = "column_statistics";
|
||||
public static final String HISTOGRAM_TBL_NAME = "histogram_statistics";
|
||||
|
||||
public static final int MAX_NAME_LEN = 64;
|
||||
public static final int MAX_NAME_LEN = 1024;
|
||||
|
||||
public static final int ID_LEN = 4096;
|
||||
|
||||
|
||||
@ -0,0 +1,75 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.AlterClause;
|
||||
import org.apache.doris.analysis.ModifyColumnClause;
|
||||
import org.apache.doris.datasource.hive.HMSExternalTable;
|
||||
import org.apache.doris.statistics.StatisticConstants;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
class InternalSchemaInitializerTest {
|
||||
@Test
|
||||
public void testGetModifyColumn() {
|
||||
new MockUp<HMSExternalTable>() {
|
||||
@Mock
|
||||
public HMSExternalTable.DLAType getDlaType() {
|
||||
return HMSExternalTable.DLAType.HUDI;
|
||||
}
|
||||
};
|
||||
|
||||
InternalSchemaInitializer initializer = new InternalSchemaInitializer();
|
||||
OlapTable table = new OlapTable();
|
||||
Column key1 = new Column("key1", ScalarType.createVarcharType(100), true, null, false, null, "");
|
||||
Column key2 = new Column("key2", ScalarType.createVarcharType(100), true, null, true, null, "");
|
||||
Column key3 = new Column("key3", ScalarType.createVarcharType(1024), true, null, null, "");
|
||||
Column key4 = new Column("key4", ScalarType.createVarcharType(1025), true, null, null, "");
|
||||
Column key5 = new Column("key5", ScalarType.INT, true, null, null, "");
|
||||
Column value1 = new Column("value1", ScalarType.INT, false, null, null, "");
|
||||
Column value2 = new Column("value2", ScalarType.createVarcharType(100), false, null, null, "");
|
||||
List<Column> schema = Lists.newArrayList();
|
||||
schema.add(key1);
|
||||
schema.add(key2);
|
||||
schema.add(key3);
|
||||
schema.add(key4);
|
||||
schema.add(key5);
|
||||
schema.add(value1);
|
||||
schema.add(value2);
|
||||
table.fullSchema = schema;
|
||||
List<AlterClause> modifyColumnClauses = initializer.getModifyColumnClauses(table);
|
||||
Assertions.assertEquals(2, modifyColumnClauses.size());
|
||||
ModifyColumnClause clause1 = (ModifyColumnClause) modifyColumnClauses.get(0);
|
||||
Assertions.assertEquals("key1", clause1.getColumn().getName());
|
||||
Assertions.assertEquals(StatisticConstants.MAX_NAME_LEN, clause1.getColumn().getType().getLength());
|
||||
Assertions.assertFalse(clause1.getColumn().isAllowNull());
|
||||
|
||||
ModifyColumnClause clause2 = (ModifyColumnClause) modifyColumnClauses.get(1);
|
||||
Assertions.assertEquals("key2", clause2.getColumn().getName());
|
||||
Assertions.assertEquals(StatisticConstants.MAX_NAME_LEN, clause2.getColumn().getType().getLength());
|
||||
Assertions.assertTrue(clause2.getColumn().isAllowNull());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user