diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 659a550be4..d41de3557f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -59,7 +59,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.PropertyAnalyzer; -import org.apache.doris.datasource.InternalDataSource; import org.apache.doris.persist.AlterViewInfo; import org.apache.doris.persist.BatchModifyPartitionsInfo; import org.apache.doris.persist.ModifyCommentOperationLog; @@ -161,7 +160,7 @@ public class Alter { } String currentStoragePolicy = currentAlterOps.getTableStoragePolicy(alterClauses); // check currentStoragePolicy resource exist. - InternalDataSource.checkStoragePolicyExist(currentStoragePolicy); + Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(currentStoragePolicy); olapTable.setStoragePolicy(currentStoragePolicy); needProcessOutsideTableLock = true; @@ -713,7 +712,7 @@ public class Alter { String currentStoragePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties); if (!currentStoragePolicy.equals("")) { // check currentStoragePolicy resource exist. - InternalDataSource.checkStoragePolicyExist(currentStoragePolicy); + Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(currentStoragePolicy); partitionInfo.setStoragePolicy(partition.getId(), currentStoragePolicy); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java index d0fea950c8..ab94a30830 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java @@ -19,7 +19,6 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; @@ -77,10 +76,10 @@ public class AlterPolicyStmt extends DdlStmt { StoragePolicy storagePolicy = (StoragePolicy) hasPolicy.get(); // default storage policy use alter storage policy to add s3 resource. - if (!policyName.equalsIgnoreCase(Config.default_storage_policy) - && properties.containsKey(StoragePolicy.STORAGE_RESOURCE)) { + if (!policyName.equalsIgnoreCase(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME) && properties.containsKey( + StoragePolicy.STORAGE_RESOURCE)) { throw new AnalysisException("not support change storage policy's storage resource" - + ", you can change s3 properties by alter resource"); + + ", you can change s3 properties by alter resource"); } boolean hasCooldownDatetime = false; @@ -114,7 +113,7 @@ public class AlterPolicyStmt extends DdlStmt { } do { - if (policyName.equalsIgnoreCase(Config.default_storage_policy)) { + if (policyName.equalsIgnoreCase(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME)) { // default storage policy if (storagePolicy.getStorageResource() != null && hasCooldownDatetime) { // alter cooldown datetime, can do diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index e4f67a461f..d865769094 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -1261,6 +1261,8 @@ public class Catalog { initDefaultCluster(); } + getPolicyMgr().createDefaultStoragePolicy(); + // MUST set master ip before starting checkpoint thread. // because checkpoint thread need this info to select non-master FE to push image this.masterIp = FrontendOptions.getLocalHostAddress(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java index afbf36d935..6f194180be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java @@ -32,7 +32,6 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.DropResourceOperationLog; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.policy.Policy; -import org.apache.doris.policy.PolicyTypeEnum; import org.apache.doris.policy.StoragePolicy; import org.apache.doris.qe.ConnectContext; @@ -109,7 +108,7 @@ public class ResourceMgr implements Writable { } // Check whether the resource is in use before deleting it, except spark resource - StoragePolicy checkedStoragePolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, null); + StoragePolicy checkedStoragePolicy = StoragePolicy.ofCheck(null); checkedStoragePolicy.setStorageResource(resourceName); if (Catalog.getCurrentCatalog().getPolicyMgr().existPolicy(checkedStoragePolicy)) { Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedStoragePolicy); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 307d0be79a..7367f97e8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1691,9 +1691,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static boolean use_date_v2_by_default = false; - @ConfField - public static String default_storage_policy = "default_storage_policy"; - @ConfField(mutable = false, masterOnly = true) public static boolean enable_multi_tags = false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java index e308b93b9c..173a47f9aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java @@ -38,7 +38,6 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; -import org.apache.doris.policy.PolicyTypeEnum; import org.apache.doris.policy.StoragePolicy; import com.google.common.base.Preconditions; @@ -343,23 +342,24 @@ public class DynamicPartitionUtil { } } - private static void checkRemoteStoragePolicy(String val) throws DdlException { - if (Strings.isNullOrEmpty(val)) { + private static void checkRemoteStoragePolicy(String policyName) throws DdlException { + if (Strings.isNullOrEmpty(policyName)) { LOG.info(DynamicPartitionProperty.REMOTE_STORAGE_POLICY + " is null, remove this key"); return; } - if (val.isEmpty()) { + if (policyName.isEmpty()) { throw new DdlException(DynamicPartitionProperty.REMOTE_STORAGE_POLICY + " is empty."); } - StoragePolicy checkedPolicyCondition = new StoragePolicy(PolicyTypeEnum.STORAGE, val); + StoragePolicy checkedPolicyCondition = StoragePolicy.ofCheck(policyName); if (!Catalog.getCurrentCatalog().getPolicyMgr().existPolicy(checkedPolicyCondition)) { - throw new DdlException(DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + val + " doesn't exist."); + throw new DdlException( + DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + policyName + " doesn't exist."); } StoragePolicy storagePolicy = (StoragePolicy) Catalog.getCurrentCatalog() .getPolicyMgr().getPolicy(checkedPolicyCondition); if (Strings.isNullOrEmpty(storagePolicy.getCooldownTtl())) { throw new DdlException("Storage policy cooldown type need to be cooldownTtl for properties " - + DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + val); + + DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + policyName); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 76df8ad09d..0f2b861ba3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -31,7 +31,6 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.policy.Policy; -import org.apache.doris.policy.PolicyTypeEnum; import org.apache.doris.policy.StoragePolicy; import org.apache.doris.resource.Tag; import org.apache.doris.thrift.TCompressionType; @@ -202,7 +201,7 @@ public class PropertyAnalyzer { if (hasRemoteStoragePolicy) { // check remote storage policy - StoragePolicy checkedPolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, remoteStoragePolicy); + StoragePolicy checkedPolicy = StoragePolicy.ofCheck(remoteStoragePolicy); Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedPolicy); if (!(policy instanceof StoragePolicy)) { throw new AnalysisException("No PolicyStorage: " + remoteStoragePolicy); @@ -527,7 +526,7 @@ public class PropertyAnalyzer { if (properties != null && properties.containsKey(PROPERTIES_REMOTE_STORAGE_POLICY)) { remoteStoragePolicy = properties.get(PROPERTIES_REMOTE_STORAGE_POLICY); // check remote storage policy existence - StoragePolicy checkedStoragePolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, remoteStoragePolicy); + StoragePolicy checkedStoragePolicy = StoragePolicy.ofCheck(remoteStoragePolicy); Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedStoragePolicy); if (!(policy instanceof StoragePolicy)) { throw new AnalysisException("StoragePolicy: " + remoteStoragePolicy + " does not exist."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java index 51f97d0116..434552ad62 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java @@ -150,9 +150,6 @@ import org.apache.doris.persist.PartitionPersistInfo; import org.apache.doris.persist.RecoverInfo; import org.apache.doris.persist.ReplicaPersistInfo; import org.apache.doris.persist.TruncateTableInfo; -import org.apache.doris.policy.Policy; -import org.apache.doris.policy.PolicyTypeEnum; -import org.apache.doris.policy.StoragePolicy; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; @@ -191,7 +188,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -1797,7 +1793,7 @@ public class InternalDataSource implements DataSourceIf { // set storage policy String storagePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties); - checkStoragePolicyExist(storagePolicy); + Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(storagePolicy); olapTable.setStoragePolicy(storagePolicy); @@ -1992,7 +1988,7 @@ public class InternalDataSource implements DataSourceIf { if (!partionStoragePolicy.equals("")) { storagePolicy = partionStoragePolicy; } - checkStoragePolicyExist(storagePolicy); + Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(storagePolicy); Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(), olapTable.getBaseIndexId(), entry.getValue(), entry.getKey(), olapTable.getIndexIdToMeta(), partitionDistributionInfo, dataProperty.getStorageMedium(), @@ -2051,24 +2047,6 @@ public class InternalDataSource implements DataSourceIf { } } - public static void checkStoragePolicyExist(String storagePolicy) throws DdlException { - if (!storagePolicy.equals("")) { - // when create table use storage policy - // if not exist default storage policy, create it - // if exist, just return. - Catalog.getCurrentCatalog().getPolicyMgr().createDefaultStoragePolicy(); - - List policiesByType = Catalog.getCurrentCatalog() - .getPolicyMgr().getPoliciesByType(PolicyTypeEnum.STORAGE); - policiesByType.stream().filter(policy -> policy.getPolicyName().equals(storagePolicy)).findAny() - .orElseThrow(() -> new DdlException("Storage policy does not exist. name: " + storagePolicy)); - Optional hasDefaultPolicy = policiesByType.stream() - .filter(policy -> policy.getPolicyName().equals(Config.default_storage_policy)).findAny(); - - StoragePolicy.checkDefaultStoragePolicyValid(storagePolicy, hasDefaultPolicy); - } - } - private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException { String tableName = stmt.getTableName(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java index acc3d8b0c2..a46773a39d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java @@ -57,10 +57,8 @@ public abstract class Policy implements Writable, GsonPostProcessable { @SerializedName(value = "policyName") protected String policyName = null; - public Policy() { - if (Catalog.getCurrentCatalog().isMaster()) { - policyId = Catalog.getCurrentCatalog().getNextId(); - } + public Policy(PolicyTypeEnum type) { + this.type = type; } /** @@ -69,8 +67,8 @@ public abstract class Policy implements Writable, GsonPostProcessable { * @param type policy type * @param policyName policy name */ - public Policy(final PolicyTypeEnum type, final String policyName) { - policyId = Catalog.getCurrentCatalog().getNextId(); + public Policy(long policyId, final PolicyTypeEnum type, final String policyName) { + this.policyId = policyId; this.type = type; this.policyName = policyName; } @@ -79,13 +77,13 @@ public abstract class Policy implements Writable, GsonPostProcessable { * Trans stmt to Policy. **/ public static Policy fromCreateStmt(CreatePolicyStmt stmt) throws AnalysisException { + long policyId = Catalog.getCurrentCatalog().getNextId(); switch (stmt.getType()) { case STORAGE: - StoragePolicy storagePolicy = new StoragePolicy(stmt.getType(), stmt.getPolicyName()); + StoragePolicy storagePolicy = new StoragePolicy(policyId, stmt.getPolicyName()); storagePolicy.init(stmt.getProperties(), stmt.isIfNotExists()); return storagePolicy; case ROW: - default: // stmt must be analyzed. DatabaseIf db = Catalog.getCurrentCatalog().getDataSourceMgr() .getCatalogOrAnalysisException(stmt.getTableName().getCtl()) @@ -93,9 +91,10 @@ public abstract class Policy implements Writable, GsonPostProcessable { UserIdentity userIdent = stmt.getUser(); userIdent.analyze(ConnectContext.get().getClusterName()); TableIf table = db.getTableOrAnalysisException(stmt.getTableName().getTbl()); - return new RowPolicy(stmt.getType(), stmt.getPolicyName(), db.getId(), userIdent, - stmt.getOrigStmt().originStmt, table.getId(), stmt.getFilterType(), - stmt.getWherePredicate()); + return new RowPolicy(policyId, stmt.getPolicyName(), db.getId(), userIdent, + stmt.getOrigStmt().originStmt, table.getId(), stmt.getFilterType(), stmt.getWherePredicate()); + default: + throw new AnalysisException("Unknown policy type: " + stmt.getType()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java index 7bdbf7b203..7759ab381b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java @@ -24,7 +24,6 @@ import org.apache.doris.analysis.DropPolicyStmt; import org.apache.doris.analysis.ShowPolicyStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; @@ -40,6 +39,7 @@ import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.parquet.Strings; import java.io.DataInput; import java.io.DataOutput; @@ -89,17 +89,16 @@ public class PolicyMgr implements Writable { lock.readLock().unlock(); } - public void createDefaultStoragePolicy() throws DdlException { - Optional hasDefault = findPolicy(Config.default_storage_policy, PolicyTypeEnum.STORAGE); - if (hasDefault.isPresent()) { - // already exist default storage policy, just return. - return; - } - + public void createDefaultStoragePolicy() { writeLock(); try { - StoragePolicy defaultStoragePolicy = - new StoragePolicy(PolicyTypeEnum.STORAGE, Config.default_storage_policy); + Optional hasDefault = findPolicy(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME, PolicyTypeEnum.STORAGE); + if (hasDefault.isPresent()) { + // already exist default storage policy, just return. + return; + } + long policyId = Catalog.getCurrentCatalog().getNextId(); + StoragePolicy defaultStoragePolicy = new StoragePolicy(policyId, StoragePolicy.DEFAULT_STORAGE_POLICY_NAME); unprotectedAdd(defaultStoragePolicy); Catalog.getCurrentCatalog().getEditLog().logCreatePolicy(defaultStoragePolicy); } finally { @@ -413,10 +412,6 @@ public class PolicyMgr implements Writable { throw new DdlException("Current not support alter row policy"); } - if (storagePolicyName.equalsIgnoreCase(Config.default_storage_policy)) { - createDefaultStoragePolicy(); - } - Optional policy = findPolicy(storagePolicyName, PolicyTypeEnum.STORAGE); if (!policy.isPresent()) { @@ -429,4 +424,23 @@ public class PolicyMgr implements Writable { Catalog.getCurrentCatalog().getEditLog().logAlterStoragePolicy(storagePolicy); LOG.info("Alter storage policy success. policy: {}", storagePolicy); } + + public void checkStoragePolicyExist(String storagePolicyName) throws DdlException { + if (Strings.isNullOrEmpty(storagePolicyName)) { + return; + } + readLock(); + try { + List policiesByType = Catalog.getCurrentCatalog().getPolicyMgr() + .getPoliciesByType(PolicyTypeEnum.STORAGE); + policiesByType.stream().filter(policy -> policy.getPolicyName().equals(storagePolicyName)).findAny() + .orElseThrow(() -> new DdlException("Storage policy does not exist. name: " + storagePolicyName)); + Optional hasDefaultPolicy = policiesByType.stream() + .filter(policy -> policy.getPolicyName().equals(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME)) + .findAny(); + StoragePolicy.checkDefaultStoragePolicyValid(storagePolicyName, hasDefaultPolicy); + } finally { + readUnlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java index 94c0458069..925463d279 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java @@ -88,12 +88,14 @@ public class RowPolicy extends Policy { private Expr wherePredicate = null; - public RowPolicy() {} + public RowPolicy() { + super(PolicyTypeEnum.ROW); + } /** * Policy for Table. Policy of ROW or others. * - * @param type PolicyType + * @param policyId policy id * @param policyName policy name * @param dbId database i * @param user username @@ -102,10 +104,9 @@ public class RowPolicy extends Policy { * @param filterType filter type * @param wherePredicate where predicate */ - public RowPolicy(final PolicyTypeEnum type, final String policyName, long dbId, - UserIdentity user, String originStmt, final long tableId, - final FilterType filterType, final Expr wherePredicate) { - super(type, policyName); + public RowPolicy(long policyId, final String policyName, long dbId, UserIdentity user, String originStmt, + final long tableId, final FilterType filterType, final Expr wherePredicate) { + super(policyId, PolicyTypeEnum.ROW, policyName); this.user = user; this.dbId = dbId; this.tableId = tableId; @@ -141,8 +142,8 @@ public class RowPolicy extends Policy { @Override public RowPolicy clone() { - return new RowPolicy(this.type, this.policyName, this.dbId, this.user, this.originStmt, this.tableId, - this.filterType, this.wherePredicate); + return new RowPolicy(this.policyId, this.policyName, this.dbId, this.user, this.originStmt, this.tableId, + this.filterType, this.wherePredicate); } private boolean checkMatched(long dbId, long tableId, PolicyTypeEnum type, diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java index 0a754e9ec6..b45ccba7e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java @@ -23,7 +23,6 @@ import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.S3Resource; import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; import org.apache.doris.persist.gson.GsonUtils; @@ -57,16 +56,18 @@ import java.util.Optional; **/ @Data public class StoragePolicy extends Policy { - public static boolean checkDefaultStoragePolicyValid(final String storagePolicyName, - Optional defaultPolicy) throws DdlException { + public static final String DEFAULT_STORAGE_POLICY_NAME = "default_storage_policy"; + + public static boolean checkDefaultStoragePolicyValid(final String storagePolicyName, Optional defaultPolicy) + throws DdlException { if (!defaultPolicy.isPresent()) { return false; } - if (storagePolicyName.equalsIgnoreCase(Config.default_storage_policy) - && (((StoragePolicy) defaultPolicy.get()).getStorageResource() == null)) { + if (storagePolicyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && ( + ((StoragePolicy) defaultPolicy.get()).getStorageResource() == null)) { throw new DdlException("Use default storage policy, but not give s3 info," - + " please use alter resource to add default storage policy S3 info."); + + " please use alter resource to add default storage policy S3 info."); } return true; } @@ -116,21 +117,23 @@ public class StoragePolicy extends Policy { private Map props; - public StoragePolicy() {} + public StoragePolicy() { + super(PolicyTypeEnum.STORAGE); + } /** * Policy for Storage Migration. * - * @param type PolicyType + * @param policyId policy id * @param policyName policy name * @param storageResource resource name for storage * @param cooldownDatetime cool down time * @param cooldownTtl cool down time cost after partition is created * @param cooldownTtlMs seconds for cooldownTtl */ - public StoragePolicy(final PolicyTypeEnum type, final String policyName, final String storageResource, - final Date cooldownDatetime, final String cooldownTtl, long cooldownTtlMs) { - super(type, policyName); + public StoragePolicy(long policyId, final String policyName, final String storageResource, + final Date cooldownDatetime, final String cooldownTtl, long cooldownTtlMs) { + super(policyId, PolicyTypeEnum.STORAGE, policyName); this.storageResource = storageResource; this.cooldownDatetime = cooldownDatetime; this.cooldownTtl = cooldownTtl; @@ -140,11 +143,17 @@ public class StoragePolicy extends Policy { /** * Policy for Storage Migration. * - * @param type PolicyType + * @param policyId policy id * @param policyName policy name */ - public StoragePolicy(final PolicyTypeEnum type, final String policyName) { - super(type, policyName); + public StoragePolicy(long policyId, final String policyName) { + super(policyId, PolicyTypeEnum.STORAGE, policyName); + } + + public static StoragePolicy ofCheck(String policyName) { + StoragePolicy storagePolicy = new StoragePolicy(); + storagePolicy.policyName = policyName; + return storagePolicy; } /** @@ -240,8 +249,8 @@ public class StoragePolicy extends Policy { @Override public StoragePolicy clone() { - return new StoragePolicy(this.type, this.policyName, this.storageResource, - this.cooldownDatetime, this.cooldownTtl, this.cooldownTtlMs); + return new StoragePolicy(this.policyId, this.policyName, this.storageResource, this.cooldownDatetime, + this.cooldownTtl, this.cooldownTtlMs); } @Override @@ -356,11 +365,10 @@ public class StoragePolicy extends Policy { } }); - if (policyName.equalsIgnoreCase(Config.default_storage_policy) && storageResource == null) { + if (policyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && storageResource == null) { // here first time set S3 resource to default storage policy. - String alterStorageResource = Optional.ofNullable(properties.get(STORAGE_RESOURCE)) - .orElseThrow(() -> - new DdlException("first time set default storage policy, but not give storageResource")); + String alterStorageResource = Optional.ofNullable(properties.get(STORAGE_RESOURCE)).orElseThrow( + () -> new DdlException("first time set default storage policy, but not give storageResource")); // check alterStorageResource resource exist. checkIsS3ResourceAndExist(alterStorageResource); storageResource = alterStorageResource; diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java index 3621cbcbad..25937af53f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java @@ -18,6 +18,9 @@ package org.apache.doris.analysis; import org.apache.doris.analysis.CompoundPredicate.Operator; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.FeConstants; @@ -36,13 +39,15 @@ import java.util.List; public class CancelLoadStmtTest extends TestWithFeService { private Analyzer analyzer; + private String dbName = "testDb"; + private String tblName = "table1"; @Override protected void runBeforeAll() throws Exception { FeConstants.runningUnitTest = true; - createDatabase("testDb"); - useDatabase("testDb"); - createTable("create table table1\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n" + createDatabase(dbName); + useDatabase(dbName); + createTable("create table " + tblName + "\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n" + "properties(\"replication_num\" = \"1\");"); analyzer = new Analyzer(connectContext.getCatalog(), connectContext); } @@ -55,36 +60,36 @@ public class CancelLoadStmtTest extends TestWithFeService { SlotRef stateSlotRef = new SlotRef(null, "state"); StringLiteral stateStringLiteral = new StringLiteral("FINISHED"); - BinaryPredicate labelBinaryPredicate = - new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral); + BinaryPredicate labelBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, + labelStringLiteral); CancelLoadStmt stmt = new CancelLoadStmt(null, labelBinaryPredicate); stmt.analyze(analyzer); Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label'", stmt.toString()); - BinaryPredicate stateBinaryPredicate = - new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + BinaryPredicate stateBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, + stateStringLiteral); stmt = new CancelLoadStmt(null, stateBinaryPredicate); stmt.analyze(analyzer); Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `state` = 'FINISHED'", stmt.toString()); - LikePredicate labelLikePredicate = - new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, labelStringLiteral); + LikePredicate labelLikePredicate = new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, + labelStringLiteral); stmt = new CancelLoadStmt(null, labelLikePredicate); stmt.analyze(analyzer); Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` LIKE 'doris_test_label'", stmt.toString()); - CompoundPredicate compoundAndPredicate = - new CompoundPredicate(Operator.AND, labelBinaryPredicate, stateBinaryPredicate); + CompoundPredicate compoundAndPredicate = new CompoundPredicate(Operator.AND, labelBinaryPredicate, + stateBinaryPredicate); stmt = new CancelLoadStmt(null, compoundAndPredicate); stmt.analyze(analyzer); Assertions.assertEquals( "CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label' AND `state` = 'FINISHED'", stmt.toString()); - CompoundPredicate compoundOrPredicate = - new CompoundPredicate(Operator.OR, labelBinaryPredicate, stateBinaryPredicate); + CompoundPredicate compoundOrPredicate = new CompoundPredicate(Operator.OR, labelBinaryPredicate, + stateBinaryPredicate); stmt = new CancelLoadStmt(null, compoundOrPredicate); stmt.analyze(analyzer); Assertions.assertEquals( @@ -93,11 +98,15 @@ public class CancelLoadStmtTest extends TestWithFeService { // test match List loadJobs = new ArrayList<>(); - InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, 10003L, 10005L, 0, "", ""); + Database db = Catalog.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:testDb"); + long dbId = db.getId(); + Table tbl = db.getTableNullable(tblName); + long tblId = tbl.getId(); + InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, dbId, tblId, 0, "", ""); loadJobs.add(insertLoadJob1); - InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, 10003L, 10005L, 0, "", ""); + InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, dbId, tblId, 0, "", ""); loadJobs.add(insertLoadJob2); - InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, 10003L, 10005L, 0, "", ""); + InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, dbId, tblId, 0, "", ""); loadJobs.add(insertLoadJob3); // label stmt = new CancelLoadStmt(null, labelBinaryPredicate); diff --git a/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java index 800b520836..5fa893b52b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java @@ -201,13 +201,13 @@ public class PolicyTest extends TestWithFeService { long dbId = 10; UserIdentity user = new UserIdentity("test_policy", "%"); String originStmt = "CREATE ROW POLICY test_row_policy ON test.table1" - + " AS PERMISSIVE TO test_policy USING (k1 = 1)"; + + " AS PERMISSIVE TO test_policy USING (k1 = 1)"; long tableId = 100; FilterType filterType = FilterType.PERMISSIVE; Expr wherePredicate = null; - Policy rowPolicy = new RowPolicy(type, policyName, dbId, user, - originStmt, tableId, filterType, wherePredicate); + Policy rowPolicy = new RowPolicy(10000, policyName, dbId, user, originStmt, tableId, filterType, + wherePredicate); ByteArrayOutputStream emptyOutputStream = new ByteArrayOutputStream(); DataOutputStream output = new DataOutputStream(emptyOutputStream); @@ -218,6 +218,7 @@ public class PolicyTest extends TestWithFeService { Policy newPolicy = Policy.read(input); Assertions.assertTrue(newPolicy instanceof RowPolicy); RowPolicy newRowPolicy = (RowPolicy) newPolicy; + Assertions.assertEquals(rowPolicy.getPolicyId(), newRowPolicy.getPolicyId()); Assertions.assertEquals(type, newRowPolicy.getType()); Assertions.assertEquals(policyName, newRowPolicy.getPolicyName()); Assertions.assertEquals(dbId, newRowPolicy.getDbId());