[fix](ES Catalog)Make sure ES meta is synced before using (#46781) (#47711)

bp #46781
This commit is contained in:
qiye
2025-02-11 21:00:08 +08:00
committed by GitHub
parent e2f9b436a4
commit 7b579c75c6
6 changed files with 69 additions and 122 deletions

View File

@ -148,17 +148,26 @@ public class EsTable extends Table {
}
public Map<String, String> fieldsContext() throws UserException {
initEsMetaStateTracker();
return esMetaStateTracker.searchContext().fetchFieldsContext();
}
public Map<String, String> docValueContext() throws UserException {
initEsMetaStateTracker();
return esMetaStateTracker.searchContext().docValueFieldsContext();
}
public List<String> needCompatDateFields() throws UserException {
initEsMetaStateTracker();
return esMetaStateTracker.searchContext().needCompatDateFields();
}
private void initEsMetaStateTracker() {
if (esMetaStateTracker == null) {
esMetaStateTracker = new EsMetaStateTracker(client, this);
}
}
private void validate(Map<String, String> properties) throws DdlException {
EsResource.valid(properties, false);
if (properties.containsKey(EsResource.USER)) {
@ -322,6 +331,8 @@ public class EsTable extends Table {
} else {
throw new IOException("invalid partition type: " + partType);
}
// parse httpSslEnabled before use it here.
EsResource.fillUrlsWithSchema(seeds, httpSslEnabled);
client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
}
@ -329,9 +340,7 @@ public class EsTable extends Table {
* Sync es index meta from remote ES Cluster.
*/
public void syncTableMetaData() {
if (esMetaStateTracker == null) {
esMetaStateTracker = new EsMetaStateTracker(client, this);
}
initEsMetaStateTracker();
try {
esMetaStateTracker.run();
this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions();

View File

@ -197,13 +197,6 @@ extPgPort = 5432
extPgUser = "****"
extPgPassword = "***********"
// elasticsearch external test config for bigdata
enableExternalEsTest = false
extEsHost = "***********"
extEsPort = 9200
extEsUser = "*******"
extEsPassword = "***********"
enableExternalHudiTest = false
hudiEmrCatalog = "***********"

View File

@ -28,6 +28,8 @@ suite("test_es_query", "p0,external,es,external_docker,external_docker_es") {
sql """drop catalog if exists test_es_query_es6;"""
sql """drop catalog if exists test_es_query_es7;"""
sql """drop catalog if exists test_es_query_es8;"""
sql """drop catalog if exists es6_hide;"""
sql """drop catalog if exists es7_hide;"""
sql """drop table if exists test_v1;"""
sql """drop table if exists test_v2;"""
@ -166,9 +168,35 @@ suite("test_es_query", "p0,external,es,external_docker,external_docker_es") {
);
"""
def executeWithRetry = { query, queryName, maxRetries ->
def retryCount = 0
def success = false
while (!success && retryCount < maxRetries) {
try {
sql query
success = true
} catch (Exception e) {
if (e.getMessage().contains("EsTable metadata has not been synced, Try it later")) {
logger.error("Failed to execute ${queryName}: ${e.getMessage()}")
logger.info("Retrying... Attempt ${retryCount + 1}")
retryCount++
sleep(1000) // Sleep for 1 second
} else {
throw e // Rethrow if it's a different exception
}
}
}
if (!success) {
throw new RuntimeException("Failed to execute ${queryName} after ${maxRetries} attempts")
}
}
def query_catalogs = { ->
sql """switch internal"""
sql """use regression_test_external_table_p0_es"""
executeWithRetry("""select * from test_v1 where test2='text#1'""", "sql01", 30)
order_qt_sql01 """select * from test_v1 where test2='text#1'"""
order_qt_sql02 """select * from test_v1 where esquery(test2, '{"match":{"test2":"text#1"}}')"""
order_qt_sql03 """select test4,test5,test6,test7,test8 from test_v1 order by test8"""
@ -182,7 +210,7 @@ suite("test_es_query", "p0,external,es,external_docker,external_docker_es") {
order_qt_sql11 """select test6 from test_v1;"""
order_qt_sql12 """select test9 from test_v1;"""
order_qt_sql20 """select * from test_v2 where test2='text#1'"""
executeWithRetry("""select * from test_v2 where test2='text#1'""", "sql20", 30)
order_qt_sql21 """select * from test_v2 where esquery(test2, '{"match":{"test2":"text#1"}}')"""
order_qt_sql22 """select test4,test5,test6,test7,test8 from test_v2 order by test8"""
order_qt_sql23 """select * from test_v2 where esquery(c_long, '{"term":{"c_long":"-1"}}');"""

View File

@ -23,6 +23,31 @@ suite("test_es_query_no_http_url", "p0,external,es,external_docker,external_dock
String es_7_port = context.config.otherConfigs.get("es_7_port")
String es_8_port = context.config.otherConfigs.get("es_8_port")
def executeWithRetry = { query, queryName, maxRetries ->
def retryCount = 0
def success = false
while (!success && retryCount < maxRetries) {
try {
sql query
success = true
} catch (Exception e) {
if (e.getMessage().contains("EsTable metadata has not been synced, Try it later")) {
logger.error("Failed to execute ${queryName}: ${e.getMessage()}")
logger.info("Retrying... Attempt ${retryCount + 1}")
retryCount++
sleep(1000) // Sleep for 1 second
} else {
throw e // Rethrow if it's a different exception
}
}
}
if (!success) {
throw new RuntimeException("Failed to execute ${queryName} after ${maxRetries} attempts")
}
}
sql """drop catalog if exists es6_no_http_url;"""
sql """drop catalog if exists es7_no_http_url;"""
sql """drop catalog if exists es8_no_http_url;"""
@ -95,9 +120,9 @@ suite("test_es_query_no_http_url", "p0,external,es,external_docker,external_dock
"http_ssl_enabled"="false"
);
"""
order_qt_sql51 """select * from test_v1_no_http_url where test2='text#1'"""
executeWithRetry("""select * from test_v1_no_http_url where test2='text#1'""", "sql51", 30)
sql """
sql """
CREATE TABLE `test_v2_no_http_url` (
`c_datetime` array<datev2> NULL,
`c_long` array<bigint(20)> NULL,
@ -133,7 +158,7 @@ suite("test_es_query_no_http_url", "p0,external,es,external_docker,external_dock
"http_ssl_enabled"="false"
);
"""
order_qt_sql52 """select * from test_v2_no_http_url where test2='text#1'"""
executeWithRetry("""select * from test_v2_no_http_url where test2='text#1'""", "sql52", 30)
// es6
sql """switch es6_no_http_url"""

View File

@ -1,52 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//import org.postgresql.Driver
suite("test_external_catalog_es", "p2,external,es,external_remote,external_remote_es") {
Boolean ignoreP2 = true;
if (ignoreP2) {
logger.info("disable p2 test");
return;
}
String enabled = context.config.otherConfigs.get("enableExternalEsTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String extEsHost = context.config.otherConfigs.get("extEsHost")
String extEsPort = context.config.otherConfigs.get("extEsPort")
String extEsUser = context.config.otherConfigs.get("extEsUser")
String extEsPassword = context.config.otherConfigs.get("extEsPassword")
String esCatalogName ="es7_catalog_name"
String jdbcPg14Table1 = "accounts"
sql """drop catalog if exists ${esCatalogName}"""
sql """
CREATE CATALOG ${esCatalogName} PROPERTIES (
"type"="es",
"elasticsearch.hosts"="http://${extEsHost}:${extEsPort}",
"elasticsearch.nodes_discovery"="false",
"elasticsearch.username"="${extEsUser}",
"elasticsearch.password"="${extEsPassword}"
);
"""
qt_sql "select * from ${esCatalogName}.default_db.${jdbcPg14Table1} order by account_number limit 10;"
sql """drop catalog if exists ${esCatalogName};"""
}
}

View File

@ -1,56 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//import org.postgresql.Driver
suite("test_external_es", "p2,external,es,external_remote,external_remote_es") {
String enabled = context.config.otherConfigs.get("enableExternalEsTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String extEsHost = context.config.otherConfigs.get("extEsHost")
String extEsPort = context.config.otherConfigs.get("extEsPort")
String extEsUser = context.config.otherConfigs.get("extEsUser")
String extEsPassword = context.config.otherConfigs.get("extEsPassword")
String jdbcPg14Database1 = "jdbc_es_14_database1"
String jdbcPg14Table1 = "jdbc_es_14_table1"
sql """drop database if exists ${jdbcPg14Database1};"""
sql """create database ${jdbcPg14Database1};"""
sql """use ${jdbcPg14Database1};"""
sql """drop table if exists ${jdbcPg14Table1};"""
sql """
CREATE EXTERNAL TABLE `${jdbcPg14Table1}` (
`name` varchar(20) COMMENT "",
`age` varchar(20) COMMENT ""
) ENGINE=ELASTICSEARCH
PROPERTIES (
"hosts" = "http://${extEsHost}:${extEsPort}",
"index" = "helloworld",
"user" = "${extEsUser}",
"password" = "${extEsPassword}"
);
"""
def res=sql """show create table ${jdbcPg14Table1};"""
logger.info("recoding desc res: "+ res.toString())
def res1=sql "select * from ${jdbcPg14Table1};"
logger.info("recoding all: " + res1.toString())
sql """drop table if exists ${jdbcPg14Table1};"""
sql """drop database if exists ${jdbcPg14Database1};"""
}
}