backport: https://github.com/apache/doris/pull/38889
This commit is contained in:
@ -17,19 +17,28 @@
|
||||
|
||||
package org.apache.doris.datasource.jdbc;
|
||||
|
||||
import org.apache.doris.analysis.StatementBase;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.JdbcResource;
|
||||
import org.apache.doris.catalog.JdbcTable;
|
||||
import org.apache.doris.datasource.ExternalTable;
|
||||
import org.apache.doris.datasource.SchemaCacheValue;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
import org.apache.doris.statistics.JdbcAnalysisTask;
|
||||
import org.apache.doris.statistics.ResultRow;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
import org.apache.doris.thrift.TTableDescriptor;
|
||||
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
@ -38,6 +47,9 @@ import java.util.Optional;
|
||||
public class JdbcExternalTable extends ExternalTable {
|
||||
private static final Logger LOG = LogManager.getLogger(JdbcExternalTable.class);
|
||||
|
||||
public static final String MYSQL_ROW_COUNT_SQL = "SELECT * FROM QUERY"
|
||||
+ "(\"catalog\"=\"${ctlName}\", \"query\"=\"show table status from `${dbName}` like '${tblName}'\");";
|
||||
|
||||
private JdbcTable jdbcTable;
|
||||
|
||||
/**
|
||||
@ -98,4 +110,50 @@ public class JdbcExternalTable extends ExternalTable {
|
||||
makeSureInitialized();
|
||||
return new JdbcAnalysisTask(info);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long fetchRowCount() {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("ctlName", catalog.getName());
|
||||
params.put("dbName", dbName);
|
||||
params.put("tblName", name);
|
||||
switch (((JdbcExternalCatalog) catalog).getDatabaseTypeName()) {
|
||||
case JdbcResource.MYSQL:
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(MYSQL_ROW_COUNT_SQL);
|
||||
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
List<ResultRow> resultRows = stmtExecutor.executeInternalQuery();
|
||||
if (resultRows == null || resultRows.size() != 1) {
|
||||
LOG.info("No mysql status found for table {}.{}.{}", catalog.getName(), dbName, name);
|
||||
return -1;
|
||||
}
|
||||
StatementBase parsedStmt = stmtExecutor.getParsedStmt();
|
||||
if (parsedStmt == null || parsedStmt.getColLabels() == null) {
|
||||
LOG.info("No column label found for table {}.{}.{}", catalog.getName(), dbName, name);
|
||||
return -1;
|
||||
}
|
||||
ResultRow resultRow = resultRows.get(0);
|
||||
List<String> colLabels = parsedStmt.getColLabels();
|
||||
int index = colLabels.indexOf("TABLE_ROWS");
|
||||
if (index == -1) {
|
||||
LOG.info("No TABLE_ROWS in status for table {}.{}.{}", catalog.getName(), dbName, name);
|
||||
return -1;
|
||||
}
|
||||
long rows = Long.parseLong(resultRow.get(index));
|
||||
LOG.info("Get mysql table {}.{}.{} row count {}", catalog.getName(), dbName, name, rows);
|
||||
return rows;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to fetch mysql row count for table {}.{}.{}. Reason [{}]",
|
||||
catalog.getName(), dbName, name, e.getMessage());
|
||||
return -1;
|
||||
}
|
||||
case JdbcResource.ORACLE:
|
||||
case JdbcResource.POSTGRESQL:
|
||||
case JdbcResource.SQLSERVER:
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user