[chore](load) rm some load related redundant code (#27102)

This commit is contained in:
Siyang Tang
2023-12-01 09:29:28 +08:00
committed by GitHub
parent 60bc3be8a2
commit c1d73ecefb
6 changed files with 17 additions and 502 deletions

View File

@ -46,7 +46,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -636,10 +635,6 @@ public class DataDescription implements InsertStmt.DataDesc {
return lineDelimiter;
}
public void setLineDelimiter(Separator lineDelimiter) {
this.lineDelimiter = lineDelimiter;
}
public byte getEnclose() {
return enclose;
}
@ -716,14 +711,6 @@ public class DataDescription implements InsertStmt.DataDesc {
this.jsonRoot = jsonRoot;
}
@Deprecated
public void addColumnMapping(String functionName, Pair<String, List<String>> pair) {
if (Strings.isNullOrEmpty(functionName) || pair == null) {
return;
}
columnToHadoopFunction.put(functionName, pair);
}
public Map<String, Pair<String, List<String>>> getColumnToHadoopFunction() {
return columnToHadoopFunction;
}
@ -1143,53 +1130,6 @@ public class DataDescription implements InsertStmt.DataDesc {
}
}
/*
* If user does not specify COLUMNS in load stmt, we fill it here.
* eg1:
* both COLUMNS and SET clause is empty. after fill:
* (k1,k2,k3)
*
* eg2:
* COLUMNS is empty, SET is not empty
* SET ( k2 = default_value("2") )
* after fill:
* (k1, k2, k3)
* SET ( k2 = default_value("2") )
*
* eg3:
* COLUMNS is empty, SET is not empty
* SET (k2 = strftime("%Y-%m-%d %H:%M:%S", k2)
* after fill:
* (k1,k2,k3)
* SET (k2 = strftime("%Y-%m-%d %H:%M:%S", k2)
*
*/
public void fillColumnInfoIfNotSpecified(List<Column> baseSchema) {
if (fileFieldNames != null && !fileFieldNames.isEmpty()) {
return;
}
fileFieldNames = Lists.newArrayList();
Set<String> mappingColNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (ImportColumnDesc importColumnDesc : parsedColumnExprList) {
mappingColNames.add(importColumnDesc.getColumnName());
}
for (Column column : baseSchema) {
if (!mappingColNames.contains(column.getName())) {
parsedColumnExprList.add(new ImportColumnDesc(column.getName(), null));
}
if ("json".equals(this.fileFormat)) {
fileFieldNames.add(column.getName());
} else {
fileFieldNames.add(column.getName().toLowerCase());
}
}
LOG.debug("after fill column info. columns: {}, parsed column exprs: {}", fileFieldNames, parsedColumnExprList);
}
public String toSql() {
StringBuilder sb = new StringBuilder();
if (isMysqlLoad) {

View File

@ -21,9 +21,7 @@ import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.Separator;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.HiveTable;
@ -32,14 +30,12 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@ -95,9 +91,6 @@ public class BrokerFileGroup implements Writable {
private long srcTableId = -1;
private boolean isLoadFromTable = false;
// for multi load
private TNetworkAddress beAddr;
private long backendID;
private boolean stripOuterArray = false;
private String jsonPaths = "";
private String jsonRoot = "";
@ -115,41 +108,6 @@ public class BrokerFileGroup implements Writable {
private BrokerFileGroup() {
}
// Used for broker table, no need to parse
public BrokerFileGroup(BrokerTable table) throws AnalysisException {
this.tableId = table.getId();
this.columnSeparator = Separator.convertSeparator(table.getColumnSeparator());
this.lineDelimiter = Separator.convertSeparator(table.getLineDelimiter());
this.isNegative = false;
this.filePaths = table.getPaths();
this.fileFormat = table.getFileFormat();
}
/**
* Should used for hive/iceberg/hudi external table.
*/
public BrokerFileGroup(long tableId,
String filePath,
String fileFormat) throws AnalysisException {
this(tableId, "|", "\n", filePath, fileFormat, null, null);
}
/**
* Should used for hive/iceberg/hudi external table.
*/
public BrokerFileGroup(long tableId, String columnSeparator, String lineDelimiter, String filePath,
String fileFormat, List<String> columnNamesFromPath, List<ImportColumnDesc> columnExprList)
throws AnalysisException {
this.tableId = tableId;
this.columnSeparator = Separator.convertSeparator(columnSeparator);
this.lineDelimiter = Separator.convertSeparator(lineDelimiter);
this.isNegative = false;
this.filePaths = Lists.newArrayList(filePath);
this.fileFormat = fileFormat;
this.columnNamesFromPath = columnNamesFromPath;
this.columnExprList = columnExprList;
}
public BrokerFileGroup(DataDescription dataDescription) {
this.fileFieldNames = dataDescription.getFileFieldNames();
this.columnNamesFromPath = dataDescription.getColumnsFromPath();
@ -251,8 +209,6 @@ public class BrokerFileGroup implements Writable {
srcTableId = srcTable.getId();
isLoadFromTable = true;
}
beAddr = dataDescription.getBeAddr();
backendID = dataDescription.getBackendId();
if (fileFormat != null && fileFormat.equalsIgnoreCase("json")) {
stripOuterArray = dataDescription.isStripOuterArray();
jsonPaths = dataDescription.getJsonPaths();
@ -363,62 +319,30 @@ public class BrokerFileGroup implements Writable {
this.fileSize = fileSize;
}
public TNetworkAddress getBeAddr() {
return beAddr;
}
public long getBackendID() {
return backendID;
}
public boolean isStripOuterArray() {
return stripOuterArray;
}
public void setStripOuterArray(boolean stripOuterArray) {
this.stripOuterArray = stripOuterArray;
}
public boolean isFuzzyParse() {
return fuzzyParse;
}
public void setFuzzyParse(boolean fuzzyParse) {
this.fuzzyParse = fuzzyParse;
}
public boolean isReadJsonByLine() {
return readJsonByLine;
}
public void setReadJsonByLine(boolean readJsonByLine) {
this.readJsonByLine = readJsonByLine;
}
public boolean isNumAsString() {
return numAsString;
}
public void setNumAsString(boolean numAsString) {
this.numAsString = numAsString;
}
public String getJsonPaths() {
return jsonPaths;
}
public void setJsonPaths(String jsonPaths) {
this.jsonPaths = jsonPaths;
}
public String getJsonRoot() {
return jsonRoot;
}
public void setJsonRoot(String jsonRoot) {
this.jsonRoot = jsonRoot;
}
public boolean isBinaryFileFormat() {
if (fileFormat == null) {
// null means default: csv

View File

@ -30,21 +30,16 @@ import org.apache.doris.analysis.FunctionParams;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ScalarType;
@ -59,8 +54,6 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
@ -80,7 +73,6 @@ import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TFileFormatType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@ -91,7 +83,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -207,282 +198,6 @@ public class Load {
lock.writeLock().unlock();
}
/*
* This is only used for hadoop load
*/
public static void checkAndCreateSource(Database db, DataDescription dataDescription,
Map<Long, Map<Long, List<Source>>> tableToPartitionSources, EtlJobType jobType) throws DdlException {
Source source = new Source(dataDescription.getFilePaths());
long tableId = -1;
Set<Long> sourcePartitionIds = Sets.newHashSet();
// source column names and partitions
String tableName = dataDescription.getTableName();
Map<String, Pair<String, List<String>>> columnToFunction = null;
OlapTable table = db.getOlapTableOrDdlException(tableName);
tableId = table.getId();
table.readLock();
try {
if (table.getPartitionInfo().isMultiColumnPartition() && jobType == EtlJobType.HADOOP) {
throw new DdlException("Load by hadoop cluster does not support table with multi partition columns."
+ " Table: " + table.getName() + ". Try using broker load. See 'help broker load;'");
}
// check partition
if (dataDescription.getPartitionNames() != null
&& table.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) {
ErrorReport.reportDdlException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED);
}
if (table.getState() == OlapTableState.RESTORE) {
throw new DdlException("Table [" + tableName + "] is under restore");
}
if (table.getKeysType() != KeysType.AGG_KEYS && dataDescription.isNegative()) {
throw new DdlException("Load for AGG_KEYS table should not specify NEGATIVE");
}
// get table schema
List<Column> baseSchema = table.getBaseSchema(false);
// fill the column info if user does not specify them
dataDescription.fillColumnInfoIfNotSpecified(baseSchema);
// source columns
List<String> columnNames = Lists.newArrayList();
List<String> assignColumnNames = Lists.newArrayList();
if (dataDescription.getFileFieldNames() != null) {
assignColumnNames.addAll(dataDescription.getFileFieldNames());
if (dataDescription.getColumnsFromPath() != null) {
assignColumnNames.addAll(dataDescription.getColumnsFromPath());
}
}
if (assignColumnNames.isEmpty()) {
// use table columns
for (Column column : baseSchema) {
columnNames.add(column.getName());
}
} else {
// convert column to schema format
for (String assignCol : assignColumnNames) {
if (table.getColumn(assignCol) != null) {
columnNames.add(table.getColumn(assignCol).getName());
} else {
columnNames.add(assignCol);
}
}
}
source.setColumnNames(columnNames);
// check default value
Map<String, Pair<String, List<String>>> columnToHadoopFunction
= dataDescription.getColumnToHadoopFunction();
List<ImportColumnDesc> parsedColumnExprList = dataDescription.getParsedColumnExprList();
Map<String, Expr> parsedColumnExprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (ImportColumnDesc importColumnDesc : parsedColumnExprList) {
parsedColumnExprMap.put(importColumnDesc.getColumnName(), importColumnDesc.getExpr());
}
for (Column column : baseSchema) {
String columnName = column.getName();
if (columnNames.contains(columnName)) {
continue;
}
if (parsedColumnExprMap.containsKey(columnName)) {
continue;
}
if (column.getDefaultValue() != null || column.isAllowNull()) {
continue;
}
throw new DdlException("Column has no default value. column: " + columnName);
}
// check negative for sum aggregate type
if (dataDescription.isNegative()) {
for (Column column : baseSchema) {
if (!column.isKey() && column.getAggregationType() != AggregateType.SUM) {
throw new DdlException("Column is not SUM AggregateType. column:" + column.getName());
}
}
}
// check hll
for (Column column : baseSchema) {
if (column.getDataType() == PrimitiveType.HLL) {
if (columnToHadoopFunction != null && !columnToHadoopFunction.containsKey(column.getName())) {
throw new DdlException("Hll column is not assigned. column:" + column.getName());
}
}
}
// check mapping column exist in table
// check function
// convert mapping column and func arg columns to schema format
// When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in
// their names. These columns are invisible to user, but we need to generate data for these columns.
// So we add column mappings for these column.
// eg1:
// base schema is (A, B, C), and B is under schema change,
// so there will be a shadow column: '__doris_shadow_B'
// So the final column mapping should looks like: (A, B, C, __doris_shadow_B = substitute(B));
for (Column column : table.getFullSchema()) {
if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX);
if (parsedColumnExprMap.containsKey(originCol)) {
Expr mappingExpr = parsedColumnExprMap.get(originCol);
if (mappingExpr != null) {
/*
* eg:
* (A, C) SET (B = func(xx))
* ->
* (A, C) SET (B = func(xx), __doris_shadow_B = func(xxx))
*/
if (columnToHadoopFunction.containsKey(originCol)) {
columnToHadoopFunction.put(column.getName(), columnToHadoopFunction.get(originCol));
}
ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr);
parsedColumnExprList.add(importColumnDesc);
} else {
/*
* eg:
* (A, B, C)
* ->
* (A, B, C) SET (__doris_shadow_B = substitute(B))
*/
columnToHadoopFunction.put(column.getName(),
Pair.of("substitute", Lists.newArrayList(originCol)));
ImportColumnDesc importColumnDesc
= new ImportColumnDesc(column.getName(), new SlotRef(null, originCol));
parsedColumnExprList.add(importColumnDesc);
}
} else {
/*
* There is a case that if user does not specify the related origin column, eg:
* COLUMNS (A, C), and B is not specified, but B is being modified
* so there is a shadow column '__doris_shadow_B'.
* We can not just add a mapping function "__doris_shadow_B = substitute(B)",
* because Doris can not find column B.
* In this case, __doris_shadow_B can use its default value,
* so no need to add it to column mapping
*/
// do nothing
}
} else if (!column.isVisible()) {
/*
* For batch delete table add hidden column __DORIS_DELETE_SIGN__ to columns
* eg:
* (A, B, C)
* ->
* (A, B, C) SET (__DORIS_DELETE_SIGN__ = 0)
*/
columnToHadoopFunction.put(column.getName(), Pair.of("default_value",
Lists.newArrayList(column.getDefaultValue())));
ImportColumnDesc importColumnDesc = null;
try {
importColumnDesc = new ImportColumnDesc(column.getName(),
new FunctionCallExpr("default_value", Arrays.asList(column.getDefaultValueExpr())));
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
parsedColumnExprList.add(importColumnDesc);
}
}
LOG.debug("after add shadow column. parsedColumnExprList: {}, columnToHadoopFunction: {}",
parsedColumnExprList, columnToHadoopFunction);
Map<String, String> columnNameMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (String columnName : columnNames) {
columnNameMap.put(columnName, columnName);
}
// validate hadoop functions
if (columnToHadoopFunction != null) {
columnToFunction = Maps.newHashMap();
for (Entry<String, Pair<String, List<String>>> entry : columnToHadoopFunction.entrySet()) {
String mappingColumnName = entry.getKey();
Column mappingColumn = table.getColumn(mappingColumnName);
if (mappingColumn == null) {
throw new DdlException("Mapping column is not in table. column: " + mappingColumnName);
}
Pair<String, List<String>> function = entry.getValue();
try {
DataDescription.validateMappingFunction(function.first, function.second, columnNameMap,
mappingColumn, dataDescription.isHadoopLoad());
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
columnToFunction.put(mappingColumn.getName(), function);
}
}
// partitions of this source
OlapTable olapTable = table;
PartitionNames partitionNames = dataDescription.getPartitionNames();
if (partitionNames == null) {
for (Partition partition : olapTable.getPartitions()) {
sourcePartitionIds.add(partition.getId());
}
} else {
for (String partitionName : partitionNames.getPartitionNames()) {
Partition partition = olapTable.getPartition(partitionName, partitionNames.isTemp());
if (partition == null) {
throw new DdlException("Partition [" + partitionName + "] does not exist");
}
sourcePartitionIds.add(partition.getId());
}
}
} finally {
table.readUnlock();
}
// column separator
String columnSeparator = dataDescription.getColumnSeparator();
if (!Strings.isNullOrEmpty(columnSeparator)) {
source.setColumnSeparator(columnSeparator);
}
// line delimiter
String lineDelimiter = dataDescription.getLineDelimiter();
if (!Strings.isNullOrEmpty(lineDelimiter)) {
source.setLineDelimiter(lineDelimiter);
}
// source negative
source.setNegative(dataDescription.isNegative());
// column mapping functions
if (columnToFunction != null) {
source.setColumnToFunction(columnToFunction);
}
// add source to table partition map
Map<Long, List<Source>> partitionToSources = null;
if (tableToPartitionSources.containsKey(tableId)) {
partitionToSources = tableToPartitionSources.get(tableId);
} else {
partitionToSources = Maps.newHashMap();
tableToPartitionSources.put(tableId, partitionToSources);
}
for (long partitionId : sourcePartitionIds) {
List<Source> sources = null;
if (partitionToSources.containsKey(partitionId)) {
sources = partitionToSources.get(partitionId);
} else {
sources = new ArrayList<Source>();
partitionToSources.put(partitionId, sources);
}
sources.add(source);
}
}
/**
* When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in
* their names. These columns are invisible to user, but we need to generate data for these columns.
@ -762,10 +477,10 @@ public class Load {
if (entry.getKey() != null) {
if (entry.getKey().equalsIgnoreCase(Column.DELETE_SIGN)) {
throw new UserException("unknown reference column in DELETE ON clause:"
+ slot.getColumnName());
+ slot.getColumnName());
} else if (entry.getKey().equalsIgnoreCase(Column.SEQUENCE_COL)) {
throw new UserException("unknown reference column in ORDER BY clause:"
+ slot.getColumnName());
+ slot.getColumnName());
}
}
throw new UserException("unknown reference column, column=" + entry.getKey()

View File

@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
@Deprecated
public class LoadJob implements Writable {
private static final Logger LOG = LogManager.getLogger(LoadJob.class);

View File

@ -334,10 +334,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
this.loadStatistic.totalFileSizeB = fileSize;
}
public TUniqueId getRequestId() {
return requestId;
}
/**
* Show table names for frontend
* If table name could not be found by id, the table id will be used instead.
@ -398,14 +394,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
return userInfo;
}
public void setUserInfo(UserIdentity userInfo) {
this.userInfo = userInfo;
}
public String getComment() {
return comment;
}
public void setComment(String comment) {
this.comment = comment;
}

View File

@ -17,23 +17,17 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
@ -43,10 +37,7 @@ import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.Load;
import org.apache.doris.load.Source;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;
import com.google.common.collect.Lists;
@ -61,13 +52,11 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
public class BrokerLoadJobTest {
@ -78,8 +67,8 @@ public class BrokerLoadJobTest {
@Test
public void testFromLoadStmt(@Injectable LoadStmt loadStmt, @Injectable LabelName labelName,
@Injectable DataDescription dataDescription, @Mocked Env env, @Mocked InternalCatalog catalog,
@Injectable Database database) {
@Injectable DataDescription dataDescription, @Mocked Env env, @Mocked InternalCatalog catalog,
@Injectable Database database) {
List<DataDescription> dataDescriptionList = Lists.newArrayList();
dataDescriptionList.add(dataDescription);
@ -122,8 +111,8 @@ public class BrokerLoadJobTest {
@Test
public void testFromLoadStmt2(@Injectable LoadStmt loadStmt, @Injectable DataDescription dataDescription,
@Injectable LabelName labelName, @Injectable Database database, @Injectable OlapTable olapTable,
@Mocked Env env, @Mocked InternalCatalog catalog) {
@Injectable LabelName labelName, @Injectable Database database, @Injectable OlapTable olapTable,
@Mocked Env env, @Mocked InternalCatalog catalog) {
String label = "label";
long dbId = 1;
@ -182,7 +171,7 @@ public class BrokerLoadJobTest {
new MockUp<Load>() {
@Mock
public void checkAndCreateSource(Database db, DataDescription dataDescription,
Map<Long, Map<Long, List<Source>>> tableToPartitionSources, EtlJobType jobType) {
Map<Long, Map<Long, List<Source>>> tableToPartitionSources, EtlJobType jobType) {
}
};
@ -202,8 +191,8 @@ public class BrokerLoadJobTest {
@Test
public void testGetTableNames(@Injectable BrokerFileGroupAggInfo fileGroupAggInfo,
@Injectable BrokerFileGroup brokerFileGroup, @Mocked Env env, @Mocked InternalCatalog catalog,
@Injectable Database database, @Injectable Table table) throws MetaNotFoundException {
@Injectable BrokerFileGroup brokerFileGroup, @Mocked Env env, @Mocked InternalCatalog catalog,
@Injectable Database database, @Injectable Table table) throws MetaNotFoundException {
List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
brokerFileGroups.add(brokerFileGroup);
Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
@ -281,11 +270,11 @@ public class BrokerLoadJobTest {
@Test
public void testPendingTaskOnFinished(@Injectable BrokerPendingTaskAttachment attachment, @Mocked Env env,
@Mocked InternalCatalog catalog, @Injectable Database database,
@Injectable BrokerFileGroupAggInfo fileGroupAggInfo, @Injectable BrokerFileGroup brokerFileGroup1,
@Injectable BrokerFileGroup brokerFileGroup2, @Injectable BrokerFileGroup brokerFileGroup3,
@Mocked MasterTaskExecutor masterTaskExecutor, @Injectable OlapTable olapTable,
@Mocked LoadingTaskPlanner loadingTaskPlanner) {
@Mocked InternalCatalog catalog, @Injectable Database database,
@Injectable BrokerFileGroupAggInfo fileGroupAggInfo, @Injectable BrokerFileGroup brokerFileGroup1,
@Injectable BrokerFileGroup brokerFileGroup2, @Injectable BrokerFileGroup brokerFileGroup3,
@Mocked MasterTaskExecutor masterTaskExecutor, @Injectable OlapTable olapTable,
@Mocked LoadingTaskPlanner loadingTaskPlanner) {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
Deencapsulation.setField(brokerLoadJob, "state", JobState.LOADING);
long taskId = 1L;
@ -340,48 +329,6 @@ public class BrokerLoadJobTest {
Assert.assertEquals(3, idToTasks.size());
}
@Test
public void testPendingTaskOnFinishedWithUserInfo(@Mocked BrokerPendingTaskAttachment attachment,
@Mocked Env env,
@Injectable BrokerDesc brokerDesc,
@Injectable LoadTaskCallback callback,
@Injectable Database database,
@Injectable FileGroupAggKey aggKey,
@Mocked OlapTable olapTable,
@Mocked PlanFragment sinkFragment,
@Mocked OlapTableSink olapTableSink) throws Exception {
List<Column> schema = new ArrayList<>();
schema.add(new Column("a", PrimitiveType.BIGINT));
Map<String, String> properties = new HashMap<>();
properties.put("broker_name", "test");
properties.put("path", "hdfs://www.test.com");
BrokerTable brokerTable = new BrokerTable(123L, "test", schema, properties);
BrokerFileGroup brokerFileGroup = new BrokerFileGroup(brokerTable);
List<Long> partitionIds = new ArrayList<>();
partitionIds.add(123L);
Deencapsulation.setField(brokerFileGroup, "partitionIds", partitionIds);
List<BrokerFileGroup> fileGroups = Lists.newArrayList();
fileGroups.add(brokerFileGroup);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
Profile jobProfile = new Profile("test", false);
LoadLoadingTask task = new LoadLoadingTask(database, olapTable, brokerDesc, fileGroups, 100, 100, false, false, 100,
callback, "", 100, 1, 1, true, jobProfile, false, false, LoadTask.Priority.NORMAL);
try {
UserIdentity userInfo = new UserIdentity("root", "localhost");
userInfo.setIsAnalyzed();
task.init(loadId,
attachment.getFileStatusByTable(aggKey),
attachment.getFileNumByTable(aggKey),
userInfo);
LoadingTaskPlanner planner = Deencapsulation.getField(task, "planner");
Analyzer al = Deencapsulation.getField(planner, "analyzer");
Assert.assertFalse(al.isUDFAllowed());
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testLoadingTaskOnFinishedWithUnfinishedTask(@Injectable BrokerLoadingTaskAttachment attachment,
@Injectable LoadTask loadTask1,
@ -465,8 +412,8 @@ public class BrokerLoadJobTest {
@Test
public void testLoadingTaskOnFinished(@Injectable BrokerLoadingTaskAttachment attachment1,
@Injectable LoadTask loadTask1, @Mocked Env env, @Mocked InternalCatalog catalog,
@Injectable Database database) {
@Injectable LoadTask loadTask1, @Mocked Env env, @Mocked InternalCatalog catalog,
@Injectable Database database) {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
Deencapsulation.setField(brokerLoadJob, "state", JobState.LOADING);
Map<Long, LoadTask> idToTasks = Maps.newHashMap();