backport: https://github.com/apache/doris/pull/42674
This commit is contained in:
@ -51,8 +51,24 @@ import java.util.stream.Collectors;
|
||||
public class JdbcExternalTable extends ExternalTable {
|
||||
private static final Logger LOG = LogManager.getLogger(JdbcExternalTable.class);
|
||||
|
||||
public static final String MYSQL_ROW_COUNT_SQL = "SELECT * FROM QUERY"
|
||||
+ "(\"catalog\"=\"${ctlName}\", \"query\"=\"show table status from `${dbName}` like '${tblName}'\");";
|
||||
public static final String MYSQL_ROW_COUNT_SQL = "SELECT max(row_count) as rows FROM ("
|
||||
+ "(SELECT TABLE_ROWS AS row_count FROM INFORMATION_SCHEMA.TABLES "
|
||||
+ "WHERE TABLE_SCHEMA = '${dbName}' AND TABLE_NAME = '${tblName}' "
|
||||
+ "AND TABLE_TYPE = 'BASE TABLE') "
|
||||
+ "UNION ALL "
|
||||
+ "(SELECT CARDINALITY AS row_count FROM INFORMATION_SCHEMA.STATISTICS "
|
||||
+ "WHERE TABLE_SCHEMA = '${dbName}' AND TABLE_NAME = '${tblName}' "
|
||||
+ "AND CARDINALITY IS NOT NULL)) t";
|
||||
|
||||
public static final String PG_ROW_COUNT_SQL = "SELECT reltuples as rows FROM pg_class "
|
||||
+ "WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = '${dbName}') "
|
||||
+ "AND relname = '${tblName}'";
|
||||
|
||||
public static final String SQLSERVER_ROW_COUNT_SQL = "SELECT sum(rows) as rows FROM sys.partitions "
|
||||
+ "WHERE object_id = (SELECT object_id('${dbName}.${tblName}')) AND index_id IN (0, 1)";
|
||||
|
||||
public static final String FETCH_ROW_COUNT_TEMPLATE = "SELECT * FROM QUERY"
|
||||
+ "(\"catalog\"=\"${ctlName}\", \"query\"=\"${sql}\");";
|
||||
|
||||
private JdbcTable jdbcTable;
|
||||
|
||||
@ -186,41 +202,55 @@ public class JdbcExternalTable extends ExternalTable {
|
||||
params.put("tblName", this.remoteName);
|
||||
switch (((JdbcExternalCatalog) catalog).getDatabaseTypeName()) {
|
||||
case JdbcResource.MYSQL:
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(MYSQL_ROW_COUNT_SQL);
|
||||
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
List<ResultRow> resultRows = stmtExecutor.executeInternalQuery();
|
||||
if (resultRows == null || resultRows.size() != 1) {
|
||||
LOG.info("No mysql status found for table {}.{}.{}", catalog.getName(), dbName, name);
|
||||
return -1;
|
||||
}
|
||||
StatementBase parsedStmt = stmtExecutor.getParsedStmt();
|
||||
if (parsedStmt == null || parsedStmt.getColLabels() == null) {
|
||||
LOG.info("No column label found for table {}.{}.{}", catalog.getName(), dbName, name);
|
||||
return -1;
|
||||
}
|
||||
ResultRow resultRow = resultRows.get(0);
|
||||
List<String> colLabels = parsedStmt.getColLabels();
|
||||
int index = colLabels.indexOf("TABLE_ROWS");
|
||||
if (index == -1) {
|
||||
LOG.info("No TABLE_ROWS in status for table {}.{}.{}", catalog.getName(), dbName, name);
|
||||
return -1;
|
||||
}
|
||||
long rows = Long.parseLong(resultRow.get(index));
|
||||
LOG.info("Get mysql table {}.{}.{} row count {}", catalog.getName(), dbName, name, rows);
|
||||
return rows;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to fetch mysql row count for table {}.{}.{}. Reason [{}]",
|
||||
catalog.getName(), dbName, name, e.getMessage());
|
||||
return -1;
|
||||
}
|
||||
case JdbcResource.ORACLE:
|
||||
params.put("sql", MYSQL_ROW_COUNT_SQL);
|
||||
return getRowCount(params);
|
||||
case JdbcResource.POSTGRESQL:
|
||||
params.put("sql", PG_ROW_COUNT_SQL);
|
||||
return getRowCount(params);
|
||||
case JdbcResource.SQLSERVER:
|
||||
params.put("sql", SQLSERVER_ROW_COUNT_SQL);
|
||||
return getRowCount(params);
|
||||
case JdbcResource.ORACLE:
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return -1;
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
|
||||
protected long getRowCount(Map<String, String> params) {
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(FETCH_ROW_COUNT_TEMPLATE);
|
||||
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
List<ResultRow> resultRows = stmtExecutor.executeInternalQuery();
|
||||
if (resultRows == null || resultRows.size() != 1) {
|
||||
LOG.info("No status found for table {}.{}.{}", catalog.getName(), dbName, name);
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
StatementBase parsedStmt = stmtExecutor.getParsedStmt();
|
||||
if (parsedStmt == null || parsedStmt.getColLabels() == null) {
|
||||
LOG.info("No column label found for table {}.{}.{}", catalog.getName(), dbName, name);
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
ResultRow resultRow = resultRows.get(0);
|
||||
List<String> colLabels = parsedStmt.getColLabels();
|
||||
int index = colLabels.indexOf("rows");
|
||||
if (index == -1) {
|
||||
LOG.info("No TABLE_ROWS in status for table {}.{}.{}", catalog.getName(), dbName, name);
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
long rows = Long.parseLong(resultRow.get(index));
|
||||
if (rows <= 0) {
|
||||
LOG.info("Table {}.{}.{} row count is {}, discard it and use -1 instead",
|
||||
catalog.getName(), dbName, name, rows);
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
LOG.info("Get table {}.{}.{} row count {}", catalog.getName(), dbName, name, rows);
|
||||
return rows;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to fetch row count for table {}.{}.{}. Reason [{}]",
|
||||
catalog.getName(), dbName, name, e.getMessage());
|
||||
return UNKNOWN_ROW_COUNT;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user