[Feature] [Vectorized] Some pre-refactorings or interface additions for schema change part2 (#10003)
This commit is contained in:
@ -19,10 +19,13 @@ package org.apache.doris.alter;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.CreateMaterializedViewStmt;
|
||||
import org.apache.doris.analysis.DescriptorTable;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.MVColumnItem;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.SqlParser;
|
||||
import org.apache.doris.analysis.SqlScanner;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
@ -363,14 +366,22 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
List<Column> fullSchema = tbl.getBaseSchema(true);
|
||||
DescriptorTable descTable = new DescriptorTable();
|
||||
for (Column column : fullSchema) {
|
||||
TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
|
||||
SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
|
||||
destSlotDesc.setIsMaterialized(true);
|
||||
destSlotDesc.setColumn(column);
|
||||
destSlotDesc.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
List<Replica> rollupReplicas = rollupTablet.getReplicas();
|
||||
for (Replica rollupReplica : rollupReplicas) {
|
||||
AlterReplicaTask rollupTask = new AlterReplicaTask(
|
||||
rollupReplica.getBackendId(), dbId, tableId, partitionId,
|
||||
rollupIndexId, baseIndexId,
|
||||
rollupTabletId, baseTabletId, rollupReplica.getId(),
|
||||
rollupSchemaHash, baseSchemaHash,
|
||||
visibleVersion, jobId, JobType.ROLLUP, defineExprs);
|
||||
AlterReplicaTask rollupTask = new AlterReplicaTask(rollupReplica.getBackendId(), dbId, tableId,
|
||||
partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId,
|
||||
rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId,
|
||||
JobType.ROLLUP, defineExprs, descTable);
|
||||
rollupBatchTask.addTask(rollupTask);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,11 @@
|
||||
|
||||
package org.apache.doris.alter;
|
||||
|
||||
import org.apache.doris.analysis.DescriptorTable;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
@ -375,13 +380,21 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
|
||||
tbl.readLock();
|
||||
try {
|
||||
Map<String, Column> indexColumnMap = Maps.newHashMap();
|
||||
for (Map.Entry<Long, List<Column>> entry : indexSchemaMap.entrySet()) {
|
||||
for (Column column : entry.getValue()) {
|
||||
indexColumnMap.put(column.getName(), column);
|
||||
}
|
||||
}
|
||||
|
||||
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
|
||||
|
||||
for (long partitionId : partitionIndexMap.rowKeySet()) {
|
||||
Partition partition = tbl.getPartition(partitionId);
|
||||
Preconditions.checkNotNull(partition, partitionId);
|
||||
|
||||
// the schema change task will transform the data before visible version(included).
|
||||
// the schema change task will transform the data before visible
|
||||
// version(included).
|
||||
long visibleVersion = partition.getVisibleVersion();
|
||||
|
||||
Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
|
||||
@ -389,6 +402,32 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
long shadowIdxId = entry.getKey();
|
||||
MaterializedIndex shadowIdx = entry.getValue();
|
||||
|
||||
Map<String, Expr> defineExprs = Maps.newHashMap();
|
||||
|
||||
List<Column> fullSchema = tbl.getBaseSchema(true);
|
||||
DescriptorTable descTable = new DescriptorTable();
|
||||
for (Column column : fullSchema) {
|
||||
TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
|
||||
SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
|
||||
destSlotDesc.setIsMaterialized(true);
|
||||
destSlotDesc.setColumn(column);
|
||||
destSlotDesc.setIsNullable(column.isAllowNull());
|
||||
|
||||
if (indexColumnMap.containsKey(SchemaChangeHandler.SHADOW_NAME_PRFIX + column.getName())) {
|
||||
Column newColumn = indexColumnMap
|
||||
.get(SchemaChangeHandler.SHADOW_NAME_PRFIX + column.getName());
|
||||
if (newColumn.getType() != column.getType()) {
|
||||
try {
|
||||
defineExprs.put(column.getName(),
|
||||
new SlotRef(destSlotDesc).castTo(newColumn.getType()));
|
||||
} catch (AnalysisException e) {
|
||||
throw new AlterCancelException(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
long originIdxId = indexIdMap.get(shadowIdxId);
|
||||
int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
|
||||
int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));
|
||||
@ -398,12 +437,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
long originTabletId = partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId);
|
||||
List<Replica> shadowReplicas = shadowTablet.getReplicas();
|
||||
for (Replica shadowReplica : shadowReplicas) {
|
||||
AlterReplicaTask rollupTask = new AlterReplicaTask(
|
||||
shadowReplica.getBackendId(), dbId, tableId, partitionId,
|
||||
shadowIdxId, originIdxId,
|
||||
shadowTabletId, originTabletId, shadowReplica.getId(),
|
||||
shadowSchemaHash, originSchemaHash,
|
||||
visibleVersion, jobId, JobType.SCHEMA_CHANGE);
|
||||
AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId,
|
||||
tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId,
|
||||
shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId,
|
||||
JobType.SCHEMA_CHANGE, defineExprs, descTable);
|
||||
schemaChangeBatchTask.addTask(rollupTask);
|
||||
}
|
||||
}
|
||||
|
||||
@ -800,11 +800,8 @@ public class Analyzer {
|
||||
}
|
||||
result = globalState.descTbl.addSlotDescriptor(d);
|
||||
result.setColumn(col);
|
||||
if (col.isAllowNull() || isOuterJoined(d.getId())) {
|
||||
result.setIsNullable(true);
|
||||
} else {
|
||||
result.setIsNullable(false);
|
||||
}
|
||||
result.setIsNullable(col.isAllowNull() || isOuterJoined(d.getId()));
|
||||
|
||||
slotRefMap.put(key, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -352,11 +352,7 @@ public class InsertStmt extends DdlStmt {
|
||||
slotDesc.setIsMaterialized(true);
|
||||
slotDesc.setType(col.getType());
|
||||
slotDesc.setColumn(col);
|
||||
if (col.isAllowNull()) {
|
||||
slotDesc.setIsNullable(true);
|
||||
} else {
|
||||
slotDesc.setIsNullable(false);
|
||||
}
|
||||
slotDesc.setIsNullable(col.isAllowNull());
|
||||
}
|
||||
// will use it during create load job
|
||||
indexIdToSchemaHash = olapTable.getIndexIdToSchemaHash();
|
||||
|
||||
@ -112,11 +112,7 @@ public class LoadingTaskPlanner {
|
||||
SlotDescriptor slotDesc = descTable.addSlotDescriptor(destTupleDesc);
|
||||
slotDesc.setIsMaterialized(true);
|
||||
slotDesc.setColumn(col);
|
||||
if (col.isAllowNull()) {
|
||||
slotDesc.setIsNullable(true);
|
||||
} else {
|
||||
slotDesc.setIsNullable(false);
|
||||
}
|
||||
slotDesc.setIsNullable(col.isAllowNull());
|
||||
}
|
||||
|
||||
// Generate plan trees
|
||||
|
||||
@ -881,11 +881,7 @@ public class SparkLoadJob extends BulkLoadJob {
|
||||
SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
|
||||
destSlotDesc.setIsMaterialized(true);
|
||||
destSlotDesc.setColumn(column);
|
||||
if (column.isAllowNull()) {
|
||||
destSlotDesc.setIsNullable(true);
|
||||
} else {
|
||||
destSlotDesc.setIsNullable(false);
|
||||
}
|
||||
destSlotDesc.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
initTBrokerScanRange(descTable, destTupleDesc, columns, brokerDesc);
|
||||
initTDescriptorTable(descTable);
|
||||
|
||||
@ -113,11 +113,7 @@ public class UpdatePlanner extends Planner {
|
||||
slotDesc.setIsMaterialized(true);
|
||||
slotDesc.setType(col.getType());
|
||||
slotDesc.setColumn(col);
|
||||
if (col.isAllowNull()) {
|
||||
slotDesc.setIsNullable(true);
|
||||
} else {
|
||||
slotDesc.setIsNullable(false);
|
||||
}
|
||||
slotDesc.setIsNullable(col.isAllowNull());
|
||||
}
|
||||
targetTupleDesc.computeStatAndMemLayout();
|
||||
return targetTupleDesc;
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.task;
|
||||
|
||||
import org.apache.doris.alter.AlterJobV2;
|
||||
import org.apache.doris.analysis.DescriptorTable;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.thrift.TAlterMaterializedViewParam;
|
||||
@ -46,21 +47,16 @@ public class AlterReplicaTask extends AgentTask {
|
||||
private AlterJobV2.JobType jobType;
|
||||
|
||||
private Map<String, Expr> defineExprs;
|
||||
private DescriptorTable descTable;
|
||||
|
||||
public AlterReplicaTask(long backendId, long dbId, long tableId,
|
||||
long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId,
|
||||
long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash,
|
||||
long version, long jobId, AlterJobV2.JobType jobType) {
|
||||
this(backendId, dbId, tableId, partitionId,
|
||||
rollupIndexId, baseIndexId, rollupTabletId,
|
||||
baseTabletId, newReplicaId, newSchemaHash, baseSchemaHash,
|
||||
version, jobId, jobType, null);
|
||||
}
|
||||
|
||||
public AlterReplicaTask(long backendId, long dbId, long tableId,
|
||||
long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId,
|
||||
long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash,
|
||||
long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs) {
|
||||
/**
|
||||
* AlterReplicaTask constructor.
|
||||
*
|
||||
*/
|
||||
public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId,
|
||||
long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash,
|
||||
int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs,
|
||||
DescriptorTable descTable) {
|
||||
super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);
|
||||
|
||||
this.baseTabletId = baseTabletId;
|
||||
@ -74,6 +70,7 @@ public class AlterReplicaTask extends AgentTask {
|
||||
|
||||
this.jobType = jobType;
|
||||
this.defineExprs = defineExprs;
|
||||
this.descTable = descTable;
|
||||
}
|
||||
|
||||
public long getBaseTabletId() {
|
||||
@ -117,6 +114,7 @@ public class AlterReplicaTask extends AgentTask {
|
||||
req.addToMaterializedViewParams(mvParam);
|
||||
}
|
||||
}
|
||||
req.setDescTbl(descTable.toThrift());
|
||||
return req;
|
||||
}
|
||||
}
|
||||
|
||||
@ -182,11 +182,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
@ -230,11 +226,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
@ -259,11 +251,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
@ -288,11 +276,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
new Expectations() {
|
||||
@ -332,11 +316,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
new Expectations() {
|
||||
@ -379,11 +359,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
new Expectations() {
|
||||
@ -434,11 +410,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
@ -464,11 +436,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
TStreamLoadPutRequest request = getBaseRequest();
|
||||
@ -494,11 +462,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
new Expectations() {
|
||||
@ -548,11 +512,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
new Expectations() {
|
||||
@ -594,11 +554,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
new Expectations() {
|
||||
@ -646,11 +602,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
new Expectations() {
|
||||
@ -701,11 +653,7 @@ public class StreamLoadScanNodeTest {
|
||||
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
new Expectations() {
|
||||
@ -757,11 +705,7 @@ public class StreamLoadScanNodeTest {
|
||||
System.out.println(column);
|
||||
slot.setColumn(column);
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
new Expectations() {
|
||||
@ -823,11 +767,7 @@ public class StreamLoadScanNodeTest {
|
||||
slot.setColumn(column);
|
||||
|
||||
slot.setIsMaterialized(true);
|
||||
if (column.isAllowNull()) {
|
||||
slot.setIsNullable(true);
|
||||
} else {
|
||||
slot.setIsNullable(false);
|
||||
}
|
||||
slot.setIsNullable(column.isAllowNull());
|
||||
}
|
||||
|
||||
new Expectations() {
|
||||
|
||||
Reference in New Issue
Block a user