Sync the stats cache when an analysis task finishes, so the column_statistics table doesn't need to be queried. (#30609)
This commit is contained in:
@ -96,9 +96,9 @@ import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.qe.VariableMgr;
|
||||
import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor;
|
||||
import org.apache.doris.statistics.AnalysisManager;
|
||||
import org.apache.doris.statistics.ColStatsData;
|
||||
import org.apache.doris.statistics.ColumnStatistic;
|
||||
import org.apache.doris.statistics.InvalidateStatsTarget;
|
||||
import org.apache.doris.statistics.ResultRow;
|
||||
import org.apache.doris.statistics.StatisticsCacheKey;
|
||||
import org.apache.doris.statistics.TableStatsMeta;
|
||||
import org.apache.doris.statistics.query.QueryStats;
|
||||
@ -3045,11 +3045,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
@Override
|
||||
public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException {
|
||||
StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class);
|
||||
List<ResultRow> rows = request.statsRows.stream()
|
||||
.map(s -> GsonUtils.GSON.fromJson(s, ResultRow.class))
|
||||
.collect(Collectors.toList());
|
||||
ColumnStatistic c = ColumnStatistic.fromResultRow(rows);
|
||||
if (c != ColumnStatistic.UNKNOWN) {
|
||||
ColStatsData data = GsonUtils.GSON.fromJson(request.colStatsData, ColStatsData.class);
|
||||
ColumnStatistic c = data.toColumnStatistic();
|
||||
if (c == ColumnStatistic.UNKNOWN) {
|
||||
Env.getCurrentEnv().getStatisticsCache().invalidate(k.tableId, k.idxId, k.colName);
|
||||
} else {
|
||||
Env.getCurrentEnv().getStatisticsCache().updateColStatsCache(k.tableId, k.idxId, k.colName, c);
|
||||
}
|
||||
// Return Ok anyway
|
||||
|
||||
@ -136,7 +136,6 @@ public class AnalysisJob {
|
||||
}
|
||||
}
|
||||
updateTaskState(AnalysisState.FINISHED, "");
|
||||
syncLoadStats();
|
||||
queryFinished.clear();
|
||||
buf.clear();
|
||||
}
|
||||
@ -192,17 +191,4 @@ public class AnalysisJob {
|
||||
}
|
||||
}
|
||||
|
||||
protected void syncLoadStats() {
|
||||
long tblId = jobInfo.tblId;
|
||||
for (BaseAnalysisTask task : queryFinished) {
|
||||
if (task.info.externalTableLevelTask) {
|
||||
continue;
|
||||
}
|
||||
String colName = task.col.getName();
|
||||
if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) {
|
||||
analysisManager.removeColStatsStatus(tblId, colName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -705,7 +705,7 @@ public class AnalysisManager implements Writable {
|
||||
boolean success = true;
|
||||
for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
|
||||
// Skip master
|
||||
if (selfNode.equals(frontend.getHost())) {
|
||||
if (selfNode.getHost().equals(frontend.getHost())) {
|
||||
continue;
|
||||
}
|
||||
success = success && statisticsCache.invalidateStats(frontend, request);
|
||||
|
||||
@ -205,16 +205,7 @@ public abstract class BaseAnalysisTask {
|
||||
|
||||
public abstract void doExecute() throws Exception;
|
||||
|
||||
protected void afterExecution() {
|
||||
if (killed) {
|
||||
return;
|
||||
}
|
||||
long tblId = tbl.getId();
|
||||
String colName = col.getName();
|
||||
if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) {
|
||||
Env.getCurrentEnv().getAnalysisManager().removeColStatsStatus(tblId, colName);
|
||||
}
|
||||
}
|
||||
// Post-execution hook; this base implementation intentionally does nothing and subclasses may override it.
// NOTE(review): presumably invoked once the task's execution completes — confirm against the caller.
protected void afterExecution() {}
|
||||
|
||||
protected void setTaskStateToRunning() {
|
||||
Env.getCurrentEnv().getAnalysisManager()
|
||||
@ -318,6 +309,7 @@ public abstract class BaseAnalysisTask {
|
||||
try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) {
|
||||
stmtExecutor = new StmtExecutor(a.connectContext, sql);
|
||||
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
|
||||
Env.getCurrentEnv().getStatisticsCache().syncColStats(colStatsData);
|
||||
queryId = DebugUtil.printId(stmtExecutor.getContext().queryId());
|
||||
job.appendBuf(this, Collections.singletonList(colStatsData));
|
||||
} finally {
|
||||
|
||||
@ -17,9 +17,14 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
@ -43,17 +48,23 @@ import java.util.StringJoiner;
|
||||
* 13: update_time
|
||||
*/
|
||||
public class ColStatsData {
|
||||
private static final Logger LOG = LogManager.getLogger(ColStatsData.class);
|
||||
|
||||
@SerializedName("statsId")
|
||||
public final StatsId statsId;
|
||||
@SerializedName("count")
|
||||
public final long count;
|
||||
@SerializedName("ndv")
|
||||
public final long ndv;
|
||||
|
||||
@SerializedName("nullCount")
|
||||
public final long nullCount;
|
||||
|
||||
@SerializedName("minLit")
|
||||
public final String minLit;
|
||||
@SerializedName("maxLit")
|
||||
public final String maxLit;
|
||||
|
||||
@SerializedName("dataSizeInBytes")
|
||||
public final long dataSizeInBytes;
|
||||
|
||||
@SerializedName("updateTime")
|
||||
public final String updateTime;
|
||||
|
||||
@VisibleForTesting
|
||||
@ -106,4 +117,56 @@ public class ColStatsData {
|
||||
sj.add(StatisticsUtil.quote(updateTime));
|
||||
return sj.toString();
|
||||
}
|
||||
|
||||
public ColumnStatistic toColumnStatistic() {
|
||||
try {
|
||||
ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder();
|
||||
columnStatisticBuilder.setCount(count);
|
||||
columnStatisticBuilder.setNdv(ndv);
|
||||
columnStatisticBuilder.setNumNulls(nullCount);
|
||||
columnStatisticBuilder.setDataSize(dataSizeInBytes);
|
||||
columnStatisticBuilder.setAvgSizeByte(count == 0 ? 0 : dataSizeInBytes / count);
|
||||
if (statsId == null) {
|
||||
return ColumnStatistic.UNKNOWN;
|
||||
}
|
||||
long catalogId = statsId.catalogId;
|
||||
long idxId = statsId.idxId;
|
||||
long dbID = statsId.dbId;
|
||||
long tblId = statsId.tblId;
|
||||
String colName = statsId.colId;
|
||||
Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, idxId, colName);
|
||||
if (col == null) {
|
||||
return ColumnStatistic.UNKNOWN;
|
||||
}
|
||||
String min = minLit;
|
||||
String max = maxLit;
|
||||
if (min != null && !min.equalsIgnoreCase("NULL")) {
|
||||
try {
|
||||
columnStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min));
|
||||
columnStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min));
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("Failed to process column {} min value {}.", col, min, e);
|
||||
columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY);
|
||||
}
|
||||
} else {
|
||||
columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY);
|
||||
}
|
||||
if (max != null && !max.equalsIgnoreCase("NULL")) {
|
||||
try {
|
||||
columnStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max));
|
||||
columnStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max));
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("Failed to process column {} max value {}.", col, max, e);
|
||||
columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY);
|
||||
}
|
||||
} else {
|
||||
columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY);
|
||||
}
|
||||
columnStatisticBuilder.setUpdatedTime(updateTime);
|
||||
return columnStatisticBuilder.build();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to convert column statistics.", e);
|
||||
return ColumnStatistic.UNKNOWN;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,6 +88,7 @@ public class ColumnStatistic {
|
||||
public final LiteralExpr minExpr;
|
||||
public final LiteralExpr maxExpr;
|
||||
|
||||
@SerializedName("updatedTime")
|
||||
public final String updatedTime;
|
||||
|
||||
public ColumnStatistic(double count, double ndv, ColumnStatistic original, double avgSizeByte,
|
||||
|
||||
@ -225,16 +225,6 @@ public class ExternalAnalysisTask extends BaseAnalysisTask {
|
||||
return Pair.of(Math.max(((double) total) / cumulate, 1), cumulate);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterExecution() {
|
||||
// Table level task doesn't need to sync any value to sync stats, it stores the value in metadata.
|
||||
// Partition only task doesn't need to refresh cached.
|
||||
if (isTableLevelTask || isPartitionOnly) {
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* If the size to sample is larger than LIMIT_SIZE (1GB)
|
||||
* and is much larger (1.2*) than the size user want to sample,
|
||||
|
||||
@ -137,13 +137,4 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
|
||||
commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
|
||||
return commonParams;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterExecution() {
|
||||
// Table level task doesn't need to sync any value to sync stats, it stores the value in metadata.
|
||||
if (isTableLevelTask) {
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,11 +21,11 @@ import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.ClientPool;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.ha.FrontendNodeType;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
import org.apache.doris.system.Frontend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.FrontendService;
|
||||
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
@ -39,12 +39,12 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class StatisticsCache {
|
||||
|
||||
@ -203,37 +203,34 @@ public class StatisticsCache {
|
||||
}
|
||||
|
||||
/**
|
||||
* Return false if the log of corresponding stats load is failed.
|
||||
* Refresh stats cache, invalidate cache if the new data is unknown.
|
||||
*/
|
||||
public boolean syncLoadColStats(long tableId, long idxId, String colName) {
|
||||
List<ResultRow> columnResults = StatisticsRepository.loadColStats(tableId, idxId, colName);
|
||||
final StatisticsCacheKey k =
|
||||
new StatisticsCacheKey(tableId, idxId, colName);
|
||||
final ColumnStatistic c = ColumnStatistic.fromResultRow(columnResults);
|
||||
if (c == ColumnStatistic.UNKNOWN) {
|
||||
return false;
|
||||
}
|
||||
putCache(k, c);
|
||||
if (ColumnStatistic.UNKNOWN == c) {
|
||||
return false;
|
||||
public void syncColStats(ColStatsData data) {
|
||||
StatsId statsId = data.statsId;
|
||||
final StatisticsCacheKey k = new StatisticsCacheKey(statsId.tblId, statsId.idxId, statsId.colId);
|
||||
ColumnStatistic columnStatistic = data.toColumnStatistic();
|
||||
if (columnStatistic == ColumnStatistic.UNKNOWN) {
|
||||
invalidate(k.tableId, k.idxId, k.colName);
|
||||
} else {
|
||||
putCache(k, columnStatistic);
|
||||
}
|
||||
TUpdateFollowerStatsCacheRequest updateFollowerStatsCacheRequest = new TUpdateFollowerStatsCacheRequest();
|
||||
updateFollowerStatsCacheRequest.key = GsonUtils.GSON.toJson(k);
|
||||
updateFollowerStatsCacheRequest.statsRows = columnResults.stream().map(GsonUtils.GSON::toJson).collect(
|
||||
Collectors.toList());
|
||||
for (Frontend frontend : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) {
|
||||
if (StatisticsUtil.isMaster(frontend)) {
|
||||
updateFollowerStatsCacheRequest.colStatsData = GsonUtils.GSON.toJson(data);
|
||||
// For compatibility only; to be deprecated.
|
||||
updateFollowerStatsCacheRequest.statsRows = new ArrayList<>();
|
||||
SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
|
||||
for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
|
||||
if (selfNode.getHost().equals(frontend.getHost())) {
|
||||
continue;
|
||||
}
|
||||
sendStats(frontend, updateFollowerStatsCacheRequest);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void sendStats(Frontend frontend, TUpdateFollowerStatsCacheRequest updateFollowerStatsCacheRequest) {
|
||||
TNetworkAddress address = new TNetworkAddress(frontend.getHost(),
|
||||
frontend.getRpcPort());
|
||||
TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort());
|
||||
FrontendService.Client client = null;
|
||||
try {
|
||||
client = ClientPool.frontendPool.borrowObject(address);
|
||||
|
||||
@ -20,20 +20,25 @@ package org.apache.doris.statistics;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.util.StringJoiner;
|
||||
|
||||
public class StatsId {
|
||||
|
||||
@SerializedName("id")
|
||||
public final String id;
|
||||
@SerializedName("catalogId")
|
||||
public final long catalogId;
|
||||
@SerializedName("dbId")
|
||||
public final long dbId;
|
||||
@SerializedName("tblId")
|
||||
public final long tblId;
|
||||
@SerializedName("idxId")
|
||||
public final long idxId;
|
||||
|
||||
@SerializedName("colId")
|
||||
public final String colId;
|
||||
|
||||
// nullable
|
||||
@SerializedName("partId")
|
||||
public final String partId;
|
||||
|
||||
@VisibleForTesting
|
||||
|
||||
Reference in New Issue
Block a user