一起聊聊thinkphp6使用think-queue实现普通队列和延迟队列

发布时间 - 2022-04-20 00:00:00    点击率:

本篇文章给大家带来了关于thinkphp的相关知识,其中主要介绍了关于使用think-queue来实现普通队列和延迟队列的相关内容,think-queue是thinkphp官方提供的一个消息队列服务,下面一起来看一下,希望对大家有帮助。

推荐学习:《PHP视频教程》

###TP6 队列

TP6 中使用 think-queue 可以实现普通队列和延迟队列。

think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

  • 消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
  • 队列的多队列, 内存限制 ,启动,停止,守护等
  • 消息队列可降级为同步执行

消息队列实现过程

1、通过生产者推送消息到消息队列服务中

2、消息队列服务将收到的消息存入redis队列中(zset)

3、消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条

4、处理获取下来的消息调用业务类进行处理相关业务

5、业务处理后,需要从队列中删除消息

composer 安装 think-queue

composer require topthink/think-queue

配置文件

安装完 think-queue 后会在 config 目录中生成 queue.php,这个文件是队列的配置文件。

tp6中提供了多种消息队列的实现方式,默认使用sync,我这里选择使用Redis。

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

创建目录及队列消费类文件

在 app 目录下创建 queue 目录,然后在该目录下新建一个抽象类 Queue.php 文件,作为基础类

getJobId(); // 队列的数据库id或者redis key
        // $jobClassName = $job->getName(); // 队列对象类
        // $queueName = $job->getQueue(); // 队列名称

        // 如果已经执行中或者执行完成就不再执行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 执行业务处理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任务执行成功后删除
            Cache::store('redis')->delete($jobId); // 删除redis中的缓存
        } else {
            // 检查任务重试次数
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
                 // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行
                //$job->release(10); 
                // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
                //$job->failed();   
                // 第3种处理方式:删除任务
                $job->delete(); // 任务执行后删除
                Cache::store('redis')->delete($jobId); // 删除redis中的缓存
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任务执行的结果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查询redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根据消息中的数据进行实际的业务处理
     * @param $data 数据
     * @return bool 返回结果
     */
    abstract protected function execute($data): bool;}

所有真正的消费类继承基础抽象类

生产者逻辑

use think\facade\Queue;

// 普通队列生成调用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延时队列生成调用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延时队列 10 秒后执行:
Queue::later(10 , Test::class, $data, $queueName);

开启进程监听任务并执行

php think queue:listen
php think queue:work

命令模式介绍

命令模式

  • queue:work 命令

    work 命令: 该命令将启动一个 work 进程来处理消息队列。

    php think queue:work --queue TestQueue
  • queue:listen 命令

    listen 命令: 该命令将会创建一个 listen 父进程 ,然后由父进程通过 proc_open(‘php think queue:work’) 的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。

    php think queue:listen --queue TestQueue

命令行参数

  • Work 模式

    php think queue:work \
    --daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
    --queue  helloJobQueue  //要处理的队列的名称
    --delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
    --memory 128 \      //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
  • Listen 模式

    php think queue:listen \
    --queue  helloJobQueue \   //监听的队列的名称
    --delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --memory 128 \       //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
    --tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
    --timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位

    可以看到 listen 模式下,不包含 --deamon 参数,原因下面会说明

  • 消息队列的开始,停止与重启

    • 开始一个消息队列:

      php think queue:work
    • 停止所有的消息队列:

      php think queue:restart
    • 重启所有的消息队列:

      php think queue:restart 
      php think queue:work

推荐学习:《PHP视频教程》


# thinkphp  # 默认为  # 重启  # 创建一个  # 抛出  # 配置文件  # 未被  # 重试  # 重发  # 下次  # 相关内容 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251813 】 【 AI营销90571


相关推荐: 如何批量查询域名的建站时间记录?  edge浏览器无法安装扩展 edge浏览器插件安装失败【解决方法】  Windows家庭版如何开启组策略(gpedit.msc)?(安装方法)  实例解析angularjs的filter过滤器  Win11怎么更改系统语言为中文_Windows11安装语言包并设为显示语言  大连网站制作费用,大连新青年网站,五年四班里的视频怎样下载啊?  如何在 Go 中优雅地映射具有动态字段的 JSON 对象到结构体  如何在阿里云ECS服务器部署织梦CMS网站?  如何快速搭建支持数据库操作的智能建站平台?  香港服务器网站生成指南:免费资源整合与高速稳定配置方案  如何在万网利用已有域名快速建站?  Android中Textview和图片同行显示(文字超出用省略号,图片自动靠右边)  学生网站制作软件,一个12岁的学生写小说,应该去什么样的网站?  Laravel如何处理表单验证?(Requests代码示例)  html5源代码发行怎么设置权限_访问权限控制方法与实践【指南】  如何快速重置建站主机并恢复默认配置?  html5如何设置样式_HTML5样式设置方法与CSS应用技巧【教程】  奇安信“盘古石”团队突破 iOS 26.1 提权  Laravel如何使用withoutEvents方法临时禁用模型事件  Laravel如何使用Guzzle调用外部接口_Laravel发起HTTP请求与JSON数据解析【详解】  网站优化排名时,需要考虑哪些问题呢?  如何在沈阳梯子盘古建站优化SEO排名与功能模块?  如何在自有机房高效搭建专业网站?  Laravel如何自定义错误页面(404, 500)?(代码示例)  laravel怎么配置和使用PHP-FPM来优化性能_laravel PHP-FPM配置与性能优化方法  Laravel如何实现多级无限分类_Laravel递归模型关联与树状数据输出【方法】  Laravel如何使用API Resources格式化JSON响应_Laravel数据资源封装与格式化输出  Python文件异常处理策略_健壮性说明【指导】  如何为不同团队 ID 动态生成多个非值班状态按钮  Python文件操作最佳实践_稳定性说明【指导】  Laravel怎么实现验证码(Captcha)功能  如何在腾讯云免费申请建站?  如何快速生成专业多端适配建站电话?  零服务器AI建站解决方案:快速部署与云端平台低成本实践  做企业网站制作流程,企业网站制作基本流程有哪些?  java中使用zxing批量生成二维码立牌  如何在阿里云通过域名搭建网站?  成都网站制作公司哪家好,四川省职工服务网是做什么用?  Laravel如何使用Blade组件和插槽?(Component代码示例)  HTML透明颜色代码怎么让图片透明_给img元素加透明色的技巧【方法】  历史网站制作软件,华为如何找回被删除的网站?  Windows驱动无法加载错误解决方法_驱动签名验证失败处理步骤  PHP 500报错的快速解决方法  深圳网站制作设计招聘,关于服装设计的流行趋势,哪里的资料比较全面?  如何挑选优质建站一级代理提升网站排名?  如何在IIS7上新建站点并设置安全权限?  如何用手机制作网站和网页,手机移动端的网站能制作成中英双语的吗?  Linux虚拟化技术教程_KVMQEMU虚拟机安装与调优  Laravel Blade模板引擎语法_Laravel Blade布局继承用法  Laravel如何使用Telescope进行调试?(安装和使用教程)