diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/exec/hdfs_file_reader.cpp
index 95c819a91e..6c05840abf 100644
--- a/be/src/exec/hdfs_file_reader.cpp
+++ b/be/src/exec/hdfs_file_reader.cpp
@@ -30,9 +30,7 @@ HdfsFileReader::HdfsFileReader(THdfsParams hdfs_params, const std::string& path,
           _file_size(-1),
           _hdfs_fs(nullptr),
           _hdfs_file(nullptr) {
-    std::stringstream namenode_ss;
-    namenode_ss << "hdfs://" << _hdfs_params.host << ":" << _hdfs_params.port;
-    _namenode = namenode_ss.str();
+    _namenode = _hdfs_params.fs_name;
 }
 
 HdfsFileReader::~HdfsFileReader() {
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index 90a7fdefe7..092a88a361 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
@@ -218,6 +218,22 @@ under the License.
         "AWS_SECRET_KEY"="",
         "AWS_REGION" = ""
         )
+    6. If you load data directly from HDFS (load with HDFS), you need to specify the following properties:
+        (
+        "fs.defaultFS" = "",
+        "hdfs_user"="",
+        "kerb_principal" = "",
+        "kerb_ticket_cache_path" = "",
+        "kerb_token" = ""
+        )
+        fs.defaultFS: the defaultFS of the HDFS cluster
+        hdfs_user: the user name used to connect to the HDFS cluster
+        namenode HA:
+        By configuring namenode HA, the new namenode can be identified automatically when the active namenode switches.
+        dfs.nameservices: the name of the hdfs service, user-defined, e.g. "dfs.nameservices" = "my_ha"
+        dfs.ha.namenodes.xxx: user-defined namenode names, separated by commas, where xxx is the name defined in dfs.nameservices, e.g. "dfs.ha.namenodes.my_ha" = "my_nn"
+        dfs.namenode.rpc-address.xxx.nn: the RPC address of a namenode, where nn is one of the namenode names configured in dfs.ha.namenodes.xxx, e.g. "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
+        dfs.client.failover.proxy.provider: the provider the client uses to connect to the namenode, default: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
 
     4. opt_properties
 
@@ -529,7 +545,37 @@ under the License.
         properties("fuzzy_parse"="true", "strip_outer_array"="true")
         )
         WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-    
+
+    16. LOAD WITH HDFS, normal HDFS cluster
+        LOAD LABEL example_db.label_filter
+        (
+            DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+            INTO TABLE `tbl1`
+            COLUMNS TERMINATED BY ","
+            (k1,k2,v1,v2)
+        )
+        with HDFS (
+            "fs.defaultFS"="hdfs://testFs",
+            "hdfs_user"="user"
+        );
+    17. LOAD WITH HDFS, HDFS cluster with namenode HA
+        LOAD LABEL example_db.label_filter
+        (
+            DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+            INTO TABLE `tbl1`
+            COLUMNS TERMINATED BY ","
+            (k1,k2,v1,v2)
+        )
+        with HDFS (
+            "fs.defaultFS"="hdfs://testFs",
+            "hdfs_user"="user",
+            "dfs.nameservices"="my_ha",
+            "dfs.ha.namenodes.xxx"="my_nn1,my_nn2",
+            "dfs.namenode.rpc-address.xxx.my_nn1"="host1:port",
+            "dfs.namenode.rpc-address.xxx.my_nn2"="host2:port",
+            "dfs.client.failover.proxy.provider.xxx"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+        );
+
 ## keyword
 
     BROKER,LOAD
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index 18240f62a2..b9edbfb807 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
@@ -217,6 +217,22 @@ under the License.
         "AWS_SECRET_KEY"="",
         "AWS_REGION" = ""
         )
+    6. 如果使用HDFS协议直接连接远程存储时需要指定如下属性
+        (
+        "fs.defaultFS" = "",
+        "hdfs_user"="",
+        "kerb_principal" = "",
+        "kerb_ticket_cache_path" = "",
+        "kerb_token" = ""
+        )
+        fs.defaultFS: hdfs集群defaultFS
+        hdfs_user: 连接hdfs集群时使用的用户名
+        namenode HA:
+        通过配置 namenode HA,可以在 namenode 切换时,自动识别到新的 namenode
+        dfs.nameservices: 指定 hdfs 服务的名字,自定义,如:"dfs.nameservices" = "my_ha"
+        dfs.ha.namenodes.xxx:自定义 namenode 的名字,多个名字以逗号分隔。其中 xxx 为 dfs.nameservices 中自定义的名字,如 "dfs.ha.namenodes.my_ha" = "my_nn"
+        dfs.namenode.rpc-address.xxx.nn:指定 namenode 的 rpc 地址信息。其中 nn 表示 dfs.ha.namenodes.xxx 中配置的 namenode 的名字,如:"dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
+        dfs.client.failover.proxy.provider:指定 client 连接 namenode 的 provider,默认为:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
 
     4. opt_properties
 
@@ -547,6 +563,36 @@ under the License.
         )
         WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
 
+    16. LOAD WITH HDFS, 普通HDFS集群
+        LOAD LABEL example_db.label_filter
+        (
+            DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+            INTO TABLE `tbl1`
+            COLUMNS TERMINATED BY ","
+            (k1,k2,v1,v2)
+        )
+        with HDFS (
+            "fs.defaultFS"="hdfs://testFs",
+            "hdfs_user"="user"
+        );
+    17. LOAD WITH HDFS, 带ha的HDFS集群
+        LOAD LABEL example_db.label_filter
+        (
+            DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+            INTO TABLE `tbl1`
+            COLUMNS TERMINATED BY ","
+            (k1,k2,v1,v2)
+        )
+        with HDFS (
+            "fs.defaultFS"="hdfs://testFs",
+            "hdfs_user"="user",
+            "dfs.nameservices"="my_ha",
+            "dfs.ha.namenodes.xxx"="my_nn1,my_nn2",
+            "dfs.namenode.rpc-address.xxx.my_nn1"="host1:port",
+            "dfs.namenode.rpc-address.xxx.my_nn2"="host2:port",
+            "dfs.client.failover.proxy.provider.xxx"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+        );
+
 ## keyword
 
     BROKER,LOAD
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index c936e9057c..cc73098174 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -48,31 +48,69 @@ import org.apache.doris.thrift.TBrokerOperationStatus;
 import org.apache.doris.thrift.TBrokerOperationStatusCode;
 import org.apache.doris.thrift.TBrokerPReadRequest;
 import org.apache.doris.thrift.TBrokerPWriteRequest;
+import org.apache.doris.thrift.TBrokerRangeDesc;
 import org.apache.doris.thrift.TBrokerReadResponse;
 import org.apache.doris.thrift.TBrokerRenamePathRequest;
 import org.apache.doris.thrift.TBrokerVersion;
+import org.apache.doris.thrift.THdfsConf;
+import org.apache.doris.thrift.THdfsParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloBrokerService;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 public class BrokerUtil {
     private static final Logger LOG = LogManager.getLogger(BrokerUtil.class);
     private static int READ_BUFFER_SIZE_B = 1024 * 1024;
+    private static String HDFS_FS_KEY = "fs.defaultFS";
+    private static String HDFS_USER_KEY = "hdfs_user";
+    private static String HDFS_KERB_PRINCIPAL = "kerb_principal";
+    private static String HDFS_KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path";
+    private static String HDFS_KERB_TOKEN = "kerb_token";
+
+    public static void generateHdfsParam(Map<String, String> properties, TBrokerRangeDesc rangeDesc) {
+        rangeDesc.setHdfsParams(new THdfsParams());
+        rangeDesc.hdfs_params.setHdfsConf(new ArrayList<>());
+        for (Map.Entry<String, String> property : properties.entrySet()) {
+            if (property.getKey().equalsIgnoreCase(HDFS_FS_KEY)) {
+                rangeDesc.hdfs_params.setFsName(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HDFS_USER_KEY)) {
+                rangeDesc.hdfs_params.setUser(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_PRINCIPAL)) {
+                rangeDesc.hdfs_params.setKerbPrincipal(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TICKET_CACHE_PATH)) {
+                rangeDesc.hdfs_params.setKerbTicketCachePath(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TOKEN)) {
+                rangeDesc.hdfs_params.setToken(property.getValue());
+            } else {
+                THdfsConf hdfsConf = new THdfsConf();
+                hdfsConf.setKey(property.getKey());
+                hdfsConf.setValue(property.getValue());
+                rangeDesc.hdfs_params.hdfs_conf.add(hdfsConf);
+            }
+        }
+    }
 
     /**
      * Parse file status in path with broker, except directory
@@ -127,6 +165,34 @@ public class BrokerUtil {
                     fileStatuses.add(new TBrokerFileStatus(r.getName(), !r.isFile(), r.getSize(), r.isFile()));
                 }
             }
+        } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) {
+            if (!brokerDesc.getProperties().containsKey(HDFS_FS_KEY)
+                    || !brokerDesc.getProperties().containsKey(HDFS_USER_KEY)) {
+                throw new UserException(String.format(
+                        "The properties of hdfs are invalid. %s and %s are needed", HDFS_FS_KEY, HDFS_USER_KEY));
+            }
+            String hdfsFsName = brokerDesc.getProperties().get(HDFS_FS_KEY);
+            String user = brokerDesc.getProperties().get(HDFS_USER_KEY);
+            Configuration conf = new Configuration();
+            for (Map.Entry<String, String> propEntry : brokerDesc.getProperties().entrySet()) {
+                if (propEntry.getKey().equals(HDFS_FS_KEY) || propEntry.getKey().equals(HDFS_USER_KEY)) {
+                    continue;
+                }
+                conf.set(propEntry.getKey(), propEntry.getValue());
+            }
+            try {
+                FileSystem fs = FileSystem.get(new URI(hdfsFsName), conf, user);
+                FileStatus[] statusList = fs.listStatus(new Path(path));
+                for (FileStatus status : statusList) {
+                    if (status.isFile()) {
+                        fileStatuses.add(new TBrokerFileStatus(status.getPath().toUri().getPath(),
+                                status.isDirectory(), status.getLen(), status.isFile()));
+                    }
+                }
+            } catch (IOException | InterruptedException | URISyntaxException e) {
+                LOG.warn("hdfs check error: ", e);
+                throw new UserException(e.getMessage());
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index e3df23e562..269f570031 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -502,6 +502,12 @@ public class BrokerScanNode extends LoadScanNode {
         rangeDesc.setFileSize(fileStatus.size);
         rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile);
         rangeDesc.setColumnsFromPath(columnsFromPath);
+        // set hdfs params for hdfs file type.
+        switch (brokerDesc.getFileType()) {
+            case FILE_HDFS:
+                BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc);
+                break;
+        }
         return rangeDesc;
     }
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 320b9e031b..d3b2cd0b21 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -114,13 +114,12 @@ struct THdfsConf {
 }
 
 struct THdfsParams {
-    1: optional string host
-    2: optional i32 port
-    3: optional string user
-    4: optional string kerb_principal
-    5: optional string kerb_ticket_cache_path
-    6: optional string token
-    7: optional list<THdfsConf> hdfs_conf
+    1: optional string fs_name
+    2: optional string user
+    3: optional string kerb_principal
+    4: optional string kerb_ticket_cache_path
+    5: optional string token
+    6: optional list<THdfsConf> hdfs_conf
 }
 
 // One broker range information.
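
For reference, the FE-side HDFS listing added to BrokerUtil.parseFile boils down to three steps: copy every load property except fs.defaultFS and hdfs_user into a Hadoop Configuration (this is how the dfs.nameservices / dfs.ha.* HA keys reach the client), open the file system with FileSystem.get(uri, conf, user), and keep only regular files. Below is a minimal, self-contained sketch of that flow; the class name, the fs.defaultFS value, the user, and the path are placeholders, not values taken from this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class HdfsListSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder load properties; "fs.defaultFS" and "hdfs_user" are the two required keys.
        // HA keys (dfs.nameservices, dfs.ha.namenodes.*, dfs.namenode.rpc-address.*,
        // dfs.client.failover.proxy.provider.*) would be added here in the same way.
        Map<String, String> properties = new HashMap<>();
        properties.put("fs.defaultFS", "hdfs://host:port");
        properties.put("hdfs_user", "user");

        String fsName = properties.get("fs.defaultFS");
        String user = properties.get("hdfs_user");

        // Every other property is forwarded verbatim to the Hadoop client configuration.
        Configuration conf = new Configuration();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (entry.getKey().equals("fs.defaultFS") || entry.getKey().equals("hdfs_user")) {
                continue;
            }
            conf.set(entry.getKey(), entry.getValue());
        }

        // Connect as the given user and print the regular files under a placeholder path.
        FileSystem fs = FileSystem.get(new URI(fsName), conf, user);
        for (FileStatus status : fs.listStatus(new Path("/user/data"))) {
            if (status.isFile()) {
                System.out.println(status.getPath().toUri().getPath() + " " + status.getLen());
            }
        }
    }
}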