[Enhancement](spark load) Support for RM HA (#15000)

Add ResourceManager (RM) HA configuration to Spark Load.
Spark already accepts the HA parameters via its configuration; we only need to accept them in the DDL:

CREATE EXTERNAL RESOURCE spark_resource_sinan_node_manager_ha
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.executor.memory" = "10g",
"spark.yarn.queue" = "XXXX",
"spark.hadoop.yarn.resourcemanager.address" = "XXXX:8032",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.hostname.rm1" = "XXXX",
"spark.hadoop.yarn.resourcemanager.hostname.rm2" = "XXXX",
"spark.hadoop.fs.defaultFS" = "hdfs://XXXX",
"spark.hadoop.dfs.nameservices" = "hacluster",
"spark.hadoop.dfs.ha.namenodes.hacluster" = "mynamenode1,mynamenode2",
"spark.hadoop.dfs.namenode.rpc-address.hacluster.mynamenode1" = "XXX:8020",
"spark.hadoop.dfs.namenode.rpc-address.hacluster.mynamenode2" = "XXXX:8020",
"spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"working_dir" = "hdfs://XXXX/doris_prd_data/sinan/spark_load/",
"broker" = "broker_personas",
"broker.username" = "hdfs",
"broker.password" = "",
"broker.dfs.nameservices" = "XXX",
"broker.dfs.ha.namenodes.XXX" = "mynamenode1, mynamenode2",
"broker.dfs.namenode.rpc-address.XXXX.mynamenode1" = "XXXX:8020",
"broker.dfs.namenode.rpc-address.XXXX.mynamenode2" = "XXXX:8020",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
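
A load job can then reference the resource as usual, and the spark.hadoop.* HA settings are passed through to the submitted Spark job. A minimal sketch of such a job (database, table, label, and input path are hypothetical):

LOAD LABEL example_db.example_label
(
    DATA INFILE("hdfs://hacluster/user/doris/input/example.csv")
    INTO TABLE example_tbl
)
WITH RESOURCE 'spark_resource_sinan_node_manager_ha'
(
    "spark.executor.memory" = "4g"
)
PROPERTIES
(
    "timeout" = "3600"
);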

Co-authored-by: liujh <liujh@t3go.cn>

Author: liujinhui
Date: 2023-03-07 15:46:14 +08:00
Committed by: GitHub
Commit: fca567068e (parent: 704faaed84)
2 changed files with 46 additions and 5 deletions

SparkResource.java

@@ -32,6 +32,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -80,6 +81,10 @@ public class SparkResource extends Resource {
     private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
     private static final String SPARK_FS_DEFAULT_FS = "spark.hadoop.fs.defaultFS";
     private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address";
+    private static final String SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED = "spark.hadoop.yarn.resourcemanager.ha.enabled";
+    private static final String SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS = "spark.hadoop.yarn.resourcemanager.ha.rm-ids";
+    private static final String YARN_RESOURCE_MANAGER_ADDRESS_FORMAT = "spark.hadoop.yarn.resourcemanager.address.%s";
+    private static final String YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT = "spark.hadoop.yarn.resourcemanager.hostname.%s";

     public enum DeployMode {
         CLUSTER,
@@ -283,11 +288,31 @@
             throw new DdlException("Missing " + SPARK_SUBMIT_DEPLOY_MODE + " in properties");
         }
         // if the deploy machines do not set the HADOOP_CONF_DIR env, we should set these configs below
-        if ((!sparkConfigs.containsKey(SPARK_YARN_RESOURCE_MANAGER_ADDRESS)
-                || !sparkConfigs.containsKey(SPARK_FS_DEFAULT_FS))
-                && isYarnMaster()) {
-            throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_ADDRESS + " and " + SPARK_FS_DEFAULT_FS
-                    + ") in yarn master");
+        if (isYarnMaster()) {
+            if (!sparkConfigs.containsKey(SPARK_FS_DEFAULT_FS)) {
+                throw new DdlException("Missing (" + SPARK_FS_DEFAULT_FS + ") in yarn master");
+            }
+            String haEnabled = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED);
+            if ("true".equals(haEnabled)) {
+                // in HA mode, rm-ids is required and each rm-id needs an address or a hostname
+                if (StringUtils.isEmpty(sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS))) {
+                    throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS + ") in yarn master, "
+                            + "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
+                }
+                String[] haIds = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS).split(",");
+                for (String haId : haIds) {
+                    String addressKey = String.format(YARN_RESOURCE_MANAGER_ADDRESS_FORMAT, haId);
+                    String hostnameKey = String.format(YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT, haId);
+                    if (!sparkConfigs.containsKey(addressKey) && !sparkConfigs.containsKey(hostnameKey)) {
+                        throw new DdlException("Missing " + addressKey + " or " + hostnameKey + " in yarn master, "
+                                + "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
+                    }
+                }
+            } else if (!sparkConfigs.containsKey(SPARK_YARN_RESOURCE_MANAGER_ADDRESS)) {
+                throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_ADDRESS + ") in yarn master "
+                        + "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + " is not true.");
+            }
         }
         // check working dir and broker
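
For reference, the rule this check enforces can be exercised in isolation. A minimal, self-contained Java sketch (class and method names are hypothetical, not part of this patch or the Doris API):

import java.util.HashMap;
import java.util.Map;

public class RmHaCheckSketch {
    // When ha.enabled=true: rm-ids must be set, and each rm-id needs an
    // address.<id> or a hostname.<id> entry. Otherwise, a single
    // resourcemanager.address is required.
    static boolean isValidYarnConfig(Map<String, String> conf) {
        if (!"true".equals(conf.get("spark.hadoop.yarn.resourcemanager.ha.enabled"))) {
            return conf.containsKey("spark.hadoop.yarn.resourcemanager.address");
        }
        String rmIds = conf.get("spark.hadoop.yarn.resourcemanager.ha.rm-ids");
        if (rmIds == null || rmIds.isEmpty()) {
            return false;
        }
        for (String id : rmIds.split(",")) {
            if (!conf.containsKey("spark.hadoop.yarn.resourcemanager.address." + id)
                    && !conf.containsKey("spark.hadoop.yarn.resourcemanager.hostname." + id)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.hadoop.yarn.resourcemanager.ha.enabled", "true");
        conf.put("spark.hadoop.yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
        conf.put("spark.hadoop.yarn.resourcemanager.hostname.rm1", "host1");
        conf.put("spark.hadoop.yarn.resourcemanager.hostname.rm2", "host2");
        System.out.println(isValidYarnConfig(conf)); // true
        conf.remove("spark.hadoop.yarn.resourcemanager.hostname.rm2");
        System.out.println(isValidYarnConfig(conf)); // false: rm2 has no address/hostname
    }
}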

SparkResourceTest.java

@@ -118,6 +118,22 @@ public class SparkResourceTest {
         BaseProcResult result = new BaseProcResult();
         resource.getProcNodeData(result);
         Assert.assertEquals(9, result.getRows().size());
+
+        properties.clear();
+        properties.put("type", type);
+        properties.put("spark.master", "yarn");
+        properties.put("spark.submit.deployMode", "cluster");
+        properties.put("spark.hadoop.yarn.resourcemanager.ha.enabled", "true");
+        properties.put("spark.hadoop.yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
+        properties.put("spark.hadoop.yarn.resourcemanager.hostname.rm1", "host1");
+        properties.put("spark.hadoop.yarn.resourcemanager.hostname.rm2", "host2");
+        properties.put("spark.hadoop.fs.defaultFS", "hdfs://127.0.0.1:10000");
+        stmt = new CreateResourceStmt(true, false, name, properties);
+        stmt.analyze(analyzer);
+        resource = (SparkResource) Resource.fromStmt(stmt);
+        Assert.assertTrue(resource.isYarnMaster());
+        map = resource.getSparkConfigs();
+        Assert.assertEquals(7, map.size());
     }

     @Test
@Test