-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_get.php
52 lines (44 loc) · 1.36 KB
/
test_get.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?php
/**
 * Manual test script for the php-amqp extension.
 *
 * Declares a durable direct exchange and a durable queue, binds the queue to
 * the exchange with a routing key, then fetches at most one message with a
 * single get() call (no consume loop) and dumps its body.
 */
header("content-type:text/html;charset=utf-8");

// Broker connection settings handed to AMQPConnection.
// NOTE(review): credentials are hard-coded; acceptable for a local test only.
$conn_args = array(
    'host' => '192.168.33.30',
    'port' => '5672',
    'login' => 'title',
    'password' => 'title',
    'vhost' => '/',
);
$e_name = 'e_linvo'; // exchange name
$q_name = 'q_linvo'; // queue name
$k_route = 'key_1'; // routing key

$conn = new AMQPConnection($conn_args);
try {
    // Compare with false explicitly instead of using "!": a connect() that
    // returns nothing (null) must not be mistaken for a failed connection.
    // NOTE(review): newer php-amqp releases reportedly declare connect() as
    // returning void - confirm against the installed extension version.
    if ($conn->connect() === false) {
        die("Cannot connect to the broker!\n");
    }
} catch (AMQPConnectionException $e) {
    // connect() is documented to throw on failure; report it with the same
    // message as the return-value check instead of an uncaught exception.
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

// Declare the exchange.
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); // direct exchange
$ex->setFlags(AMQP_DURABLE); // durable
echo "Exchange Status:".$ex->declareExchange()."\n";

// Declare the queue.
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); // durable
echo "Message Total:".$q->declareQueue()."\n";

// Bind the queue to the exchange using the routing key.
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";

// Alternative (disabled): blocking consumption through
// $q->consume('processMessage'). NOTE(review): processMessage() acknowledges
// each delivery itself, so it should presumably not be combined with
// AMQP_AUTOACK - confirm before enabling.

// Fetch a single message, auto-acknowledged by the AMQP_AUTOACK flag.
$messages = $q->get(AMQP_AUTOACK);
if ($messages) {
    var_dump($messages->getBody());
}
/**
 * Consume callback: prints the message body, appends it to a.txt and then
 * acknowledges the delivery.
 *
 * The delivery is acknowledged only after the body has been written
 * successfully; if the write fails the problem is logged and the delivery is
 * left unacknowledged so the message is not silently dropped.
 *
 * @param object $envelope Delivered message; must provide getBody() and getDeliveryTag().
 * @param object $queue    Queue the message was received from; must provide ack().
 * @return void
 */
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg."\n"; // show the message

    // file_put_contents() returns false on failure; do not ack in that case.
    if (file_put_contents("a.txt", $msg."\r\n", FILE_APPEND) === false) {
        error_log('processMessage: could not append message to a.txt; delivery left unacknowledged');
        return;
    }

    $queue->ack($envelope->getDeliveryTag()); // manual ACK
}