diff --git a/docs/help/Contents/Administration/admin_show_stmt.md b/docs/help/Contents/Administration/admin_show_stmt.md index b00960271b..1ee64b1e5b 100644 --- a/docs/help/Contents/Administration/admin_show_stmt.md +++ b/docs/help/Contents/Administration/admin_show_stmt.md @@ -15,6 +15,7 @@ OK: replica 处于健康状态 DEAD: replica 所在 Backend 不可用 VERSION_ERROR: replica 数据版本有缺失 + SCHEMA_ERROR: replica 的 schema hash 不正确 MISSING: replica 不存在 ## example diff --git a/docs/help/Contents/Data Manipulation/manipulation_stmt.md b/docs/help/Contents/Data Manipulation/manipulation_stmt.md index 20621e0ef9..59b8f6025c 100644 --- a/docs/help/Contents/Data Manipulation/manipulation_stmt.md +++ b/docs/help/Contents/Data Manipulation/manipulation_stmt.md @@ -716,8 +716,9 @@ SHOW DATA [FROM db_name[.table_name]]; 说明: - 如果不指定 FROM 子句,使用展示当前 db 下细分到各个 table 的数据量 - 如果指定 FROM 子句,则展示 table 下细分到各个 index 的数据量 + 1. 如果不指定 FROM 子句,则展示当前 db 下细分到各个 table 的数据量 + 2. 如果指定 FROM 子句,则展示 table 下细分到各个 index 的数据量 + 3. 如果想查看各个 Partition 的大小,请参阅 help show partitions ## example 1. 展示默认 db 的各个 table 的数据量及汇总数据量 @@ -743,7 +744,7 @@ SHOW PARTITIONS FROM example_db.table_name PARTITION p1; ## keyword - SHOW,PARTITION + SHOW,PARTITIONS # SHOW TABLET ## description diff --git a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java index 2e5a731ffa..2841069f6c 100644 --- a/fe/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -67,7 +67,7 @@ public abstract class AlterHandler extends Daemon { } public AlterHandler(String name) { - super(name); + super(name, 20000); } protected void addAlterJob(AlterJob alterJob) { diff --git a/fe/src/main/java/org/apache/doris/alter/RollupHandler.java b/fe/src/main/java/org/apache/doris/alter/RollupHandler.java index 94c43e9afc..331c79d20f 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupHandler.java @@ -359,9 +359,10 @@ public class RollupHandler extends AlterHandler { // the new replica's init version is -1 until finished history rollup Replica rollupReplica = new Replica(rollupReplicaId, backendId, rollupSchemaHash, ReplicaState.ROLLUP); - // new replica's last failed version is equal to the partition's next version - 1 - // has to set failed verison and version hash here, because there will be no load after rollup - // so that if not set here, last failed version will not be set + // new replica's last failed version should be set to the partition's next version - 1, + // if all goes well, the last failed version will be overwritten when the rollup task finishes and updates + // the replica version info. + // If not set, there is no other way to know that this replica has a failed version. 
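A note on the rollup replica bookkeeping above: the pre-set last failed version is what keeps the new replica from looking healthy before its history data is converted. The sketch below is a simplified, hypothetical model of that rule; MiniReplica is not the real org.apache.doris.catalog.Replica and the version numbers are made up.

```java
// Hypothetical stand-in for the rule described in the RollupHandler comment above.
public class RollupReplicaSketch {

    static final class MiniReplica {
        long version = -1;           // data version the replica actually holds
        long lastFailedVersion = -1; // > 0 means the replica is behind and needs a catch-up

        boolean needsCatchUp() {
            return lastFailedVersion > 0;
        }
    }

    public static void main(String[] args) {
        long partitionCommittedVersion = 7; // made-up "next version - 1" of the partition

        MiniReplica rollupReplica = new MiniReplica();
        // Pre-mark the replica as behind until the rollup task reports success.
        rollupReplica.lastFailedVersion = partitionCommittedVersion;
        System.out.println(rollupReplica.needsCatchUp()); // true: still converting history data

        // When the rollup task finishes, the version info is overwritten and the mark cleared.
        rollupReplica.version = partitionCommittedVersion;
        rollupReplica.lastFailedVersion = -1;
        System.out.println(rollupReplica.needsCatchUp()); // false
    }
}
```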
rollupReplica.updateVersionInfo(rollupReplica.getVersion(), rollupReplica.getVersionHash(), partition.getCommittedVersion(), partition.getCommittedVersionHash(), rollupReplica.getLastSuccessVersion(), rollupReplica.getLastSuccessVersionHash()); diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJob.java b/fe/src/main/java/org/apache/doris/alter/RollupJob.java index e1cdb08c76..a458802ec3 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJob.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJob.java @@ -502,7 +502,7 @@ public class RollupJob extends AlterJob { } this.state = JobState.CANCELLED; - if (!Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) { + if (Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) { this.cancelMsg = msg; } @@ -592,7 +592,6 @@ public class RollupJob extends AlterJob { long rowCount = finishTabletInfo.getRow_count(); // yiguolei: not check version here because the replica's first version will be set by rollup job // the version is not set now - // the finish task thread doesn't own db lock here, maybe a bug? rollupReplica.updateVersionInfo(version, versionHash, dataSize, rowCount); setReplicaFinished(partitionId, rollupReplicaId); @@ -654,7 +653,8 @@ public class RollupJob extends AlterJob { errorReplicas.add(replica); } else if (replica.getLastFailedVersion() > 0 && !partitionIdToUnfinishedReplicaIds.get(partitionId).contains(replica.getId())) { - // if the replica is finished history data, but failed during load, then it is a abnormal + // if the replica has finished converting history data, + // but failed during load, then it is abnormal. // remove it from replica set // have to use delete replica, it will remove it from tablet inverted index LOG.warn("replica [{}] last failed version > 0 and have finished history rollup job," @@ -670,7 +670,8 @@ public class RollupJob extends AlterJob { } if (rollupTablet.getReplicas().size() < (expectReplicationNum / 2 + 1)) { - cancelMsg = String.format("rollup job[%d] cancelled. tablet[%d] has few health replica." + cancelMsg = String.format( + "rollup job[%d] cancelled. rollup tablet[%d] has too few healthy replicas." + " num: %d", tableId, rollupTablet.getId(), replicas.size());
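The guard change in the cancel path above (repeated in SchemaChangeJob further down) is easy to misread, so here is a minimal sketch of the intended behavior. It assumes Guava's Strings, as used by the FE code; the class and messages are illustrative only.

```java
import com.google.common.base.Strings;

// Illustrative only: keep the first recorded cancel reason, ignore later ones.
public class CancelMsgGuard {
    private String cancelMsg = "";

    void cancel(String msg) {
        // old guard: !Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)
        // -> a message could only be stored when one was already present, so the first
        //    cancel reason was never recorded; the fixed guard inverts the first check.
        if (Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) {
            cancelMsg = msg;
        }
    }

    public static void main(String[] args) {
        CancelMsgGuard job = new CancelMsgGuard();
        job.cancel("replica missing on backend 10001"); // recorded
        job.cancel("job timeout");                       // ignored, first reason is kept
        System.out.println(job.cancelMsg);               // replica missing on backend 10001
    }
}
```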
+ " num: %d", tableId, rollupTablet.getId(), replicas.size()); LOG.warn(cancelMsg); return -1; @@ -951,8 +952,11 @@ public class RollupJob extends AlterJob { db.writeUnlock(); } + List list = new ArrayList<>(); + Integer[] arr = list.toArray(new Integer[0]); + this.finishedTime = System.currentTimeMillis(); - LOG.info("finished schema change job: {}", tableId); + LOG.info("finished rollup job: {}", tableId); } @Override @@ -965,9 +969,6 @@ public class RollupJob extends AlterJob { // table name jobInfo.add(tbl.getName()); - // transactionid - jobInfo.add(transactionId); - // create time jobInfo.add(TimeUtils.longToTimeString(createTime)); @@ -976,6 +977,13 @@ public class RollupJob extends AlterJob { // base index and rollup index name jobInfo.add(baseIndexName); jobInfo.add(rollupIndexName); + + // rollup id + jobInfo.add(rollupIndexId); + + // transaction id + jobInfo.add(transactionId); + // job state jobInfo.add(state.name()); diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java index ed754cb6d9..e3f869ed23 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java @@ -533,7 +533,7 @@ public class SchemaChangeJob extends AlterJob { } this.state = JobState.CANCELLED; - if (!Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) { + if (Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) { this.cancelMsg = msg; } @@ -812,6 +812,7 @@ public class SchemaChangeJob extends AlterJob { this.replicaInfos.put(partitionId, replicaInfo); replica.setState(ReplicaState.NORMAL); + replica.setSchemaHash(schemaHash); // remove tasks for safety AgentTaskQueue.removeTask(replica.getBackendId(), TTaskType.SCHEMA_CHANGE, @@ -892,16 +893,12 @@ public class SchemaChangeJob extends AlterJob { @Override public synchronized void clear() { changedIndexIdToSchema = null; - changedIndexIdToSchemaVersion = null; - changedIndexIdToSchemaHash = null; - changedIndexIdToShortKeyColumnCount = null; resourceInfo = null; replicaInfos = null; unfinishedReplicaIds = null; indexIdToTotalReplicaNum = null; indexIdToFinishedReplicaNum = null; partitionIdToFinishedIndexIds = null; - // backendIdToReplicaIds = null; } @Override @@ -941,7 +938,6 @@ public class SchemaChangeJob extends AlterJob { // reset status to PENDING for resending the tasks in polling thread this.state = JobState.PENDING; - LOG.info("just trace", new Exception()); } finally { db.writeUnlock(); } @@ -1090,67 +1086,86 @@ public class SchemaChangeJob extends AlterJob { @Override public void getJobInfo(List> jobInfos, OlapTable tbl) { - if (state == JobState.FINISHED || state == JobState.CANCELLED) { - List jobInfo = new ArrayList(); - jobInfo.add(tableId); - jobInfo.add(tbl.getName()); - jobInfo.add(transactionId); - jobInfo.add(TimeUtils.longToTimeString(createTime)); - jobInfo.add(TimeUtils.longToTimeString(finishedTime)); - jobInfo.add("N/A"); - jobInfo.add("N/A"); - jobInfo.add(state.name()); - jobInfo.add(cancelMsg); - jobInfo.add("N/A"); + if (changedIndexIdToSchema == null) { + // for compatibility + if (state == JobState.FINISHED || state == JobState.CANCELLED) { + List jobInfo = new ArrayList(); + jobInfo.add(tableId); + jobInfo.add(tbl.getName()); + jobInfo.add(transactionId); + jobInfo.add(TimeUtils.longToTimeString(createTime)); + jobInfo.add(TimeUtils.longToTimeString(finishedTime)); + jobInfo.add("N/A"); + jobInfo.add("N/A"); + jobInfo.add(state.name()); + 
@@ -1090,67 +1086,88 @@ @Override public void getJobInfo(List> jobInfos, OlapTable tbl) { - if (state == JobState.FINISHED || state == JobState.CANCELLED) { - List jobInfo = new ArrayList(); - jobInfo.add(tableId); - jobInfo.add(tbl.getName()); - jobInfo.add(transactionId); - jobInfo.add(TimeUtils.longToTimeString(createTime)); - jobInfo.add(TimeUtils.longToTimeString(finishedTime)); - jobInfo.add("N/A"); - jobInfo.add("N/A"); - jobInfo.add(state.name()); - jobInfo.add(cancelMsg); - jobInfo.add("N/A"); + if (changedIndexIdToSchema == null) { + // for compatibility + if (state == JobState.FINISHED || state == JobState.CANCELLED) { + List jobInfo = new ArrayList(); + jobInfo.add(tableId); + jobInfo.add(tbl.getName()); + jobInfo.add(transactionId); + jobInfo.add(TimeUtils.longToTimeString(createTime)); + jobInfo.add(TimeUtils.longToTimeString(finishedTime)); + jobInfo.add("N/A"); + jobInfo.add("N/A"); + jobInfo.add(state.name()); + jobInfo.add(cancelMsg); + jobInfo.add("N/A"); - jobInfos.add(jobInfo); + jobInfos.add(jobInfo); + return; + } + + // in previous version, changedIndexIdToSchema is set to null + // when job is finished or cancelled. + // so if changedIndexIdToSchema == null, the job's state must be FINISHED or CANCELLED return; } - // calc progress and state for each table Map indexProgress = new HashMap(); Map indexState = new HashMap(); - for (Long indexId : getChangedIndexToSchema().keySet()) { + + // calc progress and state for each table + for (Long indexId : changedIndexIdToSchemaVersion.keySet()) { int totalReplicaNum = 0; int finishedReplicaNum = 0; String idxState = IndexState.NORMAL.name(); for (Partition partition : tbl.getPartitions()) { MaterializedIndex index = partition.getIndex(indexId); - int tableReplicaNum = getTotalReplicaNumByIndexId(indexId); - int tableFinishedReplicaNum = getFinishedReplicaNumByIndexId(indexId); - Preconditions.checkState(!(tableReplicaNum == 0 && tableFinishedReplicaNum == -1)); - Preconditions.checkState(tableFinishedReplicaNum <= tableReplicaNum, - tableFinishedReplicaNum + "/" + tableReplicaNum); - totalReplicaNum += tableReplicaNum; - finishedReplicaNum += tableFinishedReplicaNum; + + if (state == JobState.RUNNING) { + int tableReplicaNum = getTotalReplicaNumByIndexId(indexId); + int tableFinishedReplicaNum = getFinishedReplicaNumByIndexId(indexId); + Preconditions.checkState(!(tableReplicaNum == 0 && tableFinishedReplicaNum == -1)); + Preconditions.checkState(tableFinishedReplicaNum <= tableReplicaNum, + tableFinishedReplicaNum + "/" + tableReplicaNum); + totalReplicaNum += tableReplicaNum; + finishedReplicaNum += tableFinishedReplicaNum; + } if (index.getState() != IndexState.NORMAL) { idxState = index.getState().name(); } } - if (Catalog.getInstance().isMaster() - && (state == JobState.RUNNING || state == JobState.FINISHED)) { + + indexState.put(indexId, idxState); + + if (Catalog.getInstance().isMaster() && state == JobState.RUNNING && totalReplicaNum != 0) { indexProgress.put(indexId, (finishedReplicaNum * 100 / totalReplicaNum) + "%"); - indexState.put(indexId, idxState); } else { indexProgress.put(indexId, "0%"); - indexState.put(indexId, idxState); } } - for (Long indexId : getChangedIndexToSchema().keySet()) { + for (Long indexId : changedIndexIdToSchemaVersion.keySet()) { List jobInfo = new ArrayList(); jobInfo.add(tableId); jobInfo.add(tbl.getName()); - jobInfo.add(transactionId); jobInfo.add(TimeUtils.longToTimeString(createTime)); jobInfo.add(TimeUtils.longToTimeString(finishedTime)); - jobInfo.add(tbl.getIndexNameById(indexId)); - jobInfo.add(indexState.get(indexId)); - jobInfo.add(state.name()); + jobInfo.add(tbl.getIndexNameById(indexId)); // index name + jobInfo.add(indexId); + // index schema version and schema hash + jobInfo.add(changedIndexIdToSchemaVersion.get(indexId) + "-" + changedIndexIdToSchemaHash.get(indexId)); + jobInfo.add(indexState.get(indexId)); // index state + jobInfo.add(transactionId); + jobInfo.add(state.name()); // job state + + if (state == JobState.RUNNING) { + jobInfo.add(indexProgress.get(indexId) == null ? "N/A" : indexProgress.get(indexId)); // progress + } else { + jobInfo.add("N/A"); // keep the progress column so the row matches the title list + } + jobInfo.add(cancelMsg); - jobInfo.add(indexProgress.get(indexId)); jobInfos.add(jobInfo); } // end for indexIds @@ -1165,30 +1180,38 @@ public class SchemaChangeJob extends AlterJob { // 'unfinishedReplicaIds', 'indexIdToTotalReplicaNum' and 'indexIdToFinishedReplicaNum' // don't need persist. build it when send tasks
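The write()/read() changes below follow the usual FE persistence pattern: a presence flag, then the map entries, with the read side gated on the metadata journal version. The sketch below shows that pattern only; FeMetaVersion, Writable and the real field layout are replaced by plain stand-ins.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class VersionedMapIo {
    static void write(DataOutput out, Map<Long, Integer> schemaVersions) throws IOException {
        if (schemaVersions != null) {
            out.writeBoolean(true);
            out.writeInt(schemaVersions.size());
            for (Map.Entry<Long, Integer> e : schemaVersions.entrySet()) {
                out.writeLong(e.getKey());   // index id
                out.writeInt(e.getValue());  // schema version
            }
        } else {
            out.writeBoolean(false);         // keep write symmetric with read
        }
    }

    static Map<Long, Integer> read(DataInput in, int journalVersion, int formatAddedInVersion) throws IOException {
        Map<Long, Integer> result = new HashMap<>();
        if (journalVersion < formatAddedInVersion) {
            return result; // in this sketch, old images simply do not carry the block
        }
        if (in.readBoolean()) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                result.put(in.readLong(), in.readInt());
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Map<Long, Integer> versions = new HashMap<>();
        versions.put(10010L, 2);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        write(new DataOutputStream(bos), versions);
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        System.out.println(read(dis, 48, 48)); // {10010=2}
    }
}
```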
+ // columns if (changedIndexIdToSchema != null) { out.writeBoolean(true); out.writeInt(changedIndexIdToSchema.size()); for (Entry> entry : changedIndexIdToSchema.entrySet()) { - long indexId = entry.getKey(); - out.writeLong(indexId); + out.writeLong(entry.getKey()); out.writeInt(entry.getValue().size()); for (Column column : entry.getValue()) { column.write(out); } - - // schema version - out.writeInt(changedIndexIdToSchemaVersion.get(indexId)); - - // schema hash - out.writeInt(changedIndexIdToSchemaHash.get(indexId)); - - // short key column count - out.writeShort(changedIndexIdToShortKeyColumnCount.get(indexId)); } } else { out.writeBoolean(false); } + // schema version and hash, and short key + if (changedIndexIdToSchemaVersion != null) { + out.writeBoolean(true); + out.writeInt(changedIndexIdToSchemaVersion.size()); + for (Entry entry : changedIndexIdToSchemaVersion.entrySet()) { + out.writeLong(entry.getKey()); + // schema version + out.writeInt(entry.getValue()); + // schema hash + out.writeInt(changedIndexIdToSchemaHash.get(entry.getKey())); + // short key column count + out.writeShort(changedIndexIdToShortKeyColumnCount.get(entry.getKey())); + } + } else { + out.writeBoolean(false); + } + // replicaInfos is saving for restoring schemaChangeJobFinished if (replicaInfos != null) { out.writeBoolean(true); @@ -1232,32 +1253,58 @@ tableName = Text.readString(in); - boolean has = in.readBoolean(); - if (has) { - int count = in.readInt(); - for (int i = 0; i < count; i++) { - long indexId = in.readLong(); - int columnNum = in.readInt(); - List columns = new LinkedList(); - for (int j = 0; j < columnNum; j++) { - Column column = Column.read(in); - columns.add(column); + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_48) { + if (in.readBoolean()) { + int count = in.readInt(); + for (int i = 0; i < count; i++) { + long indexId = in.readLong(); + int columnNum = in.readInt(); + List columns = new LinkedList(); + for (int j = 0; j < columnNum; j++) { + Column column = Column.read(in); + columns.add(column); + } + changedIndexIdToSchema.put(indexId, columns); + // schema version + changedIndexIdToSchemaVersion.put(indexId, in.readInt()); + // schema hash + changedIndexIdToSchemaHash.put(indexId, in.readInt()); + // short key column count + changedIndexIdToShortKeyColumnCount.put(indexId, in.readShort()); } - changedIndexIdToSchema.put(indexId, columns); + } + } else { + // columns + if (in.readBoolean()) { + int count = in.readInt(); + for (int i = 0; i < count; i++) { + long indexId = in.readLong(); + int columnNum = in.readInt(); + List columns = new LinkedList(); + for (int j = 0; j < columnNum; j++) { + Column column = Column.read(in); + columns.add(column); + } + changedIndexIdToSchema.put(indexId, columns); + } + } - // schema version - changedIndexIdToSchemaVersion.put(indexId, in.readInt()); - - // schema hash - changedIndexIdToSchemaHash.put(indexId, in.readInt()); - - // short key column count - changedIndexIdToShortKeyColumnCount.put(indexId, in.readShort()) + // schema version and hash, and short key + if (in.readBoolean()) { + int count = in.readInt(); + for (int i = 0; i < count; i++) { + long indexId = in.readLong(); + // schema version + changedIndexIdToSchemaVersion.put(indexId, in.readInt()); + // schema hash + changedIndexIdToSchemaHash.put(indexId, in.readInt()); + // short key column count + changedIndexIdToShortKeyColumnCount.put(indexId, in.readShort()); + } } } - has = in.readBoolean(); - if (has) { + if 
(in.readBoolean()) { int count = in.readInt(); for (int i = 0; i < count; ++i) { long partitionId = in.readLong(); diff --git a/fe/src/main/java/org/apache/doris/analysis/AdminShowReplicaStatusStmt.java b/fe/src/main/java/org/apache/doris/analysis/AdminShowReplicaStatusStmt.java index a09950839f..cd4cdd52e6 100644 --- a/fe/src/main/java/org/apache/doris/analysis/AdminShowReplicaStatusStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/AdminShowReplicaStatusStmt.java @@ -20,8 +20,8 @@ package org.apache.doris.analysis; import org.apache.doris.analysis.BinaryPredicate.Operator; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Replica.ReplicaStatus; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; @@ -40,7 +40,7 @@ import java.util.List; public class AdminShowReplicaStatusStmt extends ShowStmt { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("TabletId").add("ReplicaId").add("BackendId").add("Version").add("LastFailedVersion") - .add("LastSuccessVersion").add("CommittedVersion").add("VersionNum") + .add("LastSuccessVersion").add("CommittedVersion").add("SchemaHash").add("VersionNum") .add("State").add("Status") .build(); @@ -83,7 +83,7 @@ public class AdminShowReplicaStatusStmt extends ShowStmt { if (!analyzeWhere()) { throw new AnalysisException( - "Where clause should looks like: status =/!= 'OK/DEAD/VERSION_ERROR/MISSING'"); + "Where clause should looks like: status =/!= 'OK/DEAD/VERSION_ERROR/SCHEMA_ERROR/MISSING'"); } } diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index e418867b97..01e3086442 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -266,7 +266,7 @@ public class CreateTableStmt extends DdlStmt { rowLengthBytes += columnDef.getType().getStorageLayoutBytes(); } - if (rowLengthBytes > Config.max_layout_length_per_row) { + if (rowLengthBytes > Config.max_layout_length_per_row && engineName.equals("olap")) { throw new AnalysisException("The size of a row (" + rowLengthBytes + ") exceed the maximal row size: " + Config.max_layout_length_per_row); } diff --git a/fe/src/main/java/org/apache/doris/analysis/DescribeStmt.java b/fe/src/main/java/org/apache/doris/analysis/DescribeStmt.java index d61e1f4ee1..b973548552 100644 --- a/fe/src/main/java/org/apache/doris/analysis/DescribeStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/DescribeStmt.java @@ -19,10 +19,10 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Table.TableType; import org.apache.doris.common.AnalysisException; @@ -117,7 +117,7 @@ public class DescribeStmt extends ShowStmt { if (!isAllTables) { // show base table schema only String procString = "/dbs/" + db.getId() + "/" + table.getId() + "/" + TableProcDir.INDEX_SCHEMA - + "/" + table.getName(); + + "/" + table.getId(); node = 
ProcService.getInstance().open(procString); if (node == null) { diff --git a/fe/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/src/main/java/org/apache/doris/backup/BackupJob.java index 51bb47f895..55b2b4b158 100644 --- a/fe/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/src/main/java/org/apache/doris/backup/BackupJob.java @@ -644,7 +644,7 @@ public class BackupJob extends AbstractJob { Collections.sort(replicaIds); for (Long replicaId : replicaIds) { Replica replica = tablet.getReplicaById(replicaId); - if (replica.getLastFailedVersion() <= 0 && (replica.getVersion() > visibleVersion + if (replica.getLastFailedVersion() < 0 && (replica.getVersion() > visibleVersion || (replica.getVersion() == visibleVersion && replica.getVersionHash() == visibleVersionHash))) { return replica; } diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java index f57c7fb57a..d0f3892db2 100644 --- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -502,7 +502,7 @@ public class RestoreJob extends AbstractJob { + " in table " + localTbl.getName() + " has different replication num '" + localRangePartInfo.getReplicationNum(localPartition.getId()) - + "' with parition in repository"); + + "' with parition in repository, which is " + restoreReplicationNum); return; } genFileMapping(localOlapTbl, localPartition, tblInfo.id, backupPartInfo, @@ -524,7 +524,7 @@ public class RestoreJob extends AbstractJob { + " in table " + localTbl.getName() + " has different replication num '" + localPartInfo.getReplicationNum(localPartition.getId()) - + "' with parition in repository"); + + "' with parition in repository, which is " + restoreReplicationNum); return; } @@ -723,7 +723,7 @@ public class RestoreJob extends AbstractJob { Range remoteRange = remotePartitionInfo.getRange(remotePartId); DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId); localPartitionInfo.addPartition(restoredPart.getId(), remoteRange, - remoteDataProperty, (short) restoreReplicationNum); + remoteDataProperty, (short) restoreReplicationNum); localTbl.addPartition(restoredPart); } @@ -1116,7 +1116,7 @@ public class RestoreJob extends AbstractJob { state = RestoreJobState.DOWNLOADING; - // No log here + // No edit log here LOG.info("finished to send download tasks to BE. num: {}. {}", batchTask.getTaskNum(), this); return; } @@ -1205,8 +1205,8 @@ public class RestoreJob extends AbstractJob { continue; } - // update partition committed version - part.updateVisibleVersionAndVersionHash(entry.getValue().first, entry.getValue().second); + // update partition visible version + part.updateVersionForRestore(entry.getValue().first, entry.getValue().second); // we also need to update the replica version of these overwritten restored partitions for (MaterializedIndex idx : part.getMaterializedIndices()) { diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index c28a8dad03..0976e16c97 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -3791,8 +3791,8 @@ public class Catalog { // 1. 
storage type sb.append("\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE).append("\" = \""); - TStorageType storageType = olapTable - .getStorageTypeByIndexId(olapTable.getIndexIdByName(olapTable.getName())); + TStorageType storageType = olapTable.getStorageTypeByIndexId( + olapTable.getIndexIdByName(olapTable.getName())); sb.append(storageType.name()).append("\""); // 2. bloom filter @@ -3919,6 +3919,7 @@ public class Catalog { } createTableStmt.add(sb.toString()); + // 2. add partition if (separatePartition && (table instanceof OlapTable) && ((OlapTable) table).getPartitionInfo().getType() == PartitionType.RANGE diff --git a/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java index e835ca5c26..51727f3b50 100644 --- a/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -80,6 +80,7 @@ public class MetadataViewer { short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId()); for (MaterializedIndex index : partition.getMaterializedIndices()) { + int schemaHash = olapTable.getSchemaHashByIndexId(index.getId()); for (Tablet tablet : index.getTablets()) { long tabletId = tablet.getId(); int count = replicationNum; @@ -91,11 +92,12 @@ public class MetadataViewer { Backend be = infoService.getBackend(replica.getBackendId()); if (be == null || !be.isAvailable()) { status = ReplicaStatus.DEAD; - } else { - if (replica.getVersion() < visibleVersion + } else if (replica.getVersion() < visibleVersion || replica.getLastFailedVersion() > 0) { status = ReplicaStatus.VERSION_ERROR; - } + + } else if (replica.getSchemaHash() != -1 && replica.getSchemaHash() != schemaHash) { + status = ReplicaStatus.SCHEMA_ERROR; } if (filterReplica(status, statusFilter, op)) { @@ -109,6 +111,7 @@ public class MetadataViewer { row.add(String.valueOf(replica.getLastFailedVersion())); row.add(String.valueOf(replica.getLastSuccessVersion())); row.add(String.valueOf(visibleVersion)); + row.add(String.valueOf(replica.getSchemaHash())); row.add(String.valueOf(replica.getVersionCount())); row.add(replica.getState().name()); row.add(status.name()); diff --git a/fe/src/main/java/org/apache/doris/catalog/Partition.java b/fe/src/main/java/org/apache/doris/catalog/Partition.java index 85bef7aac0..942e2bf320 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/src/main/java/org/apache/doris/catalog/Partition.java @@ -24,6 +24,9 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Util; import org.apache.doris.meta.MetaContext; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -37,6 +40,8 @@ import java.util.Map.Entry; * Internal representation of partition-related metadata. 
*/ public class Partition extends MetaObject implements Writable { + private static final Logger LOG = LogManager.getLogger(Partition.class); + public static final long PARTITION_INIT_VERSION = 1L; public static final long PARTITION_INIT_VERSION_HASH = 0L; @@ -111,6 +116,20 @@ public class Partition extends MetaObject implements Writable { this.state = state; } + /* + * If a partition is overwritten by a restore job, we need to reset all version info to + * the restored partition version info. + */ + public void updateVersionForRestore(long visibleVersion, long visibleVersionHash) { + this.visibleVersion = visibleVersion; + this.visibleVersionHash = visibleVersionHash; + this.nextVersion = this.visibleVersion + 1; + this.nextVersionHash = Util.generateVersionHash(); + this.committedVersionHash = visibleVersionHash; + LOG.info("update partition {} version for restore: visible: {}-{}, next: {}-{}", + name, visibleVersion, visibleVersionHash, nextVersion, nextVersionHash); + } + public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionHash) { this.visibleVersion = visibleVersion; this.visibleVersionHash = visibleVersionHash; diff --git a/fe/src/main/java/org/apache/doris/catalog/Replica.java b/fe/src/main/java/org/apache/doris/catalog/Replica.java index ba20e20826..3ff868570f 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/src/main/java/org/apache/doris/catalog/Replica.java @@ -48,7 +48,8 @@ public class Replica implements Writable { OK, // health DEAD, // backend is not available VERSION_ERROR, // missing version - MISSING // replica does not exist + MISSING, // replica does not exist + SCHEMA_ERROR // replica's schema hash does not equal the index's schema hash } private long id; @@ -57,8 +58,8 @@ public class Replica implements Writable { private long versionHash; private int schemaHash = -1; - private long dataSize; - private long rowCount; + private long dataSize = 0; + private long rowCount = 0; private ReplicaState state; private long lastFailedVersion = -1L; @@ -218,6 +219,9 @@ public class Replica implements Writable { long lastFailedVersion, long lastFailedVersionHash, long lastSuccessVersion, long lastSuccessVersionHash, long newDataSize, long newRowCount) { + + LOG.debug("before update: {}", this.toString()); + if (newVersion < this.version) { LOG.warn("replica[" + id + "] new version is lower than meta version. " + newVersion + " vs " + version); // yiguolei: could not find any reason why new version less than this.version should run??? 
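With the new SCHEMA_ERROR status above, the classification in MetadataViewer is ordered: backend availability first, then version health, then schema hash, with -1 treated as "never reported". Below is a condensed, standalone version of that decision; the enum mirrors Replica.ReplicaStatus, everything else is a stand-in.

```java
public class ReplicaStatusSketch {
    enum ReplicaStatus { OK, DEAD, VERSION_ERROR, SCHEMA_ERROR, MISSING }

    static ReplicaStatus classify(boolean backendAvailable, long version, long visibleVersion,
                                  long lastFailedVersion, int replicaSchemaHash, int indexSchemaHash) {
        if (!backendAvailable) {
            return ReplicaStatus.DEAD;
        } else if (version < visibleVersion || lastFailedVersion > 0) {
            return ReplicaStatus.VERSION_ERROR;
        } else if (replicaSchemaHash != -1 && replicaSchemaHash != indexSchemaHash) {
            // -1 means the replica never reported a schema hash, so it is not flagged
            return ReplicaStatus.SCHEMA_ERROR;
        }
        return ReplicaStatus.OK;
    }

    public static void main(String[] args) {
        System.out.println(classify(true, 5, 5, -1, 123456, 654321));  // SCHEMA_ERROR
        System.out.println(classify(true, 5, 5, -1, -1, 654321));      // OK, hash unknown
        System.out.println(classify(false, 5, 5, -1, 654321, 654321)); // DEAD
    }
}
```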
@@ -282,7 +286,7 @@ public class Replica implements Writable { } } - LOG.debug("update {}", this.toString()); + LOG.debug("after update {}", this.toString()); } public synchronized void updateLastFailedVersion(long lastFailedVersion, long lastFailedVersionHash) { diff --git a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java index cddae68f7d..87517493c2 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Table.TableType; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.Tablet.TabletStatus; +import org.apache.doris.clone.TabletScheduler.AddResult; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.common.util.Daemon; @@ -57,10 +58,6 @@ public class TabletChecker extends Daemon { private static final long CHECK_INTERVAL_MS = 20 * 1000L; // 20 second - // if the number of scheduled tablets in TabletScheduler exceed this threshold - // skip checking. - private static final int MAX_SCHEDULING_TABLETS = 5000; - private Catalog catalog; private SystemInfoService infoService; private TabletScheduler tabletScheduler; @@ -156,10 +153,12 @@ */ @Override protected void runOneCycle() { - if (tabletScheduler.getPendingNum() > MAX_SCHEDULING_TABLETS - || tabletScheduler.getRunningNum() > MAX_SCHEDULING_TABLETS) { + int pendingNum = tabletScheduler.getPendingNum(); + int runningNum = tabletScheduler.getRunningNum(); + if (pendingNum > TabletScheduler.MAX_SCHEDULING_TABLETS + || runningNum > TabletScheduler.MAX_SCHEDULING_TABLETS) { LOG.info("too many tablets are being scheduled. pending: {}, running: {}, limit: {}. skip check", - tabletScheduler.getPendingNum(), tabletScheduler.getRunningNum(), MAX_SCHEDULING_TABLETS); + pendingNum, runningNum, TabletScheduler.MAX_SCHEDULING_TABLETS); return; } @@ -178,7 +177,7 @@ long addToSchedulerTabletNum = 0; List dbIds = catalog.getDbIds(); - for (Long dbId : dbIds) { + OUT: for (Long dbId : dbIds) { Database db = catalog.getDb(dbId); if (db == null) { continue; @@ -237,11 +236,16 @@ System.currentTimeMillis()); tabletCtx.setOrigPriority(statusWithPrio.second); - if (tabletScheduler.addTablet(tabletCtx, false /* not force */)) { + AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */); + if (res == AddResult.LIMIT_EXCEED) { + LOG.info("number of scheduling tablets in tablet scheduler" + + " exceeds the limit. stop tablet checker"); + break OUT; + } else if (res == AddResult.ADDED) { addToSchedulerTabletNum++; } } - } + } // indices
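The AddResult change below turns the scheduler's silent drop into explicit back-pressure that the checker can react to (the break OUT above). Here is a reduced, hypothetical model of that interplay; the threshold value matches the patch, but the data structures are simplified stand-ins for TabletScheduler/TabletChecker.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

public class SchedulerBackpressureSketch {
    enum AddResult { ADDED, ALREADY_IN, LIMIT_EXCEED }

    static final int MAX_SCHEDULING_TABLETS = 5000;
    private final Deque<Long> pendingTablets = new ArrayDeque<>();
    private final Set<Long> allTabletIds = new HashSet<>();

    synchronized AddResult addTablet(long tabletId, boolean force) {
        if (!force && allTabletIds.contains(tabletId)) {
            return AddResult.ALREADY_IN;
        }
        if (!force && pendingTablets.size() > MAX_SCHEDULING_TABLETS) {
            return AddResult.LIMIT_EXCEED; // refuse instead of growing without bound
        }
        allTabletIds.add(tabletId);
        pendingTablets.offer(tabletId);
        return AddResult.ADDED;
    }

    public static void main(String[] args) {
        SchedulerBackpressureSketch scheduler = new SchedulerBackpressureSketch();
        long added = 0;
        for (long tabletId = 0; tabletId < 10_000; tabletId++) {
            AddResult res = scheduler.addTablet(tabletId, false);
            if (res == AddResult.LIMIT_EXCEED) {
                break; // the checker stops this round, like the labeled break above
            } else if (res == AddResult.ADDED) {
                added++;
            }
        }
        System.out.println(added); // 5001 with this sketch's strict ">" check
    }
}
```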
stop tablet checker"); + break OUT; + } else if (res == AddResult.ADDED) { addToSchedulerTabletNum++; } } - } + } // indices if (prioPartIsHealthy && isInPrios) { // if all replicas in this partition are healthy, remove this partition from @@ -250,8 +254,8 @@ public class TabletChecker extends Daemon { db.getId(), olapTbl.getId(), partition.getId()); removePrios(db.getId(), olapTbl.getId(), Lists.newArrayList(partition.getId())); } - } - } + } // partitions + } // tables } finally { db.readUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java index 96a57336b3..40bd54a203 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -91,6 +91,10 @@ public class TabletScheduler extends Daemon { public static final int BALANCE_SLOT_NUM_FOR_PATH = 2; + // if the number of scheduled tablets in TabletScheduler exceed this threshold + // skip checking. + public static final int MAX_SCHEDULING_TABLETS = 5000; + /* * Tablet is added to pendingTablets as well it's id in allTabletIds. * TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletIds when @@ -121,6 +125,13 @@ public class TabletScheduler extends Daemon { private SystemInfoService infoService; private TabletInvertedIndex invertedIndex; private TabletSchedulerStat stat; + + // result of adding a tablet to pendingTablets + public enum AddResult { + ADDED, // success to add + ALREADY_IN, // already added, skip + LIMIT_EXCEED // number of pending tablets exceed the limit + } public TabletScheduler(Catalog catalog, SystemInfoService infoService, TabletInvertedIndex invertedIndex, TabletSchedulerStat stat) { @@ -188,14 +199,22 @@ public class TabletScheduler extends Daemon { * add a ready-to-be-scheduled tablet to pendingTablets, if it has not being added before. * if force is true, do not check if tablet is already added before. */ - public synchronized boolean addTablet(TabletSchedCtx tablet, boolean force) { + public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) { if (!force && containsTablet(tablet.getTabletId())) { - LOG.info("balance is disabled, skip"); - return false; + return AddResult.ALREADY_IN; } + + // if this is not a BALANCE task, and not a force add, + // and number of scheduling tablets exceed the limit, + // refuse to add. + if (tablet.getType() != TabletSchedCtx.Type.BALANCE && !force + && (pendingTablets.size() > MAX_SCHEDULING_TABLETS || runningTablets.size() > MAX_SCHEDULING_TABLETS)) { + return AddResult.LIMIT_EXCEED; + } + allTabletIds.add(tablet.getTabletId()); pendingTablets.offer(tablet); - return true; + return AddResult.ADDED; } public synchronized boolean containsTablet(long tabletId) { @@ -705,6 +724,7 @@ public class TabletScheduler extends Daemon { */ private void selectTabletsForBalance() { if (Config.disable_balance) { + LOG.info("balance is disabled. 
skip selecting tablets for balance"); return; } diff --git a/fe/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java index c1c5b50e9f..2e13fec714 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java @@ -39,8 +39,8 @@ import java.util.Set; */ public class IndexInfoProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("IndexName").add("SchemaVersion").add("SchemaHash").add("ShortKeyColumnCount") - .add("StorageType").add("Keys") + .add("IndexId").add("IndexName").add("SchemaVersion").add("SchemaHash") + .add("ShortKeyColumnCount").add("StorageType").add("Keys") .build(); private Database db; @@ -91,15 +91,16 @@ public class IndexInfoProcDir implements ProcDirInterface { } builder.append(Joiner.on(", ").join(columnNames)).append(")"); - result.addRow(Lists.newArrayList(indexName, - String.valueOf(schemaVersion), - String.valueOf(schemaHash), - String.valueOf(shortKeyColumnCount), - storageType.name(), - builder.toString())); + result.addRow(Lists.newArrayList(String.valueOf(indexId), + indexName, + String.valueOf(schemaVersion), + String.valueOf(schemaHash), + String.valueOf(shortKeyColumnCount), + storageType.name(), + builder.toString())); } } else { - result.addRow(Lists.newArrayList(table.getName(), "", "", "", "", "")); + result.addRow(Lists.newArrayList("-1", table.getName(), "", "", "", "", "")); } return result; @@ -114,23 +115,27 @@ public class IndexInfoProcDir implements ProcDirInterface { } @Override - public ProcNodeInterface lookup(String indexName) throws AnalysisException { + public ProcNodeInterface lookup(String idxIdStr) throws AnalysisException { Preconditions.checkNotNull(db); Preconditions.checkNotNull(table); + long idxId; + try { + idxId = Long.valueOf(idxIdStr); + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid index id format: " + idxIdStr); + } + db.readLock(); try { List schema = null; Set bfColumns = null; if (table.getType() == TableType.OLAP) { OlapTable olapTable = (OlapTable) table; - Long indexId = olapTable.getIndexIdByName(indexName); - if (indexId == null) { - throw new AnalysisException("Index[" + indexName + "] does not exist in table[" - + table.getName() + "]"); + schema = olapTable.getSchemaByIndexId(idxId); + if (schema == null) { + throw new AnalysisException("Index " + idxId + " does not exist"); } - schema = olapTable.getSchemaByIndexId(indexId); - bfColumns = olapTable.getCopiedBfColumns(); } else { schema = table.getBaseSchema(); diff --git a/fe/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java index 724bddd42f..570ccc4688 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java @@ -31,7 +31,7 @@ import java.util.List; import java.util.Set; /* - * SHOW PROC /dbs/dbId/tableId/index_schema/"index name" + * SHOW PROC /dbs/dbId/tableId/index_schema/indexId" * show index schema */ public class IndexSchemaProcNode implements ProcNodeInterface { diff --git a/fe/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java index 747bf90921..6ff912155a 100644 --- 
a/fe/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java @@ -34,7 +34,7 @@ public class ReplicasProcNode implements ProcNodeInterface { .add("ReplicaId").add("BackendId").add("Version").add("VersionHash") .add("LstSuccessVersion").add("LstSuccessVersionHash") .add("LstFailedVersion").add("LstFailedVersionHash") - .add("LstFailedTime").add("DataSize").add("RowCount").add("State") + .add("LstFailedTime").add("SchemaHash").add("DataSize").add("RowCount").add("State") .add("VersionCount").add("PathHash") .build(); @@ -59,6 +59,7 @@ public class ReplicasProcNode implements ProcNodeInterface { String.valueOf(replica.getLastFailedVersion()), String.valueOf(replica.getLastFailedVersionHash()), TimeUtils.longToTimeString(replica.getLastFailedTimestamp()), + String.valueOf(replica.getSchemaHash()), String.valueOf(replica.getDataSize()), String.valueOf(replica.getRowCount()), String.valueOf(replica.getState()), diff --git a/fe/src/main/java/org/apache/doris/common/proc/RollupProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/RollupProcDir.java index 8b773b95fc..42c31523c6 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/RollupProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/RollupProcDir.java @@ -32,9 +32,9 @@ import java.util.List; public class RollupProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("JobId").add("TableName").add("TransactionId").add("CreateTime").add("FinishedTime") - .add("BaseIndexName").add("RollupIndexName").add("State").add("Msg") - .add("Progress") + .add("JobId").add("TableName").add("CreateTime").add("FinishedTime") + .add("BaseIndexName").add("RollupIndexName").add("RollupId").add("TransactionId") + .add("State").add("Msg") .add("Progress") .build(); private RollupHandler rollupHandler; diff --git a/fe/src/main/java/org/apache/doris/common/proc/SchemaChangeProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/SchemaChangeProcNode.java index a12a392355..3ec874aa8c 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/SchemaChangeProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/SchemaChangeProcNode.java @@ -29,9 +29,9 @@ import java.util.List; public class SchemaChangeProcNode implements ProcNodeInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("JobId").add("TableName").add("TransactionId").add("CreateTime").add("FinishTime") - .add("IndexName").add("IndexState").add("State").add("Msg") - .add("Progress") + .add("JobId").add("TableName").add("CreateTime").add("FinishTime") + .add("IndexName").add("IndexId").add("SchemaVersion").add("IndexState") + .add("TransactionId").add("State").add("Progress").add("Msg") .build(); private SchemaChangeHandler schemaChangeHandler;
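All of the title-list changes above (the extra SchemaHash, IndexId, RollupId and TransactionId columns, plus the reordered Progress/Msg) rely on one invariant: every row handed to the proc result must have exactly as many columns as TITLE_NAMES, in the same order. The defensive check below is not part of the patch; it only illustrates that invariant with made-up values.

```java
import java.util.Arrays;
import java.util.List;

public class ProcRowArityCheck {
    static void addRow(List<String> titles, List<String> row) {
        if (row.size() != titles.size()) {
            throw new IllegalStateException("row has " + row.size()
                    + " columns, expected " + titles.size());
        }
        // a real ProcResult would store the row here
    }

    public static void main(String[] args) {
        List<String> titles = Arrays.asList("ReplicaId", "BackendId", "SchemaHash", "State");
        addRow(titles, Arrays.asList("10086", "10001", "1294206093", "NORMAL")); // ok
        addRow(titles, Arrays.asList("10087", "10002", "NORMAL"));               // throws: SchemaHash missing
    }
}
```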