Skip to content

Commit 58735c7

Browse files
authored
Merge pull request #68 from beclab/ci/upload_to_cloud
ci: upload to cloud
2 parents fdbc201 + fa8bce6 commit 58735c7

File tree

22 files changed

+783
-349
lines changed

22 files changed

+783
-349
lines changed

pkg/common/constant.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ const (
5656
Pending = "pending"
5757
Running = "running"
5858
Failed = "failed"
59-
Cancelled = "cancelled"
59+
Canceled = "canceled"
6060
Completed = "completed"
6161
)
6262

pkg/common/utils.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,3 +220,29 @@ func EscapeAndJoin(input string, delimiter string) string {
220220
}
221221
return strings.Join(segments, delimiter)
222222
}
223+
224+
func PathExists(path string) bool {
225+
_, err := os.Stat(path)
226+
if err == nil {
227+
return true
228+
}
229+
230+
if os.IsNotExist(err) {
231+
return false
232+
}
233+
return false
234+
}
235+
236+
// ListContains returns a boolean that v is in items
237+
func ListContains[T comparable](items []T, v T) bool {
238+
if items == nil {
239+
return false
240+
}
241+
242+
for _, item := range items {
243+
if v == item {
244+
return true
245+
}
246+
}
247+
return false
248+
}

pkg/drivers/clouds/cloud.go

Lines changed: 95 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import (
77
"files/pkg/diskcache"
88
"files/pkg/drivers/base"
99
"files/pkg/drivers/clouds/rclone/operations"
10+
"files/pkg/drivers/posix/upload"
1011
"files/pkg/files"
1112
"files/pkg/models"
1213
"files/pkg/preview"
14+
"files/pkg/tasks"
1315
"fmt"
1416
"net/url"
1517
"path/filepath"
@@ -356,6 +358,8 @@ func (s *CloudStorage) Delete(fileDeleteArg *models.FileDeleteArgs) ([]byte, err
356358

357359
klog.Infof("Cloud delete, user: %s, param: %s", user, common.ToJson(fileParam))
358360

361+
// todo need to copy .keep file in fileParam.Path first
362+
359363
var invalidPaths []string
360364

361365
for _, dirent := range dirents {
@@ -370,27 +374,7 @@ func (s *CloudStorage) Delete(fileDeleteArg *models.FileDeleteArgs) ([]byte, err
370374
return common.ToBytes(invalidPaths), fmt.Errorf("invalid path")
371375
}
372376

373-
for _, dp := range dirents {
374-
dp = strings.TrimSpace(dp) // /path/ or /file
375-
376-
klog.Infof("Cloud delete, user: %s, dirent: %s", user, dp)
377-
378-
dpd, err := url.PathUnescape(dp)
379-
if err != nil {
380-
klog.Errorf("Cloud delete, path unescape error: %v, path: %s", err, dp)
381-
deleteFailedPaths = append(deleteFailedPaths, dp)
382-
continue
383-
}
384-
385-
_, err = s.service.Delete(fileParam, dpd)
386-
if err != nil {
387-
klog.Errorf("Cloud delete, delete files error: %v, user: %s", err, user)
388-
deleteFailedPaths = append(deleteFailedPaths, dp)
389-
continue
390-
}
391-
392-
klog.Infof("Cloud delete, delete success, user: %s, file: %s", user, dpd)
393-
}
377+
deleteFailedPaths, _ = s.service.command.Delete(fileParam, dirents)
394378

395379
if err := s.service.command.GetOperation().FsCacheClear(); err != nil {
396380
klog.Errorf("Cloud delete, fscache clear error: %v", err)
@@ -539,14 +523,101 @@ func (s *CloudStorage) getFiles(fileParam *models.FileParam) (*models.CloudListR
539523
return res, nil
540524
}
541525

526+
/**
527+
* UploadLink
528+
*/
542529
func (s *CloudStorage) UploadLink(fileUploadArg *models.FileUploadArgs) ([]byte, error) {
543-
return nil, nil
530+
var user = fileUploadArg.FileParam.Owner
531+
532+
klog.Infof("Cloud uploadLink, user: %s, param: %s", user, common.ToJson(fileUploadArg.FileParam))
533+
534+
data, err := upload.HandleUploadLink(fileUploadArg.FileParam, fileUploadArg.From)
535+
536+
klog.Infof("Cloud uploadLink, done! data: %s", string(data))
537+
538+
return data, err
544539
}
545540

541+
/**
542+
* UploadedBytes
543+
*/
546544
func (s *CloudStorage) UploadedBytes(fileUploadArg *models.FileUploadArgs) ([]byte, error) {
547-
return nil, nil
545+
var user = fileUploadArg.FileParam.Owner
546+
klog.Infof("Cloud uploadBytes, user: %s, param: %s", user, common.ToJson(fileUploadArg))
547+
548+
data, err := upload.HandleUploadedBytes(fileUploadArg.FileParam, fileUploadArg.FileName)
549+
550+
klog.Infof("Cloud uploadBytes, done! data: %s", string(data))
551+
552+
return data, err
548553
}
549554

555+
/**
556+
* UploadChunks
557+
*/
550558
func (s *CloudStorage) UploadChunks(fileUploadArg *models.FileUploadArgs) ([]byte, error) {
551-
return nil, nil
559+
var user = fileUploadArg.FileParam.Owner
560+
var uploadId = fileUploadArg.UploadId
561+
var chunkInfo = fileUploadArg.ChunkInfo
562+
var taskId = chunkInfo.UploadToCloudTaskId
563+
// var canceled = chunkInfo.UploadToCloudTaskCancel
564+
565+
if taskId != "" { // ~ query task status
566+
// todo cancel
567+
return s.service.checkUploadTaskState(user, uploadId, taskId, chunkInfo)
568+
}
569+
570+
klog.Infof("Cloud uploadChunks, user: %s, uploadId: %s, param: %s", user, fileUploadArg.UploadId, common.ToJson(fileUploadArg.FileParam))
571+
572+
ok, fileInfo, err := upload.HandleUploadChunks(fileUploadArg.FileParam, fileUploadArg.UploadId, *fileUploadArg.ChunkInfo, fileUploadArg.Ranges)
573+
574+
if err != nil {
575+
return nil, err
576+
}
577+
578+
if fileInfo == nil {
579+
return common.ToBytes(&upload.FileUploadSucced{Success: true}), nil
580+
}
581+
582+
if !ok {
583+
return common.ToBytes(fileInfo), nil // frontend ignored
584+
}
585+
586+
klog.Infof("Cloud uploadChunks, phase done, tempPath: %s, data: %s", fileInfo.UploadTempPath, common.ToJson(fileInfo))
587+
588+
var uploadTempPath = fileInfo.UploadTempPath
589+
if !strings.HasSuffix(uploadTempPath, "/") {
590+
uploadTempPath = uploadTempPath + "/"
591+
}
592+
var srcParam = &models.FileParam{}
593+
srcParam.GetFileParam(uploadTempPath)
594+
srcParam.Path = srcParam.Path + fileInfo.Id
595+
596+
var dstParam = fileUploadArg.FileParam
597+
598+
uploadCopyParam, err := s.service.CreateUploadParam(srcParam, dstParam, fileInfo.Name, fileInfo.FileInfo.FileRelativePath)
599+
if err != nil {
600+
return nil, err
601+
}
602+
603+
klog.Infof("Cloud uploadChunks, uploadToCloud param: %s", common.ToJson(uploadCopyParam))
604+
605+
var task = tasks.TaskManager.CreateTask(uploadCopyParam)
606+
if err = task.Execute(task.UploadToCloud); err != nil {
607+
klog.Errorf("Cloud uploadChunks, execute uploadToCloud error: %v", err)
608+
return nil, err
609+
}
610+
611+
taskId = task.Id()
612+
613+
klog.Infof("Cloud uploadChunks, uploadToCloud waiting for execute, taskId: %s", taskId)
614+
615+
return common.ToBytes(
616+
&upload.FileUploadSucced{
617+
Success: true,
618+
IsCloud: true,
619+
TaskId: taskId,
620+
Size: fileInfo.Size,
621+
State: common.Pending,
622+
}), nil
552623
}

pkg/drivers/clouds/rclone/common/common.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,10 @@ var (
44
ServeAddr = "http://127.0.0.1:5572"
55
ServeHost = "127.0.0.1"
66
)
7+
8+
type ErrorMessage struct {
9+
Error string `json:"error"`
10+
Input interface{} `json:"input"`
11+
Path string `json:"path"`
12+
Status int `json:"status"`
13+
}

pkg/drivers/clouds/rclone/interface.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ type Interface interface {
1919
GetJob() job.Interface
2020

2121
GetFilesSize(fileParam *models.FileParam) (int64, error)
22-
GetFilesList(param *models.FileParam) (*operations.OperationsList, error)
22+
GetFilesList(param *models.FileParam, getPrefix bool) (*operations.OperationsList, error)
2323
CreateEmptyDirectories(src, target *models.FileParam) error
2424

2525
Copy(src, dst *models.FileParam) (*operations.OperationsAsyncJobResp, error)
26+
Delete(param *models.FileParam, dirents []string) ([]string, error)
2627
Clear(param *models.FileParam) error
2728
}
2829

pkg/drivers/clouds/rclone/operations/operations.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -265,10 +265,11 @@ func (o *operations) Deletefile(fs string, remote string) error {
265265

266266
func (o *operations) Purge(fs string, remote string) error {
267267
var url = fmt.Sprintf("%s/%s", common.ServeAddr, PurgePath)
268-
268+
var leaveRoot = true
269269
var param = OperationsReq{
270-
Fs: fs, // xxx:yyy/parent
271-
Remote: remote, // dir/
270+
Fs: fs, // xxx:yyy/parent
271+
Remote: remote, // dir/
272+
LeaveRoot: &leaveRoot,
272273
}
273274

274275
klog.Infof("[rclone] operations purge, param: %s", commonutils.ToJson(param))
@@ -285,20 +286,20 @@ func (o *operations) Purge(fs string, remote string) error {
285286
}
286287

287288
func (o *operations) FsCacheClear() error {
288-
var url = fmt.Sprintf("%s/%s", common.ServeAddr, FsCacheClearPath)
289+
// var url = fmt.Sprintf("%s/%s", common.ServeAddr, FsCacheClearPath)
289290

290-
klog.Info("[rclone] operations fscacheClear")
291+
// klog.Info("[rclone] operations fscacheClear")
291292

292-
var header = make(http.Header)
293-
header.Add("Content-Type", "application/octet-stream")
293+
// var header = make(http.Header)
294+
// header.Add("Content-Type", "application/octet-stream")
294295

295-
_, err := utils.Request(context.Background(), url, http.MethodPost, &header, nil)
296-
if err != nil {
297-
klog.Errorf("[rclone] operations fscacheClear error: %v", err)
298-
return err
299-
}
296+
// _, err := utils.Request(context.Background(), url, http.MethodPost, &header, nil)
297+
// if err != nil {
298+
// klog.Errorf("[rclone] operations fscacheClear error: %v", err)
299+
// return err
300+
// }
300301

301-
klog.Info("[rclone] operations fscacheClear done!")
302+
// klog.Info("[rclone] operations fscacheClear done!")
302303

303304
return nil
304305
}

pkg/drivers/clouds/rclone/rclone.go

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"files/pkg/files"
1111
"files/pkg/models"
1212
"fmt"
13+
"net/url"
1314
"path/filepath"
1415
"sort"
1516
"strings"
@@ -258,7 +259,7 @@ func (r *rclone) GetFilesSize(fileParam *models.FileParam) (int64, error) {
258259

259260
}
260261

261-
func (r *rclone) GetFilesList(param *models.FileParam) (*operations.OperationsList, error) {
262+
func (r *rclone) GetFilesList(param *models.FileParam, getPrefix bool) (*operations.OperationsList, error) {
262263

263264
var fsPrefix, err = r.GetFsPrefix(param)
264265
if err != nil {
@@ -271,7 +272,13 @@ func (r *rclone) GetFilesList(param *models.FileParam) (*operations.OperationsLi
271272
Metadata: false,
272273
}
273274

274-
var fs = fsPrefix + pathPrefix
275+
var fs string
276+
if getPrefix {
277+
fs = fsPrefix + pathPrefix
278+
} else {
279+
fs = fsPrefix + param.Path
280+
}
281+
275282
lists, err := r.operation.List(fs, opt)
276283
if err != nil {
277284
return nil, err
@@ -430,6 +437,7 @@ func (r *rclone) Clear(param *models.FileParam) error {
430437
var isSrcLocal bool
431438
var owner = param.Owner
432439

440+
klog.Infof("[rclone] clear, param: %s", common.ToJson(param))
433441
fileName, isFile := files.GetFileNameFromPath(param.Path)
434442
prefixPath := files.GetPrefixPath(param.Path)
435443

@@ -465,13 +473,13 @@ func (r *rclone) Clear(param *models.FileParam) error {
465473
remote = fileName
466474

467475
if err = r.GetOperation().Deletefile(fs, remote); err != nil {
468-
klog.Errorf("[rclone] clear file error: %v, user: %s, fs: %s, remote: %s", err, owner, fs, remote)
476+
klog.Errorf("[rclone] clear, delete file error: %v, user: %s, isFile: %v, fs: %s, remote: %s", err, owner, isFile, fs, remote)
469477
return err
470478
}
471479

472480
r.GetOperation().FsCacheClear()
473481

474-
klog.Infof("[rclone] clear file done! user: %s, fs: %s, remote: %s", owner, fs, remote)
482+
klog.Infof("[rclone] clear, file done! user: %s, fs: %s, remote: %s", owner, fs, remote)
475483

476484
return nil
477485
}
@@ -481,13 +489,60 @@ func (r *rclone) Clear(param *models.FileParam) error {
481489
var remote = fileName
482490

483491
if err = r.GetOperation().Purge(fs, remote); err != nil {
484-
klog.Errorf("[rclone] clear directory error: %v, user: %s, fs: %s, remote: %s", err, owner, fs, remote)
492+
klog.Errorf("[rclone] clear, purge error: %v, user: %s, fs: %s, remote: %s", err, owner, fs, remote)
485493
return err
486494
}
487495

488496
r.GetOperation().FsCacheClear()
489497

490-
klog.Infof("[rclone] clear directory done! user: %s, fs: %s, remote: %s", owner, fs, remote)
498+
klog.Infof("[rclone] clear, purge done! user: %s, fs: %s, remote: %s", owner, fs, remote)
491499

492500
return nil
493501
}
502+
503+
func (r *rclone) Delete(param *models.FileParam, dirents []string) ([]string, error) {
504+
var user = param.Owner
505+
var deleteFailedPaths []string
506+
var total = len(dirents)
507+
508+
for current, dp := range dirents {
509+
dp = strings.TrimSpace(dp) // /path/ or /file
510+
511+
dpd, err := url.PathUnescape(dp)
512+
if err != nil {
513+
klog.Errorf("[rclone] delete, path unescape error: %v, path: %s", err, dp)
514+
deleteFailedPaths = append(deleteFailedPaths, dp)
515+
continue
516+
}
517+
518+
klog.Infof("[rclone] delete, delete (%d/%d), user: %s, file: %s", current+1, total, user, dpd)
519+
520+
fsPrefix, err := r.GetFsPrefix(param)
521+
_, isFile := files.GetFileNameFromPath(dpd)
522+
523+
var fs, remote string
524+
525+
if isFile {
526+
fs = fsPrefix + param.Path
527+
remote = strings.TrimPrefix(dpd, "/")
528+
if err = r.GetOperation().Deletefile(fs, remote); err != nil {
529+
deleteFailedPaths = append(deleteFailedPaths, dp)
530+
}
531+
532+
} else {
533+
fs = fsPrefix + param.Path
534+
remote = strings.TrimPrefix(dpd, "/")
535+
536+
if err = r.GetOperation().Purge(fs, remote); err != nil {
537+
deleteFailedPaths = append(deleteFailedPaths, dp)
538+
}
539+
}
540+
}
541+
542+
if len(deleteFailedPaths) > 0 {
543+
return deleteFailedPaths, fmt.Errorf("delete failed paths")
544+
}
545+
546+
return nil, nil
547+
548+
}

0 commit comments

Comments
 (0)