From 223e4665141dd0066432f2ddcdf6025b85b2f235 Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Wed, 10 Jan 2024 21:51:05 +0800 Subject: [PATCH] [fix](insert-into) fix insert into lose data (#29802) --- .../java/org/apache/doris/qe/Coordinator.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index efee918dfd..12be2b2d8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2520,7 +2520,14 @@ public class Coordinator implements CoordInterface { params.getBackendId(), status.getErrorMsg()); updateStatus(status, params.getFragmentInstanceId()); } - if (ctx.fragmentInstancesMap.get(params.fragment_instance_id).getIsDone()) { + + // params.isDone() should be promised. + // There are some periodic reports during the load process, + // and the reports from the intermediate process may be concurrent with the last report. + // The last report causes the counter to decrease to zero, + // but it is possible that the report without commit-info triggered the commit operation, + // resulting in the data not being published. + if (ctx.fragmentInstancesMap.get(params.fragment_instance_id).getIsDone() && params.isDone()) { if (params.isSetDeltaUrls()) { updateDeltas(params.getDeltaUrls()); } @@ -2585,7 +2592,13 @@ public class Coordinator implements CoordInterface { } } - if (execState.done) { + // params.isDone() should be promised. + // There are some periodic reports during the load process, + // and the reports from the intermediate process may be concurrent with the last report. + // The last report causes the counter to decrease to zero, + // but it is possible that the report without commit-info triggered the commit operation, + // resulting in the data not being published. + if (execState.done && params.isDone()) { if (params.isSetDeltaUrls()) { updateDeltas(params.getDeltaUrls()); }