diff --git a/build-for-release.sh b/build-for-release.sh index 2c27cf6c42..e8bd282c0a 100755 --- a/build-for-release.sh +++ b/build-for-release.sh @@ -125,7 +125,7 @@ if [[ "${_USE_AVX2}" == "0" && "${ARCH}" == "x86_64" ]]; then OUTPUT_BE="${OUTPUT_BE}-noavx2" fi -echo "Pakage Name:" +echo "Package Name:" echo "FE: ${OUTPUT_FE}" echo "BE: ${OUTPUT_BE}" echo "JAR: ${OUTPUT_DEPS}" diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java index 50409eea0d..7477d1673b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.qe.ShowResultSetMetaData; import com.google.common.base.Strings; @@ -32,26 +33,7 @@ import com.google.common.base.Strings; // show query profile "/e0f7390f5363419e-b416a2a79996083e/0/e0f7390f5363419e-b416a2a799960906" # show instance's graph public class ShowQueryProfileStmt extends ShowStmt { // This should be same as ProfileManager.PROFILE_HEADERS - public static final ShowResultSetMetaData META_DATA_QUERY_IDS = - ShowResultSetMetaData.builder() - .addColumn(new Column("JobId", ScalarType.createVarchar(128))) - .addColumn(new Column("QueryId", ScalarType.createVarchar(128))) - .addColumn(new Column("User", ScalarType.createVarchar(128))) - .addColumn(new Column("DefaultDb", ScalarType.createVarchar(128))) - .addColumn(new Column("SQL", ScalarType.createVarchar(65535))) - .addColumn(new Column("QueryType", ScalarType.createVarchar(128))) - .addColumn(new Column("StartTime", ScalarType.createVarchar(128))) - .addColumn(new Column("EndTime", ScalarType.createVarchar(128))) - .addColumn(new Column("TotalTime", ScalarType.createVarchar(128))) - .addColumn(new Column("QueryState", ScalarType.createVarchar(128))) - .addColumn(new Column("TraceId", ScalarType.createVarchar(128))) - .addColumn(new Column("AnalysisTime", ScalarType.createVarchar(128))) - .addColumn(new Column("PlanTime", ScalarType.createVarchar(128))) - .addColumn(new Column("ScheduleTime", ScalarType.createVarchar(128))) - .addColumn(new Column("FetchResultTime", ScalarType.createVarchar(128))) - .addColumn(new Column("WriteResultTime", ScalarType.createVarchar(128))) - .addColumn(new Column("WaitAndFetchResultTime", ScalarType.createVarchar(128))) - .build(); + public static final ShowResultSetMetaData META_DATA_QUERY_IDS; public static final ShowResultSetMetaData META_DATA_FRAGMENTS = ShowResultSetMetaData.builder() @@ -68,6 +50,14 @@ public class ShowQueryProfileStmt extends ShowStmt { .addColumn(new Column("Instance", ScalarType.createVarchar(65535))) .build(); + static { + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + for (String key : SummaryProfile.SUMMARY_KEYS) { + builder.addColumn(new Column(key, ScalarType.createStringType())); + } + META_DATA_QUERY_IDS = builder.build(); + } + public enum PathType { QUERY_IDS, FRAGMENTS, diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java new file mode 100644 index 0000000000..e4c6c7c48d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.profile; + +import org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.Status; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUnit; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +/** + * ExecutionProfile is used to collect profile of a complete query plan(including query or load). + * Need to call addToProfileAsChild() to add it to the root profile. + * It has the following structure: + * Execution Profile: + * Fragment 0: + * Instance 0: + * ... + * Fragment 1: + * Instance 0: + * ... + * ... + * LoadChannels: // only for load job + */ +public class ExecutionProfile { + private static final Logger LOG = LogManager.getLogger(ExecutionProfile.class); + + // The root profile of this execution task + private RuntimeProfile executionProfile; + // Profiles for each fragment. And the InstanceProfile is the child of fragment profile. + // Which will be added to fragment profile when calling Coordinator::sendFragment() + private List fragmentProfiles; + // Profile for load channels. Only for load job. + private RuntimeProfile loadChannelProfile; + // A countdown latch to mark the completion of each instance. + // instance id -> dummy value + private MarkedCountDownLatch profileDoneSignal; + + public ExecutionProfile(TUniqueId queryId, int fragmentNum) { + executionProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId)); + RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments"); + executionProfile.addChild(fragmentsProfile); + fragmentProfiles = Lists.newArrayList(); + for (int i = 0; i < fragmentNum; i++) { + fragmentProfiles.add(new RuntimeProfile("Fragment " + i)); + fragmentsProfile.addChild(fragmentProfiles.get(i)); + } + loadChannelProfile = new RuntimeProfile("LoadChannels"); + executionProfile.addChild(loadChannelProfile); + } + + public RuntimeProfile getExecutionProfile() { + return executionProfile; + } + + public RuntimeProfile getLoadChannelProfile() { + return loadChannelProfile; + } + + public void addToProfileAsChild(RuntimeProfile rootProfile) { + rootProfile.addChild(executionProfile); + } + + public void markInstances(Set instanceIds) { + profileDoneSignal = new MarkedCountDownLatch<>(instanceIds.size()); + for (TUniqueId instanceId : instanceIds) { + profileDoneSignal.addMark(instanceId, -1L /* value is meaningless */); + } + } + + public void update(long startTime, boolean isFinished) { + if (startTime > 0) { + executionProfile.getCounterTotalTime().setValue(TUnit.TIME_MS, TimeUtils.getElapsedTimeMs(startTime)); + } + // Wait for all backends to finish reporting when writing profile last time. + if (isFinished && profileDoneSignal != null) { + try { + profileDoneSignal.await(2, TimeUnit.SECONDS); + } catch (InterruptedException e1) { + LOG.warn("signal await error", e1); + } + } + + for (RuntimeProfile fragmentProfile : fragmentProfiles) { + fragmentProfile.sortChildren(); + } + } + + public void onCancel() { + if (profileDoneSignal != null) { + // count down to zero to notify all objects waiting for this + profileDoneSignal.countDownToZero(new Status()); + LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks() + .stream().map(e -> DebugUtil.printId(e.getKey())).toArray()); + } + } + + public void markOneInstanceDone(TUniqueId fragmentInstanceId) { + if (profileDoneSignal != null) { + profileDoneSignal.markedCountDown(fragmentInstanceId, -1L); + } + } + + public boolean awaitAllInstancesDone(long waitTimeS) throws InterruptedException { + if (profileDoneSignal == null) { + return true; + } + return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS); + } + + public boolean isAllInstancesDone() { + if (profileDoneSignal == null) { + return true; + } + return profileDoneSignal.getCount() == 0; + } + + public void addInstanceProfile(int instanceIdx, RuntimeProfile instanceProfile) { + Preconditions.checkArgument(instanceIdx < fragmentProfiles.size(), + instanceIdx + " vs. " + fragmentProfiles.size()); + fragmentProfiles.get(instanceIdx).addChild(instanceProfile); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java new file mode 100644 index 0000000000..e1336ce5af --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.profile; + +import org.apache.doris.common.util.ProfileManager; +import org.apache.doris.common.util.RuntimeProfile; + +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Map; + +/** + * Profile is a class to record the execution time of a query. + * It has the following structure: + * root profile: + * // summary of this profile, such as start time, end time, query id, etc. + * [SummaryProfile] + * // each execution profile is a complete execution of a query, a job may contain multiple queries. + * [List] + * + * SummaryProfile: + * Summary: + * Execution Summary: + * + * ExecutionProfile: + * Fragment 0: + * Fragment 1: + * ... + */ +public class Profile { + private RuntimeProfile rootProfile; + private SummaryProfile summaryProfile; + private List executionProfiles = Lists.newArrayList(); + private boolean isFinished; + + public Profile(String name, boolean isEnable) { + this.rootProfile = new RuntimeProfile(name); + this.summaryProfile = new SummaryProfile(rootProfile); + // if disabled, just set isFinished to true, so that update() will do nothing + this.isFinished = !isEnable; + } + + public void addExecutionProfile(ExecutionProfile executionProfile) { + this.executionProfiles.add(executionProfile); + executionProfile.addToProfileAsChild(rootProfile); + } + + public synchronized void update(long startTime, Map summaryInfo, boolean isFinished) { + if (this.isFinished) { + return; + } + summaryProfile.update(summaryInfo); + for (ExecutionProfile executionProfile : executionProfiles) { + executionProfile.update(startTime, isFinished); + } + rootProfile.computeTimeInProfile(); + ProfileManager.getInstance().pushProfile(rootProfile); + this.isFinished = isFinished; + } + + public SummaryProfile getSummaryProfile() { + return summaryProfile; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java new file mode 100644 index 0000000000..e3e2586307 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.profile; + +import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.thrift.TUnit; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; + +import java.util.Map; + +/** + * SummaryProfile is part of a query profile. + * It contains the summary information of a query. + */ +public class SummaryProfile { + // Summary + public static final String PROFILE_ID = "Profile ID"; + public static final String TASK_TYPE = "Task Type"; + public static final String START_TIME = "Start Time"; + public static final String END_TIME = "End Time"; + public static final String TOTAL_TIME = "Total"; + public static final String TASK_STATE = "Task State"; + public static final String USER = "User"; + public static final String DEFAULT_DB = "Default Db"; + public static final String SQL_STATEMENT = "Sql Statement"; + public static final String IS_CACHED = "Is Cached"; + public static final String TOTAL_INSTANCES_NUM = "Total Instances Num"; + public static final String INSTANCES_NUM_PER_BE = "Instances Num Per BE"; + public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel Fragment Exec Instance Num"; + public static final String TRACE_ID = "Trace ID"; + + // Execution Summary + public static final String ANALYSIS_TIME = "Analysis Time"; + public static final String PLAN_TIME = "Plan Time"; + public static final String SCHEDULE_TIME = "Schedule Time"; + public static final String FETCH_RESULT_TIME = "Fetch Result Time"; + public static final String WRITE_RESULT_TIME = "Write Result Time"; + public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result Time"; + + public static final ImmutableList SUMMARY_KEYS = ImmutableList.of(PROFILE_ID, TASK_TYPE, + START_TIME, END_TIME, TOTAL_TIME, TASK_STATE, USER, DEFAULT_DB, SQL_STATEMENT, IS_CACHED, + TOTAL_INSTANCES_NUM, INSTANCES_NUM_PER_BE, PARALLEL_FRAGMENT_EXEC_INSTANCE, TRACE_ID); + + public static final ImmutableList EXECUTION_SUMMARY_KEYS = ImmutableList.of(ANALYSIS_TIME, PLAN_TIME, + SCHEDULE_TIME, FETCH_RESULT_TIME, WRITE_RESULT_TIME, WAIT_FETCH_RESULT_TIME); + + private RuntimeProfile summaryProfile; + private RuntimeProfile executionSummaryProfile; + + // timestamp of query begin + private long queryBeginTime = -1; + // Analysis end time + private long queryAnalysisFinishTime = -1; + // Plan end time + private long queryPlanFinishTime = -1; + // Fragment schedule and send end time + private long queryScheduleFinishTime = -1; + // Query result fetch end time + private long queryFetchResultFinishTime = -1; + private long tempStarTime = -1; + private long queryFetchResultConsumeTime = 0; + private long queryWriteResultConsumeTime = 0; + + public SummaryProfile(RuntimeProfile rootProfile) { + summaryProfile = new RuntimeProfile("Summary"); + executionSummaryProfile = new RuntimeProfile("Execution Summary"); + init(); + rootProfile.addChild(summaryProfile); + rootProfile.addChild(executionSummaryProfile); + } + + private void init() { + for (String key : SUMMARY_KEYS) { + summaryProfile.addInfoString(key, "N/A"); + } + for (String key : EXECUTION_SUMMARY_KEYS) { + executionSummaryProfile.addInfoString(key, "N/A"); + } + } + + public void update(Map summaryInfo) { + updateSummaryProfile(summaryInfo); + updateExecutionSummaryProfile(); + } + + private void updateSummaryProfile(Map infos) { + for (String key : infos.keySet()) { + if (SUMMARY_KEYS.contains(key)) { + summaryProfile.addInfoString(key, infos.get(key)); + } + } + } + + private void updateExecutionSummaryProfile() { + executionSummaryProfile.addInfoString(ANALYSIS_TIME, getPrettyQueryAnalysisFinishTime()); + executionSummaryProfile.addInfoString(PLAN_TIME, getPrettyQueryPlanFinishTime()); + executionSummaryProfile.addInfoString(SCHEDULE_TIME, getPrettyQueryScheduleFinishTime()); + executionSummaryProfile.addInfoString(FETCH_RESULT_TIME, + RuntimeProfile.printCounter(queryFetchResultConsumeTime, TUnit.TIME_MS)); + executionSummaryProfile.addInfoString(WRITE_RESULT_TIME, + RuntimeProfile.printCounter(queryWriteResultConsumeTime, TUnit.TIME_MS)); + executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME, getPrettyQueryFetchResultFinishTime()); + } + + public void setQueryBeginTime() { + this.queryBeginTime = TimeUtils.getStartTimeMs(); + } + + public void setQueryAnalysisFinishTime() { + this.queryAnalysisFinishTime = TimeUtils.getStartTimeMs(); + } + + public void setQueryPlanFinishTime() { + this.queryPlanFinishTime = TimeUtils.getStartTimeMs(); + } + + public void setQueryScheduleFinishTime() { + this.queryScheduleFinishTime = TimeUtils.getStartTimeMs(); + } + + public void setQueryFetchResultFinishTime() { + this.queryFetchResultFinishTime = TimeUtils.getStartTimeMs(); + } + + public void setTempStartTime() { + this.tempStarTime = TimeUtils.getStartTimeMs(); + } + + public void freshFetchResultConsumeTime() { + this.queryFetchResultConsumeTime += TimeUtils.getStartTimeMs() - tempStarTime; + } + + public void freshWriteResultConsumeTime() { + this.queryWriteResultConsumeTime += TimeUtils.getStartTimeMs() - tempStarTime; + } + + public long getQueryBeginTime() { + return queryBeginTime; + } + + public static class SummaryBuilder { + private Map map = Maps.newHashMap(); + + public SummaryBuilder profileId(String val) { + map.put(PROFILE_ID, val); + return this; + } + + public SummaryBuilder taskType(String val) { + map.put(TASK_TYPE, val); + return this; + } + + public SummaryBuilder startTime(String val) { + map.put(START_TIME, val); + return this; + } + + public SummaryBuilder endTime(String val) { + map.put(END_TIME, val); + return this; + } + + public SummaryBuilder totalTime(String val) { + map.put(TOTAL_TIME, val); + return this; + } + + public SummaryBuilder taskState(String val) { + map.put(TASK_STATE, val); + return this; + } + + public SummaryBuilder user(String val) { + map.put(USER, val); + return this; + } + + public SummaryBuilder defaultDb(String val) { + map.put(DEFAULT_DB, val); + return this; + } + + public SummaryBuilder sqlStatement(String val) { + map.put(SQL_STATEMENT, val); + return this; + } + + public SummaryBuilder isCached(String val) { + map.put(IS_CACHED, val); + return this; + } + + public SummaryBuilder totalInstancesNum(String val) { + map.put(TOTAL_INSTANCES_NUM, val); + return this; + } + + public SummaryBuilder instancesNumPerBe(String val) { + map.put(INSTANCES_NUM_PER_BE, val); + return this; + } + + public SummaryBuilder parallelFragmentExecInstance(String val) { + map.put(PARALLEL_FRAGMENT_EXEC_INSTANCE, val); + return this; + } + + public SummaryBuilder traceId(String val) { + map.put(TRACE_ID, val); + return this; + } + + public Map build() { + return map; + } + } + + private String getPrettyQueryAnalysisFinishTime() { + if (queryBeginTime == -1 || queryAnalysisFinishTime == -1) { + return "N/A"; + } + return RuntimeProfile.printCounter(queryAnalysisFinishTime - queryBeginTime, TUnit.TIME_MS); + } + + private String getPrettyQueryPlanFinishTime() { + if (queryAnalysisFinishTime == -1 || queryPlanFinishTime == -1) { + return "N/A"; + } + return RuntimeProfile.printCounter(queryPlanFinishTime - queryAnalysisFinishTime, TUnit.TIME_MS); + } + + private String getPrettyQueryScheduleFinishTime() { + if (queryPlanFinishTime == -1 || queryScheduleFinishTime == -1) { + return "N/A"; + } + return RuntimeProfile.printCounter(queryScheduleFinishTime - queryPlanFinishTime, TUnit.TIME_MS); + } + + private String getPrettyQueryFetchResultFinishTime() { + if (queryScheduleFinishTime == -1 || queryFetchResultFinishTime == -1) { + return "N/A"; + } + return RuntimeProfile.printCounter(queryFetchResultFinishTime - queryScheduleFinishTime, TUnit.TIME_MS); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java index 8ec969944f..cbd5e88c79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java @@ -28,8 +28,9 @@ public class Counter { return value; } - public void setValue(long newValue) { - value = newValue; + public void setValue(TUnit type, long value) { + this.type = type.getValue(); + this.value = value; } public TUnit getType() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index 6d0f575d38..dbfb7e83f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -25,6 +25,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.profile.MultiProfileTreeBuilder; import org.apache.doris.common.profile.ProfileTreeBuilder; import org.apache.doris.common.profile.ProfileTreeNode; +import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.nereids.stats.StatsErrorEstimator; import com.google.common.base.Strings; @@ -34,8 +35,6 @@ import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Arrays; -import java.util.Collections; import java.util.Deque; import java.util.Iterator; import java.util.LinkedList; @@ -58,48 +57,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; public class ProfileManager { private static final Logger LOG = LogManager.getLogger(ProfileManager.class); private static volatile ProfileManager INSTANCE = null; - // private static final int ARRAY_SIZE = 100; - // private static final int TOTAL_LEN = 1000 * ARRAY_SIZE ; - // just use for load profile and export profile - public static final String JOB_ID = "Job ID"; - public static final String QUERY_ID = "Query ID"; - public static final String START_TIME = "Start Time"; - public static final String END_TIME = "End Time"; - public static final String TOTAL_TIME = "Total"; - public static final String QUERY_TYPE = "Query Type"; - public static final String QUERY_STATE = "Query State"; - public static final String DORIS_VERSION = "Doris Version"; - public static final String USER = "User"; - public static final String DEFAULT_DB = "Default Db"; - public static final String SQL_STATEMENT = "Sql Statement"; - public static final String IS_CACHED = "Is Cached"; - - public static final String TOTAL_INSTANCES_NUM = "Total Instances Num"; - - public static final String INSTANCES_NUM_PER_BE = "Instances Num Per BE"; - - public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel Fragment Exec Instance Num"; - - public static final String TRACE_ID = "Trace ID"; - public static final String ANALYSIS_TIME = "Analysis Time"; - public static final String FETCH_RESULT_TIME = "Fetch Result Time"; - public static final String PLAN_TIME = "Plan Time"; - public static final String SCHEDULE_TIME = "Schedule Time"; - public static final String WRITE_RESULT_TIME = "Write Result Time"; - public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result Time"; public enum ProfileType { QUERY, LOAD, } - public static final List PROFILE_HEADERS = Collections.unmodifiableList( - Arrays.asList(JOB_ID, QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE, - START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE, TRACE_ID)); - public static final List EXECUTION_HEADERS = Collections.unmodifiableList( - Arrays.asList(ANALYSIS_TIME, PLAN_TIME, SCHEDULE_TIME, FETCH_RESULT_TIME, - WRITE_RESULT_TIME, WAIT_FETCH_RESULT_TIME)); - public static class ProfileElement { public ProfileElement(RuntimeProfile profile) { this.profile = profile; @@ -164,13 +127,13 @@ public class ProfileManager { public ProfileElement createElement(RuntimeProfile profile) { ProfileElement element = new ProfileElement(profile); RuntimeProfile summaryProfile = profile.getChildList().get(0).first; - for (String header : PROFILE_HEADERS) { + for (String header : SummaryProfile.SUMMARY_KEYS) { element.infoStrings.put(header, summaryProfile.getInfoString(header)); } List> childList = summaryProfile.getChildList(); if (!childList.isEmpty()) { RuntimeProfile executionProfile = childList.get(0).first; - for (String header : EXECUTION_HEADERS) { + for (String header : SummaryProfile.EXECUTION_SUMMARY_KEYS) { element.infoStrings.put(header, executionProfile.getInfoString(header)); } } @@ -194,7 +157,7 @@ public class ProfileManager { ProfileElement element = createElement(profile); // 'insert into' does have job_id, put all profiles key with query_id - String key = element.infoStrings.get(ProfileManager.QUERY_ID); + String key = element.infoStrings.get(SummaryProfile.PROFILE_ID); // check when push in, which can ensure every element in the list has QUERY_ID column, // so there is no need to check when remove element from list. if (Strings.isNullOrEmpty(key)) { @@ -235,15 +198,12 @@ public class ProfileManager { continue; } Map infoStrings = profileElement.infoStrings; - if (type != null && !infoStrings.get(QUERY_TYPE).equalsIgnoreCase(type.name())) { + if (type != null && !infoStrings.get(SummaryProfile.TASK_TYPE).equalsIgnoreCase(type.name())) { continue; } List row = Lists.newArrayList(); - for (String str : PROFILE_HEADERS) { - row.add(infoStrings.get(str)); - } - for (String str : EXECUTION_HEADERS) { + for (String str : SummaryProfile.SUMMARY_KEYS) { row.add(infoStrings.get(str)); } result.add(row); @@ -285,7 +245,7 @@ public class ProfileManager { if (element == null) { throw new AuthenticationException("query with id " + queryId + " not found"); } - if (!element.infoStrings.get(USER).equals(user)) { + if (!element.infoStrings.get(SummaryProfile.USER).equals(user)) { throw new AuthenticationException("Access deny to view query with id: " + queryId); } } finally { @@ -377,7 +337,7 @@ public class ProfileManager { readLock.lock(); try { for (Map.Entry entry : queryIdToProfileMap.entrySet()) { - if (entry.getValue().infoStrings.getOrDefault(TRACE_ID, "").equals(traceId)) { + if (entry.getValue().infoStrings.getOrDefault(SummaryProfile.TRACE_ID, "").equals(traceId)) { return entry.getKey(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileWriter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileWriter.java deleted file mode 100644 index 3a472708de..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileWriter.java +++ /dev/null @@ -1,24 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.util; - -// this interface is used to write profile to ProfileManager when a task is running. -public interface ProfileWriter { - - void writeProfile(boolean waitReportDone); -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryPlannerProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryPlannerProfile.java deleted file mode 100644 index df5ae5aee2..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryPlannerProfile.java +++ /dev/null @@ -1,130 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.util; - -import org.apache.doris.thrift.TUnit; - -/** - * This profile is mainly used to record the time-consuming situation related to - * executing SQL parsing, planning, scheduling, and fetching results on the FE side. - * Can be expanded later. - * - * All timestamp is in nona second - */ -public class QueryPlannerProfile { - public static final String KEY_ANALYSIS = "Analysis Time"; - public static final String KEY_PLAN = "Plan Time"; - public static final String KEY_SCHEDULE = "Schedule Time"; - public static final String KEY_WAIT_AND_FETCH = "Wait and Fetch Result Time"; - - public static final String KEY_FETCH = "Fetch Result Time"; - - public static final String KEY_WRITE = "Write Result Time"; - - // timestamp of query begin - private long queryBeginTime = -1; - // Analysis end time - private long queryAnalysisFinishTime = -1; - // Plan end time - private long queryPlanFinishTime = -1; - // Fragment schedule and send end time - private long queryScheduleFinishTime = -1; - // Query result fetch end time - private long queryFetchResultFinishTime = -1; - - private long tempStarTime = -1; - - private long queryFetchResultConsumeTime = 0; - - private long queryWriteResultConsumeTime = 0; - - public void setQueryBeginTime() { - this.queryBeginTime = TimeUtils.getStartTime(); - } - - public void setQueryAnalysisFinishTime() { - this.queryAnalysisFinishTime = TimeUtils.getStartTime(); - } - - public void setQueryPlanFinishTime() { - this.queryPlanFinishTime = TimeUtils.getStartTime(); - } - - public void setQueryScheduleFinishTime() { - this.queryScheduleFinishTime = TimeUtils.getStartTime(); - } - - public void setQueryFetchResultFinishTime() { - this.queryFetchResultFinishTime = TimeUtils.getStartTime(); - } - - public void setTempStartTime() { - this.tempStarTime = TimeUtils.getStartTime(); - } - - public void freshFetchResultConsumeTime() { - this.queryFetchResultConsumeTime += TimeUtils.getStartTime() - tempStarTime; - } - - public void freshWriteResultConsumeTime() { - this.queryWriteResultConsumeTime += TimeUtils.getStartTime() - tempStarTime; - } - - public long getQueryBeginTime() { - return queryBeginTime; - } - - private String getPrettyQueryAnalysisFinishTime() { - if (queryBeginTime == -1 || queryAnalysisFinishTime == -1) { - return "N/A"; - } - return RuntimeProfile.printCounter(queryAnalysisFinishTime - queryBeginTime, TUnit.TIME_NS); - } - - private String getPrettyQueryPlanFinishTime() { - if (queryAnalysisFinishTime == -1 || queryPlanFinishTime == -1) { - return "N/A"; - } - return RuntimeProfile.printCounter(queryPlanFinishTime - queryAnalysisFinishTime, TUnit.TIME_NS); - } - - private String getPrettyQueryScheduleFinishTime() { - if (queryPlanFinishTime == -1 || queryScheduleFinishTime == -1) { - return "N/A"; - } - return RuntimeProfile.printCounter(queryScheduleFinishTime - queryPlanFinishTime, TUnit.TIME_NS); - } - - private String getPrettyQueryFetchResultFinishTime() { - if (queryScheduleFinishTime == -1 || queryFetchResultFinishTime == -1) { - return "N/A"; - } - return RuntimeProfile.printCounter(queryFetchResultFinishTime - queryScheduleFinishTime, TUnit.TIME_NS); - } - - public void initRuntimeProfile(RuntimeProfile plannerProfile) { - plannerProfile.addInfoString(KEY_ANALYSIS, getPrettyQueryAnalysisFinishTime()); - plannerProfile.addInfoString(KEY_PLAN, getPrettyQueryPlanFinishTime()); - plannerProfile.addInfoString(KEY_SCHEDULE, getPrettyQueryScheduleFinishTime()); - plannerProfile.addInfoString(KEY_FETCH, - RuntimeProfile.printCounter(queryFetchResultConsumeTime, TUnit.TIME_NS)); - plannerProfile.addInfoString(KEY_WRITE, - RuntimeProfile.printCounter(queryWriteResultConsumeTime, TUnit.TIME_NS)); - plannerProfile.addInfoString(KEY_WAIT_AND_FETCH, getPrettyQueryFetchResultFinishTime()); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index 6073ef35f9..075893e498 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -174,7 +174,7 @@ public class RuntimeProfile { LOG.error("Cannot update counters with the same name but different types" + " type=" + tcounter.type); } else { - counter.setValue(tcounter.value); + counter.setValue(tcounter.type, tcounter.value); } } } @@ -349,6 +349,15 @@ public class RuntimeProfile { } break; } + case TIME_MS: { + if (tmpValue >= DebugUtil.THOUSAND) { + // If the time is over a second, print it up to ms. + DebugUtil.printTimeMs(tmpValue, builder); + } else { + builder.append(tmpValue).append("ms"); + } + break; + } case BYTES: { Pair pair = DebugUtil.getByteUint(tmpValue); Formatter fmt = new Formatter(); @@ -505,3 +514,4 @@ public class RuntimeProfile { return infoStrings; } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java index 9d73d0b368..72443b5a26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java @@ -116,12 +116,12 @@ public class TimeUtils { } } - public static long getStartTime() { - return System.nanoTime(); + public static long getStartTimeMs() { + return System.currentTimeMillis(); } - public static long getEstimatedTime(long startTime) { - return System.nanoTime() - startTime; + public static long getElapsedTimeMs(long startTime) { + return System.currentTimeMillis() - startTime; } public static synchronized String getCurrentFormatTime() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java index 775d59c0aa..674c6d9014 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java @@ -17,6 +17,7 @@ package org.apache.doris.httpv2.controller; +import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.util.ProfileManager; import org.apache.doris.httpv2.entity.ResponseBody; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; @@ -70,50 +71,25 @@ public class QueryProfileController extends BaseController { private void addFinishedQueryInfo(Map result) { List> finishedQueries = ProfileManager.getInstance().getAllQueries(); List columnHeaders = Lists.newLinkedList(); - columnHeaders.addAll(ProfileManager.PROFILE_HEADERS); - columnHeaders.addAll(ProfileManager.EXECUTION_HEADERS); - int jobIdIndex = -1; - int queryIdIndex = -1; - int queryTypeIndex = -1; - for (int i = 0; i < columnHeaders.size(); ++i) { - if (columnHeaders.get(i).equals(ProfileManager.JOB_ID)) { - jobIdIndex = i; - continue; - } - if (columnHeaders.get(i).equals(ProfileManager.QUERY_ID)) { - queryIdIndex = i; - continue; - } - if (columnHeaders.get(i).equals(ProfileManager.QUERY_TYPE)) { - queryTypeIndex = i; - continue; - } - } - // set href as the first column - columnHeaders.add(0, DETAIL_COL); + columnHeaders.addAll(SummaryProfile.SUMMARY_KEYS); result.put("column_names", columnHeaders); - result.put("href_column", Lists.newArrayList(DETAIL_COL)); + // The first column is profile id, which is also a href column + result.put("href_column", Lists.newArrayList(columnHeaders.get(0))); List> list = Lists.newArrayList(); result.put("rows", list); for (List row : finishedQueries) { - List realRow = Lists.newLinkedList(row); - - String queryType = realRow.get(queryTypeIndex); - String id = (QUERY_ID_TYPES.contains(queryType)) ? realRow.get(queryIdIndex) : realRow.get(jobIdIndex); - - realRow.add(0, id); Map rowMap = new HashMap<>(); - for (int i = 0; i < realRow.size(); ++i) { - rowMap.put(columnHeaders.get(i), realRow.get(i)); + for (int i = 0; i < row.size(); ++i) { + rowMap.put(columnHeaders.get(i), row.get(i)); } // add hyper link - if (Strings.isNullOrEmpty(id)) { + if (Strings.isNullOrEmpty(row.get(0))) { rowMap.put("__hrefPaths", Lists.newArrayList("/query_profile/-1")); } else { - rowMap.put("__hrefPaths", Lists.newArrayList("/query_profile/" + id)); + rowMap.put("__hrefPaths", Lists.newArrayList("/query_profile/" + row.get(0))); } list.add(rowMap); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index ac1dec8d7a..dfd213b047 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -158,9 +158,6 @@ public class ExportJob implements Writable { private String sql = ""; - // If set to true, the profile of export job with be pushed to ProfileManager - private volatile boolean enableProfile = false; - // The selectStmt is sql 'select ... into outfile ...' @Getter private List selectStmtList = Lists.newArrayList(); @@ -220,7 +217,6 @@ public class ExportJob implements Writable { this.exportPath = path; this.sessionVariables = stmt.getSessionVariables(); this.timeoutSecond = sessionVariables.getQueryTimeoutS(); - this.enableProfile = sessionVariables.enableProfile(); this.qualifiedUser = stmt.getQualifiedUser(); this.userIdentity = stmt.getUserIdentity(); @@ -619,10 +615,6 @@ public class ExportJob implements Writable { return queryId; } - public boolean getEnableProfile() { - return enableProfile; - } - @Override public String toString() { return "ExportJob [jobId=" + id diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 01bbd795f4..bfdce1dc8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -31,12 +31,13 @@ import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.Profile; +import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.MetaLockUtils; -import org.apache.doris.common.util.ProfileManager; -import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.common.util.ProfileManager.ProfileType; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey; @@ -73,7 +74,7 @@ public class BrokerLoadJob extends BulkLoadJob { private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class); // Profile of this load job, including all tasks' profiles - private RuntimeProfile jobProfile; + private Profile jobProfile; // If set to true, the profile of load job with be pushed to ProfileManager private boolean enableProfile = false; @@ -188,7 +189,7 @@ public class BrokerLoadJob extends BulkLoadJob { Lists.newArrayList(fileGroupAggInfo.getAllTableIds())); // divide job into broker loading task by table List newLoadingTasks = Lists.newArrayList(); - this.jobProfile = new RuntimeProfile("BrokerLoadJob " + id + ". " + label); + this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + label, true); MetaLockUtils.readLockTables(tableList); try { for (Map.Entry> entry @@ -314,27 +315,24 @@ public class BrokerLoadJob extends BulkLoadJob { if (!enableProfile) { return; } + jobProfile.update(createTimestamp, getSummaryInfo(true), true); + } - RuntimeProfile summaryProfile = new RuntimeProfile("Summary"); - summaryProfile.addInfoString(ProfileManager.JOB_ID, String.valueOf(this.id)); - summaryProfile.addInfoString(ProfileManager.QUERY_ID, this.queryId); - summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(createTimestamp)); - summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(finishTimestamp)); - summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, - DebugUtil.getPrettyStringMs(finishTimestamp - createTimestamp)); - - summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Load"); - summaryProfile.addInfoString(ProfileManager.QUERY_STATE, "N/A"); - summaryProfile.addInfoString(ProfileManager.USER, - getUserInfo() != null ? getUserInfo().getQualifiedUser() : "N/A"); - summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, getDefaultDb()); - summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, this.getOriginStmt().originStmt); - summaryProfile.addInfoString(ProfileManager.IS_CACHED, "N/A"); - - // Add the summary profile to the first - jobProfile.addFirstChild(summaryProfile); - jobProfile.computeTimeInChildProfile(); - ProfileManager.getInstance().pushProfile(jobProfile); + private Map getSummaryInfo(boolean isFinished) { + long currentTimestamp = System.currentTimeMillis(); + SummaryBuilder builder = new SummaryBuilder(); + builder.profileId(String.valueOf(id)); + builder.taskType(ProfileType.LOAD.name()); + builder.startTime(TimeUtils.longToTimeString(createTimestamp)); + if (isFinished) { + builder.endTime(TimeUtils.longToTimeString(currentTimestamp)); + builder.totalTime(DebugUtil.getPrettyStringMs(currentTimestamp - createTimestamp)); + } + builder.taskState("FINISHED"); + builder.user(getUserInfo() != null ? getUserInfo().getQualifiedUser() : "N/A"); + builder.defaultDb(getDefaultDb()); + builder.sqlStatement(getOriginStmt().originStmt); + return builder.build(); } private String getDefaultDb() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index cc8248004c..677bd449e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -33,7 +33,6 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.annotation.LogException; import org.apache.doris.common.io.Text; -import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.SqlParserUtils; @@ -77,11 +76,9 @@ public abstract class BulkLoadJob extends LoadJob { // input params protected BrokerDesc brokerDesc; - // queryId of OriginStatement - protected String queryId; // this param is used to persist the expr of columns // the origin stmt is persisted instead of columns expr - // the expr of columns will be reanalyze when the log is replayed + // the expr of columns will be reanalyzed when the log is replayed private OriginStatement originStmt; // include broker desc and data desc @@ -104,11 +101,9 @@ public abstract class BulkLoadJob extends LoadJob { this.userInfo = userInfo; if (ConnectContext.get() != null) { - this.queryId = DebugUtil.printId(ConnectContext.get().queryId()); SessionVariable var = ConnectContext.get().getSessionVariable(); sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode())); } else { - this.queryId = "N/A"; sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index cb90c075ce..13e8a5beaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -25,11 +25,10 @@ import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.Profile; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; -import org.apache.doris.common.util.RuntimeProfile; -import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.FailMsg; import org.apache.doris.qe.Coordinator; @@ -74,7 +73,7 @@ public class LoadLoadingTask extends LoadTask { private LoadingTaskPlanner planner; - private RuntimeProfile jobProfile; + private Profile jobProfile; private long beginTime; public LoadLoadingTask(Database db, OlapTable table, @@ -82,7 +81,7 @@ public class LoadLoadingTask extends LoadTask { long jobDeadlineMs, long execMemLimit, boolean strictMode, long txnId, LoadTaskCallback callback, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, - boolean loadZeroTolerance, RuntimeProfile profile, boolean singleTabletLoadPerSink, + boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink, boolean useNewLoadScanNode) { super(callback, TaskType.LOADING); this.db = db; @@ -100,7 +99,7 @@ public class LoadLoadingTask extends LoadTask { this.loadParallelism = loadParallelism; this.sendBatchParallelism = sendBatchParallelism; this.loadZeroTolerance = loadZeroTolerance; - this.jobProfile = profile; + this.jobProfile = jobProfile; this.singleTabletLoadPerSink = singleTabletLoadPerSink; this.useNewLoadScanNode = useNewLoadScanNode; } @@ -123,7 +122,7 @@ public class LoadLoadingTask extends LoadTask { LOG.info("begin to execute loading task. load id: {} job id: {}. db: {}, tbl: {}. left retry: {}", DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime); retryTime--; - beginTime = System.nanoTime(); + beginTime = System.currentTimeMillis(); if (!((BrokerLoadJob) callback).updateState(JobState.LOADING)) { // job may already be cancelled return; @@ -135,9 +134,13 @@ public class LoadLoadingTask extends LoadTask { // New one query id, Coordinator curCoordinator = new Coordinator(callback.getCallbackId(), loadId, planner.getDescTable(), planner.getFragments(), planner.getScanNodes(), planner.getTimezone(), loadZeroTolerance); + if (this.jobProfile != null) { + this.jobProfile.addExecutionProfile(curCoordinator.getExecutionProfile()); + } curCoordinator.setQueryType(TQueryType.LOAD); curCoordinator.setExecMemoryLimit(execMemLimit); curCoordinator.setExecPipEngine(Config.enable_pipeline_load); + /* * For broker load job, user only need to set mem limit by 'exec_mem_limit' property. * And the variable 'load_mem_limit' does not make any effect. @@ -200,9 +203,7 @@ public class LoadLoadingTask extends LoadTask { return; } // Summary profile - coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTime)); - coord.endProfile(); - jobProfile.addChild(coord.getQueryProfile()); + coord.getExecutionProfile().update(beginTime, true); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 4235a96bb5..334983b0c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -181,7 +181,7 @@ public class NereidsPlanner extends Planner { } if (statementContext.getConnectContext().getExecutor() != null) { - statementContext.getConnectContext().getExecutor().getPlannerProfile().setQueryAnalysisFinishTime(); + statementContext.getConnectContext().getExecutor().getSummaryProfile().setQueryAnalysisFinishTime(); } if (explainLevel == ExplainLevel.ANALYZED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 018176bbb1..50546cc2cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -651,7 +651,7 @@ public class ConnectProcessor { && (executor.getParsedStmt() instanceof QueryStmt // currently only QueryStmt and insert need profile || executor.getParsedStmt() instanceof LogicalPlanAdapter || executor.getParsedStmt() instanceof InsertStmt)) { - executor.writeProfile(true); + executor.updateProfile(true); StatsErrorEstimator statsErrorEstimator = ConnectContext.get().getStatsErrorEstimator(); if (statsErrorEstimator != null) { statsErrorEstimator.updateProfile(ConnectContext.get().queryId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 87282dcaac..7becc8d35c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -24,17 +24,16 @@ import org.apache.doris.analysis.StorageBackend; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.Config; -import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.ExecutionProfile; import org.apache.doris.common.telemetry.ScopedSpan; import org.apache.doris.common.telemetry.Telemetry; import org.apache.doris.common.util.ConsistentHash; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ListUtil; -import org.apache.doris.common.util.ProfileWriter; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.VectorizedUtil; @@ -189,14 +188,6 @@ public class Coordinator { // Once this is set to true, errors from remote fragments are ignored. private boolean returnedAllResults; - private RuntimeProfile queryProfile; - - private RuntimeProfile fragmentsProfile; - private List fragmentProfile; - private RuntimeProfile loadChannelProfile; - - private ProfileWriter profileWriter; - // populated in computeFragmentExecParams() private final Map fragmentExecParamsMap = Maps.newHashMap(); @@ -219,8 +210,6 @@ public class Coordinator { // set in computeFragmentExecParams(); // same as backend_exec_states_.size() after Exec() private final Set instanceIds = Sets.newHashSet(); - // instance id -> dummy value - private MarkedCountDownLatch profileDoneSignal; private final boolean isBlockQuery; @@ -270,6 +259,12 @@ public class Coordinator { private List tResourceGroups = Lists.newArrayList(); + private final ExecutionProfile executionProfile; + + public ExecutionProfile getExecutionProfile() { + return executionProfile; + } + private static class BackendHash implements Funnel { @Override public void funnel(Backend backend, PrimitiveSink primitiveSink) { @@ -289,13 +284,14 @@ public class Coordinator { } } + // Used for query/insert public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner, StatsErrorEstimator statsErrorEstimator) { this(context, analyzer, planner); this.statsErrorEstimator = statsErrorEstimator; } - // Used for query/insert + // Used for query/insert/test public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.isBlockQuery = planner.isBlockQuery(); this.queryId = context.queryId(); @@ -350,12 +346,13 @@ public class Coordinator { nextInstanceId.setLo(queryId.lo + 1); this.assignedRuntimeFilters = planner.getRuntimeFilters(); this.tResourceGroups = analyzer == null ? null : analyzer.getResourceGroups(); + this.executionProfile = new ExecutionProfile(queryId, fragments.size()); + } // Used for broker load task/export task/update coordinator - public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, - List fragments, List scanNodes, String timezone, - boolean loadZeroTolerance) { + public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List fragments, + List scanNodes, String timezone, boolean loadZeroTolerance) { this.isBlockQuery = true; this.jobId = jobId; this.queryId = queryId; @@ -372,6 +369,7 @@ public class Coordinator { this.nextInstanceId = new TUniqueId(); nextInstanceId.setHi(queryId.hi); nextInstanceId.setLo(queryId.lo + 1); + this.executionProfile = new ExecutionProfile(queryId, fragments.size()); } private void setFromUserProperty(ConnectContext connectContext) { @@ -427,18 +425,6 @@ public class Coordinator { return queryStatus; } - public RuntimeProfile getQueryProfile() { - return queryProfile; - } - - public ProfileWriter getProfileWriter() { - return profileWriter; - } - - public void setProfileWriter(ProfileWriter profileWriter) { - this.profileWriter = profileWriter; - } - public List getDeltaUrls() { return deltaUrls; } @@ -525,20 +511,6 @@ public class Coordinator { coordAddress = new TNetworkAddress(localIP, Config.rpc_port); - int fragmentSize = fragments.size(); - queryProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId)); - - fragmentsProfile = new RuntimeProfile("Fragments"); - queryProfile.addChild(fragmentsProfile); - fragmentProfile = new ArrayList(); - for (int i = 0; i < fragmentSize; i++) { - fragmentProfile.add(new RuntimeProfile("Fragment " + i)); - fragmentsProfile.addChild(fragmentProfile.get(i)); - } - - loadChannelProfile = new RuntimeProfile("LoadChannels"); - queryProfile.addChild(loadChannelProfile); - this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend(); if (LOG.isDebugEnabled()) { LOG.debug("idToBackend size={}", idToBackend.size()); @@ -636,14 +608,7 @@ public class Coordinator { relatedBackendIds); LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet()); } - - // to keep things simple, make async Cancel() calls wait until plan fragment - // execution has been initiated, otherwise we might try to cancel fragment - // execution at backends where it hasn't even started - profileDoneSignal = new MarkedCountDownLatch(instanceIds.size()); - for (TUniqueId instanceId : instanceIds) { - profileDoneSignal.addMark(instanceId, -1L /* value is meaningless */); - } + executionProfile.markInstances(instanceIds); if (!isPointQuery) { if (enablePipelineEngine) { sendPipelineCtx(); @@ -736,7 +701,8 @@ public class Coordinator { for (TExecPlanFragmentParams tParam : tParams) { BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++, - profileFragmentId, tParam, this.addressToBackendID, loadChannelProfile); + profileFragmentId, tParam, this.addressToBackendID, + executionProfile.getLoadChannelProfile()); // Each tParam will set the total number of Fragments that need to be executed on the same BE, // and the BE will determine whether all Fragments have been executed based on this information. // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons. @@ -1273,12 +1239,7 @@ public class Coordinator { return; } cancelRemoteFragmentsAsync(cancelReason); - if (profileDoneSignal != null) { - // count down to zero to notify all objects waiting for this - profileDoneSignal.countDownToZero(new Status()); - LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks() - .stream().map(e -> DebugUtil.printId(e.getKey())).toArray()); - } + executionProfile.onCancel(); } private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason) { @@ -2164,7 +2125,7 @@ public class Coordinator { if (params.isSetErrorTabletInfos()) { updateErrorTabletInfos(params.getErrorTabletInfos()); } - profileDoneSignal.markedCountDown(params.getFragmentInstanceId(), -1L); + executionProfile.markOneInstanceDone(params.getFragmentInstanceId()); } if (params.isSetLoadedRows()) { @@ -2222,7 +2183,7 @@ public class Coordinator { if (params.isSetErrorTabletInfos()) { updateErrorTabletInfos(params.getErrorTabletInfos()); } - profileDoneSignal.markedCountDown(params.getFragmentInstanceId(), -1L); + executionProfile.markOneInstanceDone(params.getFragmentInstanceId()); } if (params.isSetLoadedRows()) { @@ -2233,35 +2194,6 @@ public class Coordinator { } } - public void endProfile() { - endProfile(true); - } - - public void endProfile(boolean waitProfileDone) { - if (enablePipelineEngine) { - if (pipelineExecContexts.isEmpty()) { - return; - } - } else { - if (backendExecStates.isEmpty()) { - return; - } - } - - // Wait for all backends to finish reporting when writing profile last time. - if (waitProfileDone && needReport) { - try { - profileDoneSignal.await(2, TimeUnit.SECONDS); - } catch (InterruptedException e1) { - LOG.warn("signal await error", e1); - } - } - - for (int i = 1; i < fragmentProfile.size(); ++i) { - fragmentProfile.get(i).sortChildren(); - } - } - /* * Waiting the coordinator finish executing. * return false if waiting timeout. @@ -2284,7 +2216,7 @@ public class Coordinator { long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime); boolean awaitRes = false; try { - awaitRes = profileDoneSignal.await(waitTime, TimeUnit.SECONDS); + awaitRes = executionProfile.awaitAllInstancesDone(waitTime); } catch (InterruptedException e) { // Do nothing } @@ -2327,7 +2259,7 @@ public class Coordinator { } public boolean isDone() { - return profileDoneSignal.getCount() == 0; + return executionProfile.isAllInstancesDone(); } // map from an impalad host address to the per-node assigned scan ranges; @@ -2577,7 +2509,7 @@ public class Coordinator { volatile boolean done; boolean hasCanceled; int profileFragmentId; - RuntimeProfile profile; + RuntimeProfile instanceProfile; RuntimeProfile loadChannelProfile; TNetworkAddress brpcAddress; TNetworkAddress address; @@ -2601,7 +2533,7 @@ public class Coordinator { String name = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")"; this.loadChannelProfile = loadChannelProfile; - this.profile = new RuntimeProfile(name); + this.instanceProfile = new RuntimeProfile(name); this.hasCanceled = false; this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime(); } @@ -2628,7 +2560,7 @@ public class Coordinator { return false; } if (params.isSetProfile()) { - profile.update(params.profile); + instanceProfile.update(params.profile); } if (params.isSetLoadChannelProfile()) { loadChannelProfile.update(params.loadChannelProfile); @@ -2641,8 +2573,8 @@ public class Coordinator { } public synchronized void printProfile(StringBuilder builder) { - this.profile.computeTimeInProfile(); - this.profile.prettyPrint(builder, ""); + this.instanceProfile.computeTimeInProfile(); + this.instanceProfile.prettyPrint(builder, ""); } // cancel the fragment instance. @@ -2695,7 +2627,7 @@ public class Coordinator { LOG.warn("profileFragmentId {} should be in [0, {})", profileFragmentId, maxFragmentId); return false; } - profile.computeTimeInProfile(); + instanceProfile.computeTimeInProfile(); return true; } @@ -3378,18 +3310,13 @@ public class Coordinator { private void attachInstanceProfileToFragmentProfile() { if (enablePipelineEngine) { for (PipelineExecContext ctx : pipelineExecContexts.values()) { - if (!ctx.computeTimeInProfile(fragmentProfile.size())) { - return; - } ctx.fragmentInstancesMap.values().stream() - .forEach(p -> fragmentProfile.get(ctx.profileFragmentId).addChild(p)); + .forEach(p -> executionProfile.addInstanceProfile(ctx.profileFragmentId, p)); } } else { for (BackendExecState backendExecState : backendExecStates) { - if (!backendExecState.computeTimeInProfile(fragmentProfile.size())) { - return; - } - fragmentProfile.get(backendExecState.profileFragmentId).addChild(backendExecState.profile); + executionProfile.addInstanceProfile(backendExecState.profileFragmentId, + backendExecState.instanceProfile); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 9b20fe4f7f..eea008f143 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -20,8 +20,8 @@ package org.apache.doris.qe; import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.ExecutionProfile; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.ProfileWriter; import org.apache.doris.metric.MetricRepo; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueryType; @@ -163,16 +163,12 @@ public final class QeProcessorImpl implements QeProcessor { continue; } final String queryIdStr = DebugUtil.printId(info.getConnectContext().queryId()); - final QueryStatisticsItem item = new QueryStatisticsItem.Builder() - .queryId(queryIdStr) - .queryStartTime(info.getStartExecTime()) - .sql(info.getSql()) - .user(context.getQualifiedUser()) - .connId(String.valueOf(context.getConnectionId())) - .db(context.getDatabase()) + final QueryStatisticsItem item = new QueryStatisticsItem.Builder().queryId(queryIdStr) + .queryStartTime(info.getStartExecTime()).sql(info.getSql()).user(context.getQualifiedUser()) + .connId(String.valueOf(context.getConnectionId())).db(context.getDatabase()) .catalog(context.getDefaultCatalog()) .fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos()) - .profile(info.getCoord().getQueryProfile()) + .profile(info.getCoord().getExecutionProfile().getExecutionProfile()) .isReportSucc(context.getSessionVariable().enableProfile()).build(); querySet.put(queryIdStr, item); } @@ -203,7 +199,7 @@ public final class QeProcessorImpl implements QeProcessor { } try { info.getCoord().updateFragmentExecStatus(params); - if (info.getCoord().getProfileWriter() != null && params.isSetProfile()) { + if (params.isSetProfile()) { writeProfileExecutor.submit(new WriteProfileTask(params, info)); } } catch (Exception e) { @@ -276,10 +272,8 @@ public final class QeProcessorImpl implements QeProcessor { return; } - ProfileWriter profileWriter = info.getCoord().getProfileWriter(); - if (profileWriter != null) { - profileWriter.writeProfile(false); - } + ExecutionProfile executionProfile = info.getCoord().getExecutionProfile(); + executionProfile.update(-1, false); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c70554b1c7..d77d21e8b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -82,14 +82,13 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.NereidsException; import org.apache.doris.common.UserException; -import org.apache.doris.common.Version; +import org.apache.doris.common.profile.Profile; +import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LiteralUtils; import org.apache.doris.common.util.MetaLockUtils; -import org.apache.doris.common.util.ProfileManager; -import org.apache.doris.common.util.ProfileWriter; -import org.apache.doris.common.util.QueryPlannerProfile; -import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.common.util.ProfileManager.ProfileType; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; @@ -176,7 +175,7 @@ import java.util.stream.Collectors; // Do one COM_QUERY process. // first: Parse receive byte array to statement struct. // second: Do handle function for statement. -public class StmtExecutor implements ProfileWriter { +public class StmtExecutor { private static final Logger LOG = LogManager.getLogger(StmtExecutor.class); private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0); @@ -189,11 +188,7 @@ public class StmtExecutor implements ProfileWriter { private OriginStatement originStmt; private StatementBase parsedStmt; private Analyzer analyzer; - private RuntimeProfile profile; - private RuntimeProfile summaryProfile; - private RuntimeProfile plannerRuntimeProfile; - private volatile boolean isFinishedProfile = false; - private String queryType = "Query"; + private ProfileType profileType = ProfileType.QUERY; private volatile Coordinator coord = null; private MasterOpExecutor masterOpExecutor = null; private RedirectStatus redirectStatus = null; @@ -202,12 +197,13 @@ public class StmtExecutor implements ProfileWriter { private ShowResultSet proxyResultSet = null; private Data.PQueryStatistics.Builder statisticsForAuditLog; private boolean isCached; - private QueryPlannerProfile plannerProfile = new QueryPlannerProfile(); private String stmtName; private PrepareStmt prepareStmt = null; private String mysqlLoadId; // Distinguish from prepare and execute command private boolean isExecuteStmt = false; + // The profile of this execution + private final Profile profile; // The result schema if "dry_run_query" is true. // Only one column to indicate the real return row numbers. @@ -222,8 +218,10 @@ public class StmtExecutor implements ProfileWriter { this.isProxy = isProxy; this.statementContext = new StatementContext(context, originStmt); this.context.setStatementContext(statementContext); + this.profile = new Profile("Query", this.context.getSessionVariable().enableProfile); } + // for test public StmtExecutor(ConnectContext context, String stmt) { this(context, new OriginStatement(stmt, 0), false); this.stmtName = stmt; @@ -246,6 +244,7 @@ public class StmtExecutor implements ProfileWriter { this.statementContext.setParsedStatement(parsedStmt); } this.context.setStatementContext(statementContext); + this.profile = new Profile("Query", context.getSessionVariable().enableProfile()); } private static InternalService.PDataRow getRowStringValue(List cols) throws UserException { @@ -269,74 +268,31 @@ public class StmtExecutor implements ProfileWriter { return row.build(); } - // At the end of query execution, we begin to add up profile - private void initProfile(QueryPlannerProfile plannerProfile, boolean waiteBeReport) { - RuntimeProfile queryProfile; - // when a query hits the sql cache, `coord` is null. - if (coord == null) { - queryProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(context.queryId())); - } else { - queryProfile = coord.getQueryProfile(); - } - if (profile == null) { - profile = new RuntimeProfile("Query"); - summaryProfile = new RuntimeProfile("Summary"); - profile.addChild(summaryProfile); - summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(context.getStartTime())); - updateSummaryProfile(waiteBeReport); - for (Map.Entry entry : getSummaryInfo().entrySet()) { - summaryProfile.addInfoString(entry.getKey(), entry.getValue()); - } - summaryProfile.addInfoString(ProfileManager.TRACE_ID, context.getSessionVariable().getTraceId()); - plannerRuntimeProfile = new RuntimeProfile("Execution Summary"); - summaryProfile.addChild(plannerRuntimeProfile); - profile.addChild(queryProfile); - } else { - updateSummaryProfile(waiteBeReport); - } - plannerProfile.initRuntimeProfile(plannerRuntimeProfile); - - queryProfile.getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(plannerProfile.getQueryBeginTime())); - endProfile(waiteBeReport); - } - - private void endProfile(boolean waitProfileDone) { - if (context != null && context.getSessionVariable().enableProfile() && coord != null) { - coord.endProfile(waitProfileDone); - } - } - - private void updateSummaryProfile(boolean waiteBeReport) { - Preconditions.checkNotNull(summaryProfile); + private Map getSummaryInfo(boolean isFinished) { long currentTimestamp = System.currentTimeMillis(); - long totalTimeMs = currentTimestamp - context.getStartTime(); - summaryProfile.addInfoString(ProfileManager.END_TIME, - waiteBeReport ? TimeUtils.longToTimeString(currentTimestamp) : "N/A"); - summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); - summaryProfile.addInfoString(ProfileManager.QUERY_STATE, - !waiteBeReport && context.getState().getStateType().equals(MysqlStateType.OK) ? "RUNNING" : - context.getState().toString()); - } + SummaryBuilder builder = new SummaryBuilder(); + builder.profileId(DebugUtil.printId(context.queryId())); + builder.taskType(profileType.name()); + builder.startTime(TimeUtils.longToTimeString(context.getStartTime())); + if (isFinished) { + builder.endTime(TimeUtils.longToTimeString(currentTimestamp)); + builder.totalTime(DebugUtil.getPrettyStringMs(currentTimestamp - context.getStartTime())); + } + builder.taskState(!isFinished && context.getState().getStateType().equals(MysqlStateType.OK) ? "RUNNING" + : context.getState().toString()); + builder.user(context.getQualifiedUser()); + builder.defaultDb(context.getDatabase()); + builder.sqlStatement(originStmt.originStmt); + builder.isCached(isCached ? "Yes" : "No"); - private Map getSummaryInfo() { - Map infos = Maps.newLinkedHashMap(); - infos.put(ProfileManager.JOB_ID, "N/A"); - infos.put(ProfileManager.QUERY_ID, DebugUtil.printId(context.queryId())); - infos.put(ProfileManager.QUERY_TYPE, queryType); - infos.put(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION); - infos.put(ProfileManager.USER, context.getQualifiedUser()); - infos.put(ProfileManager.DEFAULT_DB, context.getDatabase()); - infos.put(ProfileManager.SQL_STATEMENT, originStmt.originStmt); - infos.put(ProfileManager.IS_CACHED, isCached ? "Yes" : "No"); - - Map beToInstancesNum = - coord == null ? Maps.newTreeMap() : coord.getBeToInstancesNum(); - infos.put(ProfileManager.TOTAL_INSTANCES_NUM, - String.valueOf(beToInstancesNum.values().stream().reduce(0, Integer::sum))); - infos.put(ProfileManager.INSTANCES_NUM_PER_BE, beToInstancesNum.toString()); - infos.put(ProfileManager.PARALLEL_FRAGMENT_EXEC_INSTANCE, - String.valueOf(context.sessionVariable.parallelExecInstanceNum)); - return infos; + Map beToInstancesNum = coord == null ? Maps.newTreeMap() : coord.getBeToInstancesNum(); + builder.totalInstancesNum(String.valueOf(beToInstancesNum.values().stream().reduce(0, Integer::sum))); + builder.instancesNumPerBe( + beToInstancesNum.entrySet().stream().map(entry -> entry.getKey() + ":" + entry.getValue()) + .collect(Collectors.joining(","))); + builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.parallelExecInstanceNum)); + builder.traceId(context.getSessionVariable().getTraceId()); + return builder.build(); } public void addProfileToSpan() { @@ -344,7 +300,7 @@ public class StmtExecutor implements ProfileWriter { if (!span.isRecording()) { return; } - for (Map.Entry entry : getSummaryInfo().entrySet()) { + for (Map.Entry entry : getSummaryInfo(true).entrySet()) { span.setAttribute(entry.getKey(), entry.getValue()); } } @@ -490,7 +446,7 @@ public class StmtExecutor implements ProfileWriter { LOG.info("Nereids start to execute query:\n {}", originStmt.originStmt); context.setQueryId(queryId); context.setStartTime(); - plannerProfile.setQueryBeginTime(); + profile.getSummaryProfile().setQueryBeginTime(); context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); parseByNereids(); Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter, @@ -549,7 +505,7 @@ public class StmtExecutor implements ProfileWriter { if (checkBlockRules()) { return; } - plannerProfile.setQueryPlanFinishTime(); + profile.getSummaryProfile().setQueryPlanFinishTime(); handleQueryWithRetry(queryId); } } @@ -595,7 +551,7 @@ public class StmtExecutor implements ProfileWriter { // The final profile report occurs after be returns the query data, and the profile cannot be // received after unregisterQuery(), causing the instance profile to be lost, so we should wait // for the profile before unregisterQuery(). - endProfile(true); + updateProfile(true); QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); } } @@ -610,7 +566,7 @@ public class StmtExecutor implements ProfileWriter { public void executeByLegacy(TUniqueId queryId) throws Exception { context.setStartTime(); - plannerProfile.setQueryBeginTime(); + profile.getSummaryProfile().setQueryBeginTime(); context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); context.setQueryId(queryId); // set isQuery first otherwise this state will be lost if some error occurs @@ -687,10 +643,10 @@ public class StmtExecutor implements ProfileWriter { handleCtasStmt(); } else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InsertStmt is its subclass try { - handleInsertStmt(); if (!((InsertStmt) parsedStmt).getQueryStmt().isExplain()) { - queryType = "Load"; + profileType = ProfileType.LOAD; } + handleInsertStmt(); } catch (Throwable t) { LOG.warn("handle insert stmt fail: {}", t.getMessage()); // the transaction of this insert may already begin, we will abort it at outer finally block. @@ -801,27 +757,18 @@ public class StmtExecutor implements ProfileWriter { } } - @Override - public void writeProfile(boolean isLastWriteProfile) { + public void updateProfile(boolean isFinished) { if (!context.getSessionVariable().enableProfile()) { return; } - synchronized (writeProfileLock) { - if (isFinishedProfile) { - return; - } - initProfile(plannerProfile, isLastWriteProfile); - profile.computeTimeInChildProfile(); - ProfileManager.getInstance().pushProfile(profile); - isFinishedProfile = isLastWriteProfile; - } + profile.update(context.startTime, getSummaryInfo(isFinished), isFinished); } // Analyze one statement to structure in memory. public void analyze(TQueryOptions tQueryOptions) throws UserException { if (LOG.isDebugEnabled()) { - LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", - context.getStmtId(), context.getForwardedStmtId()); + LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), + context.getForwardedStmtId()); } parseByLegacy(); @@ -1071,15 +1018,12 @@ public class StmtExecutor implements ProfileWriter { } } } - plannerProfile.setQueryAnalysisFinishTime(); + profile.getSummaryProfile().setQueryAnalysisFinishTime(); planner = new OriginalPlanner(analyzer); if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) { planner.plan(parsedStmt, tQueryOptions); } - // TODO(zc): - // Preconditions.checkState(!analyzer.hasUnassignedConjuncts()); - - plannerProfile.setQueryPlanFinishTime(); + profile.getSummaryProfile().setQueryPlanFinishTime(); } private void resetAnalyzerAndStmt() { @@ -1333,7 +1277,7 @@ public class StmtExecutor implements ProfileWriter { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); - coord.setProfileWriter(this); + profile.addExecutionProfile(coord.getExecutionProfile()); Span queryScheduleSpan = context.getTracer().spanBuilder("query schedule").setParent(Context.current()).startSpan(); try (Scope scope = queryScheduleSpan.makeCurrent()) { @@ -1344,15 +1288,15 @@ public class StmtExecutor implements ProfileWriter { } finally { queryScheduleSpan.end(); } - plannerProfile.setQueryScheduleFinishTime(); - writeProfile(false); + profile.getSummaryProfile().setQueryScheduleFinishTime(); + updateProfile(false); Span fetchResultSpan = context.getTracer().spanBuilder("fetch result").setParent(Context.current()).startSpan(); try (Scope scope = fetchResultSpan.makeCurrent()) { while (true) { // register the fetch result time. - plannerProfile.setTempStartTime(); + profile.getSummaryProfile().setTempStartTime(); batch = coord.getNext(); - plannerProfile.freshFetchResultConsumeTime(); + profile.getSummaryProfile().freshFetchResultConsumeTime(); // for outfile query, there will be only one empty batch send back with eos flag if (batch.getBatch() != null) { @@ -1361,7 +1305,7 @@ public class StmtExecutor implements ProfileWriter { } // register send field result time. - plannerProfile.setTempStartTime(); + profile.getSummaryProfile().setTempStartTime(); // For some language driver, getting error packet after fields packet // will be recognized as a success result // so We need to send fields after first batch arrived @@ -1376,7 +1320,7 @@ public class StmtExecutor implements ProfileWriter { for (ByteBuffer row : batch.getBatch().getRows()) { channel.sendOnePacket(row); } - plannerProfile.freshWriteResultConsumeTime(); + profile.getSummaryProfile().freshWriteResultConsumeTime(); context.updateReturnRows(batch.getBatch().getRows().size()); context.setResultAttachedInfo(batch.getBatch().getAttachedInfos()); } @@ -1413,7 +1357,7 @@ public class StmtExecutor implements ProfileWriter { statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder(); context.getState().setEof(); - plannerProfile.setQueryFetchResultFinishTime(); + profile.getSummaryProfile().setQueryFetchResultFinishTime(); } catch (Exception e) { // notify all be cancel runing fragment // in some case may block all fragment handle threads @@ -1686,6 +1630,7 @@ public class StmtExecutor implements ProfileWriter { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict()); coord.setQueryType(TQueryType.LOAD); + profile.addExecutionProfile(coord.getExecutionProfile()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); @@ -1775,7 +1720,7 @@ public class StmtExecutor implements ProfileWriter { */ throwable = t; } finally { - endProfile(true); + updateProfile(true); QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); } @@ -2210,6 +2155,7 @@ public class StmtExecutor implements ProfileWriter { planner.getFragments(); RowBatch batch; coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); + profile.addExecutionProfile(coord.getExecutionProfile()); try { QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); @@ -2217,7 +2163,6 @@ public class StmtExecutor implements ProfileWriter { LOG.warn(e.getMessage(), e); } - coord.setProfileWriter(this); Span queryScheduleSpan = context.getTracer() .spanBuilder("internal SQL schedule").setParent(Context.current()).startSpan(); try (Scope scope = queryScheduleSpan.makeCurrent()) { @@ -2273,8 +2218,8 @@ public class StmtExecutor implements ProfileWriter { return resultRows; } - public QueryPlannerProfile getPlannerProfile() { - return plannerProfile; + public SummaryProfile getSummaryProfile() { + return profile.getSummaryProfile(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java index ba0843d5e6..9f96bd689b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java @@ -21,11 +21,6 @@ import org.apache.doris.analysis.OutFileClause; import org.apache.doris.analysis.QueryStmt; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; -import org.apache.doris.common.Version; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.ProfileManager; -import org.apache.doris.common.util.RuntimeProfile; -import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.ExportFailMsg; import org.apache.doris.load.ExportJob; import org.apache.doris.load.ExportJob.JobState; @@ -48,10 +43,6 @@ public class ExportExportingTask extends MasterTask { private static final Logger LOG = LogManager.getLogger(ExportExportingTask.class); protected final ExportJob job; - - private RuntimeProfile profile = new RuntimeProfile("Export"); - private List fragmentProfiles = Lists.newArrayList(); - private StmtExecutor stmtExecutor; public ExportExportingTask(ExportJob job) { @@ -123,13 +114,11 @@ public class ExportExportingTask extends MasterTask { LOG.info("Exporting task progress is {}%, export job: {}", progress, job.getId()); if (isFailed) { - registerProfile(); job.cancel(errorMsg.getCancelType(), errorMsg.getMsg()); LOG.warn("Exporting task failed because Exception: {}", errorMsg.getMsg()); return; } - registerProfile(); if (job.finish(outfileInfoList)) { LOG.info("export job success. job: {}", job); // TODO(ftw): when we implement exporting tablet one by one, we should release snapshot here @@ -172,38 +161,6 @@ public class ExportExportingTask extends MasterTask { return outfileInfo; } - private void initProfile() { - profile = new RuntimeProfile("ExportJob"); - RuntimeProfile summaryProfile = new RuntimeProfile("Summary"); - summaryProfile.addInfoString(ProfileManager.JOB_ID, String.valueOf(job.getId())); - summaryProfile.addInfoString(ProfileManager.QUERY_ID, job.getQueryId()); - summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(job.getStartTimeMs())); - - long currentTimestamp = System.currentTimeMillis(); - long totalTimeMs = currentTimestamp - job.getStartTimeMs(); - summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp)); - summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); - - summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Export"); - summaryProfile.addInfoString(ProfileManager.QUERY_STATE, job.getState().toString()); - summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION); - summaryProfile.addInfoString(ProfileManager.USER, job.getQualifiedUser()); - summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, String.valueOf(job.getDbId())); - summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, job.getSql()); - profile.addChild(summaryProfile); - } - - private void registerProfile() { - if (!job.getEnableProfile()) { - return; - } - initProfile(); - for (RuntimeProfile p : fragmentProfiles) { - profile.addChild(p); - } - ProfileManager.getInstance().pushProfile(profile); - } - private void handleInQueueState() { long dbId = job.getDbId(); Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java index 5e1c396f57..15b4175759 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java @@ -42,9 +42,9 @@ public class RuntimeProfileTest { RuntimeProfile profile1 = new RuntimeProfile("profile1"); RuntimeProfile profile2 = new RuntimeProfile("profile2"); RuntimeProfile profile3 = new RuntimeProfile("profile3"); - profile1.getCounterTotalTime().setValue(1); - profile2.getCounterTotalTime().setValue(3); - profile3.getCounterTotalTime().setValue(2); + profile1.getCounterTotalTime().setValue(TUnit.TIME_NS, 1); + profile2.getCounterTotalTime().setValue(TUnit.TIME_NS, 3); + profile3.getCounterTotalTime().setValue(TUnit.TIME_NS, 2); profile.addChild(profile1); profile.addChild(profile2); profile.addChild(profile3); @@ -102,7 +102,7 @@ public class RuntimeProfileTest { profile.addCounter("key", TUnit.UNIT, ""); Assert.assertNotNull(profile.getCounterMap().get("key")); Assert.assertNull(profile.getCounterMap().get("key2")); - profile.getCounterMap().get("key").setValue(1); + profile.getCounterMap().get("key").setValue(TUnit.TIME_NS, 1); Assert.assertEquals(profile.getCounterMap().get("key").getValue(), 1); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/TimeUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/TimeUtilsTest.java index 5ba3c16657..8e0db1e0dc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/TimeUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/TimeUtilsTest.java @@ -55,8 +55,8 @@ public class TimeUtilsTest { @Test public void testNormal() { Assert.assertNotNull(TimeUtils.getCurrentFormatTime()); - Assert.assertNotNull(TimeUtils.getStartTime()); - Assert.assertTrue(TimeUtils.getEstimatedTime(0L) > 0); + Assert.assertNotNull(TimeUtils.getStartTimeMs()); + Assert.assertTrue(TimeUtils.getElapsedTimeMs(0L) > 0); Assert.assertEquals(-62167420800000L, TimeUtils.MIN_DATE.getTime()); Assert.assertEquals(253402185600000L, TimeUtils.MAX_DATE.getTime()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index 526fad2fd7..c64a616aaf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -33,7 +33,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.jmockit.Deencapsulation; -import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.common.profile.Profile; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.BrokerFileGroupAggInfo; @@ -358,11 +358,9 @@ public class BrokerLoadJobTest { fileGroups.add(brokerFileGroup); UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - RuntimeProfile jobProfile = new RuntimeProfile("test"); - LoadLoadingTask task = new LoadLoadingTask(database, olapTable, brokerDesc, fileGroups, - 100, 100, false, 100, callback, "", - 100, 1, 1, true, jobProfile, false, - false); + Profile jobProfile = new Profile("test", false); + LoadLoadingTask task = new LoadLoadingTask(database, olapTable, brokerDesc, fileGroups, 100, 100, false, 100, + callback, "", 100, 1, 1, true, jobProfile, false, false); try { UserIdentity userInfo = new UserIdentity("root", "localhost"); userInfo.setIsAnalyzed(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java index eea4788f5d..2ba2291bc4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java @@ -182,14 +182,6 @@ public class SessionVariablesTest extends TestWithFeService { } }; - new Expectations(profileManager) { - { - profileManager.pushProfile((RuntimeProfile) any); - // if enable_profile=true, method pushProfile will be called once - times = 1; - } - }; - ExportExportingTask task = new ExportExportingTask(job); task.run(); Assertions.assertTrue(job.isFinalState()); @@ -197,7 +189,6 @@ public class SessionVariablesTest extends TestWithFeService { e.printStackTrace(); Assertions.fail(e.getMessage()); } - } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index 24d46ce1ca..507102fb0d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -33,7 +33,6 @@ import org.apache.doris.analysis.UseStmt; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; import org.apache.doris.common.jmockit.Deencapsulation; -import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; @@ -215,13 +214,6 @@ public class StmtExecutorTest { coordinator.exec(); minTimes = 0; - coordinator.endProfile(); - minTimes = 0; - - coordinator.getQueryProfile(); - minTimes = 0; - result = new RuntimeProfile(); - coordinator.getNext(); minTimes = 0; result = new RowBatch(); diff --git a/regression-test/suites/query_profile/test_profile.groovy b/regression-test/suites/query_profile/test_profile.groovy index 783c60b3b4..84779270eb 100644 --- a/regression-test/suites/query_profile/test_profile.groovy +++ b/regression-test/suites/query_profile/test_profile.groovy @@ -82,10 +82,8 @@ suite('test_profile') { def insert_order = len - i - 1 def stmt_query_info = obj.data.rows[i] - assertNotNull(stmt_query_info["Query ID"]) - assertNotEquals(stmt_query_info["Query ID"], "N/A") - assertNotNull(stmt_query_info["Detail"]) - assertNotEquals(stmt_query_info["Detail"], "N/A") + assertNotNull(stmt_query_info["Profile ID"]) + assertNotEquals(stmt_query_info["Profile ID"], "N/A") assertEquals(stmt_query_info['Sql Statement'].toString(), """ INSERT INTO ${table} values (${id_data[insert_order]}, "${value_data[insert_order]}") """.toString()) @@ -116,7 +114,7 @@ suite('test_profile') { for(int i = 0 ; i < QUERY_NUM ; i++){ def insert_order = QUERY_NUM - i - 1 def current_obj = show_query_profile_obj[i] - def stmt_query_info = current_obj[4] + def stmt_query_info = current_obj[8] assertNotEquals(current_obj[1].toString(), "N/A".toString()) assertEquals(stmt_query_info.toString(), """ SELECT * FROM ${table} WHERE cost ${ops[insert_order]} ${nums[insert_order]} """.toString()) } @@ -134,10 +132,8 @@ suite('test_profile') { def insert_order = QUERY_NUM - i - 1 def stmt_query_info = obj.data.rows[i] - assertNotNull(stmt_query_info["Query ID"]) - assertNotEquals(stmt_query_info["Query ID"].toString(), "N/A".toString()) - assertNotNull(stmt_query_info["Detail"]) - assertNotEquals(stmt_query_info["Detail"], "N/A") + assertNotNull(stmt_query_info["Profile ID"]) + assertNotEquals(stmt_query_info["Profile ID"].toString(), "N/A".toString()) assertEquals(stmt_query_info['Sql Statement'].toString(), """ SELECT * FROM ${table} WHERE cost ${ops[insert_order]} ${nums[insert_order]} """.toString())