[refactor](jdbc catalog) refactor JdbcFunctionPushDownRule (#23826)

1. Change from using string matching function to using Expr matching
2. Replace the `nvl` function with `ifnull` when pushed down to MySQL
3. Adapt ClickHouse's `from_unixtime` function to push down
4. Non-function filtering can still be pushed down when `enable_func_pushdown` is set to false
This commit is contained in:
zy-kkk
2023-09-15 22:16:07 +08:00
committed by GitHub
parent ba4c738ac7
commit 1c142309a6
10 changed files with 265 additions and 64 deletions

View File

@ -17,33 +17,130 @@
package org.apache.doris.planner.external.jdbc;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.thrift.TOdbcTableType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.Predicate;
public class JdbcFunctionPushDownRule {
private static final TreeSet<String> UNSUPPORTED_MYSQL_FUNCTIONS = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
private static final Logger LOG = LogManager.getLogger(JdbcFunctionPushDownRule.class);
private static final TreeSet<String> MYSQL_UNSUPPORTED_FUNCTIONS = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
static {
UNSUPPORTED_MYSQL_FUNCTIONS.add("date_trunc");
UNSUPPORTED_MYSQL_FUNCTIONS.add("money_format");
MYSQL_UNSUPPORTED_FUNCTIONS.add("date_trunc");
MYSQL_UNSUPPORTED_FUNCTIONS.add("money_format");
}
public static boolean isUnsupportedFunctions(TOdbcTableType tableType, String filter) {
if (tableType.equals(TOdbcTableType.MYSQL)) {
return isMySQLUnsupportedFunctions(filter);
} else {
return false;
private static final TreeSet<String> CLICKHOUSE_SUPPORTED_FUNCTIONS = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
static {
CLICKHOUSE_SUPPORTED_FUNCTIONS.add("from_unixtime");
}
private static boolean isMySQLFunctionUnsupported(String functionName) {
return MYSQL_UNSUPPORTED_FUNCTIONS.contains(functionName.toLowerCase());
}
private static boolean isClickHouseFunctionUnsupported(String functionName) {
return !CLICKHOUSE_SUPPORTED_FUNCTIONS.contains(functionName.toLowerCase());
}
private static final Map<String, String> REPLACE_MYSQL_FUNCTIONS = Maps.newHashMap();
static {
REPLACE_MYSQL_FUNCTIONS.put("nvl", "ifnull");
}
private static boolean isReplaceMysqlFunctions(String functionName) {
return REPLACE_MYSQL_FUNCTIONS.containsKey(functionName.toLowerCase());
}
private static final Map<String, String> REPLACE_CLICKHOUSE_FUNCTIONS = Maps.newHashMap();
static {
REPLACE_CLICKHOUSE_FUNCTIONS.put("from_unixtime", "FROM_UNIXTIME");
}
private static boolean isReplaceClickHouseFunctions(String functionName) {
return REPLACE_CLICKHOUSE_FUNCTIONS.containsKey(functionName.toLowerCase());
}
public static Expr processFunctions(TOdbcTableType tableType, Expr expr, List<String> errors) {
if (tableType == null || expr == null) {
return expr;
}
Predicate<String> checkFunction;
Predicate<String> replaceFunction;
if (TOdbcTableType.MYSQL.equals(tableType)) {
replaceFunction = JdbcFunctionPushDownRule::isReplaceMysqlFunctions;
checkFunction = JdbcFunctionPushDownRule::isMySQLFunctionUnsupported;
} else if (TOdbcTableType.CLICKHOUSE.equals(tableType)) {
replaceFunction = JdbcFunctionPushDownRule::isReplaceClickHouseFunctions;
checkFunction = JdbcFunctionPushDownRule::isClickHouseFunctionUnsupported;
} else {
return expr;
}
return processFunctionsRecursively(expr, checkFunction, replaceFunction, errors, tableType);
}
private static boolean isMySQLUnsupportedFunctions(String filter) {
for (String func : UNSUPPORTED_MYSQL_FUNCTIONS) {
if (filter.contains(func)) {
return true;
private static Expr processFunctionsRecursively(Expr expr, Predicate<String> checkFunction,
Predicate<String> replaceFunction, List<String> errors, TOdbcTableType tableType) {
if (expr instanceof FunctionCallExpr) {
FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
String func = functionCallExpr.getFnName().getFunction();
Preconditions.checkArgument(!func.isEmpty(), "function can not be empty");
func = replaceFunctionNameIfNecessary(func, replaceFunction, functionCallExpr, tableType);
if (!func.isEmpty() && checkFunction.test(func)) {
String errMsg = "Unsupported function: " + func + " in expr: " + expr.toMySql()
+ " in JDBC Table Type: " + tableType;
LOG.warn(errMsg);
errors.add(errMsg);
}
}
return false;
List<Expr> children = expr.getChildren();
for (int i = 0; i < children.size(); i++) {
Expr child = children.get(i);
Expr newChild = processFunctionsRecursively(child, checkFunction, replaceFunction, errors, tableType);
expr.setChild(i, newChild);
}
return expr;
}
private static String replaceFunctionNameIfNecessary(String func, Predicate<String> replaceFunction,
FunctionCallExpr functionCallExpr, TOdbcTableType tableType) {
if (replaceFunction.test(func)) {
String newFunc;
if (TOdbcTableType.MYSQL.equals(tableType)) {
newFunc = REPLACE_MYSQL_FUNCTIONS.get(func.toLowerCase());
} else if (TOdbcTableType.CLICKHOUSE.equals(tableType)) {
newFunc = REPLACE_CLICKHOUSE_FUNCTIONS.get(func);
} else {
newFunc = null;
}
if (newFunc != null) {
functionCallExpr.setFnName(FunctionName.createBuiltinName(newFunc));
func = functionCallExpr.getFnName().getFunction();
}
}
return func;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.planner.external.jdbc;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
@ -113,7 +114,7 @@ public class JdbcScanNode extends ExternalScanNode {
break;
}
}
//clean conjusts cause graph sannnode no need conjuncts
// clean conjusts cause graph sannnode no need conjuncts
conjuncts = Lists.newArrayList();
}
@ -133,19 +134,32 @@ public class JdbcScanNode extends ExternalScanNode {
}
ArrayList<Expr> conjunctsList = Expr.cloneList(conjuncts, sMap);
List<String> errors = Lists.newArrayList();
List<Expr> pushDownConjuncts = collectConjunctsToPushDown(conjunctsList, errors);
for (Expr individualConjunct : pushDownConjuncts) {
String filter = conjunctExprToString(jdbcType, individualConjunct);
filters.add(filter);
conjuncts.remove(individualConjunct);
}
}
private List<Expr> collectConjunctsToPushDown(List<Expr> conjunctsList, List<String> errors) {
List<Expr> pushDownConjuncts = new ArrayList<>();
for (Expr p : conjunctsList) {
if (shouldPushDownConjunct(jdbcType, p)) {
String filter = conjunctExprToString(jdbcType, p);
if (filter.equals("TRUE")) {
filter = "1 = 1";
List<Expr> individualConjuncts = p.getConjuncts();
for (Expr individualConjunct : individualConjuncts) {
Expr newp = JdbcFunctionPushDownRule.processFunctions(jdbcType, individualConjunct, errors);
if (!errors.isEmpty()) {
errors.clear();
continue;
}
pushDownConjuncts.add(newp);
}
if (JdbcFunctionPushDownRule.isUnsupportedFunctions(jdbcType, filter)) {
continue;
}
filters.add(filter);
conjuncts.remove(p);
}
}
return pushDownConjuncts;
}
private void createJdbcColumns() {
@ -282,7 +296,7 @@ public class JdbcScanNode extends ExternalScanNode {
@Override
public int getNumInstances() {
return ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
? ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() : 1;
? ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() : 1;
}
@Override
@ -292,17 +306,22 @@ public class JdbcScanNode extends ExternalScanNode {
tbl.getId(), -1L);
}
// Now some database have different function call like doris, now doris do not
// push down the function call except MYSQL
public static boolean shouldPushDownConjunct(TOdbcTableType tableType, Expr expr) {
if (!tableType.equals(TOdbcTableType.MYSQL)) {
List<FunctionCallExpr> fnExprList = Lists.newArrayList();
expr.collect(FunctionCallExpr.class, fnExprList);
if (!fnExprList.isEmpty()) {
private static boolean shouldPushDownConjunct(TOdbcTableType tableType, Expr expr) {
if (containsFunctionCallExpr(expr)) {
if (tableType.equals(TOdbcTableType.MYSQL) || tableType.equals(TOdbcTableType.CLICKHOUSE)) {
return Config.enable_func_pushdown;
} else {
return false;
}
} else {
return true;
}
return Config.enable_func_pushdown;
}
private static boolean containsFunctionCallExpr(Expr expr) {
List<FunctionCallExpr> fnExprList = Lists.newArrayList();
expr.collect(FunctionCallExpr.class, fnExprList);
return !fnExprList.isEmpty();
}
public static String conjunctExprToString(TOdbcTableType tableType, Expr expr) {
@ -338,6 +357,11 @@ public class JdbcScanNode extends ExternalScanNode {
}
}
// only for old planner
if (expr.contains(BoolLiteral.class) && expr.getStringValue().equals("1") && expr.getChildren().isEmpty()) {
return "1 = 1";
}
return expr.toMySql();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.doris.planner.external.odbc;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
@ -28,6 +29,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.external.ExternalScanNode;
@ -181,7 +183,7 @@ public class OdbcScanNode extends ExternalScanNode {
}
ArrayList<Expr> odbcConjuncts = Expr.cloneList(conjuncts, sMap);
for (Expr p : odbcConjuncts) {
if (JdbcScanNode.shouldPushDownConjunct(odbcType, p)) {
if (shouldPushDownConjunct(odbcType, p)) {
String filter = JdbcScanNode.conjunctExprToString(odbcType, p);
filters.add(filter);
conjuncts.remove(p);
@ -224,4 +226,15 @@ public class OdbcScanNode extends ExternalScanNode {
return ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
? ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() : 1;
}
public static boolean shouldPushDownConjunct(TOdbcTableType tableType, Expr expr) {
if (!tableType.equals(TOdbcTableType.MYSQL)) {
List<FunctionCallExpr> fnExprList = Lists.newArrayList();
expr.collect(FunctionCallExpr.class, fnExprList);
if (!fnExprList.isEmpty()) {
return false;
}
}
return Config.enable_func_pushdown;
}
}