[feature] support row policy filter (#9206)

This commit is contained in:
Stalary
2022-05-11 22:11:10 +08:00
committed by GitHub
parent 289608cc20
commit f11d320213
32 changed files with 1832 additions and 140 deletions

View File

@ -48,6 +48,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Version;
import org.apache.doris.mysql.MysqlPassword;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.policy.PolicyTypeEnum;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -260,7 +261,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A
KW_NAME, KW_NAMES, KW_NEGATIVE, KW_NO, KW_NOT, KW_NULL, KW_NULLS,
KW_OBSERVER, KW_OFFSET, KW_ON, KW_ONLY, KW_OPEN, KW_OR, KW_ORDER, KW_OUTER, KW_OUTFILE, KW_OVER,
KW_PARAMETER, KW_PARTITION, KW_PARTITIONS, KW_PASSWORD, KW_LDAP_ADMIN_PASSWORD, KW_PATH, KW_PAUSE, KW_PIPE, KW_PRECEDING,
KW_PLUGIN, KW_PLUGINS,
KW_PLUGIN, KW_PLUGINS, KW_POLICY,
KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROFILE, KW_PROPERTIES, KW_PROPERTY,
KW_QUERY, KW_QUOTA,
KW_RANDOM, KW_RANGE, KW_READ, KW_REBALANCE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RELEASE, KW_RENAME,
@ -269,13 +270,13 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A
KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SHOW, KW_SIGNED,
KW_SKEW,
KW_SMALLINT, KW_SNAPSHOT, KW_SONAME, KW_SPLIT, KW_START, KW_STATUS, KW_STATS, KW_STOP, KW_STORAGE, KW_STREAM, KW_STRING, KW_STRUCT,
KW_SUM, KW_SUPERUSER, KW_SYNC, KW_SYSTEM,
KW_SUM, KW_SUPERUSER, KW_SYNC, KW_SYSTEM, KW_SQL_BLOCK_RULE,
KW_TABLE, KW_TABLES, KW_TABLET, KW_TABLETS, KW_TASK, KW_TEMPORARY, KW_TERMINATED, KW_TEXT, KW_THAN, KW_TIME, KW_THEN, KW_TIMESTAMP, KW_TINYINT,KW_TRASH,
KW_TO, KW_TRANSACTION, KW_TRIGGERS, KW_TRIM, KW_TRUE, KW_TRUNCATE, KW_TYPE, KW_TYPES,
KW_UNCOMMITTED, KW_UNBOUNDED, KW_UNION, KW_UNIQUE, KW_UNLOCK, KW_UNSIGNED, KW_USE, KW_USER, KW_USING, KW_UNINSTALL,
KW_VALUE, KW_VALUES, KW_VARCHAR, KW_VARIABLES, KW_VERBOSE, KW_VIEW,
KW_WARNINGS, KW_WEEK, KW_WHEN, KW_WHITELIST, KW_WHERE, KW_WITH, KW_WORK, KW_WRITE,
KW_YEAR, KW_SQL_BLOCK_RULE;
KW_YEAR;
terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT;
terminal BITAND, BITOR, BITXOR, BITNOT;
@ -1353,6 +1354,12 @@ create_stmt ::=
{:
RESULT = new CreateSqlBlockRuleStmt(ruleName, properties);
:}
/* row policy */
| KW_CREATE KW_ROW KW_POLICY opt_if_not_exists:ifNotExists ident:policyName KW_ON table_name:tbl KW_AS ident:filterType KW_TO user_identity:user
KW_USING LPAREN expr:wherePredicate RPAREN
{:
RESULT = new CreatePolicyStmt(PolicyTypeEnum.ROW, ifNotExists, policyName, tbl, filterType, user, wherePredicate);
:}
;
channel_desc_list ::=
@ -2058,6 +2065,14 @@ drop_stmt ::=
{:
RESULT = new DropSqlBlockRuleStmt(ruleNames);
:}
| KW_DROP KW_ROW KW_POLICY opt_if_exists:ifExists ident:policyName KW_ON table_name:tbl
{:
RESULT = new DropPolicyStmt(PolicyTypeEnum.ROW, ifExists, policyName, tbl, null);
:}
| KW_DROP KW_ROW KW_POLICY opt_if_exists:ifExists ident:policyName KW_ON table_name:tbl KW_FOR user_identity:user
{:
RESULT = new DropPolicyStmt(PolicyTypeEnum.ROW, ifExists, policyName, tbl, user);
:}
;
// Recover statement
@ -2546,6 +2561,14 @@ show_stmt ::=
{:
RESULT = new ShowSqlBlockRuleStmt(null);
:}
| KW_SHOW KW_ROW KW_POLICY KW_FOR user_identity:user
{:
RESULT = new ShowPolicyStmt(PolicyTypeEnum.ROW, user);
:}
| KW_SHOW KW_ROW KW_POLICY
{:
RESULT = new ShowPolicyStmt(PolicyTypeEnum.ROW, null);
:}
;
show_param ::=
@ -5785,6 +5808,8 @@ keyword ::=
{: RESULT = id; :}
| KW_VARCHAR:id
{: RESULT = id; :}
| KW_POLICY:id
{: RESULT = id; :}
;
// Identifier that contain keyword

View File

@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.policy.FilterType;
import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.qe.ConnectContext;
import lombok.Getter;
/**
* Create policy statement.
* syntax:
* CREATE ROW POLICY [IF NOT EXISTS] test_row_policy ON test_table AS {PERMISSIVE|RESTRICTIVE} TO user USING (a = ’xxx‘)
*/
public class CreatePolicyStmt extends DdlStmt {
@Getter
private final PolicyTypeEnum type;
@Getter
private final boolean ifNotExists;
@Getter
private final String policyName;
@Getter
private final TableName tableName;
@Getter
private final FilterType filterType;
@Getter
private final UserIdentity user;
@Getter
private Expr wherePredicate;
/**
* Use for cup.
**/
public CreatePolicyStmt(PolicyTypeEnum type, boolean ifNotExists, String policyName, TableName tableName, String filterType,
UserIdentity user, Expr wherePredicate) {
this.type = type;
this.ifNotExists = ifNotExists;
this.policyName = policyName;
this.tableName = tableName;
this.filterType = FilterType.of(filterType);
this.user = user;
this.wherePredicate = wherePredicate;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
tableName.analyze(analyzer);
user.analyze(analyzer.getClusterName());
if (user.isRootUser() || user.isAdminUser()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CreatePolicyStmt",
user.getQualifiedUser(), user.getHost(), tableName.getTbl());
}
// check auth
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("CREATE ").append(type).append(" POLICY ");
if (ifNotExists) {
sb.append("IF NOT EXISTS");
}
sb.append(policyName).append(" ON ").append(tableName.toSql()).append(" AS ").append(filterType)
.append(" TO ").append(user.getQualifiedUser()).append(" USING ").append(wherePredicate.toSql());
return sb.toString();
}
}

View File

@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.qe.ConnectContext;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* Drop policy statement.
* syntax:
* DROP [ROW] POLICY [IF EXISTS] test_row_policy ON test_table [FOR user]
**/
@AllArgsConstructor
public class DropPolicyStmt extends DdlStmt {
@Getter
private final PolicyTypeEnum type;
@Getter
private final boolean ifExists;
@Getter
private final String policyName;
@Getter
private final TableName tableName;
@Getter
private final UserIdentity user;
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
tableName.analyze(analyzer);
if (user != null) {
user.analyze(analyzer.getClusterName());
}
// check auth
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("DROP ").append(type).append(" POLICY ");
if (ifExists) {
sb.append("IF EXISTS ");
}
sb.append(policyName).append(" ON ").append(tableName.toSql());
if (user != null) {
sb.append(" FOR ").append(user.getQualifiedUser());
}
return sb.toString();
}
}

View File

@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import lombok.Getter;
/**
* Show policy statement
* syntax:
* SHOW ROW POLICY [FOR user]
**/
public class ShowPolicyStmt extends ShowStmt {
@Getter
private final PolicyTypeEnum type;
@Getter
private final UserIdentity user;
public ShowPolicyStmt(PolicyTypeEnum type, UserIdentity user) {
this.type = type;
this.user = user;
}
private static final ShowResultSetMetaData ROW_META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("PolicyName", ScalarType.createVarchar(100)))
.addColumn(new Column("DbName", ScalarType.createVarchar(100)))
.addColumn(new Column("TableName", ScalarType.createVarchar(100)))
.addColumn(new Column("Type", ScalarType.createVarchar(20)))
.addColumn(new Column("FilterType", ScalarType.createVarchar(20)))
.addColumn(new Column("WherePredicate", ScalarType.createVarchar(65535)))
.addColumn(new Column("User", ScalarType.createVarchar(20)))
.addColumn(new Column("OriginStmt", ScalarType.createVarchar(65535)))
.build();
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
if (user != null) {
user.analyze(analyzer.getClusterName());
}
// check auth
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("SHOW ").append(type).append(" POLICY");
if (user != null) {
sb.append(" FOR ").append(user);
}
return sb.toString();
}
@Override
public ShowResultSetMetaData getMetaData() {
return ROW_META_DATA;
}
}

View File

@ -20,16 +20,20 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.TableAliasGenerator;
import org.apache.doris.common.UserException;
import org.apache.doris.policy.Policy;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,8 +58,8 @@ public class StmtRewriter {
Preconditions.checkNotNull(analyzedStmt.analyzer);
return rewriteQueryStatement(analyzedStmt, analyzer);
} else if (parsedStmt instanceof InsertStmt) {
final InsertStmt insertStmt = (InsertStmt)parsedStmt;
final QueryStmt analyzedStmt = (QueryStmt)insertStmt.getQueryStmt();
final InsertStmt insertStmt = (InsertStmt) parsedStmt;
final QueryStmt analyzedStmt = (QueryStmt) insertStmt.getQueryStmt();
Preconditions.checkNotNull(analyzedStmt.analyzer);
QueryStmt rewrittenQueryStmt = rewriteQueryStatement(analyzedStmt, analyzer);
insertStmt.setQueryStmt(rewrittenQueryStmt);
@ -66,10 +70,10 @@ public class StmtRewriter {
return parsedStmt;
}
/**
* Calls the appropriate equal method based on the specific type of query stmt. See
* rewriteSelectStatement() and rewriteUnionStatement() documentation.
*/
/**
* Calls the appropriate equal method based on the specific type of query stmt. See
* rewriteSelectStatement() and rewriteUnionStatement() documentation.
*/
public static QueryStmt rewriteQueryStatement(QueryStmt stmt, Analyzer analyzer)
throws AnalysisException {
Preconditions.checkNotNull(stmt);
@ -88,9 +92,11 @@ public class StmtRewriter {
throws AnalysisException {
SelectStmt result = stmt;
// Rewrite all the subqueries in the FROM clause.
for (TableRef tblRef: result.fromClause_) {
if (!(tblRef instanceof InlineViewRef)) continue;
InlineViewRef inlineViewRef = (InlineViewRef)tblRef;
for (TableRef tblRef : result.fromClause_) {
if (!(tblRef instanceof InlineViewRef)) {
continue;
}
InlineViewRef inlineViewRef = (InlineViewRef) tblRef;
QueryStmt rewrittenQueryStmt = rewriteQueryStatement(inlineViewRef.getViewStmt(),
inlineViewRef.getAnalyzer());
inlineViewRef.setViewStmt(rewrittenQueryStmt);
@ -127,37 +133,39 @@ public class StmtRewriter {
* For example:
* select cs_item_sk, sum(cs_sales_price) from catalog_sales a group by cs_item_sk
* having sum(cs_sales_price) >
* (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk);
* (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk);
*
* <p>
* Step1: equal having subquery to where subquery
* Outer query is changed to inline view in rewritten query
* Inline view of outer query:
* from (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
* from (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
* Rewritten subquery of expr:
* where a.sum_cs_sales_price >
* (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk)
* where a.sum_cs_sales_price >
* (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk)
* Rewritten query:
* select cs_item_sk, a.sum_cs_sales_price from
* (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
* where a.sum_cs_sales_price >
* (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk)
* select cs_item_sk, a.sum_cs_sales_price from
* (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
* where a.sum_cs_sales_price >
* (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk)
* <p>
* Step2: equal where subquery
* Inline view of subquery:
* from (select b.cs_item_sk, min(cs_sales_price) from catalog_sales b group by cs_item_sk) c
* from (select b.cs_item_sk, min(cs_sales_price) from catalog_sales b group by cs_item_sk) c
* Rewritten correlated predicate:
* where c.cs_item_sk = a.cs_item_sk and a.sum_cs_sales_price > c.min(cs_sales_price)
* where c.cs_item_sk = a.cs_item_sk and a.sum_cs_sales_price > c.min(cs_sales_price)
* The final stmt:
* select a.cs_item_sk, a.sum_cs_sales_price from
* (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
* join
* (select b.cs_item_sk, min(b.cs_sales_price) min_cs_sales_price from catalog_sales b group by b.cs_item_sk) c
* (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
* join
* (select b.cs_item_sk, min(b.cs_sales_price) min_cs_sales_price from catalog_sales b group by b.cs_item_sk) c
* where c.cs_item_sk = a.cs_item_sk and a.sum_cs_sales_price > c.min_cs_sales_price;
*
* @param stmt
* @param analyzer
*/
private static SelectStmt rewriteHavingClauseSubqueries(SelectStmt stmt, Analyzer analyzer) throws AnalysisException {
private static SelectStmt rewriteHavingClauseSubqueries(SelectStmt stmt, Analyzer analyzer)
throws AnalysisException {
// prepare parameters
SelectList selectList = stmt.getSelectList();
List<String> columnLabels = stmt.getColLabels();
@ -256,7 +264,7 @@ public class StmtRewriter {
SelectListItem newItem = new SelectListItem(selectList.getItems().get(i).getExpr().reset().substitute(smap),
columnLabels.get(i));
newSelectItems.add(newItem);
LOG.debug("New select item is changed to "+ newItem.toSql());
LOG.debug("New select item is changed to " + newItem.toSql());
}
SelectList newSelectList = new SelectList(newSelectItems, selectList.isDistinct());
@ -288,7 +296,7 @@ public class StmtRewriter {
* @return select a, sum(v1), sum(v2)
*/
private static SelectList addMissingAggregationColumns(SelectList selectList,
List<FunctionCallExpr> aggregateExprs) {
List<FunctionCallExpr> aggregateExprs) {
SelectList result = selectList.clone();
for (FunctionCallExpr functionCallExpr : aggregateExprs) {
boolean columnExists = false;
@ -312,10 +320,10 @@ public class StmtRewriter {
*/
private static void rewriteUnionStatement(SetOperationStmt stmt, Analyzer analyzer)
throws AnalysisException {
for (SetOperationStmt.SetOperand operand: stmt.getOperands()) {
for (SetOperationStmt.SetOperand operand : stmt.getOperands()) {
Preconditions.checkState(operand.getQueryStmt() instanceof SelectStmt);
QueryStmt rewrittenQueryStmt = StmtRewriter.rewriteSelectStatement(
(SelectStmt)operand.getQueryStmt(), operand.getAnalyzer());
(SelectStmt) operand.getQueryStmt(), operand.getAnalyzer());
operand.setQueryStmt(rewrittenQueryStmt);
}
}
@ -338,6 +346,7 @@ public class StmtRewriter {
}
return false;
}
/**
* Rewrite all subqueries of a stmt's WHERE clause. Initially, all the
* conjuncts containing subqueries are extracted from the WHERE clause and are
@ -345,52 +354,51 @@ public class StmtRewriter {
* merged into its parent select block by converting it into a join.
* Conjuncts with subqueries that themselves contain conjuncts with subqueries are
* recursively rewritten in a bottom up fashion.
*
* <p>
* The following example illustrates the bottom up rewriting of nested queries.
* Suppose we have the following three level nested query Q0:
*
* <p>
* SELECT *
* FROM T1 : Q0
* WHERE T1.a IN (SELECT a
* FROM T2 WHERE T2.b IN (SELECT b
* FROM T3))
* FROM T2 WHERE T2.b IN (SELECT b
* FROM T3))
* AND T1.c < 10;
*
* <p>
* This query will be rewritten as follows. Initially, the IN predicate
* T1.a IN (SELECT a FROM T2 WHERE T2.b IN (SELECT b FROM T3)) is extracted
* from the top level block (Q0) since it contains a subquery and is
* replaced by a true BoolLiteral, resulting in the following query Q1:
*
* <p>
* SELECT * FROM T1 WHERE TRUE : Q1
*
* <p>
* Since the stmt in the extracted predicate contains a conjunct with a subquery,
* it is also rewritten. As before, rewriting stmt SELECT a FROM T2
* WHERE T2.b IN (SELECT b FROM T3) works by first extracting the conjunct that
* contains the subquery (T2.b IN (SELECT b FROM T3)) and substituting it with
* a true BoolLiteral, producing the following stmt Q2:
*
* <p>
* SELECT a FROM T2 WHERE TRUE : Q2
*
* <p>
* The predicate T2.b IN (SELECT b FROM T3) is then merged with Q2,
* producing the following unnested query Q3:
*
* <p>
* SELECT a FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 ON T2.b = $a$1.b : Q3
*
* <p>
* The extracted IN predicate becomes:
*
* <p>
* T1.a IN (SELECT a FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 ON T2.b = $a$1.b)
*
* <p>
* Finally, the rewritten IN predicate is merged with query block Q1,
* producing the following unnested query (WHERE clauses that contain only
* conjunctions of true BoolLiterals are eliminated):
*
* <p>
* SELECT *
* FROM T1 LEFT SEMI JOIN (SELECT a
* FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1
* ON T2.b = $a$1.b) $a$1
* FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1
* ON T2.b = $a$1.b) $a$1
* ON $a$1.a = T1.a
* WHERE T1.c < 10;
*
*/
private static void rewriteWhereClauseSubqueries(
SelectStmt stmt, Analyzer analyzer)
@ -494,7 +502,7 @@ public class StmtRewriter {
* subquery stmt. The modified analyzed expr is returned.
*/
private static Expr rewriteExpr(Expr expr, Analyzer analyzer)
throws AnalysisException {
throws AnalysisException {
// Extract the subquery and equal it.
Subquery subquery = expr.getSubquery();
Preconditions.checkNotNull(subquery);
@ -506,7 +514,7 @@ public class StmtRewriter {
rewrittenStmt.reset();
Subquery newSubquery = new Subquery(rewrittenStmt);
newSubquery.analyze(analyzer);
ExprSubstitutionMap smap = new ExprSubstitutionMap();
smap.put(subquery, newSubquery);
return expr.substitute(smap, analyzer, false);
@ -539,7 +547,7 @@ public class StmtRewriter {
Preconditions.checkNotNull(expr);
Preconditions.checkNotNull(analyzer);
Preconditions.checkState(expr.getSubquery().getAnalyzer() != null,
"subquery must be analyze address=" + System.identityHashCode(expr.getSubquery()));
"subquery must be analyze address=" + System.identityHashCode(expr.getSubquery()));
boolean updateSelectList = false;
SelectStmt subqueryStmt = (SelectStmt) expr.getSubquery().getStatement();
@ -554,7 +562,7 @@ public class StmtRewriter {
}
// (select k1 $a from t2) $b
InlineViewRef inlineView = new InlineViewRef(
stmt.getTableAliasGenerator().getNextAlias(), subqueryStmt, colLabels);
stmt.getTableAliasGenerator().getNextAlias(), subqueryStmt, colLabels);
// Extract all correlated predicates from the subquery.
List<Expr> onClauseConjuncts = extractCorrelatedPredicates(subqueryStmt);
@ -578,7 +586,7 @@ public class StmtRewriter {
for (Expr conjunct : onClauseConjuncts) {
canRewriteScalarFunction(expr, conjunct);
updateInlineView(inlineView, conjunct, stmt.getTableRefIds(),
lhsExprs, rhsExprs, updateGroupBy);
lhsExprs, rhsExprs, updateGroupBy);
}
/**
@ -612,10 +620,10 @@ public class StmtRewriter {
// Create a join conjunct from the expr that contains a subquery.
Expr joinConjunct = createJoinConjunct(expr, inlineView, analyzer,
!onClauseConjuncts.isEmpty());
!onClauseConjuncts.isEmpty());
if (joinConjunct != null) {
SelectListItem firstItem =
((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
if (!onClauseConjuncts.isEmpty()
&& firstItem.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) {
// Correlated subqueries with an aggregate function that returns non-null on
@ -626,7 +634,7 @@ public class StmtRewriter {
// stmt's WHERE clause because it needs to be applied to the result of the
// LEFT OUTER JOIN (both matched and unmatched tuples).
stmt.whereClause =
CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause);
CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause);
joinConjunct = null;
joinOp = JoinOperator.LEFT_OUTER_JOIN;
updateSelectList = true;
@ -639,7 +647,7 @@ public class StmtRewriter {
// Create the ON clause from the extracted correlated predicates.
Expr onClausePredicate =
CompoundPredicate.createConjunctivePredicate(onClauseConjuncts);
CompoundPredicate.createConjunctivePredicate(onClauseConjuncts);
if (onClausePredicate == null) {
Preconditions.checkState(expr instanceof ExistsPredicate);
if (((ExistsPredicate) expr).isNotExists()) {
@ -679,9 +687,13 @@ public class StmtRewriter {
// Check if we have a valid ON clause for an equi-join.
boolean hasEqJoinPred = false;
for (Expr conjunct : onClausePredicate.getConjuncts()) {
if (!(conjunct instanceof BinaryPredicate)) continue;
if (!(conjunct instanceof BinaryPredicate)) {
continue;
}
BinaryPredicate.Operator operator = ((BinaryPredicate) conjunct).getOp();
if (!operator.isEquivalence()) continue;
if (!operator.isEquivalence()) {
continue;
}
List<TupleId> lhsTupleIds = Lists.newArrayList();
conjunct.getChild(0).getIds(lhsTupleIds, null);
if (lhsTupleIds.isEmpty()) {
@ -723,7 +735,7 @@ public class StmtRewriter {
// We can equal the aggregate subquery using a cross join. All conjuncts
// that were extracted from the subquery are added to stmt's WHERE clause.
stmt.whereClause =
CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause);
CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause);
inlineView.setJoinOp(JoinOperator.CROSS_JOIN);
// Indicate that the CROSS JOIN may add a new visible tuple to stmt's
// select list (if the latter contains an unqualified star item '*')
@ -732,9 +744,9 @@ public class StmtRewriter {
// We have a valid equi-join conjunct.
if (expr instanceof InPredicate
&& ((InPredicate) expr).isNotIn()
|| expr instanceof ExistsPredicate
&& ((ExistsPredicate) expr).isNotExists()) {
&& ((InPredicate) expr).isNotIn()
|| expr instanceof ExistsPredicate
&& ((ExistsPredicate) expr).isNotExists()) {
// For the case of a NOT IN with an eq join conjunct, replace the join
// conjunct with a conjunct that uses the null-matching eq operator.
if (expr instanceof InPredicate) {
@ -825,7 +837,7 @@ public class StmtRewriter {
/**
* Extract all correlated predicates of a subquery.
*
* <p>
* TODO Handle correlated predicates in a HAVING clause.
*/
private static ArrayList<Expr> extractCorrelatedPredicates(SelectStmt subqueryStmt)
@ -835,14 +847,14 @@ public class StmtRewriter {
if (subqueryStmt.hasWhereClause()) {
if (!canExtractCorrelatedPredicates(subqueryStmt.getWhereClause(),
subqueryTupleIds)) {
subqueryTupleIds)) {
throw new AnalysisException("Disjunctions with correlated predicates "
+ "are not supported: " + subqueryStmt.getWhereClause().toSql());
}
// Extract the correlated predicates from the subquery's WHERE clause and
// replace them with true BoolLiterals.
Expr newWhereClause = extractCorrelatedPredicates(subqueryStmt.getWhereClause(),
subqueryTupleIds, correlatedPredicates);
subqueryTupleIds, correlatedPredicates);
if (canEliminate(newWhereClause)) {
newWhereClause = null;
}
@ -857,7 +869,7 @@ public class StmtRewriter {
ArrayList<Expr> onClauseCorrelatedPreds = Lists.newArrayList();
Expr newOnClause = extractCorrelatedPredicates(tableRef.getOnClause(),
subqueryTupleIds, onClauseCorrelatedPreds);
subqueryTupleIds, onClauseCorrelatedPreds);
if (onClauseCorrelatedPreds.isEmpty()) {
continue;
}
@ -883,14 +895,14 @@ public class StmtRewriter {
* and the extracted correlated predicates are added to 'matches'.
*/
private static Expr extractCorrelatedPredicates(Expr root, List<TupleId> tupleIds,
ArrayList<Expr> matches) {
ArrayList<Expr> matches) {
if (isCorrelatedPredicate(root, tupleIds)) {
matches.add(root);
return new BoolLiteral(true);
}
for (int i = 0; i < root.getChildren().size(); ++i) {
root.getChildren().set(i, extractCorrelatedPredicates(root.getChild(i), tupleIds,
matches));
matches));
}
return root;
}
@ -900,7 +912,7 @@ public class StmtRewriter {
* correlated predicate cannot be extracted if it is part of a disjunction.
*/
private static boolean canExtractCorrelatedPredicates(Expr expr,
List<TupleId> subqueryTupleIds) {
List<TupleId> subqueryTupleIds) {
if (!(expr instanceof CompoundPredicate)) {
return true;
}
@ -937,8 +949,8 @@ public class StmtRewriter {
SelectListItem item = stmt.getSelectList().getItems().get(0);
if (!item.getExpr().contains(Expr.CORRELATED_SUBQUERY_SUPPORT_AGG_FN)) {
throw new AnalysisException("The select item in correlated subquery of binary predicate should only "
+ "be sum, min, max, avg and count. Current subquery:"
+ stmt.toSql());
+ "be sum, min, max, avg and count. Current subquery:"
+ stmt.toSql());
}
}
// Grouping and/or aggregation (including analytic functions) is forbidden in correlated subquery of in
@ -953,7 +965,9 @@ public class StmtRewriter {
final com.google.common.base.Predicate<Expr> isSingleSlotRef =
new com.google.common.base.Predicate<Expr>() {
@Override
public boolean apply(Expr arg) { return arg.unwrapSlotRef(false) != null; }
public boolean apply(Expr arg) {
return arg.unwrapSlotRef(false) != null;
}
};
// A HAVING clause is only allowed on correlated EXISTS subqueries with
@ -1081,7 +1095,7 @@ public class StmtRewriter {
* the aggregate function is wrapped into a 'zeroifnull' function.
*/
private static Expr createJoinConjunct(Expr exprWithSubquery, InlineViewRef inlineView,
Analyzer analyzer, boolean isCorrelated) throws AnalysisException {
Analyzer analyzer, boolean isCorrelated) throws AnalysisException {
Preconditions.checkNotNull(exprWithSubquery);
Preconditions.checkNotNull(inlineView);
Preconditions.checkState(exprWithSubquery.contains(Subquery.class));
@ -1123,7 +1137,7 @@ public class StmtRewriter {
// NullLiteral whereas count returns a NumericLiteral.
if (((FunctionCallExpr) aggFns.get(0)).getFn().getReturnType().isNumericType()) {
FunctionCallExpr zeroIfNull = new FunctionCallExpr("ifnull",
Lists.newArrayList((Expr) slotRef, new IntLiteral(0, Type.BIGINT)));
Lists.newArrayList((Expr) slotRef, new IntLiteral(0, Type.BIGINT)));
zeroIfNull.analyze(analyzer);
subquerySubstitute = zeroIfNull;
} else if (((FunctionCallExpr) aggFns.get(0)).getFn().getReturnType().isStringType()) {
@ -1141,5 +1155,59 @@ public class StmtRewriter {
smap.put(subquery, subquerySubstitute);
return exprWithSubquery.substitute(smap, analyzer, false);
}
public static boolean rewriteByPolicy(StatementBase statementBase, Analyzer analyzer) throws UserException {
Catalog currentCatalog = Catalog.getCurrentCatalog();
UserIdentity currentUserIdentity = ConnectContext.get().getCurrentUserIdentity();
String user = analyzer.getQualifiedUser();
if (currentUserIdentity.isRootUser() || currentUserIdentity.isAdminUser()) {
return false;
}
if (!currentCatalog.getPolicyMgr().existPolicy(user)) {
return false;
}
if (!(statementBase instanceof SelectStmt)) {
return false;
}
SelectStmt selectStmt = (SelectStmt) statementBase;
boolean reAnalyze = false;
for (int i = 0; i < selectStmt.fromClause_.size(); i++) {
TableRef tableRef = selectStmt.fromClause_.get(i);
// Recursively rewrite subquery
if (tableRef instanceof InlineViewRef) {
InlineViewRef viewRef = (InlineViewRef) tableRef;
if (rewriteByPolicy(viewRef.getQueryStmt(), analyzer)) {
reAnalyze = true;
}
continue;
}
Table table = tableRef.getTable();
String dbName = tableRef.getName().getDb();
if (dbName == null) {
dbName = analyzer.getDefaultDb();
}
Database db = currentCatalog.getDbOrAnalysisException(dbName);
long dbId = db.getId();
long tableId = table.getId();
Policy matchPolicy = currentCatalog.getPolicyMgr().getMatchRowPolicy(dbId, tableId, user);
if (matchPolicy == null) {
continue;
}
SelectList selectList = new SelectList();
selectList.addItem(SelectListItem.createStarItem(tableRef.getAliasAsName()));
SelectStmt stmt = new SelectStmt(selectList,
new FromClause(Lists.newArrayList(tableRef)),
matchPolicy.getWherePredicate(),
null,
null,
null,
LimitElement.NO_LIMIT);
selectStmt.fromClause_.set(i, new InlineViewRef(tableRef.getAliasAsName().getTbl(), stmt));
selectStmt.analyze(analyzer);
reAnalyze = true;
}
return reAnalyze;
}
}

View File

@ -17,18 +17,22 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TUserIdentity;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
@ -41,9 +45,16 @@ import java.io.IOException;
// cmy@192.168.%
// cmy@[domain.name]
public class UserIdentity implements Writable {
@SerializedName(value = "user")
private String user;
@SerializedName(value = "host")
private String host;
@SerializedName(value = "isDomain")
private boolean isDomain;
private boolean isAnalyzed = false;
public static final UserIdentity ROOT;
@ -170,6 +181,10 @@ public class UserIdentity implements Writable {
return user.equals(PaloAuth.ROOT_USER);
}
public boolean isAdminUser() {
return user.equals(PaloAuth.ADMIN_USER);
}
public TUserIdentity toThrift() {
Preconditions.checkState(isAnalyzed);
TUserIdentity tUserIdent = new TUserIdentity();
@ -180,9 +195,17 @@ public class UserIdentity implements Writable {
}
public static UserIdentity read(DataInput in) throws IOException {
UserIdentity userIdentity = new UserIdentity();
userIdentity.readFields(in);
return userIdentity;
// Use Gson in the VERSION_109
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_109) {
UserIdentity userIdentity = new UserIdentity();
userIdentity.readFields(in);
return userIdentity;
} else {
String json = Text.readString(in);
UserIdentity userIdentity = GsonUtils.GSON.fromJson(json, UserIdentity.class);
userIdentity.setIsAnalyzed();
return userIdentity;
}
}
@Override
@ -226,12 +249,11 @@ public class UserIdentity implements Writable {
@Override
public void write(DataOutput out) throws IOException {
Preconditions.checkState(isAnalyzed);
Text.writeString(out, user);
Text.writeString(out, host);
out.writeBoolean(isDomain);
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public void readFields(DataInput in) throws IOException {
@Deprecated
private void readFields(DataInput in) throws IOException {
user = Text.readString(in);
host = Text.readString(in);
isDomain = in.readBoolean();

View File

@ -217,6 +217,7 @@ import org.apache.doris.persist.TablePropertyInfo;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.plugin.PluginMgr;
import org.apache.doris.policy.PolicyMgr;
import org.apache.doris.qe.AuditEventProcessor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.GlobalVariable;
@ -459,6 +460,8 @@ public class Catalog {
private RefreshManager refreshManager;
private PolicyMgr policyMgr;
public List<Frontend> getFrontends(FrontendNodeType nodeType) {
if (nodeType == null) {
// get all
@ -630,6 +633,7 @@ public class Catalog {
this.pluginMgr = new PluginMgr();
this.auditEventProcessor = new AuditEventProcessor(this.pluginMgr);
this.refreshManager = new RefreshManager();
this.policyMgr = new PolicyMgr();
}
public static void destroyCheckpoint() {
@ -1945,6 +1949,17 @@ public class Catalog {
return checksum;
}
/**
* Load policy through file.
**/
public long loadPolicy(DataInputStream in, long checksum) throws IOException {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_109) {
policyMgr = PolicyMgr.read(in);
}
LOG.info("finished replay policy from image");
return checksum;
}
// Only called by checkpoint thread
// return the latest image file's absolute path
public String saveImage() throws IOException {
@ -2212,6 +2227,11 @@ public class Catalog {
return checksum;
}
public long savePolicy(CountingDataOutputStream out, long checksum) throws IOException {
Catalog.getCurrentCatalog().getPolicyMgr().write(out);
return checksum;
}
public void createLabelCleaner() {
labelCleaner = new MasterDaemon("LoadLabelCleaner", Config.label_clean_interval_second * 1000L) {
@Override
@ -5190,6 +5210,10 @@ public class Catalog {
public EsRepository getEsRepository() {
return this.esRepository;
}
public PolicyMgr getPolicyMgr() {
return this.policyMgr;
}
public void setMaster(MasterInfo info) {
this.masterIp = info.getIp();

View File

@ -103,6 +103,7 @@ public class MetaReader {
checksum = catalog.loadPlugins(dis, checksum);
checksum = catalog.loadDeleteHandler(dis, checksum);
checksum = catalog.loadSqlBlockRule(dis, checksum);
checksum = catalog.loadPolicy(dis, checksum);
}
MetaFooter metaFooter = MetaFooter.read(imageFile);

View File

@ -129,6 +129,7 @@ public class MetaWriter {
checksum.setRef(writer.doWork("plugins", () -> catalog.savePlugins(dos, checksum.getRef())));
checksum.setRef(writer.doWork("deleteHandler", () -> catalog.saveDeleteHandler(dos, checksum.getRef())));
checksum.setRef(writer.doWork("sqlBlockRule", () -> catalog.saveSqlBlockRule(dos, checksum.getRef())));
checksum.setRef(writer.doWork("policy", () -> catalog.savePolicy(dos, checksum.getRef())));
imageFileOut.getChannel().force(true);
}
MetaFooter.write(imageFile, metaIndices, checksum.getRef());

View File

@ -90,6 +90,8 @@ import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TablePropertyInfo;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.policy.DropPolicyLog;
import org.apache.doris.policy.Policy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@ -642,6 +644,16 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_CREATE_POLICY: {
data = Policy.read(in);
isRead = true;
break;
}
case OperationType.OP_DROP_POLICY: {
data = DropPolicyLog.read(in);
isRead = true;
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);

View File

@ -63,6 +63,8 @@ import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.policy.DropPolicyLog;
import org.apache.doris.policy.Policy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@ -809,6 +811,16 @@ public class EditLog {
catalog.getAlterInstance().replayProcessModifyEngine(log);
break;
}
case OperationType.OP_CREATE_POLICY: {
Policy log = (Policy) journal.getData();
catalog.getPolicyMgr().replayCreate(log);
break;
}
case OperationType.OP_DROP_POLICY: {
DropPolicyLog log = (DropPolicyLog) journal.getData();
catalog.getPolicyMgr().replayDrop(log);
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@ -1411,4 +1423,12 @@ public class EditLog {
public void logModifyTableEngine(ModifyTableEngineOperationLog log) {
logEdit(OperationType.OP_MODIFY_TABLE_ENGINE, log);
}
public void logCreatePolicy(Policy policy) {
logEdit(OperationType.OP_CREATE_POLICY, policy);
}
public void logDropPolicy(DropPolicyLog log) {
logEdit(OperationType.OP_DROP_POLICY, log);
}
}

View File

@ -218,6 +218,10 @@ public class OperationType {
public static final short OP_CREATE_SQL_BLOCK_RULE = 300;
public static final short OP_ALTER_SQL_BLOCK_RULE = 301;
public static final short OP_DROP_SQL_BLOCK_RULE = 302;
// policy 310-320
public static final short OP_CREATE_POLICY = 310;
public static final short OP_DROP_POLICY = 311;
// get opcode name by op codeStri
public static String getOpName(short opCode) {

View File

@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.policy;
import org.apache.doris.analysis.DropPolicyStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import com.google.gson.annotations.SerializedName;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Use for transmission drop policy log.
**/
@AllArgsConstructor
@Getter
public class DropPolicyLog implements Writable {
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "tableId")
private long tableId;
@SerializedName(value = "type")
private PolicyTypeEnum type;
@SerializedName(value = "policyName")
private String policyName;
@SerializedName(value = "user")
private UserIdentity user;
/**
* Generate delete logs through stmt.
**/
public static DropPolicyLog fromDropStmt(DropPolicyStmt stmt) throws AnalysisException {
String curDb = stmt.getTableName().getDb();
if (curDb == null) {
curDb = ConnectContext.get().getDatabase();
}
Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl());
return new DropPolicyLog(db.getId(), table.getId(), stmt.getType(), stmt.getPolicyName(), stmt.getUser());
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public static DropPolicyLog read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), DropPolicyLog.class);
}
}

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.policy;
import org.apache.doris.analysis.CompoundPredicate;
import lombok.Getter;
import java.util.Arrays;
/**
* Use for associating policies.
**/
public enum FilterType {
PERMISSIVE(CompoundPredicate.Operator.OR),
RESTRICTIVE(CompoundPredicate.Operator.AND);
@Getter
private final CompoundPredicate.Operator op;
FilterType(CompoundPredicate.Operator op) {
this.op = op;
}
public static FilterType of(String name) {
return Arrays.stream(FilterType.values()).filter(f -> f.name().equalsIgnoreCase(name)).findFirst()
.orElse(RESTRICTIVE);
}
}

View File

@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.policy;
import org.apache.doris.analysis.CreatePolicyStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
/**
* Save policy for filtering data.
**/
@Data
@AllArgsConstructor
public class Policy implements Writable, GsonPostProcessable {
public static final String ROW_POLICY = "ROW";
private static final Logger LOG = LogManager.getLogger(Policy.class);
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "tableId")
private long tableId;
@SerializedName(value = "policyName")
private String policyName;
/**
* ROW.
**/
@SerializedName(value = "type")
private PolicyTypeEnum type;
/**
* PERMISSIVE | RESTRICTIVE, If multiple types exist, the last type prevails.
**/
@SerializedName(value = "filterType")
private final FilterType filterType;
private Expr wherePredicate;
/**
* Policy bind user.
**/
@SerializedName(value = "user")
private final UserIdentity user;
/**
* Use for Serialization/deserialization.
**/
@SerializedName(value = "originStmt")
private String originStmt;
/**
* Trans stmt to Policy.
**/
public static Policy fromCreateStmt(CreatePolicyStmt stmt) throws AnalysisException {
String curDb = stmt.getTableName().getDb();
if (curDb == null) {
curDb = ConnectContext.get().getDatabase();
}
Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl());
UserIdentity userIdent = stmt.getUser();
userIdent.analyze(ConnectContext.get().getClusterName());
return new Policy(db.getId(), table.getId(), stmt.getPolicyName(), stmt.getType(), stmt.getFilterType(),
stmt.getWherePredicate(), userIdent, stmt.getOrigStmt().originStmt);
}
/**
* Use for SHOW POLICY.
**/
public List<String> getShowInfo() throws AnalysisException {
Database database = Catalog.getCurrentCatalog().getDbOrAnalysisException(this.dbId);
Table table = database.getTableOrAnalysisException(this.tableId);
return Lists.newArrayList(this.policyName, database.getFullName(), table.getName(), this.type.name(),
this.filterType.name(), this.wherePredicate.toSql(), this.user.getQualifiedUser(), this.originStmt);
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
/**
* Read policy from file.
**/
public static Policy read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, Policy.class);
}
@Override
public void gsonPostProcess() throws IOException {
if (wherePredicate != null) {
return;
}
try {
SqlScanner input = new SqlScanner(new StringReader(originStmt), 0L);
SqlParser parser = new SqlParser(input);
CreatePolicyStmt stmt = (CreatePolicyStmt) SqlParserUtils.getFirstStmt(parser);
wherePredicate = stmt.getWherePredicate();
} catch (Exception e) {
throw new IOException("policy parse originStmt error", e);
}
}
@Override
public Policy clone() {
return new Policy(this.dbId, this.tableId, this.policyName, this.type, this.filterType, this.wherePredicate,
this.user, this.originStmt);
}
}

View File

@ -0,0 +1,354 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.policy;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.CreatePolicyStmt;
import org.apache.doris.analysis.DropPolicyStmt;
import org.apache.doris.analysis.ShowPolicyStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
* Management policy and cache it.
**/
public class PolicyMgr implements Writable {
private static final Logger LOG = LogManager.getLogger(PolicyMgr.class);
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
@SerializedName(value = "dbIdToPolicyMap")
private Map<Long, List<Policy>> dbIdToPolicyMap = Maps.newConcurrentMap();
/**
* Cache merge policy for match.
* key:dbId:tableId-type-user
**/
private Map<Long, Map<String, Policy>> dbIdToMergePolicyMap = Maps.newConcurrentMap();
private Set<String> userPolicySet = Sets.newConcurrentHashSet();
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
/**
* Create policy through stmt.
**/
public void createPolicy(CreatePolicyStmt stmt) throws UserException {
Policy policy = Policy.fromCreateStmt(stmt);
writeLock();
try {
if (existPolicy(policy.getDbId(), policy.getTableId(), policy.getType(),
policy.getPolicyName(), policy.getUser())) {
if (stmt.isIfNotExists()) {
return;
}
throw new DdlException("the policy " + policy.getPolicyName() + " already create");
}
unprotectedAdd(policy);
Catalog.getCurrentCatalog().getEditLog().logCreatePolicy(policy);
} finally {
writeUnlock();
}
}
/**
* Drop policy through stmt.
**/
public void dropPolicy(DropPolicyStmt stmt) throws DdlException, AnalysisException {
DropPolicyLog policy = DropPolicyLog.fromDropStmt(stmt);
writeLock();
try {
if (!existPolicy(policy.getDbId(), policy.getTableId(), policy.getType(),
policy.getPolicyName(), policy.getUser())) {
if (stmt.isIfExists()) {
return;
}
throw new DdlException("the policy " + policy.getPolicyName() + " not exist");
}
unprotectedDrop(policy);
Catalog.getCurrentCatalog().getEditLog().logDropPolicy(policy);
} finally {
writeUnlock();
}
}
public boolean existPolicy(String user) {
return userPolicySet.contains(user);
}
private boolean existPolicy(long dbId, long tableId, PolicyTypeEnum type, String policyName, UserIdentity user) {
List<Policy> policies = getDbPolicies(dbId);
return policies.stream().anyMatch(policy -> matchPolicy(policy, tableId, type, policyName, user));
}
private List<Policy> getDbPolicies(long dbId) {
if (dbIdToPolicyMap == null) {
return new ArrayList<>();
}
return dbIdToPolicyMap.getOrDefault(dbId, new ArrayList<>());
}
private List<Policy> getDbUserPolicies(long dbId, String user) {
if (dbIdToPolicyMap == null) {
return new ArrayList<>();
}
return dbIdToPolicyMap.getOrDefault(dbId, new ArrayList<>()).stream()
.filter(p -> p.getUser().getQualifiedUser().equals(user)).collect(Collectors.toList());
}
public void replayCreate(Policy policy) {
unprotectedAdd(policy);
LOG.info("replay create policy: {}", policy);
}
private void unprotectedAdd(Policy policy) {
if (policy == null) {
return;
}
long dbId = policy.getDbId();
List<Policy> dbPolicies = getDbPolicies(dbId);
dbPolicies.add(policy);
dbIdToPolicyMap.put(dbId, dbPolicies);
updateMergePolicyMap(dbId);
userPolicySet.add(policy.getUser().getQualifiedUser());
}
public void replayDrop(DropPolicyLog log) {
unprotectedDrop(log);
LOG.info("replay drop policy log: {}", log);
}
private void unprotectedDrop(DropPolicyLog log) {
long dbId = log.getDbId();
List<Policy> policies = getDbPolicies(dbId);
policies.removeIf(p -> matchPolicy(p, log.getTableId(), log.getType(), log.getPolicyName(), log.getUser()));
dbIdToPolicyMap.put(dbId, policies);
updateMergePolicyMap(dbId);
if (log.getUser() == null) {
updateAllUserPolicySet();
} else {
String user = log.getUser().getQualifiedUser();
if (!existUserPolicy(user)) {
userPolicySet.remove(user);
}
}
}
private boolean matchPolicy(Policy policy, long tableId, PolicyTypeEnum type,
String policyName, UserIdentity user) {
return policy.getTableId() == tableId
&& policy.getType().equals(type)
&& StringUtils.equals(policy.getPolicyName(), policyName)
&& (user == null || StringUtils.equals(policy.getUser().getQualifiedUser(), user.getQualifiedUser()));
}
/**
* Match row policy and return it.
**/
public Policy getMatchRowPolicy(long dbId, long tableId, String user) {
readLock();
try {
if (!dbIdToMergePolicyMap.containsKey(dbId)) {
return null;
}
String key = Joiner.on("-").join(tableId, Policy.ROW_POLICY, user);
if (!dbIdToMergePolicyMap.get(dbId).containsKey(key)) {
return null;
}
return dbIdToMergePolicyMap.get(dbId).get(key);
} finally {
readUnlock();
}
}
/**
* Show policy through stmt.
**/
public ShowResultSet showPolicy(ShowPolicyStmt showStmt) throws AnalysisException {
List<List<String>> rows = Lists.newArrayList();
List<Policy> policies;
long currentDbId = ConnectContext.get().getCurrentDbId();
if (showStmt.getUser() == null) {
policies = Catalog.getCurrentCatalog().getPolicyMgr().getDbPolicies(currentDbId);
} else {
policies = Catalog.getCurrentCatalog().getPolicyMgr()
.getDbUserPolicies(currentDbId, showStmt.getUser().getQualifiedUser());
}
for (Policy policy : policies) {
if (policy.getWherePredicate() == null) {
continue;
}
rows.add(policy.getShowInfo());
}
return new ShowResultSet(showStmt.getMetaData(), rows);
}
private void updateAllMergePolicyMap() {
dbIdToPolicyMap.forEach((dbId, policies) -> updateMergePolicyMap(dbId));
}
private void updateAllUserPolicySet() {
userPolicySet.clear();
dbIdToPolicyMap.forEach((dbId, policies) ->
policies.forEach(policy -> userPolicySet.add(policy.getUser().getQualifiedUser())));
}
private boolean existUserPolicy(String user) {
readLock();
try {
for (Map<String, Policy> policies : dbIdToMergePolicyMap.values()) {
for (Policy policy : policies.values()) {
if (policy.getUser().getQualifiedUser().equals(user)) {
return true;
}
}
}
return false;
} finally {
readUnlock();
}
}
/**
* The merge policy cache needs to be regenerated after the update.
**/
private void updateMergePolicyMap(long dbId) {
readLock();
try {
if (!dbIdToPolicyMap.containsKey(dbId)) {
return;
}
List<Policy> policies = dbIdToPolicyMap.get(dbId);
Map<String, Policy> andMap = new HashMap<>();
Map<String, Policy> orMap = new HashMap<>();
for (Policy policy : policies) {
// read from json, need set isAnalyzed
policy.getUser().setIsAnalyzed();
String key =
Joiner.on("-").join(policy.getTableId(), policy.getType(), policy.getUser().getQualifiedUser());
// merge wherePredicate
if (CompoundPredicate.Operator.AND.equals(policy.getFilterType().getOp())) {
Policy frontPolicy = andMap.get(key);
if (frontPolicy == null) {
andMap.put(key, policy.clone());
} else {
frontPolicy.setWherePredicate(
new CompoundPredicate(CompoundPredicate.Operator.AND, frontPolicy.getWherePredicate(),
policy.getWherePredicate()));
andMap.put(key, frontPolicy.clone());
}
} else {
Policy frontPolicy = orMap.get(key);
if (frontPolicy == null) {
orMap.put(key, policy.clone());
} else {
frontPolicy.setWherePredicate(
new CompoundPredicate(CompoundPredicate.Operator.OR, frontPolicy.getWherePredicate(),
policy.getWherePredicate()));
orMap.put(key, frontPolicy.clone());
}
}
}
Map<String, Policy> mergeMap = new HashMap<>();
Set<String> policyKeys = new HashSet<>();
policyKeys.addAll(andMap.keySet());
policyKeys.addAll(orMap.keySet());
policyKeys.forEach(key -> {
if (andMap.containsKey(key) && orMap.containsKey(key)) {
Policy mergePolicy = andMap.get(key).clone();
mergePolicy.setWherePredicate(
new CompoundPredicate(CompoundPredicate.Operator.AND, mergePolicy.getWherePredicate(),
orMap.get(key).getWherePredicate()));
mergeMap.put(key, mergePolicy);
}
if (!andMap.containsKey(key)) {
mergeMap.put(key, orMap.get(key));
}
if (!orMap.containsKey(key)) {
mergeMap.put(key, andMap.get(key));
}
});
dbIdToMergePolicyMap.put(dbId, mergeMap);
} finally {
readUnlock();
}
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
/**
* Read policyMgr from file.
**/
public static PolicyMgr read(DataInput in) throws IOException {
String json = Text.readString(in);
PolicyMgr policyMgr = GsonUtils.GSON.fromJson(json, PolicyMgr.class);
// update merge policy cache
policyMgr.updateAllMergePolicyMap();
// update user policy cache
policyMgr.updateAllUserPolicySet();
return policyMgr;
}
}

View File

@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.policy;
/**
* Policy type enum, currently only row.
**/
public enum PolicyTypeEnum {
ROW
}

View File

@ -17,11 +17,11 @@
package org.apache.doris.qe;
import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
import org.apache.doris.analysis.AdminCancelRepairTableStmt;
import org.apache.doris.analysis.AdminCheckTabletsStmt;
import org.apache.doris.analysis.AdminCleanTrashStmt;
import org.apache.doris.analysis.AdminCompactTableStmt;
import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
import org.apache.doris.analysis.AdminRebalanceDiskStmt;
import org.apache.doris.analysis.AdminRepairTableStmt;
import org.apache.doris.analysis.AdminSetConfigStmt;
@ -51,6 +51,7 @@ import org.apache.doris.analysis.CreateEncryptKeyStmt;
import org.apache.doris.analysis.CreateFileStmt;
import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.CreatePolicyStmt;
import org.apache.doris.analysis.CreateRepositoryStmt;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.CreateRoleStmt;
@ -69,6 +70,7 @@ import org.apache.doris.analysis.DropEncryptKeyStmt;
import org.apache.doris.analysis.DropFileStmt;
import org.apache.doris.analysis.DropFunctionStmt;
import org.apache.doris.analysis.DropMaterializedViewStmt;
import org.apache.doris.analysis.DropPolicyStmt;
import org.apache.doris.analysis.DropRepositoryStmt;
import org.apache.doris.analysis.DropResourceStmt;
import org.apache.doris.analysis.DropRoleStmt;
@ -305,6 +307,10 @@ public class DdlExecutor {
catalog.getStatisticsJobManager().createStatisticsJob((AnalyzeStmt) ddlStmt);
} else if (ddlStmt instanceof AlterResourceStmt) {
catalog.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt);
} else if (ddlStmt instanceof CreatePolicyStmt) {
catalog.getPolicyMgr().createPolicy((CreatePolicyStmt) ddlStmt);
} else if (ddlStmt instanceof DropPolicyStmt) {
catalog.getPolicyMgr().dropPolicy((DropPolicyStmt) ddlStmt);
} else {
throw new DdlException("Unknown statement.");
}

View File

@ -60,6 +60,7 @@ import org.apache.doris.analysis.ShowMigrationsStmt;
import org.apache.doris.analysis.ShowPartitionIdStmt;
import org.apache.doris.analysis.ShowPartitionsStmt;
import org.apache.doris.analysis.ShowPluginsStmt;
import org.apache.doris.analysis.ShowPolicyStmt;
import org.apache.doris.analysis.ShowProcStmt;
import org.apache.doris.analysis.ShowProcesslistStmt;
import org.apache.doris.analysis.ShowQueryProfileStmt;
@ -344,6 +345,8 @@ public class ShowExecutor {
handleAdminDiagnoseTablet();
} else if (stmt instanceof ShowCreateMaterializedViewStmt) {
handleShowCreateMaterializedView();
} else if (stmt instanceof ShowPolicyStmt) {
handleShowPolicy();
} else {
handleEmtpy();
}
@ -2205,4 +2208,9 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showStmt.getMetaData(), resultRowSet);
}
public void handleShowPolicy() throws AnalysisException {
ShowPolicyStmt showStmt = (ShowPolicyStmt) stmt;
resultSet = Catalog.getCurrentCatalog().getPolicyMgr().showPolicy(showStmt);
}
}

View File

@ -35,6 +35,7 @@ import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.SelectListItem;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SetOperationStmt;
import org.apache.doris.analysis.SetStmt;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.ShowStmt;
@ -676,6 +677,25 @@ public class StmtExecutor implements ProfileWriter {
parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);
reAnalyze = true;
}
if (parsedStmt instanceof SelectStmt) {
if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {
reAnalyze = true;
}
}
if (parsedStmt instanceof SetOperationStmt) {
List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();
for (SetOperationStmt.SetOperand operand : operands) {
if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)) {
reAnalyze = true;
}
}
}
if (parsedStmt instanceof InsertStmt) {
QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {
reAnalyze = true;
}
}
if (reAnalyze) {
// The rewrites should have no user-visible effect. Remember the original result
// types and column labels to restore them after the rewritten stmt has been

View File

@ -317,6 +317,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("profile", new Integer(SqlParserSymbols.KW_PROFILE));
keywordMap.put("properties", new Integer(SqlParserSymbols.KW_PROPERTIES));
keywordMap.put("property", new Integer(SqlParserSymbols.KW_PROPERTY));
keywordMap.put("policy", new Integer(SqlParserSymbols.KW_POLICY));
keywordMap.put("query", new Integer(SqlParserSymbols.KW_QUERY));
keywordMap.put("quota", new Integer(SqlParserSymbols.KW_QUOTA));
keywordMap.put("random", new Integer(SqlParserSymbols.KW_RANDOM));
@ -380,9 +381,11 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("superuser", new Integer(SqlParserSymbols.KW_SUPERUSER));
keywordMap.put("sync", new Integer(SqlParserSymbols.KW_SYNC));
keywordMap.put("system", new Integer(SqlParserSymbols.KW_SYSTEM));
keywordMap.put("sql_block_rule", new Integer(SqlParserSymbols.KW_SQL_BLOCK_RULE));
keywordMap.put("table", new Integer(SqlParserSymbols.KW_TABLE));
keywordMap.put("tables", new Integer(SqlParserSymbols.KW_TABLES));
keywordMap.put("tablet", new Integer(SqlParserSymbols.KW_TABLET));
keywordMap.put("tablets", new Integer(SqlParserSymbols.KW_TABLETS));
keywordMap.put("task", new Integer(SqlParserSymbols.KW_TASK));
keywordMap.put("temporary", new Integer(SqlParserSymbols.KW_TEMPORARY));
keywordMap.put("terminated", new Integer(SqlParserSymbols.KW_TERMINATED));
@ -428,8 +431,6 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("write", new Integer(SqlParserSymbols.KW_WRITE));
keywordMap.put("year", new Integer(SqlParserSymbols.KW_YEAR));
keywordMap.put("||", new Integer(SqlParserSymbols.KW_PIPE));
keywordMap.put("sql_block_rule", new Integer(SqlParserSymbols.KW_SQL_BLOCK_RULE));
keywordMap.put("tablets", new Integer(SqlParserSymbols.KW_TABLETS));
}
// map from token id to token description

View File

@ -61,7 +61,9 @@ import org.apache.doris.transaction.TabletCommitInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -76,10 +78,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
/**
* Test for SparkLoadJobTest.
**/
public class SparkLoadJobTest {
private long dbId;
private String dbName;
@ -100,6 +101,9 @@ public class SparkLoadJobTest {
private long backendId;
private int schemaHash;
/**
* Init.
**/
@Before
public void setUp() {
dbId = 1L;
@ -207,16 +211,19 @@ public class SparkLoadJobTest {
};
ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap());
SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc,
new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
job.execute();
Assert.assertEquals(JobState.PENDING, job.getState());
}
@Test
public void testOnPendingTaskFinished(@Mocked Catalog catalog, @Injectable String originStmt) throws MetaNotFoundException {
public void testOnPendingTaskFinished(@Mocked Catalog catalog, @Injectable String originStmt)
throws MetaNotFoundException {
ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap());
SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc,
new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId);
attachment.setAppId(appId);
attachment.setOutputPath(etlOutputPath);
@ -235,7 +242,8 @@ public class SparkLoadJobTest {
sparkConfigs.put("spark.master", "yarn");
sparkConfigs.put("spark.submit.deployMode", "cluster");
sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
SparkLoadJob job = new SparkLoadJob(dbId, label, null, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
SparkLoadJob job = new SparkLoadJob(dbId, label, null,
new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
job.state = JobState.ETL;
job.setMaxFilterRatio(0.15);
job.transactionId = transactionId;
@ -500,18 +508,20 @@ public class SparkLoadJobTest {
result = resourceMgr;
resourceMgr.getResource(anyString);
result = sparkResource;
Catalog.getCurrentCatalogJournalVersion();
result = FeMetaVersion.VERSION_CURRENT;
}
};
String label = "label1";
ResourceDesc resourceDesc = new ResourceDesc("my_spark", Maps.newHashMap());
String oriStmt = "LOAD LABEL db1.label1\n" +
"(\n" +
"DATA INFILE(\"hdfs://127.0.0.1:8000/user/palo/data/input/file\")\n" +
"INTO TABLE `my_table`\n" +
"WHERE k1 > 10\n" +
")\n" +
"WITH RESOURCE 'my_spark';";
String oriStmt = "LOAD LABEL db1.label1\n"
+ "(\n"
+ "DATA INFILE(\"hdfs://127.0.0.1:8000/user/palo/data/input/file\")\n"
+ "INTO TABLE `my_table`\n"
+ "WHERE k1 > 10\n"
+ ")\n"
+ "WITH RESOURCE 'my_spark';";
OriginStatement originStmt = new OriginStatement(oriStmt, 0);
UserIdentity userInfo = UserIdentity.ADMIN;
SparkLoadJob sparkLoadJob = new SparkLoadJob(dbId, label, resourceDesc, originStmt, userInfo);
@ -526,13 +536,10 @@ public class SparkLoadJobTest {
file.createNewFile();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
sparkLoadJob.write(dos);
dos.flush();
dos.close();
// 2. Read objects from file
DataInputStream dis = new DataInputStream(new FileInputStream(file));
SparkLoadJob sparkLoadJob2 = (SparkLoadJob) SparkLoadJob.read(dis);
Assert.assertEquals("my_spark", sparkLoadJob2.getResourceName());
Assert.assertEquals(label, sparkLoadJob2.getLabel());

View File

@ -0,0 +1,202 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.policy;
import org.apache.doris.analysis.CreateUserStmt;
import org.apache.doris.analysis.GrantStmt;
import org.apache.doris.analysis.ShowPolicyStmt;
import org.apache.doris.analysis.TablePattern;
import org.apache.doris.analysis.UserDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AccessPrivilege;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
/**
* Test for Policy.
**/
public class PolicyTest extends TestWithFeService {
@Override
protected void runBeforeAll() throws Exception {
FeConstants.runningUnitTest = true;
createDatabase("test");
useDatabase("test");
createTable("create table table1\n"
+ "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+ "properties(\"replication_num\" = \"1\");");
createTable("create table table2\n"
+ "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+ "properties(\"replication_num\" = \"1\");");
// create user
UserIdentity user = new UserIdentity("test_policy", "%");
user.analyze(SystemInfoService.DEFAULT_CLUSTER);
CreateUserStmt createUserStmt = new CreateUserStmt(new UserDesc(user));
Catalog.getCurrentCatalog().getAuth().createUser(createUserStmt);
List<AccessPrivilege> privileges = Lists.newArrayList(AccessPrivilege.ADMIN_PRIV);
TablePattern tablePattern = new TablePattern("*", "*");
tablePattern.analyze(SystemInfoService.DEFAULT_CLUSTER);
GrantStmt grantStmt = new GrantStmt(user, null, tablePattern, privileges);
Catalog.getCurrentCatalog().getAuth().grant(grantStmt);
useUser("test_policy");
}
@Test
public void testNoPolicy() throws Exception {
useUser("root");
String queryStr = "EXPLAIN select * from test.table1";
String explainString = getSQLPlanOrErrorMsg(queryStr);
useUser("test_policy");
Assertions.assertFalse(explainString.contains("`k1` = 1"));
}
@Test
public void testExistPolicy() throws Exception {
createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
Assertions.assertTrue(Catalog.getCurrentCatalog().getPolicyMgr().existPolicy("default_cluster:test_policy"));
dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy");
Assertions.assertFalse(Catalog.getCurrentCatalog().getPolicyMgr().existPolicy("default_cluster:test_policy"));
createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
dropPolicy("DROP ROW POLICY test_row_policy ON test.table1");
Assertions.assertFalse(Catalog.getCurrentCatalog().getPolicyMgr().existPolicy("default_cluster:test_policy"));
}
@Test
public void testNormalSql() throws Exception {
createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
String queryStr = "EXPLAIN select * from test.table1";
String explainString = getSQLPlanOrErrorMsg(queryStr);
Assertions.assertTrue(explainString.contains("`k1` = 1"));
dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy");
}
@Test
public void testUnionSql() throws Exception {
createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
String queryStr = "EXPLAIN select * from test.table1 union all select * from test.table1";
String explainString = getSQLPlanOrErrorMsg(queryStr);
Assertions.assertTrue(explainString.contains("`k1` = 1"));
dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy");
}
@Test
public void testInsertSelectSql() throws Exception {
createPolicy("CREATE ROW POLICY test_row_policy ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
String queryStr = "EXPLAIN insert into test.table1 select * from test.table1";
String explainString = getSQLPlanOrErrorMsg(queryStr);
Assertions.assertTrue(explainString.contains("`k1` = 1"));
dropPolicy("DROP ROW POLICY test_row_policy ON test.table1 FOR test_policy");
}
@Test
public void testDuplicateAddPolicy() throws Exception {
createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
createPolicy("CREATE ROW POLICY IF NOT EXISTS test_row_policy1 ON test.table1 AS PERMISSIVE"
+ " TO test_policy USING (k1 = 1)");
ExceptionChecker.expectThrowsWithMsg(DdlException.class, "the policy test_row_policy1 already create",
() -> createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE"
+ " TO test_policy USING (k1 = 1)"));
dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1");
}
@Test
public void testNoAuth() {
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
"CreatePolicyStmt command denied to user 'root'@'%' for table 'table1'",
() -> createPolicy(
"CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO root USING (k1 = 1)"));
}
@Test
public void testShowPolicy() throws Exception {
createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO test_policy USING (k1 = 1)");
createPolicy("CREATE ROW POLICY test_row_policy2 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 1)");
ShowPolicyStmt showPolicyStmt =
(ShowPolicyStmt) parseAndAnalyzeStmt("SHOW ROW POLICY");
int firstSize = Catalog.getCurrentCatalog().getPolicyMgr().showPolicy(showPolicyStmt).getResultRows().size();
Assertions.assertTrue(firstSize > 0);
dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1");
dropPolicy("DROP ROW POLICY test_row_policy2 ON test.table1");
int secondSize = Catalog.getCurrentCatalog().getPolicyMgr().showPolicy(showPolicyStmt).getResultRows().size();
Assertions.assertEquals(2, firstSize - secondSize);
}
@Test
public void testDropPolicy() throws Exception {
createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 1)");
dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1");
dropPolicy("DROP ROW POLICY IF EXISTS test_row_policy5 ON test.table1");
ExceptionChecker.expectThrowsWithMsg(DdlException.class, "the policy test_row_policy1 not exist",
() -> dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1"));
}
@Test
public void testMergeFilter() throws Exception {
createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS RESTRICTIVE TO test_policy USING (k1 = 1)");
createPolicy("CREATE ROW POLICY test_row_policy2 ON test.table1 AS RESTRICTIVE TO test_policy USING (k2 = 1)");
createPolicy("CREATE ROW POLICY test_row_policy3 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 2)");
createPolicy("CREATE ROW POLICY test_row_policy4 ON test.table1 AS PERMISSIVE TO test_policy USING (k2 = 1)");
String queryStr = "EXPLAIN select * from test.table1";
String explainString = getSQLPlanOrErrorMsg(queryStr);
Assertions.assertTrue(explainString.contains("`k1` = 1, `k2` = 1, `k2` = 2 OR `k2` = 1"));
dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1");
dropPolicy("DROP ROW POLICY test_row_policy2 ON test.table1");
dropPolicy("DROP ROW POLICY test_row_policy3 ON test.table1");
dropPolicy("DROP ROW POLICY test_row_policy4 ON test.table1");
}
@Test
public void testComplexSql() throws Exception {
createPolicy("CREATE ROW POLICY test_row_policy1 ON test.table1 AS RESTRICTIVE TO test_policy USING (k1 = 1)");
createPolicy("CREATE ROW POLICY test_row_policy2 ON test.table1 AS RESTRICTIVE TO test_policy USING (k2 = 1)");
String joinSql = "select * from table1 join table2 on table1.k1=table2.k1";
System.out.println(getSQLPlanOrErrorMsg(joinSql));
Assertions.assertTrue(getSQLPlanOrErrorMsg(joinSql).contains(
" TABLE: table1\n"
+ " PREAGGREGATION: ON\n"
+ " PREDICATES: `k1` = 1, `k2` = 1"));
String unionSql = "select * from table1 union select * from table2";
Assertions.assertTrue(getSQLPlanOrErrorMsg(unionSql).contains(
" TABLE: table1\n"
+ " PREAGGREGATION: ON\n"
+ " PREDICATES: `k1` = 1, `k2` = 1"));
String subQuerySql = "select * from table2 where k1 in (select k1 from table1)";
Assertions.assertTrue(getSQLPlanOrErrorMsg(subQuerySql).contains(
" TABLE: table1\n"
+ " PREAGGREGATION: ON\n"
+ " PREDICATES: `k1` = 1, `k2` = 1"));
String aliasSql = "select * from table1 t1 join table2 t2 on t1.k1=t2.k1";
Assertions.assertTrue(getSQLPlanOrErrorMsg(aliasSql).contains(
" TABLE: table1\n"
+ " PREAGGREGATION: ON\n"
+ " PREDICATES: `k1` = 1, `k2` = 1"));
dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1");
dropPolicy("DROP ROW POLICY test_row_policy2 ON test.table1");
}
}

View File

@ -17,27 +17,12 @@
package org.apache.doris.utframe;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.SocketException;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreatePolicyStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.CreateViewStmt;
import org.apache.doris.analysis.DropPolicyStmt;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
@ -45,6 +30,7 @@ import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@ -53,6 +39,7 @@ import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.planner.Planner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
@ -65,11 +52,27 @@ import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException;
import org.apache.doris.utframe.MockedFrontend.FeStartException;
import org.apache.doris.utframe.MockedFrontend.NotInitException;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.SocketException;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* This is the base class for unit class that wants to start a FE service.
* <p>
@ -87,7 +90,7 @@ import org.junit.jupiter.api.TestInstance;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class TestWithFeService {
protected String runningDir =
"fe/mocked/" + getClass().getSimpleName() + "/" + UUID.randomUUID() + "/";
"fe/mocked/" + getClass().getSimpleName() + "/" + UUID.randomUUID() + "/";
protected ConnectContext connectContext;
@BeforeAll
@ -124,11 +127,11 @@ public abstract class TestWithFeService {
// Parse an origin stmt and analyze it. Return a StatementBase instance.
protected StatementBase parseAndAnalyzeStmt(String originStmt)
throws Exception {
throws Exception {
System.out.println("begin to parse stmt: " + originStmt);
SqlScanner input =
new SqlScanner(new StringReader(originStmt),
connectContext.getSessionVariable().getSqlMode());
new SqlScanner(new StringReader(originStmt),
connectContext.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
Analyzer analyzer = new Analyzer(connectContext.getCatalog(), connectContext);
StatementBase statementBase = null;
@ -143,6 +146,7 @@ public abstract class TestWithFeService {
throw new AnalysisException(errorMessage, e);
}
}
statementBase.setOrigStmt(new OriginStatement(originStmt, 0));
statementBase.analyze(analyzer);
return statementBase;
}
@ -150,7 +154,8 @@ public abstract class TestWithFeService {
// for analyzing multi statements
protected List<StatementBase> parseAndAnalyzeStmts(String originStmt) throws Exception {
System.out.println("begin to parse stmts: " + originStmt);
SqlScanner input = new SqlScanner(new StringReader(originStmt), connectContext.getSessionVariable().getSqlMode());
SqlScanner input =
new SqlScanner(new StringReader(originStmt), connectContext.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
Analyzer analyzer = new Analyzer(connectContext.getCatalog(), connectContext);
List<StatementBase> statementBases = null;
@ -180,7 +185,7 @@ public abstract class TestWithFeService {
}
protected int startFEServer(String runningDir) throws EnvVarNotSetException, IOException,
FeStartException, NotInitException, DdlException, InterruptedException {
FeStartException, NotInitException, DdlException, InterruptedException {
// get DORIS_HOME
String dorisHome = System.getenv("DORIS_HOME");
if (Strings.isNullOrEmpty(dorisHome)) {
@ -213,14 +218,14 @@ public abstract class TestWithFeService {
}
protected void createDorisCluster()
throws InterruptedException, NotInitException, IOException, DdlException,
EnvVarNotSetException, FeStartException {
throws InterruptedException, NotInitException, IOException, DdlException,
EnvVarNotSetException, FeStartException {
createDorisCluster(runningDir, 1);
}
protected void createDorisCluster(String runningDir, int backendNum)
throws EnvVarNotSetException, IOException, FeStartException,
NotInitException, DdlException, InterruptedException {
throws EnvVarNotSetException, IOException, FeStartException,
NotInitException, DdlException, InterruptedException {
int fe_rpc_port = startFEServer(runningDir);
for (int i = 0; i < backendNum; i++) {
createBackend("127.0.0.1", fe_rpc_port);
@ -233,8 +238,8 @@ public abstract class TestWithFeService {
// the host of BE will be "127.0.0.1", "127.0.0.2"
protected void createDorisClusterWithMultiTag(String runningDir,
int backendNum)
throws EnvVarNotSetException, IOException, FeStartException, NotInitException,
DdlException, InterruptedException {
throws EnvVarNotSetException, IOException, FeStartException, NotInitException,
DdlException, InterruptedException {
// set runningUnitTest to true, so that for ut, the agent task will be send to "127.0.0.1" to make cluster running well.
FeConstants.runningUnitTest = true;
int fe_rpc_port = startFEServer(runningDir);
@ -247,7 +252,7 @@ public abstract class TestWithFeService {
}
protected void createBackend(String beHost, int fe_rpc_port)
throws IOException, InterruptedException {
throws IOException, InterruptedException {
int be_heartbeat_port = findValidPort();
int be_thrift_port = findValidPort();
int be_brpc_port = findValidPort();
@ -255,14 +260,15 @@ public abstract class TestWithFeService {
// start be
MockedBackend backend = MockedBackendFactory.createBackend(beHost,
be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port,
new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port),
new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port,
new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port),
new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
backend.setFeAddress(new TNetworkAddress("127.0.0.1", fe_rpc_port));
backend.start();
// add be
Backend be = new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(), backend.getHeartbeatPort());
Backend be =
new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(), backend.getHeartbeatPort());
Map<String, DiskInfo> disks = Maps.newHashMap();
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
diskInfo1.setTotalCapacityB(1000000);
@ -351,6 +357,10 @@ public abstract class TestWithFeService {
Catalog.getCurrentCatalog().createDb(createDbStmt);
}
protected void useDatabase(String dbName) {
connectContext.setDatabase(ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER, dbName));
}
protected void createTable(String sql) throws Exception {
createTables(sql);
}
@ -367,6 +377,16 @@ public abstract class TestWithFeService {
Catalog.getCurrentCatalog().createView(createViewStmt);
}
protected void createPolicy(String sql) throws Exception {
CreatePolicyStmt createPolicyStmt = (CreatePolicyStmt) parseAndAnalyzeStmt(sql);
Catalog.getCurrentCatalog().getPolicyMgr().createPolicy(createPolicyStmt);
}
protected void dropPolicy(String sql) throws Exception {
DropPolicyStmt stmt = (DropPolicyStmt) parseAndAnalyzeStmt(sql);
Catalog.getCurrentCatalog().getPolicyMgr().dropPolicy(stmt);
}
protected void assertSQLPlanOrErrorMsgContains(String sql, String expect) throws Exception {
// Note: adding `EXPLAIN` is necessary for non-query SQL, e.g., DDL, DML, etc.
// TODO: Use a graceful way to get explain plan string, rather than modifying the SQL string.
@ -379,4 +399,11 @@ public abstract class TestWithFeService {
Assertions.assertTrue(str.contains(expect));
}
}
protected void useUser(String userName) throws AnalysisException {
UserIdentity user = new UserIdentity(userName, "%");
user.analyze(SystemInfoService.DEFAULT_CLUSTER);
connectContext.setCurrentUserIdentity(user);
connectContext.setQualifiedUser(SystemInfoService.DEFAULT_CLUSTER + ":" + userName);
}
}