[feature](load) collect loaded rows on table level after txn published (#24346)
As title.
Stream load 20 lines
```
2023-09-14 11:40:04,186 DEBUG (PUBLISH_VERSION|23) [DatabaseTransactionMgr.updateCatalogAfterVisible():1769] table id to loaded rows:{51016=20}
```
```
mysql> select count(*) from dup_tbl_basic;
+----------+
| count(*) |
+----------+
| 20 |
+----------+
1 row in set (0.05 sec)
```
This commit is contained in:
@ -491,6 +491,9 @@ public class MasterImpl {
|
||||
// not remove the task from queue and be will retry
|
||||
return;
|
||||
}
|
||||
if (request.isSetTabletIdToDeltaNumRows()) {
|
||||
publishVersionTask.setTabletIdToDeltaNumRows(request.getTabletIdToDeltaNumRows());
|
||||
}
|
||||
AgentTaskQueue.removeTask(publishVersionTask.getBackendId(),
|
||||
publishVersionTask.getTaskType(),
|
||||
publishVersionTask.getSignature());
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.thrift.TPartitionVersionInfo;
|
||||
import org.apache.doris.thrift.TPublishVersionRequest;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -38,6 +39,11 @@ public class PublishVersionTask extends AgentTask {
|
||||
// tabletId => version, current version = 0
|
||||
private Map<Long, Long> succTablets;
|
||||
|
||||
/**
|
||||
* To collect loaded rows for each tablet from each BE
|
||||
*/
|
||||
private final Map<Long, Long> tabletIdToDeltaNumRows = Maps.newHashMap();
|
||||
|
||||
public PublishVersionTask(long backendId, long transactionId, long dbId,
|
||||
List<TPartitionVersionInfo> partitionVersionInfos, long createTime) {
|
||||
super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId, createTime);
|
||||
@ -81,4 +87,12 @@ public class PublishVersionTask extends AgentTask {
|
||||
}
|
||||
this.errorTablets.addAll(errorTablets);
|
||||
}
|
||||
|
||||
public void setTabletIdToDeltaNumRows(Map<Long, Long> tabletIdToDeltaNumRows) {
|
||||
this.tabletIdToDeltaNumRows.putAll(tabletIdToDeltaNumRows);
|
||||
}
|
||||
|
||||
public Map<Long, Long> getTabletIdToDeltaNumRows() {
|
||||
return tabletIdToDeltaNumRows;
|
||||
}
|
||||
}
|
||||
|
||||
@ -51,6 +51,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
|
||||
import org.apache.doris.persist.EditLog;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.statistics.AnalysisManager;
|
||||
import org.apache.doris.task.AgentBatchTask;
|
||||
import org.apache.doris.task.AgentTaskExecutor;
|
||||
import org.apache.doris.task.ClearTransactionTask;
|
||||
@ -1787,6 +1788,9 @@ public class DatabaseTransactionMgr {
|
||||
}
|
||||
}
|
||||
}
|
||||
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
|
||||
LOG.debug("table id to loaded rows:{}", transactionState.getTableIdToNumDeltaRows());
|
||||
transactionState.getTableIdToNumDeltaRows().forEach(analysisManager::updateUpdatedRows);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@ -18,6 +18,8 @@
|
||||
package org.apache.doris.transaction;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.catalog.TabletMeta;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
@ -29,13 +31,17 @@ import org.apache.doris.task.PublishVersionTask;
|
||||
import org.apache.doris.thrift.TPartitionVersionInfo;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class PublishVersionDaemon extends MasterDaemon {
|
||||
|
||||
@ -121,12 +127,39 @@ public class PublishVersionDaemon extends MasterDaemon {
|
||||
AgentTaskExecutor.submit(batchTask);
|
||||
}
|
||||
|
||||
TabletInvertedIndex tabletInvertedIndex = Env.getCurrentEnv().getTabletInvertedIndex();
|
||||
Set<Long> tabletIdFilter = Sets.newHashSet();
|
||||
Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
|
||||
// try to finish the transaction, if failed just retry in next loop
|
||||
for (TransactionState transactionState : readyTransactionStates) {
|
||||
boolean hasBackendAliveAndUnfinishTask = transactionState.getPublishVersionTasks().values().stream()
|
||||
Stream<PublishVersionTask> publishVersionTaskStream = transactionState
|
||||
.getPublishVersionTasks()
|
||||
.values()
|
||||
.stream()
|
||||
.peek(task -> {
|
||||
if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) {
|
||||
Map<Long, Long> tabletIdToDeltaNumRows =
|
||||
task.getTabletIdToDeltaNumRows();
|
||||
tabletIdToDeltaNumRows.forEach((tabletId, numRows) -> {
|
||||
if (!tabletIdFilter.add(tabletId)) {
|
||||
// means the delta num rows for this tablet id has been collected
|
||||
return;
|
||||
}
|
||||
TabletMeta tabletMeta = tabletInvertedIndex.getTabletMeta(tabletId);
|
||||
if (tabletMeta == null) {
|
||||
// for delete, drop, schema change etc. here may be a null value
|
||||
return;
|
||||
}
|
||||
long tableId = tabletMeta.getTableId();
|
||||
tableIdToNumDeltaRows.computeIfPresent(tableId, (tblId, orgNum) -> orgNum + numRows);
|
||||
tableIdToNumDeltaRows.putIfAbsent(tableId, numRows);
|
||||
});
|
||||
}
|
||||
});
|
||||
boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
|
||||
.anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId()));
|
||||
|
||||
boolean shouldFinishTxn = !hasBackendAliveAndUnfinishTask || transactionState.isPublishTimeout();
|
||||
boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout();
|
||||
if (shouldFinishTxn) {
|
||||
try {
|
||||
// one transaction exception should not affect other transaction
|
||||
|
||||
@ -253,6 +253,8 @@ public class TransactionState implements Writable {
|
||||
// tbl id -> (index ids)
|
||||
private Map<Long, Set<Long>> loadedTblIndexes = Maps.newHashMap();
|
||||
|
||||
private Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
|
||||
|
||||
private String errorLogUrl = null;
|
||||
|
||||
// record some error msgs during the transaction operation.
|
||||
@ -701,6 +703,14 @@ public class TransactionState implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
public Map<Long, Long> getTableIdToNumDeltaRows() {
|
||||
return tableIdToNumDeltaRows;
|
||||
}
|
||||
|
||||
public void setTableIdToNumDeltaRows(Map<Long, Long> tableIdToNumDeltaRows) {
|
||||
this.tableIdToNumDeltaRows.putAll(tableIdToNumDeltaRows);
|
||||
}
|
||||
|
||||
public void setErrorMsg(String errMsg) {
|
||||
this.errMsg = errMsg;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user