-
Notifications
You must be signed in to change notification settings - Fork 4
/
utils.go
71 lines (60 loc) · 1.54 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package nq
import (
"encoding/json"
"fmt"
"strings"
)
//
// TaskMessage Encode/Decode utilities
//
// EncodeTMToJSON serializes t to JSON with json.Marshal.
//
// It returns the encoded bytes on success, or a nil slice together with the
// marshalling error on failure.
func EncodeTMToJSON(t *TaskMessage) ([]byte, error) {
	encoded, err := json.Marshal(t)
	if err != nil {
		return nil, err
	}
	return encoded, nil
}
// DecodeTMFromJSON parses JSON-encoded data into a freshly allocated
// TaskMessage with json.Unmarshal.
//
// It returns a pointer to the decoded message on success, or nil together
// with the unmarshalling error on failure.
func DecodeTMFromJSON(data []byte) (*TaskMessage, error) {
	msg := new(TaskMessage)
	if err := json.Unmarshal(data, msg); err != nil {
		return nil, err
	}
	return msg, nil
}
//
// Stream / Subject Name utilities
//
// StreamNameToDurableStreamName returns a durable name for stream, formatted
// as "<srvName>-<stream>".
//
// The durable name helps re-establish a connection to nats-server while
// maintaining sequence state.
func StreamNameToDurableStreamName(srvName, stream string) string {
	const durableFormat = "%s-%s"
	durable := fmt.Sprintf(durableFormat, srvName, stream)
	return durable
}
// StreamNameToCancelStreamName returns the name of the stream responsible for
// cancellation of tasks in the given stream, formatted as "cancel-<subject>".
func StreamNameToCancelStreamName(subject string) string {
	const cancelFormat = "cancel-%s"
	cancelName := fmt.Sprintf(cancelFormat, subject)
	return cancelName
}
// CancelStreamNameToStreamName recovers the original stream name from a
// cancel-stream name; it is the inverse of StreamNameToCancelStreamName.
//
// Only a leading "cancel-" prefix is stripped. A subject that does not start
// with that prefix is returned unchanged, so an occurrence of "cancel-" in
// the middle of a name is never removed.
//
// The stream parameter is unused; it is kept so existing callers keep
// compiling.
func CancelStreamNameToStreamName(stream, subject string) string {
	return strings.TrimPrefix(subject, "cancel-")
}
//
//
// Queue is an internal abstraction over a NATS stream -> subject pairing.
//
// All fields are unexported; NewQueue derives each of them from a single
// queue name.
type Queue struct {
	// stream is the stream name; subject is its task subject
	// ("<name>.task" when built by NewQueue).
	stream, subject string
	// cancelStream and cancelSubject name the cancel counterpart
	// ("<name>/cancel" and "<name>.cancel" when built by NewQueue).
	cancelStream, cancelSubject string
}
// NewQueue builds a Queue whose names are all derived from name:
//
//	stream        = name
//	subject       = name + ".task"
//	cancelStream  = name + "/cancel"
//	cancelSubject = name + ".cancel"
func NewQueue(name string) *Queue {
	q := new(Queue)
	q.stream = name
	q.subject = fmt.Sprintf("%s.task", name)
	q.cancelStream = fmt.Sprintf("%s/cancel", name)
	q.cancelSubject = fmt.Sprintf("%s.cancel", name)
	return q
}
// DurableStream returns the queue's stream name qualified by prefix, formatted
// as "<prefix>/<stream>".
//
// Note that the separator here is "/", whereas the package-level
// StreamNameToDurableStreamName joins its parts with "-".
func (q *Queue) DurableStream(prefix string) string {
	const durableFormat = "%s/%s"
	streamName := q.stream
	return fmt.Sprintf(durableFormat, prefix, streamName)
}