Skip to content

任务队列TQ

更新: 2025/2/24 字数: 0 字 时长: 0 分钟

任务队列(Task Queue,简称TQ)是一种常见的工作分配机制,通常用于多线程或分布式计算环境中。它有助于有效地管理和调度各种工作任务,确保它们按照适当的顺序和优先级得到执行。

简单介绍

队列机制

在任务队列中,任务被添加到一个队列中,等待被执行。然后,一个或多个专门的职程(Worker)会一直监视队列,并在有新任务到来时取出任务并执行。这种机制可以很好地利用系统资源,确保任务得到及时处理,并且可以通过适当的调度算法来控制任务的执行顺序和优先级,以满足不同的需求。

应用场景

任务队列在各种场景下都有广泛的应用,比如在网络服务器中用于处理请求、在操作系统中用于管理进程和线程、在分布式系统中用于协调节点间的通信和计算等。通过使用任务队列,可以实现高效的任务调度和资源管理,提高系统的性能和可伸缩性。

Celery工具

Celery 是一款 Python 编写的简单、灵活、可靠的分布式消息队列工具,专注于实时处理的异步任务队列以及任务调度,可用于处理大量消息,并且提供了一整套操作此系统的一系列工具。具体有下面几个优点:

  • 上手简单:Celery 上手比较简单,不需要配置文件就可以直接运行。
  • 高可用:如果出现丢失连接或连接失败,职程(Worker)和客户端会自动重试,并且中间人通过“主/主”、“主/从”的方式来进行提高可用性。
  • 高并发:单个 Celery 进行每分钟可以处理数以百万的任务,而且延迟仅为亚毫秒(使用 RabbitMQ 在优化过后)。
  • 可扩展:Celery 的每个部分几乎都可以自定义扩展和单独使用,例如,自定义连接池、序列化方式、压缩方式、日志记录方式、任务调度、生产者、消费者、中间人(Broker)等。它不仅可以在一台机器上运行,也可以在多台机器上运行,甚至可以跨数据中心运行,用来提高 Celery 的高可用性以及横向扩展能力。

工作流程

一个完整的 Celery 工作流程有如下四个部分组成:

  • 消息生产者(producer):提供消息的客户端。

  • 消息中间件(broker):Celery 本身不提供消息服务,需要一个中间件来进行接收和发送消息,通常以独立的服务形式(RabbitMQ、Redis)出现,成为消息中间人。

  • 任务执行单元(worker):Celery 提供的任务执行的单元,负责监视任务队列,接收任务消息并执行任务的进程或者线程。

  • 任务结果存储(result store):存储 worker 执行的任务的结果,支持 AMQP、Redis、MongoDB、MySQL、SQLAlchemy、Django ORM、Elasticsearch 等主流数据库。

QQ截图20220329142946

下载安装

现在我们通过 pip 命令工具下载安装:

python
pip install Celery

另外 Celery 还自定义了一组用于安装 Celery 和特定功能的依赖,可以在中括号加入您需要依赖,并可以通过逗号分割需要安装的多个依赖包:

pip install "celery[librabbitmq]"
pip install "celery[librabbitmq,redis,auth,msgpack]"
  • celery[auth] 使用auth保证程序的安全(序列化)

  • celery[msgpack] 使用msgpack序列化(序列化)

  • celery[gevent] 基于 gevent 的并发池(并发)

  • celery[librabbitmq] 使用 librabbitmq 库(传输和后端)

  • celery[redis] 使用 Redis 进行消息传输或后端结果存储(传输和后端)

  • celery[elasticsearch] 使用 ElasticSearch 作为后端结果存储(传输和后端)

  • celery[zookeeper] 使用 Zookeeper 进行消息传输(传输和后端)

  • celery[sqlalchemy] 使用 SQLAlchemy 作为后端结果存储(支持)

最后,执行下面的命令安装一个 eventlet 库:

pip install eventlet

警告

在 Windows 环境中,通常需要使用 Eventlet 或者类似的库来替代 Celery 默认的事件循环库,否则在执行任务的时候可能会报 ValueError: not enough values to unpack (expected 3, got 0) 错误。

服务配置

Celery 本身不提供消息服务,需要一个中间件(Broker)来进行接收和发送消息。这里有如下选择:

  • RabbitMQ 是 Celery 默认的消息服务中间件,而且 RabbitMQ 功能比较齐全、稳定、便于安装,对于生产环境来说是首选。配置 RabbitMQ 消息服务中间件不需要安装额外的的配置以及初始化配置信息,只需要配置连接的 URL 即可:
python
broker = 'amqp://myuser:mypassword@localhost:port/myvhost'
  • Redis 也可作为 Celery 的消息服务中间件,而且 Redis 功能比较健全,但是如果突然停止运行或断电会造成数据丢失。在开始使用前,先要通过 pip install -U "celery[redis]" 安装 Celery 的依赖库。配置 Redis 消息服务中间件也非常的简单,只需要配置连接的 URL 即可:
python
broker = 'redis://:password@hostname:port/db_number'

Celery 还有一个输出端,其末尾连接任务结果存储(result store),这里可以选择上面介绍的主流数据库:

python
# 以Redis作为结果的存储单元
backend = 'redis://:password@hostname:port/db_number'

简单项目

首先上手一个 Celery 简单项目,新建如下三个文件:

tasks.py    # 定义app、定义和监听和执行异步任务
produce.py  # 生成异步任务
result.py   # 获取异步任务执行结果

创建应用

tasks.py 文件在里面创建一个简单的 app 应用,内容如下:

python
import time
# 导入celery模块中的Celery类
from celery import Celery

# broker任务消息中间件(Redis的0号数据库)
broker = 'redis://:[email protected]:6379/0'
# backend结果存储单元(Redis的1号数据库)
backend = 'redis://:[email protected]:6379/1'
# task为项目名称,创建Celery应用简称app
app = Celery('task', broker=broker, backend=backend)

# send_mail函数添加@app.task装饰器后,变成了Celery的异步任务
@app.task
def send_mail(name):
    print(f'向{name}发送邮件...')
    time.sleep(5)
    print(f'向{name}发送邮件完成')
    return 'Success!'

另外,我们还可以为 app 应用添加如下配置:

python
app.conf.update(
    task_serializer='json',    # 注释:指定任务序列化的方式
    accept_content=['json'],   # 注释:Ignore other content
    result_serializer='json',  # 注释:指定结果序列化的方式
    timezone='Asia/Shanghai',  # 注释:定义时区
    enable_utc=True,           # 注释:是否使用UTC
    broker_transport_options = {'visibility_timeout': 3600}  # 注释:消息确认时间,默认超时时间3600秒。
    
)

执行单元

命令行在 tasks.py 所在路径下执行如下命令创建任务执行单元(Worker):

shell
celery --app=tasks worker -l info -P eventlet
  • --app=tasks:指定 Celery 应该加载的应用程序。在这里,tasks 是你的 Celery 应用所在的模块或包的名称。Celery 将会从这个模块或包中加载任务定义和配置。

  • worker:指定启动的 Celery worker 进程。Celery worker 负责监视任务队列,并执行从队列中取出的任务。

  • -l info:设置日志级别为 info。这会告诉 Celery 输出信息级别为 info 及以上的日志消息。你也可以将 info 替换为 debugwarning 等其他日志级别。

  • -P eventlet:指定使用 Eventlet 作为 Celery 的并发模式。Eventlet 是一种并发库,它提供了协程和异步 I/O 支持,适用于异步任务处理。

QQ截图20221112162322

建议

上图中 Celery 默认 8 个并发 ,也就是说启动了 8 个 worker 进程,可以同时执行 8 个任务。如果想调整并发数,可以使用 -c--concurrency 参数来指定 worker 进程的数量,例如 celery --app=myapp worker -l info -c 4 -P eventlet 命令中的 -c 4 表示启动 4 个 worker 进程来处理任务。

重要

每当异步任务的代码发生改变时,都必须再次执行该命令来重新创建任务执行单元。

提醒

在生产环境中,如果需要将职程(Worker)作为守护进程在后台运行,可以使用平台提供的工具来进行实现,或使用类似 supervisord 这样的工具来进行管理。

生产任务

我们再新建一个名称为 produce.py 文件,内容如下:

python
# 从tasks文件中导入send_mail异步任务
from tasks import send_mail

# 传入参数'Chen'并调用异步任务中的delay方法来生产send_mail异步任务,返回一个AsyncResult实例
result = send_mail.delay('Chen')
# 获取任务的id号
print(result.id)
# 检测任务是否处理完毕(False未处理,True已处理)
print(result.ready())

运行该文件后,会生成一个异步任务放到 Redis 当中,上面启动的执行单元监听到 Redis 中有任务后会立即执行,因此回到命令行中我们可以看到以下输出:可以看到消费者先是接受了 send_mail 异步任务,后面跟着一串任务 id,接着花费 5 秒执行了发送邮件的任务,当发送邮件的任务结束后,改变了任务状态,输出了执行该任务的耗时以及任务的返回值。

QQ截图20221112170617

现在我们去到 app 应用里配置存储结果的 Redis 中可以看到:在 Redis 中的 1 号数据库中就存储着上面执行该任务的结果,其中有任务的任务的 id 值、状态、结果等。

QQ截图20221112171950

重要

存储的结果的是一个 TTL 过期键,过期时间是一天,即 86400 秒。

获取结果

我们再新建一个名称为 result.py 文件,内容如下:

python
# 从tasks文件中引入Celery的app应用
from tasks import app
# 引入AsyncResult实例
from celery.result import AsyncResult

# 获取指定id的任务结果
async_result = AsyncResult(app=app, id="fc8a90ee-f4ed-47ac-8def-4f5c5e6bd8fc")

if async_result.status == 'SUCCESS':
    print(f'任务执行成功,结果为{async_result.get()}')  # 输出:任务执行成功,结果为Success!
    # 从数据库中移除该任务结果
    async_result.forget()
elif async_result.status == 'PENDING':
    print('任务等待被执行')
elif async_result.status == 'STARTED':
    print('任务开始被执行')
elif async_result.status == 'PENDING':
    print('任务异常重试中')
elif async_result.status == 'FAILURE':
    print('任务执行失败')

建议

当数据库中没有指定id的任务结果时,任务状态为 PENDING,因为所有任务的默认状态都是 PENDING(待处理)状态。

重要

哪怕异步任务执行过程中报错,该任务还是会有 task_id 和结果生成,只是结果是 FAILURE(执行失败)状态。

普通项目

上面的简单项目中只有一个异步任务,如果有多个异步任务需要执行,建议使用下面的结构:

celery_tasks     # 文件夹
  ├─celery.py    # 定义app、监听和执行异步任务
  ├─task01.py    # 定义异步任务1
  └─task02.py    # 定义异步任务2
produce_task.py  # 生产异步任务
check_result.py  # 获取异步任务结果

任务参数

celery.py 文件新增 include 参数,具体内容如下:

python
import time
from celery import Celery

# broker任务消息中间件(Redis的0号数据库)
broker = 'redis://:[email protected]:6379/0'
# backend结果存储单元(Redis的1号数据库)
backend = 'redis://:[email protected]:6379/1'
# 生成Celery简称app
app = Celery(
    'task',
    broker=broker,
    backend=backend,
    # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务分类
    include=['celery_tasks.task01', 'celery_tasks.task02']
)

异步任务分别放在了 task01.pytask02.py 文件当中,代码格式如下:

python
# 从celery.py文件中引入app
from celery_tasks.celery import app

@app.task
def 函数名():
    函数体

定时生产

上面我们通过运行 produce_task.py 文件生产异步任务,但如果我们想每隔一段时间来自动的生成异步任务,可以在 celery.py 文件中的 app 应用中添加如下配置:

python
# 导入crontab函数用于创建一个Cron风格的时间表
from celery.schedules import crontab

# 定义定时任务的调度器(Celery Beat)的配置信息
app.conf.beat_schedule = {
    # 定时任务的名称(唯一性)
    'every_seconds': {
        # 执行的异步任务
        'task': 'celery_tasks.task01.send_mail',
        # 没有时间表参数,该定时任务将按照默认配置每秒执行一次
        'schedule': crontab()
    }
}

定时执行

上面任务执行单元都是立即执行异步任务,如果我们要指定异步任务的开始执行时间,这里就需要修改 produce_task.py 生产异步任务的文件了。具体内容如下:

python
# 导入send_mail异步任务
from celery import send_mail
from datetime import datetime

# 指定一个时间2022-11-20 03:16:30
time1 = datetime(2022, 11, 20, 3, 16, 30)
# 使用utc时间
utc_time = datetime.utcfromtimestamp(time1.timestamp())
# 使用apply_async并设定时间
result = send_mail.apply_async(args=['Chen'], eta=utc_time)

运行该文件后,可以看到 send_mail 异步任务在我们所指定的时间运行了:

QQ截图20221120031856

总结

最后总结一下,任务队列 Celery 和消息队列 RabbitMQ 是两个层面的东西:

  • Celery是一个分布式的任务队列,它的基本工作就是管理分配任务到不同的服务器,并且取得结果。至于说服务器之间是如何进行通信的,这个Celery本身不能解决。
  • RabbitMQ 作为一个消息队列管理工具被引入到和 Celery 集成,负责处理服务器之间的通信任务。

当然,后来 Celery 相继增加了一些对 Redis,MongoDB 之类的支持,原因是 RabbitMQ 尽管足够强大,但对于一些相对简单的业务环境来说可能太多(复杂)了一些,这样用户可以有多一些的选择。