[enhancement](backup/restore) support altering S3 repo ak/sk/token info (#27027)

In some cases, the s3.session_token / AWS_TOKEN credential expires after a few hours, so an upload snapshot job may fail when the data set is large. For the same reason, the repository itself becomes inaccessible once RepositoryMgr pings it with the expired credentials. We therefore need to support altering the ak/sk/token properties of an S3 repository and propagating the new properties to running upload snapshot jobs so that the backup can continue.
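A minimal usage sketch of the new statement (the repository name and credential values here are hypothetical, and the s3.* key spellings are assumed from the S3Properties constants used in this patch):

ALTER REPOSITORY 'example_s3_repo'
PROPERTIES
(
    "s3.access_key" = "new_ak",
    "s3.secret_key" = "new_sk",
    "s3.session_token" = "new_token"
);

The env-style keys (e.g. AWS_TOKEN) are accepted as well and are normalized to the s3.* keys, as alterRepositoryS3Properties below shows.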

Signed-off-by: nextdreamblue <zxw520blue1@163.com>
xueweizhang
2023-12-21 13:21:55 +08:00
committed by GitHub
parent 223454d1db
commit d11bb11592
15 changed files with 282 additions and 0 deletions

View File

@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Maps;

import java.util.HashMap;
import java.util.Map;

public class AlterRepositoryStmt extends DdlStmt {
    private String name;
    private Map<String, String> properties;

    public AlterRepositoryStmt(String name, Map<String, String> properties) {
        this.name = name;
        this.properties = properties;
        if (this.properties == null) {
            this.properties = Maps.newHashMap();
        }
    }

    public Map<String, String> getProperties() {
        return properties;
    }

    public String getName() {
        return name;
    }

    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        // check auth
        if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
        }
        FeNameFormat.checkCommonName("repository", name);

        // Only the S3 ak/sk/token properties may be altered; reject anything else.
        Map<String, String> copyProperties = new HashMap<>(properties);
        if (copyProperties.isEmpty()) {
            throw new UserException("alter repository requires the ak/sk/token properties of s3.");
        }
        copyProperties.remove(S3Properties.ACCESS_KEY);
        copyProperties.remove(S3Properties.SECRET_KEY);
        copyProperties.remove(S3Properties.SESSION_TOKEN);
        copyProperties.remove(S3Properties.Env.ACCESS_KEY);
        copyProperties.remove(S3Properties.Env.SECRET_KEY);
        copyProperties.remove(S3Properties.Env.TOKEN);
        if (!copyProperties.isEmpty()) {
            throw new UserException("alter repository only supports the ak/sk/token properties of s3."
                    + " unsupported properties: " + copyProperties.keySet());
        }
    }

    @Override
    public String toSql() {
        StringBuilder sb = new StringBuilder();
        sb.append("ALTER REPOSITORY '").append(name).append("' ");
        sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
        return sb.toString();
    }
}

View File

@@ -155,6 +155,8 @@ public abstract class AbstractJob implements Writable {
    public abstract boolean isCancelled();

    public abstract Status updateRepo(Repository repo);

    public static AbstractJob read(DataInput in) throws IOException {
        AbstractJob job = null;
        JobType type = JobType.valueOf(Text.readString(in));

View File

@@ -19,6 +19,7 @@ package org.apache.doris.backup;
import org.apache.doris.analysis.AbstractBackupStmt;
import org.apache.doris.analysis.AbstractBackupTableRefClause;
import org.apache.doris.analysis.AlterRepositoryStmt;
import org.apache.doris.analysis.BackupStmt;
import org.apache.doris.analysis.BackupStmt.BackupType;
import org.apache.doris.analysis.CancelBackupStmt;
@@ -49,6 +50,7 @@ import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.SnapshotTask;
@@ -222,6 +224,49 @@ public class BackupHandler extends MasterDaemon implements Writable {
        }
    }

    public void alterRepository(AlterRepositoryStmt stmt) throws DdlException {
        tryLock();
        try {
            Repository repo = repoMgr.getRepo(stmt.getName());
            if (repo == null) {
                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository does not exist");
            }
            if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
                // Merge the repo's existing properties with the new ak/sk/token from the statement.
                Map<String, String> oldProperties = new HashMap<>(stmt.getProperties());
                Status status = repo.alterRepositoryS3Properties(oldProperties);
                if (!status.ok()) {
                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, status.getErrMsg());
                }
                RemoteFileSystem fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
                        StorageBackend.StorageType.S3, oldProperties);
                Repository newRepo = new Repository(repo.getId(), repo.getName(), repo.isReadOnly(),
                        repo.getLocation(), fileSystem);
                // Verify the new credentials before replacing the repo.
                if (!newRepo.ping()) {
                    LOG.warn("Failed to connect repository {}. msg: {}", repo.getName(), repo.getErrorMsg());
                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                            "Repo can not ping with new s3 properties");
                }
                Status st = repoMgr.alterRepo(newRepo, false /* not replay */);
                if (!st.ok()) {
                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                            "Failed to alter repository: " + st.getErrMsg());
                }
                // Propagate the new repo to every unfinished job that uses it.
                for (AbstractJob job : getAllCurrentJobs()) {
                    if (!job.isDone() && job.getRepoId() == repo.getId()) {
                        job.updateRepo(newRepo);
                    }
                }
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                        "Only support altering s3 repository");
            }
        } finally {
            seqlock.unlock();
        }
    }

    // handle drop repository stmt
    public void dropRepository(DropRepositoryStmt stmt) throws DdlException {
        tryLock();

View File

@@ -273,6 +273,28 @@ public class BackupJob extends AbstractJob {
        return state == BackupJobState.CANCELLED;
    }

    @Override
    public synchronized Status updateRepo(Repository repo) {
        this.repo = repo;
        if (this.state == BackupJobState.UPLOADING) {
            // Refresh the broker properties of all pending upload tasks so that
            // they carry the new ak/sk/token.
            for (Map.Entry<Long, Long> entry : unfinishedTaskIds.entrySet()) {
                long signature = entry.getKey();
                long beId = entry.getValue();
                AgentTask task = AgentTaskQueue.getTask(beId, TTaskType.UPLOAD, signature);
                if (task == null || task.getTaskType() != TTaskType.UPLOAD) {
                    continue;
                }
                ((UploadTask) task).updateBrokerProperties(
                        S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
                AgentTaskQueue.updateTask(beId, TTaskType.UPLOAD, signature, task);
            }
            LOG.info("finished updating upload task properties. {}", this);
        }
        LOG.info("finished updating repo of job. {}", this);
        return Status.OK;
    }

    // Polling the job state and do the right things.
    @Override
    public synchronized void run() {

View File

@@ -29,6 +29,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFile;
@@ -58,7 +59,10 @@ import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
/*
@@ -295,6 +299,37 @@ public class Repository implements Writable {
        }
    }

    public Status alterRepositoryS3Properties(Map<String, String> properties) {
        if (fileSystem instanceof S3FileSystem) {
            // Start from the repo's current properties, with the old credentials removed.
            Map<String, String> oldProperties = new HashMap<>(this.getRemoteFileSystem().getProperties());
            oldProperties.remove(S3Properties.ACCESS_KEY);
            oldProperties.remove(S3Properties.SECRET_KEY);
            oldProperties.remove(S3Properties.SESSION_TOKEN);
            oldProperties.remove(S3Properties.Env.ACCESS_KEY);
            oldProperties.remove(S3Properties.Env.SECRET_KEY);
            oldProperties.remove(S3Properties.Env.TOKEN);
            // Fill in the new credentials, normalizing the env-style keys (e.g. AWS_TOKEN)
            // to the corresponding s3.* keys.
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                if (Objects.equals(entry.getKey(), S3Properties.ACCESS_KEY)
                        || Objects.equals(entry.getKey(), S3Properties.Env.ACCESS_KEY)) {
                    oldProperties.putIfAbsent(S3Properties.ACCESS_KEY, entry.getValue());
                }
                if (Objects.equals(entry.getKey(), S3Properties.SECRET_KEY)
                        || Objects.equals(entry.getKey(), S3Properties.Env.SECRET_KEY)) {
                    oldProperties.putIfAbsent(S3Properties.SECRET_KEY, entry.getValue());
                }
                if (Objects.equals(entry.getKey(), S3Properties.SESSION_TOKEN)
                        || Objects.equals(entry.getKey(), S3Properties.Env.TOKEN)) {
                    oldProperties.putIfAbsent(S3Properties.SESSION_TOKEN, entry.getValue());
                }
            }
            // Hand the merged map back to the caller.
            properties.clear();
            properties.putAll(oldProperties);
            return Status.OK;
        } else {
            return new Status(ErrCode.COMMON_ERROR, "Only support altering s3 repository");
        }
    }

    // eg: location/__palo_repository_repo_name/__repo_info
    public String assembleRepoInfoFilePath() {
        return Joiner.on(PATH_DELIMITER).join(location,
View File

@@ -21,6 +21,7 @@ import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.fs.remote.S3FileSystem;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -96,6 +97,31 @@ public class RepositoryMgr extends Daemon implements Writable {
        return repoIdMap.get(repoId);
    }

    public Status alterRepo(Repository newRepo, boolean isReplay) {
        lock.lock();
        try {
            Repository repo = repoNameMap.get(newRepo.getName());
            if (repo != null) {
                if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
                    repoNameMap.put(repo.getName(), newRepo);
                    repoIdMap.put(repo.getId(), newRepo);
                    if (!isReplay) {
                        // write edit log
                        Env.getCurrentEnv().getEditLog().logAlterRepository(newRepo);
                    }
                    LOG.info("successfully altered repo {}, isReplay {}", newRepo.getName(), isReplay);
                    return Status.OK;
                } else {
                    return new Status(ErrCode.COMMON_ERROR, "Only support altering s3 repository");
                }
            }
            return new Status(ErrCode.NOT_FOUND, "repository does not exist");
        } finally {
            lock.unlock();
        }
    }

    public Status removeRepo(String repoName, boolean isReplay) {
        lock.lock();
        try {

View File

@@ -338,6 +338,28 @@ public class RestoreJob extends AbstractJob {
        return state == RestoreJobState.CANCELLED;
    }

    @Override
    public synchronized Status updateRepo(Repository repo) {
        this.repo = repo;
        if (this.state == RestoreJobState.DOWNLOADING) {
            // Refresh the broker properties of all pending download tasks so that
            // they carry the new ak/sk/token.
            for (Map.Entry<Long, Long> entry : unfinishedSignatureToId.entrySet()) {
                long signature = entry.getKey();
                long beId = entry.getValue();
                AgentTask task = AgentTaskQueue.getTask(beId, TTaskType.DOWNLOAD, signature);
                if (task == null || task.getTaskType() != TTaskType.DOWNLOAD) {
                    continue;
                }
                ((DownloadTask) task).updateBrokerProperties(
                        S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
                AgentTaskQueue.updateTask(beId, TTaskType.DOWNLOAD, signature, task);
            }
            LOG.info("finished updating download task properties. {}", this);
        }
        LOG.info("finished updating repo of job. {}", this);
        return Status.OK;
    }

    @Override
    public void run() {
        if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) {

View File

@@ -888,6 +888,11 @@ public class JournalEntity implements Writable {
                isRead = true;
                break;
            }
            case OperationType.OP_ALTER_REPOSITORY: {
                data = Repository.read(in);
                isRead = true;
                break;
            }
            default: {
                IOException e = new IOException();
                LOG.error("UNKNOWN Operation Type {}", opCode, e);

View File

@@ -1133,6 +1133,11 @@ public class EditLog {
                env.getAlterInstance().processAlterMTMV(alterMtmv, true);
                break;
            }
            case OperationType.OP_ALTER_REPOSITORY: {
                Repository repository = (Repository) journal.getData();
                env.getBackupHandler().getRepoMgr().alterRepo(repository, true);
                break;
            }
            default: {
                IOException e = new IOException();
                LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1544,6 +1549,10 @@ public class EditLog {
        logEdit(OperationType.OP_DROP_REPOSITORY, new Text(repoName));
    }

    public void logAlterRepository(Repository repo) {
        logEdit(OperationType.OP_ALTER_REPOSITORY, repo);
    }

    public void logRestoreJob(RestoreJob job) {
        logEdit(OperationType.OP_RESTORE_JOB, job);
    }

View File

@@ -351,6 +351,8 @@ public class OperationType {
    public static final short OP_ALTER_MTMV = 459;

    public static final short OP_ALTER_REPOSITORY = 460;

    /**
     * Get opcode name by op code.
     **/

View File

@@ -39,6 +39,7 @@ import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
import org.apache.doris.analysis.AlterDatabaseRename;
import org.apache.doris.analysis.AlterJobStatusStmt;
import org.apache.doris.analysis.AlterPolicyStmt;
import org.apache.doris.analysis.AlterRepositoryStmt;
import org.apache.doris.analysis.AlterResourceStmt;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.AlterSqlBlockRuleStmt;
@@ -388,6 +389,8 @@ public class DdlExecutor {
        } else if (ddlStmt instanceof DropAnalyzeJobStmt) {
            DropAnalyzeJobStmt analyzeJobStmt = (DropAnalyzeJobStmt) ddlStmt;
            Env.getCurrentEnv().getAnalysisManager().dropAnalyzeJob(analyzeJobStmt);
        } else if (ddlStmt instanceof AlterRepositoryStmt) {
            env.getBackupHandler().alterRepository((AlterRepositoryStmt) ddlStmt);
        } else {
            LOG.warn("Unknown statement " + ddlStmt.getClass());
            throw new DdlException("Unknown statement.");

View File

@@ -135,6 +135,18 @@ public class AgentTaskQueue {
        return signatureMap.get(signature);
    }

    public static synchronized void updateTask(long backendId, TTaskType type, long signature, AgentTask newTask) {
        if (!tasks.contains(backendId, type)) {
            return;
        }
        Map<Long, AgentTask> signatureMap = tasks.get(backendId, type);
        if (!signatureMap.containsKey(signature)) {
            return;
        }
        // Replace the queued task in place under the same signature.
        signatureMap.put(signature, newTask);
    }

    // this is just for unit test
    public static synchronized List<AgentTask> getTask(TTaskType type) {
        List<AgentTask> res = Lists.newArrayList();

View File

@@ -78,6 +78,10 @@ public class DownloadTask extends AgentTask {
        return brokerProperties;
    }

    public void updateBrokerProperties(Map<String, String> brokerProperties) {
        this.brokerProperties = new java.util.HashMap<>(brokerProperties);
    }

    public TDownloadReq toThrift() {
        // these fields are required
        // 1: required i64 job_id

View File

@@ -24,6 +24,7 @@ import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUploadReq;

import java.util.HashMap;
import java.util.Map;

public class UploadTask extends AgentTask {
@@ -64,6 +65,10 @@ public class UploadTask extends AgentTask {
        return brokerProperties;
    }

    public void updateBrokerProperties(Map<String, String> brokerProperties) {
        this.brokerProperties = new HashMap<>(brokerProperties);
    }

    public TUploadReq toThrift() {
        TNetworkAddress address = new TNetworkAddress(broker.host, broker.port);
        TUploadReq request = new TUploadReq(jobId, srcToDestPath, address);