diff --git a/drivers/189pc/driver.go b/drivers/189pc/driver.go index 382f710b..9c01a50f 100644 --- a/drivers/189pc/driver.go +++ b/drivers/189pc/driver.go @@ -1,6 +1,7 @@ package _189pc import ( + "container/ring" "context" "net/http" "strconv" @@ -28,6 +29,9 @@ type Cloud189PC struct { uploadThread int + familyTransferFolder *ring.Ring + cleanFamilyTransferFile func() + storageConfig driver.Config } @@ -52,7 +56,6 @@ func (y *Cloud189PC) Init(ctx context.Context) (err error) { } if !y.isFamily() && y.RootFolderID == "" { y.RootFolderID = "-11" - y.FamilyID = "" } // 限制上传线程数 @@ -79,11 +82,24 @@ func (y *Cloud189PC) Init(ctx context.Context) (err error) { } // 处理家庭云ID - if y.isFamily() && y.FamilyID == "" { + if y.FamilyID == "" { if y.FamilyID, err = y.getFamilyID(); err != nil { return err } } + + // 创建中转文件夹,防止重名文件 + if y.FamilyTransfer { + if y.familyTransferFolder, err = y.createFamilyTransferFolder(32); err != nil { + return err + } + } + + y.cleanFamilyTransferFile = utils.NewThrottle2(time.Minute, func() { + if err := y.cleanFamilyTransfer(context.TODO()); err != nil { + utils.Log.Errorf("cleanFamilyTransferFolderError:%s", err) + } + }) return } @@ -92,7 +108,7 @@ func (y *Cloud189PC) Drop(ctx context.Context) error { } func (y *Cloud189PC) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) { - return y.getFiles(ctx, dir.GetID()) + return y.getFiles(ctx, dir.GetID(), y.isFamily()) } func (y *Cloud189PC) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) { @@ -100,8 +116,9 @@ func (y *Cloud189PC) Link(ctx context.Context, file model.Obj, args model.LinkAr URL string `json:"fileDownloadUrl"` } + isFamily := y.isFamily() fullUrl := API_URL - if y.isFamily() { + if isFamily { fullUrl += "/family/file" } fullUrl += "/getFileDownloadUrl.action" @@ -109,7 +126,7 @@ func (y *Cloud189PC) Link(ctx context.Context, file model.Obj, args model.LinkAr _, err := y.get(fullUrl, func(r *resty.Request) { r.SetContext(ctx) r.SetQueryParam("fileId", file.GetID()) - if y.isFamily() { + if isFamily { r.SetQueryParams(map[string]string{ "familyId": y.FamilyID, }) @@ -119,7 +136,7 @@ func (y *Cloud189PC) Link(ctx context.Context, file model.Obj, args model.LinkAr "flag": "1", }) } - }, &downloadUrl) + }, &downloadUrl, isFamily) if err != nil { return nil, err } @@ -156,8 +173,9 @@ func (y *Cloud189PC) Link(ctx context.Context, file model.Obj, args model.LinkAr } func (y *Cloud189PC) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) (model.Obj, error) { + isFamily := y.isFamily() fullUrl := API_URL - if y.isFamily() { + if isFamily { fullUrl += "/family/file" } fullUrl += "/createFolder.action" @@ -169,7 +187,7 @@ func (y *Cloud189PC) MakeDir(ctx context.Context, parentDir model.Obj, dirName s "folderName": dirName, "relativePath": "", }) - if y.isFamily() { + if isFamily { req.SetQueryParams(map[string]string{ "familyId": y.FamilyID, "parentId": parentDir.GetID(), @@ -179,7 +197,7 @@ func (y *Cloud189PC) MakeDir(ctx context.Context, parentDir model.Obj, dirName s "parentFolderId": parentDir.GetID(), }) } - }, &newFolder) + }, &newFolder, isFamily) if err != nil { return nil, err } @@ -187,27 +205,14 @@ func (y *Cloud189PC) MakeDir(ctx context.Context, parentDir model.Obj, dirName s } func (y *Cloud189PC) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) { - var resp CreateBatchTaskResp - _, err := y.post(API_URL+"/batch/createBatchTask.action", func(req *resty.Request) { - req.SetContext(ctx) - req.SetFormData(map[string]string{ - "type": "MOVE", - "taskInfos": MustString(utils.Json.MarshalToString( - []BatchTaskInfo{ - { - FileId: srcObj.GetID(), - FileName: srcObj.GetName(), - IsFolder: BoolToNumber(srcObj.IsDir()), - }, - })), - "targetFolderId": dstDir.GetID(), - }) - if y.isFamily() { - req.SetFormData(map[string]string{ - "familyId": y.FamilyID, - }) - } - }, &resp) + isFamily := y.isFamily() + other := map[string]string{"targetFileName": dstDir.GetName()} + + resp, err := y.CreateBatchTask("MOVE", IF(isFamily, y.FamilyID, ""), dstDir.GetID(), other, BatchTaskInfo{ + FileId: srcObj.GetID(), + FileName: srcObj.GetName(), + IsFolder: BoolToNumber(srcObj.IsDir()), + }) if err != nil { return nil, err } @@ -218,10 +223,11 @@ func (y *Cloud189PC) Move(ctx context.Context, srcObj, dstDir model.Obj) (model. } func (y *Cloud189PC) Rename(ctx context.Context, srcObj model.Obj, newName string) (model.Obj, error) { + isFamily := y.isFamily() queryParam := make(map[string]string) fullUrl := API_URL method := http.MethodPost - if y.isFamily() { + if isFamily { fullUrl += "/family/file" method = http.MethodGet queryParam["familyId"] = y.FamilyID @@ -245,7 +251,7 @@ func (y *Cloud189PC) Rename(ctx context.Context, srcObj model.Obj, newName strin _, err := y.request(fullUrl, method, func(req *resty.Request) { req.SetContext(ctx).SetQueryParams(queryParam) - }, nil, newObj) + }, nil, newObj, isFamily) if err != nil { return nil, err } @@ -253,28 +259,15 @@ func (y *Cloud189PC) Rename(ctx context.Context, srcObj model.Obj, newName strin } func (y *Cloud189PC) Copy(ctx context.Context, srcObj, dstDir model.Obj) error { - var resp CreateBatchTaskResp - _, err := y.post(API_URL+"/batch/createBatchTask.action", func(req *resty.Request) { - req.SetContext(ctx) - req.SetFormData(map[string]string{ - "type": "COPY", - "taskInfos": MustString(utils.Json.MarshalToString( - []BatchTaskInfo{ - { - FileId: srcObj.GetID(), - FileName: srcObj.GetName(), - IsFolder: BoolToNumber(srcObj.IsDir()), - }, - })), - "targetFolderId": dstDir.GetID(), - "targetFileName": dstDir.GetName(), - }) - if y.isFamily() { - req.SetFormData(map[string]string{ - "familyId": y.FamilyID, - }) - } - }, &resp) + isFamily := y.isFamily() + other := map[string]string{"targetFileName": dstDir.GetName()} + + resp, err := y.CreateBatchTask("COPY", IF(isFamily, y.FamilyID, ""), dstDir.GetID(), other, BatchTaskInfo{ + FileId: srcObj.GetID(), + FileName: srcObj.GetName(), + IsFolder: BoolToNumber(srcObj.IsDir()), + }) + if err != nil { return err } @@ -282,27 +275,13 @@ func (y *Cloud189PC) Copy(ctx context.Context, srcObj, dstDir model.Obj) error { } func (y *Cloud189PC) Remove(ctx context.Context, obj model.Obj) error { - var resp CreateBatchTaskResp - _, err := y.post(API_URL+"/batch/createBatchTask.action", func(req *resty.Request) { - req.SetContext(ctx) - req.SetFormData(map[string]string{ - "type": "DELETE", - "taskInfos": MustString(utils.Json.MarshalToString( - []*BatchTaskInfo{ - { - FileId: obj.GetID(), - FileName: obj.GetName(), - IsFolder: BoolToNumber(obj.IsDir()), - }, - })), - }) + isFamily := y.isFamily() - if y.isFamily() { - req.SetFormData(map[string]string{ - "familyId": y.FamilyID, - }) - } - }, &resp) + resp, err := y.CreateBatchTask("DELETE", IF(isFamily, y.FamilyID, ""), "", nil, BatchTaskInfo{ + FileId: obj.GetID(), + FileName: obj.GetName(), + IsFolder: BoolToNumber(obj.IsDir()), + }) if err != nil { return err } @@ -310,10 +289,13 @@ func (y *Cloud189PC) Remove(ctx context.Context, obj model.Obj) error { return y.WaitBatchTask("DELETE", resp.TaskID, time.Millisecond*200) } -func (y *Cloud189PC) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { +func (y *Cloud189PC) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (newObj model.Obj, err error) { + overwrite := true + isFamily := y.isFamily() + // 响应时间长,按需启用 if y.Addition.RapidUpload && !stream.IsForceStreamUpload() { - if newObj, err := y.RapidUpload(ctx, dstDir, stream); err == nil { + if newObj, err := y.RapidUpload(ctx, dstDir, stream, isFamily, overwrite); err == nil { return newObj, nil } } @@ -322,17 +304,58 @@ func (y *Cloud189PC) Put(ctx context.Context, dstDir model.Obj, stream model.Fil if stream.IsForceStreamUpload() { uploadMethod = "stream" } + + // 旧版上传家庭云也有限制 + if uploadMethod == "old" { + return y.OldUpload(ctx, dstDir, stream, up, isFamily, overwrite) + } + + // 开启家庭云转存 + if !isFamily && y.FamilyTransfer { + // 修改上传目标为家庭云文件夹 + transferDstDir := dstDir + dstDir = (y.familyTransferFolder.Value).(*Cloud189Folder) + y.familyTransferFolder = y.familyTransferFolder.Next() + + isFamily = true + overwrite = false + + defer func() { + if newObj != nil { + // 批量任务有概率删不掉 + y.cleanFamilyTransferFile() + + // 转存家庭云文件到个人云 + err = y.SaveFamilyFileToPersonCloud(context.TODO(), y.FamilyID, newObj, transferDstDir, true) + + task := BatchTaskInfo{ + FileId: newObj.GetID(), + FileName: newObj.GetName(), + IsFolder: BoolToNumber(newObj.IsDir()), + } + + // 删除源文件 + if resp, err := y.CreateBatchTask("DELETE", y.FamilyID, "", nil, task); err == nil { + y.WaitBatchTask("DELETE", resp.TaskID, time.Second) + // 永久删除 + if resp, err := y.CreateBatchTask("CLEAR_RECYCLE", y.FamilyID, "", nil, task); err == nil { + y.WaitBatchTask("CLEAR_RECYCLE", resp.TaskID, time.Second) + } + } + newObj = nil + } + }() + } + switch uploadMethod { - case "old": - return y.OldUpload(ctx, dstDir, stream, up) case "rapid": - return y.FastUpload(ctx, dstDir, stream, up) + return y.FastUpload(ctx, dstDir, stream, up, isFamily, overwrite) case "stream": if stream.GetSize() == 0 { - return y.FastUpload(ctx, dstDir, stream, up) + return y.FastUpload(ctx, dstDir, stream, up, isFamily, overwrite) } fallthrough default: - return y.StreamUpload(ctx, dstDir, stream, up) + return y.StreamUpload(ctx, dstDir, stream, up, isFamily, overwrite) } } diff --git a/drivers/189pc/help.go b/drivers/189pc/help.go index ba1f3f08..49f957fa 100644 --- a/drivers/189pc/help.go +++ b/drivers/189pc/help.go @@ -192,3 +192,19 @@ func partSize(size int64) int64 { } return DEFAULT } + +func isBool(bs ...bool) bool { + for _, b := range bs { + if b { + return true + } + } + return false +} + +func IF[V any](o bool, t V, f V) V { + if o { + return t + } + return f +} diff --git a/drivers/189pc/meta.go b/drivers/189pc/meta.go index 079ac7cc..1891c5c0 100644 --- a/drivers/189pc/meta.go +++ b/drivers/189pc/meta.go @@ -16,6 +16,7 @@ type Addition struct { FamilyID string `json:"family_id"` UploadMethod string `json:"upload_method" type:"select" options:"stream,rapid,old" default:"stream"` UploadThread string `json:"upload_thread" default:"3" help:"1<=thread<=32"` + FamilyTransfer bool `json:"family_transfer"` RapidUpload bool `json:"rapid_upload"` NoUseOcr bool `json:"no_use_ocr"` } diff --git a/drivers/189pc/types.go b/drivers/189pc/types.go index d779659e..a1b3810f 100644 --- a/drivers/189pc/types.go +++ b/drivers/189pc/types.go @@ -3,10 +3,11 @@ package _189pc import ( "encoding/xml" "fmt" - "github.com/alist-org/alist/v3/pkg/utils" "sort" "strings" "time" + + "github.com/alist-org/alist/v3/pkg/utils" ) // 居然有四种返回方式 @@ -242,7 +243,12 @@ type BatchTaskInfo struct { // IsFolder 是否是文件夹,0-否,1-是 IsFolder int `json:"isFolder"` // SrcParentId 文件所在父目录ID - //SrcParentId string `json:"srcParentId"` + SrcParentId string `json:"srcParentId,omitempty"` + + /* 冲突管理 */ + // 1 -> 跳过 2 -> 保留 3 -> 覆盖 + DealWay int `json:"dealWay,omitempty"` + IsConflict int `json:"isConflict,omitempty"` } /* 上传部分 */ @@ -355,6 +361,14 @@ type BatchTaskStateResp struct { TaskStatus int `json:"taskStatus"` //1 初始化 2 存在冲突 3 执行中,4 完成 } +type BatchTaskConflictTaskInfoResp struct { + SessionKey string `json:"sessionKey"` + TargetFolderID int `json:"targetFolderId"` + TaskID string `json:"taskId"` + TaskInfos []BatchTaskInfo + TaskType int `json:"taskType"` +} + /* query 加密参数*/ type Params map[string]string diff --git a/drivers/189pc/utils.go b/drivers/189pc/utils.go index 5e403a83..ee96af3e 100644 --- a/drivers/189pc/utils.go +++ b/drivers/189pc/utils.go @@ -2,6 +2,7 @@ package _189pc import ( "bytes" + "container/ring" "context" "crypto/md5" "encoding/base64" @@ -54,11 +55,11 @@ const ( CHANNEL_ID = "web_cloud.189.cn" ) -func (y *Cloud189PC) SignatureHeader(url, method, params string) map[string]string { +func (y *Cloud189PC) SignatureHeader(url, method, params string, isFamily bool) map[string]string { dateOfGmt := getHttpDateStr() sessionKey := y.tokenInfo.SessionKey sessionSecret := y.tokenInfo.SessionSecret - if y.isFamily() { + if isFamily { sessionKey = y.tokenInfo.FamilySessionKey sessionSecret = y.tokenInfo.FamilySessionSecret } @@ -72,9 +73,9 @@ func (y *Cloud189PC) SignatureHeader(url, method, params string) map[string]stri return header } -func (y *Cloud189PC) EncryptParams(params Params) string { +func (y *Cloud189PC) EncryptParams(params Params, isFamily bool) string { sessionSecret := y.tokenInfo.SessionSecret - if y.isFamily() { + if isFamily { sessionSecret = y.tokenInfo.FamilySessionSecret } if params != nil { @@ -83,17 +84,17 @@ func (y *Cloud189PC) EncryptParams(params Params) string { return "" } -func (y *Cloud189PC) request(url, method string, callback base.ReqCallback, params Params, resp interface{}) ([]byte, error) { +func (y *Cloud189PC) request(url, method string, callback base.ReqCallback, params Params, resp interface{}, isFamily ...bool) ([]byte, error) { req := y.client.R().SetQueryParams(clientSuffix()) // 设置params - paramsData := y.EncryptParams(params) + paramsData := y.EncryptParams(params, isBool(isFamily...)) if paramsData != "" { req.SetQueryParam("params", paramsData) } // Signature - req.SetHeaders(y.SignatureHeader(url, method, paramsData)) + req.SetHeaders(y.SignatureHeader(url, method, paramsData, isBool(isFamily...))) var erron RespErr req.SetError(&erron) @@ -129,15 +130,15 @@ func (y *Cloud189PC) request(url, method string, callback base.ReqCallback, para return res.Body(), nil } -func (y *Cloud189PC) get(url string, callback base.ReqCallback, resp interface{}) ([]byte, error) { - return y.request(url, http.MethodGet, callback, nil, resp) +func (y *Cloud189PC) get(url string, callback base.ReqCallback, resp interface{}, isFamily ...bool) ([]byte, error) { + return y.request(url, http.MethodGet, callback, nil, resp, isFamily...) } -func (y *Cloud189PC) post(url string, callback base.ReqCallback, resp interface{}) ([]byte, error) { - return y.request(url, http.MethodPost, callback, nil, resp) +func (y *Cloud189PC) post(url string, callback base.ReqCallback, resp interface{}, isFamily ...bool) ([]byte, error) { + return y.request(url, http.MethodPost, callback, nil, resp, isFamily...) } -func (y *Cloud189PC) put(ctx context.Context, url string, headers map[string]string, sign bool, file io.Reader) ([]byte, error) { +func (y *Cloud189PC) put(ctx context.Context, url string, headers map[string]string, sign bool, file io.Reader, isFamily bool) ([]byte, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, file) if err != nil { return nil, err @@ -154,7 +155,7 @@ func (y *Cloud189PC) put(ctx context.Context, url string, headers map[string]str } if sign { - for key, value := range y.SignatureHeader(url, http.MethodPut, "") { + for key, value := range y.SignatureHeader(url, http.MethodPut, "", isFamily) { req.Header.Add(key, value) } } @@ -181,9 +182,9 @@ func (y *Cloud189PC) put(ctx context.Context, url string, headers map[string]str } return body, nil } -func (y *Cloud189PC) getFiles(ctx context.Context, fileId string) ([]model.Obj, error) { +func (y *Cloud189PC) getFiles(ctx context.Context, fileId string, isFamily bool) ([]model.Obj, error) { fullUrl := API_URL - if y.isFamily() { + if isFamily { fullUrl += "/family/file" } fullUrl += "/listFiles.action" @@ -201,7 +202,7 @@ func (y *Cloud189PC) getFiles(ctx context.Context, fileId string) ([]model.Obj, "pageNum": fmt.Sprint(pageNum), "pageSize": "130", }) - if y.isFamily() { + if isFamily { r.SetQueryParams(map[string]string{ "familyId": y.FamilyID, "orderBy": toFamilyOrderBy(y.OrderBy), @@ -214,7 +215,7 @@ func (y *Cloud189PC) getFiles(ctx context.Context, fileId string) ([]model.Obj, "descending": toDesc(y.OrderDirection), }) } - }, &resp) + }, &resp, isFamily) if err != nil { return nil, err } @@ -437,7 +438,7 @@ func (y *Cloud189PC) refreshSession() (err error) { // 普通上传 // 无法上传大小为0的文件 -func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { +func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress, isFamily bool, overwrite bool) (model.Obj, error) { var sliceSize = partSize(file.GetSize()) count := int(math.Ceil(float64(file.GetSize()) / float64(sliceSize))) lastPartSize := file.GetSize() % sliceSize @@ -454,7 +455,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo } fullUrl := UPLOAD_URL - if y.isFamily() { + if isFamily { params.Set("familyId", y.FamilyID) fullUrl += "/family" } else { @@ -466,7 +467,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo var initMultiUpload InitMultiUploadResp _, err := y.request(fullUrl+"/initMultiUpload", http.MethodGet, func(req *resty.Request) { req.SetContext(ctx) - }, params, &initMultiUpload) + }, params, &initMultiUpload, isFamily) if err != nil { return nil, err } @@ -502,14 +503,14 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo partInfo := fmt.Sprintf("%d-%s", i, base64.StdEncoding.EncodeToString(md5Bytes)) threadG.Go(func(ctx context.Context) error { - uploadUrls, err := y.GetMultiUploadUrls(ctx, initMultiUpload.Data.UploadFileID, partInfo) + uploadUrls, err := y.GetMultiUploadUrls(ctx, isFamily, initMultiUpload.Data.UploadFileID, partInfo) if err != nil { return err } // step.4 上传切片 uploadUrl := uploadUrls[0] - _, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false, bytes.NewReader(byteData)) + _, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false, bytes.NewReader(byteData), isFamily) if err != nil { return err } @@ -538,21 +539,21 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo "sliceMd5": sliceMd5Hex, "lazyCheck": "1", "isLog": "0", - "opertype": "3", - }, &resp) + "opertype": IF(overwrite, "3", "1"), + }, &resp, isFamily) if err != nil { return nil, err } return resp.toFile(), nil } -func (y *Cloud189PC) RapidUpload(ctx context.Context, dstDir model.Obj, stream model.FileStreamer) (model.Obj, error) { +func (y *Cloud189PC) RapidUpload(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, isFamily bool, overwrite bool) (model.Obj, error) { fileMd5 := stream.GetHash().GetHash(utils.MD5) if len(fileMd5) < utils.MD5.Width { return nil, errors.New("invalid hash") } - uploadInfo, err := y.OldUploadCreate(ctx, dstDir.GetID(), fileMd5, stream.GetName(), fmt.Sprint(stream.GetSize())) + uploadInfo, err := y.OldUploadCreate(ctx, dstDir.GetID(), fileMd5, stream.GetName(), fmt.Sprint(stream.GetSize()), isFamily) if err != nil { return nil, err } @@ -561,11 +562,11 @@ func (y *Cloud189PC) RapidUpload(ctx context.Context, dstDir model.Obj, stream m return nil, errors.New("rapid upload fail") } - return y.OldUploadCommit(ctx, uploadInfo.FileCommitUrl, uploadInfo.UploadFileId) + return y.OldUploadCommit(ctx, uploadInfo.FileCommitUrl, uploadInfo.UploadFileId, isFamily, overwrite) } // 快传 -func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { +func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress, isFamily bool, overwrite bool) (model.Obj, error) { tempFile, err := file.CacheFullInTempFile() if err != nil { return nil, err @@ -609,7 +610,7 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode } fullUrl := UPLOAD_URL - if y.isFamily() { + if isFamily { fullUrl += "/family" } else { //params.Set("extend", `{"opScene":"1","relativepath":"","rootfolderid":""}`) @@ -628,13 +629,13 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode "sliceSize": fmt.Sprint(sliceSize), "sliceMd5": sliceMd5Hex, } - if y.isFamily() { + if isFamily { params.Set("familyId", y.FamilyID) } var uploadInfo InitMultiUploadResp _, err = y.request(fullUrl+"/initMultiUpload", http.MethodGet, func(req *resty.Request) { req.SetContext(ctx) - }, params, &uploadInfo) + }, params, &uploadInfo, isFamily) if err != nil { return nil, err } @@ -659,7 +660,7 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode i, uploadPart := i, uploadPart threadG.Go(func(ctx context.Context) error { // step.3 获取上传链接 - uploadUrls, err := y.GetMultiUploadUrls(ctx, uploadInfo.UploadFileID, uploadPart) + uploadUrls, err := y.GetMultiUploadUrls(ctx, isFamily, uploadInfo.UploadFileID, uploadPart) if err != nil { return err } @@ -671,7 +672,7 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode } // step.4 上传切片 - _, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false, io.NewSectionReader(tempFile, offset, byteSize)) + _, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false, io.NewSectionReader(tempFile, offset, byteSize), isFamily) if err != nil { return err } @@ -698,8 +699,8 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode }, Params{ "uploadFileId": uploadInfo.UploadFileID, "isLog": "0", - "opertype": "3", - }, &resp) + "opertype": IF(overwrite, "3", "1"), + }, &resp, isFamily) if err != nil { return nil, err } @@ -708,9 +709,9 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode // 获取上传切片信息 // 对http body有大小限制,分片信息太多会出错 -func (y *Cloud189PC) GetMultiUploadUrls(ctx context.Context, uploadFileId string, partInfo ...string) ([]UploadUrlInfo, error) { +func (y *Cloud189PC) GetMultiUploadUrls(ctx context.Context, isFamily bool, uploadFileId string, partInfo ...string) ([]UploadUrlInfo, error) { fullUrl := UPLOAD_URL - if y.isFamily() { + if isFamily { fullUrl += "/family" } else { fullUrl += "/person" @@ -723,7 +724,7 @@ func (y *Cloud189PC) GetMultiUploadUrls(ctx context.Context, uploadFileId string }, Params{ "uploadFileId": uploadFileId, "partInfo": strings.Join(partInfo, ","), - }, &uploadUrlsResp) + }, &uploadUrlsResp, isFamily) if err != nil { return nil, err } @@ -752,7 +753,7 @@ func (y *Cloud189PC) GetMultiUploadUrls(ctx context.Context, uploadFileId string } // 旧版本上传,家庭云不支持覆盖 -func (y *Cloud189PC) OldUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { +func (y *Cloud189PC) OldUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress, isFamily bool, overwrite bool) (model.Obj, error) { tempFile, err := file.CacheFullInTempFile() if err != nil { return nil, err @@ -763,7 +764,7 @@ func (y *Cloud189PC) OldUpload(ctx context.Context, dstDir model.Obj, file model } // 创建上传会话 - uploadInfo, err := y.OldUploadCreate(ctx, dstDir.GetID(), fileMd5, file.GetName(), fmt.Sprint(file.GetSize())) + uploadInfo, err := y.OldUploadCreate(ctx, dstDir.GetID(), fileMd5, file.GetName(), fmt.Sprint(file.GetSize()), isFamily) if err != nil { return nil, err } @@ -780,14 +781,14 @@ func (y *Cloud189PC) OldUpload(ctx context.Context, dstDir model.Obj, file model "Expect": "100-continue", } - if y.isFamily() { + if isFamily { header["FamilyId"] = fmt.Sprint(y.FamilyID) header["UploadFileId"] = fmt.Sprint(status.UploadFileId) } else { header["Edrive-UploadFileId"] = fmt.Sprint(status.UploadFileId) } - _, err := y.put(ctx, status.FileUploadUrl, header, true, io.NopCloser(tempFile)) + _, err := y.put(ctx, status.FileUploadUrl, header, true, io.NopCloser(tempFile), isFamily) if err, ok := err.(*RespErr); ok && err.Code != "InputStreamReadError" { return nil, err } @@ -802,10 +803,10 @@ func (y *Cloud189PC) OldUpload(ctx context.Context, dstDir model.Obj, file model "uploadFileId": fmt.Sprint(status.UploadFileId), "resumePolicy": "1", }) - if y.isFamily() { + if isFamily { req.SetQueryParam("familyId", fmt.Sprint(y.FamilyID)) } - }, &status) + }, &status, isFamily) if err != nil { return nil, err } @@ -815,20 +816,20 @@ func (y *Cloud189PC) OldUpload(ctx context.Context, dstDir model.Obj, file model up(float64(status.GetSize()) / float64(file.GetSize()) * 100) } - return y.OldUploadCommit(ctx, status.FileCommitUrl, status.UploadFileId) + return y.OldUploadCommit(ctx, status.FileCommitUrl, status.UploadFileId, isFamily, overwrite) } // 创建上传会话 -func (y *Cloud189PC) OldUploadCreate(ctx context.Context, parentID string, fileMd5, fileName, fileSize string) (*CreateUploadFileResp, error) { +func (y *Cloud189PC) OldUploadCreate(ctx context.Context, parentID string, fileMd5, fileName, fileSize string, isFamily bool) (*CreateUploadFileResp, error) { var uploadInfo CreateUploadFileResp fullUrl := API_URL + "/createUploadFile.action" - if y.isFamily() { + if isFamily { fullUrl = API_URL + "/family/file/createFamilyFile.action" } _, err := y.post(fullUrl, func(req *resty.Request) { req.SetContext(ctx) - if y.isFamily() { + if isFamily { req.SetQueryParams(map[string]string{ "familyId": y.FamilyID, "parentId": parentID, @@ -849,7 +850,7 @@ func (y *Cloud189PC) OldUploadCreate(ctx context.Context, parentID string, fileM "isLog": "0", }) } - }, &uploadInfo) + }, &uploadInfo, isFamily) if err != nil { return nil, err @@ -858,11 +859,11 @@ func (y *Cloud189PC) OldUploadCreate(ctx context.Context, parentID string, fileM } // 提交上传文件 -func (y *Cloud189PC) OldUploadCommit(ctx context.Context, fileCommitUrl string, uploadFileID int64) (model.Obj, error) { +func (y *Cloud189PC) OldUploadCommit(ctx context.Context, fileCommitUrl string, uploadFileID int64, isFamily bool, overwrite bool) (model.Obj, error) { var resp OldCommitUploadFileResp _, err := y.post(fileCommitUrl, func(req *resty.Request) { req.SetContext(ctx) - if y.isFamily() { + if isFamily { req.SetHeaders(map[string]string{ "ResumePolicy": "1", "UploadFileId": fmt.Sprint(uploadFileID), @@ -870,13 +871,13 @@ func (y *Cloud189PC) OldUploadCommit(ctx context.Context, fileCommitUrl string, }) } else { req.SetFormData(map[string]string{ - "opertype": "3", + "opertype": IF(overwrite, "3", "1"), "resumePolicy": "1", "uploadFileId": fmt.Sprint(uploadFileID), "isLog": "0", }) } - }, &resp) + }, &resp, isFamily) if err != nil { return nil, err } @@ -895,10 +896,100 @@ func (y *Cloud189PC) isLogin() bool { return err == nil } +// 创建家庭云中转文件夹 +func (y *Cloud189PC) createFamilyTransferFolder(count int) (*ring.Ring, error) { + folders := ring.New(count) + var rootFolder Cloud189Folder + _, err := y.post(API_URL+"/family/file/createFolder.action", func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "folderName": "FamilyTransferFolder", + "familyId": y.FamilyID, + }) + }, &rootFolder, true) + if err != nil { + return nil, err + } + + folderCount := 0 + + // 获取已有目录 + files, err := y.getFiles(context.TODO(), rootFolder.GetID(), true) + if err != nil { + return nil, err + } + for _, file := range files { + if folder, ok := file.(*Cloud189Folder); ok { + folders.Value = folder + folders = folders.Next() + folderCount++ + } + } + + // 创建新的目录 + for folderCount < count { + var newFolder Cloud189Folder + _, err := y.post(API_URL+"/family/file/createFolder.action", func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "folderName": uuid.NewString(), + "familyId": y.FamilyID, + "parentId": rootFolder.GetID(), + }) + }, &newFolder, true) + if err != nil { + return nil, err + } + folders.Value = &newFolder + folders = folders.Next() + folderCount++ + } + return folders, nil +} + +// 清理中转文件夹 +func (y *Cloud189PC) cleanFamilyTransfer(ctx context.Context) error { + var tasks []BatchTaskInfo + r := y.familyTransferFolder + for p := r.Next(); p != r; p = p.Next() { + folder := p.Value.(*Cloud189Folder) + + files, err := y.getFiles(ctx, folder.GetID(), true) + if err != nil { + return err + } + for _, file := range files { + tasks = append(tasks, BatchTaskInfo{ + FileId: file.GetID(), + FileName: file.GetName(), + IsFolder: BoolToNumber(file.IsDir()), + }) + } + } + + if len(tasks) > 0 { + // 删除 + resp, err := y.CreateBatchTask("DELETE", y.FamilyID, "", nil, tasks...) + if err != nil { + return err + } + err = y.WaitBatchTask("DELETE", resp.TaskID, time.Second) + if err != nil { + return err + } + // 永久删除 + resp, err = y.CreateBatchTask("CLEAR_RECYCLE", y.FamilyID, "", nil, tasks...) + if err != nil { + return err + } + err = y.WaitBatchTask("CLEAR_RECYCLE", resp.TaskID, time.Second) + return err + } + return nil +} + // 获取家庭云所有用户信息 func (y *Cloud189PC) getFamilyInfoList() ([]FamilyInfoResp, error) { var resp FamilyInfoListResp - _, err := y.get(API_URL+"/family/manage/getFamilyList.action", nil, &resp) + _, err := y.get(API_URL+"/family/manage/getFamilyList.action", nil, &resp, true) if err != nil { return nil, err } @@ -922,6 +1013,73 @@ func (y *Cloud189PC) getFamilyID() (string, error) { return fmt.Sprint(infos[0].FamilyID), nil } +// 保存家庭云中的文件到个人云 +func (y *Cloud189PC) SaveFamilyFileToPersonCloud(ctx context.Context, familyId string, srcObj, dstDir model.Obj, overwrite bool) error { + // _, err := y.post(API_URL+"/family/file/saveFileToMember.action", func(req *resty.Request) { + // req.SetQueryParams(map[string]string{ + // "channelId": "home", + // "familyId": familyId, + // "destParentId": destParentId, + // "fileIdList": familyFileId, + // }) + // }, nil) + // return err + + task := BatchTaskInfo{ + FileId: srcObj.GetID(), + FileName: srcObj.GetName(), + IsFolder: BoolToNumber(srcObj.IsDir()), + } + resp, err := y.CreateBatchTask("COPY", familyId, dstDir.GetID(), map[string]string{ + "groupId": "null", + "copyType": "2", + "shareId": "null", + }, task) + if err != nil { + return err + } + + for { + state, err := y.CheckBatchTask("COPY", resp.TaskID) + if err != nil { + return err + } + switch state.TaskStatus { + case 2: + task.DealWay = IF(overwrite, 3, 2) + // 冲突时覆盖文件 + if err := y.ManageBatchTask("COPY", resp.TaskID, dstDir.GetID(), task); err != nil { + return err + } + case 4: + return nil + } + time.Sleep(time.Millisecond * 400) + } +} + +func (y *Cloud189PC) CreateBatchTask(aType string, familyID string, targetFolderId string, other map[string]string, taskInfos ...BatchTaskInfo) (*CreateBatchTaskResp, error) { + var resp CreateBatchTaskResp + _, err := y.post(API_URL+"/batch/createBatchTask.action", func(req *resty.Request) { + req.SetFormData(map[string]string{ + "type": aType, + "taskInfos": MustString(utils.Json.MarshalToString(taskInfos)), + }) + if targetFolderId != "" { + req.SetFormData(map[string]string{"targetFolderId": targetFolderId}) + } + if familyID != "" { + req.SetFormData(map[string]string{"familyId": familyID}) + } + req.SetFormData(other) + }, &resp, familyID != "") + if err != nil { + return nil, err + } + return &resp, nil +} + +// 检测任务状态 func (y *Cloud189PC) CheckBatchTask(aType string, taskID string) (*BatchTaskStateResp, error) { var resp BatchTaskStateResp _, err := y.post(API_URL+"/batch/checkBatchTask.action", func(req *resty.Request) { @@ -936,6 +1094,37 @@ func (y *Cloud189PC) CheckBatchTask(aType string, taskID string) (*BatchTaskStat return &resp, nil } +// 获取冲突的任务信息 +func (y *Cloud189PC) GetConflictTaskInfo(aType string, taskID string) (*BatchTaskConflictTaskInfoResp, error) { + var resp BatchTaskConflictTaskInfoResp + _, err := y.post(API_URL+"/batch/getConflictTaskInfo.action", func(req *resty.Request) { + req.SetFormData(map[string]string{ + "type": aType, + "taskId": taskID, + }) + }, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +// 处理冲突 +func (y *Cloud189PC) ManageBatchTask(aType string, taskID string, targetFolderId string, taskInfos ...BatchTaskInfo) error { + _, err := y.post(API_URL+"/batch/manageBatchTask.action", func(req *resty.Request) { + req.SetFormData(map[string]string{ + "targetFolderId": targetFolderId, + "type": aType, + "taskId": taskID, + "taskInfos": MustString(utils.Json.MarshalToString(taskInfos)), + }) + }, nil) + return err +} + +var ErrIsConflict = errors.New("there is a conflict with the target object") + +// 等待任务完成 func (y *Cloud189PC) WaitBatchTask(aType string, taskID string, t time.Duration) error { for { state, err := y.CheckBatchTask(aType, taskID) @@ -944,7 +1133,7 @@ func (y *Cloud189PC) WaitBatchTask(aType string, taskID string, t time.Duration) } switch state.TaskStatus { case 2: - return errors.New("there is a conflict with the target object") + return ErrIsConflict case 4: return nil } diff --git a/pkg/utils/time.go b/pkg/utils/time.go index a9d9b5b6..aa706928 100644 --- a/pkg/utils/time.go +++ b/pkg/utils/time.go @@ -37,3 +37,28 @@ func NewDebounce2(interval time.Duration, f func()) func() { (*time.Timer)(timer).Reset(interval) } } + +func NewThrottle(interval time.Duration) func(func()) { + var lastCall time.Time + + return func(fn func()) { + now := time.Now() + if now.Sub(lastCall) < interval { + return + } + time.AfterFunc(interval, fn) + lastCall = now + } +} + +func NewThrottle2(interval time.Duration, fn func()) func() { + var lastCall time.Time + return func() { + now := time.Now() + if now.Sub(lastCall) < interval { + return + } + time.AfterFunc(interval, fn) + lastCall = now + } +}