[enhance](mtmv)add version and version time for table (#30437)

Add version to record data changes in the table

Scope of impact: 

- Transaction related operations
- drop partition
- replace partition
This commit is contained in:
zhangdong
2024-01-29 14:59:17 +08:00
committed by yiguolei
parent db094da081
commit 3354ac48f7
15 changed files with 257 additions and 36 deletions

View File

@ -5428,11 +5428,13 @@ public class Env {
}
}
olapTable.replaceTempPartitions(partitionNames, tempPartitionNames, isStrictRange, useTempPartitionName);
long version = olapTable.getNextVersion();
long versionTime = System.currentTimeMillis();
olapTable.updateVisibleVersionAndTime(version, versionTime);
// write log
ReplacePartitionOperationLog info =
new ReplacePartitionOperationLog(db.getId(), db.getFullName(), olapTable.getId(), olapTable.getName(),
partitionNames, tempPartitionNames, isStrictRange, useTempPartitionName);
partitionNames, tempPartitionNames, isStrictRange, useTempPartitionName, version, versionTime);
editLog.logReplaceTempPartition(info);
LOG.info("finished to replace partitions {} with temp partitions {} from table: {}", clause.getPartitionNames(),
clause.getTempPartitionNames(), olapTable.getName());
@ -5450,6 +5452,8 @@ public class Env {
olapTable.replaceTempPartitions(replaceTempPartitionLog.getPartitions(),
replaceTempPartitionLog.getTempPartitions(), replaceTempPartitionLog.isStrictRange(),
replaceTempPartitionLog.useTempPartitionName());
olapTable.updateVisibleVersionAndTime(replaceTempPartitionLog.getVersion(),
replaceTempPartitionLog.getVersionTime());
} catch (DdlException e) {
throw new MetaNotFoundException(e);
} finally {

View File

@ -2502,6 +2502,28 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
return tablets;
}
// During `getNextVersion` and `updateVisibleVersionAndTime` period,
// the write lock on the table should be held continuously
public void updateVisibleVersionAndTime(long visibleVersion, long visibleVersionTime) {
LOG.info("updateVisibleVersionAndTime, tableName: {}, visibleVersion, {}, visibleVersionTime: {}", name,
visibleVersion, visibleVersionTime);
tableAttributes.updateVisibleVersionAndTime(visibleVersion, visibleVersionTime);
}
// During `getNextVersion` and `updateVisibleVersionAndTime` period,
// the write lock on the table should be held continuously
public long getNextVersion() {
return tableAttributes.getNextVersion();
}
public long getVisibleVersion() {
return tableAttributes.getVisibleVersion();
}
public long getVisibleVersionTime() {
return tableAttributes.getVisibleVersionTime();
}
@Override
public PartitionType getPartitionType() {
return partitionInfo.getType();

View File

@ -118,7 +118,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
protected String comment = "";
@SerializedName(value = "ta")
private TableAttributes tableAttributes = new TableAttributes();
protected TableAttributes tableAttributes = new TableAttributes();
// check read lock leaky
private Map<Long, String> readLockThreads = null;

View File

@ -34,20 +34,57 @@ import java.util.Map;
* TableAttributes contains additional information about all table
*/
public class TableAttributes implements Writable {
public static final long TABLE_INIT_VERSION = 1L;
@SerializedName(value = "constraints")
private final Map<String, Constraint> constraintsMap = new HashMap<>();
@SerializedName(value = "visibleVersion")
private long visibleVersion;
@SerializedName(value = "visibleVersionTime")
private long visibleVersionTime;
public TableAttributes() {
this.visibleVersion = TABLE_INIT_VERSION;
this.visibleVersionTime = System.currentTimeMillis();
}
public Map<String, Constraint> getConstraintsMap() {
return constraintsMap;
}
public long getVisibleVersion() {
return visibleVersion;
}
public long getVisibleVersionTime() {
return visibleVersionTime;
}
public void updateVisibleVersionAndTime(long visibleVersion, long visibleVersionTime) {
// To be compatible with previous versions
if (visibleVersion <= TABLE_INIT_VERSION) {
return;
}
this.visibleVersion = visibleVersion;
this.visibleVersionTime = visibleVersionTime;
}
public long getNextVersion() {
return visibleVersion + 1;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public TableAttributes read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), TableAttributes.class);
}
public Map<String, Constraint> getConstraintsMap() {
return constraintsMap;
public TableAttributes read(DataInput in) throws IOException {
TableAttributes tableAttributes = GsonUtils.GSON.fromJson(Text.readString(in), TableAttributes.class);
// To be compatible with previous versions
if (tableAttributes.getVisibleVersion() < TABLE_INIT_VERSION) {
tableAttributes.visibleVersion = TABLE_INIT_VERSION;
tableAttributes.visibleVersionTime = System.currentTimeMillis();
}
return tableAttributes;
}
}

View File

@ -45,7 +45,7 @@ public class TablesProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("TableId").add("TableName").add("IndexNum").add("PartitionColumnName")
.add("PartitionNum").add("State").add("Type").add("LastConsistencyCheckTime").add("ReplicaCount")
.add("LastUpdateTime")
.add("VisibleVersion").add("VisibleVersionTime").add("LastUpdateTime")
.build();
private DatabaseIf db;
@ -117,6 +117,8 @@ public class TablesProcDir implements ProcDirInterface {
// last check time
tableInfo.add(TimeUtils.longToTimeString(olapTable.getLastCheckTime()));
tableInfo.add(replicaCount);
tableInfo.add(olapTable.getVisibleVersion());
tableInfo.add(olapTable.getVisibleVersionTime());
} else {
tableInfo.add(table.getId());
tableInfo.add(table.getName());
@ -128,6 +130,8 @@ public class TablesProcDir implements ProcDirInterface {
// last check time
tableInfo.add(FeConstants.null_string);
tableInfo.add(replicaCount);
tableInfo.add(FeConstants.null_string);
tableInfo.add(FeConstants.null_string);
}
tableInfo.add(TimeUtils.longToTimeString(table.getUpdateTime()));
tableInfos.add(tableInfo);

View File

@ -1771,10 +1771,12 @@ public class InternalCatalog implements CatalogIf<Database> {
recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(partition.getId());
}
}
long version = olapTable.getNextVersion();
long versionTime = System.currentTimeMillis();
olapTable.updateVisibleVersionAndTime(version, versionTime);
// log
DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionName, isTempPartition,
clause.isForceDrop(), recycleTime);
clause.isForceDrop(), recycleTime, version, versionTime);
Env.getCurrentEnv().getEditLog().logDropPartition(info);
LOG.info("succeed in dropping partition[{}], table : [{}-{}], is temp : {}, is force : {}",
@ -1795,6 +1797,7 @@ public class InternalCatalog implements CatalogIf<Database> {
Env.getCurrentRecycleBin().setRecycleTimeByIdForReplay(partition.getId(), info.getRecycleTime());
}
}
olapTable.updateVisibleVersionAndTime(info.getVersion(), info.getVersionTime());
} finally {
olapTable.writeUnlock();
}

View File

@ -42,12 +42,16 @@ public class DropPartitionInfo implements Writable {
private long recycleTime = 0;
@SerializedName(value = "sql")
private String sql;
@SerializedName(value = "version")
private long version = 0L;
@SerializedName(value = "versionTime")
private long versionTime = 0L;
private DropPartitionInfo() {
}
public DropPartitionInfo(Long dbId, Long tableId, String partitionName,
boolean isTempPartition, boolean forceDrop, long recycleTime) {
boolean isTempPartition, boolean forceDrop, long recycleTime, long version, long versionTime) {
this.dbId = dbId;
this.tableId = tableId;
this.partitionName = partitionName;
@ -65,6 +69,8 @@ public class DropPartitionInfo implements Writable {
sb.append(" FORCE");
}
this.sql = sb.toString();
this.version = version;
this.versionTime = versionTime;
}
public Long getDbId() {
@ -88,7 +94,15 @@ public class DropPartitionInfo implements Writable {
}
public Long getRecycleTime() {
return recycleTime;
return recycleTime;
}
public long getVersion() {
return version;
}
public long getVersionTime() {
return versionTime;
}
@Deprecated

View File

@ -49,11 +49,15 @@ public class ReplacePartitionOperationLog implements Writable {
private boolean strictRange;
@SerializedName(value = "useTempPartitionName")
private boolean useTempPartitionName;
@SerializedName(value = "version")
private long version = 0L;
@SerializedName(value = "versionTime")
private long versionTime = 0L;
public ReplacePartitionOperationLog(long dbId, String dbName, long tblId, String tblName,
List<String> partitionNames,
List<String> tempPartitonNames, boolean strictRange,
boolean useTempPartitionName) {
boolean useTempPartitionName, long version, long versionTime) {
this.dbId = dbId;
this.dbName = dbName;
this.tblId = tblId;
@ -62,6 +66,8 @@ public class ReplacePartitionOperationLog implements Writable {
this.tempPartitions = tempPartitonNames;
this.strictRange = strictRange;
this.useTempPartitionName = useTempPartitionName;
this.version = version;
this.versionTime = versionTime;
}
public long getDbId() {
@ -88,6 +94,14 @@ public class ReplacePartitionOperationLog implements Writable {
return useTempPartitionName;
}
public long getVersion() {
return version;
}
public long getVersionTime() {
return versionTime;
}
public static ReplacePartitionOperationLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ReplacePartitionOperationLog.class);

View File

@ -1269,8 +1269,9 @@ public class DatabaseTransactionMgr {
transactionState.setTransactionStatus(TransactionStatus.PRECOMMITTED);
transactionState.setErrorReplicas(errorReplicaIds);
for (long tableId : tableToPartition.keySet()) {
TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
OlapTable table = (OlapTable) db.getTableNullable(tableId);
TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId, table.getNextVersion(),
System.currentTimeMillis());
PartitionInfo tblPartitionInfo = table.getPartitionInfo();
for (long partitionId : tableToPartition.get(tableId)) {
String partitionRange = "";
@ -1309,8 +1310,9 @@ public class DatabaseTransactionMgr {
transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
transactionState.setErrorReplicas(errorReplicaIds);
for (long tableId : tableToPartition.keySet()) {
TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
OlapTable table = (OlapTable) db.getTableNullable(tableId);
TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId, table.getNextVersion(),
System.currentTimeMillis());
PartitionInfo tblPartitionInfo = table.getPartitionInfo();
for (long partitionId : tableToPartition.get(tableId)) {
Partition partition = table.getPartition(partitionId);
@ -1944,6 +1946,9 @@ public class DatabaseTransactionMgr {
transactionState, partition.getId(), version);
}
}
long tableVersion = tableCommitInfo.getVersion();
long tableVersionTime = tableCommitInfo.getVersionTime();
table.updateVisibleVersionAndTime(tableVersion, tableVersionTime);
}
Map<Long, Long> tableIdToTotalNumDeltaRows = transactionState.getTableIdToTotalNumDeltaRows();
Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();

View File

@ -17,7 +17,11 @@
package org.apache.doris.transaction;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
@ -33,30 +37,40 @@ public class TableCommitInfo implements Writable {
private long tableId;
@SerializedName(value = "idToPartitionCommitInfo")
private Map<Long, PartitionCommitInfo> idToPartitionCommitInfo;
@SerializedName(value = "version")
private long version;
@SerializedName(value = "versionTime")
private long versionTime;
public TableCommitInfo() {
}
public TableCommitInfo(long tableId) {
public TableCommitInfo(long tableId, long version, long visibleTime) {
this.tableId = tableId;
idToPartitionCommitInfo = Maps.newHashMap();
this.version = version;
this.versionTime = visibleTime;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(tableId);
if (idToPartitionCommitInfo == null) {
out.writeBoolean(false);
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static TableCommitInfo read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_129) {
TableCommitInfo info = new TableCommitInfo();
info.readFields(in);
return info;
} else {
out.writeBoolean(true);
out.writeInt(idToPartitionCommitInfo.size());
for (PartitionCommitInfo partitionCommitInfo : idToPartitionCommitInfo.values()) {
partitionCommitInfo.write(out);
}
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, TableCommitInfo.class);
}
}
@Deprecated
public void readFields(DataInput in) throws IOException {
tableId = in.readLong();
boolean hasPartitionInfo = in.readBoolean();
@ -89,4 +103,20 @@ public class TableCommitInfo implements Writable {
public PartitionCommitInfo getPartitionCommitInfo(long partitionId) {
return this.idToPartitionCommitInfo.get(partitionId);
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
public long getVersionTime() {
return versionTime;
}
public void setVersionTime(long versionTime) {
this.versionTime = versionTime;
}
}

View File

@ -717,8 +717,7 @@ public class TransactionState implements Writable {
dbId = in.readLong();
int size = in.readInt();
for (int i = 0; i < size; i++) {
TableCommitInfo info = new TableCommitInfo();
info.readFields(in);
TableCommitInfo info = TableCommitInfo.read(in);
idToTableCommitInfos.put(info.getTableId(), info);
}
txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), Text.readString(in));