[Doris On ES][Refactor] refactor and enchanment ES sync meta logic (#4012)

After PR #3454 was merged, we should refactor and reorganize some logic for long-term sustainable iteration for Doris On ES.
To facilitate code review,I would divided into this work to multiple PRs (some other WIP work I also need to think carefully)

This PR include:

1. introduce SearchContext for all state we needed
2. divide meta-sync logic into three phase
3. modify some logic processing
4. introduce version detect logic for future using
This commit is contained in:
Yunfeng,Wu
2020-07-07 09:04:05 +08:00
committed by GitHub
parent 913b2caac4
commit 3ba38e3381
15 changed files with 551 additions and 335 deletions

View File

@ -20,21 +20,19 @@ package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.external.elasticsearch.EsFieldInfos;
import org.apache.doris.external.elasticsearch.EsMajorVersion;
import org.apache.doris.external.elasticsearch.EsNodeInfo;
import org.apache.doris.external.elasticsearch.EsMetaStateTracker;
import org.apache.doris.external.elasticsearch.EsRestClient;
import org.apache.doris.external.elasticsearch.EsShardPartitions;
import org.apache.doris.external.elasticsearch.EsTablePartitions;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.base.Strings;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@ -83,27 +81,6 @@ public class EsTable extends Table {
private Map<String, String> tableContext = new HashMap<>();
// used to indicate which fields can get from ES docavalue
// because elasticsearch can have "fields" feature, field can have
// two or more types, the first type maybe have not docvalue but other
// can have, such as (text field not have docvalue, but keyword can have):
// "properties": {
// "city": {
// "type": "text",
// "fields": {
// "raw": {
// "type": "keyword"
// }
// }
// }
// }
// then the docvalue context provided the mapping between the select field and real request field :
// {"city": "city.raw"}
// use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw`
private Map<String, String> docValueContext = new HashMap<>();
private Map<String, String> fieldsContext = new HashMap<>();
// record the latest and recently exception when sync ES table metadata (mapping, shard location)
private Throwable lastMetaDataSyncException = null;
@ -118,21 +95,13 @@ public class EsTable extends Table {
validate(properties);
}
public void addFieldInfos(EsFieldInfos esFieldInfos) {
if (enableKeywordSniff && esFieldInfos.getFieldsContext() != null) {
fieldsContext = esFieldInfos.getFieldsContext();
}
if (enableDocValueScan && esFieldInfos.getDocValueContext() != null) {
docValueContext = esFieldInfos.getDocValueContext();
}
}
public Map<String, String> fieldsContext() {
return fieldsContext;
return esMetaStateTracker.searchContext().fetchFieldsContext();
}
public Map<String, String> docValueContext() {
return docValueContext;
return esMetaStateTracker.searchContext().docValueFieldsContext();
}
public boolean isDocValueScanEnable() {
@ -179,9 +148,12 @@ public class EsTable extends Table {
if (properties.containsKey(VERSION)) {
try {
majorVersion = EsMajorVersion.parse(properties.get(VERSION).trim());
if (majorVersion.before(EsMajorVersion.V_5_X)) {
throw new DdlException("Unsupported/Unknown ES Cluster version [" + properties.get(VERSION) + "] ");
}
} catch (Exception e) {
throw new DdlException("fail to parse ES major version, version= "
+ properties.get(VERSION).trim() + ", shoud be like '6.5.3' ");
+ properties.get(VERSION).trim() + ", should be like '6.5.3' ");
}
}
@ -399,6 +371,10 @@ public class EsTable extends Table {
this.esTablePartitions = esTablePartitions;
}
public EsMajorVersion esVersion() {
return majorVersion;
}
public Throwable getLastMetaDataSyncException() {
return lastMetaDataSyncException;
}
@ -407,24 +383,20 @@ public class EsTable extends Table {
this.lastMetaDataSyncException = lastMetaDataSyncException;
}
private EsMetaStateTracker esMetaStateTracker;
/**
* sync es index meta from remote
* sync es index meta from remote ES Cluster
*
* @param client esRestClient
*/
public void syncESIndexMeta(EsRestClient client) {
public void syncTableMetaData(EsRestClient client) {
if (esMetaStateTracker == null) {
esMetaStateTracker = new EsMetaStateTracker(client, this);
}
try {
EsFieldInfos fieldInfos = client.getFieldInfos(this.indexName, this.mappingType, this.fullSchema);
EsShardPartitions esShardPartitions = client.getShardPartitions(this.indexName);
Map<String, EsNodeInfo> nodesInfo = client.getHttpNodes();
if (this.enableKeywordSniff || this.enableDocValueScan) {
addFieldInfos(fieldInfos);
}
this.esTablePartitions = EsTablePartitions.fromShardPartitions(this, esShardPartitions);
if (EsTable.TRANSPORT_HTTP.equals(getTransport())) {
this.esTablePartitions.addHttpAddress(nodesInfo);
}
esMetaStateTracker.run();
this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions();
} catch (Throwable e) {
LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", this.name, e);
this.esTablePartitions = null;

View File

@ -17,11 +17,8 @@
package org.apache.doris.external.elasticsearch;
import org.apache.doris.common.UserException;
public class DorisEsException extends UserException {
private static final long serialVersionUID = 7912833584319374692L;
public class DorisEsException extends RuntimeException {
public DorisEsException(String msg) {
super(msg);

View File

@ -20,14 +20,18 @@ package org.apache.doris.external.elasticsearch;
/**
* Elasticsearch major version information, useful to check client's query compatibility with the Rest API.
*
* <p>
* reference es-hadoop:
*
*/
public class EsMajorVersion {
public static final EsMajorVersion V_0_X = new EsMajorVersion((byte) 0, "0.x");
public static final EsMajorVersion V_1_X = new EsMajorVersion((byte) 1, "1.x");
public static final EsMajorVersion V_2_X = new EsMajorVersion((byte) 2, "2.x");
public static final EsMajorVersion V_5_X = new EsMajorVersion((byte) 5, "5.x");
public static final EsMajorVersion V_6_X = new EsMajorVersion((byte) 6, "6.x");
public static final EsMajorVersion V_7_X = new EsMajorVersion((byte) 7, "7.x");
public static final EsMajorVersion V_8_X = new EsMajorVersion((byte) 8, "8.x");
public static final EsMajorVersion LATEST = V_7_X;
public final byte major;
@ -62,7 +66,16 @@ public class EsMajorVersion {
return version.major >= major;
}
public static EsMajorVersion parse(String version) throws Exception {
public static EsMajorVersion parse(String version) throws DorisEsException {
if (version.startsWith("0.")) {
return new EsMajorVersion((byte) 0, version);
}
if (version.startsWith("1.")) {
return new EsMajorVersion((byte) 1, version);
}
if (version.startsWith("2.")) {
return new EsMajorVersion((byte) 2, version);
}
if (version.startsWith("5.")) {
return new EsMajorVersion((byte) 5, version);
}
@ -72,8 +85,12 @@ public class EsMajorVersion {
if (version.startsWith("7.")) {
return new EsMajorVersion((byte) 7, version);
}
throw new Exception("Unsupported/Unknown Elasticsearch version [" + version + "]." +
"Highest supported version is [" + LATEST.version + "]. You may need to upgrade ES-Hadoop.");
// used for the next released ES version
if (version.startsWith("8.")) {
return new EsMajorVersion((byte) 8, version);
}
throw new DorisEsException("Unsupported/Unknown ES Cluster version [" + version + "]." +
"Highest supported version is [" + LATEST.version + "].");
}
@Override

View File

@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.elasticsearch;
import org.apache.doris.catalog.EsTable;
import java.util.LinkedList;
import java.util.List;
/**
* It is responsible for this class to schedule all network request sent to remote ES Cluster
* Request sequence
* 1. GET /
* 2. GET {index}/_mapping
* 3. GET {index}/_search_shards
* <p>
* note: step 1 is not necessary
*/
public class EsMetaStateTracker {
private List<SearchPhase> builtinSearchPhase = new LinkedList<>();
private SearchContext searchContext;
public EsMetaStateTracker(EsRestClient client, EsTable esTable) {
builtinSearchPhase.add(new VersionPhase(client));
builtinSearchPhase.add(new MappingPhase(client));
builtinSearchPhase.add(new PartitionPhase(client));
searchContext = new SearchContext(esTable);
}
public SearchContext searchContext() {
return searchContext;
}
public void run() throws DorisEsException {
for (SearchPhase searchPhase : builtinSearchPhase) {
searchPhase.preProcess(searchContext);
searchPhase.execute(searchContext);
searchPhase.postProcess(searchContext);
}
}
}

View File

@ -38,7 +38,7 @@ public class EsNodeInfo {
private boolean hasThrift;
private TNetworkAddress thriftAddress;
public EsNodeInfo(String id, Map<String, Object> map) throws Exception {
public EsNodeInfo(String id, Map<String, Object> map) throws DorisEsException {
this.id = id;
EsMajorVersion version = EsMajorVersion.parse((String) map.get("version"));
this.name = (String) map.get("name");

View File

@ -17,6 +17,7 @@
package org.apache.doris.external.elasticsearch;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.EsTable;
@ -24,7 +25,9 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -33,7 +36,8 @@ import java.util.Map;
/**
* It is used to call es api to get shard allocation state
* It is responsible for loading all ES external table's meta-data such as `fields`, `partitions` periodically,
* playing the `repo` role at Doris On ES
*/
public class EsRepository extends MasterDaemon {
@ -69,8 +73,7 @@ public class EsRepository extends MasterDaemon {
protected void runAfterCatalogReady() {
for (EsTable esTable : esTables.values()) {
try {
EsRestClient client = esClients.get(esTable.getId());
esTable.syncESIndexMeta(client);
esTable.syncTableMetaData(esClients.get(esTable.getId()));
} catch (Throwable e) {
LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", esTable.getName(), e);
esTable.setEsTablePartitions(null);

View File

@ -17,7 +17,6 @@
package org.apache.doris.external.elasticsearch;
import org.apache.doris.catalog.Column;
import org.apache.http.HttpHeaders;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -26,37 +25,38 @@ import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
public class EsRestClient {
private static final Logger LOG = LogManager.getLogger(EsRestClient.class);
private ObjectMapper mapper;
{
mapper = new ObjectMapper();
mapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false);
mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false);
}
private static OkHttpClient networkClient = new OkHttpClient.Builder()
.readTimeout(10, TimeUnit.SECONDS)
.build();
private Request.Builder builder;
private String[] nodes;
private String currentNode;
private int currentNodeIndex = 0;
public EsRestClient(String[] nodes, String authUser, String authPassword) {
this.nodes = nodes;
this.builder = new Request.Builder();
@ -66,7 +66,7 @@ public class EsRestClient {
}
this.currentNode = nodes[currentNodeIndex];
}
private void selectNextNode() {
currentNodeIndex++;
// reroute, because the previously failed node may have already been restored
@ -75,8 +75,8 @@ public class EsRestClient {
}
currentNode = nodes[currentNodeIndex];
}
public Map<String, EsNodeInfo> getHttpNodes() throws Exception {
public Map<String, EsNodeInfo> getHttpNodes() throws DorisEsException {
Map<String, Map<String, Object>> nodesData = get("_nodes/http", "nodes");
if (nodesData == null) {
return Collections.emptyMap();
@ -90,35 +90,67 @@ public class EsRestClient {
}
return nodesMap;
}
public EsFieldInfos getFieldInfos(String indexName, String docType, List<Column> colList) throws Exception {
String path = indexName + "/_mapping";
String indexMapping = execute(path);
if (indexMapping == null) {
throw new DorisEsException( "index[" + indexName + "] not found for the Elasticsearch Cluster");
/**
* Get remote ES Cluster version
*
* @return
* @throws Exception
*/
public EsMajorVersion version() throws DorisEsException {
Map<String, Object> result = get("/", null);
if (result == null) {
throw new DorisEsException("Unable to retrieve ES main cluster info.");
}
return EsFieldInfos.fromMapping(colList, indexName, indexMapping, docType);
Map<String, String> versionBody = (Map<String, String>) result.get("version");
return EsMajorVersion.parse(versionBody.get("number"));
}
public EsShardPartitions getShardPartitions(String indexName) throws Exception {
/**
* Get mapping for indexName
*
* @param indexName
* @return
* @throws Exception
*/
public String getMapping(String indexName, boolean includeTypeName) throws DorisEsException {
String path = indexName + "/_mapping";
if (includeTypeName) {
path += "?include_type_name=true";
}
String indexMapping = execute(path);
if (indexMapping == null) {
throw new DorisEsException("index[" + indexName + "] not found");
}
return indexMapping;
}
/**
* Get Shard location
*
* @param indexName
* @return
* @throws DorisEsException
*/
public EsShardPartitions searchShards(String indexName) throws DorisEsException {
String path = indexName + "/_search_shards";
String searchShards = execute(path);
if (searchShards == null) {
throw new DorisEsException( "index[" + indexName + "] search_shards not found for the Elasticsearch Cluster");
throw new DorisEsException("request index [" + indexName + "] search_shards failure");
}
return EsShardPartitions.findShardPartitions(indexName, searchShards);
}
/**
* execute request for specific path,it will try again nodes.length times if it fails
*
* @param path the path must not leading with '/'
* @return response
*/
private String execute(String path) throws Exception {
private String execute(String path) throws DorisEsException {
int retrySize = nodes.length;
Exception scratchExceptionForThrow = null;
DorisEsException scratchExceptionForThrow = null;
for (int i = 0; i < retrySize; i++) {
// maybe should add HTTP schema to the address
// actually, at this time we can only process http protocol
@ -144,7 +176,7 @@ public class EsRestClient {
}
} catch (IOException e) {
LOG.warn("request node [{}] [{}] failures {}, try next nodes", currentNode, path, e);
scratchExceptionForThrow = e;
scratchExceptionForThrow = new DorisEsException(e.getMessage());
} finally {
if (response != null) {
response.close();
@ -158,11 +190,11 @@ public class EsRestClient {
}
return null;
}
public <T> T get(String q, String key) throws Exception {
public <T> T get(String q, String key) throws DorisEsException {
return parseContent(execute(q), key);
}
@SuppressWarnings("unchecked")
private <T> T parseContent(String response, String key) {
Map<String, Object> map = Collections.emptyMap();
@ -171,6 +203,7 @@ public class EsRestClient {
map = mapper.readValue(jsonParser, Map.class);
} catch (IOException ex) {
LOG.error("parse es response failure: [{}]", response);
throw new DorisEsException(ex.getMessage());
}
return (T) (key != null ? map.get(key) : map);
}

View File

@ -59,7 +59,8 @@ public class EsShardPartitions {
* @return shardRoutings is used for searching
*/
public static EsShardPartitions findShardPartitions(String indexName, String searchShards) throws DorisEsException {
EsShardPartitions indexState = new EsShardPartitions(indexName);
EsShardPartitions partitions = new EsShardPartitions(indexName);
JSONObject jsonObject = new JSONObject(searchShards);
JSONArray shards = jsonObject.getJSONArray("shards");
int length = shards.length();
@ -87,9 +88,9 @@ public class EsShardPartitions {
if (singleShardRouting.isEmpty()) {
LOG.warn("could not find a healthy allocation for [{}][{}]", indexName, i);
}
indexState.addShardRouting(i, singleShardRouting);
partitions.addShardRouting(i, singleShardRouting);
}
return indexState;
return partitions;
}
public void addHttpAddress(Map<String, EsNodeInfo> nodesInfo) {

View File

@ -19,122 +19,94 @@ package org.apache.doris.external.elasticsearch;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* It is used to hold the field information obtained from es, currently including the fields and docValue,
* it will eventually be added to the EsTable
**/
public class EsFieldInfos {
private static final Logger LOG = LogManager.getLogger(EsFieldInfos.class);
* Get index mapping from remote ES Cluster, and resolved `keyword` and `doc_values` field
* Later we can use it to parse all relevant indexes
*/
public class MappingPhase implements SearchPhase {
// userId => userId.keyword
private Map<String, String> fieldsContext;
private EsRestClient client;
// city => city.raw
private Map<String, String> docValueContext;
public EsFieldInfos(Map<String, String> fieldsContext, Map<String, String> docValueContext) {
this.fieldsContext = fieldsContext;
this.docValueContext = docValueContext;
// json response for `{index}/_mapping` API
private String jsonMapping;
private boolean includeTypeName = false;
public MappingPhase(EsRestClient client) {
this.client = client;
}
public Map<String, String> getFieldsContext() {
return fieldsContext;
@Override
public void preProcess(SearchContext context) {
if (context.version() != null && context.version().onOrAfter(EsMajorVersion.V_7_X)) {
includeTypeName = true;
}
}
public Map<String, String> getDocValueContext() {
return docValueContext;
@Override
public void execute(SearchContext context) throws DorisEsException {
jsonMapping = client.getMapping(context.sourceIndex(), includeTypeName);
}
@Override
public void postProcess(SearchContext context) {
resolveFields(context, jsonMapping);
}
/**
* Parse the required field information from the json
* @param colList table column
* @param indexName indexName(alias or really name)
* @param indexMapping the return value of _mapping
* @param docType The docType used by the index
* @return fieldsContext and docValueContext
*
* @param searchContext the current associated column searchContext
* @param indexMapping the return value of _mapping
* @return fetchFieldsContext and docValueFieldsContext
* @throws Exception
*/
public static EsFieldInfos fromMapping(List<Column> colList, String indexName, String indexMapping, String docType) throws DorisEsException {
public void resolveFields(SearchContext searchContext, String indexMapping) throws DorisEsException {
JSONObject jsonObject = new JSONObject(indexMapping);
// the indexName use alias takes the first mapping
Iterator<String> keys = jsonObject.keys();
String docKey = keys.next();
JSONObject docData = jsonObject.optJSONObject(docKey);
//{
// "mappings": {
// "doc": {
// "dynamic": "strict",
// "properties": {
// "time": {
// "type": "long"
// },
// "type": {
// "type": "keyword"
// },
// "userId": {
// "type": "text",
// "fields": {
// "keyword": {
// "type": "keyword"
// }
// }
// }
// }
// }
// }
//}
JSONObject mappings = docData.optJSONObject("mappings");
JSONObject rootSchema = mappings.optJSONObject(docType);
JSONObject rootSchema = mappings.optJSONObject(searchContext.type());
JSONObject properties;
// no type in es7
// After (include) 7.x, type was removed from ES mapping, default type is `_doc`
// https://www.elastic.co/guide/en/elasticsearch/reference/7.0/removal-of-types.html
if (rootSchema == null) {
if (searchContext.type().equals("_doc") == false) {
throw new DorisEsException("index[" + searchContext.sourceIndex() + "]'s type must be exists, "
+ " and after ES7.x type must be `_doc`, but found ["
+ searchContext.type() + "], for table ["
+ searchContext.esTable().getName() + "]");
}
properties = mappings.optJSONObject("properties");
} else {
properties = rootSchema.optJSONObject("properties");
}
if (properties == null) {
throw new DorisEsException( "index[" + indexName + "] type[" + docType + "] mapping not found for the Elasticsearch Cluster");
throw new DorisEsException("index[" + searchContext.sourceIndex() + "] type[" + searchContext.type() + "] mapping not found for the ES Cluster");
}
return parseProperties(colList, properties);
}
// get fields information in properties
private static EsFieldInfos parseProperties(List<Column> colList, JSONObject properties) {
if (properties == null) {
return null;
}
Map<String, String> fieldsMap = new HashMap<>();
Map<String, String> docValueMap = new HashMap<>();
for (Column col : colList) {
for (Column col : searchContext.columns()) {
String colName = col.getName();
// if column exists in Doris Table but no found in ES's mapping, we choose to ignore this situation?
if (!properties.has(colName)) {
continue;
}
JSONObject fieldObject = properties.optJSONObject(colName);
String keywordField = getKeywordField(fieldObject, colName);
if (StringUtils.isNotEmpty(keywordField)) {
fieldsMap.put(colName, keywordField);
}
String docValueField = getDocValueField(fieldObject, colName);
if (StringUtils.isNotEmpty(docValueField)) {
docValueMap.put(colName, docValueField);
}
resolveKeywordFields(searchContext, fieldObject, colName);
resolveDocValuesFields(searchContext, fieldObject, colName);
}
return new EsFieldInfos(fieldsMap, docValueMap);
}
// get a field of keyword type in the fields
private static String getKeywordField(JSONObject fieldObject, String colName) {
private void resolveKeywordFields(SearchContext searchContext, JSONObject fieldObject, String colName) {
String fieldType = fieldObject.optString("type");
// string-type field used keyword type to generate predicate
// if text field type seen, we should use the `field` keyword type?
@ -145,15 +117,14 @@ public class EsFieldInfos {
JSONObject innerTypeObject = fieldsObject.optJSONObject(key);
// just for text type
if ("keyword".equals(innerTypeObject.optString("type"))) {
return colName + "." + key;
searchContext.fetchFieldsContext().put(colName, colName + "." + key);
}
}
}
}
return null;
}
private static String getDocValueField(JSONObject fieldObject, String colName) {
private void resolveDocValuesFields(SearchContext searchContext, JSONObject fieldObject, String colName) {
String fieldType = fieldObject.optString("type");
String docValueField = null;
if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
@ -175,16 +146,16 @@ public class EsFieldInfos {
}
}
}
return docValueField;
}
// set doc_value = false manually
if (fieldObject.has("doc_values")) {
boolean docValue = fieldObject.optBoolean("doc_values");
if (!docValue) {
return docValueField;
} else {
// set doc_value = false manually
if (fieldObject.has("doc_values")) {
boolean docValue = fieldObject.optBoolean("doc_values");
if (!docValue) {
return;
}
}
docValueField = colName;
}
docValueField = colName;
return docValueField;
searchContext.docValueFieldsContext().put(colName, docValueField);
}
}

View File

@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.elasticsearch;
import org.apache.doris.catalog.EsTable;
import java.util.Map;
/**
* Fetch resolved indices's search shards from remote ES Cluster
*/
public class PartitionPhase implements SearchPhase {
private EsRestClient client;
private EsShardPartitions shardPartitions;
private Map<String, EsNodeInfo> nodesInfo;
public PartitionPhase(EsRestClient client) {
this.client = client;
}
@Override
public void execute(SearchContext context) throws DorisEsException {
shardPartitions = client.searchShards(context.sourceIndex());
nodesInfo = client.getHttpNodes();
}
@Override
public void postProcess(SearchContext context) throws DorisEsException {
context.partitions(shardPartitions);
if (EsTable.TRANSPORT_HTTP.equals(context.esTable().getTransport())) {
context.partitions().addHttpAddress(nodesInfo);
}
}
}

View File

@ -0,0 +1,145 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.elasticsearch;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* This class encapsulates the state needed to execute a query on ES table such as fields、doc_values、resolved index、
* search shards etc.
* Since then, we would add more state or runtime information to this class such as
* query builder、slice scroll context、aggregation info etc.
**/
public class SearchContext {
private static final Logger LOG = LogManager.getLogger(SearchContext.class);
// fetch string field value from not analyzed fields : userId => userId.keyword
// this is activated when `enable_keyword_sniff = true`
private Map<String, String> fetchFieldsContext = Maps.newHashMap();
// used to indicate which fields can get from ES docavalue
// because elasticsearch can have "fields" feature, field can have
// two or more types, the first type maybe have not docvalue but other
// can have, such as (text field not have docvalue, but keyword can have):
// "properties": {
// "city": {
// "type": "text",
// "fields": {
// "raw": {
// "type": "keyword"
// }
// }
// }
// }
// then the docvalue context provided the mapping between the select field and real request field :
// {"city": "city.raw"}
// use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw`
// fetch field value from doc_values, this is activated when `enable_docvalue_scan= true`
private Map<String, String> docValueFieldsContext = Maps.newHashMap();
// sourceIndex is the name of index when creating ES external table
private final String sourceIndex;
// when the `sourceIndex` is `alias` or `wildcard` matched index, this maybe involved two or more indices
// `resolvedIndices` would return the matched underlying indices
private List<String> resolvedIndices = Collections.emptyList();
// `type` of the `sourceIndex`
private final String type;
private EsTable table;
// all columns which user created for ES external table
private final List<Column> fullSchema;
// represent `resolvedIndices`'s searchable shards
private EsShardPartitions shardPartitions;
// the ES cluster version
private EsMajorVersion version;
public SearchContext(EsTable table) {
this.table = table;
fullSchema = table.getFullSchema();
sourceIndex = table.getIndexName();
type = table.getMappingType();
}
public String sourceIndex() {
return sourceIndex;
}
public List<String> resolvedIndices() {
return resolvedIndices;
}
public String type() {
return type;
}
public List<Column> columns() {
return fullSchema;
}
public EsTable esTable() {
return table;
}
public Map<String, String> fetchFieldsContext() {
return fetchFieldsContext;
}
public Map<String, String> docValueFieldsContext() {
return docValueFieldsContext;
}
public void version(EsMajorVersion version) {
this.version = version;
}
public EsMajorVersion version() {
return version;
}
public void partitions(EsShardPartitions shardPartitions) {
this.shardPartitions = shardPartitions;
}
public EsShardPartitions partitions() {
return shardPartitions;
}
// this will be refactor soon
public EsTablePartitions tablePartitions() throws Exception {
return EsTablePartitions.fromShardPartitions(table, shardPartitions);
}
}

View File

@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.elasticsearch;
/**
* Represents a phase of a ES fetch index metadata request e.g. get mapping, get shard location etc through network
*/
public interface SearchPhase {
/**
* Performs pre processing of the search context before the execute.
*/
default void preProcess(SearchContext context) {
}
/**
* Executes the search phase
*/
void execute(SearchContext context);
/**
* Performs post processing of the search context before the execute.
*/
default void postProcess(SearchContext context) {
}
}

View File

@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.elasticsearch;
/**
* Request version from remote ES Cluster. If request fails, set the version with `LATEST`
*/
public class VersionPhase implements SearchPhase {
private EsRestClient client;
private boolean isVersionSet = false;
public VersionPhase(EsRestClient client) {
this.client = client;
}
@Override
public void preProcess(SearchContext context) {
if (context.esTable().esVersion() != null) {
isVersionSet = true;
context.version(context.esTable().esVersion());
}
}
@Override
public void execute(SearchContext context) {
if (isVersionSet) {
return;
}
EsMajorVersion version;
try {
version = client.version();
} catch (Throwable e) {
version = EsMajorVersion.LATEST;
}
context.version(version);
}
}

View File

@ -1,128 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.elasticsearch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.CatalogTestUtil;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.FakeCatalog;
import org.apache.doris.catalog.FakeEditLog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.meta.MetaContext;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
public class EsRepositoryTest {
private static FakeEditLog fakeEditLog;
private static FakeCatalog fakeCatalog;
private static Catalog masterCatalog;
private static String mappingsStr = "";
private static String es7MappingsStr = "";
private static String searchShardsStr = "";
private EsRepository esRepository;
private EsRestClient fakeClient;
@BeforeClass
public static void init() throws IOException, InstantiationException, IllegalAccessException,
IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException,
URISyntaxException {
fakeEditLog = new FakeEditLog();
fakeCatalog = new FakeCatalog();
masterCatalog = CatalogTestUtil.createTestCatalog();
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_40);
metaContext.setThreadLocalInfo();
// masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
FakeCatalog.setCatalog(masterCatalog);
mappingsStr = loadJsonFromFile("data/es/mappings.json");
es7MappingsStr = loadJsonFromFile("data/es/es7_mappings.json");
searchShardsStr = loadJsonFromFile("data/es/search_shards.json");
}
@Before
public void setUp() {
esRepository = new EsRepository();
fakeClient = new EsRestClient(new String[]{"localhost:9200"}, null, null);
}
@Test
public void testSetEsTableContext() throws Exception {
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
.getDb(CatalogTestUtil.testDb1)
.getTable(CatalogTestUtil.testEsTableId1);
// es5
EsFieldInfos fieldInfos = EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), mappingsStr, esTable.getMappingType());
esTable.addFieldInfos(fieldInfos);
assertEquals("userId.keyword", esTable.fieldsContext().get("userId"));
assertEquals("userId.keyword", esTable.docValueContext().get("userId"));
// es7
EsFieldInfos fieldInfos7 = EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), es7MappingsStr, "");
assertEquals("userId.keyword", fieldInfos7.getFieldsContext().get("userId"));
assertEquals("userId.keyword", fieldInfos7.getDocValueContext().get("userId"));
}
@Test(expected = DorisEsException.class)
public void testSetErrorType() throws Exception {
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
.getDb(CatalogTestUtil.testDb1)
.getTable(CatalogTestUtil.testEsTableId1);
// error type
EsFieldInfos.fromMapping(esTable.getFullSchema(), esTable.getIndexName(), mappingsStr, "errorType");
}
@Test
public void testSetTableState() throws DorisEsException, DdlException {
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
.getDb(CatalogTestUtil.testDb1)
.getTable(CatalogTestUtil.testEsTableId1);
EsShardPartitions esShardPartitions = EsShardPartitions.findShardPartitions(esTable.getIndexName(), searchShardsStr);
EsTablePartitions esTablePartitions = EsTablePartitions.fromShardPartitions(esTable, esShardPartitions);
assertNotNull(esTablePartitions);
assertEquals(1, esTablePartitions.getUnPartitionedIndexStates().size());
assertEquals(5, esTablePartitions.getEsShardPartitions("indexa").getShardRoutings().size());
}
private static String loadJsonFromFile(String fileName) throws IOException, URISyntaxException {
File file = new File(EsRepositoryTest.class.getClassLoader().getResource(fileName).toURI());
InputStream is = new FileInputStream(file);
BufferedReader br = new BufferedReader(new InputStreamReader(is));
StringBuilder jsonStr = new StringBuilder();
String line = "";
while ((line = br.readLine()) != null) {
jsonStr.append(line);
}
br.close();
is.close();
return jsonStr.toString();
}
}

View File

@ -27,38 +27,38 @@ import org.junit.Test;
public class EsUtilTest {
private String jsonStr = "{\"settings\": {\n"
+ " \"index\": {\n"
+ " \"bpack\": {\n"
+ " \"partition\": {\n"
+ " \"upperbound\": \"12\"\n"
+ " }\n"
+ " },\n"
+ " \"number_of_shards\": \"5\",\n"
+ " \"provided_name\": \"indexa\",\n"
+ " \"creation_date\": \"1539328532060\",\n"
+ " \"number_of_replicas\": \"1\",\n"
+ " \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n"
+ " \"version\": {\n"
+ " \"created\": \"5050099\"\n"
+ " }\n"
+ " }\n"
private String jsonStr = "{\"settings\": {\n"
+ " \"index\": {\n"
+ " \"bpack\": {\n"
+ " \"partition\": {\n"
+ " \"upperbound\": \"12\"\n"
+ " }\n"
+ " },\n"
+ " \"number_of_shards\": \"5\",\n"
+ " \"provided_name\": \"indexa\",\n"
+ " \"creation_date\": \"1539328532060\",\n"
+ " \"number_of_replicas\": \"1\",\n"
+ " \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n"
+ " \"version\": {\n"
+ " \"created\": \"5050099\"\n"
+ " }\n"
+ " }\n"
+ " }}";
@Test
public void testGetJsonObject() {
JSONObject json = new JSONObject(jsonStr);
JSONObject upperBoundSetting = EsUtil.getJsonObject(json, "settings.index.bpack.partition", 0);
assertTrue(upperBoundSetting.has("upperbound"));
assertEquals("12", upperBoundSetting.getString("upperbound"));
JSONObject unExistKey = EsUtil.getJsonObject(json, "set", 0);
assertNull(unExistKey);
JSONObject singleKey = EsUtil.getJsonObject(json, "settings", 0);
assertTrue(singleKey.has("index"));
}
@Test(expected = JSONException.class)
public void testGetJsonObjectWithException() {
JSONObject json = new JSONObject(jsonStr);