Add timeout on snapshot of data (#1672)

Release snapshot when finishing or cancelling backup/restore job.
Snapshots may take up a lot of disk space if they are not released in time.
This commit is contained in:
Mingyu Chen
2019-08-21 21:18:53 +08:00
committed by GitHub
parent 0792e06eed
commit 2b2bc82ae2
16 changed files with 101 additions and 24 deletions

View File

@ -1525,7 +1525,7 @@ void* TaskWorkerPool::_make_snapshot_thread_callback(void* arg_this) {
<< ", snapshot_path:" << snapshot_path;
if (snapshot_request.__isset.list_files) {
// list and save all snapshot files
// snapshot_path like: data/snapshot/20180417205230.1
// snapshot_path like: data/snapshot/20180417205230.1.86400
// we need to add subdir: tablet_id/schema_hash/
std::stringstream ss;
ss << snapshot_path << "/" << snapshot_request.tablet_id

View File

@ -212,8 +212,8 @@ namespace config {
// inc_rowset expired interval
CONF_Int32(inc_rowset_expired_sec, "1800");
// garbage sweep policy
CONF_Int32(max_garbage_sweep_interval, "43200");
CONF_Int32(min_garbage_sweep_interval, "200");
CONF_Int32(max_garbage_sweep_interval, "14400");
CONF_Int32(min_garbage_sweep_interval, "180");
CONF_Int32(snapshot_expire_time_sec, "172800");
// 仅仅是建议值,当磁盘空间不足时,trash下的文件保存期可不遵守这个参数
CONF_Int32(trash_file_expire_time_sec, "259200");

View File

@ -108,7 +108,7 @@ OLAPStatus SnapshotManager::release_snapshot(const string& snapshot_path) {
&& snapshot_path.compare(abs_path.size(),
SNAPSHOT_PREFIX.size(), SNAPSHOT_PREFIX) == 0) {
remove_all_dir(snapshot_path);
VLOG(3) << "success to release snapshot path. [path='" << snapshot_path << "']";
LOG(INFO) << "success to release snapshot path. [path='" << snapshot_path << "']";
return OLAP_SUCCESS;
}
@ -283,8 +283,11 @@ OLAPStatus SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb, co
return OLAP_SUCCESS;
}
// get snapshot path: curtime.seq.timeout
// eg: 20190819221234.3.86400
OLAPStatus SnapshotManager::_calc_snapshot_id_path(
const TabletSharedPtr& tablet,
int64_t timeout_s,
string* out_path) {
OLAPStatus res = OLAP_SUCCESS;
if (out_path == nullptr) {
@ -303,7 +306,8 @@ OLAPStatus SnapshotManager::_calc_snapshot_id_path(
stringstream snapshot_id_path_stream;
MutexLock auto_lock(&_snapshot_mutex); // will automatically unlock when function return.
snapshot_id_path_stream << tablet->data_dir()->path() << SNAPSHOT_PREFIX
<< "/" << time_str << "." << _snapshot_base_id++;
<< "/" << time_str << "." << _snapshot_base_id++
<< "." << timeout_s;
*out_path = snapshot_id_path_stream.str();
return res;
}
@ -356,7 +360,11 @@ OLAPStatus SnapshotManager::_create_snapshot_files(
}
string snapshot_id_path;
res = _calc_snapshot_id_path(ref_tablet, &snapshot_id_path);
int64_t timeout_s = config::snapshot_expire_time_sec;
if (request.__isset.timeout) {
timeout_s = request.timeout;
}
res = _calc_snapshot_id_path(ref_tablet, timeout_s, &snapshot_id_path);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to calc snapshot_id_path, ref tablet="
<< ref_tablet->data_dir()->path();

View File

@ -73,6 +73,7 @@ private:
OLAPStatus _calc_snapshot_id_path(
const TabletSharedPtr& tablet,
int64_t timeout_s,
std::string* out_path);
std::string _get_header_full_path(

View File

@ -595,8 +595,8 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage) {
OLAPStatus res = OLAP_SUCCESS;
LOG(INFO) << "start trash and snapshot sweep.";
const uint32_t snapshot_expire = config::snapshot_expire_time_sec;
const uint32_t trash_expire = config::trash_file_expire_time_sec;
const int32_t snapshot_expire = config::snapshot_expire_time_sec;
const int32_t trash_expire = config::trash_file_expire_time_sec;
const double guard_space = config::disk_capacity_insufficient_percentage / 100.0;
std::vector<DataDirInfo> data_dir_infos;
res = get_all_data_dir_info(&data_dir_infos);
@ -668,7 +668,7 @@ void StorageEngine::_clean_unused_txns() {
}
OLAPStatus StorageEngine::_do_sweep(
const string& scan_root, const time_t& local_now, const uint32_t expire) {
const string& scan_root, const time_t& local_now, const int32_t expire) {
OLAPStatus res = OLAP_SUCCESS;
if (!check_dir_existed(scan_root)) {
// dir not existed. no need to sweep trash.
@ -689,7 +689,17 @@ OLAPStatus StorageEngine::_do_sweep(
res = OLAP_ERR_OS_ERROR;
continue;
}
if (difftime(local_now, mktime(&local_tm_create)) >= expire) {
int32_t actual_expire = expire;
// try get timeout in dir name, the old snapshot dir does not contain timeout
// eg: 20190818221123.3.86400, the 86400 is timeout, in second
size_t pos = dir_name.find('.', str_time.size() + 1);
if (pos != string::npos) {
actual_expire = std::stoi(dir_name.substr(pos + 1));
}
VLOG(10) << "get actual expire time " << actual_expire << " of dir: " << dir_name;
if (difftime(local_now, mktime(&local_tm_create)) >= actual_expire) {
if (remove_all_dir(path_name) != OLAP_SUCCESS) {
LOG(WARNING) << "fail to remove file or directory. path=" << path_name;
res = OLAP_ERR_OS_ERROR;

View File

@ -220,7 +220,7 @@ private:
void _clean_unused_txns();
OLAPStatus _do_sweep(
const std::string& scan_root, const time_t& local_tm_now, const uint32_t expire);
const std::string& scan_root, const time_t& local_tm_now, const int32_t expire);
// Thread functions
// unused rowset monitor thread

View File

@ -323,6 +323,10 @@ AgentStatus EngineCloneTask::_clone_copy(
}
snapshot_request.__set_missing_version(snapshot_versions);
}
if (clone_req.__isset.timeout_s) {
snapshot_request.__set_timeout(clone_req.timeout_s);
}
agent_client.make_snapshot(
snapshot_request,
&make_snapshot_result);

View File

@ -35,6 +35,7 @@ import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.ReleaseSnapshotTask;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.task.UploadTask;
import org.apache.doris.thrift.TFinishTaskRequest;
@ -152,9 +153,9 @@ public class BackupJob extends AbstractJob {
Preconditions.checkState(request.isSetSnapshot_files());
// snapshot path does not contains last 'tablet_id' and 'schema_hash' dir
// eg:
// /path/to/your/be/data/snapshot/20180410102311.0/
// /path/to/your/be/data/snapshot/20180410102311.0.86400/
// Full path will look like:
// /path/to/your/be/data/snapshot/20180410102311.0/10006/352781111/
// /path/to/your/be/data/snapshot/20180410102311.0.86400/10006/352781111/
SnapshotInfo info = new SnapshotInfo(task.getDbId(), task.getTableId(), task.getPartitionId(),
task.getIndexId(), task.getTabletId(), task.getBackendId(),
task.getSchemaHash(), request.getSnapshot_path(),
@ -583,6 +584,10 @@ public class BackupJob extends AbstractJob {
// meta info and job info has been saved to local file, this can be cleaned to reduce log size
backupMeta = null;
jobInfo = null;
// release all snapshots before clearing the snapshotInfos.
releaseSnapshots();
snapshotInfos.clear();
// log
@ -591,6 +596,22 @@ public class BackupJob extends AbstractJob {
localMetaInfoFilePath, localJobInfoFilePath, this);
}
private void releaseSnapshots() {
if (snapshotInfos.isEmpty()) {
return;
}
// we do not care about the release snapshot tasks' success or failure,
// the GC thread on BE will sweep the snapshot, finally.
AgentBatchTask batchTask = new AgentBatchTask();
for (SnapshotInfo info : snapshotInfos.values()) {
ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(),
info.getTabletId(), info.getPath());
batchTask.addTask(releaseTask);
}
AgentTaskExecutor.submit(batchTask);
LOG.info("send {} release snapshot tasks, job: {}", snapshotInfos.size(), this);
}
private void uploadMetaAndJobInfoFile() {
String remoteMetaInfoFile = repo.assembleMetaInfoFilePath(label);
if (!uploadFile(localMetaInfoFilePath, remoteMetaInfoFile)) {
@ -684,6 +705,8 @@ public class BackupJob extends AbstractJob {
}
}
releaseSnapshots();
BackupJobState curState = state;
finishedTime = System.currentTimeMillis();
state = BackupJobState.CANCELLED;

View File

@ -55,6 +55,7 @@ import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.ReleaseSnapshotTask;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatusCode;
@ -1230,6 +1231,10 @@ public class RestoreJob extends AbstractJob {
if (!isReplay) {
restoredPartitions.clear();
restoredTbls.clear();
// release snapshot before clearing snapshotInfos
releaseSnapshots();
snapshotInfos.clear();
finishedTime = System.currentTimeMillis();
@ -1242,6 +1247,22 @@ public class RestoreJob extends AbstractJob {
return Status.OK;
}
private void releaseSnapshots() {
if (snapshotInfos.isEmpty()) {
return;
}
// we do not care about the release snapshot tasks' success or failure,
// the GC thread on BE will sweep the snapshot, finally.
AgentBatchTask batchTask = new AgentBatchTask();
for (SnapshotInfo info : snapshotInfos.values()) {
ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null, info.getBeId(), info.getDbId(),
info.getTabletId(), info.getPath());
batchTask.addTask(releaseTask);
}
AgentTaskExecutor.submit(batchTask);
LOG.info("send {} release snapshot tasks, job: {}", snapshotInfos.size(), this);
}
private void replayWaitingAllTabletsCommitted() {
allTabletCommitted(true /* is replay */);
}
@ -1371,6 +1392,9 @@ public class RestoreJob extends AbstractJob {
// backupMeta is useless
backupMeta = null;
releaseSnapshots();
snapshotInfos.clear();
RestoreJobState curState = state;
finishedTime = System.currentTimeMillis();
state = RestoreJobState.CANCELLED;

View File

@ -36,7 +36,7 @@ public class SnapshotInfo implements Writable {
private long tabletId;
private long beId;
private int schemaHash;
// eg: /path/to/your/be/data/snapshot/20180410102311.0/
// eg: /path/to/your/be/data/snapshot/20180410102311.0.86400/
private String path;
// eg:
// 10006_0_1_0_0.dat

View File

@ -669,6 +669,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
"dest backend " + srcReplica.getBackendId() + " does not exist");
}
taskTimeoutMs = getApproximateTimeoutMs();
// create the clone task and clone replica.
// we use visible version in clone task, but set the clone replica's last failed version to
// committed version.
@ -682,7 +684,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId,
tabletId, schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
visibleVersion, visibleVersionHash);
visibleVersion, visibleVersionHash, (int) (taskTimeoutMs / 1000));
cloneTask.setPathHash(srcPathHash, destPathHash);
// if this is a balance task, or this is a repair task with REPLICA_MISSING/REPLICA_RELOCATING or REPLICA_MISSING_IN_CLUSTER,
@ -714,8 +716,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
}
taskTimeoutMs = getApproximateTimeoutMs();
this.state = State.RUNNING;
return cloneTask;
}

View File

@ -38,18 +38,21 @@ public class CloneTask extends AgentTask {
private long srcPathHash = -1;
private long destPathHash = -1;
private int timeoutS;
private int taskVersion = VERSION_1;
public CloneTask(long backendId, long dbId, long tableId, long partitionId, long indexId,
long tabletId, int schemaHash, List<TBackend> srcBackends, TStorageMedium storageMedium,
long visibleVersion, long visibleVersionHash) {
long visibleVersion, long visibleVersionHash, int timeoutS) {
super(null, backendId, TTaskType.CLONE, dbId, tableId, partitionId, indexId, tabletId);
this.schemaHash = schemaHash;
this.srcBackends = srcBackends;
this.storageMedium = storageMedium;
this.visibleVersion = visibleVersion;
this.visibleVersionHash = visibleVersionHash;
this.timeoutS = timeoutS;
}
public int getSchemaHash() {
@ -88,6 +91,7 @@ public class CloneTask extends AgentTask {
request.setSrc_path_hash(srcPathHash);
request.setDest_path_hash(destPathHash);
}
request.setTimeout_s(timeoutS);
return request;
}

View File

@ -113,6 +113,7 @@ public class ExportPendingTask extends MasterTask {
snapshotRequest.setSchema_hash(Integer.parseInt(paloScanRange.getSchema_hash()));
snapshotRequest.setVersion(Long.parseLong(paloScanRange.getVersion()));
snapshotRequest.setVersion_hash(Long.parseLong(paloScanRange.getVersion_hash()));
snapshotRequest.setTimeout(job.getTimeoutSecond());
AgentClient client = new AgentClient(host, port);
TAgentResult result = client.makeSnapshot(snapshotRequest);

View File

@ -29,13 +29,13 @@ public class SnapshotTask extends AgentTask {
private int schemaHash;
private long timeout;
private long timeoutMs;
private boolean isRestoreTask;
public SnapshotTask(TResourceInfo resourceInfo, long backendId, long signature, long jobId,
long dbId, long tableId, long partitionId, long indexId, long tabletId,
long version, long versionHash, int schemaHash, long timeout, boolean isRestoreTask) {
long version, long versionHash, int schemaHash, long timeoutMs, boolean isRestoreTask) {
super(resourceInfo, backendId, TTaskType.MAKE_SNAPSHOT, dbId, tableId, partitionId, indexId, tabletId,
signature);
@ -45,7 +45,7 @@ public class SnapshotTask extends AgentTask {
this.versionHash = versionHash;
this.schemaHash = schemaHash;
this.timeout = timeout;
this.timeoutMs = timeoutMs;
this.isRestoreTask = isRestoreTask;
}
@ -66,8 +66,8 @@ public class SnapshotTask extends AgentTask {
return schemaHash;
}
public long getTimeout() {
return timeout;
public long getTimeoutMs() {
return timeoutMs;
}
public boolean isRestoreTask() {
@ -80,6 +80,7 @@ public class SnapshotTask extends AgentTask {
request.setVersion_hash(versionHash);
request.setList_files(true);
request.setPreferred_snapshot_version(2);
request.setTimeout(timeoutMs / 1000);
return request;
}
}

View File

@ -125,7 +125,7 @@ public class AgentTaskTest {
// clone
cloneTask =
new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, schemaHash1,
Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, -1);
Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, -1, 3600);
// rollup
rollupTask =

View File

@ -118,6 +118,7 @@ struct TCloneReq {
7: optional i32 task_version;
8: optional i64 src_path_hash;
9: optional i64 dest_path_hash;
10: optional i32 timeout_s;
}
struct TStorageMediumMigrateReq {