RabbitMQ 初体验

RabbitMQ介绍

  • MQ是 message queue 的简称,是应用程序和应用程序之间通信的方法。

  • RabbitMQ是一个由erlang语言编写的、开源的、在AMQP基础上完整的、可复用的企业消息系统。支持多种语言,包括java、Python、ruby、PHP、C/C++等。

  • AMQP:advanced message queuing protocol ,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中间件不同产品、不同开发语言等条件的限制。

  • 实用优点:应用解耦,流量削峰,异步处理

RabbitMQ安装

自从有了docker,妈妈再也不担心我安装软件啦

为了方便管理docker容器,我们采用compose的方式运行

tips: management版本是带web管理工具的

1、超级简单版本

一键启动
1
docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 15672:15672 -p 5672:5672 rabbitmq:management

2、看起来好看版本

docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
version: '2'
services:
rabbitmq:
image: rabbitmq:management
container_name: rabbitmq
hostname: myrabbitmq
ports:
- 15672:15672
- 5672:5672
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
restart: always
# 里面的/var/lib/rabbitmq/.erlang.cookie 需要600权限
volumes:
- ./data:/var/lib/rabbitmq
启动
1
docker-compose up -d
检查
  1. docker ps 查看状态
  2. 访问 localhost:15762 查看web管理页面
卸载
1
docker-compose down

尝试实用

官方文档:https://www.rabbitmq.com/getstarted.html

1、Hello World

单发送单接收:简单的发送与接收,没有特别的处理

python-one

代码示例

send.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?php

require_once __DIR__ . '/../vendor/autoload.php';

// 连接RabbitMQ
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建一个通道
$channel = $connection->channel();
// 声明一个队列
$channel->queue_declare('hello', false, false, false, false);

$msg = new \PhpAmqpLib\Message\AMQPMessage('Hello World');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();
receive.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?php

require_once __DIR__ . '/../vendor/autoload.php';

// 连接RabbitMQ
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建一个通道
$channel = $connection->channel();
// 声明一个队列
$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
$channel->wait();
}

发送测试:

1
2
$ php HelloWord/send.php
[x] Sent 'Hello World!'

接受测试:

1
2
3
$ php HelloWord/receive.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Hello World
2、Work queues

单发送多接收:一个发送端,多个接收端,如分布式的任务派发

python-two

new_task.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php

require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage(
$data,
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);

$channel->basic_publish($msg, '', 'task_queue');

echo '[x] Sent ', $data, "\n";

$channel->close();
$connection->close();
worker.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?php

require_once __DIR__ . '/../vendor/autoload.php';

// 连接RabbitMQ
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建一个通道
$channel = $connection->channel();
// 声明一个队列
$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
// 假装耗时任务,一个.代表1秒
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
// 消费者发送回一个确认
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
$channel->wait();
}

$channel->close();
$connection->close();

开启两个worker:

1
2
$ php WorkQueues/worker.php
[*] Waiting for messages. To exit press CTRL+C
1
2
$ php WorkQueues/worker.php
[*] Waiting for messages. To exit press CTRL+C

发送任务:

1
2
3
4
$ php WorkQueues/new_task.php "第一个任务耗时5秒....."
[x] Sent 第一个任务耗时5秒.....
$ php WorkQueues/new_task.php "第二个任务耗时5秒....."
[x] Sent 第二个任务耗时5秒.....

这时候第1个worker开始工作,第2个任务进来之后,就循环顺延到下一个worker

1
2
3
4
$ php WorkQueues/worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received 第一个任务耗时5秒.....
[x] Done
1
2
3
4
$ php WorkQueues/worker.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received 第二个任务耗时5秒.....
[x] Done
3、Publish/Subscribe

发布/订阅模式:发送端发送广播消息,多个接收端接收

python-three

receive_logs.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php

require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
$channel->wait();
}

$channel->close();
$connection->close();
emit_log.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "info: Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

开启两个消费者:

1
2
$ php PublishSubscribe/receive_logs1.php
[*] Waiting for logs. To exit press CTRL+C
1
2
$ php PublishSubscribe/receive_logs2.php
[*] Waiting for logs. To exit press CTRL+C

发送任务:

1
2
$ php PublishSubscribe/emit_log.php "创建日志"
[x] Sent 创建日志

这时候生产者讲任务推给了交换机,由交换机将数据发送给与之绑定的队列

1
2
3
$ php PublishSubscribe/receive_logs1.php
[*] Waiting for logs. To exit press CTRL+C
[x] 创建日志
1
2
3
$ php PublishSubscribe/receive_logs2.php
[*] Waiting for logs. To exit press CTRL+C
[x] 创建日志

简单解释:可以将消息发送给不同类型的消费者。做到发布一次,消费多个

  • todo 下面的介绍,留住后面慢慢消化一下
4、Routing

按路线发送接收:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。

python-four

5、Topics

topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中

python-five

6、RPC

python-six

实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。

7、Publisher Confirms

发布者确认 是实现可靠发布的RabbitMQ扩展。在通道上启用发布者确认后,代理将异步确认客户端发布的消息,这意味着它们已在服务器端处理。

示例代码

https://github.com/lihq1403/gadget/tree/master/RabbitMQ

延时队列实现

Rabbitmq默认没有支持延迟队列,查阅了一些资料发现,是可以通过两种方式实现

  1. TTL和死信队列
  2. rabbitmq_delayed_message_exchange 插件
TTL和死信队列实现方式

参考文章:https://blog.csdn.net/u011069013/article/details/107079470/

send.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<?php

<?php

require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Message\AMQPMessage;

// 连接RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建一个通道
$channel = $connection->channel();

$exchange_name = 'test_exchange';
$queue_name = 'test_queue';

// 定义默认的交换器
$channel->exchange_declare($exchange_name, 'topic', false, true, false);
// 定义延迟交换器
$channel->exchange_declare('delayed_exchange', 'topic', false, true, false);

//定义延迟队列
$channel->queue_declare('delayed_queue', false, true, false, false, false, new AMQPTable(array(
"x-dead-letter-exchange" => "delayed_exchange",
"x-dead-letter-routing-key" => "delayed_exchange",
"x-message-ttl" => 5000, //5秒延迟
)));
//绑定延迟队列到默认队列上
$channel->queue_bind('delayed_queue', $exchange_name);

// 声明一个队列
$channel->queue_declare($queue_name, false, true, false, false, false);
//绑定正常消费队列到延迟交换器上
$channel->queue_bind($queue_name, 'delayed_exchange', 'delayed_exchange');

$nowTime = date('H:i:s');
$msg = new AMQPMessage('Hello World 发送时间:'. $nowTime, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
$channel->basic_publish($msg, $exchange_name);

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();
receive.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php

require_once __DIR__ . '/../vendor/autoload.php';

// 连接RabbitMQ
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建一个通道
$channel = $connection->channel();

$queue_name = 'test_queue';

// 声明一个队列
$channel->queue_declare($queue_name, false, true, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg) {
echo " [x] Received ", $msg->body, ' 接受时间:', date('H:i:s'), "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
$channel->wait();
}

发送测试:

1
2
$ php HelloWord/send.php
[x] Sent 'Hello World!'

接受测试:

1
2
3
$ php HelloWord/receive.php
[*] Waiting for messages. To exit press CTRL+C
[x] Received Hello World 发送时间:21:19:06 接受时间:21:19:11