[fix](information_schema) fix backend_active_tasks table only return one backend's data (#50721) (#50722)
cherry pick from #50721
This commit is contained in:
@ -910,7 +910,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
SchemaScanNode scanNode = null;
|
||||
if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable(
|
||||
table.getName())) {
|
||||
scanNode = new BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), tupleDescriptor,
|
||||
scanNode = new BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), table, tupleDescriptor,
|
||||
schemaScan.getSchemaCatalog().orElse(null), schemaScan.getSchemaDatabase().orElse(null),
|
||||
schemaScan.getSchemaTable().orElse(null));
|
||||
} else {
|
||||
|
||||
@ -19,7 +19,6 @@ package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.LiteralExpr;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
@ -28,6 +27,7 @@ import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.PartitionKey;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.system.Backend;
|
||||
@ -59,6 +59,7 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode {
|
||||
// you need to the table's backend column id name to BACKEND_ID_COLUMN_SET
|
||||
// it's used to backend pruner, see computePartitionInfo;
|
||||
public static final Set<String> BEACKEND_ID_COLUMN_SET = new HashSet<>();
|
||||
private final TableIf tableIf;
|
||||
|
||||
static {
|
||||
BACKEND_TABLE.add("rowsets");
|
||||
@ -90,9 +91,10 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode {
|
||||
private Map<Long, Long> partitionIDToBackendID;
|
||||
private Collection<Long> selectedPartitionIds = Lists.newArrayList();
|
||||
|
||||
public BackendPartitionedSchemaScanNode(PlanNodeId id, TupleDescriptor desc,
|
||||
public BackendPartitionedSchemaScanNode(PlanNodeId id, TableIf table, TupleDescriptor desc,
|
||||
String schemaCatalog, String schemaDatabase, String schemaTable) {
|
||||
super(id, desc, schemaCatalog, schemaDatabase, schemaTable);
|
||||
this.tableIf = table;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -141,9 +143,9 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode {
|
||||
|
||||
private void computePartitionInfo() throws AnalysisException {
|
||||
List<Column> partitionColumns = new ArrayList<>();
|
||||
for (SlotDescriptor slotDesc : desc.getSlots()) {
|
||||
if (BEACKEND_ID_COLUMN_SET.contains(slotDesc.getColumn().getName().toLowerCase())) {
|
||||
partitionColumns.add(slotDesc.getColumn());
|
||||
for (Column column : tableIf.getColumns()) {
|
||||
if (BEACKEND_ID_COLUMN_SET.contains(column.getName().toLowerCase())) {
|
||||
partitionColumns.add(column);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1939,8 +1939,8 @@ public class SingleNodePlanner {
|
||||
case SCHEMA:
|
||||
if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable(
|
||||
tblRef.getDesc().getTable().getName())) {
|
||||
scanNode = new BackendPartitionedSchemaScanNode(ctx.getNextNodeId(), tblRef.getDesc(),
|
||||
null, null, null);
|
||||
scanNode = new BackendPartitionedSchemaScanNode(ctx.getNextNodeId(), tblRef.getTable(),
|
||||
tblRef.getDesc(), null, null, null);
|
||||
} else {
|
||||
scanNode = new SchemaScanNode(ctx.getNextNodeId(), tblRef.getDesc(), null, null, null);
|
||||
}
|
||||
|
||||
@ -0,0 +1,61 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.nereids.NereidsPlanner;
|
||||
import org.apache.doris.nereids.StatementContext;
|
||||
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
|
||||
import org.apache.doris.nereids.parser.NereidsParser;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.planner.BackendPartitionedSchemaScanNode;
|
||||
import org.apache.doris.planner.PlanFragment;
|
||||
import org.apache.doris.planner.PlanNode;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class MetadataTableTest extends TestWithFeService {
|
||||
|
||||
@Override
|
||||
protected int backendNum() {
|
||||
return 2;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScanBackendActiveTasks() throws Exception {
|
||||
useDatabase("information_schema");
|
||||
|
||||
LogicalPlan parsed = new NereidsParser().parseSingle(
|
||||
"select sum(SCAN_ROWS), sum(SCAN_BYTES) from backend_active_tasks where QUERY_ID = 'd299cb2156ef4870-aea578938f703503'");
|
||||
StatementContext statementContext = new StatementContext();
|
||||
NereidsPlanner nereidsPlanner = new NereidsPlanner(statementContext);
|
||||
nereidsPlanner.plan(new LogicalPlanAdapter(parsed, statementContext));
|
||||
List<PlanFragment> fragments = nereidsPlanner.getFragments();
|
||||
PlanNode planRoot = fragments.get(fragments.size() - 1).getPlanRoot();
|
||||
List<BackendPartitionedSchemaScanNode> scanNodes = new ArrayList<>();
|
||||
planRoot.collect(BackendPartitionedSchemaScanNode.class, scanNodes);
|
||||
BackendPartitionedSchemaScanNode scanNode = scanNodes.get(0);
|
||||
List<TScanRangeLocations> scanRangeLocations = scanNode.getScanRangeLocations(0);
|
||||
Assertions.assertEquals(backendNum(), scanRangeLocations.size());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user