[feature-wip](ranger)support datamask and row filter (#32137)
doris ranger support datamask and row filter hive ranger support row filter
This commit is contained in:
@ -17,15 +17,27 @@
|
||||
|
||||
package org.apache.doris.catalog.authorizer.ranger;
|
||||
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.common.AuthorizationException;
|
||||
import org.apache.doris.mysql.privilege.CatalogAccessController;
|
||||
import org.apache.doris.mysql.privilege.DataMaskPolicy;
|
||||
import org.apache.doris.mysql.privilege.RangerDataMaskPolicy;
|
||||
import org.apache.doris.mysql.privilege.RangerRowFilterPolicy;
|
||||
import org.apache.doris.mysql.privilege.RowFilterPolicy;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessResultProcessor;
|
||||
import org.apache.ranger.plugin.service.RangerBasePlugin;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public abstract class RangerAccessController implements CatalogAccessController {
|
||||
private static final Logger LOG = LogManager.getLogger(RangerAccessController.class);
|
||||
@ -74,4 +86,70 @@ public abstract class RangerAccessController implements CatalogAccessController
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<? extends RowFilterPolicy> evalRowFilterPolicies(UserIdentity currentUser, String ctl, String db,
|
||||
String tbl) {
|
||||
RangerAccessResourceImpl resource = createResource(ctl, db, tbl);
|
||||
RangerAccessRequestImpl request = createRequest(currentUser);
|
||||
request.setResource(resource);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("ranger request: {}", request);
|
||||
}
|
||||
List<RangerRowFilterPolicy> res = Lists.newArrayList();
|
||||
RangerAccessResult policy = getPlugin().evalRowFilterPolicies(request, getAccessResultProcessor());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("ranger response: {}", policy);
|
||||
}
|
||||
if (policy == null) {
|
||||
return res;
|
||||
}
|
||||
String filterExpr = policy.getFilterExpr();
|
||||
if (StringUtils.isEmpty(filterExpr)) {
|
||||
return res;
|
||||
}
|
||||
res.add(new RangerRowFilterPolicy(currentUser, ctl, db, tbl, policy.getPolicyId(), policy.getPolicyVersion(),
|
||||
filterExpr));
|
||||
return res;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<DataMaskPolicy> evalDataMaskPolicy(UserIdentity currentUser, String ctl, String db, String tbl,
|
||||
String col) {
|
||||
RangerAccessResourceImpl resource = createResource(ctl, db, tbl, col);
|
||||
RangerAccessRequestImpl request = createRequest(currentUser);
|
||||
request.setResource(resource);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("ranger request: {}", request);
|
||||
}
|
||||
RangerAccessResult policy = getPlugin().evalDataMaskPolicies(request, getAccessResultProcessor());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("ranger response: {}", policy);
|
||||
}
|
||||
if (policy == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
String maskType = policy.getMaskType();
|
||||
if (StringUtils.isEmpty(maskType)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
String transformer = policy.getMaskTypeDef().getTransformer();
|
||||
if (StringUtils.isEmpty(transformer)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(new RangerDataMaskPolicy(currentUser, ctl, db, tbl, col, policy.getPolicyId(),
|
||||
policy.getPolicyVersion(), maskType, transformer.replace("{col}", col)));
|
||||
}
|
||||
|
||||
protected abstract RangerAccessRequestImpl createRequest(UserIdentity currentUser);
|
||||
|
||||
protected abstract RangerAccessResourceImpl createResource(String ctl, String db, String tbl);
|
||||
|
||||
protected abstract RangerAccessResourceImpl createResource(String ctl, String db, String tbl, String col);
|
||||
|
||||
protected abstract RangerBasePlugin getPlugin();
|
||||
|
||||
protected abstract RangerAccessResultProcessor getAccessResultProcessor();
|
||||
}
|
||||
|
||||
@ -20,7 +20,6 @@ package org.apache.doris.catalog.authorizer.ranger.doris;
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.authorizer.ranger.RangerAccessController;
|
||||
import org.apache.doris.catalog.authorizer.ranger.hive.RangerHiveResource;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.AuthorizationException;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
@ -30,6 +29,8 @@ import org.apache.logging.log4j.Logger;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessResultProcessor;
|
||||
import org.apache.ranger.plugin.service.RangerBasePlugin;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
@ -53,14 +54,20 @@ public class RangerDorisAccessController extends RangerAccessController {
|
||||
}
|
||||
|
||||
private RangerAccessRequestImpl createRequest(UserIdentity currentUser, DorisAccessType accessType) {
|
||||
RangerAccessRequestImpl request = createRequest(currentUser);
|
||||
request.setAction(accessType.name());
|
||||
request.setAccessType(accessType.name());
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RangerAccessRequestImpl createRequest(UserIdentity currentUser) {
|
||||
RangerAccessRequestImpl request = new RangerAccessRequestImpl();
|
||||
request.setUser(ClusterNamespace.getNameFromFullName(currentUser.getQualifiedUser()));
|
||||
Set<String> roles = Env.getCurrentEnv().getAuth().getRolesByUser(currentUser, false);
|
||||
request.setUserRoles(roles.stream().map(role -> ClusterNamespace.getNameFromFullName(role)).collect(
|
||||
Collectors.toSet()));
|
||||
|
||||
request.setAction(accessType.name());
|
||||
request.setAccessType(accessType.name());
|
||||
request.setClientIPAddress(currentUser.getHost());
|
||||
request.setClusterType(CLIENT_TYPE_DORIS);
|
||||
request.setClientType(CLIENT_TYPE_DORIS);
|
||||
@ -95,27 +102,6 @@ public class RangerDorisAccessController extends RangerAccessController {
|
||||
return checkRequestResult(request, result, accessType.name());
|
||||
}
|
||||
|
||||
public String getFilterExpr(UserIdentity currentUser, DorisAccessType accessType,
|
||||
RangerHiveResource resource) {
|
||||
RangerAccessRequestImpl request = createRequest(currentUser, accessType);
|
||||
request.setResource(resource);
|
||||
RangerAccessResult result = dorisPlugin.isAccessAllowed(request);
|
||||
|
||||
return result.getFilterExpr();
|
||||
}
|
||||
|
||||
public void getColumnMask(UserIdentity currentUser, DorisAccessType accessType,
|
||||
RangerHiveResource resource) {
|
||||
RangerAccessRequestImpl request = createRequest(currentUser, accessType);
|
||||
request.setResource(resource);
|
||||
RangerAccessResult result = dorisPlugin.isAccessAllowed(request);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("maskType: %s, maskTypeDef: %s, maskedValue: %s", result.getMaskType(),
|
||||
result.getMaskTypeDef(), result.getMaskedValue()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkGlobalPriv(UserIdentity currentUser, PrivPredicate wanted) {
|
||||
// ranger does not support global privilege,
|
||||
@ -168,6 +154,28 @@ public class RangerDorisAccessController extends RangerAccessController {
|
||||
return checkPrivilege(currentUser, DorisAccessType.toAccessType(wanted), resource);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RangerDorisResource createResource(String ctl, String db, String tbl) {
|
||||
return new RangerDorisResource(DorisObjectType.TABLE,
|
||||
ctl, ClusterNamespace.getNameFromFullName(db), tbl);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RangerDorisResource createResource(String ctl, String db, String tbl, String col) {
|
||||
return new RangerDorisResource(DorisObjectType.COLUMN,
|
||||
ctl, ClusterNamespace.getNameFromFullName(db), tbl, col);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RangerBasePlugin getPlugin() {
|
||||
return dorisPlugin;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RangerAccessResultProcessor getAccessResultProcessor() {
|
||||
return null;
|
||||
}
|
||||
|
||||
// For test only
|
||||
public static void main(String[] args) {
|
||||
RangerDorisAccessController ac = new RangerDorisAccessController("doris");
|
||||
|
||||
@ -24,22 +24,25 @@ import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.AuthorizationException;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.mysql.privilege.DataMaskPolicy;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
|
||||
import org.apache.ranger.plugin.policyengine.RangerAccessResultProcessor;
|
||||
import org.apache.ranger.plugin.policyengine.RangerPolicyEngine;
|
||||
import org.apache.ranger.plugin.service.RangerBasePlugin;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -56,23 +59,28 @@ public class RangerHiveAccessController extends RangerAccessController {
|
||||
String serviceName = properties.get("ranger.service.name");
|
||||
hivePlugin = new RangerHivePlugin(serviceName);
|
||||
auditHandler = new RangerHiveAuditHandler(hivePlugin.getConfig());
|
||||
//start a timed log flusher
|
||||
// start a timed log flusher
|
||||
logFlushTimer.scheduleAtFixedRate(new RangerHiveAuditLogFlusher(auditHandler), 10, 20L, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private RangerAccessRequestImpl createRequest(UserIdentity currentUser, HiveAccessType accessType) {
|
||||
RangerAccessRequestImpl request = createRequest(currentUser);
|
||||
if (accessType == HiveAccessType.USE) {
|
||||
request.setAccessType(RangerPolicyEngine.ANY_ACCESS);
|
||||
} else {
|
||||
request.setAccessType(accessType.name().toLowerCase());
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RangerAccessRequestImpl createRequest(UserIdentity currentUser) {
|
||||
RangerAccessRequestImpl request = new RangerAccessRequestImpl();
|
||||
String user = currentUser.getQualifiedUser();
|
||||
request.setUser(ClusterNamespace.getNameFromFullName(user));
|
||||
Set<String> roles = Env.getCurrentEnv().getAuth().getRolesByUser(currentUser, false);
|
||||
request.setUserRoles(roles.stream().map(role -> ClusterNamespace.getNameFromFullName(role)).collect(
|
||||
Collectors.toSet()));
|
||||
request.setAction(accessType.name());
|
||||
if (accessType == HiveAccessType.USE) {
|
||||
request.setAccessType(RangerPolicyEngine.ANY_ACCESS);
|
||||
} else {
|
||||
request.setAccessType(accessType.name().toLowerCase());
|
||||
}
|
||||
request.setClientIPAddress(currentUser.getHost());
|
||||
request.setClusterType(CLIENT_TYPE_DORIS);
|
||||
request.setClientType(CLIENT_TYPE_DORIS);
|
||||
@ -103,27 +111,6 @@ public class RangerHiveAccessController extends RangerAccessController {
|
||||
return checkRequestResult(request, result, accessType.name());
|
||||
}
|
||||
|
||||
public String getFilterExpr(UserIdentity currentUser, HiveAccessType accessType,
|
||||
RangerHiveResource resource) throws HiveAccessControlException {
|
||||
RangerAccessRequestImpl request = createRequest(currentUser, accessType);
|
||||
request.setResource(resource);
|
||||
RangerAccessResult result = hivePlugin.isAccessAllowed(request, auditHandler);
|
||||
|
||||
return result.getFilterExpr();
|
||||
}
|
||||
|
||||
public void getColumnMask(UserIdentity currentUser, HiveAccessType accessType,
|
||||
RangerHiveResource resource) {
|
||||
RangerAccessRequestImpl request = createRequest(currentUser, accessType);
|
||||
request.setResource(resource);
|
||||
RangerAccessResult result = hivePlugin.isAccessAllowed(request, auditHandler);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("maskType: %s, maskTypeDef: %s, maskedValue: %s", result.getMaskType(),
|
||||
result.getMaskTypeDef(), result.getMaskedValue()));
|
||||
}
|
||||
}
|
||||
|
||||
private HiveAccessType convertToAccessType(PrivPredicate predicate) {
|
||||
if (predicate == PrivPredicate.SHOW) {
|
||||
return HiveAccessType.USE;
|
||||
@ -184,6 +171,12 @@ public class RangerHiveAccessController extends RangerAccessController {
|
||||
checkPrivileges(currentUser, convertToAccessType(wanted), resources);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<DataMaskPolicy> evalDataMaskPolicy(UserIdentity currentUser, String ctl, String db, String tbl,
|
||||
String col) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkResourcePriv(UserIdentity currentUser, String resourceName, PrivPredicate wanted) {
|
||||
return false;
|
||||
@ -194,6 +187,28 @@ public class RangerHiveAccessController extends RangerAccessController {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RangerHiveResource createResource(String ctl, String db, String tbl) {
|
||||
return new RangerHiveResource(HiveObjectType.TABLE,
|
||||
ClusterNamespace.getNameFromFullName(db), tbl);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RangerHiveResource createResource(String ctl, String db, String tbl, String col) {
|
||||
return new RangerHiveResource(HiveObjectType.COLUMN,
|
||||
ClusterNamespace.getNameFromFullName(db), tbl, col);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RangerBasePlugin getPlugin() {
|
||||
return hivePlugin;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RangerAccessResultProcessor getAccessResultProcessor() {
|
||||
return auditHandler;
|
||||
}
|
||||
|
||||
// For test only
|
||||
public static void main(String[] args) {
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.AuthorizationInfo;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.authorizer.ranger.doris.RangerDorisAccessController;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
@ -35,7 +36,10 @@ import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
@ -248,4 +252,32 @@ public class AccessControllerManager {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public Map<String, Optional<DataMaskPolicy>> evalDataMaskPolicies(UserIdentity currentUser, String
|
||||
ctl, String db, String tbl, Set<String> cols) {
|
||||
Map<String, Optional<DataMaskPolicy>> res = Maps.newHashMap();
|
||||
for (String col : cols) {
|
||||
res.put(col, evalDataMaskPolicy(currentUser, ctl, db, tbl, col));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
public Optional<DataMaskPolicy> evalDataMaskPolicy(UserIdentity currentUser, String
|
||||
ctl, String db, String tbl, String col) {
|
||||
Objects.requireNonNull(currentUser, "require currentUser object");
|
||||
Objects.requireNonNull(ctl, "require ctl object");
|
||||
Objects.requireNonNull(db, "require db object");
|
||||
Objects.requireNonNull(tbl, "require tbl object");
|
||||
Objects.requireNonNull(col, "require col object");
|
||||
return getAccessControllerOrDefault(ctl).evalDataMaskPolicy(currentUser, ctl, db, tbl, col.toLowerCase());
|
||||
}
|
||||
|
||||
public List<? extends RowFilterPolicy> evalRowFilterPolicies(UserIdentity currentUser, String
|
||||
ctl, String db, String tbl) throws AnalysisException {
|
||||
Objects.requireNonNull(currentUser, "require currentUser object");
|
||||
Objects.requireNonNull(ctl, "require ctl object");
|
||||
Objects.requireNonNull(db, "require db object");
|
||||
Objects.requireNonNull(tbl, "require tbl object");
|
||||
return getAccessControllerOrDefault(ctl).evalRowFilterPolicies(currentUser, ctl, db, tbl);
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,8 +18,11 @@
|
||||
package org.apache.doris.mysql.privilege;
|
||||
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.AuthorizationException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
public interface CatalogAccessController {
|
||||
@ -73,4 +76,10 @@ public interface CatalogAccessController {
|
||||
|
||||
void checkColsPriv(UserIdentity currentUser, String ctl, String db, String tbl,
|
||||
Set<String> cols, PrivPredicate wanted) throws AuthorizationException;
|
||||
|
||||
Optional<DataMaskPolicy> evalDataMaskPolicy(UserIdentity currentUser, String ctl, String db, String tbl,
|
||||
String col);
|
||||
|
||||
List<? extends RowFilterPolicy> evalRowFilterPolicies(UserIdentity currentUser, String ctl, String db, String tbl)
|
||||
throws AnalysisException;
|
||||
}
|
||||
|
||||
@ -0,0 +1,24 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mysql.privilege;
|
||||
|
||||
public interface DataMaskPolicy {
|
||||
String getMaskTypeDef();
|
||||
|
||||
String getPolicyIdent();
|
||||
}
|
||||
@ -18,8 +18,18 @@
|
||||
package org.apache.doris.mysql.privilege;
|
||||
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.AuthorizationException;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.policy.PolicyMgr;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
public class InternalAccessController implements CatalogAccessController {
|
||||
@ -64,4 +74,24 @@ public class InternalAccessController implements CatalogAccessController {
|
||||
public boolean checkWorkloadGroupPriv(UserIdentity currentUser, String workloadGroupName, PrivPredicate wanted) {
|
||||
return auth.checkWorkloadGroupPriv(currentUser, workloadGroupName, wanted);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<DataMaskPolicy> evalDataMaskPolicy(UserIdentity currentUser, String ctl, String db, String tbl,
|
||||
String col) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<? extends RowFilterPolicy> evalRowFilterPolicies(UserIdentity currentUser, String ctl, String db,
|
||||
String tbl)
|
||||
throws AnalysisException {
|
||||
// current not support external catalog
|
||||
if (!InternalCatalog.INTERNAL_CATALOG_NAME.equals(ctl)) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
PolicyMgr policyMgr = Env.getCurrentEnv().getPolicyMgr();
|
||||
Database database = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(db);
|
||||
Table table = database.getTableOrAnalysisException(tbl);
|
||||
return policyMgr.getUserPolicies(database.getId(), table.getId(), currentUser);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,103 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mysql.privilege;
|
||||
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
|
||||
public class RangerDataMaskPolicy implements DataMaskPolicy {
|
||||
private UserIdentity userIdentity;
|
||||
private String ctl;
|
||||
private String db;
|
||||
private String tbl;
|
||||
private String col;
|
||||
private long policyId;
|
||||
private long policyVersion;
|
||||
private String maskType;
|
||||
private String maskTypeDef;
|
||||
|
||||
public RangerDataMaskPolicy(UserIdentity userIdentity, String ctl, String db, String tbl, String col,
|
||||
long policyId,
|
||||
long policyVersion, String maskType, String maskTypeDef) {
|
||||
this.userIdentity = userIdentity;
|
||||
this.ctl = ctl;
|
||||
this.db = db;
|
||||
this.tbl = tbl;
|
||||
this.col = col;
|
||||
this.policyId = policyId;
|
||||
this.policyVersion = policyVersion;
|
||||
this.maskType = maskType;
|
||||
this.maskTypeDef = maskTypeDef;
|
||||
}
|
||||
|
||||
public UserIdentity getUserIdentity() {
|
||||
return userIdentity;
|
||||
}
|
||||
|
||||
public String getCtl() {
|
||||
return ctl;
|
||||
}
|
||||
|
||||
public String getDb() {
|
||||
return db;
|
||||
}
|
||||
|
||||
public String getTbl() {
|
||||
return tbl;
|
||||
}
|
||||
|
||||
public String getCol() {
|
||||
return col;
|
||||
}
|
||||
|
||||
public long getPolicyId() {
|
||||
return policyId;
|
||||
}
|
||||
|
||||
public long getPolicyVersion() {
|
||||
return policyVersion;
|
||||
}
|
||||
|
||||
public String getMaskType() {
|
||||
return maskType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMaskTypeDef() {
|
||||
return maskTypeDef;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPolicyIdent() {
|
||||
return getPolicyId() + ":" + getPolicyVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RangerDataMaskPolicy{"
|
||||
+ "userIdentity=" + userIdentity
|
||||
+ ", ctl='" + ctl + '\''
|
||||
+ ", db='" + db + '\''
|
||||
+ ", tbl='" + tbl + '\''
|
||||
+ ", col='" + col + '\''
|
||||
+ ", policyId=" + policyId
|
||||
+ ", policyVersion=" + policyVersion
|
||||
+ ", maskType='" + maskType + '\''
|
||||
+ ", maskTypeDef='" + maskTypeDef + '\''
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,95 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mysql.privilege;
|
||||
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.nereids.parser.NereidsParser;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
|
||||
public class RangerRowFilterPolicy implements RowFilterPolicy {
|
||||
private UserIdentity userIdentity;
|
||||
private String ctl;
|
||||
private String db;
|
||||
private String tbl;
|
||||
private long policyId;
|
||||
private long policyVersion;
|
||||
private String filterExpr;
|
||||
|
||||
public RangerRowFilterPolicy(UserIdentity userIdentity, String ctl, String db, String tbl, long policyId,
|
||||
long policyVersion, String filterExpr) {
|
||||
this.userIdentity = userIdentity;
|
||||
this.ctl = ctl;
|
||||
this.db = db;
|
||||
this.tbl = tbl;
|
||||
this.policyId = policyId;
|
||||
this.policyVersion = policyVersion;
|
||||
this.filterExpr = filterExpr;
|
||||
}
|
||||
|
||||
public UserIdentity getUserIdentity() {
|
||||
return userIdentity;
|
||||
}
|
||||
|
||||
public String getCtl() {
|
||||
return ctl;
|
||||
}
|
||||
|
||||
public String getDb() {
|
||||
return db;
|
||||
}
|
||||
|
||||
public String getTbl() {
|
||||
return tbl;
|
||||
}
|
||||
|
||||
public long getPolicyId() {
|
||||
return policyId;
|
||||
}
|
||||
|
||||
public long getPolicyVersion() {
|
||||
return policyVersion;
|
||||
}
|
||||
|
||||
public String getFilterExpr() {
|
||||
return filterExpr;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Expression getFilterExpression() {
|
||||
NereidsParser nereidsParser = new NereidsParser();
|
||||
return nereidsParser.parseExpression(filterExpr);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPolicyIdent() {
|
||||
return getPolicyId() + ":" + getPolicyVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RangerRowFilterPolicy{"
|
||||
+ "userIdentity=" + userIdentity
|
||||
+ ", ctl='" + ctl + '\''
|
||||
+ ", db='" + db + '\''
|
||||
+ ", tbl='" + tbl + '\''
|
||||
+ ", policyId=" + policyId
|
||||
+ ", policyVersion=" + policyVersion
|
||||
+ ", filterExpr='" + filterExpr + '\''
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,32 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mysql.privilege;
|
||||
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.policy.FilterType;
|
||||
|
||||
public interface RowFilterPolicy {
|
||||
default FilterType getFilterType() {
|
||||
return FilterType.RESTRICTIVE;
|
||||
}
|
||||
|
||||
Expression getFilterExpression() throws AnalysisException;
|
||||
|
||||
String getPolicyIdent();
|
||||
}
|
||||
@ -18,9 +18,10 @@
|
||||
package org.apache.doris.nereids.trees.plans.logical;
|
||||
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.mysql.privilege.AccessControllerManager;
|
||||
import org.apache.doris.mysql.privilege.RowFilterPolicy;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.parser.NereidsParser;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.trees.expressions.And;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
@ -29,12 +30,9 @@ import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.PropagateFuncDeps;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
import org.apache.doris.policy.PolicyMgr;
|
||||
import org.apache.doris.policy.RowPolicy;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -125,39 +123,45 @@ public class LogicalCheckPolicy<CHILD_TYPE extends Plan> extends LogicalUnary<CH
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
PolicyMgr policyMgr = connectContext.getEnv().getPolicyMgr();
|
||||
AccessControllerManager accessManager = connectContext.getEnv().getAccessManager();
|
||||
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
|
||||
if (currentUserIdentity.isRootUser() || currentUserIdentity.isAdminUser()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
CatalogRelation catalogRelation = (CatalogRelation) logicalRelation;
|
||||
long dbId = catalogRelation.getDatabase().getId();
|
||||
long tableId = catalogRelation.getTable().getId();
|
||||
List<RowPolicy> policies = policyMgr.getUserPolicies(dbId, tableId, currentUserIdentity);
|
||||
String ctlName = catalogRelation.getDatabase().getCatalog().getName();
|
||||
String dbName = catalogRelation.getDatabase().getFullName();
|
||||
String tableName = catalogRelation.getTable().getName();
|
||||
List<? extends RowFilterPolicy> policies = null;
|
||||
try {
|
||||
policies = accessManager.evalRowFilterPolicies(currentUserIdentity, ctlName,
|
||||
dbName, tableName);
|
||||
} catch (org.apache.doris.common.AnalysisException e) {
|
||||
throw new AnalysisException(e.getMessage(), e);
|
||||
}
|
||||
if (policies.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.ofNullable(mergeRowPolicy(policies));
|
||||
}
|
||||
|
||||
private Expression mergeRowPolicy(List<RowPolicy> policies) {
|
||||
private Expression mergeRowPolicy(List<? extends RowFilterPolicy> policies) {
|
||||
List<Expression> orList = new ArrayList<>();
|
||||
List<Expression> andList = new ArrayList<>();
|
||||
for (RowPolicy policy : policies) {
|
||||
String sql = policy.getOriginStmt();
|
||||
NereidsParser nereidsParser = new NereidsParser();
|
||||
CreatePolicyCommand command = (CreatePolicyCommand) nereidsParser.parseSingle(sql);
|
||||
Optional<Expression> wherePredicate = command.getWherePredicate();
|
||||
if (!wherePredicate.isPresent()) {
|
||||
throw new AnalysisException("Invalid row policy [" + policy.getPolicyName() + "], " + sql);
|
||||
for (RowFilterPolicy policy : policies) {
|
||||
Expression wherePredicate = null;
|
||||
try {
|
||||
wherePredicate = policy.getFilterExpression();
|
||||
} catch (org.apache.doris.common.AnalysisException e) {
|
||||
throw new AnalysisException(e.getMessage(), e);
|
||||
}
|
||||
switch (policy.getFilterType()) {
|
||||
case PERMISSIVE:
|
||||
orList.add(wherePredicate.get());
|
||||
orList.add(wherePredicate);
|
||||
break;
|
||||
case RESTRICTIVE:
|
||||
andList.add(wherePredicate.get());
|
||||
andList.add(wherePredicate);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Invalid operator");
|
||||
|
||||
@ -29,6 +29,10 @@ import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.util.SqlParserUtils;
|
||||
import org.apache.doris.mysql.privilege.RowFilterPolicy;
|
||||
import org.apache.doris.nereids.parser.NereidsParser;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
|
||||
import org.apache.doris.qe.ShowResultSetMetaData;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
@ -42,12 +46,13 @@ import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Save policy for filtering data.
|
||||
**/
|
||||
@Data
|
||||
public class RowPolicy extends Policy {
|
||||
public class RowPolicy extends Policy implements RowFilterPolicy {
|
||||
|
||||
public static final ShowResultSetMetaData ROW_META_DATA =
|
||||
ShowResultSetMetaData.builder()
|
||||
@ -186,4 +191,22 @@ public class RowPolicy extends Policy {
|
||||
public boolean isInvalid() {
|
||||
return (wherePredicate == null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Expression getFilterExpression() throws AnalysisException {
|
||||
NereidsParser nereidsParser = new NereidsParser();
|
||||
String sql = getOriginStmt();
|
||||
CreatePolicyCommand command = (CreatePolicyCommand) nereidsParser.parseSingle(sql);
|
||||
Optional<Expression> wherePredicate = command.getWherePredicate();
|
||||
if (!wherePredicate.isPresent()) {
|
||||
throw new AnalysisException("Invalid row policy [" + getPolicyIdent() + "], " + sql);
|
||||
}
|
||||
return wherePredicate.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPolicyIdent() {
|
||||
return getPolicyName();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1045,88 +1045,6 @@ public class ConnectContext {
|
||||
return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]";
|
||||
}
|
||||
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
// maybe user set cluster by SQL hint of session variable: cloud_cluster
|
||||
// so first check it and then get from connect context.
|
||||
public String getCurrentCloudCluster() {
|
||||
String cluster = getSessionVariable().getCloudCluster();
|
||||
if (Strings.isNullOrEmpty(cluster)) {
|
||||
cluster = getCloudCluster();
|
||||
}
|
||||
return cluster;
|
||||
}
|
||||
|
||||
public void setCloudCluster(String cluster) {
|
||||
this.cloudCluster = cluster;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns an available cluster in the following order
|
||||
* 1 Use an explicitly specified cluster
|
||||
* 2 If no cluster is specified, the user's default cluster is used
|
||||
* 3 If the user does not have a default cluster, select a cluster with permissions for the user
|
||||
* Returns null when there is no available cluster
|
||||
*/
|
||||
public String getCloudCluster() {
|
||||
String cluster = null;
|
||||
if (!Strings.isNullOrEmpty(this.cloudCluster)) {
|
||||
cluster = this.cloudCluster;
|
||||
}
|
||||
|
||||
String defaultCluster = getDefaultCloudCluster();
|
||||
if (!Strings.isNullOrEmpty(defaultCluster)) {
|
||||
cluster = defaultCluster;
|
||||
}
|
||||
|
||||
String authorizedCluster = getAuthorizedCloudCluster();
|
||||
if (!Strings.isNullOrEmpty(authorizedCluster)) {
|
||||
cluster = authorizedCluster;
|
||||
}
|
||||
|
||||
if (Strings.isNullOrEmpty(cluster)) {
|
||||
LOG.warn("cant get a valid cluster for user {} to use", getCurrentUserIdentity());
|
||||
getState().setError(ErrorCode.ERR_NO_CLUSTER_ERROR,
|
||||
"Cant get a Valid cluster for you to use, plz connect admin");
|
||||
} else {
|
||||
this.cloudCluster = cluster;
|
||||
LOG.info("finally set context cluster name {}", cloudCluster);
|
||||
}
|
||||
|
||||
return cluster;
|
||||
}
|
||||
|
||||
// TODO implement this function
|
||||
public String getDefaultCloudCluster() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public String getAuthorizedCloudCluster() {
|
||||
List<String> cloudClusterNames = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterNames();
|
||||
// get all available cluster of the user
|
||||
for (String cloudClusterName : cloudClusterNames) {
|
||||
// find a cluster has more than one alive be
|
||||
List<Backend> bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
|
||||
.getBackendsByClusterName(cloudClusterName);
|
||||
AtomicBoolean hasAliveBe = new AtomicBoolean(false);
|
||||
bes.stream().filter(Backend::isAlive).findAny().ifPresent(backend -> {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("get a clusterName {}, it's has more than one alive be {}", cloudClusterName, backend);
|
||||
}
|
||||
hasAliveBe.set(true);
|
||||
});
|
||||
if (hasAliveBe.get()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("set context cluster name {}", cloudClusterName);
|
||||
}
|
||||
return cloudClusterName;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
>>>>>>> c2fe99f7c2 ([monir] remove unused cluster code (#31360))
|
||||
public StatsErrorEstimator getStatsErrorEstimator() {
|
||||
return statsErrorEstimator;
|
||||
}
|
||||
|
||||
@ -32,13 +32,16 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.AuthorizationException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.datasource.test.TestExternalCatalog.TestCatalogProvider;
|
||||
import org.apache.doris.mysql.privilege.AccessControllerFactory;
|
||||
import org.apache.doris.mysql.privilege.Auth;
|
||||
import org.apache.doris.mysql.privilege.CatalogAccessController;
|
||||
import org.apache.doris.mysql.privilege.DataMaskPolicy;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.mysql.privilege.RowFilterPolicy;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ShowExecutor;
|
||||
import org.apache.doris.qe.ShowResultSet;
|
||||
@ -53,6 +56,7 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
// when `select` suppport `col auth`,will open ColumnPrivTest
|
||||
@ -314,6 +318,18 @@ public class ColumnPrivTest extends TestWithFeService {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<DataMaskPolicy> evalDataMaskPolicy(UserIdentity currentUser, String ctl, String db,
|
||||
String tbl, String col) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<? extends RowFilterPolicy> evalRowFilterPolicies(UserIdentity currentUser, String ctl,
|
||||
String db, String tbl) throws AnalysisException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -28,7 +28,9 @@ import org.apache.doris.datasource.test.TestExternalCatalog.TestCatalogProvider;
|
||||
import org.apache.doris.mysql.privilege.AccessControllerFactory;
|
||||
import org.apache.doris.mysql.privilege.AccessControllerManager;
|
||||
import org.apache.doris.mysql.privilege.CatalogAccessController;
|
||||
import org.apache.doris.mysql.privilege.DataMaskPolicy;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.mysql.privilege.RowFilterPolicy;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.util.PlanChecker;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
@ -45,6 +47,7 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
public class TestCheckPrivileges extends TestWithFeService {
|
||||
@ -298,6 +301,18 @@ public class TestCheckPrivileges extends TestWithFeService {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<DataMaskPolicy> evalDataMaskPolicy(UserIdentity currentUser, String ctl, String db, String tbl,
|
||||
String col) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<? extends RowFilterPolicy> evalRowFilterPolicies(UserIdentity currentUser, String ctl, String db,
|
||||
String tbl) throws org.apache.doris.common.AnalysisException {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
}
|
||||
|
||||
private static class MakePrivileges {
|
||||
|
||||
Reference in New Issue
Block a user