[Doris On ES] Add more detailed error message when fail to create es table (#3758)
This commit is contained in:
@ -97,6 +97,9 @@ public class EsTable extends Table {
|
||||
|
||||
private Map<String, String> fieldsContext = new HashMap<>();
|
||||
|
||||
// record the latest and recently exception when sync ES table metadata (mapping, shard location)
|
||||
private Throwable lastMetaDataSyncException = null;
|
||||
|
||||
public EsTable() {
|
||||
super(TableType.ELASTICSEARCH);
|
||||
}
|
||||
@ -387,4 +390,12 @@ public class EsTable extends Table {
|
||||
public void setEsTableState(EsTableState esTableState) {
|
||||
this.esTableState = esTableState;
|
||||
}
|
||||
|
||||
public Throwable getLastMetaDataSyncException() {
|
||||
return lastMetaDataSyncException;
|
||||
}
|
||||
|
||||
public void setLastMetaDataSyncException(Throwable lastMetaDataSyncException) {
|
||||
this.lastMetaDataSyncException = lastMetaDataSyncException;
|
||||
}
|
||||
}
|
||||
|
||||
@ -86,7 +86,7 @@ public class EsRestClient {
|
||||
return nodes;
|
||||
}
|
||||
|
||||
public String getIndexMetaData(String indexName) {
|
||||
public String getIndexMetaData(String indexName) throws Exception {
|
||||
String path = "_cluster/state?indices=" + indexName
|
||||
+ "&metric=routing_table,nodes,metadata&expand_wildcards=open";
|
||||
return execute(path);
|
||||
@ -99,7 +99,7 @@ public class EsRestClient {
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public EsMajorVersion version() {
|
||||
public EsMajorVersion version() throws Exception {
|
||||
Map<String, String> versionMap = get("/", "version");
|
||||
|
||||
EsMajorVersion majorVersion;
|
||||
@ -118,9 +118,10 @@ public class EsRestClient {
|
||||
* @param path the path must not leading with '/'
|
||||
* @return
|
||||
*/
|
||||
private String execute(String path) {
|
||||
private String execute(String path) throws Exception {
|
||||
selectNextNode();
|
||||
boolean nextNode;
|
||||
Exception scratchExceptionForThrow = null;
|
||||
do {
|
||||
Request.Builder builder = new Request.Builder();
|
||||
if (!Strings.isEmpty(basicAuth)) {
|
||||
@ -141,7 +142,6 @@ public class EsRestClient {
|
||||
Request request = builder.get()
|
||||
.url(currentNode + "/" + path)
|
||||
.build();
|
||||
LOG.trace("es rest client request URL: {}", currentNode + "/" + path);
|
||||
try {
|
||||
Response response = networkClient.newCall(request).execute();
|
||||
if (response.isSuccessful()) {
|
||||
@ -149,16 +149,20 @@ public class EsRestClient {
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("request node [{}] [{}] failures {}, try next nodes", currentNode, path, e);
|
||||
scratchExceptionForThrow = e;
|
||||
}
|
||||
nextNode = selectNextNode();
|
||||
if (!nextNode) {
|
||||
LOG.warn("try all nodes [{}],no other nodes left", nodes);
|
||||
}
|
||||
} while (nextNode);
|
||||
if (scratchExceptionForThrow != null) {
|
||||
throw scratchExceptionForThrow;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public <T> T get(String q, String key) {
|
||||
public <T> T get(String q, String key) throws Exception {
|
||||
return parseContent(execute(q), key);
|
||||
}
|
||||
|
||||
|
||||
@ -40,21 +40,12 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.json.JSONObject;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import okhttp3.Authenticator;
|
||||
import okhttp3.Call;
|
||||
import okhttp3.Credentials;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.Response;
|
||||
import okhttp3.Route;
|
||||
|
||||
|
||||
/**
|
||||
* it is used to call es api to get shard allocation state
|
||||
@ -92,9 +83,12 @@ public class EsStateStore extends MasterDaemon {
|
||||
// in the future, we maybe need this version
|
||||
String indexMetaData = client.getIndexMetaData(esTable.getIndexName());
|
||||
if (indexMetaData == null) {
|
||||
esTable.setLastMetaDataSyncException(new Exception("fetch index meta data failure from /_cluster/state"));
|
||||
// set null for checking in EsScanNode#getShardLocations
|
||||
esTable.setEsTableState(null);
|
||||
continue;
|
||||
}
|
||||
EsTableState esTableState = parseClusterState55(indexMetaData, esTable);
|
||||
EsTableState esTableState = getTableState(indexMetaData, esTable);
|
||||
if (esTableState == null) {
|
||||
continue;
|
||||
}
|
||||
@ -105,6 +99,8 @@ public class EsStateStore extends MasterDaemon {
|
||||
esTable.setEsTableState(esTableState);
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", esTable.getName(), e);
|
||||
esTable.setEsTableState(null);
|
||||
esTable.setLastMetaDataSyncException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -112,7 +108,6 @@ public class EsStateStore extends MasterDaemon {
|
||||
// should call this method to init the state store after loading image
|
||||
// the rest of tables will be added or removed by replaying edit log
|
||||
// when fe is start to load image, should call this method to init the state store
|
||||
|
||||
public void loadTableFromCatalog() {
|
||||
if (Catalog.isCheckpointThread()) {
|
||||
return;
|
||||
@ -130,54 +125,10 @@ public class EsStateStore extends MasterDaemon {
|
||||
}
|
||||
}
|
||||
|
||||
private EsTableState loadEsIndexMetadataV55(final EsTable esTable) {
|
||||
OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
|
||||
clientBuilder.authenticator(new Authenticator() {
|
||||
@Override
|
||||
public Request authenticate(Route route, Response response) throws IOException {
|
||||
String credential = Credentials.basic(esTable.getUserName(), esTable.getPasswd());
|
||||
return response.request().newBuilder().header("Authorization", credential).build();
|
||||
}
|
||||
});
|
||||
String[] seeds = esTable.getSeeds();
|
||||
for (String seed : seeds) {
|
||||
String url = seed + "/_cluster/state?indices="
|
||||
+ esTable.getIndexName()
|
||||
+ "&metric=routing_table,nodes,metadata&expand_wildcards=open";
|
||||
String basicAuth = "";
|
||||
try {
|
||||
Request request = new Request.Builder()
|
||||
.get()
|
||||
.url(url)
|
||||
.addHeader("Authorization", basicAuth)
|
||||
.build();
|
||||
Call call = clientBuilder.build().newCall(request);
|
||||
Response response = call.execute();
|
||||
String responseStr = response.body().string();
|
||||
if (response.isSuccessful()) {
|
||||
try {
|
||||
EsTableState esTableState = parseClusterState55(responseStr, esTable);
|
||||
if (esTableState != null) {
|
||||
return esTableState;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("errors while parse response msg {}", responseStr, e);
|
||||
}
|
||||
} else {
|
||||
LOG.info("errors while call es [{}] to get state info {}", url, responseStr);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("errors while call es [{}]", url, e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public EsTableState parseClusterState55(String responseStr, EsTable esTable)
|
||||
public EsTableState getTableState(String responseStr, EsTable esTable)
|
||||
throws DdlException, AnalysisException, ExternalDataSourceException {
|
||||
JSONObject jsonObject = new JSONObject(responseStr);
|
||||
String clusterName = jsonObject.getString("cluster_name");
|
||||
JSONObject nodesMap = jsonObject.getJSONObject("nodes");
|
||||
// we build the doc value context for fields maybe used for scanning
|
||||
// "properties": {
|
||||
@ -194,9 +145,17 @@ public class EsStateStore extends MasterDaemon {
|
||||
// {"city": "city.raw"}
|
||||
JSONObject indicesMetaMap = jsonObject.getJSONObject("metadata").getJSONObject("indices");
|
||||
JSONObject indexMetaMap = indicesMetaMap.optJSONObject(esTable.getIndexName());
|
||||
if (indexMetaMap == null) {
|
||||
esTable.setLastMetaDataSyncException(new Exception( "index[" + esTable.getIndexName() + "] not found for the Elasticsearch Cluster"));
|
||||
return null;
|
||||
}
|
||||
if (indexMetaMap != null && (esTable.isKeywordSniffEnable() || esTable.isDocValueScanEnable())) {
|
||||
JSONObject mappings = indexMetaMap.optJSONObject("mappings");
|
||||
JSONObject rootSchema = mappings.optJSONObject(esTable.getMappingType());
|
||||
if (rootSchema == null) {
|
||||
esTable.setLastMetaDataSyncException(new Exception( "type[" + esTable.getMappingType() + "] not found for the Elasticsearch Cluster with index: [" + esTable.getIndexName() + "]"));
|
||||
return null;
|
||||
}
|
||||
JSONObject schema = rootSchema.optJSONObject("properties");
|
||||
List<Column> colList = esTable.getFullSchema();
|
||||
for (Column col : colList) {
|
||||
|
||||
@ -130,7 +130,6 @@ public class EsScanNode extends ScanNode {
|
||||
msg.es_scan_node = esScanNode;
|
||||
}
|
||||
|
||||
// TODO(ygl) assign backend that belong to the same cluster
|
||||
private void assignBackends() throws UserException {
|
||||
backendMap = HashMultimap.create();
|
||||
backendList = Lists.newArrayList();
|
||||
@ -146,11 +145,13 @@ public class EsScanNode extends ScanNode {
|
||||
}
|
||||
|
||||
// only do partition(es index level) prune
|
||||
// TODO (ygl) should not get all shards, prune unrelated shard
|
||||
private List<TScanRangeLocations> getShardLocations() throws UserException {
|
||||
// has to get partition info from es state not from table because the partition info is generated from es cluster state dynamically
|
||||
if (esTableState == null) {
|
||||
throw new UserException("EsTable shard info has not been synced, wait some time or check log");
|
||||
if (table.getLastMetaDataSyncException() != null) {
|
||||
throw new UserException("fetch es table [" + table.getName() + "] metadata failure: " + table.getLastMetaDataSyncException().getLocalizedMessage());
|
||||
}
|
||||
throw new UserException("EsTable metadata has not been synced, Try it later");
|
||||
}
|
||||
Collection<Long> partitionIds = partitionPrune(esTableState.getPartitionInfo());
|
||||
List<EsIndexState> selectedIndex = Lists.newArrayList();
|
||||
@ -225,7 +226,14 @@ public class EsScanNode extends ScanNode {
|
||||
}
|
||||
|
||||
}
|
||||
LOG.debug("generate [{}] scan ranges to scan node [{}]", result.size(), result.get(0));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
StringBuilder scratchBuilder = new StringBuilder();
|
||||
for (TScanRangeLocations scanRangeLocations : result) {
|
||||
scratchBuilder.append(scanRangeLocations.toString());
|
||||
scratchBuilder.append(" ");
|
||||
}
|
||||
LOG.debug("ES table {} scan ranges {}", table.getName(), scratchBuilder.toString());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@ -102,7 +102,7 @@ public class EsStateStoreTest {
|
||||
boolean hasException = false;
|
||||
EsTableState esTableState = null;
|
||||
try {
|
||||
esTableState = esStateStore.parseClusterState55(clusterStateStr1, esTable);
|
||||
esTableState = esStateStore.getTableState(clusterStateStr1, esTable);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
hasException = true;
|
||||
@ -149,7 +149,7 @@ public class EsStateStoreTest {
|
||||
boolean hasException = false;
|
||||
EsTableState esTableState = null;
|
||||
try {
|
||||
esTableState = esStateStore.parseClusterState55(clusterStateStr3, esTable);
|
||||
esTableState = esStateStore.getTableState(clusterStateStr3, esTable);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
hasException = true;
|
||||
@ -197,7 +197,7 @@ public class EsStateStoreTest {
|
||||
boolean hasException = false;
|
||||
EsTableState esTableState = null;
|
||||
try {
|
||||
esTableState = esStateStore.parseClusterState55(clusterStateStr4, esTable);
|
||||
esTableState = esStateStore.getTableState(clusterStateStr4, esTable);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
hasException = true;
|
||||
@ -230,7 +230,7 @@ public class EsStateStoreTest {
|
||||
boolean hasException = false;
|
||||
EsTableState esTableState = null;
|
||||
try {
|
||||
esTableState = esStateStore.parseClusterState55(clusterStateStr2, esTable);
|
||||
esTableState = esStateStore.getTableState(clusterStateStr2, esTable);
|
||||
} catch (Exception e) {
|
||||
hasException = true;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user