diff --git a/RELEASE_NOTES.txt b/RELEASE_NOTES.txt index 2c78295a..f945cdbf 100644 --- a/RELEASE_NOTES.txt +++ b/RELEASE_NOTES.txt @@ -1,3 +1,9 @@ +# v0.9.21 + +- graceful error handling of missing .bson file +- add more to preauth return: file size, options used +- added ability to download multiple files (.tar or .zip format) from a query + # v0.9.20 - add priority field to node along with index and set option diff --git a/VERSION b/VERSION index 6b7a3e33..fc6d9f6b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.20 \ No newline at end of file +0.9.21 \ No newline at end of file diff --git a/shock-server/controller/node/multi.go b/shock-server/controller/node/multi.go index d3c612a7..46ff57c4 100644 --- a/shock-server/controller/node/multi.go +++ b/shock-server/controller/node/multi.go @@ -6,6 +6,8 @@ import ( e "github.com/MG-RAST/Shock/shock-server/errors" "github.com/MG-RAST/Shock/shock-server/logger" "github.com/MG-RAST/Shock/shock-server/node" + "github.com/MG-RAST/Shock/shock-server/node/archive" + "github.com/MG-RAST/Shock/shock-server/preauth" "github.com/MG-RAST/Shock/shock-server/request" "github.com/MG-RAST/Shock/shock-server/responder" "github.com/MG-RAST/Shock/shock-server/user" @@ -72,7 +74,7 @@ func (cr *NodeController) ReadMany(ctx context.Context) error { // Gather params to make db query. Do not include the following list. if _, ok := query["query"]; ok { - paramlist := map[string]int{"query": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1} + paramlist := map[string]int{"query": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1, "download_url": 1, "file_name": 1, "archive": 1} for key := range query { if _, found := paramlist[key]; !found { keyStr := fmt.Sprintf("attributes.%s", key) @@ -86,7 +88,7 @@ func (cr *NodeController) ReadMany(ctx context.Context) error { } } } else if _, ok := query["querynode"]; ok { - paramlist := map[string]int{"querynode": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1, "owner": 1, "read": 1, "write": 1, "delete": 1, "public_owner": 1, "public_read": 1, "public_write": 1, "public_delete": 1} + paramlist := map[string]int{"querynode": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "distinct": 1, "download_url": 1, "file_name": 1, "archive": 1, "owner": 1, "read": 1, "write": 1, "delete": 1, "public_owner": 1, "public_read": 1, "public_write": 1, "public_delete": 1} for key := range query { if _, found := paramlist[key]; !found { for _, value := range query[key] { @@ -197,6 +199,56 @@ func (cr *NodeController) ReadMany(ctx context.Context) error { logger.Error(err_msg) return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg) } + + // process preauth url request, requires archive option + if _, ok := query["download_url"]; ok { + // add options - set defaults first + options := map[string]string{} + options["archive"] = "zip" // default is zip + if _, ok := query["archive"]; ok { + if archive.IsValidToArchive(query.Get("archive")) { + options["archive"] = query.Get("archive") + } + } + preauthId := util.RandString(20) + if _, ok := query["file_name"]; ok { + options["filename"] = query.Get("file_name") + } else { + options["filename"] = preauthId + } + if !strings.HasSuffix(options["filename"], options["archive"]) { + options["filename"] = options["filename"] + "." + options["archive"] + } + // get valid nodes + var nodeIds []string + var totalBytes int64 + for _, n := range nodes { + if n.HasFile() { + nodeIds = append(nodeIds, n.Id) + totalBytes += n.File.Size + } + } + if len(nodeIds) == 0 { + return responder.RespondWithError(ctx, http.StatusBadRequest, "err:@node_ReadMany download url: no available files found") + } + // set preauth + if p, err := preauth.New(preauthId, "download", nodeIds, options); err != nil { + err_msg := "err:@node_ReadMany download_url: " + err.Error() + logger.Error(err_msg) + return responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg) + } else { + data := preauth.PreAuthResponse{ + Url: util.ApiUrl(ctx) + "/preauth/" + p.Id, + ValidTill: p.ValidTill.Format(time.ANSIC), + Format: options["archive"], + Filename: options["filename"], + Files: len(nodeIds), + Size: totalBytes, + } + return responder.RespondWithPaginatedData(ctx, data, limit, offset, count) + } + } + return responder.RespondWithPaginatedData(ctx, nodes, limit, offset, count) } diff --git a/shock-server/controller/node/single.go b/shock-server/controller/node/single.go index 303f1fe3..c16909d3 100644 --- a/shock-server/controller/node/single.go +++ b/shock-server/controller/node/single.go @@ -77,8 +77,8 @@ func (cr *NodeController) Read(id string, ctx context.Context) error { var fFunc filter.FilterFunc = nil var compressionFormat string = "" // use query params if exist - if _, ok := query["filename"]; ok { - filename = query.Get("filename") + if _, ok := query["file_name"]; ok { + filename = query.Get("file_name") } if _, ok := query["filter"]; ok { if filter.Has(query.Get("filter")) { @@ -378,6 +378,7 @@ func (cr *NodeController) Read(id string, ctx context.Context) error { if !n.HasFile() { return responder.RespondWithError(ctx, http.StatusBadRequest, e.NodeNoFile) } else { + preauthFilename := filename // add options options := map[string]string{} options["filename"] = filename @@ -386,14 +387,23 @@ func (cr *NodeController) Read(id string, ctx context.Context) error { } if compressionFormat != "" { options["compression"] = compressionFormat + preauthFilename = preauthFilename + "." + compressionFormat } // set preauth - if p, err := preauth.New(util.RandString(20), "download", n.Id, options); err != nil { + if p, err := preauth.New(util.RandString(20), "download", []string{n.Id}, options); err != nil { err_msg := "err:@node_Read download_url: " + err.Error() logger.Error(err_msg) return responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg) } else { - return responder.RespondWithData(ctx, util.UrlResponse{Url: util.ApiUrl(ctx) + "/preauth/" + p.Id, ValidTill: p.ValidTill.Format(time.ANSIC)}) + data := preauth.PreAuthResponse{ + Url: util.ApiUrl(ctx) + "/preauth/" + p.Id, + ValidTill: p.ValidTill.Format(time.ANSIC), + Format: options["compression"], + Filename: preauthFilename, + Files: 1, + Size: n.File.Size, + } + return responder.RespondWithData(ctx, data) } } } else if _, ok := query["download_post"]; ok { diff --git a/shock-server/controller/preauth/preauth.go b/shock-server/controller/preauth/preauth.go index e998d86b..a0e99add 100644 --- a/shock-server/controller/preauth/preauth.go +++ b/shock-server/controller/preauth/preauth.go @@ -4,7 +4,6 @@ package preauth import ( "github.com/MG-RAST/Shock/shock-server/logger" "github.com/MG-RAST/Shock/shock-server/node" - "github.com/MG-RAST/Shock/shock-server/node/archive" "github.com/MG-RAST/Shock/shock-server/node/file" "github.com/MG-RAST/Shock/shock-server/node/file/index" "github.com/MG-RAST/Shock/shock-server/node/filter" @@ -24,37 +23,25 @@ func PreAuthRequest(ctx context.Context) { logger.Error(err_msg) responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg) } else { - if n, err := node.Load(p.NodeId); err == nil { - switch p.Type { - case "download": - streamDownload(ctx, n, p.Options) - preauth.Delete(id) - default: - responder.RespondWithError(ctx, http.StatusNotFound, "Preauthorization type not supported: "+p.Type) - } - } else { - err_msg := "err:@preAuth loadnode: " + err.Error() - logger.Error(err_msg) - responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg) + switch p.Type { + case "download": + streamDownload(ctx, id, p.Nodes, p.Options) + preauth.Delete(id) + default: + responder.RespondWithError(ctx, http.StatusNotFound, "Preauthorization type not supported: "+p.Type) } } return } // handle download and its options -func streamDownload(ctx context.Context, n *node.Node, options map[string]string) { - nf, err := n.FileReader() - defer nf.Close() - if err != nil { - err_msg := "err:@preAuth node.FileReader: " + err.Error() - logger.Error(err_msg) - responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg) - return - } - // set defaults - filename := n.Id +func streamDownload(ctx context.Context, pid string, nodes []string, options map[string]string) { + // get defaults + filename := pid var filterFunc filter.FilterFunc = nil var compressionFormat string = "" + var archiveFormat string = "" + // use options if exist if fn, has := options["filename"]; has { filename = fn @@ -65,35 +52,93 @@ func streamDownload(ctx context.Context, n *node.Node, options map[string]string } } if cp, has := options["compression"]; has { - if archive.IsValidCompress(cp) { - compressionFormat = cp - } + compressionFormat = cp } - // stream it - var s *request.Streamer - if n.Type == "subset" { - s = &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: filterFunc, Compression: compressionFormat} - if n.File.Size == 0 { - // handle empty subset file - s.R = append(s.R, nf) - } else { - idx := index.New() - fullRange := "1-" + strconv.FormatInt(n.Subset.Index.TotalUnits, 10) - recSlice, err := idx.Range(fullRange, n.Path()+"/"+n.Id+".subset.idx", n.Subset.Index.TotalUnits) - if err != nil { - responder.RespondWithError(ctx, http.StatusInternalServerError, err.Error()) - return - } - for _, rec := range recSlice { - s.R = append(s.R, io.NewSectionReader(nf, rec[0], rec[1])) + if ar, has := options["archive"]; has { + archiveFormat = ar + } + var files []*file.FileInfo + + // process nodes + for _, nid := range nodes { + // get node + n, err := node.Load(nid) + if (err != nil) || !n.HasFile() { + continue + } + // get filereader + nf, err := n.FileReader() + if err != nil { + nf.Close() + continue + } + // add to file array + var fileInfo file.FileInfo + if n.Type == "subset" { + if n.File.Size == 0 { + // handle empty subset file + fileInfo.R = append(fileInfo.R, nf) + } else { + idx := index.New() + fullRange := "1-" + strconv.FormatInt(n.Subset.Index.TotalUnits, 10) + recSlice, err := idx.Range(fullRange, n.Path()+"/"+n.Id+".subset.idx", n.Subset.Index.TotalUnits) + if err != nil { + nf.Close() + continue + } + for _, rec := range recSlice { + fileInfo.R = append(fileInfo.R, io.NewSectionReader(nf, rec[0], rec[1])) + } } + } else { + fileInfo.R = append(fileInfo.R, nf) } - } else { - s = &request.Streamer{R: []file.SectionReader{nf}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: filterFunc, Compression: compressionFormat} + defer nf.Close() + // add to file info + fileInfo.Name = n.File.Name + fileInfo.Size = n.File.Size + fileInfo.ModTime = n.File.CreatedOn + if _, ok := n.File.Checksum["md5"]; ok { + fileInfo.Checksum = n.File.Checksum["md5"] + } + files = append(files, &fileInfo) } - if err = s.Stream(false); err != nil { - // causes "multiple response.WriteHeader calls" error but better than no response - err_msg := "err:@preAuth: s.stream: " + err.Error() + + if (len(nodes) == 1) && (len(files) == 1) { + // create single node / file streamer + s := &request.Streamer{ + R: files[0].R, + W: ctx.HttpResponseWriter(), + ContentType: "application/octet-stream", + Filename: filename, + Size: files[0].Size, + Filter: filterFunc, + Compression: compressionFormat, + } + if err := s.Stream(false); err != nil { + // causes "multiple response.WriteHeader calls" error but better than no response + err_msg := "err:@preAuth: s.stream: " + err.Error() + logger.Error(err_msg) + responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg) + } + } else if (len(files) > 1) && (archiveFormat != "") { + // create multi node / file streamer, must have archive format + m := &request.MultiStreamer{ + Files: files, + W: ctx.HttpResponseWriter(), + ContentType: "application/octet-stream", + Filename: filename, + Archive: archiveFormat, + } + if err := m.MultiStream(); err != nil { + // causes "multiple response.WriteHeader calls" error but better than no response + err_msg := "err:@preAuth: m.multistream: " + err.Error() + logger.Error(err_msg) + responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg) + } + } else { + // something broke + err_msg := "err:@preAuth: no files available to download for given combination of options" logger.Error(err_msg) responder.RespondWithError(ctx, http.StatusBadRequest, err_msg) } diff --git a/shock-server/node/archive/archive.go b/shock-server/node/archive/archive.go index 526ba6d1..53e26525 100644 --- a/shock-server/node/archive/archive.go +++ b/shock-server/node/archive/archive.go @@ -3,6 +3,7 @@ package archive import ( "archive/tar" "archive/zip" + "bytes" "compress/bzip2" "compress/gzip" "crypto/md5" @@ -10,15 +11,18 @@ import ( "fmt" "github.com/MG-RAST/Shock/shock-server/conf" "github.com/MG-RAST/Shock/shock-server/logger" + "github.com/MG-RAST/Shock/shock-server/node/file" "io" "math/rand" "os" "path/filepath" "strings" + "time" ) var validUncompress = []string{"gzip", "bzip2"} var validCompress = []string{"gzip", "zip"} +var validToArchive = []string{"zip", "tar"} var validArchive = []string{"zip", "tar", "tar.gz", "tar.bz2"} var ArchiveList = strings.Join(validArchive, ", ") @@ -28,6 +32,15 @@ type FormFile struct { Checksum map[string]string } +func IsValidToArchive(a string) bool { + for _, b := range validToArchive { + if b == a { + return true + } + } + return false +} + func IsValidArchive(a string) bool { for _, b := range validArchive { if b == a { @@ -196,21 +209,72 @@ func unZip(filePath string, unpackDir string) (fileList []FormFile, err error) { return } +func ArchiveReader(format string, files []*file.FileInfo) (outReader io.ReadCloser) { + pReader, pWriter := io.Pipe() + if format == "tar" { + tWriter := tar.NewWriter(pWriter) + go func() { + for _, f := range files { + fHdr := &tar.Header{Name: f.Name, Mode: 0660, ModTime: f.ModTime, Size: f.Size} + tWriter.WriteHeader(fHdr) + io.Copy(tWriter, f.Body) + if f.Checksum != "" { + cHdr := &tar.Header{Name: f.Name + ".md5", Mode: 0660, ModTime: f.ModTime, Size: int64(len(f.Checksum))} + tWriter.WriteHeader(cHdr) + io.Copy(tWriter, bytes.NewBufferString(f.Checksum)) + } + } + tWriter.Close() + pWriter.Close() + }() + } else if format == "zip" { + zWriter := zip.NewWriter(pWriter) + go func() { + for _, f := range files { + zHdr := &zip.FileHeader{Name: f.Name, UncompressedSize64: uint64(f.Size)} + zHdr.SetModTime(f.ModTime) + zFile, _ := zWriter.CreateHeader(zHdr) + io.Copy(zFile, f.Body) + if f.Checksum != "" { + cHdr := &zip.FileHeader{Name: f.Name + ".md5", UncompressedSize64: uint64(len(f.Checksum))} + cHdr.SetModTime(f.ModTime) + zSum, _ := zWriter.CreateHeader(cHdr) + io.Copy(zSum, bytes.NewBufferString(f.Checksum)) + } + } + zWriter.Close() + pWriter.Close() + }() + } else { + // no valid archive, pipe each inReader into one stream + go func() { + for _, f := range files { + io.Copy(pWriter, f.Body) + } + pWriter.Close() + }() + } + return pReader +} + func CompressReader(format string, filename string, inReader io.ReadCloser) (outReader io.ReadCloser) { if IsValidCompress(format) { pReader, pWriter := io.Pipe() if format == "gzip" { gWriter := gzip.NewWriter(pWriter) - gWriter.Header.Name = filename go func() { + gWriter.Header.Name = filename + gWriter.Header.ModTime = time.Now() io.Copy(gWriter, inReader) gWriter.Close() pWriter.Close() }() } else if format == "zip" { zWriter := zip.NewWriter(pWriter) - zFile, _ := zWriter.Create(filename) go func() { + zHdr := &zip.FileHeader{Name: filename} + zHdr.SetModTime(time.Now()) + zFile, _ := zWriter.CreateHeader(zHdr) io.Copy(zFile, inReader) zWriter.Close() pWriter.Close() diff --git a/shock-server/node/file/file.go b/shock-server/node/file/file.go index cb8ebef1..284da835 100644 --- a/shock-server/node/file/file.go +++ b/shock-server/node/file/file.go @@ -19,6 +19,16 @@ type File struct { CreatedOn time.Time `bson:"created_on" json:"created_on"` } +// FileInfo for streaming file content +type FileInfo struct { + R []SectionReader + Body io.ReadCloser + Name string + Size int64 + ModTime time.Time + Checksum string +} + // SectionReader interface required for MultiReaderAt type SectionReader interface { io.Reader diff --git a/shock-server/node/node.go b/shock-server/node/node.go index 90450f0a..7401a76a 100644 --- a/shock-server/node/node.go +++ b/shock-server/node/node.go @@ -104,22 +104,6 @@ func New() (node *Node) { return } -func LoadFromDisk(id string) (n *Node, err error) { - if len(id) < 6 { - return nil, errors.New("Node ID must be at least 6 characters in length") - } - path := getPath(id) - if nbson, err := ioutil.ReadFile(path + "/" + id + ".bson"); err != nil { - return nil, errors.New(e.NodeDoesNotExist) - } else { - n = new(Node) - if err = bson.Unmarshal(nbson, &n); err != nil { - return nil, err - } - } - return -} - func CreateNodeUpload(u *user.User, params map[string]string, files FormFiles) (node *Node, err error) { // if copying node or creating subset node from parent, check if user has rights to the original node if _, hasCopyData := params["copy_data"]; hasCopyData { diff --git a/shock-server/node/update.go b/shock-server/node/update.go index 0a8ec869..b6da4ddc 100644 --- a/shock-server/node/update.go +++ b/shock-server/node/update.go @@ -477,7 +477,7 @@ func (node *Node) Save() (err error) { n := Node{node.Id, node.Version, node.File, node.Attributes, node.Indexes, node.Acl, node.VersionParts, node.Tags, nil, node.Linkages, node.Priority, node.CreatedOn, node.LastModified, node.Expiration, node.Type, node.Subset, node.Parts} newRevisions := []Node{n} if len(node.Revisions) > 0 { - newRevisions = append(newRevisions, node.Revisions...) // prepend, latest revisions in front + newRevisions = append(newRevisions, node.Revisions...) // prepend, latest revisions in front } // adjust revisions based on config // <0 keep all ; >0 keep max @@ -495,21 +495,25 @@ func (node *Node) Save() (err error) { // get bson, test size and print nbson, err := bson.Marshal(node) if err != nil { - return + return err } if len(nbson) >= DocumentMaxByte { return errors.New(fmt.Sprintf("bson document size is greater than limit of %d bytes", DocumentMaxByte)) } bsonPath := fmt.Sprintf("%s/%s.bson", node.Path(), node.Id) os.Remove(bsonPath) - err = ioutil.WriteFile(bsonPath, nbson, 0644) - if err != nil { - return + if err := ioutil.WriteFile(bsonPath, nbson, 0644); err != nil { + // dir path may be missing, recreate and try again + if err := node.Mkdir(); err != nil { + return err + } + if err := ioutil.WriteFile(bsonPath, nbson, 0644); err != nil { + return err + } } // save node to mongodb - err = dbUpsert(node) - if err != nil { - return + if err := dbUpsert(node); err != nil { + return err } return } @@ -519,14 +523,14 @@ func (node *Node) UpdateVersion() (err error) { version := node.Id versionParts := make(map[string]string) partMap := map[string]interface{}{"file_ver": node.File, "indexes_ver": node.Indexes, "attributes_ver": node.Attributes, "acl_ver": node.Acl} - + // need to keep map ordered partKeys := []string{} for k, _ := range partMap { - partKeys = append(partKeys, k) + partKeys = append(partKeys, k) } sort.Strings(partKeys) - + for _, k := range partKeys { j, er := json.Marshal(partMap[k]) if er != nil { diff --git a/shock-server/node/util.go b/shock-server/node/util.go index 3a4b83ff..d13481eb 100644 --- a/shock-server/node/util.go +++ b/shock-server/node/util.go @@ -1,8 +1,8 @@ package node import ( - "sort" - "sync" + "sort" + "sync" ) type mappy map[string]bool @@ -69,20 +69,20 @@ func (l *Locker) GetNodes() (ids []string) { type sortBytes []byte func (b sortBytes) Less(i, j int) bool { - return b[i] < b[j] + return b[i] < b[j] } func (b sortBytes) Swap(i, j int) { - b[i], b[j] = b[j], b[i] + b[i], b[j] = b[j], b[i] } func (b sortBytes) Len() int { - return len(b) + return len(b) } func SortByteArray(b []byte) []byte { - sb := make([]byte, len(b)) - copy(sb, b) - sort.Sort(sortBytes(sb)) - return sb + sb := make([]byte, len(b)) + copy(sb, b) + sort.Sort(sortBytes(sb)) + return sb } diff --git a/shock-server/preauth/preauth.go b/shock-server/preauth/preauth.go index fa60ac54..bf17eea3 100644 --- a/shock-server/preauth/preauth.go +++ b/shock-server/preauth/preauth.go @@ -11,12 +11,21 @@ import ( // Database collection handle var DB *mgo.Collection +type PreAuthResponse struct { + Url string `json:"url"` + ValidTill string `json:"validtill"` + Format string `json:"format"` + Filename string `json:"filename"` + Files int `json:"files"` + Size int64 `json:"size"` +} + type PreAuth struct { - Id string - Type string - NodeId string - Options map[string]string - ValidTill time.Time + Id string `bson:"id" json:"id"` + Type string `bson:"type" json:"type"` + Nodes []string `bson:"nodes" json:"nodes"` + Options map[string]string `bson:"options" json:"options"` + ValidTill time.Time `bson:"validtill" json:"validtill"` } // Initialize is an explicit init. Requires db.Initialize @@ -26,9 +35,9 @@ func Initialize() { DB.EnsureIndex(mgo.Index{Key: []string{"id"}, Unique: true}) } -// New preauth takes the id, type, node id, and a map of options -func New(id, t, nid string, options map[string]string) (p *PreAuth, err error) { - p = &PreAuth{Id: id, Type: t, NodeId: nid, Options: options, ValidTill: time.Now().AddDate(0, 0, 1)} +// New preauth takes the id, type, node ids, and a map of options +func New(id string, t string, nids []string, options map[string]string) (p *PreAuth, err error) { + p = &PreAuth{Id: id, Type: t, Nodes: nids, Options: options, ValidTill: time.Now().AddDate(0, 1, 0)} if _, err = DB.Upsert(bson.M{"id": p.Id}, &p); err != nil { return nil, err } diff --git a/shock-server/request/streamer.go b/shock-server/request/streamer.go index 8fd11f4f..bfa45236 100644 --- a/shock-server/request/streamer.go +++ b/shock-server/request/streamer.go @@ -13,6 +13,16 @@ import ( "os/exec" ) +// MultiStreamer if for taking multiple files and creating one stream through an archive format: zip, tar, etc. + +type MultiStreamer struct { + Files []*file.FileInfo + W http.ResponseWriter + ContentType string + Filename string + Archive string +} + type Streamer struct { R []file.SectionReader W http.ResponseWriter @@ -27,7 +37,7 @@ func (s *Streamer) Stream(streamRaw bool) (err error) { // file download if !streamRaw { fileName := fmt.Sprintf(" attachment; filename=%s", s.Filename) - // add extension for compression + // add extension for compression or archive if s.Compression != "" { fileName = fmt.Sprintf(" attachment; filename=%s.%s", s.Filename, s.Compression) } @@ -39,7 +49,7 @@ func (s *Streamer) Stream(streamRaw bool) (err error) { s.W.Header().Set("Access-Control-Allow-Headers", "Authorization") s.W.Header().Set("Access-Control-Allow-Methods", "POST, GET, PUT, DELETE, OPTIONS") s.W.Header().Set("Access-Control-Allow-Origin", "*") - if s.Size > 0 && s.Filter == nil && !archive.IsValidCompress(s.Compression) { + if (s.Size > 0) && (s.Filter == nil) && (s.Compression == "") { s.W.Header().Set("Content-Length", fmt.Sprint(s.Size)) } @@ -54,9 +64,7 @@ func (s *Streamer) Stream(streamRaw bool) (err error) { } else { rs = sr } - if _, err = io.Copy(pWriter, rs); err != nil { - break - } + io.Copy(pWriter, rs) } pWriter.Close() }() @@ -69,6 +77,38 @@ func (s *Streamer) Stream(streamRaw bool) (err error) { return } +func (m *MultiStreamer) MultiStream() (err error) { + // set headers + fileName := fmt.Sprintf(" attachment; filename=%s", m.Filename) + m.W.Header().Set("Content-Type", m.ContentType) + m.W.Header().Set("Connection", "close") + m.W.Header().Set("Access-Control-Allow-Headers", "Authorization") + m.W.Header().Set("Access-Control-Allow-Methods", "POST, GET, PUT, DELETE, OPTIONS") + m.W.Header().Set("Access-Control-Allow-Origin", "*") + m.W.Header().Set("Content-Disposition", fileName) + + // pipe each SectionReader into one stream + for _, f := range m.Files { + pReader, pWriter := io.Pipe() + f.Body = pReader + go func(lf *file.FileInfo) { + for _, sr := range lf.R { + io.Copy(pWriter, sr) + } + pWriter.Close() + }(f) + } + + // pass pipes through archiver to ResponseWriter + aReader := archive.ArchiveReader(m.Archive, m.Files) + _, err = io.Copy(m.W, aReader) + aReader.Close() + for _, f := range m.Files { + f.Body.Close() + } + return +} + func (s *Streamer) StreamSamtools(filePath string, region string, args ...string) (err error) { //involking samtools in command line: //samtools view [-c] [-H] [-f INT] ... filname.bam [region] diff --git a/shock-server/util/util.go b/shock-server/util/util.go index c2aeca5b..cd07e1a0 100644 --- a/shock-server/util/util.go +++ b/shock-server/util/util.go @@ -19,11 +19,6 @@ var validParams = []string{"action", "all", "archive_format", "attributes_str", var validFiles = []string{"attributes", "subset_indices", "upload", "gzip", "bzip2"} var ValidUpload = []string{"upload", "gzip", "bzip2"} -type UrlResponse struct { - Url string `json:"url"` - ValidTill string `json:"validtill"` -} - type Query struct { list map[string][]string }