[refactor](policy) refactor some policy create and check logic (#11007)
* [refactor](policy) refactor some policy create and check logic
This commit is contained in:
@ -59,7 +59,6 @@ import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.DynamicPartitionUtil;
|
||||
import org.apache.doris.common.util.MetaLockUtils;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.datasource.InternalDataSource;
|
||||
import org.apache.doris.persist.AlterViewInfo;
|
||||
import org.apache.doris.persist.BatchModifyPartitionsInfo;
|
||||
import org.apache.doris.persist.ModifyCommentOperationLog;
|
||||
@ -161,7 +160,7 @@ public class Alter {
|
||||
}
|
||||
String currentStoragePolicy = currentAlterOps.getTableStoragePolicy(alterClauses);
|
||||
// check currentStoragePolicy resource exist.
|
||||
InternalDataSource.checkStoragePolicyExist(currentStoragePolicy);
|
||||
Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(currentStoragePolicy);
|
||||
|
||||
olapTable.setStoragePolicy(currentStoragePolicy);
|
||||
needProcessOutsideTableLock = true;
|
||||
@ -713,7 +712,7 @@ public class Alter {
|
||||
String currentStoragePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties);
|
||||
if (!currentStoragePolicy.equals("")) {
|
||||
// check currentStoragePolicy resource exist.
|
||||
InternalDataSource.checkStoragePolicyExist(currentStoragePolicy);
|
||||
Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(currentStoragePolicy);
|
||||
partitionInfo.setStoragePolicy(partition.getId(), currentStoragePolicy);
|
||||
}
|
||||
|
||||
|
||||
@ -19,7 +19,6 @@ package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
@ -77,10 +76,10 @@ public class AlterPolicyStmt extends DdlStmt {
|
||||
StoragePolicy storagePolicy = (StoragePolicy) hasPolicy.get();
|
||||
|
||||
// default storage policy use alter storage policy to add s3 resource.
|
||||
if (!policyName.equalsIgnoreCase(Config.default_storage_policy)
|
||||
&& properties.containsKey(StoragePolicy.STORAGE_RESOURCE)) {
|
||||
if (!policyName.equalsIgnoreCase(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME) && properties.containsKey(
|
||||
StoragePolicy.STORAGE_RESOURCE)) {
|
||||
throw new AnalysisException("not support change storage policy's storage resource"
|
||||
+ ", you can change s3 properties by alter resource");
|
||||
+ ", you can change s3 properties by alter resource");
|
||||
}
|
||||
|
||||
boolean hasCooldownDatetime = false;
|
||||
@ -114,7 +113,7 @@ public class AlterPolicyStmt extends DdlStmt {
|
||||
}
|
||||
|
||||
do {
|
||||
if (policyName.equalsIgnoreCase(Config.default_storage_policy)) {
|
||||
if (policyName.equalsIgnoreCase(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME)) {
|
||||
// default storage policy
|
||||
if (storagePolicy.getStorageResource() != null && hasCooldownDatetime) {
|
||||
// alter cooldown datetime, can do
|
||||
|
||||
@ -1261,6 +1261,8 @@ public class Catalog {
|
||||
initDefaultCluster();
|
||||
}
|
||||
|
||||
getPolicyMgr().createDefaultStoragePolicy();
|
||||
|
||||
// MUST set master ip before starting checkpoint thread.
|
||||
// because checkpoint thread need this info to select non-master FE to push image
|
||||
this.masterIp = FrontendOptions.getLocalHostAddress();
|
||||
|
||||
@ -32,7 +32,6 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.persist.DropResourceOperationLog;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.policy.Policy;
|
||||
import org.apache.doris.policy.PolicyTypeEnum;
|
||||
import org.apache.doris.policy.StoragePolicy;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
@ -109,7 +108,7 @@ public class ResourceMgr implements Writable {
|
||||
}
|
||||
|
||||
// Check whether the resource is in use before deleting it, except spark resource
|
||||
StoragePolicy checkedStoragePolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, null);
|
||||
StoragePolicy checkedStoragePolicy = StoragePolicy.ofCheck(null);
|
||||
checkedStoragePolicy.setStorageResource(resourceName);
|
||||
if (Catalog.getCurrentCatalog().getPolicyMgr().existPolicy(checkedStoragePolicy)) {
|
||||
Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedStoragePolicy);
|
||||
|
||||
@ -1691,9 +1691,6 @@ public class Config extends ConfigBase {
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static boolean use_date_v2_by_default = false;
|
||||
|
||||
@ConfField
|
||||
public static String default_storage_policy = "default_storage_policy";
|
||||
|
||||
@ConfField(mutable = false, masterOnly = true)
|
||||
public static boolean enable_multi_tags = false;
|
||||
}
|
||||
|
||||
@ -38,7 +38,6 @@ import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.FeNameFormat;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.policy.PolicyTypeEnum;
|
||||
import org.apache.doris.policy.StoragePolicy;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -343,23 +342,24 @@ public class DynamicPartitionUtil {
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkRemoteStoragePolicy(String val) throws DdlException {
|
||||
if (Strings.isNullOrEmpty(val)) {
|
||||
private static void checkRemoteStoragePolicy(String policyName) throws DdlException {
|
||||
if (Strings.isNullOrEmpty(policyName)) {
|
||||
LOG.info(DynamicPartitionProperty.REMOTE_STORAGE_POLICY + " is null, remove this key");
|
||||
return;
|
||||
}
|
||||
if (val.isEmpty()) {
|
||||
if (policyName.isEmpty()) {
|
||||
throw new DdlException(DynamicPartitionProperty.REMOTE_STORAGE_POLICY + " is empty.");
|
||||
}
|
||||
StoragePolicy checkedPolicyCondition = new StoragePolicy(PolicyTypeEnum.STORAGE, val);
|
||||
StoragePolicy checkedPolicyCondition = StoragePolicy.ofCheck(policyName);
|
||||
if (!Catalog.getCurrentCatalog().getPolicyMgr().existPolicy(checkedPolicyCondition)) {
|
||||
throw new DdlException(DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + val + " doesn't exist.");
|
||||
throw new DdlException(
|
||||
DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + policyName + " doesn't exist.");
|
||||
}
|
||||
StoragePolicy storagePolicy = (StoragePolicy) Catalog.getCurrentCatalog()
|
||||
.getPolicyMgr().getPolicy(checkedPolicyCondition);
|
||||
if (Strings.isNullOrEmpty(storagePolicy.getCooldownTtl())) {
|
||||
throw new DdlException("Storage policy cooldown type need to be cooldownTtl for properties "
|
||||
+ DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + val);
|
||||
+ DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + policyName);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -31,7 +31,6 @@ import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.policy.Policy;
|
||||
import org.apache.doris.policy.PolicyTypeEnum;
|
||||
import org.apache.doris.policy.StoragePolicy;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.thrift.TCompressionType;
|
||||
@ -202,7 +201,7 @@ public class PropertyAnalyzer {
|
||||
|
||||
if (hasRemoteStoragePolicy) {
|
||||
// check remote storage policy
|
||||
StoragePolicy checkedPolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, remoteStoragePolicy);
|
||||
StoragePolicy checkedPolicy = StoragePolicy.ofCheck(remoteStoragePolicy);
|
||||
Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedPolicy);
|
||||
if (!(policy instanceof StoragePolicy)) {
|
||||
throw new AnalysisException("No PolicyStorage: " + remoteStoragePolicy);
|
||||
@ -527,7 +526,7 @@ public class PropertyAnalyzer {
|
||||
if (properties != null && properties.containsKey(PROPERTIES_REMOTE_STORAGE_POLICY)) {
|
||||
remoteStoragePolicy = properties.get(PROPERTIES_REMOTE_STORAGE_POLICY);
|
||||
// check remote storage policy existence
|
||||
StoragePolicy checkedStoragePolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, remoteStoragePolicy);
|
||||
StoragePolicy checkedStoragePolicy = StoragePolicy.ofCheck(remoteStoragePolicy);
|
||||
Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedStoragePolicy);
|
||||
if (!(policy instanceof StoragePolicy)) {
|
||||
throw new AnalysisException("StoragePolicy: " + remoteStoragePolicy + " does not exist.");
|
||||
|
||||
@ -150,9 +150,6 @@ import org.apache.doris.persist.PartitionPersistInfo;
|
||||
import org.apache.doris.persist.RecoverInfo;
|
||||
import org.apache.doris.persist.ReplicaPersistInfo;
|
||||
import org.apache.doris.persist.TruncateTableInfo;
|
||||
import org.apache.doris.policy.Policy;
|
||||
import org.apache.doris.policy.PolicyTypeEnum;
|
||||
import org.apache.doris.policy.StoragePolicy;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.Backend;
|
||||
@ -191,7 +188,6 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -1797,7 +1793,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
|
||||
// set storage policy
|
||||
String storagePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties);
|
||||
|
||||
checkStoragePolicyExist(storagePolicy);
|
||||
Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(storagePolicy);
|
||||
|
||||
olapTable.setStoragePolicy(storagePolicy);
|
||||
|
||||
@ -1992,7 +1988,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
|
||||
if (!partionStoragePolicy.equals("")) {
|
||||
storagePolicy = partionStoragePolicy;
|
||||
}
|
||||
checkStoragePolicyExist(storagePolicy);
|
||||
Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(storagePolicy);
|
||||
Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
|
||||
olapTable.getBaseIndexId(), entry.getValue(), entry.getKey(), olapTable.getIndexIdToMeta(),
|
||||
partitionDistributionInfo, dataProperty.getStorageMedium(),
|
||||
@ -2051,24 +2047,6 @@ public class InternalDataSource implements DataSourceIf<Database> {
|
||||
}
|
||||
}
|
||||
|
||||
public static void checkStoragePolicyExist(String storagePolicy) throws DdlException {
|
||||
if (!storagePolicy.equals("")) {
|
||||
// when create table use storage policy
|
||||
// if not exist default storage policy, create it
|
||||
// if exist, just return.
|
||||
Catalog.getCurrentCatalog().getPolicyMgr().createDefaultStoragePolicy();
|
||||
|
||||
List<Policy> policiesByType = Catalog.getCurrentCatalog()
|
||||
.getPolicyMgr().getPoliciesByType(PolicyTypeEnum.STORAGE);
|
||||
policiesByType.stream().filter(policy -> policy.getPolicyName().equals(storagePolicy)).findAny()
|
||||
.orElseThrow(() -> new DdlException("Storage policy does not exist. name: " + storagePolicy));
|
||||
Optional<Policy> hasDefaultPolicy = policiesByType.stream()
|
||||
.filter(policy -> policy.getPolicyName().equals(Config.default_storage_policy)).findAny();
|
||||
|
||||
StoragePolicy.checkDefaultStoragePolicyValid(storagePolicy, hasDefaultPolicy);
|
||||
}
|
||||
}
|
||||
|
||||
private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
|
||||
String tableName = stmt.getTableName();
|
||||
|
||||
|
||||
@ -57,10 +57,8 @@ public abstract class Policy implements Writable, GsonPostProcessable {
|
||||
@SerializedName(value = "policyName")
|
||||
protected String policyName = null;
|
||||
|
||||
public Policy() {
|
||||
if (Catalog.getCurrentCatalog().isMaster()) {
|
||||
policyId = Catalog.getCurrentCatalog().getNextId();
|
||||
}
|
||||
public Policy(PolicyTypeEnum type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -69,8 +67,8 @@ public abstract class Policy implements Writable, GsonPostProcessable {
|
||||
* @param type policy type
|
||||
* @param policyName policy name
|
||||
*/
|
||||
public Policy(final PolicyTypeEnum type, final String policyName) {
|
||||
policyId = Catalog.getCurrentCatalog().getNextId();
|
||||
public Policy(long policyId, final PolicyTypeEnum type, final String policyName) {
|
||||
this.policyId = policyId;
|
||||
this.type = type;
|
||||
this.policyName = policyName;
|
||||
}
|
||||
@ -79,13 +77,13 @@ public abstract class Policy implements Writable, GsonPostProcessable {
|
||||
* Trans stmt to Policy.
|
||||
**/
|
||||
public static Policy fromCreateStmt(CreatePolicyStmt stmt) throws AnalysisException {
|
||||
long policyId = Catalog.getCurrentCatalog().getNextId();
|
||||
switch (stmt.getType()) {
|
||||
case STORAGE:
|
||||
StoragePolicy storagePolicy = new StoragePolicy(stmt.getType(), stmt.getPolicyName());
|
||||
StoragePolicy storagePolicy = new StoragePolicy(policyId, stmt.getPolicyName());
|
||||
storagePolicy.init(stmt.getProperties(), stmt.isIfNotExists());
|
||||
return storagePolicy;
|
||||
case ROW:
|
||||
default:
|
||||
// stmt must be analyzed.
|
||||
DatabaseIf db = Catalog.getCurrentCatalog().getDataSourceMgr()
|
||||
.getCatalogOrAnalysisException(stmt.getTableName().getCtl())
|
||||
@ -93,9 +91,10 @@ public abstract class Policy implements Writable, GsonPostProcessable {
|
||||
UserIdentity userIdent = stmt.getUser();
|
||||
userIdent.analyze(ConnectContext.get().getClusterName());
|
||||
TableIf table = db.getTableOrAnalysisException(stmt.getTableName().getTbl());
|
||||
return new RowPolicy(stmt.getType(), stmt.getPolicyName(), db.getId(), userIdent,
|
||||
stmt.getOrigStmt().originStmt, table.getId(), stmt.getFilterType(),
|
||||
stmt.getWherePredicate());
|
||||
return new RowPolicy(policyId, stmt.getPolicyName(), db.getId(), userIdent,
|
||||
stmt.getOrigStmt().originStmt, table.getId(), stmt.getFilterType(), stmt.getWherePredicate());
|
||||
default:
|
||||
throw new AnalysisException("Unknown policy type: " + stmt.getType());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -24,7 +24,6 @@ import org.apache.doris.analysis.DropPolicyStmt;
|
||||
import org.apache.doris.analysis.ShowPolicyStmt;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.io.Text;
|
||||
@ -40,6 +39,7 @@ import com.google.common.collect.Sets;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.parquet.Strings;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
@ -89,17 +89,16 @@ public class PolicyMgr implements Writable {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
|
||||
public void createDefaultStoragePolicy() throws DdlException {
|
||||
Optional<Policy> hasDefault = findPolicy(Config.default_storage_policy, PolicyTypeEnum.STORAGE);
|
||||
if (hasDefault.isPresent()) {
|
||||
// already exist default storage policy, just return.
|
||||
return;
|
||||
}
|
||||
|
||||
public void createDefaultStoragePolicy() {
|
||||
writeLock();
|
||||
try {
|
||||
StoragePolicy defaultStoragePolicy =
|
||||
new StoragePolicy(PolicyTypeEnum.STORAGE, Config.default_storage_policy);
|
||||
Optional<Policy> hasDefault = findPolicy(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME, PolicyTypeEnum.STORAGE);
|
||||
if (hasDefault.isPresent()) {
|
||||
// already exist default storage policy, just return.
|
||||
return;
|
||||
}
|
||||
long policyId = Catalog.getCurrentCatalog().getNextId();
|
||||
StoragePolicy defaultStoragePolicy = new StoragePolicy(policyId, StoragePolicy.DEFAULT_STORAGE_POLICY_NAME);
|
||||
unprotectedAdd(defaultStoragePolicy);
|
||||
Catalog.getCurrentCatalog().getEditLog().logCreatePolicy(defaultStoragePolicy);
|
||||
} finally {
|
||||
@ -413,10 +412,6 @@ public class PolicyMgr implements Writable {
|
||||
throw new DdlException("Current not support alter row policy");
|
||||
}
|
||||
|
||||
if (storagePolicyName.equalsIgnoreCase(Config.default_storage_policy)) {
|
||||
createDefaultStoragePolicy();
|
||||
}
|
||||
|
||||
Optional<Policy> policy = findPolicy(storagePolicyName, PolicyTypeEnum.STORAGE);
|
||||
|
||||
if (!policy.isPresent()) {
|
||||
@ -429,4 +424,23 @@ public class PolicyMgr implements Writable {
|
||||
Catalog.getCurrentCatalog().getEditLog().logAlterStoragePolicy(storagePolicy);
|
||||
LOG.info("Alter storage policy success. policy: {}", storagePolicy);
|
||||
}
|
||||
|
||||
public void checkStoragePolicyExist(String storagePolicyName) throws DdlException {
|
||||
if (Strings.isNullOrEmpty(storagePolicyName)) {
|
||||
return;
|
||||
}
|
||||
readLock();
|
||||
try {
|
||||
List<Policy> policiesByType = Catalog.getCurrentCatalog().getPolicyMgr()
|
||||
.getPoliciesByType(PolicyTypeEnum.STORAGE);
|
||||
policiesByType.stream().filter(policy -> policy.getPolicyName().equals(storagePolicyName)).findAny()
|
||||
.orElseThrow(() -> new DdlException("Storage policy does not exist. name: " + storagePolicyName));
|
||||
Optional<Policy> hasDefaultPolicy = policiesByType.stream()
|
||||
.filter(policy -> policy.getPolicyName().equals(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME))
|
||||
.findAny();
|
||||
StoragePolicy.checkDefaultStoragePolicyValid(storagePolicyName, hasDefaultPolicy);
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,12 +88,14 @@ public class RowPolicy extends Policy {
|
||||
|
||||
private Expr wherePredicate = null;
|
||||
|
||||
public RowPolicy() {}
|
||||
public RowPolicy() {
|
||||
super(PolicyTypeEnum.ROW);
|
||||
}
|
||||
|
||||
/**
|
||||
* Policy for Table. Policy of ROW or others.
|
||||
*
|
||||
* @param type PolicyType
|
||||
* @param policyId policy id
|
||||
* @param policyName policy name
|
||||
* @param dbId database i
|
||||
* @param user username
|
||||
@ -102,10 +104,9 @@ public class RowPolicy extends Policy {
|
||||
* @param filterType filter type
|
||||
* @param wherePredicate where predicate
|
||||
*/
|
||||
public RowPolicy(final PolicyTypeEnum type, final String policyName, long dbId,
|
||||
UserIdentity user, String originStmt, final long tableId,
|
||||
final FilterType filterType, final Expr wherePredicate) {
|
||||
super(type, policyName);
|
||||
public RowPolicy(long policyId, final String policyName, long dbId, UserIdentity user, String originStmt,
|
||||
final long tableId, final FilterType filterType, final Expr wherePredicate) {
|
||||
super(policyId, PolicyTypeEnum.ROW, policyName);
|
||||
this.user = user;
|
||||
this.dbId = dbId;
|
||||
this.tableId = tableId;
|
||||
@ -141,8 +142,8 @@ public class RowPolicy extends Policy {
|
||||
|
||||
@Override
|
||||
public RowPolicy clone() {
|
||||
return new RowPolicy(this.type, this.policyName, this.dbId, this.user, this.originStmt, this.tableId,
|
||||
this.filterType, this.wherePredicate);
|
||||
return new RowPolicy(this.policyId, this.policyName, this.dbId, this.user, this.originStmt, this.tableId,
|
||||
this.filterType, this.wherePredicate);
|
||||
}
|
||||
|
||||
private boolean checkMatched(long dbId, long tableId, PolicyTypeEnum type,
|
||||
|
||||
@ -23,7 +23,6 @@ import org.apache.doris.catalog.Resource;
|
||||
import org.apache.doris.catalog.S3Resource;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
@ -57,16 +56,18 @@ import java.util.Optional;
|
||||
**/
|
||||
@Data
|
||||
public class StoragePolicy extends Policy {
|
||||
public static boolean checkDefaultStoragePolicyValid(final String storagePolicyName,
|
||||
Optional<Policy> defaultPolicy) throws DdlException {
|
||||
public static final String DEFAULT_STORAGE_POLICY_NAME = "default_storage_policy";
|
||||
|
||||
public static boolean checkDefaultStoragePolicyValid(final String storagePolicyName, Optional<Policy> defaultPolicy)
|
||||
throws DdlException {
|
||||
if (!defaultPolicy.isPresent()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (storagePolicyName.equalsIgnoreCase(Config.default_storage_policy)
|
||||
&& (((StoragePolicy) defaultPolicy.get()).getStorageResource() == null)) {
|
||||
if (storagePolicyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && (
|
||||
((StoragePolicy) defaultPolicy.get()).getStorageResource() == null)) {
|
||||
throw new DdlException("Use default storage policy, but not give s3 info,"
|
||||
+ " please use alter resource to add default storage policy S3 info.");
|
||||
+ " please use alter resource to add default storage policy S3 info.");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@ -116,21 +117,23 @@ public class StoragePolicy extends Policy {
|
||||
|
||||
private Map<String, String> props;
|
||||
|
||||
public StoragePolicy() {}
|
||||
public StoragePolicy() {
|
||||
super(PolicyTypeEnum.STORAGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Policy for Storage Migration.
|
||||
*
|
||||
* @param type PolicyType
|
||||
* @param policyId policy id
|
||||
* @param policyName policy name
|
||||
* @param storageResource resource name for storage
|
||||
* @param cooldownDatetime cool down time
|
||||
* @param cooldownTtl cool down time cost after partition is created
|
||||
* @param cooldownTtlMs seconds for cooldownTtl
|
||||
*/
|
||||
public StoragePolicy(final PolicyTypeEnum type, final String policyName, final String storageResource,
|
||||
final Date cooldownDatetime, final String cooldownTtl, long cooldownTtlMs) {
|
||||
super(type, policyName);
|
||||
public StoragePolicy(long policyId, final String policyName, final String storageResource,
|
||||
final Date cooldownDatetime, final String cooldownTtl, long cooldownTtlMs) {
|
||||
super(policyId, PolicyTypeEnum.STORAGE, policyName);
|
||||
this.storageResource = storageResource;
|
||||
this.cooldownDatetime = cooldownDatetime;
|
||||
this.cooldownTtl = cooldownTtl;
|
||||
@ -140,11 +143,17 @@ public class StoragePolicy extends Policy {
|
||||
/**
|
||||
* Policy for Storage Migration.
|
||||
*
|
||||
* @param type PolicyType
|
||||
* @param policyId policy id
|
||||
* @param policyName policy name
|
||||
*/
|
||||
public StoragePolicy(final PolicyTypeEnum type, final String policyName) {
|
||||
super(type, policyName);
|
||||
public StoragePolicy(long policyId, final String policyName) {
|
||||
super(policyId, PolicyTypeEnum.STORAGE, policyName);
|
||||
}
|
||||
|
||||
public static StoragePolicy ofCheck(String policyName) {
|
||||
StoragePolicy storagePolicy = new StoragePolicy();
|
||||
storagePolicy.policyName = policyName;
|
||||
return storagePolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -240,8 +249,8 @@ public class StoragePolicy extends Policy {
|
||||
|
||||
@Override
|
||||
public StoragePolicy clone() {
|
||||
return new StoragePolicy(this.type, this.policyName, this.storageResource,
|
||||
this.cooldownDatetime, this.cooldownTtl, this.cooldownTtlMs);
|
||||
return new StoragePolicy(this.policyId, this.policyName, this.storageResource, this.cooldownDatetime,
|
||||
this.cooldownTtl, this.cooldownTtlMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -356,11 +365,10 @@ public class StoragePolicy extends Policy {
|
||||
}
|
||||
});
|
||||
|
||||
if (policyName.equalsIgnoreCase(Config.default_storage_policy) && storageResource == null) {
|
||||
if (policyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && storageResource == null) {
|
||||
// here first time set S3 resource to default storage policy.
|
||||
String alterStorageResource = Optional.ofNullable(properties.get(STORAGE_RESOURCE))
|
||||
.orElseThrow(() ->
|
||||
new DdlException("first time set default storage policy, but not give storageResource"));
|
||||
String alterStorageResource = Optional.ofNullable(properties.get(STORAGE_RESOURCE)).orElseThrow(
|
||||
() -> new DdlException("first time set default storage policy, but not give storageResource"));
|
||||
// check alterStorageResource resource exist.
|
||||
checkIsS3ResourceAndExist(alterStorageResource);
|
||||
storageResource = alterStorageResource;
|
||||
|
||||
@ -18,6 +18,9 @@
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.analysis.CompoundPredicate.Operator;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ExceptionChecker;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
@ -36,13 +39,15 @@ import java.util.List;
|
||||
public class CancelLoadStmtTest extends TestWithFeService {
|
||||
|
||||
private Analyzer analyzer;
|
||||
private String dbName = "testDb";
|
||||
private String tblName = "table1";
|
||||
|
||||
@Override
|
||||
protected void runBeforeAll() throws Exception {
|
||||
FeConstants.runningUnitTest = true;
|
||||
createDatabase("testDb");
|
||||
useDatabase("testDb");
|
||||
createTable("create table table1\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
|
||||
createDatabase(dbName);
|
||||
useDatabase(dbName);
|
||||
createTable("create table " + tblName + "\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
|
||||
+ "properties(\"replication_num\" = \"1\");");
|
||||
analyzer = new Analyzer(connectContext.getCatalog(), connectContext);
|
||||
}
|
||||
@ -55,36 +60,36 @@ public class CancelLoadStmtTest extends TestWithFeService {
|
||||
SlotRef stateSlotRef = new SlotRef(null, "state");
|
||||
StringLiteral stateStringLiteral = new StringLiteral("FINISHED");
|
||||
|
||||
BinaryPredicate labelBinaryPredicate =
|
||||
new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral);
|
||||
BinaryPredicate labelBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef,
|
||||
labelStringLiteral);
|
||||
CancelLoadStmt stmt = new CancelLoadStmt(null, labelBinaryPredicate);
|
||||
stmt.analyze(analyzer);
|
||||
Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label'",
|
||||
stmt.toString());
|
||||
|
||||
BinaryPredicate stateBinaryPredicate =
|
||||
new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
|
||||
BinaryPredicate stateBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef,
|
||||
stateStringLiteral);
|
||||
stmt = new CancelLoadStmt(null, stateBinaryPredicate);
|
||||
stmt.analyze(analyzer);
|
||||
Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `state` = 'FINISHED'", stmt.toString());
|
||||
|
||||
LikePredicate labelLikePredicate =
|
||||
new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, labelStringLiteral);
|
||||
LikePredicate labelLikePredicate = new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef,
|
||||
labelStringLiteral);
|
||||
stmt = new CancelLoadStmt(null, labelLikePredicate);
|
||||
stmt.analyze(analyzer);
|
||||
Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` LIKE 'doris_test_label'",
|
||||
stmt.toString());
|
||||
|
||||
CompoundPredicate compoundAndPredicate =
|
||||
new CompoundPredicate(Operator.AND, labelBinaryPredicate, stateBinaryPredicate);
|
||||
CompoundPredicate compoundAndPredicate = new CompoundPredicate(Operator.AND, labelBinaryPredicate,
|
||||
stateBinaryPredicate);
|
||||
stmt = new CancelLoadStmt(null, compoundAndPredicate);
|
||||
stmt.analyze(analyzer);
|
||||
Assertions.assertEquals(
|
||||
"CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label' AND `state` = 'FINISHED'",
|
||||
stmt.toString());
|
||||
|
||||
CompoundPredicate compoundOrPredicate =
|
||||
new CompoundPredicate(Operator.OR, labelBinaryPredicate, stateBinaryPredicate);
|
||||
CompoundPredicate compoundOrPredicate = new CompoundPredicate(Operator.OR, labelBinaryPredicate,
|
||||
stateBinaryPredicate);
|
||||
stmt = new CancelLoadStmt(null, compoundOrPredicate);
|
||||
stmt.analyze(analyzer);
|
||||
Assertions.assertEquals(
|
||||
@ -93,11 +98,15 @@ public class CancelLoadStmtTest extends TestWithFeService {
|
||||
|
||||
// test match
|
||||
List<LoadJob> loadJobs = new ArrayList<>();
|
||||
InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, 10003L, 10005L, 0, "", "");
|
||||
Database db = Catalog.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:testDb");
|
||||
long dbId = db.getId();
|
||||
Table tbl = db.getTableNullable(tblName);
|
||||
long tblId = tbl.getId();
|
||||
InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, dbId, tblId, 0, "", "");
|
||||
loadJobs.add(insertLoadJob1);
|
||||
InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, 10003L, 10005L, 0, "", "");
|
||||
InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, dbId, tblId, 0, "", "");
|
||||
loadJobs.add(insertLoadJob2);
|
||||
InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, 10003L, 10005L, 0, "", "");
|
||||
InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, dbId, tblId, 0, "", "");
|
||||
loadJobs.add(insertLoadJob3);
|
||||
// label
|
||||
stmt = new CancelLoadStmt(null, labelBinaryPredicate);
|
||||
|
||||
@ -201,13 +201,13 @@ public class PolicyTest extends TestWithFeService {
|
||||
long dbId = 10;
|
||||
UserIdentity user = new UserIdentity("test_policy", "%");
|
||||
String originStmt = "CREATE ROW POLICY test_row_policy ON test.table1"
|
||||
+ " AS PERMISSIVE TO test_policy USING (k1 = 1)";
|
||||
+ " AS PERMISSIVE TO test_policy USING (k1 = 1)";
|
||||
long tableId = 100;
|
||||
FilterType filterType = FilterType.PERMISSIVE;
|
||||
Expr wherePredicate = null;
|
||||
|
||||
Policy rowPolicy = new RowPolicy(type, policyName, dbId, user,
|
||||
originStmt, tableId, filterType, wherePredicate);
|
||||
Policy rowPolicy = new RowPolicy(10000, policyName, dbId, user, originStmt, tableId, filterType,
|
||||
wherePredicate);
|
||||
|
||||
ByteArrayOutputStream emptyOutputStream = new ByteArrayOutputStream();
|
||||
DataOutputStream output = new DataOutputStream(emptyOutputStream);
|
||||
@ -218,6 +218,7 @@ public class PolicyTest extends TestWithFeService {
|
||||
Policy newPolicy = Policy.read(input);
|
||||
Assertions.assertTrue(newPolicy instanceof RowPolicy);
|
||||
RowPolicy newRowPolicy = (RowPolicy) newPolicy;
|
||||
Assertions.assertEquals(rowPolicy.getPolicyId(), newRowPolicy.getPolicyId());
|
||||
Assertions.assertEquals(type, newRowPolicy.getType());
|
||||
Assertions.assertEquals(policyName, newRowPolicy.getPolicyName());
|
||||
Assertions.assertEquals(dbId, newRowPolicy.getDbId());
|
||||
|
||||
Reference in New Issue
Block a user