-
Notifications
You must be signed in to change notification settings - Fork 0
/
sync.go
141 lines (115 loc) · 3.34 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package nathttp
import (
"encoding/json"
"github.com/satori/go.uuid"
"github.com/streadway/amqp"
"net/http"
)
// NatHttpConnection holds the AMQP settings used to relay HTTP requests
// through a message broker and wait for their replies.
type NatHttpConnection struct {
	// qurl is the URL handed to amqp.Dial when a request is sent.
	// qreq is the routing key (queue name) requests are published to.
	// qres is the queue replies are consumed from; it is also sent as the
	// ReplyTo property of every published request.
	qurl, qreq, qres string
}
// SetRequestQueueName stores name as the queue that outgoing requests are
// published to.
func (c *NatHttpConnection) SetRequestQueueName(name string) {
	c.qreq = name
}
// SetResponseQueueName stores name as the queue that replies are consumed
// from and that is advertised as ReplyTo on outgoing requests.
func (c *NatHttpConnection) SetResponseQueueName(name string) {
	c.qres = name
}
// Get sends a synchronous request for url and returns the reply body.
//
// Only Sync and Url are populated; Method is left at its zero value.
// NOTE(review): presumably the consumer treats an unset method as GET —
// confirm against the receiving side.
func (c *NatHttpConnection) Get(url string) (string, error) {
	return c.Request(NatHttpRequestMessage{
		Sync: true,
		Url:  url,
	})
}
// Post sends a synchronous POST to url with no caller-supplied payload and
// returns the reply body. An empty string is passed as the JSON data.
func (c *NatHttpConnection) Post(url string) (string, error) {
	const noPayload = ""
	return c.sendJson(http.MethodPost, url, noPayload)
}
// PostJson sends a synchronous POST to url with data as the JSON payload
// and returns the reply body.
func (c *NatHttpConnection) PostJson(url string, data interface{}) (string, error) {
	return c.sendJson(http.MethodPost, url, data)
}
// PostForm sends a synchronous POST to url with data as form values and
// returns the reply body.
func (c *NatHttpConnection) PostForm(url string, data map[string][]string) (string, error) {
	return c.sendForm(http.MethodPost, url, data)
}
// Put sends a synchronous PUT to url with no caller-supplied payload and
// returns the reply body. An empty string is passed as the JSON data.
func (c *NatHttpConnection) Put(url string) (string, error) {
	const noPayload = ""
	return c.sendJson(http.MethodPut, url, noPayload)
}
// PutJson sends a synchronous PUT to url with data as the JSON payload and
// returns the reply body.
func (c *NatHttpConnection) PutJson(url string, data interface{}) (string, error) {
	return c.sendJson(http.MethodPut, url, data)
}
// PutForm sends a synchronous PUT to url with data as form values and
// returns the reply body.
func (c *NatHttpConnection) PutForm(url string, data map[string][]string) (string, error) {
	return c.sendForm(http.MethodPut, url, data)
}
// Delete sends a synchronous DELETE to url with no caller-supplied payload
// and returns the reply body. An empty string is passed as the JSON data.
func (c *NatHttpConnection) Delete(url string) (string, error) {
	const noPayload = ""
	return c.sendJson(http.MethodDelete, url, noPayload)
}
// DeleteJson sends a synchronous DELETE to url with data as the JSON
// payload and returns the reply body.
func (c *NatHttpConnection) DeleteJson(url string, data interface{}) (string, error) {
	return c.sendJson(http.MethodDelete, url, data)
}
// DeleteForm sends a synchronous DELETE to url with data as form values and
// returns the reply body.
func (c *NatHttpConnection) DeleteForm(url string, data map[string][]string) (string, error) {
	return c.sendForm(http.MethodDelete, url, data)
}
// sendJson encodes data with GetJsonString, wraps it in a synchronous
// request message for the given method and url, and forwards it to Request.
// An encoding failure is returned without sending anything.
func (c *NatHttpConnection) sendJson(method, url string, data interface{}) (string, error) {
	encoded, err := GetJsonString(data)
	if err != nil {
		return "", err
	}

	return c.Request(NatHttpRequestMessage{
		Sync:     true,
		Url:      url,
		Method:   method,
		JsonData: encoded,
	})
}
// sendForm wraps data as the form values of a synchronous request message
// for the given method and url, and forwards it to Request.
func (c *NatHttpConnection) sendForm(method, url string, data map[string][]string) (string, error) {
	return c.Request(NatHttpRequestMessage{
		Sync:     true,
		Url:      url,
		Method:   method,
		FormData: data,
	})
}
// Request publishes message to the request queue and blocks until a reply
// carrying the same correlation ID is delivered on the response queue, whose
// body is returned as a string.
//
// The message is JSON-encoded, then a new AMQP connection and channel are
// opened for this call and closed again before returning. A failure while
// encoding, dialing, opening the channel, registering the consumer, or
// publishing is returned to the caller. If the delivery stream ends before a
// matching reply is seen, amqp.ErrClosed is returned.
//
// There is no timeout: the call waits for as long as the delivery stream
// stays open.
//
// NOTE(review): the consumer is registered with auto-ack, so deliveries on
// the response queue whose correlation ID does not match are skipped by this
// loop and not redelivered — confirm the response queue is not shared by
// concurrent callers.
func (c *NatHttpConnection) Request(message NatHttpRequestMessage) (res string, err error) {
	// Encode first so an unencodable message never costs a broker connection.
	body, err := json.Marshal(message)
	if err != nil {
		return "", err
	}

	conn, err := amqp.Dial(c.qurl)
	if err != nil {
		return "", err
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return "", err
	}
	defer ch.Close()

	corrID := uuid.NewV4().String()

	// Start consuming before publishing so the reply cannot arrive while
	// nobody is listening.
	msgs, err := ch.Consume(
		c.qres, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		return "", err
	}

	err = ch.Publish(
		"",     // exchange
		c.qreq, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType:   "text/plain",
			CorrelationId: corrID,
			ReplyTo:       c.qres,
			Body:          body,
		},
	)
	if err != nil {
		return "", err
	}

	for d := range msgs {
		if d.CorrelationId == corrID {
			return string(d.Body), nil
		}
	}

	// The delivery channel was closed without a matching reply; report it
	// instead of returning an empty body with a nil error.
	return "", amqp.ErrClosed
}