[feature](statistics, metadata)Meta data place holder for statistics (#29867)
Meta data place holder for statistics in version 2.1.x. Users could upgrade to this version, but doesn't support rollback. After this change, statistics related functions doesn't need to change meta data any more in the 2.1 series.
This commit is contained in:
@ -81,7 +81,9 @@ import org.apache.doris.policy.StoragePolicy;
|
||||
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
|
||||
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.AnalysisJobInfo;
|
||||
import org.apache.doris.statistics.AnalysisManager;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo;
|
||||
import org.apache.doris.statistics.TableStatsMeta;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.Frontend;
|
||||
@ -1080,6 +1082,10 @@ public class EditLog {
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_CREATE_ANALYSIS_JOB: {
|
||||
if (journal.getData() instanceof AnalysisJobInfo) {
|
||||
// For rollback compatible.
|
||||
break;
|
||||
}
|
||||
AnalysisInfo info = (AnalysisInfo) journal.getData();
|
||||
if (AnalysisManager.needAbandon(info)) {
|
||||
break;
|
||||
@ -1088,6 +1094,10 @@ public class EditLog {
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_CREATE_ANALYSIS_TASK: {
|
||||
if (journal.getData() instanceof AnalysisTaskInfo) {
|
||||
// For rollback compatible.
|
||||
break;
|
||||
}
|
||||
AnalysisInfo info = (AnalysisInfo) journal.getData();
|
||||
if (AnalysisManager.needAbandon(info)) {
|
||||
break;
|
||||
@ -1131,7 +1141,7 @@ public class EditLog {
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_PERSIST_AUTO_JOB: {
|
||||
env.getAnalysisManager().replayPersistSysJob((AnalysisInfo) journal.getData());
|
||||
// Do nothing
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_DELETE_TABLE_STATS: {
|
||||
@ -1155,6 +1165,12 @@ public class EditLog {
|
||||
env.getBackupHandler().getRepoMgr().alterRepo(repository, true);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_LOG_UPDATE_ROWS:
|
||||
case OperationType.OP_LOG_NEW_PARTITION_LOADED:
|
||||
case OperationType.OP_LOG_ALTER_COLUMN_STATS: {
|
||||
// TODO: implement this while statistics finished related work.
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
IOException e = new IOException();
|
||||
LOG.error("UNKNOWN Operation Type {}", opCode, e);
|
||||
@ -1968,10 +1984,6 @@ public class EditLog {
|
||||
logEdit(OperationType.OP_UPDATE_TABLE_STATS, tableStats);
|
||||
}
|
||||
|
||||
public void logAutoJob(AnalysisInfo analysisInfo) {
|
||||
logEdit(OperationType.OP_PERSIST_AUTO_JOB, analysisInfo);
|
||||
}
|
||||
|
||||
public void logDeleteTableStats(TableStatsDeletionLog log) {
|
||||
logEdit(OperationType.OP_DELETE_TABLE_STATS, log);
|
||||
}
|
||||
|
||||
@ -345,6 +345,7 @@ public class OperationType {
|
||||
|
||||
public static final short OP_UPDATE_TABLE_STATS = 455;
|
||||
|
||||
@Deprecated
|
||||
public static final short OP_PERSIST_AUTO_JOB = 456;
|
||||
|
||||
public static final short OP_DELETE_TABLE_STATS = 457;
|
||||
@ -357,6 +358,11 @@ public class OperationType {
|
||||
|
||||
public static final short OP_INSERT_OVERWRITE = 461;
|
||||
|
||||
public static final short OP_LOG_UPDATE_ROWS = 462;
|
||||
|
||||
public static final short OP_LOG_NEW_PARTITION_LOADED = 463;
|
||||
|
||||
public static final short OP_LOG_ALTER_COLUMN_STATS = 464;
|
||||
|
||||
/**
|
||||
* Get opcode name by op code.
|
||||
|
||||
@ -0,0 +1,54 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
public class AnalysisJobInfo implements Writable {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(AnalysisJobInfo.class);
|
||||
|
||||
@SerializedName("jobId")
|
||||
public final long jobId;
|
||||
|
||||
public AnalysisJobInfo(long jobId) {
|
||||
this.jobId = jobId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
String json = GsonUtils.GSON.toJson(this);
|
||||
Text.writeString(out, json);
|
||||
}
|
||||
|
||||
public static AnalysisJobInfo read(DataInput dataInput) throws IOException {
|
||||
String json = Text.readString(dataInput);
|
||||
AnalysisJobInfo analysisJobInfo = GsonUtils.GSON.fromJson(json, AnalysisJobInfo.class);
|
||||
return analysisJobInfo;
|
||||
}
|
||||
}
|
||||
@ -39,6 +39,7 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.FeMetaVersion;
|
||||
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
@ -56,7 +57,6 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
|
||||
import org.apache.doris.statistics.AnalysisInfo.JobType;
|
||||
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
|
||||
import org.apache.doris.statistics.util.DBObjects;
|
||||
import org.apache.doris.statistics.util.SimpleQueue;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -122,9 +122,6 @@ public class AnalysisManager implements Writable {
|
||||
|
||||
private final Map<Long, AnalysisJob> idToAnalysisJob = new ConcurrentHashMap<>();
|
||||
|
||||
// To be deprecated, keep it for meta compatibility now, will remove later.
|
||||
protected SimpleQueue<AnalysisInfo> autoJobs = createSimpleQueue(null, this);
|
||||
|
||||
private final String progressDisplayTemplate = "%d Finished | %d Failed | %d In Progress | %d Total";
|
||||
|
||||
public AnalysisManager() {
|
||||
@ -862,13 +859,16 @@ public class AnalysisManager implements Writable {
|
||||
readAnalysisInfo(in, analysisManager.analysisJobInfoMap, true);
|
||||
readAnalysisInfo(in, analysisManager.analysisTaskInfoMap, false);
|
||||
readIdToTblStats(in, analysisManager.idToTblStats);
|
||||
readAutoJobs(in, analysisManager);
|
||||
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_128) {
|
||||
readAutoJobs(in, analysisManager);
|
||||
}
|
||||
return analysisManager;
|
||||
}
|
||||
|
||||
private static void readAnalysisInfo(DataInput in, Map<Long, AnalysisInfo> map, boolean job) throws IOException {
|
||||
int size = in.readInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
// AnalysisInfo is compatible with AnalysisJobInfo and AnalysisTaskInfo.
|
||||
AnalysisInfo analysisInfo = AnalysisInfo.read(in);
|
||||
// Unfinished manual once job/tasks doesn't need to keep in memory anymore.
|
||||
if (needAbandon(analysisInfo)) {
|
||||
@ -884,6 +884,9 @@ public class AnalysisManager implements Writable {
|
||||
if (analysisInfo == null) {
|
||||
return true;
|
||||
}
|
||||
if (analysisInfo.scheduleType == null || analysisInfo.scheduleType == null || analysisInfo.jobType == null) {
|
||||
return true;
|
||||
}
|
||||
if ((AnalysisState.PENDING.equals(analysisInfo.state) || AnalysisState.RUNNING.equals(analysisInfo.state))
|
||||
&& ScheduleType.ONCE.equals(analysisInfo.scheduleType)
|
||||
&& JobType.MANUAL.equals(analysisInfo.jobType)) {
|
||||
@ -904,7 +907,6 @@ public class AnalysisManager implements Writable {
|
||||
private static void readAutoJobs(DataInput in, AnalysisManager analysisManager) throws IOException {
|
||||
Type type = new TypeToken<LinkedList<AnalysisInfo>>() {}.getType();
|
||||
GsonUtils.GSON.fromJson(Text.readString(in), type);
|
||||
analysisManager.autoJobs = analysisManager.createSimpleQueue(null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -912,7 +914,6 @@ public class AnalysisManager implements Writable {
|
||||
writeJobInfo(out, analysisJobInfoMap);
|
||||
writeJobInfo(out, analysisTaskInfoMap);
|
||||
writeTableStats(out);
|
||||
writeAutoJobsStatus(out);
|
||||
}
|
||||
|
||||
private void writeJobInfo(DataOutput out, Map<Long, AnalysisInfo> infoMap) throws IOException {
|
||||
@ -929,12 +930,6 @@ public class AnalysisManager implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
private void writeAutoJobsStatus(DataOutput output) throws IOException {
|
||||
Type type = new TypeToken<LinkedList<AnalysisInfo>>() {}.getType();
|
||||
String autoJobs = GsonUtils.GSON.toJson(this.autoJobs, type);
|
||||
Text.writeString(output, autoJobs);
|
||||
}
|
||||
|
||||
// For unit test use only.
|
||||
public void addToJobIdTasksMap(long jobId, Map<Long, BaseAnalysisTask> tasks) {
|
||||
analysisJobIdToTaskMap.put(jobId, tasks);
|
||||
@ -980,31 +975,6 @@ public class AnalysisManager implements Writable {
|
||||
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
|
||||
}
|
||||
|
||||
protected void logAutoJob(AnalysisInfo autoJob) {
|
||||
Env.getCurrentEnv().getEditLog().logAutoJob(autoJob);
|
||||
}
|
||||
|
||||
public void replayPersistSysJob(AnalysisInfo analysisInfo) {
|
||||
autoJobs.offer(analysisInfo);
|
||||
}
|
||||
|
||||
protected SimpleQueue<AnalysisInfo> createSimpleQueue(Collection<AnalysisInfo> collection,
|
||||
AnalysisManager analysisManager) {
|
||||
return new SimpleQueue<>(Config.analyze_record_limit,
|
||||
a -> {
|
||||
// FE is not ready when replaying log and operations triggered by replaying
|
||||
// shouldn't be logged again.
|
||||
if (Env.getCurrentEnv().isReady() && Env.getCurrentEnv().isMaster() && !Env.isCheckpointThread()) {
|
||||
analysisManager.logAutoJob(a);
|
||||
}
|
||||
return null;
|
||||
},
|
||||
a -> {
|
||||
// DO NOTHING
|
||||
return null;
|
||||
}, collection);
|
||||
}
|
||||
|
||||
// Remove col stats status from TableStats if failed load some col stats after analyze corresponding column so that
|
||||
// we could make sure it would be analyzed again soon if user or system submit job for that column again.
|
||||
public void removeColStatsStatus(long tblId, String colName) {
|
||||
|
||||
@ -0,0 +1,58 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
public class AnalysisTaskInfo implements Writable {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(AnalysisTaskInfo.class);
|
||||
|
||||
@SerializedName("jobId")
|
||||
public final long jobId;
|
||||
|
||||
@SerializedName("taskId")
|
||||
public final long taskId;
|
||||
|
||||
public AnalysisTaskInfo(long jobId, long taskId) {
|
||||
this.jobId = jobId;
|
||||
this.taskId = taskId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
String json = GsonUtils.GSON.toJson(this);
|
||||
Text.writeString(out, json);
|
||||
}
|
||||
|
||||
public static AnalysisTaskInfo read(DataInput dataInput) throws IOException {
|
||||
String json = Text.readString(dataInput);
|
||||
AnalysisTaskInfo analysisTaskInfo = GsonUtils.GSON.fromJson(json, AnalysisTaskInfo.class);
|
||||
return analysisTaskInfo;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,57 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class NewPartitionLoadedEvent implements Writable {
|
||||
|
||||
@SerializedName("partitionIdToTableId")
|
||||
public final Map<Long, Long> partitionIdToTableId = new HashMap<>();
|
||||
|
||||
@VisibleForTesting
|
||||
public NewPartitionLoadedEvent() {}
|
||||
|
||||
// No need to be thread safe, only publish thread will call this.
|
||||
public void addPartition(long tableId, long partitionId) {
|
||||
partitionIdToTableId.put(tableId, partitionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
String json = GsonUtils.GSON.toJson(this);
|
||||
Text.writeString(out, json);
|
||||
}
|
||||
|
||||
public static NewPartitionLoadedEvent read(DataInput dataInput) throws IOException {
|
||||
String json = Text.readString(dataInput);
|
||||
NewPartitionLoadedEvent newPartitionLoadedEvent = GsonUtils.GSON.fromJson(json, NewPartitionLoadedEvent.class);
|
||||
return newPartitionLoadedEvent;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,61 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class UpdateRowsEvent implements Writable {
|
||||
|
||||
@SerializedName("tableIdToUpdateRows")
|
||||
public final Map<Long, Long> tableIdToUpdateRows = new HashMap<>();
|
||||
|
||||
@VisibleForTesting
|
||||
public UpdateRowsEvent() {}
|
||||
|
||||
// No need to be thread safe, only publish thread will call this.
|
||||
public void addUpdateRows(long tableId, long rows) {
|
||||
if (tableIdToUpdateRows.containsKey(tableId)) {
|
||||
tableIdToUpdateRows.put(tableId, tableIdToUpdateRows.get(tableId) + rows);
|
||||
} else {
|
||||
tableIdToUpdateRows.put(tableId, rows);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
String json = GsonUtils.GSON.toJson(this);
|
||||
Text.writeString(out, json);
|
||||
}
|
||||
|
||||
public static UpdateRowsEvent read(DataInput dataInput) throws IOException {
|
||||
String json = Text.readString(dataInput);
|
||||
UpdateRowsEvent updateRowsEvent = GsonUtils.GSON.fromJson(json, UpdateRowsEvent.class);
|
||||
return updateRowsEvent;
|
||||
}
|
||||
}
|
||||
@ -1,65 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics.util;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.function.Function;
|
||||
|
||||
// Any operation on this structure should be thread-safe
|
||||
public class SimpleQueue<T> extends LinkedList<T> {
|
||||
|
||||
private final long limit;
|
||||
|
||||
private final Function<T, Void> offerFunc;
|
||||
|
||||
private final Function<T, Void> evictFunc;
|
||||
|
||||
|
||||
public SimpleQueue(long limit, Function<T, Void> offerFunc, Function<T, Void> evictFunc) {
|
||||
this.limit = limit;
|
||||
this.offerFunc = offerFunc;
|
||||
this.evictFunc = evictFunc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean offer(T analysisInfo) {
|
||||
while (size() >= limit) {
|
||||
remove();
|
||||
}
|
||||
super.offer(analysisInfo);
|
||||
offerFunc.apply(analysisInfo);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized T remove() {
|
||||
T analysisInfo = super.remove();
|
||||
evictFunc.apply(analysisInfo);
|
||||
return analysisInfo;
|
||||
}
|
||||
|
||||
public SimpleQueue(long limit, Function<T, Void> offerFunc, Function<T, Void> evictFunc, Collection<T> collection) {
|
||||
this(limit, offerFunc, evictFunc);
|
||||
if (collection != null) {
|
||||
for (T e : collection) {
|
||||
offer(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user