From f3bbdfe7d33766c9dc66cf0bbe6ad64e1cabb845 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Wed, 25 Sep 2019 22:56:59 +0800 Subject: [PATCH] Fix bug that load statistic in show load result is incorrect (#1871) Each load job has several load tasks, and each task is a query plan with several plan fragments. Each plan fragment reports its query profile independently, so we need to collect each plan fragment's report separately. --- .../doris/load/loadv2/BrokerLoadJob.java | 9 +-- .../org/apache/doris/load/loadv2/LoadJob.java | 55 ++++++++++++++----- .../apache/doris/load/loadv2/LoadManager.java | 11 +++- .../java/org/apache/doris/qe/Coordinator.java | 6 +- 4 files changed, 57 insertions(+), 24 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 9c87721ae6..e9f70bfab8 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -63,7 +63,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; /** * There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, CommitAndPublishTxn. 
@@ -239,13 +238,11 @@ public class BrokerLoadJob extends LoadJob { // retry task idToTasks.remove(loadTask.getSignature()); if (loadTask instanceof LoadLoadingTask) { - loadStatistic.numScannedRowsMap.remove(((LoadLoadingTask) loadTask).getLoadId()); + loadStatistic.removeLoad(((LoadLoadingTask) loadTask).getLoadId()); } loadTask.updateRetryInfo(); idToTasks.put(loadTask.getSignature(), loadTask); - if (loadTask instanceof LoadLoadingTask) { - loadStatistic.numScannedRowsMap.put(((LoadLoadingTask) loadTask).getLoadId(), new AtomicLong(0)); - } + // load id will be added to loadStatistic when executing this task Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); return; } @@ -365,7 +362,7 @@ public class BrokerLoadJob extends LoadJob { // idToTasks contains previous LoadPendingTasks, so idToTasks is just used to save all tasks. // use newLoadingTasks to save new created loading tasks and submit them later. newLoadingTasks.add(task); - loadStatistic.numScannedRowsMap.put(loadId, new AtomicLong(0)); + // load id will be added to loadStatistic when executing this task // save all related tables and rollups in transaction state TransactionState txnState = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(transactionId); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 2cc30c6eef..2ac879cda8 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -53,9 +53,11 @@ import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; import com.google.common.base.Joiner; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.collect.Table; import com.google.gson.Gson; import 
org.apache.logging.log4j.LogManager; @@ -67,7 +69,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback, Writable { @@ -129,23 +130,48 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements public static class LoadStatistic { // number of rows processed on BE, this number will be updated periodically by query report. - // A load job may has several load tasks, so the map key is load task's plan load id. - public Map numScannedRowsMap = Maps.newConcurrentMap(); + // A load job may has several load tasks(queries), and each task has several fragments. + // each fragment will report independently. + // load task id -> fragment id -> rows count + private Table counterTbl = HashBasedTable.create(); + // number of file to be loaded public int fileNum = 0; public long totalFileSizeB = 0; - - public String toJson() { + + // init the statistic of specified load task + public synchronized void initLoad(TUniqueId loadId, Set fragmentIds) { + counterTbl.rowMap().remove(loadId); + for (TUniqueId fragId : fragmentIds) { + counterTbl.put(loadId, fragId, 0L); + } + } + + public synchronized void removeLoad(TUniqueId loadId) { + counterTbl.rowMap().remove(loadId); + } + + public synchronized void updateLoad(TUniqueId loadId, TUniqueId fragmentId, long rows) { + if (counterTbl.contains(loadId, fragmentId)) { + counterTbl.put(loadId, fragmentId, rows); + } + } + + public synchronized void clearAllLoads() { + counterTbl.clear(); + } + + public synchronized String toJson() { long total = 0; - for (AtomicLong atomicLong : numScannedRowsMap.values()) { - total += atomicLong.get(); + for (long rows : counterTbl.values()) { + total += rows; } Map details = Maps.newHashMap(); details.put("ScannedRows", total); 
details.put("FileNumber", fileNum); details.put("FileSize", totalFileSizeB); - details.put("TaskNumber", numScannedRowsMap.size()); + details.put("TaskNumber", counterTbl.rowMap().size()); Gson gson = new Gson(); return gson.toJson(details); } @@ -222,11 +248,12 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements return transactionId; } - public void updateScannedRows(TUniqueId loadId, long scannedRows) { - AtomicLong atomicLong = loadStatistic.numScannedRowsMap.get(loadId); - if (atomicLong != null) { - atomicLong.set(scannedRows); - } + public void initScannedRows(TUniqueId loadId, Set fragmentIds) { + loadStatistic.initLoad(loadId, fragmentIds); + } + + public void updateScannedRows(TUniqueId loadId, TUniqueId fragmentId, long scannedRows) { + loadStatistic.updateLoad(loadId, fragmentId, scannedRows); } public void setLoadFileInfo(int fileNum, long fileSize) { @@ -510,7 +537,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements } } idToTasks.clear(); - loadStatistic.numScannedRowsMap.clear(); + loadStatistic.clearAllLoads(); // set failMsg and state this.failMsg = failMsg; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index ab3b5045fd..cb6d95bc74 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -560,10 +560,17 @@ public class LoadManager implements Writable{ return false; } - public void updateJobScannedRows(Long jobId, TUniqueId loadId, long scannedRows) { + public void initJobScannedRows(Long jobId, TUniqueId loadId, Set fragmentIds) { LoadJob job = idToLoadJob.get(jobId); if (job != null) { - job.updateScannedRows(loadId, scannedRows); + job.initScannedRows(loadId, fragmentIds); + } + } + + public void updateJobScannedRows(Long jobId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows) { + LoadJob job 
= idToLoadJob.get(jobId); + if (job != null) { + job.updateScannedRows(loadId, fragmentId, scannedRows); } } diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 4bfa38d464..46237aba20 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -400,10 +400,11 @@ public class Coordinator { toBrpcHost(topParams.instanceExecParams.get(0).host), queryOptions.query_timeout * 1000); } else { - // This is a insert statement. + // This is a load process. this.queryOptions.setIs_report_success(true); deltaUrls = Lists.newArrayList(); loadCounters = Maps.newHashMap(); + Catalog.getCurrentCatalog().getLoadManager().initJobScannedRows(jobId, queryId, instanceIds); } // to keep things simple, make async Cancel() calls wait until plan fragment @@ -1191,7 +1192,8 @@ public class Coordinator { } if (params.isSetLoaded_rows()) { - Catalog.getCurrentCatalog().getLoadManager().updateJobScannedRows(jobId, params.query_id, params.loaded_rows); + Catalog.getCurrentCatalog().getLoadManager().updateJobScannedRows( + jobId, params.query_id, params.fragment_instance_id, params.loaded_rows); } return;