From f26e3408b23314db4bf627a8df43795ae04271d4 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 27 Jul 2021 13:37:34 +0800 Subject: [PATCH] [Profile] Support show load profile for broker load job (#6214) 1. Add new statement: `SHOW LOAD PROFILE "xxx";` 2. Improve the read performance of orc scanner --- be/src/common/config.h | 1 + be/src/exec/base_scanner.h | 3 + be/src/exec/broker_scanner.cpp | 6 +- be/src/exec/buffered_reader.cpp | 27 +++- be/src/exec/buffered_reader.h | 17 ++- be/src/exec/json_scanner.cpp | 3 +- be/src/exec/orc_scanner.cpp | 4 +- be/src/exec/parquet_scanner.cpp | 4 +- be/test/exec/buffered_reader_test.cpp | 12 +- fe/fe-core/src/main/cup/sql_parser.cup | 4 + .../doris/analysis/ShowLoadProfileStmt.java | 141 ++++++++++++++++++ .../doris/analysis/ShowQueryProfileStmt.java | 10 +- .../profile/MultiProfileTreeBuilder.java | 139 +++++++++++++++++ .../doris/common/profile/PlanTreeBuilder.java | 4 +- .../common/profile/ProfileTreeBuilder.java | 64 ++++---- .../doris/common/util/ProfileManager.java | 89 ++++++----- .../doris/load/loadv2/BrokerLoadJob.java | 2 - .../doris/load/loadv2/LoadLoadingTask.java | 7 +- .../apache/doris/planner/DataPartition.java | 3 + .../org/apache/doris/planner/PlanNode.java | 11 +- .../org/apache/doris/qe/ShowExecutor.java | 64 +++++++- .../doris/task/ExportExportingTask.java | 2 +- 22 files changed, 508 insertions(+), 109 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadProfileStmt.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/profile/MultiProfileTreeBuilder.java diff --git a/be/src/common/config.h b/be/src/common/config.h index bec0b9e657..970ff46cf7 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -618,6 +618,7 @@ CONF_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); // if set runtime_filter_use_async_rpc true, publish runtime filter will be a async method // else we will call sync method CONF_mBool(runtime_filter_use_async_rpc, "true"); + } // namespace config } // namespace doris diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 48cef80ae8..3d3702a2cf 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -33,6 +33,9 @@ class MemTracker; class RuntimeState; class ExprContext; +// The counter will be passed to each scanner. +// Note that this struct is not thread safe. +// So if we support concurrent scan in the future, we need to modify this struct. struct ScannerCounter { ScannerCounter() : num_rows_filtered(0), num_rows_unselected(0) {} diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 058b3e8aef..91b2407fba 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -170,8 +170,7 @@ Status BrokerScanner::open_file_reader() { case TFileType::FILE_HDFS: { #if defined(__x86_64__) BufferedReader* file_reader = - new BufferedReader(new HdfsFileReader(range.hdfs_params, range.path, start_offset), - config::remote_storage_read_buffer_mb * 1024 * 1024); + new BufferedReader(_profile, new HdfsFileReader(range.hdfs_params, range.path, start_offset)); RETURN_IF_ERROR(file_reader->open()); _cur_file_reader = file_reader; break; @@ -189,8 +188,7 @@ Status BrokerScanner::open_file_reader() { } case TFileType::FILE_S3: { BufferedReader* s3_reader = - new BufferedReader(new S3Reader(_params.properties, range.path, start_offset), - config::remote_storage_read_buffer_mb * 1024 * 1024); + new BufferedReader(_profile, new S3Reader(_params.properties, range.path, start_offset)); RETURN_IF_ERROR(s3_reader->open()); _cur_file_reader = s3_reader; break; diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp index a86044dbf5..5c599d41b7 100644 --- a/be/src/exec/buffered_reader.cpp +++ b/be/src/exec/buffered_reader.cpp @@ -20,17 +20,22 @@ #include #include +#include "common/config.h" #include "common/logging.h" namespace doris { // buffered reader -BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size) - : _reader(reader), +BufferedReader::BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t buffer_size) + : _profile(profile), + _reader(reader), _buffer_size(buffer_size), _buffer_offset(0), _buffer_limit(0), _cur_offset(0) { + if (_buffer_size == -1L) { + _buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024; + } _buffer = new char[_buffer_size]; // set the _cur_offset of this reader as same as the inner reader's, // to make sure the buffer reader will start to read at right position. @@ -47,6 +52,14 @@ Status BufferedReader::open() { ss << "Open buffered reader failed, reader is null"; return Status::InternalError(ss.str()); } + + // the macro ADD_XXX is idempotent. + // So although each scanner calls the ADD_XXX method, they all use the same counters. + _read_timer = ADD_TIMER(_profile, "FileReadTime"); + _remote_read_timer = ADD_CHILD_TIMER(_profile, "FileRemoteReadTime", "FileReadTime"); + _read_counter = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT); + _remote_read_counter = ADD_COUNTER(_profile, "FileRemoteReadCalls", TUnit::UNIT); + RETURN_IF_ERROR(_reader->open()); return Status::OK(); } @@ -68,6 +81,7 @@ Status BufferedReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, } Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { + SCOPED_TIMER(_read_timer); if (nbytes <= 0) { *bytes_read = 0; return Status::OK(); @@ -92,6 +106,7 @@ Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { + _read_count++; // requested bytes missed the local buffer if (position >= _buffer_limit || position < _buffer_offset) { // if requested length is larger than the capacity of buffer, do not @@ -121,6 +136,7 @@ Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* byt Status BufferedReader::_fill() { if (_buffer_offset >= 0) { int64_t bytes_read = 0; + SCOPED_TIMER(_remote_read_timer); RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer)); _buffer_limit = _buffer_offset + bytes_read; } @@ -144,6 +160,13 @@ Status BufferedReader::tell(int64_t* position) { void BufferedReader::close() { _reader->close(); SAFE_DELETE_ARRAY(_buffer); + + if (_read_counter != nullptr) { + COUNTER_UPDATE(_read_counter, _read_count); + } + if (_remote_read_counter != nullptr) { + COUNTER_UPDATE(_remote_read_counter, _remote_read_count); + } } bool BufferedReader::closed() { diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h index b421b1d08b..937e154ca7 100644 --- a/be/src/exec/buffered_reader.h +++ b/be/src/exec/buffered_reader.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "exec/file_reader.h" #include "olap/olap_define.h" +#include "util/runtime_profile.h" namespace doris { @@ -35,7 +36,8 @@ public: // If the reader need the file size, set it when construct FileReader. // There is no other way to set the file size. // buffered_reader will acquire reader - BufferedReader(FileReader* reader, int64_t buffer_size = 1024 * 1024); + // -1 means using config buffered_reader_buffer_size_bytes + BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t = -1L); virtual ~BufferedReader(); virtual Status open() override; @@ -56,12 +58,25 @@ private: Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out); private: + RuntimeProfile* _profile; std::unique_ptr _reader; char* _buffer; int64_t _buffer_size; int64_t _buffer_offset; int64_t _buffer_limit; int64_t _cur_offset; + + int64_t _read_count = 0; + int64_t _remote_read_count = 0; + + // total time cost in this reader + RuntimeProfile::Counter* _read_timer = nullptr; + // time cost of "_reader", "remote" because "_reader" is always a remote reader + RuntimeProfile::Counter* _remote_read_timer = nullptr; + // counter of calling read() + RuntimeProfile::Counter* _read_counter = nullptr; + // counter of calling "remote read()" + RuntimeProfile::Counter* _remote_read_counter = nullptr; }; } // namespace doris diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index c47796fa0f..5cba337c63 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -160,8 +160,7 @@ Status JsonScanner::open_file_reader() { } case TFileType::FILE_S3: { BufferedReader* s3_reader = - new BufferedReader(new S3Reader(_params.properties, range.path, start_offset), - config::remote_storage_read_buffer_mb * 1024 * 1024); + new BufferedReader(_profile, new S3Reader(_params.properties, range.path, start_offset)); RETURN_IF_ERROR(s3_reader->open()); _cur_file_reader = s3_reader; break; diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 74f64573cc..fafb9e2ad5 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -396,13 +396,13 @@ Status ORCScanner::open_next_reader() { if (range.__isset.file_size) { file_size = range.file_size; } - file_reader.reset(new BufferedReader(new BrokerReader(_state->exec_env(), _broker_addresses, + file_reader.reset(new BufferedReader(_profile, new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties, range.path, range.start_offset, file_size))); break; } case TFileType::FILE_S3: { - file_reader.reset(new BufferedReader( + file_reader.reset(new BufferedReader(_profile, new S3Reader(_params.properties, range.path, range.start_offset))); break; } diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 72e460d514..86107d998d 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -145,13 +145,13 @@ Status ParquetScanner::open_next_reader() { if (range.__isset.file_size) { file_size = range.file_size; } - file_reader.reset(new BufferedReader( + file_reader.reset(new BufferedReader(_profile, new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties, range.path, range.start_offset, file_size))); break; } case TFileType::FILE_S3: { - file_reader.reset(new BufferedReader( + file_reader.reset(new BufferedReader(_profile, new S3Reader(_params.properties, range.path, range.start_offset))); break; } diff --git a/be/test/exec/buffered_reader_test.cpp b/be/test/exec/buffered_reader_test.cpp index 19525a624b..b984d9c5d5 100644 --- a/be/test/exec/buffered_reader_test.cpp +++ b/be/test/exec/buffered_reader_test.cpp @@ -33,10 +33,11 @@ protected: }; TEST_F(BufferedReaderTest, normal_use) { + RuntimeProfile profile("test"); // buffered_reader_test_file 950 bytes auto file_reader = new LocalFileReader( "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0); - BufferedReader reader(file_reader, 1024); + BufferedReader reader(&profile, file_reader, 1024); auto st = reader.open(); ASSERT_TRUE(st.ok()); uint8_t buf[1024]; @@ -50,10 +51,11 @@ TEST_F(BufferedReaderTest, normal_use) { } TEST_F(BufferedReaderTest, test_validity) { + RuntimeProfile profile("test"); // buffered_reader_test_file.txt 45 bytes auto file_reader = new LocalFileReader( "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0); - BufferedReader reader(file_reader, 64); + BufferedReader reader(&profile, file_reader, 64); auto st = reader.open(); ASSERT_TRUE(st.ok()); uint8_t buf[10]; @@ -92,10 +94,11 @@ TEST_F(BufferedReaderTest, test_validity) { } TEST_F(BufferedReaderTest, test_seek) { + RuntimeProfile profile("test"); // buffered_reader_test_file.txt 45 bytes auto file_reader = new LocalFileReader( "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0); - BufferedReader reader(file_reader, 64); + BufferedReader reader(&profile, file_reader, 64); auto st = reader.open(); ASSERT_TRUE(st.ok()); uint8_t buf[10]; @@ -143,10 +146,11 @@ TEST_F(BufferedReaderTest, test_seek) { } TEST_F(BufferedReaderTest, test_miss) { + RuntimeProfile profile("test"); // buffered_reader_test_file.txt 45 bytes auto file_reader = new LocalFileReader( "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0); - BufferedReader reader(file_reader, 64); + BufferedReader reader(&profile, file_reader, 64); auto st = reader.open(); ASSERT_TRUE(st.ok()); uint8_t buf[128]; diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index c3c3ed1407..68cfca327e 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -2623,6 +2623,10 @@ show_param ::= {: RESULT = new ShowQueryProfileStmt(queryIdPath); :} + | KW_LOAD KW_PROFILE STRING_LITERAL:loadIdPath + {: + RESULT = new ShowLoadProfileStmt(loadIdPath); + :} | KW_ENCRYPTKEYS opt_db:dbName opt_wild_where {: RESULT = new ShowEncryptKeysStmt(dbName, parser.wild); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadProfileStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadProfileStmt.java new file mode 100644 index 0000000000..50f8fcc28b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadProfileStmt.java @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ShowResultSetMetaData; + +import com.google.common.base.Strings; + +// For stmt like: +// show load profile "/"; # list all saving load job ids +// show load profile "/10014" # show task ids of specified job +// show load profile "/10014/e0f7390f5363419e-b416a2a79996083e/" # show instance list of the task +// show load profile "/10014/e0f7390f5363419e-b416a2a79996083e/e0f7390f5363419e-b416a2a799960906" # show instance tree graph +public class ShowLoadProfileStmt extends ShowStmt { + private static final ShowResultSetMetaData META_DATA_TASK_IDS = + ShowResultSetMetaData.builder() + .addColumn(new Column("TaskId", ScalarType.createVarchar(128))) + .addColumn(new Column("ActiveTime", ScalarType.createVarchar(64))) + .build(); + + public enum PathType { + JOB_IDS, + TASK_IDS, + INSTANCES, + SINGLE_INSTANCE + } + + private String idPath; + private PathType pathType; + + private String jobId = ""; + private String taskId = ""; + private String instanceId = ""; + + public ShowLoadProfileStmt(String idPath) { + this.idPath = idPath; + } + + public PathType getPathType() { + return pathType; + } + + public String getJobId() { + return jobId; + } + + public String getTaskId() { + return taskId; + } + + public String getInstanceId() { + return instanceId; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + if (Strings.isNullOrEmpty(idPath)) { + // list all query ids + pathType = PathType.JOB_IDS; + return; + } + + if (!idPath.startsWith("/")) { + throw new AnalysisException("Path must starts with '/'"); + } + pathType = PathType.JOB_IDS; + String[] parts = idPath.split("/"); + if (parts.length > 4) { + throw new AnalysisException("Path must in format '/jobId/taskId/instanceId'"); + } + + for (int i = 0; i < parts.length; i++) { + switch (i) { + case 0: + pathType = PathType.JOB_IDS; + continue; + case 1: + jobId = parts[i]; + pathType = PathType.TASK_IDS; + break; + case 2: + taskId = parts[i]; + pathType = PathType.INSTANCES; + break; + case 3: + instanceId = parts[i]; + pathType = PathType.SINGLE_INSTANCE; + break; + default: + break; + } + } + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder("SHOW LOAD PROFILE ").append(idPath); + return sb.toString(); + } + + @Override + public String toString() { + return toSql(); + } + + @Override + public ShowResultSetMetaData getMetaData() { + switch (pathType) { + case JOB_IDS: + return ShowQueryProfileStmt.META_DATA_QUERY_IDS; + case TASK_IDS: + return META_DATA_TASK_IDS; + case INSTANCES: + return ShowQueryProfileStmt.META_DATA_INSTANCES; + case SINGLE_INSTANCE: + return ShowQueryProfileStmt.META_DATA_SINGLE_INSTANCE; + default: + return null; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java index ae6a7ffb6e..7c0994c434 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java @@ -32,7 +32,7 @@ import com.google.common.base.Strings; // show query profile "/e0f7390f5363419e-b416a2a79996083e/0/e0f7390f5363419e-b416a2a799960906" # show graph of the instance public class ShowQueryProfileStmt extends ShowStmt { // This should be same as ProfileManager.PROFILE_HEADERS - private static final ShowResultSetMetaData META_DATA_QUERYIDS = + public static final ShowResultSetMetaData META_DATA_QUERY_IDS = ShowResultSetMetaData.builder() .addColumn(new Column("QueryId", ScalarType.createVarchar(128))) .addColumn(new Column("User", ScalarType.createVarchar(128))) @@ -45,17 +45,17 @@ public class ShowQueryProfileStmt extends ShowStmt { .addColumn(new Column("QueryState", ScalarType.createVarchar(128))) .build(); - private static final ShowResultSetMetaData META_DATA_FRAGMENTS = + public static final ShowResultSetMetaData META_DATA_FRAGMENTS = ShowResultSetMetaData.builder() .addColumn(new Column("Fragments", ScalarType.createVarchar(65535))) .build(); - private static final ShowResultSetMetaData META_DATA_INSTANCES = + public static final ShowResultSetMetaData META_DATA_INSTANCES = ShowResultSetMetaData.builder() .addColumn(new Column("Instances", ScalarType.createVarchar(128))) .addColumn(new Column("Host", ScalarType.createVarchar(64))) .addColumn(new Column("ActiveTime", ScalarType.createVarchar(64))) .build(); - private static final ShowResultSetMetaData META_DATA_SINGLE_INSTANCE = + public static final ShowResultSetMetaData META_DATA_SINGLE_INSTANCE = ShowResultSetMetaData.builder() .addColumn(new Column("Instance", ScalarType.createVarchar(65535))) .build(); @@ -150,7 +150,7 @@ public class ShowQueryProfileStmt extends ShowStmt { public ShowResultSetMetaData getMetaData() { switch (pathType) { case QUERY_IDS: - return META_DATA_QUERYIDS; + return META_DATA_QUERY_IDS; case FRAGMETNS: return META_DATA_FRAGMENTS; case INSTANCES: diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/MultiProfileTreeBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/MultiProfileTreeBuilder.java new file mode 100644 index 0000000000..08af2b9431 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/MultiProfileTreeBuilder.java @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.profile; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.Counter; +import org.apache.doris.common.util.RuntimeProfile; + +import com.clearspring.analytics.util.Lists; +import com.google.common.collect.Maps; + +import org.apache.commons.lang3.tuple.Triple; +import org.glassfish.jersey.internal.guava.Sets; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +// MultiProfileTreeBuilder saves a set of ProfileTreeBuilder. +// For a query profile, there is usually only one ExecutionProfile node. +// For a load job profile, it may produce multiple subtasks, so there may be multiple ExecutionProfile nodes. +// +// Each ExecutionProfile node corresponds to a ProfileTreeBuilder +public class MultiProfileTreeBuilder { + private static final Set PROFILE_ROOT_NAMES; + public static final String PROFILE_NAME_EXECUTION = "Execution Profile"; + + private static final String EXECUTION_ID_PATTERN_STR = "^Execution Profile (.*)"; + private static final Pattern EXECUTION_ID_PATTERN; + + private RuntimeProfile profileRoot; + private Map idToSingleProfile = Maps.newHashMap(); + private Map idToSingleTreeBuilder = Maps.newHashMap(); + + static { + PROFILE_ROOT_NAMES = Sets.newHashSet(); + PROFILE_ROOT_NAMES.add("Query"); + PROFILE_ROOT_NAMES.add("BrokerLoadJob"); + EXECUTION_ID_PATTERN = Pattern.compile(EXECUTION_ID_PATTERN_STR); + } + + public MultiProfileTreeBuilder(RuntimeProfile root) { + this.profileRoot = root; + } + + public void build() throws UserException { + unwrapProfile(); + buildTrees(); + } + + private void unwrapProfile() throws UserException { + if (PROFILE_ROOT_NAMES.stream().anyMatch(n -> profileRoot.getName().startsWith(n))) { + List> children = profileRoot.getChildList(); + boolean find = false; + for (Pair pair : children) { + if (pair.first.getName().startsWith(PROFILE_NAME_EXECUTION)) { + String executionProfileId = getExecutionProfileId(pair.first.getName()); + idToSingleProfile.put(executionProfileId, pair.first); + find = true; + } + } + if (!find) { + throw new UserException("Invalid profile. Expected " + PROFILE_NAME_EXECUTION); + } + } + } + + private String getExecutionProfileId(String executionName) throws UserException { + Matcher m = EXECUTION_ID_PATTERN.matcher(executionName); + if (!m.find() || m.groupCount() != 1) { + throw new UserException("Invalid execution profile name: " + executionName); + } + return m.group(1); + } + + private void buildTrees() throws UserException { + for (Map.Entry entry : idToSingleProfile.entrySet()) { + ProfileTreeBuilder builder = new ProfileTreeBuilder(entry.getValue()); + builder.build(); + idToSingleTreeBuilder.put(entry.getKey(), builder); + } + } + + public List> getSubTaskInfo() { + List> rows = Lists.newArrayList(); + for (Map.Entry entry : idToSingleProfile.entrySet()) { + List row = Lists.newArrayList(); + Counter activeCounter = entry.getValue().getCounterTotalTime(); + row.add(entry.getKey()); + row.add(RuntimeProfile.printCounter(activeCounter.getValue(), activeCounter.getType())); + rows.add(row); + } + return rows; + } + + public List> getInstanceList(String executionId, String fragmentId) + throws AnalysisException { + ProfileTreeBuilder singleBuilder = getExecutionProfileTreeBuilder(executionId); + return singleBuilder.getInstanceList(fragmentId); + } + + public ProfileTreeNode getInstanceTreeRoot(String executionId, String fragmentId, String instanceId) + throws AnalysisException { + ProfileTreeBuilder singleBuilder = getExecutionProfileTreeBuilder(executionId); + return singleBuilder.getInstanceTreeRoot(fragmentId, instanceId); + } + + public ProfileTreeNode getFragmentTreeRoot(String executionId) throws AnalysisException { + ProfileTreeBuilder singleBuilder = getExecutionProfileTreeBuilder(executionId); + return singleBuilder.getFragmentTreeRoot(); + } + + private ProfileTreeBuilder getExecutionProfileTreeBuilder(String executionId) throws AnalysisException { + ProfileTreeBuilder singleBuilder = idToSingleTreeBuilder.get(executionId); + if (singleBuilder == null) { + throw new AnalysisException("Can not find execution profile: " + executionId); + } + return singleBuilder; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/PlanTreeBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/PlanTreeBuilder.java index 39c92c8409..bde659776a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/PlanTreeBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/PlanTreeBuilder.java @@ -57,7 +57,7 @@ public class PlanTreeBuilder { if (sink != null) { StringBuilder sb = new StringBuilder(); sb.append("[").append(sink.getExchNodeId().asInt()).append(": ").append(sink.getClass().getSimpleName()).append("]"); - sb.append("\nFragment: ").append(fragment.getId().asInt()); + sb.append("\n[Fragment: ").append(fragment.getId().asInt()).append("]"); sb.append("\n").append(sink.getExplainString("", TExplainLevel.BRIEF)); sinkNode = new PlanTreeNode(sink.getExchNodeId(), sb.toString()); if (i == 0) { @@ -102,7 +102,7 @@ public class PlanTreeBuilder { } private void buildForPlanNode(PlanNode planNode, PlanTreeNode parent) { - PlanTreeNode node = new PlanTreeNode(planNode.getId(), planNode.toString()); + PlanTreeNode node = new PlanTreeNode(planNode.getId(), planNode.getPlanTreeExplanStr()); if (parent != null) { parent.addChild(node); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java index e4a34e38bc..eea7574b8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileTreeBuilder.java @@ -23,12 +23,12 @@ import org.apache.doris.common.util.Counter; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.thrift.TUnit; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.commons.lang3.tuple.Triple; -import com.clearspring.analytics.util.Lists; -import com.google.common.collect.Maps; - import java.util.Formatter; import java.util.List; import java.util.Map; @@ -46,14 +46,13 @@ import java.util.regex.Pattern; */ public class ProfileTreeBuilder { - private static final String PROFILE_NAME_QUERY = "Query"; - private static final String PROFILE_NAME_EXECUTION = "Execution Profile"; private static final String PROFILE_NAME_DATA_STREAM_SENDER = "DataStreamSender"; private static final String PROFILE_NAME_DATA_BUFFER_SENDER = "DataBufferSender"; + private static final String PROFILE_NAME_OLAP_TABLE_SINK = "OlapTableSink"; private static final String PROFILE_NAME_BLOCK_MGR = "BlockMgr"; private static final String PROFILE_NAME_BUFFER_POOL = "Buffer pool"; private static final String PROFILE_NAME_EXCHANGE_NODE = "EXCHANGE_NODE"; - public static final String DATA_BUFFER_SENDER_ID = "-1"; + public static final String FINAL_SENDER_ID = "-1"; public static final String UNKNOWN_ID = "-2"; private RuntimeProfile profileRoot; @@ -115,7 +114,7 @@ public class ProfileTreeBuilder { public void build() throws UserException { reset(); - unwrapProfile(); + checkProfile(); analyzeAndBuildFragmentTrees(); assembleFragmentTrees(); } @@ -128,25 +127,9 @@ public class ProfileTreeBuilder { fragmentTreeRoot = null; } - private void unwrapProfile() throws UserException { - while(true) { - if (profileRoot.getName().startsWith(PROFILE_NAME_QUERY)) { - List> children = profileRoot.getChildList(); - boolean find = false; - for (Pair pair : children) { - if (pair.first.getName().startsWith(PROFILE_NAME_EXECUTION)) { - this.profileRoot = pair.first; - find = true; - break; - } - } - if (!find) { - throw new UserException("Invalid profile. Expected " + PROFILE_NAME_EXECUTION - + " in " + PROFILE_NAME_QUERY); - } - } else { - break; - } + private void checkProfile() throws UserException { + if (!profileRoot.getName().startsWith(MultiProfileTreeBuilder.PROFILE_NAME_EXECUTION)) { + throw new UserException("Invalid profile. Expected " + MultiProfileTreeBuilder.PROFILE_NAME_EXECUTION); } } @@ -179,7 +162,7 @@ public class ProfileTreeBuilder { RuntimeProfile instanceProfile = fragmentChildren.get(0).first; ProfileTreeNode instanceTreeRoot = buildSingleInstanceTree(instanceProfile, fragmentId, null); instanceTreeRoot.setMaxInstanceActiveTime(RuntimeProfile.printCounter(maxActiveTimeNs, TUnit.TIME_NS)); - if (instanceTreeRoot.id.equals(DATA_BUFFER_SENDER_ID)) { + if (instanceTreeRoot.id.equals(FINAL_SENDER_ID)) { fragmentTreeRoot = instanceTreeRoot; } @@ -195,7 +178,7 @@ public class ProfileTreeBuilder { this.instanceTreeMap.put(fragmentId, instanceTrees); } - // If instanceId is null, which means this profile tree node is for bulding the entire fragment tree. + // If instanceId is null, which means this profile tree node is for building the entire fragment tree. // So that we need to add sender and exchange node to the auxiliary structure. private ProfileTreeNode buildSingleInstanceTree(RuntimeProfile instanceProfile, String fragmentId, String instanceId) throws UserException { @@ -205,7 +188,8 @@ public class ProfileTreeBuilder { for (Pair pair : instanceChildren) { RuntimeProfile profile = pair.first; if (profile.getName().startsWith(PROFILE_NAME_DATA_STREAM_SENDER) - || profile.getName().startsWith(PROFILE_NAME_DATA_BUFFER_SENDER)) { + || profile.getName().startsWith(PROFILE_NAME_DATA_BUFFER_SENDER) + || profile.getName().startsWith(PROFILE_NAME_OLAP_TABLE_SINK)) { senderNode = buildTreeNode(profile, null, fragmentId, instanceId); if (instanceId == null) { senderNodes.add(senderNode); @@ -238,11 +222,11 @@ public class ProfileTreeBuilder { // skip Buffer pool, and buffer pool does not has child return null; } - boolean isDataBufferSender = name.startsWith(PROFILE_NAME_DATA_BUFFER_SENDER); + String finalSenderName = checkAndGetFinalSenderName(name); Matcher m = EXEC_NODE_NAME_ID_PATTERN.matcher(name); String extractName; String extractId; - if ((!m.find() && !isDataBufferSender) || m.groupCount() != 2) { + if ((!m.find() && finalSenderName == null) || m.groupCount() != 2) { // DataStreamBuffer name like: "DataBufferSender (dst_fragment_instance_id=d95356f9219b4831-986b4602b41683ca):" // So it has no id. // Other profile should has id like: @@ -251,8 +235,8 @@ public class ProfileTreeBuilder { extractName = name; extractId = UNKNOWN_ID; } else { - extractName = isDataBufferSender ? PROFILE_NAME_DATA_BUFFER_SENDER : m.group(1); - extractId = isDataBufferSender ? DATA_BUFFER_SENDER_ID : m.group(2); + extractName = finalSenderName != null ? finalSenderName : m.group(1); + extractId = finalSenderName != null ? FINAL_SENDER_ID : m.group(2); } Counter activeCounter = profile.getCounterTotalTime(); ExecNodeNode node = new ExecNodeNode(extractName, extractId); @@ -286,6 +270,18 @@ public class ProfileTreeBuilder { return node; } + // Check if the given node name is from final node, like DATA_BUFFER_SENDER or OLAP_TABLE_SINK + // If yes, return that name, if not, return null; + private String checkAndGetFinalSenderName(String name) { + if (name.startsWith(PROFILE_NAME_DATA_BUFFER_SENDER)) { + return PROFILE_NAME_DATA_BUFFER_SENDER; + } else if (name.startsWith(PROFILE_NAME_OLAP_TABLE_SINK)) { + return PROFILE_NAME_OLAP_TABLE_SINK; + } else { + return null; + } + } + private void buildCounterNode(RuntimeProfile profile, String counterName, CounterNode root) { Map> childCounterMap = profile.getChildCounterMap(); Set childCounterSet = childCounterMap.get(counterName); @@ -308,7 +304,7 @@ public class ProfileTreeBuilder { private void assembleFragmentTrees() throws UserException { for (ProfileTreeNode senderNode : senderNodes) { - if (senderNode.id.equals(DATA_BUFFER_SENDER_ID)) { + if (senderNode.id.equals(FINAL_SENDER_ID)) { // this is result sender, skip it. continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index 448090e6f2..28dd935a27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -18,9 +18,8 @@ package org.apache.doris.common.util; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.profile.ProfileTreeBuilder; +import org.apache.doris.common.profile.MultiProfileTreeBuilder; import org.apache.doris.common.profile.ProfileTreeNode; -import org.apache.doris.common.profile.ProfileTreePrinter; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -68,6 +67,11 @@ public class ProfileManager { public static final String SQL_STATEMENT = "Sql Statement"; public static final String IS_CACHED = "Is Cached"; + public enum ProfileType { + QUERY, + LOAD, + } + public static final ArrayList PROFILE_HEADERS = new ArrayList( Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE, START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE)); @@ -75,7 +79,7 @@ public class ProfileManager { private class ProfileElement { public Map infoStrings = Maps.newHashMap(); public String profileContent = ""; - public ProfileTreeBuilder builder = null; + public MultiProfileTreeBuilder builder = null; public String errMsg = ""; } @@ -113,8 +117,9 @@ public class ProfileManager { for (String header : PROFILE_HEADERS) { element.infoStrings.put(header, summaryProfile.getInfoString(header)); } + element.profileContent = profile.toString(); - ProfileTreeBuilder builder = new ProfileTreeBuilder(profile); + MultiProfileTreeBuilder builder = new MultiProfileTreeBuilder(profile); try { builder.build(); } catch (Exception e) { @@ -122,9 +127,7 @@ public class ProfileManager { LOG.debug("failed to build profile tree", e); return element; } - element.builder = builder; - element.profileContent = profile.toString(); return element; } @@ -158,8 +161,12 @@ public class ProfileManager { writeLock.unlock(); } } - + public List> getAllQueries() { + return getQueryWithType(null); + } + + public List> getQueryWithType(ProfileType type) { List> result = Lists.newArrayList(); readLock.lock(); try { @@ -171,9 +178,12 @@ public class ProfileManager { continue; } Map infoStrings = profileElement.infoStrings; - + if (type != null && !infoStrings.get(QUERY_TYPE).equalsIgnoreCase(type.name())) { + continue; + } + List row = Lists.newArrayList(); - for (String str : PROFILE_HEADERS ) { + for (String str : PROFILE_HEADERS) { row.add(infoStrings.get(str)); } result.add(row); @@ -183,7 +193,7 @@ public class ProfileManager { } return result; } - + public String getProfile(String queryID) { readLock.lock(); try { @@ -191,32 +201,15 @@ public class ProfileManager { if (element == null) { return null; } - + return element.profileContent; } finally { readLock.unlock(); } } - public String getFragmentProfileTreeString(String queryID) { - readLock.lock(); - try { - ProfileElement element = queryIdToProfileMap.get(queryID); - if (element == null || element.builder == null) { - return null; - } - ProfileTreeBuilder builder = element.builder; - return builder.getFragmentTreeRoot().debugTree(0, ProfileTreePrinter.PrintLevel.INSTANCE); - } catch (Exception e) { - LOG.warn("failed to get profile tree", e); - return null; - } finally { - readLock.unlock(); - } - } - - public ProfileTreeNode getFragmentProfileTree(String queryID) throws AnalysisException { - ProfileTreeNode tree; + public ProfileTreeNode getFragmentProfileTree(String queryID, String executionId) throws AnalysisException { + MultiProfileTreeBuilder builder; readLock.lock(); try { ProfileElement element = queryIdToProfileMap.get(queryID); @@ -224,14 +217,16 @@ public class ProfileManager { throw new AnalysisException("failed to get fragment profile tree. err: " + (element == null ? "not found" : element.errMsg)); } - return element.builder.getFragmentTreeRoot(); + builder = element.builder; } finally { readLock.unlock(); } + return builder.getFragmentTreeRoot(executionId); } - public List> getFragmentInstanceList(String queryID, String fragmentId) throws AnalysisException { - ProfileTreeBuilder builder; + public List> getFragmentInstanceList(String queryID, String executionId, String fragmentId) + throws AnalysisException { + MultiProfileTreeBuilder builder; readLock.lock(); try { ProfileElement element = queryIdToProfileMap.get(queryID); @@ -244,11 +239,12 @@ public class ProfileManager { readLock.unlock(); } - return builder.getInstanceList(fragmentId); + return builder.getInstanceList(executionId, fragmentId); } - public ProfileTreeNode getInstanceProfileTree(String queryID, String fragmentId, String instanceId) throws AnalysisException { - ProfileTreeBuilder builder; + public ProfileTreeNode getInstanceProfileTree(String queryID, String executionId, String fragmentId, String instanceId) + throws AnalysisException { + MultiProfileTreeBuilder builder; readLock.lock(); try { ProfileElement element = queryIdToProfileMap.get(queryID); @@ -261,6 +257,25 @@ public class ProfileManager { readLock.unlock(); } - return builder.getInstanceTreeRoot(fragmentId, instanceId); + return builder.getInstanceTreeRoot(executionId, fragmentId, instanceId); + } + + // Return the tasks info of the specified load job + // Columns: TaskId, ActiveTime + public List> getLoadJobTaskList(String jobId) throws AnalysisException { + MultiProfileTreeBuilder builder; + readLock.lock(); + try { + ProfileElement element = queryIdToProfileMap.get(jobId); + if (element == null || element.builder == null) { + throw new AnalysisException("failed to get task ids. err: " + + (element == null ? "not found" : element.errMsg)); + } + builder = element.builder; + } finally { + readLock.unlock(); + } + + return builder.getSubTaskInfo(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index bb3188ee78..e001b87d51 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -326,8 +326,6 @@ public class BrokerLoadJob extends BulkLoadJob { // Add the summary profile to the first jobProfile.addFirstChild(summaryProfile); jobProfile.computeTimeInChildProfile(); - StringBuilder builder = new StringBuilder(); - jobProfile.prettyPrint(builder, ""); ProfileManager.getInstance().pushProfile(jobProfile); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 2e0c7bb60e..4e0054c947 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -68,7 +68,6 @@ public class LoadLoadingTask extends LoadTask { private LoadingTaskPlanner planner; private RuntimeProfile jobProfile; - private RuntimeProfile profile; private long beginTime; public LoadLoadingTask(Database db, OlapTable table, @@ -172,17 +171,15 @@ public class LoadLoadingTask extends LoadTask { return jobDeadlineMs - System.currentTimeMillis(); } - public void createProfile(Coordinator coord) { + private void createProfile(Coordinator coord) { if (jobProfile == null) { // No need to gather profile return; } // Summary profile - profile = new RuntimeProfile("LoadTask: " + DebugUtil.printId(loadId)); coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTime)); coord.endProfile(); - profile.addChild(coord.getQueryProfile()); - jobProfile.addChild(profile); + jobProfile.addChild(coord.getQueryProfile()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index a8ee29a244..fabb260ddd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -131,6 +131,9 @@ public class DataPartition { public String getExplainString(TExplainLevel explainLevel) { StringBuilder str = new StringBuilder(); str.append(type.toString()); + if (explainLevel == TExplainLevel.BRIEF) { + return str.toString(); + } if (!partitionExprs.isEmpty()) { List strings = Lists.newArrayList(); for (Expr expr : partitionExprs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index dc53c44a81..983a1a7c48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -17,7 +17,6 @@ package org.apache.doris.planner; -import com.google.common.base.Joiner; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprId; @@ -32,6 +31,7 @@ import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlan; import org.apache.doris.thrift.TPlanNode; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.Lists; @@ -658,7 +658,6 @@ abstract public class PlanNode extends TreeNode { } } - /** * Returns the estimated combined selectivity of all conjuncts. Uses heuristics to * address the following estimation challenges: @@ -739,6 +738,14 @@ abstract public class PlanNode extends TreeNode { } } + public String getPlanTreeExplanStr() { + StringBuilder sb = new StringBuilder(); + sb.append("[").append(getId().asInt()).append(": ").append(getPlanNodeName()).append("]"); + sb.append("\n[Fragment: ").append(getFragmentId().asInt()).append("]"); + sb.append("\n").append(getNodeExplainString("", TExplainLevel.BRIEF)); + return sb.toString(); + } + public ScanNode getScanNodeInOneFragmentByTupleId(TupleId tupleId) { if (this instanceof ScanNode && tupleIds.contains(tupleId)) { return (ScanNode) this; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 8c54369571..92d93ae13e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -47,6 +47,7 @@ import org.apache.doris.analysis.ShowFunctionsStmt; import org.apache.doris.analysis.ShowGrantsStmt; import org.apache.doris.analysis.ShowIndexStmt; import org.apache.doris.analysis.ShowEncryptKeysStmt; +import org.apache.doris.analysis.ShowLoadProfileStmt; import org.apache.doris.analysis.ShowLoadStmt; import org.apache.doris.analysis.ShowLoadWarningsStmt; import org.apache.doris.analysis.ShowMigrationsStmt; @@ -296,6 +297,8 @@ public class ShowExecutor { handleShowPlugins(); } else if (stmt instanceof ShowQueryProfileStmt) { handleShowQueryProfile(); + } else if (stmt instanceof ShowLoadProfileStmt) { + handleShowLoadProfile(); } else { handleEmtpy(); } @@ -1877,10 +1880,11 @@ public class ShowExecutor { List> rows = Lists.newArrayList(); switch (pathType) { case QUERY_IDS: - rows = ProfileManager.getInstance().getAllQueries(); + rows = ProfileManager.getInstance().getQueryWithType(ProfileManager.ProfileType.QUERY); break; case FRAGMETNS: { - ProfileTreeNode treeRoot = ProfileManager.getInstance().getFragmentProfileTree(showStmt.getQueryId()); + ProfileTreeNode treeRoot = ProfileManager.getInstance().getFragmentProfileTree(showStmt.getQueryId(), + showStmt.getQueryId()); if (treeRoot == null) { throw new AnalysisException("Failed to get fragment tree for query: " + showStmt.getQueryId()); } @@ -1889,8 +1893,11 @@ public class ShowExecutor { break; } case INSTANCES: { + // For query profile, there should be only one execution profile, + // And the execution id is same as query id List> instanceList - = ProfileManager.getInstance().getFragmentInstanceList(showStmt.getQueryId(), showStmt.getFragmentId()); + = ProfileManager.getInstance().getFragmentInstanceList( + showStmt.getQueryId(), showStmt.getQueryId(), showStmt.getFragmentId()); if (instanceList == null) { throw new AnalysisException("Failed to get instance list for fragment: " + showStmt.getFragmentId()); } @@ -1902,8 +1909,57 @@ public class ShowExecutor { break; } case SINGLE_INSTANCE: { + // For query profile, there should be only one execution profile, + // And the execution id is same as query id ProfileTreeNode treeRoot = ProfileManager.getInstance().getInstanceProfileTree(showStmt.getQueryId(), - showStmt.getFragmentId(), showStmt.getInstanceId()); + showStmt.getQueryId(), showStmt.getFragmentId(), showStmt.getInstanceId()); + if (treeRoot == null) { + throw new AnalysisException("Failed to get instance tree for instance: " + showStmt.getInstanceId()); + } + List row = Lists.newArrayList(ProfileTreePrinter.printInstanceTree(treeRoot)); + rows.add(row); + break; + } + default: + break; + } + + resultSet = new ShowResultSet(showStmt.getMetaData(), rows); + } + + private void handleShowLoadProfile() throws AnalysisException { + ShowLoadProfileStmt showStmt = (ShowLoadProfileStmt) stmt; + ShowLoadProfileStmt.PathType pathType = showStmt.getPathType(); + List> rows = Lists.newArrayList(); + switch (pathType) { + case JOB_IDS: + rows = ProfileManager.getInstance().getQueryWithType(ProfileManager.ProfileType.LOAD); + break; + case TASK_IDS: { + rows = ProfileManager.getInstance().getLoadJobTaskList(showStmt.getJobId()); + break; + } + case INSTANCES: { + // For load profile, there should be only one fragment in each execution profile + // And the fragment id is 0. + List> instanceList + = ProfileManager.getInstance().getFragmentInstanceList(showStmt.getJobId(), + showStmt.getTaskId(), "0"); + if (instanceList == null) { + throw new AnalysisException("Failed to get instance list for task: " + showStmt.getTaskId()); + } + for (Triple triple : instanceList) { + List row = Lists.newArrayList(triple.getLeft(), triple.getMiddle(), + RuntimeProfile.printCounter(triple.getRight(), TUnit.TIME_NS)); + rows.add(row); + } + break; + } + case SINGLE_INSTANCE: { + // For load profile, there should be only one fragment in each execution profile. + // And the fragment id is 0. + ProfileTreeNode treeRoot = ProfileManager.getInstance().getInstanceProfileTree(showStmt.getJobId(), + showStmt.getTaskId(), "0", showStmt.getInstanceId()); if (treeRoot == null) { throw new AnalysisException("Failed to get instance tree for instance: " + showStmt.getInstanceId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java index 9b72d5dda6..2adfb23eb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java @@ -252,7 +252,7 @@ public class ExportExportingTask extends MasterTask { } private void initProfile() { - profile = new RuntimeProfile("Query"); + profile = new RuntimeProfile("ExportJob"); RuntimeProfile summaryProfile = new RuntimeProfile("Summary"); summaryProfile.addInfoString(ProfileManager.QUERY_ID, String.valueOf(job.getId())); summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(job.getStartTimeMs()));