[feature](backup) add property to remove snapshot before creating repo (#25847)

Doris is not responsible for managing snapshots, but it needs to clear all
snapshots before doing backup/restore regression testing, so a property is
added to indicate that existing snapshots need to be cleared when creating a
repo.

In addition, a regression test case for backup/restore has been added.
This commit is contained in:
walter
2023-10-27 21:03:26 +08:00
committed by GitHub
parent c715facafa
commit 365fdd2f4d
11 changed files with 440 additions and 25 deletions

View File

@ -174,6 +174,25 @@ PROPERTIES
);
```
9. Create repository and delete snapshots if exists.
```sql
CREATE REPOSITORY `s3_repo`
WITH S3
ON LOCATION "s3://s3-repo"
PROPERTIES
(
"s3.endpoint" = "http://s3-REGION.amazonaws.com",
"s3.region" = "s3-REGION",
"s3.access_key" = "AWS_ACCESS_KEY",
"s3.secret_key"="AWS_SECRET_KEY",
"s3.region" = "REGION",
"delete_if_exists" = "true"
);
```
Note: only the s3 service supports the "delete_if_exists" property.
### Keywords
CREATE, REPOSITORY

View File

@ -170,6 +170,25 @@ PROPERTIES
);
```
9. 创建仓库并删除已经存在的 snapshot
```sql
CREATE REPOSITORY `s3_repo`
WITH S3
ON LOCATION "s3://s3-repo"
PROPERTIES
(
"s3.endpoint" = "http://s3-REGION.amazonaws.com",
"s3.region" = "s3-REGION",
"s3.access_key" = "AWS_ACCESS_KEY",
"s3.secret_key"="AWS_SECRET_KEY",
"s3.region" = "REGION",
"delete_if_exists" = "true"
);
```
注:目前只有 s3 支持 "delete_if_exists" 属性。
### Keywords
CREATE, REPOSITORY

View File

@ -28,6 +28,8 @@ import org.apache.doris.qe.ConnectContext;
import java.util.Map;
public class CreateRepositoryStmt extends DdlStmt {
public static String PROP_DELETE_IF_EXISTS = "delete_if_exists";
private boolean isReadOnly;
private String name;
private StorageBackend storage;
@ -71,6 +73,16 @@ public class CreateRepositoryStmt extends DdlStmt {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
FeNameFormat.checkCommonName("repository", name);
// check delete_if_exists, this property will be used by Repository.initRepository.
Map<String, String> properties = getProperties();
String deleteIfExistsStr = properties.get(PROP_DELETE_IF_EXISTS);
if (deleteIfExistsStr != null) {
if (!deleteIfExistsStr.equalsIgnoreCase("true") && !deleteIfExistsStr.equalsIgnoreCase("false")) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"'" + PROP_DELETE_IF_EXISTS + "' in properties, you should set it false or true");
}
}
}
@Override

View File

@ -17,6 +17,7 @@
package org.apache.doris.backup;
import org.apache.doris.analysis.CreateRepositoryStmt;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.Env;
@ -215,6 +216,27 @@ public class Repository implements Writable {
if (FeConstants.runningUnitTest) {
return Status.OK;
}
// A temporary solution is to delete all stale snapshots before creating an S3 repository
// so that we can add regression tests about backup/restore.
//
// TODO: support hdfs/brokers
if (fileSystem instanceof S3FileSystem) {
String deleteStaledSnapshots = fileSystem.getProperties()
.getOrDefault(CreateRepositoryStmt.PROP_DELETE_IF_EXISTS, "false");
if (deleteStaledSnapshots.equalsIgnoreCase("true")) {
// delete with prefix:
// eg. __palo_repository_repo_name/
String snapshotPrefix = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name));
LOG.info("property {} is set, delete snapshots with prefix: {}",
CreateRepositoryStmt.PROP_DELETE_IF_EXISTS, snapshotPrefix);
Status st = ((S3FileSystem) fileSystem).deleteDirectory(snapshotPrefix);
if (!st.ok()) {
return st;
}
}
}
String repoInfoFilePath = assembleRepoInfoFilePath();
// check if the repo is already exist in remote
List<RemoteFile> remoteFiles = Lists.newArrayList();
@ -245,8 +267,8 @@ public class Repository implements Writable {
return new Status(ErrCode.COMMON_ERROR,
"failed to parse create time of repository: " + root.get("create_time"));
}
return Status.OK;
return Status.OK;
} catch (IOException e) {
return new Status(ErrCode.COMMON_ERROR, "failed to read repo info file: " + e.getMessage());
} finally {

View File

@ -44,6 +44,8 @@ public interface ObjStorage<C> {
Status deleteObject(String remotePath);
Status deleteObjects(String remotePath);
Status copyObject(String origFilePath, String destFilePath);
RemoteObjects listObjects(String remotePath, String continuationToken) throws DdlException;

View File

@ -37,14 +37,18 @@ import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
@ -56,6 +60,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
public class S3ObjStorage implements ObjStorage<S3Client> {
private static final Logger LOG = LogManager.getLogger(S3ObjStorage.class);
@ -223,6 +228,52 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
}
}
@Override
public Status deleteObjects(String absolutePath) {
    // Batch-delete every object under the given prefix, paging through the
    // listing until the bucket reports no more results.
    try {
        S3URI baseUri = S3URI.create(absolutePath, forceHostedStyle);
        String continuationToken = "";
        boolean isTruncated = false;
        long totalObjects = 0;
        do {
            RemoteObjects objects = listObjects(absolutePath, continuationToken);
            List<RemoteObject> objectList = objects.getObjectList();
            if (!objectList.isEmpty()) {
                Delete delete = Delete.builder()
                        .objects(objectList.stream()
                                .map(RemoteObject::getKey)
                                .map(k -> ObjectIdentifier.builder().key(k).build())
                                .collect(Collectors.toList()))
                        .build();
                DeleteObjectsRequest req = DeleteObjectsRequest.builder()
                        .bucket(baseUri.getBucket())
                        .delete(delete)
                        .build();
                DeleteObjectsResponse resp = getClient(baseUri.getVirtualBucket()).deleteObjects(req);
                // Per-key failures are reported per page; we log and continue
                // rather than abort, matching best-effort cleanup semantics.
                if (!resp.errors().isEmpty()) {
                    LOG.warn("{} errors returned while deleting {} objects for dir {}",
                            resp.errors().size(), objectList.size(), absolutePath);
                }
                LOG.info("{} of {} objects deleted for dir {}",
                        resp.deleted().size(), objectList.size(), absolutePath);
                totalObjects += objectList.size();
            }
            isTruncated = objects.isTruncated();
            continuationToken = objects.getContinuationToken();
        } while (isTruncated);
        LOG.info("total delete {} objects for dir {}", totalObjects, absolutePath);
        return Status.OK;
    } catch (DdlException e) {
        return new Status(Status.ErrCode.COMMON_ERROR, "list objects for delete objects failed: " + e.getMessage());
    } catch (Exception e) {
        // Keep the Throwable as the LAST argument so Log4j2 logs the stack
        // trace; the previous ordering consumed 'e' as a placeholder value
        // and dropped the forceHostedStyle flag entirely.
        LOG.warn("delete objects {} failed, force virtual host style: {}", absolutePath, forceHostedStyle, e);
        return new Status(Status.ErrCode.COMMON_ERROR, "delete objects failed: " + e.getMessage());
    }
}
@Override
public Status copyObject(String origFilePath, String destFilePath) {
try {
S3URI origUri = S3URI.create(origFilePath);
@ -249,9 +300,26 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
public RemoteObjects listObjects(String absolutePath, String continuationToken) throws DdlException {
try {
S3URI uri = S3URI.create(absolutePath, forceHostedStyle);
String bucket = uri.getBucket();
String prefix = uri.getKey();
ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder().bucket(uri.getBucket())
.prefix(normalizePrefix(prefix));
if (!StringUtils.isEmpty(uri.getVirtualBucket())) {
// Support s3 compatible service. The generated HTTP request for list objects likes:
//
// GET /<bucket-name>?list-type=2&prefix=<prefix>
prefix = bucket + "/" + prefix;
String endpoint = properties.get(S3Properties.ENDPOINT);
if (endpoint.contains("cos.")) {
bucket = "/";
} else if (endpoint.contains("oss-")) {
bucket = uri.getVirtualBucket();
} else if (endpoint.contains("obs.")) {
// FIXME: unlike cos and oss, the obs will report 'The specified key does not exist'.
throw new DdlException("obs does not support list objects via s3 sdk. path: " + absolutePath);
}
}
ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder()
.bucket(bucket)
.prefix(normalizePrefix(prefix));
if (!StringUtils.isEmpty(continuationToken)) {
requestBuilder.continuationToken(continuationToken);
}
@ -263,7 +331,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
}
return new RemoteObjects(remoteObjects, response.isTruncated(), response.nextContinuationToken());
} catch (Exception e) {
LOG.warn("Failed to list objects for S3", e);
LOG.warn("Failed to list objects for S3: {}", absolutePath, e);
throw new DdlException("Failed to list objects for S3, Error message: " + e.getMessage(), e);
}
}

View File

@ -107,5 +107,9 @@ public class S3FileSystem extends ObjFileSystem {
}
return Status.OK;
}
/**
 * Recursively removes every object stored under {@code absolutePath}.
 * Invoked during repository creation when the "delete_if_exists" property
 * is set, to clear stale snapshots from a previous test run.
 */
public Status deleteDirectory(String absolutePath) {
    S3ObjStorage s3Storage = (S3ObjStorage) objStorage;
    return s3Storage.deleteObjects(absolutePath);
}
}

View File

@ -21,8 +21,8 @@ import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import software.amazon.awssdk.core.sync.RequestBody;
@ -36,26 +36,85 @@ import java.util.Map;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class S3ObjStorageTest {
private S3ObjStorage storage;
@Test
public void testS3BaseOp() throws UserException {
String ak = System.getenv("S3_ACCESS_KEY");
String sk = System.getenv("S3_SECRET_KEY");
String endpoint = System.getenv("S3_ENDPOINT");
String region = System.getenv("S3_REGION");
String bucket = System.getenv("S3_BUCKET");
String prefix = System.getenv("S3_PREFIX");
private MockedS3Client mockedClient;
// Skip this test if ENV variables are not set.
if (StringUtils.isEmpty(endpoint) || StringUtils.isEmpty(ak)
|| StringUtils.isEmpty(sk) || StringUtils.isEmpty(region)
|| StringUtils.isEmpty(bucket) || StringUtils.isEmpty(prefix)) {
return;
}
@BeforeAll
public void beforeAll() throws Exception {
Map<String, String> properties = new HashMap<>();
properties.put("s3.endpoint", endpoint);
properties.put("s3.access_key", ak);
properties.put("s3.secret_key", sk);
properties.put("s3.region", region);
S3ObjStorage storage = new S3ObjStorage(properties);
String baseUrl = "s3://" + bucket + "/" + prefix + "/";
for (int i = 0; i < 5; ++i) {
Status st = storage.putObject(baseUrl + "key" + i, RequestBody.fromString("mocked"));
Assertions.assertEquals(Status.OK, st);
}
RemoteObjects remoteObjects = storage.listObjects(baseUrl, null);
Assertions.assertEquals(5, remoteObjects.getObjectList().size());
Assertions.assertFalse(remoteObjects.isTruncated());
Assertions.assertEquals(null, remoteObjects.getContinuationToken());
List<RemoteObject> objectList = remoteObjects.getObjectList();
for (int i = 0; i < objectList.size(); i++) {
RemoteObject remoteObject = objectList.get(i);
Assertions.assertEquals("key" + i, remoteObject.getRelativePath());
}
Status st = storage.headObject(baseUrl + "key" + 0);
Assertions.assertEquals(Status.OK, st);
File file = new File("test-file.txt");
file.delete();
st = storage.getObject(baseUrl + "key" + 0, file);
Assertions.assertEquals(Status.OK, st);
st = storage.deleteObject(baseUrl + "key" + 0);
Assertions.assertEquals(Status.OK, st);
file.delete();
st = storage.getObject(baseUrl + "key" + 0, file);
Assertions.assertEquals(Status.ErrCode.COMMON_ERROR, st.getErrCode());
Assertions.assertTrue(st.getErrMsg().contains("The specified key does not exist"));
file.delete();
st = storage.deleteObjects(baseUrl);
Assertions.assertEquals(Status.OK, st);
remoteObjects = storage.listObjects(baseUrl, null);
Assertions.assertEquals(0, remoteObjects.getObjectList().size());
Assertions.assertFalse(remoteObjects.isTruncated());
Assertions.assertEquals(null, remoteObjects.getContinuationToken());
}
@Test
public void testBaseOp() throws Exception {
Map<String, String> properties = new HashMap<>();
properties.put("s3.endpoint", "s3.e.c");
properties.put("s3.access_key", "abc");
properties.put("s3.secret_key", "123");
storage = new S3ObjStorage(properties);
S3ObjStorage storage = new S3ObjStorage(properties);
Field client = storage.getClass().getDeclaredField("client");
client.setAccessible(true);
mockedClient = new MockedS3Client();
MockedS3Client mockedClient = new MockedS3Client();
client.set(storage, mockedClient);
Assertions.assertTrue(storage.getClient("mocked") instanceof MockedS3Client);
}
@Test
public void testBaseOp() throws UserException {
S3URI vUri = S3URI.create("s3://bucket/key", true);
S3URI uri = S3URI.create("s3://bucket/key", false);
Assertions.assertEquals(vUri.getVirtualBucket(), "bucket");
@ -98,7 +157,16 @@ class S3ObjStorageTest {
List<RemoteObject> list = remoteObjectsVBucket.getObjectList();
for (int i = 0; i < list.size(); i++) {
RemoteObject remoteObject = list.get(i);
Assertions.assertTrue(remoteObject.getRelativePath().startsWith("keys/key" + i));
Assertions.assertTrue(remoteObject.getRelativePath().startsWith("key" + i));
}
storage.properties.put("use_path_style", "true");
storage.setProperties(storage.properties);
remoteObjectsVBucket = storage.listObjects("oss://bucket/keys", null);
list = remoteObjectsVBucket.getObjectList();
for (int i = 0; i < list.size(); i++) {
RemoteObject remoteObject = list.get(i);
Assertions.assertTrue(remoteObject.getRelativePath().startsWith("key" + i));
}
}
}

View File

@ -352,10 +352,50 @@ class Syncer {
// Returns true only when every BACKUP job in this database has reached a
// terminal state (FINISHED or CANCELLED). The rendered span interleaved the
// removed single-row implementation with the new loop; this is the coherent
// multi-job version.
Boolean checkSnapshotFinish() {
    String checkSQL = "SHOW BACKUP FROM " + context.db
    def records = suite.sql(checkSQL)
    for (row in records) {
        logger.info("BACKUP row is ${row}")
        String state = (row[3] as String)
        if (state != "FINISHED" && state != "CANCELLED") {
            return false
        }
    }
    true
}
// Polls SHOW SNAPSHOT (up to 3 attempts, 3s apart) for the named snapshot
// and returns its latest backup timestamp, or null if it never appears.
String getSnapshotTimestamp(String repoName, String snapshotName) {
    // Return the first row matching the snapshot name with a usable timestamp.
    def findSnapshotRow = { records, name ->
        for (row in records) {
            logger.info("Snapshot row is ${row}")
            if (row[0] == name && row[1] != "null") {
                return row
            }
        }
        null
    }
    int attempt = 0
    while (attempt < 3) {
        def rows = suite.sql "SHOW SNAPSHOT ON ${repoName}"
        def snapshotRow = findSnapshotRow(rows, snapshotName)
        if (snapshotRow != null) {
            // Column 1 may hold multiple timestamps; take the newest one.
            return snapshotRow[1].split('\n').last()
        }
        Thread.sleep(3000)
        attempt++
    }
    null
}
// True when every RESTORE job in this database is terminal
// (FINISHED or CANCELLED); logs each row it inspects.
Boolean checkAllRestoreFinish() {
    String checkSQL = "SHOW RESTORE FROM ${context.db}"
    def records = suite.sql(checkSQL)
    return records.every { row ->
        logger.info("Restore row is ${row}")
        String state = row[4]
        state == "FINISHED" || state == "CANCELLED"
    }
}
Boolean checkRestoreFinish() {
@ -742,4 +782,53 @@ class Syncer {
TCommitTxnResult result = SyncerUtils.commitTxn(clientImpl, context)
return checkCommitTxn(result)
}
// Derives a stable, non-negative numeric prefix from the target FE address so
// concurrent test runs against different FEs don't collide in shared storage.
String externalStoragePrefix() {
    String feAddr = "${context.config.feTargetThriftNetworkAddress}"
    // Widen to long before negating: -Integer.MIN_VALUE overflows back to a
    // negative int, which would embed a "-" in the storage path. All other
    // inputs produce the same value as before.
    long code = feAddr.hashCode()
    ((code < 0) ? -code : code).toString()
}
// Drops and recreates an S3-backed repository for backup/restore tests.
// "delete_if_exists" clears stale snapshots left by earlier runs.
void createS3Repository(String name, boolean readOnly = false) {
    String accessKey = suite.getS3AK()
    String secretKey = suite.getS3SK()
    String s3Endpoint = suite.getS3Endpoint()
    String s3Region = suite.getS3Region()
    String s3Bucket = suite.getS3BucketName()
    String pathPrefix = externalStoragePrefix()

    suite.try_sql "DROP REPOSITORY `${name}`"
    String createStmt = """
        CREATE ${readOnly ? "READ ONLY" : ""} REPOSITORY `${name}`
        WITH S3
        ON LOCATION "s3://${s3Bucket}/${pathPrefix}/${name}"
        PROPERTIES
        (
            "s3.endpoint" = "http://${s3Endpoint}",
            "s3.region" = "${s3Region}",
            "s3.access_key" = "${accessKey}",
            "s3.secret_key" = "${secretKey}",
            "delete_if_exists" = "true"
        )
        """
    suite.sql createStmt
}
// Drops and recreates an HDFS-backed repository for backup/restore tests.
void createHdfsRepository(String name, boolean readOnly = false) {
    String hdfsFs = suite.getHdfsFs()
    String hdfsUser = suite.getHdfsUser()
    String dataDir = suite.getHdfsDataDir()
    String prefix = externalStoragePrefix()

    suite.try_sql "DROP REPOSITORY `${name}`"
    // Honor the readOnly flag like createS3Repository does; previously it was
    // accepted but silently ignored here.
    suite.sql """
        CREATE ${readOnly ? "READ ONLY" : ""} REPOSITORY `${name}`
        WITH hdfs
        ON LOCATION "${dataDir}/${prefix}/${name}"
        PROPERTIES
        (
            "fs.defaultFS" = "${hdfsFs}",
            "hadoop.username" = "${hdfsUser}"
        )
        """
}
}

View File

@ -16,6 +16,68 @@
// under the License.
suite("test_backup_restore", "backup_restore") {
// todo: test repository/backup/restore/cancel backup ...
sql "SHOW REPOSITORIES"
String repoName = "test_backup_restore_repo"
def syncer = getSyncer()
syncer.createS3Repository(repoName)
String tableName = "test_backup_restore_table"
sql "DROP TABLE IF EXISTS ${tableName}"
sql """
CREATE TABLE ${tableName} (
`id` LARGEINT NOT NULL,
`count` LARGEINT SUM DEFAULT "0")
AGGREGATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES
(
"replication_num" = "1"
)
"""
List<String> values = []
for (i = 1; i <= 10; ++i) {
values.add("(${i}, ${i})")
}
sql "INSERT INTO ${tableName} VALUES ${values.join(",")}"
def result = sql "SELECT * FROM ${tableName}"
assertEquals(result.size(), values.size());
String snapshotName = "test_backup_restore_snapshot"
sql """
BACKUP SNAPSHOT ${snapshotName}
TO `${repoName}`
ON (${tableName})
"""
while (!syncer.checkSnapshotFinish()) {
Thread.sleep(3000)
}
snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
assertTrue(snapshot != null)
sql "TRUNCATE TABLE ${tableName}"
sql """
RESTORE SNAPSHOT ${snapshotName}
FROM `${repoName}`
ON ( `${tableName}`)
PROPERTIES
(
"backup_timestamp" = "${snapshot}",
"replication_num" = "1"
)
"""
while (!syncer.checkAllRestoreFinish()) {
Thread.sleep(3000)
}
result = sql "SELECT * FROM ${tableName}"
assertEquals(result.size(), values.size());
sql "DROP TABLE ${tableName} FORCE"
sql "DROP REPOSITORY `${repoName}`"
}

View File

@ -26,7 +26,7 @@ suite("test_create_and_drop_repository", "backup_restore") {
String region = getS3Region()
String bucket = context.config.otherConfigs.get("s3BucketName");
def filter_show_repo_result = { result, name ->
def filterShowRepoResult = { result, name ->
for (record in result) {
if (record[1] == name)
return record
@ -49,13 +49,13 @@ suite("test_create_and_drop_repository", "backup_restore") {
"""
def result = sql """ SHOW REPOSITORIES """
def repo = filter_show_repo_result(result, repoName)
def repo = filterShowRepoResult(result, repoName)
assertTrue(repo != null)
sql "DROP REPOSITORY `${repoName}`"
result = sql """ SHOW REPOSITORIES """
repo = filter_show_repo_result(result, repoName)
repo = filterShowRepoResult(result, repoName)
assertTrue(repo == null)
// case 2. S3 read only repo
@ -73,12 +73,62 @@ suite("test_create_and_drop_repository", "backup_restore") {
"""
result = sql """ SHOW REPOSITORIES """
repo = filter_show_repo_result(result, repoName)
repo = filterShowRepoResult(result, repoName)
assertTrue(repo != null)
sql "DROP REPOSITORY `${repoName}`"
result = sql """ SHOW REPOSITORIES """
repo = filter_show_repo_result(result, repoName)
repo = filterShowRepoResult(result, repoName)
assertTrue(repo == null)
if (enableHdfs()) {
// case 3. hdfs repo
String hdfsFs = getHdfsFs()
String hdfsUser = getHdfsUser()
String dataDir = getHdfsDataDir()
sql """
CREATE REPOSITORY `${repoName}`
WITH hdfs
ON LOCATION "${dataDir}${repoName}"
PROPERTIES
(
"fs.defaultFS" = "${hdfsFs}",
"hadoop.username" = "${hdfsUser}"
)
"""
result = sql """ SHOW REPOSITORIES """
repo = filterShowRepoResult(result, repoName)
assertTrue(repo != null)
sql "DROP REPOSITORY `${repoName}`"
result = sql """ SHOW REPOSITORIES """
repo = filterShowRepoResult(result, repoName)
assertTrue(repo == null)
// case 4. hdfs read only repo
sql """
CREATE READ ONLY REPOSITORY `${repoName}`
WITH hdfs
ON LOCATION "${dataDir}${repoName}"
PROPERTIES
(
"fs.defaultFS" = "${hdfsFs}",
"hadoop.username" = "${hdfsUser}"
)
"""
result = sql """ SHOW REPOSITORIES """
repo = filterShowRepoResult(result, repoName)
assertTrue(repo != null)
sql "DROP REPOSITORY `${repoName}`"
result = sql """ SHOW REPOSITORIES """
repo = filterShowRepoResult(result, repoName)
assertTrue(repo == null)
}
}