[feature](tvf) support max_filter_ratio (#35431) (#36911)

Backport of #35431.

Co-authored-by: 苏小刚 <suxiaogang223@icloud.com>

Author: Mingyu Chen (committed by GitHub)
Date: 2024-06-27 20:58:53 +08:00
Commit: 5c1eef5f06, parent: bfd634f9c7
7 changed files with 73 additions and 9 deletions
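
In short: when enable_insert_strict is off, an INSERT now fails only if the share of filtered rows exceeds the new insert_max_filter_ratio session variable (default 1.0, i.e. tolerate any amount); strict mode still rejects any filtered row, as before. Below is a minimal standalone sketch of that acceptance rule, using the worked numbers from the regression test at the end of this diff (1 filtered row out of 5); the class and method names are illustrative, not part of the patch.

// Illustrative sketch only: the arithmetic mirrors the patch's
// checkStrictModeAndFilterRatio(), but this class is not part of it.
public class FilterRatioRule {
    static boolean accepts(boolean strict, double maxFilterRatio,
                           long filteredRows, long loadedRows) {
        if (strict) {
            return filteredRows == 0; // strict mode tolerates no filtered rows
        }
        // non-strict: fail only when the filtered share exceeds the ratio
        return filteredRows <= maxFilterRatio * (filteredRows + loadedRows);
    }

    public static void main(String[] args) {
        // 1 of 5 source rows filtered, as in the regression test below:
        System.out.println(accepts(false, 0.2, 1, 4)); // true  (1 <= 0.2 * 5)
        System.out.println(accepts(false, 0.1, 1, 4)); // false (1 >  0.1 * 5)
        System.out.println(accepts(true, 1.0, 1, 4));  // false (strict mode)
    }
}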


@@ -162,13 +162,19 @@ public abstract class AbstractInsertExecutor {
         }
     }
 
-    private void checkStrictMode() throws Exception {
+    private void checkStrictModeAndFilterRatio() throws Exception {
         // if in strict mode, insert will fail if there are filtered rows
         if (ctx.getSessionVariable().getEnableInsertStrict()) {
             if (filteredRows > 0) {
                 ErrorReport.reportDdlException("Insert has filtered data in strict mode",
                         ErrorCode.ERR_FAILED_WHEN_INSERT);
             }
+        } else {
+            if (filteredRows > ctx.getSessionVariable().getInsertMaxFilterRatio() * (filteredRows + loadedRows)) {
+                ErrorReport.reportDdlException("Insert has too many filtered data %d/%d insert_max_filter_ratio is %f",
+                        ErrorCode.ERR_FAILED_WHEN_INSERT, filteredRows, filteredRows + loadedRows,
+                        ctx.getSessionVariable().getInsertMaxFilterRatio());
+            }
         }
     }
@@ -179,7 +185,7 @@ public abstract class AbstractInsertExecutor {
         beforeExec();
         try {
             execImpl(executor, jobId);
-            checkStrictMode();
+            checkStrictModeAndFilterRatio();
             onComplete();
         } catch (Throwable t) {
             onFail(t);


@@ -98,7 +98,8 @@ public class GroupCommitPlanner {
         }
         streamLoadPutRequest
                 .setDb(db.getFullName())
-                .setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict ? 0 : 1)
+                .setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict ? 0
+                        : ConnectContext.get().getSessionVariable().insertMaxFilterRatio)
                 .setTbl(table.getName())
                 .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                 .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
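
Before this change, group-commit stream loads hard-coded the ratio to 1 (accept everything) whenever strict mode was off; they now forward the session value instead. The same ternary appears again in StmtExecutor further down; here is a one-line helper capturing the shared mapping (hypothetical, not in the patch):

// Hypothetical helper, not in the patch: strict mode pins the ratio sent to
// the backend at 0; otherwise the session's insert_max_filter_ratio is used.
public class EffectiveRatio {
    static double effectiveMaxFilterRatio(boolean enableInsertStrict, double insertMaxFilterRatio) {
        return enableInsertStrict ? 0 : insertMaxFilterRatio;
    }

    public static void main(String[] args) {
        System.out.println(effectiveMaxFilterRatio(true, 0.2));  // 0.0
        System.out.println(effectiveMaxFilterRatio(false, 0.2)); // 0.2
    }
}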


@@ -128,6 +128,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String PROFILE_LEVEL = "profile_level";
     public static final String MAX_INSTANCE_NUM = "max_instance_num";
     public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
+    public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio";
     public static final String ENABLE_SPILLING = "enable_spilling";
     public static final String ENABLE_SHORT_CIRCUIT_QUERY = "enable_short_circuit_point_query";
     public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE = "enable_exchange_node_parallel_merge";
@@ -848,6 +849,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT, needForward = true)
     public boolean enableInsertStrict = true;
 
+    @VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO, needForward = true)
+    public double insertMaxFilterRatio = 1.0;
+
     @VariableMgr.VarAttr(name = ENABLE_ODBC_TRANSCATION)
     public boolean enableOdbcTransaction = false;
@@ -2463,6 +2467,14 @@ public class SessionVariable implements Serializable, Writable {
         this.enableInsertStrict = enableInsertStrict;
     }
 
+    public double getInsertMaxFilterRatio() {
+        return insertMaxFilterRatio;
+    }
+
+    public void setInsertMaxFilterRatio(double maxFilterRatio) {
+        this.insertMaxFilterRatio = maxFilterRatio;
+    }
+
     public boolean isEnableSqlCache() {
         return enableSqlCache;
     }
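
The variable is declared with needForward = true, so it travels with statements that an FE forwards for execution. Below is a minimal client-side sketch of setting it per session over the MySQL protocol, assuming a local FE on the default port 9030 (endpoint, database, and credentials are illustrative):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class InsertMaxFilterRatioDemo {
    public static void main(String[] args) throws Exception {
        // Assumed local FE endpoint and credentials; adjust for your cluster.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:9030/demo", "root", "");
             Statement stmt = conn.createStatement()) {
            stmt.execute("SET enable_insert_strict = false");
            stmt.execute("SET insert_max_filter_ratio = 0.2");
            // Subsequent INSERT ... SELECT statements in this session succeed
            // as long as at most 20% of the source rows are filtered out.
        }
    }
}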


@@ -1930,7 +1930,8 @@ public class StmtExecutor {
         context.getTxnEntry()
                 .setTxnConf(new TTxnParams().setNeedTxn(true).setEnablePipelineTxnLoad(Config.enable_pipeline_load)
                         .setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("")
-                        .setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0 : 1.0));
+                        .setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0
+                                : context.getSessionVariable().getInsertMaxFilterRatio()));
         StringBuilder sb = new StringBuilder();
         sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
                 .append(TransactionStatus.PREPARE.name());
@@ -2250,6 +2251,15 @@ public class StmtExecutor {
                         "Insert has filtered data in strict mode, tracking_url=" + coord.getTrackingUrl());
                 return;
             }
+        } else {
+            if (filteredRows > context.getSessionVariable().getInsertMaxFilterRatio()
+                    * (filteredRows + loadedRows)) {
+                context.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
+                        String.format("Insert has too many filtered data %d/%d insert_max_filter_ratio is %f",
+                                filteredRows, filteredRows + loadedRows,
+                                context.getSessionVariable().getInsertMaxFilterRatio()));
+                return;
+            }
         }
 
         if (tblType != TableType.OLAP && tblType != TableType.MATERIALIZED_VIEW) {


@@ -187,6 +187,7 @@ public class StatisticsUtil {
         sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
         sessionVariable.cpuResourceLimit = Config.cpu_resource_limit_per_analyze_task;
         sessionVariable.setEnableInsertStrict(true);
+        sessionVariable.setInsertMaxFilterRatio(1.0);
         sessionVariable.enablePageCache = false;
         sessionVariable.enableProfile = Config.enable_profile_when_analyze;
         sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num;


@@ -346,7 +346,6 @@
 2	shanghai	2345672
 3	hangzhou	2345673
 4	shenzhen	2345674
-5	guangzhou	2345675
 
 -- !desc --
 s_suppkey	INT	Yes	false	\N	NONE
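
The expected output above loses one row because the test below narrows the city column from varchar(50) to varchar(8): among the values visible here, only "guangzhou" (9 characters) no longer fits, so that row is filtered and 4 of the 5 source rows load, which is exactly the 1/5 ratio the test asserts on. A trivial length check (hypothetical class, plain Java):

public class VarcharFilterCheck {
    public static void main(String[] args) {
        // City values visible in the expected output; only "guangzhou"
        // exceeds the new varchar(8) column width and gets filtered.
        for (String c : new String[] {"shanghai", "hangzhou", "shenzhen", "guangzhou"}) {
            System.out.printf("%-9s length=%d fits varchar(8): %b%n",
                    c, c.length(), c.length() <= 8);
        }
    }
}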


@@ -206,13 +206,13 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") {
                     "strip_outer_array" = "false",
                     "read_json_by_line" = "true") order by id; """
 
-        // test insert into select
+        // test insert into select in strict mode, or with insert_max_filter_ratio set
         def testTable = "test_hdfs_tvf"
         sql "DROP TABLE IF EXISTS ${testTable}"
         def result1 = sql """ CREATE TABLE IF NOT EXISTS ${testTable}
                 (
                     id int,
-                    city varchar(50),
+                    city varchar(8),
                     code int
                 )
                 COMMENT "test hdfs tvf table"
@@ -225,6 +225,9 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") {
         uri = "${defaultFS}" + "/user/doris/preinstalled_data/json_format_test/nest_json.json"
         format = "json"
 
+        sql "set enable_insert_strict=false;"
+        sql "set insert_max_filter_ratio=0.2;"
         def result2 = sql """ insert into ${testTable}(id,city,code)
                 select cast (id as INT) as id, city, cast (code as INT) as code
                 from HDFS(
@@ -234,9 +237,41 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") {
                     "strip_outer_array" = "false",
                     "read_json_by_line" = "true",
                     "json_root" = "\$.item") """
         sql "sync"
-        assertTrue(result2[0][0] == 5, "Insert should update 12 rows")
+        assertTrue(result2[0][0] == 4, "Insert should update 4 rows")
+
+        try {
+            sql "set insert_max_filter_ratio=0.1;"
+            def result3 = sql """ insert into ${testTable}(id,city,code)
+                    select cast (id as INT) as id, city, cast (code as INT) as code
+                    from HDFS(
+                        "uri" = "${uri}",
+                        "hadoop.username" = "${hdfsUserName}",
+                        "format" = "${format}",
+                        "strip_outer_array" = "false",
+                        "read_json_by_line" = "true",
+                        "json_root" = "\$.item") """
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains('Insert has too many filtered data 1/5 insert_max_filter_ratio is 0.100000.'))
+        }
+
+        try {
+            sql "set enable_insert_strict=true;"
+            def result4 = sql """ insert into ${testTable}(id,city,code)
+                    select cast (id as INT) as id, city, cast (code as INT) as code
+                    from HDFS(
+                        "uri" = "${uri}",
+                        "hadoop.username" = "${hdfsUserName}",
+                        "format" = "${format}",
+                        "strip_outer_array" = "false",
+                        "read_json_by_line" = "true",
+                        "json_root" = "\$.item") """
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains('Insert has filtered data in strict mode.'))
+        }
 
         qt_insert """ select * from test_hdfs_tvf order by id; """
 
         // test desc function