From 12f89b879fbc0a800d50d1ea7e028417e576d09a Mon Sep 17 00:00:00 2001 From: AKIRA <33112463+Kikyou1997@users.noreply.github.com> Date: Mon, 5 Jun 2023 11:51:02 +0900 Subject: [PATCH] [fix](stats) Analysis info lost after checkpoint (#20412) 1. Implement write/read for AnalysisManager 2. If database or table has any column with complex type, the analyze stmt would fail directly. Enable to ignore complex type columns and analyze rest of them in this PR --- .../apache/doris/analysis/AnalyzeTblStmt.java | 40 ++++++++++-------- .../java/org/apache/doris/catalog/Env.java | 11 +++++ .../doris/persist/meta/MetaPersistMethod.java | 6 +++ .../persist/meta/PersistMetaModules.java | 2 +- .../org/apache/doris/qe/SessionVariable.java | 7 ++++ .../doris/statistics/AnalysisManager.java | 41 ++++++++++++++++--- 6 files changed, 85 insertions(+), 22 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java index c200d1974e..793902b327 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java @@ -144,10 +144,10 @@ public class AnalyzeTblStmt extends AnalyzeStmt { ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME, columnName, FeNameFormat.getColumnNameRegex()); } + checkColumn(); } finally { table.readUnlock(); } - checkColumn(); analyzeProperties.check(); // TODO support external table @@ -160,23 +160,31 @@ public class AnalyzeTblStmt extends AnalyzeStmt { } private void checkColumn() throws AnalysisException { - table.readLock(); - try { - for (String colName : columnNames) { - Column column = table.getColumn(colName); - if (column == null) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME, - colName, FeNameFormat.getColumnNameRegex()); - } - if (ColumnStatistic.UNSUPPORTED_TYPE.contains(column.getType())) { - throw new AnalysisException(String.format("Column[%s] with type[%s] is not supported to analyze", - colName, column.getType().toString())); - } + boolean containsUnsupportedTytpe = false; + for (String colName : columnNames) { + Column column = table.getColumn(colName); + if (column == null) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME, + colName, FeNameFormat.getColumnNameRegex()); + } + if (ColumnStatistic.UNSUPPORTED_TYPE.contains(column.getType())) { + containsUnsupportedTytpe = true; + } + } + if (containsUnsupportedTytpe) { + if (ConnectContext.get().getSessionVariable().ignoreColumnWithComplexType) { + columnNames = columnNames.stream() + .filter(c -> !ColumnStatistic.UNSUPPORTED_TYPE.contains( + table.getColumn(c).getType())) + .collect(Collectors.toList()); + } else { + throw new AnalysisException( + "Contains unsupported column type" + + "if you want to ignore them and analyze rest" + + "columns, please set session variable " + + "`ignore_column_with_complex_type` to true"); } - } finally { - table.readUnlock(); } - } public String getCatalogName() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 4565a6bc48..bc3a3b4f76 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -2006,6 +2006,12 @@ public class Env { return checksum; } + public long loadAnalysisManager(DataInputStream in, long checksum) throws IOException { + this.analysisManager = AnalysisManager.readFields(in); + LOG.info("finished replay AnalysisMgr from image"); + return checksum; + } + // Only called by checkpoint thread // return the latest image file's absolute path public String saveImage() throws IOException { @@ -2264,6 +2270,11 @@ public class Env { return checksum; } + public long saveAnalysisMgr(CountingDataOutputStream dos, long checksum) throws IOException { + analysisManager.write(dos); + return checksum; + } + public void createLabelCleaner() { labelCleaner = new MasterDaemon("LoadLabelCleaner", Config.label_clean_interval_second * 1000L) { @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java index ab1b473f04..bdc3a5a224 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java @@ -224,6 +224,12 @@ public class MetaPersistMethod { metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveBinlogs", CountingDataOutputStream.class, long.class); break; + case "AnalysisMgr": + metaPersistMethod.readMethod = + Env.class.getDeclaredMethod("loadAnalysisManager", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Env.class.getDeclaredMethod("saveAnalysisMgr", CountingDataOutputStream.class, long.class); + break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java index 3331fd04d2..6e99a6757f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java @@ -39,7 +39,7 @@ public class PersistMetaModules { "globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler", "paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles", "plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "globalFunction", "workloadGroups", - "binlogs", "resourceGroups"); + "binlogs", "resourceGroups", "AnalysisMgr"); // Modules in this list is deprecated and will not be saved in meta file. (also should not be in MODULE_NAMES) public static final ImmutableList DEPRECATED_MODULE_NAMES = ImmutableList.of( diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index fe76449a1e..861ee1d557 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -343,6 +343,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_SCAN_RUN_SERIAL = "enable_scan_node_run_serial"; + public static final String IGNORE_COMPLEX_TYPE_COLUMN = "ignore_column_with_complex_type"; + public static final List DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -952,6 +954,11 @@ public class SessionVariable implements Serializable, Writable { ) public boolean enableCTEMaterialize = true; + @VariableMgr.VarAttr( + name = IGNORE_COMPLEX_TYPE_COLUMN + ) + public boolean ignoreColumnWithComplexType = false; + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. public void initFuzzyModeVariables() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index d48eed9618..00eb7f98e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -37,6 +37,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Daemon; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -57,6 +58,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -76,7 +80,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; -public class AnalysisManager extends Daemon { +public class AnalysisManager extends Daemon implements Writable { public AnalysisTaskScheduler taskScheduler; @@ -722,12 +726,12 @@ public class AnalysisManager extends Daemon { } } - public void replayCreateAnalysisJob(AnalysisInfo taskInfo) { - this.analysisJobInfoMap.put(taskInfo.jobId, taskInfo); + public void replayCreateAnalysisJob(AnalysisInfo jobInfo) { + this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo); } - public void replayCreateAnalysisTask(AnalysisInfo jobInfo) { - this.analysisTaskInfoMap.put(jobInfo.taskId, jobInfo); + public void replayCreateAnalysisTask(AnalysisInfo taskInfo) { + this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo); } public void replayDeleteAnalysisJob(AnalyzeDeletionLog log) { @@ -826,4 +830,31 @@ public class AnalysisManager extends Daemon { removeAll(findTasks(jobId)); } + public static AnalysisManager readFields(DataInput in) throws IOException { + AnalysisManager analysisManager = new AnalysisManager(); + doRead(in, analysisManager.analysisJobInfoMap, true); + doRead(in, analysisManager.analysisTaskInfoMap, false); + return analysisManager; + } + + private static void doRead(DataInput in, Map map, boolean job) throws IOException { + int size = in.readInt(); + for (int i = 0; i < size; i++) { + AnalysisInfo analysisInfo = AnalysisInfo.read(in); + map.put(job ? analysisInfo.jobId : analysisInfo.taskId, analysisInfo); + } + } + + @Override + public void write(DataOutput out) throws IOException { + doWrite(out, analysisJobInfoMap); + doWrite(out, analysisTaskInfoMap); + } + + private void doWrite(DataOutput out, Map infoMap) throws IOException { + out.writeInt(infoMap.size()); + for (Entry entry : infoMap.entrySet()) { + entry.getValue().write(out); + } + } }