From d15bc83de030daed73796bdd3b0ce1ea765f10fa Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Thu, 17 Jan 2019 15:17:51 +0800 Subject: [PATCH] Fix some bugs of alter table operation (#550) 1. Fix bug that failed to query restored table after schema change. 2. Fix bug that failed to add rollup to restored table. 3. Optimize the info of SHOW ALTER TABLE stmt. 4. Optimize the info of some PROCs. 5. Optimize the tablet checker to avoid adding too much task to scheduler. --- .../Administration/admin_show_stmt.md | 1 + .../Data Manipulation/manipulation_stmt.md | 7 +- .../org/apache/doris/alter/AlterHandler.java | 2 +- .../org/apache/doris/alter/RollupHandler.java | 7 +- .../org/apache/doris/alter/RollupJob.java | 24 ++- .../apache/doris/alter/SchemaChangeJob.java | 187 +++++++++++------- .../analysis/AdminShowReplicaStatusStmt.java | 6 +- .../doris/analysis/CreateTableStmt.java | 2 +- .../apache/doris/analysis/DescribeStmt.java | 4 +- .../org/apache/doris/backup/BackupJob.java | 2 +- .../org/apache/doris/backup/RestoreJob.java | 12 +- .../org/apache/doris/catalog/Catalog.java | 5 +- .../apache/doris/catalog/MetadataViewer.java | 9 +- .../org/apache/doris/catalog/Partition.java | 19 ++ .../org/apache/doris/catalog/Replica.java | 12 +- .../org/apache/doris/clone/TabletChecker.java | 28 +-- .../apache/doris/clone/TabletScheduler.java | 28 ++- .../doris/common/proc/IndexInfoProcDir.java | 37 ++-- .../common/proc/IndexSchemaProcNode.java | 2 +- .../doris/common/proc/ReplicasProcNode.java | 3 +- .../doris/common/proc/RollupProcDir.java | 6 +- .../common/proc/SchemaChangeProcNode.java | 6 +- 22 files changed, 262 insertions(+), 147 deletions(-) diff --git a/docs/help/Contents/Administration/admin_show_stmt.md b/docs/help/Contents/Administration/admin_show_stmt.md index b00960271b..1ee64b1e5b 100644 --- a/docs/help/Contents/Administration/admin_show_stmt.md +++ b/docs/help/Contents/Administration/admin_show_stmt.md @@ -15,6 +15,7 @@ OK: replica 处于健康状态 DEAD: replica 所在 Backend 
不可用 VERSION_ERROR: replica 数据版本有缺失 + SCHEMA_ERROR: replica 的 schema hash 不正确 MISSING: replica 不存在 ## example diff --git a/docs/help/Contents/Data Manipulation/manipulation_stmt.md b/docs/help/Contents/Data Manipulation/manipulation_stmt.md index 20621e0ef9..59b8f6025c 100644 --- a/docs/help/Contents/Data Manipulation/manipulation_stmt.md +++ b/docs/help/Contents/Data Manipulation/manipulation_stmt.md @@ -716,8 +716,9 @@ SHOW DATA [FROM db_name[.table_name]]; 说明: - 如果不指定 FROM 子句,使用展示当前 db 下细分到各个 table 的数据量 - 如果指定 FROM 子句,则展示 table 下细分到各个 index 的数据量 + 1. 如果不指定 FROM 子句,使用展示当前 db 下细分到各个 table 的数据量 + 2. 如果指定 FROM 子句,则展示 table 下细分到各个 index 的数据量 + 3. 如果想查看各个 Partition 的大小,请参阅 help show partitions ## example 1. 展示默认 db 的各个 table 的数据量及汇总数据量 @@ -743,7 +744,7 @@ SHOW PARTITIONS FROM example_db.table_name PARTITION p1; ## keyword - SHOW,PARTITION + SHOW,PARTITIONS # SHOW TABLET ## description diff --git a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java index 2e5a731ffa..2841069f6c 100644 --- a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -67,7 +67,7 @@ public abstract class AlterHandler extends Daemon { } public AlterHandler(String name) { - super(name); + super(name, 20000); } protected void addAlterJob(AlterJob alterJob) { diff --git a/fe/src/main/java/org/apache/doris/alter/RollupHandler.java b/fe/src/main/java/org/apache/doris/alter/RollupHandler.java index 94c43e9afc..331c79d20f 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupHandler.java @@ -359,9 +359,10 @@ public class RollupHandler extends AlterHandler { // the new replica's init version is -1 until finished history rollup Replica rollupReplica = new Replica(rollupReplicaId, backendId, rollupSchemaHash, ReplicaState.ROLLUP); - // new replica's last failed version is equal to the partition's next 
version - 1 - // has to set failed verison and version hash here, because there will be no load after rollup - // so that if not set here, last failed version will not be set + // new replica's last failed version should be set to the partition's next version - 1, + // if all go well, the last failed version will be overwritten when rollup task finished and update + // replica version info. + // If not set, there is no other way to know that this replica has failed version. rollupReplica.updateVersionInfo(rollupReplica.getVersion(), rollupReplica.getVersionHash(), partition.getCommittedVersion(), partition.getCommittedVersionHash(), rollupReplica.getLastSuccessVersion(), rollupReplica.getLastSuccessVersionHash()); diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJob.java b/fe/src/main/java/org/apache/doris/alter/RollupJob.java index e1cdb08c76..a458802ec3 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJob.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJob.java @@ -502,7 +502,7 @@ public class RollupJob extends AlterJob { } this.state = JobState.CANCELLED; - if (!Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) { + if (Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) { this.cancelMsg = msg; } @@ -592,7 +592,6 @@ public class RollupJob extends AlterJob { long rowCount = finishTabletInfo.getRow_count(); // yiguolei: not check version here because the replica's first version will be set by rollup job // the version is not set now - // the finish task thread doesn't own db lock here, maybe a bug? 
rollupReplica.updateVersionInfo(version, versionHash, dataSize, rowCount); setReplicaFinished(partitionId, rollupReplicaId); @@ -654,7 +653,8 @@ public class RollupJob extends AlterJob { errorReplicas.add(replica); } else if (replica.getLastFailedVersion() > 0 && !partitionIdToUnfinishedReplicaIds.get(partitionId).contains(replica.getId())) { - // if the replica is finished history data, but failed during load, then it is a abnormal + // if the replica has finished converting history data, + // but failed during load, then it is a abnormal. // remove it from replica set // have to use delete replica, it will remove it from tablet inverted index LOG.warn("replica [{}] last failed version > 0 and have finished history rollup job," @@ -670,7 +670,8 @@ public class RollupJob extends AlterJob { } if (rollupTablet.getReplicas().size() < (expectReplicationNum / 2 + 1)) { - cancelMsg = String.format("rollup job[%d] cancelled. tablet[%d] has few health replica." + cancelMsg = String.format( + "rollup job[%d] cancelled. rollup tablet[%d] has few health replica." 
+ " num: %d", tableId, rollupTablet.getId(), replicas.size()); LOG.warn(cancelMsg); return -1; @@ -951,8 +952,11 @@ public class RollupJob extends AlterJob { db.writeUnlock(); } + List list = new ArrayList<>(); + Integer[] arr = list.toArray(new Integer[0]); + this.finishedTime = System.currentTimeMillis(); - LOG.info("finished schema change job: {}", tableId); + LOG.info("finished rollup job: {}", tableId); } @Override @@ -965,9 +969,6 @@ public class RollupJob extends AlterJob { // table name jobInfo.add(tbl.getName()); - // transactionid - jobInfo.add(transactionId); - // create time jobInfo.add(TimeUtils.longToTimeString(createTime)); @@ -976,6 +977,13 @@ public class RollupJob extends AlterJob { // base index and rollup index name jobInfo.add(baseIndexName); jobInfo.add(rollupIndexName); + + // rollup id + jobInfo.add(rollupIndexId); + + // transaction id + jobInfo.add(transactionId); + // job state jobInfo.add(state.name()); diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java index ed754cb6d9..e3f869ed23 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java @@ -533,7 +533,7 @@ public class SchemaChangeJob extends AlterJob { } this.state = JobState.CANCELLED; - if (!Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) { + if (Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) { this.cancelMsg = msg; } @@ -812,6 +812,7 @@ public class SchemaChangeJob extends AlterJob { this.replicaInfos.put(partitionId, replicaInfo); replica.setState(ReplicaState.NORMAL); + replica.setSchemaHash(schemaHash); // remove tasks for safety AgentTaskQueue.removeTask(replica.getBackendId(), TTaskType.SCHEMA_CHANGE, @@ -892,16 +893,12 @@ public class SchemaChangeJob extends AlterJob { @Override public synchronized void clear() { changedIndexIdToSchema = null; - changedIndexIdToSchemaVersion 
= null; - changedIndexIdToSchemaHash = null; - changedIndexIdToShortKeyColumnCount = null; resourceInfo = null; replicaInfos = null; unfinishedReplicaIds = null; indexIdToTotalReplicaNum = null; indexIdToFinishedReplicaNum = null; partitionIdToFinishedIndexIds = null; - // backendIdToReplicaIds = null; } @Override @@ -941,7 +938,6 @@ public class SchemaChangeJob extends AlterJob { // reset status to PENDING for resending the tasks in polling thread this.state = JobState.PENDING; - LOG.info("just trace", new Exception()); } finally { db.writeUnlock(); } @@ -1090,67 +1086,86 @@ public class SchemaChangeJob extends AlterJob { @Override public void getJobInfo(List> jobInfos, OlapTable tbl) { - if (state == JobState.FINISHED || state == JobState.CANCELLED) { - List jobInfo = new ArrayList(); - jobInfo.add(tableId); - jobInfo.add(tbl.getName()); - jobInfo.add(transactionId); - jobInfo.add(TimeUtils.longToTimeString(createTime)); - jobInfo.add(TimeUtils.longToTimeString(finishedTime)); - jobInfo.add("N/A"); - jobInfo.add("N/A"); - jobInfo.add(state.name()); - jobInfo.add(cancelMsg); - jobInfo.add("N/A"); + if (changedIndexIdToSchema == null) { + // for compatibility + if (state == JobState.FINISHED || state == JobState.CANCELLED) { + List jobInfo = new ArrayList(); + jobInfo.add(tableId); + jobInfo.add(tbl.getName()); + jobInfo.add(transactionId); + jobInfo.add(TimeUtils.longToTimeString(createTime)); + jobInfo.add(TimeUtils.longToTimeString(finishedTime)); + jobInfo.add("N/A"); + jobInfo.add("N/A"); + jobInfo.add(state.name()); + jobInfo.add(cancelMsg); + jobInfo.add("N/A"); - jobInfos.add(jobInfo); + jobInfos.add(jobInfo); + return; + } + + // in previous version, changedIndexIdToSchema is set to null + // when job is finished or cancelled. 
+ // so if changedIndexIdToSchema == null, the job's state must be FINISHED or CANCELLED return; } - // calc progress and state for each table Map indexProgress = new HashMap(); Map indexState = new HashMap(); - for (Long indexId : getChangedIndexToSchema().keySet()) { + + // calc progress and state for each table + for (Long indexId : changedIndexIdToSchemaVersion.keySet()) { int totalReplicaNum = 0; int finishedReplicaNum = 0; String idxState = IndexState.NORMAL.name(); for (Partition partition : tbl.getPartitions()) { MaterializedIndex index = partition.getIndex(indexId); - int tableReplicaNum = getTotalReplicaNumByIndexId(indexId); - int tableFinishedReplicaNum = getFinishedReplicaNumByIndexId(indexId); - Preconditions.checkState(!(tableReplicaNum == 0 && tableFinishedReplicaNum == -1)); - Preconditions.checkState(tableFinishedReplicaNum <= tableReplicaNum, - tableFinishedReplicaNum + "/" + tableReplicaNum); - totalReplicaNum += tableReplicaNum; - finishedReplicaNum += tableFinishedReplicaNum; + + if (state == JobState.RUNNING) { + int tableReplicaNum = getTotalReplicaNumByIndexId(indexId); + int tableFinishedReplicaNum = getFinishedReplicaNumByIndexId(indexId); + Preconditions.checkState(!(tableReplicaNum == 0 && tableFinishedReplicaNum == -1)); + Preconditions.checkState(tableFinishedReplicaNum <= tableReplicaNum, + tableFinishedReplicaNum + "/" + tableReplicaNum); + totalReplicaNum += tableReplicaNum; + finishedReplicaNum += tableFinishedReplicaNum; + } if (index.getState() != IndexState.NORMAL) { idxState = index.getState().name(); } } - if (Catalog.getInstance().isMaster() - && (state == JobState.RUNNING || state == JobState.FINISHED)) { + + indexState.put(indexId, idxState); + + if (Catalog.getInstance().isMaster() && state == JobState.RUNNING && totalReplicaNum != 0) { indexProgress.put(indexId, (finishedReplicaNum * 100 / totalReplicaNum) + "%"); - indexState.put(indexId, idxState); } else { indexProgress.put(indexId, "0%"); - indexState.put(indexId, 
idxState); } } - for (Long indexId : getChangedIndexToSchema().keySet()) { + for (Long indexId : changedIndexIdToSchemaVersion.keySet()) { List jobInfo = new ArrayList(); jobInfo.add(tableId); jobInfo.add(tbl.getName()); - jobInfo.add(transactionId); jobInfo.add(TimeUtils.longToTimeString(createTime)); jobInfo.add(TimeUtils.longToTimeString(finishedTime)); - jobInfo.add(tbl.getIndexNameById(indexId)); - jobInfo.add(indexState.get(indexId)); - jobInfo.add(state.name()); + jobInfo.add(tbl.getIndexNameById(indexId)); // index name + jobInfo.add(indexId); + // index schema version and schema hash + jobInfo.add(changedIndexIdToSchemaVersion.get(indexId) + "-" + changedIndexIdToSchemaHash.get(indexId)); + jobInfo.add(indexState.get(indexId)); // index state + jobInfo.add(transactionId); + jobInfo.add(state.name()); // job state + + if (state == JobState.RUNNING) { + jobInfo.add(indexProgress.get(indexId) == null ? "N/A" : indexProgress.get(indexId)); // progress + } + jobInfo.add(cancelMsg); - jobInfo.add(indexProgress.get(indexId)); jobInfos.add(jobInfo); } // end for indexIds @@ -1165,30 +1180,36 @@ public class SchemaChangeJob extends AlterJob { // 'unfinishedReplicaIds', 'indexIdToTotalReplicaNum' and 'indexIdToFinishedReplicaNum' // don't need persist. 
build it when send tasks + // columns if (changedIndexIdToSchema != null) { out.writeBoolean(true); out.writeInt(changedIndexIdToSchema.size()); for (Entry> entry : changedIndexIdToSchema.entrySet()) { - long indexId = entry.getKey(); - out.writeLong(indexId); + out.writeLong(entry.getKey()); out.writeInt(entry.getValue().size()); for (Column column : entry.getValue()) { column.write(out); } - - // schema version - out.writeInt(changedIndexIdToSchemaVersion.get(indexId)); - - // schema hash - out.writeInt(changedIndexIdToSchemaHash.get(indexId)); - - // short key column count - out.writeShort(changedIndexIdToShortKeyColumnCount.get(indexId)); } } else { out.writeBoolean(false); } + // schema version and hash, and short key + if (changedIndexIdToSchemaVersion != null) { + out.writeBoolean(true); + out.writeInt(changedIndexIdToSchemaVersion.size()); + for (Entry entry : changedIndexIdToSchemaVersion.entrySet()) { + out.writeLong(entry.getKey()); + // schema version + out.writeInt(entry.getValue()); + // schema hash + out.writeInt(changedIndexIdToSchemaHash.get(entry.getKey())); + // short key column count + out.writeShort(changedIndexIdToShortKeyColumnCount.get(entry.getKey())); + } + } + // replicaInfos is saving for restoring schemaChangeJobFinished if (replicaInfos != null) { out.writeBoolean(true); @@ -1232,32 +1253,58 @@ public class SchemaChangeJob extends AlterJob { tableName = Text.readString(in); - boolean has = in.readBoolean(); - if (has) { - int count = in.readInt(); - for (int i = 0; i < count; i++) { - long indexId = in.readLong(); - int columnNum = in.readInt(); - List columns = new LinkedList(); - for (int j = 0; j < columnNum; j++) { - Column column = Column.read(in); - columns.add(column); + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_48) { + if (in.readBoolean()) { + int count = in.readInt(); + for (int i = 0; i < count; i++) { + long indexId = in.readLong(); + int columnNum = in.readInt(); + List columns = new 
LinkedList(); + for (int j = 0; j < columnNum; j++) { + Column column = Column.read(in); + columns.add(column); + } + changedIndexIdToSchema.put(indexId, columns); + // schema version + changedIndexIdToSchemaVersion.put(indexId, in.readInt()); + // schema hash + changedIndexIdToSchemaHash.put(indexId, in.readInt()); + // short key column count + changedIndexIdToShortKeyColumnCount.put(indexId, in.readShort()); } - changedIndexIdToSchema.put(indexId, columns); + } + } else { + // columns + if (in.readBoolean()) { + int count = in.readInt(); + for (int i = 0; i < count; i++) { + long indexId = in.readLong(); + int columnNum = in.readInt(); + List columns = new LinkedList(); + for (int j = 0; j < columnNum; j++) { + Column column = Column.read(in); + columns.add(column); + } + changedIndexIdToSchema.put(indexId, columns); + } + } - // schema version - changedIndexIdToSchemaVersion.put(indexId, in.readInt()); - - // schema hash - changedIndexIdToSchemaHash.put(indexId, in.readInt()); - - // short key column count - changedIndexIdToShortKeyColumnCount.put(indexId, in.readShort()); + // schema version and hash, and short key + if (in.readBoolean()) { + int count = in.readInt(); + for (int i = 0; i < count; i++) { + long indexId = in.readLong(); + // schema version + changedIndexIdToSchemaVersion.put(indexId, in.readInt()); + // schema hash + changedIndexIdToSchemaHash.put(indexId, in.readInt()); + // short key column count + changedIndexIdToShortKeyColumnCount.put(indexId, in.readShort()); + } } } - has = in.readBoolean(); - if (has) { + if (in.readBoolean()) { int count = in.readInt(); for (int i = 0; i < count; ++i) { long partitionId = in.readLong(); diff --git a/fe/src/main/java/org/apache/doris/analysis/AdminShowReplicaStatusStmt.java b/fe/src/main/java/org/apache/doris/analysis/AdminShowReplicaStatusStmt.java index a09950839f..cd4cdd52e6 100644 --- a/fe/src/main/java/org/apache/doris/analysis/AdminShowReplicaStatusStmt.java +++ 
b/fe/src/main/java/org/apache/doris/analysis/AdminShowReplicaStatusStmt.java @@ -20,8 +20,8 @@ package org.apache.doris.analysis; import org.apache.doris.analysis.BinaryPredicate.Operator; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Replica.ReplicaStatus; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; @@ -40,7 +40,7 @@ import java.util.List; public class AdminShowReplicaStatusStmt extends ShowStmt { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("TabletId").add("ReplicaId").add("BackendId").add("Version").add("LastFailedVersion") - .add("LastSuccessVersion").add("CommittedVersion").add("VersionNum") + .add("LastSuccessVersion").add("CommittedVersion").add("SchemaHash").add("VersionNum") .add("State").add("Status") .build(); @@ -83,7 +83,7 @@ public class AdminShowReplicaStatusStmt extends ShowStmt { if (!analyzeWhere()) { throw new AnalysisException( - "Where clause should looks like: status =/!= 'OK/DEAD/VERSION_ERROR/MISSING'"); + "Where clause should looks like: status =/!= 'OK/DEAD/VERSION_ERROR/SCHEMA_ERROR/MISSING'"); } } diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index e418867b97..01e3086442 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -266,7 +266,7 @@ public class CreateTableStmt extends DdlStmt { rowLengthBytes += columnDef.getType().getStorageLayoutBytes(); } - if (rowLengthBytes > Config.max_layout_length_per_row) { + if (rowLengthBytes > Config.max_layout_length_per_row && engineName.equals("olap")) { throw new AnalysisException("The size of a row (" + rowLengthBytes + 
") exceed the maximal row size: " + Config.max_layout_length_per_row); } diff --git a/fe/src/main/java/org/apache/doris/analysis/DescribeStmt.java b/fe/src/main/java/org/apache/doris/analysis/DescribeStmt.java index d61e1f4ee1..b973548552 100644 --- a/fe/src/main/java/org/apache/doris/analysis/DescribeStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/DescribeStmt.java @@ -19,10 +19,10 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Table.TableType; import org.apache.doris.common.AnalysisException; @@ -117,7 +117,7 @@ public class DescribeStmt extends ShowStmt { if (!isAllTables) { // show base table schema only String procString = "/dbs/" + db.getId() + "/" + table.getId() + "/" + TableProcDir.INDEX_SCHEMA - + "/" + table.getName(); + + "/" + table.getId(); node = ProcService.getInstance().open(procString); if (node == null) { diff --git a/fe/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/src/main/java/org/apache/doris/backup/BackupJob.java index 51bb47f895..55b2b4b158 100644 --- a/fe/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/src/main/java/org/apache/doris/backup/BackupJob.java @@ -644,7 +644,7 @@ public class BackupJob extends AbstractJob { Collections.sort(replicaIds); for (Long replicaId : replicaIds) { Replica replica = tablet.getReplicaById(replicaId); - if (replica.getLastFailedVersion() <= 0 && (replica.getVersion() > visibleVersion + if (replica.getLastFailedVersion() < 0 && (replica.getVersion() > visibleVersion || (replica.getVersion() == visibleVersion && replica.getVersionHash() == visibleVersionHash))) { return replica; } diff --git 
a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java index f57c7fb57a..d0f3892db2 100644 --- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -502,7 +502,7 @@ public class RestoreJob extends AbstractJob { + " in table " + localTbl.getName() + " has different replication num '" + localRangePartInfo.getReplicationNum(localPartition.getId()) - + "' with parition in repository"); + + "' with parition in repository, which is " + restoreReplicationNum); return; } genFileMapping(localOlapTbl, localPartition, tblInfo.id, backupPartInfo, @@ -524,7 +524,7 @@ public class RestoreJob extends AbstractJob { + " in table " + localTbl.getName() + " has different replication num '" + localPartInfo.getReplicationNum(localPartition.getId()) - + "' with parition in repository"); + + "' with parition in repository, which is " + restoreReplicationNum); return; } @@ -723,7 +723,7 @@ public class RestoreJob extends AbstractJob { Range remoteRange = remotePartitionInfo.getRange(remotePartId); DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId); localPartitionInfo.addPartition(restoredPart.getId(), remoteRange, - remoteDataProperty, (short) restoreReplicationNum); + remoteDataProperty, (short) restoreReplicationNum); localTbl.addPartition(restoredPart); } @@ -1116,7 +1116,7 @@ public class RestoreJob extends AbstractJob { state = RestoreJobState.DOWNLOADING; - // No log here + // No edit log here LOG.info("finished to send download tasks to BE. num: {}. 
{}", batchTask.getTaskNum(), this); return; } @@ -1205,8 +1205,8 @@ public class RestoreJob extends AbstractJob { continue; } - // update partition committed version - part.updateVisibleVersionAndVersionHash(entry.getValue().first, entry.getValue().second); + // update partition visible version + part.updateVersionForRestore(entry.getValue().first, entry.getValue().second); // we also need to update the replica version of these overwritten restored partitions for (MaterializedIndex idx : part.getMaterializedIndices()) { diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index c28a8dad03..0976e16c97 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -3791,8 +3791,8 @@ public class Catalog { // 1. storage type sb.append("\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE).append("\" = \""); - TStorageType storageType = olapTable - .getStorageTypeByIndexId(olapTable.getIndexIdByName(olapTable.getName())); + TStorageType storageType = olapTable.getStorageTypeByIndexId( + olapTable.getIndexIdByName(olapTable.getName())); sb.append(storageType.name()).append("\""); // 2. bloom filter @@ -3919,6 +3919,7 @@ public class Catalog { } createTableStmt.add(sb.toString()); + // 2. 
add partition if (separatePartition && (table instanceof OlapTable) && ((OlapTable) table).getPartitionInfo().getType() == PartitionType.RANGE diff --git a/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java index e835ca5c26..51727f3b50 100644 --- a/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -80,6 +80,7 @@ public class MetadataViewer { short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId()); for (MaterializedIndex index : partition.getMaterializedIndices()) { + int schemaHash = olapTable.getSchemaHashByIndexId(index.getId()); for (Tablet tablet : index.getTablets()) { long tabletId = tablet.getId(); int count = replicationNum; @@ -91,11 +92,12 @@ public class MetadataViewer { Backend be = infoService.getBackend(replica.getBackendId()); if (be == null || !be.isAvailable()) { status = ReplicaStatus.DEAD; - } else { - if (replica.getVersion() < visibleVersion + } else if (replica.getVersion() < visibleVersion || replica.getLastFailedVersion() > 0) { status = ReplicaStatus.VERSION_ERROR; - } + + } else if (replica.getSchemaHash() != -1 && replica.getSchemaHash() != schemaHash) { + status = ReplicaStatus.SCHEMA_ERROR; } if (filterReplica(status, statusFilter, op)) { @@ -109,6 +111,7 @@ public class MetadataViewer { row.add(String.valueOf(replica.getLastFailedVersion())); row.add(String.valueOf(replica.getLastSuccessVersion())); row.add(String.valueOf(visibleVersion)); + row.add(String.valueOf(replica.getSchemaHash())); row.add(String.valueOf(replica.getVersionCount())); row.add(replica.getState().name()); row.add(status.name()); diff --git a/fe/src/main/java/org/apache/doris/catalog/Partition.java b/fe/src/main/java/org/apache/doris/catalog/Partition.java index 85bef7aac0..942e2bf320 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Partition.java +++ 
b/fe/src/main/java/org/apache/doris/catalog/Partition.java @@ -24,6 +24,9 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Util; import org.apache.doris.meta.MetaContext; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -37,6 +40,8 @@ import java.util.Map.Entry; * Internal representation of partition-related metadata. */ public class Partition extends MetaObject implements Writable { + private static final Logger LOG = LogManager.getLogger(Partition.class); + public static final long PARTITION_INIT_VERSION = 1L; public static final long PARTITION_INIT_VERSION_HASH = 0L; @@ -111,6 +116,20 @@ public class Partition extends MetaObject implements Writable { this.state = state; } + /* + * If a partition is overwritten by a restore job, we need to reset all version info to + * the restored partition version info. + */ + public void updateVersionForRestore(long visibleVersion, long visibleVersionHash) { + this.visibleVersion = visibleVersion; + this.visibleVersionHash = visibleVersionHash; + this.nextVersion = this.visibleVersion + 1; + this.nextVersionHash = Util.generateVersionHash(); + this.committedVersionHash = visibleVersionHash; + LOG.info("update partition {} version for restore: visible: {}-{}, next: {}-{}", + visibleVersion, visibleVersionHash, nextVersion, nextVersionHash); + } + public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionHash) { this.visibleVersion = visibleVersion; this.visibleVersionHash = visibleVersionHash; diff --git a/fe/src/main/java/org/apache/doris/catalog/Replica.java b/fe/src/main/java/org/apache/doris/catalog/Replica.java index ba20e20826..3ff868570f 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/src/main/java/org/apache/doris/catalog/Replica.java @@ -48,7 +48,8 @@ public class Replica implements Writable { OK, // health 
DEAD, // backend is not available VERSION_ERROR, // missing version - MISSING // replica does not exist + MISSING, // replica does not exist + SCHEMA_ERROR // replica's schema hash does not equal to index's schema hash } private long id; @@ -57,8 +58,8 @@ public class Replica implements Writable { private long versionHash; private int schemaHash = -1; - private long dataSize; - private long rowCount; + private long dataSize = 0; + private long rowCount = 0; private ReplicaState state; private long lastFailedVersion = -1L; @@ -218,6 +219,9 @@ public class Replica implements Writable { long lastFailedVersion, long lastFailedVersionHash, long lastSuccessVersion, long lastSuccessVersionHash, long newDataSize, long newRowCount) { + + LOG.debug("before update: {}", this.toString()); + if (newVersion < this.version) { LOG.warn("replica[" + id + "] new version is lower than meta version. " + newVersion + " vs " + version); // yiguolei: could not find any reason why new version less than this.version should run??? 
@@ -282,7 +286,7 @@ public class Replica implements Writable { } } - LOG.debug("update {}", this.toString()); + LOG.debug("after update {}", this.toString()); } public synchronized void updateLastFailedVersion(long lastFailedVersion, long lastFailedVersionHash) { diff --git a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java index cddae68f7d..87517493c2 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Table.TableType; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.Tablet.TabletStatus; +import org.apache.doris.clone.TabletScheduler.AddResult; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.common.util.Daemon; @@ -57,10 +58,6 @@ public class TabletChecker extends Daemon { private static final long CHECK_INTERVAL_MS = 20 * 1000L; // 20 second - // if the number of scheduled tablets in TabletScheduler exceed this threshold - // skip checking. - private static final int MAX_SCHEDULING_TABLETS = 5000; - private Catalog catalog; private SystemInfoService infoService; private TabletScheduler tabletScheduler; @@ -156,10 +153,12 @@ public class TabletChecker extends Daemon { */ @Override protected void runOneCycle() { - if (tabletScheduler.getPendingNum() > MAX_SCHEDULING_TABLETS - || tabletScheduler.getRunningNum() > MAX_SCHEDULING_TABLETS) { + int pendingNum = tabletScheduler.getPendingNum(); + int runningNum = tabletScheduler.getRunningNum(); + if (pendingNum > TabletScheduler.MAX_SCHEDULING_TABLETS + || runningNum > TabletScheduler.MAX_SCHEDULING_TABLETS) { LOG.info("too many tablets are being scheduled. pending: {}, running: {}, limit: {}. 
skip check", - tabletScheduler.getPendingNum(), tabletScheduler.getRunningNum(), MAX_SCHEDULING_TABLETS); + pendingNum, runningNum, TabletScheduler.MAX_SCHEDULING_TABLETS); return; } @@ -178,7 +177,7 @@ public class TabletChecker extends Daemon { long addToSchedulerTabletNum = 0; List dbIds = catalog.getDbIds(); - for (Long dbId : dbIds) { + OUT: for (Long dbId : dbIds) { Database db = catalog.getDb(dbId); if (db == null) { continue; @@ -237,11 +236,16 @@ public class TabletChecker extends Daemon { System.currentTimeMillis()); tabletCtx.setOrigPriority(statusWithPrio.second); - if (tabletScheduler.addTablet(tabletCtx, false /* not force */)) { + AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */); + if (res == AddResult.LIMIT_EXCEED) { + LOG.info("number of scheduling tablets in tablet scheduler" + + " exceed to limit. stop tablet checker"); + break OUT; + } else if (res == AddResult.ADDED) { addToSchedulerTabletNum++; } } - } + } // indices if (prioPartIsHealthy && isInPrios) { // if all replicas in this partition are healthy, remove this partition from @@ -250,8 +254,8 @@ public class TabletChecker extends Daemon { db.getId(), olapTbl.getId(), partition.getId()); removePrios(db.getId(), olapTbl.getId(), Lists.newArrayList(partition.getId())); } - } - } + } // partitions + } // tables } finally { db.readUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java index 96a57336b3..40bd54a203 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -91,6 +91,10 @@ public class TabletScheduler extends Daemon { public static final int BALANCE_SLOT_NUM_FOR_PATH = 2; + // if the number of scheduled tablets in TabletScheduler exceed this threshold + // skip checking. 
+ public static final int MAX_SCHEDULING_TABLETS = 5000; + /* * Tablet is added to pendingTablets as well it's id in allTabletIds. * TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletIds when @@ -121,6 +125,13 @@ public class TabletScheduler extends Daemon { private SystemInfoService infoService; private TabletInvertedIndex invertedIndex; private TabletSchedulerStat stat; + + // result of adding a tablet to pendingTablets + public enum AddResult { + ADDED, // success to add + ALREADY_IN, // already added, skip + LIMIT_EXCEED // number of pending tablets exceed the limit + } public TabletScheduler(Catalog catalog, SystemInfoService infoService, TabletInvertedIndex invertedIndex, TabletSchedulerStat stat) { @@ -188,14 +199,22 @@ public class TabletScheduler extends Daemon { * add a ready-to-be-scheduled tablet to pendingTablets, if it has not being added before. * if force is true, do not check if tablet is already added before. */ - public synchronized boolean addTablet(TabletSchedCtx tablet, boolean force) { + public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) { if (!force && containsTablet(tablet.getTabletId())) { - LOG.info("balance is disabled, skip"); - return false; + return AddResult.ALREADY_IN; } + + // if this is not a BALANCE task, and not a force add, + // and number of scheduling tablets exceed the limit, + // refuse to add. + if (tablet.getType() != TabletSchedCtx.Type.BALANCE && !force + && (pendingTablets.size() > MAX_SCHEDULING_TABLETS || runningTablets.size() > MAX_SCHEDULING_TABLETS)) { + return AddResult.LIMIT_EXCEED; + } + allTabletIds.add(tablet.getTabletId()); pendingTablets.offer(tablet); - return true; + return AddResult.ADDED; } public synchronized boolean containsTablet(long tabletId) { @@ -705,6 +724,7 @@ public class TabletScheduler extends Daemon { */ private void selectTabletsForBalance() { if (Config.disable_balance) { + LOG.info("balance is disabled. 
skip selecting tablets for balance"); return; } diff --git a/fe/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java index c1c5b50e9f..2e13fec714 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java @@ -39,8 +39,8 @@ import java.util.Set; */ public class IndexInfoProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("IndexName").add("SchemaVersion").add("SchemaHash").add("ShortKeyColumnCount") - .add("StorageType").add("Keys") + .add("IndexId").add("IndexName").add("SchemaVersion").add("SchemaHash") + .add("ShortKeyColumnCount").add("StorageType").add("Keys") .build(); private Database db; @@ -91,15 +91,16 @@ public class IndexInfoProcDir implements ProcDirInterface { } builder.append(Joiner.on(", ").join(columnNames)).append(")"); - result.addRow(Lists.newArrayList(indexName, - String.valueOf(schemaVersion), - String.valueOf(schemaHash), - String.valueOf(shortKeyColumnCount), - storageType.name(), - builder.toString())); + result.addRow(Lists.newArrayList(String.valueOf(indexId), + indexName, + String.valueOf(schemaVersion), + String.valueOf(schemaHash), + String.valueOf(shortKeyColumnCount), + storageType.name(), + builder.toString())); } } else { - result.addRow(Lists.newArrayList(table.getName(), "", "", "", "", "")); + result.addRow(Lists.newArrayList("-1", table.getName(), "", "", "", "", "")); } return result; @@ -114,23 +115,27 @@ public class IndexInfoProcDir implements ProcDirInterface { } @Override - public ProcNodeInterface lookup(String indexName) throws AnalysisException { + public ProcNodeInterface lookup(String idxIdStr) throws AnalysisException { Preconditions.checkNotNull(db); Preconditions.checkNotNull(table); + long idxId; + try { + idxId = Long.valueOf(idxIdStr); + } catch (NumberFormatException e) 
{ + throw new AnalysisException("Invalid index id format: " + idxIdStr); + } + db.readLock(); try { List schema = null; Set bfColumns = null; if (table.getType() == TableType.OLAP) { OlapTable olapTable = (OlapTable) table; - Long indexId = olapTable.getIndexIdByName(indexName); - if (indexId == null) { - throw new AnalysisException("Index[" + indexName + "] does not exist in table[" - + table.getName() + "]"); + schema = olapTable.getSchemaByIndexId(idxId); + if (schema == null) { + throw new AnalysisException("Index " + idxId + " does not exist"); } - schema = olapTable.getSchemaByIndexId(indexId); - bfColumns = olapTable.getCopiedBfColumns(); } else { schema = table.getBaseSchema(); diff --git a/fe/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java index 724bddd42f..570ccc4688 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java @@ -31,7 +31,7 @@ import java.util.List; import java.util.Set; /* - * SHOW PROC /dbs/dbId/tableId/index_schema/"index name" + * SHOW PROC /dbs/dbId/tableId/index_schema/indexId * show index schema */ public class IndexSchemaProcNode implements ProcNodeInterface { diff --git a/fe/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java index 747bf90921..6ff912155a 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java @@ -34,7 +34,7 @@ public class ReplicasProcNode implements ProcNodeInterface { .add("ReplicaId").add("BackendId").add("Version").add("VersionHash") .add("LstSuccessVersion").add("LstSuccessVersionHash") .add("LstFailedVersion").add("LstFailedVersionHash") - .add("LstFailedTime").add("DataSize").add("RowCount").add("State") + 
.add("LstFailedTime").add("SchemaHash").add("DataSize").add("RowCount").add("State") .add("VersionCount").add("PathHash") .build(); @@ -59,6 +59,7 @@ public class ReplicasProcNode implements ProcNodeInterface { String.valueOf(replica.getLastFailedVersion()), String.valueOf(replica.getLastFailedVersionHash()), TimeUtils.longToTimeString(replica.getLastFailedTimestamp()), + String.valueOf(replica.getSchemaHash()), String.valueOf(replica.getDataSize()), String.valueOf(replica.getRowCount()), String.valueOf(replica.getState()), diff --git a/fe/src/main/java/org/apache/doris/common/proc/RollupProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/RollupProcDir.java index 8b773b95fc..42c31523c6 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/RollupProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/RollupProcDir.java @@ -32,9 +32,9 @@ import java.util.List; public class RollupProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("JobId").add("TableName").add("TransactionId").add("CreateTime").add("FinishedTime") - .add("BaseIndexName").add("RollupIndexName").add("State").add("Msg") - .add("Progress") + .add("JobId").add("TableName").add("CreateTime").add("FinishedTime") + .add("BaseIndexName").add("RollupIndexName").add("RollupId").add("TransactionId") + .add("State").add("Msg") .add("Progress") .build(); private RollupHandler rollupHandler; diff --git a/fe/src/main/java/org/apache/doris/common/proc/SchemaChangeProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/SchemaChangeProcNode.java index a12a392355..3ec874aa8c 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/SchemaChangeProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/SchemaChangeProcNode.java @@ -29,9 +29,9 @@ import java.util.List; public class SchemaChangeProcNode implements ProcNodeInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - 
.add("JobId").add("TableName").add("TransactionId").add("CreateTime").add("FinishTime") - .add("IndexName").add("IndexState").add("State").add("Msg") - .add("Progress") + .add("JobId").add("TableName").add("CreateTime").add("FinishTime") + .add("IndexName").add("IndexId").add("SchemaVersion").add("IndexState") + .add("TransactionId").add("State").add("Progress").add("Msg") .build(); private SchemaChangeHandler schemaChangeHandler;