From bc870d28210733ee96c9ee88382e84c31e1c4fd4 Mon Sep 17 00:00:00 2001
From: Travis Harrison
Date: Fri, 7 Apr 2017 12:30:37 -0500
Subject: [PATCH 1/8] update preauth, add ability to download multiple node
 files in archive format

---
 shock-server/controller/node/multi.go      |  56 +++++++-
 shock-server/controller/node/single.go     |  18 ++-
 shock-server/controller/preauth/preauth.go | 142 ++++++++++++++-------
 shock-server/node/archive/archive.go       |  52 ++++++++
 shock-server/node/file/file.go             |   9 ++
 shock-server/node/update.go                |   8 +-
 shock-server/node/util.go                  |  18 +--
 shock-server/preauth/preauth.go            |  25 ++--
 shock-server/request/streamer.go           |  50 +++++++-
 shock-server/util/util.go                  |   5 -
 10 files changed, 297 insertions(+), 86 deletions(-)

diff --git a/shock-server/controller/node/multi.go b/shock-server/controller/node/multi.go
index d3c612a7..46ff57c4 100644
--- a/shock-server/controller/node/multi.go
+++ b/shock-server/controller/node/multi.go
@@ -6,6 +6,8 @@ import (
 	e "github.com/MG-RAST/Shock/shock-server/errors"
 	"github.com/MG-RAST/Shock/shock-server/logger"
 	"github.com/MG-RAST/Shock/shock-server/node"
+	"github.com/MG-RAST/Shock/shock-server/node/archive"
+	"github.com/MG-RAST/Shock/shock-server/preauth"
 	"github.com/MG-RAST/Shock/shock-server/request"
 	"github.com/MG-RAST/Shock/shock-server/responder"
 	"github.com/MG-RAST/Shock/shock-server/user"
@@ -72,7 +74,7 @@ func (cr *NodeController) ReadMany(ctx context.Context) error {
 
 	// Gather params to make db query. Do not include the following list.
 	if _, ok := query["query"]; ok {
-		paramlist := map[string]int{"query": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1}
+		paramlist := map[string]int{"query": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1, "download_url": 1, "file_name": 1, "archive": 1}
 		for key := range query {
 			if _, found := paramlist[key]; !found {
 				keyStr := fmt.Sprintf("attributes.%s", key)
@@ -86,7 +88,7 @@ func (cr *NodeController) ReadMany(ctx context.Context) error {
 			}
 		}
 	} else if _, ok := query["querynode"]; ok {
-		paramlist := map[string]int{"querynode": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1, "owner": 1, "read": 1, "write": 1, "delete": 1, "public_owner": 1, "public_read": 1, "public_write": 1, "public_delete": 1}
+		paramlist := map[string]int{"querynode": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1, "download_url": 1, "file_name": 1, "archive": 1, "owner": 1, "read": 1, "write": 1, "delete": 1, "public_owner": 1, "public_read": 1, "public_write": 1, "public_delete": 1}
 		for key := range query {
 			if _, found := paramlist[key]; !found {
 				for _, value := range query[key] {
@@ -197,6 +199,56 @@ func (cr *NodeController) ReadMany(ctx context.Context) error {
 		logger.Error(err_msg)
 		return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
 	}
+
+	// process preauth url request, requires archive option
+	if _, ok := query["download_url"]; ok {
+		// add options - set defaults first
+		options := map[string]string{}
+		options["archive"] = "zip" // default is zip
+		if _, ok := query["archive"]; ok {
+			if archive.IsValidToArchive(query.Get("archive")) {
+				options["archive"] = query.Get("archive")
+			}
+		}
+		preauthId := util.RandString(20)
+		if _, ok := query["file_name"]; ok {
+			options["filename"] = query.Get("file_name")
+		} else {
+			options["filename"] = preauthId
+		}
+		if !strings.HasSuffix(options["filename"], options["archive"]) {
+			options["filename"] = options["filename"] + "." + options["archive"]
+		}
+		// get valid nodes
+		var nodeIds []string
+		var totalBytes int64
+		for _, n := range nodes {
+			if n.HasFile() {
+				nodeIds = append(nodeIds, n.Id)
+				totalBytes += n.File.Size
+			}
+		}
+		if len(nodeIds) == 0 {
+			return responder.RespondWithError(ctx, http.StatusBadRequest, "err:@node_ReadMany download url: no available files found")
+		}
+		// set preauth
+		if p, err := preauth.New(preauthId, "download", nodeIds, options); err != nil {
+			err_msg := "err:@node_ReadMany download_url: " + err.Error()
+			logger.Error(err_msg)
+			return responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
+		} else {
+			data := preauth.PreAuthResponse{
+				Url:       util.ApiUrl(ctx) + "/preauth/" + p.Id,
+				ValidTill: p.ValidTill.Format(time.ANSIC),
+				Format:    options["archive"],
+				Filename:  options["filename"],
+				Files:     len(nodeIds),
+				Size:      totalBytes,
+			}
+			return responder.RespondWithPaginatedData(ctx, data, limit, offset, count)
+		}
+	}
+
 	return responder.RespondWithPaginatedData(ctx, nodes, limit, offset, count)
 }
diff --git a/shock-server/controller/node/single.go b/shock-server/controller/node/single.go
index 303f1fe3..c16909d3 100644
--- a/shock-server/controller/node/single.go
+++ b/shock-server/controller/node/single.go
@@ -77,8 +77,8 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
 	var fFunc filter.FilterFunc = nil
 	var compressionFormat string = ""
 	// use query params if exist
-	if _, ok := query["filename"]; ok {
-		filename = query.Get("filename")
+	if _, ok := query["file_name"]; ok {
+		filename = query.Get("file_name")
 	}
 	if _, ok := query["filter"]; ok {
 		if filter.Has(query.Get("filter")) {
@@ -378,6 +378,7 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
 		if !n.HasFile() {
 			return responder.RespondWithError(ctx, http.StatusBadRequest, e.NodeNoFile)
 		} else {
+			preauthFilename := filename
 			// add options
 			options := map[string]string{}
 			options["filename"] = filename
@@ -386,14 +387,23 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
 			}
 			if compressionFormat != "" {
 				options["compression"] = compressionFormat
+				preauthFilename = preauthFilename + "." + compressionFormat
 			}
 			// set preauth
-			if p, err := preauth.New(util.RandString(20), "download", n.Id, options); err != nil {
+			if p, err := preauth.New(util.RandString(20), "download", []string{n.Id}, options); err != nil {
 				err_msg := "err:@node_Read download_url: " + err.Error()
 				logger.Error(err_msg)
 				return responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
 			} else {
-				return responder.RespondWithData(ctx, util.UrlResponse{Url: util.ApiUrl(ctx) + "/preauth/" + p.Id, ValidTill: p.ValidTill.Format(time.ANSIC)})
+				data := preauth.PreAuthResponse{
+					Url:       util.ApiUrl(ctx) + "/preauth/" + p.Id,
+					ValidTill: p.ValidTill.Format(time.ANSIC),
+					Format:    options["compression"],
+					Filename:  preauthFilename,
+					Files:     1,
+					Size:      n.File.Size,
+				}
+				return responder.RespondWithData(ctx, data)
 			}
 		}
 	} else if _, ok := query["download_post"]; ok {
diff --git a/shock-server/controller/preauth/preauth.go b/shock-server/controller/preauth/preauth.go
index e998d86b..a6f3dc71 100644
--- a/shock-server/controller/preauth/preauth.go
+++ b/shock-server/controller/preauth/preauth.go
@@ -4,7 +4,6 @@ package preauth
 import (
 	"github.com/MG-RAST/Shock/shock-server/logger"
 	"github.com/MG-RAST/Shock/shock-server/node"
-	"github.com/MG-RAST/Shock/shock-server/node/archive"
 	"github.com/MG-RAST/Shock/shock-server/node/file"
 	"github.com/MG-RAST/Shock/shock-server/node/file/index"
 	"github.com/MG-RAST/Shock/shock-server/node/filter"
@@ -24,37 +23,25 @@ func PreAuthRequest(ctx context.Context) {
 		logger.Error(err_msg)
 		responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
 	} else {
-		if n, err := node.Load(p.NodeId); err == nil {
-			switch p.Type {
-			case "download":
-				streamDownload(ctx, n, p.Options)
-				preauth.Delete(id)
-			default:
-				responder.RespondWithError(ctx, http.StatusNotFound, "Preauthorization type not supported: "+p.Type)
-			}
-		} else {
-			err_msg := "err:@preAuth loadnode: " + err.Error()
-			logger.Error(err_msg)
-			responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
+		switch p.Type {
+		case "download":
+			streamDownload(ctx, id, p.Nodes, p.Options)
+			preauth.Delete(id)
+		default:
+			responder.RespondWithError(ctx, http.StatusNotFound, "Preauthorization type not supported: "+p.Type)
 		}
 	}
 	return
 }
 
 // handle download and its options
-func streamDownload(ctx context.Context, n *node.Node, options map[string]string) {
-	nf, err := n.FileReader()
-	defer nf.Close()
-	if err != nil {
-		err_msg := "err:@preAuth node.FileReader: " + err.Error()
-		logger.Error(err_msg)
-		responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
-		return
-	}
-	// set defaults
-	filename := n.Id
+func streamDownload(ctx context.Context, pid string, nodes []string, options map[string]string) {
+	// get defaults
+	filename := pid
 	var filterFunc filter.FilterFunc = nil
 	var compressionFormat string = ""
+	var archiveFormat string = ""
+
 	// use options if exist
 	if fn, has := options["filename"]; has {
 		filename = fn
@@ -65,35 +52,92 @@ func streamDownload(ctx context.Context, n *node.Node, options map[string]string) {
 		}
 	}
 	if cp, has := options["compression"]; has {
-		if archive.IsValidCompress(cp) {
-			compressionFormat = cp
-		}
+		compressionFormat = cp
 	}
-	// stream it
-	var s *request.Streamer
-	if n.Type == "subset" {
-		s = &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: filterFunc, Compression: compressionFormat}
-		if n.File.Size == 0 {
-			// handle empty subset file
-			s.R = append(s.R, nf)
-		} else {
-			idx := index.New()
-			fullRange := "1-" + strconv.FormatInt(n.Subset.Index.TotalUnits, 10)
-			recSlice, err := idx.Range(fullRange, n.Path()+"/"+n.Id+".subset.idx", n.Subset.Index.TotalUnits)
-			if err != nil {
-				responder.RespondWithError(ctx, http.StatusInternalServerError, err.Error())
-				return
-			}
-			for _, rec := range recSlice {
-				s.R = append(s.R, io.NewSectionReader(nf, rec[0], rec[1]))
+	if ar, has := options["archive"]; has {
+		archiveFormat = ar
+	}
+	var files []file.FileInfo
+
+	// process nodes
+	for _, nid := range nodes {
+		// get node
+		n, err := node.Load(nid)
+		if (err != nil) || !n.HasFile() {
+			continue
+		}
+		// get filereader
+		nf, err := n.FileReader()
+		if err != nil {
+			nf.Close()
+			continue
+		}
+		// add to file array
+		var fileInfo file.FileInfo
+		if n.Type == "subset" {
+			if n.File.Size == 0 {
+				// handle empty subset file
+				fileInfo.R = append(fileInfo.R, nf)
+			} else {
+				idx := index.New()
+				fullRange := "1-" + strconv.FormatInt(n.Subset.Index.TotalUnits, 10)
+				recSlice, err := idx.Range(fullRange, n.Path()+"/"+n.Id+".subset.idx", n.Subset.Index.TotalUnits)
+				if err != nil {
+					nf.Close()
+					continue
+				}
+				for _, rec := range recSlice {
+					fileInfo.R = append(fileInfo.R, io.NewSectionReader(nf, rec[0], rec[1]))
+				}
 			}
+		} else {
+			fileInfo.R = append(fileInfo.R, nf)
 		}
-	} else {
-		s = &request.Streamer{R: []file.SectionReader{nf}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: filterFunc, Compression: compressionFormat}
+		defer nf.Close()
+		// add to file info
+		fileInfo.Name = n.File.Name
+		fileInfo.Size = n.File.Size
+		if _, ok := n.File.Checksum["md5"]; ok {
+			fileInfo.Checksum = n.File.Checksum["md5"]
+		}
+		files = append(files, fileInfo)
 	}
-	if err = s.Stream(false); err != nil {
-		// causes "multiple response.WriteHeader calls" error but better than no response
-		err_msg := "err:@preAuth: s.stream: " + err.Error()
+
+	if (len(nodes) == 1) && (len(files) == 1) {
+		// create single node / file streamer
+		s := &request.Streamer{
+			R:           files[0].R,
+			W:           ctx.HttpResponseWriter(),
+			ContentType: "application/octet-stream",
+			Filename:    filename,
+			Size:        files[0].Size,
+			Filter:      filterFunc,
+			Compression: compressionFormat,
+		}
+		if err := s.Stream(false); err != nil {
+			// causes "multiple response.WriteHeader calls" error but better than no response
+			err_msg := "err:@preAuth: s.stream: " + err.Error()
+			logger.Error(err_msg)
+			responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
+		}
+	} else if (len(files) > 1) && (archiveFormat != "") {
+		// create multi node / file streamer, must have archive format
+		m := &request.MultiStreamer{
+			Files:       files,
+			W:           ctx.HttpResponseWriter(),
+			ContentType: "application/octet-stream",
+			Filename:    filename,
+			Archive:     archiveFormat,
+		}
+		if err := m.MultiStream(); err != nil {
+			// causes "multiple response.WriteHeader calls" error but better than no response
+			err_msg := "err:@preAuth: m.multistream: " + err.Error()
+			logger.Error(err_msg)
+			responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
+		}
+	} else {
+		// something broke
+		err_msg := "err:@preAuth: no files available to download for given combination of options"
 		logger.Error(err_msg)
 		responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
 	}
diff --git a/shock-server/node/archive/archive.go b/shock-server/node/archive/archive.go
index 526ba6d1..9c979031 100644
--- a/shock-server/node/archive/archive.go
+++ b/shock-server/node/archive/archive.go
@@ -3,6 +3,7 @@ package archive
 import (
 	"archive/tar"
 	"archive/zip"
+	"bytes"
 	"compress/bzip2"
 	"compress/gzip"
 	"crypto/md5"
@@ -10,6 +11,7 @@ import (
 	"fmt"
 	"github.com/MG-RAST/Shock/shock-server/conf"
 	"github.com/MG-RAST/Shock/shock-server/logger"
+	"github.com/MG-RAST/Shock/shock-server/node/file"
 	"io"
 	"math/rand"
 	"os"
@@ -19,6 +21,7 @@ import (
 
 var validUncompress = []string{"gzip", "bzip2"}
 var validCompress = []string{"gzip", "zip"}
+var validToArchive = []string{"zip", "tar"}
 var validArchive = []string{"zip", "tar", "tar.gz", "tar.bz2"}
 var ArchiveList = strings.Join(validArchive, ", ")
 
@@ -28,6 +31,15 @@ type FormFile struct {
 	Checksum map[string]string
 }
 
+func IsValidToArchive(a string) bool {
+	for _, b := range validToArchive {
+		if b == a {
+			return true
+		}
+	}
+	return false
+}
+
 func IsValidArchive(a string) bool {
 	for _, b := range validArchive {
 		if b == a {
@@ -196,6 +208,46 @@ func unZip(filePath string, unpackDir string) (fileList []FormFile, err error) {
 	return
 }
 
+func ArchiveReader(format string, files []file.FileInfo) (outReader io.ReadCloser) {
+	pReader, pWriter := io.Pipe()
+	if format == "tar" {
+		tWriter := tar.NewWriter(pWriter)
+		go func() {
+			for _, f := range files {
+				fHdr := &tar.Header{Name: f.Name, Mode: 0660, Size: f.Size}
+				tWriter.WriteHeader(fHdr)
+				io.Copy(tWriter, f.Body)
+				cHdr := &tar.Header{Name: f.Name + ".md5", Mode: 0660, Size: int64(len(f.Checksum))}
+				tWriter.WriteHeader(cHdr)
+				io.Copy(tWriter, bytes.NewBufferString(f.Checksum))
+			}
+			tWriter.Close()
+			pWriter.Close()
+		}()
+	} else if format == "zip" {
+		zWriter := zip.NewWriter(pWriter)
+		go func() {
+			for _, f := range files {
+				zFile, _ := zWriter.Create(f.Name)
+				io.Copy(zFile, f.Body)
+				zSum, _ := zWriter.Create(f.Name + ".md5")
+				io.Copy(zSum, bytes.NewBufferString(f.Checksum))
+			}
+			zWriter.Close()
+			pWriter.Close()
+		}()
+	} else {
+		// no valid archive, pipe each inReader into one stream
+		go func() {
+			for _, f := range files {
+				io.Copy(pWriter, f.Body)
+			}
+			pWriter.Close()
+		}()
+	}
+	return pReader
+}
+
 func CompressReader(format string, filename string, inReader io.ReadCloser) (outReader io.ReadCloser) {
 	if IsValidCompress(format) {
 		pReader, pWriter := io.Pipe()
diff --git a/shock-server/node/file/file.go b/shock-server/node/file/file.go
index cb8ebef1..276b7700 100644
--- a/shock-server/node/file/file.go
+++ b/shock-server/node/file/file.go
@@ -19,6 +19,15 @@ type File struct {
 	CreatedOn time.Time `bson:"created_on" json:"created_on"`
 }
 
+// FileInfo for streaming file content
+type FileInfo struct {
+	R        []SectionReader
+	Body     io.ReadCloser
+	Name     string
+	Size     int64
+	Checksum string
+}
+
 // SectionReader interface required for MultiReaderAt
 type SectionReader interface {
 	io.Reader
diff --git a/shock-server/node/update.go b/shock-server/node/update.go
index 0a8ec869..212c6241 100644
--- a/shock-server/node/update.go
+++ b/shock-server/node/update.go
@@ -477,7 +477,7 @@ func (node *Node) Save() (err error) {
 	n := Node{node.Id, node.Version, node.File, node.Attributes, node.Indexes, node.Acl, node.VersionParts, node.Tags, nil, node.Linkages, node.Priority, node.CreatedOn, node.LastModified, node.Expiration, node.Type, node.Subset, node.Parts}
 	newRevisions := []Node{n}
 	if len(node.Revisions) > 0 {
-		newRevisions = append(newRevisions, node.Revisions...) // prepend, latest revisions in front
+		newRevisions = append(newRevisions, node.Revisions...) // prepend, latest revisions in front
 	}
 	// adjust revisions based on config
 	// <0 keep all ; >0 keep max
@@ -519,14 +519,14 @@ func (node *Node) UpdateVersion() (err error) {
 	version := node.Id
 	versionParts := make(map[string]string)
 	partMap := map[string]interface{}{"file_ver": node.File, "indexes_ver": node.Indexes, "attributes_ver": node.Attributes, "acl_ver": node.Acl}
-
+	// need to keep map ordered
 	partKeys := []string{}
 	for k, _ := range partMap {
-		partKeys = append(partKeys, k)
+		partKeys = append(partKeys, k)
 	}
 	sort.Strings(partKeys)
-
+
 	for _, k := range partKeys {
diff --git a/shock-server/node/util.go b/shock-server/node/util.go
index 3a4b83ff..d13481eb 100644
--- a/shock-server/node/util.go
+++ b/shock-server/node/util.go
@@ -1,8 +1,8 @@
 package node
 
 import (
-	"sort"
-	"sync"
+	"sort"
+	"sync"
 )
 
 type mappy map[string]bool
@@ -69,20 +69,20 @@ func (l *Locker) GetNodes() (ids []string) {
 type sortBytes []byte
 
 func (b sortBytes) Less(i, j int) bool {
-	return b[i] < b[j]
+	return b[i] < b[j]
 }
 
 func (b sortBytes) Swap(i, j int) {
-	b[i], b[j] = b[j], b[i]
+	b[i], b[j] = b[j], b[i]
 }
 
 func (b sortBytes) Len() int {
-	return len(b)
+	return len(b)
 }
 
 func SortByteArray(b []byte) []byte {
-	sb := make([]byte, len(b))
-	copy(sb, b)
-	sort.Sort(sortBytes(sb))
-	return sb
+	sb := make([]byte, len(b))
+	copy(sb, b)
+	sort.Sort(sortBytes(sb))
+	return sb
 }
diff --git a/shock-server/preauth/preauth.go b/shock-server/preauth/preauth.go
index fa60ac54..939a7165 100644
--- a/shock-server/preauth/preauth.go
+++ b/shock-server/preauth/preauth.go
@@ -11,12 +11,21 @@ import (
 // Database collection handle
 var DB *mgo.Collection
 
+type PreAuthResponse struct {
+	Url       string
+	ValidTill string
+	Format    string
+	Filename  string
+	Files     int
+	Size      int64
+}
+
 type PreAuth struct {
-	Id        string
-	Type      string
-	NodeId    string
-	Options   map[string]string
-	ValidTill time.Time
+	Id        string            `bson:"id" json:"id"`
+	Type      string            `bson:"type" json:"type"`
+	Nodes     []string          `bson:"nodes" json:"nodes"`
+	Options   map[string]string `bson:"options" json:"options"`
+	ValidTill time.Time         `bson:"validtill" json:"validtill"`
 }
 
 // Initialize is an explicit init. Requires db.Initialize
@@ -26,9 +35,9 @@ func Initialize() {
 	DB.EnsureIndex(mgo.Index{Key: []string{"id"}, Unique: true})
 }
 
-// New preauth takes the id, type, node id, and a map of options
-func New(id, t, nid string, options map[string]string) (p *PreAuth, err error) {
-	p = &PreAuth{Id: id, Type: t, NodeId: nid, Options: options, ValidTill: time.Now().AddDate(0, 0, 1)}
+// New preauth takes the id, type, node ids, and a map of options
+func New(id string, t string, nids []string, options map[string]string) (p *PreAuth, err error) {
+	p = &PreAuth{Id: id, Type: t, Nodes: nids, Options: options, ValidTill: time.Now().AddDate(0, 1, 0)}
 	if _, err = DB.Upsert(bson.M{"id": p.Id}, &p); err != nil {
 		return nil, err
 	}
diff --git a/shock-server/request/streamer.go b/shock-server/request/streamer.go
index 8fd11f4f..f97620c2 100644
--- a/shock-server/request/streamer.go
+++ b/shock-server/request/streamer.go
@@ -13,6 +13,16 @@ import (
 	"os/exec"
 )
 
+// MultiStreamer is for taking multiple files and creating one stream through an archive format: zip, tar, etc.
+type MultiStreamer struct {
+	Files       []file.FileInfo
+	W           http.ResponseWriter
+	ContentType string
+	Filename    string
+	Archive     string
+}
+
 type Streamer struct {
 	R           []file.SectionReader
 	W           http.ResponseWriter
@@ -27,7 +37,7 @@ func (s *Streamer) Stream(streamRaw bool) (err error) {
 	// file download
 	if !streamRaw {
 		fileName := fmt.Sprintf(" attachment; filename=%s", s.Filename)
-		// add extension for compression
+		// add extension for compression or archive
 		if s.Compression != "" {
 			fileName = fmt.Sprintf(" attachment; filename=%s.%s", s.Filename, s.Compression)
 		}
@@ -39,7 +49,7 @@ func (s *Streamer) Stream(streamRaw bool) (err error) {
 	s.W.Header().Set("Access-Control-Allow-Headers", "Authorization")
 	s.W.Header().Set("Access-Control-Allow-Methods", "POST, GET, PUT, DELETE, OPTIONS")
 	s.W.Header().Set("Access-Control-Allow-Origin", "*")
-	if s.Size > 0 && s.Filter == nil && !archive.IsValidCompress(s.Compression) {
+	if (s.Size > 0) && (s.Filter == nil) && (s.Compression == "") {
 		s.W.Header().Set("Content-Length", fmt.Sprint(s.Size))
 	}
 
@@ -54,9 +64,7 @@ func (s *Streamer) Stream(streamRaw bool) (err error) {
 		} else {
 			rs = sr
 		}
-		if _, err = io.Copy(pWriter, rs); err != nil {
-			break
-		}
+		io.Copy(pWriter, rs)
 	}
 	pWriter.Close()
 }()
@@ -69,6 +77,38 @@ func (s *Streamer) Stream(streamRaw bool) (err error) {
 	return
 }
 
+func (m *MultiStreamer) MultiStream() (err error) {
+	// set headers
+	fileName := fmt.Sprintf(" attachment; filename=%s", m.Filename)
+	m.W.Header().Set("Content-Type", m.ContentType)
+	m.W.Header().Set("Connection", "close")
+	m.W.Header().Set("Access-Control-Allow-Headers", "Authorization")
+	m.W.Header().Set("Access-Control-Allow-Methods", "POST, GET, PUT, DELETE, OPTIONS")
+	m.W.Header().Set("Access-Control-Allow-Origin", "*")
+	m.W.Header().Set("Content-Disposition", fileName)
+
+	// pipe each SectionReader into one stream
+	for _, f := range m.Files {
+		pReader, pWriter := io.Pipe()
+		go func() {
+			for _, sr := range f.R {
+				io.Copy(pWriter, sr)
+			}
+			pWriter.Close()
+		}()
+		f.Body = pReader
+	}
+
+	// pass pipes through archiver to ResponseWriter
+	aReader := archive.ArchiveReader(m.Archive, m.Files)
+	_, err = io.Copy(m.W, aReader)
+	aReader.Close()
+	for _, f := range m.Files {
+		f.Body.Close()
+	}
+	return
+}
+
 func (s *Streamer) StreamSamtools(filePath string, region string, args ...string) (err error) {
 	//involking samtools in command line:
 	//samtools view [-c] [-H] [-f INT] ... filname.bam [region]
diff --git a/shock-server/util/util.go b/shock-server/util/util.go
index c2aeca5b..cd07e1a0 100644
--- a/shock-server/util/util.go
+++ b/shock-server/util/util.go
@@ -19,11 +19,6 @@ var validParams = []string{"action", "all", "archive_format", "attributes_str",
 var validFiles = []string{"attributes", "subset_indices", "upload", "gzip", "bzip2"}
 var ValidUpload = []string{"upload", "gzip", "bzip2"}
 
-type UrlResponse struct {
-	Url       string `json:"url"`
-	ValidTill string `json:"validtill"`
-}
-
 type Query struct {
 	list map[string][]string
 }
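Patch 1's end-to-end flow, sketched from the client side. This is a sketch, not part of the patch: the host, token, and query values are hypothetical, the lowercase response keys follow the json tags that patch 6 below adds to PreAuthResponse, and the exact response envelope of RespondWithPaginatedData may differ.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// preAuthResponse mirrors shock-server/preauth.PreAuthResponse
// (json tags as of patch 6: url, validtill, format, filename, files, size).
type preAuthResponse struct {
	Url       string `json:"url"`
	ValidTill string `json:"validtill"`
	Format    string `json:"format"`
	Filename  string `json:"filename"`
	Files     int    `json:"files"`
	Size      int64  `json:"size"`
}

func main() {
	// Hypothetical Shock host. download_url asks for a preauth URL covering
	// every node matched by the attribute query; file_name and archive
	// (zip or tar, zip is the default) are optional.
	base := "http://shock.example.com:7445"
	resp, err := http.Get(base + "/node?query&project=foo&download_url&archive=tar&file_name=project_foo")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Assumption: ReadMany wraps the preauth info in the usual data envelope.
	var envelope struct {
		Data preAuthResponse `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
		panic(err)
	}
	// envelope.Data.Url points at /preauth/{id}; fetching it streams
	// project_foo.tar until the preauth entry is consumed or expires.
	fmt.Println(envelope.Data.Url, envelope.Data.Files, envelope.Data.Size)
}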
From 8a7e128632153305382c3d294e91bdd9fc081ea0 Mon Sep 17 00:00:00 2001
From: Travis Harrison
Date: Mon, 10 Apr 2017 13:41:40 -0500
Subject: [PATCH 2/8] remove unused function

---
 shock-server/node/node.go | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/shock-server/node/node.go b/shock-server/node/node.go
index 90450f0a..7401a76a 100644
--- a/shock-server/node/node.go
+++ b/shock-server/node/node.go
@@ -104,22 +104,6 @@ func New() (node *Node) {
 	return
 }
 
-func LoadFromDisk(id string) (n *Node, err error) {
-	if len(id) < 6 {
-		return nil, errors.New("Node ID must be at least 6 characters in length")
-	}
-	path := getPath(id)
-	if nbson, err := ioutil.ReadFile(path + "/" + id + ".bson"); err != nil {
-		return nil, errors.New(e.NodeDoesNotExist)
-	} else {
-		n = new(Node)
-		if err = bson.Unmarshal(nbson, &n); err != nil {
-			return nil, err
-		}
-	}
-	return
-}
-
 func CreateNodeUpload(u *user.User, params map[string]string, files FormFiles) (node *Node, err error) {
 	// if copying node or creating subset node from parent, check if user has rights to the original node
 	if _, hasCopyData := params["copy_data"]; hasCopyData {
From c76d2fd69267d282b8f8db42622d29e9d36bbc78 Mon Sep 17 00:00:00 2001
From: Travis Harrison
Date: Mon, 10 Apr 2017 14:36:06 -0500
Subject: [PATCH 3/8] gracefully handle missing .bson file in node dir

---
 shock-server/node/update.go | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/shock-server/node/update.go b/shock-server/node/update.go
index 212c6241..11c328cf 100644
--- a/shock-server/node/update.go
+++ b/shock-server/node/update.go
@@ -495,21 +495,25 @@ func (node *Node) Save() (err error) {
 	// get bson, test size and print
 	nbson, err := bson.Marshal(node)
 	if err != nil {
-		return
+		return err
 	}
 	if len(nbson) >= DocumentMaxByte {
 		return errors.New(fmt.Sprintf("bson document size is greater than limit of %d bytes", DocumentMaxByte))
 	}
 	bsonPath := fmt.Sprintf("%s/%s.bson", node.Path(), node.Id)
 	os.Remove(bsonPath)
-	err = ioutil.WriteFile(bsonPath, nbson, 0644)
-	if err != nil {
-		return
+	if err := ioutil.WriteFile(bsonPath, nbson, 0644); err != nil {
+		// dir path may be missing, recreate and try again
+		if err := node.Mkdir(); err != nil {
+			return err
+		}
+		if err := ioutil.WriteFile(bsonPath, nbson, 0644); err != nil {
+			return err
+		}
 	}
 	// save node to mongodb
-	err = dbUpsert(node)
-	if err != nil {
-		return
+	if err := dbUpsert(node); err != nil {
+		return err
 	}
 	return
 }
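The recovery in patch 3, reduced to a standalone sketch. Assumption: a failed write means the node directory vanished on disk, so recreating it and retrying exactly once is the intended fix; saveBytes is an illustrative name, not a function in the codebase.

package main

import (
	"io/ioutil"
	"os"
	"path/filepath"
)

// saveBytes mirrors Save's retry: if the first write fails because the
// parent directory is gone, recreate the directory and write once more.
func saveBytes(path string, data []byte) error {
	if err := ioutil.WriteFile(path, data, 0644); err != nil {
		if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
			return err
		}
		return ioutil.WriteFile(path, data, 0644)
	}
	return nil
}

func main() {
	// First write fails if /tmp/shock-demo was removed; saveBytes recreates it.
	if err := saveBytes("/tmp/shock-demo/node.bson", []byte("{}")); err != nil {
		panic(err)
	}
}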
From 1bff06c47d46fd04d5952b2ef484090f45e7c2e2 Mon Sep 17 00:00:00 2001
From: Travis Harrison
Date: Tue, 11 Apr 2017 13:59:22 -0500
Subject: [PATCH 4/8] go fmt

---
 shock-server/node/update.go | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/shock-server/node/update.go b/shock-server/node/update.go
index 11c328cf..b6da4ddc 100644
--- a/shock-server/node/update.go
+++ b/shock-server/node/update.go
@@ -503,13 +503,13 @@ func (node *Node) Save() (err error) {
 	bsonPath := fmt.Sprintf("%s/%s.bson", node.Path(), node.Id)
 	os.Remove(bsonPath)
 	if err := ioutil.WriteFile(bsonPath, nbson, 0644); err != nil {
-	   // dir path may be missing, recreate and try again
-	   if err := node.Mkdir(); err != nil {
-	      return err
-	   }
-	   if err := ioutil.WriteFile(bsonPath, nbson, 0644); err != nil {
-	      return err
-	   }
+		// dir path may be missing, recreate and try again
+		if err := node.Mkdir(); err != nil {
+			return err
+		}
+		if err := ioutil.WriteFile(bsonPath, nbson, 0644); err != nil {
+			return err
+		}
 	}
 	// save node to mongodb
 	if err := dbUpsert(node); err != nil {

From 51f18545c362b667c0a8f3a520330489be275ec3 Mon Sep 17 00:00:00 2001
From: Travis Harrison
Date: Tue, 11 Apr 2017 14:04:12 -0500
Subject: [PATCH 5/8] version and release notes

---
 RELEASE_NOTES.txt | 6 ++++++
 VERSION           | 2 +-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/RELEASE_NOTES.txt b/RELEASE_NOTES.txt
index 2c78295a..f945cdbf 100644
--- a/RELEASE_NOTES.txt
+++ b/RELEASE_NOTES.txt
@@ -1,3 +1,9 @@
+# v0.9.21
+
+- graceful error handling of missing .bson file
+- add more to preauth return: file size, options used
+- added ability to download multiple files (.tar or .zip format) from a query
+
 # v0.9.20
 
 - add priority field to node along with index and set option
diff --git a/VERSION b/VERSION
index 6b7a3e33..fc6d9f6b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.9.20
\ No newline at end of file
+0.9.21
\ No newline at end of file
From 1cec1d1e784d6155647ebe992db8dde14d0db6de Mon Sep 17 00:00:00 2001
From: Travis Harrison
Date: Wed, 12 Apr 2017 15:48:36 -0500
Subject: [PATCH 6/8] fix multifile reader and archive

---
 shock-server/controller/preauth/preauth.go |  5 +++--
 shock-server/node/archive/archive.go       | 14 +++++++++-----
 shock-server/node/file/file.go             |  1 +
 shock-server/preauth/preauth.go            | 12 ++++++------
 shock-server/request/streamer.go           | 10 +++++-----
 5 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/shock-server/controller/preauth/preauth.go b/shock-server/controller/preauth/preauth.go
index a6f3dc71..a0e99add 100644
--- a/shock-server/controller/preauth/preauth.go
+++ b/shock-server/controller/preauth/preauth.go
@@ -57,7 +57,7 @@ func streamDownload(ctx context.Context, pid string, nodes []string, options map
 	if ar, has := options["archive"]; has {
 		archiveFormat = ar
 	}
-	var files []file.FileInfo
+	var files []*file.FileInfo
 
 	// process nodes
 	for _, nid := range nodes {
@@ -97,10 +97,11 @@ func streamDownload(ctx context.Context, pid string, nodes []string, options map
 		// add to file info
 		fileInfo.Name = n.File.Name
 		fileInfo.Size = n.File.Size
+		fileInfo.ModTime = n.File.CreatedOn
 		if _, ok := n.File.Checksum["md5"]; ok {
 			fileInfo.Checksum = n.File.Checksum["md5"]
 		}
-		files = append(files, fileInfo)
+		files = append(files, &fileInfo)
 	}
 
 	if (len(nodes) == 1) && (len(files) == 1) {
diff --git a/shock-server/node/archive/archive.go b/shock-server/node/archive/archive.go
index 9c979031..76e1e852 100644
--- a/shock-server/node/archive/archive.go
+++ b/shock-server/node/archive/archive.go
@@ -208,16 +208,16 @@ func unZip(filePath string, unpackDir string) (fileList []FormFile, err error) {
 	return
 }
 
-func ArchiveReader(format string, files []file.FileInfo) (outReader io.ReadCloser) {
+func ArchiveReader(format string, files []*file.FileInfo) (outReader io.ReadCloser) {
 	pReader, pWriter := io.Pipe()
 	if format == "tar" {
 		tWriter := tar.NewWriter(pWriter)
 		go func() {
 			for _, f := range files {
-				fHdr := &tar.Header{Name: f.Name, Mode: 0660, Size: f.Size}
+				fHdr := &tar.Header{Name: f.Name, Mode: 0660, ModTime: f.ModTime, Size: f.Size}
 				tWriter.WriteHeader(fHdr)
 				io.Copy(tWriter, f.Body)
-				cHdr := &tar.Header{Name: f.Name + ".md5", Mode: 0660, Size: int64(len(f.Checksum))}
+				cHdr := &tar.Header{Name: f.Name + ".md5", Mode: 0660, ModTime: f.ModTime, Size: int64(len(f.Checksum))}
 				tWriter.WriteHeader(cHdr)
 				io.Copy(tWriter, bytes.NewBufferString(f.Checksum))
 			}
@@ -228,9 +228,13 @@ func ArchiveReader(format string, files []*file.FileInfo) (outReader io.ReadCloser) {
 		zWriter := zip.NewWriter(pWriter)
 		go func() {
 			for _, f := range files {
-				zFile, _ := zWriter.Create(f.Name)
+				zHdr := &zip.FileHeader{Name: f.Name, UncompressedSize64: uint64(f.Size)}
+				zHdr.SetModTime(f.ModTime)
+				zFile, _ := zWriter.CreateHeader(zHdr)
 				io.Copy(zFile, f.Body)
-				zSum, _ := zWriter.Create(f.Name + ".md5")
+				cHdr := &zip.FileHeader{Name: f.Name + ".md5", UncompressedSize64: uint64(len(f.Checksum))}
+				cHdr.SetModTime(f.ModTime)
+				zSum, _ := zWriter.CreateHeader(cHdr)
 				io.Copy(zSum, bytes.NewBufferString(f.Checksum))
 			}
 			zWriter.Close()
diff --git a/shock-server/node/file/file.go b/shock-server/node/file/file.go
index 276b7700..284da835 100644
--- a/shock-server/node/file/file.go
+++ b/shock-server/node/file/file.go
@@ -25,6 +25,7 @@ type FileInfo struct {
 	Body     io.ReadCloser
 	Name     string
 	Size     int64
+	ModTime  time.Time
 	Checksum string
 }
diff --git a/shock-server/preauth/preauth.go b/shock-server/preauth/preauth.go
index 939a7165..bf17eea3 100644
--- a/shock-server/preauth/preauth.go
+++ b/shock-server/preauth/preauth.go
@@ -12,12 +12,12 @@ import (
 var DB *mgo.Collection
 
 type PreAuthResponse struct {
-	Url       string
-	ValidTill string
-	Format    string
-	Filename  string
-	Files     int
-	Size      int64
+	Url       string `json:"url"`
+	ValidTill string `json:"validtill"`
+	Format    string `json:"format"`
+	Filename  string `json:"filename"`
+	Files     int    `json:"files"`
+	Size      int64  `json:"size"`
 }
 
 type PreAuth struct {
diff --git a/shock-server/request/streamer.go b/shock-server/request/streamer.go
index f97620c2..bfa45236 100644
--- a/shock-server/request/streamer.go
+++ b/shock-server/request/streamer.go
@@ -16,7 +16,7 @@ import (
 // MultiStreamer is for taking multiple files and creating one stream through an archive format: zip, tar, etc.
 type MultiStreamer struct {
-	Files       []file.FileInfo
+	Files       []*file.FileInfo
 	W           http.ResponseWriter
 	ContentType string
 	Filename    string
@@ -90,13 +90,13 @@ func (m *MultiStreamer) MultiStream() (err error) {
 	// pipe each SectionReader into one stream
 	for _, f := range m.Files {
 		pReader, pWriter := io.Pipe()
-		go func() {
-			for _, sr := range f.R {
+		f.Body = pReader
+		go func(lf *file.FileInfo) {
+			for _, sr := range lf.R {
 				io.Copy(pWriter, sr)
 			}
 			pWriter.Close()
-		}()
-		f.Body = pReader
+		}(f)
 	}
 
 	// pass pipes through archiver to ResponseWriter
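The MultiStream change above is the classic Go range-variable capture fix: every goroutine in the original loop closed over the single loop variable f, so by the time they ran they could all observe the final file; passing f as an argument gives each goroutine its own copy (and assigning f.Body before spawning, over []*file.FileInfo, makes the assignment visible to the caller). A minimal illustration of the pitfall, independent of Shock (Go semantics of this era, before the Go 1.22 loop-variable change):

package main

import "fmt"

func main() {
	files := []string{"a", "b", "c"}
	done := make(chan string)

	// Buggy: each goroutine reads the shared loop variable f, which has
	// usually advanced (or finished) before the goroutine is scheduled.
	for _, f := range files {
		go func() { done <- f }()
	}
	for range files {
		fmt.Println("buggy:", <-done) // often prints "c" three times
	}

	// Fixed (as in MultiStream): pass f as an argument so each goroutine
	// captures its own copy.
	for _, f := range files {
		go func(lf string) { done <- lf }(f)
	}
	for range files {
		fmt.Println("fixed:", <-done) // prints a, b, c in some order
	}
}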
From 06a678ebec0c88cfffd67c714d263f9f7be3a5a5 Mon Sep 17 00:00:00 2001
From: Travis Harrison
Date: Wed, 12 Apr 2017 16:11:57 -0500
Subject: [PATCH 7/8] fix checksum

---
 shock-server/node/archive/archive.go | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/shock-server/node/archive/archive.go b/shock-server/node/archive/archive.go
index 76e1e852..c6f65771 100644
--- a/shock-server/node/archive/archive.go
+++ b/shock-server/node/archive/archive.go
@@ -217,9 +217,11 @@ func ArchiveReader(format string, files []*file.FileInfo) (outReader io.ReadCloser) {
 				fHdr := &tar.Header{Name: f.Name, Mode: 0660, ModTime: f.ModTime, Size: f.Size}
 				tWriter.WriteHeader(fHdr)
 				io.Copy(tWriter, f.Body)
-				cHdr := &tar.Header{Name: f.Name + ".md5", Mode: 0660, ModTime: f.ModTime, Size: int64(len(f.Checksum))}
-				tWriter.WriteHeader(cHdr)
-				io.Copy(tWriter, bytes.NewBufferString(f.Checksum))
+				if f.Checksum != "" {
+					cHdr := &tar.Header{Name: f.Name + ".md5", Mode: 0660, ModTime: f.ModTime, Size: int64(len(f.Checksum))}
+					tWriter.WriteHeader(cHdr)
+					io.Copy(tWriter, bytes.NewBufferString(f.Checksum))
+				}
 			}
 			tWriter.Close()
 			pWriter.Close()
@@ -232,10 +234,12 @@ func ArchiveReader(format string, files []*file.FileInfo) (outReader io.ReadCloser) {
 				zHdr.SetModTime(f.ModTime)
 				zFile, _ := zWriter.CreateHeader(zHdr)
 				io.Copy(zFile, f.Body)
-				cHdr := &zip.FileHeader{Name: f.Name + ".md5", UncompressedSize64: uint64(len(f.Checksum))}
-				cHdr.SetModTime(f.ModTime)
-				zSum, _ := zWriter.CreateHeader(cHdr)
-				io.Copy(zSum, bytes.NewBufferString(f.Checksum))
+				if f.Checksum != "" {
+					cHdr := &zip.FileHeader{Name: f.Name + ".md5", UncompressedSize64: uint64(len(f.Checksum))}
+					cHdr.SetModTime(f.ModTime)
+					zSum, _ := zWriter.CreateHeader(cHdr)
+					io.Copy(zSum, bytes.NewBufferString(f.Checksum))
+				}
 			}
 			zWriter.Close()
 			pWriter.Close()
From c65e0772965fd337e7df44a8d6d17b2e490e997e Mon Sep 17 00:00:00 2001
From: Travis Harrison
Date: Thu, 13 Apr 2017 08:21:17 -0500
Subject: [PATCH 8/8] update gzip / zip header info

---
 shock-server/node/archive/archive.go | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/shock-server/node/archive/archive.go b/shock-server/node/archive/archive.go
index c6f65771..53e26525 100644
--- a/shock-server/node/archive/archive.go
+++ b/shock-server/node/archive/archive.go
@@ -17,6 +17,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"time"
 )
 
 var validUncompress = []string{"gzip", "bzip2"}
@@ -218,10 +219,10 @@ func ArchiveReader(format string, files []*file.FileInfo) (outReader io.ReadCloser) {
 			tWriter.WriteHeader(fHdr)
 			io.Copy(tWriter, f.Body)
 			if f.Checksum != "" {
-			cHdr := &tar.Header{Name: f.Name + ".md5", Mode: 0660, ModTime: f.ModTime, Size: int64(len(f.Checksum))}
-			tWriter.WriteHeader(cHdr)
-			io.Copy(tWriter, bytes.NewBufferString(f.Checksum))
-			}
+				cHdr := &tar.Header{Name: f.Name + ".md5", Mode: 0660, ModTime: f.ModTime, Size: int64(len(f.Checksum))}
+				tWriter.WriteHeader(cHdr)
+				io.Copy(tWriter, bytes.NewBufferString(f.Checksum))
+			}
 		}
 		tWriter.Close()
 		pWriter.Close()
@@ -235,11 +236,11 @@ func ArchiveReader(format string, files []*file.FileInfo) (outReader io.ReadCloser) {
 			zHdr.SetModTime(f.ModTime)
 			zFile, _ := zWriter.CreateHeader(zHdr)
 			io.Copy(zFile, f.Body)
 			if f.Checksum != "" {
-				cHdr := &zip.FileHeader{Name: f.Name + ".md5", UncompressedSize64: uint64(len(f.Checksum))}
-				cHdr.SetModTime(f.ModTime)
-				zSum, _ := zWriter.CreateHeader(cHdr)
-				io.Copy(zSum, bytes.NewBufferString(f.Checksum))
-			}
+				cHdr := &zip.FileHeader{Name: f.Name + ".md5", UncompressedSize64: uint64(len(f.Checksum))}
+				cHdr.SetModTime(f.ModTime)
+				zSum, _ := zWriter.CreateHeader(cHdr)
+				io.Copy(zSum, bytes.NewBufferString(f.Checksum))
+			}
 		}
 		zWriter.Close()
@@ -261,16 +262,19 @@ func CompressReader(format string, filename string, inReader io.ReadCloser) (outReader io.ReadCloser) {
 	pReader, pWriter := io.Pipe()
 	if format == "gzip" {
 		gWriter := gzip.NewWriter(pWriter)
-		gWriter.Header.Name = filename
 		go func() {
+			gWriter.Header.Name = filename
+			gWriter.Header.ModTime = time.Now()
 			io.Copy(gWriter, inReader)
 			gWriter.Close()
 			pWriter.Close()
 		}()
 	} else if format == "zip" {
 		zWriter := zip.NewWriter(pWriter)
-		zFile, _ := zWriter.Create(filename)
 		go func() {
+			zHdr := &zip.FileHeader{Name: filename}
+			zHdr.SetModTime(time.Now())
+			zFile, _ := zWriter.CreateHeader(zHdr)
 			io.Copy(zFile, inReader)
 			zWriter.Close()
 			pWriter.Close()
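Patch 8's intent in isolation: record real names and timestamps in the gzip stream header and zip entry headers instead of leaving zero values. A minimal sketch using only the standard library (zip.FileHeader.SetModTime was the idiomatic call in Go releases of this era; the file name and payload are placeholders):

package main

import (
	"archive/zip"
	"bytes"
	"compress/gzip"
	"time"
)

func main() {
	// gzip: the entry name and mtime live in the stream's header block and
	// must be set before the first write flushes the header.
	var gz bytes.Buffer
	gw := gzip.NewWriter(&gz)
	gw.Header.Name = "report.txt"
	gw.Header.ModTime = time.Now()
	gw.Write([]byte("hello"))
	gw.Close()

	// zip: create each entry from an explicit FileHeader so the archive
	// records a real name and timestamp rather than defaults.
	var z bytes.Buffer
	zw := zip.NewWriter(&z)
	hdr := &zip.FileHeader{Name: "report.txt"}
	hdr.SetModTime(time.Now())
	f, _ := zw.CreateHeader(hdr)
	f.Write([]byte("hello"))
	zw.Close()
}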