pick #38701
This commit is contained in:
@ -504,7 +504,7 @@ public class SchemaTable extends Table {
|
||||
.column("QUERY_ID", ScalarType.createVarchar(256))
|
||||
.column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
|
||||
.column("FE",
|
||||
ScalarType.createVarchar(64)).build()))
|
||||
ScalarType.createVarchar(64)).build(), true))
|
||||
.put("workload_policy",
|
||||
new SchemaTable(SystemIdGenerator.getNextId(), "workload_policy", TableType.SCHEMA,
|
||||
builder().column("ID", ScalarType.createType(PrimitiveType.BIGINT))
|
||||
@ -518,10 +518,17 @@ public class SchemaTable extends Table {
|
||||
.build()))
|
||||
.build();
|
||||
|
||||
private boolean fetchAllFe = false;
|
||||
|
||||
protected SchemaTable(long id, String name, TableType type, List<Column> baseSchema) {
|
||||
super(id, name, type, baseSchema);
|
||||
}
|
||||
|
||||
protected SchemaTable(long id, String name, TableType type, List<Column> baseSchema, boolean fetchAllFe) {
|
||||
this(id, name, type, baseSchema);
|
||||
this.fetchAllFe = fetchAllFe;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
throw new UnsupportedOperationException("Do not allow to write SchemaTable to image.");
|
||||
@ -535,6 +542,14 @@ public class SchemaTable extends Table {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static boolean isShouldFetchAllFe(String schemaTableName) {
|
||||
Table table = TABLE_MAP.get(schemaTableName);
|
||||
if (table != null && table instanceof SchemaTable) {
|
||||
return ((SchemaTable) table).fetchAllFe;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* For TABLE_MAP.
|
||||
**/
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.SchemaTable;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.UserException;
|
||||
@ -27,6 +28,8 @@ import org.apache.doris.datasource.FederationBackendPolicy;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.system.Frontend;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
@ -38,6 +41,7 @@ import com.google.common.collect.Lists;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -84,6 +88,21 @@ public class SchemaScanNode extends ScanNode {
|
||||
frontendPort = Config.rpc_port;
|
||||
}
|
||||
|
||||
private void setFeAddrList(TPlanNode msg) {
|
||||
if (SchemaTable.isShouldFetchAllFe(tableName)) {
|
||||
List<TNetworkAddress> feAddrList = new ArrayList();
|
||||
if (ConnectContext.get().getSessionVariable().showAllFeConnection) {
|
||||
List<Frontend> feList = Env.getCurrentEnv().getFrontends(null);
|
||||
for (Frontend fe : feList) {
|
||||
feAddrList.add(new TNetworkAddress(fe.getHost(), fe.getRpcPort()));
|
||||
}
|
||||
} else {
|
||||
feAddrList.add(new TNetworkAddress(frontendIP, frontendPort));
|
||||
}
|
||||
msg.schema_scan_node.setFeAddrList(feAddrList);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void toThrift(TPlanNode msg) {
|
||||
msg.node_type = TPlanNodeType.SCHEMA_SCAN_NODE;
|
||||
@ -116,6 +135,7 @@ public class SchemaScanNode extends ScanNode {
|
||||
|
||||
TUserIdentity tCurrentUser = ConnectContext.get().getCurrentUserIdentity().toThrift();
|
||||
msg.schema_scan_node.setCurrentUserIdent(tCurrentUser);
|
||||
setFeAddrList(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user