diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index 387ce91e2c..9f5474fa01 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -74,9 +74,11 @@ public final class FeMetaVersion { public static final int VERSION_126 = 126; // For constraints public static final int VERSION_127 = 127; + // For statistics. Update rows, new partition loaded, AnalysisJobInfo and AnalysisTaskInfo + public static final int VERSION_128 = 128; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_127; + public static final int VERSION_CURRENT = VERSION_128; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 5b2ff25f54..8c2424d483 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -81,7 +81,9 @@ import org.apache.doris.policy.StoragePolicy; import org.apache.doris.resource.workloadgroup.WorkloadGroup; import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy; import org.apache.doris.statistics.AnalysisInfo; +import org.apache.doris.statistics.AnalysisJobInfo; import org.apache.doris.statistics.AnalysisManager; +import org.apache.doris.statistics.AnalysisTaskInfo; import org.apache.doris.statistics.TableStatsMeta; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -1080,6 +1082,10 @@ public class EditLog { break; } case OperationType.OP_CREATE_ANALYSIS_JOB: { + if (journal.getData() instanceof AnalysisJobInfo) { + // For rollback compatible. + break; + } AnalysisInfo info = (AnalysisInfo) journal.getData(); if (AnalysisManager.needAbandon(info)) { break; @@ -1088,6 +1094,10 @@ public class EditLog { break; } case OperationType.OP_CREATE_ANALYSIS_TASK: { + if (journal.getData() instanceof AnalysisTaskInfo) { + // For rollback compatible. + break; + } AnalysisInfo info = (AnalysisInfo) journal.getData(); if (AnalysisManager.needAbandon(info)) { break; @@ -1131,7 +1141,7 @@ public class EditLog { break; } case OperationType.OP_PERSIST_AUTO_JOB: { - env.getAnalysisManager().replayPersistSysJob((AnalysisInfo) journal.getData()); + // Do nothing break; } case OperationType.OP_DELETE_TABLE_STATS: { @@ -1155,6 +1165,12 @@ public class EditLog { env.getBackupHandler().getRepoMgr().alterRepo(repository, true); break; } + case OperationType.OP_LOG_UPDATE_ROWS: + case OperationType.OP_LOG_NEW_PARTITION_LOADED: + case OperationType.OP_LOG_ALTER_COLUMN_STATS: { + // TODO: implement this while statistics finished related work. + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); @@ -1968,10 +1984,6 @@ public class EditLog { logEdit(OperationType.OP_UPDATE_TABLE_STATS, tableStats); } - public void logAutoJob(AnalysisInfo analysisInfo) { - logEdit(OperationType.OP_PERSIST_AUTO_JOB, analysisInfo); - } - public void logDeleteTableStats(TableStatsDeletionLog log) { logEdit(OperationType.OP_DELETE_TABLE_STATS, log); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 42312297b9..0945dc0f15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -345,6 +345,7 @@ public class OperationType { public static final short OP_UPDATE_TABLE_STATS = 455; + @Deprecated public static final short OP_PERSIST_AUTO_JOB = 456; public static final short OP_DELETE_TABLE_STATS = 457; @@ -357,6 +358,11 @@ public class OperationType { public static final short OP_INSERT_OVERWRITE = 461; + public static final short OP_LOG_UPDATE_ROWS = 462; + + public static final short OP_LOG_NEW_PARTITION_LOADED = 463; + + public static final short OP_LOG_ALTER_COLUMN_STATS = 464; /** * Get opcode name by op code. diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java new file mode 100644 index 0000000000..57ae461fe1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class AnalysisJobInfo implements Writable { + + private static final Logger LOG = LogManager.getLogger(AnalysisJobInfo.class); + + @SerializedName("jobId") + public final long jobId; + + public AnalysisJobInfo(long jobId) { + this.jobId = jobId; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static AnalysisJobInfo read(DataInput dataInput) throws IOException { + String json = Text.readString(dataInput); + AnalysisJobInfo analysisJobInfo = GsonUtils.GSON.fromJson(json, AnalysisJobInfo.class); + return analysisJobInfo; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 00e858c153..2e12b44339 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -39,6 +39,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.ThreadPoolManager.BlockedPolicy; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -56,7 +57,6 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.DBObjects; -import org.apache.doris.statistics.util.SimpleQueue; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.annotations.VisibleForTesting; @@ -122,9 +122,6 @@ public class AnalysisManager implements Writable { private final Map idToAnalysisJob = new ConcurrentHashMap<>(); - // To be deprecated, keep it for meta compatibility now, will remove later. - protected SimpleQueue autoJobs = createSimpleQueue(null, this); - private final String progressDisplayTemplate = "%d Finished | %d Failed | %d In Progress | %d Total"; public AnalysisManager() { @@ -862,13 +859,16 @@ public class AnalysisManager implements Writable { readAnalysisInfo(in, analysisManager.analysisJobInfoMap, true); readAnalysisInfo(in, analysisManager.analysisTaskInfoMap, false); readIdToTblStats(in, analysisManager.idToTblStats); - readAutoJobs(in, analysisManager); + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_128) { + readAutoJobs(in, analysisManager); + } return analysisManager; } private static void readAnalysisInfo(DataInput in, Map map, boolean job) throws IOException { int size = in.readInt(); for (int i = 0; i < size; i++) { + // AnalysisInfo is compatible with AnalysisJobInfo and AnalysisTaskInfo. AnalysisInfo analysisInfo = AnalysisInfo.read(in); // Unfinished manual once job/tasks doesn't need to keep in memory anymore. if (needAbandon(analysisInfo)) { @@ -884,6 +884,9 @@ public class AnalysisManager implements Writable { if (analysisInfo == null) { return true; } + if (analysisInfo.scheduleType == null || analysisInfo.scheduleType == null || analysisInfo.jobType == null) { + return true; + } if ((AnalysisState.PENDING.equals(analysisInfo.state) || AnalysisState.RUNNING.equals(analysisInfo.state)) && ScheduleType.ONCE.equals(analysisInfo.scheduleType) && JobType.MANUAL.equals(analysisInfo.jobType)) { @@ -904,7 +907,6 @@ public class AnalysisManager implements Writable { private static void readAutoJobs(DataInput in, AnalysisManager analysisManager) throws IOException { Type type = new TypeToken>() {}.getType(); GsonUtils.GSON.fromJson(Text.readString(in), type); - analysisManager.autoJobs = analysisManager.createSimpleQueue(null, null); } @Override @@ -912,7 +914,6 @@ public class AnalysisManager implements Writable { writeJobInfo(out, analysisJobInfoMap); writeJobInfo(out, analysisTaskInfoMap); writeTableStats(out); - writeAutoJobsStatus(out); } private void writeJobInfo(DataOutput out, Map infoMap) throws IOException { @@ -929,12 +930,6 @@ public class AnalysisManager implements Writable { } } - private void writeAutoJobsStatus(DataOutput output) throws IOException { - Type type = new TypeToken>() {}.getType(); - String autoJobs = GsonUtils.GSON.toJson(this.autoJobs, type); - Text.writeString(output, autoJobs); - } - // For unit test use only. public void addToJobIdTasksMap(long jobId, Map tasks) { analysisJobIdToTaskMap.put(jobId, tasks); @@ -980,31 +975,6 @@ public class AnalysisManager implements Writable { analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos); } - protected void logAutoJob(AnalysisInfo autoJob) { - Env.getCurrentEnv().getEditLog().logAutoJob(autoJob); - } - - public void replayPersistSysJob(AnalysisInfo analysisInfo) { - autoJobs.offer(analysisInfo); - } - - protected SimpleQueue createSimpleQueue(Collection collection, - AnalysisManager analysisManager) { - return new SimpleQueue<>(Config.analyze_record_limit, - a -> { - // FE is not ready when replaying log and operations triggered by replaying - // shouldn't be logged again. - if (Env.getCurrentEnv().isReady() && Env.getCurrentEnv().isMaster() && !Env.isCheckpointThread()) { - analysisManager.logAutoJob(a); - } - return null; - }, - a -> { - // DO NOTHING - return null; - }, collection); - } - // Remove col stats status from TableStats if failed load some col stats after analyze corresponding column so that // we could make sure it would be analyzed again soon if user or system submit job for that column again. public void removeColStatsStatus(long tblId, String colName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java new file mode 100644 index 0000000000..fb7a5dcbd5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class AnalysisTaskInfo implements Writable { + + private static final Logger LOG = LogManager.getLogger(AnalysisTaskInfo.class); + + @SerializedName("jobId") + public final long jobId; + + @SerializedName("taskId") + public final long taskId; + + public AnalysisTaskInfo(long jobId, long taskId) { + this.jobId = jobId; + this.taskId = taskId; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static AnalysisTaskInfo read(DataInput dataInput) throws IOException { + String json = Text.readString(dataInput); + AnalysisTaskInfo analysisTaskInfo = GsonUtils.GSON.fromJson(json, AnalysisTaskInfo.class); + return analysisTaskInfo; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/NewPartitionLoadedEvent.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/NewPartitionLoadedEvent.java new file mode 100644 index 0000000000..d09cb2df6c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/NewPartitionLoadedEvent.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class NewPartitionLoadedEvent implements Writable { + + @SerializedName("partitionIdToTableId") + public final Map partitionIdToTableId = new HashMap<>(); + + @VisibleForTesting + public NewPartitionLoadedEvent() {} + + // No need to be thread safe, only publish thread will call this. + public void addPartition(long tableId, long partitionId) { + partitionIdToTableId.put(tableId, partitionId); + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static NewPartitionLoadedEvent read(DataInput dataInput) throws IOException { + String json = Text.readString(dataInput); + NewPartitionLoadedEvent newPartitionLoadedEvent = GsonUtils.GSON.fromJson(json, NewPartitionLoadedEvent.class); + return newPartitionLoadedEvent; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java new file mode 100644 index 0000000000..04f185c8b7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class UpdateRowsEvent implements Writable { + + @SerializedName("tableIdToUpdateRows") + public final Map tableIdToUpdateRows = new HashMap<>(); + + @VisibleForTesting + public UpdateRowsEvent() {} + + // No need to be thread safe, only publish thread will call this. + public void addUpdateRows(long tableId, long rows) { + if (tableIdToUpdateRows.containsKey(tableId)) { + tableIdToUpdateRows.put(tableId, tableIdToUpdateRows.get(tableId) + rows); + } else { + tableIdToUpdateRows.put(tableId, rows); + } + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static UpdateRowsEvent read(DataInput dataInput) throws IOException { + String json = Text.readString(dataInput); + UpdateRowsEvent updateRowsEvent = GsonUtils.GSON.fromJson(json, UpdateRowsEvent.class); + return updateRowsEvent; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/SimpleQueue.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/SimpleQueue.java deleted file mode 100644 index 5740c4e308..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/SimpleQueue.java +++ /dev/null @@ -1,65 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.statistics.util; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.function.Function; - -// Any operation on this structure should be thread-safe -public class SimpleQueue extends LinkedList { - - private final long limit; - - private final Function offerFunc; - - private final Function evictFunc; - - - public SimpleQueue(long limit, Function offerFunc, Function evictFunc) { - this.limit = limit; - this.offerFunc = offerFunc; - this.evictFunc = evictFunc; - } - - @Override - public synchronized boolean offer(T analysisInfo) { - while (size() >= limit) { - remove(); - } - super.offer(analysisInfo); - offerFunc.apply(analysisInfo); - return true; - } - - @Override - public synchronized T remove() { - T analysisInfo = super.remove(); - evictFunc.apply(analysisInfo); - return analysisInfo; - } - - public SimpleQueue(long limit, Function offerFunc, Function evictFunc, Collection collection) { - this(limit, offerFunc, evictFunc); - if (collection != null) { - for (T e : collection) { - offer(e); - } - } - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index 69f6dc0dd5..2fc6d24e30 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -30,7 +30,6 @@ import org.apache.doris.common.DdlException; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; -import org.apache.doris.statistics.util.SimpleQueue; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.annotations.VisibleForTesting; @@ -331,28 +330,6 @@ public class AnalysisManagerTest { Assertions.assertTrue(analysisManager.analysisTaskInfoMap.containsKey(3L)); } - @Test - public void testRecordLimit3() { - Config.analyze_record_limit = 2; - AnalysisManager analysisManager = new AnalysisManager(); - analysisManager.autoJobs.offer(new AnalysisInfoBuilder().setJobId(1).build()); - analysisManager.autoJobs.offer(new AnalysisInfoBuilder().setJobId(2).build()); - analysisManager.autoJobs.offer(new AnalysisInfoBuilder().setJobId(3).build()); - Assertions.assertEquals(2, analysisManager.autoJobs.size()); - } - - @Test - public void testCreateSimpleQueue() { - AnalysisManager analysisManager = new AnalysisManager(); - ArrayList jobs = Lists.newArrayList(); - jobs.add(new AnalysisInfoBuilder().setJobId(1).build()); - jobs.add(new AnalysisInfoBuilder().setJobId(2).build()); - SimpleQueue simpleQueue = analysisManager.createSimpleQueue(jobs, analysisManager); - Assertions.assertEquals(2, simpleQueue.size()); - simpleQueue = analysisManager.createSimpleQueue(null, analysisManager); - Assertions.assertEquals(0, simpleQueue.size()); - } - @Test public void testShowAutoJobs(@Injectable ShowAnalyzeStmt stmt) { new MockUp() { @@ -404,5 +381,4 @@ public class AnalysisManagerTest { Assertions.assertEquals(AnalysisState.FINISHED, analysisInfos.get(1).getState()); Assertions.assertEquals(AnalysisState.FAILED, analysisInfos.get(2).getState()); } - }