[Bug] Fix NPE when executing SHOW ALTER TABLE statement (#3146)
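Previously, when an alter job V2 finished or was cancelled, clear() released its internal data structures and the job was moved from alterJobsV2 into finishedOrCancelledAlterJobsV2; SHOW ALTER TABLE still read jobs from that queue, so it could dereference the cleared structures and throw a NullPointerException. This change removes clear() and the separate finished/cancelled queue: jobs now stay in alterJobsV2 until they expire, and each removal of an expired job is persisted with a new OP_REMOVE_ALTER_JOB_V2 edit log entry so that replaying nodes prune the same jobs. It also turns SchemaChangeProcNode into the directory-style SchemaChangeProcDir (with a per-job SchemaChangeJobProcNode) so a single job's unfinished tasks can be listed, and renames RollupJobProcDir to RollupJobProcNode to match its actual role.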

Mingyu Chen authored on 2020-03-20 09:20:21 +08:00, committed by GitHub
parent b286f4271b
commit d90c892bd8
16 changed files with 213 additions and 63 deletions

View File

@@ -17,7 +17,6 @@
package org.apache.doris.alter;
-import com.google.common.collect.Queues;
import org.apache.doris.alter.AlterJob.JobState;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.CancelStmt;
@@ -35,6 +34,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AlterReplicaTask;
@@ -66,7 +66,6 @@ public abstract class AlterHandler extends MasterDaemon {
// queue of alter job v2
protected ConcurrentMap<Long, AlterJobV2> alterJobsV2 = Maps.newConcurrentMap();
-protected ConcurrentLinkedQueue<AlterJobV2> finishedOrCancelledAlterJobsV2 = Queues.newConcurrentLinkedQueue();
/**
* lock to perform atomic operations.
@@ -108,9 +107,7 @@ public abstract class AlterHandler extends MasterDaemon {
public AlterJobV2 getUnfinishedAlterJobV2ByJobId(long jobId) {
for (AlterJobV2 alterJob : alterJobsV2.values()) {
-if (alterJob.getJobId() == jobId
-&& alterJob.getJobState() != AlterJobV2.JobState.FINISHED
-&& alterJob.getJobState() != AlterJobV2.JobState.CANCELLED) {
+if (alterJob.getJobId() == jobId && !alterJob.isDone()) {
return alterJob;
}
}
@@ -121,6 +118,8 @@ public abstract class AlterHandler extends MasterDaemon {
return this.alterJobsV2;
}
+// should be removed in version 0.13
+@Deprecated
private void clearExpireFinishedOrCancelledAlterJobs() {
long curTime = System.currentTimeMillis();
// clean history job
@@ -136,15 +135,28 @@
}
private void clearExpireFinishedOrCancelledAlterJobsV2() {
-for (AlterJobV2 alterJobV2 : finishedOrCancelledAlterJobsV2) {
+Iterator<Map.Entry<Long, AlterJobV2>> iterator = alterJobsV2.entrySet().iterator();
+while (iterator.hasNext()) {
+AlterJobV2 alterJobV2 = iterator.next().getValue();
if (alterJobV2.isExpire()) {
-finishedOrCancelledAlterJobsV2.remove(alterJobV2);
-LOG.info("remove history {} jobV2[{}]. finish at {}", alterJobV2.getType(),
-alterJobV2.getTableId(), TimeUtils.longToTimeString(alterJobV2.getFinishedTimeMs()));
+iterator.remove();
+RemoveAlterJobV2OperationLog log = new RemoveAlterJobV2OperationLog(alterJobV2.getJobId(), alterJobV2.getType());
+Catalog.getCurrentCatalog().getEditLog().logRemoveExpiredAlterJobV2(log);
+LOG.info("remove expired {} job {}. finish at {}", alterJobV2.getType(),
+alterJobV2.getJobId(), TimeUtils.longToTimeString(alterJobV2.getFinishedTimeMs()));
}
}
}
+public void replayRemoveAlterJobV2(RemoveAlterJobV2OperationLog log) {
+if (alterJobsV2.remove(log.getJobId()) != null) {
+LOG.info("replay removing expired {} job {}.", log.getType(), log.getJobId());
+} else {
+// should not happen, but it does not matter; just log a warning here so we can observe it
+LOG.warn("failed to find {} job {} when replay removing expired job.", log.getType(), log.getJobId());
+}
+}
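The removal flow is now symmetric across the cluster: the master deletes an expired job from alterJobsV2 and logs the removal, and non-master front ends apply the same deletion through replayRemoveAlterJobV2(). A minimal, self-contained sketch of this pattern, using simplified stand-in types rather than the actual Doris classes:

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongConsumer;

// Illustrative stand-ins only; not the real AlterHandler/EditLog types.
class ExpireAndReplaySketch {
    static final class Job {
        final long jobId;
        final long finishedTimeMs; // set when the job reaches FINISHED/CANCELLED
        Job(long jobId, long finishedTimeMs) {
            this.jobId = jobId;
            this.finishedTimeMs = finishedTimeMs;
        }
        boolean isExpire(long keepMs) {
            return finishedTimeMs > 0 && System.currentTimeMillis() - finishedTimeMs > keepMs;
        }
    }

    final ConcurrentMap<Long, Job> jobs = new ConcurrentHashMap<>();

    // Master side: remove locally, then persist one removal record per job,
    // analogous to logRemoveExpiredAlterJobV2().
    void clearExpired(long keepMs, LongConsumer editLog) {
        Iterator<Map.Entry<Long, Job>> it = jobs.entrySet().iterator();
        while (it.hasNext()) {
            Job job = it.next().getValue();
            if (job.isExpire(keepMs)) {
                it.remove();
                editLog.accept(job.jobId);
            }
        }
    }

    // Follower side: replaying the record applies the identical removal.
    void replayRemove(long jobId) {
        jobs.remove(jobId);
    }
}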
@Deprecated
protected void addAlterJob(AlterJob alterJob) {
this.alterJobs.put(alterJob.getTableId(), alterJob);

View File

@@ -117,7 +117,7 @@ public abstract class AlterJobV2 implements Writable {
}
public boolean isExpire() {
-return (System.currentTimeMillis() - finishedTimeMs) / 1000 > Config.history_job_keep_max_second;
+return isDone() && (System.currentTimeMillis() - finishedTimeMs) / 1000 > Config.history_job_keep_max_second;
}
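The added isDone() guard matters because jobs now remain in alterJobsV2 while still running: for an unfinished job, finishedTimeMs is presumably still at its unset default, so (now - finishedTimeMs) / 1000 would already exceed history_job_keep_max_second and the expiration pass would remove a live job. Requiring isDone() restricts expiration to FINISHED and CANCELLED jobs.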
public boolean isDone() {
@@ -169,11 +169,6 @@ public abstract class AlterJobV2 implements Writable {
}
}
-/**
-* clear some date structure in this job to save memory
-*/
-public abstract void clear();
/**
* should be called before executing the job.
* return false if table is not stable.

View File

@@ -131,12 +131,6 @@ public class RollupJobV2 extends AlterJobV2 {
this.storageFormat = storageFormat;
}
-@Override
-public void clear() {
-partitionIdToBaseRollupTabletIdMap = null;
-partitionIdToRollupIndex = null;
-}
/*
* runPendingJob():
* 1. Create all rollup replicas and wait them finished.

View File

@@ -17,7 +17,6 @@
package org.apache.doris.alter;
-import com.google.common.collect.ImmutableList;
import org.apache.doris.alter.AlterJob.JobState;
import org.apache.doris.analysis.AddColumnClause;
import org.apache.doris.analysis.AddColumnsClause;
@@ -82,6 +81,7 @@ import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -1133,16 +1133,6 @@ public class SchemaChangeHandler extends AlterHandler {
}
private void runAlterJobV2() {
-Iterator<Map.Entry<Long, AlterJobV2>> iter = alterJobsV2.entrySet().iterator();
-while (iter.hasNext()) {
-Map.Entry<Long, AlterJobV2> entry = iter.next();
-AlterJobV2 alterJobV2 = entry.getValue();
-if (alterJobV2.isDone()) {
-alterJobV2.clear();
-finishedOrCancelledAlterJobsV2.add(alterJobV2);
-iter.remove();
-}
-}
alterJobsV2.values().forEach(AlterJobV2::run);
}
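runAlterJobV2() no longer moves done jobs aside; it simply drives every job (AlterJobV2.run() is presumably a no-op for jobs already in a terminal state), while the expiration pass in AlterHandler removes entries from the same map on another code path. This is safe assuming Maps.newConcurrentMap() returns a ConcurrentHashMap (Guava's default), whose iteration is weakly consistent. A tiny demonstration:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Removing entries while iterating a ConcurrentHashMap never throws
// ConcurrentModificationException; the traversal simply may or may not
// observe the removal.
public class WeaklyConsistentIterationDemo {
    public static void main(String[] args) {
        ConcurrentMap<Long, String> jobs = new ConcurrentHashMap<>();
        jobs.put(1L, "RUNNING");
        jobs.put(2L, "FINISHED");
        jobs.values().forEach(state -> jobs.remove(2L)); // safe
        System.out.println(jobs); // {1=RUNNING}
    }
}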
@@ -1289,7 +1279,6 @@
private void getAlterJobV2Infos(Database db, List<List<Comparable>> schemaChangeJobInfos) {
getAlterJobV2Infos(db, ImmutableList.copyOf(alterJobsV2.values()), schemaChangeJobInfos);
-getAlterJobV2Infos(db, ImmutableList.copyOf(finishedOrCancelledAlterJobsV2), schemaChangeJobInfos);
}
@Deprecated

View File

@@ -159,16 +159,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
this.storageFormat = storageFormat;
}
-@Override
-public void clear() {
-partitionIndexMap = null;
-indexIdMap = null;
-indexIdToName = null;
-indexSchemaMap = null;
-indexSchemaVersionAndHashMap = null;
-indexShortKeyMap = null;
-}
/*
* runPendingJob():
* 1. Create all replicas of all shadow indexes and wait them finished.

View File

@@ -29,7 +29,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.ProcNodeInterface;
import org.apache.doris.common.proc.ProcService;
import org.apache.doris.common.proc.RollupProcDir;
-import org.apache.doris.common.proc.SchemaChangeProcNode;
+import org.apache.doris.common.proc.SchemaChangeProcDir;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.qe.ShowResultSetMetaData;
@@ -164,7 +164,7 @@ public class ShowAlterStmt extends ShowStmt {
throw new AnalysisException("Should order by column");
}
SlotRef slotRef = (SlotRef) orderByElement.getExpr();
-int index = SchemaChangeProcNode.analyzeColumn(slotRef.getColumnName());
+int index = SchemaChangeProcDir.analyzeColumn(slotRef.getColumnName());
OrderByPair orderByPair = new OrderByPair(index, !orderByElement.getIsAsc());
orderByPairs.add(orderByPair);
}
@@ -242,7 +242,7 @@
if (type == AlterType.ROLLUP) {
titleNames = RollupProcDir.TITLE_NAMES;
} else if (type == AlterType.COLUMN) {
-titleNames = SchemaChangeProcNode.TITLE_NAMES;
+titleNames = SchemaChangeProcDir.TITLE_NAMES;
}
for (String title : titleNames) {

View File

@@ -74,7 +74,7 @@ public class JobsProcDir implements ProcDirInterface {
} else if (jobTypeName.equals(ROLLUP)) {
return new RollupProcDir(catalog.getRollupHandler(), db);
} else if (jobTypeName.equals(SCHEMA_CHANGE)) {
-return new SchemaChangeProcNode(catalog.getSchemaChangeHandler(), db);
+return new SchemaChangeProcDir(catalog.getSchemaChangeHandler(), db);
} else if (jobTypeName.equals(EXPORT)) {
return new ExportProcNode(catalog.getExportMgr(), db);
} else {

View File

@@ -26,14 +26,14 @@ import com.google.common.collect.ImmutableList;
import java.util.List;
// Show unfinished rollup tasks of rollup job v2
-public class RollupJobProcDir implements ProcNodeInterface {
+public class RollupJobProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("BackendId").add("BaseTabletId").add("RollupTabletId")
.build();
private RollupJobV2 rollupJob;
-public RollupJobProcDir(RollupJobV2 rollupJob) {
+public RollupJobProcNode(RollupJobV2 rollupJob) {
this.rollupJob = rollupJob;
}

View File

@@ -88,7 +88,7 @@ public class RollupProcDir implements ProcDirInterface {
return null;
}
-return new RollupJobProcDir((RollupJobV2) job);
+return new RollupJobProcNode((RollupJobV2) job);
}
}

View File

@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.proc;
import org.apache.doris.alter.SchemaChangeJobV2;
import org.apache.doris.common.AnalysisException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
// Show unfinished schema change tasks of schema change job v2
public class SchemaChangeJobProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("BackendId").add("BaseTabletId").add("RollupTabletId")
.build();
private SchemaChangeJobV2 job;
public SchemaChangeJobProcNode(SchemaChangeJobV2 job) {
this.job = job;
}
@Override
public ProcResult fetchResult() throws AnalysisException {
Preconditions.checkNotNull(job);
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
List<List<String>> unfinishedRollupTasks = job.getUnfinishedTasks(2000);
result.setRows(unfinishedRollupTasks);
return result;
}
}

View File

@@ -17,22 +17,25 @@
package org.apache.doris.common.proc;
+import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.SchemaChangeHandler;
+import org.apache.doris.alter.SchemaChangeJobV2;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.LimitElement;
-import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.StringLiteral;
-import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -41,19 +44,19 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-public class SchemaChangeProcNode implements ProcNodeInterface {
+public class SchemaChangeProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("JobId").add("TableName").add("CreateTime").add("FinishTime")
.add("IndexName").add("IndexId").add("OriginIndexId").add("SchemaVersion")
.add("TransactionId").add("State").add("Msg").add("Progress").add("Timeout")
.build();
-private static final Logger LOG = LogManager.getLogger(SchemaChangeProcNode.class);
+private static final Logger LOG = LogManager.getLogger(SchemaChangeProcDir.class);
private SchemaChangeHandler schemaChangeHandler;
private Database db;
-public SchemaChangeProcNode(SchemaChangeHandler schemaChangeHandler, Database db) {
+public SchemaChangeProcDir(SchemaChangeHandler schemaChangeHandler, Database db) {
this.schemaChangeHandler = schemaChangeHandler;
this.db = db;
}
@@ -183,4 +186,31 @@ public class SchemaChangeProcNode implements ProcNodeInterface {
}
throw new AnalysisException("Title name[" + columnName + "] does not exist");
}
+@Override
+public boolean register(String name, ProcNodeInterface node) {
+return false;
+}
+@Override
+public ProcNodeInterface lookup(String jobIdStr) throws AnalysisException {
+if (Strings.isNullOrEmpty(jobIdStr)) {
+throw new AnalysisException("Job id is null");
+}
+long jobId = -1L;
+try {
+jobId = Long.valueOf(jobIdStr);
+} catch (Exception e) {
+throw new AnalysisException("Job id is invalid");
+}
+Preconditions.checkState(jobId != -1L);
+AlterJobV2 job = schemaChangeHandler.getUnfinishedAlterJobV2ByJobId(jobId);
+if (job == null) {
+return null;
+}
+return new SchemaChangeJobProcNode((SchemaChangeJobV2) job);
+}
}
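Because SchemaChangeProcDir now implements ProcDirInterface, the proc framework can descend from the job list into a single job. A hedged usage sketch; the helper below is hypothetical and assumes it is compiled where the Doris FE classes are visible:

import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.proc.ProcNodeInterface;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.proc.SchemaChangeProcDir;

public class ShowJobTasksSketch {
    // Returns the unfinished-task rows of one schema change job, or null if
    // the job is missing or already done (lookup only finds unfinished jobs).
    static ProcResult fetchUnfinishedTasks(SchemaChangeHandler handler, Database db, long jobId)
            throws AnalysisException {
        SchemaChangeProcDir dir = new SchemaChangeProcDir(handler, db);
        ProcNodeInterface node = dir.lookup(String.valueOf(jobId));
        return node == null ? null : node.fetchResult();
    }
}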

View File

@@ -63,6 +63,7 @@ import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.PartitionPersistInfo;
import org.apache.doris.persist.PrivInfo;
import org.apache.doris.persist.RecoverInfo;
+import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.persist.RoutineLoadOperation;
@@ -220,7 +221,7 @@ public class JournalEntity implements Writable {
break;
}
case OperationType.OP_BATCH_DROP_ROLLUP: {
-data = ((BatchDropInfo) data).read(in);
+data = BatchDropInfo.read(in);
isRead = true;
break;
}
@@ -522,6 +523,11 @@
isRead = true;
break;
}
+case OperationType.OP_REMOVE_ALTER_JOB_V2: {
+data = RemoveAlterJobV2OperationLog.read(in);
+isRead = true;
+break;
+}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);

View File

@@ -728,6 +728,20 @@ public class EditLog {
catalog.replaySetReplicaStatus(log);
break;
}
+case OperationType.OP_REMOVE_ALTER_JOB_V2: {
+RemoveAlterJobV2OperationLog log = (RemoveAlterJobV2OperationLog) journal.getData();
+switch (log.getType()) {
+case ROLLUP:
+catalog.getRollupHandler().replayRemoveAlterJobV2(log);
+break;
+case SCHEMA_CHANGE:
+catalog.getSchemaChangeHandler().replayRemoveAlterJobV2(log);
+break;
+default:
+break;
+}
+break;
+}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1257,4 +1271,8 @@
public void logSetReplicaStatus(SetReplicaStatusOperationLog log) {
logEdit(OperationType.OP_SET_REPLICA_STATUS, log);
}
+public void logRemoveExpiredAlterJobV2(RemoveAlterJobV2OperationLog log) {
+logEdit(OperationType.OP_REMOVE_ALTER_JOB_V2, log);
+}
}

View File

@@ -60,6 +60,7 @@ public class OperationType {
public static final short OP_MODIFY_DISTRIBUTION_TYPE = 122;
public static final short OP_BATCH_ADD_ROLLUP = 123;
public static final short OP_BATCH_DROP_ROLLUP = 124;
+public static final short OP_REMOVE_ALTER_JOB_V2 = 125;
// 30~39 130~139 230~239 ...
// load job for only hadoop load

View File

@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
// edit log entry for removing an alter job v2
public class RemoveAlterJobV2OperationLog implements Writable {
@SerializedName(value = "jobId")
private long jobId;
@SerializedName(value = "type")
private AlterJobV2.JobType type;
public RemoveAlterJobV2OperationLog(long jobId, AlterJobV2.JobType type) {
this.jobId = jobId;
this.type = type;
}
public long getJobId() {
return jobId;
}
public AlterJobV2.JobType getType() {
return type;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static RemoveAlterJobV2OperationLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, RemoveAlterJobV2OperationLog.class);
}
}
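The new operation log serializes itself as GSON JSON framed by Text.writeString, so write() and read() should round-trip cleanly. A hedged sketch of such a check, assuming it is compiled inside the Doris FE tree; the job id is an arbitrary example value:

import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.persist.RemoveAlterJobV2OperationLog;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RemoveAlterJobV2OperationLogRoundTrip {
    public static void main(String[] args) throws IOException {
        RemoveAlterJobV2OperationLog log =
                new RemoveAlterJobV2OperationLog(10001L, AlterJobV2.JobType.SCHEMA_CHANGE);

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        log.write(new DataOutputStream(bos)); // JSON via GsonUtils, framed by Text

        RemoveAlterJobV2OperationLog replayed = RemoveAlterJobV2OperationLog.read(
                new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        assert replayed.getJobId() == 10001L;
        assert replayed.getType() == AlterJobV2.JobType.SCHEMA_CHANGE;
    }
}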

View File

@@ -101,7 +101,7 @@ import org.apache.doris.common.proc.FrontendsProcNode;
import org.apache.doris.common.proc.LoadProcDir;
import org.apache.doris.common.proc.PartitionsProcDir;
import org.apache.doris.common.proc.ProcNodeInterface;
-import org.apache.doris.common.proc.SchemaChangeProcNode;
+import org.apache.doris.common.proc.SchemaChangeProcDir;
import org.apache.doris.common.proc.TabletsProcDir;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.LogBuilder;
@@ -1069,8 +1069,8 @@ public class ShowExecutor {
Preconditions.checkNotNull(procNodeI);
List<List<String>> rows;
// Only SchemaChangeProc supports where/order by/limit syntax
-if (procNodeI instanceof SchemaChangeProcNode) {
-rows = ((SchemaChangeProcNode) procNodeI).fetchResultByFilter(showStmt.getFilterMap(),
+if (procNodeI instanceof SchemaChangeProcDir) {
+rows = ((SchemaChangeProcDir) procNodeI).fetchResultByFilter(showStmt.getFilterMap(),
showStmt.getOrderPairs(), showStmt.getLimitElement()).getRows();
} else {
rows = procNodeI.fetchResult().getRows();