From 79e8b81758a814e7addaf4d291a48817a2ce8039 Mon Sep 17 00:00:00 2001 From: xiejiangzhao Date: Mon, 17 Mar 2025 23:39:06 +0100 Subject: [PATCH] feat: support upload persist --- internal/bootstrap/data/task.go | 1 + internal/bootstrap/task.go | 2 +- internal/fs/put.go | 12 ++++++++---- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/internal/bootstrap/data/task.go b/internal/bootstrap/data/task.go index 7100e2e2..ed7c0f1c 100644 --- a/internal/bootstrap/data/task.go +++ b/internal/bootstrap/data/task.go @@ -24,6 +24,7 @@ func InitialTasks() []model.TaskItem { {Key: "copy", PersistData: "[]"}, {Key: "download", PersistData: "[]"}, {Key: "transfer", PersistData: "[]"}, + {Key: "upload", PersistData: "[]"}, } return initialTaskItems } diff --git a/internal/bootstrap/task.go b/internal/bootstrap/task.go index c67e3029..2653508a 100644 --- a/internal/bootstrap/task.go +++ b/internal/bootstrap/task.go @@ -18,7 +18,7 @@ func taskFilterNegative(num int) int64 { } func InitTaskManager() { - fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(setting.GetInt(conf.TaskUploadThreadsNum, conf.Conf.Tasks.Upload.Workers)), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry)) //upload will not support persist + fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(setting.GetInt(conf.TaskUploadThreadsNum, conf.Conf.Tasks.Upload.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("upload", conf.Conf.Tasks.Upload.TaskPersistant), db.UpdateTaskDataFunc("upload", conf.Conf.Tasks.Upload.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry)) op.RegisterSettingChangingCallback(func() { fs.UploadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskUploadThreadsNum, conf.Conf.Tasks.Upload.Workers))) }) diff --git a/internal/fs/put.go b/internal/fs/put.go index bc33a3ac..c2bdd5fd 100644 --- a/internal/fs/put.go +++ b/internal/fs/put.go @@ -16,12 +16,14 @@ import ( type UploadTask struct { task.TaskExtension storage driver.Driver - dstDirActualPath string + FileName string `json:"file_name"` + DstStorageMp string `json:"dst_storage_mp"` + DstDirActualPath string `json:"dst_dir_actual_path"` file model.FileStreamer } func (t *UploadTask) GetName() string { - return fmt.Sprintf("upload %s to [%s](%s)", t.file.GetName(), t.storage.GetStorage().MountPath, t.dstDirActualPath) + return fmt.Sprintf("upload %s to [%s](%s)", t.FileName, t.DstStorageMp, t.DstDirActualPath) } func (t *UploadTask) GetStatus() string { @@ -32,7 +34,7 @@ func (t *UploadTask) Run() error { t.ClearEndTime() t.SetStartTime(time.Now()) defer func() { t.SetEndTime(time.Now()) }() - return op.Put(t.Ctx(), t.storage, t.dstDirActualPath, t.file, t.SetProgress, true) + return op.Put(t.Ctx(), t.storage, t.DstDirActualPath, t.file, t.SetProgress, true) } var UploadTaskManager *tache.Manager[*UploadTask] @@ -60,7 +62,9 @@ func putAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer) Creator: taskCreator, }, storage: storage, - dstDirActualPath: dstDirActualPath, + DstStorageMp: storage.GetStorage().MountPath, + DstDirActualPath: dstDirActualPath, + FileName: file.GetName(), file: file, } t.SetTotalBytes(file.GetSize())