本篇文章将说明如何在Windows
环境下使用PHP
操作RabbitMQ
一、下载Erlang和RabbitMQ
安装RabbitMQ前需要安装Erlang环境
下载之前先去查看RabbitMQ
和Erlang
相互兼容的版本:https://www.rabbitmq.com/which-erlang.html
Erlang
下载地址:https://www.erlang.org/downloads
RabbitMQ
下载地址:https://www.rabbitmq.com/install-windows.html#installer
二、安装Erlang和RabbitMQ
1.安装Erlang
双击exe文件默认选项安装即可(我这里换了安装目录)
环境变量配置
新建ERLANG_HOME
环境变量
变量值输入为Erlang
安装根目录,我这里是D盘。
默认安装C盘的,输入C:\Program Files\erl-23.0
即可
在PATH
环境变量中添加Erlang
的bin
目录
双击Path
环境变量
点击新建,输入%ERLANG_HOME%\bin
1.1.Erlang安装验证
打开cmd
窗口,输入erl
,看到版本号就说明Erlang
安装成功了。
2.安装RabbitMQ
同样是双击exe默认选项安装即可(我这里改了安装目录)
安装完成后,以管理员身份打开cmd
窗口,进入到sbin
目录
//cd进入你的安装目录 cd C:\Program Files\RabbitMQ Server\rabbitmq_server\sbin
接着执行:
rabbitmq-plugins enable rabbitmq_management
进行安装
安装完成之后
安装完成后双击rabbitmq-server.bat
即可启动RabbitMQ
如果无法正常启动,点击开始菜单
找到RabbitMQ Service - start
右键->以管理员身份运行
成功运行
2.1.进入网页端
在浏览器输入:http://localhost:15672/
用户名:guest
密码:guest
至此,RabbitMQ就安装完成了。
三、安装php的amqp扩展
先用phpinfo()
查看php
版本信息
我这里的是PHP7.4
64位非线程安全版本
根据上面的信息去下载相应的amqp
版本:http://pecl.php.net/package/amqp/1.11.0/windows
下载完成后打开压缩包,我们只用到这两个dll文件
将php_amqp.dll
解压到D:\phpstudy_pro\Extensions\php\php7.4.3nts\ext
同时在php.ini
中添加如下代码:
[amqp] extension=php_amqp.dll
如果害怕以后找不到就添加在最后一行
然后将rabbitmq.4.dll
复制到php根目录D:\phpstudy_pro\Extensions\php\php7.4.3nts
同时修改Apache
配置文件httpd.conf
,添加如下代码:
# rabbitmq LoadFile "D:/phpstudy_pro/Extensions/php/php7.4.3nts/rabbitmq.4.dll"
如果你跟我一样用的Nginx
,也要一样修改Apache
的配置文件
最后重启环境再打开phpinfo()
看看是否已经加载了amqp
模块:
至此,所有准备工作都已完成。
四、PHP + RabbitMQ实例展示
新建rabbit_consumer.php
作为消费者
<?php //配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', //内部访问端口默认为5672, 外部(网页)访问端口默认为15672, 不要搞错了! 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_linvo'; //交换机名 $q_name = 'q_linvo'; //队列名 $k_route = 'key_1'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //创建交换机 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()."\n"; //创建队列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 echo "Message Total:".$q->declare()."\n"; //绑定交换机与队列,并指定路由键 echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n"; //阻塞模式接收消息 echo "Message:\n"; while(True){ $q->consume('processMessage'); //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 } $conn->disconnect(); /** * 消费回调函数 * 处理消息 */ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //处理消息 $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 }
这里贴上一个附带消费者离线自动删除queue中记录功能的代码
//配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', //内部访问端口默认为5672, 外部(网页)访问端口默认为15672, 不要搞错了! 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'test'; //交换机名 $q_name = 'test_' . md5(uniqid() . mt_rand(1, 999999)); //生成唯一的队列名 // $k_route = 'testkey'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //创建交换机 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_FANOUT); //fanout类型 (广播) // $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 (严格指定) $ex->setFlags(AMQP_DURABLE); //持久化 | 离线删除 $ex->declareExchange(); //执行创建 // echo "Exchange Status:".$ex->declareExchange()."\n"; // echo "Exchange Status:".$ex->declare()."\n"; //创建队列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); //持久化 | 离线删除 $q->declareQueue(); //执行创建 $q->bind($e_name, $k_route);//绑定交换机与队列,并指定路由键 // echo "Message Total:".$q->declare()."\n"; //绑定交换机与队列,并指定路由键 // echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n"; //阻塞模式接收消息 echo "Message:\n"; while(True){ $q->consume('processMessage'); //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 } $conn->disconnect(); /** * 消费回调函数 * 处理消息 */ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //处理消息 $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 }
新建rabbit_publisher.php
作为生产者
<?php //配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', //内部访问端口默认为5672, 外部(网页)访问端口默认为15672, 不要搞错了! 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_linvo'; //交换机名 //$q_name = 'q_linvo'; //无需队列名 $k_route = 'key_1'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //创建交换机对象 $ex = new AMQPExchange($channel); $ex->setName($e_name); date_default_timezone_set("Asia/Shanghai"); //发送消息 //$channel->startTransaction(); //开始事务 for($i=0; $i<5; ++$i){ sleep(1);//休眠1秒 //消息内容 $message = "TEST MESSAGE!".date("h:i:sa"); echo "Send Message:".$ex->publish($message, $k_route)."\n"; } //$channel->commitTransaction(); //提交事务 $conn->disconnect();
测试一下:
打开cmd
,将目录切换到这两个php文件所在文件夹
输入php rabbit_consumer.php
运行消费者。
再另开一个cmd
窗口,将目录切换到这两个php文件所在文件夹
输入php rabbit_publisher.php
运行生产者。
消费者接收到消息:
这样就模拟了最简单的一个队列对消息的处理。
另外贴一下官网的PHP使用指南:https://www.rabbitmq.com/tutorials/tutorial-one-php.html
-End-