[Repository] Normalize the path of file on repository (#6402)
This commit is contained in:
@ -49,6 +49,9 @@ import org.apache.doris.thrift.TFinishTaskRequest;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicates;
|
||||
@ -58,9 +61,6 @@ import com.google.common.collect.Collections2;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.File;
|
||||
@ -603,6 +603,10 @@ public class BackupJob extends AbstractJob {
|
||||
SnapshotInfo info = infos.get(index++);
|
||||
String src = info.getTabletPath();
|
||||
String dest = repo.getRepoTabletPathBySnapshotInfo(label, info);
|
||||
if (dest == null) {
|
||||
status = new Status(ErrCode.COMMON_ERROR, "Invalid dest path: " + info);
|
||||
return;
|
||||
}
|
||||
srcToDest.put(src, dest);
|
||||
}
|
||||
long signature = catalog.getNextId();
|
||||
|
||||
@ -17,8 +17,6 @@
|
||||
|
||||
package org.apache.doris.backup;
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
|
||||
import org.apache.doris.analysis.StorageBackend;
|
||||
import org.apache.doris.backup.Status.ErrCode;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
@ -30,27 +28,31 @@ import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.system.Backend;
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.json.JSONObject;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/*
|
||||
* Repository represents a remote storage for backup to or restore from
|
||||
* File organization in repository is:
|
||||
@ -278,7 +280,7 @@ public class Repository implements Writable {
|
||||
// eg:
|
||||
// __palo_repository_repo_name/__ss_my_ss1/__ss_content/__db_10001/__tbl_10020/__part_10031/__idx_10020/__10022/
|
||||
public String getRepoTabletPathBySnapshotInfo(String label, SnapshotInfo info) {
|
||||
return Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
|
||||
String path = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
|
||||
joinPrefix(PREFIX_SNAPSHOT_DIR, label),
|
||||
DIR_SNAPSHOT_CONTENT,
|
||||
joinPrefix(PREFIX_DB, info.getDbId()),
|
||||
@ -286,13 +288,27 @@ public class Repository implements Writable {
|
||||
joinPrefix(PREFIX_PART, info.getPartitionId()),
|
||||
joinPrefix(PREFIX_IDX, info.getIndexId()),
|
||||
joinPrefix(PREFIX_COMMON, info.getTabletId()));
|
||||
try {
|
||||
// we need to normalize the path to avoid double "/" in path, or else some client such as S3 sdk can not
|
||||
// handle it correctly.
|
||||
return new URI(path).normalize().toString();
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.warn("failed to normalize path: {}", path, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public String getRepoPath(String label, String childPath) {
|
||||
return Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
|
||||
String path = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
|
||||
joinPrefix(PREFIX_SNAPSHOT_DIR, label),
|
||||
DIR_SNAPSHOT_CONTENT,
|
||||
childPath);
|
||||
try {
|
||||
return new URI(path).normalize().toString();
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.warn("failed to normalize path: {}", path, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if this repo is available.
|
||||
|
||||
@ -70,6 +70,9 @@ import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.thrift.TStorageType;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
@ -80,9 +83,6 @@ import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Table.Cell;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
@ -1263,6 +1263,10 @@ public class RestoreJob extends AbstractJob {
|
||||
// bos://location/__palo_repository_my_repo/_ss_my_ss/_ss_content/__db_10000/
|
||||
// __tbl_10001/__part_10002/_idx_10001/__10003
|
||||
String src = repo.getRepoPath(label, repoTabletPath);
|
||||
if (src == null) {
|
||||
status = new Status(ErrCode.COMMON_ERROR, "invalid src path: " + repoTabletPath);
|
||||
return;
|
||||
}
|
||||
SnapshotInfo snapshotInfo = snapshotInfos.get(info.getTabletId(), info.getBeId());
|
||||
Preconditions.checkNotNull(snapshotInfo, info.getTabletId() + "-" + info.getBeId());
|
||||
// download to previous exist snapshot dir
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.backup;
|
||||
|
||||
import mockit.*;
|
||||
import org.apache.doris.analysis.ShowRepositoriesStmt;
|
||||
import org.apache.doris.analysis.StorageBackend;
|
||||
import org.apache.doris.catalog.BrokerMgr;
|
||||
@ -25,13 +24,13 @@ import org.apache.doris.catalog.FsBroker;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
@ -44,6 +43,12 @@ import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import mockit.Delegate;
|
||||
import mockit.Expectations;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import mockit.Mocked;
|
||||
|
||||
|
||||
public class RepositoryTest {
|
||||
|
||||
@ -337,4 +342,21 @@ public class RepositoryTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPathNormalize() {
|
||||
String newLoc = "bos://cmy_bucket/bos_repo/";
|
||||
repo = new Repository(10000, "repo", false, newLoc, storage);
|
||||
String path = repo.getRepoPath("label1", "/_ss_my_ss/_ss_content/__db_10000/");
|
||||
Assert.assertEquals("bos://cmy_bucket/bos_repo/__palo_repository_repo/__ss_label1/__ss_content/_ss_my_ss/_ss_content/__db_10000/", path);
|
||||
|
||||
path = repo.getRepoPath("label1", "/_ss_my_ss/_ss_content///__db_10000");
|
||||
Assert.assertEquals("bos://cmy_bucket/bos_repo/__palo_repository_repo/__ss_label1/__ss_content/_ss_my_ss/_ss_content/__db_10000", path);
|
||||
|
||||
newLoc = "hdfs://path/to/repo";
|
||||
repo = new Repository(10000, "repo", false, newLoc, storage);
|
||||
SnapshotInfo snapshotInfo = new SnapshotInfo(1, 2, 3, 4, 5, 6, 7, "/path", Lists.newArrayList());
|
||||
path = repo.getRepoTabletPathBySnapshotInfo("label1", snapshotInfo);
|
||||
Assert.assertEquals("hdfs://path/to/repo/__palo_repository_repo/__ss_label1/__ss_content/__db_1/__tbl_2/__part_3/__idx_4/__5", path);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user