rabbitMQ消息队列 – php案例

Server 林涛 9002℃ 0评论

###实战模拟
(一般教程都是单独讲,这里就一步KO..)

  1. 程序监控,会监控.PHP+MYSQL的错误信息
    消息中含有关键点.有警告性错误warning,严重错误error,错误的类别,是PHP.还是MYSQL

  2. 要求中间可以加入任何一个错误处理程序,对错误进行处理.实现可扩展(要求能可以接入新消费者)

  3. 要求对MYSQL的ERROR错误发钉钉(获得固定值)

  4. 要求独立统计MYSQL所有错误和警告(获得MYSQL)

  5. 要求对所有错误信息.进行一个归档存储 (获得全部)

  6. 系统还写了另外一个服务器监控程序.相关消息也需要做归档存储.
    需求图看起来应该是这样子的.

接下来做代码演练,令人欣慰的一点,rabbitMQ支持中文的队列和交换机名.
首先在回忆下交换机三种类型.

类型 功能 效率
Fanout 群发
Direct 完全匹配
Topic 通配匹配
  1. 首先我们要创建程序监控,和服务器监控.两个交换机,类型为Fanout
    这样就能够让所有绑定进来的队列收到消息.其他参数默认
$channel->exchange_declare('程序监控' , 'fanout');
$channel->exchange_declare('服务器监控', 'fanout');
  1. 接下来创建通配交换机,因为通配交换机只接收程序监控交换机的数据,即需要设置内部使用.防止乱入
 $channel->exchange_declare('程序监控通配转发, 'Topic',false,false,true,true);
  1. 两个交换机绑定,这样程序监控的消息会发给程序通配转发交换机一份.
 $channel->exchange_bing('程序监控通配转发, '程序监控');
  1. 接下来创建队列
$channel->queue_declare('归档存储');
$channel->queue_declare('MYSQL错误统计');
$channel->queue_declare('钉钉提示');
  1. 队列绑定
//归档存储需要获取所有错误,所以直接绑定程序监控
$channel->queue_bind('归档存储','程序监控');
//一个队列是可以绑定多个交换机的消息
$channel->queue_bind('归档存储','服务器监控');
//通配方式.所有MYSQL.开头的,其中#代表必然存在,如果发送的路由名是MYSQL而不是MYSQL.ERROR会收不到,如果想收到可以使用MYSQL.*
$channel->queue_bind('MYSQL错误统计','程序监控通配转发','MYSQL.#');
//完全匹配处理.如果所有的绑定都是完全匹配的,则应该使用Direct交换机提高效率
$channel->queue_bind('钉钉提示','程序监控通配转发','MYSQL.ERROR');

 

完整的生产者代码.并生产出消息.

<?php
//引入composer代码加载器
require 'vendor/autoload.php';
//引入链接类
use PhpAmqpLib\Connection\AMQPStreamConnection;
//引入消息类
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.167', 5672, 'guest', 'guest', '/');
//通过链接获得一个新通道.
$channel = $connection->channel();

$channel->exchange_declare('程序监控' , 'fanout');
$channel->exchange_declare('服务器监控', 'fanout');
$channel->exchange_declare('程序监控通配转发', 'topic',false,false,true,true);
$channel->exchange_bind('程序监控通配转发', '程序监控');

$channel->queue_declare('归档存储');
$channel->queue_declare('MYSQL错误统计');
$channel->queue_declare('钉钉提示');

$channel->queue_bind('归档存储','程序监控');
$channel->queue_bind('归档存储','服务器监控');
$channel->queue_bind('MYSQL错误统计','程序监控通配转发','MYSQL.#');
$channel->queue_bind('钉钉提示','程序监控通配转发','MYSQL.ERROR');

$channel->basic_publish(new AMQPMessage('PHP警告'),'程序监控','PHP.WARNING');
$channel->basic_publish(new AMQPMessage('PHP错误'),'程序监控','PHP.ERROR');
$channel->basic_publish(new AMQPMessage('MYSQL警告'),'程序监控','MYSQL.WARNING');
$channel->basic_publish(new AMQPMessage('MYSQL错误'),'程序监控','MYSQL.ERROR');
$channel->basic_publish(new AMQPMessage('服务器错误'),'服务器监控','SERVER.ERROR');

//关闭通道
$channel->close();
//关闭链接
$connection->close();

消费者

<?php
//引入composer代码加载器
require 'vendor/autoload.php';
//引入链接类
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.167', 5672, 'guest', 'guest', '/');
//通过链接获得一个新通道.
$channel = $connection->channel();

$channel->exchange_declare('程序监控' , 'fanout');
$channel->exchange_declare('服务器监控', 'fanout');
$channel->exchange_declare('程序监控通配转发', 'topic',false,false,true,true);
$channel->exchange_bind('程序监控通配转发', '程序监控');

$channel->queue_declare('归档存储');
$channel->queue_declare('MYSQL错误统计');
$channel->queue_declare('钉钉提示');

$channel->queue_bind('归档存储','程序监控');
$channel->queue_bind('归档存储','服务器监控');
$channel->queue_bind('MYSQL错误统计','程序监控通配转发','MYSQL.#');
$channel->queue_bind('钉钉提示','程序监控通配转发','MYSQL.ERROR');



$channel->basic_consume("归档存储", "", false, false, false, false, 
    function ($message)
    {
        var_dump(iconv('utf-8','gbk','归档存储'.$message->body));
    }
    );
$channel->basic_consume("MYSQL错误统计", "", false, false, false, false, 
    function ($message)
    {
        var_dump(iconv('utf-8','gbk','MYSQL错误统计'.$message->body));
    }
    );
$channel->basic_consume("钉钉提示", "", false, false, false, false, 
    function ($message)
    {
        var_dump(iconv('utf-8','gbk','钉钉提示'.$message->body));
    }
    );
while (count($channel->callbacks)) {
    $channel->wait();
}
//关闭通道
$channel->close();
//关闭链接
$connection->close();

执行结果如下.

##精简代码
因为代码定义交换机,队列以及绑定.默认都是auto_delete并且是并且不是持久的.所以客户端和服务端的代码.都需要增加上整个交换过程的定义,如果系统确定了固定的队列结构.个人认为可以单独编写安装代码创建结构.并持久化
代码如下
install.php

<?php
//引入composer代码加载器
require 'vendor/autoload.php';
//引入链接类
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.167', 5672, 'guest', 'guest', '/');
//通过链接获得一个新通道.
$channel = $connection->channel();

//$channel->exchange_declare('程序监控' , 'fanout',false,true,false);
//$channel->exchange_declare('服务器监控', 'fanout',false,true,false);
//$channel->exchange_declare('程序监控通配转发', 'topic',false,true,false,true);
//$channel->exchange_bind('程序监控通配转发', '程序监控');
//关闭通道
$channel->close();
//关闭链接
$connection->close();

我们执行看一下效果
注意:因为之前创建的是自动删除.而生产者发送消息后就断开连接了.而消费者断开连接之后,整个消息队列就没有任何访问者.就开始了整个自动删除掉,所以创建可以成功.如果要是队列或者交换机存在的情况下,必须先将其删除在进行创建.
>[warning] 注意:因为之前创建的是自动删除.而生产者发送消息后就断开连接了.而消费者断开连接之后,整个消息队列就没有任何访问者.就开始了整个自动删除掉,所以创建可以成功.如果要是队列或者交换机存在的情况下,必须先将其删除在进行创建,貌似队列中有消息没有消费.也不会被自动删除,所以可能需要手动删下.

同时删除掉消费者和生产者相关的定义与绑定的代码.

如需转载请注明: 转载自26点的博客

本文链接地址: rabbitMQ消息队列 – php案例

转载请注明:26点的博客 » rabbitMQ消息队列 – php案例

喜欢 (0)
发表我的评论
取消评论

表情