diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 9e7e2f656d..22e9dca39c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -46,7 +46,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -636,10 +635,6 @@ public class DataDescription implements InsertStmt.DataDesc { return lineDelimiter; } - public void setLineDelimiter(Separator lineDelimiter) { - this.lineDelimiter = lineDelimiter; - } - public byte getEnclose() { return enclose; } @@ -716,14 +711,6 @@ public class DataDescription implements InsertStmt.DataDesc { this.jsonRoot = jsonRoot; } - @Deprecated - public void addColumnMapping(String functionName, Pair> pair) { - if (Strings.isNullOrEmpty(functionName) || pair == null) { - return; - } - columnToHadoopFunction.put(functionName, pair); - } - public Map>> getColumnToHadoopFunction() { return columnToHadoopFunction; } @@ -1143,53 +1130,6 @@ public class DataDescription implements InsertStmt.DataDesc { } } - /* - * If user does not specify COLUMNS in load stmt, we fill it here. - * eg1: - * both COLUMNS and SET clause is empty. after fill: - * (k1,k2,k3) - * - * eg2: - * COLUMNS is empty, SET is not empty - * SET ( k2 = default_value("2") ) - * after fill: - * (k1, k2, k3) - * SET ( k2 = default_value("2") ) - * - * eg3: - * COLUMNS is empty, SET is not empty - * SET (k2 = strftime("%Y-%m-%d %H:%M:%S", k2) - * after fill: - * (k1,k2,k3) - * SET (k2 = strftime("%Y-%m-%d %H:%M:%S", k2) - * - */ - public void fillColumnInfoIfNotSpecified(List baseSchema) { - if (fileFieldNames != null && !fileFieldNames.isEmpty()) { - return; - } - - fileFieldNames = Lists.newArrayList(); - - Set mappingColNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); - for (ImportColumnDesc importColumnDesc : parsedColumnExprList) { - mappingColNames.add(importColumnDesc.getColumnName()); - } - - for (Column column : baseSchema) { - if (!mappingColNames.contains(column.getName())) { - parsedColumnExprList.add(new ImportColumnDesc(column.getName(), null)); - } - if ("json".equals(this.fileFormat)) { - fileFieldNames.add(column.getName()); - } else { - fileFieldNames.add(column.getName().toLowerCase()); - } - } - - LOG.debug("after fill column info. columns: {}, parsed column exprs: {}", fileFieldNames, parsedColumnExprList); - } - public String toSql() { StringBuilder sb = new StringBuilder(); if (isMysqlLoad) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index c193ab8f77..332c82e361 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -21,9 +21,7 @@ import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.PartitionNames; -import org.apache.doris.analysis.Separator; import org.apache.doris.catalog.AggregateType; -import org.apache.doris.catalog.BrokerTable; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.HiveTable; @@ -32,14 +30,12 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Table; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.thrift.TFileCompressType; -import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -95,9 +91,6 @@ public class BrokerFileGroup implements Writable { private long srcTableId = -1; private boolean isLoadFromTable = false; - // for multi load - private TNetworkAddress beAddr; - private long backendID; private boolean stripOuterArray = false; private String jsonPaths = ""; private String jsonRoot = ""; @@ -115,41 +108,6 @@ public class BrokerFileGroup implements Writable { private BrokerFileGroup() { } - // Used for broker table, no need to parse - public BrokerFileGroup(BrokerTable table) throws AnalysisException { - this.tableId = table.getId(); - this.columnSeparator = Separator.convertSeparator(table.getColumnSeparator()); - this.lineDelimiter = Separator.convertSeparator(table.getLineDelimiter()); - this.isNegative = false; - this.filePaths = table.getPaths(); - this.fileFormat = table.getFileFormat(); - } - - /** - * Should used for hive/iceberg/hudi external table. - */ - public BrokerFileGroup(long tableId, - String filePath, - String fileFormat) throws AnalysisException { - this(tableId, "|", "\n", filePath, fileFormat, null, null); - } - - /** - * Should used for hive/iceberg/hudi external table. - */ - public BrokerFileGroup(long tableId, String columnSeparator, String lineDelimiter, String filePath, - String fileFormat, List columnNamesFromPath, List columnExprList) - throws AnalysisException { - this.tableId = tableId; - this.columnSeparator = Separator.convertSeparator(columnSeparator); - this.lineDelimiter = Separator.convertSeparator(lineDelimiter); - this.isNegative = false; - this.filePaths = Lists.newArrayList(filePath); - this.fileFormat = fileFormat; - this.columnNamesFromPath = columnNamesFromPath; - this.columnExprList = columnExprList; - } - public BrokerFileGroup(DataDescription dataDescription) { this.fileFieldNames = dataDescription.getFileFieldNames(); this.columnNamesFromPath = dataDescription.getColumnsFromPath(); @@ -251,8 +209,6 @@ public class BrokerFileGroup implements Writable { srcTableId = srcTable.getId(); isLoadFromTable = true; } - beAddr = dataDescription.getBeAddr(); - backendID = dataDescription.getBackendId(); if (fileFormat != null && fileFormat.equalsIgnoreCase("json")) { stripOuterArray = dataDescription.isStripOuterArray(); jsonPaths = dataDescription.getJsonPaths(); @@ -363,62 +319,30 @@ public class BrokerFileGroup implements Writable { this.fileSize = fileSize; } - public TNetworkAddress getBeAddr() { - return beAddr; - } - - public long getBackendID() { - return backendID; - } - public boolean isStripOuterArray() { return stripOuterArray; } - public void setStripOuterArray(boolean stripOuterArray) { - this.stripOuterArray = stripOuterArray; - } - public boolean isFuzzyParse() { return fuzzyParse; } - public void setFuzzyParse(boolean fuzzyParse) { - this.fuzzyParse = fuzzyParse; - } - public boolean isReadJsonByLine() { return readJsonByLine; } - public void setReadJsonByLine(boolean readJsonByLine) { - this.readJsonByLine = readJsonByLine; - } - public boolean isNumAsString() { return numAsString; } - public void setNumAsString(boolean numAsString) { - this.numAsString = numAsString; - } - public String getJsonPaths() { return jsonPaths; } - public void setJsonPaths(String jsonPaths) { - this.jsonPaths = jsonPaths; - } - public String getJsonRoot() { return jsonRoot; } - public void setJsonRoot(String jsonRoot) { - this.jsonRoot = jsonRoot; - } - public boolean isBinaryFileFormat() { if (fileFormat == null) { // null means default: csv diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index f4bc0f089b..a3e25cde6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -30,21 +30,16 @@ import org.apache.doris.analysis.FunctionParams; import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.IsNullPredicate; import org.apache.doris.analysis.NullLiteral; -import org.apache.doris.analysis.PartitionNames; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ScalarType; @@ -59,8 +54,6 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; @@ -80,7 +73,6 @@ import org.apache.doris.thrift.TEtlState; import org.apache.doris.thrift.TFileFormatType; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -91,7 +83,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -207,282 +198,6 @@ public class Load { lock.writeLock().unlock(); } - /* - * This is only used for hadoop load - */ - public static void checkAndCreateSource(Database db, DataDescription dataDescription, - Map>> tableToPartitionSources, EtlJobType jobType) throws DdlException { - Source source = new Source(dataDescription.getFilePaths()); - long tableId = -1; - Set sourcePartitionIds = Sets.newHashSet(); - - // source column names and partitions - String tableName = dataDescription.getTableName(); - Map>> columnToFunction = null; - - OlapTable table = db.getOlapTableOrDdlException(tableName); - tableId = table.getId(); - - table.readLock(); - try { - if (table.getPartitionInfo().isMultiColumnPartition() && jobType == EtlJobType.HADOOP) { - throw new DdlException("Load by hadoop cluster does not support table with multi partition columns." - + " Table: " + table.getName() + ". Try using broker load. See 'help broker load;'"); - } - - // check partition - if (dataDescription.getPartitionNames() != null - && table.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) { - ErrorReport.reportDdlException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED); - } - - if (table.getState() == OlapTableState.RESTORE) { - throw new DdlException("Table [" + tableName + "] is under restore"); - } - - if (table.getKeysType() != KeysType.AGG_KEYS && dataDescription.isNegative()) { - throw new DdlException("Load for AGG_KEYS table should not specify NEGATIVE"); - } - - // get table schema - List baseSchema = table.getBaseSchema(false); - // fill the column info if user does not specify them - dataDescription.fillColumnInfoIfNotSpecified(baseSchema); - - // source columns - List columnNames = Lists.newArrayList(); - List assignColumnNames = Lists.newArrayList(); - if (dataDescription.getFileFieldNames() != null) { - assignColumnNames.addAll(dataDescription.getFileFieldNames()); - if (dataDescription.getColumnsFromPath() != null) { - assignColumnNames.addAll(dataDescription.getColumnsFromPath()); - } - } - if (assignColumnNames.isEmpty()) { - // use table columns - for (Column column : baseSchema) { - columnNames.add(column.getName()); - } - } else { - // convert column to schema format - for (String assignCol : assignColumnNames) { - if (table.getColumn(assignCol) != null) { - columnNames.add(table.getColumn(assignCol).getName()); - } else { - columnNames.add(assignCol); - } - } - } - source.setColumnNames(columnNames); - - // check default value - Map>> columnToHadoopFunction - = dataDescription.getColumnToHadoopFunction(); - List parsedColumnExprList = dataDescription.getParsedColumnExprList(); - Map parsedColumnExprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - for (ImportColumnDesc importColumnDesc : parsedColumnExprList) { - parsedColumnExprMap.put(importColumnDesc.getColumnName(), importColumnDesc.getExpr()); - } - for (Column column : baseSchema) { - String columnName = column.getName(); - if (columnNames.contains(columnName)) { - continue; - } - - if (parsedColumnExprMap.containsKey(columnName)) { - continue; - } - - if (column.getDefaultValue() != null || column.isAllowNull()) { - continue; - } - - throw new DdlException("Column has no default value. column: " + columnName); - } - - // check negative for sum aggregate type - if (dataDescription.isNegative()) { - for (Column column : baseSchema) { - if (!column.isKey() && column.getAggregationType() != AggregateType.SUM) { - throw new DdlException("Column is not SUM AggregateType. column:" + column.getName()); - } - } - } - - // check hll - for (Column column : baseSchema) { - if (column.getDataType() == PrimitiveType.HLL) { - if (columnToHadoopFunction != null && !columnToHadoopFunction.containsKey(column.getName())) { - throw new DdlException("Hll column is not assigned. column:" + column.getName()); - } - } - } - - // check mapping column exist in table - // check function - // convert mapping column and func arg columns to schema format - - // When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in - // their names. These columns are invisible to user, but we need to generate data for these columns. - // So we add column mappings for these column. - // eg1: - // base schema is (A, B, C), and B is under schema change, - // so there will be a shadow column: '__doris_shadow_B' - // So the final column mapping should looks like: (A, B, C, __doris_shadow_B = substitute(B)); - for (Column column : table.getFullSchema()) { - if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) { - String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX); - if (parsedColumnExprMap.containsKey(originCol)) { - Expr mappingExpr = parsedColumnExprMap.get(originCol); - if (mappingExpr != null) { - /* - * eg: - * (A, C) SET (B = func(xx)) - * -> - * (A, C) SET (B = func(xx), __doris_shadow_B = func(xxx)) - */ - if (columnToHadoopFunction.containsKey(originCol)) { - columnToHadoopFunction.put(column.getName(), columnToHadoopFunction.get(originCol)); - } - ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr); - parsedColumnExprList.add(importColumnDesc); - } else { - /* - * eg: - * (A, B, C) - * -> - * (A, B, C) SET (__doris_shadow_B = substitute(B)) - */ - columnToHadoopFunction.put(column.getName(), - Pair.of("substitute", Lists.newArrayList(originCol))); - ImportColumnDesc importColumnDesc - = new ImportColumnDesc(column.getName(), new SlotRef(null, originCol)); - parsedColumnExprList.add(importColumnDesc); - } - } else { - /* - * There is a case that if user does not specify the related origin column, eg: - * COLUMNS (A, C), and B is not specified, but B is being modified - * so there is a shadow column '__doris_shadow_B'. - * We can not just add a mapping function "__doris_shadow_B = substitute(B)", - * because Doris can not find column B. - * In this case, __doris_shadow_B can use its default value, - * so no need to add it to column mapping - */ - // do nothing - } - - } else if (!column.isVisible()) { - /* - * For batch delete table add hidden column __DORIS_DELETE_SIGN__ to columns - * eg: - * (A, B, C) - * -> - * (A, B, C) SET (__DORIS_DELETE_SIGN__ = 0) - */ - columnToHadoopFunction.put(column.getName(), Pair.of("default_value", - Lists.newArrayList(column.getDefaultValue()))); - ImportColumnDesc importColumnDesc = null; - try { - importColumnDesc = new ImportColumnDesc(column.getName(), - new FunctionCallExpr("default_value", Arrays.asList(column.getDefaultValueExpr()))); - } catch (AnalysisException e) { - throw new DdlException(e.getMessage()); - } - parsedColumnExprList.add(importColumnDesc); - } - } - - LOG.debug("after add shadow column. parsedColumnExprList: {}, columnToHadoopFunction: {}", - parsedColumnExprList, columnToHadoopFunction); - - Map columnNameMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - for (String columnName : columnNames) { - columnNameMap.put(columnName, columnName); - } - - // validate hadoop functions - if (columnToHadoopFunction != null) { - columnToFunction = Maps.newHashMap(); - for (Entry>> entry : columnToHadoopFunction.entrySet()) { - String mappingColumnName = entry.getKey(); - Column mappingColumn = table.getColumn(mappingColumnName); - if (mappingColumn == null) { - throw new DdlException("Mapping column is not in table. column: " + mappingColumnName); - } - - Pair> function = entry.getValue(); - try { - DataDescription.validateMappingFunction(function.first, function.second, columnNameMap, - mappingColumn, dataDescription.isHadoopLoad()); - } catch (AnalysisException e) { - throw new DdlException(e.getMessage()); - } - - columnToFunction.put(mappingColumn.getName(), function); - } - } - - // partitions of this source - OlapTable olapTable = table; - PartitionNames partitionNames = dataDescription.getPartitionNames(); - if (partitionNames == null) { - for (Partition partition : olapTable.getPartitions()) { - sourcePartitionIds.add(partition.getId()); - } - } else { - for (String partitionName : partitionNames.getPartitionNames()) { - Partition partition = olapTable.getPartition(partitionName, partitionNames.isTemp()); - if (partition == null) { - throw new DdlException("Partition [" + partitionName + "] does not exist"); - } - sourcePartitionIds.add(partition.getId()); - } - } - } finally { - table.readUnlock(); - } - - // column separator - String columnSeparator = dataDescription.getColumnSeparator(); - if (!Strings.isNullOrEmpty(columnSeparator)) { - source.setColumnSeparator(columnSeparator); - } - - // line delimiter - String lineDelimiter = dataDescription.getLineDelimiter(); - if (!Strings.isNullOrEmpty(lineDelimiter)) { - source.setLineDelimiter(lineDelimiter); - } - - // source negative - source.setNegative(dataDescription.isNegative()); - - // column mapping functions - if (columnToFunction != null) { - source.setColumnToFunction(columnToFunction); - } - - // add source to table partition map - Map> partitionToSources = null; - if (tableToPartitionSources.containsKey(tableId)) { - partitionToSources = tableToPartitionSources.get(tableId); - } else { - partitionToSources = Maps.newHashMap(); - tableToPartitionSources.put(tableId, partitionToSources); - } - for (long partitionId : sourcePartitionIds) { - List sources = null; - if (partitionToSources.containsKey(partitionId)) { - sources = partitionToSources.get(partitionId); - } else { - sources = new ArrayList(); - partitionToSources.put(partitionId, sources); - } - sources.add(source); - } - } - /** * When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in * their names. These columns are invisible to user, but we need to generate data for these columns. @@ -762,10 +477,10 @@ public class Load { if (entry.getKey() != null) { if (entry.getKey().equalsIgnoreCase(Column.DELETE_SIGN)) { throw new UserException("unknown reference column in DELETE ON clause:" - + slot.getColumnName()); + + slot.getColumnName()); } else if (entry.getKey().equalsIgnoreCase(Column.SEQUENCE_COL)) { throw new UserException("unknown reference column in ORDER BY clause:" - + slot.getColumnName()); + + slot.getColumnName()); } } throw new UserException("unknown reference column, column=" + entry.getKey() diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java index cf5220edf3..9a042ca44b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java @@ -54,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +@Deprecated public class LoadJob implements Writable { private static final Logger LOG = LogManager.getLogger(LoadJob.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 946d116b51..6e5d56aa55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -334,10 +334,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements this.loadStatistic.totalFileSizeB = fileSize; } - public TUniqueId getRequestId() { - return requestId; - } - /** * Show table names for frontend * If table name could not be found by id, the table id will be used instead. @@ -398,14 +394,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements return userInfo; } - public void setUserInfo(UserIdentity userInfo) { - this.userInfo = userInfo; - } - - public String getComment() { - return comment; - } - public void setComment(String comment) { this.comment = comment; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index 14cd4772db..42e97db794 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -17,23 +17,17 @@ package org.apache.doris.load.loadv2; -import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.LoadStmt; -import org.apache.doris.analysis.UserIdentity; -import org.apache.doris.catalog.BrokerTable; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.jmockit.Deencapsulation; -import org.apache.doris.common.profile.Profile; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.BrokerFileGroupAggInfo; @@ -43,10 +37,7 @@ import org.apache.doris.load.EtlStatus; import org.apache.doris.load.Load; import org.apache.doris.load.Source; import org.apache.doris.metric.MetricRepo; -import org.apache.doris.planner.OlapTableSink; -import org.apache.doris.planner.PlanFragment; import org.apache.doris.task.MasterTaskExecutor; -import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionState; import com.google.common.collect.Lists; @@ -61,13 +52,11 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.UUID; public class BrokerLoadJobTest { @@ -78,8 +67,8 @@ public class BrokerLoadJobTest { @Test public void testFromLoadStmt(@Injectable LoadStmt loadStmt, @Injectable LabelName labelName, - @Injectable DataDescription dataDescription, @Mocked Env env, @Mocked InternalCatalog catalog, - @Injectable Database database) { + @Injectable DataDescription dataDescription, @Mocked Env env, @Mocked InternalCatalog catalog, + @Injectable Database database) { List dataDescriptionList = Lists.newArrayList(); dataDescriptionList.add(dataDescription); @@ -122,8 +111,8 @@ public class BrokerLoadJobTest { @Test public void testFromLoadStmt2(@Injectable LoadStmt loadStmt, @Injectable DataDescription dataDescription, - @Injectable LabelName labelName, @Injectable Database database, @Injectable OlapTable olapTable, - @Mocked Env env, @Mocked InternalCatalog catalog) { + @Injectable LabelName labelName, @Injectable Database database, @Injectable OlapTable olapTable, + @Mocked Env env, @Mocked InternalCatalog catalog) { String label = "label"; long dbId = 1; @@ -182,7 +171,7 @@ public class BrokerLoadJobTest { new MockUp() { @Mock public void checkAndCreateSource(Database db, DataDescription dataDescription, - Map>> tableToPartitionSources, EtlJobType jobType) { + Map>> tableToPartitionSources, EtlJobType jobType) { } }; @@ -202,8 +191,8 @@ public class BrokerLoadJobTest { @Test public void testGetTableNames(@Injectable BrokerFileGroupAggInfo fileGroupAggInfo, - @Injectable BrokerFileGroup brokerFileGroup, @Mocked Env env, @Mocked InternalCatalog catalog, - @Injectable Database database, @Injectable Table table) throws MetaNotFoundException { + @Injectable BrokerFileGroup brokerFileGroup, @Mocked Env env, @Mocked InternalCatalog catalog, + @Injectable Database database, @Injectable Table table) throws MetaNotFoundException { List brokerFileGroups = Lists.newArrayList(); brokerFileGroups.add(brokerFileGroup); Map> aggKeyToFileGroups = Maps.newHashMap(); @@ -281,11 +270,11 @@ public class BrokerLoadJobTest { @Test public void testPendingTaskOnFinished(@Injectable BrokerPendingTaskAttachment attachment, @Mocked Env env, - @Mocked InternalCatalog catalog, @Injectable Database database, - @Injectable BrokerFileGroupAggInfo fileGroupAggInfo, @Injectable BrokerFileGroup brokerFileGroup1, - @Injectable BrokerFileGroup brokerFileGroup2, @Injectable BrokerFileGroup brokerFileGroup3, - @Mocked MasterTaskExecutor masterTaskExecutor, @Injectable OlapTable olapTable, - @Mocked LoadingTaskPlanner loadingTaskPlanner) { + @Mocked InternalCatalog catalog, @Injectable Database database, + @Injectable BrokerFileGroupAggInfo fileGroupAggInfo, @Injectable BrokerFileGroup brokerFileGroup1, + @Injectable BrokerFileGroup brokerFileGroup2, @Injectable BrokerFileGroup brokerFileGroup3, + @Mocked MasterTaskExecutor masterTaskExecutor, @Injectable OlapTable olapTable, + @Mocked LoadingTaskPlanner loadingTaskPlanner) { BrokerLoadJob brokerLoadJob = new BrokerLoadJob(); Deencapsulation.setField(brokerLoadJob, "state", JobState.LOADING); long taskId = 1L; @@ -340,48 +329,6 @@ public class BrokerLoadJobTest { Assert.assertEquals(3, idToTasks.size()); } - @Test - public void testPendingTaskOnFinishedWithUserInfo(@Mocked BrokerPendingTaskAttachment attachment, - @Mocked Env env, - @Injectable BrokerDesc brokerDesc, - @Injectable LoadTaskCallback callback, - @Injectable Database database, - @Injectable FileGroupAggKey aggKey, - @Mocked OlapTable olapTable, - @Mocked PlanFragment sinkFragment, - @Mocked OlapTableSink olapTableSink) throws Exception { - List schema = new ArrayList<>(); - schema.add(new Column("a", PrimitiveType.BIGINT)); - Map properties = new HashMap<>(); - properties.put("broker_name", "test"); - properties.put("path", "hdfs://www.test.com"); - BrokerTable brokerTable = new BrokerTable(123L, "test", schema, properties); - BrokerFileGroup brokerFileGroup = new BrokerFileGroup(brokerTable); - List partitionIds = new ArrayList<>(); - partitionIds.add(123L); - Deencapsulation.setField(brokerFileGroup, "partitionIds", partitionIds); - List fileGroups = Lists.newArrayList(); - fileGroups.add(brokerFileGroup); - UUID uuid = UUID.randomUUID(); - TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - Profile jobProfile = new Profile("test", false); - LoadLoadingTask task = new LoadLoadingTask(database, olapTable, brokerDesc, fileGroups, 100, 100, false, false, 100, - callback, "", 100, 1, 1, true, jobProfile, false, false, LoadTask.Priority.NORMAL); - try { - UserIdentity userInfo = new UserIdentity("root", "localhost"); - userInfo.setIsAnalyzed(); - task.init(loadId, - attachment.getFileStatusByTable(aggKey), - attachment.getFileNumByTable(aggKey), - userInfo); - LoadingTaskPlanner planner = Deencapsulation.getField(task, "planner"); - Analyzer al = Deencapsulation.getField(planner, "analyzer"); - Assert.assertFalse(al.isUDFAllowed()); - } catch (Exception e) { - e.printStackTrace(); - } - } - @Test public void testLoadingTaskOnFinishedWithUnfinishedTask(@Injectable BrokerLoadingTaskAttachment attachment, @Injectable LoadTask loadTask1, @@ -465,8 +412,8 @@ public class BrokerLoadJobTest { @Test public void testLoadingTaskOnFinished(@Injectable BrokerLoadingTaskAttachment attachment1, - @Injectable LoadTask loadTask1, @Mocked Env env, @Mocked InternalCatalog catalog, - @Injectable Database database) { + @Injectable LoadTask loadTask1, @Mocked Env env, @Mocked InternalCatalog catalog, + @Injectable Database database) { BrokerLoadJob brokerLoadJob = new BrokerLoadJob(); Deencapsulation.setField(brokerLoadJob, "state", JobState.LOADING); Map idToTasks = Maps.newHashMap();