[Doris On ES][WIP] Support external ES table with SSL secured and configurable node sniffing (#5325)

Support external ES  table with `SSL` secured and configurable node sniffing
This commit is contained in:
Stalary
2021-04-12 11:23:49 +08:00
committed by GitHub
parent 91043bb116
commit 75db273b93
16 changed files with 310 additions and 43 deletions

View File

@ -4146,6 +4146,8 @@ public class Catalog {
sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n");
sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n");
sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n");
sb.append("\"nodes_discovery\" = \"").append(esTable.isNodesDiscovery()).append("\"\n");
sb.append("\"http_ssl_enabled\" = \"").append(esTable.isHttpSslEnabled()).append("\"\n");
sb.append(")");
} else if (table.getType() == TableType.HIVE) {
HiveTable hiveTable = (HiveTable) table;

View File

@ -24,6 +24,7 @@ import org.apache.doris.external.elasticsearch.EsMajorVersion;
import org.apache.doris.external.elasticsearch.EsMetaStateTracker;
import org.apache.doris.external.elasticsearch.EsRestClient;
import org.apache.doris.external.elasticsearch.EsTablePartitions;
import org.apache.doris.external.elasticsearch.EsUtil;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@ -63,6 +64,8 @@ public class EsTable extends Table {
public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields";
public static final String NODES_DISCOVERY = "nodes_discovery";
public static final String HTTP_SSL_ENABLED = "http_ssl_enabled";
private String hosts;
private String[] seeds;
@ -87,6 +90,10 @@ public class EsTable extends Table {
// would downgrade to extract value from `stored_fields`
private int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
private boolean nodesDiscovery = true;
private boolean httpSslEnabled = false;
// Solr doc_values vs stored_fields performance-smackdown indicate:
// It is possible to notice that retrieving an high number of fields leads
// to a sensible worsening of performance if DocValues are used.
@ -138,6 +145,13 @@ public class EsTable extends Table {
return enableKeywordSniff;
}
public boolean isNodesDiscovery() {
return nodesDiscovery;
}
public boolean isHttpSslEnabled() {
return httpSslEnabled;
}
private void validate(Map<String, String> properties) throws DdlException {
if (properties == null) {
@ -185,36 +199,40 @@ public class EsTable extends Table {
// enable doc value scan for Elasticsearch
if (properties.containsKey(DOC_VALUE_SCAN)) {
try {
enableDocValueScan = Boolean.parseBoolean(properties.get(DOC_VALUE_SCAN).trim());
} catch (Exception e) {
throw new DdlException("fail to parse enable_docvalue_scan, enable_docvalue_scan= "
+ properties.get(VERSION).trim() + " ,`enable_docvalue_scan`"
+ " should be like 'true' or 'false', value should be double quotation marks");
}
enableDocValueScan = EsUtil.getBoolean(properties, DOC_VALUE_SCAN);
}
if (properties.containsKey(KEYWORD_SNIFF)) {
try {
enableKeywordSniff = Boolean.parseBoolean(properties.get(KEYWORD_SNIFF).trim());
} catch (Exception e) {
throw new DdlException("fail to parse enable_keyword_sniff, enable_keyword_sniff= "
+ properties.get(VERSION).trim() + " ,`enable_keyword_sniff`"
+ " should be like 'true' or 'false', value should be double quotation marks");
enableKeywordSniff = EsUtil.getBoolean(properties, KEYWORD_SNIFF);
}
if (properties.containsKey(NODES_DISCOVERY)) {
nodesDiscovery = EsUtil.getBoolean(properties, NODES_DISCOVERY);
}
if (properties.containsKey(HTTP_SSL_ENABLED)) {
httpSslEnabled = EsUtil.getBoolean(properties, HTTP_SSL_ENABLED);
// check protocol
for (String seed : seeds) {
if (httpSslEnabled && seed.startsWith("http://")) {
throw new DdlException("if http_ssl_enabled is true, the https protocol must be used");
}
if (!httpSslEnabled && seed.startsWith("https://")) {
throw new DdlException("if http_ssl_enabled is false, the http protocol must be used");
}
}
} else {
enableKeywordSniff = true;
}
if (!Strings.isNullOrEmpty(properties.get(TYPE))
&& !Strings.isNullOrEmpty(properties.get(TYPE).trim())) {
mappingType = properties.get(TYPE).trim();
}
if (!Strings.isNullOrEmpty(properties.get(TRANSPORT))
&& !Strings.isNullOrEmpty(properties.get(TRANSPORT).trim())) {
transport = properties.get(TRANSPORT).trim();
if (!(TRANSPORT_HTTP.equals(transport) || TRANSPORT_THRIFT.equals(transport))) {
throw new DdlException("transport of ES table must be http(recommend) or thrift(reserved inner usage),"
throw new DdlException("transport of ES table must be http/https(recommend) or thrift(reserved inner usage),"
+ " but value is " + transport);
}
}
@ -241,6 +259,8 @@ public class EsTable extends Table {
tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff));
tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields));
tableContext.put(NODES_DISCOVERY, String.valueOf(nodesDiscovery));
tableContext.put(HTTP_SSL_ENABLED, String.valueOf(httpSslEnabled));
}
public TTableDescriptor toThrift() {
@ -323,7 +343,16 @@ public class EsTable extends Table {
maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
}
}
if (tableContext.containsKey(NODES_DISCOVERY)) {
nodesDiscovery = Boolean.parseBoolean(tableContext.get(NODES_DISCOVERY));
} else {
nodesDiscovery = true;
}
if (tableContext.containsKey(HTTP_SSL_ENABLED)) {
httpSslEnabled = Boolean.parseBoolean(tableContext.get(HTTP_SSL_ENABLED));
} else {
httpSslEnabled = false;
}
PartitionType partType = PartitionType.valueOf(Text.readString(in));
if (partType == PartitionType.UNPARTITIONED) {
partitionInfo = SinglePartitionInfo.read(in);
@ -357,6 +386,8 @@ public class EsTable extends Table {
tableContext.put("transport", transport);
tableContext.put("enableDocValueScan", "false");
tableContext.put(KEYWORD_SNIFF, "true");
tableContext.put(NODES_DISCOVERY, "true");
tableContext.put(HTTP_SSL_ENABLED, "false");
}
}

View File

@ -19,6 +19,9 @@ package org.apache.doris.external.elasticsearch;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
@ -38,7 +41,9 @@ public class EsNodeInfo {
private boolean hasThrift;
private TNetworkAddress thriftAddress;
public EsNodeInfo(String id, Map<String, Object> map) throws DorisEsException {
private static final Logger LOG = LogManager.getLogger(EsNodeInfo.class);
public EsNodeInfo(String id, Map<String, Object> map, boolean httpSslEnabled) {
this.id = id;
EsMajorVersion version = EsMajorVersion.parse((String) map.get("version"));
this.name = (String) map.get("name");
@ -66,7 +71,7 @@ public class EsNodeInfo {
String address = (String) httpMap.get("publish_address");
if (address != null) {
String[] scratch = address.split(":");
this.publishAddress = new TNetworkAddress(scratch[0], Integer.valueOf(scratch[1]));
this.publishAddress = new TNetworkAddress((httpSslEnabled ? "https://" : "") + scratch[0], Integer.parseInt(scratch[1]));
this.hasHttp = true;
} else {
this.publishAddress = null;
@ -96,6 +101,24 @@ public class EsNodeInfo {
}
}
public EsNodeInfo(String id, String seed) {
this.id = id;
String[] scratch = seed.split(":");
int port = 80;
if (scratch.length == 3) {
port = Integer.parseInt(scratch[2]);
}
String remoteHost = scratch[0] + ":" + scratch[1];
this.name = remoteHost;
this.host = remoteHost;
this.ip = remoteHost;
this.isClient = true;
this.isData = true;
this.isIngest = true;
this.publishAddress = new TNetworkAddress(remoteHost, port);
this.hasHttp = true;
}
public boolean hasHttp() {
return hasHttp;
}

View File

@ -59,7 +59,7 @@ public class EsRepository extends MasterDaemon {
}
esTables.put(esTable.getId(), esTable);
esClients.put(esTable.getId(),
new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd()));
new EsRestClient(esTable.getSeeds(), esTable.getUserName(), esTable.getPasswd(), esTable.isHttpSslEnabled()));
LOG.info("register a new table [{}] to sync list", esTable);
}

View File

@ -27,11 +27,20 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import java.io.IOException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@ -51,13 +60,16 @@ public class EsRestClient {
private static OkHttpClient networkClient = new OkHttpClient.Builder()
.readTimeout(10, TimeUnit.SECONDS)
.build();
private static OkHttpClient sslNetworkClient;
private Request.Builder builder;
private String[] nodes;
private String currentNode;
private int currentNodeIndex = 0;
private boolean httpSslEnable;
public EsRestClient(String[] nodes, String authUser, String authPassword) {
public EsRestClient(String[] nodes, String authUser, String authPassword, boolean httpSslEnable) {
this.nodes = nodes;
this.builder = new Request.Builder();
if (!Strings.isEmpty(authUser) && !Strings.isEmpty(authPassword)) {
@ -65,6 +77,7 @@ public class EsRestClient {
Credentials.basic(authUser, authPassword));
}
this.currentNode = nodes[currentNodeIndex];
this.httpSslEnable = httpSslEnable;
}
private void selectNextNode() {
@ -83,7 +96,7 @@ public class EsRestClient {
}
Map<String, EsNodeInfo> nodesMap = new HashMap<>();
for (Map.Entry<String, Map<String, Object>> entry : nodesData.entrySet()) {
EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue());
EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), httpSslEnable);
if (node.hasHttp()) {
nodesMap.put(node.getId(), node);
}
@ -141,6 +154,20 @@ public class EsRestClient {
}
return EsShardPartitions.findShardPartitions(indexName, searchShards);
}
/**
* init ssl networkClient use lazy way
**/
private synchronized OkHttpClient getOrCreateSslNetworkClient() {
if (sslNetworkClient == null) {
sslNetworkClient = new OkHttpClient.Builder()
.readTimeout(10, TimeUnit.SECONDS)
.sslSocketFactory(createSSLSocketFactory(), new TrustAllCerts())
.hostnameVerifier(new TrustAllHostnameVerifier())
.build();
}
return sslNetworkClient;
}
/**
* execute request for specific path,it will try again nodes.length times if it fails
@ -151,6 +178,12 @@ public class EsRestClient {
private String execute(String path) throws DorisEsException {
int retrySize = nodes.length;
DorisEsException scratchExceptionForThrow = null;
OkHttpClient httpClient;
if (httpSslEnable) {
httpClient = getOrCreateSslNetworkClient();
} else {
httpClient = networkClient;
}
for (int i = 0; i < retrySize; i++) {
// maybe should add HTTP schema to the address
// actually, at this time we can only process http protocol
@ -170,7 +203,7 @@ public class EsRestClient {
LOG.trace("es rest client request URL: {}", currentNode + "/" + path);
}
try {
response = networkClient.newCall(request).execute();
response = httpClient.newCall(request).execute();
if (response.isSuccessful()) {
return response.body().string();
}
@ -207,4 +240,33 @@ public class EsRestClient {
}
return (T) (key != null ? map.get(key) : map);
}
/**
* support https
**/
private static class TrustAllCerts implements X509TrustManager {
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {}
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {}
public X509Certificate[] getAcceptedIssuers() {return new X509Certificate[0];}
}
private static class TrustAllHostnameVerifier implements HostnameVerifier {
public boolean verify(String hostname, SSLSession session) {
return true;
}
}
private static SSLSocketFactory createSSLSocketFactory() {
SSLSocketFactory ssfFactory;
try {
SSLContext sc = SSLContext.getInstance("TLS");
sc.init(null, new TrustManager[]{new TrustAllCerts()}, new SecureRandom());
ssfFactory = sc.getSocketFactory();
} catch (Exception e) {
throw new DorisEsException("Errors happens when create ssl socket");
}
return ssfFactory;
}
}

View File

@ -24,7 +24,6 @@ import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.SinglePartitionInfo;
import org.apache.doris.common.DdlException;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -36,7 +35,6 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* save the dynamic info parsed from es cluster state such as shard routing, partition info
@ -109,22 +107,6 @@ public class EsTablePartitions {
}
return esTablePartitions;
}
public void addHttpAddress(Map<String, EsNodeInfo> nodesInfo) {
for (EsShardPartitions indexState : partitionedIndexStates.values()) {
indexState.addHttpAddress(nodesInfo);
}
for (EsShardPartitions indexState : unPartitionedIndexStates.values()) {
indexState.addHttpAddress(nodesInfo);
}
}
public TNetworkAddress randomAddress(Map<String, EsNodeInfo> nodesInfo) {
int seed = new Random().nextInt() % nodesInfo.size();
EsNodeInfo[] nodeInfos = (EsNodeInfo[]) nodesInfo.values().toArray();
return nodeInfos[seed].getPublishAddress();
}
public PartitionInfo getPartitionInfo() {
return partitionInfo;

View File

@ -21,8 +21,12 @@ import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.RangePartitionDesc;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.json.JSONObject;
import java.util.Map;
public class EsUtil {
public static void analyzePartitionAndDistributionDesc(PartitionDesc partitionDesc,
@ -82,4 +86,13 @@ public class EsUtil {
return null;
}
}
public static boolean getBoolean(Map<String, String> properties, String name) throws DdlException {
String property = properties.get(name).trim();
try {
return Boolean.parseBoolean(property);
} catch (Exception e) {
throw new DdlException(String.format("fail to parse %s, %s = %s, `%s` should be like 'true' or 'false', value should be double quotation marks", name, name, property, name));
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.external.elasticsearch;
import org.apache.doris.catalog.EsTable;
import java.util.HashMap;
import java.util.Map;
/**
@ -37,7 +38,15 @@ public class PartitionPhase implements SearchPhase {
@Override
public void execute(SearchContext context) throws DorisEsException {
shardPartitions = client.searchShards(context.sourceIndex());
nodesInfo = client.getHttpNodes();
if (context.nodesDiscovery()) {
nodesInfo = client.getHttpNodes();
} else {
nodesInfo = new HashMap<>();
String[] seeds = context.esTable().getSeeds();
for (int i = 0; i < seeds.length; i++) {
nodesInfo.put(String.valueOf(i), new EsNodeInfo(String.valueOf(i), seeds[i]));
}
}
}

View File

@ -84,12 +84,16 @@ public class SearchContext {
// the ES cluster version
private EsMajorVersion version;
// whether the nodes needs to be discovered
private boolean nodesDiscovery;
public SearchContext(EsTable table) {
this.table = table;
fullSchema = table.getFullSchema();
sourceIndex = table.getIndexName();
type = table.getMappingType();
nodesDiscovery = table.isNodesDiscovery();
}
@ -142,4 +146,8 @@ public class SearchContext {
public EsTablePartitions tablePartitions() throws Exception {
return EsTablePartitions.fromShardPartitions(table, shardPartitions);
}
public boolean nodesDiscovery() {
return nodesDiscovery;
}
}

View File

@ -148,6 +148,7 @@ public class EsScanNode extends ScanNode {
Map<String, String> properties = Maps.newHashMap();
properties.put(EsTable.USER, table.getUserName());
properties.put(EsTable.PASSWORD, table.getPasswd());
properties.put(EsTable.HTTP_SSL_ENABLED, String.valueOf(table.isHttpSslEnabled()));
TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt());
esScanNode.setProperties(properties);
if (table.isDocValueScanEnable()) {

View File

@ -50,7 +50,7 @@ public class PartitionPhaseTest extends EsTestCase {
Map<String, Map<String, Object>> nodesData = (Map<String, Map<String, Object>>) mapper.readValue(jsonParser, Map.class).get("nodes");
Map<String, EsNodeInfo> nodesMap = new HashMap<>();
for (Map.Entry<String, Map<String, Object>> entry : nodesData.entrySet()) {
EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue());
EsNodeInfo node = new EsNodeInfo(entry.getKey(), entry.getValue(), false);
if (node.hasHttp()) {
nodesMap.put(node.getId(), node);
}