From 532d15d3810fbfc4feb1663945a805979ae805eb Mon Sep 17 00:00:00 2001 From: wyb Date: Fri, 19 Jun 2020 17:44:47 +0800 Subject: [PATCH] [Spark Load]Fe submit spark etl job (#3716) After user creates a spark load job which status is PENDING, Fe will schedule and submit the spark etl job. 1. Begin transaction 2. Create a SparkLoadPendingTask for submitting etl job 2.1 Create etl job configuration according to https://github.com/apache/incubator-doris/issues/3010#issuecomment-635174675 2.2 Upload the configuration file and job jar to HDFS with broker 2.3 Submit etl job to spark cluster 2.4 Wait for etl job submission result 3. Update job state to ETL and log job update info if etl job is submitted successfully #3433 --- .../org/apache/doris/catalog/OlapTable.java | 4 + .../java/org/apache/doris/common/Pair.java | 6 + .../apache/doris/common/util/BrokerUtil.java | 412 ++++++++++++- .../apache/doris/journal/JournalEntity.java | 6 + .../main/java/org/apache/doris/load/Load.java | 128 ++-- .../load/loadv2/BrokerLoadPendingTask.java | 2 +- .../org/apache/doris/load/loadv2/LoadJob.java | 90 ++- .../doris/load/loadv2/LoadJobScheduler.java | 9 + .../apache/doris/load/loadv2/LoadManager.java | 11 + .../apache/doris/load/loadv2/LoadTask.java | 10 +- .../doris/load/loadv2/SparkEtlJobHandler.java | 173 ++++++ .../doris/load/loadv2/SparkLoadJob.java | 178 +++++- .../load/loadv2/SparkLoadPendingTask.java | 550 ++++++++++++++++++ .../loadv2/SparkPendingTaskAttachment.java | 62 ++ .../org/apache/doris/persist/EditLog.java | 10 + .../apache/doris/persist/OperationType.java | 2 +- .../apache/doris/persist/gson/GsonUtils.java | 11 +- .../apache/doris/planner/BrokerScanNode.java | 2 +- .../doris/common/util/BrokerUtilTest.java | 204 ++++++- .../loadv2/BrokerLoadPendingTaskTest.java | 2 +- .../apache/doris/load/loadv2/LoadJobTest.java | 7 +- .../load/loadv2/SparkLoadPendingTaskTest.java | 326 +++++++++++ 22 files changed, 2107 insertions(+), 98 deletions(-) create mode 100644 fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java create mode 100644 fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java create mode 100644 fe/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java create mode 100644 fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java index b4d60f6000..c0ac047eb2 100644 --- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -537,6 +537,10 @@ public class OlapTable extends Table { return keysType; } + public KeysType getKeysTypeByIndexId(long indexId) { + return indexIdToMeta.get(indexId).getKeysType(); + } + public PartitionInfo getPartitionInfo() { return partitionInfo; } diff --git a/fe/src/main/java/org/apache/doris/common/Pair.java b/fe/src/main/java/org/apache/doris/common/Pair.java index 14daca0877..1a520e02c3 100644 --- a/fe/src/main/java/org/apache/doris/common/Pair.java +++ b/fe/src/main/java/org/apache/doris/common/Pair.java @@ -17,15 +17,21 @@ package org.apache.doris.common; +import com.google.gson.annotations.SerializedName; + import java.util.Comparator; /** * The equivalent of C++'s std::pair<>. 
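A short round-trip sketch of the Gson persistence that the notice below describes (the values are made up; GsonUtils.GSON is the Gson instance this patch already uses for load job state, and TypeToken is Gson's helper for generic types):

    // serialize: the @SerializedName fields added below become the "first"/"second" keys
    Pair<String, Long> fileInfo = new Pair<>("hdfs://output/label1/file1", 1024L);
    String json = GsonUtils.GSON.toJson(fileInfo);
    // json == {"first":"hdfs://output/label1/file1","second":1024}

    // deserialize with the generic type preserved
    Pair<String, Long> restored = GsonUtils.GSON.fromJson(
            json, new com.google.gson.reflect.TypeToken<Pair<String, Long>>() {}.getType());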
+ * + * Notice: When using Pair for persistence, users need to guarantee that F and S can be serialized through Gson */ public class Pair { public static PairComparator> PAIR_VALUE_COMPARATOR = new PairComparator<>(); + @SerializedName(value = "first") public F first; + @SerializedName(value = "second") public S second; public Pair(F first, S second) { diff --git a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java index 622b871589..ebbf82bd38 100644 --- a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -17,52 +17,65 @@ package org.apache.doris.common.util; -import com.google.common.collect.Lists; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.thrift.TBrokerCloseReaderRequest; +import org.apache.doris.thrift.TBrokerCloseWriterRequest; +import org.apache.doris.thrift.TBrokerDeletePathRequest; +import org.apache.doris.thrift.TBrokerFD; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TBrokerListPathRequest; import org.apache.doris.thrift.TBrokerListResponse; +import org.apache.doris.thrift.TBrokerOpenMode; +import org.apache.doris.thrift.TBrokerOpenReaderRequest; +import org.apache.doris.thrift.TBrokerOpenReaderResponse; +import org.apache.doris.thrift.TBrokerOpenWriterRequest; +import org.apache.doris.thrift.TBrokerOpenWriterResponse; +import org.apache.doris.thrift.TBrokerOperationStatus; import org.apache.doris.thrift.TBrokerOperationStatusCode; +import org.apache.doris.thrift.TBrokerPReadRequest; +import org.apache.doris.thrift.TBrokerPWriteRequest; +import org.apache.doris.thrift.TBrokerReadResponse; import org.apache.doris.thrift.TBrokerVersion; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloBrokerService; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.Collections; import java.util.List; public class BrokerUtil { private static final Logger LOG = LogManager.getLogger(BrokerUtil.class); - public static void parseBrokerFile(String path, BrokerDesc brokerDesc, List fileStatuses) + private static int READ_BUFFER_SIZE_B = 1024 * 1024; + + /** + * Parse file status in path with broker, except directory + * @param path + * @param brokerDesc + * @param fileStatuses: file path, size, isDir, isSplitable + * @throws UserException if broker op failed + */ + public static void parseFile(String path, BrokerDesc brokerDesc, List fileStatuses) throws UserException { - FsBroker broker = null; - try { - String localIP = FrontendOptions.getLocalHostAddress(); - broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(brokerDesc.getName(), localIP); - } catch (AnalysisException e) { - throw new UserException(e.getMessage()); - } - TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port); - TPaloBrokerService.Client client = null; - 
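The new BrokerUtil methods in this hunk (readFile, writeFile, deletePath and the BrokerWriter below) all share the borrowClient/reopenClient/returnClient pattern: borrow a pooled client for the broker address, issue the RPC, reopen and retry once on TException, then return or invalidate the client depending on whether the call failed. A rough caller-side sketch, with a made-up HDFS path and an assumed brokerDesc and etlJobConfig in scope (roughly what the SparkEtlJobHandler added later in this patch does):

    // upload the job config bytes and the app resource jar, then read the config back
    byte[] configData = etlJobConfig.configToJson().getBytes("UTF-8");
    BrokerUtil.writeFile(configData, "hdfs://host:port/outputPath/configs/jobconfig.json", brokerDesc);
    BrokerUtil.writeFile("/path/to/fe/lib/palo-fe.jar",
            "hdfs://host:port/outputPath/configs/palo-fe.jar", brokerDesc);
    byte[] readBack = BrokerUtil.readFile("hdfs://host:port/outputPath/configs/jobconfig.json", brokerDesc);

    // clean up the whole output directory when the job finishes or is cancelled
    BrokerUtil.deletePath("hdfs://host:port/outputPath", brokerDesc);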
try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e) { - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e1) { - throw new UserException("Create connection to broker(" + address + ") failed."); - } - } + TNetworkAddress address = getAddress(brokerDesc); + TPaloBrokerService.Client client = borrowClient(address); boolean failed = true; try { TBrokerListPathRequest request = new TBrokerListPathRequest( @@ -71,11 +84,11 @@ public class BrokerUtil { try { tBrokerListResponse = client.listPath(request); } catch (TException e) { - ClientPool.brokerPool.reopen(client); + reopenClient(client); tBrokerListResponse = client.listPath(request); } if (tBrokerListResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) { - throw new UserException("Broker list path failed.path=" + path + throw new UserException("Broker list path failed. path=" + path + ",broker=" + address + ",msg=" + tBrokerListResponse.getOpStatus().getMessage()); } failed = false; @@ -87,13 +100,9 @@ public class BrokerUtil { } } catch (TException e) { LOG.warn("Broker list path exception, path={}, address={}, exception={}", path, address, e); - throw new UserException("Broker list path exception.path=" + path + ",broker=" + address); + throw new UserException("Broker list path exception. path=" + path + ", broker=" + address); } finally { - if (failed) { - ClientPool.brokerPool.invalidateObject(address, client); - } else { - ClientPool.brokerPool.returnObject(address, client); - } + returnClient(client, address, failed); } } @@ -139,4 +148,351 @@ public class BrokerUtil { return Lists.newArrayList(columns); } + /** + * Read binary data from path with broker + * @param path + * @param brokerDesc + * @return byte[] + * @throws UserException if broker op failed or not only one file + */ + public static byte[] readFile(String path, BrokerDesc brokerDesc) throws UserException { + TNetworkAddress address = getAddress(brokerDesc); + TPaloBrokerService.Client client = borrowClient(address); + boolean failed = true; + TBrokerFD fd = null; + try { + // get file size + TBrokerListPathRequest request = new TBrokerListPathRequest( + TBrokerVersion.VERSION_ONE, path, false, brokerDesc.getProperties()); + TBrokerListResponse tBrokerListResponse = null; + try { + tBrokerListResponse = client.listPath(request); + } catch (TException e) { + reopenClient(client); + tBrokerListResponse = client.listPath(request); + } + if (tBrokerListResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) { + throw new UserException("Broker list path failed. path=" + path + ", broker=" + address + + ",msg=" + tBrokerListResponse.getOpStatus().getMessage()); + } + List fileStatuses = tBrokerListResponse.getFiles(); + if (fileStatuses.size() != 1) { + throw new UserException("Broker files num error. 
path=" + path + ", broker=" + address + + ", files num: " + fileStatuses.size()); + } + + Preconditions.checkState(!fileStatuses.get(0).isIsDir()); + long fileSize = fileStatuses.get(0).getSize(); + + // open reader + String clientId = FrontendOptions.getLocalHostAddress() + ":" + Config.rpc_port; + TBrokerOpenReaderRequest tOpenReaderRequest = new TBrokerOpenReaderRequest( + TBrokerVersion.VERSION_ONE, path, 0, clientId, brokerDesc.getProperties()); + TBrokerOpenReaderResponse tOpenReaderResponse = null; + try { + tOpenReaderResponse = client.openReader(tOpenReaderRequest); + } catch (TException e) { + reopenClient(client); + tOpenReaderResponse = client.openReader(tOpenReaderRequest); + } + if (tOpenReaderResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) { + throw new UserException("Broker open reader failed. path=" + path + ", broker=" + address + + ", msg=" + tOpenReaderResponse.getOpStatus().getMessage()); + } + fd = tOpenReaderResponse.getFd(); + + // read + TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest( + TBrokerVersion.VERSION_ONE, fd, 0, fileSize); + TBrokerReadResponse tReadResponse = null; + try { + tReadResponse = client.pread(tPReadRequest); + } catch (TException e) { + reopenClient(client); + tReadResponse = client.pread(tPReadRequest); + } + if (tReadResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) { + throw new UserException("Broker read failed. path=" + path + ", broker=" + address + + ", msg=" + tReadResponse.getOpStatus().getMessage()); + } + failed = false; + return tReadResponse.getData(); + } catch (TException e) { + String failMsg = "Broker read file exception. path=" + path + ", broker=" + address; + LOG.warn(failMsg, e); + throw new UserException(failMsg); + } finally { + // close reader + if (fd != null) { + failed = true; + TBrokerCloseReaderRequest tCloseReaderRequest = new TBrokerCloseReaderRequest( + TBrokerVersion.VERSION_ONE, fd); + TBrokerOperationStatus tOperationStatus = null; + try { + tOperationStatus = client.closeReader(tCloseReaderRequest); + } catch (TException e) { + reopenClient(client); + try { + tOperationStatus = client.closeReader(tCloseReaderRequest); + } catch (TException ex) { + LOG.warn("Broker close reader failed. path={}, address={}", path, address, ex); + } + } + if (tOperationStatus == null || tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) { + LOG.warn("Broker close reader failed. 
path={}, address={}, error={}", path, address, + tOperationStatus.getMessage()); + } else { + failed = false; + } + } + + // return client + returnClient(client, address, failed); + } + } + + /** + * Write binary data to destFilePath with broker + * @param data + * @param destFilePath + * @param brokerDesc + * @throws UserException if broker op failed + */ + public static void writeFile(byte[] data, String destFilePath, BrokerDesc brokerDesc) throws UserException { + BrokerWriter writer = new BrokerWriter(destFilePath, brokerDesc); + try { + writer.open(); + ByteBuffer byteBuffer = ByteBuffer.wrap(data); + writer.write(byteBuffer, data.length); + } finally { + writer.close(); + } + } + + /** + * Write srcFilePath file to destFilePath with broker + * @param srcFilePath + * @param destFilePath + * @param brokerDesc + * @throws UserException if broker op failed + */ + public static void writeFile(String srcFilePath, String destFilePath, + BrokerDesc brokerDesc) throws UserException { + FileInputStream fis = null; + FileChannel channel = null; + BrokerWriter writer = new BrokerWriter(destFilePath, brokerDesc); + ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE_B); + try { + writer.open(); + fis = new FileInputStream(srcFilePath); + channel = fis.getChannel(); + while (true) { + int readSize = channel.read(byteBuffer); + if (readSize == -1) { + break; + } + + byteBuffer.flip(); + writer.write(byteBuffer, readSize); + byteBuffer.clear(); + } + } catch (IOException e) { + String failMsg = "Read file exception. filePath=" + srcFilePath; + LOG.warn(failMsg, e); + throw new UserException(failMsg); + } finally { + // close broker file writer and local file input stream + writer.close(); + try { + if (channel != null) { + channel.close(); + } + if (fis != null) { + fis.close(); + } + } catch (IOException e) { + LOG.warn("Close local file failed. srcPath={}", srcFilePath, e); + } + } + } + + /** + * Delete path with broker + * @param path + * @param brokerDesc + * @throws UserException if broker op failed + */ + public static void deletePath(String path, BrokerDesc brokerDesc) throws UserException { + TNetworkAddress address = getAddress(brokerDesc); + TPaloBrokerService.Client client = borrowClient(address); + boolean failed = true; + try { + TBrokerDeletePathRequest tDeletePathRequest = new TBrokerDeletePathRequest( + TBrokerVersion.VERSION_ONE, path, brokerDesc.getProperties()); + TBrokerOperationStatus tOperationStatus = null; + try { + tOperationStatus = client.deletePath(tDeletePathRequest); + } catch (TException e) { + reopenClient(client); + tOperationStatus = client.deletePath(tDeletePathRequest); + } + if (tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) { + throw new UserException("Broker delete path failed. path=" + path + ", broker=" + address + + ", msg=" + tOperationStatus.getMessage()); + } + failed = false; + } catch (TException e) { + LOG.warn("Broker read path exception, path={}, address={}, exception={}", path, address, e); + throw new UserException("Broker read path exception. 
path=" + path + ",broker=" + address); + } finally { + returnClient(client, address, failed); + } + } + + private static TNetworkAddress getAddress(BrokerDesc brokerDesc) throws UserException { + FsBroker broker = null; + try { + String localIP = FrontendOptions.getLocalHostAddress(); + broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(brokerDesc.getName(), localIP); + } catch (AnalysisException e) { + throw new UserException(e.getMessage()); + } + return new TNetworkAddress(broker.ip, broker.port); + } + + private static TPaloBrokerService.Client borrowClient(TNetworkAddress address) throws UserException { + TPaloBrokerService.Client client = null; + try { + client = ClientPool.brokerPool.borrowObject(address); + } catch (Exception e) { + try { + client = ClientPool.brokerPool.borrowObject(address); + } catch (Exception e1) { + throw new UserException("Create connection to broker(" + address + ") failed."); + } + } + return client; + } + + private static void returnClient(TPaloBrokerService.Client client, TNetworkAddress address, boolean failed) { + if (failed) { + ClientPool.brokerPool.invalidateObject(address, client); + } else { + ClientPool.brokerPool.returnObject(address, client); + } + } + + private static void reopenClient(TPaloBrokerService.Client client) { + ClientPool.brokerPool.reopen(client); + } + + private static class BrokerWriter { + private String brokerFilePath; + private BrokerDesc brokerDesc; + private TPaloBrokerService.Client client; + private TNetworkAddress address; + private TBrokerFD fd; + private long currentOffset; + private boolean isReady; + private boolean failed; + + public BrokerWriter(String brokerFilePath, BrokerDesc brokerDesc) { + this.brokerFilePath = brokerFilePath; + this.brokerDesc = brokerDesc; + this.isReady = false; + this.failed = true; + } + + public void open() throws UserException { + failed = true; + address = BrokerUtil.getAddress(brokerDesc); + client = BrokerUtil.borrowClient(address); + try { + String clientId = FrontendOptions.getLocalHostAddress() + ":" + Config.rpc_port; + TBrokerOpenWriterRequest tOpenWriterRequest = new TBrokerOpenWriterRequest( + TBrokerVersion.VERSION_ONE, brokerFilePath, TBrokerOpenMode.APPEND, + clientId, brokerDesc.getProperties()); + TBrokerOpenWriterResponse tOpenWriterResponse = null; + try { + tOpenWriterResponse = client.openWriter(tOpenWriterRequest); + } catch (TException e) { + reopenClient(client); + tOpenWriterResponse = client.openWriter(tOpenWriterRequest); + } + if (tOpenWriterResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) { + throw new UserException("Broker open writer failed. destPath=" + brokerFilePath + + ", broker=" + address + + ", msg=" + tOpenWriterResponse.getOpStatus().getMessage()); + } + failed = false; + fd = tOpenWriterResponse.getFd(); + currentOffset = 0L; + isReady = true; + } catch (TException e) { + String failMsg = "Broker open writer exception. filePath=" + brokerFilePath + ", broker=" + address; + LOG.warn(failMsg, e); + throw new UserException(failMsg); + } + } + + public void write(ByteBuffer byteBuffer, long bufferSize) throws UserException { + if (!isReady) { + throw new UserException("Broker writer is not ready. 
filePath=" + brokerFilePath + ", broker=" + address); + } + + failed = true; + TBrokerOperationStatus tOperationStatus = null; + TBrokerPWriteRequest tPWriteRequest = new TBrokerPWriteRequest( + TBrokerVersion.VERSION_ONE, fd, currentOffset, byteBuffer); + try { + try { + tOperationStatus = client.pwrite(tPWriteRequest); + } catch (TException e) { + reopenClient(client); + tOperationStatus = client.pwrite(tPWriteRequest); + } + if (tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) { + throw new UserException("Broker write failed. filePath=" + brokerFilePath + ", broker=" + address + + ", msg=" + tOperationStatus.getMessage()); + } + failed = false; + currentOffset += bufferSize; + } catch (TException e) { + String failMsg = "Broker write exception. filePath=" + brokerFilePath + ", broker=" + address; + LOG.warn(failMsg, e); + throw new UserException(failMsg); + } + } + + public void close() { + // close broker writer + failed = true; + TBrokerOperationStatus tOperationStatus = null; + if (fd != null) { + TBrokerCloseWriterRequest tCloseWriterRequest = new TBrokerCloseWriterRequest( + TBrokerVersion.VERSION_ONE, fd); + try { + tOperationStatus = client.closeWriter(tCloseWriterRequest); + } catch (TException e) { + reopenClient(client); + try { + tOperationStatus = client.closeWriter(tCloseWriterRequest); + } catch (TException ex) { + LOG.warn("Broker close writer failed. filePath={}, address={}", brokerFilePath, address, ex); + } + } + if (tOperationStatus == null || tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) { + LOG.warn("Broker close writer failed. filePath={}, address={}, error={}", brokerFilePath, + address, tOperationStatus.getMessage()); + } else { + failed = false; + } + } + + // return client + returnClient(client, address, failed); + isReady = false; + } + + } } diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java index 69971c10da..b707a15d4b 100644 --- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -42,6 +42,7 @@ import org.apache.doris.load.ExportJob; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.LoadJob; import org.apache.doris.load.loadv2.LoadJobFinalOperation; +import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.master.Checkpoint; import org.apache.doris.mysql.privilege.UserPropertyInfo; @@ -499,6 +500,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_UPDATE_LOAD_JOB: { + data = LoadJobStateUpdateInfo.read(in); + isRead = true; + break; + } case OperationType.OP_CREATE_RESOURCE: { data = Resource.read(in); isRead = true; diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 197b809af2..38fa0d75fe 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -864,6 +864,78 @@ public class Load { } } + /** + * When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in + * their names. These columns are invisible to user, but we need to generate data for these columns. + * So we add column mappings for these column. 
+ * eg1: + * base schema is (A, B, C), and B is under schema change, so there will be a shadow column: '__doris_shadow_B' + * So the final column mapping should looks like: (A, B, C, __doris_shadow_B = substitute(B)); + */ + public static List getSchemaChangeShadowColumnDesc(Table tbl, Map columnExprMap) { + List shadowColumnDescs = Lists.newArrayList(); + for (Column column : tbl.getFullSchema()) { + if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) { + continue; + } + + String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX); + if (columnExprMap.containsKey(originCol)) { + Expr mappingExpr = columnExprMap.get(originCol); + if (mappingExpr != null) { + /* + * eg: + * (A, C) SET (B = func(xx)) + * -> + * (A, C) SET (B = func(xx), __doris_shadow_B = func(xx)) + */ + ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr); + shadowColumnDescs.add(importColumnDesc); + } else { + /* + * eg: + * (A, B, C) + * -> + * (A, B, C) SET (__doris_shadow_B = B) + */ + ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), + new SlotRef(null, originCol)); + shadowColumnDescs.add(importColumnDesc); + } + } else { + /* + * There is a case that if user does not specify the related origin column, eg: + * COLUMNS (A, C), and B is not specified, but B is being modified so there is a shadow column '__doris_shadow_B'. + * We can not just add a mapping function "__doris_shadow_B = substitute(B)", because Doris can not find column B. + * In this case, __doris_shadow_B can use its default value, so no need to add it to column mapping + */ + // do nothing + } + } + return shadowColumnDescs; + } + + /* + * used for spark load job + * not init slot desc and analyze exprs + */ + public static void initColumns(Table tbl, List columnExprs, + Map>> columnToHadoopFunction) throws UserException { + initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, false); + } + + /* + * This function should be used for broker load v2 and stream load. + * And it must be called in same db lock when planing. + */ + public static void initColumns(Table tbl, List columnExprs, + Map>> columnToHadoopFunction, + Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, + Map slotDescByName, TBrokerScanRangeParams params) throws UserException { + initColumns(tbl, columnExprs, columnToHadoopFunction, exprsByName, analyzer, + srcTupleDesc, slotDescByName, params, true); + } + /* * This function will do followings: * 1. fill the column exprs if user does not specify any column or column mapping. @@ -871,14 +943,12 @@ public class Load { * 3. Add any shadow columns if have. * 4. validate hadoop functions * 5. init slot descs and expr map for load plan - * - * This function should be used for broker load v2 and stream load. - * And it must be called in same db lock when planing. */ public static void initColumns(Table tbl, List columnExprs, Map>> columnToHadoopFunction, Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, - Map slotDescByName, TBrokerScanRangeParams params) throws UserException { + Map slotDescByName, TBrokerScanRangeParams params, + boolean needInitSlotAndAnalyzeExprs) throws UserException { // check mapping column exist in schema // !! all column mappings are in columnExprs !! for (ImportColumnDesc importColumnDesc : columnExprs) { @@ -925,50 +995,8 @@ public class Load { throw new DdlException("Column has no default value. 
column: " + columnName); } - // When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in - // their names. These columns are invisible to user, but we need to generate data for these columns. - // So we add column mappings for these column. - // eg1: - // base schema is (A, B, C), and B is under schema change, so there will be a shadow column: '__doris_shadow_B' - // So the final column mapping should looks like: (A, B, C, __doris_shadow_B = substitute(B)); - for (Column column : tbl.getFullSchema()) { - if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) { - continue; - } - - String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX); - if (columnExprMap.containsKey(originCol)) { - Expr mappingExpr = columnExprMap.get(originCol); - if (mappingExpr != null) { - /* - * eg: - * (A, C) SET (B = func(xx)) - * -> - * (A, C) SET (B = func(xx), __doris_shadow_B = func(xx)) - */ - ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr); - copiedColumnExprs.add(importColumnDesc); - } else { - /* - * eg: - * (A, B, C) - * -> - * (A, B, C) SET (__doris_shadow_B = B) - */ - ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), - new SlotRef(null, originCol)); - copiedColumnExprs.add(importColumnDesc); - } - } else { - /* - * There is a case that if user does not specify the related origin column, eg: - * COLUMNS (A, C), and B is not specified, but B is being modified so there is a shadow column '__doris_shadow_B'. - * We can not just add a mapping function "__doris_shadow_B = substitute(B)", because Doris can not find column B. - * In this case, __doris_shadow_B can use its default value, so no need to add it to column mapping - */ - // do nothing - } - } + // get shadow column desc when table schema change + copiedColumnExprs.addAll(getSchemaChangeShadowColumnDesc(tbl, columnExprMap)); // validate hadoop functions if (columnToHadoopFunction != null) { @@ -991,6 +1019,10 @@ public class Load { } } + if (!needInitSlotAndAnalyzeExprs) { + return; + } + // init slot desc add expr map, also transform hadoop functions for (ImportColumnDesc importColumnDesc : copiedColumnExprs) { // make column name case match with real column name diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index 099caeaa7b..3d394604fb 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -75,7 +75,7 @@ public class BrokerLoadPendingTask extends LoadTask { long groupFileSize = 0; List fileStatuses = Lists.newArrayList(); for (String path : fileGroup.getFilePaths()) { - BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses); + BrokerUtil.parseFile(path, brokerDesc, fileStatuses); } fileStatusList.add(fileStatuses); for (TBrokerFileStatus fstatus : fileStatuses) { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 3b0bb52a55..28b556cd35 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2; +import com.google.gson.annotations.SerializedName; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.catalog.AuthorizationInfo; import 
org.apache.doris.catalog.Catalog; @@ -30,6 +31,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; @@ -46,6 +48,7 @@ import org.apache.doris.load.Load; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PaloPrivilege; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.QeProcessorImpl; @@ -392,7 +395,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements * @throws AnalysisException there are error params in job * @throws DuplicatedRequestException */ - public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException { + public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, + DuplicatedRequestException, LoadException { writeLock(); try { unprotectedExecute(); @@ -401,8 +405,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements } } - public void unprotectedExecute() - throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException { + public void unprotectedExecute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, + DuplicatedRequestException, LoadException { // check if job state is pending if (state != JobState.PENDING) { return; @@ -410,7 +414,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements // the limit of job will be restrict when begin txn beginTxn(); unprotectedExecuteJob(); - unprotectedUpdateState(JobState.LOADING); + // update spark load job state from PENDING to ETL when pending task is finished + if (jobType != EtlJobType.SPARK) { + unprotectedUpdateState(JobState.LOADING); + } } public void processTimeout() { @@ -433,7 +440,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements } } - protected void unprotectedExecuteJob() { + protected void unprotectedExecuteJob() throws LoadException { } /** @@ -706,6 +713,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements case CANCELLED: jobInfo.add("ETL:N/A; LOAD:N/A"); break; + case ETL: + jobInfo.add("ETL:" + progress + "%; LOAD:0%"); + break; default: jobInfo.add("ETL:100%; LOAD:" + progress + "%"); break; @@ -722,7 +732,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements } // task info - jobInfo.add("cluster:N/A" + "; timeout(s):" + timeoutSecond + jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + timeoutSecond + "; max_filter_ratio:" + maxFilterRatio); // error msg @@ -735,7 +745,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements // create time jobInfo.add(TimeUtils.longToTimeString(createTimestamp)); // etl start time - jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp)); + jobInfo.add(TimeUtils.longToTimeString(getEtlStartTimestamp())); // etl end time jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp)); // load start time @@ -751,6 +761,14 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements } } + protected String getResourceName() { + return "N/A"; + } + + protected long getEtlStartTimestamp() { + return loadStartTimestamp; + } + public void getJobInfo(Load.JobInfo jobInfo) throws DdlException { checkAuth("SHOW LOAD"); jobInfo.tblNames.addAll(getTableNamesForShow()); @@ -768,6 +786,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements EtlJobType type = EtlJobType.valueOf(Text.readString(in)); if (type == EtlJobType.BROKER) { job = new BrokerLoadJob(); + } else if (type == EtlJobType.SPARK) { + job = new SparkLoadJob(); } else if (type == EtlJobType.INSERT) { job = new InsertLoadJob(); } else if (type == EtlJobType.MINI) { @@ -1016,4 +1036,60 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements timezone = Text.readString(in); } } + + public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) { + state = info.getState(); + transactionId = info.getTransactionId(); + loadStartTimestamp = info.getLoadStartTimestamp(); + } + + public static class LoadJobStateUpdateInfo implements Writable { + @SerializedName(value = "jobId") + private long jobId; + @SerializedName(value = "state") + private JobState state; + @SerializedName(value = "transactionId") + private long transactionId; + @SerializedName(value = "loadStartTimestamp") + private long loadStartTimestamp; + + public LoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, long loadStartTimestamp) { + this.jobId = jobId; + this.state = state; + this.transactionId = transactionId; + this.loadStartTimestamp = loadStartTimestamp; + } + + public long getJobId() { + return jobId; + } + + public JobState getState() { + return state; + } + + public long getTransactionId() { + return transactionId; + } + + public long getLoadStartTimestamp() { + return loadStartTimestamp; + } + + @Override + public String toString() { + return GsonUtils.GSON.toJson(this); + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static LoadJobStateUpdateInfo read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, LoadJobStateUpdateInfo.class); + } + } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java index f01e99ba97..9a2b691983 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java @@ -21,6 +21,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.MasterDaemon; @@ -74,8 +75,16 @@ public class LoadJobScheduler extends MasterDaemon { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) .add("error_msg", "There are error properties in job. 
Job will be cancelled") .build(), e); + // transaction not begin, so need not abort loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()), false, true); + } catch (LoadException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) + .add("error_msg", "Failed to submit etl job. Job will be cancelled") + .build(), e); + // transaction already begin, so need abort + loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()), + true, true); } catch (DuplicatedRequestException e) { // should not happen in load job scheduler, there is no request id. LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 1a5658ab49..5dd2d05b46 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -333,6 +333,17 @@ public class LoadManager implements Writable{ .build()); } + public void replayUpdateLoadJobStateInfo(LoadJob.LoadJobStateUpdateInfo info) { + long jobId = info.getJobId(); + LoadJob job = idToLoadJob.get(jobId); + if (job == null) { + LOG.warn("replay update load job state failed. error: job not found, id: {}", jobId); + return; + } + + job.replayUpdateStateInfo(info); + } + public int getLoadJobNum(JobState jobState, long dbId) { readLock(); try { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java index e9b286f809..0a52369362 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -18,6 +18,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; @@ -53,7 +54,7 @@ public abstract class LoadTask extends MasterTask { } catch (UserException e) { failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage()); LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) - .add("error_msg", "Failed to execute load task").build()); + .add("error_msg", "Failed to execute load task").build(), e); } catch (Exception e) { failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage()); LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) @@ -66,6 +67,13 @@ public abstract class LoadTask extends MasterTask { } } + /** + * init load task + * @throws LoadException + */ + public void init() throws LoadException { + } + /** * execute load task * diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java new file mode 100644 index 0000000000..d135b232d3 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.PaloFe; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.SparkResource; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.load.loadv2.etl.EtlJobConfig; +import org.apache.doris.thrift.TEtlState; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.spark.launcher.SparkAppHandle; +import org.apache.spark.launcher.SparkAppHandle.Listener; +import org.apache.spark.launcher.SparkAppHandle.State; +import org.apache.spark.launcher.SparkLauncher; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Map; + +/** + * SparkEtlJobHandler is responsible for + * 1. submit spark etl job + * 2. get spark etl job status + * 3. kill spark etl job + * 4. get spark etl file paths + * 5. delete etl output path + */ +public class SparkEtlJobHandler { + private static final Logger LOG = LogManager.getLogger(SparkEtlJobHandler.class); + + private static final String APP_RESOURCE_NAME = "palo-fe.jar"; + private static final String CONFIG_FILE_NAME = "jobconfig.json"; + private static final String APP_RESOURCE_LOCAL_PATH = PaloFe.DORIS_HOME_DIR + "/lib/" + APP_RESOURCE_NAME; + private static final String JOB_CONFIG_DIR = "configs"; + private static final String MAIN_CLASS = "org.apache.doris.load.loadv2.etl.SparkEtlJob"; + private static final String ETL_JOB_NAME = "doris__%s"; + // 5min + private static final int GET_APPID_MAX_RETRY_TIMES = 300; + private static final int GET_APPID_SLEEP_MS = 1000; + + class SparkAppListener implements Listener { + @Override + public void stateChanged(SparkAppHandle sparkAppHandle) {} + + @Override + public void infoChanged(SparkAppHandle sparkAppHandle) {} + } + + public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, SparkResource resource, + BrokerDesc brokerDesc, SparkPendingTaskAttachment attachment) throws LoadException { + // delete outputPath + deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc); + + // upload app resource and jobconfig to hdfs + String configsHdfsDir = etlJobConfig.outputPath + "/" + JOB_CONFIG_DIR + "/"; + String appResourceHdfsPath = configsHdfsDir + APP_RESOURCE_NAME; + String jobConfigHdfsPath = configsHdfsDir + CONFIG_FILE_NAME; + try { + BrokerUtil.writeFile(APP_RESOURCE_LOCAL_PATH, appResourceHdfsPath, brokerDesc); + byte[] configData = etlJobConfig.configToJson().getBytes("UTF-8"); + BrokerUtil.writeFile(configData, jobConfigHdfsPath, brokerDesc); + } catch (UserException | UnsupportedEncodingException e) { + throw new LoadException(e.getMessage()); + } + + SparkLauncher launcher = new SparkLauncher(); + // master | deployMode + // ------------|------------- + // yarn | cluster + // spark://xx | client + launcher.setMaster(resource.getMaster()) + .setDeployMode(resource.getDeployMode().name().toLowerCase()) + .setAppResource(appResourceHdfsPath) + // TODO(wyb): spark-load + // replace with 
getCanonicalName later + //.setMainClass(SparkEtlJob.class.getCanonicalName()) + .setMainClass(MAIN_CLASS) + .setAppName(String.format(ETL_JOB_NAME, loadLabel)) + .addAppArgs(jobConfigHdfsPath); + // spark configs + for (Map.Entry entry : resource.getSparkConfigs().entrySet()) { + launcher.setConf(entry.getKey(), entry.getValue()); + } + + // start app + SparkAppHandle handle = null; + State state = null; + String appId = null; + int retry = 0; + String errMsg = "start spark app failed. error: "; + try { + handle = launcher.startApplication(new SparkAppListener()); + } catch (IOException e) { + LOG.warn(errMsg, e); + throw new LoadException(errMsg + e.getMessage()); + } + + while (retry++ < GET_APPID_MAX_RETRY_TIMES) { + appId = handle.getAppId(); + if (appId != null) { + break; + } + + // check state and retry + state = handle.getState(); + if (fromSparkState(state) == TEtlState.CANCELLED) { + throw new LoadException(errMsg + "spark app state: " + state.toString()); + } + if (retry >= GET_APPID_MAX_RETRY_TIMES) { + throw new LoadException(errMsg + "wait too much time for getting appid. spark app state: " + + state.toString()); + } + + // log + if (retry % 10 == 0) { + LOG.info("spark appid that handle get is null. load job id: {}, state: {}, retry times: {}", + loadJobId, state.toString(), retry); + } + try { + Thread.sleep(GET_APPID_SLEEP_MS); + } catch (InterruptedException e) { + LOG.warn(e.getMessage()); + } + } + + // success + attachment.setAppId(appId); + attachment.setHandle(handle); + } + + public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) { + try { + BrokerUtil.deletePath(outputPath, brokerDesc); + LOG.info("delete path success. path: {}", outputPath); + } catch (UserException e) { + LOG.warn("delete path failed. path: {}", outputPath, e); + } + } + + private TEtlState fromSparkState(State state) { + switch (state) { + case FINISHED: + return TEtlState.FINISHED; + case FAILED: + case KILLED: + case LOST: + return TEtlState.CANCELLED; + default: + // UNKNOWN CONNECTED SUBMITTED RUNNING + return TEtlState.RUNNING; + } + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 8bb68f764e..8f74235046 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -17,30 +17,43 @@ package org.apache.doris.load.loadv2; -import com.google.common.base.Strings; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.ResourceDesc; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.SparkResource; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.PushTask; +import 
org.apache.doris.transaction.BeginTransactionException; +import org.apache.doris.transaction.TransactionState.LoadJobSourceType; +import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.spark.launcher.SparkAppHandle; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; @@ -79,7 +92,7 @@ public class SparkLoadJob extends BulkLoadJob { private long quorumFinishTimestamp = -1; // below for push task private Map> tableToLoadPartitions = Maps.newHashMap(); - //private Map indexToPushBrokerReaderParams = Maps.newHashMap(); + //private Map indexToPushBrokerReaderParams = Maps.newHashMap(); private Map indexToSchemaHash = Maps.newHashMap(); private Map> tabletToSentReplicaPushTask = Maps.newHashMap(); private Set finishedReplicas = Sets.newHashSet(); @@ -127,6 +140,77 @@ public class SparkLoadJob extends BulkLoadJob { brokerDesc = new BrokerDesc(sparkResource.getBroker(), brokerProperties); } + @Override + public void beginTxn() + throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException { + transactionId = Catalog.getCurrentGlobalTransactionMgr() + .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, + new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + LoadJobSourceType.FRONTEND, id, timeoutSecond); + } + + @Override + protected void unprotectedExecuteJob() throws LoadException { + // create pending task + LoadTask task = new SparkLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), + sparkResource, brokerDesc); + task.init(); + idToTasks.put(task.getSignature(), task); + Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); + } + + @Override + public void onTaskFinished(TaskAttachment attachment) { + if (attachment instanceof SparkPendingTaskAttachment) { + onPendingTaskFinished((SparkPendingTaskAttachment) attachment); + } + } + + private void onPendingTaskFinished(SparkPendingTaskAttachment attachment) { + writeLock(); + try { + // check if job has been cancelled + if (isTxnDone()) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("state", state) + .add("error_msg", "this task will be ignored when job is: " + state) + .build()); + return; + } + + if (finishedTaskIds.contains(attachment.getTaskId())) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("task_id", attachment.getTaskId()) + .add("error_msg", "this is a duplicated callback of pending task " + + "when broker already has loading task") + .build()); + return; + } + + // add task id into finishedTaskIds + finishedTaskIds.add(attachment.getTaskId()); + + sparkAppHandle = attachment.getHandle(); + appId = attachment.getAppId(); + etlOutputPath = attachment.getOutputPath(); + + executeEtl(); + // log etl state + unprotectedLogUpdateStateInfo(); + } finally { + writeUnlock(); + } + } + + /** + * update etl start time and state in spark load job + */ + private void executeEtl() { + etlStartTimestamp = System.currentTimeMillis(); + state = JobState.ETL; + LOG.info("update to {} state success. 
job id: {}", state, id); + } + /** * load job already cancelled or finished, clear job below: * 1. kill etl job and delete etl files @@ -136,8 +220,7 @@ public class SparkLoadJob extends BulkLoadJob { Preconditions.checkState(state == JobState.FINISHED || state == JobState.CANCELLED); LOG.debug("kill etl job and delete etl files. id: {}, state: {}", id, state); - // TODO(wyb): spark-load - //SparkEtlJobHandler handler = new SparkEtlJobHandler(); + SparkEtlJobHandler handler = new SparkEtlJobHandler(); if (state == JobState.CANCELLED) { if ((!Strings.isNullOrEmpty(appId) && sparkResource.isYarnMaster()) || sparkAppHandle != null) { try { @@ -152,8 +235,7 @@ public class SparkLoadJob extends BulkLoadJob { try { // delete label dir, remove the last taskId dir String outputPath = etlOutputPath.substring(0, etlOutputPath.lastIndexOf("/")); - // TODO(wyb): spark-load - //handler.deleteEtlOutputPath(outputPath, brokerDesc); + handler.deleteEtlOutputPath(outputPath, brokerDesc); } catch (Exception e) { LOG.warn("delete etl files failed. id: {}, state: {}", id, state, e); } @@ -198,6 +280,16 @@ public class SparkLoadJob extends BulkLoadJob { clearJob(); } + @Override + protected String getResourceName() { + return sparkResource.getName(); + } + + @Override + protected long getEtlStartTimestamp() { + return etlStartTimestamp; + } + @Override public void write(DataOutput out) throws IOException { super.write(out); @@ -226,4 +318,78 @@ public class SparkLoadJob extends BulkLoadJob { tabletMetaToFileInfo.put(tabletMetaStr, fileInfo); } } + + /** + * log load job update info when job state changed to etl or loading + */ + private void unprotectedLogUpdateStateInfo() { + SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo( + id, state, transactionId, etlStartTimestamp, appId, etlOutputPath, + loadStartTimestamp, tabletMetaToFileInfo); + Catalog.getCurrentCatalog().getEditLog().logUpdateLoadJob(info); + } + + @Override + public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) { + super.replayUpdateStateInfo(info); + SparkLoadJobStateUpdateInfo sparkJobStateInfo = (SparkLoadJobStateUpdateInfo) info; + etlStartTimestamp = sparkJobStateInfo.getEtlStartTimestamp(); + appId = sparkJobStateInfo.getAppId(); + etlOutputPath = sparkJobStateInfo.getEtlOutputPath(); + tabletMetaToFileInfo = sparkJobStateInfo.getTabletMetaToFileInfo(); + + switch (state) { + case ETL: + // nothing to do + break; + case LOADING: + // TODO(wyb): spark-load + //unprotectedPrepareLoadingInfos(); + break; + default: + LOG.warn("replay update load job state info failed. error: wrong state. 
job id: {}, state: {}", + id, state); + break; + } + } + + /** + * Used for spark load job journal log when job state changed to ETL or LOADING + */ + public static class SparkLoadJobStateUpdateInfo extends LoadJobStateUpdateInfo { + @SerializedName(value = "etlStartTimestamp") + private long etlStartTimestamp; + @SerializedName(value = "appId") + private String appId; + @SerializedName(value = "etlOutputPath") + private String etlOutputPath; + @SerializedName(value = "tabletMetaToFileInfo") + private Map> tabletMetaToFileInfo; + + public SparkLoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, long etlStartTimestamp, + String appId, String etlOutputPath, long loadStartTimestamp, + Map> tabletMetaToFileInfo) { + super(jobId, state, transactionId, loadStartTimestamp); + this.etlStartTimestamp = etlStartTimestamp; + this.appId = appId; + this.etlOutputPath = etlOutputPath; + this.tabletMetaToFileInfo = tabletMetaToFileInfo; + } + + public long getEtlStartTimestamp() { + return etlStartTimestamp; + } + + public String getAppId() { + return appId; + } + + public String getEtlOutputPath() { + return etlOutputPath; + } + + public Map> getTabletMetaToFileInfo() { + return tabletMetaToFileInfo; + } + } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java new file mode 100644 index 0000000000..185e84a549 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -0,0 +1,550 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
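The SparkLoadJobStateUpdateInfo defined just above ties several smaller changes in this patch together: when the pending task finishes, SparkLoadJob logs it through the new OP_UPDATE_LOAD_JOB edit log entry, and JournalEntity plus LoadManager replay it on followers or after a restart. A simplified end-to-end sketch (variable names are illustrative, and GsonUtils is assumed to resolve the concrete subclass, presumably what the small GsonUtils.java change is for):

    // master side, inside SparkLoadJob once the etl job is submitted (see unprotectedLogUpdateStateInfo)
    SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo(
            id, JobState.ETL, transactionId, etlStartTimestamp, appId, etlOutputPath,
            loadStartTimestamp, tabletMetaToFileInfo);
    Catalog.getCurrentCatalog().getEditLog().logUpdateLoadJob(info);   // persisted as Gson JSON

    // replay side, reached from JournalEntity for OP_UPDATE_LOAD_JOB
    LoadJobStateUpdateInfo replayed = LoadJobStateUpdateInfo.read(in); // in is the journal DataInput
    Catalog.getCurrentCatalog().getLoadManager().replayUpdateLoadJobStateInfo(replayed);
    // LoadManager finds the job by id and calls job.replayUpdateStateInfo(replayed), restoring
    // state, transactionId and the etl info without re-submitting the spark application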
+ +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveTable; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.SparkResource; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.etl.EtlJobConfig; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumnMapping; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlJobProperty; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartition; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartitionInfo; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType; +import org.apache.doris.transaction.TransactionState; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +// 1. create etl job config and write it into jobconfig.json file +// 2. 
submit spark etl job +public class SparkLoadPendingTask extends LoadTask { + private static final Logger LOG = LogManager.getLogger(SparkLoadPendingTask.class); + + private final Map> aggKeyToBrokerFileGroups; + private final SparkResource resource; + private final BrokerDesc brokerDesc; + private final long dbId; + private final String loadLabel; + private final long loadJobId; + private final long transactionId; + private EtlJobConfig etlJobConfig; + + public SparkLoadPendingTask(SparkLoadJob loadTaskCallback, + Map> aggKeyToBrokerFileGroups, + SparkResource resource, BrokerDesc brokerDesc) { + super(loadTaskCallback); + this.retryTime = 3; + this.attachment = new SparkPendingTaskAttachment(signature); + this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups; + this.resource = resource; + this.brokerDesc = brokerDesc; + this.dbId = loadTaskCallback.getDbId(); + this.loadJobId = loadTaskCallback.getId(); + this.loadLabel = loadTaskCallback.getLabel(); + this.transactionId = loadTaskCallback.getTransactionId(); + this.failMsg = new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL); + } + + @Override + void executeTask() throws LoadException { + LOG.info("begin to execute spark pending task. load job id: {}", loadJobId); + submitEtlJob(); + } + + private void submitEtlJob() throws LoadException { + SparkPendingTaskAttachment sparkAttachment = (SparkPendingTaskAttachment) attachment; + // retry different output path + etlJobConfig.outputPath = EtlJobConfig.getOutputPath(resource.getWorkingDir(), dbId, loadLabel, signature); + sparkAttachment.setOutputPath(etlJobConfig.outputPath); + + // handler submit etl job + SparkEtlJobHandler handler = new SparkEtlJobHandler(); + handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, brokerDesc, sparkAttachment); + LOG.info("submit spark etl job success. load job id: {}, attachment: {}", loadJobId, sparkAttachment); + } + + @Override + public void init() throws LoadException { + createEtlJobConf(); + } + + private void createEtlJobConf() throws LoadException { + Database db = Catalog.getCurrentCatalog().getDb(dbId); + if (db == null) { + throw new LoadException("db does not exist. id: " + dbId); + } + + Map tables = Maps.newHashMap(); + db.readLock(); + try { + Map> tableIdToPartitionIds = Maps.newHashMap(); + Set allPartitionsTableIds = Sets.newHashSet(); + prepareTablePartitionInfos(db, tableIdToPartitionIds, allPartitionsTableIds); + + + for (Map.Entry> entry : aggKeyToBrokerFileGroups.entrySet()) { + FileGroupAggKey aggKey = entry.getKey(); + long tableId = aggKey.getTableId(); + + OlapTable table = (OlapTable) db.getTable(tableId); + if (table == null) { + throw new LoadException("table does not exist. id: " + tableId); + } + + EtlTable etlTable = null; + if (tables.containsKey(tableId)) { + etlTable = tables.get(tableId); + } else { + // indexes + List etlIndexes = createEtlIndexes(table); + // partition info + EtlPartitionInfo etlPartitionInfo = createEtlPartitionInfo(table, + tableIdToPartitionIds.get(tableId)); + etlTable = new EtlTable(etlIndexes, etlPartitionInfo); + tables.put(tableId, etlTable); + + // add table indexes to transaction state + TransactionState txnState = Catalog.getCurrentGlobalTransactionMgr() + .getTransactionState(dbId, transactionId); + if (txnState == null) { + throw new LoadException("txn does not exist. 
id: " + transactionId); + } + txnState.addTableIndexes(table); + } + + // file group + for (BrokerFileGroup fileGroup : entry.getValue()) { + etlTable.addFileGroup(createEtlFileGroup(fileGroup, tableIdToPartitionIds.get(tableId), db, table)); + } + } + } finally { + db.readUnlock(); + } + + String outputFilePattern = EtlJobConfig.getOutputFilePattern(loadLabel, FilePatternVersion.V1); + // strictMode timezone properties + EtlJobProperty properties = new EtlJobProperty(); + properties.strictMode = ((LoadJob) callback).strictMode; + properties.timezone = ((LoadJob) callback).timezone; + etlJobConfig = new EtlJobConfig(tables, outputFilePattern, loadLabel, properties); + } + + private void prepareTablePartitionInfos(Database db, Map> tableIdToPartitionIds, + Set allPartitionsTableIds) throws LoadException { + for (FileGroupAggKey aggKey : aggKeyToBrokerFileGroups.keySet()) { + long tableId = aggKey.getTableId(); + if (allPartitionsTableIds.contains(tableId)) { + continue; + } + + OlapTable table = (OlapTable) db.getTable(tableId); + if (table == null) { + throw new LoadException("table does not exist. id: " + tableId); + } + + Set partitionIds = null; + if (tableIdToPartitionIds.containsKey(tableId)) { + partitionIds = tableIdToPartitionIds.get(tableId); + } else { + partitionIds = Sets.newHashSet(); + tableIdToPartitionIds.put(tableId, partitionIds); + } + + Set groupPartitionIds = aggKey.getPartitionIds(); + // if not assign partition, use all partitions + if (groupPartitionIds == null || groupPartitionIds.isEmpty()) { + for (Partition partition : table.getPartitions()) { + partitionIds.add(partition.getId()); + } + + allPartitionsTableIds.add(tableId); + } else { + partitionIds.addAll(groupPartitionIds); + } + } + } + + private List createEtlIndexes(OlapTable table) throws LoadException { + List etlIndexes = Lists.newArrayList(); + + for (Map.Entry> entry : table.getIndexIdToSchema().entrySet()) { + long indexId = entry.getKey(); + int schemaHash = table.getSchemaHashByIndexId(indexId); + + // columns + List etlColumns = Lists.newArrayList(); + for (Column column : entry.getValue()) { + etlColumns.add(createEtlColumn(column)); + } + + // check distribution type + DistributionInfo distributionInfo = table.getDefaultDistributionInfo(); + if (distributionInfo.getType() != DistributionInfoType.HASH) { + // RANDOM not supported + String errMsg = "Unsupported distribution type. type: " + distributionInfo.getType().name(); + LOG.warn(errMsg); + throw new LoadException(errMsg); + } + + // index type + String indexType = null; + KeysType keysType = table.getKeysTypeByIndexId(indexId); + switch (keysType) { + case DUP_KEYS: + indexType = "DUPLICATE"; + break; + case AGG_KEYS: + indexType = "AGGREGATE"; + break; + case UNIQUE_KEYS: + indexType = "UNIQUE"; + break; + default: + String errMsg = "unknown keys type. type: " + keysType.name(); + LOG.warn(errMsg); + throw new LoadException(errMsg); + } + + // is base index + boolean isBaseIndex = indexId == table.getBaseIndexId() ? 
true : false; + + etlIndexes.add(new EtlIndex(indexId, etlColumns, schemaHash, indexType, isBaseIndex)); + } + + return etlIndexes; + } + + private EtlColumn createEtlColumn(Column column) { + // column name + String name = column.getName(); + // column type + PrimitiveType type = column.getDataType(); + String columnType = column.getDataType().toString(); + // is allow null + boolean isAllowNull = column.isAllowNull(); + // is key + boolean isKey = column.isKey(); + + // aggregation type + String aggregationType = null; + if (column.getAggregationType() != null) { + aggregationType = column.getAggregationType().toString(); + } + + // default value + String defaultValue = null; + if (column.getDefaultValue() != null) { + defaultValue = column.getDefaultValue(); + } + if (column.isAllowNull() && column.getDefaultValue() == null) { + defaultValue = "\\N"; + } + + // string length + int stringLength = 0; + if (type.isStringType()) { + stringLength = column.getStrLen(); + } + + // decimal precision scale + int precision = 0; + int scale = 0; + if (type.isDecimalType() || type.isDecimalV2Type()) { + precision = column.getPrecision(); + scale = column.getScale(); + } + + return new EtlColumn(name, columnType, isAllowNull, isKey, aggregationType, defaultValue, + stringLength, precision, scale); + } + + private EtlPartitionInfo createEtlPartitionInfo(OlapTable table, Set partitionIds) throws LoadException { + PartitionType type = table.getPartitionInfo().getType(); + + List partitionColumnRefs = Lists.newArrayList(); + List etlPartitions = Lists.newArrayList(); + if (type == PartitionType.RANGE) { + RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo(); + for (Column column : rangePartitionInfo.getPartitionColumns()) { + partitionColumnRefs.add(column.getName()); + } + + for (Map.Entry> entry : rangePartitionInfo.getSortedRangeMap(false)) { + long partitionId = entry.getKey(); + if (!partitionIds.contains(partitionId)) { + continue; + } + + Partition partition = table.getPartition(partitionId); + if (partition == null) { + throw new LoadException("partition does not exist. id: " + partitionId); + } + + // bucket num + int bucketNum = partition.getDistributionInfo().getBucketNum(); + + // is max partition + Range range = entry.getValue(); + boolean isMaxPartition = range.upperEndpoint().isMaxValue(); + + // start keys + List rangeKeyExprs = range.lowerEndpoint().getKeys(); + List startKeys = Lists.newArrayList(); + for (int i = 0; i < rangeKeyExprs.size(); ++i) { + LiteralExpr literalExpr = rangeKeyExprs.get(i); + Object keyValue = literalExpr.getRealValue(); + startKeys.add(keyValue); + } + + // end keys + // is empty list when max partition + List endKeys = Lists.newArrayList(); + if (!isMaxPartition) { + rangeKeyExprs = range.upperEndpoint().getKeys(); + for (int i = 0; i < rangeKeyExprs.size(); ++i) { + LiteralExpr literalExpr = rangeKeyExprs.get(i); + Object keyValue = literalExpr.getRealValue(); + endKeys.add(keyValue); + } + } + + etlPartitions.add(new EtlPartition(partitionId, startKeys, endKeys, isMaxPartition, bucketNum)); + } + } else { + Preconditions.checkState(type == PartitionType.UNPARTITIONED); + Preconditions.checkState(partitionIds.size() == 1); + + for (Long partitionId : partitionIds) { + Partition partition = table.getPartition(partitionId); + if (partition == null) { + throw new LoadException("partition does not exist. 
id: " + partitionId); + } + + // bucket num + int bucketNum = partition.getDistributionInfo().getBucketNum(); + + etlPartitions.add(new EtlPartition(partitionId, Lists.newArrayList(), Lists.newArrayList(), + true, bucketNum)); + } + } + + // distribution column refs + List distributionColumnRefs = Lists.newArrayList(); + DistributionInfo distributionInfo = table.getDefaultDistributionInfo(); + Preconditions.checkState(distributionInfo.getType() == DistributionInfoType.HASH); + for (Column column : ((HashDistributionInfo) distributionInfo).getDistributionColumns()) { + distributionColumnRefs.add(column.getName()); + } + + return new EtlPartitionInfo(type.typeString, partitionColumnRefs, distributionColumnRefs, etlPartitions); + } + + private EtlFileGroup createEtlFileGroup(BrokerFileGroup fileGroup, Set tablePartitionIds, + Database db, OlapTable table) throws LoadException { + List copiedColumnExprList = Lists.newArrayList(fileGroup.getColumnExprList()); + Map exprByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + for (ImportColumnDesc columnDesc : copiedColumnExprList) { + if (!columnDesc.isColumn()) { + exprByName.put(columnDesc.getColumnName(), columnDesc.getExpr()); + } + } + + // check columns + try { + Load.initColumns(table, copiedColumnExprList, fileGroup.getColumnToHadoopFunction()); + } catch (UserException e) { + throw new LoadException(e.getMessage()); + } + // add shadow column mapping when schema change + for (ImportColumnDesc columnDesc : Load.getSchemaChangeShadowColumnDesc(table, exprByName)) { + copiedColumnExprList.add(columnDesc); + exprByName.put(columnDesc.getColumnName(), columnDesc.getExpr()); + } + + // check negative for sum aggregate type + if (fileGroup.isNegative()) { + for (Column column : table.getBaseSchema()) { + if (!column.isKey() && column.getAggregationType() != AggregateType.SUM) { + throw new LoadException("Column is not SUM AggreateType. 
column:" + column.getName()); + } + } + } + + // fill file field names if empty + List fileFieldNames = fileGroup.getFileFieldNames(); + if (fileFieldNames == null || fileFieldNames.isEmpty()) { + fileFieldNames = Lists.newArrayList(); + for (Column column : table.getBaseSchema()) { + fileFieldNames.add(column.getName()); + } + } + + // column mappings + Map>> columnToHadoopFunction = fileGroup.getColumnToHadoopFunction(); + Map columnMappings = Maps.newHashMap(); + if (columnToHadoopFunction != null) { + for (Map.Entry>> entry : columnToHadoopFunction.entrySet()) { + columnMappings.put(entry.getKey(), + new EtlColumnMapping(entry.getValue().first, entry.getValue().second)); + } + } + for (ImportColumnDesc columnDesc : copiedColumnExprList) { + if (columnDesc.isColumn() || columnMappings.containsKey(columnDesc.getColumnName())) { + continue; + } + // the left must be column expr + columnMappings.put(columnDesc.getColumnName(), new EtlColumnMapping(columnDesc.getExpr().toSql())); + } + + // partition ids + List partitionIds = fileGroup.getPartitionIds(); + if (partitionIds == null || partitionIds.isEmpty()) { + partitionIds = Lists.newArrayList(tablePartitionIds); + } + + // where + // TODO: check + String where = ""; + if (fileGroup.getWhereExpr() != null) { + where = fileGroup.getWhereExpr().toSql(); + } + + // load from table + String hiveDbTableName = ""; + Map hiveTableProperties = Maps.newHashMap(); + if (fileGroup.isLoadFromTable()) { + long srcTableId = fileGroup.getSrcTableId(); + HiveTable srcHiveTable = (HiveTable) db.getTable(srcTableId); + if (srcHiveTable == null) { + throw new LoadException("table does not exist. id: " + srcTableId); + } + hiveDbTableName = srcHiveTable.getHiveDbTable(); + hiveTableProperties.putAll(srcHiveTable.getHiveProperties()); + } + + // check hll and bitmap func + // TODO: more check + for (Column column : table.getBaseSchema()) { + String columnName = column.getName(); + PrimitiveType columnType = column.getDataType(); + Expr expr = exprByName.get(columnName); + if (columnType == PrimitiveType.HLL) { + checkHllMapping(columnName, expr); + } + if (columnType == PrimitiveType.BITMAP) { + checkBitmapMapping(columnName, expr, fileGroup.isLoadFromTable()); + } + } + + EtlFileGroup etlFileGroup = null; + if (fileGroup.isLoadFromTable()) { + etlFileGroup = new EtlFileGroup(SourceType.HIVE, hiveDbTableName, hiveTableProperties, + fileGroup.isNegative(), columnMappings, where, partitionIds); + } else { + etlFileGroup = new EtlFileGroup(SourceType.FILE, fileGroup.getFilePaths(), fileFieldNames, + fileGroup.getColumnsFromPath(), fileGroup.getValueSeparator(), + fileGroup.getLineDelimiter(), fileGroup.isNegative(), + fileGroup.getFileFormat(), columnMappings, + where, partitionIds); + } + + return etlFileGroup; + } + + private void checkHllMapping(String columnName, Expr expr) throws LoadException { + if (expr == null) { + throw new LoadException("HLL column func is not assigned. 
column:" + columnName); + } + + String msg = "HLL column must use hll function, like " + columnName + "=hll_hash(xxx) or " + + columnName + "=hll_empty()"; + if (!(expr instanceof FunctionCallExpr)) { + throw new LoadException(msg); + } + FunctionCallExpr fn = (FunctionCallExpr) expr; + String functionName = fn.getFnName().getFunction(); + if (!functionName.equalsIgnoreCase("hll_hash") + && !functionName.equalsIgnoreCase("hll_empty")) { + throw new LoadException(msg); + } + } + + private void checkBitmapMapping(String columnName, Expr expr, boolean isLoadFromTable) throws LoadException { + if (expr == null) { + throw new LoadException("BITMAP column func is not assigned. column:" + columnName); + } + + String msg = "BITMAP column must use bitmap function, like " + columnName + "=to_bitmap(xxx) or " + + columnName + "=bitmap_hash() or " + columnName + "=bitmap_dict()"; + if (!(expr instanceof FunctionCallExpr)) { + throw new LoadException(msg); + } + FunctionCallExpr fn = (FunctionCallExpr) expr; + String functionName = fn.getFnName().getFunction(); + if (!functionName.equalsIgnoreCase("to_bitmap") + && !functionName.equalsIgnoreCase("bitmap_hash") + && !functionName.equalsIgnoreCase("bitmap_dict")) { + throw new LoadException(msg); + } + + if (functionName.equalsIgnoreCase("bitmap_dict") && !isLoadFromTable) { + throw new LoadException("Bitmap global dict should load data from hive table"); + } + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java new file mode 100644 index 0000000000..311ca3bab7 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.loadv2; + +import org.apache.spark.launcher.SparkAppHandle; + +public class SparkPendingTaskAttachment extends TaskAttachment { + private SparkAppHandle handle; + private String appId; + private String outputPath; + + public SparkPendingTaskAttachment(long taskId) { + super(taskId); + } + + public SparkAppHandle getHandle() { + return handle; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public void setHandle(SparkAppHandle handle) { + this.handle = handle; + } + + public String getOutputPath() { + return outputPath; + } + + public void setOutputPath(String outputPath) { + this.outputPath = outputPath; + } + + @Override + public String toString() { + return "SparkPendingTaskAttachment{" + + "appId='" + appId + '\'' + + ", outputPath='" + outputPath + '\'' + + '}'; + } +} diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java index fa3ef9e425..7f3da6c9f0 100644 --- a/fe/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java @@ -53,6 +53,7 @@ import org.apache.doris.load.ExportMgr; import org.apache.doris.load.Load; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.LoadJob; +import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; import org.apache.doris.load.loadv2.LoadJobFinalOperation; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.meta.MetaContext; @@ -684,6 +685,11 @@ public class EditLog { catalog.getLoadManager().replayEndLoadJob(operation); break; } + case OperationType.OP_UPDATE_LOAD_JOB: { + LoadJobStateUpdateInfo info = (LoadJobStateUpdateInfo) journal.getData(); + catalog.getLoadManager().replayUpdateLoadJobStateInfo(info); + break; + } case OperationType.OP_CREATE_RESOURCE: { final Resource resource = (Resource) journal.getData(); catalog.getResourceMgr().replayCreateResource(resource); @@ -1266,6 +1272,10 @@ public class EditLog { logEdit(OperationType.OP_END_LOAD_JOB, loadJobFinalOperation); } + public void logUpdateLoadJob(LoadJobStateUpdateInfo info) { + logEdit(OperationType.OP_UPDATE_LOAD_JOB, info); + } + public void logCreateResource(Resource resource) { // TODO(wyb): spark-load //logEdit(OperationType.OP_CREATE_RESOURCE, resource); diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java index d06fcba8d3..01171e6a8e 100644 --- a/fe/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java @@ -156,7 +156,7 @@ public class OperationType { // this finish op include finished and cancelled public static final short OP_END_LOAD_JOB = 231; // update job info, used by spark load - //public static final short OP_UPDATE_LOAD_JOB = 232; + public static final short OP_UPDATE_LOAD_JOB = 232; // small files 251~260 public static final short OP_CREATE_SMALL_FILE = 251; diff --git a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 10e1ad7f30..8e9cbe2db8 100644 --- a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -26,6 +26,8 @@ import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.RandomDistributionInfo; import 
org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.SparkResource; +import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; +import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -102,6 +104,12 @@ public class GsonUtils { .registerSubtype(RollupJobV2.class, RollupJobV2.class.getSimpleName()) .registerSubtype(SchemaChangeJobV2.class, SchemaChangeJobV2.class.getSimpleName()); + // runtime adapter for class "LoadJobStateUpdateInfo" + private static RuntimeTypeAdapterFactory loadJobStateUpdateInfoTypeAdapterFactory + = RuntimeTypeAdapterFactory + .of(LoadJobStateUpdateInfo.class, "clazz") + .registerSubtype(SparkLoadJobStateUpdateInfo.class, SparkLoadJobStateUpdateInfo.class.getSimpleName()); + // the builder of GSON instance. // Add any other adapters if necessary. private static final GsonBuilder GSON_BUILDER = new GsonBuilder() @@ -113,7 +121,8 @@ public class GsonUtils { .registerTypeAdapterFactory(columnTypeAdapterFactory) .registerTypeAdapterFactory(distributionInfoTypeAdapterFactory) .registerTypeAdapterFactory(resourceTypeAdapterFactory) - .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory); + .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory) + .registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory); // this instance is thread-safe. public static final Gson GSON = GSON_BUILDER.create(); diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 5df36ae9ab..a4427eddfc 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -354,7 +354,7 @@ public class BrokerScanNode extends LoadScanNode { for (BrokerFileGroup fileGroup : fileGroups) { List fileStatuses = Lists.newArrayList(); for (String path : fileGroup.getFilePaths()) { - BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses); + BrokerUtil.parseFile(path, brokerDesc, fileStatuses); } fileStatusesList.add(fileStatuses); filesAdded += fileStatuses.size(); diff --git a/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java index 55a6e00c00..4be5d0fa7e 100644 --- a/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java +++ b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java @@ -20,12 +20,44 @@ package org.apache.doris.common.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.BrokerMgr; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.FsBroker; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.GenericPool; import org.apache.doris.common.UserException; +import org.apache.doris.thrift.TBrokerCloseReaderRequest; +import org.apache.doris.thrift.TBrokerCloseWriterRequest; +import org.apache.doris.thrift.TBrokerDeletePathRequest; +import org.apache.doris.thrift.TBrokerFD; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TBrokerListPathRequest; +import org.apache.doris.thrift.TBrokerListResponse; +import org.apache.doris.thrift.TBrokerOpenReaderRequest; +import org.apache.doris.thrift.TBrokerOpenReaderResponse; +import org.apache.doris.thrift.TBrokerOpenWriterRequest; +import 
org.apache.doris.thrift.TBrokerOpenWriterResponse; +import org.apache.doris.thrift.TBrokerOperationStatus; +import org.apache.doris.thrift.TBrokerOperationStatusCode; +import org.apache.doris.thrift.TBrokerPReadRequest; +import org.apache.doris.thrift.TBrokerPWriteRequest; +import org.apache.doris.thrift.TBrokerReadResponse; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPaloBrokerService; import com.google.common.collect.Lists; - +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.Mocked; +import mockit.MockUp; +import org.apache.thrift.TException; +import org.junit.Assert; import org.junit.Test; +import java.io.UnsupportedEncodingException; import java.util.Collections; import java.util.List; @@ -122,4 +154,172 @@ public class BrokerUtilTest { } } -} + + @Test + public void testReadFile(@Mocked TPaloBrokerService.Client client, @Mocked Catalog catalog, + @Injectable BrokerMgr brokerMgr) + throws TException, UserException, UnsupportedEncodingException { + // list response + TBrokerListResponse listResponse = new TBrokerListResponse(); + TBrokerOperationStatus status = new TBrokerOperationStatus(); + status.statusCode = TBrokerOperationStatusCode.OK; + listResponse.opStatus = status; + List files = Lists.newArrayList(); + String filePath = "hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/dpp_result.json"; + files.add(new TBrokerFileStatus(filePath, false, 10, false)); + listResponse.files = files; + + // open reader response + TBrokerOpenReaderResponse openReaderResponse = new TBrokerOpenReaderResponse(); + openReaderResponse.opStatus = status; + openReaderResponse.fd = new TBrokerFD(1, 2); + + // read response + String dppResultStr = "{'normal_rows': 10, 'abnormal_rows': 0, 'failed_reason': 'etl job failed'}"; + TBrokerReadResponse readResponse = new TBrokerReadResponse(); + readResponse.opStatus = status; + readResponse.setData(dppResultStr.getBytes("UTF-8")); + + FsBroker fsBroker = new FsBroker("127.0.0.1", 99999); + + new MockUp>() { + @Mock + public TPaloBrokerService.Client borrowObject(TNetworkAddress address) throws Exception { + return client; + } + + @Mock + public void returnObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + + @Mock + public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + }; + + new Expectations() { + { + catalog.getBrokerMgr(); + result = brokerMgr; + brokerMgr.getBroker(anyString, anyString); + result = fsBroker; + client.listPath((TBrokerListPathRequest) any); + result = listResponse; + client.openReader((TBrokerOpenReaderRequest) any); + result = openReaderResponse; + client.pread((TBrokerPReadRequest) any); + result = readResponse; + times = 1; + client.closeReader((TBrokerCloseReaderRequest) any); + result = status; + } + }; + + BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap()); + byte[] data = BrokerUtil.readFile(filePath, brokerDesc); + String readStr = new String(data, "UTF-8"); + Assert.assertEquals(dppResultStr, readStr); + } + + @Test + public void testWriteFile(@Mocked TPaloBrokerService.Client client, @Mocked Catalog catalog, + @Injectable BrokerMgr brokerMgr) + throws TException, UserException, UnsupportedEncodingException { + // open writer response + TBrokerOpenWriterResponse openWriterResponse = new TBrokerOpenWriterResponse(); + TBrokerOperationStatus status = new TBrokerOperationStatus(); + status.statusCode = 
TBrokerOperationStatusCode.OK; + openWriterResponse.opStatus = status; + openWriterResponse.fd = new TBrokerFD(1, 2); + FsBroker fsBroker = new FsBroker("127.0.0.1", 99999); + + new MockUp>() { + @Mock + public TPaloBrokerService.Client borrowObject(TNetworkAddress address) throws Exception { + return client; + } + + @Mock + public void returnObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + + @Mock + public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + }; + + new Expectations() { + { + catalog.getBrokerMgr(); + result = brokerMgr; + brokerMgr.getBroker(anyString, anyString); + result = fsBroker; + client.openWriter((TBrokerOpenWriterRequest) any); + result = openWriterResponse; + client.pwrite((TBrokerPWriteRequest) any); + result = status; + times = 1; + client.closeWriter((TBrokerCloseWriterRequest) any); + result = status; + } + }; + + BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap()); + byte[] configs = "{'label': 'label0'}".getBytes("UTF-8"); + String destFilePath = "hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/configs/jobconfig.json"; + try { + BrokerUtil.writeFile(configs, destFilePath, brokerDesc); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testDeletePath(@Mocked TPaloBrokerService.Client client, @Mocked Catalog catalog, + @Injectable BrokerMgr brokerMgr) throws AnalysisException, TException { + // delete response + TBrokerOperationStatus status = new TBrokerOperationStatus(); + status.statusCode = TBrokerOperationStatusCode.OK; + FsBroker fsBroker = new FsBroker("127.0.0.1", 99999); + + new MockUp>() { + @Mock + public TPaloBrokerService.Client borrowObject(TNetworkAddress address) throws Exception { + return client; + } + + @Mock + public void returnObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + + @Mock + public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client object) { + return; + } + }; + + new Expectations() { + { + catalog.getBrokerMgr(); + result = brokerMgr; + brokerMgr.getBroker(anyString, anyString); + result = fsBroker; + client.deletePath((TBrokerDeletePathRequest) any); + result = status; + times = 1; + } + }; + + try { + BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap()); + BrokerUtil.deletePath("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } +} \ No newline at end of file diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java index 9b15819dd9..6e9f81e3fc 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java @@ -64,7 +64,7 @@ public class BrokerLoadPendingTaskTest { }; new MockUp() { @Mock - public void parseBrokerFile(String path, BrokerDesc brokerDesc, List fileStatuses) { + public void parseFile(String path, BrokerDesc brokerDesc, List fileStatuses) { fileStatuses.add(tBrokerFileStatus); } }; diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java index dd6993223e..f324b76c9b 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java @@ -25,6 +25,7 @@ import 
org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.metric.LongCounterMetric; @@ -122,7 +123,11 @@ public class LoadJobTest { } }; - loadJob.execute(); + try { + loadJob.execute(); + } catch (LoadException e) { + Assert.fail(e.getMessage()); + } Assert.assertEquals(JobState.LOADING, loadJob.getState()); Assert.assertEquals(1, loadJob.getTransactionId()); diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java new file mode 100644 index 0000000000..374bbb280a --- /dev/null +++ b/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java @@ -0,0 +1,326 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
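+// Tests for SparkLoadPendingTask: etl job submission with a mocked SparkEtlJobHandler,
+// failure when the db or table does not exist, and EtlJobConfig generation for a
+// range-partitioned, hash-distributed table.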
+ +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.DataDescription; +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.analysis.SingleRangePartitionDesc; +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.SinglePartitionInfo; +import org.apache.doris.catalog.SparkResource; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.load.BrokerFileGroupAggInfo; +import org.apache.doris.load.loadv2.etl.EtlJobConfig; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartition; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartitionInfo; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; + +import java.util.List; +import java.util.Map; + +public class SparkLoadPendingTaskTest { + + @Test + public void testExecuteTask(@Injectable SparkLoadJob sparkLoadJob, + @Injectable SparkResource resource, + @Injectable BrokerDesc brokerDesc, + @Mocked Catalog catalog, + @Injectable Database database, + @Injectable OlapTable table) throws LoadException { + long dbId = 0L; + long tableId = 1L; + + // columns + List columns = Lists.newArrayList(); + columns.add(new Column("c1", Type.BIGINT, true, null, false, null, "")); + + // indexes + Map> indexIdToSchema = Maps.newHashMap(); + long indexId = 3L; + indexIdToSchema.put(indexId, columns); + + // partition and distribution infos + long partitionId = 2L; + DistributionInfo distributionInfo = new HashDistributionInfo(2, Lists.newArrayList(columns.get(0))); + PartitionInfo partitionInfo = new SinglePartitionInfo(); + Partition partition = new Partition(partitionId, "p1", null, distributionInfo); + List partitions = Lists.newArrayList(partition); + + // file group + Map> aggKeyToFileGroups = Maps.newHashMap(); + List brokerFileGroups = Lists.newArrayList(); + DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), + null, null, null, false, null); + BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc); + brokerFileGroups.add(brokerFileGroup); + BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null); + aggKeyToFileGroups.put(aggKey, brokerFileGroups); + + new Expectations() { + { + catalog.getDb(dbId); + result = database; + 
database.getTable(tableId); + result = table; + table.getPartitions(); + result = partitions; + table.getIndexIdToSchema(); + result = indexIdToSchema; + table.getDefaultDistributionInfo(); + result = distributionInfo; + table.getSchemaHashByIndexId(indexId); + result = 123; + table.getPartitionInfo(); + result = partitionInfo; + table.getPartition(partitionId); + result = partition; + table.getKeysTypeByIndexId(indexId); + result = KeysType.DUP_KEYS; + table.getBaseIndexId(); + result = indexId; + } + }; + + String appId = "application_15888888888_0088"; + new MockUp() { + @Mock + public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, + SparkResource resource, BrokerDesc brokerDesc, + SparkPendingTaskAttachment attachment) throws LoadException { + attachment.setAppId(appId); + } + }; + + SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc); + task.init(); + SparkPendingTaskAttachment attachment = Deencapsulation.getField(task, "attachment"); + Assert.assertEquals(null, attachment.getAppId()); + task.executeTask(); + Assert.assertEquals(appId, attachment.getAppId()); + } + + @Test(expected = LoadException.class) + public void testNoDb(@Injectable SparkLoadJob sparkLoadJob, + @Injectable SparkResource resource, + @Injectable BrokerDesc brokerDesc, + @Mocked Catalog catalog) throws LoadException { + long dbId = 0L; + + new Expectations() { + { + catalog.getDb(dbId); + result = null; + } + }; + + SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, null, resource, brokerDesc); + task.init(); + } + + @Test(expected = LoadException.class) + public void testNoTable(@Injectable SparkLoadJob sparkLoadJob, + @Injectable SparkResource resource, + @Injectable BrokerDesc brokerDesc, + @Mocked Catalog catalog, + @Injectable Database database) throws LoadException { + long dbId = 0L; + long tableId = 1L; + + Map> aggKeyToFileGroups = Maps.newHashMap(); + List brokerFileGroups = Lists.newArrayList(); + DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), + null, null, null, false, null); + BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc); + brokerFileGroups.add(brokerFileGroup); + BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null); + aggKeyToFileGroups.put(aggKey, brokerFileGroups); + + new Expectations() { + { + catalog.getDb(dbId); + result = database; + database.getTable(tableId); + result = null; + } + }; + + SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc); + task.init(); + } + + @Test + public void testRangePartitionHashDistribution(@Injectable SparkLoadJob sparkLoadJob, + @Injectable SparkResource resource, + @Injectable BrokerDesc brokerDesc, + @Mocked Catalog catalog, + @Injectable Database database, + @Injectable OlapTable table) throws LoadException, DdlException, AnalysisException { + long dbId = 0L; + long tableId = 1L; + + // c1 is partition column, c2 is distribution column + List columns = Lists.newArrayList(); + columns.add(new Column("c1", Type.INT, true, null, false, null, "")); + columns.add(new Column("c2", ScalarType.createVarchar(10), true, null, false, null, "")); + columns.add(new Column("c3", Type.INT, false, AggregateType.SUM, false, null, "")); + + // indexes + Map> indexIdToSchema = Maps.newHashMap(); + long index1Id = 3L; + indexIdToSchema.put(index1Id, columns); + long index2Id = 4L; + 
indexIdToSchema.put(index2Id, Lists.newArrayList(columns.get(0), columns.get(2))); + + // partition and distribution info + long partition1Id = 2L; + long partition2Id = 5L; + int distributionColumnIndex = 1; + DistributionInfo distributionInfo = new HashDistributionInfo(3, Lists.newArrayList(columns.get(distributionColumnIndex))); + Partition partition1 = new Partition(partition1Id, "p1", null, + distributionInfo); + Partition partition2 = new Partition(partition2Id, "p2", null, + new HashDistributionInfo(4, Lists.newArrayList(columns.get(distributionColumnIndex)))); + int partitionColumnIndex = 0; + List partitions = Lists.newArrayList(partition1, partition2); + RangePartitionInfo partitionInfo = new RangePartitionInfo(Lists.newArrayList(columns.get(partitionColumnIndex))); + PartitionKeyDesc partitionKeyDesc1 = new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("10"))); + SingleRangePartitionDesc partitionDesc1 = new SingleRangePartitionDesc(false, "p1", partitionKeyDesc1, null); + partitionDesc1.analyze(1, null); + partitionInfo.handleNewSinglePartitionDesc(partitionDesc1, partition1Id, false); + PartitionKeyDesc partitionKeyDesc2 = new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("20"))); + SingleRangePartitionDesc partitionDesc2 = new SingleRangePartitionDesc(false, "p2", partitionKeyDesc2, null); + partitionDesc2.analyze(1, null); + partitionInfo.handleNewSinglePartitionDesc(partitionDesc2, partition2Id, false); + + // file group + Map> aggKeyToFileGroups = Maps.newHashMap(); + List brokerFileGroups = Lists.newArrayList(); + DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), + null, null, null, false, null); + BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc); + brokerFileGroups.add(brokerFileGroup); + BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null); + aggKeyToFileGroups.put(aggKey, brokerFileGroups); + + new Expectations() { + { + catalog.getDb(dbId); + result = database; + database.getTable(tableId); + result = table; + table.getPartitions(); + result = partitions; + table.getIndexIdToSchema(); + result = indexIdToSchema; + table.getDefaultDistributionInfo(); + result = distributionInfo; + table.getSchemaHashByIndexId(index1Id); + result = 123; + table.getSchemaHashByIndexId(index2Id); + result = 234; + table.getPartitionInfo(); + result = partitionInfo; + table.getPartition(partition1Id); + result = partition1; + table.getPartition(partition2Id); + result = partition2; + table.getKeysTypeByIndexId(index1Id); + result = KeysType.AGG_KEYS; + table.getKeysTypeByIndexId(index2Id); + result = KeysType.AGG_KEYS; + table.getBaseIndexId(); + result = index1Id; + } + }; + + SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc); + EtlJobConfig etlJobConfig = Deencapsulation.getField(task, "etlJobConfig"); + Assert.assertEquals(null, etlJobConfig); + task.init(); + etlJobConfig = Deencapsulation.getField(task, "etlJobConfig"); + Assert.assertTrue(etlJobConfig != null); + + // check table id + Map idToEtlTable = etlJobConfig.tables; + Assert.assertEquals(1, idToEtlTable.size()); + Assert.assertTrue(idToEtlTable.containsKey(tableId)); + + // check indexes + EtlTable etlTable = idToEtlTable.get(tableId); + List etlIndexes = etlTable.indexes; + Assert.assertEquals(2, etlIndexes.size()); + Assert.assertEquals(index1Id, etlIndexes.get(0).indexId); + Assert.assertEquals(index2Id, etlIndexes.get(1).indexId); + + // 
check base index columns + EtlIndex baseIndex = etlIndexes.get(0); + Assert.assertTrue(baseIndex.isBaseIndex); + Assert.assertEquals(3, baseIndex.columns.size()); + for (int i = 0; i < columns.size(); i++) { + Assert.assertEquals(columns.get(i).getName(), baseIndex.columns.get(i).columnName); + } + Assert.assertEquals("AGGREGATE", baseIndex.indexType); + + // check partitions + EtlPartitionInfo etlPartitionInfo = etlTable.partitionInfo; + Assert.assertEquals("RANGE", etlPartitionInfo.partitionType); + List partitionColumns = etlPartitionInfo.partitionColumnRefs; + Assert.assertEquals(1, partitionColumns.size()); + Assert.assertEquals(columns.get(partitionColumnIndex).getName(), partitionColumns.get(0)); + List distributionColumns = etlPartitionInfo.distributionColumnRefs; + Assert.assertEquals(1, distributionColumns.size()); + Assert.assertEquals(columns.get(distributionColumnIndex).getName(), distributionColumns.get(0)); + List etlPartitions = etlPartitionInfo.partitions; + Assert.assertEquals(2, etlPartitions.size()); + + // check file group + List etlFileGroups = etlTable.fileGroups; + Assert.assertEquals(1, etlFileGroups.size()); + } +}
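
For reference, a minimal sketch of how the pending task is driven, mirroring the test flow above; sparkLoadJob, aggKeyToFileGroups, resource and brokerDesc stand for the same fixtures built in SparkLoadPendingTaskTest, and the production scheduling path through LoadJobScheduler is assumed rather than shown:

    // Build the pending task from the load job's aggregated file groups, spark resource and broker.
    SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc);
    // init() builds the EtlJobConfig (indexes, partition info, file groups) from the table metadata.
    task.init();
    // executeTask() submits the spark etl job through SparkEtlJobHandler; the etl output path and,
    // once submitted, the application id are recorded in the task's SparkPendingTaskAttachment.
    task.executeTask();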