diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 5955e97d2c..76a2a2d4a6 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -693,6 +693,9 @@ Status BetaRowsetWriter::_do_create_segment_writer( writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; writer_options.rowset_ctx = &_context; writer_options.is_direct_write = _context.is_direct_write; + if (is_segcompaction) { + writer_options.is_direct_write = false; + } if (is_segcompaction) { writer->reset(new segment_v2::SegmentWriter( diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 1f63937a40..c7e31c2196 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -362,15 +362,9 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* CHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write); // find missing column cids - std::vector missing_cids; - std::vector including_cids; - for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) { - if (_tablet_schema->is_column_missing(i)) { - missing_cids.push_back(i); - } else { - including_cids.push_back(i); - } - } + std::vector missing_cids = _tablet_schema->get_missing_cids(); + std::vector including_cids = _tablet_schema->get_update_cids(); + // create full block and fill with input columns auto full_block = _tablet_schema->create_block(); size_t input_id = 0; @@ -399,7 +393,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* bool has_default = false; std::vector use_default_flag; use_default_flag.reserve(num_rows); - for (size_t pos = 0; pos < num_rows; pos++) { + for (size_t pos = row_pos; pos < num_rows; pos++) { std::string key = _full_encode_keys(key_columns, pos); RETURN_IF_ERROR(_primary_key_index_builder->add_item(key)); _maybe_invalid_row_cache(key); diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 7fc3e63c15..60d117cb29 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -1037,6 +1037,8 @@ void TabletSchema::set_partial_update_info(bool is_partial_update, const std::set& partial_update_input_columns) { _is_partial_update = is_partial_update; _partial_update_input_columns = partial_update_input_columns; + _missing_cids.clear(); + _update_cids.clear(); for (auto i = 0; i < _cols.size(); ++i) { if (_partial_update_input_columns.count(_cols[i].name()) == 0) { _missing_cids.emplace_back(i); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java index 67aded7e08..1d733d1e2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java @@ -130,11 +130,15 @@ public class DeleteStmt extends DdlStmt { + " Please check the following session variables: " + String.join(", ", SessionVariable.DEBUG_VARIABLES)); } + boolean isMow = ((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite(); for (Column column : targetTable.getColumns()) { Expr expr; + // in mow, we can use partial update so we only need key column and delete sign if (!column.isVisible() && column.getName().equalsIgnoreCase(Column.DELETE_SIGN)) { expr = new BoolLiteral(true); - } else if (column.isKey() || !column.isVisible() || (!column.isAllowNull() && !column.hasDefaultValue())) { + } else if (column.isKey()) { + expr = new SlotRef(targetTableRef.getAliasAsName(), column.getName()); + } else if (!isMow && !column.isVisible() || (!column.isAllowNull() && !column.hasDefaultValue())) { expr = new SlotRef(targetTableRef.getAliasAsName(), column.getName()); } else { continue; @@ -166,13 +170,19 @@ public class DeleteStmt extends DdlStmt { // limit LimitElement.NO_LIMIT ); + boolean isPartialUpdate = false; + if (((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite() + && cols.size() < targetTable.getColumns().size()) { + isPartialUpdate = true; + } insertStmt = new NativeInsertStmt( new InsertTarget(tableName, null), null, cols, new InsertSource(selectStmt), - null); + null, + isPartialUpdate); } private void analyzeTargetTable(Analyzer analyzer) throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 13a1a2b30f..c3f258246e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -67,6 +67,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -131,6 +132,9 @@ public class NativeInsertStmt extends InsertStmt { private boolean isValuesOrConstantSelect; + private boolean isPartialUpdate = false; + + private HashSet partialUpdateCols = new HashSet(); public NativeInsertStmt(InsertTarget target, String label, List cols, InsertSource source, List hints) { @@ -157,6 +161,13 @@ public class NativeInsertStmt extends InsertStmt { && ((SelectStmt) queryStmt).getTableRefs().isEmpty()); } + public NativeInsertStmt(InsertTarget target, String label, List cols, InsertSource source, + List hints, boolean isPartialUpdate) { + this(target, label, cols, source, hints); + this.isPartialUpdate = isPartialUpdate; + this.partialUpdateCols.addAll(cols); + } + public boolean isValuesOrConstantSelect() { return isValuesOrConstantSelect; } @@ -344,6 +355,9 @@ public class NativeInsertStmt extends InsertStmt { DescriptorTable descTable = analyzer.getDescTbl(); olapTuple = descTable.createTupleDescriptor(); for (Column col : olapTable.getFullSchema()) { + if (isPartialUpdate && !partialUpdateCols.contains(col.getName())) { + continue; + } SlotDescriptor slotDesc = descTable.addSlotDescriptor(olapTuple); slotDesc.setIsMaterialized(true); slotDesc.setType(col.getType()); @@ -377,6 +391,9 @@ public class NativeInsertStmt extends InsertStmt { // check columns of target table for (Column col : baseColumns) { + if (isPartialUpdate && !partialUpdateCols.contains(col.getName())) { + continue; + } if (mentionedCols.contains(col.getName())) { continue; } @@ -694,6 +711,9 @@ public class NativeInsertStmt extends InsertStmt { List> resultExprByName = Lists.newArrayList(); // reorder resultExprs in table column order for (Column col : targetTable.getFullSchema()) { + if (isPartialUpdate && !partialUpdateCols.contains(col.getName())) { + continue; + } if (exprByName.containsKey(col.getName())) { resultExprByName.add(Pair.of(col.getName(), exprByName.get(col.getName()))); } else { @@ -732,6 +752,8 @@ public class NativeInsertStmt extends InsertStmt { if (targetTable instanceof OlapTable) { dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert()); + OlapTableSink sink = (OlapTableSink) dataSink; + sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols); dataPartition = dataSink.getOutputPartition(); } else if (targetTable instanceof BrokerTable) { BrokerTable table = (BrokerTable) targetTable; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java index 91e1707295..0959970fa7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java @@ -57,7 +57,6 @@ import java.util.TreeSet; * {expr} */ public class UpdateStmt extends DdlStmt { - private TableRef targetTableRef; private TableName tableName; private final List setExprs; @@ -67,6 +66,7 @@ public class UpdateStmt extends DdlStmt { private TableIf targetTable; List selectListItems = Lists.newArrayList(); List cols = Lists.newArrayList(); + private boolean isPartialUpdate = false; public UpdateStmt(TableRef targetTableRef, List setExprs, FromClause fromClause, Expr whereExpr) { this.targetTableRef = targetTableRef; @@ -124,7 +124,8 @@ public class UpdateStmt extends DdlStmt { null, cols, new InsertSource(selectStmt), - null); + null, + isPartialUpdate); } private void analyzeTargetTable(Analyzer analyzer) throws UserException { @@ -186,16 +187,36 @@ public class UpdateStmt extends DdlStmt { } // step3: generate select list and insert column name list in insert stmt + boolean isMow = ((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite(); + int setExprCnt = 0; + for (Column column : targetTable.getColumns()) { + for (BinaryPredicate setExpr : setExprs) { + Expr lhs = setExpr.getChild(0); + if (((SlotRef) lhs).getColumn().equals(column)) { + setExprCnt++; + } + } + } + // table with sequence col cannot use partial update cause in MOW, we encode pk + // with seq column but we don't know which column is sequence in update + if (isMow && ((OlapTable) targetTable).getSequenceCol() == null + && setExprCnt <= targetTable.getColumns().size() * 3 / 10) { + isPartialUpdate = true; + } for (Column column : targetTable.getColumns()) { Expr expr = new SlotRef(targetTableRef.getAliasAsName(), column.getName()); + boolean existInExpr = false; for (BinaryPredicate setExpr : setExprs) { Expr lhs = setExpr.getChild(0); if (((SlotRef) lhs).getColumn().equals(column)) { expr = setExpr.getChild(1); + existInExpr = true; } } - selectListItems.add(new SelectListItem(expr, null)); - cols.add(column.getName()); + if (column.isKey() || existInExpr || !isPartialUpdate) { + selectListItems.add(new SelectListItem(expr, null)); + cols.add(column.getName()); + } } } diff --git a/regression-test/data/update/test_update_mow.out b/regression-test/data/update/test_update_mow.out new file mode 100644 index 0000000000..615ee7b07d --- /dev/null +++ b/regression-test/data/update/test_update_mow.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_uniq_table -- +1 2 1 1999-01-01 +2 2 1 1999-01-01 + +-- !select_uniq_table -- +1 2 1 1998-01-01 +2 2 1 1998-01-01 + +-- !desc_uniq_table -- +k INT Yes true \N +value1 INT Yes false \N NONE +value2 INT Yes false \N NONE +date_value DATE Yes false \N NONE + +-- !complex_update -- +1 10 1 1000.0 2000-01-01 +2 2 2 2.0 2000-01-02 +3 3 3 3.0 2000-01-03 + +-- !complex_update_by_alias -- +1 10 1 1000.0 2000-01-01 +2 20 2 2000.0 2000-01-02 +3 3 3 3.0 2000-01-03 + diff --git a/regression-test/suites/update/test_update_mow.groovy b/regression-test/suites/update/test_update_mow.groovy new file mode 100644 index 0000000000..b28650fae2 --- /dev/null +++ b/regression-test/suites/update/test_update_mow.groovy @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_update_mow", "p0") { + def tbName1 = "test_update_unique_1" + def tbName2 = "test_update_unique_2" + def tbName3 = "test_update_unique_3" + def tbName4 = "test_update_unique_4" + sql "DROP TABLE IF EXISTS ${tbName1}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbName1} ( + k int, + value1 int, + value2 int, + date_value date + ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH(k) BUCKETS 5 properties("replication_num" = "1"); + """ + sql "insert into ${tbName1} values(1, 1, 1, '2000-01-01');" + sql "insert into ${tbName1} values(2, 1, 1, '2000-01-01');" + sql "UPDATE ${tbName1} SET value1 = 2 WHERE k=1;" + sql "UPDATE ${tbName1} SET value1 = value1+1 WHERE k=2;" + sql "UPDATE ${tbName1} SET date_value = '1999-01-01' WHERE k in (1,2);" + qt_select_uniq_table "select * from ${tbName1} order by k" + sql "UPDATE ${tbName1} SET date_value = '1998-01-01' WHERE k is null or k is not null;" + qt_select_uniq_table "select * from ${tbName1} order by k" + qt_desc_uniq_table "desc ${tbName1}" + sql "DROP TABLE ${tbName1}" + + sql "DROP TABLE IF EXISTS ${tbName1}" + sql "DROP TABLE IF EXISTS ${tbName2}" + sql "DROP TABLE IF EXISTS ${tbName3}" + sql "DROP TABLE IF EXISTS ${tbName4}" + + // test complex update syntax + sql """ + create table ${tbName1} (id int, c1 bigint, c2 string, c3 double, c4 date) unique key (id) distributed by hash(id) properties('replication_num'='1', 'enable_unique_key_merge_on_write' = 'true'); + """ + sql """ + create table ${tbName2} (id int, c1 bigint, c2 string, c3 double, c4 date) unique key (id) distributed by hash(id) properties('replication_num'='1', 'enable_unique_key_merge_on_write' = 'true'); + """ + sql """ + create table ${tbName3} (id int) distributed by hash (id) properties('replication_num'='1'); + """ + sql """ + create table ${tbName4} (id int) distributed by hash (id) properties('replication_num'='1'); + """ + sql """ + insert into ${tbName1} values(1, 1, '1', 1.0, '2000-01-01'),(2, 2, '2', 2.0, '2000-01-02'),(3, 3, '3', 3.0, '2000-01-03'); + """ + sql """ + insert into ${tbName2} values(1, 10, '10', 10.0, '2000-01-10'),(2, 20, '20', 20.0, '2000-01-20'),(3, 30, '30', 30.0, '2000-01-30'),(4, 4, '4', 4.0, '2000-01-04'),(5, 5, '5', 5.0, '2000-01-05'); + """ + sql """ + insert into ${tbName3} values(1), (4), (5); + """ + sql """ + insert into ${tbName4} values(2), (4), (5); + """ + + sql """ + update ${tbName1} set ${tbName1}.c1 = ${tbName2}.c1, ${tbName1}.c3 = ${tbName2}.c3 * 100 from ${tbName2} inner join ${tbName3} on ${tbName2}.id = ${tbName3}.id where ${tbName1}.id = ${tbName2}.id; + """ + + qt_complex_update """ + select * from ${tbName1} order by id; + """ + + sql """ + update ${tbName1} t1a set t1a.c1 = ${tbName2}.c1, t1a.c3 = ${tbName2}.c3 * 100 from ${tbName2} inner join ${tbName4} on ${tbName2}.id = ${tbName4}.id where t1a.id = ${tbName2}.id; + """ + + qt_complex_update_by_alias """ + select * from ${tbName1} order by id; + """ + + sql "DROP TABLE IF EXISTS ${tbName1}" + sql "DROP TABLE IF EXISTS ${tbName2}" + sql "DROP TABLE IF EXISTS ${tbName3}" + sql "DROP TABLE IF EXISTS ${tbName4}" +}