<?php
namespace app\controller;
use app\BaseController;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\exception\HttpException;
class Rabbit extends BaseController
{
protected $config = [];
protected $channel, $connection;
const EXCHANGE = 'human_exchange';
const DELAYED_EXCHANGE = 'human_delayed_exchange';
const QUEUE = 'human_queue';
const ROUTING_KEY = 'human';
const CONSUMER_TAG = 'client';
public function initialize()
{
$config = require __DIR__ . '/../../config/queue.php';
if (isset($config['RabbitMQ'])) {
$this->config = $config['RabbitMQ'];
}
if (empty($this->config)) {
throw new HttpException(500, 'Message queue config not exist: RabbitMQ');
}
parent::initialize();
}
/**
* 创建RabbitMQ连接和通道
* @return mixed
*/
protected function createExchangeAndQueue()
{
// 创建连接,指定IP、端口、账号、密码、虚拟主机
$connection = new AMQPStreamConnection($this->config['host'], $this->config['port'], $this->config['user'], $this->config['password'], $this->config['vhost']);
$channel = $connection->channel();
/**
* 创建交换机(如果有在后台创建,则无需这一步,下同)
* 参数一:指定交换机名称
* 参数二:指定交换机种类(direct:精准推送,fanout:广播推送,topic:组播推送,路由规则可以通配,headers:跟direct一样,性能很差,故很少用了)
* 参数三:是否检测同名交换机
* 参数四:是否开启交换机持久化
* 参数五:通道关闭后是否删除交换机
*/
$channel->exchange_declare(self::EXCHANGE, AMQPExchangeType::DIRECT, false, true, false);
// $channel->exchange_declare(self::EXCHANGE, 'x-delayed-message', false, true, false, false, false, ['x-delayed-type' => 'direct']);
// 创建队列,参数除了类型,其他跟创建交换机一致
list($queue, $messageCount, $consumerCount) = $channel->queue_declare(self::QUEUE, true, true, false, false);
// 通道绑定交换机跟队列,最后指定路由规则
$channel->queue_bind(self::QUEUE, self::EXCHANGE, self::ROUTING_KEY);
$this->connection = $connection;
$this->channel = $channel;
// 返回队列中的未消费的消息数量
return $messageCount;
}
/**
* 创建RabbitMQ连接和通道(延时队列)
*/
protected function createDelayedExchangeAndQueue()
{
// 创建连接,指定IP、端口、账号、密码、虚拟主机
$connection = new AMQPStreamConnection($this->config['host'], $this->config['port'], $this->config['user'], $this->config['password'], $this->config['vhost']);
$channel = $connection->channel();
/**
* 创建交换机(如果有在后台创建,则无需这一步,下同)
* 参数一:指定交换机名称
* 参数二:指定交换机种类(x-delayed-message:指定为延时交换机,需给RabbitMQ装rabbitmq_delayed_message_exchange插件来支持)
* 参数三:是否检测同名交换机
* 参数四:是否开启交换机持久化
* 参数五:通道关闭后是否删除交换机
* 最后参数中设置x-delayed-type交换机原型为direct
*/
$channel->exchange_declare(self::DELAYED_EXCHANGE, 'x-delayed-message', false, true, false, false, false, ['x-delayed-type' => ['S', 'direct']]);
// 创建队列,参数除了类型,其他跟创建交换机一致
$channel->queue_declare(self::QUEUE, false, true, false, false);
// 通道绑定交换机跟队列,最后指定路由规则
$channel->queue_bind(self::QUEUE, self::DELAYED_EXCHANGE, self::ROUTING_KEY);
$this->connection = $connection;
$this->channel = $channel;
}
/**
* 生产消息,并放入队列
* @return string
*/
public function pushMessage()
{
$this->createExchangeAndQueue();
if (!isset($this->connection) || !isset($this->channel)) {
throw new HttpException(500, 'RabbitMQ connection or channel not exist!');
}
$msgBody = json_encode(['name' => 'Neo', 'age' => 30, 'sex' => 'male']);
// 创建AMQP消息并放入指定的数据,注意第二个参数的delivery_mode为设置该消息持久化
$message = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 在通道中放入消息,并指定交换机跟路由规则
$this->channel->basic_publish($message, self::EXCHANGE, self::ROUTING_KEY);
$this->channel->close();
$this->connection->close();
return '消息已放入队列';
}
/**
* 生产消息,并延时放入队列
* @return string
*/
public function pushDelayedMessage()
{
$this->createDelayedExchangeAndQueue();
if (!isset($this->connection) || !isset($this->channel)) {
throw new HttpException(500, 'RabbitMQ connection or channel not exist!');
}
$msgBody = json_encode(['name' => 'Delay', 'age' => 5, 'sex' => 'male']);
// 创建AMQP消息并放入指定的数据,注意第二个参数的delivery_mode为设置该消息持久化,最后一个header则为设置5秒延时时间
$message = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'application_headers' => new AMQPTable(['x-delay' => 5000])]);
// 在通道中放入消息,并指定交换机跟路由规则
$this->channel->basic_publish($message, self::DELAYED_EXCHANGE, self::ROUTING_KEY);
$this->channel->close();
$this->connection->close();
return '消息已延时放入队列';
}
/**
* 消费消息,并显示消息的结构
*/
public function consumeMessage()
{
$messageCount = $this->createExchangeAndQueue();
if (!isset($this->connection) || !isset($this->channel)) {
throw new HttpException(500, 'RabbitMQ connection or channel not exist!');
}
/**
* 默认情况下,队列会把消息公平的分配给各个消费者
* 如果某个消费者脚本处理完成分配给他的消息任务后,会一直空闲
* 另外一个消费者脚本处理的消息都非常耗时,这就容易导致消费者脚本得不到合理利用,
* 加入此句话,是告诉队列,取消把消息公平分配到各个脚本,而是那个脚本空闲,就交给它一个消息任务
* 这样,合理利用到每一个空闲的消费者脚本
*/
$this->channel->basic_qos(null, 1, null);
/**
* 从队列中读取数据
* 第四个参数 no_ack = false 时,表示进行ack应答,确保消息已经处理
* 最后一个参数则是回调函数,传入消息
*/
$this->channel->basic_consume(self::QUEUE, self::CONSUMER_TAG, false, false, false, false, function ($message) {
file_put_contents('message.txt', $message->body, FILE_APPEND);
// 处理完后通知队列可以删除消息了,如果no_ack = false时缺少这句,则队列不会删除已处理完的消息,当脚本挂掉时,会把分配给当前队列的所有消息再次重新分配给其他队列,会导致消息会重复处理,内存占用越来越高
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
});
// 监听消息,一有消息,立马就处理
/*while ($this->channel->is_consuming()) {
$this->channel->wait();
}*/
// 官方原代码中while监听会导致死循环,故处理之
for ($i = 1; $i <= $messageCount; $i++) {
if ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
$this->channel->close();
$this->connection->close();
}
}
RabbitMQ使用(配合ThinkPHP6框架)
评论 (暂无评论)