diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index e7ea689ffe..0bda48bf47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -27,18 +27,17 @@
 import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.load.loadv2.SparkRepository;
 import org.apache.doris.load.loadv2.SparkYarnConfigFiles;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
 import java.io.File;
 import java.util.Map;
 
-import com.google.gson.annotations.SerializedName;
-
 /**
  * Spark resource for etl or query.
  * working_dir and broker[.xxx] are optional and used in spark ETL.
@@ -109,8 +108,9 @@ public class SparkResource extends Resource {
         this(name, Maps.newHashMap(), null, null, Maps.newHashMap());
     }
 
-    private SparkResource(String name, Map<String, String> sparkConfigs, String workingDir, String broker,
-            Map<String, String> brokerProperties) {
+    // made public for testing
+    public SparkResource(String name, Map<String, String> sparkConfigs, String workingDir, String broker,
+            Map<String, String> brokerProperties) {
         super(name, ResourceType.SPARK);
         this.sparkConfigs = sparkConfigs;
         this.workingDir = workingDir;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 5d40b95606..726d5a34b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -87,6 +87,9 @@ import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -94,9 +97,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.gson.annotations.SerializedName;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
@@ -168,7 +168,15 @@ public class SparkLoadJob extends BulkLoadJob {
      * @throws DdlException
      */
    private void setResourceInfo() throws DdlException {
-        // spark resource
+        if (resourceDesc == null) {
+            // A null resourceDesc means this is a replay thread.
+            // resourceDesc is not persisted, so it is expected to be null here.
+            // sparkResource and brokerDesc are both persisted, so there is no
+            // need to set them again during replay.
+            return;
+        }
+
+        // set sparkResource and brokerDesc
         String resourceName = resourceDesc.getName();
         Resource oriResource = Catalog.getCurrentCatalog().getResourceMgr().getResource(resourceName);
         if (oriResource == null) {
@@ -177,7 +185,6 @@
         sparkResource = ((SparkResource) oriResource).getCopiedResource();
         sparkResource.update(resourceDesc);
 
-        // broker desc
         Map<String, String> brokerProperties = sparkResource.getBrokerPropertiesWithoutPrefix();
         brokerDesc = new BrokerDesc(sparkResource.getBroker(), brokerProperties);
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index 957fdb9201..1046e5dcd6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -17,10 +17,6 @@
 
 package org.apache.doris.load.loadv2;
 
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mocked;
-
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DataDescription;
 import org.apache.doris.analysis.LabelName;
@@ -44,6 +40,7 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DataQualityException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
@@ -53,6 +50,7 @@ import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
 import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.meta.MetaContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
@@ -64,13 +62,13 @@ import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -81,6 +79,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+
 public class SparkLoadJobTest {
     private long dbId;
     private String dbName;
@@ -510,4 +512,68 @@ public class SparkLoadJobTest {
             file.delete();
         }
     }
+
+    @Test
+    public void testSparkLoadJobPersist(@Mocked Catalog catalog, @Mocked Database db,
+                                        @Mocked Table table,
+                                        @Mocked ResourceMgr resourceMgr) throws Exception {
+        long dbId = 1000L;
+        SparkResource sparkResource = new SparkResource("my_spark", Maps.newHashMap(), "/path/to/", "bos",
+                Maps.newHashMap());
+        new Expectations() {
+            {
+                catalog.getDb(dbId);
+                result = db;
+                catalog.getResourceMgr();
+                result = resourceMgr;
+                //db.getTable(anyLong);
+                //result = table;
+                //table.getName();
+                //result = "table1";
+                resourceMgr.getResource(anyString);
+                result = sparkResource;
+                Catalog.getCurrentCatalogJournalVersion();
+                result = FeMetaVersion.VERSION_CURRENT;
+            }
+        };
+
+        String label = "label1";
+        ResourceDesc resourceDesc = new ResourceDesc("my_spark", Maps.newHashMap());
+        String oriStmt = "LOAD LABEL db1.label1\n"
+                + "(\n"
+                + "DATA INFILE(\"hdfs://127.0.0.1:8000/user/palo/data/input/file\")\n"
+                + "INTO TABLE `my_table`\n"
+                + "WHERE k1 > 10\n"
+                + ")\n"
+                + "WITH RESOURCE 'my_spark';";
+        OriginStatement originStmt = new OriginStatement(oriStmt, 0);
+        UserIdentity userInfo = UserIdentity.ADMIN;
+        SparkLoadJob sparkLoadJob = new SparkLoadJob(dbId, label, resourceDesc, originStmt, userInfo);
+        sparkLoadJob.setJobProperties(Maps.newHashMap());
+
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT);
+        metaContext.setThreadLocalInfo();
+
+        // 1. Write objects to file
+        File file = new File("./testSparkLoadJobPersist");
+        file.createNewFile();
+        DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
+        sparkLoadJob.write(dos);
+
+        dos.flush();
+        dos.close();
+
+        // 2. Read objects from file
+        DataInputStream dis = new DataInputStream(new FileInputStream(file));
+
+        SparkLoadJob sparkLoadJob2 = (SparkLoadJob) SparkLoadJob.read(dis);
+        Assert.assertEquals("my_spark", sparkLoadJob2.getResourceName());
+        Assert.assertEquals(label, sparkLoadJob2.getLabel());
+        Assert.assertEquals(dbId, sparkLoadJob2.getDbId());
+
+        // 3. Delete files
+        dis.close();
+        file.delete();
+    }
 }