[fix](stats) Analysis info lost after checkpoint (#20412)
1. Implement write/read for AnalysisManager so that analysis info survives a checkpoint.
2. Previously, if a database or table had any column with a complex type, the ANALYZE statement failed outright. This PR makes it possible to ignore complex-type columns and analyze the rest of them.
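For reference, a minimal client-side sketch of the intended usage after this change; the connection details, database, and table names are hypothetical, and `tbl` is assumed to contain a complex-type (e.g. MAP/STRUCT) column alongside ordinary columns:

```java
// Hypothetical end-to-end usage over the MySQL protocol; host, port, db,
// user, and table names below are placeholders.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class AnalyzeIgnoreComplexDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://127.0.0.1:9030/demo_db", "root", "");
             Statement stmt = conn.createStatement()) {
            // Without this variable, ANALYZE on a table with a complex-type
            // column fails outright.
            stmt.execute("SET ignore_column_with_complex_type = true");
            // Complex-type columns are skipped; the rest are analyzed.
            stmt.execute("ANALYZE TABLE tbl");
        }
    }
}
```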
@@ -144,10 +144,10 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
                 ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME,
                         columnName, FeNameFormat.getColumnNameRegex());
             }
-            checkColumn();
         } finally {
             table.readUnlock();
         }
+        checkColumn();
         analyzeProperties.check();
 
         // TODO support external table
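The hunk above moves the `checkColumn()` call out of the locked region; per the next hunk, `checkColumn()` takes the table read lock itself, following the standard acquire/try/finally-release pattern. A minimal standalone version of that pattern, with a plain `ReentrantReadWriteLock` standing in for the Doris table lock and illustrative names:

```java
// Acquire the read lock, do the validation, and always release in finally,
// so an exception thrown mid-validation cannot leak the lock.
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    void validate() {
        lock.readLock().lock();
        try {
            // inspect shared table metadata here
        } finally {
            lock.readLock().unlock();
        }
    }
}
```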
@@ -160,23 +160,31 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
     }
 
     private void checkColumn() throws AnalysisException {
         table.readLock();
         try {
-            for (String colName : columnNames) {
-                Column column = table.getColumn(colName);
-                if (column == null) {
-                    ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME,
-                            colName, FeNameFormat.getColumnNameRegex());
-                }
-                if (ColumnStatistic.UNSUPPORTED_TYPE.contains(column.getType())) {
-                    throw new AnalysisException(String.format("Column[%s] with type[%s] is not supported to analyze",
-                            colName, column.getType().toString()));
-                }
-            }
+            boolean containsUnsupportedType = false;
+            for (String colName : columnNames) {
+                Column column = table.getColumn(colName);
+                if (column == null) {
+                    ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME,
+                            colName, FeNameFormat.getColumnNameRegex());
+                }
+                if (ColumnStatistic.UNSUPPORTED_TYPE.contains(column.getType())) {
+                    containsUnsupportedType = true;
+                }
+            }
+            if (containsUnsupportedType) {
+                if (ConnectContext.get().getSessionVariable().ignoreColumnWithComplexType) {
+                    columnNames = columnNames.stream()
+                            .filter(c -> !ColumnStatistic.UNSUPPORTED_TYPE.contains(
+                                    table.getColumn(c).getType()))
+                            .collect(Collectors.toList());
+                } else {
+                    throw new AnalysisException(
+                            "Contains unsupported column type; "
+                            + "if you want to ignore them and analyze the rest of the "
+                            + "columns, please set session variable "
+                            + "`ignore_column_with_complex_type` to true");
+                }
+            }
         } finally {
             table.readUnlock();
         }
     }
 
     public String getCatalogName() {
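The new `checkColumn()` logic is detect-then-filter: one pass to see whether any requested column has an unsupported type, then either filter those columns out or fail with a hint. A self-contained sketch of the same pattern, with plain collections standing in for the catalog types; the `UNSUPPORTED_TYPE` contents and all names here are illustrative, not the Doris API:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ComplexTypeFilterSketch {
    // Illustrative stand-in for ColumnStatistic.UNSUPPORTED_TYPE.
    static final Set<String> UNSUPPORTED_TYPE = Set.of("MAP", "STRUCT", "JSONB");

    static List<String> filterColumns(List<String> columnNames,
                                      Map<String, String> columnTypes,
                                      boolean ignoreComplex) {
        // Pass 1: detect whether any requested column has an unsupported type.
        boolean containsUnsupported = columnNames.stream()
                .anyMatch(c -> UNSUPPORTED_TYPE.contains(columnTypes.get(c)));
        if (!containsUnsupported) {
            return columnNames;
        }
        if (!ignoreComplex) {
            // Stand-in for the AnalysisException raised in checkColumn().
            throw new IllegalArgumentException(
                    "Contains unsupported column type; set "
                            + "`ignore_column_with_complex_type` to true to skip them");
        }
        // Pass 2: keep only the columns whose type can be analyzed.
        return columnNames.stream()
                .filter(c -> !UNSUPPORTED_TYPE.contains(columnTypes.get(c)))
                .collect(Collectors.toList());
    }
}
```

For example, `filterColumns(List.of("id", "props"), Map.of("id", "INT", "props", "MAP"), true)` returns `[id]`, while the same call with `false` throws.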
@@ -2006,6 +2006,12 @@ public class Env {
         return checksum;
     }
 
+    public long loadAnalysisManager(DataInputStream in, long checksum) throws IOException {
+        this.analysisManager = AnalysisManager.readFields(in);
+        LOG.info("finished replay AnalysisMgr from image");
+        return checksum;
+    }
+
     // Only called by checkpoint thread
     // return the latest image file's absolute path
     public String saveImage() throws IOException {

@@ -2264,6 +2270,11 @@ public class Env {
         return checksum;
     }
 
+    public long saveAnalysisMgr(CountingDataOutputStream dos, long checksum) throws IOException {
+        analysisManager.write(dos);
+        return checksum;
+    }
+
     public void createLabelCleaner() {
         labelCleaner = new MasterDaemon("LoadLabelCleaner", Config.label_clean_interval_second * 1000L) {
             @Override
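Both new Env methods follow the module load/save contract `(stream, checksum) -> checksum`; as the diff shows, they return the checksum untouched, i.e. the AnalysisMgr payload is not folded into the image checksum. A minimal sketch of that contract with illustrative stand-in names, not Doris APIs:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ChecksumPassThroughSketch {
    static long save(DataOutputStream dos, long checksum) throws IOException {
        dos.writeUTF("analysis-state"); // stand-in for analysisManager.write(dos)
        return checksum;                // returned unchanged, as in saveAnalysisMgr
    }

    static long load(DataInputStream in, long checksum) throws IOException {
        System.out.println("loaded: " + in.readUTF()); // stand-in for AnalysisManager.readFields(in)
        return checksum;                // returned unchanged, as in loadAnalysisManager
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        long checksum = save(new DataOutputStream(buf), 42L);
        checksum = load(new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray())), checksum);
        System.out.println("checksum unchanged: " + (checksum == 42L));
    }
}
```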
@@ -224,6 +224,12 @@ public class MetaPersistMethod {
                 metaPersistMethod.writeMethod =
                         Env.class.getDeclaredMethod("saveBinlogs", CountingDataOutputStream.class, long.class);
                 break;
+            case "AnalysisMgr":
+                metaPersistMethod.readMethod =
+                        Env.class.getDeclaredMethod("loadAnalysisManager", DataInputStream.class, long.class);
+                metaPersistMethod.writeMethod =
+                        Env.class.getDeclaredMethod("saveAnalysisMgr", CountingDataOutputStream.class, long.class);
+                break;
             default:
                 break;
         }
@@ -39,7 +39,7 @@ public class PersistMetaModules {
             "globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler",
             "paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles",
             "plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "globalFunction", "workloadGroups",
-            "binlogs", "resourceGroups");
+            "binlogs", "resourceGroups", "AnalysisMgr");
 
     // Modules in this list is deprecated and will not be saved in meta file. (also should not be in MODULE_NAMES)
     public static final ImmutableList<String> DEPRECATED_MODULE_NAMES = ImmutableList.of(
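Wiring a new module into the image takes both pieces above: the `case "AnalysisMgr"` in MetaPersistMethod resolving the Env read/write methods by reflection, and the matching entry in `PersistMetaModules.MODULE_NAMES`. A stripped-down sketch of that reflection lookup and invocation, with illustrative names:

```java
// Toy version of the MetaPersistMethod dispatch: resolve a
// (DataInputStream, long) -> long loader by name once, then invoke it
// during image load.
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.lang.reflect.Method;

public class MetaDispatchSketch {
    // Stand-in for Env.loadAnalysisManager(DataInputStream, long).
    public long loadDemoModule(DataInputStream in, long checksum) {
        return checksum;
    }

    public static void main(String[] args) throws Exception {
        Method readMethod = MetaDispatchSketch.class.getDeclaredMethod(
                "loadDemoModule", DataInputStream.class, long.class);
        MetaDispatchSketch env = new MetaDispatchSketch();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
        long checksum = (long) readMethod.invoke(env, in, 42L);
        System.out.println(checksum); // 42
    }
}
```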
@@ -343,6 +343,7 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_SCAN_RUN_SERIAL = "enable_scan_node_run_serial";
 
+    public static final String IGNORE_COMPLEX_TYPE_COLUMN = "ignore_column_with_complex_type";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,

@@ -952,6 +954,11 @@ public class SessionVariable implements Serializable, Writable {
     )
     public boolean enableCTEMaterialize = true;
 
+    @VariableMgr.VarAttr(
+            name = IGNORE_COMPLEX_TYPE_COLUMN
+    )
+    public boolean ignoreColumnWithComplexType = false;
+
     // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables,
     // not the default value set in the code.
     public void initFuzzyModeVariables() {
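The session-variable plumbing is annotation-driven: the `VarAttr` name ties the SQL-visible variable to the Java field. A rough self-contained sketch of that binding mechanism, with a simplified annotation rather than the real VariableMgr:

```java
// A field annotated with a name can be set by string via reflection,
// which is roughly how a SET statement reaches ignoreColumnWithComplexType.
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class VarAttrSketch {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface VarAttr {
        String name();
    }

    @VarAttr(name = "ignore_column_with_complex_type")
    public boolean ignoreColumnWithComplexType = false;

    // Set a variable by its session name, as a SET statement would.
    public void set(String name, boolean value) throws Exception {
        for (Field f : getClass().getFields()) {
            VarAttr attr = f.getAnnotation(VarAttr.class);
            if (attr != null && attr.name().equals(name)) {
                f.setBoolean(this, value);
            }
        }
    }
}
```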
@@ -37,6 +37,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.Daemon;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.mysql.privilege.PrivPredicate;

@@ -57,6 +58,9 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;

@@ -76,7 +80,7 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-public class AnalysisManager extends Daemon {
+public class AnalysisManager extends Daemon implements Writable {
 
     public AnalysisTaskScheduler taskScheduler;
 

@@ -722,12 +726,12 @@ public class AnalysisManager extends Daemon {
         }
     }
 
-    public void replayCreateAnalysisJob(AnalysisInfo taskInfo) {
-        this.analysisJobInfoMap.put(taskInfo.jobId, taskInfo);
+    public void replayCreateAnalysisJob(AnalysisInfo jobInfo) {
+        this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo);
     }
 
-    public void replayCreateAnalysisTask(AnalysisInfo jobInfo) {
-        this.analysisTaskInfoMap.put(jobInfo.taskId, jobInfo);
+    public void replayCreateAnalysisTask(AnalysisInfo taskInfo) {
+        this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo);
     }
 
     public void replayDeleteAnalysisJob(AnalyzeDeletionLog log) {

@@ -826,4 +830,31 @@ public class AnalysisManager extends Daemon {
         removeAll(findTasks(jobId));
     }
 
+    public static AnalysisManager readFields(DataInput in) throws IOException {
+        AnalysisManager analysisManager = new AnalysisManager();
+        doRead(in, analysisManager.analysisJobInfoMap, true);
+        doRead(in, analysisManager.analysisTaskInfoMap, false);
+        return analysisManager;
+    }
+
+    private static void doRead(DataInput in, Map<Long, AnalysisInfo> map, boolean job) throws IOException {
+        int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            AnalysisInfo analysisInfo = AnalysisInfo.read(in);
+            map.put(job ? analysisInfo.jobId : analysisInfo.taskId, analysisInfo);
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        doWrite(out, analysisJobInfoMap);
+        doWrite(out, analysisTaskInfoMap);
+    }
+
+    private void doWrite(DataOutput out, Map<Long, AnalysisInfo> infoMap) throws IOException {
+        out.writeInt(infoMap.size());
+        for (Entry<Long, AnalysisInfo> entry : infoMap.entrySet()) {
+            entry.getValue().write(out);
+        }
+    }
 }
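The read side must consume the stream in exactly the order the write side produced it (jobs map first, then tasks map), and note that map keys are not serialized: `readFields` re-derives the key from `jobId` or `taskId`, which is why `doRead` takes the `job` flag. A self-contained round trip of the same count-prefixed layout, with a simplified `Info` standing in for AnalysisInfo:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class AnalysisPersistSketch {
    static class Info {
        long id;
        String detail;

        Info(long id, String detail) {
            this.id = id;
            this.detail = detail;
        }

        void write(DataOutput out) throws IOException {
            out.writeLong(id);
            out.writeUTF(detail);
        }

        static Info read(DataInput in) throws IOException {
            return new Info(in.readLong(), in.readUTF());
        }
    }

    final Map<Long, Info> jobs = new HashMap<>();
    final Map<Long, Info> tasks = new HashMap<>();

    // Each map is written as a size followed by that many entries.
    void write(DataOutput out) throws IOException {
        doWrite(out, jobs);
        doWrite(out, tasks);
    }

    private static void doWrite(DataOutput out, Map<Long, Info> map) throws IOException {
        out.writeInt(map.size());
        for (Info info : map.values()) {
            info.write(out);
        }
    }

    // Reads back in the same fixed order; the key is re-derived from the entry.
    static AnalysisPersistSketch readFields(DataInput in) throws IOException {
        AnalysisPersistSketch m = new AnalysisPersistSketch();
        doRead(in, m.jobs);
        doRead(in, m.tasks);
        return m;
    }

    private static void doRead(DataInput in, Map<Long, Info> map) throws IOException {
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            Info info = Info.read(in);
            map.put(info.id, info);
        }
    }

    public static void main(String[] args) throws IOException {
        AnalysisPersistSketch before = new AnalysisPersistSketch();
        before.jobs.put(1L, new Info(1L, "job"));
        before.tasks.put(2L, new Info(2L, "task"));
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        before.write(new DataOutputStream(buf));
        AnalysisPersistSketch after =
                readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(after.jobs.size() + " job, " + after.tasks.size() + " task restored");
    }
}
```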