diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js index b5030bab59..ee1c9f532c 100644 --- a/docs/.vuepress/sidebar/en.js +++ b/docs/.vuepress/sidebar/en.js @@ -633,6 +633,7 @@ module.exports = [ "CREATE-FUNCTION", "CREATE-INDEX", "CREATE-MATERIALIZED-VIEW", + "CREATE-POLICY", "CREATE-RESOURCE", "CREATE-SQL-BLOCK-RULE", "CREATE-TABLE-LIKE", diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js index 173d203ed3..899b5d94f8 100644 --- a/docs/.vuepress/sidebar/zh-CN.js +++ b/docs/.vuepress/sidebar/zh-CN.js @@ -633,6 +633,7 @@ module.exports = [ "CREATE-FUNCTION", "CREATE-INDEX", "CREATE-MATERIALIZED-VIEW", + "CREATE-POLICY", "CREATE-RESOURCE", "CREATE-SQL-BLOCK-RULE", "CREATE-TABLE-LIKE", diff --git a/docs/en/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md b/docs/en/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md new file mode 100644 index 0000000000..7202f2a30b --- /dev/null +++ b/docs/en/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md @@ -0,0 +1,84 @@ +--- +{ + "title": "CREATE-POLICY", + "language": "en" +} +--- + + + +## CREATE-POLICY + +### Name + +CREATE POLICY + +### Description + +Create security policies and explain to view the rewritten SQL. + +#### 行安全策略 +grammar: + +```sql +CREATE ROW POLICY test_row_policy_1 ON test.table1 +AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2)); +``` + +illustrate: + +- filterType:It is usual to constrict a set of policies through AND. PERMISSIVE to constrict a set of policies through OR +- Configure multiple policies. First, merge the RESTRICTIVE policy with the PERMISSIVE policy +- It is connected with AND between RESTRICTIVE AND PERMISSIVE +- It cannot be created for users root and admin + +### Example + +1. Create a set of row security policies + + ```sql + CREATE ROW POLICY test_row_policy_1 ON test.table1 + AS RESTRICTIVE TO test USING (c1 = 'a'); + ``` + ```sql + CREATE ROW POLICY test_row_policy_2 ON test.table1 + AS RESTRICTIVE TO test USING (c2 = 'b'); + ``` + ```sql + CREATE ROW POLICY test_row_policy_3 ON test.table1 + AS PERMISSIVE TO test USING (c3 = 'c'); + ``` + ```sql + CREATE ROW POLICY test_row_policy_3 ON test.table1 + AS PERMISSIVE TO test USING (c4 = 'd'); + ``` + + When we execute the query on Table1, the rewritten SQL is + + ```sql + select * from (select * from table1 where c1 = 'a' and c2 = 'b' or c3 = 'c' or c4 = 'd') + ``` + +### Keywords + + CREATE, POLICY + +### Best Practice + diff --git a/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md b/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md new file mode 100644 index 0000000000..f17db5b5c5 --- /dev/null +++ b/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md @@ -0,0 +1,84 @@ +--- +{ + "title": "CREATE-POLICY", + "language": "zh-CN" +} +--- + + + +## CREATE-POLICY + +### Name + +CREATE POLICY + +### Description + +创建安全策略,explain 可以查看改写后的 SQL。 + +#### 行安全策略 +语法: + +```sql +CREATE ROW POLICY test_row_policy_1 ON test.table1 +AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2)); +``` + +参数说明: + +- filterType:RESTRICTIVE 将一组策略通过 AND 连接, PERMISSIVE 将一组策略通过 OR 连接 +- 配置多个策略首先合并 RESTRICTIVE 的策略,再添加 PERMISSIVE 的策略 +- RESTRICTIVE 和 PERMISSIVE 之间通过 AND 连接的 +- 不允许对 root 和 admin 用户创建 + +### Example + +1. 创建一组行安全策略 + + ```sql + CREATE ROW POLICY test_row_policy_1 ON test.table1 + AS RESTRICTIVE TO test USING (c1 = 'a'); + ``` + ```sql + CREATE ROW POLICY test_row_policy_2 ON test.table1 + AS RESTRICTIVE TO test USING (c2 = 'b'); + ``` + ```sql + CREATE ROW POLICY test_row_policy_3 ON test.table1 + AS PERMISSIVE TO test USING (c3 = 'c'); + ``` + ```sql + CREATE ROW POLICY test_row_policy_3 ON test.table1 + AS PERMISSIVE TO test USING (c4 = 'd'); + ``` + + 当我们执行对 table1 的查询时被改写后的 sql 为 + + ```sql + select * from (select * from table1 where c1 = 'a' and c2 = 'b' or c3 = 'c' or c4 = 'd') + ``` + +### Keywords + + CREATE, POLICY + +### Best Practice + diff --git a/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Drop/DROP-POLICY.md b/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Drop/DROP-POLICY.md new file mode 100644 index 0000000000..2e8d2e96a8 --- /dev/null +++ b/docs/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Drop/DROP-POLICY.md @@ -0,0 +1,64 @@ +--- +{ + "title": "DROP-POLICY", + "language": "zh-CN" +} +--- + + + +## DROP-POLICY + +### Name + +DROP POLICY + +### Description + +删除安全策略 + +#### 行安全策略 + +语法: + +```sql +DROP ROW POLICY test_row_policy_1 on table1 [FOR user]; +``` + +### Example + +1. 删除 table1 的 test_row_policy_1 + + ```sql + DROP ROW POLICY test_row_policy_1 on table1 + ``` + +2. 删除 table1 作用于 test 的 test_row_policy_1 行安全策略 + + ```sql + DROP ROW POLICY test_row_policy_1 on table1 for test + ``` + +### Keywords + + DROP, POLICY + +### Best Practice + diff --git a/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-POLICY.md b/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-POLICY.md new file mode 100644 index 0000000000..2d7e2604d7 --- /dev/null +++ b/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-POLICY.md @@ -0,0 +1,79 @@ +--- +{ + "title": "SHOW-ROW-POLICY", + "language": "zh-CN" +} +--- + + + +## SHOW-POLICY + +### Name + +SHOW ROW POLICY + +### Description + +查看当前 DB 下的行安全策略 + +语法: + +```sql +SHOW ROW POLICY [FOR user] +``` + +### Example + +1. 查看所有安全策略。 + + ```sql + mysql> SHOW ROW POLICY; + +-------------------+----------------------+-----------+------+-------------+-------------------+------+-------------------------------------------------------------------------------------------------------------------------------------------+ + | PolicyName | DbName | TableName | Type | FilterType | WherePredicate | User | OriginStmt | + +-------------------+----------------------+-----------+------+-------------+-------------------+------+-------------------------------------------------------------------------------------------------------------------------------------------+ + | test_row_policy_1 | default_cluster:test | table1 | ROW | RESTRICTIVE | `id` IN (1, 2) | root | /* ApplicationName=DataGrip 2021.3.4 */ CREATE ROW POLICY test_row_policy_1 ON test.table1 AS RESTRICTIVE TO root USING (id in (1, 2)); + | + | test_row_policy_2 | default_cluster:test | table1 | ROW | RESTRICTIVE | `col1` = 'col1_1' | root | /* ApplicationName=DataGrip 2021.3.4 */ CREATE ROW POLICY test_row_policy_2 ON test.table1 AS RESTRICTIVE TO root USING (col1='col1_1'); + | + +-------------------+----------------------+-----------+------+-------------+-------------------+------+-------------------------------------------------------------------------------------------------------------------------------------------+ + 2 rows in set (0.00 sec) + ``` + +2. 指定用户名查询 + + ```sql + mysql> SHOW ROW POLICY FOR test; + +-------------------+----------------------+-----------+------+------------+-------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------+ + | PolicyName | DbName | TableName | Type | FilterType | WherePredicate | User | OriginStmt | + +-------------------+----------------------+-----------+------+------------+-------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------+ + | test_row_policy_3 | default_cluster:test | table1 | ROW | PERMISSIVE | `col1` = 'col1_2' | default_cluster:test | /* ApplicationName=DataGrip 2021.3.4 */ CREATE ROW POLICY test_row_policy_3 ON test.table1 AS PERMISSIVE TO test USING (col1='col1_2'); + | + +-------------------+----------------------+-----------+------+------------+-------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------+ + 1 row in set (0.01 sec) + ``` + + +### Keywords + + SHOW, POLICY + +### Best Practice + diff --git a/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-SQL-BLOCK-RULE.md b/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-SQL-BLOCK-RULE.md index 1c43551281..e08a869217 100644 --- a/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-SQL-BLOCK-RULE.md +++ b/docs/zh-CN/sql-manual/sql-reference/Show-Statements/SHOW-SQL-BLOCK-RULE.md @@ -55,7 +55,7 @@ SHOW SQL_BLOCK_RULE [FOR RULE_NAME]; 2 rows in set (0.01 sec) ``` -2. 制定规则名查询 +2. 指定规则名查询 ```sql mysql> SHOW SQL_BLOCK_RULE FOR test_rule2; diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index 840365f92d..a9c1d2b2c8 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -36,8 +36,10 @@ public final class FeMetaVersion { public static final int VERSION_107 = 107; // add storage_cold_medium and remote_storage_resource_name in DataProperty public static final int VERSION_108 = 108; + // add row policy + public static final int VERSION_109 = 109; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_108; + public static final int VERSION_CURRENT = VERSION_109; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index f5291bb38e..53ccf909be 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -48,6 +48,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Version; import org.apache.doris.mysql.MysqlPassword; import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.policy.PolicyTypeEnum; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -260,7 +261,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A KW_NAME, KW_NAMES, KW_NEGATIVE, KW_NO, KW_NOT, KW_NULL, KW_NULLS, KW_OBSERVER, KW_OFFSET, KW_ON, KW_ONLY, KW_OPEN, KW_OR, KW_ORDER, KW_OUTER, KW_OUTFILE, KW_OVER, KW_PARAMETER, KW_PARTITION, KW_PARTITIONS, KW_PASSWORD, KW_LDAP_ADMIN_PASSWORD, KW_PATH, KW_PAUSE, KW_PIPE, KW_PRECEDING, - KW_PLUGIN, KW_PLUGINS, + KW_PLUGIN, KW_PLUGINS, KW_POLICY, KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROFILE, KW_PROPERTIES, KW_PROPERTY, KW_QUERY, KW_QUOTA, KW_RANDOM, KW_RANGE, KW_READ, KW_REBALANCE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RELEASE, KW_RENAME, @@ -269,13 +270,13 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SHOW, KW_SIGNED, KW_SKEW, KW_SMALLINT, KW_SNAPSHOT, KW_SONAME, KW_SPLIT, KW_START, KW_STATUS, KW_STATS, KW_STOP, KW_STORAGE, KW_STREAM, KW_STRING, KW_STRUCT, - KW_SUM, KW_SUPERUSER, KW_SYNC, KW_SYSTEM, + KW_SUM, KW_SUPERUSER, KW_SYNC, KW_SYSTEM, KW_SQL_BLOCK_RULE, KW_TABLE, KW_TABLES, KW_TABLET, KW_TABLETS, KW_TASK, KW_TEMPORARY, KW_TERMINATED, KW_TEXT, KW_THAN, KW_TIME, KW_THEN, KW_TIMESTAMP, KW_TINYINT,KW_TRASH, KW_TO, KW_TRANSACTION, KW_TRIGGERS, KW_TRIM, KW_TRUE, KW_TRUNCATE, KW_TYPE, KW_TYPES, KW_UNCOMMITTED, KW_UNBOUNDED, KW_UNION, KW_UNIQUE, KW_UNLOCK, KW_UNSIGNED, KW_USE, KW_USER, KW_USING, KW_UNINSTALL, KW_VALUE, KW_VALUES, KW_VARCHAR, KW_VARIABLES, KW_VERBOSE, KW_VIEW, KW_WARNINGS, KW_WEEK, KW_WHEN, KW_WHITELIST, KW_WHERE, KW_WITH, KW_WORK, KW_WRITE, - KW_YEAR, KW_SQL_BLOCK_RULE; + KW_YEAR; terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT; terminal BITAND, BITOR, BITXOR, BITNOT; @@ -1353,6 +1354,12 @@ create_stmt ::= {: RESULT = new CreateSqlBlockRuleStmt(ruleName, properties); :} + /* row policy */ + | KW_CREATE KW_ROW KW_POLICY opt_if_not_exists:ifNotExists ident:policyName KW_ON table_name:tbl KW_AS ident:filterType KW_TO user_identity:user + KW_USING LPAREN expr:wherePredicate RPAREN + {: + RESULT = new CreatePolicyStmt(PolicyTypeEnum.ROW, ifNotExists, policyName, tbl, filterType, user, wherePredicate); + :} ; channel_desc_list ::= @@ -2058,6 +2065,14 @@ drop_stmt ::= {: RESULT = new DropSqlBlockRuleStmt(ruleNames); :} + | KW_DROP KW_ROW KW_POLICY opt_if_exists:ifExists ident:policyName KW_ON table_name:tbl + {: + RESULT = new DropPolicyStmt(PolicyTypeEnum.ROW, ifExists, policyName, tbl, null); + :} + | KW_DROP KW_ROW KW_POLICY opt_if_exists:ifExists ident:policyName KW_ON table_name:tbl KW_FOR user_identity:user + {: + RESULT = new DropPolicyStmt(PolicyTypeEnum.ROW, ifExists, policyName, tbl, user); + :} ; // Recover statement @@ -2546,6 +2561,14 @@ show_stmt ::= {: RESULT = new ShowSqlBlockRuleStmt(null); :} + | KW_SHOW KW_ROW KW_POLICY KW_FOR user_identity:user + {: + RESULT = new ShowPolicyStmt(PolicyTypeEnum.ROW, user); + :} + | KW_SHOW KW_ROW KW_POLICY + {: + RESULT = new ShowPolicyStmt(PolicyTypeEnum.ROW, null); + :} ; show_param ::= @@ -5785,6 +5808,8 @@ keyword ::= {: RESULT = id; :} | KW_VARCHAR:id {: RESULT = id; :} + | KW_POLICY:id + {: RESULT = id; :} ; // Identifier that contain keyword diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java new file mode 100644 index 0000000000..b8771575da --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.policy.FilterType; +import org.apache.doris.policy.PolicyTypeEnum; +import org.apache.doris.qe.ConnectContext; + +import lombok.Getter; + +/** + * Create policy statement. + * syntax: + * CREATE ROW POLICY [IF NOT EXISTS] test_row_policy ON test_table AS {PERMISSIVE|RESTRICTIVE} TO user USING (a = ’xxx‘) + */ +public class CreatePolicyStmt extends DdlStmt { + + @Getter + private final PolicyTypeEnum type; + + @Getter + private final boolean ifNotExists; + + @Getter + private final String policyName; + + @Getter + private final TableName tableName; + + @Getter + private final FilterType filterType; + + @Getter + private final UserIdentity user; + + @Getter + private Expr wherePredicate; + + /** + * Use for cup. + **/ + public CreatePolicyStmt(PolicyTypeEnum type, boolean ifNotExists, String policyName, TableName tableName, String filterType, + UserIdentity user, Expr wherePredicate) { + this.type = type; + this.ifNotExists = ifNotExists; + this.policyName = policyName; + this.tableName = tableName; + this.filterType = FilterType.of(filterType); + this.user = user; + this.wherePredicate = wherePredicate; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + tableName.analyze(analyzer); + user.analyze(analyzer.getClusterName()); + if (user.isRootUser() || user.isAdminUser()) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CreatePolicyStmt", + user.getQualifiedUser(), user.getHost(), tableName.getTbl()); + } + // check auth + if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("CREATE ").append(type).append(" POLICY "); + if (ifNotExists) { + sb.append("IF NOT EXISTS"); + } + sb.append(policyName).append(" ON ").append(tableName.toSql()).append(" AS ").append(filterType) + .append(" TO ").append(user.getQualifiedUser()).append(" USING ").append(wherePredicate.toSql()); + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java new file mode 100644 index 0000000000..87fb616c0a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.policy.PolicyTypeEnum; +import org.apache.doris.qe.ConnectContext; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * Drop policy statement. + * syntax: + * DROP [ROW] POLICY [IF EXISTS] test_row_policy ON test_table [FOR user] + **/ +@AllArgsConstructor +public class DropPolicyStmt extends DdlStmt { + + @Getter + private final PolicyTypeEnum type; + + @Getter + private final boolean ifExists; + + @Getter + private final String policyName; + + @Getter + private final TableName tableName; + + @Getter + private final UserIdentity user; + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + tableName.analyze(analyzer); + if (user != null) { + user.analyze(analyzer.getClusterName()); + } + // check auth + if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("DROP ").append(type).append(" POLICY "); + if (ifExists) { + sb.append("IF EXISTS "); + } + sb.append(policyName).append(" ON ").append(tableName.toSql()); + if (user != null) { + sb.append(" FOR ").append(user.getQualifiedUser()); + } + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java new file mode 100644 index 0000000000..f450952ba1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.policy.PolicyTypeEnum; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ShowResultSetMetaData; + +import lombok.Getter; + +/** + * Show policy statement + * syntax: + * SHOW ROW POLICY [FOR user] + **/ +public class ShowPolicyStmt extends ShowStmt { + + @Getter + private final PolicyTypeEnum type; + + @Getter + private final UserIdentity user; + + public ShowPolicyStmt(PolicyTypeEnum type, UserIdentity user) { + this.type = type; + this.user = user; + } + + private static final ShowResultSetMetaData ROW_META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("PolicyName", ScalarType.createVarchar(100))) + .addColumn(new Column("DbName", ScalarType.createVarchar(100))) + .addColumn(new Column("TableName", ScalarType.createVarchar(100))) + .addColumn(new Column("Type", ScalarType.createVarchar(20))) + .addColumn(new Column("FilterType", ScalarType.createVarchar(20))) + .addColumn(new Column("WherePredicate", ScalarType.createVarchar(65535))) + .addColumn(new Column("User", ScalarType.createVarchar(20))) + .addColumn(new Column("OriginStmt", ScalarType.createVarchar(65535))) + .build(); + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + if (user != null) { + user.analyze(analyzer.getClusterName()); + } + // check auth + if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("SHOW ").append(type).append(" POLICY"); + if (user != null) { + sb.append(" FOR ").append(user); + } + return sb.toString(); + } + + @Override + public ShowResultSetMetaData getMetaData() { + return ROW_META_DATA; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java index ef5c7f91d0..a2b0122f50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java @@ -20,16 +20,20 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.TableAliasGenerator; import org.apache.doris.common.UserException; +import org.apache.doris.policy.Policy; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +58,8 @@ public class StmtRewriter { Preconditions.checkNotNull(analyzedStmt.analyzer); return rewriteQueryStatement(analyzedStmt, analyzer); } else if (parsedStmt instanceof InsertStmt) { - final InsertStmt insertStmt = (InsertStmt)parsedStmt; - final QueryStmt analyzedStmt = (QueryStmt)insertStmt.getQueryStmt(); + final InsertStmt insertStmt = (InsertStmt) parsedStmt; + final QueryStmt analyzedStmt = (QueryStmt) insertStmt.getQueryStmt(); Preconditions.checkNotNull(analyzedStmt.analyzer); QueryStmt rewrittenQueryStmt = rewriteQueryStatement(analyzedStmt, analyzer); insertStmt.setQueryStmt(rewrittenQueryStmt); @@ -66,10 +70,10 @@ public class StmtRewriter { return parsedStmt; } - /** - * Calls the appropriate equal method based on the specific type of query stmt. See - * rewriteSelectStatement() and rewriteUnionStatement() documentation. - */ + /** + * Calls the appropriate equal method based on the specific type of query stmt. See + * rewriteSelectStatement() and rewriteUnionStatement() documentation. + */ public static QueryStmt rewriteQueryStatement(QueryStmt stmt, Analyzer analyzer) throws AnalysisException { Preconditions.checkNotNull(stmt); @@ -88,9 +92,11 @@ public class StmtRewriter { throws AnalysisException { SelectStmt result = stmt; // Rewrite all the subqueries in the FROM clause. - for (TableRef tblRef: result.fromClause_) { - if (!(tblRef instanceof InlineViewRef)) continue; - InlineViewRef inlineViewRef = (InlineViewRef)tblRef; + for (TableRef tblRef : result.fromClause_) { + if (!(tblRef instanceof InlineViewRef)) { + continue; + } + InlineViewRef inlineViewRef = (InlineViewRef) tblRef; QueryStmt rewrittenQueryStmt = rewriteQueryStatement(inlineViewRef.getViewStmt(), inlineViewRef.getAnalyzer()); inlineViewRef.setViewStmt(rewrittenQueryStmt); @@ -127,37 +133,39 @@ public class StmtRewriter { * For example: * select cs_item_sk, sum(cs_sales_price) from catalog_sales a group by cs_item_sk * having sum(cs_sales_price) > - * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk); + * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk); + * *

* Step1: equal having subquery to where subquery * Outer query is changed to inline view in rewritten query * Inline view of outer query: - * from (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a + * from (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a * Rewritten subquery of expr: - * where a.sum_cs_sales_price > - * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk) + * where a.sum_cs_sales_price > + * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk) * Rewritten query: - * select cs_item_sk, a.sum_cs_sales_price from - * (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a - * where a.sum_cs_sales_price > - * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk) + * select cs_item_sk, a.sum_cs_sales_price from + * (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a + * where a.sum_cs_sales_price > + * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk) *

* Step2: equal where subquery * Inline view of subquery: - * from (select b.cs_item_sk, min(cs_sales_price) from catalog_sales b group by cs_item_sk) c + * from (select b.cs_item_sk, min(cs_sales_price) from catalog_sales b group by cs_item_sk) c * Rewritten correlated predicate: - * where c.cs_item_sk = a.cs_item_sk and a.sum_cs_sales_price > c.min(cs_sales_price) + * where c.cs_item_sk = a.cs_item_sk and a.sum_cs_sales_price > c.min(cs_sales_price) * The final stmt: * select a.cs_item_sk, a.sum_cs_sales_price from - * (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a - * join - * (select b.cs_item_sk, min(b.cs_sales_price) min_cs_sales_price from catalog_sales b group by b.cs_item_sk) c + * (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a + * join + * (select b.cs_item_sk, min(b.cs_sales_price) min_cs_sales_price from catalog_sales b group by b.cs_item_sk) c * where c.cs_item_sk = a.cs_item_sk and a.sum_cs_sales_price > c.min_cs_sales_price; * * @param stmt * @param analyzer */ - private static SelectStmt rewriteHavingClauseSubqueries(SelectStmt stmt, Analyzer analyzer) throws AnalysisException { + private static SelectStmt rewriteHavingClauseSubqueries(SelectStmt stmt, Analyzer analyzer) + throws AnalysisException { // prepare parameters SelectList selectList = stmt.getSelectList(); List columnLabels = stmt.getColLabels(); @@ -256,7 +264,7 @@ public class StmtRewriter { SelectListItem newItem = new SelectListItem(selectList.getItems().get(i).getExpr().reset().substitute(smap), columnLabels.get(i)); newSelectItems.add(newItem); - LOG.debug("New select item is changed to "+ newItem.toSql()); + LOG.debug("New select item is changed to " + newItem.toSql()); } SelectList newSelectList = new SelectList(newSelectItems, selectList.isDistinct()); @@ -288,7 +296,7 @@ public class StmtRewriter { * @return select a, sum(v1), sum(v2) */ private static SelectList addMissingAggregationColumns(SelectList selectList, - List aggregateExprs) { + List aggregateExprs) { SelectList result = selectList.clone(); for (FunctionCallExpr functionCallExpr : aggregateExprs) { boolean columnExists = false; @@ -312,10 +320,10 @@ public class StmtRewriter { */ private static void rewriteUnionStatement(SetOperationStmt stmt, Analyzer analyzer) throws AnalysisException { - for (SetOperationStmt.SetOperand operand: stmt.getOperands()) { + for (SetOperationStmt.SetOperand operand : stmt.getOperands()) { Preconditions.checkState(operand.getQueryStmt() instanceof SelectStmt); QueryStmt rewrittenQueryStmt = StmtRewriter.rewriteSelectStatement( - (SelectStmt)operand.getQueryStmt(), operand.getAnalyzer()); + (SelectStmt) operand.getQueryStmt(), operand.getAnalyzer()); operand.setQueryStmt(rewrittenQueryStmt); } } @@ -338,6 +346,7 @@ public class StmtRewriter { } return false; } + /** * Rewrite all subqueries of a stmt's WHERE clause. Initially, all the * conjuncts containing subqueries are extracted from the WHERE clause and are @@ -345,52 +354,51 @@ public class StmtRewriter { * merged into its parent select block by converting it into a join. * Conjuncts with subqueries that themselves contain conjuncts with subqueries are * recursively rewritten in a bottom up fashion. - * + *

* The following example illustrates the bottom up rewriting of nested queries. * Suppose we have the following three level nested query Q0: - * + *

* SELECT * * FROM T1 : Q0 * WHERE T1.a IN (SELECT a - * FROM T2 WHERE T2.b IN (SELECT b - * FROM T3)) + * FROM T2 WHERE T2.b IN (SELECT b + * FROM T3)) * AND T1.c < 10; - * + *

* This query will be rewritten as follows. Initially, the IN predicate * T1.a IN (SELECT a FROM T2 WHERE T2.b IN (SELECT b FROM T3)) is extracted * from the top level block (Q0) since it contains a subquery and is * replaced by a true BoolLiteral, resulting in the following query Q1: - * + *

* SELECT * FROM T1 WHERE TRUE : Q1 - * + *

* Since the stmt in the extracted predicate contains a conjunct with a subquery, * it is also rewritten. As before, rewriting stmt SELECT a FROM T2 * WHERE T2.b IN (SELECT b FROM T3) works by first extracting the conjunct that * contains the subquery (T2.b IN (SELECT b FROM T3)) and substituting it with * a true BoolLiteral, producing the following stmt Q2: - * + *

* SELECT a FROM T2 WHERE TRUE : Q2 - * + *

* The predicate T2.b IN (SELECT b FROM T3) is then merged with Q2, * producing the following unnested query Q3: - * + *

* SELECT a FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 ON T2.b = $a$1.b : Q3 - * + *

* The extracted IN predicate becomes: - * + *

* T1.a IN (SELECT a FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 ON T2.b = $a$1.b) - * + *

* Finally, the rewritten IN predicate is merged with query block Q1, * producing the following unnested query (WHERE clauses that contain only * conjunctions of true BoolLiterals are eliminated): - * + *

* SELECT * * FROM T1 LEFT SEMI JOIN (SELECT a - * FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 - * ON T2.b = $a$1.b) $a$1 + * FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 + * ON T2.b = $a$1.b) $a$1 * ON $a$1.a = T1.a * WHERE T1.c < 10; - * */ private static void rewriteWhereClauseSubqueries( SelectStmt stmt, Analyzer analyzer) @@ -494,7 +502,7 @@ public class StmtRewriter { * subquery stmt. The modified analyzed expr is returned. */ private static Expr rewriteExpr(Expr expr, Analyzer analyzer) - throws AnalysisException { + throws AnalysisException { // Extract the subquery and equal it. Subquery subquery = expr.getSubquery(); Preconditions.checkNotNull(subquery); @@ -506,7 +514,7 @@ public class StmtRewriter { rewrittenStmt.reset(); Subquery newSubquery = new Subquery(rewrittenStmt); newSubquery.analyze(analyzer); - + ExprSubstitutionMap smap = new ExprSubstitutionMap(); smap.put(subquery, newSubquery); return expr.substitute(smap, analyzer, false); @@ -539,7 +547,7 @@ public class StmtRewriter { Preconditions.checkNotNull(expr); Preconditions.checkNotNull(analyzer); Preconditions.checkState(expr.getSubquery().getAnalyzer() != null, - "subquery must be analyze address=" + System.identityHashCode(expr.getSubquery())); + "subquery must be analyze address=" + System.identityHashCode(expr.getSubquery())); boolean updateSelectList = false; SelectStmt subqueryStmt = (SelectStmt) expr.getSubquery().getStatement(); @@ -554,7 +562,7 @@ public class StmtRewriter { } // (select k1 $a from t2) $b InlineViewRef inlineView = new InlineViewRef( - stmt.getTableAliasGenerator().getNextAlias(), subqueryStmt, colLabels); + stmt.getTableAliasGenerator().getNextAlias(), subqueryStmt, colLabels); // Extract all correlated predicates from the subquery. List onClauseConjuncts = extractCorrelatedPredicates(subqueryStmt); @@ -578,7 +586,7 @@ public class StmtRewriter { for (Expr conjunct : onClauseConjuncts) { canRewriteScalarFunction(expr, conjunct); updateInlineView(inlineView, conjunct, stmt.getTableRefIds(), - lhsExprs, rhsExprs, updateGroupBy); + lhsExprs, rhsExprs, updateGroupBy); } /** @@ -612,10 +620,10 @@ public class StmtRewriter { // Create a join conjunct from the expr that contains a subquery. Expr joinConjunct = createJoinConjunct(expr, inlineView, analyzer, - !onClauseConjuncts.isEmpty()); + !onClauseConjuncts.isEmpty()); if (joinConjunct != null) { SelectListItem firstItem = - ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0); + ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0); if (!onClauseConjuncts.isEmpty() && firstItem.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) { // Correlated subqueries with an aggregate function that returns non-null on @@ -626,7 +634,7 @@ public class StmtRewriter { // stmt's WHERE clause because it needs to be applied to the result of the // LEFT OUTER JOIN (both matched and unmatched tuples). stmt.whereClause = - CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause); + CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause); joinConjunct = null; joinOp = JoinOperator.LEFT_OUTER_JOIN; updateSelectList = true; @@ -639,7 +647,7 @@ public class StmtRewriter { // Create the ON clause from the extracted correlated predicates. Expr onClausePredicate = - CompoundPredicate.createConjunctivePredicate(onClauseConjuncts); + CompoundPredicate.createConjunctivePredicate(onClauseConjuncts); if (onClausePredicate == null) { Preconditions.checkState(expr instanceof ExistsPredicate); if (((ExistsPredicate) expr).isNotExists()) { @@ -679,9 +687,13 @@ public class StmtRewriter { // Check if we have a valid ON clause for an equi-join. boolean hasEqJoinPred = false; for (Expr conjunct : onClausePredicate.getConjuncts()) { - if (!(conjunct instanceof BinaryPredicate)) continue; + if (!(conjunct instanceof BinaryPredicate)) { + continue; + } BinaryPredicate.Operator operator = ((BinaryPredicate) conjunct).getOp(); - if (!operator.isEquivalence()) continue; + if (!operator.isEquivalence()) { + continue; + } List lhsTupleIds = Lists.newArrayList(); conjunct.getChild(0).getIds(lhsTupleIds, null); if (lhsTupleIds.isEmpty()) { @@ -723,7 +735,7 @@ public class StmtRewriter { // We can equal the aggregate subquery using a cross join. All conjuncts // that were extracted from the subquery are added to stmt's WHERE clause. stmt.whereClause = - CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause); + CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause); inlineView.setJoinOp(JoinOperator.CROSS_JOIN); // Indicate that the CROSS JOIN may add a new visible tuple to stmt's // select list (if the latter contains an unqualified star item '*') @@ -732,9 +744,9 @@ public class StmtRewriter { // We have a valid equi-join conjunct. if (expr instanceof InPredicate - && ((InPredicate) expr).isNotIn() - || expr instanceof ExistsPredicate - && ((ExistsPredicate) expr).isNotExists()) { + && ((InPredicate) expr).isNotIn() + || expr instanceof ExistsPredicate + && ((ExistsPredicate) expr).isNotExists()) { // For the case of a NOT IN with an eq join conjunct, replace the join // conjunct with a conjunct that uses the null-matching eq operator. if (expr instanceof InPredicate) { @@ -825,7 +837,7 @@ public class StmtRewriter { /** * Extract all correlated predicates of a subquery. - * + *

* TODO Handle correlated predicates in a HAVING clause. */ private static ArrayList extractCorrelatedPredicates(SelectStmt subqueryStmt) @@ -835,14 +847,14 @@ public class StmtRewriter { if (subqueryStmt.hasWhereClause()) { if (!canExtractCorrelatedPredicates(subqueryStmt.getWhereClause(), - subqueryTupleIds)) { + subqueryTupleIds)) { throw new AnalysisException("Disjunctions with correlated predicates " + "are not supported: " + subqueryStmt.getWhereClause().toSql()); } // Extract the correlated predicates from the subquery's WHERE clause and // replace them with true BoolLiterals. Expr newWhereClause = extractCorrelatedPredicates(subqueryStmt.getWhereClause(), - subqueryTupleIds, correlatedPredicates); + subqueryTupleIds, correlatedPredicates); if (canEliminate(newWhereClause)) { newWhereClause = null; } @@ -857,7 +869,7 @@ public class StmtRewriter { ArrayList onClauseCorrelatedPreds = Lists.newArrayList(); Expr newOnClause = extractCorrelatedPredicates(tableRef.getOnClause(), - subqueryTupleIds, onClauseCorrelatedPreds); + subqueryTupleIds, onClauseCorrelatedPreds); if (onClauseCorrelatedPreds.isEmpty()) { continue; } @@ -883,14 +895,14 @@ public class StmtRewriter { * and the extracted correlated predicates are added to 'matches'. */ private static Expr extractCorrelatedPredicates(Expr root, List tupleIds, - ArrayList matches) { + ArrayList matches) { if (isCorrelatedPredicate(root, tupleIds)) { matches.add(root); return new BoolLiteral(true); } for (int i = 0; i < root.getChildren().size(); ++i) { root.getChildren().set(i, extractCorrelatedPredicates(root.getChild(i), tupleIds, - matches)); + matches)); } return root; } @@ -900,7 +912,7 @@ public class StmtRewriter { * correlated predicate cannot be extracted if it is part of a disjunction. */ private static boolean canExtractCorrelatedPredicates(Expr expr, - List subqueryTupleIds) { + List subqueryTupleIds) { if (!(expr instanceof CompoundPredicate)) { return true; } @@ -937,8 +949,8 @@ public class StmtRewriter { SelectListItem item = stmt.getSelectList().getItems().get(0); if (!item.getExpr().contains(Expr.CORRELATED_SUBQUERY_SUPPORT_AGG_FN)) { throw new AnalysisException("The select item in correlated subquery of binary predicate should only " - + "be sum, min, max, avg and count. Current subquery:" - + stmt.toSql()); + + "be sum, min, max, avg and count. Current subquery:" + + stmt.toSql()); } } // Grouping and/or aggregation (including analytic functions) is forbidden in correlated subquery of in @@ -953,7 +965,9 @@ public class StmtRewriter { final com.google.common.base.Predicate isSingleSlotRef = new com.google.common.base.Predicate() { @Override - public boolean apply(Expr arg) { return arg.unwrapSlotRef(false) != null; } + public boolean apply(Expr arg) { + return arg.unwrapSlotRef(false) != null; + } }; // A HAVING clause is only allowed on correlated EXISTS subqueries with @@ -1081,7 +1095,7 @@ public class StmtRewriter { * the aggregate function is wrapped into a 'zeroifnull' function. */ private static Expr createJoinConjunct(Expr exprWithSubquery, InlineViewRef inlineView, - Analyzer analyzer, boolean isCorrelated) throws AnalysisException { + Analyzer analyzer, boolean isCorrelated) throws AnalysisException { Preconditions.checkNotNull(exprWithSubquery); Preconditions.checkNotNull(inlineView); Preconditions.checkState(exprWithSubquery.contains(Subquery.class)); @@ -1123,7 +1137,7 @@ public class StmtRewriter { // NullLiteral whereas count returns a NumericLiteral. if (((FunctionCallExpr) aggFns.get(0)).getFn().getReturnType().isNumericType()) { FunctionCallExpr zeroIfNull = new FunctionCallExpr("ifnull", - Lists.newArrayList((Expr) slotRef, new IntLiteral(0, Type.BIGINT))); + Lists.newArrayList((Expr) slotRef, new IntLiteral(0, Type.BIGINT))); zeroIfNull.analyze(analyzer); subquerySubstitute = zeroIfNull; } else if (((FunctionCallExpr) aggFns.get(0)).getFn().getReturnType().isStringType()) { @@ -1141,5 +1155,59 @@ public class StmtRewriter { smap.put(subquery, subquerySubstitute); return exprWithSubquery.substitute(smap, analyzer, false); } + + public static boolean rewriteByPolicy(StatementBase statementBase, Analyzer analyzer) throws UserException { + Catalog currentCatalog = Catalog.getCurrentCatalog(); + UserIdentity currentUserIdentity = ConnectContext.get().getCurrentUserIdentity(); + String user = analyzer.getQualifiedUser(); + if (currentUserIdentity.isRootUser() || currentUserIdentity.isAdminUser()) { + return false; + } + if (!currentCatalog.getPolicyMgr().existPolicy(user)) { + return false; + } + if (!(statementBase instanceof SelectStmt)) { + return false; + } + SelectStmt selectStmt = (SelectStmt) statementBase; + boolean reAnalyze = false; + for (int i = 0; i < selectStmt.fromClause_.size(); i++) { + TableRef tableRef = selectStmt.fromClause_.get(i); + // Recursively rewrite subquery + if (tableRef instanceof InlineViewRef) { + InlineViewRef viewRef = (InlineViewRef) tableRef; + if (rewriteByPolicy(viewRef.getQueryStmt(), analyzer)) { + reAnalyze = true; + } + continue; + } + Table table = tableRef.getTable(); + String dbName = tableRef.getName().getDb(); + if (dbName == null) { + dbName = analyzer.getDefaultDb(); + } + Database db = currentCatalog.getDbOrAnalysisException(dbName); + long dbId = db.getId(); + long tableId = table.getId(); + Policy matchPolicy = currentCatalog.getPolicyMgr().getMatchRowPolicy(dbId, tableId, user); + if (matchPolicy == null) { + continue; + } + SelectList selectList = new SelectList(); + selectList.addItem(SelectListItem.createStarItem(tableRef.getAliasAsName())); + + SelectStmt stmt = new SelectStmt(selectList, + new FromClause(Lists.newArrayList(tableRef)), + matchPolicy.getWherePredicate(), + null, + null, + null, + LimitElement.NO_LIMIT); + selectStmt.fromClause_.set(i, new InlineViewRef(tableRef.getAliasAsName().getTbl(), stmt)); + selectStmt.analyze(analyzer); + reAnalyze = true; + } + return reAnalyze; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UserIdentity.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UserIdentity.java index 538df18677..46724612f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UserIdentity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UserIdentity.java @@ -17,18 +17,22 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.Catalog; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TUserIdentity; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; @@ -41,9 +45,16 @@ import java.io.IOException; // cmy@192.168.% // cmy@[domain.name] public class UserIdentity implements Writable { + + @SerializedName(value = "user") private String user; + + @SerializedName(value = "host") private String host; + + @SerializedName(value = "isDomain") private boolean isDomain; + private boolean isAnalyzed = false; public static final UserIdentity ROOT; @@ -170,6 +181,10 @@ public class UserIdentity implements Writable { return user.equals(PaloAuth.ROOT_USER); } + public boolean isAdminUser() { + return user.equals(PaloAuth.ADMIN_USER); + } + public TUserIdentity toThrift() { Preconditions.checkState(isAnalyzed); TUserIdentity tUserIdent = new TUserIdentity(); @@ -180,9 +195,17 @@ public class UserIdentity implements Writable { } public static UserIdentity read(DataInput in) throws IOException { - UserIdentity userIdentity = new UserIdentity(); - userIdentity.readFields(in); - return userIdentity; + // Use Gson in the VERSION_109 + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_109) { + UserIdentity userIdentity = new UserIdentity(); + userIdentity.readFields(in); + return userIdentity; + } else { + String json = Text.readString(in); + UserIdentity userIdentity = GsonUtils.GSON.fromJson(json, UserIdentity.class); + userIdentity.setIsAnalyzed(); + return userIdentity; + } } @Override @@ -226,12 +249,11 @@ public class UserIdentity implements Writable { @Override public void write(DataOutput out) throws IOException { Preconditions.checkState(isAnalyzed); - Text.writeString(out, user); - Text.writeString(out, host); - out.writeBoolean(isDomain); + Text.writeString(out, GsonUtils.GSON.toJson(this)); } - public void readFields(DataInput in) throws IOException { + @Deprecated + private void readFields(DataInput in) throws IOException { user = Text.readString(in); host = Text.readString(in); isDomain = in.readBoolean(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index d7e3d28e9d..a833c6e3e3 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -217,6 +217,7 @@ import org.apache.doris.persist.TablePropertyInfo; import org.apache.doris.persist.TruncateTableInfo; import org.apache.doris.plugin.PluginInfo; import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.policy.PolicyMgr; import org.apache.doris.qe.AuditEventProcessor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.GlobalVariable; @@ -459,6 +460,8 @@ public class Catalog { private RefreshManager refreshManager; + private PolicyMgr policyMgr; + public List getFrontends(FrontendNodeType nodeType) { if (nodeType == null) { // get all @@ -630,6 +633,7 @@ public class Catalog { this.pluginMgr = new PluginMgr(); this.auditEventProcessor = new AuditEventProcessor(this.pluginMgr); this.refreshManager = new RefreshManager(); + this.policyMgr = new PolicyMgr(); } public static void destroyCheckpoint() { @@ -1945,6 +1949,17 @@ public class Catalog { return checksum; } + /** + * Load policy through file. + **/ + public long loadPolicy(DataInputStream in, long checksum) throws IOException { + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_109) { + policyMgr = PolicyMgr.read(in); + } + LOG.info("finished replay policy from image"); + return checksum; + } + // Only called by checkpoint thread // return the latest image file's absolute path public String saveImage() throws IOException { @@ -2212,6 +2227,11 @@ public class Catalog { return checksum; } + public long savePolicy(CountingDataOutputStream out, long checksum) throws IOException { + Catalog.getCurrentCatalog().getPolicyMgr().write(out); + return checksum; + } + public void createLabelCleaner() { labelCleaner = new MasterDaemon("LoadLabelCleaner", Config.label_clean_interval_second * 1000L) { @Override @@ -5190,6 +5210,10 @@ public class Catalog { public EsRepository getEsRepository() { return this.esRepository; } + + public PolicyMgr getPolicyMgr() { + return this.policyMgr; + } public void setMaster(MasterInfo info) { this.masterIp = info.getIp(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java index be646f96ff..7bd8e07bd7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java @@ -103,6 +103,7 @@ public class MetaReader { checksum = catalog.loadPlugins(dis, checksum); checksum = catalog.loadDeleteHandler(dis, checksum); checksum = catalog.loadSqlBlockRule(dis, checksum); + checksum = catalog.loadPolicy(dis, checksum); } MetaFooter metaFooter = MetaFooter.read(imageFile); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java b/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java index 387a50be3d..4ecf24140e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java @@ -129,6 +129,7 @@ public class MetaWriter { checksum.setRef(writer.doWork("plugins", () -> catalog.savePlugins(dos, checksum.getRef()))); checksum.setRef(writer.doWork("deleteHandler", () -> catalog.saveDeleteHandler(dos, checksum.getRef()))); checksum.setRef(writer.doWork("sqlBlockRule", () -> catalog.saveSqlBlockRule(dos, checksum.getRef()))); + checksum.setRef(writer.doWork("policy", () -> catalog.savePolicy(dos, checksum.getRef()))); imageFileOut.getChannel().force(true); } MetaFooter.write(imageFile, metaIndices, checksum.getRef()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index d9166c9ec2..df62b168e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -90,6 +90,8 @@ import org.apache.doris.persist.TableInfo; import org.apache.doris.persist.TablePropertyInfo; import org.apache.doris.persist.TruncateTableInfo; import org.apache.doris.plugin.PluginInfo; +import org.apache.doris.policy.DropPolicyLog; +import org.apache.doris.policy.Policy; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.transaction.TransactionState; @@ -642,6 +644,16 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_CREATE_POLICY: { + data = Policy.read(in); + isRead = true; + break; + } + case OperationType.OP_DROP_POLICY: { + data = DropPolicyLog.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 9b8c174d42..ca958128ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -63,6 +63,8 @@ import org.apache.doris.meta.MetaContext; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.UserPropertyInfo; import org.apache.doris.plugin.PluginInfo; +import org.apache.doris.policy.DropPolicyLog; +import org.apache.doris.policy.Policy; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.transaction.TransactionState; @@ -809,6 +811,16 @@ public class EditLog { catalog.getAlterInstance().replayProcessModifyEngine(log); break; } + case OperationType.OP_CREATE_POLICY: { + Policy log = (Policy) journal.getData(); + catalog.getPolicyMgr().replayCreate(log); + break; + } + case OperationType.OP_DROP_POLICY: { + DropPolicyLog log = (DropPolicyLog) journal.getData(); + catalog.getPolicyMgr().replayDrop(log); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); @@ -1411,4 +1423,12 @@ public class EditLog { public void logModifyTableEngine(ModifyTableEngineOperationLog log) { logEdit(OperationType.OP_MODIFY_TABLE_ENGINE, log); } + + public void logCreatePolicy(Policy policy) { + logEdit(OperationType.OP_CREATE_POLICY, policy); + } + + public void logDropPolicy(DropPolicyLog log) { + logEdit(OperationType.OP_DROP_POLICY, log); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 3c7a42dddd..31c06fc5ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -218,6 +218,10 @@ public class OperationType { public static final short OP_CREATE_SQL_BLOCK_RULE = 300; public static final short OP_ALTER_SQL_BLOCK_RULE = 301; public static final short OP_DROP_SQL_BLOCK_RULE = 302; + + // policy 310-320 + public static final short OP_CREATE_POLICY = 310; + public static final short OP_DROP_POLICY = 311; // get opcode name by op codeStri public static String getOpName(short opCode) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/DropPolicyLog.java b/fe/fe-core/src/main/java/org/apache/doris/policy/DropPolicyLog.java new file mode 100644 index 0000000000..90fb824e15 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/DropPolicyLog.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.policy; + +import org.apache.doris.analysis.DropPolicyStmt; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.ConnectContext; + +import com.google.gson.annotations.SerializedName; +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Use for transmission drop policy log. + **/ +@AllArgsConstructor +@Getter +public class DropPolicyLog implements Writable { + + @SerializedName(value = "dbId") + private long dbId; + + @SerializedName(value = "tableId") + private long tableId; + + @SerializedName(value = "type") + private PolicyTypeEnum type; + + @SerializedName(value = "policyName") + private String policyName; + + @SerializedName(value = "user") + private UserIdentity user; + + /** + * Generate delete logs through stmt. + **/ + public static DropPolicyLog fromDropStmt(DropPolicyStmt stmt) throws AnalysisException { + String curDb = stmt.getTableName().getDb(); + if (curDb == null) { + curDb = ConnectContext.get().getDatabase(); + } + Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb); + Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl()); + return new DropPolicyLog(db.getId(), table.getId(), stmt.getType(), stmt.getPolicyName(), stmt.getUser()); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static DropPolicyLog read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), DropPolicyLog.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/FilterType.java b/fe/fe-core/src/main/java/org/apache/doris/policy/FilterType.java new file mode 100644 index 0000000000..49c32a8689 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/FilterType.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.policy; + +import org.apache.doris.analysis.CompoundPredicate; + +import lombok.Getter; + +import java.util.Arrays; + +/** + * Use for associating policies. + **/ +public enum FilterType { + + PERMISSIVE(CompoundPredicate.Operator.OR), + RESTRICTIVE(CompoundPredicate.Operator.AND); + + @Getter + private final CompoundPredicate.Operator op; + + FilterType(CompoundPredicate.Operator op) { + this.op = op; + } + + public static FilterType of(String name) { + return Arrays.stream(FilterType.values()).filter(f -> f.name().equalsIgnoreCase(name)).findFirst() + .orElse(RESTRICTIVE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java new file mode 100644 index 0000000000..d617dda050 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.policy; + +import org.apache.doris.analysis.CreatePolicyStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.StringReader; +import java.util.List; + +/** + * Save policy for filtering data. + **/ +@Data +@AllArgsConstructor +public class Policy implements Writable, GsonPostProcessable { + + public static final String ROW_POLICY = "ROW"; + + private static final Logger LOG = LogManager.getLogger(Policy.class); + + @SerializedName(value = "dbId") + private long dbId; + + @SerializedName(value = "tableId") + private long tableId; + + @SerializedName(value = "policyName") + private String policyName; + + /** + * ROW. + **/ + @SerializedName(value = "type") + private PolicyTypeEnum type; + + /** + * PERMISSIVE | RESTRICTIVE, If multiple types exist, the last type prevails. + **/ + @SerializedName(value = "filterType") + private final FilterType filterType; + + private Expr wherePredicate; + + /** + * Policy bind user. + **/ + @SerializedName(value = "user") + private final UserIdentity user; + + /** + * Use for Serialization/deserialization. + **/ + @SerializedName(value = "originStmt") + private String originStmt; + + /** + * Trans stmt to Policy. + **/ + public static Policy fromCreateStmt(CreatePolicyStmt stmt) throws AnalysisException { + String curDb = stmt.getTableName().getDb(); + if (curDb == null) { + curDb = ConnectContext.get().getDatabase(); + } + Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb); + Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl()); + UserIdentity userIdent = stmt.getUser(); + userIdent.analyze(ConnectContext.get().getClusterName()); + return new Policy(db.getId(), table.getId(), stmt.getPolicyName(), stmt.getType(), stmt.getFilterType(), + stmt.getWherePredicate(), userIdent, stmt.getOrigStmt().originStmt); + } + + /** + * Use for SHOW POLICY. + **/ + public List getShowInfo() throws AnalysisException { + Database database = Catalog.getCurrentCatalog().getDbOrAnalysisException(this.dbId); + Table table = database.getTableOrAnalysisException(this.tableId); + return Lists.newArrayList(this.policyName, database.getFullName(), table.getName(), this.type.name(), + this.filterType.name(), this.wherePredicate.toSql(), this.user.getQualifiedUser(), this.originStmt); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + /** + * Read policy from file. + **/ + public static Policy read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, Policy.class); + } + + @Override + public void gsonPostProcess() throws IOException { + if (wherePredicate != null) { + return; + } + try { + SqlScanner input = new SqlScanner(new StringReader(originStmt), 0L); + SqlParser parser = new SqlParser(input); + CreatePolicyStmt stmt = (CreatePolicyStmt) SqlParserUtils.getFirstStmt(parser); + wherePredicate = stmt.getWherePredicate(); + } catch (Exception e) { + throw new IOException("policy parse originStmt error", e); + } + } + + @Override + public Policy clone() { + return new Policy(this.dbId, this.tableId, this.policyName, this.type, this.filterType, this.wherePredicate, + this.user, this.originStmt); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java new file mode 100644 index 0000000000..c3a1bebbf8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java @@ -0,0 +1,354 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.policy; + +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.CreatePolicyStmt; +import org.apache.doris.analysis.DropPolicyStmt; +import org.apache.doris.analysis.ShowPolicyStmt; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ShowResultSet; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +/** + * Management policy and cache it. + **/ +public class PolicyMgr implements Writable { + private static final Logger LOG = LogManager.getLogger(PolicyMgr.class); + + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + @SerializedName(value = "dbIdToPolicyMap") + private Map> dbIdToPolicyMap = Maps.newConcurrentMap(); + + /** + * Cache merge policy for match. + * key:dbId:tableId-type-user + **/ + private Map> dbIdToMergePolicyMap = Maps.newConcurrentMap(); + + private Set userPolicySet = Sets.newConcurrentHashSet(); + + private void writeLock() { + lock.writeLock().lock(); + } + + private void writeUnlock() { + lock.writeLock().unlock(); + } + + private void readLock() { + lock.readLock().lock(); + } + + private void readUnlock() { + lock.readLock().unlock(); + } + + /** + * Create policy through stmt. + **/ + public void createPolicy(CreatePolicyStmt stmt) throws UserException { + Policy policy = Policy.fromCreateStmt(stmt); + writeLock(); + try { + if (existPolicy(policy.getDbId(), policy.getTableId(), policy.getType(), + policy.getPolicyName(), policy.getUser())) { + if (stmt.isIfNotExists()) { + return; + } + throw new DdlException("the policy " + policy.getPolicyName() + " already create"); + } + unprotectedAdd(policy); + Catalog.getCurrentCatalog().getEditLog().logCreatePolicy(policy); + } finally { + writeUnlock(); + } + } + + /** + * Drop policy through stmt. + **/ + public void dropPolicy(DropPolicyStmt stmt) throws DdlException, AnalysisException { + DropPolicyLog policy = DropPolicyLog.fromDropStmt(stmt); + writeLock(); + try { + if (!existPolicy(policy.getDbId(), policy.getTableId(), policy.getType(), + policy.getPolicyName(), policy.getUser())) { + if (stmt.isIfExists()) { + return; + } + throw new DdlException("the policy " + policy.getPolicyName() + " not exist"); + } + unprotectedDrop(policy); + Catalog.getCurrentCatalog().getEditLog().logDropPolicy(policy); + } finally { + writeUnlock(); + } + } + + public boolean existPolicy(String user) { + return userPolicySet.contains(user); + } + + private boolean existPolicy(long dbId, long tableId, PolicyTypeEnum type, String policyName, UserIdentity user) { + List policies = getDbPolicies(dbId); + return policies.stream().anyMatch(policy -> matchPolicy(policy, tableId, type, policyName, user)); + } + + private List getDbPolicies(long dbId) { + if (dbIdToPolicyMap == null) { + return new ArrayList<>(); + } + return dbIdToPolicyMap.getOrDefault(dbId, new ArrayList<>()); + } + + private List getDbUserPolicies(long dbId, String user) { + if (dbIdToPolicyMap == null) { + return new ArrayList<>(); + } + return dbIdToPolicyMap.getOrDefault(dbId, new ArrayList<>()).stream() + .filter(p -> p.getUser().getQualifiedUser().equals(user)).collect(Collectors.toList()); + } + + public void replayCreate(Policy policy) { + unprotectedAdd(policy); + LOG.info("replay create policy: {}", policy); + } + + private void unprotectedAdd(Policy policy) { + if (policy == null) { + return; + } + long dbId = policy.getDbId(); + List dbPolicies = getDbPolicies(dbId); + dbPolicies.add(policy); + dbIdToPolicyMap.put(dbId, dbPolicies); + updateMergePolicyMap(dbId); + userPolicySet.add(policy.getUser().getQualifiedUser()); + } + + public void replayDrop(DropPolicyLog log) { + unprotectedDrop(log); + LOG.info("replay drop policy log: {}", log); + } + + private void unprotectedDrop(DropPolicyLog log) { + long dbId = log.getDbId(); + List policies = getDbPolicies(dbId); + policies.removeIf(p -> matchPolicy(p, log.getTableId(), log.getType(), log.getPolicyName(), log.getUser())); + dbIdToPolicyMap.put(dbId, policies); + updateMergePolicyMap(dbId); + if (log.getUser() == null) { + updateAllUserPolicySet(); + } else { + String user = log.getUser().getQualifiedUser(); + if (!existUserPolicy(user)) { + userPolicySet.remove(user); + } + } + } + + private boolean matchPolicy(Policy policy, long tableId, PolicyTypeEnum type, + String policyName, UserIdentity user) { + return policy.getTableId() == tableId + && policy.getType().equals(type) + && StringUtils.equals(policy.getPolicyName(), policyName) + && (user == null || StringUtils.equals(policy.getUser().getQualifiedUser(), user.getQualifiedUser())); + } + + /** + * Match row policy and return it. + **/ + public Policy getMatchRowPolicy(long dbId, long tableId, String user) { + readLock(); + try { + if (!dbIdToMergePolicyMap.containsKey(dbId)) { + return null; + } + String key = Joiner.on("-").join(tableId, Policy.ROW_POLICY, user); + if (!dbIdToMergePolicyMap.get(dbId).containsKey(key)) { + return null; + } + return dbIdToMergePolicyMap.get(dbId).get(key); + } finally { + readUnlock(); + } + } + + /** + * Show policy through stmt. + **/ + public ShowResultSet showPolicy(ShowPolicyStmt showStmt) throws AnalysisException { + List> rows = Lists.newArrayList(); + List policies; + long currentDbId = ConnectContext.get().getCurrentDbId(); + if (showStmt.getUser() == null) { + policies = Catalog.getCurrentCatalog().getPolicyMgr().getDbPolicies(currentDbId); + } else { + policies = Catalog.getCurrentCatalog().getPolicyMgr() + .getDbUserPolicies(currentDbId, showStmt.getUser().getQualifiedUser()); + } + for (Policy policy : policies) { + if (policy.getWherePredicate() == null) { + continue; + } + rows.add(policy.getShowInfo()); + } + return new ShowResultSet(showStmt.getMetaData(), rows); + } + + private void updateAllMergePolicyMap() { + dbIdToPolicyMap.forEach((dbId, policies) -> updateMergePolicyMap(dbId)); + } + + private void updateAllUserPolicySet() { + userPolicySet.clear(); + dbIdToPolicyMap.forEach((dbId, policies) -> + policies.forEach(policy -> userPolicySet.add(policy.getUser().getQualifiedUser()))); + } + + + private boolean existUserPolicy(String user) { + readLock(); + try { + for (Map policies : dbIdToMergePolicyMap.values()) { + for (Policy policy : policies.values()) { + if (policy.getUser().getQualifiedUser().equals(user)) { + return true; + } + } + } + return false; + } finally { + readUnlock(); + } + + } + + /** + * The merge policy cache needs to be regenerated after the update. + **/ + private void updateMergePolicyMap(long dbId) { + readLock(); + try { + if (!dbIdToPolicyMap.containsKey(dbId)) { + return; + } + List policies = dbIdToPolicyMap.get(dbId); + Map andMap = new HashMap<>(); + Map orMap = new HashMap<>(); + for (Policy policy : policies) { + // read from json, need set isAnalyzed + policy.getUser().setIsAnalyzed(); + String key = + Joiner.on("-").join(policy.getTableId(), policy.getType(), policy.getUser().getQualifiedUser()); + // merge wherePredicate + if (CompoundPredicate.Operator.AND.equals(policy.getFilterType().getOp())) { + Policy frontPolicy = andMap.get(key); + if (frontPolicy == null) { + andMap.put(key, policy.clone()); + } else { + frontPolicy.setWherePredicate( + new CompoundPredicate(CompoundPredicate.Operator.AND, frontPolicy.getWherePredicate(), + policy.getWherePredicate())); + andMap.put(key, frontPolicy.clone()); + } + } else { + Policy frontPolicy = orMap.get(key); + if (frontPolicy == null) { + orMap.put(key, policy.clone()); + } else { + frontPolicy.setWherePredicate( + new CompoundPredicate(CompoundPredicate.Operator.OR, frontPolicy.getWherePredicate(), + policy.getWherePredicate())); + orMap.put(key, frontPolicy.clone()); + } + } + } + Map mergeMap = new HashMap<>(); + Set policyKeys = new HashSet<>(); + policyKeys.addAll(andMap.keySet()); + policyKeys.addAll(orMap.keySet()); + policyKeys.forEach(key -> { + if (andMap.containsKey(key) && orMap.containsKey(key)) { + Policy mergePolicy = andMap.get(key).clone(); + mergePolicy.setWherePredicate( + new CompoundPredicate(CompoundPredicate.Operator.AND, mergePolicy.getWherePredicate(), + orMap.get(key).getWherePredicate())); + mergeMap.put(key, mergePolicy); + } + if (!andMap.containsKey(key)) { + mergeMap.put(key, orMap.get(key)); + } + if (!orMap.containsKey(key)) { + mergeMap.put(key, andMap.get(key)); + } + }); + dbIdToMergePolicyMap.put(dbId, mergeMap); + } finally { + readUnlock(); + } + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + /** + * Read policyMgr from file. + **/ + public static PolicyMgr read(DataInput in) throws IOException { + String json = Text.readString(in); + PolicyMgr policyMgr = GsonUtils.GSON.fromJson(json, PolicyMgr.class); + // update merge policy cache + policyMgr.updateAllMergePolicyMap(); + // update user policy cache + policyMgr.updateAllUserPolicySet(); + return policyMgr; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java new file mode 100644 index 0000000000..483b8cd93b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.policy; + +/** + * Policy type enum, currently only row. + **/ +public enum PolicyTypeEnum { + + ROW +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 3d9b45ffcb..5904848b55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -17,11 +17,11 @@ package org.apache.doris.qe; +import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt; import org.apache.doris.analysis.AdminCancelRepairTableStmt; import org.apache.doris.analysis.AdminCheckTabletsStmt; import org.apache.doris.analysis.AdminCleanTrashStmt; import org.apache.doris.analysis.AdminCompactTableStmt; -import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt; import org.apache.doris.analysis.AdminRebalanceDiskStmt; import org.apache.doris.analysis.AdminRepairTableStmt; import org.apache.doris.analysis.AdminSetConfigStmt; @@ -51,6 +51,7 @@ import org.apache.doris.analysis.CreateEncryptKeyStmt; import org.apache.doris.analysis.CreateFileStmt; import org.apache.doris.analysis.CreateFunctionStmt; import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.CreatePolicyStmt; import org.apache.doris.analysis.CreateRepositoryStmt; import org.apache.doris.analysis.CreateResourceStmt; import org.apache.doris.analysis.CreateRoleStmt; @@ -69,6 +70,7 @@ import org.apache.doris.analysis.DropEncryptKeyStmt; import org.apache.doris.analysis.DropFileStmt; import org.apache.doris.analysis.DropFunctionStmt; import org.apache.doris.analysis.DropMaterializedViewStmt; +import org.apache.doris.analysis.DropPolicyStmt; import org.apache.doris.analysis.DropRepositoryStmt; import org.apache.doris.analysis.DropResourceStmt; import org.apache.doris.analysis.DropRoleStmt; @@ -305,6 +307,10 @@ public class DdlExecutor { catalog.getStatisticsJobManager().createStatisticsJob((AnalyzeStmt) ddlStmt); } else if (ddlStmt instanceof AlterResourceStmt) { catalog.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt); + } else if (ddlStmt instanceof CreatePolicyStmt) { + catalog.getPolicyMgr().createPolicy((CreatePolicyStmt) ddlStmt); + } else if (ddlStmt instanceof DropPolicyStmt) { + catalog.getPolicyMgr().dropPolicy((DropPolicyStmt) ddlStmt); } else { throw new DdlException("Unknown statement."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 2a1779c8d6..f04cfe31f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -60,6 +60,7 @@ import org.apache.doris.analysis.ShowMigrationsStmt; import org.apache.doris.analysis.ShowPartitionIdStmt; import org.apache.doris.analysis.ShowPartitionsStmt; import org.apache.doris.analysis.ShowPluginsStmt; +import org.apache.doris.analysis.ShowPolicyStmt; import org.apache.doris.analysis.ShowProcStmt; import org.apache.doris.analysis.ShowProcesslistStmt; import org.apache.doris.analysis.ShowQueryProfileStmt; @@ -344,6 +345,8 @@ public class ShowExecutor { handleAdminDiagnoseTablet(); } else if (stmt instanceof ShowCreateMaterializedViewStmt) { handleShowCreateMaterializedView(); + } else if (stmt instanceof ShowPolicyStmt) { + handleShowPolicy(); } else { handleEmtpy(); } @@ -2205,4 +2208,9 @@ public class ShowExecutor { resultSet = new ShowResultSet(showStmt.getMetaData(), resultRowSet); } + public void handleShowPolicy() throws AnalysisException { + ShowPolicyStmt showStmt = (ShowPolicyStmt) stmt; + resultSet = Catalog.getCurrentCatalog().getPolicyMgr().showPolicy(showStmt); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index b8280cff63..057ceb5a89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -35,6 +35,7 @@ import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.analysis.SelectListItem; import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.SetOperationStmt; import org.apache.doris.analysis.SetStmt; import org.apache.doris.analysis.SetVar; import org.apache.doris.analysis.ShowStmt; @@ -676,6 +677,25 @@ public class StmtExecutor implements ProfileWriter { parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt); reAnalyze = true; } + if (parsedStmt instanceof SelectStmt) { + if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) { + reAnalyze = true; + } + } + if (parsedStmt instanceof SetOperationStmt) { + List operands = ((SetOperationStmt) parsedStmt).getOperands(); + for (SetOperationStmt.SetOperand operand : operands) { + if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)) { + reAnalyze = true; + } + } + } + if (parsedStmt instanceof InsertStmt) { + QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt(); + if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) { + reAnalyze = true; + } + } if (reAnalyze) { // The rewrites should have no user-visible effect. Remember the original result // types and column labels to restore them after the rewritten stmt has been diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index aa2a3f3ff9..8f3dd68a35 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -317,6 +317,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("profile", new Integer(SqlParserSymbols.KW_PROFILE)); keywordMap.put("properties", new Integer(SqlParserSymbols.KW_PROPERTIES)); keywordMap.put("property", new Integer(SqlParserSymbols.KW_PROPERTY)); + keywordMap.put("policy", new Integer(SqlParserSymbols.KW_POLICY)); keywordMap.put("query", new Integer(SqlParserSymbols.KW_QUERY)); keywordMap.put("quota", new Integer(SqlParserSymbols.KW_QUOTA)); keywordMap.put("random", new Integer(SqlParserSymbols.KW_RANDOM)); @@ -380,9 +381,11 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("superuser", new Integer(SqlParserSymbols.KW_SUPERUSER)); keywordMap.put("sync", new Integer(SqlParserSymbols.KW_SYNC)); keywordMap.put("system", new Integer(SqlParserSymbols.KW_SYSTEM)); + keywordMap.put("sql_block_rule", new Integer(SqlParserSymbols.KW_SQL_BLOCK_RULE)); keywordMap.put("table", new Integer(SqlParserSymbols.KW_TABLE)); keywordMap.put("tables", new Integer(SqlParserSymbols.KW_TABLES)); keywordMap.put("tablet", new Integer(SqlParserSymbols.KW_TABLET)); + keywordMap.put("tablets", new Integer(SqlParserSymbols.KW_TABLETS)); keywordMap.put("task", new Integer(SqlParserSymbols.KW_TASK)); keywordMap.put("temporary", new Integer(SqlParserSymbols.KW_TEMPORARY)); keywordMap.put("terminated", new Integer(SqlParserSymbols.KW_TERMINATED)); @@ -428,8 +431,6 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("write", new Integer(SqlParserSymbols.KW_WRITE)); keywordMap.put("year", new Integer(SqlParserSymbols.KW_YEAR)); keywordMap.put("||", new Integer(SqlParserSymbols.KW_PIPE)); - keywordMap.put("sql_block_rule", new Integer(SqlParserSymbols.KW_SQL_BLOCK_RULE)); - keywordMap.put("tablets", new Integer(SqlParserSymbols.KW_TABLETS)); } // map from token id to token description diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index 9908f9b804..9fe32fd36a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -61,7 +61,9 @@ import org.apache.doris.transaction.TabletCommitInfo; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -76,10 +78,9 @@ import java.util.List; import java.util.Map; import java.util.Set; -import mockit.Expectations; -import mockit.Injectable; -import mockit.Mocked; - +/** + * Test for SparkLoadJobTest. + **/ public class SparkLoadJobTest { private long dbId; private String dbName; @@ -100,6 +101,9 @@ public class SparkLoadJobTest { private long backendId; private int schemaHash; + /** + * Init. + **/ @Before public void setUp() { dbId = 1L; @@ -207,16 +211,19 @@ public class SparkLoadJobTest { }; ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap()); - SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0")); + SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, + new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0")); job.execute(); Assert.assertEquals(JobState.PENDING, job.getState()); } @Test - public void testOnPendingTaskFinished(@Mocked Catalog catalog, @Injectable String originStmt) throws MetaNotFoundException { + public void testOnPendingTaskFinished(@Mocked Catalog catalog, @Injectable String originStmt) + throws MetaNotFoundException { ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap()); - SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0")); + SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, + new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0")); SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId); attachment.setAppId(appId); attachment.setOutputPath(etlOutputPath); @@ -235,7 +242,8 @@ public class SparkLoadJobTest { sparkConfigs.put("spark.master", "yarn"); sparkConfigs.put("spark.submit.deployMode", "cluster"); sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999"); - SparkLoadJob job = new SparkLoadJob(dbId, label, null, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0")); + SparkLoadJob job = new SparkLoadJob(dbId, label, null, + new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0")); job.state = JobState.ETL; job.setMaxFilterRatio(0.15); job.transactionId = transactionId; @@ -500,18 +508,20 @@ public class SparkLoadJobTest { result = resourceMgr; resourceMgr.getResource(anyString); result = sparkResource; + Catalog.getCurrentCatalogJournalVersion(); + result = FeMetaVersion.VERSION_CURRENT; } }; String label = "label1"; ResourceDesc resourceDesc = new ResourceDesc("my_spark", Maps.newHashMap()); - String oriStmt = "LOAD LABEL db1.label1\n" + - "(\n" + - "DATA INFILE(\"hdfs://127.0.0.1:8000/user/palo/data/input/file\")\n" + - "INTO TABLE `my_table`\n" + - "WHERE k1 > 10\n" + - ")\n" + - "WITH RESOURCE 'my_spark';"; + String oriStmt = "LOAD LABEL db1.label1\n" + + "(\n" + + "DATA INFILE(\"hdfs://127.0.0.1:8000/user/palo/data/input/file\")\n" + + "INTO TABLE `my_table`\n" + + "WHERE k1 > 10\n" + + ")\n" + + "WITH RESOURCE 'my_spark';"; OriginStatement originStmt = new OriginStatement(oriStmt, 0); UserIdentity userInfo = UserIdentity.ADMIN; SparkLoadJob sparkLoadJob = new SparkLoadJob(dbId, label, resourceDesc, originStmt, userInfo); @@ -526,13 +536,10 @@ public class SparkLoadJobTest { file.createNewFile(); DataOutputStream dos = new DataOutputStream(new FileOutputStream(file)); sparkLoadJob.write(dos); - dos.flush(); dos.close(); - // 2. Read objects from file DataInputStream dis = new DataInputStream(new FileInputStream(file)); - SparkLoadJob sparkLoadJob2 = (SparkLoadJob) SparkLoadJob.read(dis); Assert.assertEquals("my_spark", sparkLoadJob2.getResourceName()); Assert.assertEquals(label, sparkLoadJob2.getLabel()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java new file mode 100644 index 0000000000..6bbc977e71 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.policy; + +import org.apache.doris.analysis.CreateUserStmt; +import org.apache.doris.analysis.GrantStmt; +import org.apache.doris.analysis.ShowPolicyStmt; +import org.apache.doris.analysis.TablePattern; +import org.apache.doris.analysis.UserDesc; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.AccessPrivilege; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.FeConstants; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; + +/** + * Test for Policy. + **/ +public class PolicyTest extends TestWithFeService { + + @Override + protected void runBeforeAll() throws Exception { + FeConstants.runningUnitTest = true; + createDatabase("test"); + useDatabase("test"); + createTable("create table table1\n" + + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n" + + "properties(\"replication_num\" = \"1\");"); + createTable("create table table2\n" + + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n" + + "properties(\"replication_num\" = \"1\");"); + // create user + UserIdentity user = new UserIdentity("test_policy", "%"); + user.analyze(SystemInfoService.DEFAULT_CLUSTER); + CreateUserStmt createUserStmt = new CreateUserStmt(new UserDesc(user)); + Catalog.getCurrentCatalog().getAuth().createUser(createUserStmt); + List privileges = Lists.newArrayList(AccessPrivilege.ADMIN_PRIV); + TablePattern tablePattern = new TablePattern("*", "*"); + tablePattern.analyze(SystemInfoService.DEFAULT_CLUSTER); + GrantStmt grantStmt = new GrantStmt(user, null, tablePattern, privileges); + Catalog.getCurrentCatalog().getAuth().grant(grantStmt); + useUser("test_policy"); + } + + @Test + public void testNoPolicy() throws Exception { + useUser("root"); + String queryStr = "EXPLAIN select * from test.table1"; + String explainString = getSQLPlanOrErrorMsg(queryStr); + useUser("test_policy"); + Assertions.assertFalse(explainString.contains("`k1` = 1")); + } + + @Test + public void testExistPolicy() throws Exception { + createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)"); + Assertions.assertTrue(Catalog.getCurrentCatalog().getPolicyMgr().existPolicy("default_cluster:test_policy")); + dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy"); + Assertions.assertFalse(Catalog.getCurrentCatalog().getPolicyMgr().existPolicy("default_cluster:test_policy")); + createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)"); + dropPolicy("DROP ROW POLICY test_row_policy ON test.table1"); + Assertions.assertFalse(Catalog.getCurrentCatalog().getPolicyMgr().existPolicy("default_cluster:test_policy")); + } + + @Test + public void testNormalSql() throws Exception { + createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)"); + String queryStr = "EXPLAIN select * from test.table1"; + String explainString = getSQLPlanOrErrorMsg(queryStr); + Assertions.assertTrue(explainString.contains("`k1` = 1")); + dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy"); + } + + @Test + public void testUnionSql() throws Exception { + createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)"); + String queryStr = "EXPLAIN select * from test.table1 union all select * from test.table1"; + String explainString = getSQLPlanOrErrorMsg(queryStr); + Assertions.assertTrue(explainString.contains("`k1` = 1")); + dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy"); + } + + @Test + public void testInsertSelectSql() throws Exception { + createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)"); + String queryStr = "EXPLAIN insert into test.table1 select * from test.table1"; + String explainString = getSQLPlanOrErrorMsg(queryStr); + Assertions.assertTrue(explainString.contains("`k1` = 1")); + dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy"); + } + + @Test + public void testDuplicateAddPolicy() throws Exception { + createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)"); + createPolicy("CREATE ROW POLICY IF NOT EXISTS test_row_policy1 ON test.table1 AS PERMISSIVE" + + " TO test_policy USING (k1 = 1)"); + ExceptionChecker.expectThrowsWithMsg(DdlException.class, "the policy test_row_policy1 already create", + () -> createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE" + + " TO test_policy USING (k1 = 1)")); + dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1"); + } + + @Test + public void testNoAuth() { + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, + "CreatePolicyStmt command denied to user 'root'@'%' for table 'table1'", + () -> createPolicy( + "CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO root USING (k1 = 1)")); + } + + @Test + public void testShowPolicy() throws Exception { + createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)"); + createPolicy("CREATE ROW POLICY test_row_policy2 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 1)"); + ShowPolicyStmt showPolicyStmt = + (ShowPolicyStmt) parseAndAnalyzeStmt("SHOW ROW POLICY"); + int firstSize = Catalog.getCurrentCatalog().getPolicyMgr().showPolicy(showPolicyStmt).getResultRows().size(); + Assertions.assertTrue(firstSize > 0); + dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1"); + dropPolicy("DROP ROW POLICY test_row_policy2 ON test.table1"); + int secondSize = Catalog.getCurrentCatalog().getPolicyMgr().showPolicy(showPolicyStmt).getResultRows().size(); + Assertions.assertEquals(2, firstSize - secondSize); + } + + @Test + public void testDropPolicy() throws Exception { + createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 1)"); + dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1"); + dropPolicy("DROP ROW POLICY IF EXISTS test_row_policy5 ON test.table1"); + ExceptionChecker.expectThrowsWithMsg(DdlException.class, "the policy test_row_policy1 not exist", + () -> dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1")); + } + + @Test + public void testMergeFilter() throws Exception { + createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS RESTRICTIVE TO test_policy USING (k1 = 1)"); + createPolicy("CREATE ROW POLICY test_row_policy2 ON test.table1 AS RESTRICTIVE TO test_policy USING (k2 = 1)"); + createPolicy("CREATE ROW POLICY test_row_policy3 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 2)"); + createPolicy("CREATE ROW POLICY test_row_policy4 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 1)"); + String queryStr = "EXPLAIN select * from test.table1"; + String explainString = getSQLPlanOrErrorMsg(queryStr); + Assertions.assertTrue(explainString.contains("`k1` = 1, `k2` = 1, `k2` = 2 OR `k2` = 1")); + dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1"); + dropPolicy("DROP ROW POLICY test_row_policy2 ON test.table1"); + dropPolicy("DROP ROW POLICY test_row_policy3 ON test.table1"); + dropPolicy("DROP ROW POLICY test_row_policy4 ON test.table1"); + } + + @Test + public void testComplexSql() throws Exception { + createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS RESTRICTIVE TO test_policy USING (k1 = 1)"); + createPolicy("CREATE ROW POLICY test_row_policy2 ON test.table1 AS RESTRICTIVE TO test_policy USING (k2 = 1)"); + String joinSql = "select * from table1 join table2 on table1.k1=table2.k1"; + System.out.println(getSQLPlanOrErrorMsg(joinSql)); + Assertions.assertTrue(getSQLPlanOrErrorMsg(joinSql).contains( + " TABLE: table1\n" + + " PREAGGREGATION: ON\n" + + " PREDICATES: `k1` = 1, `k2` = 1")); + String unionSql = "select * from table1 union select * from table2"; + Assertions.assertTrue(getSQLPlanOrErrorMsg(unionSql).contains( + " TABLE: table1\n" + + " PREAGGREGATION: ON\n" + + " PREDICATES: `k1` = 1, `k2` = 1")); + String subQuerySql = "select * from table2 where k1 in (select k1 from table1)"; + Assertions.assertTrue(getSQLPlanOrErrorMsg(subQuerySql).contains( + " TABLE: table1\n" + + " PREAGGREGATION: ON\n" + + " PREDICATES: `k1` = 1, `k2` = 1")); + String aliasSql = "select * from table1 t1 join table2 t2 on t1.k1=t2.k1"; + Assertions.assertTrue(getSQLPlanOrErrorMsg(aliasSql).contains( + " TABLE: table1\n" + + " PREAGGREGATION: ON\n" + + " PREDICATES: `k1` = 1, `k2` = 1")); + dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1"); + dropPolicy("DROP ROW POLICY test_row_policy2 ON test.table1"); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 5c2352cd48..62c9ed44b0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -17,27 +17,12 @@ package org.apache.doris.utframe; -import java.io.File; -import java.io.IOException; -import java.io.StringReader; -import java.net.DatagramSocket; -import java.net.ServerSocket; -import java.net.SocketException; -import java.nio.channels.SocketChannel; -import java.nio.file.Files; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import org.apache.commons.io.FileUtils; - import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreatePolicyStmt; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.CreateViewStmt; +import org.apache.doris.analysis.DropPolicyStmt; import org.apache.doris.analysis.ExplainOptions; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; @@ -45,6 +30,7 @@ import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.DiskInfo; +import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -53,6 +39,7 @@ import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.mysql.privilege.PaloAuth; import org.apache.doris.planner.Planner; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.system.Backend; @@ -65,11 +52,27 @@ import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException; import org.apache.doris.utframe.MockedFrontend.FeStartException; import org.apache.doris.utframe.MockedFrontend.NotInitException; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestInstance; +import java.io.File; +import java.io.IOException; +import java.io.StringReader; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.net.SocketException; +import java.nio.channels.SocketChannel; +import java.nio.file.Files; +import java.util.List; +import java.util.Map; +import java.util.UUID; + /** * This is the base class for unit class that wants to start a FE service. *

@@ -87,7 +90,7 @@ import org.junit.jupiter.api.TestInstance; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class TestWithFeService { protected String runningDir = - "fe/mocked/" + getClass().getSimpleName() + "/" + UUID.randomUUID() + "/"; + "fe/mocked/" + getClass().getSimpleName() + "/" + UUID.randomUUID() + "/"; protected ConnectContext connectContext; @BeforeAll @@ -124,11 +127,11 @@ public abstract class TestWithFeService { // Parse an origin stmt and analyze it. Return a StatementBase instance. protected StatementBase parseAndAnalyzeStmt(String originStmt) - throws Exception { + throws Exception { System.out.println("begin to parse stmt: " + originStmt); SqlScanner input = - new SqlScanner(new StringReader(originStmt), - connectContext.getSessionVariable().getSqlMode()); + new SqlScanner(new StringReader(originStmt), + connectContext.getSessionVariable().getSqlMode()); SqlParser parser = new SqlParser(input); Analyzer analyzer = new Analyzer(connectContext.getCatalog(), connectContext); StatementBase statementBase = null; @@ -143,6 +146,7 @@ public abstract class TestWithFeService { throw new AnalysisException(errorMessage, e); } } + statementBase.setOrigStmt(new OriginStatement(originStmt, 0)); statementBase.analyze(analyzer); return statementBase; } @@ -150,7 +154,8 @@ public abstract class TestWithFeService { // for analyzing multi statements protected List parseAndAnalyzeStmts(String originStmt) throws Exception { System.out.println("begin to parse stmts: " + originStmt); - SqlScanner input = new SqlScanner(new StringReader(originStmt), connectContext.getSessionVariable().getSqlMode()); + SqlScanner input = + new SqlScanner(new StringReader(originStmt), connectContext.getSessionVariable().getSqlMode()); SqlParser parser = new SqlParser(input); Analyzer analyzer = new Analyzer(connectContext.getCatalog(), connectContext); List statementBases = null; @@ -180,7 +185,7 @@ public abstract class TestWithFeService { } protected int startFEServer(String runningDir) throws EnvVarNotSetException, IOException, - FeStartException, NotInitException, DdlException, InterruptedException { + FeStartException, NotInitException, DdlException, InterruptedException { // get DORIS_HOME String dorisHome = System.getenv("DORIS_HOME"); if (Strings.isNullOrEmpty(dorisHome)) { @@ -213,14 +218,14 @@ public abstract class TestWithFeService { } protected void createDorisCluster() - throws InterruptedException, NotInitException, IOException, DdlException, - EnvVarNotSetException, FeStartException { + throws InterruptedException, NotInitException, IOException, DdlException, + EnvVarNotSetException, FeStartException { createDorisCluster(runningDir, 1); } protected void createDorisCluster(String runningDir, int backendNum) - throws EnvVarNotSetException, IOException, FeStartException, - NotInitException, DdlException, InterruptedException { + throws EnvVarNotSetException, IOException, FeStartException, + NotInitException, DdlException, InterruptedException { int fe_rpc_port = startFEServer(runningDir); for (int i = 0; i < backendNum; i++) { createBackend("127.0.0.1", fe_rpc_port); @@ -233,8 +238,8 @@ public abstract class TestWithFeService { // the host of BE will be "127.0.0.1", "127.0.0.2" protected void createDorisClusterWithMultiTag(String runningDir, int backendNum) - throws EnvVarNotSetException, IOException, FeStartException, NotInitException, - DdlException, InterruptedException { + throws EnvVarNotSetException, IOException, FeStartException, NotInitException, + DdlException, InterruptedException { // set runningUnitTest to true, so that for ut, the agent task will be send to "127.0.0.1" to make cluster running well. FeConstants.runningUnitTest = true; int fe_rpc_port = startFEServer(runningDir); @@ -247,7 +252,7 @@ public abstract class TestWithFeService { } protected void createBackend(String beHost, int fe_rpc_port) - throws IOException, InterruptedException { + throws IOException, InterruptedException { int be_heartbeat_port = findValidPort(); int be_thrift_port = findValidPort(); int be_brpc_port = findValidPort(); @@ -255,14 +260,15 @@ public abstract class TestWithFeService { // start be MockedBackend backend = MockedBackendFactory.createBackend(beHost, - be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port, - new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port), - new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); + be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port, + new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port), + new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); backend.setFeAddress(new TNetworkAddress("127.0.0.1", fe_rpc_port)); backend.start(); // add be - Backend be = new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(), backend.getHeartbeatPort()); + Backend be = + new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(), backend.getHeartbeatPort()); Map disks = Maps.newHashMap(); DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId()); diskInfo1.setTotalCapacityB(1000000); @@ -351,6 +357,10 @@ public abstract class TestWithFeService { Catalog.getCurrentCatalog().createDb(createDbStmt); } + protected void useDatabase(String dbName) { + connectContext.setDatabase(ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER, dbName)); + } + protected void createTable(String sql) throws Exception { createTables(sql); } @@ -367,6 +377,16 @@ public abstract class TestWithFeService { Catalog.getCurrentCatalog().createView(createViewStmt); } + protected void createPolicy(String sql) throws Exception { + CreatePolicyStmt createPolicyStmt = (CreatePolicyStmt) parseAndAnalyzeStmt(sql); + Catalog.getCurrentCatalog().getPolicyMgr().createPolicy(createPolicyStmt); + } + + protected void dropPolicy(String sql) throws Exception { + DropPolicyStmt stmt = (DropPolicyStmt) parseAndAnalyzeStmt(sql); + Catalog.getCurrentCatalog().getPolicyMgr().dropPolicy(stmt); + } + protected void assertSQLPlanOrErrorMsgContains(String sql, String expect) throws Exception { // Note: adding `EXPLAIN` is necessary for non-query SQL, e.g., DDL, DML, etc. // TODO: Use a graceful way to get explain plan string, rather than modifying the SQL string. @@ -379,4 +399,11 @@ public abstract class TestWithFeService { Assertions.assertTrue(str.contains(expect)); } } + + protected void useUser(String userName) throws AnalysisException { + UserIdentity user = new UserIdentity(userName, "%"); + user.analyze(SystemInfoService.DEFAULT_CLUSTER); + connectContext.setCurrentUserIdentity(user); + connectContext.setQualifiedUser(SystemInfoService.DEFAULT_CLUSTER + ":" + userName); + } }