
优先队列PQ
更新: 2025/2/24 字数: 0 字 时长: 0 分钟
优先队列(Priority Queue),简单说就是队列中的每个元素都有一个自己的优先级。其主要特性如下:
- 优先级排序:元素的出队顺序基于它们的优先级,而不是它们进入队列的顺序。通常高优先级的元素比低优先级的元素先出队列。
- 动态优先级:在某些实现中,元素的优先级可以在队列中动态地改变。
- 时间复杂度:插入和删除操作的时间复杂度通常是 ,其中 是队列中的元素数量。
前言介绍
在前面我们讲消息队列是一个按照入队顺序进行输出的先进先出(First in-First out,FIFO)数据结构。假如说,我们手上来了一个十分紧急的任务,希望优先处理,显然消息队列是无法满足的,因为消息队列是按照入队的顺序进行输出的,也就是说就是这个任务再紧急也只能排到消息队列的末尾,最后才会进行处理。所以,这时优先队列的作用就展现出来了。
生活场景
有人会问,优先队列有什么用呢?最常见的用法是用来安排任务的优先等级。我们可以想象在生活中的优先队列,比如说火车站的通道入口,如果没有优先队列的存在,来一个人就去通道排队进站,那可能会造成很多问题:
- 一旦人流量加大,所有通道都可能会出现拥挤,导致一些特殊人群无法快速进站造成损失。
- 假如队伍中有人突发特殊情况需要快速进站,没有优先队列也只一个一个的按队伍顺序进站。
于是就发明了优先队列,大家按照自己的优先等级来排一列,高优先级的排前面,低优先级的排后面,然后一个一个的进站。如果队列里面出现了紧急事务,我们也可以提高它的优先级,让它插队到低优先级前面。避免了因任务紧急,但无法快速处理的情况发生。
常见应用
优先队列的常见应用如下:
- 任务调度:在操作系统中,优先队列用于任务调度,确保高优先级的任务优先执行。
- 事件驱动系统:在事件驱动的系统中,优先队列用于根据事件的紧急程度或重要性来排序和处理事件。
- 数据流处理:在处理大量数据流时,优先队列可用于确保关键数据优先处理。
实现优先队列
这里介绍常见的优先队列实现方式:Redis 数据库、RabbitMQ 框架。
Redis数据库
创建队列
import redis
class PriorityQueue:
"""自定义优先级队列"""
def __init__(self, host, port, db, password, name):
# 优先队列名称
self.key = name
# redis连接对象
self.connect = redis.StrictRedis(host=host, port=port, db=db, password=password)
def __len__(self):
"""队列长度"""
return self.connect.zcard(self.key)
def push(self, data, score):
"""向队列中插入元素"""
self.connect.execute_command('ZADD', self.key, score, data)
def pop(self):
"""从队列中取出元素"""
pipe = self.connect.pipeline()
pipe.multi()
pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
results, count = pipe.execute()
if results:
return results[0].decode()
# 创建对象
priority_queue = PriorityQueue(host='localhost', port=6379, db=0, password='123456', name='priority_queue')
插入元素
for data, score in (('爱', 100), ('你', 90), ('我', 110)):
priority_queue.push(data, score)
等级调整
priority_queue.push('我', 50)
priority_queue.push('你', 150)
队列长度
print(priority_queue.__len__()) # 输出:3
取出元素
while True:
element = priority_queue.pop()
if element is None:
break
print(element, end='') # 输出:我爱你
重要
取出元素会导致优先队列中的元素减少,当优先队列中的元素个数为 0 时,优先队列也就不存在了。
RabbitMQ框架
创建队列
在 RabbitMQ 中实现优先级队列很简单,只需要在声明队列时,添加一个 x-max-priority
参数来指定最大优先级即可。创建队列的代码如下:
import pika
# 连接本地的RabbitMQ服务
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 声明一个频道,即channel
channel = connect.channel()
# 声明一个名称为scrape的队列,指定队列的最大优先级为100
channel.queue_declare(queue='scrape', arguments={'x-max-priority': 100})
在 RabbitMQ 管理页面可以看到队列的 Features
特征多出了 Pri
标识,表示该队列是一个优先队列:
生产者代码
# data, priority分别代表消息的内容和消息的优先级
for data, priority in (('world', 80), ('!', 70), ('Hello', 90)):
channel.basic_publish(
exchange='',
routing_key='scrape',
properties=pika.BasicProperties(priority=int(priority),),
body=data.encode())
填充后,可以看到队列里面的消息数量变为了 3 个:
消费者代码
import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
def callback(ch, method, properties, body):
print(f'Get {body}')
ch.basic_ack(delivery_tag=method.delivery_tag)
# 和上面相比,这里就没有设置auto_ack参数。
channel.basic_consume(queue='scrape', on_message_callback=callback)
channel.start_consuming()
运行消费者代码可以看到内容是按优先级的大小,从高到低进行了输出。
'''
输出:
Get b'Hello'
Get b'world'
Get b'!'
'''