[feature](ColdHeatSeparation) Support uploading cold data to HDFS (#22048)
```diff
@@ -306,6 +306,7 @@ public class ReportHandler extends Daemon {
         // do the diff. find out (intersection) / (be - meta) / (meta - be)
         List<Policy> policiesInFe = Env.getCurrentEnv().getPolicyMgr().getCopiedPoliciesByType(PolicyTypeEnum.STORAGE);
         List<Resource> resourcesInFe = Env.getCurrentEnv().getResourceMgr().getResource(ResourceType.S3);
+        resourcesInFe.addAll(Env.getCurrentEnv().getResourceMgr().getResource(ResourceType.HDFS));
 
         List<Resource> resourceToPush = new ArrayList<>();
         List<Policy> policyToPush = new ArrayList<>();
```
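The comment in this hunk describes reconciling the storage policies and resources the FE knows about with what a BE reports back, and the added line simply includes HDFS resources in that reconciliation alongside S3. A minimal, self-contained sketch of the "(intersection) / (be - meta) / (meta - be)" computation the comment refers to; the `meta` and `be` id sets are hypothetical stand-ins, not Doris's actual data structures:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the report diff mentioned in the hunk above: "meta" is what the
// FE knows, "be" is what a BE reported. The ids are illustrative only.
public class ReportDiffSketch {
    public static void main(String[] args) {
        Set<Long> meta = Set.of(1L, 2L, 3L);   // ids known to the FE
        Set<Long> be = Set.of(2L, 3L, 4L);     // ids reported by the BE

        Set<Long> inBoth = new HashSet<>(meta);
        inBoth.retainAll(be);                  // intersection: already in sync

        Set<Long> toPush = new HashSet<>(meta);
        toPush.removeAll(be);                  // meta - be: must be pushed to the BE

        Set<Long> stale = new HashSet<>(be);
        stale.removeAll(meta);                 // be - meta: stale entries on the BE

        System.out.println(inBoth + " / " + toPush + " / " + stale);
    }
}
```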
```diff
@@ -19,6 +19,7 @@ package org.apache.doris.policy;
 
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Resource.ReferenceType;
 import org.apache.doris.catalog.ScalarType;
```
```diff
@@ -177,29 +178,38 @@ public class StoragePolicy extends Policy {
             this.cooldownTtl = getSecondsByCooldownTtl(props.get(COOLDOWN_TTL));
         }
 
-        checkIsS3ResourceAndExist(this.storageResource);
+        checkResourceIsExist(this.storageResource);
         if (!addResourceReference() && !ifNotExists) {
-            throw new AnalysisException("this policy has been added to s3 resource once, policy has been created.");
+            throw new AnalysisException("this policy has been added to s3 or hdfs resource, policy has been created.");
         }
     }
 
-    private static Resource checkIsS3ResourceAndExist(final String storageResource) throws AnalysisException {
-        // check storage_resource type is S3, current just support S3
+    private static Resource checkResourceIsExist(final String storageResource) throws AnalysisException {
         Resource resource =
                 Optional.ofNullable(Env.getCurrentEnv().getResourceMgr().getResource(storageResource))
                 .orElseThrow(() -> new AnalysisException("storage resource doesn't exist: " + storageResource));
 
-        if (resource.getType() != Resource.ResourceType.S3) {
-            throw new AnalysisException("current storage policy just support resource type S3_COOLDOWN");
-        }
         Map<String, String> properties = resource.getCopiedProperties();
-        if (!properties.containsKey(S3Properties.ROOT_PATH)) {
-            throw new AnalysisException(String.format(
-                    "Missing [%s] in '%s' resource", S3Properties.ROOT_PATH, storageResource));
-        }
-        if (!properties.containsKey(S3Properties.BUCKET)) {
-            throw new AnalysisException(String.format(
-                    "Missing [%s] in '%s' resource", S3Properties.BUCKET, storageResource));
+        switch (resource.getType()) {
+            case S3:
+                if (!properties.containsKey(S3Properties.ROOT_PATH)) {
+                    throw new AnalysisException(String.format(
+                            "Missing [%s] in '%s' resource", S3Properties.ROOT_PATH, storageResource));
+                }
+                if (!properties.containsKey(S3Properties.BUCKET)) {
+                    throw new AnalysisException(String.format(
+                            "Missing [%s] in '%s' resource", S3Properties.BUCKET, storageResource));
+                }
+                break;
+            case HDFS:
+                if (!properties.containsKey(HdfsResource.HADOOP_FS_NAME)) {
+                    throw new AnalysisException(String.format(
+                            "Missing [%s] in '%s' resource", HdfsResource.HADOOP_FS_NAME, storageResource));
+                }
+                break;
+            default:
+                throw new AnalysisException(
+                        "current storage policy just support resource type S3_COOLDOWN or HDFS_COOLDOWN");
         }
         return resource;
     }
```
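This hunk generalizes the old S3-only check into a switch that validates the required properties per resource type before the policy is accepted. A self-contained sketch of the same pattern, with hypothetical string keys standing in for `S3Properties.ROOT_PATH`, `S3Properties.BUCKET`, and `HdfsResource.HADOOP_FS_NAME`; the enum and exception type are illustrative, not Doris's API:

```java
import java.util.Map;

// Sketch of per-type required-property validation as done by
// checkResourceIsExist. Keys and types here are stand-ins.
public class ResourceCheckSketch {
    enum ResourceType { S3, HDFS, JDBC }

    static void validate(ResourceType type, Map<String, String> props) {
        switch (type) {
            case S3:
                require(props, "s3.root.path");   // stand-in for S3Properties.ROOT_PATH
                require(props, "s3.bucket");      // stand-in for S3Properties.BUCKET
                break;
            case HDFS:
                require(props, "fs.defaultFS");   // stand-in for HdfsResource.HADOOP_FS_NAME
                break;
            default:
                throw new IllegalArgumentException(
                        "storage policy only supports S3 or HDFS resources");
        }
    }

    static void require(Map<String, String> props, String key) {
        if (!props.containsKey(key)) {
            throw new IllegalArgumentException("Missing [" + key + "] in resource");
        }
    }

    public static void main(String[] args) {
        validate(ResourceType.HDFS, Map.of("fs.defaultFS", "hdfs://nameservice1"));
    }
}
```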
```diff
@@ -343,7 +353,7 @@ public class StoragePolicy extends Policy {
 
         String storageResource = properties.get(STORAGE_RESOURCE);
         if (storageResource != null) {
-            checkIsS3ResourceAndExist(storageResource);
+            checkResourceIsExist(storageResource);
         }
         if (this.policyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && this.storageResource == null
                 && storageResource == null) {
```
```diff
@@ -18,6 +18,7 @@
 package org.apache.doris.task;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Resource.ResourceType;
 import org.apache.doris.datasource.property.constants.S3Properties;
```
```diff
@@ -62,8 +63,9 @@ public class PushStoragePolicyTask extends AgentTask {
             StoragePolicy storagePolicy = (StoragePolicy) p;
             String resourceName = storagePolicy.getStorageResource();
             Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(resourceName);
-            if (resource == null || resource.getType() != ResourceType.S3) {
-                LOG.warn("can't find s3 resource by name {}", resourceName);
+            if (resource == null || (resource.getType() != ResourceType.S3
+                    && resource.getType() != ResourceType.HDFS)) {
+                LOG.warn("can't find s3 resource or hdfs resource by name {}", resourceName);
                 return;
             }
             item.setResourceId(resource.getId());
```
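Here the guard before pushing a policy to the BE is widened from "is S3" to "is S3 or HDFS". As a general design note, a set of allowed types keeps such guards readable as more cooldown-capable resource types are added; a hypothetical sketch (the enum and set are illustrative, not Doris's code):

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: expressing the "S3 or HDFS" guard with an EnumSet so
// a future cooldown-capable resource type only needs one more entry.
public class GuardSketch {
    enum ResourceType { S3, HDFS, JDBC }

    static final Set<ResourceType> COOLDOWN_TYPES =
            EnumSet.of(ResourceType.S3, ResourceType.HDFS);

    static boolean supportsCooldown(ResourceType type) {
        return type != null && COOLDOWN_TYPES.contains(type);
    }

    public static void main(String[] args) {
        System.out.println(supportsCooldown(ResourceType.HDFS)); // true
        System.out.println(supportsCooldown(ResourceType.JDBC)); // false
    }
}
```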
```diff
@@ -85,7 +87,11 @@ public class PushStoragePolicyTask extends AgentTask {
             item.setId(r.getId());
             item.setName(r.getName());
             item.setVersion(r.getVersion());
-            item.setS3StorageParam(S3Properties.getS3TStorageParam(r.getCopiedProperties()));
+            if (r.getType() == ResourceType.S3) {
+                item.setS3StorageParam(S3Properties.getS3TStorageParam(r.getCopiedProperties()));
+            } else if (r.getType() == ResourceType.HDFS) {
+                item.setHdfsStorageParam(HdfsResource.generateHdfsParam(r.getCopiedProperties()));
+            }
             r.readUnlock();
             tStorageResources.add(item);
         });
```
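The final hunk dispatches on the resource type to attach either S3 or HDFS storage parameters to the item pushed to the BE. A stand-alone sketch of that dispatch, with a hypothetical `Item` class in place of the Thrift struct and plain maps in place of the generated parameter types:

```java
import java.util.Map;

// Sketch of the per-type parameter dispatch in PushStoragePolicyTask.
// Item and its fields are hypothetical stand-ins for the Thrift struct.
public class ParamDispatchSketch {
    enum ResourceType { S3, HDFS }

    static class Item {
        Map<String, String> s3Param;
        Map<String, String> hdfsParam;
    }

    static void fill(Item item, ResourceType type, Map<String, String> props) {
        if (type == ResourceType.S3) {
            item.s3Param = props;      // mirrors item.setS3StorageParam(...)
        } else if (type == ResourceType.HDFS) {
            item.hdfsParam = props;    // mirrors item.setHdfsStorageParam(...)
        }
    }

    public static void main(String[] args) {
        Item item = new Item();
        fill(item, ResourceType.HDFS, Map.of("fs.defaultFS", "hdfs://nameservice1"));
        System.out.println(item.hdfsParam);
    }
}
```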