diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e7914d5415..3b15c115e9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -286,8 +286,12 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
     if (runtime_state->query_options().query_type == TQueryType::LOAD) {
         params.__set_loaded_rows(runtime_state->num_rows_load_total());
     }
-    profile->to_thrift(&params.profile);
-    params.__isset.profile = true;
+    if (profile == nullptr) {
+        params.__isset.profile = false;
+    } else {
+        profile->to_thrift(&params.profile);
+        params.__isset.profile = true;
+    }
 
     if (!runtime_state->output_files().empty()) {
         params.__isset.delta_urls = true;
@@ -583,31 +587,6 @@ void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
 }
 
-Status FragmentMgr::trigger_profile_report(const PTriggerProfileReportRequest* request) {
-    if (request->instance_ids_size() > 0) {
-        for (int i = 0; i < request->instance_ids_size(); i++) {
-            const PUniqueId& p_fragment_id = request->instance_ids(i);
-            TUniqueId id;
-            id.__set_hi(p_fragment_id.hi());
-            id.__set_lo(p_fragment_id.lo());
-            {
-                std::lock_guard lock(_lock);
-                auto iter = _fragment_map.find(id);
-                if (iter != _fragment_map.end()) {
-                    iter->second->executor()->report_profile_once();
-                }
-            }
-        }
-    } else {
-        std::lock_guard lock(_lock);
-        auto iter = _fragment_map.begin();
-        for (; iter != _fragment_map.end(); iter++) {
-            iter->second->executor()->report_profile_once();
-        }
-    }
-    return Status::OK();
-}
-
 void FragmentMgr::debug(std::stringstream& ss) {
     // Keep things simple
     std::lock_guard lock(_lock);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index fa5dcb5215..7d160744c3 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -73,8 +73,6 @@ public:
 
     virtual void debug(std::stringstream& ss);
 
-    Status trigger_profile_report(const PTriggerProfileReportRequest* request);
-
     // input: TScanOpenParams fragment_instance_id
     // output: selected_columns
     // execute external query, all query info are packed in TScanOpenParams
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 2af9cb8fe0..005cfe5c8b 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -241,7 +241,7 @@ Status PlanFragmentExecutor::open() {
     // may block
     // TODO: if no report thread is started, make sure to send a final profile
     // at end, otherwise the coordinator hangs in case we finish w/ an error
-    if (_report_status_cb && config::status_report_interval > 0) {
+    if (_is_report_success && !_report_status_cb.empty() && config::status_report_interval > 0) {
        std::unique_lock l(_report_thread_lock);
        _report_thread = boost::thread(&PlanFragmentExecutor::report_profile, this);
        // make sure the thread started up, otherwise report_profile() might get into a race
@@ -365,9 +365,8 @@ void PlanFragmentExecutor::report_profile() {
     int report_fragment_offset = rand() % config::status_report_interval;
     // We don't want to wait longer than it takes to run the entire fragment.
     _stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset));
-    bool is_report_profile_interval = _is_report_success && config::status_report_interval > 0;
     while (_report_thread_active) {
-        if (is_report_profile_interval) {
+        if (config::status_report_interval > 0) {
            // wait_for can return because the timeout occurred or the condition variable
            // was signaled. We can't rely on its return value to distinguish between the
            // two cases (e.g. there is a race here where the wait timed out but before grabbing
@@ -376,8 +375,8 @@
             _stop_report_thread_cv.wait_for(l, std::chrono::seconds(config::status_report_interval));
         } else {
-            // Artificial triggering, such as show proc "/current_queries".
-            _stop_report_thread_cv.wait(l);
+            LOG(WARNING) << "config::status_report_interval is equal to or less than zero, exiting reporting thread.";
+            break;
         }
 
         if (VLOG_FILE_IS_ON) {
@@ -427,7 +426,11 @@ void PlanFragmentExecutor::send_report(bool done) {
     // This will send a report even if we are cancelled. If the query completed correctly
     // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
     // be waiting for a final report and profile.
-    _report_status_cb(status, profile(), done || !status.ok());
+    if (_is_report_success) {
+        _report_status_cb(status, profile(), done || !status.ok());
+    } else {
+        _report_status_cb(status, nullptr, done || !status.ok());
+    }
 }
 
 void PlanFragmentExecutor::stop_report_thread() {
@@ -565,7 +568,7 @@ void PlanFragmentExecutor::close() {
         }
     }
 
-    {
+    if (_is_report_success) {
         std::stringstream ss;
         // Compute the _local_time_percent before pretty_print the runtime_profile
         // Before add this operation, the print out like that:
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 85311baad4..4fcdf9d105 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -143,8 +143,6 @@ public:
 
     DataSink* get_sink() { return _sink.get(); }
 
-    void report_profile_once() { _stop_report_thread_cv.notify_one(); }
-
     void set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
 
 private:
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index a0c4bc996c..62e917f9c1 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -182,16 +182,6 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_b
     _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::trigger_profile_report(google::protobuf::RpcController* controller,
-                                                     const PTriggerProfileReportRequest* request,
-                                                     PTriggerProfileReportResult* result,
-                                                     google::protobuf::Closure* done) {
-    brpc::ClosureGuard closure_guard(done);
-    auto st = _exec_env->fragment_mgr()->trigger_profile_report(request);
-    st.to_protobuf(result->mutable_status());
-}
-
 template <typename T>
 void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request,
                                        PProxyResult* response,
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 91196aea64..1d794dc2d8 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -70,11 +70,6 @@ public:
                               PTabletWriterCancelResult* response,
                               google::protobuf::Closure* done) override;
 
-    void trigger_profile_report(google::protobuf::RpcController* controller,
-                                const PTriggerProfileReportRequest* request,
-                                PTriggerProfileReportResult* result,
-                                google::protobuf::Closure* done) override;
-
     void get_info(google::protobuf::RpcController* controller, const PProxyRequest* request,
                   PProxyResult* response,
                   google::protobuf::Closure* done) override;
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index 0d9a5b5b68..5cf9c2027f 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -57,11 +57,12 @@ RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile
 }
 
 RuntimeProfile::~RuntimeProfile() {
-    std::map<std::string, Counter*>::const_iterator iter;
+    for (auto iter = _rate_counters.begin(); iter != _rate_counters.end(); ++iter) {
+        stop_rate_counters_updates(*iter);
+    }
 
-    for (iter = _counter_map.begin(); iter != _counter_map.end(); ++iter) {
-        stop_rate_counters_updates(iter->second);
-        stop_sampling_counters_updates(iter->second);
+    for (auto iter = _sampling_counters.begin(); iter != _sampling_counters.end(); ++iter) {
+        stop_sampling_counters_updates(*iter);
     }
 
     std::set<std::vector<Counter*>*>::const_iterator buckets_iter;
@@ -653,6 +654,7 @@ RuntimeProfile::Counter* RuntimeProfile::add_rate_counter(const std::string& nam
     }
 
     Counter* dst_counter = add_counter(name, dst_type);
+    _rate_counters.push_back(dst_counter);
     register_periodic_counter(src_counter, NULL, dst_counter, RATE_COUNTER);
     return dst_counter;
 }
@@ -675,6 +677,7 @@ RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string&
 RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string& name, SampleFn sample_fn) {
     Counter* dst_counter = add_counter(name, TUnit::DOUBLE_VALUE);
+    _sampling_counters.push_back(dst_counter);
     register_periodic_counter(NULL, sample_fn, dst_counter, SAMPLING_COUNTER);
     return dst_counter;
 }
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 7b562eed46..5b1cb512b7 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -514,6 +514,10 @@ private:
     // of the total time in the entire profile tree.
     double _local_time_percent;
 
+    std::vector<Counter*> _rate_counters;
+
+    std::vector<Counter*> _sampling_counters;
+
     enum PeriodicCounterType {
         RATE_COUNTER = 0,
         SAMPLING_COUNTER,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index da32dfb00b..c699065ef0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -105,6 +105,13 @@ public class ThreadPoolManager {
                 new BlockedPolicy(poolName, 60), poolName, needRegisterMetric);
     }
 
+    public static ThreadPoolExecutor newDaemonProfileThreadPool(int numThread, int queueSize, String poolName,
+                                                                boolean needRegisterMetric) {
+        return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(queueSize), new LogDiscardOldestPolicy(poolName), poolName,
+                needRegisterMetric);
+    }
+
     public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize,
                                                          int maximumPoolSize,
                                                          long keepAliveTime,
@@ -184,5 +191,25 @@
             }
         }
     }
+
+    static class LogDiscardOldestPolicy implements RejectedExecutionHandler {
+
+        private static final Logger LOG = LogManager.getLogger(LogDiscardOldestPolicy.class);
+
+        private String threadPoolName;
+
+        public LogDiscardOldestPolicy(String threadPoolName) {
+            this.threadPoolName = threadPoolName;
+        }
+
+        @Override
+        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
+            if (!executor.isShutdown()) {
+                Runnable discardTask = executor.getQueue().poll();
+                LOG.warn("Task: {} submitted to {}, discarding the oldest task: {}", task, threadPoolName, discardTask);
+                executor.execute(task);
+            }
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
index 1b054df767..619869e342 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
@@ -34,6 +34,7 @@ import java.util.List;
 
 /*
  * show proc "/current_queries/{query_id}/fragments"
+ * Set the session variable "is_report_success" to true to enable "ScanBytes" and "ProcessRows".
  */
 public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
     private static final Logger LOG = LogManager.getLogger(CurrentQueryFragmentProcNode.class);
@@ -79,10 +80,15 @@ public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
             rowData.add(instanceStatistics.getFragmentId());
             rowData.add(instanceStatistics.getInstanceId().toString());
             rowData.add(instanceStatistics.getAddress().toString());
-            rowData.add(QueryStatisticsFormatter.getScanBytes(
-                    instanceStatistics.getScanBytes()));
-            rowData.add(QueryStatisticsFormatter.getRowsReturned(
-                    instanceStatistics.getRowsReturned()));
+            if (item.getIsReportSucc()) {
+                rowData.add(QueryStatisticsFormatter.getScanBytes(
+                        instanceStatistics.getScanBytes()));
+                rowData.add(QueryStatisticsFormatter.getRowsReturned(
+                        instanceStatistics.getRowsReturned()));
+            } else {
+                rowData.add("N/A");
+                rowData.add("N/A");
+            }
             sortedRowData.add(rowData);
         }
@@ -90,9 +96,7 @@ public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
         sortedRowData.sort(new Comparator<List<String>>() {
             @Override
             public int compare(List<String> l1, List<String> l2) {
-                final Integer fragmentId1 = Integer.valueOf(l1.get(0));
-                final Integer fragmentId2 = Integer.valueOf(l2.get(0));
-                return fragmentId1.compareTo(fragmentId2);
+                return l1.get(0).compareTo(l2.get(0));
             }
         });
         final BaseProcResult result = new BaseProcResult();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
index 0dd2c8ebf1..49e93a1f2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
@@ -17,20 +17,12 @@
 package org.apache.doris.common.proc;
 
-import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.Counter;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.RuntimeProfile;
-import org.apache.doris.proto.InternalService;
-import org.apache.doris.proto.Types;
 import org.apache.doris.qe.QueryStatisticsItem;
-import org.apache.doris.rpc.BackendServiceProxy;
-import org.apache.doris.rpc.RpcException;
-import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Preconditions;
@@ -43,10 +35,6 @@ import org.apache.logging.log4j.Logger;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
  * Provide running query's statistics.
@@ -58,28 +46,23 @@ public class CurrentQueryInfoProvider {
     }
 
     /**
-     * Firstly send request to trigger profile to report for specified query and wait a while,
-     * Secondly get Counters from Coordinator's RuntimeProfile and return query's statistics.
+     * Get counters from the Coordinator's RuntimeProfile and return the query's statistics.
      *
      * @param item
      * @return
     * @throws AnalysisException
      */
     public QueryStatistics getQueryStatistics(QueryStatisticsItem item) throws AnalysisException {
-        triggerReportAndWait(item, getWaitingTimeForSingleQuery(), false);
         return new QueryStatistics(item.getQueryProfile());
     }
 
     /**
-     * Same as above, but this will cause BE to report all queries profile.
      *
      * @param items
      * @return
     * @throws AnalysisException
      */
-    public Map<String, QueryStatistics> getQueryStatistics(Collection<QueryStatisticsItem> items)
-            throws AnalysisException {
-        triggerReportAndWait(items, getWaitingTime(items.size()), true);
+    public Map<String, QueryStatistics> getQueryStatistics(Collection<QueryStatisticsItem> items) {
         final Map<String, QueryStatistics> queryStatisticsMap = Maps.newHashMap();
         for (QueryStatisticsItem item : items) {
             queryStatisticsMap.put(item.getQueryId(), new QueryStatistics(item.getQueryProfile()));
@@ -95,7 +78,6 @@ public class CurrentQueryInfoProvider {
     * @throws AnalysisException
     */
     public Collection getInstanceStatistics(QueryStatisticsItem item) throws AnalysisException {
-        triggerReportAndWait(item, getWaitingTimeForSingleQuery(), false);
         final Map instanceProfiles = collectInstanceProfile(item.getQueryProfile());
         final List instanceStatisticsList = Lists.newArrayList();
         for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) {
@@ -142,136 +124,6 @@ public class CurrentQueryInfoProvider {
         }
     }
 
-    private long getWaitingTimeForSingleQuery() {
-        return getWaitingTime(1);
-    }
-
-    /**
-     * @param numOfQuery
-     * @return unit(ms)
-     */
-    private long getWaitingTime(int numOfQuery) {
-        final int oneQueryWaitingTime = 100;
-        final int allQueryMaxWaitingTime = 2000;
-        final int waitingTime = numOfQuery * oneQueryWaitingTime;
-        return waitingTime > allQueryMaxWaitingTime ? allQueryMaxWaitingTime : waitingTime;
-    }
-
-    private void triggerReportAndWait(QueryStatisticsItem item, long waitingTime, boolean allQuery)
-            throws AnalysisException {
-        final List items = Lists.newArrayList(item);
-        triggerReportAndWait(items, waitingTime, allQuery);
-    }
-
-    private void triggerReportAndWait(Collection items, long waitingTime, boolean allQuery)
-            throws AnalysisException {
-        triggerProfileReport(items, allQuery);
-        try {
-            Thread.currentThread().sleep(waitingTime);
-        } catch (InterruptedException e) {
-        }
-    }
-
-    /**
-     * send report profile request.
-     * @param items
-     * @param allQuery true:all queries profile will be reported, false:specified queries profile will be reported.
- * @throws AnalysisException - */ - private void triggerProfileReport(Collection items, boolean allQuery) throws AnalysisException { - final Map requests = Maps.newHashMap(); - final Map brpcAddresses = Maps.newHashMap(); - for (QueryStatisticsItem item : items) { - for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) { - // use brpc address - TNetworkAddress brpcNetAddress = brpcAddresses.get(instanceInfo.getAddress()); - if (brpcNetAddress == null) { - try { - brpcNetAddress = toBrpcHost(instanceInfo.getAddress()); - brpcAddresses.put(instanceInfo.getAddress(), brpcNetAddress); - } catch (Exception e) { - LOG.warn(e.getMessage()); - throw new AnalysisException(e.getMessage()); - } - } - // merge different requests - Request request = requests.get(brpcNetAddress); - if (request == null) { - request = new Request(brpcNetAddress); - requests.put(brpcNetAddress, request); - } - // specified query instance which will report. - if (!allQuery) { - final Types.PUniqueId pUId = Types.PUniqueId.newBuilder() - .setHi(instanceInfo.getInstanceId().hi) - .setLo(instanceInfo.getInstanceId().lo) - .build(); - request.addInstanceId(pUId); - } - } - } - recvResponse(sendRequest(requests)); - } - - private List>> sendRequest( - Map requests) throws AnalysisException { - final List>> futures = Lists.newArrayList(); - for (TNetworkAddress address : requests.keySet()) { - final Request request = requests.get(address); - final InternalService.PTriggerProfileReportRequest pbRequest = InternalService.PTriggerProfileReportRequest - .newBuilder().addAllInstanceIds(request.getInstanceIds()).build(); - try { - futures.add(Pair.create(request, BackendServiceProxy.getInstance(). - triggerProfileReportAsync(address, pbRequest))); - } catch (RpcException e) { - throw new AnalysisException("Sending request fails for query's execution information."); - } - } - return futures; - } - - private void recvResponse(List>> futures) - throws AnalysisException { - final String reasonPrefix = "Fail to receive result."; - for (Pair> pair : futures) { - try { - final InternalService.PTriggerProfileReportResult result - = pair.second.get(2, TimeUnit.SECONDS); - final TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - String errMsg = ""; - if (!result.getStatus().getErrorMsgsList().isEmpty()) { - errMsg = result.getStatus().getErrorMsgs(0); - } - throw new AnalysisException(reasonPrefix + " backend:" + pair.first.getAddress() - + " reason:" + errMsg); - } - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.warn(reasonPrefix + " reason:" + e.getCause()); - throw new AnalysisException(reasonPrefix); - } - - } - } - - private TNetworkAddress toBrpcHost(TNetworkAddress host) throws AnalysisException { - final Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort( - host.getHostname(), host.getPort()); - if (backend == null) { - throw new AnalysisException(new StringBuilder("Backend ") - .append(host.getHostname()) - .append(":") - .append(host.getPort()) - .append(" does not exist") - .toString()); - } - if (backend.getBrpcPort() < 0) { - throw new AnalysisException("BRPC port isn't exist."); - } - return new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); - } - - public static class QueryStatistics { final List> counterMaps; @@ -344,26 +196,4 @@ public class CurrentQueryInfoProvider { return statistics.getScanBytes(); } } - - private static class Request { - private final 
TNetworkAddress address;
-        private final List instanceIds;
-
-        public Request(TNetworkAddress address) {
-            this.address = address;
-            this.instanceIds = Lists.newArrayList();
-        }
-
-        public TNetworkAddress getAddress() {
-            return address;
-        }
-
-        public List getInstanceIds() {
-            return instanceIds;
-        }
-
-        public void addInstanceId(Types.PUniqueId instanceId) {
-            this.instanceIds.add(instanceId);
-        }
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
index c0239fbfb9..af42797d17 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
@@ -32,8 +32,10 @@ import org.apache.logging.log4j.Logger;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+
 /*
  * show proc "/current_queries"
+ * Set the session variable "is_report_success" to true to enable "ScanBytes" and "ProcessRows".
  */
 public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
     private static final Logger LOG = LogManager.getLogger(CurrentQueryStatisticsProcDir.class);
@@ -78,12 +80,17 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
             values.add(item.getConnId());
             values.add(item.getDb());
             values.add(item.getUser());
-            final CurrentQueryInfoProvider.QueryStatistics statistics
-                    = statisticsMap.get(item.getQueryId());
-            values.add(QueryStatisticsFormatter.getScanBytes(
-                    statistics.getScanBytes()));
-            values.add(QueryStatisticsFormatter.getRowsReturned(
-                    statistics.getRowsReturned()));
+            if (item.getIsReportSucc()) {
+                final CurrentQueryInfoProvider.QueryStatistics statistics
+                        = statisticsMap.get(item.getQueryId());
+                values.add(QueryStatisticsFormatter.getScanBytes(
+                        statistics.getScanBytes()));
+                values.add(QueryStatisticsFormatter.getRowsReturned(
+                        statistics.getRowsReturned()));
+            } else {
+                values.add("N/A");
+                values.add("N/A");
+            }
             values.add(item.getQueryExecTime());
             sortedRowData.add(values);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
index f4b4ca591b..448090e6f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
@@ -62,9 +62,10 @@ public class ProfileManager {
     public static final String TOTAL_TIME = "Total";
     public static final String QUERY_TYPE = "Query Type";
     public static final String QUERY_STATE = "Query State";
-    public static final String SQL_STATEMENT = "Sql Statement";
+    public static final String DORIS_VERSION = "Doris Version";
     public static final String USER = "User";
     public static final String DEFAULT_DB = "Default Db";
+    public static final String SQL_STATEMENT = "Sql Statement";
     public static final String IS_CACHED = "Is Cached";
 
     public static final ArrayList<String> PROFILE_HEADERS = new ArrayList<String>(
@@ -78,13 +79,14 @@ public class ProfileManager {
         public String errMsg = "";
     }
 
-    // only protect profileDeque; profileMap is concurrent, no need to protect
+    // only protect queryIdDeque; queryIdToProfileMap is concurrent, no need to protect
     private ReentrantReadWriteLock lock;
     private ReadLock readLock;
     private WriteLock writeLock;
 
-    private Deque profileDeque;
-    private Map profileMap; // from QueryId to RuntimeProfile
+    // record the order of
profiles by queryId + private Deque queryIdDeque; + private Map queryIdToProfileMap; // from QueryId to RuntimeProfile public static ProfileManager getInstance() { if (INSTANCE == null) { @@ -101,8 +103,8 @@ public class ProfileManager { lock = new ReentrantReadWriteLock(true); readLock = lock.readLock(); writeLock = lock.writeLock(); - profileDeque = new LinkedList(); - profileMap = new ConcurrentHashMap(); + queryIdDeque = new LinkedList<>(); + queryIdToProfileMap = new ConcurrentHashMap<>(); } public ProfileElement createElement(RuntimeProfile profile) { @@ -117,7 +119,7 @@ public class ProfileManager { builder.build(); } catch (Exception e) { element.errMsg = e.getMessage(); - LOG.warn("failed to build profile tree", e); + LOG.debug("failed to build profile tree", e); return element; } @@ -139,15 +141,19 @@ public class ProfileManager { LOG.warn("the key or value of Map is null, " + "may be forget to insert 'QUERY_ID' column into infoStrings"); } - - profileMap.put(queryId, element); + + // a profile may be updated multiple times in queryIdToProfileMap, + // and only needs to be inserted into the queryIdDeque for the first time. + queryIdToProfileMap.put(queryId, element); writeLock.lock(); - try { - if (profileDeque.size() >= ARRAY_SIZE) { - profileMap.remove(profileDeque.getFirst().infoStrings.get(QUERY_ID)); - profileDeque.removeFirst(); + try { + if (!queryIdDeque.contains(queryId)) { + if (queryIdDeque.size() >= ARRAY_SIZE) { + queryIdToProfileMap.remove(queryIdDeque.getFirst()); + queryIdDeque.removeFirst(); + } + queryIdDeque.addLast(queryId); } - profileDeque.addLast(element); } finally { writeLock.unlock(); } @@ -157,10 +163,14 @@ public class ProfileManager { List> result = Lists.newArrayList(); readLock.lock(); try { - Iterator reverse = profileDeque.descendingIterator(); + Iterator reverse = queryIdDeque.descendingIterator(); while (reverse.hasNext()) { - ProfileElement element = (ProfileElement) reverse.next(); - Map infoStrings = element.infoStrings; + String queryId = (String) reverse.next(); + ProfileElement profileElement = queryIdToProfileMap.get(queryId); + if (profileElement == null){ + continue; + } + Map infoStrings = profileElement.infoStrings; List row = Lists.newArrayList(); for (String str : PROFILE_HEADERS ) { @@ -177,7 +187,7 @@ public class ProfileManager { public String getProfile(String queryID) { readLock.lock(); try { - ProfileElement element = profileMap.get(queryID); + ProfileElement element = queryIdToProfileMap.get(queryID); if (element == null) { return null; } @@ -191,7 +201,7 @@ public class ProfileManager { public String getFragmentProfileTreeString(String queryID) { readLock.lock(); try { - ProfileElement element = profileMap.get(queryID); + ProfileElement element = queryIdToProfileMap.get(queryID); if (element == null || element.builder == null) { return null; } @@ -209,7 +219,7 @@ public class ProfileManager { ProfileTreeNode tree; readLock.lock(); try { - ProfileElement element = profileMap.get(queryID); + ProfileElement element = queryIdToProfileMap.get(queryID); if (element == null || element.builder == null) { throw new AnalysisException("failed to get fragment profile tree. err: " + (element == null ? 
"not found" : element.errMsg)); @@ -224,7 +234,7 @@ public class ProfileManager { ProfileTreeBuilder builder; readLock.lock(); try { - ProfileElement element = profileMap.get(queryID); + ProfileElement element = queryIdToProfileMap.get(queryID); if (element == null || element.builder == null) { throw new AnalysisException("failed to get instance list. err: " + (element == null ? "not found" : element.errMsg)); @@ -241,7 +251,7 @@ public class ProfileManager { ProfileTreeBuilder builder; readLock.lock(); try { - ProfileElement element = profileMap.get(queryID); + ProfileElement element = queryIdToProfileMap.get(queryID); if (element == null || element.builder == null) { throw new AnalysisException("failed to get instance profile tree. err: " + (element == null ? "not found" : element.errMsg)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileWriter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileWriter.java new file mode 100644 index 0000000000..3a472708de --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileWriter.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +// this interface is used to write profile to ProfileManager when a task is running. +public interface ProfileWriter { + + void writeProfile(boolean waitReportDone); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index 68eb1a6141..9d465020b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -31,18 +31,17 @@ import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Collections; -import java.util.Comparator; import java.util.Formatter; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * It is accessed by two kinds of thread, one is to create this RuntimeProfile - * , named 'query thread', the other is to call + * , named 'query thread', the other is to call * {@link org.apache.doris.common.proc.CurrentQueryInfoProvider}. */ public class RuntimeProfile { @@ -53,13 +52,16 @@ public class RuntimeProfile { private Map infoStrings = Maps.newHashMap(); private List infoStringsDisplayOrder = Lists.newArrayList(); + private ReentrantReadWriteLock infoStringsLock = new ReentrantReadWriteLock(); - // These will be hold by other thread. 
private Map counterMap = Maps.newConcurrentMap(); - private Map childMap = Maps.newConcurrentMap(); + private Map> childCounterMap = Maps.newConcurrentMap(); + // protect TreeSet in ChildCounterMap + private ReentrantReadWriteLock counterLock = new ReentrantReadWriteLock(); - private Map> childCounterMap = Maps.newHashMap(); + private Map childMap = Maps.newConcurrentMap(); private LinkedList> childList = Lists.newLinkedList(); + private ReentrantReadWriteLock childLock = new ReentrantReadWriteLock(); private String name; @@ -103,35 +105,40 @@ public class RuntimeProfile { } public Counter addCounter(String name, TUnit type, String parentCounterName) { - Counter counter = this.counterMap.get(name); - if (counter != null) { - return counter; - } else { - Preconditions.checkState(parentCounterName.equals(ROOT_COUNTER) - || this.counterMap.containsKey(parentCounterName)); - Counter newCounter = new Counter(type, 0); - this.counterMap.put(name, newCounter); - - Set childCounters = childCounterMap.get(parentCounterName); - if (childCounters == null) { - childCounterMap.put(parentCounterName, new TreeSet()); - childCounters = childCounterMap.get(parentCounterName); + counterLock.writeLock().lock(); + try { + Counter counter = this.counterMap.get(name); + if (counter != null) { + return counter; + } else { + Preconditions.checkState(parentCounterName.equals(ROOT_COUNTER) + || this.counterMap.containsKey(parentCounterName)); + Counter newCounter = new Counter(type, 0); + this.counterMap.put(name, newCounter); + + Set childCounters = childCounterMap.get(parentCounterName); + if (childCounters == null) { + childCounterMap.put(parentCounterName, new TreeSet()); + childCounters = childCounterMap.get(parentCounterName); + } + childCounters.add(name); + return newCounter; } - childCounters.add(name); - return newCounter; + } finally { + counterLock.writeLock().unlock(); } } - + public void update(final TRuntimeProfileTree thriftProfile) { - Reference idx = new Reference(0); + Reference idx = new Reference(0); update(thriftProfile.nodes, idx); Preconditions.checkState(idx.getRef().equals(thriftProfile.nodes.size())); } - + // preorder traversal, idx should be modified in the traversal process private void update(List nodes, Reference idx) { - TRuntimeProfileNode node = nodes.get(idx.getRef()); - + TRuntimeProfileNode node = nodes.get(idx.getRef()); + // update this level's counters if (node.counters != null) { for (TCounter tcounter : node.counters) { @@ -147,53 +154,71 @@ public class RuntimeProfile { } } } - + if (node.child_counters_map != null) { // update childCounters - for (Map.Entry> entry : - node.child_counters_map.entrySet()) { - String parentCounterName = entry.getKey(); - Set childCounters = childCounterMap.get(parentCounterName); - if (childCounters == null) { - childCounterMap.put(parentCounterName, new TreeSet()); - childCounters = childCounterMap.get(parentCounterName); - } - childCounters.addAll(entry.getValue()); + for (Map.Entry> entry : + node.child_counters_map.entrySet()) { + String parentCounterName = entry.getKey(); + + counterLock.writeLock().lock(); + try { + Set childCounters = childCounterMap.get(parentCounterName); + if (childCounters == null) { + childCounterMap.put(parentCounterName, new TreeSet()); + childCounters = childCounterMap.get(parentCounterName); + } + childCounters.addAll(entry.getValue()); + } finally { + counterLock.writeLock().unlock(); + } } } } - + if (node.info_strings_display_order != null) { Map nodeInfoStrings = node.info_strings; for (String key : 
node.info_strings_display_order) { String value = nodeInfoStrings.get(key); Preconditions.checkState(value != null); - if (this.infoStrings.containsKey(key)) { - // exists then replace - this.infoStrings.put(key, value); - } else { - this.infoStrings.put(key, value); - this.infoStringsDisplayOrder.add(key); + infoStringsLock.writeLock().lock(); + try { + if (this.infoStrings.containsKey(key)) { + // exists then replace + this.infoStrings.put(key, value); + } else { + this.infoStrings.put(key, value); + this.infoStringsDisplayOrder.add(key); + } + } finally { + infoStringsLock.writeLock().unlock(); } } } - + idx.setRef(idx.getRef() + 1); - - for (int i = 0; i < node.num_children; i ++) { + + for (int i = 0; i < node.num_children; i++) { TRuntimeProfileNode tchild = nodes.get(idx.getRef()); String childName = tchild.name; - RuntimeProfile childProfile = this.childMap.get(childName); - if (childProfile == null) { - childMap.put(childName, new RuntimeProfile(childName)); + RuntimeProfile childProfile; + + childLock.writeLock().lock(); + try { childProfile = this.childMap.get(childName); - Pair pair = Pair.create(childProfile, tchild.indent); - this.childList.add(pair); + if (childProfile == null) { + childMap.put(childName, new RuntimeProfile(childName)); + childProfile = this.childMap.get(childName); + Pair pair = Pair.create(childProfile, tchild.indent); + this.childList.add(pair); + } + } finally { + childLock.writeLock().unlock(); } childProfile.update(nodes, idx); } } - + // Print the profile: // 1. Profile Name // 2. Info Strings @@ -208,49 +233,64 @@ public class RuntimeProfile { if (counter.getValue() != 0) { try (Formatter fmt = new Formatter()) { builder.append("(Active: ") - .append(this.printCounter(counter.getValue(), counter.getType())) - .append(", % non-child: ").append(fmt.format("%.2f", localTimePercent)) - .append("%)"); + .append(this.printCounter(counter.getValue(), counter.getType())) + .append(", % non-child: ").append(fmt.format("%.2f", localTimePercent)) + .append("%)"); } } builder.append("\n"); - + // 2. info String - for (String key : this.infoStringsDisplayOrder) { - builder.append(prefix).append(" - ").append(key).append(": ") - .append(this.infoStrings.get(key)).append("\n"); + infoStringsLock.readLock().lock(); + try { + for (String key : this.infoStringsDisplayOrder) { + builder.append(prefix).append(" - ").append(key).append(": ") + .append(this.infoStrings.get(key)).append("\n"); + } + } finally { + infoStringsLock.readLock().unlock(); } - + // 3. counters printChildCounters(prefix, ROOT_COUNTER, builder); - + // 4. children - for (int i = 0; i < childList.size(); i++) { - Pair pair = childList.get(i); - boolean indent = pair.second; - RuntimeProfile profile = pair.first; - profile.prettyPrint(builder, prefix + (indent ? " " : "")); + childLock.readLock().lock(); + try { + for (int i = 0; i < childList.size(); i++) { + Pair pair = childList.get(i); + boolean indent = pair.second; + RuntimeProfile profile = pair.first; + profile.prettyPrint(builder, prefix + (indent ? 
" " : "")); + } + } finally { + childLock.readLock().unlock(); } } - + public String toString() { StringBuilder builder = new StringBuilder(); prettyPrint(builder, ""); return builder.toString(); } - + private void printChildCounters(String prefix, String counterName, StringBuilder builder) { Set childCounterSet = childCounterMap.get(counterName); if (childCounterSet == null) { return; } - - for (String childCounterName : childCounterSet) { - Counter counter = this.counterMap.get(childCounterName); - Preconditions.checkState(counter != null); - builder.append(prefix).append( " - " ).append(childCounterName).append(": ") - .append(printCounter(counter.getValue(), counter.getType())).append("\n"); - this.printChildCounters(prefix + " ", childCounterName, builder); + + counterLock.readLock().lock(); + try { + for (String childCounterName : childCounterSet) { + Counter counter = this.counterMap.get(childCounterName); + Preconditions.checkState(counter != null); + builder.append(prefix).append(" - ").append(childCounterName).append(": ") + .append(printCounter(counter.getValue(), counter.getType())).append("\n"); + this.printChildCounters(prefix + " ", childCounterName, builder); + } + } finally { + counterLock.readLock().unlock(); } } @@ -309,7 +349,7 @@ public class RuntimeProfile { builder.append(tmpValue); } else { builder.append(pair.first).append(pair.second) - .append(" ").append("/sec"); + .append(" ").append("/sec"); } break; } @@ -317,28 +357,43 @@ public class RuntimeProfile { Preconditions.checkState(false, "type=" + type); break; } - } + } return builder.toString(); } - + public void addChild(RuntimeProfile child) { if (child == null) { return; } - this.childMap.put(child.name, child); - Pair pair = Pair.create(child, true); - this.childList.add(pair); + childLock.writeLock().lock(); + try { + if (childMap.containsKey(child.name)) { + childList.removeIf(e -> e.first.name.equals(child.name)); + } + this.childMap.put(child.name, child); + Pair pair = Pair.create(child, true); + this.childList.add(pair); + } finally { + childLock.writeLock().unlock(); + } } public void addFirstChild(RuntimeProfile child) { if (child == null) { return; } - - this.childMap.put(child.name, child); - Pair pair = Pair.create(child, true); - this.childList.addFirst(pair); + childLock.writeLock().lock(); + try { + if (childMap.containsKey(child.name)) { + childList.removeIf(e -> e.first.name.equals(child.name)); + } + this.childMap.put(child.name, child); + Pair pair = Pair.create(child, true); + this.childList.addFirst(pair); + } finally { + childLock.writeLock().unlock(); + } } // Because the profile of summary and child fragment is not a real parent-child relationship @@ -347,53 +402,62 @@ public class RuntimeProfile { childMap.values(). forEach(RuntimeProfile::computeTimeInProfile); } - + public void computeTimeInProfile() { computeTimeInProfile(this.counterTotalTime.getValue()); } - + private void computeTimeInProfile(long total) { if (total == 0) { return; } - - // Add all the total times in all the children - long totalChildTime = 0; + childLock.readLock().lock(); + try { + // Add all the total times in all the children + long totalChildTime = 0; for (int i = 0; i < this.childList.size(); ++i) { totalChildTime += childList.get(i).first.getCounterTotalTime().getValue(); } + long localTime = this.getCounterTotalTime().getValue() - totalChildTime; // Counters have some margin, set to 0 if it was negative. 
localTime = Math.max(0, localTime); this.localTimePercent = Double.valueOf(localTime) / Double.valueOf(total); this.localTimePercent = Math.min(1.0, this.localTimePercent) * 100; - + // Recurse on children for (int i = 0; i < this.childList.size(); i++) { childList.get(i).first.computeTimeInProfile(total); } + } finally { + childLock.readLock().unlock(); + } } - + // from bigger to smaller public void sortChildren() { - Collections.sort(this.childList, new Comparator>() { - @Override - public int compare(Pair profile1, Pair profile2) - { - return Long.valueOf(profile2.first.getCounterTotalTime().getValue()) - .compareTo(profile1.first.getCounterTotalTime().getValue()); - } - }); + childLock.writeLock().lock(); + try { + this.childList.sort((profile1, profile2) -> Long.compare(profile2.first.getCounterTotalTime().getValue(), + profile1.first.getCounterTotalTime().getValue())); + } finally { + childLock.writeLock().unlock(); + } } - + public void addInfoString(String key, String value) { - String target = this.infoStrings.get(key); - if (target == null) { - this.infoStrings.put(key, value); - this.infoStringsDisplayOrder.add(key); - } else { - this.infoStrings.put(key, value); + infoStringsLock.writeLock().lock(); + try { + String target = this.infoStrings.get(key); + if (target == null) { + this.infoStrings.put(key, value); + this.infoStringsDisplayOrder.add(key); + } else { + this.infoStrings.put(key, value); + } + } finally { + infoStringsLock.writeLock().unlock(); } } @@ -401,7 +465,6 @@ public class RuntimeProfile { this.name = name; } - // Returns the value to which the specified key is mapped; // or null if this map contains no mapping for the key. public String getInfoString(String key) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index c36e6b9c7f..c891a0378c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -29,6 +29,7 @@ import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ListUtil; +import org.apache.doris.common.util.ProfileWriter; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.LoadErrorHub; @@ -156,6 +157,8 @@ public class Coordinator { private List fragmentProfile; + private ProfileWriter profileWriter; + // populated in computeFragmentExecParams() private Map fragmentExecParamsMap = Maps.newHashMap(); @@ -268,6 +271,14 @@ public class Coordinator { return queryProfile; } + public ProfileWriter getProfileWriter() { + return profileWriter; + } + + public void setProfileWriter(ProfileWriter profileWriter) { + this.profileWriter = profileWriter; + } + public List getDeltaUrls() { return deltaUrls; } @@ -1403,17 +1414,19 @@ public class Coordinator { jobId, params.backend_id, params.query_id, params.fragment_instance_id, params.loaded_rows, params.done); } - - return; } public void endProfile() { + endProfile(true); + } + + public void endProfile(boolean waitProfileDone) { if (backendExecStates.isEmpty()) { return; } - // wait for all backends - if (needReport) { + // Wait for all backends to finish reporting when writing profile last time. 
+ if (waitProfileDone && needReport) { try { profileDoneSignal.await(2, TimeUnit.SECONDS); } catch (InterruptedException e1) { @@ -1701,7 +1714,7 @@ public class Coordinator { PlanFragmentId fragmentId; int instanceId; boolean initiated; - boolean done; + volatile boolean done; boolean hasCanceled; int profileFragmentId; RuntimeProfile profile; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 90da2077d0..4105afd59e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -17,9 +17,10 @@ package org.apache.doris.qe; - +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.ProfileWriter; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; @@ -33,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Map; +import java.util.concurrent.ExecutorService; public final class QeProcessorImpl implements QeProcessor { @@ -45,8 +47,13 @@ public final class QeProcessorImpl implements QeProcessor { INSTANCE = new QeProcessorImpl(); } + private ExecutorService writeProfileExecutor; + private QeProcessorImpl() { coordinatorMap = Maps.newConcurrentMap(); + // write profile to ProfileManager when query is running. + writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(1, 100, + "profile-write-pool", true); } @Override @@ -97,7 +104,8 @@ public final class QeProcessorImpl implements QeProcessor { .connId(String.valueOf(context.getConnectionId())) .db(context.getDatabase()) .fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos()) - .profile(info.getCoord().getQueryProfile()).build(); + .profile(info.getCoord().getQueryProfile()) + .isReportSucc(context.getSessionVariable().isReportSucc()).build(); querySet.put(queryIdStr, item); } return querySet; @@ -120,6 +128,9 @@ public final class QeProcessorImpl implements QeProcessor { } try { info.getCoord().updateFragmentExecStatus(params); + if (info.getCoord().getProfileWriter() != null && params.isSetProfile()) { + writeProfileExecutor.submit(new WriteProfileTask(params)); + } } catch (Exception e) { LOG.warn(e.getMessage()); return result; @@ -163,4 +174,25 @@ public final class QeProcessorImpl implements QeProcessor { return startExecTime; } } + + private class WriteProfileTask implements Runnable { + private TReportExecStatusParams params; + + WriteProfileTask(TReportExecStatusParams params) { + this.params = params; + } + + @Override + public void run() { + QueryInfo info = coordinatorMap.get(params.query_id); + if (info == null) { + return; + } + + ProfileWriter profileWriter = info.getCoord().getProfileWriter(); + if (profileWriter != null) { + profileWriter.writeProfile(false); + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java index 247ce0a0b7..afe95c512d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java @@ -35,6 +35,7 @@ public final class QueryStatisticsItem { private final List fragmentInstanceInfos; // root query profile 
private final RuntimeProfile queryProfile; + private final boolean isReportSucc; private QueryStatisticsItem(Builder builder) { this.queryId = builder.queryId; @@ -45,6 +46,7 @@ public final class QueryStatisticsItem { this.queryStartTime = builder.queryStartTime; this.fragmentInstanceInfos = builder.fragmentInstanceInfos; this.queryProfile = builder.queryProfile; + this.isReportSucc = builder.isReportSucc; } public String getDb() { @@ -80,6 +82,10 @@ public final class QueryStatisticsItem { return queryProfile; } + public boolean getIsReportSucc () { + return isReportSucc; + } + public static final class Builder { private String queryId; private String db; @@ -89,6 +95,7 @@ public final class QueryStatisticsItem { private long queryStartTime; private List fragmentInstanceInfos; private RuntimeProfile queryProfile; + private boolean isReportSucc; public Builder() { fragmentInstanceInfos = Lists.newArrayList(); @@ -134,6 +141,11 @@ public final class QueryStatisticsItem { return this; } + public Builder isReportSucc(boolean isReportSucc) { + this.isReportSucc = isReportSucc; + return this; + } + public QueryStatisticsItem build() { initDefaultValue(this); return new QueryStatisticsItem(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index d97263eb55..b5b758834d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -59,6 +59,7 @@ import org.apache.doris.common.Version; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.ProfileManager; +import org.apache.doris.common.util.ProfileWriter; import org.apache.doris.common.util.QueryPlannerProfile; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.SqlParserUtils; @@ -110,7 +111,7 @@ import java.util.stream.Collectors; // Do one COM_QUERY process. // first: Parse receive byte array to statement struct. // second: Do handle function for statement. 
-public class StmtExecutor { +public class StmtExecutor implements ProfileWriter { private static final Logger LOG = LogManager.getLogger(StmtExecutor.class); private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0); @@ -122,6 +123,9 @@ public class StmtExecutor { private Analyzer analyzer; private RuntimeProfile profile; private RuntimeProfile summaryProfile; + private RuntimeProfile plannerRuntimeProfile; + private final Object writeProfileLock = new Object(); + private volatile boolean isFinishedProfile = false; private volatile Coordinator coord = null; private MasterOpExecutor masterOpExecutor = null; private RedirectStatus redirectStatus = null; @@ -156,38 +160,36 @@ public class StmtExecutor { } // At the end of query execution, we begin to add up profile - public void initProfile(QueryPlannerProfile plannerProfile) { - // Summary profile - profile = new RuntimeProfile("Query"); - summaryProfile = new RuntimeProfile("Summary"); - summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(context.queryId())); - summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(context.getStartTime())); - + private void initProfile(QueryPlannerProfile plannerProfile, boolean waiteBeReport) { long currentTimestamp = System.currentTimeMillis(); long totalTimeMs = currentTimestamp - context.getStartTime(); - summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp)); - summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); + if (profile == null) { + profile = new RuntimeProfile("Query"); + summaryProfile = new RuntimeProfile("Summary"); + profile.addChild(summaryProfile); + summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(context.queryId())); + summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(context.getStartTime())); + summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp)); + summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); + summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Query"); + summaryProfile.addInfoString(ProfileManager.QUERY_STATE, context.getState().toString()); + summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION); + summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser()); + summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase()); + summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt); + summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ? "Yes" : "No"); - summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Query"); - summaryProfile.addInfoString(ProfileManager.QUERY_STATE, context.getState().toString()); - summaryProfile.addInfoString("Doris Version", Version.DORIS_BUILD_VERSION); - summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser()); - summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase()); - summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt); - summaryProfile.addInfoString(ProfileManager.IS_CACHED, isCached ? 
"Yes" : "No"); - - RuntimeProfile plannerRuntimeProfile = new RuntimeProfile("Execution Summary"); - plannerProfile.initRuntimeProfile(plannerRuntimeProfile); - summaryProfile.addChild(plannerRuntimeProfile); - - profile.addChild(summaryProfile); - - if (coord != null) { - coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(plannerProfile.getQueryBeginTime())); - coord.endProfile(); + plannerRuntimeProfile = new RuntimeProfile("Execution Summary"); + summaryProfile.addChild(plannerRuntimeProfile); profile.addChild(coord.getQueryProfile()); - coord = null; + } else { + summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp)); + summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); } + plannerProfile.initRuntimeProfile(plannerRuntimeProfile); + + coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(plannerProfile.getQueryBeginTime())); + coord.endProfile(waiteBeReport); } public Planner planner() { @@ -292,9 +294,7 @@ public class StmtExecutor { context.setQueryId(newQueryId); } handleQueryStmt(); - if (context.getSessionVariable().isReportSucc()) { - writeProfile(); - } + writeProfile(true); break; } catch (RpcException e) { if (i == retryTime - 1) { @@ -320,9 +320,7 @@ public class StmtExecutor { } else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InserStmt is its subclass try { handleInsertStmt(); - if (context.getSessionVariable().isReportSucc()) { - writeProfile(); - } + writeProfile(true); } catch (Throwable t) { LOG.warn("handle insert stmt fail", t); // the transaction of this insert may already begun, we will abort it at outer finally block. @@ -413,10 +411,20 @@ public class StmtExecutor { masterOpExecutor.execute(); } - private void writeProfile() { - initProfile(plannerProfile); - profile.computeTimeInChildProfile(); - ProfileManager.getInstance().pushProfile(profile); + @Override + public void writeProfile(boolean isLastWriteProfile) { + if (!context.getSessionVariable().isReportSucc()) { + return; + } + synchronized (writeProfileLock) { + if (isFinishedProfile) { + return; + } + initProfile(plannerProfile, isLastWriteProfile); + profile.computeTimeInChildProfile(); + ProfileManager.getInstance().pushProfile(profile); + isFinishedProfile = isLastWriteProfile; + } } // Analyze one statement to structure in memory. 
@@ -788,8 +796,10 @@ public class StmtExecutor { coord = new Coordinator(context, analyzer, planner); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + coord.setProfileWriter(this); coord.exec(); plannerProfile.setQueryScheduleFinishTime(); + writeProfile(false); while (true) { batch = coord.getNext(); // for outfile query, there will be only one empty batch send back with eos flag diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index e098a0def1..d9871c237f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -73,11 +73,6 @@ public class BackendServiceClient { return stub.clearCache(request); } - public Future triggerProfileReport( - InternalService.PTriggerProfileReportRequest request) { - return stub.triggerProfileReport(request); - } - public Future getInfo(InternalService.PProxyRequest request) { return stub.getInfo(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 1c4d785f93..b488dfde13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -157,18 +157,6 @@ public class BackendServiceProxy { } } - public Future triggerProfileReportAsync( - TNetworkAddress address, InternalService.PTriggerProfileReportRequest request) throws RpcException { - try { - final BackendServiceClient client = getProxy(address); - return client.triggerProfileReport(request); - } catch (Throwable e) { - LOG.warn("fetch data catch a exception, address={}:{}", - address.getHostname(), address.getPort(), e); - throw new RpcException(address.hostname, e.getMessage()); - } - } - public Future getInfo( TNetworkAddress address, InternalService.PProxyRequest request) throws RpcException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java index ebd5e91ff1..e8c9991fab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java @@ -260,7 +260,7 @@ public class ExportExportingTask extends MasterTask { summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Query"); summaryProfile.addInfoString(ProfileManager.QUERY_STATE, job.getState().toString()); - summaryProfile.addInfoString("Doris Version", Version.DORIS_BUILD_VERSION); + summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION); summaryProfile.addInfoString(ProfileManager.USER, "xxx"); summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, String.valueOf(job.getDbId())); summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, job.getSql()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 871cc59d1f..a65461d89a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -332,14 +332,6 @@ public class MockedBackendFactory { responseObserver.onCompleted(); } 
- @Override - public void triggerProfileReport(InternalService.PTriggerProfileReportRequest request, StreamObserver responseObserver) { - System.out.println("get triggerProfileReport request"); - responseObserver.onNext(InternalService.PTriggerProfileReportResult.newBuilder() - .setStatus(Status.PStatus.newBuilder().setStatusCode(0)).build()); - responseObserver.onCompleted(); - } - @Override public void getInfo(InternalService.PProxyRequest request, StreamObserver responseObserver) { System.out.println("get get_info request"); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 24560e0473..1a50e94dbc 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -211,14 +211,6 @@ message PClearCacheRequest { }; //End cache proto definition -message PTriggerProfileReportRequest { - repeated PUniqueId instance_ids = 1; -}; - -message PTriggerProfileReportResult { - required PStatus status = 1; -}; - message PStringPair { required string key = 1; required string val = 2; @@ -257,7 +249,6 @@ service PBackendService { rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult); rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns (PTabletWriterAddBatchResult); rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult); - rpc trigger_profile_report(PTriggerProfileReportRequest) returns (PTriggerProfileReportResult); rpc get_info(PProxyRequest) returns (PProxyResult); rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse); rpc fetch_cache(PFetchCacheRequest) returns (PFetchCacheResult); diff --git a/gensrc/proto/palo_internal_service.proto b/gensrc/proto/palo_internal_service.proto index 31adb767d2..c57ca2464c 100644 --- a/gensrc/proto/palo_internal_service.proto +++ b/gensrc/proto/palo_internal_service.proto @@ -34,7 +34,6 @@ service PInternalService { rpc tablet_writer_open(doris.PTabletWriterOpenRequest) returns (doris.PTabletWriterOpenResult); rpc tablet_writer_add_batch(doris.PTabletWriterAddBatchRequest) returns (doris.PTabletWriterAddBatchResult); rpc tablet_writer_cancel(doris.PTabletWriterCancelRequest) returns (doris.PTabletWriterCancelResult); - rpc trigger_profile_report(doris.PTriggerProfileReportRequest) returns (doris.PTriggerProfileReportResult); rpc get_info(doris.PProxyRequest) returns (doris.PProxyResult); rpc update_cache(doris.PUpdateCacheRequest) returns (doris.PCacheResponse); rpc fetch_cache(doris.PFetchCacheRequest) returns (doris.PFetchCacheResult);