From 779c4629b4ccdda9b2db00303b187f752dcd7e4b Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Sun, 7 Feb 2021 11:25:06 +0800 Subject: [PATCH] [Bug] Fix bug of NPE when replaying spark load job. (#5367) * [Bug] Fix bug of NPE when replaying spark load job. The resourceDesc in spark load job may be null because it is not persisted. So when replaying the job, we should check it. * fix * add ut --- .../apache/doris/catalog/SparkResource.java | 16 ++-- .../doris/load/loadv2/SparkLoadJob.java | 17 ++-- .../doris/load/loadv2/SparkLoadJobTest.java | 80 +++++++++++++++++-- 3 files changed, 93 insertions(+), 20 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java index e7ea689ffe..0bda48bf47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java @@ -27,18 +27,17 @@ import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.load.loadv2.SparkRepository; import org.apache.doris.load.loadv2.SparkYarnConfigFiles; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; + import java.io.File; import java.util.Map; -import com.google.gson.annotations.SerializedName; - /** * Spark resource for etl or query. * working_dir and broker[.xxx] are optional and used in spark ETL. @@ -109,8 +108,9 @@ public class SparkResource extends Resource { this(name, Maps.newHashMap(), null, null, Maps.newHashMap()); } - private SparkResource(String name, Map sparkConfigs, String workingDir, String broker, - Map brokerProperties) { + // "public" for testing + public SparkResource(String name, Map sparkConfigs, String workingDir, String broker, + Map brokerProperties) { super(name, ResourceType.SPARK); this.sparkConfigs = sparkConfigs; this.workingDir = workingDir; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 5d40b95606..726d5a34b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -87,6 +87,9 @@ import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionState.TxnSourceType; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -94,9 +97,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import java.io.DataInput; import java.io.DataOutput; import java.io.File; @@ -168,7 +168,15 @@ public class SparkLoadJob extends BulkLoadJob { * @throws DdlException */ private void setResourceInfo() throws DdlException { - // spark resource + if (resourceDesc == null) { + // resourceDesc is null means this is a replay thread. + // And resourceDesc is not persisted, so it should be null. + // sparkResource and brokerDesc are both persisted, so no need to handle them + // in replay process. + return; + } + + // set sparkResource and brokerDesc String resourceName = resourceDesc.getName(); Resource oriResource = Catalog.getCurrentCatalog().getResourceMgr().getResource(resourceName); if (oriResource == null) { @@ -177,7 +185,6 @@ public class SparkLoadJob extends BulkLoadJob { sparkResource = ((SparkResource) oriResource).getCopiedResource(); sparkResource.update(resourceDesc); - // broker desc Map brokerProperties = sparkResource.getBrokerPropertiesWithoutPrefix(); brokerDesc = new BrokerDesc(sparkResource.getBroker(), brokerProperties); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index 957fdb9201..1046e5dcd6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -17,10 +17,6 @@ package org.apache.doris.load.loadv2; -import mockit.Expectations; -import mockit.Injectable; -import mockit.Mocked; - import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.LabelName; @@ -44,6 +40,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DataQualityException; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; @@ -53,6 +50,7 @@ import org.apache.doris.load.EtlStatus; import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo; import org.apache.doris.load.loadv2.etl.EtlJobConfig; +import org.apache.doris.meta.MetaContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; @@ -64,13 +62,13 @@ import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -81,6 +79,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; + public class SparkLoadJobTest { private long dbId; private String dbName; @@ -510,4 +512,68 @@ public class SparkLoadJobTest { file.delete(); } } + + @Test + public void testSparkLoadJobPersist(@Mocked Catalog catalog, @Mocked Database db, + @Mocked Table table, + @Mocked ResourceMgr resourceMgr) throws Exception { + long dbId = 1000L; + SparkResource sparkResource = new SparkResource("my_spark", Maps.newHashMap(), "/path/to/", "bos", + Maps.newHashMap()); + new Expectations() { + { + catalog.getDb(dbId); + result = db; + catalog.getResourceMgr(); + result = resourceMgr; + //db.getTable(anyLong); + //result = table; + //table.getName(); + //result = "table1"; + resourceMgr.getResource(anyString); + result = sparkResource; + Catalog.getCurrentCatalogJournalVersion(); + result = FeMetaVersion.VERSION_CURRENT; + } + }; + + String label = "label1"; + ResourceDesc resourceDesc = new ResourceDesc("my_spark", Maps.newHashMap()); + String oriStmt = "LOAD LABEL db1.label1\n" + + "(\n" + + "DATA INFILE(\"hdfs://127.0.0.1:8000/user/palo/data/input/file\")\n" + + "INTO TABLE `my_table`\n" + + "WHERE k1 > 10\n" + + ")\n" + + "WITH RESOURCE 'my_spark';"; + OriginStatement originStmt = new OriginStatement(oriStmt, 0); + UserIdentity userInfo = UserIdentity.ADMIN; + SparkLoadJob sparkLoadJob = new SparkLoadJob(dbId, label, resourceDesc, originStmt, userInfo); + sparkLoadJob.setJobProperties(Maps.newHashMap()); + + MetaContext metaContext = new MetaContext(); + metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT); + metaContext.setThreadLocalInfo(); + + // 1. Write objects to file + File file = new File("./testSparkLoadJobPersist"); + file.createNewFile(); + DataOutputStream dos = new DataOutputStream(new FileOutputStream(file)); + sparkLoadJob.write(dos); + + dos.flush(); + dos.close(); + + // 2. Read objects from file + DataInputStream dis = new DataInputStream(new FileInputStream(file)); + + SparkLoadJob sparkLoadJob2 = (SparkLoadJob) SparkLoadJob.read(dis); + Assert.assertEquals("my_spark", sparkLoadJob2.getResourceName()); + Assert.assertEquals(label, sparkLoadJob2.getLabel()); + Assert.assertEquals(dbId, sparkLoadJob2.getDbId()); + + // 3. delete files + dis.close(); + file.delete(); + } }