Skip to content

Commit

Permalink
POC chunked uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
pierotofy committed Oct 11, 2019
1 parent 5a5249b commit 6873ca3
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 29 deletions.
4 changes: 3 additions & 1 deletion internal/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
var outputPath string
var nodeName string
var force bool
var parallelConnections int

var rootCmd = &cobra.Command{
Use: "odm [flags] <images> [<gcp>] [args]",
Expand Down Expand Up @@ -91,7 +92,7 @@ var rootCmd = &cobra.Command{
}
}

odm.Run(inputFiles, parseOptions(options, nodeOptions), *node, outputPath)
odm.Run(inputFiles, parseOptions(options, nodeOptions), *node, outputPath, parallelConnections)
},

TraverseChildren: true,
Expand All @@ -115,6 +116,7 @@ func init() {
rootCmd.Flags().BoolVarP(&force, "force", "f", false, "replace the contents of the output directory if it already exists")
rootCmd.Flags().StringVarP(&outputPath, "output", "o", "./output", "directory where to store processing results")
rootCmd.Flags().StringVarP(&nodeName, "node", "n", "default", "Processing node to use")
rootCmd.Flags().IntVarP(&parallelConnections, "parallel-connections", "p", 5, "Parallel upload connections. Set to 1 to disable parallel uploads")

rootCmd.Flags().SetInterspersed(false)
}
Expand Down
120 changes: 120 additions & 0 deletions internal/odm/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"io/ioutil"
"math"
"mime/multipart"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -76,6 +77,11 @@ type ApiActionResponse struct {
Error string `json:"error"`
}

// TaskNewResponse is the JSON reply decoded by TaskNewInit and TaskNewCommit.
// When those methods fail locally (request, read or decode error) they return
// it with an empty UUID and Error set to the Go error text.
type TaskNewResponse struct {
// UUID is decoded from the "uuid" key of the response body.
UUID string `json:"uuid"`
// Error is decoded from the "error" key; callers check for a non-empty value.
Error string `json:"error"`
}

// Node is a NodeODM processing node
type Node struct {
URL string `json:"url"`
Expand Down Expand Up @@ -291,6 +297,120 @@ func (n Node) TaskCancel(uuid string) error {
return nil
}

// TaskNewInit POST: /task/new/init
//
// It sends a multipart form with "skipPostProcessing" set to "true" and
// "options" set to jsonOptions, then decodes the JSON reply into a
// TaskNewResponse. Any local failure (building the form, performing the
// request, reading or decoding the body) is reported through the Error field
// of the returned value with an empty UUID.
func (n Node) TaskNewInit(jsonOptions []byte) TaskNewResponse {
	reqBody := &bytes.Buffer{}
	mpw := multipart.NewWriter(reqBody)
	if err := mpw.WriteField("skipPostProcessing", "true"); err != nil {
		return TaskNewResponse{UUID: "", Error: err.Error()}
	}
	if err := mpw.WriteField("options", string(jsonOptions)); err != nil {
		return TaskNewResponse{UUID: "", Error: err.Error()}
	}
	// Close writes the trailing boundary; the body is incomplete without it.
	if err := mpw.Close(); err != nil {
		return TaskNewResponse{UUID: "", Error: err.Error()}
	}

	resp, err := http.Post(n.URLFor("/task/new/init"), mpw.FormDataContentType(), reqBody)
	if err != nil {
		return TaskNewResponse{UUID: "", Error: err.Error()}
	}
	// Only defer the close once the request is known to have succeeded:
	// resp is nil when http.Post returns an error.
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return TaskNewResponse{UUID: "", Error: err.Error()}
	}

	var res TaskNewResponse
	if err := json.Unmarshal(body, &res); err != nil {
		return TaskNewResponse{UUID: "", Error: err.Error()}
	}

	return res
}

// TaskNewUpload POST: /task/new/upload/{uuid}
//
// It streams file to the node as a single multipart form file named "images",
// using a pipe so the file is never buffered in memory. When bar is non-nil it
// is reset for this file (total, position and prefix) and advanced as bytes
// are written into the request body.
//
// It returns an error when the file cannot be read, the request fails, the
// response is not valid JSON, the response carries an "error" value, or the
// response reports success: false.
func (n Node) TaskNewUpload(file string, uuid string, bar *pb.ProgressBar) error {
	r, w := io.Pipe()
	// Closing the read side on return makes any pending write in the
	// producer goroutine fail, so the goroutine cannot outlive this call.
	defer r.Close()

	mpw := multipart.NewWriter(w)

	// writeBody produces the whole multipart body. It keeps its own error
	// values so nothing is shared with the goroutine running the request.
	writeBody := func() error {
		f, err := os.Open(file)
		if err != nil {
			return err
		}
		// Deferred only after a successful open, so the real handle is closed.
		defer f.Close()

		fi, err := f.Stat()
		if err != nil {
			return err
		}

		if bar != nil {
			bar.SetTotal64(fi.Size())
			bar.Set64(0)
			bar.Prefix("[" + fi.Name() + "]")
		}

		part, err := mpw.CreateFormFile("images", fi.Name())
		if err != nil {
			return err
		}
		if bar != nil {
			// Mirror every chunk written to the form part into the bar.
			part = io.MultiWriter(part, bar)
		}

		if _, err := io.Copy(part, f); err != nil {
			return err
		}
		return mpw.Close()
	}

	go func() {
		// A nil error is a normal close (the reader sees EOF); a non-nil
		// error is surfaced to the reader so http.Post fails with it instead
		// of silently sending a truncated body.
		w.CloseWithError(writeBody())
	}()

	resp, err := http.Post(n.URLFor("/task/new/upload/"+uuid), mpw.FormDataContentType(), r)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	var res ApiActionResponse
	if err := json.Unmarshal(body, &res); err != nil {
		return err
	}

	if res.Error != "" {
		return errors.New(res.Error)
	}

	if !res.Success {
		return errors.New("Cannot complete upload. /task/new/upload failed with success: false")
	}

	return nil
}

// TaskNewCommit POST: /task/new/commit/{uuid}
//
// It posts an empty parameter set to the commit endpoint for uuid and decodes
// the reply. A request or decode failure is returned as a TaskNewResponse with
// an empty UUID and the error text in Error.
func (n Node) TaskNewCommit(uuid string) TaskNewResponse {
	payload, postErr := n.apiPOST("/task/new/commit/"+uuid, map[string]string{})
	if postErr != nil {
		return TaskNewResponse{UUID: "", Error: postErr.Error()}
	}

	var committed TaskNewResponse
	if decodeErr := json.Unmarshal(payload, &committed); decodeErr != nil {
		return TaskNewResponse{UUID: "", Error: decodeErr.Error()}
	}
	return committed
}

func (n Node) CheckAuthentication(err error) error {
if err != nil {
if err == ErrUnauthorized {
Expand Down
162 changes: 134 additions & 28 deletions internal/odm/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package odm

import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
Expand All @@ -34,44 +36,41 @@ import (
"github.com/cheggaaa/pb"
)

type TaskNewResponse struct {
UUID string `json:"uuid"`
Error string `json:"error"`
// fileUpload is one unit of work queued on the channel read by uploadWorker.
type fileUpload struct {
// filename is the path passed to Node.TaskNewUpload.
filename string
// retries is 0 for a freshly queued file and higher when the file is
// queued again after a failed upload.
retries int
}

// Run processes a dataset
func Run(files []string, options []Option, node Node, outputPath string) {
var err error
var bar *pb.ProgressBar
// fileUploadResult reports the outcome of one upload attempt made by
// uploadWorker.
type fileUploadResult struct {
// filename is the file the attempt was made for.
filename string
// err is the value returned by Node.TaskNewUpload; nil means the upload
// succeeded.
err error
// retries is the retries value of the queued item plus one, as set by
// uploadWorker.
retries int
}

func singleUpload(node Node, files []string, jsonOptions []byte) TaskNewResponse {
var f *os.File
var fi os.FileInfo

var totalBytes int64
var err error
var bar *pb.ProgressBar
var res TaskNewResponse

showProgress := !logger.QuietFlag

// Calculate total upload size
for _, file := range files {
if fi, err = os.Stat(file); err != nil {
logger.Error(err)
if showProgress {
var totalBytes int64

// Calculate total upload size
for _, file := range files {
if fi, err = os.Stat(file); err != nil {
logger.Error(err)
}
totalBytes += fi.Size()
}
totalBytes += fi.Size()
f.Close()
}

if showProgress {
bar = pb.New64(totalBytes).SetUnits(pb.U_BYTES).SetRefreshRate(time.Millisecond * 10)
bar.Start()
}

// Convert options to JSON
jsonOptions, err := json.Marshal(options)
if err != nil {
logger.Error(err)
}

// Setup pipe
r, w := io.Pipe()
mpw := multipart.NewWriter(w)

Expand Down Expand Up @@ -122,18 +121,125 @@ func Run(files []string, options []Option, node Node, outputPath string) {
logger.Error(err)
}

res := TaskNewResponse{}
if showProgress {
bar.Finish()
}

if err := json.Unmarshal(body, &res); err != nil {
logger.Error(err)
}

// Handle error response from API
return res
}

// uploadWorker drains filesToProcess, uploading each file to the task
// identified by uuid and publishing one fileUploadResult per attempt on
// results. When barPool is non-nil the worker registers a single byte-unit
// progress bar with it and hands that bar to every upload it performs. The
// worker returns once filesToProcess is closed and empty. id is not used.
func uploadWorker(id int, node Node, uuid string, barPool *pb.Pool, filesToProcess <-chan fileUpload, results chan<- fileUploadResult) {
	var progress *pb.ProgressBar
	if barPool != nil {
		progress = pb.New64(0).SetUnits(pb.U_BYTES).SetRefreshRate(time.Millisecond * 10)
		barPool.Add(progress)
	}

	for {
		job, open := <-filesToProcess
		if !open {
			return
		}

		uploadErr := node.TaskNewUpload(job.filename, uuid, progress)
		results <- fileUploadResult{
			filename: job.filename,
			err:      uploadErr,
			retries:  job.retries + 1,
		}
	}
}

// chunkedUpload creates a task through the init / upload / commit endpoints,
// uploading files one request per file across parallelUploads workers.
//
// Steps:
//  1. TaskNewInit obtains the task UUID.
//  2. parallelUploads uploadWorker goroutines consume a queue of files.
//  3. Failed uploads are queued again until a file has been attempted
//     maxRetries times.
//  4. TaskNewCommit finalizes the task once every file has been uploaded.
//
// Progress bars are shown unless logger.QuietFlag is set. The returned
// TaskNewResponse carries the commit reply, or a non-empty Error when init
// fails or a file exhausts its retries.
func chunkedUpload(node Node, files []string, jsonOptions []byte, parallelUploads int) TaskNewResponse {
	// maxRetries is the number of upload attempts allowed per file.
	const maxRetries = 10

	var barPool *pb.Pool
	var mainBar *pb.ProgressBar

	showProgress := !logger.QuietFlag

	// Invoke /task/new/init
	res := node.TaskNewInit(jsonOptions)
	if res.Error != "" {
		logger.Error(res.Error)
		// Without a UUID there is nothing to upload to.
		return res
	}

	if showProgress {
		barPool = pb.NewPool()
	}

	// Create workers. Both channels are sized to len(files): each file is in
	// exactly one of the queue, a worker or the results channel at any time,
	// so neither the workers nor the retry path below can block on a send.
	filesToProcess := make(chan fileUpload, len(files))
	results := make(chan fileUploadResult, len(files))

	for w := 1; w <= parallelUploads; w++ {
		go uploadWorker(w, node, res.UUID, barPool, filesToProcess, results)
	}

	if barPool != nil {
		barPool.Start()

		mainBar = pb.New(len(files)).SetUnits(pb.U_NO).SetRefreshRate(time.Millisecond * 10)
		mainBar.Format("[\x00#\x00\x00_\x00]")
		mainBar.Prefix("Files Uploaded:")
		mainBar.Start()
	}

	// Fill queue
	for _, file := range files {
		filesToProcess <- fileUpload{file, 0}
	}

	// Wait
	failure := ""
	filesLeft := len(files)
	for filesLeft > 0 && failure == "" {
		fur := <-results

		switch {
		case fur.err == nil:
			filesLeft--
			// mainBar only exists when progress output is enabled.
			if mainBar != nil {
				mainBar.Set(len(files) - filesLeft)
			}
		case fur.retries < maxRetries:
			// Retry. fur.retries already counts the attempt just made (the
			// worker increments it), so it is queued again unchanged.
			fmt.Println(fmt.Sprintf("RETRY: %s %d", fur.filename, fur.retries))
			filesToProcess <- fileUpload{fur.filename, fur.retries}
		default:
			failure = fmt.Sprintf("Cannot upload %s, exceeded max retries (%d)", fur.filename, maxRetries)
			logger.Error(errors.New(failure))
			// NOTE(review): if logger.Error terminates the process this is
			// never reached; otherwise the loop condition stops the wait so
			// the function cannot block forever on a file that will not
			// upload.
		}
	}
	close(filesToProcess)

	if barPool != nil {
		barPool.Stop()
		mainBar.Finish()
	}

	if failure != "" {
		// Do not commit a task that is missing files.
		return TaskNewResponse{UUID: "", Error: failure}
	}

	// Commit
	res = node.TaskNewCommit(res.UUID)
	if res.Error != "" {
		logger.Error(res.Error)
	}

	return res
}

// Run processes a dataset
func Run(files []string, options []Option, node Node, outputPath string, parallelConnections int) {
var err error

// Convert options to JSON
jsonOptions, err := json.Marshal(options)
if err != nil {
logger.Error(err)
}

var res TaskNewResponse
if parallelConnections <= 1 {
res = singleUpload(node, files, jsonOptions)
} else {
res = chunkedUpload(node, files, jsonOptions, parallelConnections)
}

// Handle error response from API
if res.Error != "" {
logger.Error(res.Error)
}

// We should have a UUID
Expand Down

0 comments on commit 6873ca3

Please sign in to comment.