[enhancement](planner) support update from syntax (#17639)

support update from syntax

note: enable_concurrent_update is not supported now

```
UPDATE <target_table>
  SET <col_name> = <value> [ , <col_name> = <value> , ... ]
  [ FROM <additional_tables> ]
  [ WHERE <condition> ]
```

for example:
t1
```
+----+----+----+-----+------------+
| id | c1 | c2 | c3  | c4         |
+----+----+----+-----+------------+
| 3  | 3  | 3  | 3.0 | 2000-01-03 |
| 2  | 2  | 2  | 2.0 | 2000-01-02 |
| 1  | 1  | 1  | 1.0 | 2000-01-01 |
+----+----+----+-----+------------+
```

t2
```
+----+----+----+------+------------+
| id | c1 | c2 | c3   | c4         |
+----+----+----+------+------------+
| 4  | 4  | 4  |  4.0 | 2000-01-04 |
| 2  | 20 | 20 | 20.0 | 2000-01-20 |
| 5  | 5  | 5  |  5.0 | 2000-01-05 |
| 1  | 10 | 10 | 10.0 | 2000-01-10 |
| 3  | 30 | 30 | 30.0 | 2000-01-30 |
+----+----+----+------+------------+
```

t3
```
+----+
| id |
+----+
| 1  |
| 5  |
| 4  |
+----+
```

do update
```sql
 update t1 set t1.c1 = t2.c1, t1.c3 = t2.c3 * 100 from t2 inner join t3 on t2.id = t3.id where t1.id = t2.id;
```

the result
```
+----+----+----+--------+------------+
| id | c1 | c2 | c3     | c4         |
+----+----+----+--------+------------+
| 3  | 3  | 3  |    3.0 | 2000-01-03 |
| 2  | 2  | 2  |    2.0 | 2000-01-02 |
| 1  | 10 | 1  | 1000.0 | 2000-01-01 |
+----+----+----+--------+------------+
```
This commit is contained in:
morrySnow
2023-03-14 19:26:30 +08:00
committed by GitHub
parent f999b823fc
commit 699159698e
17 changed files with 437 additions and 1021 deletions

View File

@ -32,33 +32,63 @@ UPDATE
### Description
This statement is used to update the data (the update statement currently only supports the Unique Key model).
This statement is used to update the data. The UPDATE statement currently only supports the UNIQUE KEY model.
#### Syntax
```sql
UPDATE table_name
UPDATE target_table
SET assignment_list
WHERE expression
WHERE condition
value:
{expr | DEFAULT}
assignment_list:
assignment [, assignment] ...
assignment:
col_name = value
assignment_list:
assignment [, assignment] ...
````
value:
{expr | DEFAULT}
```
Parameters
<version since="dev">
+ table_name: The target table of the data to be updated. Can be of the form 'db_name.table_name'
UPDATE_FROM
</version>
```sql
UPDATE target_table
SET assignment_list
[ FROM additional_tables]
WHERE condition
```
#### Required Parameters
+ target_table: The target table of the data to be updated. Can be of the form 'db_name.table_name'
+ assignment_list: The target column to be updated, in the format 'col_name = value, col_name = value'
+ where expression: the condition that is expected to be updated, an expression that returns true or false can be
+ where condition: the condition that is expected to be updated, an expression that returns true or false can be
Note
#### Optional Parameters
The current UPDATE statement only supports row updates on the Unique model, and there may be data conflicts caused by concurrent updates.
At present, Doris does not deal with such problems, and users need to avoid such problems from the business side.
<version since="dev">
UPDATE_FROM
</version>
+ FROM additional_tables: Specifies one or more tables to use for selecting rows to update or for setting new values. Note that if you want use target table here, you should give it a alias explicitly.
#### Note
<version since="dev">
UPDATE_FROM
</version>
The current UPDATE statement only supports row updates on the Unique model.
### Example
@ -68,13 +98,76 @@ The `test` table is a unique model table, which contains four columns: k1, k2, v
```sql
UPDATE test SET v1 = 1 WHERE k1=1 and k2=2;
````
```
2. Increment the v1 column of the k1=1 column in the 'test' table by 1
```sql
UPDATE test SET v1 = v1+1 WHERE k1=1;
````
```
<version since="dev">
UPDATE_FROM
</version>
3. use the result of `t2` join `t3` to update `t1`
```sql
-- create t1, t2, t3 tables
CREATE TABLE t1
(id INT, c1 BIGINT, c2 STRING, c3 DOUBLE, c4 DATE)
UNIQUE KEY (id)
DISTRIBUTED BY HASH (id)
PROPERTIES('replication_num'='1', "function_column.sequence_col" = "c4");
CREATE TABLE t2
(id INT, c1 BIGINT, c2 STRING, c3 DOUBLE, c4 DATE)
DISTRIBUTED BY HASH (id)
PROPERTIES('replication_num'='1');
CREATE TABLE t3
(id INT)
DISTRIBUTED BY HASH (id)
PROPERTIES('replication_num'='1');
-- insert data
INSERT INTO t1 VALUES
(1, 1, '1', 1.0, '2000-01-01'),
(2, 2, '2', 2.0, '2000-01-02'),
(3, 3, '3', 3.0, '2000-01-03');
INSERT INTO t2 VALUES
(1, 10, '10', 10.0, '2000-01-10'),
(2, 20, '20', 20.0, '2000-01-20'),
(3, 30, '30', 30.0, '2000-01-30'),
(4, 4, '4', 4.0, '2000-01-04'),
(5, 5, '5', 5.0, '2000-01-05');
INSERT INTO t3 VALUES
(1),
(4),
(5);
-- update t1
UPDATE t1
SET t1.c1 = t2.c1, t1.c3 = t2.c3 * 100
FROM t2 INNER JOIN t3 ON t2.id = t3.id
WHERE t1.id = t2.id;
```
the expect result is only update the row where id = 1 in table t1
```
+----+----+----+--------+------------+
| id | c1 | c2 | c3 | c4 |
+----+----+----+--------+------------+
| 1 | 10 | 1 | 1000.0 | 2000-01-01 |
| 2 | 2 | 2 | 2.0 | 2000-01-02 |
| 3 | 3 | 3 | 3.0 | 2000-01-03 |
+----+----+----+--------+------------+
```
### Keywords

View File

@ -32,33 +32,64 @@ UPDATE
### Description
该语句是为进行对数据进行更新的操作,( update 语句目前仅支持 Unique Key 模型
该语句是为进行对数据进行更新的操作,UPDATE 语句目前仅支持 UNIQUE KEY 模型。
#### Syntax
```sql
UPDATE table_name
UPDATE target_table
SET assignment_list
WHERE expression
WHERE condition
value:
{expr | DEFAULT}
assignment_list:
assignment [, assignment] ...
assignment:
col_name = value
assignment_list:
assignment [, assignment] ...
value:
{expr | DEFAULT}
```
Parameters
<version since="dev">
+ table_name: 待更新数据的目标表。可以是 'db_name.table_name' 形式
UPDATE_FROM
</version>
```sql
UPDATE target_table
SET assignment_list
[ FROM additional_tables]
WHERE condition
```
#### Required Parameters
+ target_table: 待更新数据的目标表。可以是 'db_name.table_name' 形式
+ assignment_list: 待更新的目标列,形如 'col_name = value, col_name = value' 格式
+ where expression: 期望更新的条件,一个返回 true 或者 false 的表达式即可
+ WHERE condition: 期望更新的条件,一个返回 true 或者 false 的表达式即可
Note
#### Optional Parameters
当前 UPDATE 语句仅支持在 Unique 模型上的行更新,存在并发更新导致的数据冲突可能。
目前 Doris 并不处理这类问题,需要用户从业务侧规避这类问题。
<version since="dev">
UPDATE_FROM
</version>
+ FROM additional_tables: 指定一个或多个表,用于选中更新的行,或者获取更新的值。注意,如需要在此列表中再次使用目标表,需要为其显式指定别名。
#### Note
<version since="dev">
UPDATE_FROM
</version>
当前 UPDATE 语句仅支持在 Unique 模型上的行更新。
### Example
@ -76,9 +107,69 @@ UPDATE test SET v1 = 1 WHERE k1=1 and k2=2;
UPDATE test SET v1 = v1+1 WHERE k1=1;
```
<version since="dev">
UPDATE_FROM
</version>
3. 使用`t2``t3`表连接的结果,更新`t1`
```sql
-- 创建t1, t2, t3三张表
CREATE TABLE t1
(id INT, c1 BIGINT, c2 STRING, c3 DOUBLE, c4 DATE)
UNIQUE KEY (id)
DISTRIBUTED BY HASH (id)
PROPERTIES('replication_num'='1', "function_column.sequence_col" = "c4");
CREATE TABLE t2
(id INT, c1 BIGINT, c2 STRING, c3 DOUBLE, c4 DATE)
DISTRIBUTED BY HASH (id)
PROPERTIES('replication_num'='1');
CREATE TABLE t3
(id INT)
DISTRIBUTED BY HASH (id)
PROPERTIES('replication_num'='1');
-- 插入数据
INSERT INTO t1 VALUES
(1, 1, '1', 1.0, '2000-01-01', '2000-01-01'),
(2, 2, '2', 2.0, '2000-01-02', '2000-01-02'),
(3, 3, '3', 3.0, '2000-01-03', '2000-01-03');
INSERT INTO t2 VALUES
(1, 10, '10', 10.0, '2000-01-10'),
(2, 20, '20', 20.0, '2000-01-20'),
(3, 30, '30', 30.0, '2000-01-30'),
(4, 4, '4', 4.0, '2000-01-04'),
(5, 5, '5', 5.0, '2000-01-05');
INSERT INTO t3 VALUES
(1),
(4),
(5);
-- 更新 t1
UPDATE t1
SET t1.c1 = t2.c1, t1.c3 = t2.c3 * 100
FROM t2 INNER JOIN t3 ON t2.id = t3.id
WHERE t1.id = t2.id;
```
预期结果为,更新了`t1``id``1`的列
```
+----+----+----+--------+------------+
| id | c1 | c2 | c3 | c4 |
+----+----+----+--------+------------+
| 1 | 10 | 1 | 1000.0 | 2000-01-01 |
| 2 | 2 | 2 | 2.0 | 2000-01-02 |
| 3 | 3 | 3 | 3.0 | 2000-01-03 |
+----+----+----+--------+------------+
```
### Keywords
UPDATE
### Best Practice

View File

@ -685,6 +685,9 @@ nonterminal InsertStmt insert_stmt;
nonterminal InsertTarget insert_target;
nonterminal InsertSource insert_source;
nonterminal UpdateStmt update_stmt;
nonterminal List<BinaryPredicate> set_clause;
nonterminal List<BinaryPredicate> assignment_list;
nonterminal BinaryPredicate assignment;
nonterminal BackupStmt backup_stmt;
nonterminal AbstractBackupTableRefClause opt_backup_table_ref_list;
@ -751,6 +754,7 @@ nonterminal FunctionCallExpr column_slice;
nonterminal ArrayList<TableRef> table_ref_list, base_table_ref_list;
nonterminal ArrayList<LateralViewRef> opt_lateral_view_ref_list, lateral_view_ref_list;
nonterminal FromClause from_clause;
nonterminal FromClause opt_from_clause;
nonterminal TableRef table_ref;
nonterminal TableRef base_table_ref;
nonterminal LateralViewRef lateral_view_ref;
@ -4445,9 +4449,48 @@ insert_source ::=
// update stmt
update_stmt ::=
KW_UPDATE table_name:tbl KW_SET expr_list:setExprs where_clause:whereClause
KW_UPDATE table_name:tbl set_clause:setClause opt_from_clause:fromClause where_clause:whereClause
{:
RESULT = new UpdateStmt(tbl, setExprs, whereClause);
RESULT = new UpdateStmt(tbl, setClause, fromClause, whereClause);
:}
;
opt_from_clause ::=
/* empty */
{:
RESULT = null;
:}
| from_clause:fromClause
{:
RESULT = fromClause;
:}
;
set_clause ::=
KW_SET assignment_list:list
{:
RESULT = list;
:}
;
assignment_list ::=
assignment:a
{:
List<BinaryPredicate> list = new ArrayList<>();
list.add(a);
RESULT = list;
:}
| assignment_list:list COMMA assignment:a
{:
list.add(a);
RESULT = list;
:}
;
assignment ::=
column_ref:columnRef EQUAL expr:value
{:
RESULT = new BinaryPredicate(BinaryPredicate.Operator.EQ, columnRef, value);
:}
;

View File

@ -17,12 +17,12 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@ -30,88 +30,116 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.qe.SessionVariable;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
/**
* UPDATE is a DML statement that modifies rows in a table.
* UPDATE is a DML statement that modifies rows in a unique key olap table.
* The current update syntax only supports updating the filtered data of a single table.
*
* <p>
* UPDATE table_reference
* SET assignment_list
* [from_clause]
* [WHERE where_condition]
*
* value:
* {expr}
*
* assignment:
* col_name = value
*
* <p>
* assignment_list:
* assignment [, assignment] ...
* <p>
* assignment:
* col_name = value
* <p>
* value:
* {expr}
*/
public class UpdateStmt extends DdlStmt {
private TableName tableName;
private List<Expr> setExprs;
private Expr whereExpr;
// After analyzed
private final TableName tableName;
private final List<BinaryPredicate> setExprs;
private final Expr whereExpr;
private final FromClause fromClause;
private InsertStmt insertStmt;
private Table targetTable;
private TupleDescriptor srcTupleDesc;
List<SelectListItem> selectListItems = Lists.newArrayList();
List<String> cols = Lists.newArrayList();
public UpdateStmt(TableName tableName, List<Expr> setExprs, Expr whereExpr) {
public UpdateStmt(TableName tableName, List<BinaryPredicate> setExprs, FromClause fromClause, Expr whereExpr) {
this.tableName = tableName;
this.setExprs = setExprs;
this.fromClause = fromClause;
this.whereExpr = whereExpr;
}
public TableName getTableName() {
return tableName;
}
public List<Expr> getSetExprs() {
return setExprs;
}
public Expr getWhereExpr() {
return whereExpr;
}
public Table getTargetTable() {
return targetTable;
}
public TupleDescriptor getSrcTupleDesc() {
return srcTupleDesc;
public InsertStmt getInsertStmt() {
return insertStmt;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isInDebugMode()) {
throw new AnalysisException("Update is forbidden since current session is in debug mode."
+ " Please check the following session variables: "
+ String.join(", ", SessionVariable.DEBUG_VARIABLES));
}
analyzeTargetTable(analyzer);
analyzeSetExprs(analyzer);
analyzeWhereExpr(analyzer);
constructInsertStmt();
}
private void constructInsertStmt() {
// not use origin from clause, because we need to mod it, and this action will affect toSql().
FromClause fromUsedInInsert;
TableRef tableRef = new TableRef(tableName, null);
if (fromClause == null) {
fromUsedInInsert = new FromClause(Lists.newArrayList(tableRef));
} else {
fromUsedInInsert = fromClause.clone();
fromUsedInInsert.getTableRefs().add(0, tableRef);
}
SelectStmt selectStmt = new SelectStmt(
// select list
new SelectList(selectListItems, false),
// from clause
fromUsedInInsert,
// where expr
whereExpr,
// group by
null,
// having
null,
// order by
null,
// limit
LimitElement.NO_LIMIT
);
insertStmt = new InsertStmt(
new InsertTarget(tableName, null),
null,
cols,
new InsertSource(selectStmt),
null);
}
private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException {
// step1: analyze table name
// step1: analyze table name and origin table alias
tableName.analyze(analyzer);
// disallow external catalog
Util.prohibitExternalCatalog(tableName.getCtl(), this.getClass().getSimpleName());
// check priv
// check load privilege, select privilege will check when analyze insert stmt
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), tableName.getDb(), tableName.getTbl(), PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "LOAD");
}
// step2: resolve table name with catalog, only unique olap table could be update
// step2: resolve table name with catalog, only unique olap table could be updated
String dbName = tableName.getDb();
String targetTableName = tableName.getTbl();
Preconditions.checkNotNull(dbName);
@ -120,12 +148,12 @@ public class UpdateStmt extends DdlStmt {
targetTable = database.getTableOrAnalysisException(tableName.getTbl());
if (targetTable.getType() != Table.TableType.OLAP
|| ((OlapTable) targetTable).getKeysType() != KeysType.UNIQUE_KEYS) {
throw new AnalysisException("Only unique olap table could be updated.");
throw new AnalysisException("Only unique table could be updated.");
}
// step3: register tuple desc
// register table to ensure we could analyze column name on the left side of set exprs.
targetTable.readLock();
try {
srcTupleDesc = analyzer.registerOlapTable(targetTable, tableName, null);
analyzer.registerOlapTable(targetTable, tableName, null);
} finally {
targetTable.readUnlock();
}
@ -134,14 +162,9 @@ public class UpdateStmt extends DdlStmt {
private void analyzeSetExprs(Analyzer analyzer) throws AnalysisException {
// step1: analyze set exprs
Set<String> columnMappingNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
// the column expr only support binary predicate which's child(0) must be a SloRef.
// the column expr only support binary predicate which child(0) must be a SloRef.
// the duplicate column name of SloRef is forbidden.
for (Expr setExpr : setExprs) {
if (!(setExpr instanceof BinaryPredicate)) {
throw new AnalysisException("Set function expr only support eq binary predicate. "
+ "Expr: " + setExpr.toSql());
}
BinaryPredicate predicate = (BinaryPredicate) setExpr;
for (BinaryPredicate predicate : setExprs) {
if (predicate.getOp() != BinaryPredicate.Operator.EQ) {
throw new AnalysisException("Set function expr only support eq binary predicate. "
+ "The predicate operator error, op: " + predicate.getOp());
@ -149,7 +172,7 @@ public class UpdateStmt extends DdlStmt {
Expr lhs = predicate.getChild(0);
if (!(lhs instanceof SlotRef)) {
throw new AnalysisException("Set function expr only support eq binary predicate "
+ "which's child(0) must be a column name. "
+ "which child(0) must be a column name. "
+ "The child(0) expr error. expr: " + lhs.toSql());
}
String column = ((SlotRef) lhs).getColumnName();
@ -159,8 +182,7 @@ public class UpdateStmt extends DdlStmt {
}
// step2: resolve target columns with catalog,
// only value columns which belong to target table could be updated.
for (Expr setExpr : setExprs) {
Preconditions.checkState(setExpr instanceof BinaryPredicate);
for (BinaryPredicate setExpr : setExprs) {
// check target column
// 1. columns must belong to target table
// 2. only value columns could be updated
@ -170,43 +192,24 @@ public class UpdateStmt extends DdlStmt {
}
lhs.analyze(analyzer);
if (((SlotRef) lhs).getColumn().isKey()) {
throw new AnalysisException("Only value columns of unique table could be updated.");
}
// check set expr of target column
Expr rhs = setExpr.getChild(1);
checkLargeIntOverflow(rhs);
rhs.analyze(analyzer);
if (lhs.getType() != rhs.getType()) {
setExpr.setChild(1, rhs.checkTypeCompatibility(lhs.getType()));
throw new AnalysisException("Only value columns of unique table could be updated");
}
}
}
/*
The overflow detection of LargeInt needs to be verified again here.
The reason is: the first overflow detection(in constructor) cannot filter 2^127.
Therefore, a second verification is required here.
*/
private void checkLargeIntOverflow(Expr expr) throws AnalysisException {
if (expr instanceof LargeIntLiteral) {
expr.analyzeImpl(analyzer);
// step3: generate select list and insert column name list in insert stmt
for (Column column : targetTable.getColumns()) {
Expr expr = new SlotRef(tableName, column.getName());
for (BinaryPredicate setExpr : setExprs) {
Expr lhs = setExpr.getChild(0);
if (((SlotRef) lhs).getColumn().equals(column)) {
expr = setExpr.getChild(1);
}
}
selectListItems.add(new SelectListItem(expr, null));
cols.add(column.getName());
}
}
private void analyzeWhereExpr(Analyzer analyzer) throws AnalysisException {
if (whereExpr == null) {
throw new AnalysisException("Where clause is required");
}
whereExpr.analyze(analyzer);
whereExpr = analyzer.getExprRewriter().rewrite(whereExpr, analyzer, ExprRewriter.ClauseType.WHERE_CLAUSE);
whereExpr.reset();
whereExpr.analyze(analyzer);
if (!whereExpr.getType().equals(Type.BOOLEAN)) {
throw new AnalysisException("Where clause is not a valid statement return bool");
}
analyzer.registerConjunct(whereExpr, srcTupleDesc.getId());
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder("UPDATE ");
@ -215,6 +218,9 @@ public class UpdateStmt extends DdlStmt {
for (Expr setExpr : setExprs) {
sb.append(setExpr.toSql()).append(", ");
}
if (fromClause != null) {
sb.append("\n").append(fromClause.toSql());
}
sb.append("\n");
if (whereExpr != null) {
sb.append(" ").append("WHERE ").append(whereExpr.toSql());

View File

@ -162,7 +162,6 @@ import org.apache.doris.load.routineload.RoutineLoadScheduler;
import org.apache.doris.load.routineload.RoutineLoadTaskScheduler;
import org.apache.doris.load.sync.SyncChecker;
import org.apache.doris.load.sync.SyncJobManager;
import org.apache.doris.load.update.UpdateManager;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.master.MetaHelper;
import org.apache.doris.master.PartitionInMemoryInfoCollector;
@ -318,7 +317,6 @@ public class Env {
private ConsistencyChecker consistencyChecker;
private BackupHandler backupHandler;
private PublishVersionDaemon publishVersionDaemon;
private UpdateManager updateManager;
private DeleteHandler deleteHandler;
private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
@ -554,7 +552,6 @@ public class Env {
this.backupHandler = new BackupHandler(this);
this.metaDir = Config.meta_dir;
this.publishVersionDaemon = new PublishVersionDaemon();
this.updateManager = new UpdateManager();
this.deleteHandler = new DeleteHandler();
this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
this.partitionInMemoryInfoCollector = new PartitionInMemoryInfoCollector();
@ -3478,10 +3475,6 @@ public class Env {
return this.backupHandler;
}
public UpdateManager getUpdateManager() {
return updateManager;
}
public DeleteHandler getDeleteHandler() {
return this.deleteHandler;
}

View File

@ -1,89 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.update;
import org.apache.doris.analysis.UpdateStmt;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class UpdateManager {
private final boolean enableConcurrentUpdate = Config.enable_concurrent_update;
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private Map<Long, List<UpdateStmtExecutor>> tableIdToCurrentUpdate = Maps.newConcurrentMap();
private void writeLock() {
rwLock.writeLock().lock();
}
private void writeUnlock() {
rwLock.writeLock().unlock();
}
public void handleUpdate(UpdateStmt updateStmt) throws UserException {
UpdateStmtExecutor updateStmtExecutor = addUpdateExecutor(updateStmt);
try {
updateStmtExecutor.execute();
} finally {
removeUpdateExecutor(updateStmtExecutor);
}
}
private UpdateStmtExecutor addUpdateExecutor(UpdateStmt updateStmt) throws AnalysisException, DdlException {
writeLock();
try {
List<UpdateStmtExecutor> currentUpdateList
= tableIdToCurrentUpdate.get(updateStmt.getTargetTable().getId());
if (!enableConcurrentUpdate && currentUpdateList != null && currentUpdateList.size() > 0) {
throw new DdlException("There is an update operation in progress for the current table. "
+ "Please try again later, or set enable_concurrent_update in fe.conf to true");
}
UpdateStmtExecutor updateStmtExecutor = UpdateStmtExecutor.fromUpdateStmt(updateStmt);
if (currentUpdateList == null) {
currentUpdateList = Lists.newArrayList();
tableIdToCurrentUpdate.put(updateStmtExecutor.getTargetTableId(), currentUpdateList);
}
currentUpdateList.add(updateStmtExecutor);
return updateStmtExecutor;
} finally {
writeUnlock();
}
}
private void removeUpdateExecutor(UpdateStmtExecutor updateStmtExecutor) {
writeLock();
try {
List<UpdateStmtExecutor> currentUpdateList
= tableIdToCurrentUpdate.get(updateStmtExecutor.getTargetTableId());
if (currentUpdateList == null) {
return;
}
currentUpdateList.remove(updateStmtExecutor);
} finally {
writeUnlock();
}
}
}

View File

@ -1,192 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.update;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
public class UpdatePlanner extends OriginalPlanner {
private final IdGenerator<PlanNodeId> nodeIdGenerator = PlanNodeId.createGenerator();
private final IdGenerator<PlanFragmentId> fragmentIdGenerator =
PlanFragmentId.createGenerator();
private long targetDBId;
private OlapTable targetTable;
private List<Expr> setExprs;
private TupleDescriptor srcTupleDesc;
private Analyzer analyzer;
private List<ScanNode> scanNodeList = Lists.newArrayList();
public UpdatePlanner(long dbId, OlapTable targetTable, List<Expr> setExprs,
TupleDescriptor srcTupleDesc, Analyzer analyzer) {
super(analyzer);
this.targetDBId = dbId;
this.targetTable = targetTable;
this.setExprs = setExprs;
this.srcTupleDesc = srcTupleDesc;
this.analyzer = analyzer;
}
@Override
public List<ScanNode> getScanNodes() {
return scanNodeList;
}
public void plan(long txnId) throws UserException {
// 1. gen scan node
OlapScanNode olapScanNode = new OlapScanNode(nodeIdGenerator.getNextId(), srcTupleDesc, "OlapScanNode");
/* BEGIN: Temporary code, this part of the code needs to be refactored */
olapScanNode.closePreAggregation("This an update operation");
olapScanNode.useBaseIndexId();
/* END */
olapScanNode.init(analyzer);
olapScanNode.finalize(analyzer);
if (VectorizedUtil.isVectorized()) {
olapScanNode.convertToVectorized();
}
scanNodeList.add(olapScanNode);
// 2. gen olap table sink
OlapTableSink olapTableSink = new OlapTableSink(targetTable, computeTargetTupleDesc(), null, false);
olapTableSink.init(analyzer.getContext().queryId(), txnId, targetDBId,
analyzer.getContext().getSessionVariable().queryTimeoutS,
analyzer.getContext().getSessionVariable().sendBatchParallelism, false);
olapTableSink.complete();
// 3. gen plan fragment
PlanFragment planFragment = new PlanFragment(fragmentIdGenerator.getNextId(), olapScanNode,
DataPartition.RANDOM);
planFragment.setSink(olapTableSink);
planFragment.setOutputExprs(computeOutputExprs());
planFragment.finalize(null);
fragments.add(planFragment);
}
private TupleDescriptor computeTargetTupleDesc() {
DescriptorTable descTable = analyzer.getDescTbl();
TupleDescriptor targetTupleDesc = descTable.createTupleDescriptor();
for (Column col : targetTable.getFullSchema()) {
SlotDescriptor slotDesc = descTable.addSlotDescriptor(targetTupleDesc);
slotDesc.setIsMaterialized(true);
slotDesc.setType(col.getType());
slotDesc.setColumn(col);
slotDesc.setIsNullable(col.isAllowNull());
}
targetTupleDesc.computeStatAndMemLayout();
return targetTupleDesc;
}
/**
* There are three Rules of output exprs:
* RuleA: columns that need to be updated,
* use the right child of a set expr
* base column: (k1, v1)
* update stmt: set v1=1
* output expr: k1, 1(use 1 as output expr)
* RuleB: columns that do not need to be updated,
* just add the original value of column -> slot ref
* base column: (k1, v1)
* update stmt: set v1 = 1
* output expr: k1(use k1 slot ref as output expr), 1
* RuleC: the output columns is being added by the schema change job,
* need to add default value expr in output expr
* base column: (k1, v1)
* schema change job: add v2 column
* full column: (k1, v1, v2)
* output expr: k1, v1, default_value(v2)
*/
private List<Expr> computeOutputExprs() throws AnalysisException {
Map<String, Expr> columnNameToSetExpr = Maps.newHashMap();
for (Expr setExpr : setExprs) {
Preconditions.checkState(setExpr instanceof BinaryPredicate);
Preconditions.checkState(setExpr.getChild(0) instanceof SlotRef);
SlotRef slotRef = (SlotRef) setExpr.getChild(0);
// pay attention to case ignore of column name
columnNameToSetExpr.put(slotRef.getColumnName().toLowerCase(), setExpr.getChild(1));
}
Map<String, SlotDescriptor> columnNameToSrcSlotDesc = Maps.newHashMap();
for (SlotDescriptor srcSlotDesc : srcTupleDesc.getSlots()) {
// pay attention to case ignore of column name
columnNameToSrcSlotDesc.put(srcSlotDesc.getColumn().getName().toLowerCase(), srcSlotDesc);
}
// compute output expr
List<Expr> outputExprs = Lists.newArrayList();
for (int i = 0; i < targetTable.getFullSchema().size(); i++) {
Column column = targetTable.getFullSchema().get(i);
// pay attention to case ignore of column name
String originColumnName = (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)
? column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()) : column.getName())
.toLowerCase();
Expr setExpr = columnNameToSetExpr.get(originColumnName);
SlotDescriptor srcSlotDesc = columnNameToSrcSlotDesc.get(originColumnName);
if (setExpr != null) {
// RuleA
outputExprs.add(setExpr);
} else if (srcSlotDesc != null) {
// RuleB
SlotRef slotRef = new SlotRef(srcSlotDesc);
outputExprs.add(slotRef);
} else {
// RuleC
Expr defaultExpr;
if (column.getDefaultValue() != null) {
defaultExpr = column.getDefaultValueExpr();
} else {
if (column.isAllowNull()) {
defaultExpr = NullLiteral.create(column.getType());
} else {
throw new AnalysisException("column has no source field, column=" + column.getName());
}
}
defaultExpr.analyze(analyzer);
outputExprs.add(defaultExpr);
}
}
return outputExprs;
}
}

View File

@ -1,238 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.update;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.UpdateStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
public class UpdateStmtExecutor {
private static final Logger LOG = LogManager.getLogger(UpdateStmtExecutor.class);
private OlapTable targetTable;
private Expr whereExpr;
private List<Expr> setExprs;
private long dbId;
private TUniqueId queryId;
private int timeoutSecond;
private Analyzer analyzer;
private UpdatePlanner updatePlanner;
private String label;
private long txnId;
private Coordinator coordinator;
private long effectRows;
public long getTargetTableId() {
return targetTable.getId();
}
public void execute() throws UserException {
// 0. empty set
// A where clause with a constant equal to false will not execute the update directly
// Example: update xxx set v1=0 where 1=2
if (analyzer.hasEmptyResultSet()) {
QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
analyzer.getContext().getState().setOk();
return;
}
// 1. begin txn
beginTxn();
// 2. plan
targetTable.readLock();
try {
updatePlanner.plan(txnId);
} catch (Throwable e) {
LOG.warn("failed to plan update stmt, query id:{}", DebugUtil.printId(queryId), e);
Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, txnId, e.getMessage());
QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
throw new DdlException("failed to plan update stmt, query id: "
+ DebugUtil.printId(queryId) + ", err: " + e.getMessage());
} finally {
targetTable.readUnlock();
}
// 3. execute plan
try {
executePlan();
} catch (DdlException e) {
LOG.warn("failed to execute update stmt, query id:{}", DebugUtil.printId(queryId), e);
Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, txnId, e.getMessage());
throw e;
} catch (Throwable e) {
LOG.warn("failed to execute update stmt, query id:{}", DebugUtil.printId(queryId), e);
Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, txnId, e.getMessage());
throw new DdlException("failed to execute update stmt, query id: "
+ DebugUtil.printId(queryId) + ", err: " + e.getMessage());
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
}
// 4. commit and publish
commitAndPublishTxn();
}
private void beginTxn() throws LabelAlreadyUsedException, AnalysisException, BeginTransactionException,
DuplicatedRequestException, QuotaExceedException, MetaNotFoundException {
LOG.info("begin transaction for update stmt, query id:{}", DebugUtil.printId(queryId));
label = "update_" + DebugUtil.printId(queryId);
txnId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(targetTable.getId()), label,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
LoadJobSourceType.INSERT_STREAMING, timeoutSecond);
}
// TODO(ML): Abstract the logic of executing the coordinater and retrying.
// It makes stmt such as insert, load, update and export can be reused
private void executePlan() throws Exception {
LOG.info("begin execute update stmt, query id:{}", DebugUtil.printId(queryId));
coordinator = new Coordinator(Env.getCurrentEnv().getNextId(), queryId, analyzer.getDescTbl(),
updatePlanner.getFragments(), updatePlanner.getScanNodes(), TimeUtils.DEFAULT_TIME_ZONE, false);
coordinator.setQueryType(TQueryType.LOAD);
coordinator.setExecVecEngine(VectorizedUtil.isVectorized());
coordinator.setExecPipEngine(VectorizedUtil.isPipeline());
QeProcessorImpl.INSTANCE.registerQuery(queryId, coordinator);
analyzer.getContext().getExecutor().setCoord(coordinator);
// execute
coordinator.setTimeout(timeoutSecond);
coordinator.exec();
if (coordinator.join(timeoutSecond)) {
if (!coordinator.isDone()) {
coordinator.cancel();
ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
}
if (!coordinator.getExecStatus().ok()) {
String errMsg = "update failed: " + coordinator.getExecStatus().getErrorMsg();
LOG.warn(errMsg);
throw new DdlException(errMsg);
}
LOG.info("finish to execute update stmt, query id:{}", DebugUtil.printId(queryId));
} else {
String errMsg = "coordinator could not finished before update timeout: "
+ coordinator.getExecStatus().getErrorMsg();
LOG.warn(errMsg);
throw new DdlException(errMsg);
}
// counter
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
effectRows = Long.valueOf(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
if (Long.valueOf(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)) != 0) {
throw new DdlException("update failed, some rows did not take effect");
}
}
}
private void commitAndPublishTxn() throws UserException {
GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr();
TransactionStatus txnStatus;
boolean isPublished;
try {
LOG.info("commit and publish transaction for update stmt, query id: {}", DebugUtil.printId(queryId));
isPublished = globalTransactionMgr.commitAndPublishTransaction(
Env.getCurrentInternalCatalog().getDbOrMetaException(dbId),
Lists.newArrayList(targetTable), txnId, TabletCommitInfo.fromThrift(coordinator.getCommitInfos()),
analyzer.getContext().getSessionVariable().getInsertVisibleTimeoutMs());
} catch (Throwable e) {
// situation2.1: publish error, throw exception
String errMsg = "failed to commit and publish transaction for update stmt, query id:"
+ DebugUtil.printId(queryId);
LOG.warn(errMsg, e);
globalTransactionMgr.abortTransaction(dbId, txnId, e.getMessage());
throw new DdlException(errMsg, e);
}
String errMsg = null;
if (isPublished) {
// situation2.2: publish successful
txnStatus = TransactionStatus.VISIBLE;
} else {
// situation2.3: be published later
txnStatus = TransactionStatus.COMMITTED;
errMsg = "transaction will be published later, data will be visible later";
LOG.warn("transaction will be published later, query id: {}", DebugUtil.printId(queryId));
}
// set context
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(label).append("', 'status':'").append(txnStatus.name()).append("'");
sb.append(", 'txnId':'").append(txnId).append("'");
sb.append(", 'queryId':'").append(DebugUtil.printId(queryId)).append("'");
if (errMsg != null) {
sb.append(", 'err':'").append(errMsg).append("'");
}
sb.append("}");
analyzer.getContext().getState().setOk(effectRows, 0, sb.toString());
}
public static UpdateStmtExecutor fromUpdateStmt(UpdateStmt updateStmt) throws AnalysisException {
UpdateStmtExecutor updateStmtExecutor = new UpdateStmtExecutor();
updateStmtExecutor.targetTable = (OlapTable) updateStmt.getTargetTable();
updateStmtExecutor.whereExpr = updateStmt.getWhereExpr();
updateStmtExecutor.setExprs = updateStmt.getSetExprs();
Database database = Env.getCurrentInternalCatalog()
.getDbOrAnalysisException(updateStmt.getTableName().getDb());
updateStmtExecutor.dbId = database.getId();
updateStmtExecutor.analyzer = updateStmt.getAnalyzer();
updateStmtExecutor.queryId = updateStmtExecutor.analyzer.getContext().queryId();
updateStmtExecutor.timeoutSecond = updateStmtExecutor.analyzer.getContext()
.getExecTimeout();
updateStmtExecutor.updatePlanner = new UpdatePlanner(updateStmtExecutor.dbId, updateStmtExecutor.targetTable,
updateStmt.getSetExprs(), updateStmt.getSrcTupleDesc(),
updateStmt.getAnalyzer());
return updateStmtExecutor;
}
}

View File

@ -110,7 +110,6 @@ import org.apache.doris.analysis.StopSyncJobStmt;
import org.apache.doris.analysis.SyncStmt;
import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.analysis.UninstallPluginStmt;
import org.apache.doris.analysis.UpdateStmt;
import org.apache.doris.catalog.EncryptKeyHelper;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
@ -182,8 +181,6 @@ public class DdlExecutor {
env.getRoutineLoadManager().stopRoutineLoadJob((StopRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof AlterRoutineLoadStmt) {
env.getRoutineLoadManager().alterRoutineLoadJob((AlterRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof UpdateStmt) {
env.getUpdateManager().handleUpdate((UpdateStmt) ddlStmt);
} else if (ddlStmt instanceof DeleteStmt) {
env.getDeleteHandler().process((DeleteStmt) ddlStmt);
} else if (ddlStmt instanceof CreateUserStmt) {

View File

@ -30,6 +30,7 @@ import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
@ -290,6 +291,14 @@ public class SessionVariable implements Serializable, Writable {
public static final String DRY_RUN_QUERY = "dry_run_query";
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
SKIP_DELETE_SIGN,
SKIP_STORAGE_ENGINE_MERGE,
SHOW_HIDDEN_COLUMNS
);
// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field, String>();
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@ -858,6 +867,10 @@ public class SessionVariable implements Serializable, Writable {
private Set<Class<? extends Event>> parsedNereidsEventMode = EventSwitchParser.parse(Lists.newArrayList("all"));
public boolean isInDebugMode() {
return showHiddenColumns || skipDeleteBitmap || skipDeletePredicate || skipDeleteSign || skipStorageEngineMerge;
}
public void setEnableNereidsTrace(boolean enableNereidsTrace) {
this.enableNereidsTrace = enableNereidsTrace;
}

View File

@ -59,6 +59,7 @@ import org.apache.doris.analysis.TransactionRollbackStmt;
import org.apache.doris.analysis.TransactionStmt;
import org.apache.doris.analysis.UnlockTablesStmt;
import org.apache.doris.analysis.UnsupportedStmt;
import org.apache.doris.analysis.UpdateStmt;
import org.apache.doris.analysis.UseStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@ -605,6 +606,8 @@ public class StmtExecutor implements ProfileWriter {
}
} else if (parsedStmt instanceof LoadStmt) {
handleLoadStmt();
} else if (parsedStmt instanceof UpdateStmt) {
handleUpdateStmt();
} else if (parsedStmt instanceof DdlStmt) {
handleDdlStmt();
} else if (parsedStmt instanceof ShowStmt) {
@ -1925,6 +1928,19 @@ public class StmtExecutor implements ProfileWriter {
}
}
private void handleUpdateStmt() {
try {
UpdateStmt updateStmt = (UpdateStmt) parsedStmt;
parsedStmt = updateStmt.getInsertStmt();
execute();
if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
LOG.warn("update data error, stmt={}", parsedStmt.toSql());
}
} catch (Exception e) {
LOG.warn("update data error, stmt={}", parsedStmt.toSql(), e);
}
}
private void handleDdlStmt() {
try {
DdlExecutor.execute(context.getEnv(), (DdlStmt) parsedStmt);

View File

@ -1,66 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.update;
import org.apache.doris.analysis.UpdateStmt;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.jmockit.Deencapsulation;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Injectable;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
public class UpdateManagerTest {
@Test
public void testDisableConcurrentUpdate(@Injectable UpdateStmt updateStmt,
@Injectable UpdateStmtExecutor updateStmtExecutor) {
Config.enable_concurrent_update = false;
Map<Long, List<UpdateStmtExecutor>> tableIdToCurrentUpdate = Maps.newConcurrentMap();
List<UpdateStmtExecutor> currentUpdate = Lists.newArrayList();
currentUpdate.add(updateStmtExecutor);
tableIdToCurrentUpdate.put(new Long(1), currentUpdate);
UpdateManager updateManager = new UpdateManager();
Assert.assertFalse(Deencapsulation.getField(updateManager, "enableConcurrentUpdate"));
Deencapsulation.setField(updateManager, "tableIdToCurrentUpdate", tableIdToCurrentUpdate);
new Expectations() {
{
updateStmt.getTargetTable().getId();
result = 1;
}
};
try {
Deencapsulation.invoke(updateManager, "addUpdateExecutor", updateStmt);
Assert.fail();
} catch (Exception e) {
if (e instanceof DdlException) {
System.out.println(e.getMessage());
} else {
throw e;
}
}
}
}

View File

@ -1,101 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.update;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.UpdateStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cluster.Cluster;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.GlobalTransactionMgr;
import com.google.common.collect.Lists;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class UpdateStmtExecutorTest {
@Test
public void testCommitAndPublishTxn(@Injectable Analyzer analyzer,
@Injectable Coordinator coordinator,
@Mocked GlobalTransactionMgr globalTransactionMgr) {
Cluster testCluster = new Cluster("test_cluster", 0);
Database testDb = new Database(1, "test_db");
testDb.setClusterName("test_cluster");
Env.getCurrentEnv().addCluster(testCluster);
Env.getCurrentEnv().unprotectCreateDb(testDb);
UpdateStmtExecutor updateStmtExecutor = new UpdateStmtExecutor();
Deencapsulation.setField(updateStmtExecutor, "dbId", 1);
Deencapsulation.setField(updateStmtExecutor, "effectRows", 0);
Deencapsulation.setField(updateStmtExecutor, "analyzer", analyzer);
Deencapsulation.setField(updateStmtExecutor, "coordinator", coordinator);
Deencapsulation.invoke(updateStmtExecutor, "commitAndPublishTxn");
}
@Test
public void testFromUpdateStmt(@Injectable OlapTable olapTable,
@Mocked Env env,
@Injectable Database db,
@Injectable Analyzer analyzer) throws AnalysisException {
TableName tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, "db", "test");
List<Expr> setExprs = Lists.newArrayList();
SlotRef slotRef = new SlotRef(tableName, "v1");
IntLiteral intLiteral = new IntLiteral(1);
BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ,
slotRef, intLiteral);
setExprs.add(binaryPredicate);
SlotRef keySlotRef = new SlotRef(tableName, "k1");
Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, keySlotRef, intLiteral);
UpdateStmt updateStmt = new UpdateStmt(tableName, setExprs, whereExpr);
Deencapsulation.setField(updateStmt, "targetTable", olapTable);
Deencapsulation.setField(updateStmt, "analyzer", analyzer);
new Expectations() {
{
db.getId();
result = 1;
analyzer.getContext().queryId();
result = new TUniqueId(1, 2);
analyzer.getContext().getExecTimeout();
result = 1000;
olapTable.getId();
result = 2;
}
};
UpdateStmtExecutor executor = UpdateStmtExecutor.fromUpdateStmt(updateStmt);
Assert.assertEquals(new Long(2), new Long(executor.getTargetTableId()));
Assert.assertEquals(whereExpr, Deencapsulation.getField(executor, "whereExpr"));
Assert.assertEquals(setExprs, Deencapsulation.getField(executor, "setExprs"));
Assert.assertEquals(new Long(1), Deencapsulation.getField(executor, "dbId"));
}
}

View File

@ -494,16 +494,8 @@ public class PlannerTest extends TestWithFeService {
Assertions.assertEquals(MysqlStateType.ERR, state.getStateType());
Assertions.assertTrue(state.getErrorMessage()
.contains("you need (at least one of) the LOAD privilege(s) for this operation"));
// set to admin user
connectContext.setCurrentUserIdentity(UserIdentity.ADMIN);
stmtExecutor = new StmtExecutor(connectContext, qSQL);
stmtExecutor.execute();
state = connectContext.getState();
// still error because we can not do real update in unit test.
// just check if it pass the priv check.
Assertions.assertEquals(MysqlStateType.ERR, state.getStateType());
Assertions.assertTrue(state.getErrorMessage().contains("failed to execute update stmt"));
}
@Test

View File

@ -1,185 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.update.UpdatePlanner;
import com.google.common.collect.Lists;
import mockit.Expectations;
import mockit.Injectable;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class UpdatePlannerTest {
private final IdGenerator<TupleId> tupleIdGenerator = TupleId.createGenerator();
private final IdGenerator<SlotId> slotIdGenerator = SlotId.createGenerator();
/**
* Full columns: k1, k2 v1, shadow_column
* Shadow column: SHADOW_NAME_PRFIX + v1
* Set expr: v1=1
* Expect output exprs: k1, k2, 1, 1
*/
@Test
public void testComputeOutputExprsWithShadowColumnAndSetExpr(@Injectable OlapTable targetTable,
@Injectable Column k1,
@Injectable Column k2,
@Injectable Column v1,
@Injectable Column shadowV1,
@Injectable Analyzer analyzer) {
List<Expr> setExprs = Lists.newArrayList();
TableName tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, null, "test");
SlotRef slotRef = new SlotRef(tableName, "V1");
IntLiteral intLiteral = new IntLiteral(1);
BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ,
slotRef, intLiteral);
setExprs.add(binaryPredicate);
TupleDescriptor srcTupleDesc = new TupleDescriptor(tupleIdGenerator.getNextId());
SlotDescriptor k1SlotDesc = new SlotDescriptor(slotIdGenerator.getNextId(), srcTupleDesc);
k1SlotDesc.setColumn(k1);
srcTupleDesc.addSlot(k1SlotDesc);
SlotDescriptor k2SlotDesc = new SlotDescriptor(slotIdGenerator.getNextId(), srcTupleDesc);
k2SlotDesc.setColumn(k2);
srcTupleDesc.addSlot(k2SlotDesc);
SlotDescriptor v1SlotDesc = new SlotDescriptor(slotIdGenerator.getNextId(), srcTupleDesc);
v1SlotDesc.setColumn(v1);
srcTupleDesc.addSlot(v1SlotDesc);
List<Column> fullSchema = Lists.newArrayList();
fullSchema.add(k1);
fullSchema.add(k2);
fullSchema.add(v1);
fullSchema.add(shadowV1);
new Expectations() {
{
targetTable.getFullSchema();
result = fullSchema;
k1.getName();
result = "k1";
k2.getName();
result = "k2";
v1.getName();
result = "v1";
shadowV1.getName();
result = SchemaChangeHandler.SHADOW_NAME_PREFIX + "v1";
}
};
UpdatePlanner updatePlanner = new UpdatePlanner(1, targetTable, setExprs, srcTupleDesc, analyzer);
List<Expr> outputExpr = Deencapsulation.invoke(updatePlanner, "computeOutputExprs");
Assert.assertEquals(4, outputExpr.size());
Expr outputExpr1 = outputExpr.get(0);
Assert.assertTrue(outputExpr1 instanceof SlotRef);
Assert.assertEquals(((SlotRef) outputExpr1).getDesc().getColumn().getName(), "k1");
Expr outputExpr2 = outputExpr.get(1);
Assert.assertTrue(outputExpr2 instanceof SlotRef);
Assert.assertEquals(((SlotRef) outputExpr2).getDesc().getColumn().getName(), "k2");
Expr outputExpr3 = outputExpr.get(2);
Assert.assertTrue(outputExpr3 instanceof IntLiteral);
Assert.assertEquals(((IntLiteral) outputExpr3).getValue(), 1);
Expr outputExpr4 = outputExpr.get(3);
Assert.assertTrue(outputExpr4 instanceof IntLiteral);
Assert.assertEquals(((IntLiteral) outputExpr4).getValue(), 1);
}
@Test
public void testNewColumnBySchemaChange(@Injectable OlapTable targetTable,
@Injectable Column k1,
@Injectable Column k2,
@Injectable Column v1,
@Injectable Column newV2,
@Injectable Analyzer analyzer) throws AnalysisException {
List<Expr> setExprs = Lists.newArrayList();
TableName tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, null, "test");
SlotRef slotRef = new SlotRef(tableName, "V1");
IntLiteral intLiteral = new IntLiteral(1);
BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ,
slotRef, intLiteral);
setExprs.add(binaryPredicate);
TupleDescriptor srcTupleDesc = new TupleDescriptor(tupleIdGenerator.getNextId());
SlotDescriptor k1SlotDesc = new SlotDescriptor(slotIdGenerator.getNextId(), srcTupleDesc);
k1SlotDesc.setColumn(k1);
srcTupleDesc.addSlot(k1SlotDesc);
SlotDescriptor k2SlotDesc = new SlotDescriptor(slotIdGenerator.getNextId(), srcTupleDesc);
k2SlotDesc.setColumn(k2);
srcTupleDesc.addSlot(k2SlotDesc);
SlotDescriptor v1SlotDesc = new SlotDescriptor(slotIdGenerator.getNextId(), srcTupleDesc);
v1SlotDesc.setColumn(v1);
srcTupleDesc.addSlot(v1SlotDesc);
List<Column> fullSchema = Lists.newArrayList();
fullSchema.add(k1);
fullSchema.add(k2);
fullSchema.add(v1);
fullSchema.add(newV2);
new Expectations() {
{
targetTable.getFullSchema();
result = fullSchema;
k1.getName();
result = "k1";
k2.getName();
result = "k2";
v1.getName();
result = "v1";
newV2.getName();
result = "v2";
newV2.getDefaultValue();
result = "1";
newV2.getDefaultValueExpr();
result = new IntLiteral(1);
}
};
UpdatePlanner updatePlanner = new UpdatePlanner(1, targetTable, setExprs, srcTupleDesc, analyzer);
List<Expr> outputExpr = Deencapsulation.invoke(updatePlanner, "computeOutputExprs");
Assert.assertEquals(4, outputExpr.size());
Expr outputExpr1 = outputExpr.get(0);
Assert.assertTrue(outputExpr1 instanceof SlotRef);
Assert.assertEquals(((SlotRef) outputExpr1).getDesc().getColumn().getName(), "k1");
Expr outputExpr2 = outputExpr.get(1);
Assert.assertTrue(outputExpr2 instanceof SlotRef);
Assert.assertEquals(((SlotRef) outputExpr2).getDesc().getColumn().getName(), "k2");
Expr outputExpr3 = outputExpr.get(2);
Assert.assertTrue(outputExpr3 instanceof IntLiteral);
Assert.assertEquals(((IntLiteral) outputExpr3).getValue(), 1);
Expr outputExpr4 = outputExpr.get(3);
Assert.assertTrue(outputExpr4 instanceof IntLiteral);
Assert.assertEquals(((IntLiteral) outputExpr4).getValue(), 1);
}
}

View File

@ -13,3 +13,8 @@ value1 INT Yes false \N REPLACE
value2 INT Yes false \N REPLACE
date_value DATE Yes false \N REPLACE
-- !complex_update --
1 10 1 1000.0 2000-01-01
2 2 2 2.0 2000-01-02
3 3 3 3.0 2000-01-03

View File

@ -16,10 +16,12 @@
// under the License.
suite("test_update_unique", "p0") {
def tbName = "test_update_unique"
sql "DROP TABLE IF EXISTS ${tbName}"
def tbName1 = "test_update_unique_1"
def tbName2 = "test_update_unique_2"
def tbName3 = "test_update_unique_3"
sql "DROP TABLE IF EXISTS ${tbName1}"
sql """
CREATE TABLE IF NOT EXISTS ${tbName} (
CREATE TABLE IF NOT EXISTS ${tbName1} (
k int,
value1 int,
value2 int,
@ -28,14 +30,50 @@ suite("test_update_unique", "p0") {
UNIQUE KEY(k)
DISTRIBUTED BY HASH(k) BUCKETS 5 properties("replication_num" = "1");
"""
sql "insert into ${tbName} values(1, 1, 1, '2000-01-01');"
sql "insert into ${tbName} values(2, 1, 1, '2000-01-01');"
sql "UPDATE ${tbName} SET value1 = 2 WHERE k=1;"
sql "UPDATE ${tbName} SET value1 = value1+1 WHERE k=2;"
sql "UPDATE ${tbName} SET date_value = '1999-01-01' WHERE k in (1,2);"
qt_select_uniq_table "select * from ${tbName} order by k"
sql "UPDATE ${tbName} SET date_value = '1998-01-01' WHERE k is null or k is not null;"
qt_select_uniq_table "select * from ${tbName} order by k"
qt_desc_uniq_table "desc ${tbName}"
sql "DROP TABLE ${tbName}"
sql "insert into ${tbName1} values(1, 1, 1, '2000-01-01');"
sql "insert into ${tbName1} values(2, 1, 1, '2000-01-01');"
sql "UPDATE ${tbName1} SET value1 = 2 WHERE k=1;"
sql "UPDATE ${tbName1} SET value1 = value1+1 WHERE k=2;"
sql "UPDATE ${tbName1} SET date_value = '1999-01-01' WHERE k in (1,2);"
qt_select_uniq_table "select * from ${tbName1} order by k"
sql "UPDATE ${tbName1} SET date_value = '1998-01-01' WHERE k is null or k is not null;"
qt_select_uniq_table "select * from ${tbName1} order by k"
qt_desc_uniq_table "desc ${tbName1}"
sql "DROP TABLE ${tbName1}"
sql "DROP TABLE IF EXISTS ${tbName1}"
sql "DROP TABLE IF EXISTS ${tbName2}"
sql "DROP TABLE IF EXISTS ${tbName3}"
// test complex update syntax
sql """
create table ${tbName1} (id int, c1 bigint, c2 string, c3 double, c4 date) unique key (id) distributed by hash(id) properties('replication_num'='1');
"""
sql """
create table ${tbName2} (id int, c1 bigint, c2 string, c3 double, c4 date) unique key (id) distributed by hash(id) properties('replication_num'='1');
"""
sql """
create table ${tbName3} (id int) distributed by hash (id) properties('replication_num'='1');
"""
sql """
insert into ${tbName1} values(1, 1, '1', 1.0, '2000-01-01'),(2, 2, '2', 2.0, '2000-01-02'),(3, 3, '3', 3.0, '2000-01-03');
"""
sql """
insert into ${tbName2} values(1, 10, '10', 10.0, '2000-01-10'),(2, 20, '20', 20.0, '2000-01-20'),(3, 30, '30', 30.0, '2000-01-30'),(4, 4, '4', 4.0, '2000-01-04'),(5, 5, '5', 5.0, '2000-01-05');
"""
sql """
insert into ${tbName3} values(1), (4), (5);
"""
sql """
update ${tbName1} set ${tbName1}.c1 = ${tbName2}.c1, ${tbName1}.c3 = ${tbName2}.c3 * 100 from ${tbName2} inner join ${tbName3} on ${tbName2}.id = ${tbName3}.id where ${tbName1}.id = ${tbName2}.id;
"""
qt_complex_update """
select * from ${tbName1} order by id;
"""
sql "DROP TABLE IF EXISTS ${tbName1}"
sql "DROP TABLE IF EXISTS ${tbName2}"
sql "DROP TABLE IF EXISTS ${tbName3}"
}