From 5abef19be477db672e29fdc7e0ca12490eeb5611 Mon Sep 17 00:00:00 2001 From: "Yunfeng,Wu" Date: Fri, 5 Jun 2020 23:06:46 +0800 Subject: [PATCH] [Doris On ES] Add more detailed error message when fail to create es table (#3758) --- .../org/apache/doris/catalog/EsTable.java | 11 +++ .../apache/doris/external/EsRestClient.java | 14 ++-- .../apache/doris/external/EsStateStore.java | 71 ++++--------------- .../org/apache/doris/planner/EsScanNode.java | 16 +++-- .../org/apache/doris/es/EsStateStoreTest.java | 8 +-- 5 files changed, 51 insertions(+), 69 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java index 252d962d09..a8baaa0f26 100644 --- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java @@ -97,6 +97,9 @@ public class EsTable extends Table { private Map fieldsContext = new HashMap<>(); + // record the most recent exception raised while syncing ES table metadata (mapping, shard location) + private Throwable lastMetaDataSyncException = null; + public EsTable() { super(TableType.ELASTICSEARCH); } @@ -387,4 +390,12 @@ public class EsTable extends Table { public void setEsTableState(EsTableState esTableState) { this.esTableState = esTableState; } + + public Throwable getLastMetaDataSyncException() { + return lastMetaDataSyncException; + } + + public void setLastMetaDataSyncException(Throwable lastMetaDataSyncException) { + this.lastMetaDataSyncException = lastMetaDataSyncException; + } } diff --git a/fe/src/main/java/org/apache/doris/external/EsRestClient.java b/fe/src/main/java/org/apache/doris/external/EsRestClient.java index 73759c4a4e..2b93ea947d 100644 --- a/fe/src/main/java/org/apache/doris/external/EsRestClient.java +++ b/fe/src/main/java/org/apache/doris/external/EsRestClient.java @@ -86,7 +86,7 @@ public class EsRestClient { return nodes; } - public String getIndexMetaData(String indexName) { + public String 
getIndexMetaData(String indexName) throws Exception { String path = "_cluster/state?indices=" + indexName + "&metric=routing_table,nodes,metadata&expand_wildcards=open"; return execute(path); @@ -99,7 +99,7 @@ public class EsRestClient { * * @return */ - public EsMajorVersion version() { + public EsMajorVersion version() throws Exception { Map versionMap = get("/", "version"); EsMajorVersion majorVersion; @@ -118,9 +118,10 @@ public class EsRestClient { * @param path the path must not leading with '/' * @return */ - private String execute(String path) { + private String execute(String path) throws Exception { selectNextNode(); boolean nextNode; + Exception scratchExceptionForThrow = null; do { Request.Builder builder = new Request.Builder(); if (!Strings.isEmpty(basicAuth)) { @@ -141,7 +142,6 @@ public class EsRestClient { Request request = builder.get() .url(currentNode + "/" + path) .build(); - LOG.trace("es rest client request URL: {}", currentNode + "/" + path); try { Response response = networkClient.newCall(request).execute(); if (response.isSuccessful()) { @@ -149,16 +149,20 @@ public class EsRestClient { } } catch (IOException e) { LOG.warn("request node [{}] [{}] failures {}, try next nodes", currentNode, path, e); + scratchExceptionForThrow = e; } nextNode = selectNextNode(); if (!nextNode) { LOG.warn("try all nodes [{}],no other nodes left", nodes); } } while (nextNode); + if (scratchExceptionForThrow != null) { + throw scratchExceptionForThrow; + } return null; } - public T get(String q, String key) { + public T get(String q, String key) throws Exception { return parseContent(execute(q), key); } diff --git a/fe/src/main/java/org/apache/doris/external/EsStateStore.java b/fe/src/main/java/org/apache/doris/external/EsStateStore.java index 0ac05f21ca..460cc70e7c 100644 --- a/fe/src/main/java/org/apache/doris/external/EsStateStore.java +++ b/fe/src/main/java/org/apache/doris/external/EsStateStore.java @@ -40,21 +40,12 @@ import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; -import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import okhttp3.Authenticator; -import okhttp3.Call; -import okhttp3.Credentials; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.Route; - /** * it is used to call es api to get shard allocation state @@ -92,9 +83,12 @@ public class EsStateStore extends MasterDaemon { // in the future, we maybe need this version String indexMetaData = client.getIndexMetaData(esTable.getIndexName()); if (indexMetaData == null) { + esTable.setLastMetaDataSyncException(new Exception("fetch index meta data failure from /_cluster/state")); + // set null for checking in EsScanNode#getShardLocations + esTable.setEsTableState(null); continue; } - EsTableState esTableState = parseClusterState55(indexMetaData, esTable); + EsTableState esTableState = getTableState(indexMetaData, esTable); if (esTableState == null) { continue; } @@ -105,6 +99,8 @@ public class EsStateStore extends MasterDaemon { esTable.setEsTableState(esTableState); } catch (Throwable e) { LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", esTable.getName(), e); + esTable.setEsTableState(null); + esTable.setLastMetaDataSyncException(e); } } } @@ -112,7 +108,6 @@ public class EsStateStore extends MasterDaemon { // should call this method to init the state store after loading image // the rest of tables will be added or removed by replaying edit log // when fe is start to load image, should call this method to init the state store - public void loadTableFromCatalog() { if (Catalog.isCheckpointThread()) { return; @@ -130,54 +125,10 @@ public class EsStateStore extends MasterDaemon { } } - private EsTableState loadEsIndexMetadataV55(final EsTable esTable) { - OkHttpClient.Builder 
clientBuilder = new OkHttpClient.Builder(); - clientBuilder.authenticator(new Authenticator() { - @Override - public Request authenticate(Route route, Response response) throws IOException { - String credential = Credentials.basic(esTable.getUserName(), esTable.getPasswd()); - return response.request().newBuilder().header("Authorization", credential).build(); - } - }); - String[] seeds = esTable.getSeeds(); - for (String seed : seeds) { - String url = seed + "/_cluster/state?indices=" - + esTable.getIndexName() - + "&metric=routing_table,nodes,metadata&expand_wildcards=open"; - String basicAuth = ""; - try { - Request request = new Request.Builder() - .get() - .url(url) - .addHeader("Authorization", basicAuth) - .build(); - Call call = clientBuilder.build().newCall(request); - Response response = call.execute(); - String responseStr = response.body().string(); - if (response.isSuccessful()) { - try { - EsTableState esTableState = parseClusterState55(responseStr, esTable); - if (esTableState != null) { - return esTableState; - } - } catch (Exception e) { - LOG.warn("errors while parse response msg {}", responseStr, e); - } - } else { - LOG.info("errors while call es [{}] to get state info {}", url, responseStr); - } - } catch (Exception e) { - LOG.warn("errors while call es [{}]", url, e); - } - } - return null; - } - @VisibleForTesting - public EsTableState parseClusterState55(String responseStr, EsTable esTable) + public EsTableState getTableState(String responseStr, EsTable esTable) throws DdlException, AnalysisException, ExternalDataSourceException { JSONObject jsonObject = new JSONObject(responseStr); - String clusterName = jsonObject.getString("cluster_name"); JSONObject nodesMap = jsonObject.getJSONObject("nodes"); // we build the doc value context for fields maybe used for scanning // "properties": { @@ -194,9 +145,17 @@ public class EsStateStore extends MasterDaemon { // {"city": "city.raw"} JSONObject indicesMetaMap = 
jsonObject.getJSONObject("metadata").getJSONObject("indices"); JSONObject indexMetaMap = indicesMetaMap.optJSONObject(esTable.getIndexName()); + if (indexMetaMap == null) { + esTable.setLastMetaDataSyncException(new Exception( "index[" + esTable.getIndexName() + "] not found for the Elasticsearch Cluster")); + return null; + } if (indexMetaMap != null && (esTable.isKeywordSniffEnable() || esTable.isDocValueScanEnable())) { JSONObject mappings = indexMetaMap.optJSONObject("mappings"); JSONObject rootSchema = mappings.optJSONObject(esTable.getMappingType()); + if (rootSchema == null) { + esTable.setLastMetaDataSyncException(new Exception( "type[" + esTable.getMappingType() + "] not found for the Elasticsearch Cluster with index: [" + esTable.getIndexName() + "]")); + return null; + } JSONObject schema = rootSchema.optJSONObject("properties"); List colList = esTable.getFullSchema(); for (Column col : colList) { diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java index 1412ef4798..65fd3f174f 100644 --- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -130,7 +130,6 @@ public class EsScanNode extends ScanNode { msg.es_scan_node = esScanNode; } - // TODO(ygl) assign backend that belong to the same cluster private void assignBackends() throws UserException { backendMap = HashMultimap.create(); backendList = Lists.newArrayList(); @@ -146,11 +145,13 @@ public class EsScanNode extends ScanNode { } // only do partition(es index level) prune - // TODO (ygl) should not get all shards, prune unrelated shard private List getShardLocations() throws UserException { // has to get partition info from es state not from table because the partition info is generated from es cluster state dynamically if (esTableState == null) { - throw new UserException("EsTable shard info has not been synced, wait some time or check log"); + if 
(table.getLastMetaDataSyncException() != null) { + throw new UserException("fetch es table [" + table.getName() + "] metadata failure: " + table.getLastMetaDataSyncException().getLocalizedMessage()); + } + throw new UserException("EsTable metadata has not been synced, Try it later"); } Collection partitionIds = partitionPrune(esTableState.getPartitionInfo()); List selectedIndex = Lists.newArrayList(); @@ -225,7 +226,14 @@ public class EsScanNode extends ScanNode { } } - LOG.debug("generate [{}] scan ranges to scan node [{}]", result.size(), result.get(0)); + if (LOG.isDebugEnabled()) { + StringBuilder scratchBuilder = new StringBuilder(); + for (TScanRangeLocations scanRangeLocations : result) { + scratchBuilder.append(scanRangeLocations.toString()); + scratchBuilder.append(" "); + } + LOG.debug("ES table {} scan ranges {}", table.getName(), scratchBuilder.toString()); + } return result; } diff --git a/fe/src/test/java/org/apache/doris/es/EsStateStoreTest.java b/fe/src/test/java/org/apache/doris/es/EsStateStoreTest.java index a7fa438074..c41c2c4e8e 100644 --- a/fe/src/test/java/org/apache/doris/es/EsStateStoreTest.java +++ b/fe/src/test/java/org/apache/doris/es/EsStateStoreTest.java @@ -102,7 +102,7 @@ public class EsStateStoreTest { boolean hasException = false; EsTableState esTableState = null; try { - esTableState = esStateStore.parseClusterState55(clusterStateStr1, esTable); + esTableState = esStateStore.getTableState(clusterStateStr1, esTable); } catch (Exception e) { e.printStackTrace(); hasException = true; @@ -149,7 +149,7 @@ public class EsStateStoreTest { boolean hasException = false; EsTableState esTableState = null; try { - esTableState = esStateStore.parseClusterState55(clusterStateStr3, esTable); + esTableState = esStateStore.getTableState(clusterStateStr3, esTable); } catch (Exception e) { e.printStackTrace(); hasException = true; @@ -197,7 +197,7 @@ public class EsStateStoreTest { boolean hasException = false; EsTableState esTableState = null; try { 
- esTableState = esStateStore.parseClusterState55(clusterStateStr4, esTable); + esTableState = esStateStore.getTableState(clusterStateStr4, esTable); } catch (Exception e) { e.printStackTrace(); hasException = true; @@ -230,7 +230,7 @@ public class EsStateStoreTest { boolean hasException = false; EsTableState esTableState = null; try { - esTableState = esStateStore.parseClusterState55(clusterStateStr2, esTable); + esTableState = esStateStore.getTableState(clusterStateStr2, esTable); } catch (Exception e) { hasException = true; }