[improvement](statistics)Log one bdbje record for one load transaction. #31619 (#31697)

This commit is contained in:
Jibing-Li
2024-03-02 23:04:26 +08:00
committed by GitHub
parent c1dcce42d4
commit de9b5f7b69
9 changed files with 252 additions and 42 deletions

View File

@ -302,7 +302,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
try {
makeSureInitialized();
} catch (Exception e) {
LOG.warn("Failed to initialize table {}.{}.{}", catalog.name, dbName, name, e);
LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e);
return 0;
}
// All external table should get external row count from cache.

View File

@ -3061,6 +3061,7 @@ public class InternalCatalog implements CatalogIf<Database> {
rowsToTruncate += partition.getBaseIndex().getRowCount();
}
} else {
rowsToTruncate = olapTable.getRowCount();
for (Partition partition : olapTable.getPartitions()) {
// To be absolutely correct we should check running txns here.
// But if a txn is in the prepare state, we can't know which partitions have loaded data.
@ -3211,12 +3212,15 @@ public class InternalCatalog implements CatalogIf<Database> {
} finally {
olapTable.writeUnlock();
}
HashMap<Long, Long> updateRecords = new HashMap<>();
updateRecords.put(olapTable.getId(), rowsToTruncate);
if (truncateEntireTable) {
// Drop the whole table stats after truncate the entire table
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
} else {
// Update the updated rows in table stats after truncate some partitions.
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(olapTable.getId(), rowsToTruncate);
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords);
}
LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames());
}

View File

@ -127,7 +127,9 @@ import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.NewPartitionLoadedEvent;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.UpdateRowsEvent;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@ -935,6 +937,16 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_LOG_UPDATE_ROWS: {
data = UpdateRowsEvent.read(in);
isRead = true;
break;
}
case OperationType.OP_LOG_NEW_PARTITION_LOADED: {
data = NewPartitionLoadedEvent.read(in);
isRead = true;
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);

View File

@ -88,7 +88,9 @@ import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.AnalysisTaskInfo;
import org.apache.doris.statistics.NewPartitionLoadedEvent;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.UpdateRowsEvent;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@ -1181,8 +1183,14 @@ public class EditLog {
env.getExternalMetaIdMgr().replayMetaIdMappingsLog((MetaIdMappingsLog) journal.getData());
break;
}
case OperationType.OP_LOG_UPDATE_ROWS:
case OperationType.OP_LOG_NEW_PARTITION_LOADED:
case OperationType.OP_LOG_UPDATE_ROWS: {
env.getAnalysisManager().replayUpdateRowsRecord((UpdateRowsEvent) journal.getData());
break;
}
case OperationType.OP_LOG_NEW_PARTITION_LOADED: {
env.getAnalysisManager().replayNewPartitionLoadedEvent((NewPartitionLoadedEvent) journal.getData());
break;
}
case OperationType.OP_LOG_ALTER_COLUMN_STATS: {
// TODO: implement this while statistics finished related work.
break;
@ -2022,6 +2030,14 @@ public class EditLog {
logEdit(OperationType.OP_UPDATE_TABLE_STATS, tableStats);
}
// Append one OP_LOG_UPDATE_ROWS journal entry carrying the per-table
// updated-row deltas of a finished load transaction, so followers and
// restart-replay can apply the same counts (see replayUpdateRowsRecord).
public void logUpdateRowsRecord(UpdateRowsEvent record) {
logEdit(OperationType.OP_LOG_UPDATE_ROWS, record);
}
// Append one OP_LOG_NEW_PARTITION_LOADED journal entry listing the tables
// that had data loaded into a previously empty partition, so followers and
// restart-replay can mark them (see replayNewPartitionLoadedEvent).
public void logNewPartitionLoadedEvent(NewPartitionLoadedEvent event) {
logEdit(OperationType.OP_LOG_NEW_PARTITION_LOADED, event);
}
public void logDeleteTableStats(TableStatsDeletionLog log) {
logEdit(OperationType.OP_DELETE_TABLE_STATS, log);
}

View File

@ -996,21 +996,31 @@ public class AnalysisManager implements Writable {
}
// Invoke this when load transaction finished.
public void updateUpdatedRows(long tblId, long rows) {
TableStatsMeta statsStatus = idToTblStats.get(tblId);
if (statsStatus != null) {
statsStatus.updatedRows.addAndGet(rows);
logCreateTableStats(statsStatus);
public void updateUpdatedRows(Map<Long, Long> records) {
if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() || records == null || records.isEmpty()) {
return;
}
for (Entry<Long, Long> record : records.entrySet()) {
TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
if (statsStatus != null) {
statsStatus.updatedRows.addAndGet(record.getValue());
}
}
logUpdateRowsRecord(new UpdateRowsEvent(records));
}
// Set to true means new partition loaded data
public void setNewPartitionLoaded(long tblId) {
TableStatsMeta statsStatus = idToTblStats.get(tblId);
if (statsStatus != null && Env.getCurrentEnv().isMaster() && !Env.isCheckpointThread()) {
statsStatus.newPartitionLoaded.set(true);
logCreateTableStats(statsStatus);
public void setNewPartitionLoaded(List<Long> tableIds) {
if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() || tableIds == null || tableIds.isEmpty()) {
return;
}
for (long tableId : tableIds) {
TableStatsMeta statsStatus = idToTblStats.get(tableId);
if (statsStatus != null) {
statsStatus.newPartitionLoaded.set(true);
}
}
logNewPartitionLoadedEvent(new NewPartitionLoadedEvent(tableIds));
}
public void updateTableStatsStatus(TableStatsMeta tableStats) {
@ -1026,6 +1036,38 @@ public class AnalysisManager implements Writable {
Env.getCurrentEnv().getEditLog().logCreateTableStats(tableStats);
}
// Thin delegation: write the updated-rows record into the edit log so the
// change is journaled once per load transaction.
public void logUpdateRowsRecord(UpdateRowsEvent record) {
Env.getCurrentEnv().getEditLog().logUpdateRowsRecord(record);
}
// Thin delegation: write the new-partition-loaded event into the edit log so
// the change is journaled once per load transaction.
public void logNewPartitionLoadedEvent(NewPartitionLoadedEvent event) {
Env.getCurrentEnv().getEditLog().logNewPartitionLoadedEvent(event);
}
/**
 * Replays an OP_LOG_UPDATE_ROWS journal entry: adds each recorded row-count
 * delta to the in-memory stats of the corresponding table. Tables without a
 * TableStatsMeta entry are silently skipped, as are null/empty events.
 */
public void replayUpdateRowsRecord(UpdateRowsEvent event) {
if (event == null) {
    return;
}
Map<Long, Long> records = event.getRecords();
if (records == null) {
    return;
}
records.forEach((tableId, rows) -> {
    TableStatsMeta stats = idToTblStats.get(tableId);
    if (stats != null) {
        stats.updatedRows.addAndGet(rows);
    }
});
}
/**
 * Replays an OP_LOG_NEW_PARTITION_LOADED journal entry: flags each listed
 * table as having received its first load into a new partition. Tables
 * without a TableStatsMeta entry are silently skipped, as are null events.
 */
public void replayNewPartitionLoadedEvent(NewPartitionLoadedEvent event) {
if (event == null) {
    return;
}
List<Long> ids = event.getTableIds();
if (ids == null) {
    return;
}
for (long id : ids) {
    TableStatsMeta stats = idToTblStats.get(id);
    if (stats == null) {
        continue;
    }
    stats.newPartitionLoaded.set(true);
}
}
public void registerSysJob(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> taskInfos) {
recordAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);

View File

@ -27,20 +27,25 @@ import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
public class NewPartitionLoadedEvent implements Writable {
@SerializedName("partitionIdToTableId")
public final Map<Long, Long> partitionIdToTableId = new HashMap<>();
@SerializedName("tableIds")
private List<Long> tableIds;
@VisibleForTesting
public NewPartitionLoadedEvent() {}
public NewPartitionLoadedEvent(List<Long> tableIds) {
this.tableIds = tableIds;
}
// No need to be thread-safe; only the publish thread calls this.
public void addPartition(long tableId, long partitionId) {
partitionIdToTableId.put(tableId, partitionId);
public void addTableId(long tableId) {
tableIds.add(tableId);
}
public List<Long> getTableIds() {
return tableIds;
}
@Override
@ -51,7 +56,6 @@ public class NewPartitionLoadedEvent implements Writable {
public static NewPartitionLoadedEvent read(DataInput dataInput) throws IOException {
String json = Text.readString(dataInput);
NewPartitionLoadedEvent newPartitionLoadedEvent = GsonUtils.GSON.fromJson(json, NewPartitionLoadedEvent.class);
return newPartitionLoadedEvent;
return GsonUtils.GSON.fromJson(json, NewPartitionLoadedEvent.class);
}
}

View File

@ -21,30 +21,24 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class UpdateRowsEvent implements Writable {
@SerializedName("tableIdToUpdateRows")
public final Map<Long, Long> tableIdToUpdateRows = new HashMap<>();
@SerializedName("records")
private Map<Long, Long> records;
@VisibleForTesting
public UpdateRowsEvent() {}
public UpdateRowsEvent(Map<Long, Long> records) {
this.records = records;
}
// No need to be thread-safe; only the publish thread calls this.
public void addUpdateRows(long tableId, long rows) {
if (tableIdToUpdateRows.containsKey(tableId)) {
tableIdToUpdateRows.put(tableId, tableIdToUpdateRows.get(tableId) + rows);
} else {
tableIdToUpdateRows.put(tableId, rows);
}
public Map<Long, Long> getRecords() {
return records;
}
@Override
@ -53,9 +47,8 @@ public class UpdateRowsEvent implements Writable {
Text.writeString(out, json);
}
public static UpdateRowsEvent read(DataInput dataInput) throws IOException {
String json = Text.readString(dataInput);
UpdateRowsEvent updateRowsEvent = GsonUtils.GSON.fromJson(json, UpdateRowsEvent.class);
return updateRowsEvent;
public static UpdateRowsEvent read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, UpdateRowsEvent.class);
}
}

View File

@ -1923,6 +1923,7 @@ public class DatabaseTransactionMgr {
private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db) {
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
List<Long> newPartitionLoadedTableIds = new ArrayList<>();
for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
long tableId = tableCommitInfo.getTableId();
OlapTable table = (OlapTable) db.getTableNullable(tableId);
@ -1984,7 +1985,7 @@ public class DatabaseTransactionMgr {
long versionTime = partitionCommitInfo.getVersionTime();
if (partition.getVisibleVersion() == Partition.PARTITION_INIT_VERSION
&& version > Partition.PARTITION_INIT_VERSION) {
analysisManager.setNewPartitionLoaded(tableId);
newPartitionLoadedTableIds.add(tableId);
}
partition.updateVisibleVersionAndTime(version, versionTime);
if (LOG.isDebugEnabled()) {
@ -2011,7 +2012,8 @@ public class DatabaseTransactionMgr {
if (LOG.isDebugEnabled()) {
LOG.debug("table id to loaded rows:{}", tableIdToNumDeltaRows);
}
tableIdToNumDeltaRows.forEach(analysisManager::updateUpdatedRows);
analysisManager.setNewPartitionLoaded(newPartitionLoadedTableIds);
analysisManager.updateUpdatedRows(tableIdToNumDeltaRows);
return true;
}