目录
在使用 RabbitMQ 之前,你要安装好 RabbitMQ 服务,具体安装方法可以参考 windows下安装RabbitMQ
1、安装扩展
进入TP5 更目录下,输入命令安装:
composer require php-amqplib/php-amqplib
2、自定义命令
TP5 的自定义命令,这里也简单说下。
第一步:
创建命令类文件,新建 application/api/command/Test.php。
<?php namespace app\api\command; use think\console\Command; use think\console\Input; use think\console\Output; /** * 自定义命令测试 */ class Test extends Command { /** * 配置 */ protected function configure() { // 设置命令的名称和描述 $this->setName('test')->setDescription('这是一个测试命令'); } /** * 执行 */ protected function execute(Input $input, Output $output) { $output->writeln("测试命令"); } }
这个文件定义了一个叫test的命令,备注为 这是一个测试命令,执行命令会输出:test command。
第二步:
配置 command.php文件,在 application/command.php文件中添加命令。
<?php return [ 'app\api\command\Test', ];
第三步:
测试命令,在项目根目录下输入命令:
php think test
回车运行之后输出:
test command
到这里,自定义命令就结束了,test命令就自定义成功了。
3、rabbitmq服务端
下来我们自定义 RabbitMQ 启动命令,守护进程运行,启动 rabbirmq 服务端接收消息。
在 application/api/command 目录下,新建 Ramq.php 文件,在执行命令的方法中,调用 RabbitMQ 启动守护进程方法即可。
<?php namespace app\api\command; use PhpAmqpLib\Connection\AMQPStreamConnection; use think\console\Command; use think\console\Input; use think\console\Output; /** * RabbitMq 启动命令 */ class Ramq extends Command { protected $consumerTag = 'customer'; protected $exchange = 'xcuser'; protected $queue = 'xcmsg'; protected function configure() { $this->setName('ramq')->setDescription('rabbitmq'); } protected function execute(Input $input, Output $output) { $output->writeln("消息队列开始"); $this->start(); // 指令输出 $output->writeln('消费队列结束'); } /** * 关闭 */ function shutdown($channel, $connection) { $channel->close(); $connection->close(); } /** * 回调处理信息 */ function process_message($message) { if ($message->body !== 'quit') { echo $message->body; } //手动应答 $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); if ($message->body === 'quit') { $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); } } /** * 启动 守护进程运行 */ public function start() { $host = '127.0.0.1'; $port = 5672; $user = 'guest'; $pwd = 'guest'; $vhost = '/'; $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost); $channel = $connection->channel(); $channel->queue_declare($this->queue, false, true, false, false); $channel->exchange_declare($this->exchange, 'direct', false, true, false); $channel->queue_bind($this->queue, $this->exchange); $channel->basic_consume($this->queue, $this->consumerTag, false, false, false, false, array($this, 'process_message')); register_shutdown_function(array($this, 'shutdown'), $channel, $connection); while (count($channel->callbacks)) { $channel->wait(); } } }
在application/command.php文件中,添加rabbitmq自定义命令。
return [ 'app\api\command\Ramq',// rabbitmq ];
4、发送端
最后,我们再写发送消息的控制器,实现消息队列,具体代码如下:
<?php namespace app\api\controller; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use think\Controller; /** * 发送端 */ class MessageQueue extends Controller { const exchange = 'xcuser'; const queue = 'xcmsg'; /** * 发送消息 */ public function pushMessage($data) { $host = '127.0.0.1'; $port = 5672; $user = 'guest'; $pwd = 'guest'; $vhost = '/'; $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost); $channel = $connection->channel(); $channel->exchange_declare(self::exchange, 'direct', false, true, false); $channel->queue_declare(self::queue, false, true, false, false); $channel->queue_bind(self::queue, self::exchange); $messageBody = $data; $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); $channel->basic_publish($message, self::exchange); $channel->close(); $connection->close(); echo 'ok'; } /** * 执行 */ public function index() { $data = json_encode(['msg' => '测试数据', 'id' => '15']); $this->pushMessage($data); } }
5、验证
先执行自定义命令,启动 rabbitmq 守护进程。在项目更目录下打开命令行,输入下面命令:
php think ramq
然后在浏览器访问发送信息的方法,http://你的域名/api/message/index,你发送一次消息,在命令行就会输出一条消息。这样我们就用 RabbitMQ 实现了一个简单的消息队列。