[Bug] Fix NPE when replaying spark load job. (#5367)

* [Bug] Fix NPE when replaying spark load job.

The resourceDesc in a spark load job may be null because it is not persisted,
so we need to check for null when replaying the job (a simplified sketch follows before the diff).

* fix

* add ut
Mingyu Chen
2021-02-07 11:25:06 +08:00
committed by GitHub
parent 9c022e3764
commit 779c4629b4
3 changed files with 93 additions and 20 deletions
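
For illustration, here is a minimal, self-contained sketch of the failure mode and the guard. The class and field names are invented for this example and only simplify the real SparkLoadJob: fields written in write() come back from readFields(), so a field that is skipped during serialization is still null after replay and must be checked before use.

// Hypothetical, simplified sketch; not the actual Doris classes.
class LoadJobSketch {
    String resourceDescName;   // analogue of resourceDesc: set only when the job is created from a load statement
    String sparkResourceName;  // analogue of sparkResource: persisted by write()/readFields()

    void write(java.io.DataOutput out) throws java.io.IOException {
        // resourceDescName is intentionally not written, mirroring the real job
        out.writeUTF(sparkResourceName);
    }

    void readFields(java.io.DataInput in) throws java.io.IOException {
        // after replay, resourceDescName stays null because it was never persisted
        sparkResourceName = in.readUTF();
    }

    void setResourceInfo() {
        if (resourceDescName == null) {
            // replay path: the persisted fields already carry everything that is needed
            return;
        }
        // normal path: dereferencing resourceDescName here is the analogue of the NPE seen during replay
        sparkResourceName = resourceDescName.trim();
    }
}

The actual fix in SparkLoadJob.setResourceInfo() below applies the same early-return guard.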


@@ -27,18 +27,17 @@ import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.load.loadv2.SparkRepository;
import org.apache.doris.load.loadv2.SparkYarnConfigFiles;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import java.io.File;
import java.util.Map;
import com.google.gson.annotations.SerializedName;
/**
* Spark resource for etl or query.
* working_dir and broker[.xxx] are optional and used in spark ETL.
@@ -109,8 +108,9 @@ public class SparkResource extends Resource {
this(name, Maps.newHashMap(), null, null, Maps.newHashMap());
}
private SparkResource(String name, Map<String, String> sparkConfigs, String workingDir, String broker,
Map<String, String> brokerProperties) {
// "public" for testing
public SparkResource(String name, Map<String, String> sparkConfigs, String workingDir, String broker,
Map<String, String> brokerProperties) {
super(name, ResourceType.SPARK);
this.sparkConfigs = sparkConfigs;
this.workingDir = workingDir;


@@ -87,6 +87,9 @@ import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -94,9 +97,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
@@ -168,7 +168,15 @@ public class SparkLoadJob extends BulkLoadJob {
* @throws DdlException
*/
private void setResourceInfo() throws DdlException {
// spark resource
if (resourceDesc == null) {
// A null resourceDesc means this job is being replayed from the edit log,
// because resourceDesc is not persisted.
// sparkResource and brokerDesc are both persisted, so there is no need
// to handle them in the replay process.
return;
}
// set sparkResource and brokerDesc
String resourceName = resourceDesc.getName();
Resource oriResource = Catalog.getCurrentCatalog().getResourceMgr().getResource(resourceName);
if (oriResource == null) {
@@ -177,7 +185,6 @@ public class SparkLoadJob extends BulkLoadJob {
sparkResource = ((SparkResource) oriResource).getCopiedResource();
sparkResource.update(resourceDesc);
// broker desc
Map<String, String> brokerProperties = sparkResource.getBrokerPropertiesWithoutPrefix();
brokerDesc = new BrokerDesc(sparkResource.getBroker(), brokerProperties);
}


@@ -17,10 +17,6 @@
package org.apache.doris.load.loadv2;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.LabelName;
@@ -44,6 +40,7 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
@@ -53,6 +50,7 @@ import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
@@ -64,13 +62,13 @@ import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
@@ -81,6 +79,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
public class SparkLoadJobTest {
private long dbId;
private String dbName;
@@ -510,4 +512,68 @@ public class SparkLoadJobTest {
file.delete();
}
}
@Test
public void testSparkLoadJobPersist(@Mocked Catalog catalog, @Mocked Database db,
@Mocked Table table,
@Mocked ResourceMgr resourceMgr) throws Exception {
long dbId = 1000L;
SparkResource sparkResource = new SparkResource("my_spark", Maps.newHashMap(), "/path/to/", "bos",
Maps.newHashMap());
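// stub the catalog, database and resource manager lookups used by the test, and pin the current journal version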
new Expectations() {
{
catalog.getDb(dbId);
result = db;
catalog.getResourceMgr();
result = resourceMgr;
//db.getTable(anyLong);
//result = table;
//table.getName();
//result = "table1";
resourceMgr.getResource(anyString);
result = sparkResource;
Catalog.getCurrentCatalogJournalVersion();
result = FeMetaVersion.VERSION_CURRENT;
}
};
String label = "label1";
ResourceDesc resourceDesc = new ResourceDesc("my_spark", Maps.newHashMap());
String oriStmt = "LOAD LABEL db1.label1\n" +
"(\n" +
"DATA INFILE(\"hdfs://127.0.0.1:8000/user/palo/data/input/file\")\n" +
"INTO TABLE `my_table`\n" +
"WHERE k1 > 10\n" +
")\n" +
"WITH RESOURCE 'my_spark';";
OriginStatement originStmt = new OriginStatement(oriStmt, 0);
UserIdentity userInfo = UserIdentity.ADMIN;
SparkLoadJob sparkLoadJob = new SparkLoadJob(dbId, label, resourceDesc, originStmt, userInfo);
sparkLoadJob.setJobProperties(Maps.newHashMap());
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT);
metaContext.setThreadLocalInfo();
// 1. Write objects to file
File file = new File("./testSparkLoadJobPersist");
file.createNewFile();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
sparkLoadJob.write(dos);
dos.flush();
dos.close();
// 2. Read objects from file
DataInputStream dis = new DataInputStream(new FileInputStream(file));
SparkLoadJob sparkLoadJob2 = (SparkLoadJob) SparkLoadJob.read(dis);
Assert.assertEquals("my_spark", sparkLoadJob2.getResourceName());
Assert.assertEquals(label, sparkLoadJob2.getLabel());
Assert.assertEquals(dbId, sparkLoadJob2.getDbId());
// 3. delete files
dis.close();
file.delete();
}
}