[Improvement](statistics)Support basic jdbc external table stats collection (#23963)
Support jdbc external table stats collection.
This commit is contained in:
@ -18,8 +18,13 @@
|
||||
package org.apache.doris.catalog.external;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.JdbcTable;
|
||||
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
import org.apache.doris.statistics.JdbcAnalysisTask;
|
||||
import org.apache.doris.statistics.TableStats;
|
||||
import org.apache.doris.thrift.TTableDescriptor;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -93,4 +98,27 @@ public class JdbcExternalTable extends ExternalTable {
|
||||
jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
|
||||
return jdbcTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
|
||||
makeSureInitialized();
|
||||
return new JdbcAnalysisTask(info);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRowCount() {
|
||||
makeSureInitialized();
|
||||
TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(id);
|
||||
if (tableStats != null) {
|
||||
long rowCount = tableStats.rowCount;
|
||||
LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount);
|
||||
return rowCount;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long estimatedRowCount() {
|
||||
return getRowCount();
|
||||
}
|
||||
}
|
||||
|
||||
@ -688,11 +688,10 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
Set<String> cols = dropStatsStmt.getColumnNames();
|
||||
long tblId = dropStatsStmt.getTblId();
|
||||
TableStats tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
|
||||
if (tableStats == null) {
|
||||
return;
|
||||
if (tableStats != null) {
|
||||
tableStats.updatedTime = 0;
|
||||
replayUpdateTableStatsStatus(tableStats);
|
||||
}
|
||||
tableStats.updatedTime = 0;
|
||||
replayUpdateTableStatsStatus(tableStats);
|
||||
StatisticsRepository.dropStatistics(tblId, cols);
|
||||
for (String col : cols) {
|
||||
Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col);
|
||||
|
||||
@ -0,0 +1,174 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.external.JdbcExternalTable;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
import org.apache.doris.qe.QueryState;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class JdbcAnalysisTask extends BaseAnalysisTask {
|
||||
private static final Logger LOG = LogManager.getLogger(JdbcAnalysisTask.class);
|
||||
|
||||
private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
|
||||
+ "${internalDB}.${columnStatTbl}"
|
||||
+ " SELECT "
|
||||
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
|
||||
+ "${catalogId} AS catalog_id, "
|
||||
+ "${dbId} AS db_id, "
|
||||
+ "${tblId} AS tbl_id, "
|
||||
+ "${idxId} AS idx_id, "
|
||||
+ "'${colId}' AS col_id, "
|
||||
+ "NULL AS part_id, "
|
||||
+ "COUNT(1) AS row_count, "
|
||||
+ "NDV(`${colName}`) AS ndv, "
|
||||
+ "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
|
||||
+ "MIN(`${colName}`) AS min, "
|
||||
+ "MAX(`${colName}`) AS max, "
|
||||
+ "${dataSizeFunction} AS data_size, "
|
||||
+ "NOW() "
|
||||
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
|
||||
|
||||
private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
|
||||
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
|
||||
|
||||
private final boolean isTableLevelTask;
|
||||
private JdbcExternalTable table;
|
||||
|
||||
public JdbcAnalysisTask(AnalysisInfo info) {
|
||||
super(info);
|
||||
isTableLevelTask = info.externalTableLevelTask;
|
||||
table = (JdbcExternalTable) tbl;
|
||||
}
|
||||
|
||||
public void doExecute() throws Exception {
|
||||
if (isTableLevelTask) {
|
||||
getTableStats();
|
||||
} else {
|
||||
getTableColumnStats();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get table row count and store the result to metadata.
|
||||
*/
|
||||
private void getTableStats() throws Exception {
|
||||
Map<String, String> params = buildTableStatsParams(null);
|
||||
List<ResultRow> columnResult =
|
||||
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(ANALYZE_TABLE_COUNT_TEMPLATE));
|
||||
String rowCount = columnResult.get(0).get(0);
|
||||
Env.getCurrentEnv().getAnalysisManager()
|
||||
.updateTableStatsStatus(new TableStats(table.getId(), Long.parseLong(rowCount), info));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get column statistics and insert the result to __internal_schema.column_statistics
|
||||
*/
|
||||
private void getTableColumnStats() throws Exception {
|
||||
// An example sql for a column stats:
|
||||
// INSERT INTO __internal_schema.column_statistics
|
||||
// SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
|
||||
// 13002 AS catalog_id,
|
||||
// 13038 AS db_id,
|
||||
// 13055 AS tbl_id,
|
||||
// -1 AS idx_id,
|
||||
// 'r_regionkey' AS col_id,
|
||||
// 'NULL' AS part_id,
|
||||
// COUNT(1) AS row_count,
|
||||
// NDV(`r_regionkey`) AS ndv,
|
||||
// SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
|
||||
// MIN(`r_regionkey`) AS min,
|
||||
// MAX(`r_regionkey`) AS max,
|
||||
// 0 AS data_size,
|
||||
// NOW() FROM `hive`.`tpch100`.`region`
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
|
||||
Map<String, String> params = buildTableStatsParams("NULL");
|
||||
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
|
||||
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
|
||||
params.put("colName", col.getName());
|
||||
params.put("colId", info.colName);
|
||||
params.put("dataSizeFunction", getDataSizeFunction(col));
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(sb.toString());
|
||||
executeInsertSql(sql);
|
||||
}
|
||||
|
||||
private void executeInsertSql(String sql) throws Exception {
|
||||
long startTime = System.currentTimeMillis();
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
|
||||
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
|
||||
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
r.connectContext.setExecutor(stmtExecutor);
|
||||
this.stmtExecutor.execute();
|
||||
QueryState queryState = r.connectContext.getState();
|
||||
if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
|
||||
LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]",
|
||||
info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
|
||||
throw new RuntimeException(queryState.getErrorMessage());
|
||||
}
|
||||
LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.",
|
||||
info.catalogName, info.dbName, info.colName, sql, (System.currentTimeMillis() - startTime)));
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, String> buildTableStatsParams(String partId) {
|
||||
Map<String, String> commonParams = new HashMap<>();
|
||||
String id = StatisticsUtil.constructId(tbl.getId(), -1);
|
||||
if (partId == null) {
|
||||
commonParams.put("partId", "NULL");
|
||||
} else {
|
||||
id = StatisticsUtil.constructId(id, partId);
|
||||
commonParams.put("partId", "\'" + partId + "\'");
|
||||
}
|
||||
commonParams.put("id", id);
|
||||
commonParams.put("catalogId", String.valueOf(catalog.getId()));
|
||||
commonParams.put("dbId", String.valueOf(db.getId()));
|
||||
commonParams.put("tblId", String.valueOf(tbl.getId()));
|
||||
commonParams.put("indexId", "-1");
|
||||
commonParams.put("idxId", "-1");
|
||||
commonParams.put("catalogName", catalog.getName());
|
||||
commonParams.put("dbName", db.getFullName());
|
||||
commonParams.put("tblName", tbl.getName());
|
||||
if (col != null) {
|
||||
commonParams.put("type", col.getType().toString());
|
||||
}
|
||||
commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
|
||||
return commonParams;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterExecution() {
|
||||
// Table level task doesn't need to sync any value to sync stats, it stores the value in metadata.
|
||||
if (isTableLevelTask) {
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
|
||||
}
|
||||
}
|
||||
@ -42,9 +42,9 @@ import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.StructType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.catalog.VariantType;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
@ -704,6 +704,6 @@ public class StatisticsUtil {
|
||||
LOG.warn(e.getMessage());
|
||||
return false;
|
||||
}
|
||||
return table.getType().equals(TableType.HMS_EXTERNAL_TABLE);
|
||||
return table instanceof ExternalTable;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,64 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_mysql_jdbc_statistics", "p0,external,mysql,external_docker,external_docker_mysql") {
|
||||
String enabled = context.config.otherConfigs.get("enableJdbcTest")
|
||||
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
|
||||
String mysql_port = context.config.otherConfigs.get("mysql_57_port");
|
||||
String s3_endpoint = getS3Endpoint()
|
||||
String bucket = getS3BucketName()
|
||||
String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-java-8.0.25.jar"
|
||||
if (enabled != null && enabled.equalsIgnoreCase("true")) {
|
||||
String catalog_name = "test_mysql_jdbc_statistics";
|
||||
|
||||
sql """create catalog if not exists ${catalog_name} properties(
|
||||
"type"="jdbc",
|
||||
"user"="root",
|
||||
"password"="123456",
|
||||
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false&zeroDateTimeBehavior=convertToNull",
|
||||
"driver_url" = "${driver_url}",
|
||||
"driver_class" = "com.mysql.cj.jdbc.Driver"
|
||||
);"""
|
||||
|
||||
sql """use ${catalog_name}.doris_test"""
|
||||
sql """analyze table ex_tb0 with sync"""
|
||||
def result = sql """show column stats ex_tb0 (name)"""
|
||||
assertTrue(result.size() == 1)
|
||||
assertTrue(result[0][0] == "name")
|
||||
assertTrue(result[0][1] == "5.0")
|
||||
assertTrue(result[0][2] == "5.0")
|
||||
assertTrue(result[0][3] == "0.0")
|
||||
assertTrue(result[0][4] == "18.0")
|
||||
assertTrue(result[0][5] == "3.0")
|
||||
assertTrue(result[0][6] == "'abc'")
|
||||
assertTrue(result[0][7] == "'abg'")
|
||||
|
||||
result = sql """show column stats ex_tb0 (id)"""
|
||||
assertTrue(result.size() == 1)
|
||||
assertTrue(result[0][0] == "id")
|
||||
assertTrue(result[0][1] == "5.0")
|
||||
assertTrue(result[0][2] == "5.0")
|
||||
assertTrue(result[0][3] == "0.0")
|
||||
assertTrue(result[0][4] == "24.0")
|
||||
assertTrue(result[0][5] == "4.0")
|
||||
assertTrue(result[0][6] == "111")
|
||||
assertTrue(result[0][7] == "115")
|
||||
|
||||
sql """drop catalog ${catalog_name}"""
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user