ThinkPHP5中使用RabbitMQ发送/接受消息

如果是在本地运行,相关扩展和依赖的安装请看之前的文章:
Windows下使用PHP操作RabbitMQ并实现消费者离线自动删除

在tp5中,需要安装amqp包

composer require php-amqplib/php-amqplib

写一个发布者请求,往rabbitmq服务器发送数据.
app\controller\Test.php

<?php
namespace app\controller;
 
use PhpAmqpLib\Message\AMQPMessage;
 
use app\Library\RabbimtMQConnection;
 
class Test
{
    public function index()
    {
 
        $Rc = new RabbimtMQConnection();
        list($connection, $channel) = $Rc->getConnection();
 
        $channel->queue_declare('hello', false, false, false, false);
 
        $msg = new AMQPMessage("Hello RabbitMQ");
 
        $channel->basic_publish($msg, '', 'hello');
 
        echo "[x] send 'hello world\n' ";
        $channel->close();
 
        $connection->close();
    }
}

rabbitmq 连接工具类 : app\Library\RabbitMQConnection.php

<?php
namespace app\Library;
 
use PhpAmqpLib\Connection\AMQPStreamConnection;
 
 
class RabbimtMQConnection
{
    public function __construct()
    {
        // TODO Something
    }
 
    public function getConnection()
    {
        try {
            $connection = new AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
            $channel = $connection->channel();
 
            return [$connection, $channel];
        } catch (\Exception $e) {
            return [null, null];
        }
 
    }
 
    public function closeConnectionAndChanel($channel, $connection)
    {
        $channel->close();
        $connection->close;
    }
}

设置请求路由

Route::get("send", 'test/index');

都配置好之后,发起一个请求.

看到这一句话后,到rabbitmq管理网站上看队列里面有没有数据

接下来我们就要消费这个数据

因为消费数据是需要不断监听rabbitmq服务器传过来的数据,所有我选择有执行命令行来监听服务器。后期就可以执行这个命令脚本了。

app\Command\Recevie.php

<?php
declare (strict_types=1);
 
namespace app\command;
 
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
 
use PhpAmqpLib\Connection\AMQPStreamConnection;
use app\Library\RabbimtMQConnection;
use PhpAmqpLib\Message\AMQPMessage;
 
class Receive extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('receive')
            ->setDescription('rabbitmq 消费队列');
    }
 
    protected function execute(Input $input, Output $output)
    {
        // 指令输出
        $output->writeln("RabbitMQ 消费队列开始启动……\n");
 
        $Rc = new RabbimtMQConnection();
        list($connection, $channel) = $Rc->getConnection();
        $output->writeln("RabbitMQ 创建通道成功……\n");
        $channel->queue_declare('hello', false, false, false, false);
 
 
        $callback = function ($msg) use ($output) {
            $output->writeln("通过RabbitMQ获取到消费者了,该条消息是: $msg->body \n");
        };
 
        $channel->basic_consume('hello', '', false, true, false, false, $callback);
 
 
        while ($channel->is_open()) {
            $channel->wait();
        }
    }
}

这就是tp5环境下一个完整的rabbitmq的hello world.

另外记录下项目中实际用到的,给指定rabbitMQ队列下的所有queue发消息与接收消息的代码

生产者 – 发送消息:

<?php

namespace app\index\controller;

use app\common\controller\Frontend;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class Index extends Frontend
{
    public function index()
    {
        $this->view->engine->layout(false);
        return $this->fetch('index/index');
    }

    public function map()
    {
        $this->view->engine->layout(false);
        return $this->fetch('index/map');
    }

    // 接收前端发来的坐标数据
    public function dataReception(){
        if ($this->request->isPost()) {
            $param = $this->request->post();
            $msg = time() . mt_rand(1, 50) . ',' . $param['msg'];
            $res = $this->sendMQ($msg);

            if(empty($res->body)){
                return json(['code' => 500, 'msg' => '消息发送失败', 'data' => $msg]);
            }else{
                return json(['code' => 200, 'msg' => '消息发送成功', 'data' => $res->body]);
            }
        }else{
            return json(['code' => 200, 'msg' => '请求方式错误']);
        }
    }

    // 发送rabbitMQ消息
    public function sendMQ($data)
    {
        try {
            $connection = new AMQPStreamConnection('192.168.124.73', '5672', 'admin', 'admin', '/');

            $channel = $connection->channel();

            $ex_name = 'test';
            // 创建交换机
            $channel->exchange_declare($ex_name,'fanout',false,true,false);


            $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage:: DELIVERY_MODE_PERSISTENT ]);

            $channel->basic_publish($msg, $ex_name,'');

            $channel->close();

            $connection->close();
        }catch (\Exception $e){
            return $e;
        }
        return $msg;
    }

}

消费者 – 接收消息:

<?php

namespace app\admin\command;

use think\console\Command;
use think\console\Input;
use think\console\Output;

use PhpAmqpLib\Connection\AMQPStreamConnection;


class Receive extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('receive')
            ->setDescription('rabbitmq 消费队列');
    }

    protected function execute(Input $input, Output $output)
    {
        $ex_name = 'test';

        // 指令输出
        $output->writeln("RabbitMQ 消费队列开始启动……\n");

        $connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/');
        $channel = $connection->channel();
        $channel->exchange_declare($ex_name,'fanout',false,true,false);
        $output->writeln("RabbitMQ 创建通道成功……\n");
        $channel->queue_declare('', false, true, false, true);
//            $channel->queue_declare(
//                'test', #队列名称
//                false, #被动模式,true=>如果队列不存在,返回错误。false=>不存在则创建,返回成功。
//                true,#消息持久化
//                false,#排他行,true=>只能本次连接中使用,连接关闭时自动消亡(durable为true也不起所用)
//                true,#自动删除,队列无订阅者时,会自动消亡
//                false,#异步执行 true不等待队列创建结果,立即完成函数调用
//                [],#arguments 设置消息队列的额外参数,如存活时间
//                null #传0或者null即可
//            );
        $channel->queue_bind('',$ex_name,'');

        $callback = function ($msg) use ($output) {
            $output->writeln("通过RabbitMQ获取到消费者了,该条消息是: $msg->body \n");
        };

        $channel->basic_consume('', '', false, true, false, false, $callback);


        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

}

-End-

风影OvO

风影OvO, 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA 4.0协议进行授权 | 转载请注明原文链接

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐