[Optimize] Optimize profile lock conflict and view profile while query is executing (#5762)

1. Reduce lock contention in the BE's RuntimeProfile;
2. Allow viewing a query's profile while the query is still executing;
3. Reduce the wait time for "show proc '/current_queries'".
This commit is contained in:
luozenglin
2021-05-13 22:33:26 +08:00
committed by GitHub
parent bdd2a6d055
commit 0c83e43a67
25 changed files with 420 additions and 453 deletions

View File

@ -286,8 +286,12 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
if (runtime_state->query_options().query_type == TQueryType::LOAD) {
params.__set_loaded_rows(runtime_state->num_rows_load_total());
}
profile->to_thrift(&params.profile);
params.__isset.profile = true;
if (profile == nullptr) {
params.__isset.profile = false;
} else {
profile->to_thrift(&params.profile);
params.__isset.profile = true;
}
if (!runtime_state->output_files().empty()) {
params.__isset.delta_urls = true;
@ -583,31 +587,6 @@ void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
}
// Ask the executors of the fragment instances named in `request` to send a
// profile report immediately (by waking their report threads). When the
// request carries no instance ids, every registered fragment reports.
//
// Returns Status::OK() unconditionally; unknown instance ids are ignored.
Status FragmentMgr::trigger_profile_report(const PTriggerProfileReportRequest* request) {
    const int num_instances = request->instance_ids_size();
    if (num_instances > 0) {
        for (int i = 0; i < num_instances; ++i) {
            const PUniqueId& p_fragment_id = request->instance_ids(i);
            TUniqueId id;
            id.__set_hi(p_fragment_id.hi());
            id.__set_lo(p_fragment_id.lo());
            {
                // Re-acquire the lock per instance to keep each critical
                // section short instead of holding it across the whole loop.
                std::lock_guard<std::mutex> lock(_lock);
                auto iter = _fragment_map.find(id);
                if (iter != _fragment_map.end()) {
                    iter->second->executor()->report_profile_once();
                }
            }
        }
    } else {
        std::lock_guard<std::mutex> lock(_lock);
        for (auto& entry : _fragment_map) {
            entry.second->executor()->report_profile_once();
        }
    }
    return Status::OK();
}
void FragmentMgr::debug(std::stringstream& ss) {
// Keep things simple
std::lock_guard<std::mutex> lock(_lock);

View File

@ -73,8 +73,6 @@ public:
virtual void debug(std::stringstream& ss);
Status trigger_profile_report(const PTriggerProfileReportRequest* request);
// input: TScanOpenParams fragment_instance_id
// output: selected_columns
// execute external query, all query info are packed in TScanOpenParams

View File

@ -241,7 +241,7 @@ Status PlanFragmentExecutor::open() {
// may block
// TODO: if no report thread is started, make sure to send a final profile
// at end, otherwise the coordinator hangs in case we finish w/ an error
if (_report_status_cb && config::status_report_interval > 0) {
if (_is_report_success && !_report_status_cb.empty() && config::status_report_interval > 0) {
std::unique_lock<std::mutex> l(_report_thread_lock);
_report_thread = boost::thread(&PlanFragmentExecutor::report_profile, this);
// make sure the thread started up, otherwise report_profile() might get into a race
@ -365,9 +365,8 @@ void PlanFragmentExecutor::report_profile() {
int report_fragment_offset = rand() % config::status_report_interval;
// We don't want to wait longer than it takes to run the entire fragment.
_stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset));
bool is_report_profile_interval = _is_report_success && config::status_report_interval > 0;
while (_report_thread_active) {
if (is_report_profile_interval) {
if (config::status_report_interval > 0) {
// wait_for can return because the timeout occurred or the condition variable
// was signaled. We can't rely on its return value to distinguish between the
// two cases (e.g. there is a race here where the wait timed out but before grabbing
@ -376,8 +375,8 @@ void PlanFragmentExecutor::report_profile() {
_stop_report_thread_cv.wait_for(l,
std::chrono::seconds(config::status_report_interval));
} else {
// Artificial triggering, such as show proc "/current_queries".
_stop_report_thread_cv.wait(l);
LOG(WARNING) << "config::status_report_interval is equal to or less than zero, exiting reporting thread.";
break;
}
if (VLOG_FILE_IS_ON) {
@ -427,7 +426,11 @@ void PlanFragmentExecutor::send_report(bool done) {
// This will send a report even if we are cancelled. If the query completed correctly
// but fragments still need to be cancelled (e.g. limit reached), the coordinator will
// be waiting for a final report and profile.
_report_status_cb(status, profile(), done || !status.ok());
if (_is_report_success) {
_report_status_cb(status, profile(), done || !status.ok());
} else {
_report_status_cb(status, nullptr, done || !status.ok());
}
}
void PlanFragmentExecutor::stop_report_thread() {
@ -565,7 +568,7 @@ void PlanFragmentExecutor::close() {
}
}
{
if (_is_report_success) {
std::stringstream ss;
// Compute the _local_time_percent before pretty_print the runtime_profile
// Before add this operation, the print out like that:

View File

@ -143,8 +143,6 @@ public:
// Non-owning accessor for this fragment's data sink.
DataSink* get_sink() { return _sink.get(); }
// Wake the report thread's condition variable so a profile report is sent
// without waiting for the periodic interval (on-demand triggering).
void report_profile_once() { _stop_report_thread_cv.notify_one(); }
// Sets the flag controlling report behavior on cancellation.
// NOTE(review): presumably false suppresses the final report when the
// fragment is cancelled — confirm against the executor's cancel path.
void set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
private:

View File

@ -182,16 +182,6 @@ void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* cntl_b
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
}
// Brpc endpoint: forwards the profile-report trigger to the FragmentMgr and
// converts the resulting Status into the protobuf response.
template <typename T>
void PInternalServiceImpl<T>::trigger_profile_report(google::protobuf::RpcController* controller,
                                                     const PTriggerProfileReportRequest* request,
                                                     PTriggerProfileReportResult* result,
                                                     google::protobuf::Closure* done) {
    // ClosureGuard guarantees done->Run() is invoked when this handler returns.
    brpc::ClosureGuard closure_guard(done);
    auto st = _exec_env->fragment_mgr()->trigger_profile_report(request);
    st.to_protobuf(result->mutable_status());
}
template <typename T>
void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controller,
const PProxyRequest* request, PProxyResult* response,

View File

@ -70,11 +70,6 @@ public:
PTabletWriterCancelResult* response,
google::protobuf::Closure* done) override;
void trigger_profile_report(google::protobuf::RpcController* controller,
const PTriggerProfileReportRequest* request,
PTriggerProfileReportResult* result,
google::protobuf::Closure* done) override;
void get_info(google::protobuf::RpcController* controller, const PProxyRequest* request,
PProxyResult* response, google::protobuf::Closure* done) override;

View File

@ -57,11 +57,12 @@ RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile
}
RuntimeProfile::~RuntimeProfile() {
std::map<std::string, Counter*>::const_iterator iter;
for (auto iter = _rate_counters.begin(); iter != _rate_counters.end(); ++iter) {
stop_rate_counters_updates(*iter);
}
for (iter = _counter_map.begin(); iter != _counter_map.end(); ++iter) {
stop_rate_counters_updates(iter->second);
stop_sampling_counters_updates(iter->second);
for (auto iter = _sampling_counters.begin(); iter != _sampling_counters.end(); ++iter) {
stop_sampling_counters_updates(*iter);
}
std::set<std::vector<Counter*>*>::const_iterator buckets_iter;
@ -653,6 +654,7 @@ RuntimeProfile::Counter* RuntimeProfile::add_rate_counter(const std::string& nam
}
Counter* dst_counter = add_counter(name, dst_type);
_rate_counters.push_back(dst_counter);
register_periodic_counter(src_counter, NULL, dst_counter, RATE_COUNTER);
return dst_counter;
}
@ -675,6 +677,7 @@ RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string&
RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string& name,
SampleFn sample_fn) {
Counter* dst_counter = add_counter(name, TUnit::DOUBLE_VALUE);
_sampling_counters.push_back(dst_counter);
register_periodic_counter(NULL, sample_fn, dst_counter, SAMPLING_COUNTER);
return dst_counter;
}

View File

@ -514,6 +514,10 @@ private:
// of the total time in the entire profile tree.
double _local_time_percent;
std::vector<Counter*> _rate_counters;
std::vector<Counter*> _sampling_counters;
enum PeriodicCounterType {
RATE_COUNTER = 0,
SAMPLING_COUNTER,

View File

@ -105,6 +105,13 @@ public class ThreadPoolManager {
new BlockedPolicy(poolName, 60), poolName, needRegisterMetric);
}
/**
 * Creates a daemon thread pool intended for profile-related tasks. Core and
 * maximum pool size are both {@code numThread}; when the bounded queue is
 * full, the oldest queued task is discarded (with a warning log) instead of
 * blocking or rejecting the caller — see {@code LogDiscardOldestPolicy}.
 *
 * @param numThread          fixed number of worker threads
 * @param queueSize          capacity of the bounded work queue
 * @param poolName           name used for thread naming and metrics
 * @param needRegisterMetric whether to register pool metrics
 */
public static ThreadPoolExecutor newDaemonProfileThreadPool(int numThread, int queueSize, String poolName,
        boolean needRegisterMetric) {
    return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(queueSize), new LogDiscardOldestPolicy(poolName), poolName,
            needRegisterMetric);
}
public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
@ -184,5 +191,25 @@ public class ThreadPoolManager {
}
}
}
/**
 * Rejection policy that, instead of throwing, discards the oldest task in the
 * executor's queue, logs both the discarded and the newly submitted task, and
 * retries the submission. If the executor is already shut down, the new task
 * is silently dropped (same as {@code ThreadPoolExecutor.DiscardOldestPolicy}).
 */
static class LogDiscardOldestPolicy implements RejectedExecutionHandler {
    private static final Logger LOG = LogManager.getLogger(LogDiscardOldestPolicy.class);

    // Pool name, used only for log messages.
    private final String threadPoolName;

    public LogDiscardOldestPolicy(String threadPoolName) {
        this.threadPoolName = threadPoolName;
    }

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            // Drop the head of the queue to make room, then re-submit the new task.
            Runnable discardTask = executor.getQueue().poll();
            LOG.warn("Task: {} submit to {}, and discard the oldest task:{}", task, threadPoolName, discardTask);
            executor.execute(task);
        }
    }
}
}

View File

@ -34,6 +34,7 @@ import java.util.List;
/*
* show proc "/current_queries/{query_id}/fragments"
* set variable "set is_report_success = true" to enable "ScanBytes" and "ProcessRows".
*/
public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
private static final Logger LOG = LogManager.getLogger(CurrentQueryFragmentProcNode.class);
@ -79,10 +80,15 @@ public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
rowData.add(instanceStatistics.getFragmentId());
rowData.add(instanceStatistics.getInstanceId().toString());
rowData.add(instanceStatistics.getAddress().toString());
rowData.add(QueryStatisticsFormatter.getScanBytes(
instanceStatistics.getScanBytes()));
rowData.add(QueryStatisticsFormatter.getRowsReturned(
instanceStatistics.getRowsReturned()));
if (item.getIsReportSucc()) {
rowData.add(QueryStatisticsFormatter.getScanBytes(
instanceStatistics.getScanBytes()));
rowData.add(QueryStatisticsFormatter.getRowsReturned(
instanceStatistics.getRowsReturned()));
} else {
rowData.add("N/A");
rowData.add("N/A");
}
sortedRowData.add(rowData);
}
@ -90,9 +96,7 @@ public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
sortedRowData.sort(new Comparator<List<String>>() {
@Override
public int compare(List<String> l1, List<String> l2) {
final Integer fragmentId1 = Integer.valueOf(l1.get(0));
final Integer fragmentId2 = Integer.valueOf(l2.get(0));
return fragmentId1.compareTo(fragmentId2);
return l1.get(0).compareTo(l2.get(0));
}
});
final BaseProcResult result = new BaseProcResult();

View File

@ -17,20 +17,12 @@
package org.apache.doris.common.proc;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Counter;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.QueryStatisticsItem;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Preconditions;
@ -43,10 +35,6 @@ import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Provide running query's statistics.
@ -58,28 +46,23 @@ public class CurrentQueryInfoProvider {
}
/**
* Firstly send request to trigger profile to report for specified query and wait a while,
* Secondly get Counters from Coordinator's RuntimeProfile and return query's statistics.
* get Counters from Coordinator's RuntimeProfile and return query's statistics.
*
* @param item
* @return
* @throws AnalysisException
*/
public QueryStatistics getQueryStatistics(QueryStatisticsItem item) throws AnalysisException {
triggerReportAndWait(item, getWaitingTimeForSingleQuery(), false);
return new QueryStatistics(item.getQueryProfile());
}
/**
* Same as above, but this will cause BE to report all queries profile.
*
* @param items
* @return
* @throws AnalysisException
*/
public Map<String, QueryStatistics> getQueryStatistics(Collection<QueryStatisticsItem> items)
throws AnalysisException {
triggerReportAndWait(items, getWaitingTime(items.size()), true);
public Map<String, QueryStatistics> getQueryStatistics(Collection<QueryStatisticsItem> items) {
final Map<String, QueryStatistics> queryStatisticsMap = Maps.newHashMap();
for (QueryStatisticsItem item : items) {
queryStatisticsMap.put(item.getQueryId(), new QueryStatistics(item.getQueryProfile()));
@ -95,7 +78,6 @@ public class CurrentQueryInfoProvider {
* @throws AnalysisException
*/
public Collection<InstanceStatistics> getInstanceStatistics(QueryStatisticsItem item) throws AnalysisException {
triggerReportAndWait(item, getWaitingTimeForSingleQuery(), false);
final Map<String, RuntimeProfile> instanceProfiles = collectInstanceProfile(item.getQueryProfile());
final List<InstanceStatistics> instanceStatisticsList = Lists.newArrayList();
for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) {
@ -142,136 +124,6 @@ public class CurrentQueryInfoProvider {
}
}
/** Waiting time (ms) when only a single query's profile was triggered. */
private long getWaitingTimeForSingleQuery() {
    return getWaitingTime(1);
}
/**
 * How long to wait for triggered profile reports to arrive.
 *
 * @param numOfQuery number of queries whose profiles were triggered
 * @return waiting time in milliseconds: 100 ms per query, capped at 2000 ms
 */
private long getWaitingTime(int numOfQuery) {
    final int oneQueryWaitingTime = 100;      // ms granted per query
    final int allQueryMaxWaitingTime = 2000;  // overall cap in ms
    final int waitingTime = numOfQuery * oneQueryWaitingTime;
    return Math.min(waitingTime, allQueryMaxWaitingTime);
}
/**
 * Convenience overload: triggers a profile report for a single query.
 *
 * @see #triggerReportAndWait(Collection, long, boolean)
 */
private void triggerReportAndWait(QueryStatisticsItem item, long waitingTime, boolean allQuery)
        throws AnalysisException {
    final List<QueryStatisticsItem> items = Lists.newArrayList(item);
    triggerReportAndWait(items, waitingTime, allQuery);
}
/**
 * Asks the backends to report profiles for the given queries, then sleeps
 * {@code waitingTime} ms to give the reports time to arrive.
 *
 * @param items       queries whose profiles should be reported
 * @param waitingTime milliseconds to sleep after the trigger is sent
 * @param allQuery    true: backends report all running queries; false: only
 *                    the listed instances
 * @throws AnalysisException if the trigger request cannot be sent
 */
private void triggerReportAndWait(Collection<QueryStatisticsItem> items, long waitingTime, boolean allQuery)
        throws AnalysisException {
    triggerProfileReport(items, allQuery);
    try {
        // Thread.sleep is static; call it directly, not via currentThread().
        Thread.sleep(waitingTime);
    } catch (InterruptedException e) {
        // Preserve the interrupt status for callers higher up the stack.
        Thread.currentThread().interrupt();
    }
}
/**
 * Sends a PTriggerProfileReportRequest to every backend that runs at least
 * one fragment instance of the given queries, asking it to push its runtime
 * profile immediately, and waits for all responses.
 *
 * @param items    queries whose profiles should be reported
 * @param allQuery true: backends report every running query's profile;
 *                 false: only the instances listed in the request are reported
 * @throws AnalysisException if a backend's brpc address cannot be resolved or
 *                           a response indicates failure
 */
private void triggerProfileReport(Collection<QueryStatisticsItem> items, boolean allQuery) throws AnalysisException {
    // One merged request per backend brpc address.
    final Map<TNetworkAddress, Request> requests = Maps.newHashMap();
    // Cache of be-address -> brpc-address translations to avoid repeat lookups.
    final Map<TNetworkAddress, TNetworkAddress> brpcAddresses = Maps.newHashMap();
    for (QueryStatisticsItem item : items) {
        for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) {
            // use brpc address
            TNetworkAddress brpcNetAddress = brpcAddresses.get(instanceInfo.getAddress());
            if (brpcNetAddress == null) {
                try {
                    brpcNetAddress = toBrpcHost(instanceInfo.getAddress());
                    brpcAddresses.put(instanceInfo.getAddress(), brpcNetAddress);
                } catch (Exception e) {
                    LOG.warn(e.getMessage());
                    throw new AnalysisException(e.getMessage());
                }
            }
            // merge different requests
            Request request = requests.get(brpcNetAddress);
            if (request == null) {
                request = new Request(brpcNetAddress);
                requests.put(brpcNetAddress, request);
            }
            // Only enumerate instance ids when a specific set of queries was
            // requested; an empty id list means "report everything".
            if (!allQuery) {
                final Types.PUniqueId pUId = Types.PUniqueId.newBuilder()
                        .setHi(instanceInfo.getInstanceId().hi)
                        .setLo(instanceInfo.getInstanceId().lo)
                        .build();
                request.addInstanceId(pUId);
            }
        }
    }
    recvResponse(sendRequest(requests));
}
/**
 * Issues the per-backend trigger requests asynchronously.
 *
 * @param requests one merged request per backend brpc address
 * @return each request paired with the Future for its response
 * @throws AnalysisException if issuing any RPC fails
 */
private List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> sendRequest(
        Map<TNetworkAddress, Request> requests) throws AnalysisException {
    final List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> futures = Lists.newArrayList();
    for (TNetworkAddress address : requests.keySet()) {
        final Request request = requests.get(address);
        final InternalService.PTriggerProfileReportRequest pbRequest = InternalService.PTriggerProfileReportRequest
                .newBuilder().addAllInstanceIds(request.getInstanceIds()).build();
        try {
            futures.add(Pair.create(request, BackendServiceProxy.getInstance().
                    triggerProfileReportAsync(address, pbRequest)));
        } catch (RpcException e) {
            throw new AnalysisException("Sending request fails for query's execution information.");
        }
    }
    return futures;
}
/**
 * Waits (up to 2 seconds per backend) for every trigger-report response and
 * validates the returned status.
 *
 * @param futures request/Future pairs produced by {@code sendRequest}
 * @throws AnalysisException if any backend answers with a non-OK status or a
 *                           response cannot be obtained in time
 */
private void recvResponse(List<Pair<Request, Future<InternalService.PTriggerProfileReportResult>>> futures)
        throws AnalysisException {
    final String reasonPrefix = "Fail to receive result.";
    for (Pair<Request, Future<InternalService.PTriggerProfileReportResult>> pair : futures) {
        try {
            final InternalService.PTriggerProfileReportResult result
                    = pair.second.get(2, TimeUnit.SECONDS);
            final TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
            if (code != TStatusCode.OK) {
                // Surface the backend's first error message when available.
                String errMsg = "";
                if (!result.getStatus().getErrorMsgsList().isEmpty()) {
                    errMsg = result.getStatus().getErrorMsgs(0);
                }
                throw new AnalysisException(reasonPrefix + " backend:" + pair.first.getAddress()
                        + " reason:" + errMsg);
            }
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.warn(reasonPrefix + " reason:" + e.getCause());
            throw new AnalysisException(reasonPrefix);
        }
    }
}
/**
 * Translates a backend's BE-port address into its brpc address.
 *
 * @param host the backend's host/be-port address
 * @return the same host with the backend's brpc port
 * @throws AnalysisException if the backend is unknown or has no brpc port
 */
private TNetworkAddress toBrpcHost(TNetworkAddress host) throws AnalysisException {
    final Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(
            host.getHostname(), host.getPort());
    if (backend == null) {
        throw new AnalysisException(
                "Backend " + host.getHostname() + ":" + host.getPort() + " does not exist");
    }
    if (backend.getBrpcPort() < 0) {
        throw new AnalysisException("BRPC port isn't exist.");
    }
    return new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
}
public static class QueryStatistics {
final List<Map<String, Counter>> counterMaps;
@ -344,26 +196,4 @@ public class CurrentQueryInfoProvider {
return statistics.getScanBytes();
}
}
/**
 * Accumulates the fragment-instance ids that one backend (identified by its
 * brpc address) will be asked to report.
 */
private static class Request {
    // brpc address of the target backend
    private final TNetworkAddress address;
    // instance ids to report; left empty to request all running queries
    private final List<Types.PUniqueId> instanceIds;

    public Request(TNetworkAddress address) {
        this.address = address;
        this.instanceIds = Lists.newArrayList();
    }

    public TNetworkAddress getAddress() {
        return address;
    }

    public List<Types.PUniqueId> getInstanceIds() {
        return instanceIds;
    }

    public void addInstanceId(Types.PUniqueId instanceId) {
        this.instanceIds.add(instanceId);
    }
}
}

View File

@ -32,8 +32,10 @@ import org.apache.logging.log4j.Logger;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
/*
* show proc "/current_queries"
* only set variable "set is_report_success = true" to enable "ScanBytes" and "ProcessRows".
*/
public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
private static final Logger LOG = LogManager.getLogger(CurrentQueryStatisticsProcDir.class);
@ -78,12 +80,17 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
values.add(item.getConnId());
values.add(item.getDb());
values.add(item.getUser());
final CurrentQueryInfoProvider.QueryStatistics statistics
= statisticsMap.get(item.getQueryId());
values.add(QueryStatisticsFormatter.getScanBytes(
statistics.getScanBytes()));
values.add(QueryStatisticsFormatter.getRowsReturned(
statistics.getRowsReturned()));
if (item.getIsReportSucc()) {
final CurrentQueryInfoProvider.QueryStatistics statistics
= statisticsMap.get(item.getQueryId());
values.add(QueryStatisticsFormatter.getScanBytes(
statistics.getScanBytes()));
values.add(QueryStatisticsFormatter.getRowsReturned(
statistics.getRowsReturned()));
} else {
values.add("N/A");
values.add("N/A");
}
values.add(item.getQueryExecTime());
sortedRowData.add(values);
}

View File

@ -62,9 +62,10 @@ public class ProfileManager {
public static final String TOTAL_TIME = "Total";
public static final String QUERY_TYPE = "Query Type";
public static final String QUERY_STATE = "Query State";
public static final String SQL_STATEMENT = "Sql Statement";
public static final String DORIS_VERSION = "Doris Version";
public static final String USER = "User";
public static final String DEFAULT_DB = "Default Db";
public static final String SQL_STATEMENT = "Sql Statement";
public static final String IS_CACHED = "Is Cached";
public static final ArrayList<String> PROFILE_HEADERS = new ArrayList(
@ -78,13 +79,14 @@ public class ProfileManager {
public String errMsg = "";
}
// only protect profileDeque; profileMap is concurrent, no need to protect
// only protect queryIdDeque; queryIdToProfileMap is concurrent, no need to protect
private ReentrantReadWriteLock lock;
private ReadLock readLock;
private WriteLock writeLock;
private Deque<ProfileElement> profileDeque;
private Map<String, ProfileElement> profileMap; // from QueryId to RuntimeProfile
// record the order of profiles by queryId
private Deque<String> queryIdDeque;
private Map<String, ProfileElement> queryIdToProfileMap; // from QueryId to RuntimeProfile
public static ProfileManager getInstance() {
if (INSTANCE == null) {
@ -101,8 +103,8 @@ public class ProfileManager {
lock = new ReentrantReadWriteLock(true);
readLock = lock.readLock();
writeLock = lock.writeLock();
profileDeque = new LinkedList<ProfileElement>();
profileMap = new ConcurrentHashMap<String, ProfileElement>();
queryIdDeque = new LinkedList<>();
queryIdToProfileMap = new ConcurrentHashMap<>();
}
public ProfileElement createElement(RuntimeProfile profile) {
@ -117,7 +119,7 @@ public class ProfileManager {
builder.build();
} catch (Exception e) {
element.errMsg = e.getMessage();
LOG.warn("failed to build profile tree", e);
LOG.debug("failed to build profile tree", e);
return element;
}
@ -139,15 +141,19 @@ public class ProfileManager {
LOG.warn("the key or value of Map is null, "
+ "may be forget to insert 'QUERY_ID' column into infoStrings");
}
profileMap.put(queryId, element);
// a profile may be updated multiple times in queryIdToProfileMap,
// and only needs to be inserted into the queryIdDeque for the first time.
queryIdToProfileMap.put(queryId, element);
writeLock.lock();
try {
if (profileDeque.size() >= ARRAY_SIZE) {
profileMap.remove(profileDeque.getFirst().infoStrings.get(QUERY_ID));
profileDeque.removeFirst();
try {
if (!queryIdDeque.contains(queryId)) {
if (queryIdDeque.size() >= ARRAY_SIZE) {
queryIdToProfileMap.remove(queryIdDeque.getFirst());
queryIdDeque.removeFirst();
}
queryIdDeque.addLast(queryId);
}
profileDeque.addLast(element);
} finally {
writeLock.unlock();
}
@ -157,10 +163,14 @@ public class ProfileManager {
List<List<String>> result = Lists.newArrayList();
readLock.lock();
try {
Iterator reverse = profileDeque.descendingIterator();
Iterator reverse = queryIdDeque.descendingIterator();
while (reverse.hasNext()) {
ProfileElement element = (ProfileElement) reverse.next();
Map<String, String> infoStrings = element.infoStrings;
String queryId = (String) reverse.next();
ProfileElement profileElement = queryIdToProfileMap.get(queryId);
if (profileElement == null){
continue;
}
Map<String, String> infoStrings = profileElement.infoStrings;
List<String> row = Lists.newArrayList();
for (String str : PROFILE_HEADERS ) {
@ -177,7 +187,7 @@ public class ProfileManager {
public String getProfile(String queryID) {
readLock.lock();
try {
ProfileElement element = profileMap.get(queryID);
ProfileElement element = queryIdToProfileMap.get(queryID);
if (element == null) {
return null;
}
@ -191,7 +201,7 @@ public class ProfileManager {
public String getFragmentProfileTreeString(String queryID) {
readLock.lock();
try {
ProfileElement element = profileMap.get(queryID);
ProfileElement element = queryIdToProfileMap.get(queryID);
if (element == null || element.builder == null) {
return null;
}
@ -209,7 +219,7 @@ public class ProfileManager {
ProfileTreeNode tree;
readLock.lock();
try {
ProfileElement element = profileMap.get(queryID);
ProfileElement element = queryIdToProfileMap.get(queryID);
if (element == null || element.builder == null) {
throw new AnalysisException("failed to get fragment profile tree. err: "
+ (element == null ? "not found" : element.errMsg));
@ -224,7 +234,7 @@ public class ProfileManager {
ProfileTreeBuilder builder;
readLock.lock();
try {
ProfileElement element = profileMap.get(queryID);
ProfileElement element = queryIdToProfileMap.get(queryID);
if (element == null || element.builder == null) {
throw new AnalysisException("failed to get instance list. err: "
+ (element == null ? "not found" : element.errMsg));
@ -241,7 +251,7 @@ public class ProfileManager {
ProfileTreeBuilder builder;
readLock.lock();
try {
ProfileElement element = profileMap.get(queryID);
ProfileElement element = queryIdToProfileMap.get(queryID);
if (element == null || element.builder == null) {
throw new AnalysisException("failed to get instance profile tree. err: "
+ (element == null ? "not found" : element.errMsg));

View File

@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
/**
 * Implemented by tasks that can write their runtime profile to the
 * ProfileManager while the task is still running.
 */
public interface ProfileWriter {

    /**
     * Writes the current profile.
     *
     * @param waitReportDone NOTE(review): presumably true blocks until the
     *                       in-flight profile report finishes before writing —
     *                       confirm against the implementing classes.
     */
    void writeProfile(boolean waitReportDone);
}

View File

@ -31,18 +31,17 @@ import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.Comparator;
import java.util.Formatter;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* It is accessed by two kinds of thread, one is to create this RuntimeProfile
* , named 'query thread', the other is to call
* , named 'query thread', the other is to call
* {@link org.apache.doris.common.proc.CurrentQueryInfoProvider}.
*/
public class RuntimeProfile {
@ -53,13 +52,16 @@ public class RuntimeProfile {
private Map<String, String> infoStrings = Maps.newHashMap();
private List<String> infoStringsDisplayOrder = Lists.newArrayList();
private ReentrantReadWriteLock infoStringsLock = new ReentrantReadWriteLock();
// These will be hold by other thread.
private Map<String, Counter> counterMap = Maps.newConcurrentMap();
private Map<String, RuntimeProfile> childMap = Maps.newConcurrentMap();
private Map<String, TreeSet<String>> childCounterMap = Maps.newConcurrentMap();
// protect TreeSet in ChildCounterMap
private ReentrantReadWriteLock counterLock = new ReentrantReadWriteLock();
private Map<String, TreeSet<String>> childCounterMap = Maps.newHashMap();
private Map<String, RuntimeProfile> childMap = Maps.newConcurrentMap();
private LinkedList<Pair<RuntimeProfile, Boolean>> childList = Lists.newLinkedList();
private ReentrantReadWriteLock childLock = new ReentrantReadWriteLock();
private String name;
@ -103,35 +105,40 @@ public class RuntimeProfile {
}
public Counter addCounter(String name, TUnit type, String parentCounterName) {
Counter counter = this.counterMap.get(name);
if (counter != null) {
return counter;
} else {
Preconditions.checkState(parentCounterName.equals(ROOT_COUNTER)
|| this.counterMap.containsKey(parentCounterName));
Counter newCounter = new Counter(type, 0);
this.counterMap.put(name, newCounter);
Set<String> childCounters = childCounterMap.get(parentCounterName);
if (childCounters == null) {
childCounterMap.put(parentCounterName, new TreeSet<String>());
childCounters = childCounterMap.get(parentCounterName);
counterLock.writeLock().lock();
try {
Counter counter = this.counterMap.get(name);
if (counter != null) {
return counter;
} else {
Preconditions.checkState(parentCounterName.equals(ROOT_COUNTER)
|| this.counterMap.containsKey(parentCounterName));
Counter newCounter = new Counter(type, 0);
this.counterMap.put(name, newCounter);
Set<String> childCounters = childCounterMap.get(parentCounterName);
if (childCounters == null) {
childCounterMap.put(parentCounterName, new TreeSet<String>());
childCounters = childCounterMap.get(parentCounterName);
}
childCounters.add(name);
return newCounter;
}
childCounters.add(name);
return newCounter;
} finally {
counterLock.writeLock().unlock();
}
}
public void update(final TRuntimeProfileTree thriftProfile) {
Reference<Integer> idx = new Reference<Integer>(0);
Reference<Integer> idx = new Reference<Integer>(0);
update(thriftProfile.nodes, idx);
Preconditions.checkState(idx.getRef().equals(thriftProfile.nodes.size()));
}
// preorder traversal, idx should be modified in the traversal process
private void update(List<TRuntimeProfileNode> nodes, Reference<Integer> idx) {
TRuntimeProfileNode node = nodes.get(idx.getRef());
TRuntimeProfileNode node = nodes.get(idx.getRef());
// update this level's counters
if (node.counters != null) {
for (TCounter tcounter : node.counters) {
@ -147,53 +154,71 @@ public class RuntimeProfile {
}
}
}
if (node.child_counters_map != null) {
// update childCounters
for (Map.Entry<String, Set<String>> entry :
node.child_counters_map.entrySet()) {
String parentCounterName = entry.getKey();
Set<String> childCounters = childCounterMap.get(parentCounterName);
if (childCounters == null) {
childCounterMap.put(parentCounterName, new TreeSet<String>());
childCounters = childCounterMap.get(parentCounterName);
}
childCounters.addAll(entry.getValue());
for (Map.Entry<String, Set<String>> entry :
node.child_counters_map.entrySet()) {
String parentCounterName = entry.getKey();
counterLock.writeLock().lock();
try {
Set<String> childCounters = childCounterMap.get(parentCounterName);
if (childCounters == null) {
childCounterMap.put(parentCounterName, new TreeSet<String>());
childCounters = childCounterMap.get(parentCounterName);
}
childCounters.addAll(entry.getValue());
} finally {
counterLock.writeLock().unlock();
}
}
}
}
if (node.info_strings_display_order != null) {
Map<String, String> nodeInfoStrings = node.info_strings;
for (String key : node.info_strings_display_order) {
String value = nodeInfoStrings.get(key);
Preconditions.checkState(value != null);
if (this.infoStrings.containsKey(key)) {
// exists then replace
this.infoStrings.put(key, value);
} else {
this.infoStrings.put(key, value);
this.infoStringsDisplayOrder.add(key);
infoStringsLock.writeLock().lock();
try {
if (this.infoStrings.containsKey(key)) {
// exists then replace
this.infoStrings.put(key, value);
} else {
this.infoStrings.put(key, value);
this.infoStringsDisplayOrder.add(key);
}
} finally {
infoStringsLock.writeLock().unlock();
}
}
}
idx.setRef(idx.getRef() + 1);
for (int i = 0; i < node.num_children; i ++) {
for (int i = 0; i < node.num_children; i++) {
TRuntimeProfileNode tchild = nodes.get(idx.getRef());
String childName = tchild.name;
RuntimeProfile childProfile = this.childMap.get(childName);
if (childProfile == null) {
childMap.put(childName, new RuntimeProfile(childName));
RuntimeProfile childProfile;
childLock.writeLock().lock();
try {
childProfile = this.childMap.get(childName);
Pair<RuntimeProfile, Boolean> pair = Pair.create(childProfile, tchild.indent);
this.childList.add(pair);
if (childProfile == null) {
childMap.put(childName, new RuntimeProfile(childName));
childProfile = this.childMap.get(childName);
Pair<RuntimeProfile, Boolean> pair = Pair.create(childProfile, tchild.indent);
this.childList.add(pair);
}
} finally {
childLock.writeLock().unlock();
}
childProfile.update(nodes, idx);
}
}
// Print the profile:
// 1. Profile Name
// 2. Info Strings
@ -208,49 +233,64 @@ public class RuntimeProfile {
if (counter.getValue() != 0) {
try (Formatter fmt = new Formatter()) {
builder.append("(Active: ")
.append(this.printCounter(counter.getValue(), counter.getType()))
.append(", % non-child: ").append(fmt.format("%.2f", localTimePercent))
.append("%)");
.append(this.printCounter(counter.getValue(), counter.getType()))
.append(", % non-child: ").append(fmt.format("%.2f", localTimePercent))
.append("%)");
}
}
builder.append("\n");
// 2. info String
for (String key : this.infoStringsDisplayOrder) {
builder.append(prefix).append(" - ").append(key).append(": ")
.append(this.infoStrings.get(key)).append("\n");
infoStringsLock.readLock().lock();
try {
for (String key : this.infoStringsDisplayOrder) {
builder.append(prefix).append(" - ").append(key).append(": ")
.append(this.infoStrings.get(key)).append("\n");
}
} finally {
infoStringsLock.readLock().unlock();
}
// 3. counters
printChildCounters(prefix, ROOT_COUNTER, builder);
// 4. children
for (int i = 0; i < childList.size(); i++) {
Pair<RuntimeProfile, Boolean> pair = childList.get(i);
boolean indent = pair.second;
RuntimeProfile profile = pair.first;
profile.prettyPrint(builder, prefix + (indent ? " " : ""));
childLock.readLock().lock();
try {
for (int i = 0; i < childList.size(); i++) {
Pair<RuntimeProfile, Boolean> pair = childList.get(i);
boolean indent = pair.second;
RuntimeProfile profile = pair.first;
profile.prettyPrint(builder, prefix + (indent ? " " : ""));
}
} finally {
childLock.readLock().unlock();
}
}
/**
 * Renders the whole profile tree (name, info strings, counters, children)
 * as a human-readable string via prettyPrint.
 */
public String toString() {
    StringBuilder sb = new StringBuilder();
    this.prettyPrint(sb, "");
    return sb.toString();
}
private void printChildCounters(String prefix, String counterName, StringBuilder builder) {
Set<String> childCounterSet = childCounterMap.get(counterName);
if (childCounterSet == null) {
return;
}
for (String childCounterName : childCounterSet) {
Counter counter = this.counterMap.get(childCounterName);
Preconditions.checkState(counter != null);
builder.append(prefix).append( " - " ).append(childCounterName).append(": ")
.append(printCounter(counter.getValue(), counter.getType())).append("\n");
this.printChildCounters(prefix + " ", childCounterName, builder);
counterLock.readLock().lock();
try {
for (String childCounterName : childCounterSet) {
Counter counter = this.counterMap.get(childCounterName);
Preconditions.checkState(counter != null);
builder.append(prefix).append(" - ").append(childCounterName).append(": ")
.append(printCounter(counter.getValue(), counter.getType())).append("\n");
this.printChildCounters(prefix + " ", childCounterName, builder);
}
} finally {
counterLock.readLock().unlock();
}
}
@ -309,7 +349,7 @@ public class RuntimeProfile {
builder.append(tmpValue);
} else {
builder.append(pair.first).append(pair.second)
.append(" ").append("/sec");
.append(" ").append("/sec");
}
break;
}
@ -317,28 +357,43 @@ public class RuntimeProfile {
Preconditions.checkState(false, "type=" + type);
break;
}
}
}
return builder.toString();
}
/**
 * Appends {@code child} to this profile's child list. If a child with the
 * same name already exists it is replaced, so repeated reports from backends
 * do not accumulate duplicate entries. Guarded by childLock for thread safety.
 */
public void addChild(RuntimeProfile child) {
    if (child == null) {
        return;
    }
    childLock.writeLock().lock();
    try {
        if (childMap.containsKey(child.name)) {
            // Drop the stale list entry so childList and childMap stay consistent.
            childList.removeIf(e -> e.first.name.equals(child.name));
        }
        this.childMap.put(child.name, child);
        Pair<RuntimeProfile, Boolean> pair = Pair.create(child, true);
        this.childList.add(pair);
    } finally {
        childLock.writeLock().unlock();
    }
}
/**
 * Prepends {@code child} to this profile's child list (same replacement
 * semantics as addChild: an existing child of the same name is removed
 * first). Guarded by childLock for thread safety.
 */
public void addFirstChild(RuntimeProfile child) {
    if (child == null) {
        return;
    }
    childLock.writeLock().lock();
    try {
        if (childMap.containsKey(child.name)) {
            // Drop the stale list entry so childList and childMap stay consistent.
            childList.removeIf(e -> e.first.name.equals(child.name));
        }
        this.childMap.put(child.name, child);
        Pair<RuntimeProfile, Boolean> pair = Pair.create(child, true);
        this.childList.addFirst(pair);
    } finally {
        childLock.writeLock().unlock();
    }
}
// Because the profile of summary and child fragment is not a real parent-child relationship
@ -347,53 +402,62 @@ public class RuntimeProfile {
childMap.values().
forEach(RuntimeProfile::computeTimeInProfile);
}
/**
 * Computes local-time percentages for this subtree, using this profile's
 * own total-time counter as the denominator.
 */
public void computeTimeInProfile() {
    long total = this.counterTotalTime.getValue();
    computeTimeInProfile(total);
}
/**
 * Computes this node's local time (own time minus children's time) as a
 * percentage of {@code total}, then recurses into all children. Reads the
 * child list under the child read lock so it is safe while backends are
 * still reporting.
 */
private void computeTimeInProfile(long total) {
    if (total == 0) {
        // Avoid division by zero; percentages are meaningless anyway.
        return;
    }
    childLock.readLock().lock();
    try {
        // Add all the total times in all the children.
        long totalChildTime = 0;
        for (int i = 0; i < this.childList.size(); ++i) {
            totalChildTime += childList.get(i).first.getCounterTotalTime().getValue();
        }
        long localTime = this.getCounterTotalTime().getValue() - totalChildTime;
        // Counters have some margin, set to 0 if it was negative.
        localTime = Math.max(0, localTime);
        this.localTimePercent = Double.valueOf(localTime) / Double.valueOf(total);
        this.localTimePercent = Math.min(1.0, this.localTimePercent) * 100;
        // Recurse on children.
        for (int i = 0; i < this.childList.size(); i++) {
            childList.get(i).first.computeTimeInProfile(total);
        }
    } finally {
        childLock.readLock().unlock();
    }
}
/**
 * Sorts children by their total time, from bigger to smaller, so the most
 * expensive subtrees print first. Takes the child write lock because the
 * list is reordered in place.
 */
public void sortChildren() {
    childLock.writeLock().lock();
    try {
        this.childList.sort((profile1, profile2) -> Long.compare(profile2.first.getCounterTotalTime().getValue(),
                profile1.first.getCounterTotalTime().getValue()));
    } finally {
        childLock.writeLock().unlock();
    }
}
public void addInfoString(String key, String value) {
String target = this.infoStrings.get(key);
if (target == null) {
this.infoStrings.put(key, value);
this.infoStringsDisplayOrder.add(key);
} else {
this.infoStrings.put(key, value);
infoStringsLock.writeLock().lock();
try {
String target = this.infoStrings.get(key);
if (target == null) {
this.infoStrings.put(key, value);
this.infoStringsDisplayOrder.add(key);
} else {
this.infoStrings.put(key, value);
}
} finally {
infoStringsLock.writeLock().unlock();
}
}
@ -401,7 +465,6 @@ public class RuntimeProfile {
this.name = name;
}
// Returns the value to which the specified key is mapped;
// or null if this map contains no mapping for the key.
public String getInfoString(String key) {

View File

@ -29,6 +29,7 @@ import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListUtil;
import org.apache.doris.common.util.ProfileWriter;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.LoadErrorHub;
@ -156,6 +157,8 @@ public class Coordinator {
private List<RuntimeProfile> fragmentProfile;
private ProfileWriter profileWriter;
// populated in computeFragmentExecParams()
private Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = Maps.newHashMap();
@ -268,6 +271,14 @@ public class Coordinator {
return queryProfile;
}
/** Returns the writer used to publish this query's profile, or null if none was set. */
public ProfileWriter getProfileWriter() {
    return this.profileWriter;
}
/** Installs the writer that publishes this query's profile while it runs. */
public void setProfileWriter(ProfileWriter writer) {
    this.profileWriter = writer;
}
public List<String> getDeltaUrls() {
return deltaUrls;
}
@ -1403,17 +1414,19 @@ public class Coordinator {
jobId, params.backend_id, params.query_id, params.fragment_instance_id, params.loaded_rows,
params.done);
}
return;
}
/** Finalizes the profile, waiting for all backends to report (the default). */
public void endProfile() {
    this.endProfile(true);
}
public void endProfile(boolean waitProfileDone) {
if (backendExecStates.isEmpty()) {
return;
}
// wait for all backends
if (needReport) {
// Wait for all backends to finish reporting when writing profile last time.
if (waitProfileDone && needReport) {
try {
profileDoneSignal.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
@ -1701,7 +1714,7 @@ public class Coordinator {
PlanFragmentId fragmentId;
int instanceId;
boolean initiated;
boolean done;
volatile boolean done;
boolean hasCanceled;
int profileFragmentId;
RuntimeProfile profile;

View File

@ -17,9 +17,10 @@
package org.apache.doris.qe;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileWriter;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
@ -33,6 +34,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public final class QeProcessorImpl implements QeProcessor {
@ -45,8 +47,13 @@ public final class QeProcessorImpl implements QeProcessor {
INSTANCE = new QeProcessorImpl();
}
private ExecutorService writeProfileExecutor;
private QeProcessorImpl() {
coordinatorMap = Maps.newConcurrentMap();
// write profile to ProfileManager when query is running.
writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(1, 100,
"profile-write-pool", true);
}
@Override
@ -97,7 +104,8 @@ public final class QeProcessorImpl implements QeProcessor {
.connId(String.valueOf(context.getConnectionId()))
.db(context.getDatabase())
.fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos())
.profile(info.getCoord().getQueryProfile()).build();
.profile(info.getCoord().getQueryProfile())
.isReportSucc(context.getSessionVariable().isReportSucc()).build();
querySet.put(queryIdStr, item);
}
return querySet;
@ -120,6 +128,9 @@ public final class QeProcessorImpl implements QeProcessor {
}
try {
info.getCoord().updateFragmentExecStatus(params);
if (info.getCoord().getProfileWriter() != null && params.isSetProfile()) {
writeProfileExecutor.submit(new WriteProfileTask(params));
}
} catch (Exception e) {
LOG.warn(e.getMessage());
return result;
@ -163,4 +174,25 @@ public final class QeProcessorImpl implements QeProcessor {
return startExecTime;
}
}
/**
 * Task submitted to writeProfileExecutor: flushes a running query's profile
 * so it can be viewed while the query is still executing.
 */
private class WriteProfileTask implements Runnable {
    private TReportExecStatusParams params;

    WriteProfileTask(TReportExecStatusParams params) {
        this.params = params;
    }

    @Override
    public void run() {
        final QueryInfo queryInfo = coordinatorMap.get(params.query_id);
        if (queryInfo == null) {
            // Query already unregistered; nothing to write.
            return;
        }
        final ProfileWriter writer = queryInfo.getCoord().getProfileWriter();
        if (writer == null) {
            return;
        }
        // Intermediate (non-final) write: the query is still running.
        writer.writeProfile(false);
    }
}
}

View File

@ -35,6 +35,7 @@ public final class QueryStatisticsItem {
private final List<FragmentInstanceInfo> fragmentInstanceInfos;
// root query profile
private final RuntimeProfile queryProfile;
private final boolean isReportSucc;
private QueryStatisticsItem(Builder builder) {
this.queryId = builder.queryId;
@ -45,6 +46,7 @@ public final class QueryStatisticsItem {
this.queryStartTime = builder.queryStartTime;
this.fragmentInstanceInfos = builder.fragmentInstanceInfos;
this.queryProfile = builder.queryProfile;
this.isReportSucc = builder.isReportSucc;
}
public String getDb() {
@ -80,6 +82,10 @@ public final class QueryStatisticsItem {
return queryProfile;
}
/** Whether the session asked for profile reporting (report_succ). */
public boolean getIsReportSucc() {
    return this.isReportSucc;
}
public static final class Builder {
private String queryId;
private String db;
@ -89,6 +95,7 @@ public final class QueryStatisticsItem {
private long queryStartTime;
private List<FragmentInstanceInfo> fragmentInstanceInfos;
private RuntimeProfile queryProfile;
private boolean isReportSucc;
public Builder() {
fragmentInstanceInfos = Lists.newArrayList();
@ -134,6 +141,11 @@ public final class QueryStatisticsItem {
return this;
}
/** Sets whether the session asked for profile reporting; returns this builder. */
public Builder isReportSucc(boolean reportSucc) {
    this.isReportSucc = reportSucc;
    return this;
}
public QueryStatisticsItem build() {
initDefaultValue(this);
return new QueryStatisticsItem(this);

View File

@ -59,6 +59,7 @@ import org.apache.doris.common.Version;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.ProfileWriter;
import org.apache.doris.common.util.QueryPlannerProfile;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.SqlParserUtils;
@ -110,7 +111,7 @@ import java.util.stream.Collectors;
// Do one COM_QUERY process.
// first: Parse receive byte array to statement struct.
// second: Do handle function for statement.
public class StmtExecutor {
public class StmtExecutor implements ProfileWriter {
private static final Logger LOG = LogManager.getLogger(StmtExecutor.class);
private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
@ -122,6 +123,9 @@ public class StmtExecutor {
private Analyzer analyzer;
private RuntimeProfile profile;
private RuntimeProfile summaryProfile;
private RuntimeProfile plannerRuntimeProfile;
private final Object writeProfileLock = new Object();
private volatile boolean isFinishedProfile = false;
private volatile Coordinator coord = null;
private MasterOpExecutor masterOpExecutor = null;
private RedirectStatus redirectStatus = null;
@ -156,38 +160,36 @@ public class StmtExecutor {
}
// At the end of query execution, we begin to add up profile
public void initProfile(QueryPlannerProfile plannerProfile) {
// Summary profile
profile = new RuntimeProfile("Query");
summaryProfile = new RuntimeProfile("Summary");
summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(context.queryId()));
summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(context.getStartTime()));
private void initProfile(QueryPlannerProfile plannerProfile, boolean waiteBeReport) {
long currentTimestamp = System.currentTimeMillis();
long totalTimeMs = currentTimestamp - context.getStartTime();
summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp));
summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs));
if (profile == null) {
profile = new RuntimeProfile("Query");
summaryProfile = new RuntimeProfile("Summary");
profile.addChild(summaryProfile);
summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(context.queryId()));
summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(context.getStartTime()));
summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp));
summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs));
summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Query");
summaryProfile.addInfoString(ProfileManager.QUERY_STATE, context.getState().toString());
summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION);
summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser());
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase());
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt);
summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ? "Yes" : "No");
summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Query");
summaryProfile.addInfoString(ProfileManager.QUERY_STATE, context.getState().toString());
summaryProfile.addInfoString("Doris Version", Version.DORIS_BUILD_VERSION);
summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser());
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase());
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt);
summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ? "Yes" : "No");
RuntimeProfile plannerRuntimeProfile = new RuntimeProfile("Execution Summary");
plannerProfile.initRuntimeProfile(plannerRuntimeProfile);
summaryProfile.addChild(plannerRuntimeProfile);
profile.addChild(summaryProfile);
if (coord != null) {
coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(plannerProfile.getQueryBeginTime()));
coord.endProfile();
plannerRuntimeProfile = new RuntimeProfile("Execution Summary");
summaryProfile.addChild(plannerRuntimeProfile);
profile.addChild(coord.getQueryProfile());
coord = null;
} else {
summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp));
summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs));
}
plannerProfile.initRuntimeProfile(plannerRuntimeProfile);
coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(plannerProfile.getQueryBeginTime()));
coord.endProfile(waiteBeReport);
}
public Planner planner() {
@ -292,9 +294,7 @@ public class StmtExecutor {
context.setQueryId(newQueryId);
}
handleQueryStmt();
if (context.getSessionVariable().isReportSucc()) {
writeProfile();
}
writeProfile(true);
break;
} catch (RpcException e) {
if (i == retryTime - 1) {
@ -320,9 +320,7 @@ public class StmtExecutor {
} else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InserStmt is its subclass
try {
handleInsertStmt();
if (context.getSessionVariable().isReportSucc()) {
writeProfile();
}
writeProfile(true);
} catch (Throwable t) {
LOG.warn("handle insert stmt fail", t);
// the transaction of this insert may already begun, we will abort it at outer finally block.
@ -413,10 +411,20 @@ public class StmtExecutor {
masterOpExecutor.execute();
}
private void writeProfile() {
initProfile(plannerProfile);
profile.computeTimeInChildProfile();
ProfileManager.getInstance().pushProfile(profile);
@Override
public void writeProfile(boolean isLastWriteProfile) {
if (!context.getSessionVariable().isReportSucc()) {
return;
}
synchronized (writeProfileLock) {
if (isFinishedProfile) {
return;
}
initProfile(plannerProfile, isLastWriteProfile);
profile.computeTimeInChildProfile();
ProfileManager.getInstance().pushProfile(profile);
isFinishedProfile = isLastWriteProfile;
}
}
// Analyze one statement to structure in memory.
@ -788,8 +796,10 @@ public class StmtExecutor {
coord = new Coordinator(context, analyzer, planner);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
coord.setProfileWriter(this);
coord.exec();
plannerProfile.setQueryScheduleFinishTime();
writeProfile(false);
while (true) {
batch = coord.getNext();
// for outfile query, there will be only one empty batch send back with eos flag

View File

@ -73,11 +73,6 @@ public class BackendServiceClient {
return stub.clearCache(request);
}
public Future<InternalService.PTriggerProfileReportResult> triggerProfileReport(
InternalService.PTriggerProfileReportRequest request) {
return stub.triggerProfileReport(request);
}
public Future<InternalService.PProxyResult> getInfo(InternalService.PProxyRequest request) {
return stub.getInfo(request);
}

View File

@ -157,18 +157,6 @@ public class BackendServiceProxy {
}
}
public Future<InternalService.PTriggerProfileReportResult> triggerProfileReportAsync(
TNetworkAddress address, InternalService.PTriggerProfileReportRequest request) throws RpcException {
try {
final BackendServiceClient client = getProxy(address);
return client.triggerProfileReport(request);
} catch (Throwable e) {
LOG.warn("fetch data catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
throw new RpcException(address.hostname, e.getMessage());
}
}
public Future<InternalService.PProxyResult> getInfo(
TNetworkAddress address, InternalService.PProxyRequest request) throws RpcException {
try {

View File

@ -260,7 +260,7 @@ public class ExportExportingTask extends MasterTask {
summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Query");
summaryProfile.addInfoString(ProfileManager.QUERY_STATE, job.getState().toString());
summaryProfile.addInfoString("Doris Version", Version.DORIS_BUILD_VERSION);
summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION);
summaryProfile.addInfoString(ProfileManager.USER, "xxx");
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, String.valueOf(job.getDbId()));
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, job.getSql());

View File

@ -332,14 +332,6 @@ public class MockedBackendFactory {
responseObserver.onCompleted();
}
@Override
public void triggerProfileReport(InternalService.PTriggerProfileReportRequest request, StreamObserver<InternalService.PTriggerProfileReportResult> responseObserver) {
System.out.println("get triggerProfileReport request");
responseObserver.onNext(InternalService.PTriggerProfileReportResult.newBuilder()
.setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build());
responseObserver.onCompleted();
}
@Override
public void getInfo(InternalService.PProxyRequest request, StreamObserver<InternalService.PProxyResult> responseObserver) {
System.out.println("get get_info request");

View File

@ -211,14 +211,6 @@ message PClearCacheRequest {
};
//End cache proto definition
message PTriggerProfileReportRequest {
repeated PUniqueId instance_ids = 1;
};
message PTriggerProfileReportResult {
required PStatus status = 1;
};
message PStringPair {
required string key = 1;
required string val = 2;
@ -257,7 +249,6 @@ service PBackendService {
rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult);
rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns (PTabletWriterAddBatchResult);
rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult);
rpc trigger_profile_report(PTriggerProfileReportRequest) returns (PTriggerProfileReportResult);
rpc get_info(PProxyRequest) returns (PProxyResult);
rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse);
rpc fetch_cache(PFetchCacheRequest) returns (PFetchCacheResult);

View File

@ -34,7 +34,6 @@ service PInternalService {
rpc tablet_writer_open(doris.PTabletWriterOpenRequest) returns (doris.PTabletWriterOpenResult);
rpc tablet_writer_add_batch(doris.PTabletWriterAddBatchRequest) returns (doris.PTabletWriterAddBatchResult);
rpc tablet_writer_cancel(doris.PTabletWriterCancelRequest) returns (doris.PTabletWriterCancelResult);
rpc trigger_profile_report(doris.PTriggerProfileReportRequest) returns (doris.PTriggerProfileReportResult);
rpc get_info(doris.PProxyRequest) returns (doris.PProxyResult);
rpc update_cache(doris.PUpdateCacheRequest) returns (doris.PCacheResponse);
rpc fetch_cache(doris.PFetchCacheRequest) returns (doris.PFetchCacheResult);