
消息队列MQ
更新: 2025/2/24 字数: 0 字 时长: 0 分钟
消息队列(Message Queue,简称 MQ)是一种应用间的通信方式。我们可以将消息队列视为一个消息的列表,发送和接收的消息都存储在这个列表中,这样独立的应用程序可以通过读写出入列表中的消息(针对应用的数据)进行通信。消息队列不仅可以提供消息的高效传递,还能在高并发环境下缓解系统压力,提高系统的吞吐量和响应速度,实现请求的异步处理,而不用担心请求发生堵塞。一句话总结:消息队列可以提供高效的消息传递机制和异步处理能力。
前言介绍
消息队列的本质还是队列,在大多数情况下我们聊到队列的时候,指的是先进先出(First in-First out,FIFO),最先放进去的数据,被最先拿出来。当然我们也偶尔会用到它广义的意思,一个可以逐个往里放数据,然后按照一定的顺序输出的数据结构。所以,一个最基础的队列只需要支持两个功能“放”和“拿”。
生活场景
有人会问,队列这个数据结构有什么用呢?最常见的用法是用来有序地安排任务。我们可以想象在生活中的队列,比如说火车站的售票大厅,如果没有队列的存在,来一个人就直接冲到售票窗口去买票,那可能会造成很多问题:
- 售票窗口可能会拥堵,一堆人吵着要买票,导致售票本身的效率下降。
- 身材弱小的人可能一辈子也买不到票,因为是谁最强壮,谁嗓门最大,谁就先买票。
于是就发明了队列,大家按照来的先后顺序排成一队,然后一个一个的去售票窗口买票,如果发现队列太长了,我们可以多开几个售票窗口,如果队列里没有人了,我们可以少开几个售票窗口,同时避免了有人可能来的很早,但是始终买不着票的情况发生。
建议
在编程的时候,队列的有序性,经常是保证我们算法正确的基础。
生产消费
在这个世界上有任务的生产者,还有任务的完成者,比如在学校化学、物理、数学老师布置作业,它们就是任务的生产者,而学生们就是任务的完成者。在实际的生活中,生产者和完成者的比例可能是各种各样的,有可能是 100 : 1,也可能是 1 : 100。这个时候队列就起到了它的效果,因为生产者不需要关心把任务交给谁去解决,只需要把任务放到队列里就可以了。同样完成者也不需要关心要处理谁的任务,只需从队列里拿任务就可以了。对应到编程中,就是一种经典的并发编程模型叫“生产者消费者模型”,用于解决生产者和消费者之间的数据交换和协作问题。该模型主要由三部分组成:
- 生产者(Producer):负责任务的生产和发布。
- 队列(Queue):负责任务的存储和共享。
- 消费者(Consumer):负责任务的监听和处理。
在该模型中,单个或多个生产者负责生成任务并放入共享队列中,而单个或多个消费者则负责从共享队列中取出任务进行处理。
实现消息队列
这里介绍常见的消息队列实现方式:Queue 模块、Redis 数据库、RabbitMQ 框架。
Queue模块
Python 内置的 queue
模块是一个用于多线程的队列模块,它可以指定缓冲区大小的阻塞队列,但数据无法在多个进程之间共享。
参数和方法
首先回顾一下前面我们学习使用 queue
模块里面的一些参数和方法:
import queue
# 创建一个线程队列,Queue先进先出模式,maxsize设置队列最大长度(默认0,表示长度没有限制)。
q = queue.Queue(maxsize=4)
# 通过put方法向队列q中插入变量i,每次只能插一个数据
for i in range(3):
q.put(i)
# full判断队列是否饱和,empty判断队列是否为空,qsiz获取队列的长度
print(q.full(), q.empty(), q.qsize()) # 输出:False False 3
# 通过get方法从队列q中获取数据,每次只能取一个数据
for _ in range(q.qsize()):
print(q.get(), end=', ')
'''
输出:0, 1, 2
注释:当插入的数据达到队列长度上限后,继续插入就会发生阻塞,如果饱和后进行消费,所以不会发生阻塞,可以执行下面的print语句。
'''
建议
在 q.get()
、q.put()
方法中有 block
、timeout
两个参数。其中 block
默认为 True
,写入是阻塞式的,阻塞时间由 timeout
确定。当 block
为 False
时,写入是非阻塞式的,当队列满时会抛出 exception Queue.Full
的异常。
多线程消费
看下面的多线程消费代码案例:可以看到,消费处理的结果并不是按顺序打出来的,这是因为队列只能保证任务被开始执行的顺序,并不能保证任务被完成的顺序。
import queue
import threading
# 消费者
def consumer(q):
while True:
item = q.get()
print('Consume:', item)
# 生产者
def producer(q):
for i in range(10):
q.put(i)
# 建立队列
q = queue.Queue()
# 建立消费者线程
t1 = threading.Thread(target=consumer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))
# 启动消费者
t1.start()
t2.start()
# 启动生产者
producer(q)
'''
输出:
Consume:0
Consume:2
Consume:3
Consume:1
Consume:4
Consume:5
Consume:7
Consume:8
Consume:9
Consume:6
'''
建议
在实际应用中,不同的任务需要的完成时间可能是不一样的,例如有 10 个任务,给消费者 A、B 各分配 5 个任务,就有可能导致任务不均衡,因为消费者 A 的任务可能很简单,而消费者 B 的任务可能很复杂,这就会导致消费者 A 可能很快完成任务,而消费者 B 则需要花很长的时间来完成任务。而队列这种完成一个再拿下一个的任务分配方式,可以保证所有消费者都有任务可做,不至于一个闲得慌,另一个忙得慌,可以说是一种负载均衡。
阻塞与安全
有细心的小伙伴会发现,执行上面的代码,程序不会停下来,即使队列里面的任务都被取完了,也不会停。这是因为 consumer
中的 q.get()
是一个阻塞操作,当队列里面没有任务的时候,程序就会停在这里,直到队列里面有任务,才会继续运行。但现实是 producer
已经运行结束了,队列里面不会再有任务产生了,难道程序要一直卡在这里吗?这时有小伙伴会说,先判断一下队列是否为空,再从里面获取任务,于是 consumer
代码变为这样:
# 消费者
def consumer(q):
while True:
if not q.empty():
item = q.get()
print('Consume:', item)
else:
break
我们注意上面第 4、5 行代码,在单线程的时候,这两行代码没有任何问题。但是在多线程里面,当一个线程在进行完第 4 行的判断后,CPU 可能会把操作移交给另外一个线程,而另外一个线程恰好也完成了第 4 行的判断,这就有可能导致在队列里面只有一个任务的情况下,两个线程都开始执行 q.get()
获取任务操作,这就必然会导致一个线程能拿到任务,而另一个线程发现没有任务就一直阻塞。又由于在默认情况下,Python 进程会等所有的线程结束后再退出,所以程序还是会一直卡在这里。这时又有小伙伴会说,在创建线程时加一个 daemon=True
将其设置为守护线程(所谓“守护线程”就是在主线程结束的时候,不值得再保留的执行线程。简单的说,守护线程会跟随主线程一起挂掉,而主线程的生命周期就是一个进程的生命周期),代码就变为这样:
# 建立消费者线程
t1 = threading.Thread(target=consumer, args=(q,), daemon=True)
t2 = threading.Thread(target=consumer, args=(q,), daemon=True)
我们运行程序会发现什么都没有输出,这是因为 producer
运行完成之后主线程就结束了,顺带还把两个什么都没来得及做的 consumer
子线程给干掉了,所以什么输出都没有。这时又有小伙伴会说,在最后面加一个判断进行延时阻塞,最后添加如下代码:
# 延时阻塞
while not q.empty():
time.sleep(1)
运行上面添加后的代码,确实能达到我们想要的效果,但是这里有两个问题:
到底
sleep
时间多久合适?上面sleep
时间是我们自己设定的 1 秒,这个时间不一定是最合适的,如果sleep
时间过短,程序就一直在这里吭呲吭呲循环,也就是说你一直在浪费 CPU。队列为空就代表任务完成了吗?在
consumer
拿完最后一个任务的时候,队列里面就空了,程序就会判断结束,但这个时候consumer
可能还没有做完这个任务。所以,队列为空只代表所有的任务都被认领了,不代表所有的任务都被完成了。
内部计数器
针对以上问题,queue
模块提供了一个内部计数器。剖析源码如下:
- 在
put()
函数中,每一次往队列里面放一个任务,queue
的内部计数器里面就会加个一。
- 在
task_done()
函数中,每次调用,queue
的内部计数器里面就会减个一。
这样 consumer
在每一次完成任务之后,只要调用一下 task_done()
函数,就可以告诉队列有多少任务已经完成了。在最下面我们加一个 q.join()
队列阻塞,该操作会一直阻塞,直到队列里面的 put
和 task_done
一样多。最后代码如下:
import queue
import threading
import time
# 消费者
def consumer(q):
while True:
item = q.get()
print('Consume:', item)
q.task_done()
# 生产者
def producer(q):
for i in range(10):
q.put(i)
# 建立队列
q = queue.Queue()
# 建立消费者线程
t1 = threading.Thread(target=consumer, args=(q,), daemon=True)
t2 = threading.Thread(target=consumer, args=(q,), daemon=True)
# 启动消费者
t1.start()
t2.start()
# 启动生产者
producer(q)
# 队列阻塞
q.join()
警告
提醒一下,前面讲过 queue
模块是一个用于多线程的队列模块,也就说该模块只能在同一个进程下进行数据交互,如果是两个独立的进程是无法通过该模块交互的。
Redis数据库
Redis 是一个简单高效的内存数据库,具有丰富的数据类型,常用于缓存数据,也支持队列,所以我们可以将其作为消息队列,其优点如下:
- 极低延迟:得益于内存高速读写,消息可以极低延迟传递。
- 上手容易:只需组合使用两三个命令,即可完成消息传递。
- 单线程锁:Redis 本身是一个单线程的,相当于所有操作都有一把天然的排他锁,因此我们不用担心在并发下同一条消息会传递给多个使用者。
可靠消息队列
前面讲过,消息队列是完成一个再拿下一个的任务分配方式,假如说在传递 A 任务的过程中,网络出现了问题,导致 A 任务没有被正常传递,想再次传递 A 任务就没有办法了。因此我们需要建立一个可靠消息队列,他的主要特点在于提供了额外的功能和保障,以确保消息传递的可靠性和稳定性。它和普通的消息队列的区别如下:
- 消息持久化:可靠消息队列会将消息持久化存储到磁盘中,以防止消息丢失。而普通的消息队列可能不会对消息进行持久化存储,一旦系统出现故障或者重启,未被处理的消息可能会丢失。
- 消息确认机制:可靠消息队列会等待消息的接收方对消息进行确认,只有在接收方成功接收并处理了消息后,才会向发送方发送确认消息。而普通的消息队列可能不会提供消息确认机制,消息发送后就认为发送成功。
- 消息重试机制:可靠消息队列会自动进行消息重试,如果消息发送失败或者接收方未能及时确认消息,会进行多次重试,直到消息成功发送并被确认。普通的消息队列可能不会提供消息重试机制。
总的来说,普通的消息队列适用于简单的、对消息的可靠性要求不高的应用场景,而可靠消息队列在功能和保障上更加强大和可靠,适用于对消息传递的可靠性有较高要求的应用场景。
启动配置服务
通过 Redis 实现来可靠消息队列,首先要确保本机已经安装了 Redis 数据库,接着按以下步骤修改 Redis 的 redis.windows.conf
配置文件:
- 注释绑定,在
bind 127.0.0.1
前面加上一个#
注释该行,或者修改为bind 0.0.0.0
让其监听所有网络接口的地址。
- 设置密码,取消
requirepass
前面的#
注释,在后面加上自己的密码 。
- 配置启动,在命令行通过配置文件来启动 Redis 服务使配置生效。
redis-server "Redis安装路径\redis.windows.conf"
重要
只有进行了上面三个步骤的操作,局域网内的其他机器才能连接到指定 IP 的 Redis 数据库,并且需要提供正确的密码进行连接。
任务处理流程
在 Redis 中使用两个列表就可以实现可靠消息队列,一个列表存储待处理(pending
)的任务,另一个列表存储处理中(processing
)的任务。通过这种方式,可以确保消息在处理过程中的可靠传递,即使出现故障也可以保证消息不会丢失。流程及实现代码如下:
- 导入连接:导入
redis
模块连接局域网内指定 IP 的 Redis 数据库。
# 导入Redis模块
from redis import StrictRedis
# 连接局域网内IP为192.168.0.3上密码为123456的Redis的0号数据库
client = StrictRedis(host='192.168.0.3', port=6379, db='0', password='123456')
- 添加任务:生产者通过
lpush
方法将所有需要处理的任务添加至待处理列表(pending
)里面;
# 循环添加任务
for message in ['任务1', '任务2', '任务3']:
# 通过lpush命令将任务添加到pending待处理列表左侧
print(client.lpush("pending", message))
- 获取任务:通过
rpoplpush
方法弹出待处理列表(pending
)最右边的任务进行处理,并且将该任务添加到处理中列表(processing
)最左边。
# 弹出pending待处理列表最右边的任务,并且将任务添加到处理中的processing任务列表中
message = client.rpoplpush('pending', 'processing')
- 删除任务:消费者处理完任务后,通过
lrem
方法将任务从处理中任务列表(processing
)删除。
# 删除处理中任务列表processing中从左至右与message内容相同的第一条任务
client.lrem('processing', 1, message)
- 回填任务:消费者如果在消费任务的过程中出现错误,在删除任务的基础上进行任务的回填。
# 将任务回填至pending待处理列表当中
client.lpush("pending", message)
下面是用 Python 操作 Redis 实现简单可靠消息队列的示例代码:在这个示例中我们增加了一个键值对结构存储每个任务的重试次数。
import random
import redis
import time
class ReliableMessageQueue:
def __init__(self, host, port, db, password):
# 待处理队列名称
self.pending_queue = 'pending_queue'
# 正在处理队列名称
self.processing_queue = 'processing_queue'
# 重试次数键名
self.retry_count_key = 'retry_count'
# 最大重试次数
self.max_retries = 3
# redis连接对象
self.connect = redis.StrictRedis(host=host, port=port, db=db, password=password)
def enqueue(self, message):
# 将消息添加到待处理队列
self.connect.lpush(self.pending_queue, message)
def dequeue(self):
# 从待处理队列中取出消息,并将其移动到正在处理队列
message = self.connect.rpoplpush(self.pending_queue, self.processing_queue)
return message
def complete(self, message):
# 将消息从正在处理队列移除
self.connect.lrem(self.processing_queue, 1, message)
def process(self, message):
try:
# 这里假设处理成功的条件是随机数大于0.5
if random.random() > 0.5:
# 模拟处理消息的过程,这里可以是你的具体业务逻辑
print("Processing message:", message)
# 如果处理成功,则将消息从正在处理队列中移除
self.complete(message)
else:
raise Exception("Processing failed: ")
except Exception as e:
print(e, message)
self.retry(message)
def retry(self, message):
# 建立键值对结构存储每个任务的重试次数
retries = self.connect.hincrby(self.retry_count_key, message, 1)
if retries > self.max_retries:
# 重试次数超过最大重试次数,放弃重试,将消息从正在处理队列中移除
self.connect.lrem(self.processing_queue, 1, message)
print("Max retries exceeded. Message discarded: ", message)
else:
# 未达到最大重试次数,则将消息重新放回待处理队列
self.enqueue(message)
# 处理失败的消息也需要从正在处理队列中移除,否则会添加两次
self.complete(message)
if __name__ == "__main__":
# 创建可靠消息队列对象
queue = ReliableMessageQueue(host='192.168.0.3', port=6379, db=0, password='123456')
# 生产者将消息添加到待处理队列
for message in ['Message_1', 'Message_2', 'Message_3', 'Message_4', 'Message_5']:
queue.enqueue(message)
# 循环处理任务
while True:
# 消费者从待处理队列取出消息,并将其移动到正在处理队列
message = queue.dequeue()
# 如果待处理队列为空,则等待一段时间后继续
if not message:
print("No messages available. Waiting...")
time.sleep(30)
continue
# 将字节任务编码后进行处理
queue.process(message.decode())
RabbitMQ框架
RabbitMQ 是由 Erlang 语言开发基于 AMQP 协议(高级消息队列协议)实现的队列框架。相比较基于的 Redis 队列,在性能方面要更好,支持的功能会更多,消息的可靠性更强,且支持 Python、Java、JavaScript、PHP、Ruby、Go 等多种语言。
下载安装
在安装 RabbitMQ 之前,我们先看看RabbitMQ官网声明的依赖项:
因为 RabbitMQ 使用 Erlang 语言开发,所以首先我们需要安装一个 Erlang 的语言环境,下载好了我们进行安装,注意要按照官网说明的“以管理员身份运行”安装:
安装好后,我们将 Erlang 的安装路径添加到系统变量当中:
接着修改 Path 变量的值,添加 Erlang 安装路径下的 bin
路径:
保存修改,我们打开命令行,执行 erl
命令,出现下图说明安装成功:
接下来我们“以管理员身份运行”安装 RabbitMQ 框架:
安装好后,我们将 RabbitMQ 安装路径添加到系统变量当中:
接着修改 Path 变量的值,添加 RabbitMQ 安装路径下的 sbin
路径:
保存后,在重启命令行加载新的信息,执行 rabbitmq-plugins.bat enable rabbitmq_management
命令安装插件,出现下面信息说明安装成功:
现在我们去访问本机的 15672 端口地址,出现如下登录页面,初始的用户名和密码都为 guest
,点击Login登录:
进到如下页面,该页面展示了一些基本的信息,例如 RabbitMQ 版本、Erlang 版本、Nodes 节点状态等。
简单操作
现在我们回看官网提到通过 Python 操作 RabbitMQ 有下面几种模块选择:
安装第三方库,我们安装最简单的 pika
第三方库:
pip install pika
定义一个消息队列,执行后就会生成一个名称为 scrape
的队列,回到管理页面上,就会看到 Queues 的值变为了 1,下面还展示了队列的名称、类型、长度等各种属性:
import pika
# 连接本地的RabbitMQ服务
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明一个频道,即channel
channel = connect.channel()
# 声明一个名称为scrape的队列
channel.queue_declare(queue='scrape')
# 确认完成后,可以将连接关闭
connect.close()
点进去,就就可以看到该消息队列的详细信息,例如名称、类型、消息数量等:
定义一个生产者,执行后将消息 b'Hello world!'
放入队列中,在管理页面上也能看到,消息的总数量也变成了 1,且有 1 个消息准备好被使用了,消息使用速率是每秒 0 个:
import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
# exchange先赋值一个空字符串(后面讲),routing_key队列名称,body放入的消息体(byte类型数据)
channel.basic_publish(exchange='', routing_key='scrape', body='Hello world!'.encode())
connect.close()
定义一个消费者,启动后会监听 scrape
队列的变动,如果有消息进入,就获取并消费,回调 callback
方法,打印输出结果,回看管理页面,队列状态由 running
变为 idle
,而且 Ready
和 Total
的数量也变为了 0 个:
import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
def callback(ch, method, properties, body):
print(f'Get {body}') # Get b'Hello world!'
# queue='scrape'消息来源scrape队列;
# auto_ack=True获取消息后,通知消息队列移除该消息;
# on_message_callback=callback消息来临时,执行回调函数callback。
channel.basic_consume(queue='scrape', auto_ack=True, on_message_callback=callback)
channel.start_consuming()
警告
消息队列要先于生产者和消费者存在。
消息确认
在消费者处理任务的时候,我们需要考虑一个情况,就是如果消费者进程挂掉了,那么消费者已经接收但未被处理的消息和正在处理的消息就丢失了。为了保证任务不丢失,我们需要消费者在完成任务后告诉 RabbitMQ 这个任务已经干完了,让 RabbitMQ 把这个任务给释放掉。而当出现消费者宕机、掉线等情况时,RabbitMQ 会重新把这个任务发送给其他的消费者。刚好在上面我们就使用了 auto_ack
参数来控制消息确认的行为:
auto_ack=False
(默认)代表着消费者需要在处理完消息后需要向 RabbitMQ 发送一个确认信号,RabbitMQ 才会将消息从队列里面移除。如果消费者在处理消息时崩溃或发生其他错误,而没有发送确认信号,那么 RabbitMQ 会重新将这条消息发送到其他消费者,或者将其保留在队列中等待后续处理。auto_ack=True
代表消费者不会发送确认信号给 RabbitMQ。这意味着一旦消息被发送到消费者,RabbitMQ 就会立即将其从队列中移除,即使消费者还没有处理完这条消息。这种情况下,如果消费者在处理消息时崩溃或发生其他错误,那么这条消息就会丢失。
因此,我们这里就只需要改变消费者的代码。下面就实现了消息确认机制,一旦某个消费者宕机了,RabbitMQ 就会直接把任务派给下一个消费者。
import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
def callback(ch, method, body):
print(f'Get {body}') # Get b'Hello world!'
# 因为下面没有设置auto_ack,默认为False,所以这里必须要通知RabbitMQ这个消息已经处理完,可以从队列里删了
ch.basic_ack(delivery_tag=method.delivery_tag)
# 和上面相比,这里就没有设置auto_ack参数。
channel.basic_consume(queue='scrape', on_message_callback=callback)
channel.start_consuming()
建议
如果你希望确保每条消息都能被正确处理,那么应该使用 no_ack=False
(或不设置该参数,使用默认值)。如果你对消息的丢失不敏感,或者你的应用能够容忍一定程度的消息丢失,那么可以使用 no_ack=True
来简化代码和提高性能。
持久化
RabbitMQ 还提供了队列持久化存储,如果不设置持久化存储,那么 RabbitMQ 重启后队列就没有了。实现队列持久化也很简单,只需要在声明队列时,添加一个 durable
参数等于 True
即可。我们删除以前的重名队列,重新声明持久队列,代码如下:
import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
# durable=True声明持久队列
channel.queue_declare(queue='scrape', durable=True)
声明后,可以看到队列的 Features 特征属性变成了 D
标识,表示该队列是一个持久化的队列:
除了保存队列持久化,我们还必须保存消息持久化,不然服务重启后消息还是会丢失。这时生产者的代码改为如下:
for data in ('world', '!', 'Hello'):
channel.basic_publish(
exchange='',
routing_key='scrape',
# 指定BasicProperties对象的delivery_mode为2,即保持消息持久化
properties=pika.BasicProperties(delivery_mode=2),
body=data.encode()
)
负载均衡
最后一点,因为有可能每个消费者处理信息的能力不一样,如果按公平分发的化有可能导致负载不平衡,旱的旱死、涝的涝死。为避免这种情况发生,必须限制了消费者待处理信息的个数。消费者代码中加上如下设置:
# 限制消费者待处理任务个数为1
channel.basic_qos(prefetch_count=1)
提醒
关于更多 RabbitMQ 操作参看Python之RabbitMQ的使用