[enhance](auth)support cache ranger datamask and row filter (#37723) (#38575)

pick: https://github.com/apache/doris/pull/37723
This commit is contained in:
zhangdong
2024-08-02 14:59:32 +08:00
committed by GitHub
parent f24d55fc94
commit 2425730609
14 changed files with 528 additions and 5 deletions

View File

@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.authorizer.ranger.cache;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.common.AuthorizationException;
import org.apache.doris.mysql.privilege.CatalogAccessController;
import org.apache.doris.mysql.privilege.DataMaskPolicy;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.RowFilterPolicy;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public abstract class CatalogCacheAccessController implements CatalogAccessController {
public abstract CatalogAccessController getProxyController();
public abstract RangerCache getCache();
@Override
public boolean checkGlobalPriv(UserIdentity currentUser, PrivPredicate wanted) {
return getProxyController().checkGlobalPriv(currentUser, wanted);
}
@Override
public boolean checkCtlPriv(UserIdentity currentUser, String ctl, PrivPredicate wanted) {
return getProxyController().checkCtlPriv(currentUser, ctl, wanted);
}
@Override
public boolean checkDbPriv(UserIdentity currentUser, String ctl, String db, PrivPredicate wanted) {
return getProxyController().checkDbPriv(currentUser, ctl, db, wanted);
}
@Override
public boolean checkTblPriv(UserIdentity currentUser, String ctl, String db, String tbl, PrivPredicate wanted) {
return getProxyController().checkTblPriv(currentUser, ctl, db, tbl, wanted);
}
@Override
public boolean checkResourcePriv(UserIdentity currentUser, String resourceName, PrivPredicate wanted) {
return getProxyController().checkResourcePriv(currentUser, resourceName, wanted);
}
@Override
public boolean checkWorkloadGroupPriv(UserIdentity currentUser, String workloadGroupName, PrivPredicate wanted) {
return getProxyController().checkWorkloadGroupPriv(currentUser, workloadGroupName, wanted);
}
@Override
public void checkColsPriv(UserIdentity currentUser, String ctl, String db, String tbl, Set<String> cols,
PrivPredicate wanted) throws AuthorizationException {
getProxyController().checkColsPriv(currentUser, ctl, db, tbl, cols, wanted);
}
@Override
public Optional<DataMaskPolicy> evalDataMaskPolicy(UserIdentity currentUser, String ctl, String db, String tbl,
String col) {
return getCache().getDataMask(new DatamaskCacheKey(currentUser, ctl, db, tbl, col));
}
@Override
public List<? extends RowFilterPolicy> evalRowFilterPolicies(UserIdentity currentUser, String ctl, String db,
String tbl) {
return getCache().getRowFilters(new RowFilterCacheKey(currentUser, ctl, db, tbl));
}
}

View File

@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.authorizer.ranger.cache;
import org.apache.doris.analysis.UserIdentity;
import com.google.common.base.Objects;
public class DatamaskCacheKey {
private UserIdentity userIdentity;
private String ctl;
private String db;
private String tbl;
private String col;
public DatamaskCacheKey(UserIdentity userIdentity, String ctl, String db, String tbl, String col) {
this.userIdentity = userIdentity;
this.ctl = ctl;
this.db = db;
this.tbl = tbl;
this.col = col;
}
public UserIdentity getUserIdentity() {
return userIdentity;
}
public String getCtl() {
return ctl;
}
public String getDb() {
return db;
}
public String getTbl() {
return tbl;
}
public String getCol() {
return col;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DatamaskCacheKey that = (DatamaskCacheKey) o;
return Objects.equal(userIdentity, that.userIdentity)
&& Objects.equal(ctl, that.ctl) && Objects.equal(db, that.db)
&& Objects.equal(tbl, that.tbl) && Objects.equal(col,
that.col);
}
@Override
public int hashCode() {
return Objects.hashCode(userIdentity, ctl, db, tbl, col);
}
@Override
public String toString() {
return "DatamaskCacheKey{"
+ "userIdentity=" + userIdentity
+ ", ctl='" + ctl + '\''
+ ", db='" + db + '\''
+ ", tbl='" + tbl + '\''
+ ", col='" + col + '\''
+ '}';
}
}

View File

@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.authorizer.ranger.cache;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.mysql.privilege.CatalogAccessController;
import org.apache.doris.mysql.privilege.DataMaskPolicy;
import org.apache.doris.mysql.privilege.RowFilterPolicy;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
public class RangerCache {
private static final Logger LOG = LoggerFactory.getLogger(RangerCache.class);
private CatalogAccessController controller;
private LoadingCache<DatamaskCacheKey, Optional<DataMaskPolicy>> datamaskCache = CacheBuilder.newBuilder()
.maximumSize(Config.ranger_cache_size)
.build(new CacheLoader<DatamaskCacheKey, Optional<DataMaskPolicy>>() {
@Override
public Optional<DataMaskPolicy> load(DatamaskCacheKey key) {
return loadDataMask(key);
}
});
private LoadingCache<RowFilterCacheKey, List<? extends RowFilterPolicy>> rowFilterCache = CacheBuilder.newBuilder()
.maximumSize(Config.ranger_cache_size)
.build(new CacheLoader<RowFilterCacheKey, List<? extends RowFilterPolicy>>() {
@Override
public List<? extends RowFilterPolicy> load(RowFilterCacheKey key) {
return loadRowFilter(key);
}
});
public RangerCache() {
}
public void init(CatalogAccessController controller) {
this.controller = controller;
}
private Optional<DataMaskPolicy> loadDataMask(DatamaskCacheKey key) {
Objects.requireNonNull(controller, "controller can not be null");
if (LOG.isDebugEnabled()) {
LOG.debug("load datamask: {}", key);
}
return controller.evalDataMaskPolicy(key.getUserIdentity(), key.getCtl(), key.getDb(), key.getTbl(),
key.getCol());
}
private List<? extends RowFilterPolicy> loadRowFilter(RowFilterCacheKey key) {
Objects.requireNonNull(controller, "controller can not be null");
if (LOG.isDebugEnabled()) {
LOG.debug("load row filter: {}", key);
}
return controller.evalRowFilterPolicies(key.getUserIdentity(), key.getCtl(), key.getDb(), key.getTbl());
}
public void invalidateDataMaskCache() {
datamaskCache.invalidateAll();
}
public void invalidateRowFilterCache() {
rowFilterCache.invalidateAll();
}
public Optional<DataMaskPolicy> getDataMask(DatamaskCacheKey key) {
try {
return datamaskCache.get(key);
} catch (ExecutionException e) {
throw new CacheException("failed to get datamask for:" + key, e);
}
}
public List<? extends RowFilterPolicy> getRowFilters(RowFilterCacheKey key) {
try {
return rowFilterCache.get(key);
} catch (ExecutionException e) {
throw new CacheException("failed to get row filter for:" + key, e);
}
}
}

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.authorizer.ranger.cache;
import org.apache.doris.catalog.authorizer.ranger.doris.RangerDorisAccessController;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.ranger.plugin.service.RangerAuthContextListener;
public class RangerCacheInvalidateListener implements RangerAuthContextListener {
private static final Logger LOG = LogManager.getLogger(RangerDorisAccessController.class);
private RangerCache cache;
public RangerCacheInvalidateListener(RangerCache cache) {
this.cache = cache;
}
@Override
public void contextChanged() {
LOG.info("ranger context changed");
cache.invalidateDataMaskCache();
cache.invalidateRowFilterCache();
}
}

View File

@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.authorizer.ranger.cache;
import org.apache.doris.analysis.UserIdentity;
import com.google.common.base.Objects;
public class RowFilterCacheKey {
private UserIdentity userIdentity;
private String ctl;
private String db;
private String tbl;
public RowFilterCacheKey(UserIdentity userIdentity, String ctl, String db, String tbl) {
this.userIdentity = userIdentity;
this.ctl = ctl;
this.db = db;
this.tbl = tbl;
}
public UserIdentity getUserIdentity() {
return userIdentity;
}
public String getCtl() {
return ctl;
}
public String getDb() {
return db;
}
public String getTbl() {
return tbl;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RowFilterCacheKey that = (RowFilterCacheKey) o;
return Objects.equal(userIdentity, that.userIdentity)
&& Objects.equal(ctl, that.ctl) && Objects.equal(db, that.db)
&& Objects.equal(tbl, that.tbl);
}
@Override
public int hashCode() {
return Objects.hashCode(userIdentity, ctl, db, tbl);
}
@Override
public String toString() {
return "DatamaskCacheKey{"
+ "userIdentity=" + userIdentity
+ ", ctl='" + ctl + '\''
+ ", db='" + db + '\''
+ ", tbl='" + tbl + '\''
+ '}';
}
}

View File

@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.authorizer.ranger.doris;
import org.apache.doris.catalog.authorizer.ranger.cache.CatalogCacheAccessController;
import org.apache.doris.catalog.authorizer.ranger.cache.RangerCache;
import org.apache.doris.catalog.authorizer.ranger.cache.RangerCacheInvalidateListener;
import org.apache.doris.mysql.privilege.CatalogAccessController;
public class RangerCacheDorisAccessController extends CatalogCacheAccessController {
private CatalogAccessController proxyController;
private RangerCache cache;
public RangerCacheDorisAccessController(String serviceName) {
this.cache = new RangerCache();
this.proxyController = new RangerDorisAccessController(serviceName, new RangerCacheInvalidateListener(cache));
this.cache.init(proxyController);
}
@Override
public CatalogAccessController getProxyController() {
return proxyController;
}
@Override
public RangerCache getCache() {
return cache;
}
}

View File

@ -32,6 +32,7 @@ import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
import org.apache.ranger.plugin.policyengine.RangerAccessResultProcessor;
import org.apache.ranger.plugin.service.RangerAuthContextListener;
import org.apache.ranger.plugin.service.RangerBasePlugin;
import java.util.ArrayList;
@ -49,7 +50,11 @@ public class RangerDorisAccessController extends RangerAccessController {
// private RangerHiveAuditHandler auditHandler;
public RangerDorisAccessController(String serviceName) {
dorisPlugin = new RangerDorisPlugin(serviceName);
this(serviceName, null);
}
public RangerDorisAccessController(String serviceName, RangerAuthContextListener rangerAuthContextListener) {
dorisPlugin = new RangerDorisPlugin(serviceName, rangerAuthContextListener);
// auditHandler = new RangerHiveAuditHandler(dorisPlugin.getConfig());
// start a timed log flusher
// logFlushTimer.scheduleAtFixedRate(new RangerHiveAuditLogFlusher(auditHandler), 10, 20L, TimeUnit.SECONDS);

View File

@ -17,11 +17,17 @@
package org.apache.doris.catalog.authorizer.ranger.doris;
import org.apache.ranger.plugin.service.RangerAuthContextListener;
import org.apache.ranger.plugin.service.RangerBasePlugin;
public class RangerDorisPlugin extends RangerBasePlugin {
public RangerDorisPlugin(String serviceName) {
this(serviceName, null);
}
public RangerDorisPlugin(String serviceName, RangerAuthContextListener rangerAuthContextListener) {
super(serviceName, null, null);
super.init();
super.registerAuthContextEventListener(rangerAuthContextListener);
}
}

View File

@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.authorizer.ranger.hive;
import org.apache.doris.catalog.authorizer.ranger.cache.CatalogCacheAccessController;
import org.apache.doris.catalog.authorizer.ranger.cache.RangerCache;
import org.apache.doris.catalog.authorizer.ranger.cache.RangerCacheInvalidateListener;
import org.apache.doris.mysql.privilege.CatalogAccessController;
import java.util.Map;
public class RangerCacheHiveAccessController extends CatalogCacheAccessController {
private CatalogAccessController proxyController;
private RangerCache cache;
public RangerCacheHiveAccessController(Map<String, String> properties) {
this.cache = new RangerCache();
this.proxyController = new RangerHiveAccessController(properties, new RangerCacheInvalidateListener(cache));
this.cache.init(proxyController);
}
@Override
public CatalogAccessController getProxyController() {
return proxyController;
}
@Override
public RangerCache getCache() {
return cache;
}
}

View File

@ -34,6 +34,7 @@ import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
import org.apache.ranger.plugin.policyengine.RangerAccessResultProcessor;
import org.apache.ranger.plugin.policyengine.RangerPolicyEngine;
import org.apache.ranger.plugin.service.RangerAuthContextListener;
import org.apache.ranger.plugin.service.RangerBasePlugin;
import java.util.ArrayList;
@ -54,8 +55,13 @@ public class RangerHiveAccessController extends RangerAccessController {
private RangerHiveAuditHandler auditHandler;
public RangerHiveAccessController(Map<String, String> properties) {
this(properties, null);
}
public RangerHiveAccessController(Map<String, String> properties,
RangerAuthContextListener rangerAuthContextListener) {
String serviceName = properties.get("ranger.service.name");
hivePlugin = new RangerHivePlugin(serviceName);
hivePlugin = new RangerHivePlugin(serviceName, rangerAuthContextListener);
auditHandler = new RangerHiveAuditHandler(hivePlugin.getConfig());
// start a timed log flusher
logFlushTimer.scheduleAtFixedRate(new RangerHiveAuditLogFlusher(auditHandler), 10, 20L, TimeUnit.SECONDS);

View File

@ -25,6 +25,6 @@ import java.util.Map;
public class RangerHiveAccessControllerFactory implements AccessControllerFactory {
@Override
public CatalogAccessController createAccessController(Map<String, String> prop) {
return new RangerHiveAccessController(prop);
return new RangerCacheHiveAccessController(prop);
}
}

View File

@ -17,11 +17,17 @@
package org.apache.doris.catalog.authorizer.ranger.hive;
import org.apache.ranger.plugin.service.RangerAuthContextListener;
import org.apache.ranger.plugin.service.RangerBasePlugin;
public class RangerHivePlugin extends RangerBasePlugin {
public RangerHivePlugin(String serviceName) {
super(serviceName, null);
}
public RangerHivePlugin(String serviceName, RangerAuthContextListener rangerAuthContextListener) {
super(serviceName, null, null);
super.init();
super.registerAuthContextEventListener(rangerAuthContextListener);
}
}

View File

@ -21,7 +21,7 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.authorizer.ranger.doris.RangerDorisAccessController;
import org.apache.doris.catalog.authorizer.ranger.doris.RangerCacheDorisAccessController;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
@ -57,7 +57,7 @@ public class AccessControllerManager {
public AccessControllerManager(Auth auth) {
this.auth = auth;
if (Config.access_controller_type.equalsIgnoreCase("ranger-doris")) {
defaultAccessController = new RangerDorisAccessController("doris");
defaultAccessController = new RangerCacheDorisAccessController("doris");
} else {
defaultAccessController = new InternalAccessController(auth);
}