Skip to content

Commit

Permalink
Merge pull request #322 from teharrison/master
Browse files Browse the repository at this point in the history
new features
  • Loading branch information
teharrison authored Mar 29, 2017
2 parents 65505a7 + 84b21f7 commit eb60389
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# creates statically compiled shock-server binary: /go/bin/shock-server

FROM golang:1.7.5-alpine
FROM golang:1.8-alpine

RUN apk update && apk add git make gcc libc-dev cyrus-sasl-dev

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile_new
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# docker build -t mgrast/shock -f Dockerfile_new .
# docker run --rm --name test -ti mgrast/shock /bin/ash

FROM golang:1.7.5-alpine
FROM golang:1.8-alpine


ENV DIR=/go/src/github.com/MG-RAST/Shock
Expand Down
7 changes: 7 additions & 0 deletions RELEASE_NOTES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# v0.9.20

- update to golang 1.8
- add priority field to node along with index and set option
- add configurable max limit to revisions array
- enable preauth download (download_url) for subset nodes

# v0.9.19

- add 'md5' mongod index
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.19
0.9.20
2 changes: 2 additions & 0 deletions shock-server.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ local_paths=
pidfile=

[Runtime]
# maximum number of most recent revisions to keep. 0 means keep none, -1 means keep all
max_revisions=3
# wait time in minutes before expiration reaper runs
expire_wait=60
# golang setting: The GOMAXPROCS variable limits the number of operating system threads that can execute user-level Go code simultaneously.
Expand Down
8 changes: 6 additions & 2 deletions shock-server/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ var (
CONFIG_FILE = ""

// Runtime
EXPIRE_WAIT = 60 // wait time for reaper in minutes
GOMAXPROCS = ""
MAX_REVISIONS = 3 // max number of node revisions to keep; values < 0 mean keep all
EXPIRE_WAIT = 60 // wait time for reaper in minutes
GOMAXPROCS = ""

// Logs
LOG_PERF = false // Indicates whether performance logs should be stored
Expand Down Expand Up @@ -123,6 +124,7 @@ func Initialize() {
AUTH_MGRAST_OAUTH_URL, _ = c.String("Auth", "mgrast_oauth_url")

// Runtime
MAX_REVISIONS, _ = c.Int("Runtime", "max_revisions")
EXPIRE_WAIT, _ = c.Int("Runtime", "expire_wait")
GOMAXPROCS, _ = c.String("Runtime", "GOMAXPROCS")

Expand Down Expand Up @@ -225,4 +227,6 @@ func Print() {
} else {
fmt.Printf("##### Log rotation disabled #####\n\n")
}
fmt.Printf("##### Expiration #####\nexpire_wait:\t%d minutes\n\n", EXPIRE_WAIT)
fmt.Printf("##### Max Revisions #####\nmax_revisions:\t%d\n\n", MAX_REVISIONS)
}
57 changes: 18 additions & 39 deletions shock-server/controller/node/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,21 +340,19 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
}
// download full file
} else {
nf, err := n.FileReader()
defer nf.Close()
if err != nil {
err_msg := "err:@node_Read node.FileReader: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
}
var s request.Streamer
if n.Type == "subset" {
// open file
r, err := n.FileReader()
defer r.Close()
if err != nil {
err_msg := "Err@node_Read:Open: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
}

s := &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: fFunc, Compression: compressionFormat}

s = &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: fFunc, Compression: compressionFormat}
if n.File.Size == 0 {
// handle empty subset file
s.R = append(s.R, r)
s.R = append(s.R, nf)
} else {
idx := index.New()
fullRange := "1-" + strconv.FormatInt(n.Subset.Index.TotalUnits, 10)
Expand All @@ -363,39 +361,20 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
return responder.RespondWithError(ctx, http.StatusInternalServerError, err.Error())
}
for _, rec := range recSlice {
s.R = append(s.R, io.NewSectionReader(r, rec[0], rec[1]))
s.R = append(s.R, io.NewSectionReader(nf, rec[0], rec[1]))
}
}
if err = s.Stream(download_raw); err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.Stream: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
} else {
nf, err := n.FileReader()
defer nf.Close()
if err != nil {
// File not found or some sort of file read error.
// Probably deserves more checking
err_msg := "err:@node_Read node.FileReader: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
s := &request.Streamer{R: []file.SectionReader{nf}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: fFunc, Compression: compressionFormat}
if err = s.Stream(download_raw); err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.Stream: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
s = &request.Streamer{R: []file.SectionReader{nf}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: fFunc, Compression: compressionFormat}
}
if err = s.Stream(download_raw); err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.Stream: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
}
} else if _, ok := query["download_url"]; ok {
if n.Type == "subset" {
return responder.RespondWithError(ctx, http.StatusBadRequest, "subset nodes do not currently support download_url operation")
}

if !n.HasFile() {
return responder.RespondWithError(ctx, http.StatusBadRequest, e.NodeNoFile)
} else {
Expand Down
37 changes: 27 additions & 10 deletions shock-server/controller/preauth/preauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,28 @@ import (
"github.com/MG-RAST/Shock/shock-server/request"
"github.com/MG-RAST/Shock/shock-server/responder"
"github.com/MG-RAST/golib/stretchr/goweb/context"
"net/http"
)

func PreAuthRequest(ctx context.Context) {
id := ctx.PathValue("id")
if p, err := preauth.Load(id); err != nil {
err_msg := "err:@preAuth load: " + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, 500, err_msg)
responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
} else {
if n, err := node.Load(p.NodeId); err == nil {
switch p.Type {
case "download":
streamDownload(ctx, n, p.Options)
preauth.Delete(id)
default:
responder.RespondWithError(ctx, 500, "Preauthorization type not supported: "+p.Type)
responder.RespondWithError(ctx, http.StatusNotFound, "Preauthorization type not supported: "+p.Type)
}
} else {
err_msg := "err:@preAuth loadnode: " + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, 500, err_msg)
responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
}
}
return
Expand All @@ -42,11 +43,9 @@ func streamDownload(ctx context.Context, n *node.Node, options map[string]string
nf, err := n.FileReader()
defer nf.Close()
if err != nil {
// File not found or some sort of file read error.
// Probably deserves more checking
err_msg := "err:@preAuth node.FileReader: " + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, 500, err_msg)
responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
return
}
// set defaults
Expand All @@ -68,13 +67,31 @@ func streamDownload(ctx context.Context, n *node.Node, options map[string]string
}
}
// stream it
s := &request.Streamer{R: []file.SectionReader{nf}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: filterFunc, Compression: compressionFormat}
err = s.Stream(false)
if err != nil {
var s request.Streamer
if n.Type == "subset" {
s = &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: filterFunc, Compression: compressionFormat}
if n.File.Size == 0 {
// handle empty subset file
s.R = append(s.R, nf)
} else {
idx := index.New()
fullRange := "1-" + strconv.FormatInt(n.Subset.Index.TotalUnits, 10)
recSlice, err := idx.Range(fullRange, n.Path()+"/"+n.Id+".subset.idx", n.Subset.Index.TotalUnits)
if err != nil {
return responder.RespondWithError(ctx, http.StatusInternalServerError, err.Error())
}
for _, rec := range recSlice {
s.R = append(s.R, io.NewSectionReader(nf, rec[0], rec[1]))
}
}
} else {
s = &request.Streamer{R: []file.SectionReader{nf}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: filterFunc, Compression: compressionFormat}
}
if err = s.Stream(false); err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@preAuth: s.stream: " + err.Error()
logger.Error(err_msg)
responder.RespondWithError(ctx, 500, err_msg)
responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
return
}
2 changes: 2 additions & 0 deletions shock-server/node/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func Initialize() {
c.EnsureIndex(mgo.Index{Key: []string{"acl.delete"}, Background: true})
c.EnsureIndex(mgo.Index{Key: []string{"created_on"}, Background: true})
c.EnsureIndex(mgo.Index{Key: []string{"expiration"}, Background: true})
c.EnsureIndex(mgo.Index{Key: []string{"type"}, Background: true})
c.EnsureIndex(mgo.Index{Key: []string{"priority"}, Background: true})
c.EnsureIndex(mgo.Index{Key: []string{"file.path"}, Background: true})
c.EnsureIndex(mgo.Index{Key: []string{"file.virtual_parts"}, Background: true})
c.EnsureIndex(mgo.Index{Key: []string{"file.checksum.md5"}, Background: true})
Expand Down
9 changes: 8 additions & 1 deletion shock-server/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Node struct {
Tags []string `bson:"tags" json:"tags"`
Revisions []Node `bson:"revisions" json:"-"`
Linkages []linkage `bson:"linkage" json:"linkage"`
Priority int `bson:"priority" json:"priority"`
CreatedOn time.Time `bson:"created_on" json:"created_on"`
LastModified time.Time `bson:"last_modified" json:"last_modified"`
Expiration time.Time `bson:"expiration" json:"expiration"` // 0 means no expiration
Expand Down Expand Up @@ -346,6 +347,12 @@ func (node *Node) SetFileFormat(format string) (err error) {
return
}

// SetPriority assigns the given priority to the node and persists the
// change via Save. Any error from Save is returned to the caller.
func (node *Node) SetPriority(priority int) (err error) {
	node.Priority = priority
	return node.Save()
}

func (node *Node) SetExpiration(expire string) (err error) {
parts := ExpireRegex.FindStringSubmatch(expire)
if len(parts) == 0 {
Expand Down Expand Up @@ -378,7 +385,7 @@ func (node *Node) RemoveExpiration() (err error) {

// ClearRevisions empties the node's revision history and persists the
// change via Save. An empty (non-nil) slice is used rather than nil so
// the field is stored as an empty array.
// Fix: the visible span carried both the old assignment (nil) and the
// new one ([]Node{}) as a dead-store pair; only the new assignment is kept.
func (node *Node) ClearRevisions() (err error) {
	// empty the revisions array
	node.Revisions = []Node{}
	err = node.Save()
	return
}
Expand Down
30 changes: 24 additions & 6 deletions shock-server/node/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,14 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error)
}
}

//update node tags
// update node tags
if _, hasDataType := params["tags"]; hasDataType {
if err = node.UpdateDataTags(params["tags"]); err != nil {
return err
}
}

//update file format
// update file format
if _, hasFormat := params["format"]; hasFormat {
if node.File.Format != "" {
return errors.New(fmt.Sprintf("file format already set:%s", node.File.Format))
Expand All @@ -394,6 +394,17 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error)
}
}

// update priority
if _, hasPriority := params["priority"]; hasPriority {
priority, err := strconv.Atoi(params["priority"])
if err != nil {
return errors.New("priority must be an integer")
}
if err = node.SetPriority(priority); err != nil {
return err
}
}

// update node expiration
if _, hasExpiration := params["expiration"]; hasExpiration {
if err = node.SetExpiration(params["expiration"]); err != nil {
Expand Down Expand Up @@ -460,10 +471,17 @@ func (node *Node) Save() (err error) {
// update versions
previousVersion := node.Version
node.UpdateVersion()
// only add to revisions if not new and has changed
if previousVersion != "" && previousVersion != node.Version {
n := Node{node.Id, node.Version, node.File, node.Attributes, node.Indexes, node.Acl, node.VersionParts, node.Tags, nil, node.Linkages, node.CreatedOn, node.LastModified, node.Expiration, node.Type, node.Subset, node.Parts}
node.Revisions = append(node.Revisions, n)
// only add to revisions if not new and has changed and allow revisions
if (previousVersion != "") && (previousVersion != node.Version) && (conf.MAX_REVISIONS != 0) {
n := Node{node.Id, node.Version, node.File, node.Attributes, node.Indexes, node.Acl, node.VersionParts, node.Tags, nil, node.Linkages, node.Priority, node.CreatedOn, node.LastModified, node.Expiration, node.Type, node.Subset, node.Parts}
newRevisions := append(node.Revisions, n)
// adjust revisions based on config
// <0 keep all ; >0 keep max
if (conf.MAX_REVISIONS < 0) || (len(newRevisions) <= conf.MAX_REVISIONS) {
node.Revisions = newRevisions
} else {
node.Revisions = newRevisions[:conf.MAX_REVISIONS]
}
}
if node.CreatedOn.String() == "0001-01-01 00:00:00 +0000 UTC" {
node.CreatedOn = time.Now()
Expand Down
2 changes: 1 addition & 1 deletion shock-server/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890"

// Arrays to check for valid param and file form names for node creation and updating, and also acl modification.
// Note: indexing and querying do not use functions that use these arrays and thus we don't have to include those field names.
var validParams = []string{"action", "all", "archive_format", "attributes_str", "clear_revisions", "copy_attributes", "copy_data", "copy_indexes", "compression", "delete", "expiration", "file_name", "format", "ids", "index_name", "linkage", "operation", "owner", "parent_index", "parent_node", "parts", "path", "preserve_acls", "read", "remove_expiration", "source", "tags", "type", "unpack_node", "upload_url", "users", "write"}
var validParams = []string{"action", "all", "archive_format", "attributes_str", "clear_revisions", "copy_attributes", "copy_data", "copy_indexes", "compression", "delete", "expiration", "file_name", "format", "ids", "index_name", "linkage", "operation", "owner", "parent_index", "parent_node", "parts", "path", "preserve_acls", "priority", "read", "remove_expiration", "source", "tags", "type", "unpack_node", "upload_url", "users", "write"}
var validFiles = []string{"attributes", "subset_indices", "upload", "gzip", "bzip2"}
var ValidUpload = []string{"upload", "gzip", "bzip2"}

Expand Down

0 comments on commit eb60389

Please sign in to comment.