-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
201 lines (190 loc) · 8.1 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
var amqp = require('amqplib');
var uuid = require('node-uuid');
var graylog2 = require('graylog2');
// Logger used by send(); a no-op stub until connect_amqp() is called with graylog2 options.
var logger={log:function(){},error:function(){}};
// Pending requests keyed by correlation id; each value is the deferred created by send().
var cacheTable={};
// Shared client state: `ch` (the channel) and `ok` (promise of the reply-queue name), both set by connect_amqp().
var mqService={};
// Deferreds are discouraged by most promise libraries, so we define a minimal one for this
// particular situation: send() creates the promise and maybeAnswer() settles it later.
/**
 * Create a deferred that is rejected with the string "Timeout" unless it is settled first.
 *
 * The timeout timer is cancelled as soon as the deferred is resolved or rejected through the
 * returned functions, so settled requests do not leave live timers behind.
 *
 * @param {number} timeout - milliseconds to wait before rejecting with "Timeout"
 * @returns {{resolve: Function, reject: Function, promise: Promise, timestamp: number}}
 *          settle functions, the promise itself, and the creation time in epoch milliseconds
 */
function defer(timeout) {
    var resolvePromise, rejectPromise, timer;
    var promise = new Promise(function(resolve, reject) {
        resolvePromise = resolve;
        rejectPromise = reject;
        timer = setTimeout(reject, timeout, "Timeout");
    });
    return {
        resolve: function(value) {
            clearTimeout(timer);
            resolvePromise(value);
        },
        reject: function(reason) {
            clearTimeout(timer);
            rejectPromise(reason);
        },
        promise: promise,
        timestamp: new Date().getTime()
    };
}
exports.connect_amqp=function(amqp_url, options){
return amqp.connect(amqp_url).then(function (conn) {
return conn.createChannel().then(
function onFulfilled(ch) {
if(options){
logger=new graylog2.graylog(options);
}
mqService.ch=ch;
var ok = ch.assertQueue('', {exclusive: true})
.then(function (qok) {
return qok.queue;
});
mqService.ok=ok;
return ok;
});
}).then(null, function(err) {
console.error("Exception handled, reconnecting...\nDetail:\n" + err);
setTimeout(exports.connect_amqp(amqp_url), 5000);
});
};
exports.send=function(serviceName,message,timeout){
var messageStr=JSON.stringify(message);
var q = serviceName + "_queue";
var corrId = uuid();
var ok = mqService.ok;
var ch = mqService.ch;
//create a new Promise and waiting for resolve
var df=defer(timeout);
cacheTable[corrId]=df;
ok = ok.then(function (queue) {
return ch.consume(queue, maybeAnswer, {noAck: true})
.then(function () {
return queue;
});
});
ok = ok.then(function (queue) {
//console.log(' [x] Requesting'+ message);
ch.sendToQueue(q, new Buffer(messageStr), {
correlationId: corrId, replyTo: queue
});
});
return df.promise.then(
function onFulfilled(rs){
if(JSON.parse(rs) == corrId) {
rs = undefined;
return rs;
}else{
logger.log("Success","request@#$"+messageStr+"====response@#$"+rs,{duration:new Date().getTime()-df.timestamp, service:serviceName});
return JSON.parse(rs);
}
},
function onReject(err){
logger.error("Timeout","request@#$"+messageStr+"====response@#$"+err,{duration:new Date().getTime()-df.timestamp, service:serviceName});
throw new Error(err);
}
);
};
/**
 * Consumer callback for the client's reply queue: hand the reply body (as a string) to
 * the deferred that send() stored in `cacheTable` under the message's correlation id,
 * then remove that entry. Replies with an unknown correlation id are ignored.
 *
 * @param {?object} msg - amqplib message, or null when the broker cancelled the consumer
 */
function maybeAnswer(msg) {
    // amqplib invokes the callback with null when the consumer is cancelled by the broker.
    if (!msg) {
        return;
    }
    var corrId = msg.properties.correlationId;
    // Own-property check so ids such as "constructor" do not match Object.prototype members.
    if (!Object.prototype.hasOwnProperty.call(cacheTable, corrId)) {
        return;
    }
    var pending = cacheTable[corrId];
    if (pending) {
        pending.resolve(msg.content.toString());
        delete cacheTable[corrId];
    }
}
/**
* Server Side function
**/
/**
*
* @param amqp_url
* @param service_name
* @param pro (this function has to be a promise)
* @param options (object)
* Must Have: noAck (boolean)
* if noAck = True: the queue will send tasks to server and then discard regardless of the server's
* state.
* Warning: when having two or more servers, setting noAck to True will risk loosing
* message if one of the server that gets the messages goes offline.
* if noAck = False: the queue will make sure the server get and finish the task with acknowledgements.
* Warning: if the server gets the tasks but fails to finish, the message will
* 1. Timeout as determined by messageTtl option
* 2. If the server goes offline, the tasks will be requeued and sent to the
* next available server.
* *We recommend setting noAck to False to guarantee message delivery and avoid loosing message when
* multiple servers are online and working
* Optional :
* messageTtl (milliseconds) (must start a new queue if this option was just added, modified, or taken out)
* Set Timeout for messages in the queue. This option has proven to be extremely helpful when the
* server goes offline and comes back and the messages are requeued. If the messageTtl is set to
* the same milliseconds as the client's timeout, the queue will make sure the server do not get
* timed out messages that the client no longer cares about.
* *We recommend setting the messageTtl equal to the client's timeout parameter.
*
* prefetch_num (integer bigger than or equal to 1)
* Set the maximum number of acknowledgements the queue can wait from the server. In other words, it
* is the maximum number of tasks one server can take each time. That said, this will only be
* effective when the noAck is set to False. If prefetch_num is not passed in, the server will
* simply take as many tasks as possible and this may cause a race condition. In our production
* experience, we find that a number of 10 or 100 works just fine.
* *Only effective when noAck == False
*
* durable (boolean)
* Make the queue durable as stated on the RabbitMQ website. The default setting is false.
* Example:
* Our Safest/Most Used Options: {noAck:false, prefetch_num:10, messageTtl:60000}
* Simplest/Minimalist Options(best for just testing): {noAck:true}
*
*
*
*/
exports.server_listen=function(amqp_url,service_name,pro,options){//pro has to be a promise
var durable;
if(options.durable) {
durable = options.durable;
}else
durable = false;
var queueOptions={durable: durable};
if(options.messageTtl)
queueOptions.messageTtl = options.messageTtl;
amqp.connect(amqp_url).then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
var q = service_name + "_queue";
var ok = ch.assertQueue(q, queueOptions);
var ok = ok.then(function() {
if(options.prefetch_num) {
ch.prefetch(options.prefetch_num);
}
return ch.consume(q, reply,{noAck:options.noAck});
});
return ok.then(function() {
console.log(' [x] Awaiting requests');
});
function reply(msg) {
var content=JSON.parse(msg.content.toString());
pro(content).then(
function onFulfilled(response){
if(!response) {
response = msg.properties.correlationId;
}
ch.sendToQueue(msg.properties.replyTo,
new Buffer(JSON.stringify(response)),
{correlationId: msg.properties.correlationId});
if(!options.noAck) {
ch.ack(msg);
}
},
function onReject(err){
console.log(err);
ch.sendToQueue(msg.properties.replyTo,
new Buffer(JSON.stringify(err)),
{correlationId: msg.properties.correlationId});
if(!options.noAck) {
if(options.ensureDone){
ch.nack(msg);
}else {
ch.ack(msg);
}
}
}
);
}
});
}).then(null, function(err){
console.error("Server has problem connecting, shutting down...\nDetail:\n" + err);
throw err;
});
};