[fix](auth)fix fe can not restart when replay create row policy log (… (#37820)
pick: https://github.com/apache/doris/pull/37342
This commit is contained in:
@ -117,7 +117,8 @@ public abstract class Policy implements Writable, GsonPostProcessable {
|
||||
}
|
||||
return new RowPolicy(policyId, stmt.getPolicyName(), stmt.getTableName().getCtl(),
|
||||
stmt.getTableName().getDb(), stmt.getTableName().getTbl(), userIdent, stmt.getRoleName(),
|
||||
stmt.getOrigStmt().originStmt, stmt.getFilterType(), stmt.getWherePredicate());
|
||||
stmt.getOrigStmt().originStmt, stmt.getOrigStmt().idx, stmt.getFilterType(),
|
||||
stmt.getWherePredicate());
|
||||
default:
|
||||
throw new AnalysisException("Unknown policy type: " + stmt.getType());
|
||||
}
|
||||
|
||||
@ -102,6 +102,8 @@ public class RowPolicy extends Policy implements RowFilterPolicy {
|
||||
**/
|
||||
@SerializedName(value = "originStmt")
|
||||
private String originStmt;
|
||||
@SerializedName(value = "stmtIdx")
|
||||
private int stmtIdx;
|
||||
|
||||
private Expr wherePredicate = null;
|
||||
|
||||
@ -123,7 +125,7 @@ public class RowPolicy extends Policy implements RowFilterPolicy {
|
||||
* @param wherePredicate where predicate
|
||||
*/
|
||||
public RowPolicy(long policyId, final String policyName, long dbId, UserIdentity user, String roleName,
|
||||
String originStmt,
|
||||
String originStmt, int stmtIdx,
|
||||
final long tableId, final FilterType filterType, final Expr wherePredicate) {
|
||||
super(policyId, PolicyTypeEnum.ROW, policyName);
|
||||
this.user = user;
|
||||
@ -132,12 +134,13 @@ public class RowPolicy extends Policy implements RowFilterPolicy {
|
||||
this.tableId = tableId;
|
||||
this.filterType = filterType;
|
||||
this.originStmt = originStmt;
|
||||
this.stmtIdx = stmtIdx;
|
||||
this.wherePredicate = wherePredicate;
|
||||
}
|
||||
|
||||
public RowPolicy(long policyId, final String policyName, String ctlName, String dbName, String tableName,
|
||||
UserIdentity user, String roleName,
|
||||
String originStmt, final FilterType filterType, final Expr wherePredicate) {
|
||||
String originStmt, int stmtIdx, final FilterType filterType, final Expr wherePredicate) {
|
||||
super(policyId, PolicyTypeEnum.ROW, policyName);
|
||||
this.user = user;
|
||||
this.roleName = roleName;
|
||||
@ -146,6 +149,7 @@ public class RowPolicy extends Policy implements RowFilterPolicy {
|
||||
this.tableName = tableName;
|
||||
this.filterType = filterType;
|
||||
this.originStmt = originStmt;
|
||||
this.stmtIdx = stmtIdx;
|
||||
this.wherePredicate = wherePredicate;
|
||||
}
|
||||
|
||||
@ -166,16 +170,20 @@ public class RowPolicy extends Policy implements RowFilterPolicy {
|
||||
try {
|
||||
SqlScanner input = new SqlScanner(new StringReader(originStmt), 0L);
|
||||
SqlParser parser = new SqlParser(input);
|
||||
CreatePolicyStmt stmt = (CreatePolicyStmt) SqlParserUtils.getFirstStmt(parser);
|
||||
CreatePolicyStmt stmt = (CreatePolicyStmt) SqlParserUtils.getStmt(parser, stmtIdx);
|
||||
wherePredicate = stmt.getWherePredicate();
|
||||
} catch (Exception e) {
|
||||
throw new IOException("table policy parse originStmt error", e);
|
||||
String errorMsg = String.format("table policy parse originStmt error, originStmt: %s, stmtIdx: %s.",
|
||||
originStmt, stmtIdx);
|
||||
// Only print logs to avoid cluster failure to start
|
||||
LOG.warn(errorMsg, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RowPolicy clone() {
|
||||
return new RowPolicy(this.id, this.policyName, this.dbId, this.user, this.roleName, this.originStmt,
|
||||
this.stmtIdx,
|
||||
this.tableId,
|
||||
this.filterType, this.wherePredicate);
|
||||
}
|
||||
@ -218,6 +226,10 @@ public class RowPolicy extends Policy implements RowFilterPolicy {
|
||||
public Expression getFilterExpression() throws AnalysisException {
|
||||
NereidsParser nereidsParser = new NereidsParser();
|
||||
String sql = getOriginStmt();
|
||||
if (getStmtIdx() != 0) {
|
||||
// Under normal circumstances, the index will only be equal to 0
|
||||
throw new AnalysisException("Invalid row policy [" + getPolicyIdent() + "], " + sql);
|
||||
}
|
||||
CreatePolicyCommand command = (CreatePolicyCommand) nereidsParser.parseSingle(sql);
|
||||
Optional<Expression> wherePredicate = command.getWherePredicate();
|
||||
if (!wherePredicate.isPresent()) {
|
||||
|
||||
@ -303,7 +303,7 @@ public abstract class ConnectProcessor {
|
||||
}
|
||||
|
||||
StatementBase parsedStmt = stmts.get(i);
|
||||
parsedStmt.setOrigStmt(new OriginStatement(convertedStmt, i));
|
||||
parsedStmt.setOrigStmt(new OriginStatement(auditStmt, usingOrigSingleStmt ? 0 : i));
|
||||
parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
|
||||
executor = new StmtExecutor(ctx, parsedStmt);
|
||||
executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime);
|
||||
|
||||
Reference in New Issue
Block a user