diff --git a/.gitignore b/.gitignore
index 2bdc710987..52134571a5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -59,6 +59,7 @@ regression-test/realdata/*
 regression-test/cacheData/*
 regression-test/conf/regression-conf-custom.groovy
 regression-test/framework/bin/*
+regression-test/certificate.p12
 
 samples/doris-demo/remote-udf-python-demo/*_pb2.py
 samples/doris-demo/remote-udf-python-demo/*_pb2_grpc.py
@@ -103,4 +104,4 @@ lru_cache_test
 
 /conf/log4j2-spring.xml
 /fe/fe-core/src/test/resources/real-help-resource.zip
-/ui/dist
\ No newline at end of file
+/ui/dist
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 54a42740b9..7a7ec74835 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -402,8 +402,9 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
         params.__isset.profile = false;
     } else {
         req.profile->to_thrift(&params.profile);
-        if (req.load_channel_profile)
+        if (req.load_channel_profile) {
             req.load_channel_profile->to_thrift(&params.loadChannelProfile);
+        }
         params.__isset.profile = true;
         params.__isset.loadChannelProfile = true;
     }
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 507a214cc7..2c047e912c 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -109,7 +109,7 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
     int64_t index_id = request.index_id();
     // 1. get tablets channel
     std::shared_ptr<TabletsChannel> channel;
-    bool is_finished;
+    bool is_finished = false;
     Status st = _get_tablets_channel(channel, is_finished, index_id);
     if (!st.ok() || is_finished) {
         return st;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 7f54480743..e57cd59407 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -117,9 +117,9 @@ protected:
     Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, bool& is_finished,
                                 const int64_t index_id);
 
-    template <typename Request, typename Response>
-    Status _handle_eos(std::shared_ptr<TabletsChannel>& channel, const Request& request,
-                       Response* response) {
+    Status _handle_eos(std::shared_ptr<TabletsChannel>& channel,
+                       const PTabletWriterAddBlockRequest& request,
+                       PTabletWriterAddBlockResult* response) {
         bool finished = false;
         auto index_id = request.index_id();
         RETURN_IF_ERROR(channel->close(
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index e9607d1eff..9e5e87b2cd 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -34,7 +34,7 @@ public:
     // Signature of a work-processing function. Takes the integer id of the thread which is
     // calling it (ids run from 0 to num_threads - 1) and a reference to the item to
     // process.
-    typedef std::function<void()> WorkFunction;
+    using WorkFunction = std::function<void()>;
 
     struct Task {
     public:
@@ -138,7 +138,7 @@ private:
     // Driver method for each thread in the pool. Continues to read work from the queue
     // until the pool is shutdown.
     void work_thread(int thread_id) {
-        Thread::set_self_name(_name.c_str());
+        Thread::set_self_name(_name);
         while (!is_shutdown()) {
             Task task;
             if (_work_queue.blocking_get(&task)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index 9bb49363ac..6073ef35f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -66,6 +66,9 @@ public class RuntimeProfile {
 
     private Long timestamp = -1L;
 
+    private Boolean isDone = false;
+    private Boolean isCancel = false;
+
     public RuntimeProfile(String name) {
         this();
         this.name = name;
@@ -77,6 +80,22 @@
         this.counterMap.put("TotalTime", counterTotalTime);
     }
 
+    public void setIsCancel(Boolean isCancel) {
+        this.isCancel = isCancel;
+    }
+
+    public Boolean getIsCancel() {
+        return isCancel;
+    }
+
+    public void setIsDone(Boolean isDone) {
+        this.isDone = isDone;
+    }
+
+    public Boolean getIsDone() {
+        return isDone;
+    }
+
     public String getName() {
         return name;
     }
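Note: the isDone/isCancel flags added above replace the separate per-instance doneFlags/cancelFlags maps that the Coordinator previously kept (see the Coordinator.java hunks below). The following is a minimal, self-contained sketch of the cancel-path usage under that reading; Profile and the string instance ids are illustrative stand-ins for RuntimeProfile and TUniqueId, not Doris classes.

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: "Profile" stands in for RuntimeProfile with the isCancel flag
// added above; instance ids are plain strings instead of TUniqueId.
public class CancelFlagSketch {
    static class Profile {
        private Boolean isCancel = false;

        Boolean getIsCancel() {
            return isCancel;
        }

        void setIsCancel(Boolean isCancel) {
            this.isCancel = isCancel;
        }
    }

    public static void main(String[] args) {
        Map<String, Profile> fragmentInstancesMap = new LinkedHashMap<>();
        fragmentInstancesMap.put("inst-1", new Profile());
        fragmentInstancesMap.put("inst-2", new Profile());
        fragmentInstancesMap.get("inst-1").setIsCancel(true); // already cancelled earlier

        // Mirrors the cancel loop shown in the Coordinator diff: instances whose
        // profile is already marked cancelled are skipped, the rest are cancelled
        // and then marked.
        for (Map.Entry<String, Profile> e : fragmentInstancesMap.entrySet()) {
            if (e.getValue().getIsCancel()) {
                continue;
            }
            System.out.println("cancelling " + e.getKey());
            e.getValue().setIsCancel(true);
        }
    }
}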
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index b83913c465..e60e4122b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -207,7 +207,7 @@ public class Coordinator {
 
     // backend execute state
    private final List<BackendExecState> backendExecStates = Lists.newArrayList();
-    private final Map<Integer, PipelineExecContext> pipelineExecContexts = new HashMap<>();
+    private final Map<Pair<Integer, Long>, PipelineExecContext> pipelineExecContexts = new HashMap<>();
     // backend which state need to be checked when joining this coordinator.
     // It is supposed to be the subset of backendExecStates.
     private final List<BackendExecState> needCheckBackendExecStates = Lists.newArrayList();
@@ -840,11 +840,20 @@
                 needCheckBackendState = true;
             }
 
+            Map<TUniqueId, RuntimeProfile> fragmentInstancesMap = new HashMap<TUniqueId, RuntimeProfile>();
+            for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
+                for (TPipelineInstanceParams instanceParam : entry.getValue().local_params) {
+                    String name = "Instance " + DebugUtil.printId(instanceParam.fragment_instance_id)
+                            + " (host=" + entry.getKey() + ")";
+                    fragmentInstancesMap.put(instanceParam.fragment_instance_id, new RuntimeProfile(name));
+                }
+            }
+
             // 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE.
             for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
-                PipelineExecContext pipelineExecContext =
-                        new PipelineExecContext(fragment.getFragmentId(),
-                        profileFragmentId, entry.getValue(), this.addressToBackendID, entry.getKey());
+                Long backendId = this.addressToBackendID.get(entry.getKey());
+                PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(),
+                        profileFragmentId, entry.getValue(), backendId, fragmentInstancesMap);
                 // Each tParam will set the total number of Fragments that need to be executed on the same BE,
                 // and the BE will determine whether all Fragments have been executed based on this information.
                 // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons.
@@ -853,7 +862,7 @@
                 entry.getValue().setNeedWaitExecutionTrigger(twoPhaseExecution);
                 entry.getValue().setFragmentId(fragment.getFragmentId().asInt());
 
-                pipelineExecContexts.put(fragment.getFragmentId().asInt(), pipelineExecContext);
+                pipelineExecContexts.put(Pair.of(fragment.getFragmentId().asInt(), backendId), pipelineExecContext);
                 if (needCheckBackendState) {
                     needCheckPipelineExecContexts.add(pipelineExecContext);
                     if (LOG.isDebugEnabled()) {
@@ -2111,7 +2120,7 @@
 
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
         if (enablePipelineEngine) {
-            PipelineExecContext ctx = pipelineExecContexts.get(params.getFragmentId());
+            PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
             if (!ctx.updateProfile(params)) {
                 return;
             }
@@ -2136,7 +2145,7 @@
                         status.getErrorMsg());
                 updateStatus(status, params.getFragmentInstanceId());
             }
-            if (ctx.doneFlags.get(params.getFragmentInstanceId())) {
+            if (ctx.fragmentInstancesMap.get(params.fragment_instance_id).getIsDone()) {
                 if (params.isSetDeltaUrls()) {
                     updateDeltas(params.getDeltaUrls());
                 }
@@ -2714,11 +2723,8 @@
         PlanFragmentId fragmentId;
         boolean initiated;
         volatile boolean done;
-        volatile Map<TUniqueId, Boolean> doneFlags = new HashMap<TUniqueId, Boolean>();
         boolean hasCanceled;
-        volatile Map<TUniqueId, Boolean> cancelFlags = new HashMap<TUniqueId, Boolean>();
-
-        volatile Map<TUniqueId, RuntimeProfile> profiles = new HashMap<TUniqueId, RuntimeProfile>();
+        Map<TUniqueId, RuntimeProfile> fragmentInstancesMap;
         int cancelProgress = 0;
         int profileFragmentId;
         TNetworkAddress brpcAddress;
@@ -2729,25 +2735,19 @@
         private final int numInstances;
 
         public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId,
-                TPipelineFragmentParams rpcParams, Map<TNetworkAddress, Long> addressToBackendID,
-                TNetworkAddress addr) {
+                TPipelineFragmentParams rpcParams, Long backendId,
+                Map<TUniqueId, RuntimeProfile> fragmentInstancesMap) {
             this.profileFragmentId = profileFragmentId;
             this.fragmentId = fragmentId;
             this.rpcParams = rpcParams;
             this.numInstances = rpcParams.local_params.size();
 
-            for (int i = 0; i < this.numInstances; i++) {
-                this.doneFlags.put(rpcParams.local_params.get(i).fragment_instance_id, false);
-                this.cancelFlags.put(rpcParams.local_params.get(i).fragment_instance_id, false);
+            this.fragmentInstancesMap = fragmentInstancesMap;
 
-                String name = "Instance " + DebugUtil.printId(rpcParams.local_params.get(i).fragment_instance_id)
-                        + " (host=" + addr + ")";
-                this.profiles.put(rpcParams.local_params.get(i).fragment_instance_id, new RuntimeProfile(name));
-            }
             this.initiated = false;
             this.done = false;
-            this.address = addr;
-            this.backend = idToBackend.get(addressToBackendID.get(address));
+            this.backend = idToBackend.get(backendId);
+            this.address = new TNetworkAddress(backend.getIp(), backend.getBePort());
             this.brpcAddress = new TNetworkAddress(backend.getIp(), backend.getBrpcPort());
 
             this.hasCanceled = false;
@@ -2771,15 +2771,17 @@
         // update profile.
         // return true if profile is updated. Otherwise, return false.
         public synchronized boolean updateProfile(TReportExecStatusParams params) {
-            if (this.done) {
+            RuntimeProfile profile = fragmentInstancesMap.get(params.fragment_instance_id);
+            if (params.done && profile.getIsDone()) {
                 // duplicate packet
                 return false;
             }
+
             if (params.isSetProfile()) {
-                this.profiles.get(params.fragment_instance_id).update(params.profile);
+                profile.update(params.profile);
             }
             if (params.done) {
-                this.doneFlags.replace(params.fragment_instance_id, true);
+                profile.setIsDone(true);
                 profileReportProgress++;
             }
             if (profileReportProgress == numInstances) {
@@ -2789,7 +2791,7 @@
         }
 
         public synchronized void printProfile(StringBuilder builder) {
-            this.profiles.values().stream().forEach(p -> {
+            this.fragmentInstancesMap.values().stream().forEach(p -> {
                 p.computeTimeInProfile();
                 p.prettyPrint(builder, "");
             });
@@ -2815,7 +2817,7 @@
                             this.initiated, this.done, this.hasCanceled, backend.getId(),
                             DebugUtil.printId(localParam.fragment_instance_id), cancelReason.name());
                 }
-                if (cancelFlags.get(localParam.fragment_instance_id)) {
+                if (fragmentInstancesMap.get(localParam.fragment_instance_id).getIsCancel()) {
                     continue;
                 }
                 try {
@@ -2841,7 +2843,7 @@
             }
             this.hasCanceled = true;
             for (int i = 0; i < this.numInstances; i++) {
-                this.cancelFlags.replace(rpcParams.local_params.get(i).fragment_instance_id, true);
+                fragmentInstancesMap.get(rpcParams.local_params.get(i).fragment_instance_id).setIsCancel(true);
             }
             cancelProgress = numInstances;
             return true;
@@ -3354,8 +3356,8 @@
                 if (!ctx.computeTimeInProfile(fragmentProfile.size())) {
                     return;
                 }
-                ctx.profiles.values().stream().forEach(p ->
-                        fragmentProfile.get(ctx.profileFragmentId).addChild(p));
+                ctx.fragmentInstancesMap.values().stream()
+                        .forEach(p -> fragmentProfile.get(ctx.profileFragmentId).addChild(p));
             }
         } else {
             for (BackendExecState backendExecState : backendExecStates) {
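Note: the heart of the Coordinator change is that pipelineExecContexts is now keyed by (fragment id, backend id) and that every PipelineExecContext of a fragment shares one per-instance profile map. The following is a minimal, self-contained sketch of that lookup pattern; Key, ExecContext, InstanceProfile and the string instance ids are illustrative stand-ins (a Java record stands in for Pair<Integer, Long>), not the Doris classes used in the diff above.

import java.util.HashMap;
import java.util.Map;

// Sketch only: contexts keyed by (fragment id, backend id) instead of fragment id
// alone, sharing one per-instance profile map across the contexts of a fragment.
public class PairKeyedContextsSketch {
    // Stand-in for Pair<Integer, Long>; a record gives equals/hashCode for use as a map key.
    record Key(int fragmentId, long backendId) {}

    static class InstanceProfile {
        boolean isDone = false;
    }

    static class ExecContext {
        final Map<String, InstanceProfile> fragmentInstancesMap; // shared across contexts

        ExecContext(Map<String, InstanceProfile> fragmentInstancesMap) {
            this.fragmentInstancesMap = fragmentInstancesMap;
        }

        // Returns false for a duplicate "done" report, mirroring updateProfile() above.
        synchronized boolean updateProfile(String instanceId, boolean done) {
            InstanceProfile p = fragmentInstancesMap.get(instanceId);
            if (done && p.isDone) {
                return false;
            }
            if (done) {
                p.isDone = true;
            }
            return true;
        }
    }

    public static void main(String[] args) {
        // One shared profile map per fragment; each backend's context references it.
        Map<String, InstanceProfile> instances = new HashMap<>();
        instances.put("inst-1", new InstanceProfile());
        instances.put("inst-2", new InstanceProfile());

        Map<Key, ExecContext> pipelineExecContexts = new HashMap<>();
        pipelineExecContexts.put(new Key(3, 10001L), new ExecContext(instances));
        pipelineExecContexts.put(new Key(3, 10002L), new ExecContext(instances));

        // Routing a status report now needs both the fragment id and the backend id.
        ExecContext ctx = pipelineExecContexts.get(new Key(3, 10001L));
        System.out.println(ctx.updateProfile("inst-1", true)); // true: first "done" report
        System.out.println(ctx.updateProfile("inst-1", true)); // false: duplicate packet
    }
}

Sharing a single instance map between the contexts of one fragment is what lets the duplicate-report check and the cancel flags operate per instance rather than per backend, without the old doneFlags/cancelFlags bookkeeping.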