[Doris On ES] Support create table with wildcard or aliase index (#3968)

This commit is contained in:
Yunfeng,Wu
2020-07-01 22:08:06 +08:00
committed by GitHub
parent fdcbea480d
commit 2362500e77
8 changed files with 49 additions and 41 deletions

View File

@ -19,11 +19,11 @@ package org.apache.doris.external.elasticsearch;
import org.apache.doris.common.UserException;
public class ExternalDataSourceException extends UserException {
public class DorisEsException extends UserException {
private static final long serialVersionUID = 7912833584319374692L;
public ExternalDataSourceException(String msg) {
public DorisEsException(String msg) {
super(msg);
}
}

View File

@ -64,7 +64,7 @@ public class EsFieldInfos {
* @return fieldsContext and docValueContext
* @throws Exception
*/
public static EsFieldInfos fromMapping(List<Column> colList, String indexName, String indexMapping, String docType) throws ExternalDataSourceException {
public static EsFieldInfos fromMapping(List<Column> colList, String indexName, String indexMapping, String docType) throws DorisEsException {
JSONObject jsonObject = new JSONObject(indexMapping);
// the indexName use alias takes the first mapping
Iterator<String> keys = jsonObject.keys();
@ -103,7 +103,7 @@ public class EsFieldInfos {
properties = rootSchema.optJSONObject("properties");
}
if (properties == null) {
throw new ExternalDataSourceException( "index[" + indexName + "] type[" + docType + "] mapping not found for the Elasticsearch Cluster");
throw new DorisEsException( "index[" + indexName + "] type[" + docType + "] mapping not found for the Elasticsearch Cluster");
}
return parseProperties(colList, properties);
}

View File

@ -95,7 +95,7 @@ public class EsRestClient {
String path = indexName + "/_mapping";
String indexMapping = execute(path);
if (indexMapping == null) {
throw new ExternalDataSourceException( "index[" + indexName + "] not found for the Elasticsearch Cluster");
throw new DorisEsException( "index[" + indexName + "] not found for the Elasticsearch Cluster");
}
return EsFieldInfos.fromMapping(colList, indexName, indexMapping, docType);
}
@ -105,7 +105,7 @@ public class EsRestClient {
String path = indexName + "/_search_shards";
String searchShards = execute(path);
if (searchShards == null) {
throw new ExternalDataSourceException( "index[" + indexName + "] search_shards not found for the Elasticsearch Cluster");
throw new DorisEsException( "index[" + indexName + "] search_shards not found for the Elasticsearch Cluster");
}
return EsShardPartitions.findShardPartitions(indexName, searchShards);
}

View File

@ -20,12 +20,15 @@ package org.apache.doris.external.elasticsearch;
import org.apache.doris.analysis.SingleRangePartitionDesc;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -47,17 +50,17 @@ public class EsShardPartitions {
this.partitionDesc = null;
this.partitionKey = null;
}
/**
* Parse shardRoutings from the json
* @param indexName indexName(alias or really name)
*
* @param indexName indexName(alias or really name)
* @param searchShards the return value of _search_shards
* @return shardRoutings is used for searching
*/
public static EsShardPartitions findShardPartitions(String indexName, String searchShards) throws ExternalDataSourceException {
public static EsShardPartitions findShardPartitions(String indexName, String searchShards) throws DorisEsException {
EsShardPartitions indexState = new EsShardPartitions(indexName);
JSONObject jsonObject = new JSONObject(searchShards);
JSONObject nodesMap = jsonObject.getJSONObject("nodes");
JSONArray shards = jsonObject.getJSONArray("shards");
int length = shards.length();
for (int i = 0; i < length; i++) {
@ -65,14 +68,20 @@ public class EsShardPartitions {
JSONArray shardsArray = shards.getJSONArray(i);
int arrayLength = shardsArray.length();
for (int j = 0; j < arrayLength; j++) {
JSONObject shard = shardsArray.getJSONObject(j);
String shardState = shard.getString("state");
JSONObject indexShard = shardsArray.getJSONObject(j);
String shardState = indexShard.getString("state");
if ("STARTED".equalsIgnoreCase(shardState) || "RELOCATING".equalsIgnoreCase(shardState)) {
try {
singleShardRouting.add(EsShardRouting.parseShardRouting(shardState, String.valueOf(i), shard, nodesMap));
singleShardRouting.add(
EsShardRouting.newSearchShard(
indexShard.getString("index"),
indexShard.getInt("shard"),
indexShard.getBoolean("primary"),
indexShard.getString("node"),
jsonObject.getJSONObject("nodes")));
} catch (Exception e) {
throw new ExternalDataSourceException( "index[" + indexName + "] findShardPartitions error");
}
LOG.error("fetch index [{}] shard partitions failure", indexName, e);
throw new DorisEsException("fetch [" + indexName + "] shard partitions failure [" + e.getMessage() + "]"); }
}
}
if (singleShardRouting.isEmpty()) {
@ -142,6 +151,6 @@ public class EsShardPartitions {
@Override
public String toString() {
return "EsIndexState [indexName=" + indexName + ", partitionDesc=" + partitionDesc + ", partitionKey="
+ partitionKey + "]";
+ partitionKey + "]";
}
}

View File

@ -18,10 +18,10 @@
package org.apache.doris.external.elasticsearch;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.commons.lang.StringUtils;
import org.json.JSONObject;
import org.apache.doris.thrift.TNetworkAddress;
public class EsShardRouting {
@ -41,9 +41,8 @@ public class EsShardRouting {
this.nodeId = nodeId;
}
public static EsShardRouting parseShardRouting(String indexName, String shardKey,
JSONObject shardInfo, JSONObject nodesMap) {
String nodeId = shardInfo.getString("node");
public static EsShardRouting newSearchShard(String indexName, int shardId, boolean isPrimary,
String nodeId, JSONObject nodesMap) {
JSONObject nodeInfo = nodesMap.getJSONObject(nodeId);
String[] transportAddr = nodeInfo.getString("transport_address").split(":");
// get thrift port from node info
@ -53,9 +52,7 @@ public class EsShardRouting {
if (!StringUtils.isEmpty(thriftPort)) {
addr = new TNetworkAddress(transportAddr[0], Integer.parseInt(thriftPort));
}
boolean isPrimary = shardInfo.getBoolean("primary");
return new EsShardRouting(indexName, Integer.parseInt(shardKey),
isPrimary, addr, nodeId);
return new EsShardRouting(indexName, shardId, isPrimary, addr, nodeId);
}
public int getShardId() {

View File

@ -17,12 +17,6 @@
package org.apache.doris.external.elasticsearch;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.PartitionInfo;
@ -31,11 +25,19 @@ import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.SinglePartitionInfo;
import org.apache.doris.common.DdlException;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* save the dynamic info parsed from es cluster state such as shard routing, partition info
*/
@ -56,7 +58,7 @@ public class EsTablePartitions {
}
public static EsTablePartitions fromShardPartitions(EsTable esTable, EsShardPartitions shardPartitions)
throws ExternalDataSourceException, DdlException {
throws DorisEsException, DdlException {
EsTablePartitions esTablePartitions = new EsTablePartitions();
RangePartitionInfo partitionInfo = null;
if (esTable.getPartitionInfo() != null) {
@ -82,7 +84,7 @@ public class EsTablePartitions {
LOG.debug("begin to parse es table [{}] state from search shards, "
+ "with no partition info", esTable.getName());
} else {
throw new ExternalDataSourceException("es table only support range partition, "
throw new DorisEsException("es table only support range partition, "
+ "but current partition type is "
+ esTable.getPartitionInfo().getType());
}

View File

@ -40,6 +40,9 @@ import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -47,9 +50,6 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -214,7 +214,7 @@ public class EsScanNode extends ScanNode {
// Generate on es scan range
TEsScanRange esScanRange = new TEsScanRange();
esScanRange.setEs_hosts(shardAllocations);
esScanRange.setIndex(indexState.getIndexName());
esScanRange.setIndex(shardRouting.get(0).getIndexName());
esScanRange.setType(table.getMappingType());
esScanRange.setShard_id(shardRouting.get(0).getShardId());
// Scan range

View File

@ -91,7 +91,7 @@ public class EsRepositoryTest {
}
@Test(expected = ExternalDataSourceException.class)
@Test(expected = DorisEsException.class)
public void testSetErrorType() throws Exception {
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
.getDb(CatalogTestUtil.testDb1)
@ -101,7 +101,7 @@ public class EsRepositoryTest {
}
@Test
public void testSetTableState() throws ExternalDataSourceException, DdlException {
public void testSetTableState() throws DorisEsException, DdlException {
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
.getDb(CatalogTestUtil.testDb1)
.getTable(CatalogTestUtil.testEsTableId1);