[Feature] Support Alter Table Clause For External Table (#4699)

* Support Alter Table Clause For External Table
5 alter operation be supported:

* RENAME
* ADD COLUMN
* DROP COLUMN
* MODIFY COLUMN
* REORDER COLUMN
This commit is contained in:
HappenLee
2020-10-25 17:17:56 +08:00
committed by GitHub
parent 2fa3ffda7b
commit f89b660c94
14 changed files with 724 additions and 74 deletions

View File

@ -154,6 +154,92 @@ public class Alter {
}
}
private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable, List<AlterClause> alterClauses,
final String clusterName, Database db) throws UserException {
stmt.rewriteAlterClause(olapTable);
// check conflict alter ops first
alterClauses.addAll(stmt.getOps());
AlterOperations currentAlterOps = new AlterOperations();
currentAlterOps.checkConflict(alterClauses);
// check cluster capacity and db quota, only need to check once.
if (currentAlterOps.needCheckCapacity()) {
Catalog.getCurrentSystemInfo().checkClusterCapacity(clusterName);
db.checkQuota();
}
if (olapTable.getState() != OlapTableState.NORMAL) {
throw new DdlException(
"Table[" + olapTable.getName() + "]'s state is not NORMAL. Do not allow doing ALTER ops");
}
boolean needProcessOutsideDatabaseLock = false;
if (currentAlterOps.hasSchemaChangeOp()) {
// if modify storage type to v2, do schema change to convert all related tablets to segment v2 format
schemaChangeHandler.process(alterClauses, clusterName, db, olapTable);
} else if (currentAlterOps.hasRollupOp()) {
materializedViewHandler.process(alterClauses, clusterName, db, olapTable);
} else if (currentAlterOps.hasPartitionOp()) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
if (alterClause instanceof DropPartitionClause) {
if (!((DropPartitionClause) alterClause).isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed((OlapTable) db.getTable(olapTable.getName()));
}
Catalog.getCurrentCatalog().dropPartition(db, olapTable, ((DropPartitionClause) alterClause));
} else if (alterClause instanceof ReplacePartitionClause) {
Catalog.getCurrentCatalog().replaceTempPartition(db, olapTable.getName(), (ReplacePartitionClause) alterClause);
} else if (alterClause instanceof ModifyPartitionClause) {
ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause);
// expand the partition names if it is 'Modify Partition(*)'
if (clause.isNeedExpand()) {
List<String> partitionNames = clause.getPartitionNames();
partitionNames.clear();
for (Partition partition : olapTable.getPartitions()) {
partitionNames.add(partition.getName());
}
}
Map<String, String> properties = clause.getProperties();
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
needProcessOutsideDatabaseLock = true;
} else {
List<String> partitionNames = clause.getPartitionNames();
modifyPartitionsProperty(db, olapTable, partitionNames, properties);
}
} else if (alterClause instanceof AddPartitionClause) {
needProcessOutsideDatabaseLock = true;
} else {
throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
}
} else if (currentAlterOps.hasRenameOp()) {
processRename(db, olapTable, alterClauses);
} else if (currentAlterOps.hasReplaceTableOp()) {
processReplaceTable(db, olapTable, alterClauses);
} else if (currentAlterOps.contains(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC)) {
needProcessOutsideDatabaseLock = true;
} else {
throw new DdlException("Invalid alter operations: " + currentAlterOps);
}
return needProcessOutsideDatabaseLock;
}
private void processAlterExternalTable(AlterTableStmt stmt, Table externalTable, Database db) throws UserException {
stmt.rewriteAlterClause(externalTable);
// check conflict alter ops first
List<AlterClause> alterClauses = stmt.getOps();
AlterOperations currentAlterOps = new AlterOperations();
currentAlterOps.checkConflict(alterClauses);
if (currentAlterOps.hasRenameOp()) {
processRename(db, externalTable, alterClauses);
} else if (currentAlterOps.hasSchemaChangeOp()) {
schemaChangeHandler.processExternalTable(alterClauses, db, externalTable);
}
}
public void processAlterTable(AlterTableStmt stmt) throws UserException {
TableName dbTableName = stmt.getTbl();
String dbName = dbTableName.getDb();
@ -163,7 +249,7 @@ public class Alter {
if (db == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
List<AlterClause> alterClauses;
List<AlterClause> alterClauses = Lists.newArrayList();
// some operations will take long time to process, need to be done outside the database lock
boolean needProcessOutsideDatabaseLock = false;
@ -175,73 +261,18 @@ public class Alter {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_TABLE_ERROR, tableName);
}
if (table.getType() != TableType.OLAP) {
throw new DdlException("Do not support alter non-OLAP table[" + tableName + "]");
}
OlapTable olapTable = (OlapTable) table;
stmt.rewriteAlterClause(olapTable);
// check conflict alter ops first
alterClauses = stmt.getOps();
AlterOperations currentAlterOps = new AlterOperations();
currentAlterOps.checkConflict(alterClauses);
// check cluster capacity and db quota, only need to check once.
if (currentAlterOps.needCheckCapacity()) {
Catalog.getCurrentSystemInfo().checkClusterCapacity(clusterName);
db.checkQuota();
}
if (olapTable.getState() != OlapTableState.NORMAL) {
throw new DdlException(
"Table[" + table.getName() + "]'s state is not NORMAL. Do not allow doing ALTER ops");
}
if (currentAlterOps.hasSchemaChangeOp()) {
// if modify storage type to v2, do schema change to convert all related tablets to segment v2 format
schemaChangeHandler.process(alterClauses, clusterName, db, olapTable);
} else if (currentAlterOps.hasRollupOp()) {
materializedViewHandler.process(alterClauses, clusterName, db, olapTable);
} else if (currentAlterOps.hasPartitionOp()) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
if (alterClause instanceof DropPartitionClause) {
if (!((DropPartitionClause) alterClause).isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed((OlapTable) db.getTable(tableName));
}
Catalog.getCurrentCatalog().dropPartition(db, olapTable, ((DropPartitionClause) alterClause));
} else if (alterClause instanceof ReplacePartitionClause) {
Catalog.getCurrentCatalog().replaceTempPartition(db, tableName, (ReplacePartitionClause) alterClause);
} else if (alterClause instanceof ModifyPartitionClause) {
ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause);
// expand the partition names if it is 'Modify Partition(*)'
if (clause.isNeedExpand()) {
List<String> partitionNames = clause.getPartitionNames();
partitionNames.clear();
for (Partition partition : olapTable.getPartitions()) {
partitionNames.add(partition.getName());
}
}
Map<String, String> properties = clause.getProperties();
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
needProcessOutsideDatabaseLock = true;
} else {
List<String> partitionNames = clause.getPartitionNames();
modifyPartitionsProperty(db, olapTable, partitionNames, properties);
}
} else if (alterClause instanceof AddPartitionClause) {
needProcessOutsideDatabaseLock = true;
} else {
throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
}
} else if (currentAlterOps.hasRenameOp()) {
processRename(db, olapTable, alterClauses);
} else if (currentAlterOps.hasReplaceTableOp()) {
processReplaceTable(db, olapTable, alterClauses);
} else if (currentAlterOps.contains(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC)) {
needProcessOutsideDatabaseLock = true;
} else {
throw new DdlException("Invalid alter operations: " + currentAlterOps);
switch (table.getType()) {
case OLAP:
OlapTable olapTable = (OlapTable) table;
needProcessOutsideDatabaseLock = processAlterOlapTable(stmt, olapTable, alterClauses, clusterName, db);
break;
case ODBC:
case MYSQL:
case ELASTICSEARCH:
processAlterExternalTable(stmt, table, db);
return;
default:
throw new DdlException("Do not support alter " + table.getType().toString() + " table[" + tableName + "]");
}
} finally {
db.writeUnlock();
@ -462,6 +493,17 @@ public class Alter {
}
}
private void processRename(Database db, Table table, List<AlterClause> alterClauses) throws DdlException {
for (AlterClause alterClause : alterClauses) {
if (alterClause instanceof TableRenameClause) {
Catalog.getCurrentCatalog().renameTable(db, table, (TableRenameClause) alterClause);
break;
} else {
Preconditions.checkState(false);
}
}
}
/**
* Batch update partitions' properties
* caller should hold the db lock

View File

@ -26,6 +26,7 @@ import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@ -380,6 +381,12 @@ public abstract class AlterHandler extends MasterDaemon {
public abstract void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable)
throws UserException;
/*
* entry function. handle alter ops for external table
*/
public void processExternalTable(List<AlterClause> alterClauses, Database db, Table externalTable)
throws UserException {};
/*
* cancel alter ops
*/

View File

@ -132,6 +132,26 @@ public class SchemaChangeHandler extends AlterHandler {
indexSchemaMap, newColNameSet);
}
private void processAddColumn(AddColumnClause alterClause, Table externalTable, List<Column> newSchema) throws DdlException {
Column column = alterClause.getColumn();
ColumnPosition columnPos = alterClause.getColPos();
Set<String> newColNameSet = Sets.newHashSet(column.getName());
addColumnInternal(column, columnPos, newSchema, newColNameSet);
}
private void processAddColumns(AddColumnsClause alterClause, Table externalTable, List<Column> newSchema) throws DdlException {
List<Column> columns = alterClause.getColumns();
Set<String> newColNameSet = Sets.newHashSet();
for (Column column : alterClause.getColumns()) {
newColNameSet.add(column.getName());
}
for (Column newColumn : columns) {
addColumnInternal(newColumn, null, newSchema, newColNameSet);
}
}
private void processAddColumns(AddColumnsClause alterClause, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap) throws DdlException {
List<Column> columns = alterClause.getColumns();
@ -158,6 +178,31 @@ public class SchemaChangeHandler extends AlterHandler {
}
}
private void processDropColumn(DropColumnClause alterClause, Table externalTable, List<Column> newSchema) throws DdlException {
String dropColName = alterClause.getColName();
// find column in base index and remove it
boolean found = false;
Iterator<Column> baseIter = newSchema.iterator();
while (baseIter.hasNext()) {
Column column = baseIter.next();
if (column.getName().equalsIgnoreCase(dropColName)) {
if (newSchema.size() > 1) {
baseIter.remove();
found = true;
} else {
throw new DdlException("Do not allow remove last column of table: " + externalTable.getName()
+ " column: " + dropColName);
}
break;
}
}
if (!found) {
throw new DdlException("Column does not exists: " + dropColName);
}
}
private void processDropColumn(DropColumnClause alterClause, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes) throws DdlException {
String dropColName = alterClause.getColName();
@ -294,6 +339,75 @@ public class SchemaChangeHandler extends AlterHandler {
}
}
// User can modify column type and column position
private void processModifyColumn(ModifyColumnClause alterClause, Table externalTable, List<Column> newSchema) throws DdlException {
Column modColumn = alterClause.getColumn();
ColumnPosition columnPos = alterClause.getColPos();
// find modified column
String newColName = modColumn.getName();
boolean hasColPos = (columnPos != null && !columnPos.isFirst());
boolean found = false;
boolean typeChanged = false;
int modColIndex = -1;
int lastColIndex = -1;
for (int i = 0; i < newSchema.size(); i++) {
Column col = newSchema.get(i);
if (col.getName().equalsIgnoreCase(newColName)) {
modColIndex = i;
found = true;
if (!col.equals(modColumn)) {
typeChanged = true;
}
}
if (hasColPos) {
if (col.getName().equalsIgnoreCase(columnPos.getLastCol())) {
lastColIndex = i;
}
} else {
// save the last Key position
if (col.isKey()) {
lastColIndex = i;
}
}
}
// mod col not find
if (!found) {
throw new DdlException("Column[" + newColName + "] does not exists");
}
// last col not find
if (hasColPos && lastColIndex == -1) {
throw new DdlException("Column[" + columnPos.getLastCol() + "] does not exists");
}
// check if add to first
if (columnPos != null && columnPos.isFirst()) {
lastColIndex = -1;
hasColPos = true;
}
Column oriColumn = newSchema.get(modColIndex);
// retain old column name
modColumn.setName(oriColumn.getName());
// handle the move operation in 'indexForFindingColumn' if has
if (hasColPos) {
// move col
if (lastColIndex > modColIndex) {
newSchema.add(lastColIndex + 1, modColumn);
newSchema.remove(modColIndex);
} else if (lastColIndex < modColIndex) {
newSchema.remove(modColIndex);
newSchema.add(lastColIndex + 1, modColumn);
} else {
throw new DdlException("Column[" + columnPos.getLastCol() + "] modify position is invalid");
}
} else {
newSchema.set(modColIndex, modColumn);
}
}
// User can modify column type and column position
private void processModifyColumn(ModifyColumnClause alterClause, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap) throws DdlException {
@ -481,6 +595,38 @@ public class SchemaChangeHandler extends AlterHandler {
}
}
private void processReorderColumn(ReorderColumnsClause alterClause, Table externalTable, List<Column> newSchema) throws DdlException {
List<String> orderedColNames = alterClause.getColumnsByPos();
newSchema.clear();
List<Column> targetIndexSchema = externalTable.getBaseSchema();
// check and create new ordered column list
Set<String> colNameSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (String colName : orderedColNames) {
Column oneCol = null;
for (Column column : targetIndexSchema) {
if (column.getName().equalsIgnoreCase(colName) && column.isVisible()) {
oneCol = column;
break;
}
}
if (oneCol == null) {
throw new DdlException("Column[" + colName + "] not exists");
}
newSchema.add(oneCol);
if (colNameSet.contains(colName)) {
throw new DdlException("Reduplicative column[" + colName + "]");
} else {
colNameSet.add(colName);
}
}
if (newSchema.size() != targetIndexSchema.size()) {
throw new DdlException("Reorder stmt should contains all columns");
}
}
private void processReorderColumn(ReorderColumnsClause alterClause, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap) throws DdlException {
List<String> orderedColNames = alterClause.getColumnsByPos();
@ -533,6 +679,64 @@ public class SchemaChangeHandler extends AlterHandler {
indexSchemaMap.put(targetIndexId, newSchema);
}
/*
* Add 'newColumn' to specified index.
* Modified schema will be saved in 'indexSchemaMap'
*/
private void addColumnInternal(Column newColumn, ColumnPosition columnPos,
List<Column> modIndexSchema,
Set<String> newColNameSet) throws DdlException {
String newColName = newColumn.getName();
int posIndex = -1;
boolean hasPos = (columnPos != null && !columnPos.isFirst());
for (int i = 0; i < modIndexSchema.size(); i++) {
Column col = modIndexSchema.get(i);
if (col.getName().equalsIgnoreCase(newColName)) {
if (!newColNameSet.contains(newColName)) {
// if this is not a base index, we should check if user repeatedly add columns
throw new DdlException("Repeatedly add column: " + newColName);
}
// this is a base index, and the column we check here is added by previous 'add column clause'
// in same ALTER stmt.
// so here we will check if the 2 columns is exactly same. if not, throw exception
if (!col.equals(newColumn)) {
throw new DdlException("Repeatedly add same column with different definition: " + newColName);
}
// column already exist, return
return;
}
if (hasPos) {
// after the field
if (col.getName().equalsIgnoreCase(columnPos.getLastCol())) {
posIndex = i;
}
}
}
// check if lastCol was found
if (hasPos && posIndex == -1) {
throw new DdlException("Column[" + columnPos.getLastCol() + "] does not found");
}
// check if add to first
if (columnPos != null && columnPos.isFirst()) {
posIndex = -1;
hasPos = true;
}
if (hasPos) {
// key
modIndexSchema.add(posIndex + 1, newColumn);
} else {
modIndexSchema.add(newColumn);
}
checkRowLength(modIndexSchema);
}
/*
* Add 'newColumn' to specified index.
* Modified schema will be saved in 'indexSchemaMap'
@ -1469,6 +1673,39 @@ public class SchemaChangeHandler extends AlterHandler {
createJob(db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes);
}
@Override
public void processExternalTable(List<AlterClause> alterClauses, Database db, Table externalTable)
throws UserException {
// copy the external table schema columns
List<Column> newSchema = Lists.newArrayList();
newSchema.addAll(externalTable.getBaseSchema(true));
for (AlterClause alterClause : alterClauses) {
if (alterClause instanceof AddColumnClause) {
// add column
processAddColumn((AddColumnClause) alterClause, externalTable, newSchema);
} else if (alterClause instanceof AddColumnsClause) {
// add columns
processAddColumns((AddColumnsClause) alterClause, externalTable, newSchema);
} else if (alterClause instanceof DropColumnClause) {
// drop column and drop indexes on this column
processDropColumn((DropColumnClause) alterClause, externalTable, newSchema);
} else if (alterClause instanceof ModifyColumnClause) {
// modify column
processModifyColumn((ModifyColumnClause) alterClause, externalTable, newSchema);
} else if (alterClause instanceof ReorderColumnsClause) {
// reorder column
processReorderColumn((ReorderColumnsClause) alterClause, externalTable, newSchema);
} else {
Preconditions.checkState(false);
}
} // end for alter clauses
// replace the old column list
externalTable.setNewFullSchema(newSchema);
// refresh external table column in edit log
Catalog.getCurrentCatalog().refreshExternalTableSchema(db, externalTable, newSchema);
}
private void sendClearAlterTask(Database db, OlapTable olapTable) {
AgentBatchTask batchTask = new AgentBatchTask();
db.readLock();

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
@ -167,6 +168,23 @@ public class AlterTableStmt extends DdlStmt {
ops = clauses;
}
public void rewriteAlterClause(Table table) throws UserException {
List<AlterClause> clauses = new ArrayList<>();
for (AlterClause alterClause : ops) {
if (alterClause instanceof TableRenameClause ||
alterClause instanceof AddColumnClause ||
alterClause instanceof AddColumnsClause ||
alterClause instanceof DropColumnClause ||
alterClause instanceof ModifyColumnClause ||
alterClause instanceof ReorderColumnsClause) {
clauses.add(alterClause);
} else {
throw new AnalysisException( table.getType().toString() + " [" + table.getName() + "] " +
"do not support " + alterClause.getOpType().toString() + " clause now");
}
}
ops = clauses;
}
@Override
public String toSql() {

View File

@ -179,6 +179,7 @@ import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.PartitionPersistInfo;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.RefreshExternalTableInfo;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.persist.SetReplicaStatusOperationLog;
@ -4297,6 +4298,11 @@ public class Catalog {
}
}
public void replayAlterExteranlTableSchema(String dbName, String tableName, List<Column> newSchema) throws DdlException {
Database db = this.fullNameToDb.get(dbName);
db.allterExternalTableSchemaWithLock(tableName, newSchema);
}
private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
DistributionInfo distributionInfo, long version, long versionHash, short replicationNum,
TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
@ -5066,9 +5072,12 @@ public class Catalog {
}
// entry of rename table operation
public void renameTable(Database db, OlapTable table, TableRenameClause tableRenameClause) throws DdlException {
if (table.getState() != OlapTableState.NORMAL) {
throw new DdlException("Table[" + table.getName() + "] is under " + table.getState());
public void renameTable(Database db, Table table, TableRenameClause tableRenameClause) throws DdlException {
if (table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
if ( olapTable.getState() != OlapTableState.NORMAL) {
throw new DdlException("Table[" + olapTable.getName() + "] is under " + olapTable.getState());
}
}
String tableName = table.getName();
@ -5082,9 +5091,9 @@ public class Catalog {
throw new DdlException("Table name[" + newTableName + "] is already used");
}
table.checkAndSetName(newTableName, false);
table.setName(newTableName);
db.dropTable(table.getName());
db.dropTable(tableName);
db.createTable(table);
TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), newTableName);
@ -5092,6 +5101,13 @@ public class Catalog {
LOG.info("rename table[{}] to {}", tableName, newTableName);
}
public void refreshExternalTableSchema(Database db, Table table, List<Column> newSchema) {
RefreshExternalTableInfo refreshExternalTableInfo = new RefreshExternalTableInfo(db.getFullName(),
table.getName(), newSchema);
editLog.logRefreshExternalTableSchema(refreshExternalTableInfo);
LOG.info("refresh db[{}] table[{}] for schema change", db.getFullName(), table.getName());
}
public void replayRenameTable(TableInfo tableInfo) throws DdlException {
long dbId = tableInfo.getDbId();
long tableId = tableInfo.getTableId();
@ -5100,7 +5116,7 @@ public class Catalog {
Database db = getDb(dbId);
db.writeLock();
try {
OlapTable table = (OlapTable) db.getTable(tableId);
Table table = db.getTable(tableId);
String tableName = table.getName();
db.dropTable(tableName);
table.setName(newTableName);

View File

@ -305,6 +305,20 @@ public class Database extends MetaObject implements Writable {
}
}
public void allterExternalTableSchemaWithLock(String tableName, List<Column> newSchema) throws DdlException{
writeLock();
try {
if (!nameToTable.containsKey(tableName)) {
throw new DdlException("Do not contain proper table " + tableName + " in refresh table");
} else {
Table table = nameToTable.get(tableName);
table.setNewFullSchema(newSchema);
}
} finally {
writeUnlock();
}
}
public boolean createTable(Table table) {
boolean result = true;
String tableName = table.getName();

View File

@ -137,6 +137,10 @@ public class Table extends MetaObject implements Writable {
return name;
}
public void setName(String newName) {
name = newName;
}
public TableType getType() {
return type;
}
@ -149,6 +153,7 @@ public class Table extends MetaObject implements Writable {
public List<Column> getBaseSchema() {
return getBaseSchema(Util.showHiddenColumns());
}
public List<Column> getBaseSchema(boolean full) {
if (full) {
return fullSchema;

View File

@ -70,6 +70,7 @@ import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.PartitionPersistInfo;
import org.apache.doris.persist.PrivInfo;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.RefreshExternalTableInfo;
import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplaceTableOperationLog;
@ -176,6 +177,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA: {
data = RefreshExternalTableInfo.read(in);
isRead = true;
break;
}
case OperationType.OP_ADD_PARTITION: {
data = new PartitionPersistInfo();
((PartitionPersistInfo) data).readFields(in);

View File

@ -40,6 +40,7 @@ import org.apache.doris.persist.ModifyPartitionInfo;
import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.PartitionPersistInfo;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.RefreshExternalTableInfo;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.persist.Storage;
import org.apache.doris.persist.TableInfo;
@ -235,6 +236,11 @@ public final class LocalJournalCursor implements JournalCursor {
ret.setData(info);
break;
}
case OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA: {
RefreshExternalTableInfo info = RefreshExternalTableInfo.read(in);
ret.setData(info);
break;
}
case OperationType.OP_ADD_PARTITION: {
PartitionPersistInfo info = new PartitionPersistInfo();
info.readFields(in);

View File

@ -180,6 +180,13 @@ public class EditLog {
catalog.replayCreateTable(info.getDbName(), info.getTable());
break;
}
case OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA: {
RefreshExternalTableInfo info = (RefreshExternalTableInfo) journal.getData();
LOG.info("Begin to unprotect alter external table schema. db = "
+ info.getDbName() + " table = " + info.getTableName());
catalog.replayAlterExteranlTableSchema(info.getDbName(), info.getTableName(), info.getNewSchema());
break;
}
case OperationType.OP_DROP_TABLE: {
DropInfo info = (DropInfo) journal.getData();
Database db = catalog.getDb(info.getDbId());
@ -924,6 +931,10 @@ public class EditLog {
logEdit(OperationType.OP_CREATE_TABLE, info);
}
public void logRefreshExternalTableSchema(RefreshExternalTableInfo info) {
logEdit(OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA, info);
}
public void logAddPartition(PartitionPersistInfo info) {
logEdit(OperationType.OP_ADD_PARTITION, info);
}

View File

@ -183,4 +183,7 @@ public class OperationType {
// resource 276~290
public static final short OP_CREATE_RESOURCE = 276;
public static final short OP_DROP_RESOURCE = 277;
// alter external table
public static final short OP_ALTER_EXTERNAL_TABLE_SCHEMA = 280;
}

View File

@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import com.google.gson.annotations.SerializedName;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.io.Text;
import org.apache.doris.persist.gson.GsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
public class RefreshExternalTableInfo implements Writable {
public static final Logger LOG = LoggerFactory.getLogger(RefreshExternalTableInfo.class);
@SerializedName(value = "dbName")
private String dbName;
@SerializedName(value = "tableName")
private String tableName;
@SerializedName(value = "newSchema")
private List<Column> newSchema;
public RefreshExternalTableInfo() {
// for persist
}
public RefreshExternalTableInfo(String dbName, String tableName, List<Column> newSchema) {
this.dbName = dbName;
this.tableName = tableName;
this.newSchema = newSchema;
}
public String getDbName() {
return dbName;
}
public String getTableName() {
return tableName;
}
public List<Column> getNewSchema() {
return newSchema;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static RefreshExternalTableInfo read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, RefreshExternalTableInfo.class);
}
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof RefreshExternalTableInfo)) {
return false;
}
RefreshExternalTableInfo info = (RefreshExternalTableInfo) obj;
return (dbName.equals(info.dbName))
&& (tableName.equals(info.tableName)) && (newSchema.equals(newSchema));
}
}

View File

@ -27,7 +27,10 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
@ -140,6 +143,25 @@ public class AlterTest {
")\n" +
"DISTRIBUTED BY HASH(k2) BUCKETS 3\n" +
"PROPERTIES('replication_num' = '1');");
Config.enable_odbc_table = true;
createTable("create external table test.odbc_table\n" +
"( `k1` bigint(20) COMMENT \"\",\n" +
" `k2` datetime COMMENT \"\",\n" +
" `k3` varchar(20) COMMENT \"\",\n" +
" `k4` varchar(100) COMMENT \"\",\n" +
" `k5` float COMMENT \"\"\n" +
")ENGINE=ODBC\n" +
"PROPERTIES (\n" +
"\"host\" = \"127.0.0.1\",\n" +
"\"port\" = \"3306\",\n" +
"\"user\" = \"root\",\n" +
"\"password\" = \"123\",\n" +
"\"database\" = \"db1\",\n" +
"\"table\" = \"tbl1\",\n" +
"\"driver\" = \"Oracle Driver\",\n" +
"\"odbc_type\" = \"oracle\"\n" +
");");
}
@AfterClass
@ -548,4 +570,76 @@ public class AlterTest {
Assert.assertNotNull(replace3.getIndexIdByName("r1"));
Assert.assertNotNull(replace3.getIndexIdByName("r2"));
}
public void testExternalTableAlterOperations() throws Exception {
// external table do not support partition operation
String stmt = "alter table test.odbc_table add partition p3 values less than('2020-04-01'), add partition p4 values less than('2020-05-01')";
alterTable(stmt, true);
// external table do not support rollup
stmt = "alter table test.odbc_table add rollup r1 (k1)";
alterTable(stmt, true);
// external table support add column
stmt = "alter table test.odbc_table add column k6 INT KEY after k1, add column k7 TINYINT KEY after k6";
alterTable(stmt, false);
Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
Table odbc_table = db.getTable("odbc_table");
Assert.assertEquals(odbc_table.getBaseSchema().size(), 7);
Assert.assertEquals(odbc_table.getBaseSchema().get(1).getDataType(), PrimitiveType.INT);
Assert.assertEquals(odbc_table.getBaseSchema().get(2).getDataType(), PrimitiveType.TINYINT);
// external table support drop column
stmt = "alter table test.odbc_table drop column k7";
alterTable(stmt, false);
db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
odbc_table = db.getTable("odbc_table");
Assert.assertEquals(odbc_table.getBaseSchema().size(), 6);
// external table support modify column
stmt = "alter table test.odbc_table modify column k6 bigint after k5";
alterTable(stmt, false);
db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
odbc_table = db.getTable("odbc_table");
Assert.assertEquals(odbc_table.getBaseSchema().size(), 6);
Assert.assertEquals(odbc_table.getBaseSchema().get(5).getDataType(), PrimitiveType.BIGINT);
// external table support reorder column
db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
odbc_table = db.getTable("odbc_table");
Assert.assertTrue(odbc_table.getBaseSchema().stream().
map(column -> column.getName()).
reduce("", (totalName, columnName) -> totalName + columnName).equals("k1k2k3k4k5k6"));
stmt = "alter table test.odbc_table order by (k6, k5, k4, k3, k2, k1)";
alterTable(stmt, false);
Assert.assertTrue(odbc_table.getBaseSchema().stream().
map(column -> column.getName()).
reduce("", (totalName, columnName) -> totalName + columnName).equals("k6k5k4k3k2k1"));
// external table support drop column
stmt = "alter table test.odbc_table drop column k6";
alterTable(stmt, false);
stmt = "alter table test.odbc_table drop column k5";
alterTable(stmt, false);
stmt = "alter table test.odbc_table drop column k4";
alterTable(stmt, false);
stmt = "alter table test.odbc_table drop column k3";
alterTable(stmt, false);
stmt = "alter table test.odbc_table drop column k2";
alterTable(stmt, false);
// do not allow drop last column
Assert.assertEquals(odbc_table.getBaseSchema().size(), 1);
stmt = "alter table test.odbc_table drop column k1";
alterTable(stmt, true);
Assert.assertEquals(odbc_table.getBaseSchema().size(), 1);
// external table support rename operation
stmt = "alter table test.odbc_table rename oracle_table";
alterTable(stmt, false);
db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
odbc_table = db.getTable("oracle_table");
Assert.assertTrue(odbc_table != null);
odbc_table = db.getTable("odbc_table");
Assert.assertTrue(odbc_table == null);
}
}

View File

@ -0,0 +1,100 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.doris.catalog.FakeCatalog;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.FeConstants;
public class RefreshExternalTableInfoTest {
private Catalog catalog;
private FakeCatalog fakeCatalog;
@Before
public void setUp() {
fakeCatalog = new FakeCatalog();
catalog = Deencapsulation.newInstance(Catalog.class);
FakeCatalog.setCatalog(catalog);
FakeCatalog.setMetaVersion(FeConstants.meta_version);
}
@Test
public void testSerialization() throws Exception {
// 1. Write objects to file
File file = new File("./RefreshExteranlTableInfo");
file.createNewFile();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
List<Column> columns = new ArrayList<Column>();
Column column2 = new Column("column2",
ScalarType.createType(PrimitiveType.TINYINT), false, AggregateType.MIN, "", "");
columns.add(column2);
columns.add(new Column("column3",
ScalarType.createType(PrimitiveType.SMALLINT), false, AggregateType.SUM, "", ""));
columns.add(new Column("column4",
ScalarType.createType(PrimitiveType.INT), false, AggregateType.REPLACE, "", ""));
columns.add(new Column("column5",
ScalarType.createType(PrimitiveType.BIGINT), false, AggregateType.REPLACE, "", ""));
columns.add(new Column("column6",
ScalarType.createType(PrimitiveType.FLOAT), false, AggregateType.REPLACE, "", ""));
columns.add(new Column("column7",
ScalarType.createType(PrimitiveType.DOUBLE), false, AggregateType.REPLACE, "", ""));
columns.add(new Column("column8", ScalarType.createChar(10), true, null, "", ""));
columns.add(new Column("column9", ScalarType.createVarchar(10), true, null, "", ""));
columns.add(new Column("column10", ScalarType.createType(PrimitiveType.DATE), true, null, "", ""));
columns.add(new Column("column11", ScalarType.createType(PrimitiveType.DATETIME), true, null, "", ""));
RefreshExternalTableInfo info = new RefreshExternalTableInfo("db1", "table1", columns);
info.write(dos);
dos.flush();
dos.close();
// 2. Read objects from file
DataInputStream dis = new DataInputStream(new FileInputStream(file));
RefreshExternalTableInfo rInfo1 = RefreshExternalTableInfo.read(dis);
Assert.assertTrue(rInfo1.getDbName().equals(info.getDbName()));
Assert.assertTrue(rInfo1.getTableName().equals(info.getTableName()));
Assert.assertTrue(rInfo1.getNewSchema().equals(info.getNewSchema()));
// 3. delete files
dis.close();
file.delete();
}
}