IE盒子

搜索
查看: 150|回复: 1

PHP使用消息队列

[复制链接]

1

主题

8

帖子

14

积分

新手上路

Rank: 1

积分
14
发表于 2023-1-18 11:46:17 | 显示全部楼层 |阅读模式
前言

我们在做RPC(远程过程调用)服务时一般多采用同步方式,在高并发环境下,由于业务逻辑复杂将会导致大量的请求被堵塞,通过使用消息队列,我们可以异步来处理业务,从而缓解系统的压力;另一方面,当我们不同的应用之间需要进行通信却找不到合适的渠道时,通过使用消息队列,我们可以在不同的应用间进行通信。

消息队列

首先我们认识一下队列:
队列是一种线性表,在表的后端进行插入元素,在表的前端进行推出元素,按照先进先出的原则。
例如使用PHP的数组函数array_push、array_shift或者SplQueue类都可以实现一个队列。
使用PHP数组函数实现队列或栈:
<?php

class QueueArray
{
    public $arr = array();
   
    public function tailEnqueue($val)
    {
        return array_push($this->arr, $val); // 队尾入队
    }
   
    public function tailDequeue()
    {
        return array_pop($this->arr); // 队尾出队
    }
   
    public function headEnqueue($val)
    {
        return array_unshift($this->arr, $val); // 队首入队
    }
   
    public function headDequeue()
    {
        return array_shift($this->arr); //队首出队
    }
   
    public function length()
    {
        return count($this->arr); // 队列长度
    }
   
    public function head()
    {
        return reset($this->arr); // 获取队首元素
    }
   
    public function tail()
    {
        return end($this->arr); // 获取队尾元素
    }
   
    public function clear()
    {
        unset($this->arr); // 清空队列
        return true;
    }
}

再认识一下消息队列:
消息队列技术则是分布在应用间交互信息都一种技术,消息队列可驻留在内存或磁盘上,队列存储消息知道它们被应用程序读出。
从中我们可以得出消息队列一方面可以存储消息并提供服务,另一方面可供不同应用间通信。
对于存储消息并提供服务这方面,关键在于存储消息的媒介和提供服务的策略。
使用PHP和MYSQL仅供PHP使用的一个简单的消息队列类:
<?php

class QueueDataBase
{
    private $pdo;
   
    public function __construct()
    {
        $this->pdo = new PDO('mysql:host=localhost;dbname=test', 'root', '123456');
    }

    public function push($name, $data)
    {
        // 将队列数据插入数据表
        $sql = 'INSERT INTO `queue` (`name`, `data`, `is_popped`) VALUE (?, ?, ?)';
        $stmt = $this->pdo->prepare($sql);
        return $stmt->execute(array($name, $data, 0));
    }

    public function pop($name)
    {
        // 从数据表获取一条未被获取的队列数据
        $sql = 'SELECT `id`, `data` FROM `queue` WHERE `name` = ? AND `is_popped` = ? ORDER BY id ASC LIMIT 1 ';
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute(array($name, 0));
        $info = $stmt->fetch(PDO::FETCH_ASSOC);
        if ($info) {
            // 获取到队列数据则修改为已获取状态
            $sql = 'update `queue` set is_popped = ? where `id` = ?';
            $stmt = $this->pdo->prepare($sql);
            $stmt->execute(array(1, $info['id']));
            return $info;
        }
        return false;
    }
}
对于可供不同应用间通信这方面,一般市场上成熟的消息队列产品会使用TCP或HTTP等网络通信服务来供不同应用来进行通信,除此之外还会提供不同相应语言的SDK使用。

消息队列的两种模式:
1.点对点:Queue,不可重复消费:
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息,Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
2.发布/订阅:Topic,可以重复消费:
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

使用消息队列

市面上成熟的消息队列产品很多,具体可以看wikipedia的介绍:Message queue,下面介绍几款消息队列产品。

Redis

介绍:
使用Redis列表类型可以实现点对点的队列服务,除此之外,Redis提供了基本“发布/订阅”的消息机制。
使用场景:
点对点队列服务:数据被消费即消失,用于快速简单队列服务。
发布/订阅服务:由于无法实现消息堆积和回溯,不能保证安全性的简单发布/订阅服务。
PHP点对点生产者:
<?php

$redis = new Redis();
$redis->connect('127.0.0.1');
$name = 'flux';
$val = '放入第一个内容';
$redis->rPush($name, $val);
PHP点对点消费者:
<?php

$redis = new Redis();
$redis->connect('127.0.0.1');
$name = 'flux';
while (1) {
    $data = $redis->blPop(array($name), 10);
    if ($data == false) { // 没有数据
        sleep(1);
    } else { // 处理业务
        print_r($data);
    }
}
PHP发布/订阅发布者:
<?php

$redis = new Redis();
$redis->connect('127.0.0.1');
$channel = 'flux';
$message = '放入第一个内容';
$redis->publish($channel, $message);
PHP发布/订阅订阅者:
<?php

$redis = new Redis();
$redis->connect('127.0.0.1');
$channel = 'flux';
$redis->subscribe(array($channel), 'callback');
function callback($redis, $chan, $msg){
    echo $msg;
}

HTTPSQS

介绍:
HTTPSQS(HTTP Simple Queue Service)是一款基于 HTTP GET/POST 协议的轻量级开源简单消息队列服务,使用 Tokyo Cabinet 的 B+Tree Key/Value 数据库来做数据的持久化存储。
应用场景:
支持重发的(支持最大队列数据10亿条)简单消息队列服务。
服务器:
按照官网进行编译安装并启动:HTTPSQS
客户端:
官网文档客户端:HTTPSQS
PHP生产者:
<?php

include_once("httpsqs_client.php");

$httpsqs = new httpsqs('127.0.0.1');
$name = 'flux';
$val = '放入第一个内容';
$httpsqs->put($name, $val);
PHP消费者:
<?php

include_once("httpsqs_client.php");

$httpsqs = new httpsqs('127.0.0.1');
$name = 'flux';
while (1) {
    $data = $httpsqs->get($name);
    if ($data == false) { // 没有数据
        sleep(1);
    } else { // 处理业务
        print_r($data);
    }
}

Beanstalk

介绍:
一个高性能、轻量级的分布式内存队列系统。
核心概念:
job:一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中。
tube:一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。
producer:Job的生产者,通过put命令来将一个job放到一个tube中。
consumer:Job的消费者,通过reserve/release/bury/delete命令来获取job或改变job的状态。
生命周期:
一个job有READY, RESERVED, DELAYED, BURIED四种状态。当producer直接put一个job时,job就处于READY状态,等待consumer来处理,如果选择延迟put,job就先到DELAYED状态,等待时间过后才迁移到READY状态。consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其他的consumer就不能再操作该job。当consumer完成该job后,可以选择delete, release或者bury操作;delete之后,job从系统消亡,之后不能再获取;release操作可以重新把该job状态迁移回READY(也可以延迟该状态迁移操作),使其他的consumer可以继续获取和执行该job;有意思的是bury操作,可以把该job休眠,等到需要的时候,再将休眠的job kick回READY状态,也可以delete BURIED状态的job。
使用场景:
延时队列,定时任务,异步任务等更适合用作任务队列。
服务器:
按照官网进行编译安装并启动:Beanstalk
客户端:
官方推荐客户端:Client Libraries
本文PHP客户端:Minimalistic PHP client
PHP生产者:
<?php

require 'vendor/autoload.php';

use Beanstalk\Client;

$defaults = [
    'persistent' => true, // 长连接
    'host' => '127.0.0.1',
    'port' => 11300,
    'timeout' => 1, // 连接超时时间
    'logger' => null
];

$beanstalk = new Client($defaults);
$beanstalk->connect();
$beanstalk->useTube('flux'); // 使用 tube `'flux'`.
$beanstalk->put(
    23, // 优先级
    0,  // 延时
    60, // 任务处理时间
    '放入第一内容' // 数据
);
$beanstalk->disconnect();
PHP消费者:
<?php

require 'vendor/autoload.php';

use Beanstalk\Client;

$defaults = [
    'persistent' => true, // 长连接
    'host' => '127.0.0.1',
    'port' => 11300,
    'timeout' => 1, // 连接超时时间
    'logger' => null
];

$beanstalk = new Client($defaults);
$beanstalk->connect();
$beanstalk->watch('flux');

while (true) {
    $job = $beanstalk->reserve(); // 堵塞直到有新的job,job结构:['id' => 123, 'body' => '放入第一个内容']

    // 业务处理job
    echo $job['body'];
    $result = false;

    if ($result) { // 处理成功,删除
        $beanstalk->delete($job['id']);
    } else { // 处理未成功,休眠,如防止规定时间后job未处理会变成ready被重复处理
        $beanstalk->bury($job['id'], 23);
    }
}

Kafka

介绍:
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。
Kafka使用Group(消费者组)实现了消息队列的点对点和发布-订阅模式,所有消费者在一个组就成了点对点模式,所以消费者都在不同组里即成了发布-订阅模式。
核心概念:
producer:生产者,Kafka集群发送消息,需要指定Topic便于对消息进行分类
consumer:消费者,消费消息时需指定Topic,指定一个Group,即分组。
topic:消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)。
broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
应用场景:
日志收集,消息系统,用户活动跟踪,运营指标,流式处理等。
服务器:
官方网址文档:Apache Kafka
客户端:
PHP客户端:安装rdkafka扩展:php-rdkafka
PHP生产者:
<?php

$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); // 集群使用127.0.0.1,127.0.0.2的形式

$topic = $rk->newTopic("flux"); // 设置topic

for ($i = 0; $i < 5; $i++) {
    $topic->produce(
        RD_KAFKA_PARTITION_UA, // 分区,rd_kafka_partition_ua代表随机分区
        0, // 消息标志,始终为0
        '放入一个内容' . $i // 内容
    );
}
PHP消费者:
<?php

$conf = new RdKafka\Conf();
$conf->set('group.id', 'group1'); // 设置分组ID
$rk = new RdKafka\Consumer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf(); // 设置自动上传偏移量信息
$topicConf->set("auto.commit.interval.ms", 1e3);
$topicConf->set("offset.store.sync.interval.ms", 60e3);
$topic = $rk->newTopic("flux", $topicConf);

$topic->consumeStart(
    0, // 从哪个分区开始消费
    RD_KAFKA_OFFSET_STORED // 开始消费的偏移量,可选值:RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
);

while (true) {
    $msg = $topic->consume(
        0, // 分区
        1000 // 超时时间
    );
    if (empty($msg) || $msg->err) { // 当前队列中无数据
        sleep(1);
    } else { // 业务处理
        echo $msg->payload, "\n";
    }
}

RabbitMQ

介绍:
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP基本概念:



AMQP流程图

Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker。
Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange:交换器,用来接收Publisher发送的消息并将这些消息路由给服务器中的Queue。三种路由类型:

  • direct:消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。
  • fanout:每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。
  • topic:交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。两个通配符:符号“#”和符号“”。
Binding:Exchange和Queue之间的虚拟连接。Binding信息被保存到Exchange中的查询表中,用于Message的分发依据。
Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个Message可以投入一个或多个队列。
Connection:publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
应用场景:
异步处理,应用解耦,流量削峰等稳定性消息队列。
服务器:
官方网址:RabbitMQ
客户端:
PHP客户端:安装AMPQ扩展,AMQP扩展
PHP生产者:
<?php

// 生产者:创建连接-->创建channel-->创建交换机对象-->发送消息

// 创建连接
$config = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$conn = new AMQPConnection($config);
if (!$conn->connect()) {
    die("不能创建连接!\n");
}
// 创建信道
$channel = new AMQPChannel($conn);
// 创建交换机对象
$ex = new AMQPExchange($channel);
$exName = 'exName';
$ex->setName($exName); //设置交换机名称
$ex->setType(AMQP_EX_TYPE_DIRECT); // direct类型
$ex->setFlags(AMQP_DURABLE); // 持久化
$ex->declareExchange(); // 声明一个新交换机,如果已经存在,则不再需要声明

//发送消息
$route = 'key_1'; // 路由key
//$channel->startTransaction(); //开始事务
for($i=5; $i<10; ++$i){
    //sleep(1);//休眠1秒
    $ex->publish('放入一个内容' . $i, $route);
}
//$channel->commitTransaction(); //提交事务
$conn->disconnect();
PHP消费者:
<?php

// 消费者:创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息

// 创建连接
$config = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$conn = new AMQPConnection($config);
if (!$conn->connect()) {
    die("不能创建连接!\n");
}
// 创建信道
$channel = new AMQPChannel($conn);
// 创建交换机对象
$ex = new AMQPExchange($channel);
$exName = 'exName';
$ex->setName($exName); // 设置交换机名称
$ex->setType(AMQP_EX_TYPE_DIRECT); // direct类型
$ex->setFlags(AMQP_DURABLE); // 持久化
//$ex->declareExchange(); // 声明一个新交换机,如果已经存在,则不再需要声明

// 创建队列
$q = new AMQPQueue($channel);
$qName = 'qName';
$q->setName($qName); // 设置队列名称
$q->setFlags(AMQP_DURABLE); //持久化
$q->declareQueue(); // 声明一个新队列,如果已经存在,则不再需要声明

// 绑定交换机与队列,并指定路由键
$route = 'key_1'; // 路由key
$q->bind($exName, $route);

// 堵塞接受消息
while(True){
    $q->consume('processMessage');
    //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
}
$conn->disconnect();

/**
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //处理消息
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}

上述demo可以戳这里:demo

总结

上面介绍了几款消息队列的产品,除此之外,ActiveMQ、ZeroMQ等也是比较出名的消息队列产品,但是否需要使用到消息队列和产品的选型却是非常重要的,首先需要评估自己的业务功能和业务量来确定是否需要选择消息队列,不可盲目,否则只会变向增加开发成本,再者需要根据应用场景(语言、可靠性,扩展性等)来选择合适的消息队列(主流消息队列性能对比:性能比较)。
回复

使用道具 举报

2

主题

11

帖子

22

积分

新手上路

Rank: 1

积分
22
发表于 2025-6-16 19:32:49 | 显示全部楼层
这么强,支持楼主,佩服
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表