[fix](partial update) Fix session variable enable_unique_key_partial_update will affect the behavior of insert statement when the target table is not unique table (#28321)
This commit is contained in:
@ -158,6 +158,8 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
|
||||
private InsertType insertType = InsertType.NATIVE_INSERT;
|
||||
|
||||
boolean hasEmptyTargetColumns = false;
|
||||
|
||||
enum InsertType {
|
||||
NATIVE_INSERT("insert_"),
|
||||
UPDATE("update_"),
|
||||
@ -545,11 +547,7 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
Set<String> mentionedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
|
||||
List<String> realTargetColumnNames;
|
||||
if (targetColumnNames == null) {
|
||||
if (!isFromDeleteOrUpdateStmt
|
||||
&& analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
|
||||
throw new AnalysisException("You must explicitly specify the columns to be updated when "
|
||||
+ "updating partial columns using the INSERT statement.");
|
||||
}
|
||||
hasEmptyTargetColumns = true;
|
||||
// the mentioned columns are columns which are visible to user, so here we use
|
||||
// getBaseSchema(), not getFullSchema()
|
||||
for (Column col : targetTable.getBaseSchema(false)) {
|
||||
@ -651,15 +649,15 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
}
|
||||
}
|
||||
|
||||
if (analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
|
||||
trySetPartialUpdate();
|
||||
}
|
||||
|
||||
// check if size of select item equal with columns mentioned in statement
|
||||
if (mentionedColumns.size() != queryStmt.getResultExprs().size()) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_COUNT);
|
||||
}
|
||||
|
||||
if (analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
|
||||
trySetPartialUpdate();
|
||||
}
|
||||
|
||||
// Check if all columns mentioned is enough
|
||||
checkColumnCoverage(mentionedColumns, targetTable.getBaseSchema());
|
||||
|
||||
@ -1181,8 +1179,15 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
return;
|
||||
}
|
||||
OlapTable olapTable = (OlapTable) targetTable;
|
||||
if (olapTable.getKeysType() != KeysType.UNIQUE_KEYS) {
|
||||
return;
|
||||
}
|
||||
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
|
||||
throw new UserException("Partial update is only allowed in unique table with merge-on-write enabled.");
|
||||
throw new UserException("Partial update is only allowed on unique table with merge-on-write enabled.");
|
||||
}
|
||||
if (hasEmptyTargetColumns) {
|
||||
throw new AnalysisException("You must explicitly specify the columns to be updated when "
|
||||
+ "updating partial columns using the INSERT statement.");
|
||||
}
|
||||
for (Column col : olapTable.getFullSchema()) {
|
||||
boolean exists = false;
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.KeysType;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
@ -78,7 +79,7 @@ public class BindSink implements AnalysisRuleFactory {
|
||||
Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink);
|
||||
Database database = pair.first;
|
||||
OlapTable table = pair.second;
|
||||
boolean isPartialUpdate = sink.isPartialUpdate();
|
||||
boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS;
|
||||
|
||||
LogicalPlan child = ((LogicalPlan) sink.child());
|
||||
boolean childHasSeqCol = child.getOutput().stream()
|
||||
@ -106,7 +107,7 @@ public class BindSink implements AnalysisRuleFactory {
|
||||
if (isPartialUpdate) {
|
||||
// check the necessary conditions for partial updates
|
||||
if (!table.getEnableUniqueKeyMergeOnWrite()) {
|
||||
throw new AnalysisException("Partial update is only allowed in"
|
||||
throw new AnalysisException("Partial update is only allowed on "
|
||||
+ "unique table with merge-on-write enabled.");
|
||||
}
|
||||
if (sink.getColNames().isEmpty() && sink.isFromNativeInsertStmt()) {
|
||||
|
||||
@ -129,3 +129,19 @@
|
||||
3 3 3 2 3
|
||||
4 4 4 1 2
|
||||
|
||||
-- !sql --
|
||||
10000 2017-10-01 2017-10-01T08:00:05 北京 20 0 2017-10-01T06:00 20 10 10
|
||||
10000 2017-10-01 2017-10-01T09:00:05 北京 20 0 2017-10-01T07:00 15 2 2
|
||||
|
||||
-- !sql --
|
||||
10000 2017-10-01 2017-10-01T08:00:05 北京 20 0 2017-10-01T06:00 20 10 10
|
||||
10000 2017-10-01 2017-10-01T09:00:05 北京 20 0 2017-10-01T07:00 15 2 2
|
||||
|
||||
-- !sql --
|
||||
10000 2017-10-01 2017-10-01T08:00:05 北京 20 0 2017-10-01T06:00 20 10 10
|
||||
10000 2017-10-01 2017-10-01T09:00:05 北京 20 0 2017-10-01T07:00 15 2 2
|
||||
|
||||
-- !sql --
|
||||
10000 2017-10-01 2017-10-01T08:00:05 北京 20 0 2017-10-01T06:00 20 10 10
|
||||
10000 2017-10-01 2017-10-01T09:00:05 北京 20 0 2017-10-01T07:00 15 2 2
|
||||
|
||||
|
||||
@ -248,4 +248,96 @@ suite("test_partial_update_native_insert_stmt", "p0") {
|
||||
sql "sync;"
|
||||
}
|
||||
}
|
||||
|
||||
// test that session variable `enable_unique_key_partial_update` will only affect unique tables
|
||||
for (def use_nerieds : [true, false]) {
|
||||
logger.info("current params: use_nerieds: ${use_nerieds}")
|
||||
if (use_nerieds) {
|
||||
sql "set enable_nereids_planner=true;"
|
||||
sql "set enable_nereids_dml=true;"
|
||||
sql "set enable_fallback_to_original_planner=false;"
|
||||
sql "sync;"
|
||||
} else {
|
||||
sql "set enable_nereids_planner=false;"
|
||||
sql "set enable_nereids_dml=false;"
|
||||
sql "sync;"
|
||||
}
|
||||
|
||||
sql "set enable_unique_key_partial_update=true;"
|
||||
sql "sync;"
|
||||
|
||||
def tableName8 = "test_partial_update_native_insert_stmt_agg_${use_nerieds}"
|
||||
sql """ DROP TABLE IF EXISTS ${tableName8}; """
|
||||
sql """ CREATE TABLE IF NOT EXISTS ${tableName8} (
|
||||
`user_id` LARGEINT NOT NULL,
|
||||
`date` DATE NOT NULL,
|
||||
`timestamp` DATETIME NOT NULL,
|
||||
`city` VARCHAR(20),
|
||||
`age` SMALLINT,
|
||||
`sex` TINYINT,
|
||||
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00",
|
||||
`cost` BIGINT SUM DEFAULT "0",
|
||||
`max_dwell_time` INT MAX DEFAULT "0",
|
||||
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
|
||||
)AGGREGATE KEY(`user_id`, `date`, `timestamp` ,`city`, `age`, `sex`)
|
||||
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
|
||||
PROPERTIES ("replication_allocation" = "tag.location.default: 1");"""
|
||||
|
||||
sql """insert into ${tableName8} values
|
||||
(10000,"2017-10-01","2017-10-01 08:00:05","北京",20,0,"2017-10-01 06:00:00",20,10,10),
|
||||
(10000,"2017-10-01","2017-10-01 09:00:05","北京",20,0,"2017-10-01 07:00:00",15,2,2); """
|
||||
qt_sql "select * from ${tableName8} order by user_id;"
|
||||
|
||||
|
||||
def tableName9 = "test_partial_update_native_insert_stmt_dup_${use_nerieds}"
|
||||
sql """ DROP TABLE IF EXISTS ${tableName9}; """
|
||||
sql """ CREATE TABLE IF NOT EXISTS ${tableName9} (
|
||||
`user_id` LARGEINT NOT NULL,
|
||||
`date` DATE NOT NULL,
|
||||
`timestamp` DATETIME NOT NULL,
|
||||
`city` VARCHAR(20),
|
||||
`age` SMALLINT,
|
||||
`sex` TINYINT,
|
||||
`last_visit_date` DATETIME DEFAULT "1970-01-01 00:00:00",
|
||||
`cost` BIGINT DEFAULT "0",
|
||||
`max_dwell_time` INT DEFAULT "0",
|
||||
`min_dwell_time` INT DEFAULT "99999" COMMENT "用户最小停留时间"
|
||||
)DUPLICATE KEY(`user_id`, `date`, `timestamp` ,`city`, `age`, `sex`)
|
||||
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
|
||||
PROPERTIES ("replication_allocation" = "tag.location.default: 1");"""
|
||||
|
||||
sql """insert into ${tableName9} values
|
||||
(10000,"2017-10-01","2017-10-01 08:00:05","北京",20,0,"2017-10-01 06:00:00",20,10,10),
|
||||
(10000,"2017-10-01","2017-10-01 09:00:05","北京",20,0,"2017-10-01 07:00:00",15,2,2); """
|
||||
qt_sql "select * from ${tableName9} order by user_id;"
|
||||
|
||||
|
||||
def tableName10 = "test_partial_update_native_insert_stmt_mor_${use_nerieds}"
|
||||
sql """ DROP TABLE IF EXISTS ${tableName10}; """
|
||||
sql """ CREATE TABLE IF NOT EXISTS ${tableName10} (
|
||||
`user_id` LARGEINT NOT NULL,
|
||||
`date` DATE NOT NULL,
|
||||
`timestamp` DATETIME NOT NULL,
|
||||
`city` VARCHAR(20),
|
||||
`age` SMALLINT,
|
||||
`sex` TINYINT,
|
||||
`last_visit_date` DATETIME DEFAULT "1970-01-01 00:00:00",
|
||||
`cost` BIGINT DEFAULT "0",
|
||||
`max_dwell_time` INT DEFAULT "0",
|
||||
`min_dwell_time` INT DEFAULT "99999" COMMENT "用户最小停留时间"
|
||||
)UNIQUE KEY(`user_id`, `date`, `timestamp` ,`city`, `age`, `sex`)
|
||||
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
|
||||
PROPERTIES ("replication_allocation" = "tag.location.default: 1", "enable_unique_key_merge_on_write" = "false");"""
|
||||
|
||||
test {
|
||||
sql """insert into ${tableName10} values
|
||||
(10000,"2017-10-01","2017-10-01 08:00:05","北京",20,0,"2017-10-01 06:00:00",20,10,10),
|
||||
(10000,"2017-10-01","2017-10-01 09:00:05","北京",20,0,"2017-10-01 07:00:00",15,2,2); """
|
||||
exception "Partial update is only allowed on unique table with merge-on-write enabled"
|
||||
}
|
||||
|
||||
sql """ DROP TABLE IF EXISTS ${tableName8}; """
|
||||
sql """ DROP TABLE IF EXISTS ${tableName9}; """
|
||||
sql """ DROP TABLE IF EXISTS ${tableName10}; """
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user