[Improvement](profile)Add tvf active_be_tasks() #31815
This commit is contained in:
@ -199,4 +199,54 @@ void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<TRow> RuntimeQueryStatiticsMgr::get_active_be_tasks_statistics(
|
||||
std::vector<std::string> filter_columns) {
|
||||
std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
|
||||
std::vector<TRow> table_rows;
|
||||
int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
|
||||
|
||||
for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
|
||||
TRow trow;
|
||||
|
||||
TQueryStatistics tqs;
|
||||
qs_ctx_ptr->collect_query_statistics(&tqs);
|
||||
|
||||
for (auto iter = filter_columns.begin(); iter != filter_columns.end(); iter++) {
|
||||
std::string col_name = *iter;
|
||||
|
||||
TCell tcell;
|
||||
if (col_name == "beid") {
|
||||
tcell.longVal = be_id;
|
||||
} else if (col_name == "fehost") {
|
||||
tcell.stringVal = qs_ctx_ptr->_fe_addr.hostname;
|
||||
} else if (col_name == "queryid") {
|
||||
tcell.stringVal = query_id;
|
||||
} else if (col_name == "tasktimems") {
|
||||
if (qs_ctx_ptr->_is_query_finished) {
|
||||
tcell.longVal = qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time;
|
||||
} else {
|
||||
tcell.longVal = MonotonicMillis() - qs_ctx_ptr->_query_start_time;
|
||||
}
|
||||
} else if (col_name == "taskcputimems") {
|
||||
tcell.longVal = tqs.cpu_ms;
|
||||
} else if (col_name == "scanrows") {
|
||||
tcell.longVal = tqs.scan_rows;
|
||||
} else if (col_name == "scanbytes") {
|
||||
tcell.longVal = tqs.scan_bytes;
|
||||
} else if (col_name == "bepeakmemorybytes") {
|
||||
tcell.longVal = tqs.max_peak_memory_bytes;
|
||||
} else if (col_name == "currentusedmemorybytes") {
|
||||
tcell.longVal = tqs.current_used_memory_bytes;
|
||||
} else if (col_name == "shufflesendbytes") {
|
||||
tcell.longVal = tqs.shuffle_send_bytes;
|
||||
} else if (col_name == "shufflesendRows") {
|
||||
tcell.longVal = tqs.shuffle_send_rows;
|
||||
}
|
||||
trow.column_value.push_back(tcell);
|
||||
}
|
||||
table_rows.push_back(trow);
|
||||
}
|
||||
return table_rows;
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <gen_cpp/Data_types.h>
|
||||
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
|
||||
@ -66,6 +68,9 @@ public:
|
||||
void get_metric_map(std::string query_id,
|
||||
std::map<WorkloadMetricType, std::string>& metric_map);
|
||||
|
||||
// used for tvf active_queries
|
||||
std::vector<TRow> get_active_be_tasks_statistics(std::vector<std::string> filter_columns);
|
||||
|
||||
private:
|
||||
std::shared_mutex _qs_ctx_map_lock;
|
||||
std::map<std::string, std::unique_ptr<QueryStatisticsCtx>> _query_statistics_ctx_map;
|
||||
|
||||
@ -34,6 +34,7 @@
|
||||
#include "runtime/define_primitive_type.h"
|
||||
#include "runtime/descriptors.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/runtime_query_statistics_mgr.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/types.h"
|
||||
#include "util/thrift_rpc_helper.h"
|
||||
@ -95,7 +96,12 @@ Status VMetaScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
|
||||
return Status::InternalError("Logical error, VMetaScanner do not allow ColumnNullable");
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
|
||||
if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ACTIVE_BE_TASKS) {
|
||||
// tvf active_be_tasks fetch data in be directly, it does not need to request FE for data
|
||||
RETURN_IF_ERROR(_build_active_be_tasks_data());
|
||||
} else {
|
||||
RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -288,6 +294,20 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VMetaScanner::_build_active_be_tasks_data() {
|
||||
std::vector<std::string> filter_columns;
|
||||
for (const auto& slot : _tuple_desc->slots()) {
|
||||
filter_columns.emplace_back(slot->col_name_lower_case());
|
||||
}
|
||||
|
||||
std::vector<TRow> ret =
|
||||
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_statistics(
|
||||
filter_columns);
|
||||
_batch_data = std::move(ret);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range,
|
||||
TFetchSchemaTableDataRequest* request) {
|
||||
VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request";
|
||||
|
||||
@ -91,6 +91,8 @@ private:
|
||||
TFetchSchemaTableDataRequest* request);
|
||||
Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range,
|
||||
TFetchSchemaTableDataRequest* request);
|
||||
Status _build_active_be_tasks_data();
|
||||
|
||||
bool _meta_eos;
|
||||
TupleId _tuple_id;
|
||||
TUserIdentity _user_identity;
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
|
||||
@ -59,7 +60,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
|
||||
tableValued(MvInfos.class, "mv_infos"),
|
||||
tableValued(Jobs.class, "jobs"),
|
||||
tableValued(Tasks.class, "tasks"),
|
||||
tableValued(WorkloadGroups.class, "workload_groups")
|
||||
tableValued(WorkloadGroups.class, "workload_groups"),
|
||||
tableValued(ActiveBeTasks.class, "active_be_tasks")
|
||||
);
|
||||
|
||||
public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions();
|
||||
|
||||
@ -0,0 +1,58 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.trees.expressions.functions.table;
|
||||
|
||||
import org.apache.doris.catalog.FunctionSignature;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.trees.expressions.Properties;
|
||||
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
|
||||
import org.apache.doris.nereids.types.coercion.AnyDataType;
|
||||
import org.apache.doris.tablefunction.ActiveBeTasksTableValuedFunction;
|
||||
import org.apache.doris.tablefunction.TableValuedFunctionIf;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* stands be running tasks status, currently main including select/streamload/broker load/insert select
|
||||
*/
|
||||
public class ActiveBeTasks extends TableValuedFunction {
|
||||
|
||||
public ActiveBeTasks(Properties properties) {
|
||||
super("active_be_tasks", properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FunctionSignature customSignature() {
|
||||
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TableValuedFunctionIf toCatalogFunction() {
|
||||
try {
|
||||
Map<String, String> arguments = getTVFProperties().getMap();
|
||||
return new ActiveBeTasksTableValuedFunction(arguments);
|
||||
} catch (Throwable t) {
|
||||
throw new AnalysisException("Can not build ActiveBeTasksTableValuedFunction by "
|
||||
+ this + ": " + t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
|
||||
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
|
||||
return visitor.visitActiveBeTasks(this, context);
|
||||
}
|
||||
}
|
||||
@ -47,7 +47,7 @@ public class ActiveQueries extends TableValuedFunction {
|
||||
Map<String, String> arguments = getTVFProperties().getMap();
|
||||
return new ActiveQueriesTableValuedFunction(arguments);
|
||||
} catch (Throwable t) {
|
||||
throw new AnalysisException("Can not build FrontendsTableValuedFunction by "
|
||||
throw new AnalysisException("Can not build ActiveQueriesTableValuedFunction by "
|
||||
+ this + ": " + t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.nereids.trees.expressions.visitor;
|
||||
|
||||
import org.apache.doris.nereids.trees.expressions.functions.table.ActiveBeTasks;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.table.ActiveQueries;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
|
||||
@ -102,4 +103,8 @@ public interface TableValuedFunctionVisitor<R, C> {
|
||||
default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) {
|
||||
return visitTableValuedFunction(workloadGroups, context);
|
||||
}
|
||||
|
||||
default R visitActiveBeTasks(ActiveBeTasks beTasks, C context) {
|
||||
return visitTableValuedFunction(beTasks, context);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,76 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.tablefunction;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.thrift.TMetaScanRange;
|
||||
import org.apache.doris.thrift.TMetadataType;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class ActiveBeTasksTableValuedFunction extends MetadataTableValuedFunction {
|
||||
|
||||
public static final String NAME = "active_be_tasks";
|
||||
|
||||
private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
|
||||
new Column("BeId", PrimitiveType.BIGINT),
|
||||
new Column("FeHost", ScalarType.createStringType()),
|
||||
new Column("QueryId", ScalarType.createStringType()),
|
||||
new Column("TaskTimeMs", PrimitiveType.BIGINT),
|
||||
new Column("TaskCpuTimeMs", PrimitiveType.BIGINT),
|
||||
new Column("ScanRows", PrimitiveType.BIGINT),
|
||||
new Column("ScanBytes", PrimitiveType.BIGINT),
|
||||
new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
|
||||
new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
|
||||
new Column("ShuffleSendBytes", PrimitiveType.BIGINT),
|
||||
new Column("ShuffleSendRows", PrimitiveType.BIGINT));
|
||||
|
||||
public ActiveBeTasksTableValuedFunction(Map<String, String> params) throws AnalysisException {
|
||||
if (params.size() != 0) {
|
||||
throw new AnalysisException("ActiveBeTasks table-valued-function does not support any params");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TMetadataType getMetadataType() {
|
||||
return TMetadataType.ACTIVE_BE_TASKS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TMetaScanRange getMetaScanRange() {
|
||||
TMetaScanRange metaScanRange = new TMetaScanRange();
|
||||
metaScanRange.setMetadataType(TMetadataType.ACTIVE_BE_TASKS);
|
||||
return metaScanRange;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTableName() {
|
||||
return "ActiveBeTasksTableValuedFunction";
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Column> getTableColumns() throws AnalysisException {
|
||||
return SCHEMA;
|
||||
}
|
||||
}
|
||||
@ -68,7 +68,7 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio
|
||||
|
||||
public ActiveQueriesTableValuedFunction(Map<String, String> params) throws AnalysisException {
|
||||
if (params.size() != 0) {
|
||||
throw new AnalysisException("Queries table-valued-function does not support any params");
|
||||
throw new AnalysisException("ActiveQueries table-valued-function does not support any params");
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,7 +90,7 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio
|
||||
|
||||
@Override
|
||||
public String getTableName() {
|
||||
return "QueriesTableValuedFunction";
|
||||
return "ActiveQueriesTableValuedFunction";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -78,6 +78,8 @@ public abstract class TableValuedFunctionIf {
|
||||
return new ActiveQueriesTableValuedFunction(params);
|
||||
case WorkloadSchedPolicyTableValuedFunction.NAME:
|
||||
return new WorkloadSchedPolicyTableValuedFunction(params);
|
||||
case ActiveBeTasksTableValuedFunction.NAME:
|
||||
return new ActiveBeTasksTableValuedFunction(params);
|
||||
default:
|
||||
throw new AnalysisException("Could not find table function " + funcName);
|
||||
}
|
||||
|
||||
@ -711,7 +711,8 @@ enum TMetadataType {
|
||||
JOBS,
|
||||
TASKS,
|
||||
QUERIES,
|
||||
WORKLOAD_SCHED_POLICY
|
||||
WORKLOAD_SCHED_POLICY,
|
||||
ACTIVE_BE_TASKS,
|
||||
}
|
||||
|
||||
enum TIcebergQueryType {
|
||||
|
||||
Reference in New Issue
Block a user