如果是在本地运行,相关扩展和依赖的安装请看之前的文章:
Windows下使用PHP操作RabbitMQ并实现消费者离线自动删除
在tp5中,需要安装amqp包
composer require php-amqplib/php-amqplib
写一个发布者请求,往rabbitmq服务器发送数据.
app\controller\Test.php
<?php namespace app\controller; use PhpAmqpLib\Message\AMQPMessage; use app\Library\RabbimtMQConnection; class Test { public function index() { $Rc = new RabbimtMQConnection(); list($connection, $channel) = $Rc->getConnection(); $channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage("Hello RabbitMQ"); $channel->basic_publish($msg, '', 'hello'); echo "[x] send 'hello world\n' "; $channel->close(); $connection->close(); } }
rabbitmq 连接工具类 : app\Library\RabbitMQConnection.php
<?php namespace app\Library; use PhpAmqpLib\Connection\AMQPStreamConnection; class RabbimtMQConnection { public function __construct() { // TODO Something } public function getConnection() { try { $connection = new AMQPStreamConnection('localhost', '5672', 'guest', 'guest'); $channel = $connection->channel(); return [$connection, $channel]; } catch (\Exception $e) { return [null, null]; } } public function closeConnectionAndChanel($channel, $connection) { $channel->close(); $connection->close; } }
设置请求路由
Route::get("send", 'test/index');
看到这一句话后,到rabbitmq管理网站上看队列里面有没有数据
接下来我们就要消费这个数据
因为消费数据是需要不断监听rabbitmq服务器传过来的数据,所有我选择有执行命令行来监听服务器。后期就可以执行这个命令脚本了。
app\Command\Recevie.php
<?php declare (strict_types=1); namespace app\command; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\input\Option; use think\console\Output; use PhpAmqpLib\Connection\AMQPStreamConnection; use app\Library\RabbimtMQConnection; use PhpAmqpLib\Message\AMQPMessage; class Receive extends Command { protected function configure() { // 指令配置 $this->setName('receive') ->setDescription('rabbitmq 消费队列'); } protected function execute(Input $input, Output $output) { // 指令输出 $output->writeln("RabbitMQ 消费队列开始启动……\n"); $Rc = new RabbimtMQConnection(); list($connection, $channel) = $Rc->getConnection(); $output->writeln("RabbitMQ 创建通道成功……\n"); $channel->queue_declare('hello', false, false, false, false); $callback = function ($msg) use ($output) { $output->writeln("通过RabbitMQ获取到消费者了,该条消息是: $msg->body \n"); }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while ($channel->is_open()) { $channel->wait(); } } }
这就是tp5环境下一个完整的rabbitmq的hello world.
另外记录下项目中实际用到的,给指定rabbitMQ队列下的所有queue发消息与接收消息的代码
生产者 – 发送消息:
<?php namespace app\index\controller; use app\common\controller\Frontend; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class Index extends Frontend { public function index() { $this->view->engine->layout(false); return $this->fetch('index/index'); } public function map() { $this->view->engine->layout(false); return $this->fetch('index/map'); } // 接收前端发来的坐标数据 public function dataReception(){ if ($this->request->isPost()) { $param = $this->request->post(); $msg = time() . mt_rand(1, 50) . ',' . $param['msg']; $res = $this->sendMQ($msg); if(empty($res->body)){ return json(['code' => 500, 'msg' => '消息发送失败', 'data' => $msg]); }else{ return json(['code' => 200, 'msg' => '消息发送成功', 'data' => $res->body]); } }else{ return json(['code' => 200, 'msg' => '请求方式错误']); } } // 发送rabbitMQ消息 public function sendMQ($data) { try { $connection = new AMQPStreamConnection('192.168.124.73', '5672', 'admin', 'admin', '/'); $channel = $connection->channel(); $ex_name = 'test'; // 创建交换机 $channel->exchange_declare($ex_name,'fanout',false,true,false); $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage:: DELIVERY_MODE_PERSISTENT ]); $channel->basic_publish($msg, $ex_name,''); $channel->close(); $connection->close(); }catch (\Exception $e){ return $e; } return $msg; } }
消费者 – 接收消息:
<?php namespace app\admin\command; use think\console\Command; use think\console\Input; use think\console\Output; use PhpAmqpLib\Connection\AMQPStreamConnection; class Receive extends Command { protected function configure() { // 指令配置 $this->setName('receive') ->setDescription('rabbitmq 消费队列'); } protected function execute(Input $input, Output $output) { $ex_name = 'test'; // 指令输出 $output->writeln("RabbitMQ 消费队列开始启动……\n"); $connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/'); $channel = $connection->channel(); $channel->exchange_declare($ex_name,'fanout',false,true,false); $output->writeln("RabbitMQ 创建通道成功……\n"); $channel->queue_declare('', false, true, false, true); // $channel->queue_declare( // 'test', #队列名称 // false, #被动模式,true=>如果队列不存在,返回错误。false=>不存在则创建,返回成功。 // true,#消息持久化 // false,#排他行,true=>只能本次连接中使用,连接关闭时自动消亡(durable为true也不起所用) // true,#自动删除,队列无订阅者时,会自动消亡 // false,#异步执行 true不等待队列创建结果,立即完成函数调用 // [],#arguments 设置消息队列的额外参数,如存活时间 // null #传0或者null即可 // ); $channel->queue_bind('',$ex_name,''); $callback = function ($msg) use ($output) { $output->writeln("通过RabbitMQ获取到消费者了,该条消息是: $msg->body \n"); }; $channel->basic_consume('', '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } } }
-End-