[Fix](multi catalog)Fix nereids context table always use internal catalog bug (#21953)
The getTable function in CascadesContext only handles the internal catalog case (try to find table only in internal catalog and dbs). However, it should take all the external catalogs into consideration, otherwise, it will failed to find a table or get the wrong table while querying external table. This pr is to fix this bug.
This commit is contained in:
@ -29,6 +29,8 @@ import com.google.common.collect.Lists;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -134,6 +136,8 @@ public interface TableIf {
|
||||
|
||||
Optional<ColumnStatistic> getColumnStatistic(String colName);
|
||||
|
||||
void write(DataOutput out) throws IOException;
|
||||
|
||||
/**
|
||||
* Doris table type.
|
||||
*/
|
||||
|
||||
@ -17,10 +17,11 @@
|
||||
|
||||
package org.apache.doris.nereids;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.nereids.analyzer.Scope;
|
||||
import org.apache.doris.nereids.analyzer.UnboundRelation;
|
||||
import org.apache.doris.nereids.jobs.Job;
|
||||
@ -94,7 +95,7 @@ public class CascadesContext implements ScheduleContext {
|
||||
private final Map<SubqueryExpr, Boolean> subqueryExprIsAnalyzed;
|
||||
private final RuntimeFilterContext runtimeFilterContext;
|
||||
private Optional<Scope> outerScope = Optional.empty();
|
||||
private List<Table> tables = null;
|
||||
private List<TableIf> tables = null;
|
||||
|
||||
private boolean isRewriteRoot;
|
||||
private volatile boolean isTimeout = false;
|
||||
@ -210,7 +211,7 @@ public class CascadesContext implements ScheduleContext {
|
||||
return memo;
|
||||
}
|
||||
|
||||
public void setTables(List<Table> tables) {
|
||||
public void setTables(List<TableIf> tables) {
|
||||
this.tables = tables;
|
||||
}
|
||||
|
||||
@ -340,9 +341,9 @@ public class CascadesContext implements ScheduleContext {
|
||||
}
|
||||
|
||||
/** get table by table name, try to get from information from dumpfile first */
|
||||
public Table getTableByName(String tableName) {
|
||||
public TableIf getTableByName(String tableName) {
|
||||
Preconditions.checkState(tables != null);
|
||||
for (Table table : tables) {
|
||||
for (TableIf table : tables) {
|
||||
if (table.getName().equals(tableName)) {
|
||||
return table;
|
||||
}
|
||||
@ -350,7 +351,7 @@ public class CascadesContext implements ScheduleContext {
|
||||
return null;
|
||||
}
|
||||
|
||||
public List<Table> getTables() {
|
||||
public List<TableIf> getTables() {
|
||||
return tables;
|
||||
}
|
||||
|
||||
@ -388,19 +389,24 @@ public class CascadesContext implements ScheduleContext {
|
||||
return relations;
|
||||
}
|
||||
|
||||
private Table getTable(UnboundRelation unboundRelation) {
|
||||
private TableIf getTable(UnboundRelation unboundRelation) {
|
||||
List<String> nameParts = unboundRelation.getNameParts();
|
||||
switch (nameParts.size()) {
|
||||
case 1: { // table
|
||||
String ctlName = getConnectContext().getEnv().getCurrentCatalog().getName();
|
||||
String dbName = getConnectContext().getDatabase();
|
||||
return getTable(dbName, nameParts.get(0), getConnectContext().getEnv());
|
||||
return getTable(ctlName, dbName, nameParts.get(0), getConnectContext().getEnv());
|
||||
}
|
||||
case 2: { // db.table
|
||||
String ctlName = getConnectContext().getEnv().getCurrentCatalog().getName();
|
||||
String dbName = nameParts.get(0);
|
||||
if (!dbName.equals(getConnectContext().getDatabase())) {
|
||||
dbName = getConnectContext().getClusterName() + ":" + dbName;
|
||||
}
|
||||
return getTable(dbName, nameParts.get(1), getConnectContext().getEnv());
|
||||
return getTable(ctlName, dbName, nameParts.get(1), getConnectContext().getEnv());
|
||||
}
|
||||
case 3: { // catalog.db.table
|
||||
return getTable(nameParts.get(0), nameParts.get(1), nameParts.get(2), getConnectContext().getEnv());
|
||||
}
|
||||
default:
|
||||
throw new IllegalStateException("Table name [" + unboundRelation.getTableName() + "] is invalid.");
|
||||
@ -410,13 +416,22 @@ public class CascadesContext implements ScheduleContext {
|
||||
/**
|
||||
* Find table from catalog.
|
||||
*/
|
||||
public Table getTable(String dbName, String tableName, Env env) {
|
||||
Database db = env.getInternalCatalog().getDb(dbName)
|
||||
.orElseThrow(() -> new RuntimeException("Database [" + dbName + "] does not exist."));
|
||||
public TableIf getTable(String ctlName, String dbName, String tableName, Env env) {
|
||||
CatalogIf catalog = env.getCatalogMgr().getCatalog(ctlName);
|
||||
if (catalog == null) {
|
||||
throw new RuntimeException("Catalog [" + ctlName + "] does not exist.");
|
||||
}
|
||||
DatabaseIf db = catalog.getDbNullable(dbName);
|
||||
if (db == null) {
|
||||
throw new RuntimeException("Database [" + dbName + "] does not exist in catalog [" + ctlName + "].");
|
||||
}
|
||||
db.readLock();
|
||||
try {
|
||||
return db.getTable(tableName).orElseThrow(() -> new RuntimeException(
|
||||
"Table [" + tableName + "] does not exist in database [" + dbName + "]."));
|
||||
TableIf table = db.getTableNullable(tableName);
|
||||
if (table == null) {
|
||||
throw new RuntimeException("Table [" + tableName + "] does not exist in database [" + dbName + "].");
|
||||
}
|
||||
return table;
|
||||
} finally {
|
||||
db.readUnlock();
|
||||
}
|
||||
@ -428,8 +443,7 @@ public class CascadesContext implements ScheduleContext {
|
||||
public static class Lock implements AutoCloseable {
|
||||
|
||||
CascadesContext cascadesContext;
|
||||
|
||||
private final Stack<Table> locked = new Stack<>();
|
||||
private final Stack<TableIf> locked = new Stack<>();
|
||||
|
||||
/**
|
||||
* Try to acquire read locks on tables, throw runtime exception once the acquiring for read lock failed.
|
||||
@ -440,7 +454,7 @@ public class CascadesContext implements ScheduleContext {
|
||||
if (cascadesContext.getTables() == null) {
|
||||
cascadesContext.extractTables(plan);
|
||||
}
|
||||
for (Table table : cascadesContext.tables) {
|
||||
for (TableIf table : cascadesContext.tables) {
|
||||
if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
|
||||
close();
|
||||
throw new RuntimeException(String.format("Failed to get read lock on table: %s", table.getName()));
|
||||
|
||||
@ -19,7 +19,7 @@ package org.apache.doris.nereids.minidump;
|
||||
|
||||
import org.apache.doris.catalog.ColocateTableIndex;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.nereids.NereidsPlanner;
|
||||
import org.apache.doris.nereids.StatementContext;
|
||||
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
|
||||
@ -61,7 +61,7 @@ public class Minidump {
|
||||
private String catalogName;
|
||||
|
||||
// metadata objects
|
||||
private List<Table> tables;
|
||||
private List<TableIf> tables;
|
||||
|
||||
private Map<String, ColumnStatistic> totalColumnStatisticMap = new HashMap<>();
|
||||
|
||||
@ -71,7 +71,7 @@ public class Minidump {
|
||||
|
||||
/** Minidump class used to save environment messages */
|
||||
public Minidump(String sql, SessionVariable sessionVariable,
|
||||
String parsedPlanJson, String resultPlanJson, List<Table> tables,
|
||||
String parsedPlanJson, String resultPlanJson, List<TableIf> tables,
|
||||
String catalogName, String dbName, Map<String, ColumnStatistic> totalColumnStatisticMap,
|
||||
Map<String, Histogram> totalHistogramMap, ColocateTableIndex colocateTableIndex) {
|
||||
this.sql = sql;
|
||||
@ -98,7 +98,7 @@ public class Minidump {
|
||||
return resultPlanJson;
|
||||
}
|
||||
|
||||
public List<Table> getTables() {
|
||||
public List<TableIf> getTables() {
|
||||
return tables;
|
||||
}
|
||||
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.nereids.minidump;
|
||||
import org.apache.doris.catalog.ColocateTableIndex;
|
||||
import org.apache.doris.catalog.SchemaTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
import org.apache.doris.statistics.ColumnStatistic;
|
||||
import org.apache.doris.statistics.Histogram;
|
||||
@ -83,7 +84,7 @@ public class MinidumpUtils {
|
||||
newSessionVariable.readFromJson(inputJSON.getString("SessionVariable"));
|
||||
String sql = inputJSON.getString("Sql");
|
||||
|
||||
List<Table> tables = new ArrayList<>();
|
||||
List<TableIf> tables = new ArrayList<>();
|
||||
String catalogName = inputJSON.getString("CatalogName");
|
||||
String dbName = inputJSON.getString("DbName");
|
||||
JSONArray tablesJson = (JSONArray) inputJSON.get("Tables");
|
||||
@ -131,9 +132,9 @@ public class MinidumpUtils {
|
||||
* serialize tables from Table in catalog to json format
|
||||
*/
|
||||
public static JSONArray serializeTables(
|
||||
String minidumpFileDir, String dbAndCatalogName, List<Table> tables) throws IOException {
|
||||
String minidumpFileDir, String dbAndCatalogName, List<TableIf> tables) throws IOException {
|
||||
JSONArray tablesJson = new JSONArray();
|
||||
for (Table table : tables) {
|
||||
for (TableIf table : tables) {
|
||||
if (table instanceof SchemaTable) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -21,7 +21,7 @@ import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.FunctionRegistry;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.telemetry.Telemetry;
|
||||
@ -181,7 +181,7 @@ public class ConnectContext {
|
||||
private StatementContext statementContext;
|
||||
private Map<String, PrepareStmtContext> preparedStmtCtxs = Maps.newHashMap();
|
||||
|
||||
private List<Table> tables = null;
|
||||
private List<TableIf> tables = null;
|
||||
|
||||
private Map<String, ColumnStatistic> totalColumnStatisticMap = new HashMap<>();
|
||||
|
||||
@ -289,11 +289,11 @@ public class ConnectContext {
|
||||
return this.preparedStmtCtxs.get(stmtName);
|
||||
}
|
||||
|
||||
public List<Table> getTables() {
|
||||
public List<TableIf> getTables() {
|
||||
return tables;
|
||||
}
|
||||
|
||||
public void setTables(List<Table> tables) {
|
||||
public void setTables(List<TableIf> tables) {
|
||||
this.tables = tables;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user