From f11d32021392efa0b1f464b75718b151e665c565 Mon Sep 17 00:00:00 2001
From: Stalary <452024236@qq.com>
Date: Wed, 11 May 2022 22:11:10 +0800
Subject: [PATCH] [feature] support row policy filter (#9206)
---
docs/.vuepress/sidebar/en.js | 1 +
docs/.vuepress/sidebar/zh-CN.js | 1 +
.../Create/CREATE-POLICY.md | 84 +++++
.../Create/CREATE-POLICY.md | 84 +++++
.../Drop/DROP-POLICY.md | 64 ++++
.../Show-Statements/SHOW-POLICY.md | 79 ++++
.../Show-Statements/SHOW-SQL-BLOCK-RULE.md | 2 +-
.../apache/doris/common/FeMetaVersion.java | 4 +-
fe/fe-core/src/main/cup/sql_parser.cup | 31 +-
.../doris/analysis/CreatePolicyStmt.java | 99 +++++
.../apache/doris/analysis/DropPolicyStmt.java | 80 ++++
.../apache/doris/analysis/ShowPolicyStmt.java | 89 +++++
.../apache/doris/analysis/StmtRewriter.java | 212 +++++++----
.../apache/doris/analysis/UserIdentity.java | 36 +-
.../org/apache/doris/catalog/Catalog.java | 24 ++
.../org/apache/doris/common/MetaReader.java | 1 +
.../org/apache/doris/common/MetaWriter.java | 1 +
.../apache/doris/journal/JournalEntity.java | 12 +
.../org/apache/doris/persist/EditLog.java | 20 +
.../apache/doris/persist/OperationType.java | 4 +
.../apache/doris/policy/DropPolicyLog.java | 82 ++++
.../org/apache/doris/policy/FilterType.java | 45 +++
.../java/org/apache/doris/policy/Policy.java | 154 ++++++++
.../org/apache/doris/policy/PolicyMgr.java | 354 ++++++++++++++++++
.../apache/doris/policy/PolicyTypeEnum.java | 26 ++
.../java/org/apache/doris/qe/DdlExecutor.java | 8 +-
.../org/apache/doris/qe/ShowExecutor.java | 8 +
.../org/apache/doris/qe/StmtExecutor.java | 20 +
fe/fe-core/src/main/jflex/sql_scanner.flex | 5 +-
.../doris/load/loadv2/SparkLoadJobTest.java | 45 ++-
.../org/apache/doris/policy/PolicyTest.java | 202 ++++++++++
.../doris/utframe/TestWithFeService.java | 95 +++--
32 files changed, 1832 insertions(+), 140 deletions(-)
create mode 100644 docs/en/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
create mode 100644 docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
create mode 100644 docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Drop/DROP-POLICY.md
create mode 100644 docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-POLICY.md
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/policy/DropPolicyLog.java
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/policy/FilterType.java
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
create mode 100644 fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java
create mode 100644 fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java
diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index b5030bab59..ee1c9f532c 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -633,6 +633,7 @@ module.exports = [
"CREATE-FUNCTION",
"CREATE-INDEX",
"CREATE-MATERIALIZED-VIEW",
+ "CREATE-POLICY",
"CREATE-RESOURCE",
"CREATE-SQL-BLOCK-RULE",
"CREATE-TABLE-LIKE",
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index 173d203ed3..899b5d94f8 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -633,6 +633,7 @@ module.exports = [
"CREATE-FUNCTION",
"CREATE-INDEX",
"CREATE-MATERIALIZED-VIEW",
+ "CREATE-POLICY",
"CREATE-RESOURCE",
"CREATE-SQL-BLOCK-RULE",
"CREATE-TABLE-LIKE",
diff --git a/docs/en/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md b/docs/en/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
new file mode 100644
index 0000000000..7202f2a30b
--- /dev/null
+++ b/docs/en/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
@@ -0,0 +1,84 @@
+---
+{
+ "title": "CREATE-POLICY",
+ "language": "en"
+}
+---
+
+
+
+## CREATE-POLICY
+
+### Name
+
+CREATE POLICY
+
+### Description
+
+Create security policies and explain to view the rewritten SQL.
+
+#### 行安全策略
+grammar:
+
+```sql
+CREATE ROW POLICY test_row_policy_1 ON test.table1
+AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2));
+```
+
+illustrate:
+
+- filterType:It is usual to constrict a set of policies through AND. PERMISSIVE to constrict a set of policies through OR
+- Configure multiple policies. First, merge the RESTRICTIVE policy with the PERMISSIVE policy
+- It is connected with AND between RESTRICTIVE AND PERMISSIVE
+- It cannot be created for users root and admin
+
+### Example
+
+1. Create a set of row security policies
+
+ ```sql
+ CREATE ROW POLICY test_row_policy_1 ON test.table1
+ AS RESTRICTIVE TO test USING (c1 = 'a');
+ ```
+ ```sql
+ CREATE ROW POLICY test_row_policy_2 ON test.table1
+ AS RESTRICTIVE TO test USING (c2 = 'b');
+ ```
+ ```sql
+ CREATE ROW POLICY test_row_policy_3 ON test.table1
+ AS PERMISSIVE TO test USING (c3 = 'c');
+ ```
+ ```sql
+ CREATE ROW POLICY test_row_policy_3 ON test.table1
+ AS PERMISSIVE TO test USING (c4 = 'd');
+ ```
+
+ When we execute the query on Table1, the rewritten SQL is
+
+ ```sql
+ select * from (select * from table1 where c1 = 'a' and c2 = 'b' or c3 = 'c' or c4 = 'd')
+ ```
+
+### Keywords
+
+ CREATE, POLICY
+
+### Best Practice
+
diff --git a/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md b/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
new file mode 100644
index 0000000000..f17db5b5c5
--- /dev/null
+++ b/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
@@ -0,0 +1,84 @@
+---
+{
+ "title": "CREATE-POLICY",
+ "language": "zh-CN"
+}
+---
+
+
+
+## CREATE-POLICY
+
+### Name
+
+CREATE POLICY
+
+### Description
+
+创建安全策略,explain 可以查看改写后的 SQL。
+
+#### 行安全策略
+语法:
+
+```sql
+CREATE ROW POLICY test_row_policy_1 ON test.table1
+AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2));
+```
+
+参数说明:
+
+- filterType:RESTRICTIVE 将一组策略通过 AND 连接, PERMISSIVE 将一组策略通过 OR 连接
+- 配置多个策略首先合并 RESTRICTIVE 的策略,再添加 PERMISSIVE 的策略
+- RESTRICTIVE 和 PERMISSIVE 之间通过 AND 连接的
+- 不允许对 root 和 admin 用户创建
+
+### Example
+
+1. 创建一组行安全策略
+
+ ```sql
+ CREATE ROW POLICY test_row_policy_1 ON test.table1
+ AS RESTRICTIVE TO test USING (c1 = 'a');
+ ```
+ ```sql
+ CREATE ROW POLICY test_row_policy_2 ON test.table1
+ AS RESTRICTIVE TO test USING (c2 = 'b');
+ ```
+ ```sql
+ CREATE ROW POLICY test_row_policy_3 ON test.table1
+ AS PERMISSIVE TO test USING (c3 = 'c');
+ ```
+ ```sql
+ CREATE ROW POLICY test_row_policy_3 ON test.table1
+ AS PERMISSIVE TO test USING (c4 = 'd');
+ ```
+
+ 当我们执行对 table1 的查询时被改写后的 sql 为
+
+ ```sql
+ select * from (select * from table1 where c1 = 'a' and c2 = 'b' or c3 = 'c' or c4 = 'd')
+ ```
+
+### Keywords
+
+ CREATE, POLICY
+
+### Best Practice
+
diff --git a/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Drop/DROP-POLICY.md b/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Drop/DROP-POLICY.md
new file mode 100644
index 0000000000..2e8d2e96a8
--- /dev/null
+++ b/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Drop/DROP-POLICY.md
@@ -0,0 +1,64 @@
+---
+{
+ "title": "DROP-POLICY",
+ "language": "zh-CN"
+}
+---
+
+
+
+## DROP-POLICY
+
+### Name
+
+DROP POLICY
+
+### Description
+
+删除安全策略
+
+#### 行安全策略
+
+语法:
+
+```sql
+DROP ROW POLICY test_row_policy_1 on table1 [FOR user];
+```
+
+### Example
+
+1. 删除 table1 的 test_row_policy_1
+
+ ```sql
+ DROP ROW POLICY test_row_policy_1 on table1
+ ```
+
+2. 删除 table1 作用于 test 的 test_row_policy_1 行安全策略
+
+ ```sql
+ DROP ROW POLICY test_row_policy_1 on table1 for test
+ ```
+
+### Keywords
+
+ DROP, POLICY
+
+### Best Practice
+
diff --git a/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-POLICY.md b/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-POLICY.md
new file mode 100644
index 0000000000..2d7e2604d7
--- /dev/null
+++ b/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-POLICY.md
@@ -0,0 +1,79 @@
+---
+{
+ "title": "SHOW-ROW-POLICY",
+ "language": "zh-CN"
+}
+---
+
+
+
+## SHOW-POLICY
+
+### Name
+
+SHOW ROW POLICY
+
+### Description
+
+查看当前 DB 下的行安全策略
+
+语法:
+
+```sql
+SHOW ROW POLICY [FOR user]
+```
+
+### Example
+
+1. 查看所有安全策略。
+
+ ```sql
+ mysql> SHOW ROW POLICY;
+ +-------------------+----------------------+-----------+------+-------------+-------------------+------+-------------------------------------------------------------------------------------------------------------------------------------------+
+ | PolicyName | DbName | TableName | Type | FilterType | WherePredicate | User | OriginStmt |
+ +-------------------+----------------------+-----------+------+-------------+-------------------+------+-------------------------------------------------------------------------------------------------------------------------------------------+
+ | test_row_policy_1 | default_cluster:test | table1 | ROW | RESTRICTIVE | `id` IN (1, 2) | root | /* ApplicationName=DataGrip 2021.3.4 */ CREATE ROW POLICY test_row_policy_1 ON test.table1 AS RESTRICTIVE TO root USING (id in (1, 2));
+ |
+ | test_row_policy_2 | default_cluster:test | table1 | ROW | RESTRICTIVE | `col1` = 'col1_1' | root | /* ApplicationName=DataGrip 2021.3.4 */ CREATE ROW POLICY test_row_policy_2 ON test.table1 AS RESTRICTIVE TO root USING (col1='col1_1');
+ |
+ +-------------------+----------------------+-----------+------+-------------+-------------------+------+-------------------------------------------------------------------------------------------------------------------------------------------+
+ 2 rows in set (0.00 sec)
+ ```
+
+2. 指定用户名查询
+
+ ```sql
+ mysql> SHOW ROW POLICY FOR test;
+ +-------------------+----------------------+-----------+------+------------+-------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------+
+ | PolicyName | DbName | TableName | Type | FilterType | WherePredicate | User | OriginStmt |
+ +-------------------+----------------------+-----------+------+------------+-------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------+
+ | test_row_policy_3 | default_cluster:test | table1 | ROW | PERMISSIVE | `col1` = 'col1_2' | default_cluster:test | /* ApplicationName=DataGrip 2021.3.4 */ CREATE ROW POLICY test_row_policy_3 ON test.table1 AS PERMISSIVE TO test USING (col1='col1_2');
+ |
+ +-------------------+----------------------+-----------+------+------------+-------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------+
+ 1 row in set (0.01 sec)
+ ```
+
+
+### Keywords
+
+ SHOW, POLICY
+
+### Best Practice
+
diff --git a/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-SQL-BLOCK-RULE.md b/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-SQL-BLOCK-RULE.md
index 1c43551281..e08a869217 100644
--- a/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-SQL-BLOCK-RULE.md
+++ b/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-SQL-BLOCK-RULE.md
@@ -55,7 +55,7 @@ SHOW SQL_BLOCK_RULE [FOR RULE_NAME];
2 rows in set (0.01 sec)
```
-2. 制定规则名查询
+2. 指定规则名查询
```sql
mysql> SHOW SQL_BLOCK_RULE FOR test_rule2;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 840365f92d..a9c1d2b2c8 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -36,8 +36,10 @@ public final class FeMetaVersion {
public static final int VERSION_107 = 107;
// add storage_cold_medium and remote_storage_resource_name in DataProperty
public static final int VERSION_108 = 108;
+ // add row policy
+ public static final int VERSION_109 = 109;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
- public static final int VERSION_CURRENT = VERSION_108;
+ public static final int VERSION_CURRENT = VERSION_109;
// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index f5291bb38e..53ccf909be 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -48,6 +48,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Version;
import org.apache.doris.mysql.MysqlPassword;
import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.policy.PolicyTypeEnum;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -260,7 +261,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A
KW_NAME, KW_NAMES, KW_NEGATIVE, KW_NO, KW_NOT, KW_NULL, KW_NULLS,
KW_OBSERVER, KW_OFFSET, KW_ON, KW_ONLY, KW_OPEN, KW_OR, KW_ORDER, KW_OUTER, KW_OUTFILE, KW_OVER,
KW_PARAMETER, KW_PARTITION, KW_PARTITIONS, KW_PASSWORD, KW_LDAP_ADMIN_PASSWORD, KW_PATH, KW_PAUSE, KW_PIPE, KW_PRECEDING,
- KW_PLUGIN, KW_PLUGINS,
+ KW_PLUGIN, KW_PLUGINS, KW_POLICY,
KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROFILE, KW_PROPERTIES, KW_PROPERTY,
KW_QUERY, KW_QUOTA,
KW_RANDOM, KW_RANGE, KW_READ, KW_REBALANCE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RELEASE, KW_RENAME,
@@ -269,13 +270,13 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A
KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SHOW, KW_SIGNED,
KW_SKEW,
KW_SMALLINT, KW_SNAPSHOT, KW_SONAME, KW_SPLIT, KW_START, KW_STATUS, KW_STATS, KW_STOP, KW_STORAGE, KW_STREAM, KW_STRING, KW_STRUCT,
- KW_SUM, KW_SUPERUSER, KW_SYNC, KW_SYSTEM,
+ KW_SUM, KW_SUPERUSER, KW_SYNC, KW_SYSTEM, KW_SQL_BLOCK_RULE,
KW_TABLE, KW_TABLES, KW_TABLET, KW_TABLETS, KW_TASK, KW_TEMPORARY, KW_TERMINATED, KW_TEXT, KW_THAN, KW_TIME, KW_THEN, KW_TIMESTAMP, KW_TINYINT,KW_TRASH,
KW_TO, KW_TRANSACTION, KW_TRIGGERS, KW_TRIM, KW_TRUE, KW_TRUNCATE, KW_TYPE, KW_TYPES,
KW_UNCOMMITTED, KW_UNBOUNDED, KW_UNION, KW_UNIQUE, KW_UNLOCK, KW_UNSIGNED, KW_USE, KW_USER, KW_USING, KW_UNINSTALL,
KW_VALUE, KW_VALUES, KW_VARCHAR, KW_VARIABLES, KW_VERBOSE, KW_VIEW,
KW_WARNINGS, KW_WEEK, KW_WHEN, KW_WHITELIST, KW_WHERE, KW_WITH, KW_WORK, KW_WRITE,
- KW_YEAR, KW_SQL_BLOCK_RULE;
+ KW_YEAR;
terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT;
terminal BITAND, BITOR, BITXOR, BITNOT;
@@ -1353,6 +1354,12 @@ create_stmt ::=
{:
RESULT = new CreateSqlBlockRuleStmt(ruleName, properties);
:}
+ /* row policy */
+ | KW_CREATE KW_ROW KW_POLICY opt_if_not_exists:ifNotExists ident:policyName KW_ON table_name:tbl KW_AS ident:filterType KW_TO user_identity:user
+ KW_USING LPAREN expr:wherePredicate RPAREN
+ {:
+ RESULT = new CreatePolicyStmt(PolicyTypeEnum.ROW, ifNotExists, policyName, tbl, filterType, user, wherePredicate);
+ :}
;
channel_desc_list ::=
@@ -2058,6 +2065,14 @@ drop_stmt ::=
{:
RESULT = new DropSqlBlockRuleStmt(ruleNames);
:}
+ | KW_DROP KW_ROW KW_POLICY opt_if_exists:ifExists ident:policyName KW_ON table_name:tbl
+ {:
+ RESULT = new DropPolicyStmt(PolicyTypeEnum.ROW, ifExists, policyName, tbl, null);
+ :}
+ | KW_DROP KW_ROW KW_POLICY opt_if_exists:ifExists ident:policyName KW_ON table_name:tbl KW_FOR user_identity:user
+ {:
+ RESULT = new DropPolicyStmt(PolicyTypeEnum.ROW, ifExists, policyName, tbl, user);
+ :}
;
// Recover statement
@@ -2546,6 +2561,14 @@ show_stmt ::=
{:
RESULT = new ShowSqlBlockRuleStmt(null);
:}
+ | KW_SHOW KW_ROW KW_POLICY KW_FOR user_identity:user
+ {:
+ RESULT = new ShowPolicyStmt(PolicyTypeEnum.ROW, user);
+ :}
+ | KW_SHOW KW_ROW KW_POLICY
+ {:
+ RESULT = new ShowPolicyStmt(PolicyTypeEnum.ROW, null);
+ :}
;
show_param ::=
@@ -5785,6 +5808,8 @@ keyword ::=
{: RESULT = id; :}
| KW_VARCHAR:id
{: RESULT = id; :}
+ | KW_POLICY:id
+ {: RESULT = id; :}
;
// Identifier that contain keyword
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java
new file mode 100644
index 0000000000..b8771575da
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.policy.FilterType;
+import org.apache.doris.policy.PolicyTypeEnum;
+import org.apache.doris.qe.ConnectContext;
+
+import lombok.Getter;
+
+/**
+ * Create policy statement.
+ * syntax:
+ * CREATE ROW POLICY [IF NOT EXISTS] test_row_policy ON test_table AS {PERMISSIVE|RESTRICTIVE} TO user USING (a = ’xxx‘)
+ */
+public class CreatePolicyStmt extends DdlStmt {
+
+ @Getter
+ private final PolicyTypeEnum type;
+
+ @Getter
+ private final boolean ifNotExists;
+
+ @Getter
+ private final String policyName;
+
+ @Getter
+ private final TableName tableName;
+
+ @Getter
+ private final FilterType filterType;
+
+ @Getter
+ private final UserIdentity user;
+
+ @Getter
+ private Expr wherePredicate;
+
+ /**
+ * Use for cup.
+ **/
+ public CreatePolicyStmt(PolicyTypeEnum type, boolean ifNotExists, String policyName, TableName tableName, String filterType,
+ UserIdentity user, Expr wherePredicate) {
+ this.type = type;
+ this.ifNotExists = ifNotExists;
+ this.policyName = policyName;
+ this.tableName = tableName;
+ this.filterType = FilterType.of(filterType);
+ this.user = user;
+ this.wherePredicate = wherePredicate;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws UserException {
+ super.analyze(analyzer);
+ tableName.analyze(analyzer);
+ user.analyze(analyzer.getClusterName());
+ if (user.isRootUser() || user.isAdminUser()) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CreatePolicyStmt",
+ user.getQualifiedUser(), user.getHost(), tableName.getTbl());
+ }
+ // check auth
+ if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+ }
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE ").append(type).append(" POLICY ");
+ if (ifNotExists) {
+ sb.append("IF NOT EXISTS");
+ }
+ sb.append(policyName).append(" ON ").append(tableName.toSql()).append(" AS ").append(filterType)
+ .append(" TO ").append(user.getQualifiedUser()).append(" USING ").append(wherePredicate.toSql());
+ return sb.toString();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java
new file mode 100644
index 0000000000..87fb616c0a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.policy.PolicyTypeEnum;
+import org.apache.doris.qe.ConnectContext;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * Drop policy statement.
+ * syntax:
+ * DROP [ROW] POLICY [IF EXISTS] test_row_policy ON test_table [FOR user]
+ **/
+@AllArgsConstructor
+public class DropPolicyStmt extends DdlStmt {
+
+ @Getter
+ private final PolicyTypeEnum type;
+
+ @Getter
+ private final boolean ifExists;
+
+ @Getter
+ private final String policyName;
+
+ @Getter
+ private final TableName tableName;
+
+ @Getter
+ private final UserIdentity user;
+
+ @Override
+ public void analyze(Analyzer analyzer) throws UserException {
+ super.analyze(analyzer);
+ tableName.analyze(analyzer);
+ if (user != null) {
+ user.analyze(analyzer.getClusterName());
+ }
+ // check auth
+ if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+ }
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("DROP ").append(type).append(" POLICY ");
+ if (ifExists) {
+ sb.append("IF EXISTS ");
+ }
+ sb.append(policyName).append(" ON ").append(tableName.toSql());
+ if (user != null) {
+ sb.append(" FOR ").append(user.getQualifiedUser());
+ }
+ return sb.toString();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java
new file mode 100644
index 0000000000..f450952ba1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.policy.PolicyTypeEnum;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSetMetaData;
+
+import lombok.Getter;
+
+/**
+ * Show policy statement
+ * syntax:
+ * SHOW ROW POLICY [FOR user]
+ **/
+public class ShowPolicyStmt extends ShowStmt {
+
+ @Getter
+ private final PolicyTypeEnum type;
+
+ @Getter
+ private final UserIdentity user;
+
+ public ShowPolicyStmt(PolicyTypeEnum type, UserIdentity user) {
+ this.type = type;
+ this.user = user;
+ }
+
+ private static final ShowResultSetMetaData ROW_META_DATA =
+ ShowResultSetMetaData.builder()
+ .addColumn(new Column("PolicyName", ScalarType.createVarchar(100)))
+ .addColumn(new Column("DbName", ScalarType.createVarchar(100)))
+ .addColumn(new Column("TableName", ScalarType.createVarchar(100)))
+ .addColumn(new Column("Type", ScalarType.createVarchar(20)))
+ .addColumn(new Column("FilterType", ScalarType.createVarchar(20)))
+ .addColumn(new Column("WherePredicate", ScalarType.createVarchar(65535)))
+ .addColumn(new Column("User", ScalarType.createVarchar(20)))
+ .addColumn(new Column("OriginStmt", ScalarType.createVarchar(65535)))
+ .build();
+
+ @Override
+ public void analyze(Analyzer analyzer) throws UserException {
+ super.analyze(analyzer);
+ if (user != null) {
+ user.analyze(analyzer.getClusterName());
+ }
+ // check auth
+ if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+ }
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SHOW ").append(type).append(" POLICY");
+ if (user != null) {
+ sb.append(" FOR ").append(user);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public ShowResultSetMetaData getMetaData() {
+ return ROW_META_DATA;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
index ef5c7f91d0..a2b0122f50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
@@ -20,16 +20,20 @@
package org.apache.doris.analysis;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.TableAliasGenerator;
import org.apache.doris.common.UserException;
+import org.apache.doris.policy.Policy;
+import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +58,8 @@ public class StmtRewriter {
Preconditions.checkNotNull(analyzedStmt.analyzer);
return rewriteQueryStatement(analyzedStmt, analyzer);
} else if (parsedStmt instanceof InsertStmt) {
- final InsertStmt insertStmt = (InsertStmt)parsedStmt;
- final QueryStmt analyzedStmt = (QueryStmt)insertStmt.getQueryStmt();
+ final InsertStmt insertStmt = (InsertStmt) parsedStmt;
+ final QueryStmt analyzedStmt = (QueryStmt) insertStmt.getQueryStmt();
Preconditions.checkNotNull(analyzedStmt.analyzer);
QueryStmt rewrittenQueryStmt = rewriteQueryStatement(analyzedStmt, analyzer);
insertStmt.setQueryStmt(rewrittenQueryStmt);
@@ -66,10 +70,10 @@ public class StmtRewriter {
return parsedStmt;
}
- /**
- * Calls the appropriate equal method based on the specific type of query stmt. See
- * rewriteSelectStatement() and rewriteUnionStatement() documentation.
- */
+ /**
+ * Calls the appropriate equal method based on the specific type of query stmt. See
+ * rewriteSelectStatement() and rewriteUnionStatement() documentation.
+ */
public static QueryStmt rewriteQueryStatement(QueryStmt stmt, Analyzer analyzer)
throws AnalysisException {
Preconditions.checkNotNull(stmt);
@@ -88,9 +92,11 @@ public class StmtRewriter {
throws AnalysisException {
SelectStmt result = stmt;
// Rewrite all the subqueries in the FROM clause.
- for (TableRef tblRef: result.fromClause_) {
- if (!(tblRef instanceof InlineViewRef)) continue;
- InlineViewRef inlineViewRef = (InlineViewRef)tblRef;
+ for (TableRef tblRef : result.fromClause_) {
+ if (!(tblRef instanceof InlineViewRef)) {
+ continue;
+ }
+ InlineViewRef inlineViewRef = (InlineViewRef) tblRef;
QueryStmt rewrittenQueryStmt = rewriteQueryStatement(inlineViewRef.getViewStmt(),
inlineViewRef.getAnalyzer());
inlineViewRef.setViewStmt(rewrittenQueryStmt);
@@ -127,37 +133,39 @@ public class StmtRewriter {
* For example:
* select cs_item_sk, sum(cs_sales_price) from catalog_sales a group by cs_item_sk
* having sum(cs_sales_price) >
- * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk);
+ * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk);
+ *
*
* Step1: equal having subquery to where subquery
* Outer query is changed to inline view in rewritten query
* Inline view of outer query:
- * from (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
+ * from (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
* Rewritten subquery of expr:
- * where a.sum_cs_sales_price >
- * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk)
+ * where a.sum_cs_sales_price >
+ * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk)
* Rewritten query:
- * select cs_item_sk, a.sum_cs_sales_price from
- * (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
- * where a.sum_cs_sales_price >
- * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk)
+ * select cs_item_sk, a.sum_cs_sales_price from
+ * (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
+ * where a.sum_cs_sales_price >
+ * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk)
*
* Step2: equal where subquery
* Inline view of subquery:
- * from (select b.cs_item_sk, min(cs_sales_price) from catalog_sales b group by cs_item_sk) c
+ * from (select b.cs_item_sk, min(cs_sales_price) from catalog_sales b group by cs_item_sk) c
* Rewritten correlated predicate:
- * where c.cs_item_sk = a.cs_item_sk and a.sum_cs_sales_price > c.min(cs_sales_price)
+ * where c.cs_item_sk = a.cs_item_sk and a.sum_cs_sales_price > c.min(cs_sales_price)
* The final stmt:
* select a.cs_item_sk, a.sum_cs_sales_price from
- * (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
- * join
- * (select b.cs_item_sk, min(b.cs_sales_price) min_cs_sales_price from catalog_sales b group by b.cs_item_sk) c
+ * (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
+ * join
+ * (select b.cs_item_sk, min(b.cs_sales_price) min_cs_sales_price from catalog_sales b group by b.cs_item_sk) c
* where c.cs_item_sk = a.cs_item_sk and a.sum_cs_sales_price > c.min_cs_sales_price;
*
* @param stmt
* @param analyzer
*/
- private static SelectStmt rewriteHavingClauseSubqueries(SelectStmt stmt, Analyzer analyzer) throws AnalysisException {
+ private static SelectStmt rewriteHavingClauseSubqueries(SelectStmt stmt, Analyzer analyzer)
+ throws AnalysisException {
// prepare parameters
SelectList selectList = stmt.getSelectList();
List columnLabels = stmt.getColLabels();
@@ -256,7 +264,7 @@ public class StmtRewriter {
SelectListItem newItem = new SelectListItem(selectList.getItems().get(i).getExpr().reset().substitute(smap),
columnLabels.get(i));
newSelectItems.add(newItem);
- LOG.debug("New select item is changed to "+ newItem.toSql());
+ LOG.debug("New select item is changed to " + newItem.toSql());
}
SelectList newSelectList = new SelectList(newSelectItems, selectList.isDistinct());
@@ -288,7 +296,7 @@ public class StmtRewriter {
* @return select a, sum(v1), sum(v2)
*/
private static SelectList addMissingAggregationColumns(SelectList selectList,
- List aggregateExprs) {
+ List aggregateExprs) {
SelectList result = selectList.clone();
for (FunctionCallExpr functionCallExpr : aggregateExprs) {
boolean columnExists = false;
@@ -312,10 +320,10 @@ public class StmtRewriter {
*/
private static void rewriteUnionStatement(SetOperationStmt stmt, Analyzer analyzer)
throws AnalysisException {
- for (SetOperationStmt.SetOperand operand: stmt.getOperands()) {
+ for (SetOperationStmt.SetOperand operand : stmt.getOperands()) {
Preconditions.checkState(operand.getQueryStmt() instanceof SelectStmt);
QueryStmt rewrittenQueryStmt = StmtRewriter.rewriteSelectStatement(
- (SelectStmt)operand.getQueryStmt(), operand.getAnalyzer());
+ (SelectStmt) operand.getQueryStmt(), operand.getAnalyzer());
operand.setQueryStmt(rewrittenQueryStmt);
}
}
@@ -338,6 +346,7 @@ public class StmtRewriter {
}
return false;
}
+
/**
* Rewrite all subqueries of a stmt's WHERE clause. Initially, all the
* conjuncts containing subqueries are extracted from the WHERE clause and are
@@ -345,52 +354,51 @@ public class StmtRewriter {
* merged into its parent select block by converting it into a join.
* Conjuncts with subqueries that themselves contain conjuncts with subqueries are
* recursively rewritten in a bottom up fashion.
- *
+ *
* The following example illustrates the bottom up rewriting of nested queries.
* Suppose we have the following three level nested query Q0:
- *
+ *
* SELECT *
* FROM T1 : Q0
* WHERE T1.a IN (SELECT a
- * FROM T2 WHERE T2.b IN (SELECT b
- * FROM T3))
+ * FROM T2 WHERE T2.b IN (SELECT b
+ * FROM T3))
* AND T1.c < 10;
- *
+ *
* This query will be rewritten as follows. Initially, the IN predicate
* T1.a IN (SELECT a FROM T2 WHERE T2.b IN (SELECT b FROM T3)) is extracted
* from the top level block (Q0) since it contains a subquery and is
* replaced by a true BoolLiteral, resulting in the following query Q1:
- *
+ *
* SELECT * FROM T1 WHERE TRUE : Q1
- *
+ *
* Since the stmt in the extracted predicate contains a conjunct with a subquery,
* it is also rewritten. As before, rewriting stmt SELECT a FROM T2
* WHERE T2.b IN (SELECT b FROM T3) works by first extracting the conjunct that
* contains the subquery (T2.b IN (SELECT b FROM T3)) and substituting it with
* a true BoolLiteral, producing the following stmt Q2:
- *
+ *
* SELECT a FROM T2 WHERE TRUE : Q2
- *
+ *
* The predicate T2.b IN (SELECT b FROM T3) is then merged with Q2,
* producing the following unnested query Q3:
- *
+ *
* SELECT a FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 ON T2.b = $a$1.b : Q3
- *
+ *
* The extracted IN predicate becomes:
- *
+ *
* T1.a IN (SELECT a FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 ON T2.b = $a$1.b)
- *
+ *
* Finally, the rewritten IN predicate is merged with query block Q1,
* producing the following unnested query (WHERE clauses that contain only
* conjunctions of true BoolLiterals are eliminated):
- *
+ *
* SELECT *
* FROM T1 LEFT SEMI JOIN (SELECT a
- * FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1
- * ON T2.b = $a$1.b) $a$1
+ * FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1
+ * ON T2.b = $a$1.b) $a$1
* ON $a$1.a = T1.a
* WHERE T1.c < 10;
- *
*/
private static void rewriteWhereClauseSubqueries(
SelectStmt stmt, Analyzer analyzer)
@@ -494,7 +502,7 @@ public class StmtRewriter {
* subquery stmt. The modified analyzed expr is returned.
*/
private static Expr rewriteExpr(Expr expr, Analyzer analyzer)
- throws AnalysisException {
+ throws AnalysisException {
// Extract the subquery and equal it.
Subquery subquery = expr.getSubquery();
Preconditions.checkNotNull(subquery);
@@ -506,7 +514,7 @@ public class StmtRewriter {
rewrittenStmt.reset();
Subquery newSubquery = new Subquery(rewrittenStmt);
newSubquery.analyze(analyzer);
-
+
ExprSubstitutionMap smap = new ExprSubstitutionMap();
smap.put(subquery, newSubquery);
return expr.substitute(smap, analyzer, false);
@@ -539,7 +547,7 @@ public class StmtRewriter {
Preconditions.checkNotNull(expr);
Preconditions.checkNotNull(analyzer);
Preconditions.checkState(expr.getSubquery().getAnalyzer() != null,
- "subquery must be analyze address=" + System.identityHashCode(expr.getSubquery()));
+ "subquery must be analyze address=" + System.identityHashCode(expr.getSubquery()));
boolean updateSelectList = false;
SelectStmt subqueryStmt = (SelectStmt) expr.getSubquery().getStatement();
@@ -554,7 +562,7 @@ public class StmtRewriter {
}
// (select k1 $a from t2) $b
InlineViewRef inlineView = new InlineViewRef(
- stmt.getTableAliasGenerator().getNextAlias(), subqueryStmt, colLabels);
+ stmt.getTableAliasGenerator().getNextAlias(), subqueryStmt, colLabels);
// Extract all correlated predicates from the subquery.
List onClauseConjuncts = extractCorrelatedPredicates(subqueryStmt);
@@ -578,7 +586,7 @@ public class StmtRewriter {
for (Expr conjunct : onClauseConjuncts) {
canRewriteScalarFunction(expr, conjunct);
updateInlineView(inlineView, conjunct, stmt.getTableRefIds(),
- lhsExprs, rhsExprs, updateGroupBy);
+ lhsExprs, rhsExprs, updateGroupBy);
}
/**
@@ -612,10 +620,10 @@ public class StmtRewriter {
// Create a join conjunct from the expr that contains a subquery.
Expr joinConjunct = createJoinConjunct(expr, inlineView, analyzer,
- !onClauseConjuncts.isEmpty());
+ !onClauseConjuncts.isEmpty());
if (joinConjunct != null) {
SelectListItem firstItem =
- ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
+ ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
if (!onClauseConjuncts.isEmpty()
&& firstItem.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) {
// Correlated subqueries with an aggregate function that returns non-null on
@@ -626,7 +634,7 @@ public class StmtRewriter {
// stmt's WHERE clause because it needs to be applied to the result of the
// LEFT OUTER JOIN (both matched and unmatched tuples).
stmt.whereClause =
- CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause);
+ CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause);
joinConjunct = null;
joinOp = JoinOperator.LEFT_OUTER_JOIN;
updateSelectList = true;
@@ -639,7 +647,7 @@ public class StmtRewriter {
// Create the ON clause from the extracted correlated predicates.
Expr onClausePredicate =
- CompoundPredicate.createConjunctivePredicate(onClauseConjuncts);
+ CompoundPredicate.createConjunctivePredicate(onClauseConjuncts);
if (onClausePredicate == null) {
Preconditions.checkState(expr instanceof ExistsPredicate);
if (((ExistsPredicate) expr).isNotExists()) {
@@ -679,9 +687,13 @@ public class StmtRewriter {
// Check if we have a valid ON clause for an equi-join.
boolean hasEqJoinPred = false;
for (Expr conjunct : onClausePredicate.getConjuncts()) {
- if (!(conjunct instanceof BinaryPredicate)) continue;
+ if (!(conjunct instanceof BinaryPredicate)) {
+ continue;
+ }
BinaryPredicate.Operator operator = ((BinaryPredicate) conjunct).getOp();
- if (!operator.isEquivalence()) continue;
+ if (!operator.isEquivalence()) {
+ continue;
+ }
List lhsTupleIds = Lists.newArrayList();
conjunct.getChild(0).getIds(lhsTupleIds, null);
if (lhsTupleIds.isEmpty()) {
@@ -723,7 +735,7 @@ public class StmtRewriter {
// We can equal the aggregate subquery using a cross join. All conjuncts
// that were extracted from the subquery are added to stmt's WHERE clause.
stmt.whereClause =
- CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause);
+ CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause);
inlineView.setJoinOp(JoinOperator.CROSS_JOIN);
// Indicate that the CROSS JOIN may add a new visible tuple to stmt's
// select list (if the latter contains an unqualified star item '*')
@@ -732,9 +744,9 @@ public class StmtRewriter {
// We have a valid equi-join conjunct.
if (expr instanceof InPredicate
- && ((InPredicate) expr).isNotIn()
- || expr instanceof ExistsPredicate
- && ((ExistsPredicate) expr).isNotExists()) {
+ && ((InPredicate) expr).isNotIn()
+ || expr instanceof ExistsPredicate
+ && ((ExistsPredicate) expr).isNotExists()) {
// For the case of a NOT IN with an eq join conjunct, replace the join
// conjunct with a conjunct that uses the null-matching eq operator.
if (expr instanceof InPredicate) {
@@ -825,7 +837,7 @@ public class StmtRewriter {
/**
* Extract all correlated predicates of a subquery.
- *
+ *
* TODO Handle correlated predicates in a HAVING clause.
*/
private static ArrayList extractCorrelatedPredicates(SelectStmt subqueryStmt)
@@ -835,14 +847,14 @@ public class StmtRewriter {
if (subqueryStmt.hasWhereClause()) {
if (!canExtractCorrelatedPredicates(subqueryStmt.getWhereClause(),
- subqueryTupleIds)) {
+ subqueryTupleIds)) {
throw new AnalysisException("Disjunctions with correlated predicates "
+ "are not supported: " + subqueryStmt.getWhereClause().toSql());
}
// Extract the correlated predicates from the subquery's WHERE clause and
// replace them with true BoolLiterals.
Expr newWhereClause = extractCorrelatedPredicates(subqueryStmt.getWhereClause(),
- subqueryTupleIds, correlatedPredicates);
+ subqueryTupleIds, correlatedPredicates);
if (canEliminate(newWhereClause)) {
newWhereClause = null;
}
@@ -857,7 +869,7 @@ public class StmtRewriter {
ArrayList onClauseCorrelatedPreds = Lists.newArrayList();
Expr newOnClause = extractCorrelatedPredicates(tableRef.getOnClause(),
- subqueryTupleIds, onClauseCorrelatedPreds);
+ subqueryTupleIds, onClauseCorrelatedPreds);
if (onClauseCorrelatedPreds.isEmpty()) {
continue;
}
@@ -883,14 +895,14 @@ public class StmtRewriter {
* and the extracted correlated predicates are added to 'matches'.
*/
private static Expr extractCorrelatedPredicates(Expr root, List tupleIds,
- ArrayList matches) {
+ ArrayList matches) {
if (isCorrelatedPredicate(root, tupleIds)) {
matches.add(root);
return new BoolLiteral(true);
}
for (int i = 0; i < root.getChildren().size(); ++i) {
root.getChildren().set(i, extractCorrelatedPredicates(root.getChild(i), tupleIds,
- matches));
+ matches));
}
return root;
}
@@ -900,7 +912,7 @@ public class StmtRewriter {
* correlated predicate cannot be extracted if it is part of a disjunction.
*/
private static boolean canExtractCorrelatedPredicates(Expr expr,
- List subqueryTupleIds) {
+ List subqueryTupleIds) {
if (!(expr instanceof CompoundPredicate)) {
return true;
}
@@ -937,8 +949,8 @@ public class StmtRewriter {
SelectListItem item = stmt.getSelectList().getItems().get(0);
if (!item.getExpr().contains(Expr.CORRELATED_SUBQUERY_SUPPORT_AGG_FN)) {
throw new AnalysisException("The select item in correlated subquery of binary predicate should only "
- + "be sum, min, max, avg and count. Current subquery:"
- + stmt.toSql());
+ + "be sum, min, max, avg and count. Current subquery:"
+ + stmt.toSql());
}
}
// Grouping and/or aggregation (including analytic functions) is forbidden in correlated subquery of in
@@ -953,7 +965,9 @@ public class StmtRewriter {
final com.google.common.base.Predicate isSingleSlotRef =
new com.google.common.base.Predicate() {
@Override
- public boolean apply(Expr arg) { return arg.unwrapSlotRef(false) != null; }
+ public boolean apply(Expr arg) {
+ return arg.unwrapSlotRef(false) != null;
+ }
};
// A HAVING clause is only allowed on correlated EXISTS subqueries with
@@ -1081,7 +1095,7 @@ public class StmtRewriter {
* the aggregate function is wrapped into a 'zeroifnull' function.
*/
private static Expr createJoinConjunct(Expr exprWithSubquery, InlineViewRef inlineView,
- Analyzer analyzer, boolean isCorrelated) throws AnalysisException {
+ Analyzer analyzer, boolean isCorrelated) throws AnalysisException {
Preconditions.checkNotNull(exprWithSubquery);
Preconditions.checkNotNull(inlineView);
Preconditions.checkState(exprWithSubquery.contains(Subquery.class));
@@ -1123,7 +1137,7 @@ public class StmtRewriter {
// NullLiteral whereas count returns a NumericLiteral.
if (((FunctionCallExpr) aggFns.get(0)).getFn().getReturnType().isNumericType()) {
FunctionCallExpr zeroIfNull = new FunctionCallExpr("ifnull",
- Lists.newArrayList((Expr) slotRef, new IntLiteral(0, Type.BIGINT)));
+ Lists.newArrayList((Expr) slotRef, new IntLiteral(0, Type.BIGINT)));
zeroIfNull.analyze(analyzer);
subquerySubstitute = zeroIfNull;
} else if (((FunctionCallExpr) aggFns.get(0)).getFn().getReturnType().isStringType()) {
@@ -1141,5 +1155,59 @@ public class StmtRewriter {
smap.put(subquery, subquerySubstitute);
return exprWithSubquery.substitute(smap, analyzer, false);
}
+
+ public static boolean rewriteByPolicy(StatementBase statementBase, Analyzer analyzer) throws UserException {
+ Catalog currentCatalog = Catalog.getCurrentCatalog();
+ UserIdentity currentUserIdentity = ConnectContext.get().getCurrentUserIdentity();
+ String user = analyzer.getQualifiedUser();
+ if (currentUserIdentity.isRootUser() || currentUserIdentity.isAdminUser()) {
+ return false;
+ }
+ if (!currentCatalog.getPolicyMgr().existPolicy(user)) {
+ return false;
+ }
+ if (!(statementBase instanceof SelectStmt)) {
+ return false;
+ }
+ SelectStmt selectStmt = (SelectStmt) statementBase;
+ boolean reAnalyze = false;
+ for (int i = 0; i < selectStmt.fromClause_.size(); i++) {
+ TableRef tableRef = selectStmt.fromClause_.get(i);
+ // Recursively rewrite subquery
+ if (tableRef instanceof InlineViewRef) {
+ InlineViewRef viewRef = (InlineViewRef) tableRef;
+ if (rewriteByPolicy(viewRef.getQueryStmt(), analyzer)) {
+ reAnalyze = true;
+ }
+ continue;
+ }
+ Table table = tableRef.getTable();
+ String dbName = tableRef.getName().getDb();
+ if (dbName == null) {
+ dbName = analyzer.getDefaultDb();
+ }
+ Database db = currentCatalog.getDbOrAnalysisException(dbName);
+ long dbId = db.getId();
+ long tableId = table.getId();
+ Policy matchPolicy = currentCatalog.getPolicyMgr().getMatchRowPolicy(dbId, tableId, user);
+ if (matchPolicy == null) {
+ continue;
+ }
+ SelectList selectList = new SelectList();
+ selectList.addItem(SelectListItem.createStarItem(tableRef.getAliasAsName()));
+
+ SelectStmt stmt = new SelectStmt(selectList,
+ new FromClause(Lists.newArrayList(tableRef)),
+ matchPolicy.getWherePredicate(),
+ null,
+ null,
+ null,
+ LimitElement.NO_LIMIT);
+ selectStmt.fromClause_.set(i, new InlineViewRef(tableRef.getAliasAsName().getTbl(), stmt));
+ selectStmt.analyze(analyzer);
+ reAnalyze = true;
+ }
+ return reAnalyze;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UserIdentity.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UserIdentity.java
index 538df18677..46724612f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UserIdentity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UserIdentity.java
@@ -17,18 +17,22 @@
package org.apache.doris.analysis;
+import org.apache.doris.catalog.Catalog;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TUserIdentity;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
@@ -41,9 +45,16 @@ import java.io.IOException;
// cmy@192.168.%
// cmy@[domain.name]
public class UserIdentity implements Writable {
+
+ @SerializedName(value = "user")
private String user;
+
+ @SerializedName(value = "host")
private String host;
+
+ @SerializedName(value = "isDomain")
private boolean isDomain;
+
private boolean isAnalyzed = false;
public static final UserIdentity ROOT;
@@ -170,6 +181,10 @@ public class UserIdentity implements Writable {
return user.equals(PaloAuth.ROOT_USER);
}
+ public boolean isAdminUser() {
+ return user.equals(PaloAuth.ADMIN_USER);
+ }
+
public TUserIdentity toThrift() {
Preconditions.checkState(isAnalyzed);
TUserIdentity tUserIdent = new TUserIdentity();
@@ -180,9 +195,17 @@ public class UserIdentity implements Writable {
}
public static UserIdentity read(DataInput in) throws IOException {
- UserIdentity userIdentity = new UserIdentity();
- userIdentity.readFields(in);
- return userIdentity;
+ // Use Gson in the VERSION_109
+ if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_109) {
+ UserIdentity userIdentity = new UserIdentity();
+ userIdentity.readFields(in);
+ return userIdentity;
+ } else {
+ String json = Text.readString(in);
+ UserIdentity userIdentity = GsonUtils.GSON.fromJson(json, UserIdentity.class);
+ userIdentity.setIsAnalyzed();
+ return userIdentity;
+ }
}
@Override
@@ -226,12 +249,11 @@ public class UserIdentity implements Writable {
@Override
public void write(DataOutput out) throws IOException {
Preconditions.checkState(isAnalyzed);
- Text.writeString(out, user);
- Text.writeString(out, host);
- out.writeBoolean(isDomain);
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
}
- public void readFields(DataInput in) throws IOException {
+ @Deprecated
+ private void readFields(DataInput in) throws IOException {
user = Text.readString(in);
host = Text.readString(in);
isDomain = in.readBoolean();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index d7e3d28e9d..a833c6e3e3 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -217,6 +217,7 @@ import org.apache.doris.persist.TablePropertyInfo;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.plugin.PluginMgr;
+import org.apache.doris.policy.PolicyMgr;
import org.apache.doris.qe.AuditEventProcessor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.GlobalVariable;
@@ -459,6 +460,8 @@ public class Catalog {
private RefreshManager refreshManager;
+ private PolicyMgr policyMgr;
+
public List getFrontends(FrontendNodeType nodeType) {
if (nodeType == null) {
// get all
@@ -630,6 +633,7 @@ public class Catalog {
this.pluginMgr = new PluginMgr();
this.auditEventProcessor = new AuditEventProcessor(this.pluginMgr);
this.refreshManager = new RefreshManager();
+ this.policyMgr = new PolicyMgr();
}
public static void destroyCheckpoint() {
@@ -1945,6 +1949,17 @@ public class Catalog {
return checksum;
}
+ /**
+ * Load policy through file.
+ **/
+ public long loadPolicy(DataInputStream in, long checksum) throws IOException {
+ if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_109) {
+ policyMgr = PolicyMgr.read(in);
+ }
+ LOG.info("finished replay policy from image");
+ return checksum;
+ }
+
// Only called by checkpoint thread
// return the latest image file's absolute path
public String saveImage() throws IOException {
@@ -2212,6 +2227,11 @@ public class Catalog {
return checksum;
}
+ public long savePolicy(CountingDataOutputStream out, long checksum) throws IOException {
+ Catalog.getCurrentCatalog().getPolicyMgr().write(out);
+ return checksum;
+ }
+
public void createLabelCleaner() {
labelCleaner = new MasterDaemon("LoadLabelCleaner", Config.label_clean_interval_second * 1000L) {
@Override
@@ -5190,6 +5210,10 @@ public class Catalog {
public EsRepository getEsRepository() {
return this.esRepository;
}
+
+ public PolicyMgr getPolicyMgr() {
+ return this.policyMgr;
+ }
public void setMaster(MasterInfo info) {
this.masterIp = info.getIp();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
index be646f96ff..7bd8e07bd7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
@@ -103,6 +103,7 @@ public class MetaReader {
checksum = catalog.loadPlugins(dis, checksum);
checksum = catalog.loadDeleteHandler(dis, checksum);
checksum = catalog.loadSqlBlockRule(dis, checksum);
+ checksum = catalog.loadPolicy(dis, checksum);
}
MetaFooter metaFooter = MetaFooter.read(imageFile);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java b/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java
index 387a50be3d..4ecf24140e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java
@@ -129,6 +129,7 @@ public class MetaWriter {
checksum.setRef(writer.doWork("plugins", () -> catalog.savePlugins(dos, checksum.getRef())));
checksum.setRef(writer.doWork("deleteHandler", () -> catalog.saveDeleteHandler(dos, checksum.getRef())));
checksum.setRef(writer.doWork("sqlBlockRule", () -> catalog.saveSqlBlockRule(dos, checksum.getRef())));
+ checksum.setRef(writer.doWork("policy", () -> catalog.savePolicy(dos, checksum.getRef())));
imageFileOut.getChannel().force(true);
}
MetaFooter.write(imageFile, metaIndices, checksum.getRef());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index d9166c9ec2..df62b168e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -90,6 +90,8 @@ import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TablePropertyInfo;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.plugin.PluginInfo;
+import org.apache.doris.policy.DropPolicyLog;
+import org.apache.doris.policy.Policy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@@ -642,6 +644,16 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_CREATE_POLICY: {
+ data = Policy.read(in);
+ isRead = true;
+ break;
+ }
+ case OperationType.OP_DROP_POLICY: {
+ data = DropPolicyLog.read(in);
+ isRead = true;
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 9b8c174d42..ca958128ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -63,6 +63,8 @@ import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.plugin.PluginInfo;
+import org.apache.doris.policy.DropPolicyLog;
+import org.apache.doris.policy.Policy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@@ -809,6 +811,16 @@ public class EditLog {
catalog.getAlterInstance().replayProcessModifyEngine(log);
break;
}
+ case OperationType.OP_CREATE_POLICY: {
+ Policy log = (Policy) journal.getData();
+ catalog.getPolicyMgr().replayCreate(log);
+ break;
+ }
+ case OperationType.OP_DROP_POLICY: {
+ DropPolicyLog log = (DropPolicyLog) journal.getData();
+ catalog.getPolicyMgr().replayDrop(log);
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1411,4 +1423,12 @@ public class EditLog {
public void logModifyTableEngine(ModifyTableEngineOperationLog log) {
logEdit(OperationType.OP_MODIFY_TABLE_ENGINE, log);
}
+
+ public void logCreatePolicy(Policy policy) {
+ logEdit(OperationType.OP_CREATE_POLICY, policy);
+ }
+
+ public void logDropPolicy(DropPolicyLog log) {
+ logEdit(OperationType.OP_DROP_POLICY, log);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 3c7a42dddd..31c06fc5ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -218,6 +218,10 @@ public class OperationType {
public static final short OP_CREATE_SQL_BLOCK_RULE = 300;
public static final short OP_ALTER_SQL_BLOCK_RULE = 301;
public static final short OP_DROP_SQL_BLOCK_RULE = 302;
+
+ // policy 310-320
+ public static final short OP_CREATE_POLICY = 310;
+ public static final short OP_DROP_POLICY = 311;
// get opcode name by op codeStri
public static String getOpName(short opCode) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/DropPolicyLog.java b/fe/fe-core/src/main/java/org/apache/doris/policy/DropPolicyLog.java
new file mode 100644
index 0000000000..90fb824e15
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/DropPolicyLog.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.policy;
+
+import org.apache.doris.analysis.DropPolicyStmt;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Use for transmission drop policy log.
+ **/
+@AllArgsConstructor
+@Getter
+public class DropPolicyLog implements Writable {
+
+ @SerializedName(value = "dbId")
+ private long dbId;
+
+ @SerializedName(value = "tableId")
+ private long tableId;
+
+ @SerializedName(value = "type")
+ private PolicyTypeEnum type;
+
+ @SerializedName(value = "policyName")
+ private String policyName;
+
+ @SerializedName(value = "user")
+ private UserIdentity user;
+
+ /**
+ * Generate delete logs through stmt.
+ **/
+ public static DropPolicyLog fromDropStmt(DropPolicyStmt stmt) throws AnalysisException {
+ String curDb = stmt.getTableName().getDb();
+ if (curDb == null) {
+ curDb = ConnectContext.get().getDatabase();
+ }
+ Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
+ Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl());
+ return new DropPolicyLog(db.getId(), table.getId(), stmt.getType(), stmt.getPolicyName(), stmt.getUser());
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ public static DropPolicyLog read(DataInput in) throws IOException {
+ return GsonUtils.GSON.fromJson(Text.readString(in), DropPolicyLog.class);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/FilterType.java b/fe/fe-core/src/main/java/org/apache/doris/policy/FilterType.java
new file mode 100644
index 0000000000..49c32a8689
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/FilterType.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.policy;
+
+import org.apache.doris.analysis.CompoundPredicate;
+
+import lombok.Getter;
+
+import java.util.Arrays;
+
+/**
+ * Use for associating policies.
+ **/
+public enum FilterType {
+
+ PERMISSIVE(CompoundPredicate.Operator.OR),
+ RESTRICTIVE(CompoundPredicate.Operator.AND);
+
+ @Getter
+ private final CompoundPredicate.Operator op;
+
+ FilterType(CompoundPredicate.Operator op) {
+ this.op = op;
+ }
+
+ public static FilterType of(String name) {
+ return Arrays.stream(FilterType.values()).filter(f -> f.name().equalsIgnoreCase(name)).findFirst()
+ .orElse(RESTRICTIVE);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
new file mode 100644
index 0000000000..d617dda050
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.policy;
+
+import org.apache.doris.analysis.CreatePolicyStmt;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+
+/**
+ * Save policy for filtering data.
+ **/
+@Data
+@AllArgsConstructor
+public class Policy implements Writable, GsonPostProcessable {
+
+ public static final String ROW_POLICY = "ROW";
+
+ private static final Logger LOG = LogManager.getLogger(Policy.class);
+
+ @SerializedName(value = "dbId")
+ private long dbId;
+
+ @SerializedName(value = "tableId")
+ private long tableId;
+
+ @SerializedName(value = "policyName")
+ private String policyName;
+
+ /**
+ * ROW.
+ **/
+ @SerializedName(value = "type")
+ private PolicyTypeEnum type;
+
+ /**
+ * PERMISSIVE | RESTRICTIVE, If multiple types exist, the last type prevails.
+ **/
+ @SerializedName(value = "filterType")
+ private final FilterType filterType;
+
+ private Expr wherePredicate;
+
+ /**
+ * Policy bind user.
+ **/
+ @SerializedName(value = "user")
+ private final UserIdentity user;
+
+ /**
+ * Use for Serialization/deserialization.
+ **/
+ @SerializedName(value = "originStmt")
+ private String originStmt;
+
+ /**
+ * Trans stmt to Policy.
+ **/
+ public static Policy fromCreateStmt(CreatePolicyStmt stmt) throws AnalysisException {
+ String curDb = stmt.getTableName().getDb();
+ if (curDb == null) {
+ curDb = ConnectContext.get().getDatabase();
+ }
+ Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
+ Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl());
+ UserIdentity userIdent = stmt.getUser();
+ userIdent.analyze(ConnectContext.get().getClusterName());
+ return new Policy(db.getId(), table.getId(), stmt.getPolicyName(), stmt.getType(), stmt.getFilterType(),
+ stmt.getWherePredicate(), userIdent, stmt.getOrigStmt().originStmt);
+ }
+
+ /**
+ * Use for SHOW POLICY.
+ **/
+ public List getShowInfo() throws AnalysisException {
+ Database database = Catalog.getCurrentCatalog().getDbOrAnalysisException(this.dbId);
+ Table table = database.getTableOrAnalysisException(this.tableId);
+ return Lists.newArrayList(this.policyName, database.getFullName(), table.getName(), this.type.name(),
+ this.filterType.name(), this.wherePredicate.toSql(), this.user.getQualifiedUser(), this.originStmt);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ /**
+ * Read policy from file.
+ **/
+ public static Policy read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, Policy.class);
+ }
+
+ @Override
+ public void gsonPostProcess() throws IOException {
+ if (wherePredicate != null) {
+ return;
+ }
+ try {
+ SqlScanner input = new SqlScanner(new StringReader(originStmt), 0L);
+ SqlParser parser = new SqlParser(input);
+ CreatePolicyStmt stmt = (CreatePolicyStmt) SqlParserUtils.getFirstStmt(parser);
+ wherePredicate = stmt.getWherePredicate();
+ } catch (Exception e) {
+ throw new IOException("policy parse originStmt error", e);
+ }
+ }
+
+ @Override
+ public Policy clone() {
+ return new Policy(this.dbId, this.tableId, this.policyName, this.type, this.filterType, this.wherePredicate,
+ this.user, this.originStmt);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
new file mode 100644
index 0000000000..c3a1bebbf8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.policy;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.CreatePolicyStmt;
+import org.apache.doris.analysis.DropPolicyStmt;
+import org.apache.doris.analysis.ShowPolicyStmt;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSet;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * Management policy and cache it.
+ **/
+public class PolicyMgr implements Writable {
+ private static final Logger LOG = LogManager.getLogger(PolicyMgr.class);
+
+ private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ @SerializedName(value = "dbIdToPolicyMap")
+ private Map> dbIdToPolicyMap = Maps.newConcurrentMap();
+
+ /**
+ * Cache merge policy for match.
+ * key:dbId:tableId-type-user
+ **/
+ private Map> dbIdToMergePolicyMap = Maps.newConcurrentMap();
+
+ private Set userPolicySet = Sets.newConcurrentHashSet();
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ /**
+ * Create policy through stmt.
+ **/
+ public void createPolicy(CreatePolicyStmt stmt) throws UserException {
+ Policy policy = Policy.fromCreateStmt(stmt);
+ writeLock();
+ try {
+ if (existPolicy(policy.getDbId(), policy.getTableId(), policy.getType(),
+ policy.getPolicyName(), policy.getUser())) {
+ if (stmt.isIfNotExists()) {
+ return;
+ }
+ throw new DdlException("the policy " + policy.getPolicyName() + " already create");
+ }
+ unprotectedAdd(policy);
+ Catalog.getCurrentCatalog().getEditLog().logCreatePolicy(policy);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ /**
+ * Drop policy through stmt.
+ **/
+ public void dropPolicy(DropPolicyStmt stmt) throws DdlException, AnalysisException {
+ DropPolicyLog policy = DropPolicyLog.fromDropStmt(stmt);
+ writeLock();
+ try {
+ if (!existPolicy(policy.getDbId(), policy.getTableId(), policy.getType(),
+ policy.getPolicyName(), policy.getUser())) {
+ if (stmt.isIfExists()) {
+ return;
+ }
+ throw new DdlException("the policy " + policy.getPolicyName() + " not exist");
+ }
+ unprotectedDrop(policy);
+ Catalog.getCurrentCatalog().getEditLog().logDropPolicy(policy);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public boolean existPolicy(String user) {
+ return userPolicySet.contains(user);
+ }
+
+ private boolean existPolicy(long dbId, long tableId, PolicyTypeEnum type, String policyName, UserIdentity user) {
+ List policies = getDbPolicies(dbId);
+ return policies.stream().anyMatch(policy -> matchPolicy(policy, tableId, type, policyName, user));
+ }
+
+ private List getDbPolicies(long dbId) {
+ if (dbIdToPolicyMap == null) {
+ return new ArrayList<>();
+ }
+ return dbIdToPolicyMap.getOrDefault(dbId, new ArrayList<>());
+ }
+
+ private List getDbUserPolicies(long dbId, String user) {
+ if (dbIdToPolicyMap == null) {
+ return new ArrayList<>();
+ }
+ return dbIdToPolicyMap.getOrDefault(dbId, new ArrayList<>()).stream()
+ .filter(p -> p.getUser().getQualifiedUser().equals(user)).collect(Collectors.toList());
+ }
+
+ public void replayCreate(Policy policy) {
+ unprotectedAdd(policy);
+ LOG.info("replay create policy: {}", policy);
+ }
+
+ private void unprotectedAdd(Policy policy) {
+ if (policy == null) {
+ return;
+ }
+ long dbId = policy.getDbId();
+ List dbPolicies = getDbPolicies(dbId);
+ dbPolicies.add(policy);
+ dbIdToPolicyMap.put(dbId, dbPolicies);
+ updateMergePolicyMap(dbId);
+ userPolicySet.add(policy.getUser().getQualifiedUser());
+ }
+
+ public void replayDrop(DropPolicyLog log) {
+ unprotectedDrop(log);
+ LOG.info("replay drop policy log: {}", log);
+ }
+
+ private void unprotectedDrop(DropPolicyLog log) {
+ long dbId = log.getDbId();
+ List policies = getDbPolicies(dbId);
+ policies.removeIf(p -> matchPolicy(p, log.getTableId(), log.getType(), log.getPolicyName(), log.getUser()));
+ dbIdToPolicyMap.put(dbId, policies);
+ updateMergePolicyMap(dbId);
+ if (log.getUser() == null) {
+ updateAllUserPolicySet();
+ } else {
+ String user = log.getUser().getQualifiedUser();
+ if (!existUserPolicy(user)) {
+ userPolicySet.remove(user);
+ }
+ }
+ }
+
+ private boolean matchPolicy(Policy policy, long tableId, PolicyTypeEnum type,
+ String policyName, UserIdentity user) {
+ return policy.getTableId() == tableId
+ && policy.getType().equals(type)
+ && StringUtils.equals(policy.getPolicyName(), policyName)
+ && (user == null || StringUtils.equals(policy.getUser().getQualifiedUser(), user.getQualifiedUser()));
+ }
+
+ /**
+ * Match row policy and return it.
+ **/
+ public Policy getMatchRowPolicy(long dbId, long tableId, String user) {
+ readLock();
+ try {
+ if (!dbIdToMergePolicyMap.containsKey(dbId)) {
+ return null;
+ }
+ String key = Joiner.on("-").join(tableId, Policy.ROW_POLICY, user);
+ if (!dbIdToMergePolicyMap.get(dbId).containsKey(key)) {
+ return null;
+ }
+ return dbIdToMergePolicyMap.get(dbId).get(key);
+ } finally {
+ readUnlock();
+ }
+ }
+
+ /**
+ * Show policy through stmt.
+ **/
+ public ShowResultSet showPolicy(ShowPolicyStmt showStmt) throws AnalysisException {
+ List> rows = Lists.newArrayList();
+ List policies;
+ long currentDbId = ConnectContext.get().getCurrentDbId();
+ if (showStmt.getUser() == null) {
+ policies = Catalog.getCurrentCatalog().getPolicyMgr().getDbPolicies(currentDbId);
+ } else {
+ policies = Catalog.getCurrentCatalog().getPolicyMgr()
+ .getDbUserPolicies(currentDbId, showStmt.getUser().getQualifiedUser());
+ }
+ for (Policy policy : policies) {
+ if (policy.getWherePredicate() == null) {
+ continue;
+ }
+ rows.add(policy.getShowInfo());
+ }
+ return new ShowResultSet(showStmt.getMetaData(), rows);
+ }
+
+ private void updateAllMergePolicyMap() {
+ dbIdToPolicyMap.forEach((dbId, policies) -> updateMergePolicyMap(dbId));
+ }
+
+ private void updateAllUserPolicySet() {
+ userPolicySet.clear();
+ dbIdToPolicyMap.forEach((dbId, policies) ->
+ policies.forEach(policy -> userPolicySet.add(policy.getUser().getQualifiedUser())));
+ }
+
+
+ private boolean existUserPolicy(String user) {
+ readLock();
+ try {
+ for (Map policies : dbIdToMergePolicyMap.values()) {
+ for (Policy policy : policies.values()) {
+ if (policy.getUser().getQualifiedUser().equals(user)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } finally {
+ readUnlock();
+ }
+
+ }
+
+ /**
+ * The merge policy cache needs to be regenerated after the update.
+ **/
+ private void updateMergePolicyMap(long dbId) {
+ readLock();
+ try {
+ if (!dbIdToPolicyMap.containsKey(dbId)) {
+ return;
+ }
+ List policies = dbIdToPolicyMap.get(dbId);
+ Map andMap = new HashMap<>();
+ Map orMap = new HashMap<>();
+ for (Policy policy : policies) {
+ // read from json, need set isAnalyzed
+ policy.getUser().setIsAnalyzed();
+ String key =
+ Joiner.on("-").join(policy.getTableId(), policy.getType(), policy.getUser().getQualifiedUser());
+ // merge wherePredicate
+ if (CompoundPredicate.Operator.AND.equals(policy.getFilterType().getOp())) {
+ Policy frontPolicy = andMap.get(key);
+ if (frontPolicy == null) {
+ andMap.put(key, policy.clone());
+ } else {
+ frontPolicy.setWherePredicate(
+ new CompoundPredicate(CompoundPredicate.Operator.AND, frontPolicy.getWherePredicate(),
+ policy.getWherePredicate()));
+ andMap.put(key, frontPolicy.clone());
+ }
+ } else {
+ Policy frontPolicy = orMap.get(key);
+ if (frontPolicy == null) {
+ orMap.put(key, policy.clone());
+ } else {
+ frontPolicy.setWherePredicate(
+ new CompoundPredicate(CompoundPredicate.Operator.OR, frontPolicy.getWherePredicate(),
+ policy.getWherePredicate()));
+ orMap.put(key, frontPolicy.clone());
+ }
+ }
+ }
+ Map mergeMap = new HashMap<>();
+ Set policyKeys = new HashSet<>();
+ policyKeys.addAll(andMap.keySet());
+ policyKeys.addAll(orMap.keySet());
+ policyKeys.forEach(key -> {
+ if (andMap.containsKey(key) && orMap.containsKey(key)) {
+ Policy mergePolicy = andMap.get(key).clone();
+ mergePolicy.setWherePredicate(
+ new CompoundPredicate(CompoundPredicate.Operator.AND, mergePolicy.getWherePredicate(),
+ orMap.get(key).getWherePredicate()));
+ mergeMap.put(key, mergePolicy);
+ }
+ if (!andMap.containsKey(key)) {
+ mergeMap.put(key, orMap.get(key));
+ }
+ if (!orMap.containsKey(key)) {
+ mergeMap.put(key, andMap.get(key));
+ }
+ });
+ dbIdToMergePolicyMap.put(dbId, mergeMap);
+ } finally {
+ readUnlock();
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ /**
+ * Read policyMgr from file.
+ **/
+ public static PolicyMgr read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ PolicyMgr policyMgr = GsonUtils.GSON.fromJson(json, PolicyMgr.class);
+ // update merge policy cache
+ policyMgr.updateAllMergePolicyMap();
+ // update user policy cache
+ policyMgr.updateAllUserPolicySet();
+ return policyMgr;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java
new file mode 100644
index 0000000000..483b8cd93b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.policy;
+
+/**
+ * Policy type enum, currently only row.
+ **/
+public enum PolicyTypeEnum {
+
+ ROW
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 3d9b45ffcb..5904848b55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -17,11 +17,11 @@
package org.apache.doris.qe;
+import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
import org.apache.doris.analysis.AdminCancelRepairTableStmt;
import org.apache.doris.analysis.AdminCheckTabletsStmt;
import org.apache.doris.analysis.AdminCleanTrashStmt;
import org.apache.doris.analysis.AdminCompactTableStmt;
-import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
import org.apache.doris.analysis.AdminRebalanceDiskStmt;
import org.apache.doris.analysis.AdminRepairTableStmt;
import org.apache.doris.analysis.AdminSetConfigStmt;
@@ -51,6 +51,7 @@ import org.apache.doris.analysis.CreateEncryptKeyStmt;
import org.apache.doris.analysis.CreateFileStmt;
import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
+import org.apache.doris.analysis.CreatePolicyStmt;
import org.apache.doris.analysis.CreateRepositoryStmt;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.CreateRoleStmt;
@@ -69,6 +70,7 @@ import org.apache.doris.analysis.DropEncryptKeyStmt;
import org.apache.doris.analysis.DropFileStmt;
import org.apache.doris.analysis.DropFunctionStmt;
import org.apache.doris.analysis.DropMaterializedViewStmt;
+import org.apache.doris.analysis.DropPolicyStmt;
import org.apache.doris.analysis.DropRepositoryStmt;
import org.apache.doris.analysis.DropResourceStmt;
import org.apache.doris.analysis.DropRoleStmt;
@@ -305,6 +307,10 @@ public class DdlExecutor {
catalog.getStatisticsJobManager().createStatisticsJob((AnalyzeStmt) ddlStmt);
} else if (ddlStmt instanceof AlterResourceStmt) {
catalog.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt);
+ } else if (ddlStmt instanceof CreatePolicyStmt) {
+ catalog.getPolicyMgr().createPolicy((CreatePolicyStmt) ddlStmt);
+ } else if (ddlStmt instanceof DropPolicyStmt) {
+ catalog.getPolicyMgr().dropPolicy((DropPolicyStmt) ddlStmt);
} else {
throw new DdlException("Unknown statement.");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 2a1779c8d6..f04cfe31f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -60,6 +60,7 @@ import org.apache.doris.analysis.ShowMigrationsStmt;
import org.apache.doris.analysis.ShowPartitionIdStmt;
import org.apache.doris.analysis.ShowPartitionsStmt;
import org.apache.doris.analysis.ShowPluginsStmt;
+import org.apache.doris.analysis.ShowPolicyStmt;
import org.apache.doris.analysis.ShowProcStmt;
import org.apache.doris.analysis.ShowProcesslistStmt;
import org.apache.doris.analysis.ShowQueryProfileStmt;
@@ -344,6 +345,8 @@ public class ShowExecutor {
handleAdminDiagnoseTablet();
} else if (stmt instanceof ShowCreateMaterializedViewStmt) {
handleShowCreateMaterializedView();
+ } else if (stmt instanceof ShowPolicyStmt) {
+ handleShowPolicy();
} else {
handleEmtpy();
}
@@ -2205,4 +2208,9 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showStmt.getMetaData(), resultRowSet);
}
+ public void handleShowPolicy() throws AnalysisException {
+ ShowPolicyStmt showStmt = (ShowPolicyStmt) stmt;
+ resultSet = Catalog.getCurrentCatalog().getPolicyMgr().showPolicy(showStmt);
+ }
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index b8280cff63..057ceb5a89 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -35,6 +35,7 @@ import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.SelectListItem;
import org.apache.doris.analysis.SelectStmt;
+import org.apache.doris.analysis.SetOperationStmt;
import org.apache.doris.analysis.SetStmt;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.ShowStmt;
@@ -676,6 +677,25 @@ public class StmtExecutor implements ProfileWriter {
parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);
reAnalyze = true;
}
+ if (parsedStmt instanceof SelectStmt) {
+ if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {
+ reAnalyze = true;
+ }
+ }
+ if (parsedStmt instanceof SetOperationStmt) {
+ List operands = ((SetOperationStmt) parsedStmt).getOperands();
+ for (SetOperationStmt.SetOperand operand : operands) {
+ if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)) {
+ reAnalyze = true;
+ }
+ }
+ }
+ if (parsedStmt instanceof InsertStmt) {
+ QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
+ if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {
+ reAnalyze = true;
+ }
+ }
if (reAnalyze) {
// The rewrites should have no user-visible effect. Remember the original result
// types and column labels to restore them after the rewritten stmt has been
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex
index aa2a3f3ff9..8f3dd68a35 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -317,6 +317,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("profile", new Integer(SqlParserSymbols.KW_PROFILE));
keywordMap.put("properties", new Integer(SqlParserSymbols.KW_PROPERTIES));
keywordMap.put("property", new Integer(SqlParserSymbols.KW_PROPERTY));
+ keywordMap.put("policy", new Integer(SqlParserSymbols.KW_POLICY));
keywordMap.put("query", new Integer(SqlParserSymbols.KW_QUERY));
keywordMap.put("quota", new Integer(SqlParserSymbols.KW_QUOTA));
keywordMap.put("random", new Integer(SqlParserSymbols.KW_RANDOM));
@@ -380,9 +381,11 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("superuser", new Integer(SqlParserSymbols.KW_SUPERUSER));
keywordMap.put("sync", new Integer(SqlParserSymbols.KW_SYNC));
keywordMap.put("system", new Integer(SqlParserSymbols.KW_SYSTEM));
+ keywordMap.put("sql_block_rule", new Integer(SqlParserSymbols.KW_SQL_BLOCK_RULE));
keywordMap.put("table", new Integer(SqlParserSymbols.KW_TABLE));
keywordMap.put("tables", new Integer(SqlParserSymbols.KW_TABLES));
keywordMap.put("tablet", new Integer(SqlParserSymbols.KW_TABLET));
+ keywordMap.put("tablets", new Integer(SqlParserSymbols.KW_TABLETS));
keywordMap.put("task", new Integer(SqlParserSymbols.KW_TASK));
keywordMap.put("temporary", new Integer(SqlParserSymbols.KW_TEMPORARY));
keywordMap.put("terminated", new Integer(SqlParserSymbols.KW_TERMINATED));
@@ -428,8 +431,6 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("write", new Integer(SqlParserSymbols.KW_WRITE));
keywordMap.put("year", new Integer(SqlParserSymbols.KW_YEAR));
keywordMap.put("||", new Integer(SqlParserSymbols.KW_PIPE));
- keywordMap.put("sql_block_rule", new Integer(SqlParserSymbols.KW_SQL_BLOCK_RULE));
- keywordMap.put("tablets", new Integer(SqlParserSymbols.KW_TABLETS));
}
// map from token id to token description
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index 9908f9b804..9fe32fd36a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -61,7 +61,9 @@ import org.apache.doris.transaction.TabletCommitInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -76,10 +78,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mocked;
-
+/**
+ * Test for SparkLoadJobTest.
+ **/
public class SparkLoadJobTest {
private long dbId;
private String dbName;
@@ -100,6 +101,9 @@ public class SparkLoadJobTest {
private long backendId;
private int schemaHash;
+ /**
+ * Init.
+ **/
@Before
public void setUp() {
dbId = 1L;
@@ -207,16 +211,19 @@ public class SparkLoadJobTest {
};
ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap());
- SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
+ SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc,
+ new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
job.execute();
Assert.assertEquals(JobState.PENDING, job.getState());
}
@Test
- public void testOnPendingTaskFinished(@Mocked Catalog catalog, @Injectable String originStmt) throws MetaNotFoundException {
+ public void testOnPendingTaskFinished(@Mocked Catalog catalog, @Injectable String originStmt)
+ throws MetaNotFoundException {
ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap());
- SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
+ SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc,
+ new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId);
attachment.setAppId(appId);
attachment.setOutputPath(etlOutputPath);
@@ -235,7 +242,8 @@ public class SparkLoadJobTest {
sparkConfigs.put("spark.master", "yarn");
sparkConfigs.put("spark.submit.deployMode", "cluster");
sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
- SparkLoadJob job = new SparkLoadJob(dbId, label, null, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
+ SparkLoadJob job = new SparkLoadJob(dbId, label, null,
+ new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
job.state = JobState.ETL;
job.setMaxFilterRatio(0.15);
job.transactionId = transactionId;
@@ -500,18 +508,20 @@ public class SparkLoadJobTest {
result = resourceMgr;
resourceMgr.getResource(anyString);
result = sparkResource;
+ Catalog.getCurrentCatalogJournalVersion();
+ result = FeMetaVersion.VERSION_CURRENT;
}
};
String label = "label1";
ResourceDesc resourceDesc = new ResourceDesc("my_spark", Maps.newHashMap());
- String oriStmt = "LOAD LABEL db1.label1\n" +
- "(\n" +
- "DATA INFILE(\"hdfs://127.0.0.1:8000/user/palo/data/input/file\")\n" +
- "INTO TABLE `my_table`\n" +
- "WHERE k1 > 10\n" +
- ")\n" +
- "WITH RESOURCE 'my_spark';";
+ String oriStmt = "LOAD LABEL db1.label1\n"
+ + "(\n"
+ + "DATA INFILE(\"hdfs://127.0.0.1:8000/user/palo/data/input/file\")\n"
+ + "INTO TABLE `my_table`\n"
+ + "WHERE k1 > 10\n"
+ + ")\n"
+ + "WITH RESOURCE 'my_spark';";
OriginStatement originStmt = new OriginStatement(oriStmt, 0);
UserIdentity userInfo = UserIdentity.ADMIN;
SparkLoadJob sparkLoadJob = new SparkLoadJob(dbId, label, resourceDesc, originStmt, userInfo);
@@ -526,13 +536,10 @@ public class SparkLoadJobTest {
file.createNewFile();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
sparkLoadJob.write(dos);
-
dos.flush();
dos.close();
-
// 2. Read objects from file
DataInputStream dis = new DataInputStream(new FileInputStream(file));
-
SparkLoadJob sparkLoadJob2 = (SparkLoadJob) SparkLoadJob.read(dis);
Assert.assertEquals("my_spark", sparkLoadJob2.getResourceName());
Assert.assertEquals(label, sparkLoadJob2.getLabel());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java
new file mode 100644
index 0000000000..6bbc977e71
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.policy;
+
+import org.apache.doris.analysis.CreateUserStmt;
+import org.apache.doris.analysis.GrantStmt;
+import org.apache.doris.analysis.ShowPolicyStmt;
+import org.apache.doris.analysis.TablePattern;
+import org.apache.doris.analysis.UserDesc;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.AccessPrivilege;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/**
+ * Test for Policy.
+ **/
+public class PolicyTest extends TestWithFeService {
+
+ @Override
+ protected void runBeforeAll() throws Exception {
+ FeConstants.runningUnitTest = true;
+ createDatabase("test");
+ useDatabase("test");
+ createTable("create table table1\n"
+ + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+ + "properties(\"replication_num\" = \"1\");");
+ createTable("create table table2\n"
+ + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+ + "properties(\"replication_num\" = \"1\");");
+ // create user
+ UserIdentity user = new UserIdentity("test_policy", "%");
+ user.analyze(SystemInfoService.DEFAULT_CLUSTER);
+ CreateUserStmt createUserStmt = new CreateUserStmt(new UserDesc(user));
+ Catalog.getCurrentCatalog().getAuth().createUser(createUserStmt);
+ List privileges = Lists.newArrayList(AccessPrivilege.ADMIN_PRIV);
+ TablePattern tablePattern = new TablePattern("*", "*");
+ tablePattern.analyze(SystemInfoService.DEFAULT_CLUSTER);
+ GrantStmt grantStmt = new GrantStmt(user, null, tablePattern, privileges);
+ Catalog.getCurrentCatalog().getAuth().grant(grantStmt);
+ useUser("test_policy");
+ }
+
+ @Test
+ public void testNoPolicy() throws Exception {
+ useUser("root");
+ String queryStr = "EXPLAIN select * from test.table1";
+ String explainString = getSQLPlanOrErrorMsg(queryStr);
+ useUser("test_policy");
+ Assertions.assertFalse(explainString.contains("`k1` = 1"));
+ }
+
+ @Test
+ public void testExistPolicy() throws Exception {
+ createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
+ Assertions.assertTrue(Catalog.getCurrentCatalog().getPolicyMgr().existPolicy("default_cluster:test_policy"));
+ dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy");
+ Assertions.assertFalse(Catalog.getCurrentCatalog().getPolicyMgr().existPolicy("default_cluster:test_policy"));
+ createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
+ dropPolicy("DROP ROW POLICY test_row_policy ON test.table1");
+ Assertions.assertFalse(Catalog.getCurrentCatalog().getPolicyMgr().existPolicy("default_cluster:test_policy"));
+ }
+
+ @Test
+ public void testNormalSql() throws Exception {
+ createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
+ String queryStr = "EXPLAIN select * from test.table1";
+ String explainString = getSQLPlanOrErrorMsg(queryStr);
+ Assertions.assertTrue(explainString.contains("`k1` = 1"));
+ dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy");
+ }
+
+ @Test
+ public void testUnionSql() throws Exception {
+ createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
+ String queryStr = "EXPLAIN select * from test.table1 union all select * from test.table1";
+ String explainString = getSQLPlanOrErrorMsg(queryStr);
+ Assertions.assertTrue(explainString.contains("`k1` = 1"));
+ dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy");
+ }
+
+ @Test
+ public void testInsertSelectSql() throws Exception {
+ createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
+ String queryStr = "EXPLAIN insert into test.table1 select * from test.table1";
+ String explainString = getSQLPlanOrErrorMsg(queryStr);
+ Assertions.assertTrue(explainString.contains("`k1` = 1"));
+ dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy");
+ }
+
+ @Test
+ public void testDuplicateAddPolicy() throws Exception {
+ createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
+ createPolicy("CREATE ROW POLICY IF NOT EXISTS test_row_policy1 ON test.table1 AS PERMISSIVE"
+ + " TO test_policy USING (k1 = 1)");
+ ExceptionChecker.expectThrowsWithMsg(DdlException.class, "the policy test_row_policy1 already create",
+ () -> createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE"
+ + " TO test_policy USING (k1 = 1)"));
+ dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1");
+ }
+
+ @Test
+ public void testNoAuth() {
+ ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
+ "CreatePolicyStmt command denied to user 'root'@'%' for table 'table1'",
+ () -> createPolicy(
+ "CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO root USING (k1 = 1)"));
+ }
+
+ @Test
+ public void testShowPolicy() throws Exception {
+ createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
+ createPolicy("CREATE ROW POLICY test_row_policy2 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 1)");
+ ShowPolicyStmt showPolicyStmt =
+ (ShowPolicyStmt) parseAndAnalyzeStmt("SHOW ROW POLICY");
+ int firstSize = Catalog.getCurrentCatalog().getPolicyMgr().showPolicy(showPolicyStmt).getResultRows().size();
+ Assertions.assertTrue(firstSize > 0);
+ dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1");
+ dropPolicy("DROP ROW POLICY test_row_policy2 ON test.table1");
+ int secondSize = Catalog.getCurrentCatalog().getPolicyMgr().showPolicy(showPolicyStmt).getResultRows().size();
+ Assertions.assertEquals(2, firstSize - secondSize);
+ }
+
+ @Test
+ public void testDropPolicy() throws Exception {
+ createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 1)");
+ dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1");
+ dropPolicy("DROP ROW POLICY IF EXISTS test_row_policy5 ON test.table1");
+ ExceptionChecker.expectThrowsWithMsg(DdlException.class, "the policy test_row_policy1 not exist",
+ () -> dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1"));
+ }
+
+ @Test
+ public void testMergeFilter() throws Exception {
+ createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS RESTRICTIVE TO test_policy USING (k1 = 1)");
+ createPolicy("CREATE ROW POLICY test_row_policy2 ON test.table1 AS RESTRICTIVE TO test_policy USING (k2 = 1)");
+ createPolicy("CREATE ROW POLICY test_row_policy3 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 2)");
+ createPolicy("CREATE ROW POLICY test_row_policy4 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 1)");
+ String queryStr = "EXPLAIN select * from test.table1";
+ String explainString = getSQLPlanOrErrorMsg(queryStr);
+ Assertions.assertTrue(explainString.contains("`k1` = 1, `k2` = 1, `k2` = 2 OR `k2` = 1"));
+ dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1");
+ dropPolicy("DROP ROW POLICY test_row_policy2 ON test.table1");
+ dropPolicy("DROP ROW POLICY test_row_policy3 ON test.table1");
+ dropPolicy("DROP ROW POLICY test_row_policy4 ON test.table1");
+ }
+
+ @Test
+ public void testComplexSql() throws Exception {
+ createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS RESTRICTIVE TO test_policy USING (k1 = 1)");
+ createPolicy("CREATE ROW POLICY test_row_policy2 ON test.table1 AS RESTRICTIVE TO test_policy USING (k2 = 1)");
+ String joinSql = "select * from table1 join table2 on table1.k1=table2.k1";
+ System.out.println(getSQLPlanOrErrorMsg(joinSql));
+ Assertions.assertTrue(getSQLPlanOrErrorMsg(joinSql).contains(
+ " TABLE: table1\n"
+ + " PREAGGREGATION: ON\n"
+ + " PREDICATES: `k1` = 1, `k2` = 1"));
+ String unionSql = "select * from table1 union select * from table2";
+ Assertions.assertTrue(getSQLPlanOrErrorMsg(unionSql).contains(
+ " TABLE: table1\n"
+ + " PREAGGREGATION: ON\n"
+ + " PREDICATES: `k1` = 1, `k2` = 1"));
+ String subQuerySql = "select * from table2 where k1 in (select k1 from table1)";
+ Assertions.assertTrue(getSQLPlanOrErrorMsg(subQuerySql).contains(
+ " TABLE: table1\n"
+ + " PREAGGREGATION: ON\n"
+ + " PREDICATES: `k1` = 1, `k2` = 1"));
+ String aliasSql = "select * from table1 t1 join table2 t2 on t1.k1=t2.k1";
+ Assertions.assertTrue(getSQLPlanOrErrorMsg(aliasSql).contains(
+ " TABLE: table1\n"
+ + " PREAGGREGATION: ON\n"
+ + " PREDICATES: `k1` = 1, `k2` = 1"));
+ dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1");
+ dropPolicy("DROP ROW POLICY test_row_policy2 ON test.table1");
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 5c2352cd48..62c9ed44b0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -17,27 +17,12 @@
package org.apache.doris.utframe;
-import java.io.File;
-import java.io.IOException;
-import java.io.StringReader;
-import java.net.DatagramSocket;
-import java.net.ServerSocket;
-import java.net.SocketException;
-import java.nio.channels.SocketChannel;
-import java.nio.file.Files;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import org.apache.commons.io.FileUtils;
-
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreatePolicyStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.CreateViewStmt;
+import org.apache.doris.analysis.DropPolicyStmt;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
@@ -45,6 +30,7 @@ import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -53,6 +39,7 @@ import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.planner.Planner;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
@@ -65,11 +52,27 @@ import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException;
import org.apache.doris.utframe.MockedFrontend.FeStartException;
import org.apache.doris.utframe.MockedFrontend.NotInitException;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
+import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.channels.SocketChannel;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
/**
* This is the base class for unit class that wants to start a FE service.
*
@@ -87,7 +90,7 @@ import org.junit.jupiter.api.TestInstance;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class TestWithFeService {
protected String runningDir =
- "fe/mocked/" + getClass().getSimpleName() + "/" + UUID.randomUUID() + "/";
+ "fe/mocked/" + getClass().getSimpleName() + "/" + UUID.randomUUID() + "/";
protected ConnectContext connectContext;
@BeforeAll
@@ -124,11 +127,11 @@ public abstract class TestWithFeService {
// Parse an origin stmt and analyze it. Return a StatementBase instance.
protected StatementBase parseAndAnalyzeStmt(String originStmt)
- throws Exception {
+ throws Exception {
System.out.println("begin to parse stmt: " + originStmt);
SqlScanner input =
- new SqlScanner(new StringReader(originStmt),
- connectContext.getSessionVariable().getSqlMode());
+ new SqlScanner(new StringReader(originStmt),
+ connectContext.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
Analyzer analyzer = new Analyzer(connectContext.getCatalog(), connectContext);
StatementBase statementBase = null;
@@ -143,6 +146,7 @@ public abstract class TestWithFeService {
throw new AnalysisException(errorMessage, e);
}
}
+ statementBase.setOrigStmt(new OriginStatement(originStmt, 0));
statementBase.analyze(analyzer);
return statementBase;
}
@@ -150,7 +154,8 @@ public abstract class TestWithFeService {
// for analyzing multi statements
protected List parseAndAnalyzeStmts(String originStmt) throws Exception {
System.out.println("begin to parse stmts: " + originStmt);
- SqlScanner input = new SqlScanner(new StringReader(originStmt), connectContext.getSessionVariable().getSqlMode());
+ SqlScanner input =
+ new SqlScanner(new StringReader(originStmt), connectContext.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
Analyzer analyzer = new Analyzer(connectContext.getCatalog(), connectContext);
List statementBases = null;
@@ -180,7 +185,7 @@ public abstract class TestWithFeService {
}
protected int startFEServer(String runningDir) throws EnvVarNotSetException, IOException,
- FeStartException, NotInitException, DdlException, InterruptedException {
+ FeStartException, NotInitException, DdlException, InterruptedException {
// get DORIS_HOME
String dorisHome = System.getenv("DORIS_HOME");
if (Strings.isNullOrEmpty(dorisHome)) {
@@ -213,14 +218,14 @@ public abstract class TestWithFeService {
}
protected void createDorisCluster()
- throws InterruptedException, NotInitException, IOException, DdlException,
- EnvVarNotSetException, FeStartException {
+ throws InterruptedException, NotInitException, IOException, DdlException,
+ EnvVarNotSetException, FeStartException {
createDorisCluster(runningDir, 1);
}
protected void createDorisCluster(String runningDir, int backendNum)
- throws EnvVarNotSetException, IOException, FeStartException,
- NotInitException, DdlException, InterruptedException {
+ throws EnvVarNotSetException, IOException, FeStartException,
+ NotInitException, DdlException, InterruptedException {
int fe_rpc_port = startFEServer(runningDir);
for (int i = 0; i < backendNum; i++) {
createBackend("127.0.0.1", fe_rpc_port);
@@ -233,8 +238,8 @@ public abstract class TestWithFeService {
// the host of BE will be "127.0.0.1", "127.0.0.2"
protected void createDorisClusterWithMultiTag(String runningDir,
int backendNum)
- throws EnvVarNotSetException, IOException, FeStartException, NotInitException,
- DdlException, InterruptedException {
+ throws EnvVarNotSetException, IOException, FeStartException, NotInitException,
+ DdlException, InterruptedException {
// set runningUnitTest to true, so that for ut, the agent task will be send to "127.0.0.1" to make cluster running well.
FeConstants.runningUnitTest = true;
int fe_rpc_port = startFEServer(runningDir);
@@ -247,7 +252,7 @@ public abstract class TestWithFeService {
}
protected void createBackend(String beHost, int fe_rpc_port)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
int be_heartbeat_port = findValidPort();
int be_thrift_port = findValidPort();
int be_brpc_port = findValidPort();
@@ -255,14 +260,15 @@ public abstract class TestWithFeService {
// start be
MockedBackend backend = MockedBackendFactory.createBackend(beHost,
- be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port,
- new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port),
- new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
+ be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port,
+ new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port),
+ new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
backend.setFeAddress(new TNetworkAddress("127.0.0.1", fe_rpc_port));
backend.start();
// add be
- Backend be = new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(), backend.getHeartbeatPort());
+ Backend be =
+ new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(), backend.getHeartbeatPort());
Map disks = Maps.newHashMap();
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
diskInfo1.setTotalCapacityB(1000000);
@@ -351,6 +357,10 @@ public abstract class TestWithFeService {
Catalog.getCurrentCatalog().createDb(createDbStmt);
}
+ protected void useDatabase(String dbName) {
+ connectContext.setDatabase(ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER, dbName));
+ }
+
protected void createTable(String sql) throws Exception {
createTables(sql);
}
@@ -367,6 +377,16 @@ public abstract class TestWithFeService {
Catalog.getCurrentCatalog().createView(createViewStmt);
}
+ protected void createPolicy(String sql) throws Exception {
+ CreatePolicyStmt createPolicyStmt = (CreatePolicyStmt) parseAndAnalyzeStmt(sql);
+ Catalog.getCurrentCatalog().getPolicyMgr().createPolicy(createPolicyStmt);
+ }
+
+ protected void dropPolicy(String sql) throws Exception {
+ DropPolicyStmt stmt = (DropPolicyStmt) parseAndAnalyzeStmt(sql);
+ Catalog.getCurrentCatalog().getPolicyMgr().dropPolicy(stmt);
+ }
+
protected void assertSQLPlanOrErrorMsgContains(String sql, String expect) throws Exception {
// Note: adding `EXPLAIN` is necessary for non-query SQL, e.g., DDL, DML, etc.
// TODO: Use a graceful way to get explain plan string, rather than modifying the SQL string.
@@ -379,4 +399,11 @@ public abstract class TestWithFeService {
Assertions.assertTrue(str.contains(expect));
}
}
+
+ protected void useUser(String userName) throws AnalysisException {
+ UserIdentity user = new UserIdentity(userName, "%");
+ user.analyze(SystemInfoService.DEFAULT_CLUSTER);
+ connectContext.setCurrentUserIdentity(user);
+ connectContext.setQualifiedUser(SystemInfoService.DEFAULT_CLUSTER + ":" + userName);
+ }
}