diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 64bf625fef..cb9073c77e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -162,13 +162,19 @@ public abstract class AbstractInsertExecutor {
         }
     }
 
-    private void checkStrictMode() throws Exception {
+    private void checkStrictModeAndFilterRatio() throws Exception {
         // if in strict mode, insert will fail if there are filtered rows
         if (ctx.getSessionVariable().getEnableInsertStrict()) {
             if (filteredRows > 0) {
                 ErrorReport.reportDdlException("Insert has filtered data in strict mode",
                         ErrorCode.ERR_FAILED_WHEN_INSERT);
             }
+        } else {
+            if (filteredRows > ctx.getSessionVariable().getInsertMaxFilterRatio() * (filteredRows + loadedRows)) {
+                ErrorReport.reportDdlException("Insert has too many filtered data %d/%d insert_max_filter_ratio is %f",
+                        ErrorCode.ERR_FAILED_WHEN_INSERT, filteredRows, filteredRows + loadedRows,
+                        ctx.getSessionVariable().getInsertMaxFilterRatio());
+            }
         }
     }
 
@@ -179,7 +185,7 @@ public abstract class AbstractInsertExecutor {
         beforeExec();
         try {
             execImpl(executor, jobId);
-            checkStrictMode();
+            checkStrictModeAndFilterRatio();
             onComplete();
         } catch (Throwable t) {
             onFail(t);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index 9ff06b2907..253d12ad86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -98,7 +98,8 @@ public class GroupCommitPlanner {
         }
         streamLoadPutRequest
                 .setDb(db.getFullName())
-                .setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict ? 0 : 1)
+                .setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict ? 0
+                        : ConnectContext.get().getSessionVariable().insertMaxFilterRatio)
                 .setTbl(table.getName())
                 .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                 .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 534b4f3386..7c2257f671 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -128,6 +128,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String PROFILE_LEVEL = "profile_level";
     public static final String MAX_INSTANCE_NUM = "max_instance_num";
     public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
+    public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio";
     public static final String ENABLE_SPILLING = "enable_spilling";
     public static final String ENABLE_SHORT_CIRCUIT_QUERY = "enable_short_circuit_point_query";
     public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE = "enable_exchange_node_parallel_merge";
@@ -848,6 +849,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT, needForward = true)
     public boolean enableInsertStrict = true;
 
+    @VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO, needForward = true)
+    public double insertMaxFilterRatio = 1.0;
+
     @VariableMgr.VarAttr(name = ENABLE_ODBC_TRANSCATION)
     public boolean enableOdbcTransaction = false;
 
@@ -2463,6 +2467,14 @@ public class SessionVariable implements Serializable, Writable {
         this.enableInsertStrict = enableInsertStrict;
     }
 
+    public double getInsertMaxFilterRatio() {
+        return insertMaxFilterRatio;
+    }
+
+    public void setInsertMaxFilterRatio(double maxFilterRatio) {
+        this.insertMaxFilterRatio = maxFilterRatio;
+    }
+
     public boolean isEnableSqlCache() {
         return enableSqlCache;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 6edceff76d..1e63a75074 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1930,7 +1930,8 @@ public class StmtExecutor {
             context.getTxnEntry()
                     .setTxnConf(new TTxnParams().setNeedTxn(true).setEnablePipelineTxnLoad(Config.enable_pipeline_load)
                             .setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("")
-                            .setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0 : 1.0));
+                            .setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0
+                                    : context.getSessionVariable().getInsertMaxFilterRatio()));
             StringBuilder sb = new StringBuilder();
             sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
                     .append(TransactionStatus.PREPARE.name());
@@ -2250,6 +2251,15 @@ public class StmtExecutor {
                             "Insert has filtered data in strict mode, tracking_url=" + coord.getTrackingUrl());
                     return;
                 }
+            } else {
+                if (filteredRows > context.getSessionVariable().getInsertMaxFilterRatio()
+                        * (filteredRows + loadedRows)) {
+                    context.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
+                            String.format("Insert has too many filtered data %d/%d insert_max_filter_ratio is %f",
+                                    filteredRows, filteredRows + loadedRows,
+                                    context.getSessionVariable().getInsertMaxFilterRatio()));
+                    return;
+                }
             }
 
             if (tblType != TableType.OLAP && tblType != TableType.MATERIALIZED_VIEW) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 35037bb7e2..2be007b141 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -187,6 +187,7 @@ public class StatisticsUtil {
         sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
         sessionVariable.cpuResourceLimit = Config.cpu_resource_limit_per_analyze_task;
         sessionVariable.setEnableInsertStrict(true);
+        sessionVariable.setInsertMaxFilterRatio(1.0);
         sessionVariable.enablePageCache = false;
         sessionVariable.enableProfile = Config.enable_profile_when_analyze;
         sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num;
diff --git a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
index 98a1324abd..91e2ec2d33 100644
--- a/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
+++ b/regression-test/data/external_table_p0/tvf/test_hdfs_tvf.out
@@ -346,7 +346,6 @@
 2	shanghai	2345672
 3	hangzhou	2345673
 4	shenzhen	2345674
-5	guangzhou	2345675
 
 -- !desc --
 s_suppkey	INT	Yes	false	\N	NONE
diff --git a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
index aa1fc8712b..abdfd871a1 100644
--- a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
@@ -206,13 +206,13 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") {
                         "strip_outer_array" = "false",
                         "read_json_by_line" = "true") order by id; """
 
-    // test insert into select
+    // test insert into select in strict mode, or when insert_max_filter_ratio is set
     def testTable = "test_hdfs_tvf"
     sql "DROP TABLE IF EXISTS ${testTable}"
     def result1 = sql """ CREATE TABLE IF NOT EXISTS ${testTable}
         (
             id int,
-            city varchar(50),
+            city varchar(8),
             code int
         )
         COMMENT "test hdfs tvf table"
@@ -225,6 +225,9 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") {
         uri = "${defaultFS}" + "/user/doris/preinstalled_data/json_format_test/nest_json.json"
         format = "json"
 
+        sql "set enable_insert_strict=false;"
+        sql "set insert_max_filter_ratio=0.2;"
+
        def result2 = sql """ insert into ${testTable}(id,city,code)
                    select cast (id as INT) as id, city, cast (code as INT) as code
                    from HDFS(
@@ -234,9 +237,41 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") {
                        "strip_outer_array" = "false",
                        "read_json_by_line" = "true",
                        "json_root" = "\$.item") """
-        sql "sync"
-        assertTrue(result2[0][0] == 5, "Insert should update 12 rows")
+        assertTrue(result2[0][0] == 4, "Insert should update 4 rows")
+
+        try {
+            sql "set insert_max_filter_ratio=0.1;"
+            def result3 = sql """ insert into ${testTable}(id,city,code)
+                    select cast (id as INT) as id, city, cast (code as INT) as code
+                    from HDFS(
+                        "uri" = "${uri}",
+                        "hadoop.username" = "${hdfsUserName}",
+                        "format" = "${format}",
+                        "strip_outer_array" = "false",
+                        "read_json_by_line" = "true",
+                        "json_root" = "\$.item") """
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains('Insert has too many filtered data 1/5 insert_max_filter_ratio is 0.100000.'))
+        }
+
+        try {
+            sql "set enable_insert_strict=true;"
+            def result4 = sql """ insert into ${testTable}(id,city,code)
+                    select cast (id as INT) as id, city, cast (code as INT) as code
+                    from HDFS(
+                        "uri" = "${uri}",
+                        "hadoop.username" = "${hdfsUserName}",
+                        "format" = "${format}",
+                        "strip_outer_array" = "false",
+                        "read_json_by_line" = "true",
+                        "json_root" = "\$.item") """
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains('Insert has filtered data in strict mode.'))
+        }
+
+        qt_insert """ select * from test_hdfs_tvf order by id; """
 
     // test desc function