[Feature][Meta]Update/Read/Write VisibleVersionTime for Partition#4076 (#4086)

#4076 
1. The visibleVersionTime is updated when insert data to partition
2. GlobalTransactionMgr call partition.updateVisibleVersionAndVersionHash(version, versionHash) when fe is restarted
3. If fe restart, VisibleVersionTime may be changed, but the changed value is newer than the old value
This commit is contained in:
HaiBo Li
2020-07-26 21:20:55 +08:00
committed by GitHub
parent 1f7009354a
commit ed8cb6a002
6 changed files with 84 additions and 31 deletions

View File

@ -92,6 +92,8 @@ public class Partition extends MetaObject implements Writable {
private long committedVersionHash;
@SerializedName(value = "visibleVersion")
private long visibleVersion;
@SerializedName(value = "visibleVersionTime")
private long visibleVersionTime;
@SerializedName(value = "visibleVersionHash")
private long visibleVersionHash;
@SerializedName(value = "nextVersion")
@ -113,6 +115,7 @@ public class Partition extends MetaObject implements Writable {
this.baseIndex = baseIndex;
this.visibleVersion = PARTITION_INIT_VERSION;
this.visibleVersionTime = System.currentTimeMillis();
this.visibleVersionHash = PARTITION_INIT_VERSION_HASH;
// PARTITION_INIT_VERSION == 1, so the first load version is 2 !!!
this.nextVersion = PARTITION_INIT_VERSION + 1;
@ -147,8 +150,7 @@ public class Partition extends MetaObject implements Writable {
* the restored partition version info》
*/
public void updateVersionForRestore(long visibleVersion, long visibleVersionHash) {
this.visibleVersion = visibleVersion;
this.visibleVersionHash = visibleVersionHash;
this.setVisibleVersion(visibleVersion, visibleVersionHash);
this.nextVersion = this.visibleVersion + 1;
this.nextVersionHash = Util.generateVersionHash();
this.committedVersionHash = visibleVersionHash;
@ -157,8 +159,11 @@ public class Partition extends MetaObject implements Writable {
}
public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionHash) {
this.visibleVersion = visibleVersion;
this.visibleVersionHash = visibleVersionHash;
updateVisibleVersionAndVersionHash(visibleVersion, System.currentTimeMillis(), visibleVersionHash);
}
public void updateVisibleVersionAndVersionHash(long visibleVersion, long visibleVersionTime, long visibleVersionHash) {
this.setVisibleVersion(visibleVersion, visibleVersionTime, visibleVersionHash);
if (MetaContext.get() != null) {
// MetaContext is not null means we are in a edit log replay thread.
// if it is upgrade from old palo cluster, then should update next version info
@ -181,9 +186,26 @@ public class Partition extends MetaObject implements Writable {
return visibleVersion;
}
public long getVisibleVersionTime() {
return visibleVersionTime;
}
public long getVisibleVersionHash() {
return visibleVersionHash;
}
// The method updateVisibleVersionAndVersionHash is called when fe restart, the visibleVersionTime is updated
private void setVisibleVersion(long visibleVersion, long visibleVersionHash) {
this.visibleVersion = visibleVersion;
this.visibleVersionTime = System.currentTimeMillis();
this.visibleVersionHash = visibleVersionHash;
}
public void setVisibleVersion(long visibleVersion, long visibleVersionTime, long visibleVersionHash) {
this.visibleVersion = visibleVersion;
this.visibleVersionTime = visibleVersionTime;
this.visibleVersionHash = visibleVersionHash;
}
public PartitionState getState() {
return this.state;
@ -344,6 +366,7 @@ public class Partition extends MetaObject implements Writable {
}
out.writeLong(visibleVersion);
out.writeLong(visibleVersionTime);
out.writeLong(visibleVersionHash);
out.writeLong(nextVersion);
@ -379,6 +402,11 @@ public class Partition extends MetaObject implements Writable {
}
visibleVersion = in.readLong();
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_88) {
visibleVersionTime = in.readLong();
} else {
visibleVersionTime = System.currentTimeMillis();
}
visibleVersionHash = in.readLong();
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_45) {
nextVersion = in.readLong();

View File

@ -185,6 +185,8 @@ public final class FeMetaVersion {
public static final int VERSION_86 = 86;
// spark resource, resource privilege, broker file group for hive table
public static final int VERSION_87 = 87;
// add partition visibleVersionTime
public static final int VERSION_88 = 88;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_87;
public static final int VERSION_CURRENT = VERSION_88;
}

View File

@ -64,7 +64,8 @@ import java.util.stream.Collectors;
*/
public class PartitionsProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("PartitionId").add("PartitionName").add("VisibleVersion").add("VisibleVersionHash")
.add("PartitionId").add("PartitionName")
.add("VisibleVersion").add("VisibleVersionTime").add("VisibleVersionHash")
.add("State").add("PartitionKey").add("Range").add("DistributionKey")
.add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime")
.add("LastConsistencyCheckTime")
@ -231,6 +232,7 @@ public class PartitionsProcDir implements ProcDirInterface {
partitionInfo.add(partitionId);
partitionInfo.add(partitionName);
partitionInfo.add(partition.getVisibleVersion());
partitionInfo.add(TimeUtils.longToTimeString(partition.getVisibleVersionTime()));
partitionInfo.add(partition.getVisibleVersionHash());
partitionInfo.add(partition.getState());

View File

@ -815,8 +815,8 @@ public class DatabaseTransactionMgr {
OlapTable table = (OlapTable) db.getTable(tableId);
Partition partition = table.getPartition(partitionId);
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,
partition.getNextVersion(),
partition.getNextVersionHash());
partition.getNextVersion(), partition.getNextVersionHash(),
System.currentTimeMillis() /* use as partition visible time */);
tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
}
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
@ -1285,8 +1285,9 @@ public class DatabaseTransactionMgr {
}
} // end for indices
long version = partitionCommitInfo.getVersion();
long versionTime = partitionCommitInfo.getVersionTime();
long versionHash = partitionCommitInfo.getVersionHash();
partition.updateVisibleVersionAndVersionHash(version, versionHash);
partition.updateVisibleVersionAndVersionHash(version, versionTime, versionHash);
if (LOG.isDebugEnabled()) {
LOG.debug("transaction state {} set partition {}'s version to [{}] and version hash to [{}]",
transactionState, partition.getId(), version, versionHash);

View File

@ -17,40 +17,57 @@
package org.apache.doris.transaction;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.doris.common.io.Writable;
public class PartitionCommitInfo implements Writable {
@SerializedName(value = "partitionId")
private long partitionId;
@SerializedName(value = "version")
private long version;
@SerializedName(value = "versionTime")
private long versionTime;
@SerializedName(value = "versionHash")
private long versionHash;
public PartitionCommitInfo() {
}
public PartitionCommitInfo(long partitionId, long version, long versionHash) {
public PartitionCommitInfo(long partitionId, long version, long versionHash, long visibleTime) {
super();
this.partitionId = partitionId;
this.version = version;
this.versionTime = visibleTime;
this.versionHash = versionHash;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(partitionId);
out.writeLong(version);
out.writeLong(versionHash);
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public void readFields(DataInput in) throws IOException {
partitionId = in.readLong();
version = in.readLong();
versionHash = in.readLong();
public static PartitionCommitInfo read(DataInput in) throws IOException {
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_88) {
long partitionId = in.readLong();
long version = in.readLong();
long versionHash = in.readLong();
return new PartitionCommitInfo(partitionId, version, versionHash, System.currentTimeMillis());
} else {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, PartitionCommitInfo.class);
}
}
public long getPartitionId() {
@ -60,6 +77,10 @@ public class PartitionCommitInfo implements Writable {
public long getVersion() {
return version;
}
public long getVersionTime() {
return versionTime;
}
public long getVersionHash() {
return versionHash;
@ -67,12 +88,11 @@ public class PartitionCommitInfo implements Writable {
@Override
public String toString() {
StringBuffer strBuffer = new StringBuffer("partitionid=");
strBuffer.append(partitionId);
strBuffer.append(", version=");
strBuffer.append(version);
strBuffer.append(", versionHash=");
strBuffer.append(versionHash);
return strBuffer.toString();
StringBuilder sb = new StringBuilder("partitionid=");
sb.append(partitionId);
sb.append(", version=").append(version);
sb.append(", versionHash=").append(versionHash);
sb.append(", versionTime=").append(versionTime);
return sb.toString();
}
}

View File

@ -17,14 +17,15 @@
package org.apache.doris.transaction;
import org.apache.doris.common.io.Writable;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import org.apache.doris.common.io.Writable;
import com.google.common.collect.Maps;
public class TableCommitInfo implements Writable {
private long tableId;
@ -60,8 +61,7 @@ public class TableCommitInfo implements Writable {
if (hasPartitionInfo) {
int elementNum = in.readInt();
for (int i = 0; i < elementNum; ++i) {
PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo();
partitionCommitInfo.readFields(in);
PartitionCommitInfo partitionCommitInfo = PartitionCommitInfo.read(in);
idToPartitionCommitInfo.put(partitionCommitInfo.getPartitionId(), partitionCommitInfo);
}
}