Fix some bugs of alter table operation (#550)

1. Fix bug that failed to query restored table after schema change.
2. Fix bug that failed to add rollup to restored table.
3. Optimize the info of SHOW ALTER TABLE stmt.
4. Optimize the info of some PROCs.
5. Optimize the tablet checker to avoid adding too many tasks to the scheduler.
This commit is contained in:
Mingyu Chen
2019-01-17 15:17:51 +08:00
committed by GitHub
parent 5cb1c161a4
commit d15bc83de0
22 changed files with 262 additions and 147 deletions

View File

@ -15,6 +15,7 @@
OK: replica 处于健康状态
DEAD: replica 所在 Backend 不可用
VERSION_ERROR: replica 数据版本有缺失
SCHEMA_ERROR: replica 的 schema hash 不正确
MISSING: replica 不存在
## example

View File

@ -716,8 +716,9 @@
SHOW DATA [FROM db_name[.table_name]];
说明:
如果不指定 FROM 子句,使用展示当前 db 下细分到各个 table 的数据量
如果指定 FROM 子句,则展示 table 下细分到各个 index 的数据量
1. 如果不指定 FROM 子句,则展示当前 db 下细分到各个 table 的数据量
2. 如果指定 FROM 子句,则展示 table 下细分到各个 index 的数据量
3. 如果想查看各个 Partition 的大小,请参阅 help show partitions
## example
1. 展示默认 db 的各个 table 的数据量及汇总数据量
@ -743,7 +744,7 @@
SHOW PARTITIONS FROM example_db.table_name PARTITION p1;
## keyword
SHOW,PARTITION
SHOW,PARTITIONS
# SHOW TABLET
## description

View File

@ -67,7 +67,7 @@ public abstract class AlterHandler extends Daemon {
}
public AlterHandler(String name) {
super(name);
super(name, 20000);
}
protected void addAlterJob(AlterJob alterJob) {

View File

@ -359,9 +359,10 @@ public class RollupHandler extends AlterHandler {
// the new replica's init version is -1 until finished history rollup
Replica rollupReplica = new Replica(rollupReplicaId, backendId, rollupSchemaHash,
ReplicaState.ROLLUP);
// new replica's last failed version is equal to the partition's next version - 1
// has to set failed verison and version hash here, because there will be no load after rollup
// so that if not set here, last failed version will not be set
// new replica's last failed version should be set to the partition's next version - 1.
// If all goes well, the last failed version will be overwritten when the rollup task finishes
// and updates the replica version info.
// If not set, there is no other way to know that this replica has a failed version.
rollupReplica.updateVersionInfo(rollupReplica.getVersion(), rollupReplica.getVersionHash(),
partition.getCommittedVersion(), partition.getCommittedVersionHash(),
rollupReplica.getLastSuccessVersion(), rollupReplica.getLastSuccessVersionHash());

View File

@ -502,7 +502,7 @@ public class RollupJob extends AlterJob {
}
this.state = JobState.CANCELLED;
if (!Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) {
if (Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) {
this.cancelMsg = msg;
}
@ -592,7 +592,6 @@ public class RollupJob extends AlterJob {
long rowCount = finishTabletInfo.getRow_count();
// yiguolei: not check version here because the replica's first version will be set by rollup job
// the version is not set now
// the finish task thread doesn't own db lock here, maybe a bug?
rollupReplica.updateVersionInfo(version, versionHash, dataSize, rowCount);
setReplicaFinished(partitionId, rollupReplicaId);
@ -654,7 +653,8 @@ public class RollupJob extends AlterJob {
errorReplicas.add(replica);
} else if (replica.getLastFailedVersion() > 0
&& !partitionIdToUnfinishedReplicaIds.get(partitionId).contains(replica.getId())) {
// if the replica is finished history data, but failed during load, then it is a abnormal
// if the replica has finished converting history data,
// but failed during load, then it is abnormal.
// remove it from replica set
// have to use delete replica, it will remove it from tablet inverted index
LOG.warn("replica [{}] last failed version > 0 and have finished history rollup job,"
@ -670,7 +670,8 @@ public class RollupJob extends AlterJob {
}
if (rollupTablet.getReplicas().size() < (expectReplicationNum / 2 + 1)) {
cancelMsg = String.format("rollup job[%d] cancelled. tablet[%d] has few health replica."
cancelMsg = String.format(
"rollup job[%d] cancelled. rollup tablet[%d] has few health replica."
+ " num: %d", tableId, rollupTablet.getId(), replicas.size());
LOG.warn(cancelMsg);
return -1;
@ -951,8 +952,11 @@ public class RollupJob extends AlterJob {
db.writeUnlock();
}
List<Integer> list = new ArrayList<>();
Integer[] arr = list.toArray(new Integer[0]);
this.finishedTime = System.currentTimeMillis();
LOG.info("finished schema change job: {}", tableId);
LOG.info("finished rollup job: {}", tableId);
}
@Override
@ -965,9 +969,6 @@ public class RollupJob extends AlterJob {
// table name
jobInfo.add(tbl.getName());
// transactionid
jobInfo.add(transactionId);
// create time
jobInfo.add(TimeUtils.longToTimeString(createTime));
@ -976,6 +977,13 @@ public class RollupJob extends AlterJob {
// base index and rollup index name
jobInfo.add(baseIndexName);
jobInfo.add(rollupIndexName);
// rollup id
jobInfo.add(rollupIndexId);
// transaction id
jobInfo.add(transactionId);
// job state
jobInfo.add(state.name());

View File

@ -533,7 +533,7 @@ public class SchemaChangeJob extends AlterJob {
}
this.state = JobState.CANCELLED;
if (!Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) {
if (Strings.isNullOrEmpty(cancelMsg) && !Strings.isNullOrEmpty(msg)) {
this.cancelMsg = msg;
}
@ -812,6 +812,7 @@ public class SchemaChangeJob extends AlterJob {
this.replicaInfos.put(partitionId, replicaInfo);
replica.setState(ReplicaState.NORMAL);
replica.setSchemaHash(schemaHash);
// remove tasks for safety
AgentTaskQueue.removeTask(replica.getBackendId(), TTaskType.SCHEMA_CHANGE,
@ -892,16 +893,12 @@ public class SchemaChangeJob extends AlterJob {
@Override
public synchronized void clear() {
changedIndexIdToSchema = null;
changedIndexIdToSchemaVersion = null;
changedIndexIdToSchemaHash = null;
changedIndexIdToShortKeyColumnCount = null;
resourceInfo = null;
replicaInfos = null;
unfinishedReplicaIds = null;
indexIdToTotalReplicaNum = null;
indexIdToFinishedReplicaNum = null;
partitionIdToFinishedIndexIds = null;
// backendIdToReplicaIds = null;
}
@Override
@ -941,7 +938,6 @@ public class SchemaChangeJob extends AlterJob {
// reset status to PENDING for resending the tasks in polling thread
this.state = JobState.PENDING;
LOG.info("just trace", new Exception());
} finally {
db.writeUnlock();
}
@ -1090,67 +1086,86 @@ public class SchemaChangeJob extends AlterJob {
@Override
public void getJobInfo(List<List<Comparable>> jobInfos, OlapTable tbl) {
if (state == JobState.FINISHED || state == JobState.CANCELLED) {
List<Comparable> jobInfo = new ArrayList<Comparable>();
jobInfo.add(tableId);
jobInfo.add(tbl.getName());
jobInfo.add(transactionId);
jobInfo.add(TimeUtils.longToTimeString(createTime));
jobInfo.add(TimeUtils.longToTimeString(finishedTime));
jobInfo.add("N/A");
jobInfo.add("N/A");
jobInfo.add(state.name());
jobInfo.add(cancelMsg);
jobInfo.add("N/A");
if (changedIndexIdToSchema == null) {
// for compatibility
if (state == JobState.FINISHED || state == JobState.CANCELLED) {
List<Comparable> jobInfo = new ArrayList<Comparable>();
jobInfo.add(tableId);
jobInfo.add(tbl.getName());
jobInfo.add(transactionId);
jobInfo.add(TimeUtils.longToTimeString(createTime));
jobInfo.add(TimeUtils.longToTimeString(finishedTime));
jobInfo.add("N/A");
jobInfo.add("N/A");
jobInfo.add(state.name());
jobInfo.add(cancelMsg);
jobInfo.add("N/A");
jobInfos.add(jobInfo);
jobInfos.add(jobInfo);
return;
}
// in previous version, changedIndexIdToSchema is set to null
// when job is finished or cancelled.
// so if changedIndexIdToSchema == null, the job's state must be FINISHED or CANCELLED
return;
}
// calc progress and state for each table
Map<Long, String> indexProgress = new HashMap<Long, String>();
Map<Long, String> indexState = new HashMap<Long, String>();
for (Long indexId : getChangedIndexToSchema().keySet()) {
// calc progress and state for each table
for (Long indexId : changedIndexIdToSchemaVersion.keySet()) {
int totalReplicaNum = 0;
int finishedReplicaNum = 0;
String idxState = IndexState.NORMAL.name();
for (Partition partition : tbl.getPartitions()) {
MaterializedIndex index = partition.getIndex(indexId);
int tableReplicaNum = getTotalReplicaNumByIndexId(indexId);
int tableFinishedReplicaNum = getFinishedReplicaNumByIndexId(indexId);
Preconditions.checkState(!(tableReplicaNum == 0 && tableFinishedReplicaNum == -1));
Preconditions.checkState(tableFinishedReplicaNum <= tableReplicaNum,
tableFinishedReplicaNum + "/" + tableReplicaNum);
totalReplicaNum += tableReplicaNum;
finishedReplicaNum += tableFinishedReplicaNum;
if (state == JobState.RUNNING) {
int tableReplicaNum = getTotalReplicaNumByIndexId(indexId);
int tableFinishedReplicaNum = getFinishedReplicaNumByIndexId(indexId);
Preconditions.checkState(!(tableReplicaNum == 0 && tableFinishedReplicaNum == -1));
Preconditions.checkState(tableFinishedReplicaNum <= tableReplicaNum,
tableFinishedReplicaNum + "/" + tableReplicaNum);
totalReplicaNum += tableReplicaNum;
finishedReplicaNum += tableFinishedReplicaNum;
}
if (index.getState() != IndexState.NORMAL) {
idxState = index.getState().name();
}
}
if (Catalog.getInstance().isMaster()
&& (state == JobState.RUNNING || state == JobState.FINISHED)) {
indexState.put(indexId, idxState);
if (Catalog.getInstance().isMaster() && state == JobState.RUNNING && totalReplicaNum != 0) {
indexProgress.put(indexId, (finishedReplicaNum * 100 / totalReplicaNum) + "%");
indexState.put(indexId, idxState);
} else {
indexProgress.put(indexId, "0%");
indexState.put(indexId, idxState);
}
}
for (Long indexId : getChangedIndexToSchema().keySet()) {
for (Long indexId : changedIndexIdToSchemaVersion.keySet()) {
List<Comparable> jobInfo = new ArrayList<Comparable>();
jobInfo.add(tableId);
jobInfo.add(tbl.getName());
jobInfo.add(transactionId);
jobInfo.add(TimeUtils.longToTimeString(createTime));
jobInfo.add(TimeUtils.longToTimeString(finishedTime));
jobInfo.add(tbl.getIndexNameById(indexId));
jobInfo.add(indexState.get(indexId));
jobInfo.add(state.name());
jobInfo.add(tbl.getIndexNameById(indexId)); // index name
jobInfo.add(indexId);
// index schema version and schema hash
jobInfo.add(changedIndexIdToSchemaVersion.get(indexId) + "-" + changedIndexIdToSchemaHash.get(indexId));
jobInfo.add(indexState.get(indexId)); // index state
jobInfo.add(transactionId);
jobInfo.add(state.name()); // job state
if (state == JobState.RUNNING) {
jobInfo.add(indexProgress.get(indexId) == null ? "N/A" : indexProgress.get(indexId)); // progress
}
jobInfo.add(cancelMsg);
jobInfo.add(indexProgress.get(indexId));
jobInfos.add(jobInfo);
} // end for indexIds
@ -1165,30 +1180,36 @@ public class SchemaChangeJob extends AlterJob {
// 'unfinishedReplicaIds', 'indexIdToTotalReplicaNum' and 'indexIdToFinishedReplicaNum'
// don't need persist. build it when send tasks
// columns
if (changedIndexIdToSchema != null) {
out.writeBoolean(true);
out.writeInt(changedIndexIdToSchema.size());
for (Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
long indexId = entry.getKey();
out.writeLong(indexId);
out.writeLong(entry.getKey());
out.writeInt(entry.getValue().size());
for (Column column : entry.getValue()) {
column.write(out);
}
// schema version
out.writeInt(changedIndexIdToSchemaVersion.get(indexId));
// schema hash
out.writeInt(changedIndexIdToSchemaHash.get(indexId));
// short key column count
out.writeShort(changedIndexIdToShortKeyColumnCount.get(indexId));
}
} else {
out.writeBoolean(false);
}
// schema version and hash, and short key
if (changedIndexIdToSchemaVersion != null) {
out.writeBoolean(true);
out.writeInt(changedIndexIdToSchemaVersion.size());
for (Entry<Long, Integer> entry : changedIndexIdToSchemaVersion.entrySet()) {
out.writeLong(entry.getKey());
// schema version
out.writeInt(entry.getValue());
// schema hash
out.writeInt(changedIndexIdToSchemaHash.get(entry.getKey()));
// short key column count
out.writeShort(changedIndexIdToShortKeyColumnCount.get(entry.getKey()));
}
}
// replicaInfos is saving for restoring schemaChangeJobFinished
if (replicaInfos != null) {
out.writeBoolean(true);
@ -1232,32 +1253,58 @@ public class SchemaChangeJob extends AlterJob {
tableName = Text.readString(in);
boolean has = in.readBoolean();
if (has) {
int count = in.readInt();
for (int i = 0; i < count; i++) {
long indexId = in.readLong();
int columnNum = in.readInt();
List<Column> columns = new LinkedList<Column>();
for (int j = 0; j < columnNum; j++) {
Column column = Column.read(in);
columns.add(column);
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_48) {
if (in.readBoolean()) {
int count = in.readInt();
for (int i = 0; i < count; i++) {
long indexId = in.readLong();
int columnNum = in.readInt();
List<Column> columns = new LinkedList<Column>();
for (int j = 0; j < columnNum; j++) {
Column column = Column.read(in);
columns.add(column);
}
changedIndexIdToSchema.put(indexId, columns);
// schema version
changedIndexIdToSchemaVersion.put(indexId, in.readInt());
// schema hash
changedIndexIdToSchemaHash.put(indexId, in.readInt());
// short key column count
changedIndexIdToShortKeyColumnCount.put(indexId, in.readShort());
}
changedIndexIdToSchema.put(indexId, columns);
}
} else {
// columns
if (in.readBoolean()) {
int count = in.readInt();
for (int i = 0; i < count; i++) {
long indexId = in.readLong();
int columnNum = in.readInt();
List<Column> columns = new LinkedList<Column>();
for (int j = 0; j < columnNum; j++) {
Column column = Column.read(in);
columns.add(column);
}
changedIndexIdToSchema.put(indexId, columns);
}
}
// schema version
changedIndexIdToSchemaVersion.put(indexId, in.readInt());
// schema hash
changedIndexIdToSchemaHash.put(indexId, in.readInt());
// short key column count
changedIndexIdToShortKeyColumnCount.put(indexId, in.readShort());
// schema version and hash, and short key
if (in.readBoolean()) {
int count = in.readInt();
for (int i = 0; i < count; i++) {
long indexId = in.readLong();
// schema version
changedIndexIdToSchemaVersion.put(indexId, in.readInt());
// schema hash
changedIndexIdToSchemaHash.put(indexId, in.readInt());
// short key column count
changedIndexIdToShortKeyColumnCount.put(indexId, in.readShort());
}
}
}
has = in.readBoolean();
if (has) {
if (in.readBoolean()) {
int count = in.readInt();
for (int i = 0; i < count; ++i) {
long partitionId = in.readLong();

View File

@ -20,8 +20,8 @@ package org.apache.doris.analysis;
import org.apache.doris.analysis.BinaryPredicate.Operator;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Replica.ReplicaStatus;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
@ -40,7 +40,7 @@ import java.util.List;
public class AdminShowReplicaStatusStmt extends ShowStmt {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("TabletId").add("ReplicaId").add("BackendId").add("Version").add("LastFailedVersion")
.add("LastSuccessVersion").add("CommittedVersion").add("VersionNum")
.add("LastSuccessVersion").add("CommittedVersion").add("SchemaHash").add("VersionNum")
.add("State").add("Status")
.build();
@ -83,7 +83,7 @@ public class AdminShowReplicaStatusStmt extends ShowStmt {
if (!analyzeWhere()) {
throw new AnalysisException(
"Where clause should looks like: status =/!= 'OK/DEAD/VERSION_ERROR/MISSING'");
"Where clause should looks like: status =/!= 'OK/DEAD/VERSION_ERROR/SCHEMA_ERROR/MISSING'");
}
}

View File

@ -266,7 +266,7 @@ public class CreateTableStmt extends DdlStmt {
rowLengthBytes += columnDef.getType().getStorageLayoutBytes();
}
if (rowLengthBytes > Config.max_layout_length_per_row) {
if (rowLengthBytes > Config.max_layout_length_per_row && engineName.equals("olap")) {
throw new AnalysisException("The size of a row (" + rowLengthBytes + ") exceed the maximal row size: "
+ Config.max_layout_length_per_row);
}

View File

@ -19,10 +19,10 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.common.AnalysisException;
@ -117,7 +117,7 @@ public class DescribeStmt extends ShowStmt {
if (!isAllTables) {
// show base table schema only
String procString = "/dbs/" + db.getId() + "/" + table.getId() + "/" + TableProcDir.INDEX_SCHEMA
+ "/" + table.getName();
+ "/" + table.getId();
node = ProcService.getInstance().open(procString);
if (node == null) {

View File

@ -644,7 +644,7 @@ public class BackupJob extends AbstractJob {
Collections.sort(replicaIds);
for (Long replicaId : replicaIds) {
Replica replica = tablet.getReplicaById(replicaId);
if (replica.getLastFailedVersion() <= 0 && (replica.getVersion() > visibleVersion
if (replica.getLastFailedVersion() < 0 && (replica.getVersion() > visibleVersion
|| (replica.getVersion() == visibleVersion && replica.getVersionHash() == visibleVersionHash))) {
return replica;
}

View File

@ -502,7 +502,7 @@ public class RestoreJob extends AbstractJob {
+ " in table " + localTbl.getName()
+ " has different replication num '"
+ localRangePartInfo.getReplicationNum(localPartition.getId())
+ "' with parition in repository");
+ "' with parition in repository, which is " + restoreReplicationNum);
return;
}
genFileMapping(localOlapTbl, localPartition, tblInfo.id, backupPartInfo,
@ -524,7 +524,7 @@ public class RestoreJob extends AbstractJob {
+ " in table " + localTbl.getName()
+ " has different replication num '"
+ localPartInfo.getReplicationNum(localPartition.getId())
+ "' with parition in repository");
+ "' with parition in repository, which is " + restoreReplicationNum);
return;
}
@ -723,7 +723,7 @@ public class RestoreJob extends AbstractJob {
Range<PartitionKey> remoteRange = remotePartitionInfo.getRange(remotePartId);
DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId);
localPartitionInfo.addPartition(restoredPart.getId(), remoteRange,
remoteDataProperty, (short) restoreReplicationNum);
remoteDataProperty, (short) restoreReplicationNum);
localTbl.addPartition(restoredPart);
}
@ -1116,7 +1116,7 @@ public class RestoreJob extends AbstractJob {
state = RestoreJobState.DOWNLOADING;
// No log here
// No edit log here
LOG.info("finished to send download tasks to BE. num: {}. {}", batchTask.getTaskNum(), this);
return;
}
@ -1205,8 +1205,8 @@ public class RestoreJob extends AbstractJob {
continue;
}
// update partition committed version
part.updateVisibleVersionAndVersionHash(entry.getValue().first, entry.getValue().second);
// update partition visible version
part.updateVersionForRestore(entry.getValue().first, entry.getValue().second);
// we also need to update the replica version of these overwritten restored partitions
for (MaterializedIndex idx : part.getMaterializedIndices()) {

View File

@ -3791,8 +3791,8 @@ public class Catalog {
// 1. storage type
sb.append("\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE).append("\" = \"");
TStorageType storageType = olapTable
.getStorageTypeByIndexId(olapTable.getIndexIdByName(olapTable.getName()));
TStorageType storageType = olapTable.getStorageTypeByIndexId(
olapTable.getIndexIdByName(olapTable.getName()));
sb.append(storageType.name()).append("\"");
// 2. bloom filter
@ -3919,6 +3919,7 @@ public class Catalog {
}
createTableStmt.add(sb.toString());
// 2. add partition
if (separatePartition && (table instanceof OlapTable)
&& ((OlapTable) table).getPartitionInfo().getType() == PartitionType.RANGE

View File

@ -80,6 +80,7 @@ public class MetadataViewer {
short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId());
for (MaterializedIndex index : partition.getMaterializedIndices()) {
int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
int count = replicationNum;
@ -91,11 +92,12 @@ public class MetadataViewer {
Backend be = infoService.getBackend(replica.getBackendId());
if (be == null || !be.isAvailable()) {
status = ReplicaStatus.DEAD;
} else {
if (replica.getVersion() < visibleVersion
} else if (replica.getVersion() < visibleVersion
|| replica.getLastFailedVersion() > 0) {
status = ReplicaStatus.VERSION_ERROR;
}
} else if (replica.getSchemaHash() != -1 && replica.getSchemaHash() != schemaHash) {
status = ReplicaStatus.SCHEMA_ERROR;
}
if (filterReplica(status, statusFilter, op)) {
@ -109,6 +111,7 @@ public class MetadataViewer {
row.add(String.valueOf(replica.getLastFailedVersion()));
row.add(String.valueOf(replica.getLastSuccessVersion()));
row.add(String.valueOf(visibleVersion));
row.add(String.valueOf(replica.getSchemaHash()));
row.add(String.valueOf(replica.getVersionCount()));
row.add(replica.getState().name());
row.add(status.name());

View File

@ -24,6 +24,9 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Util;
import org.apache.doris.meta.MetaContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@ -37,6 +40,8 @@ import java.util.Map.Entry;
* Internal representation of partition-related metadata.
*/
public class Partition extends MetaObject implements Writable {
private static final Logger LOG = LogManager.getLogger(Partition.class);
public static final long PARTITION_INIT_VERSION = 1L;
public static final long PARTITION_INIT_VERSION_HASH = 0L;
@ -111,6 +116,20 @@ public class Partition extends MetaObject implements Writable {
this.state = state;
}
/*
 * If a partition is overwritten by a restore job, we need to reset all version info to
 * the restored partition version info.
 *
 * The visible version and hash are taken from the arguments, the next version is set to
 * visible version + 1 with a freshly generated hash, and the committed version hash is
 * set to the restored visible version hash.
 *
 * @param visibleVersion     visible version of the restored partition
 * @param visibleVersionHash visible version hash of the restored partition
 */
public void updateVersionForRestore(long visibleVersion, long visibleVersionHash) {
    this.visibleVersion = visibleVersion;
    this.visibleVersionHash = visibleVersionHash;
    this.nextVersion = this.visibleVersion + 1;
    this.nextVersionHash = Util.generateVersionHash();
    this.committedVersionHash = visibleVersionHash;
    // The format string has five placeholders; the partition id must be passed first,
    // otherwise every value is shifted by one slot and the last "{}" is printed literally.
    LOG.info("update partition {} version for restore: visible: {}-{}, next: {}-{}",
            getId(), visibleVersion, visibleVersionHash, nextVersion, nextVersionHash);
}
public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionHash) {
this.visibleVersion = visibleVersion;
this.visibleVersionHash = visibleVersionHash;

View File

@ -48,7 +48,8 @@ public class Replica implements Writable {
OK, // health
DEAD, // backend is not available
VERSION_ERROR, // missing version
MISSING // replica does not exist
MISSING, // replica does not exist
SCHEMA_ERROR // replica's schema hash does not equal to index's schema hash
}
private long id;
@ -57,8 +58,8 @@ public class Replica implements Writable {
private long versionHash;
private int schemaHash = -1;
private long dataSize;
private long rowCount;
private long dataSize = 0;
private long rowCount = 0;
private ReplicaState state;
private long lastFailedVersion = -1L;
@ -218,6 +219,9 @@ public class Replica implements Writable {
long lastFailedVersion, long lastFailedVersionHash,
long lastSuccessVersion, long lastSuccessVersionHash,
long newDataSize, long newRowCount) {
LOG.debug("before update: {}", this.toString());
if (newVersion < this.version) {
LOG.warn("replica[" + id + "] new version is lower than meta version. " + newVersion + " vs " + version);
// yiguolei: could not find any reason why new version less than this.version should run???
@ -282,7 +286,7 @@ public class Replica implements Writable {
}
}
LOG.debug("update {}", this.toString());
LOG.debug("after update {}", this.toString());
}
public synchronized void updateLastFailedVersion(long lastFailedVersion, long lastFailedVersionHash) {

View File

@ -28,6 +28,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.TabletScheduler.AddResult;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Daemon;
@ -57,10 +58,6 @@ public class TabletChecker extends Daemon {
private static final long CHECK_INTERVAL_MS = 20 * 1000L; // 20 second
// if the number of scheduled tablets in TabletScheduler exceed this threshold
// skip checking.
private static final int MAX_SCHEDULING_TABLETS = 5000;
private Catalog catalog;
private SystemInfoService infoService;
private TabletScheduler tabletScheduler;
@ -156,10 +153,12 @@ public class TabletChecker extends Daemon {
*/
@Override
protected void runOneCycle() {
if (tabletScheduler.getPendingNum() > MAX_SCHEDULING_TABLETS
|| tabletScheduler.getRunningNum() > MAX_SCHEDULING_TABLETS) {
int pendingNum = tabletScheduler.getPendingNum();
int runningNum = tabletScheduler.getRunningNum();
if (pendingNum > TabletScheduler.MAX_SCHEDULING_TABLETS
|| runningNum > TabletScheduler.MAX_SCHEDULING_TABLETS) {
LOG.info("too many tablets are being scheduled. pending: {}, running: {}, limit: {}. skip check",
tabletScheduler.getPendingNum(), tabletScheduler.getRunningNum(), MAX_SCHEDULING_TABLETS);
pendingNum, runningNum, TabletScheduler.MAX_SCHEDULING_TABLETS);
return;
}
@ -178,7 +177,7 @@ public class TabletChecker extends Daemon {
long addToSchedulerTabletNum = 0;
List<Long> dbIds = catalog.getDbIds();
for (Long dbId : dbIds) {
OUT: for (Long dbId : dbIds) {
Database db = catalog.getDb(dbId);
if (db == null) {
continue;
@ -237,11 +236,16 @@ public class TabletChecker extends Daemon {
System.currentTimeMillis());
tabletCtx.setOrigPriority(statusWithPrio.second);
if (tabletScheduler.addTablet(tabletCtx, false /* not force */)) {
AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */);
if (res == AddResult.LIMIT_EXCEED) {
LOG.info("number of scheduling tablets in tablet scheduler"
+ " exceed to limit. stop tablet checker");
break OUT;
} else if (res == AddResult.ADDED) {
addToSchedulerTabletNum++;
}
}
}
} // indices
if (prioPartIsHealthy && isInPrios) {
// if all replicas in this partition are healthy, remove this partition from
@ -250,8 +254,8 @@ public class TabletChecker extends Daemon {
db.getId(), olapTbl.getId(), partition.getId());
removePrios(db.getId(), olapTbl.getId(), Lists.newArrayList(partition.getId()));
}
}
}
} // partitions
} // tables
} finally {
db.readUnlock();
}

View File

@ -91,6 +91,10 @@ public class TabletScheduler extends Daemon {
public static final int BALANCE_SLOT_NUM_FOR_PATH = 2;
// if the number of scheduled tablets in TabletScheduler exceed this threshold
// skip checking.
public static final int MAX_SCHEDULING_TABLETS = 5000;
/*
* Tablet is added to pendingTablets, and its id is added to allTabletIds as well.
* TabletScheduler will take tablet from pendingTablets but will not remove its id from allTabletIds when
@ -121,6 +125,13 @@ public class TabletScheduler extends Daemon {
private SystemInfoService infoService;
private TabletInvertedIndex invertedIndex;
private TabletSchedulerStat stat;
// Result of trying to add a tablet to pendingTablets; returned by addTablet().
public enum AddResult {
ADDED, // the tablet was added to pendingTablets
ALREADY_IN, // the tablet was already added before and the add was not forced; skipped
LIMIT_EXCEED // pending or running tablets exceed MAX_SCHEDULING_TABLETS; a non-forced, non-BALANCE add is refused
}
public TabletScheduler(Catalog catalog, SystemInfoService infoService, TabletInvertedIndex invertedIndex,
TabletSchedulerStat stat) {
@ -188,14 +199,22 @@ public class TabletScheduler extends Daemon {
* add a ready-to-be-scheduled tablet to pendingTablets, if it has not been added before.
* if force is true, do not check if tablet is already added before.
*/
public synchronized boolean addTablet(TabletSchedCtx tablet, boolean force) {
public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) {
if (!force && containsTablet(tablet.getTabletId())) {
LOG.info("balance is disabled, skip");
return false;
return AddResult.ALREADY_IN;
}
// if this is not a BALANCE task, and not a force add,
// and number of scheduling tablets exceed the limit,
// refuse to add.
if (tablet.getType() != TabletSchedCtx.Type.BALANCE && !force
&& (pendingTablets.size() > MAX_SCHEDULING_TABLETS || runningTablets.size() > MAX_SCHEDULING_TABLETS)) {
return AddResult.LIMIT_EXCEED;
}
allTabletIds.add(tablet.getTabletId());
pendingTablets.offer(tablet);
return true;
return AddResult.ADDED;
}
public synchronized boolean containsTablet(long tabletId) {
@ -705,6 +724,7 @@ public class TabletScheduler extends Daemon {
*/
private void selectTabletsForBalance() {
if (Config.disable_balance) {
LOG.info("balance is disabled. skip selecting tablets for balance");
return;
}

View File

@ -39,8 +39,8 @@ import java.util.Set;
*/
public class IndexInfoProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("IndexName").add("SchemaVersion").add("SchemaHash").add("ShortKeyColumnCount")
.add("StorageType").add("Keys")
.add("IndexId").add("IndexName").add("SchemaVersion").add("SchemaHash")
.add("ShortKeyColumnCount").add("StorageType").add("Keys")
.build();
private Database db;
@ -91,15 +91,16 @@ public class IndexInfoProcDir implements ProcDirInterface {
}
builder.append(Joiner.on(", ").join(columnNames)).append(")");
result.addRow(Lists.newArrayList(indexName,
String.valueOf(schemaVersion),
String.valueOf(schemaHash),
String.valueOf(shortKeyColumnCount),
storageType.name(),
builder.toString()));
result.addRow(Lists.newArrayList(String.valueOf(indexId),
indexName,
String.valueOf(schemaVersion),
String.valueOf(schemaHash),
String.valueOf(shortKeyColumnCount),
storageType.name(),
builder.toString()));
}
} else {
result.addRow(Lists.newArrayList(table.getName(), "", "", "", "", ""));
result.addRow(Lists.newArrayList("-1", table.getName(), "", "", "", "", ""));
}
return result;
@ -114,23 +115,27 @@ public class IndexInfoProcDir implements ProcDirInterface {
}
@Override
public ProcNodeInterface lookup(String indexName) throws AnalysisException {
public ProcNodeInterface lookup(String idxIdStr) throws AnalysisException {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(table);
long idxId;
try {
idxId = Long.valueOf(idxIdStr);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid index id format: " + idxIdStr);
}
db.readLock();
try {
List<Column> schema = null;
Set<String> bfColumns = null;
if (table.getType() == TableType.OLAP) {
OlapTable olapTable = (OlapTable) table;
Long indexId = olapTable.getIndexIdByName(indexName);
if (indexId == null) {
throw new AnalysisException("Index[" + indexName + "] does not exist in table["
+ table.getName() + "]");
schema = olapTable.getSchemaByIndexId(idxId);
if (schema == null) {
throw new AnalysisException("Index " + idxId + " does not exist");
}
schema = olapTable.getSchemaByIndexId(indexId);
bfColumns = olapTable.getCopiedBfColumns();
} else {
schema = table.getBaseSchema();

View File

@ -31,7 +31,7 @@ import java.util.List;
import java.util.Set;
/*
* SHOW PROC /dbs/dbId/tableId/index_schema/"index name"
* SHOW PROC /dbs/dbId/tableId/index_schema/indexId
* show index schema
*/
public class IndexSchemaProcNode implements ProcNodeInterface {

View File

@ -34,7 +34,7 @@ public class ReplicasProcNode implements ProcNodeInterface {
.add("ReplicaId").add("BackendId").add("Version").add("VersionHash")
.add("LstSuccessVersion").add("LstSuccessVersionHash")
.add("LstFailedVersion").add("LstFailedVersionHash")
.add("LstFailedTime").add("DataSize").add("RowCount").add("State")
.add("LstFailedTime").add("SchemaHash").add("DataSize").add("RowCount").add("State")
.add("VersionCount").add("PathHash")
.build();
@ -59,6 +59,7 @@ public class ReplicasProcNode implements ProcNodeInterface {
String.valueOf(replica.getLastFailedVersion()),
String.valueOf(replica.getLastFailedVersionHash()),
TimeUtils.longToTimeString(replica.getLastFailedTimestamp()),
String.valueOf(replica.getSchemaHash()),
String.valueOf(replica.getDataSize()),
String.valueOf(replica.getRowCount()),
String.valueOf(replica.getState()),

View File

@ -32,9 +32,9 @@ import java.util.List;
public class RollupProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("JobId").add("TableName").add("TransactionId").add("CreateTime").add("FinishedTime")
.add("BaseIndexName").add("RollupIndexName").add("State").add("Msg")
.add("Progress")
.add("JobId").add("TableName").add("CreateTime").add("FinishedTime")
.add("BaseIndexName").add("RollupIndexName").add("RollupId").add("TransactionId")
.add("State").add("Msg") .add("Progress")
.build();
private RollupHandler rollupHandler;

View File

@ -29,9 +29,9 @@ import java.util.List;
public class SchemaChangeProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("JobId").add("TableName").add("TransactionId").add("CreateTime").add("FinishTime")
.add("IndexName").add("IndexState").add("State").add("Msg")
.add("Progress")
.add("JobId").add("TableName").add("CreateTime").add("FinishTime")
.add("IndexName").add("IndexId").add("SchemaVersion").add("IndexState")
.add("TransactionId").add("State").add("Progress").add("Msg")
.build();
private SchemaChangeHandler schemaChangeHandler;