diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index ac761b5fb6..98ebe54b96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -32,6 +32,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -80,6 +81,10 @@ public class SparkResource extends Resource {
     private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
     private static final String SPARK_FS_DEFAULT_FS = "spark.hadoop.fs.defaultFS";
     private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address";
+    private static final String SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED = "spark.hadoop.yarn.resourcemanager.ha.enabled";
+    private static final String SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS = "spark.hadoop.yarn.resourcemanager.ha.rm-ids";
+    private static final String YARN_RESOURCE_MANAGER_ADDRESS_FORMAT = "spark.hadoop.yarn.resourcemanager.address.%s";
+    private static final String YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT = "spark.hadoop.yarn.resourcemanager.hostname.%s";
 
     public enum DeployMode {
         CLUSTER,
@@ -283,11 +288,33 @@
             throw new DdlException("Missing " + SPARK_SUBMIT_DEPLOY_MODE + " in properties");
         }
         // if deploy machines do not set HADOOP_CONF_DIR env, we should set these configs blow
-        if ((!sparkConfigs.containsKey(SPARK_YARN_RESOURCE_MANAGER_ADDRESS)
-                || !sparkConfigs.containsKey(SPARK_FS_DEFAULT_FS))
-                && isYarnMaster()) {
-            throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_ADDRESS + " and " + SPARK_FS_DEFAULT_FS
-                    + ") in yarn master");
+        if (isYarnMaster()) {
+            if (!sparkConfigs.containsKey(SPARK_FS_DEFAULT_FS)) {
+                throw new DdlException("Missing (" + SPARK_FS_DEFAULT_FS + ") in yarn master");
+            }
+
+            // With RM HA enabled, ha.rm-ids must be set, and every listed rm-id must be
+            // resolvable through either an explicit address entry or a hostname entry.
+            String haEnabled = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED);
+            if ("true".equals(haEnabled)) {
+                if (StringUtils.isEmpty(sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS))) {
+                    throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS + ") in yarn master, "
+                            + "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
+                }
+
+                String[] haIds = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS).split(",");
+                for (String haId : haIds) {
+                    String addressKey = String.format(YARN_RESOURCE_MANAGER_ADDRESS_FORMAT, haId);
+                    String hostnameKey = String.format(YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT, haId);
+                    if (!sparkConfigs.containsKey(addressKey) && !sparkConfigs.containsKey(hostnameKey)) {
+                        throw new DdlException("Missing " + addressKey + " or " + hostnameKey + " in yarn master, "
+                                + "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
+                    }
+                }
+            } else if (!sparkConfigs.containsKey(SPARK_YARN_RESOURCE_MANAGER_ADDRESS)) {
+                throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_ADDRESS + ") in yarn master, "
+                        + "or not turned on ha.");
+            }
         }
 
         // check working dir and broker
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
index 9554e2a3c8..0184acc5c2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
@@ -118,6 +118,23 @@ public class SparkResourceTest {
         BaseProcResult result = new BaseProcResult();
         resource.getProcNodeData(result);
         Assert.assertEquals(9, result.getRows().size());
+
+        // yarn HA resource: rm-ids with per-id hostname entries instead of a single RM address
+        properties.clear();
+        properties.put("type", type);
+        properties.put("spark.master", "yarn");
+        properties.put("spark.submit.deployMode", "cluster");
+        properties.put("spark.hadoop.yarn.resourcemanager.ha.enabled", "true");
+        properties.put("spark.hadoop.yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
+        properties.put("spark.hadoop.yarn.resourcemanager.hostname.rm1", "host1");
+        properties.put("spark.hadoop.yarn.resourcemanager.hostname.rm2", "host2");
+        properties.put("spark.hadoop.fs.defaultFS", "hdfs://127.0.0.1:10000");
+        stmt = new CreateResourceStmt(true, false, name, properties);
+        stmt.analyze(analyzer);
+        resource = (SparkResource) Resource.fromStmt(stmt);
+        Assert.assertTrue(resource.isYarnMaster());
+        map = resource.getSparkConfigs();
+        Assert.assertEquals(7, map.size());
     }
 
     @Test