[Load] Add "LOAD WITH HDFS" mode, and make hdfs_reader support hdfs ha (#6161)
Support loading data from HDFS using the `LOAD WITH HDFS` syntax, reading the data directly via libhdfs3.
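
Not part of the diff below, but for orientation: a minimal sketch of the kind of HDFS properties such a load job might carry. The "fs.defaultFS" and "hdfs_user" keys are the ones the new FE code maps to dedicated THdfsParams fields; the remaining entries (here the standard HDFS HA client settings, with the nameservice my_ha, the NameNode ids and the host names as placeholders) are forwarded as generic key/value pairs so the backend reader can resolve an HA nameservice.

import java.util.HashMap;
import java.util.Map;

// Sketch only: "fs.defaultFS", "hdfs_user" and the kerb_* keys come from the diff below;
// the dfs.* keys are the standard HDFS HA client settings, with placeholder names/hosts.
public class HdfsLoadPropertiesSketch {
    public static Map<String, String> haProperties() {
        Map<String, String> props = new HashMap<>();
        props.put("fs.defaultFS", "hdfs://my_ha");   // mapped via THdfsParams.setFsName
        props.put("hdfs_user", "doris");             // mapped via THdfsParams.setUser
        // Everything else is forwarded verbatim as THdfsConf key/value pairs.
        props.put("dfs.nameservices", "my_ha");
        props.put("dfs.ha.namenodes.my_ha", "nn1,nn2");
        props.put("dfs.namenode.rpc-address.my_ha.nn1", "nn1-host:8020");
        props.put("dfs.namenode.rpc-address.my_ha.nn2", "nn2-host:8020");
        props.put("dfs.client.failover.proxy.provider.my_ha",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        return props;
    }
}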
@@ -48,31 +48,69 @@ import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPReadRequest;
import org.apache.doris.thrift.TBrokerPWriteRequest;
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TBrokerReadResponse;
import org.apache.doris.thrift.TBrokerRenamePathRequest;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.THdfsConf;
import org.apache.doris.thrift.THdfsParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class BrokerUtil {
    private static final Logger LOG = LogManager.getLogger(BrokerUtil.class);

    private static int READ_BUFFER_SIZE_B = 1024 * 1024;
    private static String HDFS_FS_KEY = "fs.defaultFS";
    private static String HDFS_USER_KEY = "hdfs_user";
    private static String HDFS_KERB_PRINCIPAL = "kerb_principal";
    private static String HDFS_KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path";
    private static String HDFS_KERB_TOKEN = "kerb_token";

    public static void generateHdfsParam(Map<String, String> properties, TBrokerRangeDesc rangeDesc) {
        rangeDesc.setHdfsParams(new THdfsParams());
        rangeDesc.hdfs_params.setHdfsConf(new ArrayList<>());
        for (Map.Entry<String, String> property : properties.entrySet()) {
            if (property.getKey().equalsIgnoreCase(HDFS_FS_KEY)) {
                rangeDesc.hdfs_params.setFsName(property.getValue());
            } else if (property.getKey().equalsIgnoreCase(HDFS_USER_KEY)) {
                rangeDesc.hdfs_params.setUser(property.getValue());
            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_PRINCIPAL)) {
                rangeDesc.hdfs_params.setKerbPrincipal(property.getValue());
            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TICKET_CACHE_PATH)) {
                rangeDesc.hdfs_params.setKerbTicketCachePath(property.getValue());
            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TOKEN)) {
                rangeDesc.hdfs_params.setToken(property.getValue());
            } else {
                THdfsConf hdfsConf = new THdfsConf();
                hdfsConf.setKey(property.getKey());
                hdfsConf.setValue(property.getValue());
                rangeDesc.hdfs_params.hdfs_conf.add(hdfsConf);
            }
        }
    }

    /**
     * Parse file status in path with broker, except directory

@@ -127,6 +165,34 @@ public class BrokerUtil {
                    fileStatuses.add(new TBrokerFileStatus(r.getName(), !r.isFile(), r.getSize(), r.isFile()));
                }
            }
        } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) {
            if (!brokerDesc.getProperties().containsKey(HDFS_FS_KEY)
                    || !brokerDesc.getProperties().containsKey(HDFS_USER_KEY)) {
                throw new UserException(String.format(
                        "The properties of hdfs is invalid. %s and %s are needed", HDFS_FS_KEY, HDFS_USER_KEY));
            }
            String hdfsFsName = brokerDesc.getProperties().get(HDFS_FS_KEY);
            String user = brokerDesc.getProperties().get(HDFS_USER_KEY);
            Configuration conf = new Configuration();
            for (Map.Entry<String, String> propEntry : brokerDesc.getProperties().entrySet()) {
                if (propEntry.getKey().equals(HDFS_FS_KEY) || propEntry.getKey().equals(HDFS_USER_KEY)) {
                    continue;
                }
                conf.set(propEntry.getKey(), propEntry.getValue());
            }
            try {
                FileSystem fs = FileSystem.get(new URI(hdfsFsName), conf, user);
                FileStatus[] statusList = fs.listStatus(new Path(path));
                for (FileStatus status : statusList) {
                    if (status.isFile()) {
                        fileStatuses.add(new TBrokerFileStatus(status.getPath().toUri().getPath(),
                                status.isDirectory(), status.getLen(), status.isFile()));
                    }
                }
            } catch (IOException | InterruptedException | URISyntaxException e) {
                LOG.warn("hdfs check error: ", e);
                throw new UserException(e.getMessage());
            }
        }
    }
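
For reference (not part of the commit), the same pattern in isolation against the plain Hadoop client API: a small sketch that builds an HA-aware Configuration, connects as a given user via FileSystem.get(uri, conf, user), and keeps only regular files, mirroring the new HDFS branch above. The nameservice, NameNode ids, hosts, and paths are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class HdfsHaListSketch {
    public static void main(String[] args) throws Exception {
        // Standard HDFS HA client settings; nameservice, NameNode ids and hosts are placeholders.
        Configuration conf = new Configuration();
        conf.set("dfs.nameservices", "my_ha");
        conf.set("dfs.ha.namenodes.my_ha", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.my_ha.nn1", "nn1-host:8020");
        conf.set("dfs.namenode.rpc-address.my_ha.nn2", "nn2-host:8020");
        conf.set("dfs.client.failover.proxy.provider.my_ha",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        // Same call pattern as the HDFS branch above: connect as the given user,
        // list the path, and skip directories.
        FileSystem fs = FileSystem.get(new URI("hdfs://my_ha"), conf, "doris");
        for (FileStatus status : fs.listStatus(new Path("/user/doris/data"))) {
            if (status.isFile()) {
                System.out.println(status.getPath().toUri().getPath() + ", " + status.getLen());
            }
        }
    }
}
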
@@ -502,6 +502,12 @@ public class BrokerScanNode extends LoadScanNode {
        rangeDesc.setFileSize(fileStatus.size);
        rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile);
        rangeDesc.setColumnsFromPath(columnsFromPath);
        // set hdfs params for hdfs file type.
        switch (brokerDesc.getFileType()) {
            case FILE_HDFS:
                BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc);
                break;
        }
        return rangeDesc;
    }
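
Again not part of the commit: a usage sketch of the new BrokerUtil.generateHdfsParam showing how the properties end up on the range descriptor. The setters and the hdfs_params/hdfs_conf fields appear in the diff above; the BrokerUtil package path and the Thrift-style getters are assumed here.

import org.apache.doris.common.util.BrokerUtil;   // package path assumed for this sketch
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.THdfsConf;

import java.util.HashMap;
import java.util.Map;

public class GenerateHdfsParamSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("fs.defaultFS", "hdfs://my_ha");   // dedicated field, set via setFsName
        props.put("hdfs_user", "doris");             // dedicated field, set via setUser
        props.put("dfs.nameservices", "my_ha");      // generic entry, goes into hdfs_conf

        TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
        BrokerUtil.generateHdfsParam(props, rangeDesc);

        // Thrift-style getters assumed to match the setters used in the diff above.
        System.out.println(rangeDesc.hdfs_params.getFsName());   // hdfs://my_ha
        System.out.println(rangeDesc.hdfs_params.getUser());     // doris
        for (THdfsConf kv : rangeDesc.hdfs_params.getHdfsConf()) {
            System.out.println(kv.getKey() + " = " + kv.getValue());
        }
    }
}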